//! Writer module for `fathomdb_engine` (`fathomdb_engine/writer/mod.rs`):
//! single-threaded writer actor and write-request validation.
1mod fts_extract;
2
3// Re-export fts_extract items at the writer module level so that external
4// callers (projection.rs, admin.rs, coordinator.rs, rebuild_actor.rs) can
5// still access them via `crate::writer::*`.
6// LEAF_SEPARATOR is used in coordinator.rs and in the test submodule below;
7// the allow attr silences the per-file unused-import lint since Rust does not
8// cross-file-lint pub(crate) re-exports.
9#[allow(unused_imports)]
10pub(crate) use fts_extract::{
11    ExtractStats, LEAF_SEPARATOR, PositionEntry, PropertyFtsSchema, PropertyPathMode,
12    extract_property_fts, extract_property_fts_columns, load_fts_property_schemas,
13    parse_property_schema_json,
14};
15
16// Re-export items that are only needed within this module's test submodules.
17#[cfg(test)]
18#[allow(unused_imports)]
19pub(crate) use fts_extract::{
20    MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PropertyPathEntry, extract_json_path,
21};
22
23use std::collections::HashMap;
24use std::mem::ManuallyDrop;
25use std::panic::AssertUnwindSafe;
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::mpsc::{self, Sender, SyncSender};
29use std::thread;
30use std::time::Duration;
31
32use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
33use rusqlite::{OptionalExtension, TransactionBehavior, params};
34
35use crate::operational::{
36    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
37    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
38    OperationalValidationMode, extract_secondary_index_entries_for_current,
39    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
40    parse_operational_validation_contract, validate_operational_payload_against_contract,
41};
42use crate::telemetry::TelemetryCounters;
43use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
44
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried in [`WriteRequest::optional_backfills`]; the number of tasks is
/// echoed back in [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
53
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true` (see the
/// `chunk_policy` field documentation on [`NodeInsert`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
63
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforcement happens in `prepare_write` / `prepare_touch_last_accessed`
/// before the request reaches the writer thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
74
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Row-level identifier; must be non-empty and unique across all nodes
    /// and edges within the same [`WriteRequest`] (enforced by `prepare_write`).
    pub row_id: String,
    /// Stable identity across row versions; must be non-empty. Upserts
    /// supersede the current active row for this logical id.
    pub logical_id: String,
    /// Node kind label; also recorded in `PreparedWrite::node_kinds`.
    pub kind: String,
    /// Serialized node properties payload.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
91
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Row-level identifier; must be non-empty and unique across all nodes
    /// and edges within the same [`WriteRequest`] (enforced by `prepare_write`).
    pub row_id: String,
    /// Stable identity across row versions; must be non-empty.
    pub logical_id: String,
    /// Logical id of the edge's source node.
    pub source_logical_id: String,
    /// Logical id of the edge's target node.
    pub target_logical_id: String,
    /// Edge kind label.
    pub kind: String,
    /// Serialized edge properties payload.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
106
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Logical id of the node to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
113
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Logical id of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
120
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk identifier; must be non-empty (enforced by `prepare_write`).
    pub id: String,
    /// Logical id of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text; empty text is rejected by `prepare_write`.
    pub text_content: String,
    // byte_start/byte_end: optional byte offsets; not validated by
    // `prepare_write`. NOTE(review): presumably offsets into the external
    // content referenced by the node — confirm against callers.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
132
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk; must be non-empty (enforced by `prepare_write`).
    pub chunk_id: String,
    /// Embedding vector; empty embeddings are rejected by `prepare_write`.
    pub embedding: Vec<f32>,
}
144
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty for every variant, and
/// `payload_json` must be non-empty for `Append`/`Put` (all enforced by
/// `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to a collection.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Insert or replace a record in a collection.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete a record from a collection; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
166
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run identifier; must be non-empty (enforced by `prepare_write`).
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced by `prepare_write`).
    pub upsert: bool,
    /// Id of the run this insert supersedes, when upserting.
    pub supersedes_id: Option<String>,
}
178
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step identifier; must be non-empty (enforced by `prepare_write`).
    pub id: String,
    /// Id of the run this step belongs to.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced by `prepare_write`).
    pub upsert: bool,
    /// Id of the step this insert supersedes, when upserting.
    pub supersedes_id: Option<String>,
}
191
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action identifier; must be non-empty (enforced by `prepare_write`).
    pub id: String,
    /// Id of the step this action belongs to.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced by `prepare_write`).
    pub upsert: bool,
    /// Id of the action this insert supersedes, when upserting.
    pub supersedes_id: Option<String>,
}
204
// --- Write-request size limits ---
// Enforced by `validate_request_size` on the caller's thread before a request
// is handed to the writer; exceeding any limit yields
// `EngineError::InvalidWrite`.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Node retires + edge retires combined.
const MAX_RETIRES: usize = 10_000;
// Runs + steps + actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Sum over all item categories in one request (includes vec inserts, but
// not optional backfill tasks — see `validate_request_size`).
const MAX_TOTAL_ITEMS: usize = 100_000;
213
/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// Consumed by `recv_with_timeout` below.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
221
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Size limits are enforced by `validate_request_size` (see the `MAX_*`
/// constants above); content validation happens in `prepare_write`.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label, echoed in the [`WriteReceipt`] and write logs.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfill tasks
    /// (see [`WriteReceipt::optional_backfill_count`]).
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
240
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Echo of [`WriteRequest::label`].
    pub label: String,
    /// Number of deferred projection backfill tasks accepted with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` notices emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
249
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
///
/// Validated by `prepare_touch_last_accessed`: at least one logical id,
/// no blank ids, and `source_ref` present under [`ProvenanceMode::Require`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Logical ids of the nodes to touch; must be non-empty and non-blank.
    pub logical_ids: Vec<String>,
    /// Timestamp to record for each touched node.
    pub touched_at: i64,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
257
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids whose timestamp was updated.
    pub touched_logical_ids: usize,
    /// The timestamp that was applied.
    pub touched_at: i64,
}
264
/// A chunk-level FTS row resolved by the writer thread before applying a write
/// (see `PreparedWrite::required_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}
272
/// A property-level FTS row resolved by the writer thread before applying a
/// write (see `PreparedWrite::required_property_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
    positions: Vec<PositionEntry>,
    /// Per-column text extracted for weighted schemas. Empty for non-weighted schemas.
    /// When non-empty, the writer uses per-column INSERT instead of single `text_content`.
    columns: Vec<(String, String)>,
}
283
/// A [`WriteRequest`] that has passed `prepare_write` validation.
///
/// Built on the caller's thread, then moved (boxed) to the writer thread,
/// which fills in the remaining projection fields before committing.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Left empty by `prepare_write`; presumably populated on the writer
    // thread alongside the FTS rows — confirm in resolve_and_apply.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
311
/// Messages sent from [`WriterActor`] methods to the writer thread.
///
/// Each variant carries a one-shot `reply` sender; the caller waits on the
/// matching receiver via `recv_with_timeout`.
enum WriteMessage {
    /// Apply a validated write batch (boxed: `PreparedWrite` is large).
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` for a batch of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
322
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop lets `Drop for WriterActor` close the channel *before*
    // joining the thread (see the Drop impl), avoiding a shutdown deadlock.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // `Option` so `drop` can `take()` the handle and join exactly once.
    thread_handle: Option<thread::JoinHandle<()>>,
    // Applied during request preparation in `submit` / `touch_last_accessed`.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
335
336impl WriterActor {
337    /// # Errors
338    /// Returns [`EngineError`] if the writer thread cannot be spawned.
339    pub fn start(
340        path: impl AsRef<Path>,
341        schema_manager: Arc<SchemaManager>,
342        provenance_mode: ProvenanceMode,
343        telemetry: Arc<TelemetryCounters>,
344    ) -> Result<Self, EngineError> {
345        let database_path = path.as_ref().to_path_buf();
346        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
347
348        let writer_telemetry = Arc::clone(&telemetry);
349        let handle = thread::Builder::new()
350            .name("fathomdb-writer".to_owned())
351            .spawn(move || {
352                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
353            })
354            .map_err(EngineError::Io)?;
355
356        Ok(Self {
357            sender: ManuallyDrop::new(sender),
358            thread_handle: Some(handle),
359            provenance_mode,
360            _telemetry: telemetry,
361        })
362    }
363
364    /// Returns `true` if the writer thread is still running.
365    fn is_thread_alive(&self) -> bool {
366        self.thread_handle
367            .as_ref()
368            .is_some_and(|h| !h.is_finished())
369    }
370
371    /// Returns `WriterRejected` if the writer thread has exited.
372    fn check_thread_alive(&self) -> Result<(), EngineError> {
373        if self.is_thread_alive() {
374            Ok(())
375        } else {
376            Err(EngineError::WriterRejected(
377                "writer thread has exited".to_owned(),
378            ))
379        }
380    }
381
382    /// # Errors
383    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
384    /// down, or the underlying `SQLite` transaction fails.
385    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
386        self.check_thread_alive()?;
387        let prepared = prepare_write(request, self.provenance_mode)?;
388        let (reply_tx, reply_rx) = mpsc::channel();
389        self.sender
390            .send(WriteMessage::Submit {
391                prepared: Box::new(prepared),
392                reply: reply_tx,
393            })
394            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
395
396        recv_with_timeout(&reply_rx)
397    }
398
399    /// # Errors
400    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
401    /// or the underlying `SQLite` transaction fails.
402    pub fn touch_last_accessed(
403        &self,
404        request: LastAccessTouchRequest,
405    ) -> Result<LastAccessTouchReport, EngineError> {
406        self.check_thread_alive()?;
407        prepare_touch_last_accessed(&request, self.provenance_mode)?;
408        let (reply_tx, reply_rx) = mpsc::channel();
409        self.sender
410            .send(WriteMessage::TouchLastAccessed {
411                request,
412                reply: reply_tx,
413            })
414            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
415
416        recv_with_timeout(&reply_rx)
417    }
418}
419
/// Fallback shutdown notice when the `tracing` feature is disabled.
///
/// Called from `Drop for WriterActor` when the writer thread panicked while
/// the current thread is itself unwinding.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    let notice = "fathomdb-writer panicked during shutdown (suppressed: already panicking)";
    eprintln!("{notice}");
}
425
impl Drop for WriterActor {
    /// Two-phase shutdown: close the message channel, then join the writer thread.
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // Re-panicking while already unwinding would abort the
                    // process, so log-and-swallow in that case; otherwise
                    // propagate the writer thread's panic to the caller.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
455
456/// Wait for a reply from the writer thread, with a timeout.
457fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
458    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
459        .map_err(|error| match error {
460            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
461                "write timed out waiting for writer thread reply — the write may still commit"
462                    .to_owned(),
463            ),
464            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
465        })
466        .and_then(|result| result)
467}
468
469fn prepare_touch_last_accessed(
470    request: &LastAccessTouchRequest,
471    mode: ProvenanceMode,
472) -> Result<(), EngineError> {
473    if request.logical_ids.is_empty() {
474        return Err(EngineError::InvalidWrite(
475            "touch_last_accessed requires at least one logical_id".to_owned(),
476        ));
477    }
478    for logical_id in &request.logical_ids {
479        if logical_id.trim().is_empty() {
480            return Err(EngineError::InvalidWrite(
481                "touch_last_accessed requires non-empty logical_ids".to_owned(),
482            ));
483        }
484    }
485    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
486        return Err(EngineError::InvalidWrite(
487            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
488                .to_owned(),
489        ));
490    }
491    Ok(())
492}
493
494fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
495    let missing: Vec<String> = request
496        .nodes
497        .iter()
498        .filter(|n| n.source_ref.is_none())
499        .map(|n| format!("node '{}'", n.logical_id))
500        .chain(
501            request
502                .node_retires
503                .iter()
504                .filter(|r| r.source_ref.is_none())
505                .map(|r| format!("node retire '{}'", r.logical_id)),
506        )
507        .chain(
508            request
509                .edges
510                .iter()
511                .filter(|e| e.source_ref.is_none())
512                .map(|e| format!("edge '{}'", e.logical_id)),
513        )
514        .chain(
515            request
516                .edge_retires
517                .iter()
518                .filter(|r| r.source_ref.is_none())
519                .map(|r| format!("edge retire '{}'", r.logical_id)),
520        )
521        .chain(
522            request
523                .runs
524                .iter()
525                .filter(|r| r.source_ref.is_none())
526                .map(|r| format!("run '{}'", r.id)),
527        )
528        .chain(
529            request
530                .steps
531                .iter()
532                .filter(|s| s.source_ref.is_none())
533                .map(|s| format!("step '{}'", s.id)),
534        )
535        .chain(
536            request
537                .actions
538                .iter()
539                .filter(|a| a.source_ref.is_none())
540                .map(|a| format!("action '{}'", a.id)),
541        )
542        .chain(
543            request
544                .operational_writes
545                .iter()
546                .filter(|write| operational_write_source_ref(write).is_none())
547                .map(|write| {
548                    format!(
549                        "operational {} '{}:{}'",
550                        operational_write_kind(write),
551                        operational_write_collection(write),
552                        operational_write_record_key(write)
553                    )
554                }),
555        )
556        .collect();
557
558    if missing.is_empty() {
559        Ok(())
560    } else {
561        Err(EngineError::InvalidWrite(format!(
562            "ProvenanceMode::Require: missing source_ref on: {}",
563            missing.join(", ")
564        )))
565    }
566}
567
568fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
569    if request.nodes.len() > MAX_NODES {
570        return Err(EngineError::InvalidWrite(format!(
571            "too many nodes: {} exceeds limit of {MAX_NODES}",
572            request.nodes.len()
573        )));
574    }
575    if request.edges.len() > MAX_EDGES {
576        return Err(EngineError::InvalidWrite(format!(
577            "too many edges: {} exceeds limit of {MAX_EDGES}",
578            request.edges.len()
579        )));
580    }
581    if request.chunks.len() > MAX_CHUNKS {
582        return Err(EngineError::InvalidWrite(format!(
583            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
584            request.chunks.len()
585        )));
586    }
587    let retires = request.node_retires.len() + request.edge_retires.len();
588    if retires > MAX_RETIRES {
589        return Err(EngineError::InvalidWrite(format!(
590            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
591        )));
592    }
593    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
594    if runtime_items > MAX_RUNTIME_ITEMS {
595        return Err(EngineError::InvalidWrite(format!(
596            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
597        )));
598    }
599    if request.operational_writes.len() > MAX_OPERATIONAL {
600        return Err(EngineError::InvalidWrite(format!(
601            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
602            request.operational_writes.len()
603        )));
604    }
605    let total = request.nodes.len()
606        + request.node_retires.len()
607        + request.edges.len()
608        + request.edge_retires.len()
609        + request.chunks.len()
610        + request.runs.len()
611        + request.steps.len()
612        + request.actions.len()
613        + request.vec_inserts.len()
614        + request.operational_writes.len();
615    if total > MAX_TOTAL_ITEMS {
616        return Err(EngineError::InvalidWrite(format!(
617            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
618        )));
619    }
620    Ok(())
621}
622
/// Validates a [`WriteRequest`] on the caller's thread and converts it into a
/// [`PreparedWrite`] for the writer thread.
///
/// Validation order (first failure wins): size limits, empty IDs / payloads,
/// duplicate row_ids, provenance (under [`ProvenanceMode::Require`]), and
/// runtime-table upsert constraints.
///
/// # Errors
/// Returns [`EngineError::InvalidWrite`] describing the first violation found.
#[allow(clippy::too_many_lines)]
fn prepare_write(
    request: WriteRequest,
    mode: ProvenanceMode,
) -> Result<PreparedWrite, EngineError> {
    validate_request_size(&request)?;

    // --- ID validation: reject empty IDs ---
    for node in &request.nodes {
        if node.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty row_id".to_owned(),
            ));
        }
        if node.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for edge in &request.edges {
        if edge.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty row_id".to_owned(),
            ));
        }
        if edge.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for chunk in &request.chunks {
        if chunk.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ChunkInsert has empty id".to_owned(),
            ));
        }
        // Empty chunk text is rejected outright rather than skipped.
        if chunk.text_content.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' has empty text_content; empty chunks are not allowed",
                chunk.id
            )));
        }
    }
    for run in &request.runs {
        if run.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "RunInsert has empty id".to_owned(),
            ));
        }
    }
    for step in &request.steps {
        if step.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "StepInsert has empty id".to_owned(),
            ));
        }
    }
    for action in &request.actions {
        if action.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ActionInsert has empty id".to_owned(),
            ));
        }
    }
    for vi in &request.vec_inserts {
        if vi.chunk_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "VecInsert has empty chunk_id".to_owned(),
            ));
        }
        if vi.embedding.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "VecInsert for chunk '{}' has empty embedding",
                vi.chunk_id
            )));
        }
    }
    for operational in &request.operational_writes {
        if operational_write_collection(operational).is_empty() {
            return Err(EngineError::InvalidWrite(
                "OperationalWrite has empty collection".to_owned(),
            ));
        }
        if operational_write_record_key(operational).is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "OperationalWrite for collection '{}' has empty record_key",
                operational_write_collection(operational)
            )));
        }
        // Only Append/Put carry a payload; Delete has nothing further to check.
        match operational {
            OperationalWrite::Append { payload_json, .. }
            | OperationalWrite::Put { payload_json, .. } => {
                if payload_json.is_empty() {
                    return Err(EngineError::InvalidWrite(format!(
                        "OperationalWrite {} '{}:{}' has empty payload_json",
                        operational_write_kind(operational),
                        operational_write_collection(operational),
                        operational_write_record_key(operational)
                    )));
                }
            }
            OperationalWrite::Delete { .. } => {}
        }
    }

    // --- ID validation: reject duplicate row_ids within the request ---
    // One `seen` set spans both nodes and edges, so a node and an edge may
    // not share a row_id either.
    {
        let mut seen = std::collections::HashSet::new();
        for node in &request.nodes {
            if !seen.insert(node.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    node.row_id
                )));
            }
        }
        for edge in &request.edges {
            if !seen.insert(edge.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    edge.row_id
                )));
            }
        }
    }

    // --- ProvenanceMode::Require enforcement ---
    if mode == ProvenanceMode::Require {
        check_require_provenance(&request)?;
    }

    // --- Runtime table upsert validation ---
    // An upsert must name the row it supersedes.
    for run in &request.runs {
        if run.upsert && run.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "run '{}': upsert=true requires supersedes_id to be set",
                run.id
            )));
        }
    }
    for step in &request.steps {
        if step.upsert && step.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "step '{}': upsert=true requires supersedes_id to be set",
                step.id
            )));
        }
    }
    for action in &request.actions {
        if action.upsert && action.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "action '{}': upsert=true requires supersedes_id to be set",
                action.id
            )));
        }
    }

    // logical_id -> kind index for co-submitted nodes, so the writer thread
    // can resolve FTS rows without a DB round-trip (see PreparedWrite docs).
    let node_kinds = request
        .nodes
        .iter()
        .map(|node| (node.logical_id.clone(), node.kind.clone()))
        .collect::<HashMap<_, _>>();

    // Projection/operational-metadata fields are left empty here; they are
    // filled in by the writer thread before BEGIN IMMEDIATE (see the field
    // comments on `PreparedWrite`).
    Ok(PreparedWrite {
        label: request.label,
        nodes: request.nodes,
        node_retires: request.node_retires,
        edges: request.edges,
        edge_retires: request.edge_retires,
        chunks: request.chunks,
        runs: request.runs,
        steps: request.steps,
        actions: request.actions,
        vec_inserts: request.vec_inserts,
        operational_writes: request.operational_writes,
        operational_collection_kinds: HashMap::new(),
        operational_collection_filter_fields: HashMap::new(),
        operational_validation_warnings: Vec::new(),
        node_kinds,
        required_fts_rows: Vec::new(),
        required_property_fts_rows: Vec::new(),
        optional_backfills: request.optional_backfills,
    })
}
809
/// Main loop for the dedicated writer thread.
///
/// Opens the single write connection, bootstraps the schema, then processes
/// messages serially until every sender has been dropped. If either startup
/// step fails, all queued and future messages are rejected via `reject_all`
/// and the thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    // Without a connection no write can ever succeed; drain the channel so
    // callers get an error instead of hanging on their reply channels.
    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Blocks waiting for messages; the loop ends when all senders drop.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists when the tracing feature is compiled
                // in; its only read is inside the trace_info! below, which
                // compiles away otherwise.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot take down the
                // writer thread (and with it every future write).
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // The caller may have given up waiting; a failed send is
                    // not an error for the writer.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Success counts as a write with zero data rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
892
893fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
894    for message in receiver {
895        match message {
896            WriteMessage::Submit { reply, .. } => {
897                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
898            }
899            WriteMessage::TouchLastAccessed { reply, .. } => {
900                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
901            }
902        }
903    }
904}
905
906/// Resolve FTS projection rows before the write transaction begins.
907///
908/// For each chunk in the prepared write, determines the node `kind` needed
909/// to populate the FTS index. If the node was co-submitted in the same
910/// request its kind is taken from `prepared.node_kinds` without a DB query.
911/// If the node is pre-existing it is looked up in the database. If the node
912/// cannot be found in either place, an `InvalidWrite` error is returned.
913fn resolve_fts_rows(
914    conn: &rusqlite::Connection,
915    prepared: &mut PreparedWrite,
916) -> Result<(), EngineError> {
917    let retiring_ids: std::collections::HashSet<&str> = prepared
918        .node_retires
919        .iter()
920        .map(|r| r.logical_id.as_str())
921        .collect();
922    for chunk in &prepared.chunks {
923        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
924            return Err(EngineError::InvalidWrite(format!(
925                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
926                 WriteRequest; retire and chunk insertion for the same node must not be combined",
927                chunk.id, chunk.node_logical_id
928            )));
929        }
930    }
931    for chunk in &prepared.chunks {
932        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
933            k.clone()
934        } else {
935            match conn.query_row(
936                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
937                params![chunk.node_logical_id],
938                |row| row.get::<_, String>(0),
939            ) {
940                Ok(kind) => kind,
941                Err(rusqlite::Error::QueryReturnedNoRows) => {
942                    return Err(EngineError::InvalidWrite(format!(
943                        "chunk '{}' references node_logical_id '{}' that is not present in this \
944                         write request or the database \
945                         (v1 limitation: chunks and their nodes must be submitted together or the \
946                         node must already exist)",
947                        chunk.id, chunk.node_logical_id
948                    )));
949                }
950                Err(e) => return Err(EngineError::Sqlite(e)),
951            }
952        };
953        prepared.required_fts_rows.push(FtsProjectionRow {
954            chunk_id: chunk.id.clone(),
955            node_logical_id: chunk.node_logical_id.clone(),
956            kind,
957            text_content: chunk.text_content.clone(),
958        });
959    }
960    trace_debug!(
961        fts_rows = prepared.required_fts_rows.len(),
962        chunks_processed = prepared.chunks.len(),
963        "fts row resolution completed"
964    );
965    Ok(())
966}
967
968/// Load FTS property schemas from the database and extract property values from
969/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
970fn resolve_property_fts_rows(
971    conn: &rusqlite::Connection,
972    prepared: &mut PreparedWrite,
973) -> Result<(), EngineError> {
974    if prepared.nodes.is_empty() {
975        return Ok(());
976    }
977
978    let schemas: HashMap<String, PropertyFtsSchema> =
979        load_fts_property_schemas(conn)?.into_iter().collect();
980
981    if schemas.is_empty() {
982        return Ok(());
983    }
984
985    let mut combined_stats = ExtractStats::default();
986    for node in &prepared.nodes {
987        let Some(schema) = schemas.get(&node.kind) else {
988            continue;
989        };
990        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
991        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
992        let (text_content, positions, stats) = extract_property_fts(&props, schema);
993        combined_stats.merge(stats);
994        if let Some(text_content) = text_content {
995            let columns = if has_weights {
996                extract_property_fts_columns(&props, schema)
997            } else {
998                Vec::new()
999            };
1000            prepared
1001                .required_property_fts_rows
1002                .push(FtsPropertyProjectionRow {
1003                    node_logical_id: node.logical_id.clone(),
1004                    kind: node.kind.clone(),
1005                    text_content,
1006                    positions,
1007                    columns,
1008                });
1009        }
1010    }
1011    if combined_stats != ExtractStats::default() {
1012        trace_debug!(
1013            depth_cap_hit = combined_stats.depth_cap_hit,
1014            byte_cap_reached = combined_stats.byte_cap_reached,
1015            excluded_subtree = combined_stats.excluded_subtree,
1016            "property fts recursive extraction guardrails engaged"
1017        );
1018    }
1019    trace_debug!(
1020        property_fts_rows = prepared.required_property_fts_rows.len(),
1021        nodes_processed = prepared.nodes.len(),
1022        "property fts row resolution completed"
1023    );
1024    Ok(())
1025}
1026
/// Validate operational writes against collection metadata before the
/// write transaction begins, caching per-collection kind and filter-field
/// definitions onto `prepared` for use during apply.
///
/// Rejects writes against unregistered or disabled collections, enforces
/// the kind/operation matrix (append_only_log accepts only Append;
/// latest_state only Put/Delete), and runs contract validation so that
/// enforce-mode failures abort early. Report-only warnings are discarded
/// here; `validate_operational_writes_against_live_contracts` re-checks
/// against live contracts inside the transaction and collects them there.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // Per-collection metadata caches keyed by collection name, filled on
    // first reference so each collection is looked up at most once.
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        if !collection_kinds.contains_key(collection) {
            // Cache miss: fetch this collection's registration row.
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Kind/operation compatibility matrix: the first arm lists the
        // allowed pairs; the remaining arms produce targeted errors.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Enforce-mode failures abort via `?`; the returned report-only
        // warning is intentionally dropped here (see the live-contract
        // re-check that runs inside the transaction).
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1094
1095fn parse_operational_filter_fields(
1096    filter_fields_json: &str,
1097) -> Result<Vec<OperationalFilterField>, EngineError> {
1098    let fields: Vec<OperationalFilterField> =
1099        serde_json::from_str(filter_fields_json).map_err(|error| {
1100            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1101        })?;
1102    let mut seen = std::collections::HashSet::new();
1103    for field in &fields {
1104        if field.name.trim().is_empty() {
1105            return Err(EngineError::InvalidWrite(
1106                "filter_fields_json field names must not be empty".to_owned(),
1107            ));
1108        }
1109        if !seen.insert(field.name.as_str()) {
1110            return Err(EngineError::InvalidWrite(format!(
1111                "filter_fields_json contains duplicate field '{}'",
1112                field.name
1113            )));
1114        }
1115        if field.modes.is_empty() {
1116            return Err(EngineError::InvalidWrite(format!(
1117                "filter_fields_json field '{}' must declare at least one mode",
1118                field.name
1119            )));
1120        }
1121        if field.modes.contains(&OperationalFilterMode::Prefix)
1122            && field.field_type != OperationalFilterFieldType::String
1123        {
1124            return Err(EngineError::InvalidWrite(format!(
1125                "filter field '{}' only supports prefix for string types",
1126                field.name
1127            )));
1128        }
1129    }
1130    Ok(fields)
1131}
1132
/// One extracted filter value for a single field of an operational record,
/// as produced by `extract_operational_filter_values`. Exactly one of
/// `string_value` / `integer_value` is populated there, matching the
/// field's declared type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Populated for string-typed fields.
    string_value: Option<String>,
    // Populated for integer- and timestamp-typed fields.
    integer_value: Option<i64>,
}
1139
1140fn extract_operational_filter_values(
1141    filter_fields: &[OperationalFilterField],
1142    payload_json: &str,
1143) -> Vec<OperationalFilterValueRow> {
1144    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1145        return Vec::new();
1146    };
1147    let Some(object) = parsed.as_object() else {
1148        return Vec::new();
1149    };
1150
1151    filter_fields
1152        .iter()
1153        .filter_map(|field| {
1154            let value = object.get(&field.name)?;
1155            match field.field_type {
1156                OperationalFilterFieldType::String => {
1157                    value
1158                        .as_str()
1159                        .map(|string_value| OperationalFilterValueRow {
1160                            field_name: field.name.clone(),
1161                            string_value: Some(string_value.to_owned()),
1162                            integer_value: None,
1163                        })
1164                }
1165                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1166                    value
1167                        .as_i64()
1168                        .map(|integer_value| OperationalFilterValueRow {
1169                            field_name: field.name.clone(),
1170                            string_value: None,
1171                            integer_value: Some(integer_value),
1172                        })
1173                }
1174            }
1175        })
1176        .collect()
1177}
1178
/// Full pipeline for a single prepared write: run the three read-only
/// resolution passes (chunk FTS rows, property FTS rows, operational
/// collection metadata/validation), then apply everything inside a single
/// write transaction via `apply_write`. The first failing stage aborts the
/// whole write with its error.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1188
1189fn apply_touch_last_accessed(
1190    conn: &mut rusqlite::Connection,
1191    request: &LastAccessTouchRequest,
1192) -> Result<LastAccessTouchReport, EngineError> {
1193    let mut seen = std::collections::HashSet::new();
1194    let logical_ids = request
1195        .logical_ids
1196        .iter()
1197        .filter(|logical_id| seen.insert(logical_id.as_str()))
1198        .cloned()
1199        .collect::<Vec<_>>();
1200    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1201
1202    for logical_id in &logical_ids {
1203        let exists = tx
1204            .query_row(
1205                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1206                params![logical_id],
1207                |row| row.get::<_, i64>(0),
1208            )
1209            .optional()?
1210            .is_some();
1211        if !exists {
1212            return Err(EngineError::InvalidWrite(format!(
1213                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1214            )));
1215        }
1216    }
1217
1218    {
1219        let mut upsert_metadata = tx.prepare_cached(
1220            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1221             VALUES (?1, ?2, ?2) \
1222             ON CONFLICT(logical_id) DO UPDATE SET \
1223                 last_accessed_at = excluded.last_accessed_at, \
1224                 updated_at = excluded.updated_at",
1225        )?;
1226        let mut insert_provenance = tx.prepare_cached(
1227            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1228             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1229        )?;
1230        for logical_id in &logical_ids {
1231            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1232            insert_provenance.execute(params![
1233                new_id(),
1234                logical_id,
1235                request.source_ref.as_deref(),
1236                format!("{{\"touched_at\":{}}}", request.touched_at),
1237            ])?;
1238        }
1239    }
1240
1241    tx.commit()?;
1242    Ok(LastAccessTouchReport {
1243        touched_logical_ids: logical_ids.len(),
1244        touched_at: request.touched_at,
1245    })
1246}
1247
1248fn ensure_operational_collections_writable(
1249    tx: &rusqlite::Transaction<'_>,
1250    prepared: &PreparedWrite,
1251) -> Result<(), EngineError> {
1252    for collection in prepared.operational_collection_kinds.keys() {
1253        let disabled_at: Option<Option<i64>> = tx
1254            .query_row(
1255                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1256                params![collection],
1257                |row| row.get::<_, Option<i64>>(0),
1258            )
1259            .optional()?;
1260        match disabled_at {
1261            Some(Some(_)) => {
1262                return Err(EngineError::InvalidWrite(format!(
1263                    "operational collection '{collection}' is disabled"
1264                )));
1265            }
1266            Some(None) => {}
1267            None => {
1268                return Err(EngineError::InvalidWrite(format!(
1269                    "operational collection '{collection}' is not registered"
1270                )));
1271            }
1272        }
1273    }
1274    Ok(())
1275}
1276
1277fn validate_operational_writes_against_live_contracts(
1278    tx: &rusqlite::Transaction<'_>,
1279    prepared: &PreparedWrite,
1280) -> Result<Vec<String>, EngineError> {
1281    let mut collection_validation_contracts =
1282        HashMap::<String, Option<OperationalValidationContract>>::new();
1283    for collection in prepared.operational_collection_kinds.keys() {
1284        let validation_json: String = tx
1285            .query_row(
1286                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1287                params![collection],
1288                |row| row.get(0),
1289            )
1290            .map_err(EngineError::Sqlite)?;
1291        let validation_contract = parse_operational_validation_contract(&validation_json)
1292            .map_err(EngineError::InvalidWrite)?;
1293        collection_validation_contracts.insert(collection.clone(), validation_contract);
1294    }
1295
1296    let mut warnings = Vec::new();
1297    for write in &prepared.operational_writes {
1298        if let Some(Some(contract)) =
1299            collection_validation_contracts.get(operational_write_collection(write))
1300            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1301        {
1302            warnings.push(warning);
1303        }
1304    }
1305
1306    Ok(warnings)
1307}
1308
1309fn load_live_operational_secondary_indexes(
1310    tx: &rusqlite::Transaction<'_>,
1311    prepared: &PreparedWrite,
1312) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1313    let mut collection_indexes = HashMap::new();
1314    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1315        let secondary_indexes_json: String = tx
1316            .query_row(
1317                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1318                params![collection],
1319                |row| row.get(0),
1320            )
1321            .map_err(EngineError::Sqlite)?;
1322        let indexes =
1323            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1324                .map_err(EngineError::InvalidWrite)?;
1325        collection_indexes.insert(collection.clone(), indexes);
1326    }
1327    Ok(collection_indexes)
1328}
1329
1330fn check_operational_write_against_contract(
1331    write: &OperationalWrite,
1332    contract: &OperationalValidationContract,
1333) -> Result<Option<String>, EngineError> {
1334    if contract.mode == OperationalValidationMode::Disabled {
1335        return Ok(None);
1336    }
1337
1338    let (payload_json, collection, record_key) = match write {
1339        OperationalWrite::Append {
1340            collection,
1341            record_key,
1342            payload_json,
1343            ..
1344        }
1345        | OperationalWrite::Put {
1346            collection,
1347            record_key,
1348            payload_json,
1349            ..
1350        } => (
1351            payload_json.as_str(),
1352            collection.as_str(),
1353            record_key.as_str(),
1354        ),
1355        OperationalWrite::Delete { .. } => return Ok(None),
1356    };
1357
1358    match validate_operational_payload_against_contract(contract, payload_json) {
1359        Ok(()) => Ok(None),
1360        Err(message) => match contract.mode {
1361            OperationalValidationMode::Disabled => Ok(None),
1362            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1363                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1364                kind = operational_write_kind(write)
1365            ))),
1366            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1367                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1368                kind = operational_write_kind(write)
1369            ))),
1370        },
1371    }
1372}
1373
1374#[allow(clippy::too_many_lines)]
1375fn apply_write(
1376    conn: &mut rusqlite::Connection,
1377    prepared: &mut PreparedWrite,
1378) -> Result<WriteReceipt, EngineError> {
1379    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1380
1381    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
1382    // preserve chunks/vec for possible restore, mark superseded, record audit event.
1383    {
1384        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1385        let mut del_prop_positions = tx
1386            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1387        // Double-write cleanup: also remove staging rows for this node so that a
1388        // rebuild-in-progress does not re-insert a retired node at swap time.
1389        let mut del_staging = tx.prepare_cached(
1390            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1391        )?;
1392        let mut sup_node = tx.prepare_cached(
1393            "UPDATE nodes SET superseded_at = unixepoch() \
1394             WHERE logical_id = ?1 AND superseded_at IS NULL",
1395        )?;
1396        let mut ins_event = tx.prepare_cached(
1397            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1398             VALUES (?1, 'node_retire', ?2, ?3)",
1399        )?;
1400        for retire in &prepared.node_retires {
1401            del_fts.execute(params![retire.logical_id])?;
1402            // Delete from the per-kind FTS table: look up the node's kind first.
1403            let kind_opt: Option<String> = tx
1404                .query_row(
1405                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1406                    params![retire.logical_id],
1407                    |r| r.get(0),
1408                )
1409                .optional()?;
1410            if let Some(kind) = kind_opt {
1411                let table = fathomdb_schema::fts_kind_table_name(&kind);
1412                // Only delete if the per-kind table exists (may not exist if no schema registered).
1413                let table_exists: bool = tx
1414                    .query_row(
1415                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1416                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1417                        rusqlite::params![table],
1418                        |r| r.get::<_, i64>(0),
1419                    )
1420                    .unwrap_or(0)
1421                    > 0;
1422                if table_exists {
1423                    tx.execute(
1424                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1425                        params![retire.logical_id],
1426                    )?;
1427                }
1428            }
1429            del_prop_positions.execute(params![retire.logical_id])?;
1430            del_staging.execute(params![retire.logical_id])?;
1431            sup_node.execute(params![retire.logical_id])?;
1432            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1433        }
1434    }
1435
1436    // Edge retires: mark superseded, record audit event.
1437    {
1438        let mut sup_edge = tx.prepare_cached(
1439            "UPDATE edges SET superseded_at = unixepoch() \
1440             WHERE logical_id = ?1 AND superseded_at IS NULL",
1441        )?;
1442        let mut ins_event = tx.prepare_cached(
1443            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1444             VALUES (?1, 'edge_retire', ?2, ?3)",
1445        )?;
1446        for retire in &prepared.edge_retires {
1447            sup_edge.execute(params![retire.logical_id])?;
1448            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1449        }
1450    }
1451
1452    // Node inserts (with optional upsert + chunk-policy handling).
1453    {
1454        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1455        let mut del_prop_positions = tx
1456            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1457        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1458        let mut sup_node = tx.prepare_cached(
1459            "UPDATE nodes SET superseded_at = unixepoch() \
1460             WHERE logical_id = ?1 AND superseded_at IS NULL",
1461        )?;
1462        let mut ins_node = tx.prepare_cached(
1463            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1464             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1465        )?;
1466        for node in &prepared.nodes {
1467            if node.upsert {
1468                // Property FTS rows are always replaced on upsert since properties change.
1469                // Delete from the per-kind table (table name is dynamic).
1470                let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1471                // Only delete if the per-kind table exists (may not exist if no schema registered).
1472                let table_exists: bool = tx
1473                    .query_row(
1474                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1475                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1476                        rusqlite::params![table],
1477                        |r| r.get::<_, i64>(0),
1478                    )
1479                    .unwrap_or(0)
1480                    > 0;
1481                if table_exists {
1482                    tx.execute(
1483                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1484                        params![node.logical_id],
1485                    )?;
1486                }
1487                del_prop_positions.execute(params![node.logical_id])?;
1488                if node.chunk_policy == ChunkPolicy::Replace {
1489                    // 0.5.0: delete from the per-kind vec table instead of the global sentinel.
1490                    #[cfg(feature = "sqlite-vec")]
1491                    {
1492                        let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
1493                        let del_sql = format!(
1494                            "DELETE FROM {vec_table} WHERE chunk_id IN \
1495                             (SELECT id FROM chunks WHERE node_logical_id = ?1)"
1496                        );
1497                        match tx.execute(&del_sql, params![node.logical_id]) {
1498                            Ok(_) => {}
1499                            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
1500                            Err(e) => return Err(e.into()),
1501                        }
1502                    }
1503                    del_fts.execute(params![node.logical_id])?;
1504                    del_chunks.execute(params![node.logical_id])?;
1505                }
1506                sup_node.execute(params![node.logical_id])?;
1507            }
1508            ins_node.execute(params![
1509                node.row_id,
1510                node.logical_id,
1511                node.kind,
1512                node.properties,
1513                node.source_ref,
1514                node.content_ref,
1515            ])?;
1516        }
1517    }
1518
1519    // Edge inserts (with optional upsert).
1520    {
1521        let mut sup_edge = tx.prepare_cached(
1522            "UPDATE edges SET superseded_at = unixepoch() \
1523             WHERE logical_id = ?1 AND superseded_at IS NULL",
1524        )?;
1525        let mut ins_edge = tx.prepare_cached(
1526            "INSERT INTO edges \
1527             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1528             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1529        )?;
1530        for edge in &prepared.edges {
1531            if edge.upsert {
1532                sup_edge.execute(params![edge.logical_id])?;
1533            }
1534            ins_edge.execute(params![
1535                edge.row_id,
1536                edge.logical_id,
1537                edge.source_logical_id,
1538                edge.target_logical_id,
1539                edge.kind,
1540                edge.properties,
1541                edge.source_ref,
1542            ])?;
1543        }
1544    }
1545
1546    // Chunk inserts.
1547    {
1548        let mut ins_chunk = tx.prepare_cached(
1549            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
1550             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1551        )?;
1552        for chunk in &prepared.chunks {
1553            ins_chunk.execute(params![
1554                chunk.id,
1555                chunk.node_logical_id,
1556                chunk.text_content,
1557                chunk.byte_start,
1558                chunk.byte_end,
1559                chunk.content_hash,
1560            ])?;
1561        }
1562    }
1563
1564    // Run inserts (with optional upsert).
1565    {
1566        let mut sup_run = tx.prepare_cached(
1567            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1568        )?;
1569        let mut ins_run = tx.prepare_cached(
1570            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
1571             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1572        )?;
1573        for run in &prepared.runs {
1574            if run.upsert
1575                && let Some(ref prior_id) = run.supersedes_id
1576            {
1577                sup_run.execute(params![prior_id])?;
1578            }
1579            ins_run.execute(params![
1580                run.id,
1581                run.kind,
1582                run.status,
1583                run.properties,
1584                run.source_ref
1585            ])?;
1586        }
1587    }
1588
1589    // Step inserts (with optional upsert).
1590    {
1591        let mut sup_step = tx.prepare_cached(
1592            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1593        )?;
1594        let mut ins_step = tx.prepare_cached(
1595            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
1596             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1597        )?;
1598        for step in &prepared.steps {
1599            if step.upsert
1600                && let Some(ref prior_id) = step.supersedes_id
1601            {
1602                sup_step.execute(params![prior_id])?;
1603            }
1604            ins_step.execute(params![
1605                step.id,
1606                step.run_id,
1607                step.kind,
1608                step.status,
1609                step.properties,
1610                step.source_ref,
1611            ])?;
1612        }
1613    }
1614
1615    // Action inserts (with optional upsert).
1616    {
1617        let mut sup_action = tx.prepare_cached(
1618            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1619        )?;
1620        let mut ins_action = tx.prepare_cached(
1621            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
1622             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1623        )?;
1624        for action in &prepared.actions {
1625            if action.upsert
1626                && let Some(ref prior_id) = action.supersedes_id
1627            {
1628                sup_action.execute(params![prior_id])?;
1629            }
1630            ins_action.execute(params![
1631                action.id,
1632                action.step_id,
1633                action.kind,
1634                action.status,
1635                action.properties,
1636                action.source_ref,
1637            ])?;
1638        }
1639    }
1640
1641    // Operational mutation log writes and latest-state current rows.
1642    {
1643        ensure_operational_collections_writable(&tx, prepared)?;
1644        prepared.operational_validation_warnings =
1645            validate_operational_writes_against_live_contracts(&tx, prepared)?;
1646        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
1647
1648        let mut next_mutation_order: i64 = tx.query_row(
1649            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
1650            [],
1651            |row| row.get(0),
1652        )?;
1653        let mut ins_mutation = tx.prepare_cached(
1654            "INSERT INTO operational_mutations \
1655             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1656             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1657        )?;
1658        let mut ins_filter_value = tx.prepare_cached(
1659            "INSERT INTO operational_filter_values \
1660             (mutation_id, collection_name, field_name, string_value, integer_value) \
1661             VALUES (?1, ?2, ?3, ?4, ?5)",
1662        )?;
1663        let mut upsert_current = tx.prepare_cached(
1664            "INSERT INTO operational_current \
1665             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1666             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
1667             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
1668                 payload_json = excluded.payload_json, \
1669                 updated_at = excluded.updated_at, \
1670                 last_mutation_id = excluded.last_mutation_id",
1671        )?;
1672        let mut del_current = tx.prepare_cached(
1673            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
1674        )?;
1675        let mut del_current_secondary_indexes = tx.prepare_cached(
1676            "DELETE FROM operational_secondary_index_entries \
1677             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
1678        )?;
1679        let mut ins_secondary_index = tx.prepare_cached(
1680            "INSERT INTO operational_secondary_index_entries \
1681             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
1682              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
1683             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1684        )?;
1685        let mut current_row_stmt = tx.prepare_cached(
1686            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
1687             WHERE collection_name = ?1 AND record_key = ?2",
1688        )?;
1689
1690        for write in &prepared.operational_writes {
1691            let collection = operational_write_collection(write);
1692            let record_key = operational_write_record_key(write);
1693            let mutation_id = new_id();
1694            next_mutation_order += 1;
1695            let payload_json = operational_write_payload(write);
1696            ins_mutation.execute(params![
1697                &mutation_id,
1698                collection,
1699                record_key,
1700                operational_write_kind(write),
1701                payload_json,
1702                operational_write_source_ref(write),
1703                next_mutation_order,
1704            ])?;
1705            if let Some(indexes) = collection_secondary_indexes.get(collection) {
1706                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
1707                    ins_secondary_index.execute(params![
1708                        collection,
1709                        entry.index_name,
1710                        "mutation",
1711                        &mutation_id,
1712                        record_key,
1713                        entry.sort_timestamp,
1714                        entry.slot1_text,
1715                        entry.slot1_integer,
1716                        entry.slot2_text,
1717                        entry.slot2_integer,
1718                        entry.slot3_text,
1719                        entry.slot3_integer,
1720                    ])?;
1721                }
1722            }
1723            if let Some(filter_fields) = prepared
1724                .operational_collection_filter_fields
1725                .get(collection)
1726            {
1727                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
1728                    ins_filter_value.execute(params![
1729                        &mutation_id,
1730                        collection,
1731                        filter_value.field_name,
1732                        filter_value.string_value,
1733                        filter_value.integer_value,
1734                    ])?;
1735                }
1736            }
1737
1738            if prepared.operational_collection_kinds.get(collection)
1739                == Some(&OperationalCollectionKind::LatestState)
1740            {
1741                del_current_secondary_indexes.execute(params![collection, record_key])?;
1742                match write {
1743                    OperationalWrite::Put { payload_json, .. } => {
1744                        upsert_current.execute(params![
1745                            collection,
1746                            record_key,
1747                            payload_json,
1748                            &mutation_id,
1749                        ])?;
1750                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
1751                            let (current_payload_json, updated_at, last_mutation_id): (
1752                                String,
1753                                i64,
1754                                String,
1755                            ) = current_row_stmt
1756                                .query_row(params![collection, record_key], |row| {
1757                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1758                                })?;
1759                            for entry in extract_secondary_index_entries_for_current(
1760                                indexes,
1761                                &current_payload_json,
1762                                updated_at,
1763                            ) {
1764                                ins_secondary_index.execute(params![
1765                                    collection,
1766                                    entry.index_name,
1767                                    "current",
1768                                    last_mutation_id.as_str(),
1769                                    record_key,
1770                                    entry.sort_timestamp,
1771                                    entry.slot1_text,
1772                                    entry.slot1_integer,
1773                                    entry.slot2_text,
1774                                    entry.slot2_integer,
1775                                    entry.slot3_text,
1776                                    entry.slot3_integer,
1777                                ])?;
1778                            }
1779                        }
1780                    }
1781                    OperationalWrite::Delete { .. } => {
1782                        del_current.execute(params![collection, record_key])?;
1783                    }
1784                    OperationalWrite::Append { .. } => {}
1785                }
1786            }
1787        }
1788    }
1789
1790    // FTS row inserts.
1791    {
1792        let mut ins_fts = tx.prepare_cached(
1793            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
1794             VALUES (?1, ?2, ?3, ?4)",
1795        )?;
1796        for fts_row in &prepared.required_fts_rows {
1797            ins_fts.execute(params![
1798                fts_row.chunk_id,
1799                fts_row.node_logical_id,
1800                fts_row.kind,
1801                fts_row.text_content,
1802            ])?;
1803        }
1804    }
1805
1806    // Property FTS row inserts.
1807    if !prepared.required_property_fts_rows.is_empty() {
1808        let mut ins_positions = tx.prepare_cached(
1809            "INSERT INTO fts_node_property_positions \
1810             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
1811             VALUES (?1, ?2, ?3, ?4, ?5)",
1812        )?;
1813        // Delete any stale position rows for these nodes; writer already
1814        // deleted the per-kind FTS rows in the upsert block above.
1815        let mut del_positions = tx
1816            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1817        for row in &prepared.required_property_fts_rows {
1818            del_positions.execute(params![row.node_logical_id])?;
1819            // Insert into per-kind FTS table (table name is dynamic).
1820            let table = fathomdb_schema::fts_kind_table_name(&row.kind);
1821            if row.columns.is_empty() {
1822                // Non-weighted schema: single text_content column.
1823                // Ensure the per-kind table exists (handles new-kind first write).
1824                tx.execute_batch(&format!(
1825                    "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
1826                        node_logical_id UNINDEXED, text_content, \
1827                        tokenize = '{DEFAULT_FTS_TOKENIZER}'\
1828                    )"
1829                ))?;
1830                tx.prepare(&format!(
1831                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
1832                ))?
1833                .execute(params![row.node_logical_id, row.text_content])?;
1834            } else {
1835                // Weighted schema: per-column INSERT.
1836                let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
1837                let placeholders: Vec<String> = (2..=row.columns.len() + 1)
1838                    .map(|i| format!("?{i}"))
1839                    .collect();
1840                let sql = format!(
1841                    "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
1842                    cols = col_names.join(", "),
1843                    placeholders = placeholders.join(", "),
1844                );
1845                let mut stmt = tx.prepare(&sql)?;
1846                stmt.execute(rusqlite::params_from_iter(
1847                    std::iter::once(row.node_logical_id.as_str())
1848                        .chain(row.columns.iter().map(|(_, v)| v.as_str())),
1849                ))?;
1850            }
1851            for pos in &row.positions {
1852                ins_positions.execute(params![
1853                    row.node_logical_id,
1854                    row.kind,
1855                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
1856                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
1857                    pos.leaf_path,
1858                ])?;
1859            }
1860        }
1861    }
1862
1863    // Double-write to staging: for nodes whose kind has an active rebuild
1864    // (state IN ('PENDING','BUILDING','SWAPPING')), upsert the precomputed
1865    // text_content into fts_property_rebuild_staging.  Both this upsert and
1866    // the per-kind FTS insert above happen inside the same transaction.
1867    if !prepared.required_property_fts_rows.is_empty() {
1868        let mut ins_staging_simple = tx.prepare_cached(
1869            "INSERT INTO fts_property_rebuild_staging \
1870             (kind, node_logical_id, text_content) \
1871             VALUES (?1, ?2, ?3) \
1872             ON CONFLICT(kind, node_logical_id) DO UPDATE \
1873             SET text_content = excluded.text_content",
1874        )?;
1875        let mut ins_staging_columns = tx.prepare_cached(
1876            "INSERT INTO fts_property_rebuild_staging \
1877             (kind, node_logical_id, text_content, columns_json) \
1878             VALUES (?1, ?2, ?3, ?4) \
1879             ON CONFLICT(kind, node_logical_id) DO UPDATE \
1880             SET text_content = excluded.text_content, \
1881                 columns_json = excluded.columns_json",
1882        )?;
1883        let mut check_rebuild = tx.prepare_cached(
1884            "SELECT 1 FROM fts_property_rebuild_state \
1885             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
1886        )?;
1887        for row in &prepared.required_property_fts_rows {
1888            let in_rebuild: bool = check_rebuild
1889                .query_row(params![row.kind], |_| Ok(true))
1890                .optional()?
1891                .unwrap_or(false);
1892            if in_rebuild {
1893                if row.columns.is_empty() {
1894                    ins_staging_simple.execute(params![
1895                        row.kind,
1896                        row.node_logical_id,
1897                        row.text_content
1898                    ])?;
1899                } else {
1900                    let json_map: serde_json::Map<String, serde_json::Value> = row
1901                        .columns
1902                        .iter()
1903                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
1904                        .collect();
1905                    let columns_json =
1906                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
1907                    ins_staging_columns.execute(params![
1908                        row.kind,
1909                        row.node_logical_id,
1910                        row.text_content,
1911                        columns_json
1912                    ])?;
1913                }
1914            }
1915        }
1916    }
1917
1918    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
1919    // 0.5.0: inserts target per-kind tables (vec_<sanitized_kind>) instead of vec_nodes_active.
1920    #[cfg(feature = "sqlite-vec")]
1921    {
1922        // Build chunk_id → node_logical_id from the current request's chunks.
1923        let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
1924            .chunks
1925            .iter()
1926            .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
1927            .collect();
1928
1929        for vi in &prepared.vec_inserts {
1930            // Resolve kind: prefer in-memory lookup; fall back to DB for pre-existing chunks.
1931            let kind: Option<String> = chunk_to_node
1932                .get(vi.chunk_id.as_str())
1933                .and_then(|lid| prepared.node_kinds.get(*lid))
1934                .cloned()
1935                .or_else(|| {
1936                    tx.query_row(
1937                        "SELECT n.kind FROM chunks c \
1938                         JOIN nodes n ON n.logical_id = c.node_logical_id \
1939                         WHERE c.id = ?1 LIMIT 1",
1940                        rusqlite::params![vi.chunk_id],
1941                        |row| row.get::<_, String>(0),
1942                    )
1943                    .optional()
1944                    .unwrap_or(None)
1945                });
1946
1947            let Some(kind) = kind else {
1948                // No kind found — chunk not yet inserted; skip silently.
1949                continue;
1950            };
1951
1952            let table_name = fathomdb_schema::vec_kind_table_name(&kind);
1953            let dimension = vi.embedding.len();
1954            let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
1955            // Auto-create the per-kind vec table if it does not exist yet.
1956            tx.execute_batch(&format!(
1957                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
1958                    chunk_id TEXT PRIMARY KEY,\
1959                    embedding float[{dimension}]\
1960                )"
1961            ))?;
1962            tx.execute(
1963                &format!(
1964                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
1965                ),
1966                rusqlite::params![vi.chunk_id, bytes],
1967            )?;
1968        }
1969    }
1970
1971    tx.commit()?;
1972
1973    let provenance_warnings: Vec<String> = prepared
1974        .nodes
1975        .iter()
1976        .filter(|node| node.source_ref.is_none())
1977        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
1978        .chain(
1979            prepared
1980                .node_retires
1981                .iter()
1982                .filter(|r| r.source_ref.is_none())
1983                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
1984        )
1985        .chain(
1986            prepared
1987                .edges
1988                .iter()
1989                .filter(|e| e.source_ref.is_none())
1990                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
1991        )
1992        .chain(
1993            prepared
1994                .edge_retires
1995                .iter()
1996                .filter(|r| r.source_ref.is_none())
1997                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
1998        )
1999        .chain(
2000            prepared
2001                .runs
2002                .iter()
2003                .filter(|r| r.source_ref.is_none())
2004                .map(|r| format!("run '{}' has no source_ref", r.id)),
2005        )
2006        .chain(
2007            prepared
2008                .steps
2009                .iter()
2010                .filter(|s| s.source_ref.is_none())
2011                .map(|s| format!("step '{}' has no source_ref", s.id)),
2012        )
2013        .chain(
2014            prepared
2015                .actions
2016                .iter()
2017                .filter(|a| a.source_ref.is_none())
2018                .map(|a| format!("action '{}' has no source_ref", a.id)),
2019        )
2020        .chain(
2021            prepared
2022                .operational_writes
2023                .iter()
2024                .filter(|write| operational_write_source_ref(write).is_none())
2025                .map(|write| {
2026                    format!(
2027                        "operational {} '{}:{}' has no source_ref",
2028                        operational_write_kind(write),
2029                        operational_write_collection(write),
2030                        operational_write_record_key(write)
2031                    )
2032                }),
2033        )
2034        .collect();
2035
2036    let mut warnings = provenance_warnings.clone();
2037    warnings.extend(prepared.operational_validation_warnings.clone());
2038
2039    Ok(WriteReceipt {
2040        label: prepared.label.clone(),
2041        optional_backfill_count: prepared.optional_backfills.len(),
2042        warnings,
2043        provenance_warnings,
2044    })
2045}
2046
2047fn operational_write_collection(write: &OperationalWrite) -> &str {
2048    match write {
2049        OperationalWrite::Append { collection, .. }
2050        | OperationalWrite::Put { collection, .. }
2051        | OperationalWrite::Delete { collection, .. } => collection,
2052    }
2053}
2054
2055fn operational_write_record_key(write: &OperationalWrite) -> &str {
2056    match write {
2057        OperationalWrite::Append { record_key, .. }
2058        | OperationalWrite::Put { record_key, .. }
2059        | OperationalWrite::Delete { record_key, .. } => record_key,
2060    }
2061}
2062
2063fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2064    match write {
2065        OperationalWrite::Append { .. } => "append",
2066        OperationalWrite::Put { .. } => "put",
2067        OperationalWrite::Delete { .. } => "delete",
2068    }
2069}
2070
2071fn operational_write_payload(write: &OperationalWrite) -> &str {
2072    match write {
2073        OperationalWrite::Append { payload_json, .. }
2074        | OperationalWrite::Put { payload_json, .. } => payload_json,
2075        OperationalWrite::Delete { .. } => "null",
2076    }
2077}
2078
2079fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2080    match write {
2081        OperationalWrite::Append { source_ref, .. }
2082        | OperationalWrite::Put { source_ref, .. }
2083        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2084    }
2085}
2086
2087#[cfg(test)]
2088#[allow(clippy::expect_used)]
2089mod tests {
2090    use std::sync::Arc;
2091
2092    use fathomdb_schema::SchemaManager;
2093    use tempfile::NamedTempFile;
2094
2095    use super::{apply_write, prepare_write, resolve_operational_writes};
2096    use crate::{
2097        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2098        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2099        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2100        projection::ProjectionTarget,
2101    };
2102
2103    #[test]
2104    fn writer_executes_runtime_table_rows() {
2105        let db = NamedTempFile::new().expect("temporary db");
2106        let writer = WriterActor::start(
2107            db.path(),
2108            Arc::new(SchemaManager::new()),
2109            ProvenanceMode::Warn,
2110            Arc::new(TelemetryCounters::default()),
2111        )
2112        .expect("writer");
2113
2114        let receipt = writer
2115            .submit(WriteRequest {
2116                label: "runtime".to_owned(),
2117                nodes: vec![],
2118                node_retires: vec![],
2119                edges: vec![],
2120                edge_retires: vec![],
2121                chunks: vec![],
2122                runs: vec![RunInsert {
2123                    id: "run-1".to_owned(),
2124                    kind: "session".to_owned(),
2125                    status: "completed".to_owned(),
2126                    properties: "{}".to_owned(),
2127                    source_ref: Some("src-1".to_owned()),
2128                    upsert: false,
2129                    supersedes_id: None,
2130                }],
2131                steps: vec![StepInsert {
2132                    id: "step-1".to_owned(),
2133                    run_id: "run-1".to_owned(),
2134                    kind: "llm".to_owned(),
2135                    status: "completed".to_owned(),
2136                    properties: "{}".to_owned(),
2137                    source_ref: Some("src-1".to_owned()),
2138                    upsert: false,
2139                    supersedes_id: None,
2140                }],
2141                actions: vec![ActionInsert {
2142                    id: "action-1".to_owned(),
2143                    step_id: "step-1".to_owned(),
2144                    kind: "emit".to_owned(),
2145                    status: "completed".to_owned(),
2146                    properties: "{}".to_owned(),
2147                    source_ref: Some("src-1".to_owned()),
2148                    upsert: false,
2149                    supersedes_id: None,
2150                }],
2151                optional_backfills: vec![],
2152                vec_inserts: vec![],
2153                operational_writes: vec![],
2154            })
2155            .expect("write receipt");
2156
2157        assert_eq!(receipt.label, "runtime");
2158    }
2159    #[test]
2160    fn writer_put_operational_write_updates_current_and_mutations() {
2161        let db = NamedTempFile::new().expect("temporary db");
2162        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2163        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2164        conn.execute(
2165            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2166             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2167            [],
2168        )
2169        .expect("seed collection");
2170        drop(conn);
2171        let writer = WriterActor::start(
2172            db.path(),
2173            Arc::new(SchemaManager::new()),
2174            ProvenanceMode::Warn,
2175            Arc::new(TelemetryCounters::default()),
2176        )
2177        .expect("writer");
2178
2179        writer
2180            .submit(WriteRequest {
2181                label: "node-and-operational".to_owned(),
2182                nodes: vec![NodeInsert {
2183                    row_id: "row-1".to_owned(),
2184                    logical_id: "lg-1".to_owned(),
2185                    kind: "Meeting".to_owned(),
2186                    properties: "{}".to_owned(),
2187                    source_ref: Some("src-1".to_owned()),
2188                    upsert: false,
2189                    chunk_policy: ChunkPolicy::Preserve,
2190                    content_ref: None,
2191                }],
2192                node_retires: vec![],
2193                edges: vec![],
2194                edge_retires: vec![],
2195                chunks: vec![],
2196                runs: vec![],
2197                steps: vec![],
2198                actions: vec![],
2199                optional_backfills: vec![],
2200                vec_inserts: vec![],
2201                operational_writes: vec![OperationalWrite::Put {
2202                    collection: "connector_health".to_owned(),
2203                    record_key: "gmail".to_owned(),
2204                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2205                    source_ref: Some("src-1".to_owned()),
2206                }],
2207            })
2208            .expect("write receipt");
2209
2210        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2211        let node_count: i64 = conn
2212            .query_row(
2213                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2214                [],
2215                |row| row.get(0),
2216            )
2217            .expect("node count");
2218        assert_eq!(node_count, 1);
2219        let mutation_count: i64 = conn
2220            .query_row(
2221                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
2222                 AND record_key = 'gmail'",
2223                [],
2224                |row| row.get(0),
2225            )
2226            .expect("mutation count");
2227        assert_eq!(mutation_count, 1);
2228        let payload: String = conn
2229            .query_row(
2230                "SELECT payload_json FROM operational_current \
2231                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2232                [],
2233                |row| row.get(0),
2234            )
2235            .expect("current payload");
2236        assert_eq!(payload, r#"{"status":"ok"}"#);
2237    }
2238
2239    #[test]
2240    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2241        let db = NamedTempFile::new().expect("temporary db");
2242        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2243        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2244        conn.execute(
2245            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2246             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2247            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2248        )
2249        .expect("seed collection");
2250        drop(conn);
2251        let writer = WriterActor::start(
2252            db.path(),
2253            Arc::new(SchemaManager::new()),
2254            ProvenanceMode::Warn,
2255            Arc::new(TelemetryCounters::default()),
2256        )
2257        .expect("writer");
2258
2259        writer
2260            .submit(WriteRequest {
2261                label: "disabled-validation".to_owned(),
2262                nodes: vec![],
2263                node_retires: vec![],
2264                edges: vec![],
2265                edge_retires: vec![],
2266                chunks: vec![],
2267                runs: vec![],
2268                steps: vec![],
2269                actions: vec![],
2270                optional_backfills: vec![],
2271                vec_inserts: vec![],
2272                operational_writes: vec![OperationalWrite::Put {
2273                    collection: "connector_health".to_owned(),
2274                    record_key: "gmail".to_owned(),
2275                    payload_json: r#"{"bogus":true}"#.to_owned(),
2276                    source_ref: Some("src-1".to_owned()),
2277                }],
2278            })
2279            .expect("write receipt");
2280
2281        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2282        let payload: String = conn
2283            .query_row(
2284                "SELECT payload_json FROM operational_current \
2285                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2286                [],
2287                |row| row.get(0),
2288            )
2289            .expect("current payload");
2290        assert_eq!(payload, r#"{"bogus":true}"#);
2291    }
2292
2293    #[test]
2294    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
2295        let db = NamedTempFile::new().expect("temporary db");
2296        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2297        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2298        conn.execute(
2299            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2300             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2301            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2302        )
2303        .expect("seed collection");
2304        drop(conn);
2305        let writer = WriterActor::start(
2306            db.path(),
2307            Arc::new(SchemaManager::new()),
2308            ProvenanceMode::Warn,
2309            Arc::new(TelemetryCounters::default()),
2310        )
2311        .expect("writer");
2312
2313        let receipt = writer
2314            .submit(WriteRequest {
2315                label: "report-only-validation".to_owned(),
2316                nodes: vec![],
2317                node_retires: vec![],
2318                edges: vec![],
2319                edge_retires: vec![],
2320                chunks: vec![],
2321                runs: vec![],
2322                steps: vec![],
2323                actions: vec![],
2324                optional_backfills: vec![],
2325                vec_inserts: vec![],
2326                operational_writes: vec![OperationalWrite::Put {
2327                    collection: "connector_health".to_owned(),
2328                    record_key: "gmail".to_owned(),
2329                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2330                    source_ref: Some("src-1".to_owned()),
2331                }],
2332            })
2333            .expect("report_only write should succeed");
2334
2335        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
2336        assert_eq!(receipt.warnings.len(), 1);
2337        assert!(
2338            receipt.warnings[0].contains("connector_health"),
2339            "warning should identify collection"
2340        );
2341        assert!(
2342            receipt.warnings[0].contains("must be one of"),
2343            "warning should explain validation failure"
2344        );
2345
2346        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2347        let payload: String = conn
2348            .query_row(
2349                "SELECT payload_json FROM operational_current \
2350                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2351                [],
2352                |row| row.get(0),
2353            )
2354            .expect("current payload");
2355        assert_eq!(payload, r#"{"status":"bogus"}"#);
2356    }
2357
2358    #[test]
2359    fn writer_rejects_operational_write_for_missing_collection() {
2360        let db = NamedTempFile::new().expect("temporary db");
2361        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2362        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2363        drop(conn);
2364        let writer = WriterActor::start(
2365            db.path(),
2366            Arc::new(SchemaManager::new()),
2367            ProvenanceMode::Warn,
2368            Arc::new(TelemetryCounters::default()),
2369        )
2370        .expect("writer");
2371
2372        let result = writer.submit(WriteRequest {
2373            label: "missing-operational-collection".to_owned(),
2374            nodes: vec![],
2375            node_retires: vec![],
2376            edges: vec![],
2377            edge_retires: vec![],
2378            chunks: vec![],
2379            runs: vec![],
2380            steps: vec![],
2381            actions: vec![],
2382            optional_backfills: vec![],
2383            vec_inserts: vec![],
2384            operational_writes: vec![OperationalWrite::Put {
2385                collection: "connector_health".to_owned(),
2386                record_key: "gmail".to_owned(),
2387                payload_json: r#"{"status":"ok"}"#.to_owned(),
2388                source_ref: Some("src-1".to_owned()),
2389            }],
2390        });
2391
2392        assert!(
2393            matches!(result, Err(EngineError::InvalidWrite(_))),
2394            "missing operational collection must return InvalidWrite"
2395        );
2396    }
2397
2398    #[test]
2399    fn writer_append_operational_write_records_history_without_current_row() {
2400        let db = NamedTempFile::new().expect("temporary db");
2401        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2402        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2403        conn.execute(
2404            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2405             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2406            [],
2407        )
2408        .expect("seed collection");
2409        drop(conn);
2410        let writer = WriterActor::start(
2411            db.path(),
2412            Arc::new(SchemaManager::new()),
2413            ProvenanceMode::Warn,
2414            Arc::new(TelemetryCounters::default()),
2415        )
2416        .expect("writer");
2417
2418        writer
2419            .submit(WriteRequest {
2420                label: "append-operational".to_owned(),
2421                nodes: vec![],
2422                node_retires: vec![],
2423                edges: vec![],
2424                edge_retires: vec![],
2425                chunks: vec![],
2426                runs: vec![],
2427                steps: vec![],
2428                actions: vec![],
2429                optional_backfills: vec![],
2430                vec_inserts: vec![],
2431                operational_writes: vec![OperationalWrite::Append {
2432                    collection: "audit_log".to_owned(),
2433                    record_key: "evt-1".to_owned(),
2434                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2435                    source_ref: Some("src-1".to_owned()),
2436                }],
2437            })
2438            .expect("write receipt");
2439
2440        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2441        let mutation: (String, String) = conn
2442            .query_row(
2443                "SELECT op_kind, payload_json FROM operational_mutations \
2444                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2445                [],
2446                |row| Ok((row.get(0)?, row.get(1)?)),
2447            )
2448            .expect("mutation row");
2449        assert_eq!(mutation.0, "append");
2450        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2451        let current_count: i64 = conn
2452            .query_row(
2453                "SELECT count(*) FROM operational_current \
2454                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2455                [],
2456                |row| row.get(0),
2457            )
2458            .expect("current count");
2459        assert_eq!(current_count, 0);
2460    }
2461
2462    #[test]
2463    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2464        let db = NamedTempFile::new().expect("temporary db");
2465        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2466        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2467        conn.execute(
2468            "INSERT INTO operational_collections \
2469             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2470             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2471                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2472            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2473        )
2474        .expect("seed collection");
2475        drop(conn);
2476        let writer = WriterActor::start(
2477            db.path(),
2478            Arc::new(SchemaManager::new()),
2479            ProvenanceMode::Warn,
2480            Arc::new(TelemetryCounters::default()),
2481        )
2482        .expect("writer");
2483
2484        let error = writer
2485            .submit(WriteRequest {
2486                label: "invalid-append".to_owned(),
2487                nodes: vec![],
2488                node_retires: vec![],
2489                edges: vec![],
2490                edge_retires: vec![],
2491                chunks: vec![],
2492                runs: vec![],
2493                steps: vec![],
2494                actions: vec![],
2495                optional_backfills: vec![],
2496                vec_inserts: vec![],
2497                operational_writes: vec![OperationalWrite::Append {
2498                    collection: "audit_log".to_owned(),
2499                    record_key: "evt-1".to_owned(),
2500                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2501                    source_ref: Some("src-1".to_owned()),
2502                }],
2503            })
2504            .expect_err("invalid append must reject");
2505        assert!(matches!(error, EngineError::InvalidWrite(_)));
2506        assert!(error.to_string().contains("must be one of"));
2507
2508        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2509        let mutation_count: i64 = conn
2510            .query_row(
2511                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2512                [],
2513                |row| row.get(0),
2514            )
2515            .expect("mutation count");
2516        assert_eq!(mutation_count, 0);
2517        let filter_count: i64 = conn
2518            .query_row(
2519                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2520                [],
2521                |row| row.get(0),
2522            )
2523            .expect("filter count");
2524        assert_eq!(filter_count, 0);
2525    }
2526
2527    #[test]
2528    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2529        let db = NamedTempFile::new().expect("temporary db");
2530        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2531        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2532        conn.execute(
2533            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2534             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2535            [],
2536        )
2537        .expect("seed collection");
2538        drop(conn);
2539        let writer = WriterActor::start(
2540            db.path(),
2541            Arc::new(SchemaManager::new()),
2542            ProvenanceMode::Warn,
2543            Arc::new(TelemetryCounters::default()),
2544        )
2545        .expect("writer");
2546
2547        writer
2548            .submit(WriteRequest {
2549                label: "put-operational".to_owned(),
2550                nodes: vec![],
2551                node_retires: vec![],
2552                edges: vec![],
2553                edge_retires: vec![],
2554                chunks: vec![],
2555                runs: vec![],
2556                steps: vec![],
2557                actions: vec![],
2558                optional_backfills: vec![],
2559                vec_inserts: vec![],
2560                operational_writes: vec![OperationalWrite::Put {
2561                    collection: "connector_health".to_owned(),
2562                    record_key: "gmail".to_owned(),
2563                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2564                    source_ref: Some("src-1".to_owned()),
2565                }],
2566            })
2567            .expect("put receipt");
2568
2569        writer
2570            .submit(WriteRequest {
2571                label: "delete-operational".to_owned(),
2572                nodes: vec![],
2573                node_retires: vec![],
2574                edges: vec![],
2575                edge_retires: vec![],
2576                chunks: vec![],
2577                runs: vec![],
2578                steps: vec![],
2579                actions: vec![],
2580                optional_backfills: vec![],
2581                vec_inserts: vec![],
2582                operational_writes: vec![OperationalWrite::Delete {
2583                    collection: "connector_health".to_owned(),
2584                    record_key: "gmail".to_owned(),
2585                    source_ref: Some("src-2".to_owned()),
2586                }],
2587            })
2588            .expect("delete receipt");
2589
2590        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2591        let mutation_kinds: Vec<String> = {
2592            let mut stmt = conn
2593                .prepare(
2594                    "SELECT op_kind FROM operational_mutations \
2595                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2596                     ORDER BY mutation_order ASC",
2597                )
2598                .expect("stmt");
2599            stmt.query_map([], |row| row.get(0))
2600                .expect("rows")
2601                .collect::<Result<_, _>>()
2602                .expect("collect")
2603        };
2604        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2605        let current_count: i64 = conn
2606            .query_row(
2607                "SELECT count(*) FROM operational_current \
2608                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2609                [],
2610                |row| row.get(0),
2611            )
2612            .expect("current count");
2613        assert_eq!(current_count, 0);
2614    }
2615
2616    #[test]
2617    fn writer_delete_bypasses_validation_contract() {
2618        let db = NamedTempFile::new().expect("temporary db");
2619        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2620        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2621        conn.execute(
2622            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2623             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2624            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2625        )
2626        .expect("seed collection");
2627        drop(conn);
2628        let writer = WriterActor::start(
2629            db.path(),
2630            Arc::new(SchemaManager::new()),
2631            ProvenanceMode::Warn,
2632            Arc::new(TelemetryCounters::default()),
2633        )
2634        .expect("writer");
2635
2636        writer
2637            .submit(WriteRequest {
2638                label: "valid-put".to_owned(),
2639                nodes: vec![],
2640                node_retires: vec![],
2641                edges: vec![],
2642                edge_retires: vec![],
2643                chunks: vec![],
2644                runs: vec![],
2645                steps: vec![],
2646                actions: vec![],
2647                optional_backfills: vec![],
2648                vec_inserts: vec![],
2649                operational_writes: vec![OperationalWrite::Put {
2650                    collection: "connector_health".to_owned(),
2651                    record_key: "gmail".to_owned(),
2652                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2653                    source_ref: Some("src-1".to_owned()),
2654                }],
2655            })
2656            .expect("put receipt");
2657        writer
2658            .submit(WriteRequest {
2659                label: "delete-after-put".to_owned(),
2660                nodes: vec![],
2661                node_retires: vec![],
2662                edges: vec![],
2663                edge_retires: vec![],
2664                chunks: vec![],
2665                runs: vec![],
2666                steps: vec![],
2667                actions: vec![],
2668                optional_backfills: vec![],
2669                vec_inserts: vec![],
2670                operational_writes: vec![OperationalWrite::Delete {
2671                    collection: "connector_health".to_owned(),
2672                    record_key: "gmail".to_owned(),
2673                    source_ref: Some("src-2".to_owned()),
2674                }],
2675            })
2676            .expect("delete receipt");
2677
2678        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2679        let current_count: i64 = conn
2680            .query_row(
2681                "SELECT count(*) FROM operational_current \
2682                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2683                [],
2684                |row| row.get(0),
2685            )
2686            .expect("current count");
2687        assert_eq!(current_count, 0);
2688    }
2689
2690    #[test]
2691    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
2692        let db = NamedTempFile::new().expect("temporary db");
2693        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2694        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2695        conn.execute(
2696            "INSERT INTO operational_collections \
2697             (name, kind, schema_json, retention_json, secondary_indexes_json) \
2698             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2699            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
2700        )
2701        .expect("seed collection");
2702        drop(conn);
2703        let writer = WriterActor::start(
2704            db.path(),
2705            Arc::new(SchemaManager::new()),
2706            ProvenanceMode::Warn,
2707            Arc::new(TelemetryCounters::default()),
2708        )
2709        .expect("writer");
2710
2711        writer
2712            .submit(WriteRequest {
2713                label: "secondary-index-put".to_owned(),
2714                nodes: vec![],
2715                node_retires: vec![],
2716                edges: vec![],
2717                edge_retires: vec![],
2718                chunks: vec![],
2719                runs: vec![],
2720                steps: vec![],
2721                actions: vec![],
2722                optional_backfills: vec![],
2723                vec_inserts: vec![],
2724                operational_writes: vec![OperationalWrite::Put {
2725                    collection: "connector_health".to_owned(),
2726                    record_key: "gmail".to_owned(),
2727                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
2728                        .to_owned(),
2729                    source_ref: Some("src-1".to_owned()),
2730                }],
2731            })
2732            .expect("put receipt");
2733
2734        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2735        let current_entry_count: i64 = conn
2736            .query_row(
2737                "SELECT count(*) FROM operational_secondary_index_entries \
2738                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2739                [],
2740                |row| row.get(0),
2741            )
2742            .expect("current secondary index count");
2743        assert_eq!(current_entry_count, 2);
2744        drop(conn);
2745
2746        writer
2747            .submit(WriteRequest {
2748                label: "secondary-index-delete".to_owned(),
2749                nodes: vec![],
2750                node_retires: vec![],
2751                edges: vec![],
2752                edge_retires: vec![],
2753                chunks: vec![],
2754                runs: vec![],
2755                steps: vec![],
2756                actions: vec![],
2757                optional_backfills: vec![],
2758                vec_inserts: vec![],
2759                operational_writes: vec![OperationalWrite::Delete {
2760                    collection: "connector_health".to_owned(),
2761                    record_key: "gmail".to_owned(),
2762                    source_ref: Some("src-2".to_owned()),
2763                }],
2764            })
2765            .expect("delete receipt");
2766
2767        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2768        let current_entry_count: i64 = conn
2769            .query_row(
2770                "SELECT count(*) FROM operational_secondary_index_entries \
2771                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2772                [],
2773                |row| row.get(0),
2774            )
2775            .expect("current secondary index count");
2776        assert_eq!(current_entry_count, 0);
2777    }
2778
2779    #[test]
2780    fn writer_latest_state_operational_writes_persist_mutation_order() {
2781        let db = NamedTempFile::new().expect("temporary db");
2782        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2783        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2784        conn.execute(
2785            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2786             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2787            [],
2788        )
2789        .expect("seed collection");
2790        drop(conn);
2791
2792        let writer = WriterActor::start(
2793            db.path(),
2794            Arc::new(SchemaManager::new()),
2795            ProvenanceMode::Warn,
2796            Arc::new(TelemetryCounters::default()),
2797        )
2798        .expect("writer");
2799
2800        writer
2801            .submit(WriteRequest {
2802                label: "ordered-operational-batch".to_owned(),
2803                nodes: vec![],
2804                node_retires: vec![],
2805                edges: vec![],
2806                edge_retires: vec![],
2807                chunks: vec![],
2808                runs: vec![],
2809                steps: vec![],
2810                actions: vec![],
2811                optional_backfills: vec![],
2812                vec_inserts: vec![],
2813                operational_writes: vec![
2814                    OperationalWrite::Put {
2815                        collection: "connector_health".to_owned(),
2816                        record_key: "gmail".to_owned(),
2817                        payload_json: r#"{"status":"old"}"#.to_owned(),
2818                        source_ref: Some("src-1".to_owned()),
2819                    },
2820                    OperationalWrite::Delete {
2821                        collection: "connector_health".to_owned(),
2822                        record_key: "gmail".to_owned(),
2823                        source_ref: Some("src-2".to_owned()),
2824                    },
2825                    OperationalWrite::Put {
2826                        collection: "connector_health".to_owned(),
2827                        record_key: "gmail".to_owned(),
2828                        payload_json: r#"{"status":"new"}"#.to_owned(),
2829                        source_ref: Some("src-3".to_owned()),
2830                    },
2831                ],
2832            })
2833            .expect("write receipt");
2834
2835        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2836        let rows: Vec<(String, i64)> = {
2837            let mut stmt = conn
2838                .prepare(
2839                    "SELECT op_kind, mutation_order FROM operational_mutations \
2840                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2841                     ORDER BY mutation_order ASC",
2842                )
2843                .expect("stmt");
2844            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
2845                .expect("rows")
2846                .collect::<Result<_, _>>()
2847                .expect("collect")
2848        };
2849        assert_eq!(
2850            rows,
2851            vec![
2852                ("put".to_owned(), 1),
2853                ("delete".to_owned(), 2),
2854                ("put".to_owned(), 3),
2855            ]
2856        );
2857        let payload: String = conn
2858            .query_row(
2859                "SELECT payload_json FROM operational_current \
2860                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2861                [],
2862                |row| row.get(0),
2863            )
2864            .expect("current payload");
2865        assert_eq!(payload, r#"{"status":"new"}"#);
2866    }
2867
2868    #[test]
2869    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
2870        let db = NamedTempFile::new().expect("temporary db");
2871        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
2872        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2873        conn.execute(
2874            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2875             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2876            [],
2877        )
2878        .expect("seed collection");
2879
2880        let request = WriteRequest {
2881            label: "disabled-race".to_owned(),
2882            nodes: vec![],
2883            node_retires: vec![],
2884            edges: vec![],
2885            edge_retires: vec![],
2886            chunks: vec![],
2887            runs: vec![],
2888            steps: vec![],
2889            actions: vec![],
2890            optional_backfills: vec![],
2891            vec_inserts: vec![],
2892            operational_writes: vec![OperationalWrite::Put {
2893                collection: "connector_health".to_owned(),
2894                record_key: "gmail".to_owned(),
2895                payload_json: r#"{"status":"ok"}"#.to_owned(),
2896                source_ref: Some("src-1".to_owned()),
2897            }],
2898        };
2899        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
2900        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
2901
2902        conn.execute(
2903            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
2904            [],
2905        )
2906        .expect("disable collection after preflight");
2907
2908        let error =
2909            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
2910        assert!(matches!(error, EngineError::InvalidWrite(_)));
2911        assert!(error.to_string().contains("is disabled"));
2912
2913        let mutation_count: i64 = conn
2914            .query_row(
2915                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2916                [],
2917                |row| row.get(0),
2918            )
2919            .expect("mutation count");
2920        assert_eq!(mutation_count, 0);
2921
2922        let current_count: i64 = conn
2923            .query_row(
2924                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2925                [],
2926                |row| row.get(0),
2927            )
2928            .expect("current count");
2929        assert_eq!(current_count, 0);
2930    }
2931
2932    #[test]
2933    fn writer_enforce_validation_rejects_invalid_put_atomically() {
2934        let db = NamedTempFile::new().expect("temporary db");
2935        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2936        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2937        conn.execute(
2938            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2939             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2940            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2941        )
2942        .expect("seed collection");
2943        drop(conn);
2944        let writer = WriterActor::start(
2945            db.path(),
2946            Arc::new(SchemaManager::new()),
2947            ProvenanceMode::Warn,
2948            Arc::new(TelemetryCounters::default()),
2949        )
2950        .expect("writer");
2951
2952        let error = writer
2953            .submit(WriteRequest {
2954                label: "invalid-put".to_owned(),
2955                nodes: vec![NodeInsert {
2956                    row_id: "row-1".to_owned(),
2957                    logical_id: "lg-1".to_owned(),
2958                    kind: "Meeting".to_owned(),
2959                    properties: "{}".to_owned(),
2960                    source_ref: Some("src-1".to_owned()),
2961                    upsert: false,
2962                    chunk_policy: ChunkPolicy::Preserve,
2963                    content_ref: None,
2964                }],
2965                node_retires: vec![],
2966                edges: vec![],
2967                edge_retires: vec![],
2968                chunks: vec![],
2969                runs: vec![],
2970                steps: vec![],
2971                actions: vec![],
2972                optional_backfills: vec![],
2973                vec_inserts: vec![],
2974                operational_writes: vec![OperationalWrite::Put {
2975                    collection: "connector_health".to_owned(),
2976                    record_key: "gmail".to_owned(),
2977                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2978                    source_ref: Some("src-1".to_owned()),
2979                }],
2980            })
2981            .expect_err("invalid put must reject");
2982        assert!(matches!(error, EngineError::InvalidWrite(_)));
2983        assert!(error.to_string().contains("must be one of"));
2984
2985        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2986        let node_count: i64 = conn
2987            .query_row(
2988                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2989                [],
2990                |row| row.get(0),
2991            )
2992            .expect("node count");
2993        assert_eq!(node_count, 0);
2994        let mutation_count: i64 = conn
2995            .query_row(
2996                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2997                [],
2998                |row| row.get(0),
2999            )
3000            .expect("mutation count");
3001        assert_eq!(mutation_count, 0);
3002        let current_count: i64 = conn
3003            .query_row(
3004                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3005                [],
3006                |row| row.get(0),
3007            )
3008            .expect("current count");
3009        assert_eq!(current_count, 0);
3010    }
3011
3012    #[test]
3013    fn writer_rejects_append_against_latest_state_collection() {
3014        let db = NamedTempFile::new().expect("temporary db");
3015        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3016        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3017        conn.execute(
3018            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3019             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3020            [],
3021        )
3022        .expect("seed collection");
3023        drop(conn);
3024        let writer = WriterActor::start(
3025            db.path(),
3026            Arc::new(SchemaManager::new()),
3027            ProvenanceMode::Warn,
3028            Arc::new(TelemetryCounters::default()),
3029        )
3030        .expect("writer");
3031
3032        let result = writer.submit(WriteRequest {
3033            label: "bad-append".to_owned(),
3034            nodes: vec![],
3035            node_retires: vec![],
3036            edges: vec![],
3037            edge_retires: vec![],
3038            chunks: vec![],
3039            runs: vec![],
3040            steps: vec![],
3041            actions: vec![],
3042            optional_backfills: vec![],
3043            vec_inserts: vec![],
3044            operational_writes: vec![OperationalWrite::Append {
3045                collection: "connector_health".to_owned(),
3046                record_key: "gmail".to_owned(),
3047                payload_json: r#"{"status":"ok"}"#.to_owned(),
3048                source_ref: Some("src-1".to_owned()),
3049            }],
3050        });
3051
3052        assert!(
3053            matches!(result, Err(EngineError::InvalidWrite(_))),
3054            "latest_state collection must reject Append"
3055        );
3056    }
3057
3058    #[test]
3059    fn writer_upsert_supersedes_prior_active_node() {
3060        let db = NamedTempFile::new().expect("temporary db");
3061        let writer = WriterActor::start(
3062            db.path(),
3063            Arc::new(SchemaManager::new()),
3064            ProvenanceMode::Warn,
3065            Arc::new(TelemetryCounters::default()),
3066        )
3067        .expect("writer");
3068
3069        writer
3070            .submit(WriteRequest {
3071                label: "v1".to_owned(),
3072                nodes: vec![NodeInsert {
3073                    row_id: "row-1".to_owned(),
3074                    logical_id: "lg-1".to_owned(),
3075                    kind: "Meeting".to_owned(),
3076                    properties: r#"{"version":1}"#.to_owned(),
3077                    source_ref: Some("src-1".to_owned()),
3078                    upsert: false,
3079                    chunk_policy: ChunkPolicy::Preserve,
3080                    content_ref: None,
3081                }],
3082                node_retires: vec![],
3083                edges: vec![],
3084                edge_retires: vec![],
3085                chunks: vec![],
3086                runs: vec![],
3087                steps: vec![],
3088                actions: vec![],
3089                optional_backfills: vec![],
3090                vec_inserts: vec![],
3091                operational_writes: vec![],
3092            })
3093            .expect("v1 write");
3094
3095        writer
3096            .submit(WriteRequest {
3097                label: "v2".to_owned(),
3098                nodes: vec![NodeInsert {
3099                    row_id: "row-2".to_owned(),
3100                    logical_id: "lg-1".to_owned(),
3101                    kind: "Meeting".to_owned(),
3102                    properties: r#"{"version":2}"#.to_owned(),
3103                    source_ref: Some("src-2".to_owned()),
3104                    upsert: true,
3105                    chunk_policy: ChunkPolicy::Preserve,
3106                    content_ref: None,
3107                }],
3108                node_retires: vec![],
3109                edges: vec![],
3110                edge_retires: vec![],
3111                chunks: vec![],
3112                runs: vec![],
3113                steps: vec![],
3114                actions: vec![],
3115                optional_backfills: vec![],
3116                vec_inserts: vec![],
3117                operational_writes: vec![],
3118            })
3119            .expect("v2 upsert write");
3120
3121        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3122        let (active_row_id, props): (String, String) = conn
3123            .query_row(
3124                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3125                [],
3126                |row| Ok((row.get(0)?, row.get(1)?)),
3127            )
3128            .expect("active row");
3129        assert_eq!(active_row_id, "row-2");
3130        assert!(props.contains("\"version\":2"));
3131
3132        let superseded: i64 = conn
3133            .query_row(
3134                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3135                [],
3136                |row| row.get(0),
3137            )
3138            .expect("superseded count");
3139        assert_eq!(superseded, 1);
3140    }
3141
3142    #[test]
3143    fn writer_inserts_edge_between_two_nodes() {
3144        let db = NamedTempFile::new().expect("temporary db");
3145        let writer = WriterActor::start(
3146            db.path(),
3147            Arc::new(SchemaManager::new()),
3148            ProvenanceMode::Warn,
3149            Arc::new(TelemetryCounters::default()),
3150        )
3151        .expect("writer");
3152
3153        writer
3154            .submit(WriteRequest {
3155                label: "nodes-and-edge".to_owned(),
3156                nodes: vec![
3157                    NodeInsert {
3158                        row_id: "row-meeting".to_owned(),
3159                        logical_id: "meeting-1".to_owned(),
3160                        kind: "Meeting".to_owned(),
3161                        properties: "{}".to_owned(),
3162                        source_ref: Some("src-1".to_owned()),
3163                        upsert: false,
3164                        chunk_policy: ChunkPolicy::Preserve,
3165                        content_ref: None,
3166                    },
3167                    NodeInsert {
3168                        row_id: "row-task".to_owned(),
3169                        logical_id: "task-1".to_owned(),
3170                        kind: "Task".to_owned(),
3171                        properties: "{}".to_owned(),
3172                        source_ref: Some("src-1".to_owned()),
3173                        upsert: false,
3174                        chunk_policy: ChunkPolicy::Preserve,
3175                        content_ref: None,
3176                    },
3177                ],
3178                node_retires: vec![],
3179                edges: vec![EdgeInsert {
3180                    row_id: "edge-1".to_owned(),
3181                    logical_id: "edge-lg-1".to_owned(),
3182                    source_logical_id: "meeting-1".to_owned(),
3183                    target_logical_id: "task-1".to_owned(),
3184                    kind: "HAS_TASK".to_owned(),
3185                    properties: "{}".to_owned(),
3186                    source_ref: Some("src-1".to_owned()),
3187                    upsert: false,
3188                }],
3189                edge_retires: vec![],
3190                chunks: vec![],
3191                runs: vec![],
3192                steps: vec![],
3193                actions: vec![],
3194                optional_backfills: vec![],
3195                vec_inserts: vec![],
3196                operational_writes: vec![],
3197            })
3198            .expect("write receipt");
3199
3200        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3201        let (src, tgt, kind): (String, String, String) = conn
3202            .query_row(
3203                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3204                [],
3205                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3206            )
3207            .expect("edge row");
3208        assert_eq!(src, "meeting-1");
3209        assert_eq!(tgt, "task-1");
3210        assert_eq!(kind, "HAS_TASK");
3211    }
3212
3213    #[test]
3214    #[allow(clippy::too_many_lines)]
3215    fn writer_upsert_supersedes_prior_active_edge() {
3216        let db = NamedTempFile::new().expect("temporary db");
3217        let writer = WriterActor::start(
3218            db.path(),
3219            Arc::new(SchemaManager::new()),
3220            ProvenanceMode::Warn,
3221            Arc::new(TelemetryCounters::default()),
3222        )
3223        .expect("writer");
3224
3225        // Write two nodes
3226        writer
3227            .submit(WriteRequest {
3228                label: "nodes".to_owned(),
3229                nodes: vec![
3230                    NodeInsert {
3231                        row_id: "row-a".to_owned(),
3232                        logical_id: "node-a".to_owned(),
3233                        kind: "Meeting".to_owned(),
3234                        properties: "{}".to_owned(),
3235                        source_ref: Some("src-1".to_owned()),
3236                        upsert: false,
3237                        chunk_policy: ChunkPolicy::Preserve,
3238                        content_ref: None,
3239                    },
3240                    NodeInsert {
3241                        row_id: "row-b".to_owned(),
3242                        logical_id: "node-b".to_owned(),
3243                        kind: "Task".to_owned(),
3244                        properties: "{}".to_owned(),
3245                        source_ref: Some("src-1".to_owned()),
3246                        upsert: false,
3247                        chunk_policy: ChunkPolicy::Preserve,
3248                        content_ref: None,
3249                    },
3250                ],
3251                node_retires: vec![],
3252                edges: vec![],
3253                edge_retires: vec![],
3254                chunks: vec![],
3255                runs: vec![],
3256                steps: vec![],
3257                actions: vec![],
3258                optional_backfills: vec![],
3259                vec_inserts: vec![],
3260                operational_writes: vec![],
3261            })
3262            .expect("nodes write");
3263
3264        // Write v1 edge
3265        writer
3266            .submit(WriteRequest {
3267                label: "edge-v1".to_owned(),
3268                nodes: vec![],
3269                node_retires: vec![],
3270                edges: vec![EdgeInsert {
3271                    row_id: "edge-row-1".to_owned(),
3272                    logical_id: "edge-lg-1".to_owned(),
3273                    source_logical_id: "node-a".to_owned(),
3274                    target_logical_id: "node-b".to_owned(),
3275                    kind: "HAS_TASK".to_owned(),
3276                    properties: r#"{"weight":1}"#.to_owned(),
3277                    source_ref: Some("src-1".to_owned()),
3278                    upsert: false,
3279                }],
3280                edge_retires: vec![],
3281                chunks: vec![],
3282                runs: vec![],
3283                steps: vec![],
3284                actions: vec![],
3285                optional_backfills: vec![],
3286                vec_inserts: vec![],
3287                operational_writes: vec![],
3288            })
3289            .expect("edge v1 write");
3290
3291        // Upsert v2 edge
3292        writer
3293            .submit(WriteRequest {
3294                label: "edge-v2".to_owned(),
3295                nodes: vec![],
3296                node_retires: vec![],
3297                edges: vec![EdgeInsert {
3298                    row_id: "edge-row-2".to_owned(),
3299                    logical_id: "edge-lg-1".to_owned(),
3300                    source_logical_id: "node-a".to_owned(),
3301                    target_logical_id: "node-b".to_owned(),
3302                    kind: "HAS_TASK".to_owned(),
3303                    properties: r#"{"weight":2}"#.to_owned(),
3304                    source_ref: Some("src-2".to_owned()),
3305                    upsert: true,
3306                }],
3307                edge_retires: vec![],
3308                chunks: vec![],
3309                runs: vec![],
3310                steps: vec![],
3311                actions: vec![],
3312                optional_backfills: vec![],
3313                vec_inserts: vec![],
3314                operational_writes: vec![],
3315            })
3316            .expect("edge v2 upsert");
3317
3318        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3319        let (active_row_id, props): (String, String) = conn
3320            .query_row(
3321                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3322                [],
3323                |row| Ok((row.get(0)?, row.get(1)?)),
3324            )
3325            .expect("active edge");
3326        assert_eq!(active_row_id, "edge-row-2");
3327        assert!(props.contains("\"weight\":2"));
3328
3329        let superseded: i64 = conn
3330            .query_row(
3331                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3332                [],
3333                |row| row.get(0),
3334            )
3335            .expect("superseded count");
3336        assert_eq!(superseded, 1);
3337    }
3338
3339    #[test]
3340    fn writer_fts_rows_are_written_to_database() {
3341        let db = NamedTempFile::new().expect("temporary db");
3342        let writer = WriterActor::start(
3343            db.path(),
3344            Arc::new(SchemaManager::new()),
3345            ProvenanceMode::Warn,
3346            Arc::new(TelemetryCounters::default()),
3347        )
3348        .expect("writer");
3349
3350        writer
3351            .submit(WriteRequest {
3352                label: "seed".to_owned(),
3353                nodes: vec![NodeInsert {
3354                    row_id: "row-1".to_owned(),
3355                    logical_id: "logical-1".to_owned(),
3356                    kind: "Meeting".to_owned(),
3357                    properties: "{}".to_owned(),
3358                    source_ref: Some("src-1".to_owned()),
3359                    upsert: false,
3360                    chunk_policy: ChunkPolicy::Preserve,
3361                    content_ref: None,
3362                }],
3363                node_retires: vec![],
3364                edges: vec![],
3365                edge_retires: vec![],
3366                chunks: vec![ChunkInsert {
3367                    id: "chunk-1".to_owned(),
3368                    node_logical_id: "logical-1".to_owned(),
3369                    text_content: "budget discussion".to_owned(),
3370                    byte_start: None,
3371                    byte_end: None,
3372                    content_hash: None,
3373                }],
3374                runs: vec![],
3375                steps: vec![],
3376                actions: vec![],
3377                optional_backfills: vec![],
3378                vec_inserts: vec![],
3379                operational_writes: vec![],
3380            })
3381            .expect("write receipt");
3382
3383        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3384        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3385            conn.query_row(
3386                "SELECT chunk_id, node_logical_id, kind, text_content \
3387                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3388                [],
3389                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3390            )
3391            .expect("fts row");
3392        assert_eq!(chunk_id, "chunk-1");
3393        assert_eq!(node_logical_id, "logical-1");
3394        assert_eq!(kind, "Meeting");
3395        assert_eq!(text_content, "budget discussion");
3396    }
3397
3398    #[test]
3399    fn writer_receipt_warns_on_nodes_without_source_ref() {
3400        let db = NamedTempFile::new().expect("temporary db");
3401        let writer = WriterActor::start(
3402            db.path(),
3403            Arc::new(SchemaManager::new()),
3404            ProvenanceMode::Warn,
3405            Arc::new(TelemetryCounters::default()),
3406        )
3407        .expect("writer");
3408
3409        let receipt = writer
3410            .submit(WriteRequest {
3411                label: "no-source".to_owned(),
3412                nodes: vec![NodeInsert {
3413                    row_id: "row-1".to_owned(),
3414                    logical_id: "logical-1".to_owned(),
3415                    kind: "Meeting".to_owned(),
3416                    properties: "{}".to_owned(),
3417                    source_ref: None,
3418                    upsert: false,
3419                    chunk_policy: ChunkPolicy::Preserve,
3420                    content_ref: None,
3421                }],
3422                node_retires: vec![],
3423                edges: vec![],
3424                edge_retires: vec![],
3425                chunks: vec![],
3426                runs: vec![],
3427                steps: vec![],
3428                actions: vec![],
3429                optional_backfills: vec![],
3430                vec_inserts: vec![],
3431                operational_writes: vec![],
3432            })
3433            .expect("write receipt");
3434
3435        assert_eq!(receipt.provenance_warnings.len(), 1);
3436        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3437    }
3438
3439    #[test]
3440    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3441        let db = NamedTempFile::new().expect("temporary db");
3442        let writer = WriterActor::start(
3443            db.path(),
3444            Arc::new(SchemaManager::new()),
3445            ProvenanceMode::Warn,
3446            Arc::new(TelemetryCounters::default()),
3447        )
3448        .expect("writer");
3449
3450        let receipt = writer
3451            .submit(WriteRequest {
3452                label: "with-source".to_owned(),
3453                nodes: vec![NodeInsert {
3454                    row_id: "row-1".to_owned(),
3455                    logical_id: "logical-1".to_owned(),
3456                    kind: "Meeting".to_owned(),
3457                    properties: "{}".to_owned(),
3458                    source_ref: Some("src-1".to_owned()),
3459                    upsert: false,
3460                    chunk_policy: ChunkPolicy::Preserve,
3461                    content_ref: None,
3462                }],
3463                node_retires: vec![],
3464                edges: vec![],
3465                edge_retires: vec![],
3466                chunks: vec![],
3467                runs: vec![],
3468                steps: vec![],
3469                actions: vec![],
3470                optional_backfills: vec![],
3471                vec_inserts: vec![],
3472                operational_writes: vec![],
3473            })
3474            .expect("write receipt");
3475
3476        assert!(receipt.provenance_warnings.is_empty());
3477    }
3478
3479    #[test]
3480    fn writer_accepts_chunk_for_pre_existing_node() {
3481        let db = NamedTempFile::new().expect("temporary db");
3482        let writer = WriterActor::start(
3483            db.path(),
3484            Arc::new(SchemaManager::new()),
3485            ProvenanceMode::Warn,
3486            Arc::new(TelemetryCounters::default()),
3487        )
3488        .expect("writer");
3489
3490        // Request 1: submit node only
3491        writer
3492            .submit(WriteRequest {
3493                label: "r1".to_owned(),
3494                nodes: vec![NodeInsert {
3495                    row_id: "row-1".to_owned(),
3496                    logical_id: "logical-1".to_owned(),
3497                    kind: "Meeting".to_owned(),
3498                    properties: "{}".to_owned(),
3499                    source_ref: Some("src-1".to_owned()),
3500                    upsert: false,
3501                    chunk_policy: ChunkPolicy::Preserve,
3502                    content_ref: None,
3503                }],
3504                node_retires: vec![],
3505                edges: vec![],
3506                edge_retires: vec![],
3507                chunks: vec![],
3508                runs: vec![],
3509                steps: vec![],
3510                actions: vec![],
3511                optional_backfills: vec![],
3512                vec_inserts: vec![],
3513                operational_writes: vec![],
3514            })
3515            .expect("r1 write");
3516
3517        // Request 2: submit chunk for pre-existing node
3518        writer
3519            .submit(WriteRequest {
3520                label: "r2".to_owned(),
3521                nodes: vec![],
3522                node_retires: vec![],
3523                edges: vec![],
3524                edge_retires: vec![],
3525                chunks: vec![ChunkInsert {
3526                    id: "chunk-1".to_owned(),
3527                    node_logical_id: "logical-1".to_owned(),
3528                    text_content: "budget discussion".to_owned(),
3529                    byte_start: None,
3530                    byte_end: None,
3531                    content_hash: None,
3532                }],
3533                runs: vec![],
3534                steps: vec![],
3535                actions: vec![],
3536                optional_backfills: vec![],
3537                vec_inserts: vec![],
3538                operational_writes: vec![],
3539            })
3540            .expect("r2 write — chunk for pre-existing node");
3541
3542        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3543        let count: i64 = conn
3544            .query_row(
3545                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3546                [],
3547                |row| row.get(0),
3548            )
3549            .expect("fts count");
3550        assert_eq!(
3551            count, 1,
3552            "FTS row must exist for chunk attached to pre-existing node"
3553        );
3554    }
3555
3556    #[test]
3557    fn writer_rejects_chunk_for_completely_unknown_node() {
3558        let db = NamedTempFile::new().expect("temporary db");
3559        let writer = WriterActor::start(
3560            db.path(),
3561            Arc::new(SchemaManager::new()),
3562            ProvenanceMode::Warn,
3563            Arc::new(TelemetryCounters::default()),
3564        )
3565        .expect("writer");
3566
3567        let result = writer.submit(WriteRequest {
3568            label: "bad".to_owned(),
3569            nodes: vec![],
3570            node_retires: vec![],
3571            edges: vec![],
3572            edge_retires: vec![],
3573            chunks: vec![ChunkInsert {
3574                id: "chunk-1".to_owned(),
3575                node_logical_id: "nonexistent".to_owned(),
3576                text_content: "some text".to_owned(),
3577                byte_start: None,
3578                byte_end: None,
3579                content_hash: None,
3580            }],
3581            runs: vec![],
3582            steps: vec![],
3583            actions: vec![],
3584            optional_backfills: vec![],
3585            vec_inserts: vec![],
3586            operational_writes: vec![],
3587        });
3588
3589        assert!(
3590            matches!(result, Err(EngineError::InvalidWrite(_))),
3591            "completely unknown node must return InvalidWrite"
3592        );
3593    }
3594
3595    #[test]
3596    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3597        let db = NamedTempFile::new().expect("temporary db");
3598        let writer = WriterActor::start(
3599            db.path(),
3600            Arc::new(SchemaManager::new()),
3601            ProvenanceMode::Warn,
3602            Arc::new(TelemetryCounters::default()),
3603        )
3604        .expect("writer");
3605
3606        let receipt = writer
3607            .submit(WriteRequest {
3608                label: "seed".to_owned(),
3609                nodes: vec![NodeInsert {
3610                    row_id: "row-1".to_owned(),
3611                    logical_id: "logical-1".to_owned(),
3612                    kind: "Meeting".to_owned(),
3613                    properties: "{}".to_owned(),
3614                    source_ref: None,
3615                    upsert: false,
3616                    chunk_policy: ChunkPolicy::Preserve,
3617                    content_ref: None,
3618                }],
3619                node_retires: vec![],
3620                edges: vec![],
3621                edge_retires: vec![],
3622                chunks: vec![ChunkInsert {
3623                    id: "chunk-1".to_owned(),
3624                    node_logical_id: "logical-1".to_owned(),
3625                    text_content: "budget discussion".to_owned(),
3626                    byte_start: None,
3627                    byte_end: None,
3628                    content_hash: None,
3629                }],
3630                runs: vec![],
3631                steps: vec![],
3632                actions: vec![],
3633                optional_backfills: vec![],
3634                vec_inserts: vec![],
3635                operational_writes: vec![],
3636            })
3637            .expect("write receipt");
3638
3639        assert_eq!(receipt.label, "seed");
3640    }
3641
3642    #[test]
3643    fn writer_node_retire_supersedes_active_node() {
3644        let db = NamedTempFile::new().expect("temporary db");
3645        let writer = WriterActor::start(
3646            db.path(),
3647            Arc::new(SchemaManager::new()),
3648            ProvenanceMode::Warn,
3649            Arc::new(TelemetryCounters::default()),
3650        )
3651        .expect("writer");
3652
3653        writer
3654            .submit(WriteRequest {
3655                label: "seed".to_owned(),
3656                nodes: vec![NodeInsert {
3657                    row_id: "row-1".to_owned(),
3658                    logical_id: "meeting-1".to_owned(),
3659                    kind: "Meeting".to_owned(),
3660                    properties: "{}".to_owned(),
3661                    source_ref: Some("src-1".to_owned()),
3662                    upsert: false,
3663                    chunk_policy: ChunkPolicy::Preserve,
3664                    content_ref: None,
3665                }],
3666                node_retires: vec![],
3667                edges: vec![],
3668                edge_retires: vec![],
3669                chunks: vec![],
3670                runs: vec![],
3671                steps: vec![],
3672                actions: vec![],
3673                optional_backfills: vec![],
3674                vec_inserts: vec![],
3675                operational_writes: vec![],
3676            })
3677            .expect("seed write");
3678
3679        writer
3680            .submit(WriteRequest {
3681                label: "retire".to_owned(),
3682                nodes: vec![],
3683                node_retires: vec![NodeRetire {
3684                    logical_id: "meeting-1".to_owned(),
3685                    source_ref: Some("src-2".to_owned()),
3686                }],
3687                edges: vec![],
3688                edge_retires: vec![],
3689                chunks: vec![],
3690                runs: vec![],
3691                steps: vec![],
3692                actions: vec![],
3693                optional_backfills: vec![],
3694                vec_inserts: vec![],
3695                operational_writes: vec![],
3696            })
3697            .expect("retire write");
3698
3699        let conn = rusqlite::Connection::open(db.path()).expect("open");
3700        let active: i64 = conn
3701            .query_row(
3702                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3703                [],
3704                |r| r.get(0),
3705            )
3706            .expect("count active");
3707        let historical: i64 = conn
3708            .query_row(
3709                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3710                [],
3711                |r| r.get(0),
3712            )
3713            .expect("count historical");
3714
3715        assert_eq!(active, 0, "active count must be 0 after retire");
3716        assert_eq!(historical, 1, "historical count must be 1 after retire");
3717    }
3718
3719    #[test]
3720    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3721        let db = NamedTempFile::new().expect("temporary db");
3722        let writer = WriterActor::start(
3723            db.path(),
3724            Arc::new(SchemaManager::new()),
3725            ProvenanceMode::Warn,
3726            Arc::new(TelemetryCounters::default()),
3727        )
3728        .expect("writer");
3729
3730        writer
3731            .submit(WriteRequest {
3732                label: "seed".to_owned(),
3733                nodes: vec![NodeInsert {
3734                    row_id: "row-1".to_owned(),
3735                    logical_id: "meeting-1".to_owned(),
3736                    kind: "Meeting".to_owned(),
3737                    properties: "{}".to_owned(),
3738                    source_ref: Some("src-1".to_owned()),
3739                    upsert: false,
3740                    chunk_policy: ChunkPolicy::Preserve,
3741                    content_ref: None,
3742                }],
3743                node_retires: vec![],
3744                edges: vec![],
3745                edge_retires: vec![],
3746                chunks: vec![ChunkInsert {
3747                    id: "chunk-1".to_owned(),
3748                    node_logical_id: "meeting-1".to_owned(),
3749                    text_content: "budget discussion".to_owned(),
3750                    byte_start: None,
3751                    byte_end: None,
3752                    content_hash: None,
3753                }],
3754                runs: vec![],
3755                steps: vec![],
3756                actions: vec![],
3757                optional_backfills: vec![],
3758                vec_inserts: vec![],
3759                operational_writes: vec![],
3760            })
3761            .expect("seed write");
3762
3763        writer
3764            .submit(WriteRequest {
3765                label: "retire".to_owned(),
3766                nodes: vec![],
3767                node_retires: vec![NodeRetire {
3768                    logical_id: "meeting-1".to_owned(),
3769                    source_ref: Some("src-2".to_owned()),
3770                }],
3771                edges: vec![],
3772                edge_retires: vec![],
3773                chunks: vec![],
3774                runs: vec![],
3775                steps: vec![],
3776                actions: vec![],
3777                optional_backfills: vec![],
3778                vec_inserts: vec![],
3779                operational_writes: vec![],
3780            })
3781            .expect("retire write");
3782
3783        let conn = rusqlite::Connection::open(db.path()).expect("open");
3784        let chunk_count: i64 = conn
3785            .query_row(
3786                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
3787                [],
3788                |r| r.get(0),
3789            )
3790            .expect("chunk count");
3791        let fts_count: i64 = conn
3792            .query_row(
3793                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
3794                [],
3795                |r| r.get(0),
3796            )
3797            .expect("fts count");
3798
3799        assert_eq!(
3800            chunk_count, 1,
3801            "chunks must remain after node retire so restore can re-establish content"
3802        );
3803        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
3804    }
3805
3806    #[test]
3807    fn writer_edge_retire_supersedes_active_edge() {
3808        let db = NamedTempFile::new().expect("temporary db");
3809        let writer = WriterActor::start(
3810            db.path(),
3811            Arc::new(SchemaManager::new()),
3812            ProvenanceMode::Warn,
3813            Arc::new(TelemetryCounters::default()),
3814        )
3815        .expect("writer");
3816
3817        writer
3818            .submit(WriteRequest {
3819                label: "seed".to_owned(),
3820                nodes: vec![
3821                    NodeInsert {
3822                        row_id: "row-a".to_owned(),
3823                        logical_id: "node-a".to_owned(),
3824                        kind: "Meeting".to_owned(),
3825                        properties: "{}".to_owned(),
3826                        source_ref: Some("src-1".to_owned()),
3827                        upsert: false,
3828                        chunk_policy: ChunkPolicy::Preserve,
3829                        content_ref: None,
3830                    },
3831                    NodeInsert {
3832                        row_id: "row-b".to_owned(),
3833                        logical_id: "node-b".to_owned(),
3834                        kind: "Task".to_owned(),
3835                        properties: "{}".to_owned(),
3836                        source_ref: Some("src-1".to_owned()),
3837                        upsert: false,
3838                        chunk_policy: ChunkPolicy::Preserve,
3839                        content_ref: None,
3840                    },
3841                ],
3842                node_retires: vec![],
3843                edges: vec![EdgeInsert {
3844                    row_id: "edge-1".to_owned(),
3845                    logical_id: "edge-lg-1".to_owned(),
3846                    source_logical_id: "node-a".to_owned(),
3847                    target_logical_id: "node-b".to_owned(),
3848                    kind: "HAS_TASK".to_owned(),
3849                    properties: "{}".to_owned(),
3850                    source_ref: Some("src-1".to_owned()),
3851                    upsert: false,
3852                }],
3853                edge_retires: vec![],
3854                chunks: vec![],
3855                runs: vec![],
3856                steps: vec![],
3857                actions: vec![],
3858                optional_backfills: vec![],
3859                vec_inserts: vec![],
3860                operational_writes: vec![],
3861            })
3862            .expect("seed write");
3863
3864        writer
3865            .submit(WriteRequest {
3866                label: "retire-edge".to_owned(),
3867                nodes: vec![],
3868                node_retires: vec![],
3869                edges: vec![],
3870                edge_retires: vec![EdgeRetire {
3871                    logical_id: "edge-lg-1".to_owned(),
3872                    source_ref: Some("src-2".to_owned()),
3873                }],
3874                chunks: vec![],
3875                runs: vec![],
3876                steps: vec![],
3877                actions: vec![],
3878                optional_backfills: vec![],
3879                vec_inserts: vec![],
3880                operational_writes: vec![],
3881            })
3882            .expect("retire edge write");
3883
3884        let conn = rusqlite::Connection::open(db.path()).expect("open");
3885        let active: i64 = conn
3886            .query_row(
3887                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3888                [],
3889                |r| r.get(0),
3890            )
3891            .expect("active edge count");
3892
3893        assert_eq!(active, 0, "active edge count must be 0 after retire");
3894    }
3895
3896    #[test]
3897    fn writer_retire_without_source_ref_emits_provenance_warning() {
3898        let db = NamedTempFile::new().expect("temporary db");
3899        let writer = WriterActor::start(
3900            db.path(),
3901            Arc::new(SchemaManager::new()),
3902            ProvenanceMode::Warn,
3903            Arc::new(TelemetryCounters::default()),
3904        )
3905        .expect("writer");
3906
3907        writer
3908            .submit(WriteRequest {
3909                label: "seed".to_owned(),
3910                nodes: vec![NodeInsert {
3911                    row_id: "row-1".to_owned(),
3912                    logical_id: "meeting-1".to_owned(),
3913                    kind: "Meeting".to_owned(),
3914                    properties: "{}".to_owned(),
3915                    source_ref: Some("src-1".to_owned()),
3916                    upsert: false,
3917                    chunk_policy: ChunkPolicy::Preserve,
3918                    content_ref: None,
3919                }],
3920                node_retires: vec![],
3921                edges: vec![],
3922                edge_retires: vec![],
3923                chunks: vec![],
3924                runs: vec![],
3925                steps: vec![],
3926                actions: vec![],
3927                optional_backfills: vec![],
3928                vec_inserts: vec![],
3929                operational_writes: vec![],
3930            })
3931            .expect("seed write");
3932
3933        let receipt = writer
3934            .submit(WriteRequest {
3935                label: "retire-no-src".to_owned(),
3936                nodes: vec![],
3937                node_retires: vec![NodeRetire {
3938                    logical_id: "meeting-1".to_owned(),
3939                    source_ref: None,
3940                }],
3941                edges: vec![],
3942                edge_retires: vec![],
3943                chunks: vec![],
3944                runs: vec![],
3945                steps: vec![],
3946                actions: vec![],
3947                optional_backfills: vec![],
3948                vec_inserts: vec![],
3949                operational_writes: vec![],
3950            })
3951            .expect("retire write");
3952
3953        assert!(
3954            !receipt.provenance_warnings.is_empty(),
3955            "retire without source_ref must emit a provenance warning"
3956        );
3957    }
3958
3959    #[test]
3960    #[allow(clippy::too_many_lines)]
3961    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
3962        let db = NamedTempFile::new().expect("temporary db");
3963        let writer = WriterActor::start(
3964            db.path(),
3965            Arc::new(SchemaManager::new()),
3966            ProvenanceMode::Warn,
3967            Arc::new(TelemetryCounters::default()),
3968        )
3969        .expect("writer");
3970
3971        writer
3972            .submit(WriteRequest {
3973                label: "v1".to_owned(),
3974                nodes: vec![NodeInsert {
3975                    row_id: "row-1".to_owned(),
3976                    logical_id: "meeting-1".to_owned(),
3977                    kind: "Meeting".to_owned(),
3978                    properties: "{}".to_owned(),
3979                    source_ref: Some("src-1".to_owned()),
3980                    upsert: false,
3981                    chunk_policy: ChunkPolicy::Preserve,
3982                    content_ref: None,
3983                }],
3984                node_retires: vec![],
3985                edges: vec![],
3986                edge_retires: vec![],
3987                chunks: vec![ChunkInsert {
3988                    id: "chunk-old".to_owned(),
3989                    node_logical_id: "meeting-1".to_owned(),
3990                    text_content: "old text".to_owned(),
3991                    byte_start: None,
3992                    byte_end: None,
3993                    content_hash: None,
3994                }],
3995                runs: vec![],
3996                steps: vec![],
3997                actions: vec![],
3998                optional_backfills: vec![],
3999                vec_inserts: vec![],
4000                operational_writes: vec![],
4001            })
4002            .expect("v1 write");
4003
4004        writer
4005            .submit(WriteRequest {
4006                label: "v2".to_owned(),
4007                nodes: vec![NodeInsert {
4008                    row_id: "row-2".to_owned(),
4009                    logical_id: "meeting-1".to_owned(),
4010                    kind: "Meeting".to_owned(),
4011                    properties: "{}".to_owned(),
4012                    source_ref: Some("src-2".to_owned()),
4013                    upsert: true,
4014                    chunk_policy: ChunkPolicy::Replace,
4015                    content_ref: None,
4016                }],
4017                node_retires: vec![],
4018                edges: vec![],
4019                edge_retires: vec![],
4020                chunks: vec![ChunkInsert {
4021                    id: "chunk-new".to_owned(),
4022                    node_logical_id: "meeting-1".to_owned(),
4023                    text_content: "new text".to_owned(),
4024                    byte_start: None,
4025                    byte_end: None,
4026                    content_hash: None,
4027                }],
4028                runs: vec![],
4029                steps: vec![],
4030                actions: vec![],
4031                optional_backfills: vec![],
4032                vec_inserts: vec![],
4033                operational_writes: vec![],
4034            })
4035            .expect("v2 write");
4036
4037        let conn = rusqlite::Connection::open(db.path()).expect("open");
4038        let old_chunk: i64 = conn
4039            .query_row(
4040                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4041                [],
4042                |r| r.get(0),
4043            )
4044            .expect("old chunk count");
4045        let new_chunk: i64 = conn
4046            .query_row(
4047                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4048                [],
4049                |r| r.get(0),
4050            )
4051            .expect("new chunk count");
4052        let fts_old: i64 = conn
4053            .query_row(
4054                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4055                [],
4056                |r| r.get(0),
4057            )
4058            .expect("old fts count");
4059
4060        assert_eq!(
4061            old_chunk, 0,
4062            "old chunk must be deleted by ChunkPolicy::Replace"
4063        );
4064        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4065        assert_eq!(
4066            fts_old, 0,
4067            "old FTS row must be deleted by ChunkPolicy::Replace"
4068        );
4069    }
4070
4071    #[test]
4072    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4073        let db = NamedTempFile::new().expect("temporary db");
4074        let writer = WriterActor::start(
4075            db.path(),
4076            Arc::new(SchemaManager::new()),
4077            ProvenanceMode::Warn,
4078            Arc::new(TelemetryCounters::default()),
4079        )
4080        .expect("writer");
4081
4082        writer
4083            .submit(WriteRequest {
4084                label: "v1".to_owned(),
4085                nodes: vec![NodeInsert {
4086                    row_id: "row-1".to_owned(),
4087                    logical_id: "meeting-1".to_owned(),
4088                    kind: "Meeting".to_owned(),
4089                    properties: "{}".to_owned(),
4090                    source_ref: Some("src-1".to_owned()),
4091                    upsert: false,
4092                    chunk_policy: ChunkPolicy::Preserve,
4093                    content_ref: None,
4094                }],
4095                node_retires: vec![],
4096                edges: vec![],
4097                edge_retires: vec![],
4098                chunks: vec![ChunkInsert {
4099                    id: "chunk-old".to_owned(),
4100                    node_logical_id: "meeting-1".to_owned(),
4101                    text_content: "old text".to_owned(),
4102                    byte_start: None,
4103                    byte_end: None,
4104                    content_hash: None,
4105                }],
4106                runs: vec![],
4107                steps: vec![],
4108                actions: vec![],
4109                optional_backfills: vec![],
4110                vec_inserts: vec![],
4111                operational_writes: vec![],
4112            })
4113            .expect("v1 write");
4114
4115        writer
4116            .submit(WriteRequest {
4117                label: "v2-props-only".to_owned(),
4118                nodes: vec![NodeInsert {
4119                    row_id: "row-2".to_owned(),
4120                    logical_id: "meeting-1".to_owned(),
4121                    kind: "Meeting".to_owned(),
4122                    properties: r#"{"status":"updated"}"#.to_owned(),
4123                    source_ref: Some("src-2".to_owned()),
4124                    upsert: true,
4125                    chunk_policy: ChunkPolicy::Preserve,
4126                    content_ref: None,
4127                }],
4128                node_retires: vec![],
4129                edges: vec![],
4130                edge_retires: vec![],
4131                chunks: vec![],
4132                runs: vec![],
4133                steps: vec![],
4134                actions: vec![],
4135                optional_backfills: vec![],
4136                vec_inserts: vec![],
4137                operational_writes: vec![],
4138            })
4139            .expect("v2 preserve write");
4140
4141        let conn = rusqlite::Connection::open(db.path()).expect("open");
4142        let old_chunk: i64 = conn
4143            .query_row(
4144                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4145                [],
4146                |r| r.get(0),
4147            )
4148            .expect("old chunk count");
4149
4150        assert_eq!(
4151            old_chunk, 1,
4152            "old chunk must be preserved by ChunkPolicy::Preserve"
4153        );
4154    }
4155
4156    #[test]
4157    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4158        let db = NamedTempFile::new().expect("temporary db");
4159        let writer = WriterActor::start(
4160            db.path(),
4161            Arc::new(SchemaManager::new()),
4162            ProvenanceMode::Warn,
4163            Arc::new(TelemetryCounters::default()),
4164        )
4165        .expect("writer");
4166
4167        writer
4168            .submit(WriteRequest {
4169                label: "v1".to_owned(),
4170                nodes: vec![NodeInsert {
4171                    row_id: "row-1".to_owned(),
4172                    logical_id: "meeting-1".to_owned(),
4173                    kind: "Meeting".to_owned(),
4174                    properties: "{}".to_owned(),
4175                    source_ref: Some("src-1".to_owned()),
4176                    upsert: false,
4177                    chunk_policy: ChunkPolicy::Preserve,
4178                    content_ref: None,
4179                }],
4180                node_retires: vec![],
4181                edges: vec![],
4182                edge_retires: vec![],
4183                chunks: vec![ChunkInsert {
4184                    id: "chunk-existing".to_owned(),
4185                    node_logical_id: "meeting-1".to_owned(),
4186                    text_content: "existing text".to_owned(),
4187                    byte_start: None,
4188                    byte_end: None,
4189                    content_hash: None,
4190                }],
4191                runs: vec![],
4192                steps: vec![],
4193                actions: vec![],
4194                optional_backfills: vec![],
4195                vec_inserts: vec![],
4196                operational_writes: vec![],
4197            })
4198            .expect("v1 write");
4199
4200        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4201        writer
4202            .submit(WriteRequest {
4203                label: "insert-no-upsert".to_owned(),
4204                nodes: vec![NodeInsert {
4205                    row_id: "row-2".to_owned(),
4206                    logical_id: "meeting-2".to_owned(),
4207                    kind: "Meeting".to_owned(),
4208                    properties: "{}".to_owned(),
4209                    source_ref: Some("src-2".to_owned()),
4210                    upsert: false,
4211                    chunk_policy: ChunkPolicy::Replace,
4212                    content_ref: None,
4213                }],
4214                node_retires: vec![],
4215                edges: vec![],
4216                edge_retires: vec![],
4217                chunks: vec![],
4218                runs: vec![],
4219                steps: vec![],
4220                actions: vec![],
4221                optional_backfills: vec![],
4222                vec_inserts: vec![],
4223                operational_writes: vec![],
4224            })
4225            .expect("insert no-upsert write");
4226
4227        let conn = rusqlite::Connection::open(db.path()).expect("open");
4228        let existing_chunk: i64 = conn
4229            .query_row(
4230                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4231                [],
4232                |r| r.get(0),
4233            )
4234            .expect("chunk count");
4235
4236        assert_eq!(
4237            existing_chunk, 1,
4238            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4239        );
4240    }
4241
4242    #[test]
4243    fn writer_run_upsert_supersedes_prior_active_run() {
4244        let db = NamedTempFile::new().expect("temporary db");
4245        let writer = WriterActor::start(
4246            db.path(),
4247            Arc::new(SchemaManager::new()),
4248            ProvenanceMode::Warn,
4249            Arc::new(TelemetryCounters::default()),
4250        )
4251        .expect("writer");
4252
4253        writer
4254            .submit(WriteRequest {
4255                label: "v1".to_owned(),
4256                nodes: vec![],
4257                node_retires: vec![],
4258                edges: vec![],
4259                edge_retires: vec![],
4260                chunks: vec![],
4261                runs: vec![RunInsert {
4262                    id: "run-v1".to_owned(),
4263                    kind: "session".to_owned(),
4264                    status: "completed".to_owned(),
4265                    properties: "{}".to_owned(),
4266                    source_ref: Some("src-1".to_owned()),
4267                    upsert: false,
4268                    supersedes_id: None,
4269                }],
4270                steps: vec![],
4271                actions: vec![],
4272                optional_backfills: vec![],
4273                vec_inserts: vec![],
4274                operational_writes: vec![],
4275            })
4276            .expect("v1 run write");
4277
4278        writer
4279            .submit(WriteRequest {
4280                label: "v2".to_owned(),
4281                nodes: vec![],
4282                node_retires: vec![],
4283                edges: vec![],
4284                edge_retires: vec![],
4285                chunks: vec![],
4286                runs: vec![RunInsert {
4287                    id: "run-v2".to_owned(),
4288                    kind: "session".to_owned(),
4289                    status: "completed".to_owned(),
4290                    properties: "{}".to_owned(),
4291                    source_ref: Some("src-2".to_owned()),
4292                    upsert: true,
4293                    supersedes_id: Some("run-v1".to_owned()),
4294                }],
4295                steps: vec![],
4296                actions: vec![],
4297                optional_backfills: vec![],
4298                vec_inserts: vec![],
4299                operational_writes: vec![],
4300            })
4301            .expect("v2 run write");
4302
4303        let conn = rusqlite::Connection::open(db.path()).expect("open");
4304        let v1_historical: i64 = conn
4305            .query_row(
4306                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4307                [],
4308                |r| r.get(0),
4309            )
4310            .expect("v1 historical count");
4311        let v2_active: i64 = conn
4312            .query_row(
4313                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4314                [],
4315                |r| r.get(0),
4316            )
4317            .expect("v2 active count");
4318
4319        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4320        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4321    }
4322
4323    #[test]
4324    fn writer_step_upsert_supersedes_prior_active_step() {
4325        let db = NamedTempFile::new().expect("temporary db");
4326        let writer = WriterActor::start(
4327            db.path(),
4328            Arc::new(SchemaManager::new()),
4329            ProvenanceMode::Warn,
4330            Arc::new(TelemetryCounters::default()),
4331        )
4332        .expect("writer");
4333
4334        writer
4335            .submit(WriteRequest {
4336                label: "v1".to_owned(),
4337                nodes: vec![],
4338                node_retires: vec![],
4339                edges: vec![],
4340                edge_retires: vec![],
4341                chunks: vec![],
4342                runs: vec![RunInsert {
4343                    id: "run-1".to_owned(),
4344                    kind: "session".to_owned(),
4345                    status: "completed".to_owned(),
4346                    properties: "{}".to_owned(),
4347                    source_ref: Some("src-1".to_owned()),
4348                    upsert: false,
4349                    supersedes_id: None,
4350                }],
4351                steps: vec![StepInsert {
4352                    id: "step-v1".to_owned(),
4353                    run_id: "run-1".to_owned(),
4354                    kind: "llm".to_owned(),
4355                    status: "completed".to_owned(),
4356                    properties: "{}".to_owned(),
4357                    source_ref: Some("src-1".to_owned()),
4358                    upsert: false,
4359                    supersedes_id: None,
4360                }],
4361                actions: vec![],
4362                optional_backfills: vec![],
4363                vec_inserts: vec![],
4364                operational_writes: vec![],
4365            })
4366            .expect("v1 step write");
4367
4368        writer
4369            .submit(WriteRequest {
4370                label: "v2".to_owned(),
4371                nodes: vec![],
4372                node_retires: vec![],
4373                edges: vec![],
4374                edge_retires: vec![],
4375                chunks: vec![],
4376                runs: vec![],
4377                steps: vec![StepInsert {
4378                    id: "step-v2".to_owned(),
4379                    run_id: "run-1".to_owned(),
4380                    kind: "llm".to_owned(),
4381                    status: "completed".to_owned(),
4382                    properties: "{}".to_owned(),
4383                    source_ref: Some("src-2".to_owned()),
4384                    upsert: true,
4385                    supersedes_id: Some("step-v1".to_owned()),
4386                }],
4387                actions: vec![],
4388                optional_backfills: vec![],
4389                vec_inserts: vec![],
4390                operational_writes: vec![],
4391            })
4392            .expect("v2 step write");
4393
4394        let conn = rusqlite::Connection::open(db.path()).expect("open");
4395        let v1_historical: i64 = conn
4396            .query_row(
4397                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4398                [],
4399                |r| r.get(0),
4400            )
4401            .expect("v1 historical count");
4402        let v2_active: i64 = conn
4403            .query_row(
4404                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4405                [],
4406                |r| r.get(0),
4407            )
4408            .expect("v2 active count");
4409
4410        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4411        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4412    }
4413
4414    #[test]
4415    fn writer_action_upsert_supersedes_prior_active_action() {
4416        let db = NamedTempFile::new().expect("temporary db");
4417        let writer = WriterActor::start(
4418            db.path(),
4419            Arc::new(SchemaManager::new()),
4420            ProvenanceMode::Warn,
4421            Arc::new(TelemetryCounters::default()),
4422        )
4423        .expect("writer");
4424
4425        writer
4426            .submit(WriteRequest {
4427                label: "v1".to_owned(),
4428                nodes: vec![],
4429                node_retires: vec![],
4430                edges: vec![],
4431                edge_retires: vec![],
4432                chunks: vec![],
4433                runs: vec![RunInsert {
4434                    id: "run-1".to_owned(),
4435                    kind: "session".to_owned(),
4436                    status: "completed".to_owned(),
4437                    properties: "{}".to_owned(),
4438                    source_ref: Some("src-1".to_owned()),
4439                    upsert: false,
4440                    supersedes_id: None,
4441                }],
4442                steps: vec![StepInsert {
4443                    id: "step-1".to_owned(),
4444                    run_id: "run-1".to_owned(),
4445                    kind: "llm".to_owned(),
4446                    status: "completed".to_owned(),
4447                    properties: "{}".to_owned(),
4448                    source_ref: Some("src-1".to_owned()),
4449                    upsert: false,
4450                    supersedes_id: None,
4451                }],
4452                actions: vec![ActionInsert {
4453                    id: "action-v1".to_owned(),
4454                    step_id: "step-1".to_owned(),
4455                    kind: "emit".to_owned(),
4456                    status: "completed".to_owned(),
4457                    properties: "{}".to_owned(),
4458                    source_ref: Some("src-1".to_owned()),
4459                    upsert: false,
4460                    supersedes_id: None,
4461                }],
4462                optional_backfills: vec![],
4463                vec_inserts: vec![],
4464                operational_writes: vec![],
4465            })
4466            .expect("v1 action write");
4467
4468        writer
4469            .submit(WriteRequest {
4470                label: "v2".to_owned(),
4471                nodes: vec![],
4472                node_retires: vec![],
4473                edges: vec![],
4474                edge_retires: vec![],
4475                chunks: vec![],
4476                runs: vec![],
4477                steps: vec![],
4478                actions: vec![ActionInsert {
4479                    id: "action-v2".to_owned(),
4480                    step_id: "step-1".to_owned(),
4481                    kind: "emit".to_owned(),
4482                    status: "completed".to_owned(),
4483                    properties: "{}".to_owned(),
4484                    source_ref: Some("src-2".to_owned()),
4485                    upsert: true,
4486                    supersedes_id: Some("action-v1".to_owned()),
4487                }],
4488                optional_backfills: vec![],
4489                vec_inserts: vec![],
4490                operational_writes: vec![],
4491            })
4492            .expect("v2 action write");
4493
4494        let conn = rusqlite::Connection::open(db.path()).expect("open");
4495        let v1_historical: i64 = conn
4496            .query_row(
4497                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4498                [],
4499                |r| r.get(0),
4500            )
4501            .expect("v1 historical count");
4502        let v2_active: i64 = conn
4503            .query_row(
4504                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4505                [],
4506                |r| r.get(0),
4507            )
4508            .expect("v2 active count");
4509
4510        assert_eq!(
4511            v1_historical, 1,
4512            "action-v1 must be historical after upsert"
4513        );
4514        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4515    }
4516
4517    // P0: runtime upsert without supersedes_id must be rejected
4518
4519    #[test]
4520    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4521        let db = NamedTempFile::new().expect("temporary db");
4522        let writer = WriterActor::start(
4523            db.path(),
4524            Arc::new(SchemaManager::new()),
4525            ProvenanceMode::Warn,
4526            Arc::new(TelemetryCounters::default()),
4527        )
4528        .expect("writer");
4529
4530        let result = writer.submit(WriteRequest {
4531            label: "bad".to_owned(),
4532            nodes: vec![],
4533            node_retires: vec![],
4534            edges: vec![],
4535            edge_retires: vec![],
4536            chunks: vec![],
4537            runs: vec![RunInsert {
4538                id: "run-1".to_owned(),
4539                kind: "session".to_owned(),
4540                status: "completed".to_owned(),
4541                properties: "{}".to_owned(),
4542                source_ref: None,
4543                upsert: true,
4544                supersedes_id: None,
4545            }],
4546            steps: vec![],
4547            actions: vec![],
4548            optional_backfills: vec![],
4549            vec_inserts: vec![],
4550            operational_writes: vec![],
4551        });
4552
4553        assert!(
4554            matches!(result, Err(EngineError::InvalidWrite(_))),
4555            "run upsert=true without supersedes_id must return InvalidWrite"
4556        );
4557    }
4558
4559    #[test]
4560    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4561        let db = NamedTempFile::new().expect("temporary db");
4562        let writer = WriterActor::start(
4563            db.path(),
4564            Arc::new(SchemaManager::new()),
4565            ProvenanceMode::Warn,
4566            Arc::new(TelemetryCounters::default()),
4567        )
4568        .expect("writer");
4569
4570        let result = writer.submit(WriteRequest {
4571            label: "bad".to_owned(),
4572            nodes: vec![],
4573            node_retires: vec![],
4574            edges: vec![],
4575            edge_retires: vec![],
4576            chunks: vec![],
4577            runs: vec![],
4578            steps: vec![StepInsert {
4579                id: "step-1".to_owned(),
4580                run_id: "run-1".to_owned(),
4581                kind: "llm".to_owned(),
4582                status: "completed".to_owned(),
4583                properties: "{}".to_owned(),
4584                source_ref: None,
4585                upsert: true,
4586                supersedes_id: None,
4587            }],
4588            actions: vec![],
4589            optional_backfills: vec![],
4590            vec_inserts: vec![],
4591            operational_writes: vec![],
4592        });
4593
4594        assert!(
4595            matches!(result, Err(EngineError::InvalidWrite(_))),
4596            "step upsert=true without supersedes_id must return InvalidWrite"
4597        );
4598    }
4599
4600    #[test]
4601    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4602        let db = NamedTempFile::new().expect("temporary db");
4603        let writer = WriterActor::start(
4604            db.path(),
4605            Arc::new(SchemaManager::new()),
4606            ProvenanceMode::Warn,
4607            Arc::new(TelemetryCounters::default()),
4608        )
4609        .expect("writer");
4610
4611        let result = writer.submit(WriteRequest {
4612            label: "bad".to_owned(),
4613            nodes: vec![],
4614            node_retires: vec![],
4615            edges: vec![],
4616            edge_retires: vec![],
4617            chunks: vec![],
4618            runs: vec![],
4619            steps: vec![],
4620            actions: vec![ActionInsert {
4621                id: "action-1".to_owned(),
4622                step_id: "step-1".to_owned(),
4623                kind: "emit".to_owned(),
4624                status: "completed".to_owned(),
4625                properties: "{}".to_owned(),
4626                source_ref: None,
4627                upsert: true,
4628                supersedes_id: None,
4629            }],
4630            optional_backfills: vec![],
4631            vec_inserts: vec![],
4632            operational_writes: vec![],
4633        });
4634
4635        assert!(
4636            matches!(result, Err(EngineError::InvalidWrite(_))),
4637            "action upsert=true without supersedes_id must return InvalidWrite"
4638        );
4639    }
4640
4641    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4642
4643    #[test]
4644    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4645        let db = NamedTempFile::new().expect("temporary db");
4646        let writer = WriterActor::start(
4647            db.path(),
4648            Arc::new(SchemaManager::new()),
4649            ProvenanceMode::Warn,
4650            Arc::new(TelemetryCounters::default()),
4651        )
4652        .expect("writer");
4653
4654        let receipt = writer
4655            .submit(WriteRequest {
4656                label: "test".to_owned(),
4657                nodes: vec![
4658                    NodeInsert {
4659                        row_id: "row-a".to_owned(),
4660                        logical_id: "node-a".to_owned(),
4661                        kind: "Meeting".to_owned(),
4662                        properties: "{}".to_owned(),
4663                        source_ref: Some("src-1".to_owned()),
4664                        upsert: false,
4665                        chunk_policy: ChunkPolicy::Preserve,
4666                        content_ref: None,
4667                    },
4668                    NodeInsert {
4669                        row_id: "row-b".to_owned(),
4670                        logical_id: "node-b".to_owned(),
4671                        kind: "Task".to_owned(),
4672                        properties: "{}".to_owned(),
4673                        source_ref: Some("src-1".to_owned()),
4674                        upsert: false,
4675                        chunk_policy: ChunkPolicy::Preserve,
4676                        content_ref: None,
4677                    },
4678                ],
4679                node_retires: vec![],
4680                edges: vec![EdgeInsert {
4681                    row_id: "edge-1".to_owned(),
4682                    logical_id: "edge-lg-1".to_owned(),
4683                    source_logical_id: "node-a".to_owned(),
4684                    target_logical_id: "node-b".to_owned(),
4685                    kind: "HAS_TASK".to_owned(),
4686                    properties: "{}".to_owned(),
4687                    source_ref: None,
4688                    upsert: false,
4689                }],
4690                edge_retires: vec![],
4691                chunks: vec![],
4692                runs: vec![],
4693                steps: vec![],
4694                actions: vec![],
4695                optional_backfills: vec![],
4696                vec_inserts: vec![],
4697                operational_writes: vec![],
4698            })
4699            .expect("write");
4700
4701        assert!(
4702            !receipt.provenance_warnings.is_empty(),
4703            "edge insert without source_ref must emit a provenance warning"
4704        );
4705    }
4706
4707    #[test]
4708    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4709        let db = NamedTempFile::new().expect("temporary db");
4710        let writer = WriterActor::start(
4711            db.path(),
4712            Arc::new(SchemaManager::new()),
4713            ProvenanceMode::Warn,
4714            Arc::new(TelemetryCounters::default()),
4715        )
4716        .expect("writer");
4717
4718        let receipt = writer
4719            .submit(WriteRequest {
4720                label: "test".to_owned(),
4721                nodes: vec![],
4722                node_retires: vec![],
4723                edges: vec![],
4724                edge_retires: vec![],
4725                chunks: vec![],
4726                runs: vec![RunInsert {
4727                    id: "run-1".to_owned(),
4728                    kind: "session".to_owned(),
4729                    status: "completed".to_owned(),
4730                    properties: "{}".to_owned(),
4731                    source_ref: None,
4732                    upsert: false,
4733                    supersedes_id: None,
4734                }],
4735                steps: vec![],
4736                actions: vec![],
4737                optional_backfills: vec![],
4738                vec_inserts: vec![],
4739                operational_writes: vec![],
4740            })
4741            .expect("write");
4742
4743        assert!(
4744            !receipt.provenance_warnings.is_empty(),
4745            "run insert without source_ref must emit a provenance warning"
4746        );
4747    }
4748
4749    // P1c: retire a node AND submit chunks for the same logical_id in one request
4750
4751    #[test]
4752    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4753        let db = NamedTempFile::new().expect("temporary db");
4754        let writer = WriterActor::start(
4755            db.path(),
4756            Arc::new(SchemaManager::new()),
4757            ProvenanceMode::Warn,
4758            Arc::new(TelemetryCounters::default()),
4759        )
4760        .expect("writer");
4761
4762        // First seed the node so it exists
4763        writer
4764            .submit(WriteRequest {
4765                label: "seed".to_owned(),
4766                nodes: vec![NodeInsert {
4767                    row_id: "row-1".to_owned(),
4768                    logical_id: "meeting-1".to_owned(),
4769                    kind: "Meeting".to_owned(),
4770                    properties: "{}".to_owned(),
4771                    source_ref: Some("src-1".to_owned()),
4772                    upsert: false,
4773                    chunk_policy: ChunkPolicy::Preserve,
4774                    content_ref: None,
4775                }],
4776                node_retires: vec![],
4777                edges: vec![],
4778                edge_retires: vec![],
4779                chunks: vec![],
4780                runs: vec![],
4781                steps: vec![],
4782                actions: vec![],
4783                optional_backfills: vec![],
4784                vec_inserts: vec![],
4785                operational_writes: vec![],
4786            })
4787            .expect("seed write");
4788
4789        // Now try to retire it AND add a chunk for it in the same request
4790        let result = writer.submit(WriteRequest {
4791            label: "bad".to_owned(),
4792            nodes: vec![],
4793            node_retires: vec![NodeRetire {
4794                logical_id: "meeting-1".to_owned(),
4795                source_ref: Some("src-2".to_owned()),
4796            }],
4797            edges: vec![],
4798            edge_retires: vec![],
4799            chunks: vec![ChunkInsert {
4800                id: "chunk-bad".to_owned(),
4801                node_logical_id: "meeting-1".to_owned(),
4802                text_content: "some text".to_owned(),
4803                byte_start: None,
4804                byte_end: None,
4805                content_hash: None,
4806            }],
4807            runs: vec![],
4808            steps: vec![],
4809            actions: vec![],
4810            optional_backfills: vec![],
4811            vec_inserts: vec![],
4812            operational_writes: vec![],
4813        });
4814
4815        assert!(
4816            matches!(result, Err(EngineError::InvalidWrite(_))),
4817            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
4818        );
4819    }
4820
4821    // --- Item 1: prepare_cached batch insert ---
4822
4823    #[test]
4824    fn writer_batch_insert_multiple_nodes() {
4825        let db = NamedTempFile::new().expect("temporary db");
4826        let writer = WriterActor::start(
4827            db.path(),
4828            Arc::new(SchemaManager::new()),
4829            ProvenanceMode::Warn,
4830            Arc::new(TelemetryCounters::default()),
4831        )
4832        .expect("writer");
4833
4834        let nodes: Vec<NodeInsert> = (0..100)
4835            .map(|i| NodeInsert {
4836                row_id: format!("row-{i}"),
4837                logical_id: format!("lg-{i}"),
4838                kind: "Note".to_owned(),
4839                properties: "{}".to_owned(),
4840                source_ref: Some("batch-src".to_owned()),
4841                upsert: false,
4842                chunk_policy: ChunkPolicy::Preserve,
4843                content_ref: None,
4844            })
4845            .collect();
4846
4847        writer
4848            .submit(WriteRequest {
4849                label: "batch".to_owned(),
4850                nodes,
4851                node_retires: vec![],
4852                edges: vec![],
4853                edge_retires: vec![],
4854                chunks: vec![],
4855                runs: vec![],
4856                steps: vec![],
4857                actions: vec![],
4858                optional_backfills: vec![],
4859                vec_inserts: vec![],
4860                operational_writes: vec![],
4861            })
4862            .expect("batch write");
4863
4864        let conn = rusqlite::Connection::open(db.path()).expect("open");
4865        let count: i64 = conn
4866            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4867            .expect("count nodes");
4868        assert_eq!(
4869            count, 100,
4870            "all 100 nodes must be present after batch insert"
4871        );
4872    }
4873
4874    // --- Item 2: ID validation ---
4875
4876    #[test]
4877    fn prepare_write_rejects_empty_node_row_id() {
4878        let db = NamedTempFile::new().expect("temporary db");
4879        let writer = WriterActor::start(
4880            db.path(),
4881            Arc::new(SchemaManager::new()),
4882            ProvenanceMode::Warn,
4883            Arc::new(TelemetryCounters::default()),
4884        )
4885        .expect("writer");
4886
4887        let result = writer.submit(WriteRequest {
4888            label: "test".to_owned(),
4889            nodes: vec![NodeInsert {
4890                row_id: String::new(),
4891                logical_id: "lg-1".to_owned(),
4892                kind: "Note".to_owned(),
4893                properties: "{}".to_owned(),
4894                source_ref: None,
4895                upsert: false,
4896                chunk_policy: ChunkPolicy::Preserve,
4897                content_ref: None,
4898            }],
4899            node_retires: vec![],
4900            edges: vec![],
4901            edge_retires: vec![],
4902            chunks: vec![],
4903            runs: vec![],
4904            steps: vec![],
4905            actions: vec![],
4906            optional_backfills: vec![],
4907            vec_inserts: vec![],
4908            operational_writes: vec![],
4909        });
4910
4911        assert!(
4912            matches!(result, Err(EngineError::InvalidWrite(_))),
4913            "empty row_id must be rejected"
4914        );
4915    }
4916
4917    #[test]
4918    fn prepare_write_rejects_empty_node_logical_id() {
4919        let db = NamedTempFile::new().expect("temporary db");
4920        let writer = WriterActor::start(
4921            db.path(),
4922            Arc::new(SchemaManager::new()),
4923            ProvenanceMode::Warn,
4924            Arc::new(TelemetryCounters::default()),
4925        )
4926        .expect("writer");
4927
4928        let result = writer.submit(WriteRequest {
4929            label: "test".to_owned(),
4930            nodes: vec![NodeInsert {
4931                row_id: "row-1".to_owned(),
4932                logical_id: String::new(),
4933                kind: "Note".to_owned(),
4934                properties: "{}".to_owned(),
4935                source_ref: None,
4936                upsert: false,
4937                chunk_policy: ChunkPolicy::Preserve,
4938                content_ref: None,
4939            }],
4940            node_retires: vec![],
4941            edges: vec![],
4942            edge_retires: vec![],
4943            chunks: vec![],
4944            runs: vec![],
4945            steps: vec![],
4946            actions: vec![],
4947            optional_backfills: vec![],
4948            vec_inserts: vec![],
4949            operational_writes: vec![],
4950        });
4951
4952        assert!(
4953            matches!(result, Err(EngineError::InvalidWrite(_))),
4954            "empty logical_id must be rejected"
4955        );
4956    }
4957
4958    #[test]
4959    fn prepare_write_rejects_duplicate_row_ids_in_request() {
4960        let db = NamedTempFile::new().expect("temporary db");
4961        let writer = WriterActor::start(
4962            db.path(),
4963            Arc::new(SchemaManager::new()),
4964            ProvenanceMode::Warn,
4965            Arc::new(TelemetryCounters::default()),
4966        )
4967        .expect("writer");
4968
4969        let result = writer.submit(WriteRequest {
4970            label: "test".to_owned(),
4971            nodes: vec![
4972                NodeInsert {
4973                    row_id: "row-1".to_owned(),
4974                    logical_id: "lg-1".to_owned(),
4975                    kind: "Note".to_owned(),
4976                    properties: "{}".to_owned(),
4977                    source_ref: None,
4978                    upsert: false,
4979                    chunk_policy: ChunkPolicy::Preserve,
4980                    content_ref: None,
4981                },
4982                NodeInsert {
4983                    row_id: "row-1".to_owned(), // duplicate
4984                    logical_id: "lg-2".to_owned(),
4985                    kind: "Note".to_owned(),
4986                    properties: "{}".to_owned(),
4987                    source_ref: None,
4988                    upsert: false,
4989                    chunk_policy: ChunkPolicy::Preserve,
4990                    content_ref: None,
4991                },
4992            ],
4993            node_retires: vec![],
4994            edges: vec![],
4995            edge_retires: vec![],
4996            chunks: vec![],
4997            runs: vec![],
4998            steps: vec![],
4999            actions: vec![],
5000            optional_backfills: vec![],
5001            vec_inserts: vec![],
5002            operational_writes: vec![],
5003        });
5004
5005        assert!(
5006            matches!(result, Err(EngineError::InvalidWrite(_))),
5007            "duplicate row_id within request must be rejected"
5008        );
5009    }
5010
5011    #[test]
5012    fn prepare_write_rejects_empty_chunk_id() {
5013        let db = NamedTempFile::new().expect("temporary db");
5014        let writer = WriterActor::start(
5015            db.path(),
5016            Arc::new(SchemaManager::new()),
5017            ProvenanceMode::Warn,
5018            Arc::new(TelemetryCounters::default()),
5019        )
5020        .expect("writer");
5021
5022        let result = writer.submit(WriteRequest {
5023            label: "test".to_owned(),
5024            nodes: vec![NodeInsert {
5025                row_id: "row-1".to_owned(),
5026                logical_id: "lg-1".to_owned(),
5027                kind: "Note".to_owned(),
5028                properties: "{}".to_owned(),
5029                source_ref: None,
5030                upsert: false,
5031                chunk_policy: ChunkPolicy::Preserve,
5032                content_ref: None,
5033            }],
5034            node_retires: vec![],
5035            edges: vec![],
5036            edge_retires: vec![],
5037            chunks: vec![ChunkInsert {
5038                id: String::new(),
5039                node_logical_id: "lg-1".to_owned(),
5040                text_content: "some text".to_owned(),
5041                byte_start: None,
5042                byte_end: None,
5043                content_hash: None,
5044            }],
5045            runs: vec![],
5046            steps: vec![],
5047            actions: vec![],
5048            optional_backfills: vec![],
5049            vec_inserts: vec![],
5050            operational_writes: vec![],
5051        });
5052
5053        assert!(
5054            matches!(result, Err(EngineError::InvalidWrite(_))),
5055            "empty chunk id must be rejected"
5056        );
5057    }
5058
5059    // --- Item 4: provenance warning coverage tests ---
5060
5061    #[test]
5062    fn writer_receipt_warns_on_step_without_source_ref() {
5063        let db = NamedTempFile::new().expect("temporary db");
5064        let writer = WriterActor::start(
5065            db.path(),
5066            Arc::new(SchemaManager::new()),
5067            ProvenanceMode::Warn,
5068            Arc::new(TelemetryCounters::default()),
5069        )
5070        .expect("writer");
5071
5072        // seed a run first so step FK is satisfied
5073        writer
5074            .submit(WriteRequest {
5075                label: "seed-run".to_owned(),
5076                nodes: vec![],
5077                node_retires: vec![],
5078                edges: vec![],
5079                edge_retires: vec![],
5080                chunks: vec![],
5081                runs: vec![RunInsert {
5082                    id: "run-1".to_owned(),
5083                    kind: "session".to_owned(),
5084                    status: "active".to_owned(),
5085                    properties: "{}".to_owned(),
5086                    source_ref: Some("src-1".to_owned()),
5087                    upsert: false,
5088                    supersedes_id: None,
5089                }],
5090                steps: vec![],
5091                actions: vec![],
5092                optional_backfills: vec![],
5093                vec_inserts: vec![],
5094                operational_writes: vec![],
5095            })
5096            .expect("seed run");
5097
5098        let receipt = writer
5099            .submit(WriteRequest {
5100                label: "test".to_owned(),
5101                nodes: vec![],
5102                node_retires: vec![],
5103                edges: vec![],
5104                edge_retires: vec![],
5105                chunks: vec![],
5106                runs: vec![],
5107                steps: vec![StepInsert {
5108                    id: "step-1".to_owned(),
5109                    run_id: "run-1".to_owned(),
5110                    kind: "llm_call".to_owned(),
5111                    status: "completed".to_owned(),
5112                    properties: "{}".to_owned(),
5113                    source_ref: None,
5114                    upsert: false,
5115                    supersedes_id: None,
5116                }],
5117                actions: vec![],
5118                optional_backfills: vec![],
5119                vec_inserts: vec![],
5120                operational_writes: vec![],
5121            })
5122            .expect("write");
5123
5124        assert!(
5125            !receipt.provenance_warnings.is_empty(),
5126            "step insert without source_ref must emit a provenance warning"
5127        );
5128    }
5129
5130    #[test]
5131    fn writer_receipt_warns_on_action_without_source_ref() {
5132        let db = NamedTempFile::new().expect("temporary db");
5133        let writer = WriterActor::start(
5134            db.path(),
5135            Arc::new(SchemaManager::new()),
5136            ProvenanceMode::Warn,
5137            Arc::new(TelemetryCounters::default()),
5138        )
5139        .expect("writer");
5140
5141        // seed run and step so action FK is satisfied
5142        writer
5143            .submit(WriteRequest {
5144                label: "seed".to_owned(),
5145                nodes: vec![],
5146                node_retires: vec![],
5147                edges: vec![],
5148                edge_retires: vec![],
5149                chunks: vec![],
5150                runs: vec![RunInsert {
5151                    id: "run-1".to_owned(),
5152                    kind: "session".to_owned(),
5153                    status: "active".to_owned(),
5154                    properties: "{}".to_owned(),
5155                    source_ref: Some("src-1".to_owned()),
5156                    upsert: false,
5157                    supersedes_id: None,
5158                }],
5159                steps: vec![StepInsert {
5160                    id: "step-1".to_owned(),
5161                    run_id: "run-1".to_owned(),
5162                    kind: "llm_call".to_owned(),
5163                    status: "completed".to_owned(),
5164                    properties: "{}".to_owned(),
5165                    source_ref: Some("src-1".to_owned()),
5166                    upsert: false,
5167                    supersedes_id: None,
5168                }],
5169                actions: vec![],
5170                optional_backfills: vec![],
5171                vec_inserts: vec![],
5172                operational_writes: vec![],
5173            })
5174            .expect("seed");
5175
5176        let receipt = writer
5177            .submit(WriteRequest {
5178                label: "test".to_owned(),
5179                nodes: vec![],
5180                node_retires: vec![],
5181                edges: vec![],
5182                edge_retires: vec![],
5183                chunks: vec![],
5184                runs: vec![],
5185                steps: vec![],
5186                actions: vec![ActionInsert {
5187                    id: "action-1".to_owned(),
5188                    step_id: "step-1".to_owned(),
5189                    kind: "tool_call".to_owned(),
5190                    status: "completed".to_owned(),
5191                    properties: "{}".to_owned(),
5192                    source_ref: None,
5193                    upsert: false,
5194                    supersedes_id: None,
5195                }],
5196                optional_backfills: vec![],
5197                vec_inserts: vec![],
5198                operational_writes: vec![],
5199            })
5200            .expect("write");
5201
5202        assert!(
5203            !receipt.provenance_warnings.is_empty(),
5204            "action insert without source_ref must emit a provenance warning"
5205        );
5206    }
5207
5208    #[test]
5209    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5210        let db = NamedTempFile::new().expect("temporary db");
5211        let writer = WriterActor::start(
5212            db.path(),
5213            Arc::new(SchemaManager::new()),
5214            ProvenanceMode::Warn,
5215            Arc::new(TelemetryCounters::default()),
5216        )
5217        .expect("writer");
5218
5219        let receipt = writer
5220            .submit(WriteRequest {
5221                label: "test".to_owned(),
5222                nodes: vec![NodeInsert {
5223                    row_id: "row-1".to_owned(),
5224                    logical_id: "node-1".to_owned(),
5225                    kind: "Note".to_owned(),
5226                    properties: "{}".to_owned(),
5227                    source_ref: Some("src-1".to_owned()),
5228                    upsert: false,
5229                    chunk_policy: ChunkPolicy::Preserve,
5230                    content_ref: None,
5231                }],
5232                node_retires: vec![],
5233                edges: vec![],
5234                edge_retires: vec![],
5235                chunks: vec![],
5236                runs: vec![RunInsert {
5237                    id: "run-1".to_owned(),
5238                    kind: "session".to_owned(),
5239                    status: "active".to_owned(),
5240                    properties: "{}".to_owned(),
5241                    source_ref: Some("src-1".to_owned()),
5242                    upsert: false,
5243                    supersedes_id: None,
5244                }],
5245                steps: vec![StepInsert {
5246                    id: "step-1".to_owned(),
5247                    run_id: "run-1".to_owned(),
5248                    kind: "llm_call".to_owned(),
5249                    status: "completed".to_owned(),
5250                    properties: "{}".to_owned(),
5251                    source_ref: Some("src-1".to_owned()),
5252                    upsert: false,
5253                    supersedes_id: None,
5254                }],
5255                actions: vec![ActionInsert {
5256                    id: "action-1".to_owned(),
5257                    step_id: "step-1".to_owned(),
5258                    kind: "tool_call".to_owned(),
5259                    status: "completed".to_owned(),
5260                    properties: "{}".to_owned(),
5261                    source_ref: Some("src-1".to_owned()),
5262                    upsert: false,
5263                    supersedes_id: None,
5264                }],
5265                optional_backfills: vec![],
5266                vec_inserts: vec![],
5267                operational_writes: vec![],
5268            })
5269            .expect("write");
5270
5271        assert!(
5272            receipt.provenance_warnings.is_empty(),
5273            "no warnings expected when all types have source_ref; got: {:?}",
5274            receipt.provenance_warnings
5275        );
5276    }
5277
5278    // --- Item 4 Task 2: ProvenanceMode tests ---
5279
5280    #[test]
5281    fn default_provenance_mode_is_warn() {
5282        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5283        let db = NamedTempFile::new().expect("temporary db");
5284        let writer = WriterActor::start(
5285            db.path(),
5286            Arc::new(SchemaManager::new()),
5287            ProvenanceMode::default(),
5288            Arc::new(TelemetryCounters::default()),
5289        )
5290        .expect("writer");
5291
5292        let receipt = writer
5293            .submit(WriteRequest {
5294                label: "test".to_owned(),
5295                nodes: vec![NodeInsert {
5296                    row_id: "row-1".to_owned(),
5297                    logical_id: "node-1".to_owned(),
5298                    kind: "Note".to_owned(),
5299                    properties: "{}".to_owned(),
5300                    source_ref: None,
5301                    upsert: false,
5302                    chunk_policy: ChunkPolicy::Preserve,
5303                    content_ref: None,
5304                }],
5305                node_retires: vec![],
5306                edges: vec![],
5307                edge_retires: vec![],
5308                chunks: vec![],
5309                runs: vec![],
5310                steps: vec![],
5311                actions: vec![],
5312                optional_backfills: vec![],
5313                vec_inserts: vec![],
5314                operational_writes: vec![],
5315            })
5316            .expect("Warn mode must not reject missing source_ref");
5317
5318        assert!(
5319            !receipt.provenance_warnings.is_empty(),
5320            "Warn mode must emit a warning instead of rejecting"
5321        );
5322    }
5323
5324    #[test]
5325    fn require_mode_rejects_node_without_source_ref() {
5326        let db = NamedTempFile::new().expect("temporary db");
5327        let writer = WriterActor::start(
5328            db.path(),
5329            Arc::new(SchemaManager::new()),
5330            ProvenanceMode::Require,
5331            Arc::new(TelemetryCounters::default()),
5332        )
5333        .expect("writer");
5334
5335        let result = writer.submit(WriteRequest {
5336            label: "test".to_owned(),
5337            nodes: vec![NodeInsert {
5338                row_id: "row-1".to_owned(),
5339                logical_id: "node-1".to_owned(),
5340                kind: "Note".to_owned(),
5341                properties: "{}".to_owned(),
5342                source_ref: None,
5343                upsert: false,
5344                chunk_policy: ChunkPolicy::Preserve,
5345                content_ref: None,
5346            }],
5347            node_retires: vec![],
5348            edges: vec![],
5349            edge_retires: vec![],
5350            chunks: vec![],
5351            runs: vec![],
5352            steps: vec![],
5353            actions: vec![],
5354            optional_backfills: vec![],
5355            vec_inserts: vec![],
5356            operational_writes: vec![],
5357        });
5358
5359        assert!(
5360            matches!(result, Err(EngineError::InvalidWrite(_))),
5361            "Require mode must reject node without source_ref"
5362        );
5363    }
5364
5365    #[test]
5366    fn require_mode_accepts_node_with_source_ref() {
5367        let db = NamedTempFile::new().expect("temporary db");
5368        let writer = WriterActor::start(
5369            db.path(),
5370            Arc::new(SchemaManager::new()),
5371            ProvenanceMode::Require,
5372            Arc::new(TelemetryCounters::default()),
5373        )
5374        .expect("writer");
5375
5376        let result = writer.submit(WriteRequest {
5377            label: "test".to_owned(),
5378            nodes: vec![NodeInsert {
5379                row_id: "row-1".to_owned(),
5380                logical_id: "node-1".to_owned(),
5381                kind: "Note".to_owned(),
5382                properties: "{}".to_owned(),
5383                source_ref: Some("src-1".to_owned()),
5384                upsert: false,
5385                chunk_policy: ChunkPolicy::Preserve,
5386                content_ref: None,
5387            }],
5388            node_retires: vec![],
5389            edges: vec![],
5390            edge_retires: vec![],
5391            chunks: vec![],
5392            runs: vec![],
5393            steps: vec![],
5394            actions: vec![],
5395            optional_backfills: vec![],
5396            vec_inserts: vec![],
5397            operational_writes: vec![],
5398        });
5399
5400        assert!(
5401            result.is_ok(),
5402            "Require mode must accept node with source_ref"
5403        );
5404    }
5405
5406    #[test]
5407    fn require_mode_rejects_edge_without_source_ref() {
5408        let db = NamedTempFile::new().expect("temporary db");
5409        let writer = WriterActor::start(
5410            db.path(),
5411            Arc::new(SchemaManager::new()),
5412            ProvenanceMode::Require,
5413            Arc::new(TelemetryCounters::default()),
5414        )
5415        .expect("writer");
5416
5417        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5418        let result = writer.submit(WriteRequest {
5419            label: "test".to_owned(),
5420            nodes: vec![
5421                NodeInsert {
5422                    row_id: "row-a".to_owned(),
5423                    logical_id: "node-a".to_owned(),
5424                    kind: "Note".to_owned(),
5425                    properties: "{}".to_owned(),
5426                    source_ref: Some("src-1".to_owned()),
5427                    upsert: false,
5428                    chunk_policy: ChunkPolicy::Preserve,
5429                    content_ref: None,
5430                },
5431                NodeInsert {
5432                    row_id: "row-b".to_owned(),
5433                    logical_id: "node-b".to_owned(),
5434                    kind: "Note".to_owned(),
5435                    properties: "{}".to_owned(),
5436                    source_ref: Some("src-1".to_owned()),
5437                    upsert: false,
5438                    chunk_policy: ChunkPolicy::Preserve,
5439                    content_ref: None,
5440                },
5441            ],
5442            node_retires: vec![],
5443            edges: vec![EdgeInsert {
5444                row_id: "edge-row-1".to_owned(),
5445                logical_id: "edge-1".to_owned(),
5446                source_logical_id: "node-a".to_owned(),
5447                target_logical_id: "node-b".to_owned(),
5448                kind: "LINKS_TO".to_owned(),
5449                properties: "{}".to_owned(),
5450                source_ref: None,
5451                upsert: false,
5452            }],
5453            edge_retires: vec![],
5454            chunks: vec![],
5455            runs: vec![],
5456            steps: vec![],
5457            actions: vec![],
5458            optional_backfills: vec![],
5459            vec_inserts: vec![],
5460            operational_writes: vec![],
5461        });
5462
5463        assert!(
5464            matches!(result, Err(EngineError::InvalidWrite(_))),
5465            "Require mode must reject edge without source_ref"
5466        );
5467    }
5468
5469    // --- Item 5: FTS projection coverage tests ---
5470
5471    #[test]
5472    fn fts_row_has_correct_kind_from_co_submitted_node() {
5473        let db = NamedTempFile::new().expect("temporary db");
5474        let writer = WriterActor::start(
5475            db.path(),
5476            Arc::new(SchemaManager::new()),
5477            ProvenanceMode::Warn,
5478            Arc::new(TelemetryCounters::default()),
5479        )
5480        .expect("writer");
5481
5482        writer
5483            .submit(WriteRequest {
5484                label: "test".to_owned(),
5485                nodes: vec![NodeInsert {
5486                    row_id: "row-1".to_owned(),
5487                    logical_id: "node-1".to_owned(),
5488                    kind: "Meeting".to_owned(),
5489                    properties: "{}".to_owned(),
5490                    source_ref: Some("src-1".to_owned()),
5491                    upsert: false,
5492                    chunk_policy: ChunkPolicy::Preserve,
5493                    content_ref: None,
5494                }],
5495                node_retires: vec![],
5496                edges: vec![],
5497                edge_retires: vec![],
5498                chunks: vec![ChunkInsert {
5499                    id: "chunk-1".to_owned(),
5500                    node_logical_id: "node-1".to_owned(),
5501                    text_content: "some text".to_owned(),
5502                    byte_start: None,
5503                    byte_end: None,
5504                    content_hash: None,
5505                }],
5506                runs: vec![],
5507                steps: vec![],
5508                actions: vec![],
5509                optional_backfills: vec![],
5510                vec_inserts: vec![],
5511                operational_writes: vec![],
5512            })
5513            .expect("write");
5514
5515        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5516        let kind: String = conn
5517            .query_row(
5518                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5519                [],
5520                |row| row.get(0),
5521            )
5522            .expect("fts row");
5523
5524        assert_eq!(kind, "Meeting");
5525    }
5526
5527    #[test]
5528    fn fts_row_has_correct_text_content() {
5529        let db = NamedTempFile::new().expect("temporary db");
5530        let writer = WriterActor::start(
5531            db.path(),
5532            Arc::new(SchemaManager::new()),
5533            ProvenanceMode::Warn,
5534            Arc::new(TelemetryCounters::default()),
5535        )
5536        .expect("writer");
5537
5538        writer
5539            .submit(WriteRequest {
5540                label: "test".to_owned(),
5541                nodes: vec![NodeInsert {
5542                    row_id: "row-1".to_owned(),
5543                    logical_id: "node-1".to_owned(),
5544                    kind: "Note".to_owned(),
5545                    properties: "{}".to_owned(),
5546                    source_ref: Some("src-1".to_owned()),
5547                    upsert: false,
5548                    chunk_policy: ChunkPolicy::Preserve,
5549                    content_ref: None,
5550                }],
5551                node_retires: vec![],
5552                edges: vec![],
5553                edge_retires: vec![],
5554                chunks: vec![ChunkInsert {
5555                    id: "chunk-1".to_owned(),
5556                    node_logical_id: "node-1".to_owned(),
5557                    text_content: "exactly this text".to_owned(),
5558                    byte_start: None,
5559                    byte_end: None,
5560                    content_hash: None,
5561                }],
5562                runs: vec![],
5563                steps: vec![],
5564                actions: vec![],
5565                optional_backfills: vec![],
5566                vec_inserts: vec![],
5567                operational_writes: vec![],
5568            })
5569            .expect("write");
5570
5571        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5572        let text: String = conn
5573            .query_row(
5574                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5575                [],
5576                |row| row.get(0),
5577            )
5578            .expect("fts row");
5579
5580        assert_eq!(text, "exactly this text");
5581    }
5582
5583    #[test]
5584    fn fts_row_has_correct_kind_from_pre_existing_node() {
5585        let db = NamedTempFile::new().expect("temporary db");
5586        let writer = WriterActor::start(
5587            db.path(),
5588            Arc::new(SchemaManager::new()),
5589            ProvenanceMode::Warn,
5590            Arc::new(TelemetryCounters::default()),
5591        )
5592        .expect("writer");
5593
5594        // Request 1: node only
5595        writer
5596            .submit(WriteRequest {
5597                label: "r1".to_owned(),
5598                nodes: vec![NodeInsert {
5599                    row_id: "row-1".to_owned(),
5600                    logical_id: "node-1".to_owned(),
5601                    kind: "Document".to_owned(),
5602                    properties: "{}".to_owned(),
5603                    source_ref: Some("src-1".to_owned()),
5604                    upsert: false,
5605                    chunk_policy: ChunkPolicy::Preserve,
5606                    content_ref: None,
5607                }],
5608                node_retires: vec![],
5609                edges: vec![],
5610                edge_retires: vec![],
5611                chunks: vec![],
5612                runs: vec![],
5613                steps: vec![],
5614                actions: vec![],
5615                optional_backfills: vec![],
5616                vec_inserts: vec![],
5617                operational_writes: vec![],
5618            })
5619            .expect("r1 write");
5620
5621        // Request 2: chunk for pre-existing node
5622        writer
5623            .submit(WriteRequest {
5624                label: "r2".to_owned(),
5625                nodes: vec![],
5626                node_retires: vec![],
5627                edges: vec![],
5628                edge_retires: vec![],
5629                chunks: vec![ChunkInsert {
5630                    id: "chunk-1".to_owned(),
5631                    node_logical_id: "node-1".to_owned(),
5632                    text_content: "some text".to_owned(),
5633                    byte_start: None,
5634                    byte_end: None,
5635                    content_hash: None,
5636                }],
5637                runs: vec![],
5638                steps: vec![],
5639                actions: vec![],
5640                optional_backfills: vec![],
5641                vec_inserts: vec![],
5642                operational_writes: vec![],
5643            })
5644            .expect("r2 write");
5645
5646        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5647        let kind: String = conn
5648            .query_row(
5649                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5650                [],
5651                |row| row.get(0),
5652            )
5653            .expect("fts row");
5654
5655        assert_eq!(kind, "Document");
5656    }
5657
5658    #[test]
5659    fn fts_derives_rows_for_multiple_chunks_per_node() {
5660        let db = NamedTempFile::new().expect("temporary db");
5661        let writer = WriterActor::start(
5662            db.path(),
5663            Arc::new(SchemaManager::new()),
5664            ProvenanceMode::Warn,
5665            Arc::new(TelemetryCounters::default()),
5666        )
5667        .expect("writer");
5668
5669        writer
5670            .submit(WriteRequest {
5671                label: "test".to_owned(),
5672                nodes: vec![NodeInsert {
5673                    row_id: "row-1".to_owned(),
5674                    logical_id: "node-1".to_owned(),
5675                    kind: "Meeting".to_owned(),
5676                    properties: "{}".to_owned(),
5677                    source_ref: Some("src-1".to_owned()),
5678                    upsert: false,
5679                    chunk_policy: ChunkPolicy::Preserve,
5680                    content_ref: None,
5681                }],
5682                node_retires: vec![],
5683                edges: vec![],
5684                edge_retires: vec![],
5685                chunks: vec![
5686                    ChunkInsert {
5687                        id: "chunk-a".to_owned(),
5688                        node_logical_id: "node-1".to_owned(),
5689                        text_content: "intro".to_owned(),
5690                        byte_start: None,
5691                        byte_end: None,
5692                        content_hash: None,
5693                    },
5694                    ChunkInsert {
5695                        id: "chunk-b".to_owned(),
5696                        node_logical_id: "node-1".to_owned(),
5697                        text_content: "body".to_owned(),
5698                        byte_start: None,
5699                        byte_end: None,
5700                        content_hash: None,
5701                    },
5702                    ChunkInsert {
5703                        id: "chunk-c".to_owned(),
5704                        node_logical_id: "node-1".to_owned(),
5705                        text_content: "conclusion".to_owned(),
5706                        byte_start: None,
5707                        byte_end: None,
5708                        content_hash: None,
5709                    },
5710                ],
5711                runs: vec![],
5712                steps: vec![],
5713                actions: vec![],
5714                optional_backfills: vec![],
5715                vec_inserts: vec![],
5716                operational_writes: vec![],
5717            })
5718            .expect("write");
5719
5720        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5721        let count: i64 = conn
5722            .query_row(
5723                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5724                [],
5725                |row| row.get(0),
5726            )
5727            .expect("fts count");
5728
5729        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5730    }
5731
5732    #[test]
5733    fn fts_resolves_mixed_fast_and_db_paths() {
5734        let db = NamedTempFile::new().expect("temporary db");
5735        let writer = WriterActor::start(
5736            db.path(),
5737            Arc::new(SchemaManager::new()),
5738            ProvenanceMode::Warn,
5739            Arc::new(TelemetryCounters::default()),
5740        )
5741        .expect("writer");
5742
5743        // Seed pre-existing node
5744        writer
5745            .submit(WriteRequest {
5746                label: "seed".to_owned(),
5747                nodes: vec![NodeInsert {
5748                    row_id: "row-existing".to_owned(),
5749                    logical_id: "existing-node".to_owned(),
5750                    kind: "Archive".to_owned(),
5751                    properties: "{}".to_owned(),
5752                    source_ref: Some("src-1".to_owned()),
5753                    upsert: false,
5754                    chunk_policy: ChunkPolicy::Preserve,
5755                    content_ref: None,
5756                }],
5757                node_retires: vec![],
5758                edges: vec![],
5759                edge_retires: vec![],
5760                chunks: vec![],
5761                runs: vec![],
5762                steps: vec![],
5763                actions: vec![],
5764                optional_backfills: vec![],
5765                vec_inserts: vec![],
5766                operational_writes: vec![],
5767            })
5768            .expect("seed");
5769
5770        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
5771        writer
5772            .submit(WriteRequest {
5773                label: "mixed".to_owned(),
5774                nodes: vec![NodeInsert {
5775                    row_id: "row-new".to_owned(),
5776                    logical_id: "new-node".to_owned(),
5777                    kind: "Inbox".to_owned(),
5778                    properties: "{}".to_owned(),
5779                    source_ref: Some("src-2".to_owned()),
5780                    upsert: false,
5781                    chunk_policy: ChunkPolicy::Preserve,
5782                    content_ref: None,
5783                }],
5784                node_retires: vec![],
5785                edges: vec![],
5786                edge_retires: vec![],
5787                chunks: vec![
5788                    ChunkInsert {
5789                        id: "chunk-fast".to_owned(),
5790                        node_logical_id: "new-node".to_owned(),
5791                        text_content: "new content".to_owned(),
5792                        byte_start: None,
5793                        byte_end: None,
5794                        content_hash: None,
5795                    },
5796                    ChunkInsert {
5797                        id: "chunk-db".to_owned(),
5798                        node_logical_id: "existing-node".to_owned(),
5799                        text_content: "archive content".to_owned(),
5800                        byte_start: None,
5801                        byte_end: None,
5802                        content_hash: None,
5803                    },
5804                ],
5805                runs: vec![],
5806                steps: vec![],
5807                actions: vec![],
5808                optional_backfills: vec![],
5809                vec_inserts: vec![],
5810                operational_writes: vec![],
5811            })
5812            .expect("mixed write");
5813
5814        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5815        let fast_kind: String = conn
5816            .query_row(
5817                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
5818                [],
5819                |row| row.get(0),
5820            )
5821            .expect("fast path fts row");
5822        let db_kind: String = conn
5823            .query_row(
5824                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
5825                [],
5826                |row| row.get(0),
5827            )
5828            .expect("db path fts row");
5829
5830        assert_eq!(fast_kind, "Inbox");
5831        assert_eq!(db_kind, "Archive");
5832    }
5833
5834    #[test]
5835    fn prepare_write_rejects_empty_chunk_text() {
5836        let db = NamedTempFile::new().expect("temporary db");
5837        let writer = WriterActor::start(
5838            db.path(),
5839            Arc::new(SchemaManager::new()),
5840            ProvenanceMode::Warn,
5841            Arc::new(TelemetryCounters::default()),
5842        )
5843        .expect("writer");
5844
5845        let result = writer.submit(WriteRequest {
5846            label: "test".to_owned(),
5847            nodes: vec![NodeInsert {
5848                row_id: "row-1".to_owned(),
5849                logical_id: "node-1".to_owned(),
5850                kind: "Note".to_owned(),
5851                properties: "{}".to_owned(),
5852                source_ref: None,
5853                upsert: false,
5854                chunk_policy: ChunkPolicy::Preserve,
5855                content_ref: None,
5856            }],
5857            node_retires: vec![],
5858            edges: vec![],
5859            edge_retires: vec![],
5860            chunks: vec![ChunkInsert {
5861                id: "chunk-1".to_owned(),
5862                node_logical_id: "node-1".to_owned(),
5863                text_content: String::new(),
5864                byte_start: None,
5865                byte_end: None,
5866                content_hash: None,
5867            }],
5868            runs: vec![],
5869            steps: vec![],
5870            actions: vec![],
5871            optional_backfills: vec![],
5872            vec_inserts: vec![],
5873            operational_writes: vec![],
5874        });
5875
5876        assert!(
5877            matches!(result, Err(EngineError::InvalidWrite(_))),
5878            "empty text_content must be rejected"
5879        );
5880    }
5881
5882    #[test]
5883    fn receipt_reports_zero_backfills_when_none_submitted() {
5884        let db = NamedTempFile::new().expect("temporary db");
5885        let writer = WriterActor::start(
5886            db.path(),
5887            Arc::new(SchemaManager::new()),
5888            ProvenanceMode::Warn,
5889            Arc::new(TelemetryCounters::default()),
5890        )
5891        .expect("writer");
5892
5893        let receipt = writer
5894            .submit(WriteRequest {
5895                label: "test".to_owned(),
5896                nodes: vec![NodeInsert {
5897                    row_id: "row-1".to_owned(),
5898                    logical_id: "node-1".to_owned(),
5899                    kind: "Note".to_owned(),
5900                    properties: "{}".to_owned(),
5901                    source_ref: Some("src-1".to_owned()),
5902                    upsert: false,
5903                    chunk_policy: ChunkPolicy::Preserve,
5904                    content_ref: None,
5905                }],
5906                node_retires: vec![],
5907                edges: vec![],
5908                edge_retires: vec![],
5909                chunks: vec![],
5910                runs: vec![],
5911                steps: vec![],
5912                actions: vec![],
5913                optional_backfills: vec![],
5914                vec_inserts: vec![],
5915                operational_writes: vec![],
5916            })
5917            .expect("write");
5918
5919        assert_eq!(receipt.optional_backfill_count, 0);
5920    }
5921
5922    #[test]
5923    fn receipt_reports_correct_backfill_count() {
5924        let db = NamedTempFile::new().expect("temporary db");
5925        let writer = WriterActor::start(
5926            db.path(),
5927            Arc::new(SchemaManager::new()),
5928            ProvenanceMode::Warn,
5929            Arc::new(TelemetryCounters::default()),
5930        )
5931        .expect("writer");
5932
5933        let receipt = writer
5934            .submit(WriteRequest {
5935                label: "test".to_owned(),
5936                nodes: vec![NodeInsert {
5937                    row_id: "row-1".to_owned(),
5938                    logical_id: "node-1".to_owned(),
5939                    kind: "Note".to_owned(),
5940                    properties: "{}".to_owned(),
5941                    source_ref: Some("src-1".to_owned()),
5942                    upsert: false,
5943                    chunk_policy: ChunkPolicy::Preserve,
5944                    content_ref: None,
5945                }],
5946                node_retires: vec![],
5947                edges: vec![],
5948                edge_retires: vec![],
5949                chunks: vec![],
5950                runs: vec![],
5951                steps: vec![],
5952                actions: vec![],
5953                optional_backfills: vec![
5954                    OptionalProjectionTask {
5955                        target: ProjectionTarget::Fts,
5956                        payload: "p1".to_owned(),
5957                    },
5958                    OptionalProjectionTask {
5959                        target: ProjectionTarget::Vec,
5960                        payload: "p2".to_owned(),
5961                    },
5962                    OptionalProjectionTask {
5963                        target: ProjectionTarget::All,
5964                        payload: "p3".to_owned(),
5965                    },
5966                ],
5967                vec_inserts: vec![],
5968                operational_writes: vec![],
5969            })
5970            .expect("write");
5971
5972        assert_eq!(receipt.optional_backfill_count, 3);
5973    }
5974
5975    #[test]
5976    fn backfill_tasks_are_not_executed_during_write() {
5977        let db = NamedTempFile::new().expect("temporary db");
5978        let writer = WriterActor::start(
5979            db.path(),
5980            Arc::new(SchemaManager::new()),
5981            ProvenanceMode::Warn,
5982            Arc::new(TelemetryCounters::default()),
5983        )
5984        .expect("writer");
5985
5986        // Write a node + chunk. Submit a backfill task targeting FTS.
5987        // The write path must not create any extra FTS rows beyond the required one.
5988        writer
5989            .submit(WriteRequest {
5990                label: "test".to_owned(),
5991                nodes: vec![NodeInsert {
5992                    row_id: "row-1".to_owned(),
5993                    logical_id: "node-1".to_owned(),
5994                    kind: "Note".to_owned(),
5995                    properties: "{}".to_owned(),
5996                    source_ref: Some("src-1".to_owned()),
5997                    upsert: false,
5998                    chunk_policy: ChunkPolicy::Preserve,
5999                    content_ref: None,
6000                }],
6001                node_retires: vec![],
6002                edges: vec![],
6003                edge_retires: vec![],
6004                chunks: vec![ChunkInsert {
6005                    id: "chunk-1".to_owned(),
6006                    node_logical_id: "node-1".to_owned(),
6007                    text_content: "required text".to_owned(),
6008                    byte_start: None,
6009                    byte_end: None,
6010                    content_hash: None,
6011                }],
6012                runs: vec![],
6013                steps: vec![],
6014                actions: vec![],
6015                optional_backfills: vec![OptionalProjectionTask {
6016                    target: ProjectionTarget::Fts,
6017                    payload: "backfill-payload".to_owned(),
6018                }],
6019                vec_inserts: vec![],
6020                operational_writes: vec![],
6021            })
6022            .expect("write");
6023
6024        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6025        let count: i64 = conn
6026            .query_row(
6027                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6028                [],
6029                |row| row.get(0),
6030            )
6031            .expect("fts count");
6032
6033        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6034    }
6035
6036    #[test]
6037    fn fts_row_uses_new_kind_after_node_replace() {
6038        let db = NamedTempFile::new().expect("temporary db");
6039        let writer = WriterActor::start(
6040            db.path(),
6041            Arc::new(SchemaManager::new()),
6042            ProvenanceMode::Warn,
6043            Arc::new(TelemetryCounters::default()),
6044        )
6045        .expect("writer");
6046
6047        // Write original node as "Note"
6048        writer
6049            .submit(WriteRequest {
6050                label: "v1".to_owned(),
6051                nodes: vec![NodeInsert {
6052                    row_id: "row-1".to_owned(),
6053                    logical_id: "node-1".to_owned(),
6054                    kind: "Note".to_owned(),
6055                    properties: "{}".to_owned(),
6056                    source_ref: Some("src-1".to_owned()),
6057                    upsert: false,
6058                    chunk_policy: ChunkPolicy::Preserve,
6059                    content_ref: None,
6060                }],
6061                node_retires: vec![],
6062                edges: vec![],
6063                edge_retires: vec![],
6064                chunks: vec![ChunkInsert {
6065                    id: "chunk-v1".to_owned(),
6066                    node_logical_id: "node-1".to_owned(),
6067                    text_content: "original".to_owned(),
6068                    byte_start: None,
6069                    byte_end: None,
6070                    content_hash: None,
6071                }],
6072                runs: vec![],
6073                steps: vec![],
6074                actions: vec![],
6075                optional_backfills: vec![],
6076                vec_inserts: vec![],
6077                operational_writes: vec![],
6078            })
6079            .expect("v1 write");
6080
6081        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6082        writer
6083            .submit(WriteRequest {
6084                label: "v2".to_owned(),
6085                nodes: vec![NodeInsert {
6086                    row_id: "row-2".to_owned(),
6087                    logical_id: "node-1".to_owned(),
6088                    kind: "Meeting".to_owned(),
6089                    properties: "{}".to_owned(),
6090                    source_ref: Some("src-2".to_owned()),
6091                    upsert: true,
6092                    chunk_policy: ChunkPolicy::Replace,
6093                    content_ref: None,
6094                }],
6095                node_retires: vec![],
6096                edges: vec![],
6097                edge_retires: vec![],
6098                chunks: vec![ChunkInsert {
6099                    id: "chunk-v2".to_owned(),
6100                    node_logical_id: "node-1".to_owned(),
6101                    text_content: "updated".to_owned(),
6102                    byte_start: None,
6103                    byte_end: None,
6104                    content_hash: None,
6105                }],
6106                runs: vec![],
6107                steps: vec![],
6108                actions: vec![],
6109                optional_backfills: vec![],
6110                vec_inserts: vec![],
6111                operational_writes: vec![],
6112            })
6113            .expect("v2 write");
6114
6115        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6116
6117        // Old FTS row must be gone
6118        let old_count: i64 = conn
6119            .query_row(
6120                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6121                [],
6122                |row| row.get(0),
6123            )
6124            .expect("old fts count");
6125        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6126
6127        // New FTS row must use the new kind
6128        let new_kind: String = conn
6129            .query_row(
6130                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6131                [],
6132                |row| row.get(0),
6133            )
6134            .expect("new fts row");
6135        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6136    }
6137
6138    // --- Item 3: VecInsert tests ---
6139
6140    #[test]
6141    fn vec_insert_empty_chunk_id_is_rejected() {
6142        let db = NamedTempFile::new().expect("temporary db");
6143        let writer = WriterActor::start(
6144            db.path(),
6145            Arc::new(SchemaManager::new()),
6146            ProvenanceMode::Warn,
6147            Arc::new(TelemetryCounters::default()),
6148        )
6149        .expect("writer");
6150        let result = writer.submit(WriteRequest {
6151            label: "vec-test".to_owned(),
6152            nodes: vec![],
6153            node_retires: vec![],
6154            edges: vec![],
6155            edge_retires: vec![],
6156            chunks: vec![],
6157            runs: vec![],
6158            steps: vec![],
6159            actions: vec![],
6160            optional_backfills: vec![],
6161            vec_inserts: vec![VecInsert {
6162                chunk_id: String::new(),
6163                embedding: vec![0.1, 0.2, 0.3],
6164            }],
6165            operational_writes: vec![],
6166        });
6167        assert!(
6168            matches!(result, Err(EngineError::InvalidWrite(_))),
6169            "empty chunk_id in VecInsert must be rejected"
6170        );
6171    }
6172
6173    #[test]
6174    fn vec_insert_empty_embedding_is_rejected() {
6175        let db = NamedTempFile::new().expect("temporary db");
6176        let writer = WriterActor::start(
6177            db.path(),
6178            Arc::new(SchemaManager::new()),
6179            ProvenanceMode::Warn,
6180            Arc::new(TelemetryCounters::default()),
6181        )
6182        .expect("writer");
6183        let result = writer.submit(WriteRequest {
6184            label: "vec-test".to_owned(),
6185            nodes: vec![],
6186            node_retires: vec![],
6187            edges: vec![],
6188            edge_retires: vec![],
6189            chunks: vec![],
6190            runs: vec![],
6191            steps: vec![],
6192            actions: vec![],
6193            optional_backfills: vec![],
6194            vec_inserts: vec![VecInsert {
6195                chunk_id: "chunk-1".to_owned(),
6196                embedding: vec![],
6197            }],
6198            operational_writes: vec![],
6199        });
6200        assert!(
6201            matches!(result, Err(EngineError::InvalidWrite(_))),
6202            "empty embedding in VecInsert must be rejected"
6203        );
6204    }
6205
6206    #[test]
6207    fn vec_insert_noop_without_feature() {
6208        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6209        // (no error) but not write to any vec table.
6210        let db = NamedTempFile::new().expect("temporary db");
6211        let writer = WriterActor::start(
6212            db.path(),
6213            Arc::new(SchemaManager::new()),
6214            ProvenanceMode::Warn,
6215            Arc::new(TelemetryCounters::default()),
6216        )
6217        .expect("writer");
6218        let result = writer.submit(WriteRequest {
6219            label: "vec-noop".to_owned(),
6220            nodes: vec![],
6221            node_retires: vec![],
6222            edges: vec![],
6223            edge_retires: vec![],
6224            chunks: vec![],
6225            runs: vec![],
6226            steps: vec![],
6227            actions: vec![],
6228            optional_backfills: vec![],
6229            vec_inserts: vec![VecInsert {
6230                chunk_id: "chunk-noop".to_owned(),
6231                embedding: vec![1.0, 2.0, 3.0],
6232            }],
6233            operational_writes: vec![],
6234        });
6235        #[cfg(not(feature = "sqlite-vec"))]
6236        result.expect("noop VecInsert without feature must succeed");
6237        // The result variable is used above; silence unused warning for cfg-on path.
6238        #[cfg(feature = "sqlite-vec")]
6239        let _ = result;
6240    }
6241
6242    #[cfg(feature = "sqlite-vec")]
6243    #[test]
6244    fn node_retire_preserves_vec_rows_for_later_restore() {
6245        use crate::sqlite::open_connection_with_vec;
6246
6247        let db = NamedTempFile::new().expect("temporary db");
6248        let schema_manager = Arc::new(SchemaManager::new());
6249
6250        {
6251            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6252            schema_manager.bootstrap(&conn).expect("bootstrap");
6253        }
6254
6255        let writer = WriterActor::start(
6256            db.path(),
6257            Arc::clone(&schema_manager),
6258            ProvenanceMode::Warn,
6259            Arc::new(TelemetryCounters::default()),
6260        )
6261        .expect("writer");
6262
6263        // Insert node + chunk + vec row
6264        writer
6265            .submit(WriteRequest {
6266                label: "setup".to_owned(),
6267                nodes: vec![NodeInsert {
6268                    row_id: "row-retire-vec".to_owned(),
6269                    logical_id: "node-retire-vec".to_owned(),
6270                    kind: "Doc".to_owned(),
6271                    properties: "{}".to_owned(),
6272                    source_ref: Some("src".to_owned()),
6273                    upsert: false,
6274                    chunk_policy: ChunkPolicy::Preserve,
6275                    content_ref: None,
6276                }],
6277                node_retires: vec![],
6278                edges: vec![],
6279                edge_retires: vec![],
6280                chunks: vec![ChunkInsert {
6281                    id: "chunk-retire-vec".to_owned(),
6282                    node_logical_id: "node-retire-vec".to_owned(),
6283                    text_content: "text".to_owned(),
6284                    byte_start: None,
6285                    byte_end: None,
6286                    content_hash: None,
6287                }],
6288                runs: vec![],
6289                steps: vec![],
6290                actions: vec![],
6291                optional_backfills: vec![],
6292                vec_inserts: vec![VecInsert {
6293                    chunk_id: "chunk-retire-vec".to_owned(),
6294                    embedding: vec![0.1, 0.2, 0.3],
6295                }],
6296                operational_writes: vec![],
6297            })
6298            .expect("setup write");
6299
6300        // Retire the node
6301        writer
6302            .submit(WriteRequest {
6303                label: "retire".to_owned(),
6304                nodes: vec![],
6305                node_retires: vec![NodeRetire {
6306                    logical_id: "node-retire-vec".to_owned(),
6307                    source_ref: Some("src".to_owned()),
6308                }],
6309                edges: vec![],
6310                edge_retires: vec![],
6311                chunks: vec![],
6312                runs: vec![],
6313                steps: vec![],
6314                actions: vec![],
6315                optional_backfills: vec![],
6316                vec_inserts: vec![],
6317                operational_writes: vec![],
6318            })
6319            .expect("retire write");
6320
6321        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6322        let count: i64 = conn
6323            .query_row(
6324                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-retire-vec'",
6325                [],
6326                |row| row.get(0),
6327            )
6328            .expect("count");
6329        assert_eq!(
6330            count, 1,
6331            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6332        );
6333    }
6334
6335    #[cfg(feature = "sqlite-vec")]
6336    #[test]
6337    #[allow(clippy::too_many_lines)]
6338    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6339        use crate::sqlite::open_connection_with_vec;
6340
6341        let db = NamedTempFile::new().expect("temporary db");
6342        let schema_manager = Arc::new(SchemaManager::new());
6343
6344        {
6345            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6346            schema_manager.bootstrap(&conn).expect("bootstrap");
6347        }
6348
6349        let writer = WriterActor::start(
6350            db.path(),
6351            Arc::clone(&schema_manager),
6352            ProvenanceMode::Warn,
6353            Arc::new(TelemetryCounters::default()),
6354        )
6355        .expect("writer");
6356
6357        // Insert node + chunk-A + vec-A
6358        writer
6359            .submit(WriteRequest {
6360                label: "v1".to_owned(),
6361                nodes: vec![NodeInsert {
6362                    row_id: "row-replace-v1".to_owned(),
6363                    logical_id: "node-replace-vec".to_owned(),
6364                    kind: "Doc".to_owned(),
6365                    properties: "{}".to_owned(),
6366                    source_ref: Some("src".to_owned()),
6367                    upsert: false,
6368                    chunk_policy: ChunkPolicy::Preserve,
6369                    content_ref: None,
6370                }],
6371                node_retires: vec![],
6372                edges: vec![],
6373                edge_retires: vec![],
6374                chunks: vec![ChunkInsert {
6375                    id: "chunk-replace-A".to_owned(),
6376                    node_logical_id: "node-replace-vec".to_owned(),
6377                    text_content: "version one".to_owned(),
6378                    byte_start: None,
6379                    byte_end: None,
6380                    content_hash: None,
6381                }],
6382                runs: vec![],
6383                steps: vec![],
6384                actions: vec![],
6385                optional_backfills: vec![],
6386                vec_inserts: vec![VecInsert {
6387                    chunk_id: "chunk-replace-A".to_owned(),
6388                    embedding: vec![0.1, 0.2, 0.3],
6389                }],
6390                operational_writes: vec![],
6391            })
6392            .expect("v1 write");
6393
6394        // Upsert with Replace + chunk-B + vec-B
6395        writer
6396            .submit(WriteRequest {
6397                label: "v2".to_owned(),
6398                nodes: vec![NodeInsert {
6399                    row_id: "row-replace-v2".to_owned(),
6400                    logical_id: "node-replace-vec".to_owned(),
6401                    kind: "Doc".to_owned(),
6402                    properties: "{}".to_owned(),
6403                    source_ref: Some("src".to_owned()),
6404                    upsert: true,
6405                    chunk_policy: ChunkPolicy::Replace,
6406                    content_ref: None,
6407                }],
6408                node_retires: vec![],
6409                edges: vec![],
6410                edge_retires: vec![],
6411                chunks: vec![ChunkInsert {
6412                    id: "chunk-replace-B".to_owned(),
6413                    node_logical_id: "node-replace-vec".to_owned(),
6414                    text_content: "version two".to_owned(),
6415                    byte_start: None,
6416                    byte_end: None,
6417                    content_hash: None,
6418                }],
6419                runs: vec![],
6420                steps: vec![],
6421                actions: vec![],
6422                optional_backfills: vec![],
6423                vec_inserts: vec![VecInsert {
6424                    chunk_id: "chunk-replace-B".to_owned(),
6425                    embedding: vec![0.4, 0.5, 0.6],
6426                }],
6427                operational_writes: vec![],
6428            })
6429            .expect("v2 write");
6430
6431        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6432        let count_a: i64 = conn
6433            .query_row(
6434                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-A'",
6435                [],
6436                |row| row.get(0),
6437            )
6438            .expect("count A");
6439        let count_b: i64 = conn
6440            .query_row(
6441                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-B'",
6442                [],
6443                |row| row.get(0),
6444            )
6445            .expect("count B");
6446        assert_eq!(
6447            count_a, 0,
6448            "old vec row (chunk-A) must be deleted on Replace"
6449        );
6450        assert_eq!(
6451            count_b, 1,
6452            "new vec row (chunk-B) must be present after Replace"
6453        );
6454    }
6455
6456    #[cfg(feature = "sqlite-vec")]
6457    #[test]
6458    fn vec_insert_is_persisted_when_feature_enabled() {
6459        use crate::sqlite::open_connection_with_vec;
6460
6461        let db = NamedTempFile::new().expect("temporary db");
6462        let schema_manager = Arc::new(SchemaManager::new());
6463
6464        // Bootstrap the schema (vec table is auto-created on first VecInsert).
6465        {
6466            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6467            schema_manager.bootstrap(&conn).expect("bootstrap");
6468        }
6469
6470        let writer = WriterActor::start(
6471            db.path(),
6472            Arc::clone(&schema_manager),
6473            ProvenanceMode::Warn,
6474            Arc::new(TelemetryCounters::default()),
6475        )
6476        .expect("writer");
6477
6478        // Submit a node + chunk + vec insert so kind can be resolved from the request.
6479        writer
6480            .submit(WriteRequest {
6481                label: "vec-insert".to_owned(),
6482                nodes: vec![NodeInsert {
6483                    row_id: "row-vec".to_owned(),
6484                    logical_id: "node-vec".to_owned(),
6485                    kind: "Document".to_owned(),
6486                    properties: "{}".to_owned(),
6487                    source_ref: Some("test".to_owned()),
6488                    upsert: false,
6489                    chunk_policy: ChunkPolicy::Preserve,
6490                    content_ref: None,
6491                }],
6492                node_retires: vec![],
6493                edges: vec![],
6494                edge_retires: vec![],
6495                chunks: vec![ChunkInsert {
6496                    id: "chunk-vec".to_owned(),
6497                    node_logical_id: "node-vec".to_owned(),
6498                    text_content: "test text".to_owned(),
6499                    byte_start: None,
6500                    byte_end: None,
6501                    content_hash: None,
6502                }],
6503                runs: vec![],
6504                steps: vec![],
6505                actions: vec![],
6506                optional_backfills: vec![],
6507                vec_inserts: vec![VecInsert {
6508                    chunk_id: "chunk-vec".to_owned(),
6509                    embedding: vec![0.1, 0.2, 0.3],
6510                }],
6511                operational_writes: vec![],
6512            })
6513            .expect("vec insert write");
6514
6515        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6516        let count: i64 = conn
6517            .query_row(
6518                "SELECT count(*) FROM vec_document WHERE chunk_id = 'chunk-vec'",
6519                [],
6520                |row| row.get(0),
6521            )
6522            .expect("count");
6523        assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
6524    }
6525
6526    // --- WriteRequest size validation tests ---
6527
6528    #[test]
6529    fn write_request_exceeding_node_limit_is_rejected() {
6530        let nodes: Vec<NodeInsert> = (0..10_001)
6531            .map(|i| NodeInsert {
6532                row_id: format!("row-{i}"),
6533                logical_id: format!("lg-{i}"),
6534                kind: "Note".to_owned(),
6535                properties: "{}".to_owned(),
6536                source_ref: None,
6537                upsert: false,
6538                chunk_policy: ChunkPolicy::Preserve,
6539                content_ref: None,
6540            })
6541            .collect();
6542
6543        let request = WriteRequest {
6544            label: "too-many-nodes".to_owned(),
6545            nodes,
6546            node_retires: vec![],
6547            edges: vec![],
6548            edge_retires: vec![],
6549            chunks: vec![],
6550            runs: vec![],
6551            steps: vec![],
6552            actions: vec![],
6553            optional_backfills: vec![],
6554            vec_inserts: vec![],
6555            operational_writes: vec![],
6556        };
6557
6558        let result = prepare_write(request, ProvenanceMode::Warn)
6559            .map(|_| ())
6560            .map_err(|e| format!("{e}"));
6561        assert!(
6562            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6563            "exceeding node limit must return InvalidWrite: got {result:?}"
6564        );
6565    }
6566
6567    #[test]
6568    fn write_request_exceeding_total_limit_is_rejected() {
6569        // Spread items across fields to exceed 100_000 total
6570        // without exceeding any single per-field limit.
6571        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6572        let request = WriteRequest {
6573            label: "too-many-total".to_owned(),
6574            nodes: (0..10_000)
6575                .map(|i| NodeInsert {
6576                    row_id: format!("row-{i}"),
6577                    logical_id: format!("lg-{i}"),
6578                    kind: "Note".to_owned(),
6579                    properties: "{}".to_owned(),
6580                    source_ref: None,
6581                    upsert: false,
6582                    chunk_policy: ChunkPolicy::Preserve,
6583                    content_ref: None,
6584                })
6585                .collect(),
6586            node_retires: vec![],
6587            edges: (0..10_000)
6588                .map(|i| EdgeInsert {
6589                    row_id: format!("edge-row-{i}"),
6590                    logical_id: format!("edge-lg-{i}"),
6591                    kind: "link".to_owned(),
6592                    source_logical_id: format!("lg-{i}"),
6593                    target_logical_id: format!("lg-{}", i + 1),
6594                    properties: "{}".to_owned(),
6595                    source_ref: None,
6596                    upsert: false,
6597                })
6598                .collect(),
6599            edge_retires: vec![],
6600            chunks: (0..50_000)
6601                .map(|i| ChunkInsert {
6602                    id: format!("chunk-{i}"),
6603                    node_logical_id: "lg-0".to_owned(),
6604                    text_content: "text".to_owned(),
6605                    byte_start: None,
6606                    byte_end: None,
6607                    content_hash: None,
6608                })
6609                .collect(),
6610            runs: vec![],
6611            steps: vec![],
6612            actions: vec![],
6613            optional_backfills: vec![],
6614            vec_inserts: (0..20_001)
6615                .map(|i| VecInsert {
6616                    chunk_id: format!("vec-chunk-{i}"),
6617                    embedding: vec![0.1],
6618                })
6619                .collect(),
6620            operational_writes: (0..10_000)
6621                .map(|i| OperationalWrite::Append {
6622                    collection: format!("col-{i}"),
6623                    record_key: format!("key-{i}"),
6624                    payload_json: "{}".to_owned(),
6625                    source_ref: None,
6626                })
6627                .collect(),
6628        };
6629
6630        let result = prepare_write(request, ProvenanceMode::Warn)
6631            .map(|_| ())
6632            .map_err(|e| format!("{e}"));
6633        assert!(
6634            matches!(result, Err(ref msg) if msg.contains("too many total items")),
6635            "exceeding total item limit must return InvalidWrite: got {result:?}"
6636        );
6637    }
6638
6639    #[test]
6640    fn write_request_within_limits_succeeds() {
6641        let db = NamedTempFile::new().expect("temporary db");
6642        let writer = WriterActor::start(
6643            db.path(),
6644            Arc::new(SchemaManager::new()),
6645            ProvenanceMode::Warn,
6646            Arc::new(TelemetryCounters::default()),
6647        )
6648        .expect("writer");
6649
6650        let result = writer.submit(WriteRequest {
6651            label: "within-limits".to_owned(),
6652            nodes: vec![NodeInsert {
6653                row_id: "row-1".to_owned(),
6654                logical_id: "lg-1".to_owned(),
6655                kind: "Note".to_owned(),
6656                properties: "{}".to_owned(),
6657                source_ref: None,
6658                upsert: false,
6659                chunk_policy: ChunkPolicy::Preserve,
6660                content_ref: None,
6661            }],
6662            node_retires: vec![],
6663            edges: vec![],
6664            edge_retires: vec![],
6665            chunks: vec![],
6666            runs: vec![],
6667            steps: vec![],
6668            actions: vec![],
6669            optional_backfills: vec![],
6670            vec_inserts: vec![],
6671            operational_writes: vec![],
6672        });
6673
6674        assert!(
6675            result.is_ok(),
6676            "write request within limits must succeed: got {result:?}"
6677        );
6678    }
6679
6680    #[test]
6681    fn property_fts_rows_created_on_node_insert() {
6682        let db = NamedTempFile::new().expect("temporary db");
6683        // Bootstrap and register a property schema before starting the writer.
6684        let schema = Arc::new(SchemaManager::new());
6685        {
6686            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6687            schema.bootstrap(&conn).expect("bootstrap");
6688            conn.execute(
6689                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6690                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6691                [],
6692            )
6693            .expect("register schema");
6694        }
6695        let writer = WriterActor::start(
6696            db.path(),
6697            Arc::clone(&schema),
6698            ProvenanceMode::Warn,
6699            Arc::new(TelemetryCounters::default()),
6700        )
6701        .expect("writer");
6702
6703        writer
6704            .submit(WriteRequest {
6705                label: "goal-insert".to_owned(),
6706                nodes: vec![NodeInsert {
6707                    row_id: "row-1".to_owned(),
6708                    logical_id: "goal-1".to_owned(),
6709                    kind: "Goal".to_owned(),
6710                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6711                        .to_owned(),
6712                    source_ref: Some("src-1".to_owned()),
6713                    upsert: false,
6714                    chunk_policy: ChunkPolicy::Preserve,
6715                    content_ref: None,
6716                }],
6717                node_retires: vec![],
6718                edges: vec![],
6719                edge_retires: vec![],
6720                chunks: vec![],
6721                runs: vec![],
6722                steps: vec![],
6723                actions: vec![],
6724                optional_backfills: vec![],
6725                vec_inserts: vec![],
6726                operational_writes: vec![],
6727            })
6728            .expect("write");
6729
6730        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6731        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6732        let text: String = conn
6733            .query_row(
6734                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6735                [],
6736                |row| row.get(0),
6737            )
6738            .expect("property FTS row must exist");
6739        assert_eq!(text, "Ship v2 Launch the redesign");
6740    }
6741
6742    #[test]
6743    fn property_fts_rows_replaced_on_upsert() {
6744        let db = NamedTempFile::new().expect("temporary db");
6745        let schema = Arc::new(SchemaManager::new());
6746        {
6747            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6748            schema.bootstrap(&conn).expect("bootstrap");
6749            conn.execute(
6750                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6751                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6752                [],
6753            )
6754            .expect("register schema");
6755        }
6756        let writer = WriterActor::start(
6757            db.path(),
6758            Arc::clone(&schema),
6759            ProvenanceMode::Warn,
6760            Arc::new(TelemetryCounters::default()),
6761        )
6762        .expect("writer");
6763
6764        // Initial insert.
6765        writer
6766            .submit(WriteRequest {
6767                label: "insert".to_owned(),
6768                nodes: vec![NodeInsert {
6769                    row_id: "row-1".to_owned(),
6770                    logical_id: "goal-1".to_owned(),
6771                    kind: "Goal".to_owned(),
6772                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6773                    source_ref: Some("src-1".to_owned()),
6774                    upsert: false,
6775                    chunk_policy: ChunkPolicy::Preserve,
6776                    content_ref: None,
6777                }],
6778                node_retires: vec![],
6779                edges: vec![],
6780                edge_retires: vec![],
6781                chunks: vec![],
6782                runs: vec![],
6783                steps: vec![],
6784                actions: vec![],
6785                optional_backfills: vec![],
6786                vec_inserts: vec![],
6787                operational_writes: vec![],
6788            })
6789            .expect("insert");
6790
6791        // Upsert with changed property.
6792        writer
6793            .submit(WriteRequest {
6794                label: "upsert".to_owned(),
6795                nodes: vec![NodeInsert {
6796                    row_id: "row-2".to_owned(),
6797                    logical_id: "goal-1".to_owned(),
6798                    kind: "Goal".to_owned(),
6799                    properties: r#"{"name":"Beta"}"#.to_owned(),
6800                    source_ref: Some("src-2".to_owned()),
6801                    upsert: true,
6802                    chunk_policy: ChunkPolicy::Preserve,
6803                    content_ref: None,
6804                }],
6805                node_retires: vec![],
6806                edges: vec![],
6807                edge_retires: vec![],
6808                chunks: vec![],
6809                runs: vec![],
6810                steps: vec![],
6811                actions: vec![],
6812                optional_backfills: vec![],
6813                vec_inserts: vec![],
6814                operational_writes: vec![],
6815            })
6816            .expect("upsert");
6817
6818        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6819        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6820        let count: i64 = conn
6821            .query_row(
6822                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6823                [],
6824                |row| row.get(0),
6825            )
6826            .expect("count");
6827        assert_eq!(
6828            count, 1,
6829            "must have exactly one property FTS row after upsert"
6830        );
6831
6832        let text: String = conn
6833            .query_row(
6834                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6835                [],
6836                |row| row.get(0),
6837            )
6838            .expect("text");
6839        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
6840    }
6841
6842    #[test]
6843    fn property_fts_rows_deleted_on_retire() {
6844        let db = NamedTempFile::new().expect("temporary db");
6845        let schema = Arc::new(SchemaManager::new());
6846        {
6847            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6848            schema.bootstrap(&conn).expect("bootstrap");
6849            conn.execute(
6850                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6851                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6852                [],
6853            )
6854            .expect("register schema");
6855        }
6856        let writer = WriterActor::start(
6857            db.path(),
6858            Arc::clone(&schema),
6859            ProvenanceMode::Warn,
6860            Arc::new(TelemetryCounters::default()),
6861        )
6862        .expect("writer");
6863
6864        // Insert a node.
6865        writer
6866            .submit(WriteRequest {
6867                label: "insert".to_owned(),
6868                nodes: vec![NodeInsert {
6869                    row_id: "row-1".to_owned(),
6870                    logical_id: "goal-1".to_owned(),
6871                    kind: "Goal".to_owned(),
6872                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6873                    source_ref: Some("src-1".to_owned()),
6874                    upsert: false,
6875                    chunk_policy: ChunkPolicy::Preserve,
6876                    content_ref: None,
6877                }],
6878                node_retires: vec![],
6879                edges: vec![],
6880                edge_retires: vec![],
6881                chunks: vec![],
6882                runs: vec![],
6883                steps: vec![],
6884                actions: vec![],
6885                optional_backfills: vec![],
6886                vec_inserts: vec![],
6887                operational_writes: vec![],
6888            })
6889            .expect("insert");
6890
6891        // Retire the node.
6892        writer
6893            .submit(WriteRequest {
6894                label: "retire".to_owned(),
6895                nodes: vec![],
6896                node_retires: vec![NodeRetire {
6897                    logical_id: "goal-1".to_owned(),
6898                    source_ref: Some("forget-1".to_owned()),
6899                }],
6900                edges: vec![],
6901                edge_retires: vec![],
6902                chunks: vec![],
6903                runs: vec![],
6904                steps: vec![],
6905                actions: vec![],
6906                optional_backfills: vec![],
6907                vec_inserts: vec![],
6908                operational_writes: vec![],
6909            })
6910            .expect("retire");
6911
6912        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6913        let table = fathomdb_schema::fts_kind_table_name("Goal");
6914        let count: i64 = conn
6915            .query_row(
6916                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6917                [],
6918                |row| row.get(0),
6919            )
6920            .expect("count");
6921        assert_eq!(count, 0, "property FTS row must be deleted on retire");
6922    }
6923
6924    #[test]
6925    fn no_property_fts_row_for_unregistered_kind() {
6926        let db = NamedTempFile::new().expect("temporary db");
6927        let schema = Arc::new(SchemaManager::new());
6928        {
6929            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6930            schema.bootstrap(&conn).expect("bootstrap");
6931            // No schema registered for "Note" kind.
6932        }
6933        let writer = WriterActor::start(
6934            db.path(),
6935            Arc::clone(&schema),
6936            ProvenanceMode::Warn,
6937            Arc::new(TelemetryCounters::default()),
6938        )
6939        .expect("writer");
6940
6941        writer
6942            .submit(WriteRequest {
6943                label: "insert".to_owned(),
6944                nodes: vec![NodeInsert {
6945                    row_id: "row-1".to_owned(),
6946                    logical_id: "note-1".to_owned(),
6947                    kind: "Note".to_owned(),
6948                    properties: r#"{"title":"hello"}"#.to_owned(),
6949                    source_ref: Some("src-1".to_owned()),
6950                    upsert: false,
6951                    chunk_policy: ChunkPolicy::Preserve,
6952                    content_ref: None,
6953                }],
6954                node_retires: vec![],
6955                edges: vec![],
6956                edge_retires: vec![],
6957                chunks: vec![],
6958                runs: vec![],
6959                steps: vec![],
6960                actions: vec![],
6961                optional_backfills: vec![],
6962                vec_inserts: vec![],
6963                operational_writes: vec![],
6964            })
6965            .expect("insert");
6966
6967        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6968        let table = fathomdb_schema::fts_kind_table_name("Note");
6969        let exists: bool = conn
6970            .query_row(
6971                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
6972                rusqlite::params![table],
6973                |row| row.get::<_, i64>(0),
6974            )
6975            .map(|c| c > 0)
6976            .expect("exists check");
6977        assert!(
6978            !exists,
6979            "no per-kind FTS table should be created for unregistered kind"
6980        );
6981    }
6982
6983    // --- A-6 per-kind FTS table tests ---
6984
6985    #[test]
6986    fn property_fts_write_goes_to_per_kind_table() {
6987        // After A-6: writes go to fts_props_goal, NOT fts_node_properties.
6988        let db = NamedTempFile::new().expect("temporary db");
6989        let schema = Arc::new(SchemaManager::new());
6990        {
6991            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6992            schema.bootstrap(&conn).expect("bootstrap");
6993            conn.execute(
6994                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6995                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6996                [],
6997            )
6998            .expect("register schema");
6999            // Create per-kind table (migration 21 would do this for existing schemas).
7000            conn.execute_batch(
7001                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7002                    node_logical_id UNINDEXED, text_content, \
7003                    tokenize = 'porter unicode61 remove_diacritics 2'\
7004                )",
7005            )
7006            .expect("create per-kind table");
7007        }
7008        let writer = WriterActor::start(
7009            db.path(),
7010            Arc::clone(&schema),
7011            ProvenanceMode::Warn,
7012            Arc::new(TelemetryCounters::default()),
7013        )
7014        .expect("writer");
7015
7016        writer
7017            .submit(WriteRequest {
7018                label: "insert".to_owned(),
7019                nodes: vec![NodeInsert {
7020                    row_id: "row-1".to_owned(),
7021                    logical_id: "goal-1".to_owned(),
7022                    kind: "Goal".to_owned(),
7023                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
7024                    source_ref: Some("src-1".to_owned()),
7025                    upsert: false,
7026                    chunk_policy: ChunkPolicy::Preserve,
7027                    content_ref: None,
7028                }],
7029                node_retires: vec![],
7030                edges: vec![],
7031                edge_retires: vec![],
7032                chunks: vec![],
7033                runs: vec![],
7034                steps: vec![],
7035                actions: vec![],
7036                optional_backfills: vec![],
7037                vec_inserts: vec![],
7038                operational_writes: vec![],
7039            })
7040            .expect("write");
7041
7042        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7043        // Per-kind table must have the row.
7044        let per_kind_count: i64 = conn
7045            .query_row(
7046                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7047                [],
7048                |row| row.get(0),
7049            )
7050            .expect("per-kind count");
7051        assert_eq!(
7052            per_kind_count, 1,
7053            "per-kind table fts_props_goal must have the row"
7054        );
7055    }
7056
7057    #[test]
7058    fn property_fts_retire_removes_from_per_kind_table() {
7059        // After A-6: retire removes from fts_props_goal, NOT fts_node_properties.
7060        let db = NamedTempFile::new().expect("temporary db");
7061        let schema = Arc::new(SchemaManager::new());
7062        {
7063            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7064            schema.bootstrap(&conn).expect("bootstrap");
7065            conn.execute(
7066                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7067                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7068                [],
7069            )
7070            .expect("register schema");
7071            conn.execute_batch(
7072                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7073                    node_logical_id UNINDEXED, text_content, \
7074                    tokenize = 'porter unicode61 remove_diacritics 2'\
7075                )",
7076            )
7077            .expect("create per-kind table");
7078        }
7079        let writer = WriterActor::start(
7080            db.path(),
7081            Arc::clone(&schema),
7082            ProvenanceMode::Warn,
7083            Arc::new(TelemetryCounters::default()),
7084        )
7085        .expect("writer");
7086
7087        // Insert.
7088        writer
7089            .submit(WriteRequest {
7090                label: "insert".to_owned(),
7091                nodes: vec![NodeInsert {
7092                    row_id: "row-1".to_owned(),
7093                    logical_id: "goal-1".to_owned(),
7094                    kind: "Goal".to_owned(),
7095                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7096                    source_ref: Some("src-1".to_owned()),
7097                    upsert: false,
7098                    chunk_policy: ChunkPolicy::Preserve,
7099                    content_ref: None,
7100                }],
7101                node_retires: vec![],
7102                edges: vec![],
7103                edge_retires: vec![],
7104                chunks: vec![],
7105                runs: vec![],
7106                steps: vec![],
7107                actions: vec![],
7108                optional_backfills: vec![],
7109                vec_inserts: vec![],
7110                operational_writes: vec![],
7111            })
7112            .expect("insert");
7113
7114        // Retire.
7115        writer
7116            .submit(WriteRequest {
7117                label: "retire".to_owned(),
7118                nodes: vec![],
7119                node_retires: vec![NodeRetire {
7120                    logical_id: "goal-1".to_owned(),
7121                    source_ref: Some("forget-1".to_owned()),
7122                }],
7123                edges: vec![],
7124                edge_retires: vec![],
7125                chunks: vec![],
7126                runs: vec![],
7127                steps: vec![],
7128                actions: vec![],
7129                optional_backfills: vec![],
7130                vec_inserts: vec![],
7131                operational_writes: vec![],
7132            })
7133            .expect("retire");
7134
7135        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7136        let per_kind_count: i64 = conn
7137            .query_row(
7138                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7139                [],
7140                |row| row.get(0),
7141            )
7142            .expect("per-kind count");
7143        assert_eq!(
7144            per_kind_count, 0,
7145            "per-kind table row must be removed on retire"
7146        );
7147    }
7148
7149    mod extract_json_path_tests {
7150        use super::super::extract_json_path;
7151        use serde_json::json;
7152
7153        #[test]
7154        fn string_value() {
7155            let v = json!({"name": "alice"});
7156            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7157        }
7158
7159        #[test]
7160        fn number_value() {
7161            let v = json!({"age": 42});
7162            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7163        }
7164
7165        #[test]
7166        fn bool_value() {
7167            let v = json!({"active": true});
7168            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7169        }
7170
7171        #[test]
7172        fn null_value() {
7173            let v = json!({"x": null});
7174            assert!(extract_json_path(&v, "$.x").is_empty());
7175        }
7176
7177        #[test]
7178        fn missing_path() {
7179            let v = json!({"name": "a"});
7180            assert!(extract_json_path(&v, "$.missing").is_empty());
7181        }
7182
7183        #[test]
7184        fn nested_path() {
7185            let v = json!({"address": {"city": "NYC"}});
7186            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7187        }
7188
7189        #[test]
7190        fn array_of_strings() {
7191            let v = json!({"tags": ["a", "b", "c"]});
7192            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7193        }
7194
7195        #[test]
7196        fn array_mixed_scalars() {
7197            let v = json!({"vals": ["x", 1, true]});
7198            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7199        }
7200
7201        #[test]
7202        fn array_only_objects_returns_empty() {
7203            let v = json!({"data": [{"k": "v"}]});
7204            assert!(extract_json_path(&v, "$.data").is_empty());
7205        }
7206
7207        #[test]
7208        fn array_mixed_objects_and_scalars() {
7209            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7210            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7211        }
7212
7213        #[test]
7214        fn object_returns_empty() {
7215            let v = json!({"meta": {"k": "v"}});
7216            assert!(extract_json_path(&v, "$.meta").is_empty());
7217        }
7218
7219        #[test]
7220        fn no_prefix_returns_empty() {
7221            let v = json!({"name": "a"});
7222            assert!(extract_json_path(&v, "name").is_empty());
7223        }
7224    }
7225
    // Tests for recursive-mode property extraction: stable lexicographic walk
    // order, depth/byte guardrails, exclude-path pruning, and the position map
    // relating each emitted leaf back to its byte range in the combined blob.
    mod recursive_extraction_tests {
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        /// Builds a schema with the given paths, a single-space separator,
        /// and no exclusions.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        /// Like `schema` but with explicit exclude paths.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        // Array elements are visited in index order, with `[i]` path suffixes.
        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        // Objects inside arrays are descended into, yielding `[i].key` paths.
        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        // Non-string scalars are rendered as text in the blob.
        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        // JSON nulls contribute neither blob bytes nor a position entry.
        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            // The walk should stop before reaching the leaf (depth 8).
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Row is still indexed with whatever was emitted — even if
            // that's nothing. The contract is only that the walk clamped.
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            // Build an array of 4KB strings; total blob > 64KB.
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            // No partial leaves: every position end must fall inside blob length.
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                // Slice must equal some original leaf value; we only
                // verify it's not empty and doesn't straddle a separator.
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            // Blob cannot end in a dangling separator.
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        // Excluded subtrees are pruned entirely and counted in stats.
        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            // Leaf paths in lexicographic order.
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            // Monotonic, non-overlapping offsets.
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        // Position entries are a recursive-mode feature only; scalar paths
        // contribute to the blob but never to the position map.
        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            // Sanity-check the type so the test fails loudly if the
            // signature changes.
            let _: Vec<PositionEntry> = positions;
        }

        /// Asserts that no two position entries share a start offset — the
        /// regression the empty-leaf tests below guard against.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        // Empty-string leaves must not emit positions or shift later offsets.
        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        // Every "all empty" shape must collapse to (None, no positions).
        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            // The two non-empty leaves are separated by exactly one
            // LEAF_SEPARATOR; the empty middle leaf contributes neither
            // bytes nor a position.
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        // An array of only nulls behaves the same as an all-empty payload.
        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7594
7595    mod parse_property_schema_json_tests {
7596        use super::super::parse_property_schema_json;
7597
7598        #[test]
7599        fn parse_property_schema_json_preserves_weight() {
7600            let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
7601            let schema = parse_property_schema_json(json, " ");
7602            assert_eq!(schema.paths.len(), 2);
7603            let title = &schema.paths[0];
7604            assert_eq!(title.path, "$.title");
7605            assert_eq!(title.weight, Some(2.0_f32));
7606            let body = &schema.paths[1];
7607            assert_eq!(body.path, "$.body");
7608            assert_eq!(body.weight, None);
7609        }
7610    }
7611
7612    // --- B-3: extract_property_fts_columns tests ---
7613
7614    mod extract_property_fts_columns_tests {
7615        use serde_json::json;
7616
7617        use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
7618
7619        fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7620            PropertyFtsSchema {
7621                paths,
7622                separator: " ".to_owned(),
7623                exclude_paths: Vec::new(),
7624            }
7625        }
7626
7627        #[test]
7628        fn extract_property_fts_columns_returns_per_spec_text() {
7629            let props = json!({"title": "Hello", "body": "World"});
7630            let mut title_entry = PropertyPathEntry::scalar("$.title");
7631            title_entry.weight = Some(2.0);
7632            let mut body_entry = PropertyPathEntry::scalar("$.body");
7633            body_entry.weight = Some(1.0);
7634            let schema = weighted_schema(vec![title_entry, body_entry]);
7635            let cols = extract_property_fts_columns(&props, &schema);
7636            assert_eq!(cols.len(), 2);
7637            assert_eq!(cols[0].0, "title");
7638            assert_eq!(cols[0].1, "Hello");
7639            assert_eq!(cols[1].0, "body");
7640            assert_eq!(cols[1].1, "World");
7641        }
7642
7643        #[test]
7644        fn extract_property_fts_columns_missing_field_returns_empty_string() {
7645            let props = json!({"title": "Hello"});
7646            let mut title_entry = PropertyPathEntry::scalar("$.title");
7647            title_entry.weight = Some(2.0);
7648            let mut body_entry = PropertyPathEntry::scalar("$.body");
7649            body_entry.weight = Some(1.0);
7650            let schema = weighted_schema(vec![title_entry, body_entry]);
7651            let cols = extract_property_fts_columns(&props, &schema);
7652            assert_eq!(cols.len(), 2);
7653            assert_eq!(cols[0].1, "Hello");
7654            assert_eq!(cols[1].1, "", "missing field yields empty string");
7655        }
7656    }
7657
7658    // --- B-3: writer per-column INSERT for weighted schemas ---
7659
7660    #[test]
7661    fn writer_inserts_per_column_for_weighted_schema() {
7662        use fathomdb_schema::fts_column_name;
7663
7664        let db = NamedTempFile::new().expect("temporary db");
7665        let schema_manager = Arc::new(SchemaManager::new());
7666
7667        // Bootstrap and create the weighted per-kind table.
7668        {
7669            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7670            schema_manager.bootstrap(&conn).expect("bootstrap");
7671
7672            let title_col = fts_column_name("$.title", false);
7673            let body_col = fts_column_name("$.body", false);
7674
7675            // Register schema with weights.
7676            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
7677            conn.execute(
7678                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7679                 VALUES (?1, ?2, ' ')",
7680                rusqlite::params!["Article", paths_json],
7681            )
7682            .expect("register schema");
7683
7684            // Create the per-kind FTS table with per-column layout.
7685            conn.execute_batch(&format!(
7686                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
7687                    node_logical_id UNINDEXED, {title_col}, {body_col}, \
7688                    tokenize = 'porter unicode61 remove_diacritics 2'\
7689                )"
7690            ))
7691            .expect("create weighted per-kind table");
7692        }
7693
7694        let writer = WriterActor::start(
7695            db.path(),
7696            Arc::clone(&schema_manager),
7697            ProvenanceMode::Warn,
7698            Arc::new(TelemetryCounters::default()),
7699        )
7700        .expect("writer");
7701
7702        writer
7703            .submit(WriteRequest {
7704                label: "insert".to_owned(),
7705                nodes: vec![NodeInsert {
7706                    row_id: "row-1".to_owned(),
7707                    logical_id: "article-1".to_owned(),
7708                    kind: "Article".to_owned(),
7709                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
7710                    source_ref: Some("src-1".to_owned()),
7711                    upsert: false,
7712                    chunk_policy: ChunkPolicy::Preserve,
7713                    content_ref: None,
7714                }],
7715                node_retires: vec![],
7716                edges: vec![],
7717                edge_retires: vec![],
7718                chunks: vec![],
7719                runs: vec![],
7720                steps: vec![],
7721                actions: vec![],
7722                optional_backfills: vec![],
7723                vec_inserts: vec![],
7724                operational_writes: vec![],
7725            })
7726            .expect("write");
7727
7728        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7729        let title_col = fts_column_name("$.title", false);
7730        let body_col = fts_column_name("$.body", false);
7731
7732        // The per-kind table must have one row for article-1.
7733        let count: i64 = conn
7734            .query_row(
7735                "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
7736                [],
7737                |r| r.get(0),
7738            )
7739            .expect("count");
7740        assert_eq!(count, 1, "per-kind FTS table must have the row");
7741
7742        // Verify per-column values via direct SELECT (not MATCH).
7743        let (title_val, body_val): (String, String) = conn
7744            .query_row(
7745                &format!(
7746                    "SELECT {title_col}, {body_col} FROM fts_props_article \
7747                     WHERE node_logical_id = 'article-1'"
7748                ),
7749                [],
7750                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
7751            )
7752            .expect("select per-column");
7753        assert_eq!(
7754            title_val, "Hello World",
7755            "title column must have correct value"
7756        );
7757        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
7758    }
7759}