// fathomdb_engine/writer.rs
use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::panic::AssertUnwindSafe;
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::{self, Sender, SyncSender};
use std::thread;
use std::time::Duration;

use fathomdb_schema::SchemaManager;
use rusqlite::{OptionalExtension, TransactionBehavior, params};

use crate::operational::{
    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
    OperationalValidationMode, extract_secondary_index_entries_for_current,
    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
    parse_operational_validation_contract, validate_operational_payload_against_contract,
};
use crate::telemetry::TelemetryCounters;
use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried through [`WriteRequest::optional_backfills`] into the prepared
/// write; the count of these tasks is reported back in the receipt.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforced during `prepare_write` / `prepare_touch_last_accessed`, i.e. on
/// the caller's thread before the request is queued to the writer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// ID of this specific row/version. Must be non-empty and unique among
    /// node and edge row_ids within the request (validated in `prepare_write`).
    pub row_id: String,
    /// Stable identity shared by all versions of this node. Must be non-empty.
    pub logical_id: String,
    /// Node kind tag; co-submitted kinds are cached in `PreparedWrite::node_kinds`
    /// so FTS resolution can skip a DB lookup.
    pub kind: String,
    /// Serialized properties blob (presumably JSON — confirm against schema).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// ID of this specific row/version. Must be non-empty and unique among
    /// node and edge row_ids within the request (validated in `prepare_write`).
    pub row_id: String,
    /// Stable identity shared by all versions of this edge. Must be non-empty.
    pub logical_id: String,
    /// Logical ID of the source node endpoint.
    pub source_logical_id: String,
    /// Logical ID of the target node endpoint.
    pub target_logical_id: String,
    /// Edge kind tag.
    pub kind: String,
    /// Serialized properties blob (presumably JSON — confirm against schema).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Logical identity of the node to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Logical identity of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk ID; must be non-empty (validated in `prepare_write`).
    pub id: String,
    /// Logical ID of the node this chunk belongs to. May reference a node
    /// co-submitted in the same request or one already in the database.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty — empty chunks are rejected.
    pub text_content: String,
    /// Optional byte offset range into the source content — TODO confirm units.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
// No `Eq` derive: `f32` is only `PartialEq`.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk; must be non-empty (validated in `prepare_write`).
    pub chunk_id: String,
    /// Embedding vector; must be non-empty (validated in `prepare_write`).
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// All variants require a non-empty `collection` and `record_key`; `Append`
/// and `Put` additionally require non-empty `payload_json` (validated in
/// `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Add a record without replacing existing entries for the key.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Insert-or-replace the record for the key.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove the record for the key; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run ID; must be non-empty (validated in `prepare_write`).
    pub id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties blob (presumably JSON — confirm against schema).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// ID of the prior run version this insert replaces.
    pub supersedes_id: Option<String>,
}
156
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step ID; must be non-empty (validated in `prepare_write`).
    pub id: String,
    /// Parent run this step belongs to.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties blob (presumably JSON — confirm against schema).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// ID of the prior step version this insert replaces.
    pub supersedes_id: Option<String>,
}
169
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action ID; must be non-empty (validated in `prepare_write`).
    pub id: String,
    /// Parent step this action belongs to.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties blob (presumably JSON — confirm against schema).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// ID of the prior action version this insert replaces.
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits ---
// Enforced by `validate_request_size` on the caller's thread before a request
// is queued; exceeding any limit yields `EngineError::InvalidWrite`.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
/// Combined cap on node retires + edge retires in one request.
const MAX_RETIRES: usize = 10_000;
/// Combined cap on runs + steps + actions in one request.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
/// Cap on the sum across all item kinds (optional backfills excluded).
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
// No `Eq` derive: contains `VecInsert`, which holds `f32`s.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label for this batch; echoed in the receipt and logs.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection work to record alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label copied from the originating [`WriteRequest`].
    pub label: String,
    /// Number of optional backfill tasks that accompanied the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` notices emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch; must be non-empty, with no blank IDs (validated in
    /// `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to record — units/epoch not shown here; confirm against schema.
    pub touched_at: i64,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
235
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical IDs whose timestamp was updated.
    pub touched_logical_ids: usize,
    /// The timestamp that was written.
    pub touched_at: i64,
}
242
/// A chunk-level row staged for the FTS projection, resolved by
/// `resolve_fts_rows` before the write transaction begins.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node (from the request or a DB lookup).
    kind: String,
    text_content: String,
}
250
/// A node-property row staged for the property FTS projection, resolved by
/// `resolve_property_fts_rows` before the write transaction begins.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
}
257
/// A [`WriteRequest`] that passed caller-side validation in `prepare_write`,
/// plus scratch fields the writer thread fills in before applying it.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Populated by the writer thread; empty at `prepare_write` time.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Populated by the writer thread; empty at `prepare_write` time.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Populated by the writer thread; empty at `prepare_write` time.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
285
/// Messages accepted by the writer thread's mailbox.
///
/// Each variant carries a one-shot `reply` channel the writer answers on.
enum WriteMessage {
    /// Apply a validated write batch atomically.
    Submit {
        /// Boxed to keep the queued message small.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Bump `last_accessed_at` for a batch of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
296
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Wrapped in `ManuallyDrop` so `Drop` can close the channel *before*
    /// joining the thread (see the `Drop` impl for the ordering rationale).
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// `Some` until `Drop` takes and joins the handle.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied during request preparation.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
309
310impl WriterActor {
311    /// # Errors
312    /// Returns [`EngineError`] if the writer thread cannot be spawned.
313    pub fn start(
314        path: impl AsRef<Path>,
315        schema_manager: Arc<SchemaManager>,
316        provenance_mode: ProvenanceMode,
317        telemetry: Arc<TelemetryCounters>,
318    ) -> Result<Self, EngineError> {
319        let database_path = path.as_ref().to_path_buf();
320        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
321
322        let writer_telemetry = Arc::clone(&telemetry);
323        let handle = thread::Builder::new()
324            .name("fathomdb-writer".to_owned())
325            .spawn(move || {
326                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
327            })
328            .map_err(EngineError::Io)?;
329
330        Ok(Self {
331            sender: ManuallyDrop::new(sender),
332            thread_handle: Some(handle),
333            provenance_mode,
334            _telemetry: telemetry,
335        })
336    }
337
338    /// Returns `true` if the writer thread is still running.
339    fn is_thread_alive(&self) -> bool {
340        self.thread_handle
341            .as_ref()
342            .is_some_and(|h| !h.is_finished())
343    }
344
345    /// Returns `WriterRejected` if the writer thread has exited.
346    fn check_thread_alive(&self) -> Result<(), EngineError> {
347        if self.is_thread_alive() {
348            Ok(())
349        } else {
350            Err(EngineError::WriterRejected(
351                "writer thread has exited".to_owned(),
352            ))
353        }
354    }
355
356    /// # Errors
357    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
358    /// down, or the underlying `SQLite` transaction fails.
359    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
360        self.check_thread_alive()?;
361        let prepared = prepare_write(request, self.provenance_mode)?;
362        let (reply_tx, reply_rx) = mpsc::channel();
363        self.sender
364            .send(WriteMessage::Submit {
365                prepared: Box::new(prepared),
366                reply: reply_tx,
367            })
368            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
369
370        recv_with_timeout(&reply_rx)
371    }
372
373    /// # Errors
374    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
375    /// or the underlying `SQLite` transaction fails.
376    pub fn touch_last_accessed(
377        &self,
378        request: LastAccessTouchRequest,
379    ) -> Result<LastAccessTouchReport, EngineError> {
380        self.check_thread_alive()?;
381        prepare_touch_last_accessed(&request, self.provenance_mode)?;
382        let (reply_tx, reply_rx) = mpsc::channel();
383        self.sender
384            .send(WriteMessage::TouchLastAccessed {
385                request,
386                reply: reply_tx,
387            })
388            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
389
390        recv_with_timeout(&reply_rx)
391    }
392}
393
/// Fallback notice when the `tracing` feature is absent: the `Drop` impl still
/// needs to surface a suppressed writer-thread panic somewhere visible.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    let notice = "fathomdb-writer panicked during shutdown (suppressed: already panicking)";
    eprintln!("{notice}");
}
399
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // A panic while already unwinding would abort the process,
                    // so only re-raise the writer's panic when we are not
                    // ourselves panicking; otherwise just log it.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        // Without tracing, fall back to a bare stderr notice.
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Propagate the writer thread's panic to the dropper.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
429
430/// Wait for a reply from the writer thread, with a timeout.
431fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
432    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
433        .map_err(|error| match error {
434            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
435                "write timed out waiting for writer thread reply — the write may still commit"
436                    .to_owned(),
437            ),
438            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
439        })
440        .and_then(|result| result)
441}
442
443fn prepare_touch_last_accessed(
444    request: &LastAccessTouchRequest,
445    mode: ProvenanceMode,
446) -> Result<(), EngineError> {
447    if request.logical_ids.is_empty() {
448        return Err(EngineError::InvalidWrite(
449            "touch_last_accessed requires at least one logical_id".to_owned(),
450        ));
451    }
452    for logical_id in &request.logical_ids {
453        if logical_id.trim().is_empty() {
454            return Err(EngineError::InvalidWrite(
455                "touch_last_accessed requires non-empty logical_ids".to_owned(),
456            ));
457        }
458    }
459    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
460        return Err(EngineError::InvalidWrite(
461            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
462                .to_owned(),
463        ));
464    }
465    Ok(())
466}
467
468fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
469    let missing: Vec<String> = request
470        .nodes
471        .iter()
472        .filter(|n| n.source_ref.is_none())
473        .map(|n| format!("node '{}'", n.logical_id))
474        .chain(
475            request
476                .node_retires
477                .iter()
478                .filter(|r| r.source_ref.is_none())
479                .map(|r| format!("node retire '{}'", r.logical_id)),
480        )
481        .chain(
482            request
483                .edges
484                .iter()
485                .filter(|e| e.source_ref.is_none())
486                .map(|e| format!("edge '{}'", e.logical_id)),
487        )
488        .chain(
489            request
490                .edge_retires
491                .iter()
492                .filter(|r| r.source_ref.is_none())
493                .map(|r| format!("edge retire '{}'", r.logical_id)),
494        )
495        .chain(
496            request
497                .runs
498                .iter()
499                .filter(|r| r.source_ref.is_none())
500                .map(|r| format!("run '{}'", r.id)),
501        )
502        .chain(
503            request
504                .steps
505                .iter()
506                .filter(|s| s.source_ref.is_none())
507                .map(|s| format!("step '{}'", s.id)),
508        )
509        .chain(
510            request
511                .actions
512                .iter()
513                .filter(|a| a.source_ref.is_none())
514                .map(|a| format!("action '{}'", a.id)),
515        )
516        .chain(
517            request
518                .operational_writes
519                .iter()
520                .filter(|write| operational_write_source_ref(write).is_none())
521                .map(|write| {
522                    format!(
523                        "operational {} '{}:{}'",
524                        operational_write_kind(write),
525                        operational_write_collection(write),
526                        operational_write_record_key(write)
527                    )
528                }),
529        )
530        .collect();
531
532    if missing.is_empty() {
533        Ok(())
534    } else {
535        Err(EngineError::InvalidWrite(format!(
536            "ProvenanceMode::Require: missing source_ref on: {}",
537            missing.join(", ")
538        )))
539    }
540}
541
542fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
543    if request.nodes.len() > MAX_NODES {
544        return Err(EngineError::InvalidWrite(format!(
545            "too many nodes: {} exceeds limit of {MAX_NODES}",
546            request.nodes.len()
547        )));
548    }
549    if request.edges.len() > MAX_EDGES {
550        return Err(EngineError::InvalidWrite(format!(
551            "too many edges: {} exceeds limit of {MAX_EDGES}",
552            request.edges.len()
553        )));
554    }
555    if request.chunks.len() > MAX_CHUNKS {
556        return Err(EngineError::InvalidWrite(format!(
557            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
558            request.chunks.len()
559        )));
560    }
561    let retires = request.node_retires.len() + request.edge_retires.len();
562    if retires > MAX_RETIRES {
563        return Err(EngineError::InvalidWrite(format!(
564            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
565        )));
566    }
567    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
568    if runtime_items > MAX_RUNTIME_ITEMS {
569        return Err(EngineError::InvalidWrite(format!(
570            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
571        )));
572    }
573    if request.operational_writes.len() > MAX_OPERATIONAL {
574        return Err(EngineError::InvalidWrite(format!(
575            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
576            request.operational_writes.len()
577        )));
578    }
579    let total = request.nodes.len()
580        + request.node_retires.len()
581        + request.edges.len()
582        + request.edge_retires.len()
583        + request.chunks.len()
584        + request.runs.len()
585        + request.steps.len()
586        + request.actions.len()
587        + request.vec_inserts.len()
588        + request.operational_writes.len();
589    if total > MAX_TOTAL_ITEMS {
590        return Err(EngineError::InvalidWrite(format!(
591            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
592        )));
593    }
594    Ok(())
595}
596
597#[allow(clippy::too_many_lines)]
598fn prepare_write(
599    request: WriteRequest,
600    mode: ProvenanceMode,
601) -> Result<PreparedWrite, EngineError> {
602    validate_request_size(&request)?;
603
604    // --- ID validation: reject empty IDs ---
605    for node in &request.nodes {
606        if node.row_id.is_empty() {
607            return Err(EngineError::InvalidWrite(
608                "NodeInsert has empty row_id".to_owned(),
609            ));
610        }
611        if node.logical_id.is_empty() {
612            return Err(EngineError::InvalidWrite(
613                "NodeInsert has empty logical_id".to_owned(),
614            ));
615        }
616    }
617    for edge in &request.edges {
618        if edge.row_id.is_empty() {
619            return Err(EngineError::InvalidWrite(
620                "EdgeInsert has empty row_id".to_owned(),
621            ));
622        }
623        if edge.logical_id.is_empty() {
624            return Err(EngineError::InvalidWrite(
625                "EdgeInsert has empty logical_id".to_owned(),
626            ));
627        }
628    }
629    for chunk in &request.chunks {
630        if chunk.id.is_empty() {
631            return Err(EngineError::InvalidWrite(
632                "ChunkInsert has empty id".to_owned(),
633            ));
634        }
635        if chunk.text_content.is_empty() {
636            return Err(EngineError::InvalidWrite(format!(
637                "chunk '{}' has empty text_content; empty chunks are not allowed",
638                chunk.id
639            )));
640        }
641    }
642    for run in &request.runs {
643        if run.id.is_empty() {
644            return Err(EngineError::InvalidWrite(
645                "RunInsert has empty id".to_owned(),
646            ));
647        }
648    }
649    for step in &request.steps {
650        if step.id.is_empty() {
651            return Err(EngineError::InvalidWrite(
652                "StepInsert has empty id".to_owned(),
653            ));
654        }
655    }
656    for action in &request.actions {
657        if action.id.is_empty() {
658            return Err(EngineError::InvalidWrite(
659                "ActionInsert has empty id".to_owned(),
660            ));
661        }
662    }
663    for vi in &request.vec_inserts {
664        if vi.chunk_id.is_empty() {
665            return Err(EngineError::InvalidWrite(
666                "VecInsert has empty chunk_id".to_owned(),
667            ));
668        }
669        if vi.embedding.is_empty() {
670            return Err(EngineError::InvalidWrite(format!(
671                "VecInsert for chunk '{}' has empty embedding",
672                vi.chunk_id
673            )));
674        }
675    }
676    for operational in &request.operational_writes {
677        if operational_write_collection(operational).is_empty() {
678            return Err(EngineError::InvalidWrite(
679                "OperationalWrite has empty collection".to_owned(),
680            ));
681        }
682        if operational_write_record_key(operational).is_empty() {
683            return Err(EngineError::InvalidWrite(format!(
684                "OperationalWrite for collection '{}' has empty record_key",
685                operational_write_collection(operational)
686            )));
687        }
688        match operational {
689            OperationalWrite::Append { payload_json, .. }
690            | OperationalWrite::Put { payload_json, .. } => {
691                if payload_json.is_empty() {
692                    return Err(EngineError::InvalidWrite(format!(
693                        "OperationalWrite {} '{}:{}' has empty payload_json",
694                        operational_write_kind(operational),
695                        operational_write_collection(operational),
696                        operational_write_record_key(operational)
697                    )));
698                }
699            }
700            OperationalWrite::Delete { .. } => {}
701        }
702    }
703
704    // --- ID validation: reject duplicate row_ids within the request ---
705    {
706        let mut seen = std::collections::HashSet::new();
707        for node in &request.nodes {
708            if !seen.insert(node.row_id.as_str()) {
709                return Err(EngineError::InvalidWrite(format!(
710                    "duplicate row_id '{}' within the same WriteRequest",
711                    node.row_id
712                )));
713            }
714        }
715        for edge in &request.edges {
716            if !seen.insert(edge.row_id.as_str()) {
717                return Err(EngineError::InvalidWrite(format!(
718                    "duplicate row_id '{}' within the same WriteRequest",
719                    edge.row_id
720                )));
721            }
722        }
723    }
724
725    // --- ProvenanceMode::Require enforcement ---
726    if mode == ProvenanceMode::Require {
727        check_require_provenance(&request)?;
728    }
729
730    // --- Runtime table upsert validation ---
731    for run in &request.runs {
732        if run.upsert && run.supersedes_id.is_none() {
733            return Err(EngineError::InvalidWrite(format!(
734                "run '{}': upsert=true requires supersedes_id to be set",
735                run.id
736            )));
737        }
738    }
739    for step in &request.steps {
740        if step.upsert && step.supersedes_id.is_none() {
741            return Err(EngineError::InvalidWrite(format!(
742                "step '{}': upsert=true requires supersedes_id to be set",
743                step.id
744            )));
745        }
746    }
747    for action in &request.actions {
748        if action.upsert && action.supersedes_id.is_none() {
749            return Err(EngineError::InvalidWrite(format!(
750                "action '{}': upsert=true requires supersedes_id to be set",
751                action.id
752            )));
753        }
754    }
755
756    let node_kinds = request
757        .nodes
758        .iter()
759        .map(|node| (node.logical_id.clone(), node.kind.clone()))
760        .collect::<HashMap<_, _>>();
761
762    Ok(PreparedWrite {
763        label: request.label,
764        nodes: request.nodes,
765        node_retires: request.node_retires,
766        edges: request.edges,
767        edge_retires: request.edge_retires,
768        chunks: request.chunks,
769        runs: request.runs,
770        steps: request.steps,
771        actions: request.actions,
772        vec_inserts: request.vec_inserts,
773        operational_writes: request.operational_writes,
774        operational_collection_kinds: HashMap::new(),
775        operational_collection_filter_fields: HashMap::new(),
776        operational_validation_warnings: Vec::new(),
777        node_kinds,
778        required_fts_rows: Vec::new(),
779        required_property_fts_rows: Vec::new(),
780        optional_backfills: request.optional_backfills,
781    })
782}
783
/// Body of the dedicated writer thread: owns the single write connection and
/// drains the mailbox until every sender is dropped.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    // If the connection cannot be opened, drain the mailbox rejecting every
    // message so callers fail fast instead of hitting the 30s reply timeout.
    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    // Ensure the schema exists before accepting any writes; same fail-fast
    // rejection path as above on failure.
    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Loop ends when the `WriterActor`'s sender is dropped (see its Drop impl).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists under the tracing feature; the
                // trace_info! below presumably discards its args otherwise —
                // NOTE(review): confirm the macro cfg-gates its arguments.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned request cannot kill the writer
                // thread and strand every queued caller.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Caller may have timed out and dropped the receiver; ignore.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches count as a write event but mutate no graph rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
866
867fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
868    for message in receiver {
869        match message {
870            WriteMessage::Submit { reply, .. } => {
871                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
872            }
873            WriteMessage::TouchLastAccessed { reply, .. } => {
874                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
875            }
876        }
877    }
878}
879
880/// Resolve FTS projection rows before the write transaction begins.
881///
882/// For each chunk in the prepared write, determines the node `kind` needed
883/// to populate the FTS index. If the node was co-submitted in the same
884/// request its kind is taken from `prepared.node_kinds` without a DB query.
885/// If the node is pre-existing it is looked up in the database. If the node
886/// cannot be found in either place, an `InvalidWrite` error is returned.
887fn resolve_fts_rows(
888    conn: &rusqlite::Connection,
889    prepared: &mut PreparedWrite,
890) -> Result<(), EngineError> {
891    let retiring_ids: std::collections::HashSet<&str> = prepared
892        .node_retires
893        .iter()
894        .map(|r| r.logical_id.as_str())
895        .collect();
896    for chunk in &prepared.chunks {
897        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
898            return Err(EngineError::InvalidWrite(format!(
899                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
900                 WriteRequest; retire and chunk insertion for the same node must not be combined",
901                chunk.id, chunk.node_logical_id
902            )));
903        }
904    }
905    for chunk in &prepared.chunks {
906        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
907            k.clone()
908        } else {
909            match conn.query_row(
910                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
911                params![chunk.node_logical_id],
912                |row| row.get::<_, String>(0),
913            ) {
914                Ok(kind) => kind,
915                Err(rusqlite::Error::QueryReturnedNoRows) => {
916                    return Err(EngineError::InvalidWrite(format!(
917                        "chunk '{}' references node_logical_id '{}' that is not present in this \
918                         write request or the database \
919                         (v1 limitation: chunks and their nodes must be submitted together or the \
920                         node must already exist)",
921                        chunk.id, chunk.node_logical_id
922                    )));
923                }
924                Err(e) => return Err(EngineError::Sqlite(e)),
925            }
926        };
927        prepared.required_fts_rows.push(FtsProjectionRow {
928            chunk_id: chunk.id.clone(),
929            node_logical_id: chunk.node_logical_id.clone(),
930            kind,
931            text_content: chunk.text_content.clone(),
932        });
933    }
934    trace_debug!(
935        fts_rows = prepared.required_fts_rows.len(),
936        chunks_processed = prepared.chunks.len(),
937        "fts row resolution completed"
938    );
939    Ok(())
940}
941
942/// Load FTS property schemas from the database and extract property values from
943/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
944fn resolve_property_fts_rows(
945    conn: &rusqlite::Connection,
946    prepared: &mut PreparedWrite,
947) -> Result<(), EngineError> {
948    if prepared.nodes.is_empty() {
949        return Ok(());
950    }
951
952    let schemas: HashMap<String, (Vec<String>, String)> = load_fts_property_schemas(conn)?
953        .into_iter()
954        .map(|(kind, paths, sep)| (kind, (paths, sep)))
955        .collect();
956
957    if schemas.is_empty() {
958        return Ok(());
959    }
960
961    for node in &prepared.nodes {
962        let Some((paths, separator)) = schemas.get(&node.kind) else {
963            continue;
964        };
965        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
966        if let Some(text_content) = compute_property_fts_text(&props, paths, separator) {
967            prepared
968                .required_property_fts_rows
969                .push(FtsPropertyProjectionRow {
970                    node_logical_id: node.logical_id.clone(),
971                    kind: node.kind.clone(),
972                    text_content,
973                });
974        }
975    }
976    trace_debug!(
977        property_fts_rows = prepared.required_property_fts_rows.len(),
978        nodes_processed = prepared.nodes.len(),
979        "property fts row resolution completed"
980    );
981    Ok(())
982}
983
984/// Extract scalar values from a JSON object at a `$.field` path.
985/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
986/// Returns individual scalars so callers can join them with the configured separator.
987pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
988    let Some(path) = path.strip_prefix("$.") else {
989        return Vec::new();
990    };
991    let mut current = value;
992    for segment in path.split('.') {
993        match current.get(segment) {
994            Some(v) => current = v,
995            None => return Vec::new(),
996        }
997    }
998    match current {
999        serde_json::Value::String(s) => vec![s.clone()],
1000        serde_json::Value::Number(n) => vec![n.to_string()],
1001        serde_json::Value::Bool(b) => vec![b.to_string()],
1002        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1003        serde_json::Value::Array(arr) => arr
1004            .iter()
1005            .filter_map(|v| match v {
1006                serde_json::Value::String(s) => Some(s.clone()),
1007                serde_json::Value::Number(n) => Some(n.to_string()),
1008                serde_json::Value::Bool(b) => Some(b.to_string()),
1009                _ => None,
1010            })
1011            .collect(),
1012    }
1013}
1014
1015/// Extract property values at `paths` from a JSON value and join with `separator`.
1016/// Array elements are individual extracted values — the separator applies between
1017/// all scalars regardless of whether they came from the same path or different paths.
1018/// Returns `None` if no paths resolve to a value.
1019pub(crate) fn compute_property_fts_text(
1020    props: &serde_json::Value,
1021    paths: &[String],
1022    separator: &str,
1023) -> Option<String> {
1024    let mut parts = Vec::new();
1025    for path in paths {
1026        parts.extend(extract_json_path(props, path));
1027    }
1028    if parts.is_empty() {
1029        None
1030    } else {
1031        Some(parts.join(separator))
1032    }
1033}
1034
1035/// Load all registered FTS property schemas from the database.
1036pub(crate) fn load_fts_property_schemas(
1037    conn: &rusqlite::Connection,
1038) -> Result<Vec<(String, Vec<String>, String)>, rusqlite::Error> {
1039    let mut stmt =
1040        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1041    stmt.query_map([], |row| {
1042        let kind: String = row.get(0)?;
1043        let paths_json: String = row.get(1)?;
1044        let separator: String = row.get(2)?;
1045        let paths: Vec<String> = serde_json::from_str(&paths_json).unwrap_or_default();
1046        Ok((kind, paths, separator))
1047    })?
1048    .collect::<Result<Vec<_>, _>>()
1049}
1050
/// Pre-resolve metadata for every operational write in `prepared`, outside
/// the write transaction.
///
/// For each distinct collection referenced, loads (and caches) its kind,
/// filter-field declarations, and validation contract from
/// `operational_collections`, rejecting writes that target unregistered or
/// disabled collections. Each write's op kind is then checked against the
/// collection kind (append_only_log accepts only Append; latest_state only
/// Put/Delete) and its payload against the contract. On success the resolved
/// kind and filter-field maps are stored on `prepared` for `apply_write`.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // Per-collection caches: each collection's row is fetched at most once
    // per request, however many writes target it.
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        if !collection_kinds.contains_key(collection) {
            // Cache miss: load and validate the collection's registration row.
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // The entry was inserted just above on a miss, so this lookup cannot
        // fail in practice; the error is a defensive invariant check.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Op kind must match the collection kind.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Enforce-mode contract failures abort the write via `?`. ReportOnly
        // warnings are deliberately dropped here (`let _ =`): warnings are
        // re-collected inside the transaction by
        // `validate_operational_writes_against_live_contracts`.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1118
1119fn parse_operational_filter_fields(
1120    filter_fields_json: &str,
1121) -> Result<Vec<OperationalFilterField>, EngineError> {
1122    let fields: Vec<OperationalFilterField> =
1123        serde_json::from_str(filter_fields_json).map_err(|error| {
1124            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1125        })?;
1126    let mut seen = std::collections::HashSet::new();
1127    for field in &fields {
1128        if field.name.trim().is_empty() {
1129            return Err(EngineError::InvalidWrite(
1130                "filter_fields_json field names must not be empty".to_owned(),
1131            ));
1132        }
1133        if !seen.insert(field.name.as_str()) {
1134            return Err(EngineError::InvalidWrite(format!(
1135                "filter_fields_json contains duplicate field '{}'",
1136                field.name
1137            )));
1138        }
1139        if field.modes.is_empty() {
1140            return Err(EngineError::InvalidWrite(format!(
1141                "filter_fields_json field '{}' must declare at least one mode",
1142                field.name
1143            )));
1144        }
1145        if field.modes.contains(&OperationalFilterMode::Prefix)
1146            && field.field_type != OperationalFilterFieldType::String
1147        {
1148            return Err(EngineError::InvalidWrite(format!(
1149                "filter field '{}' only supports prefix for string types",
1150                field.name
1151            )));
1152        }
1153    }
1154    Ok(fields)
1155}
1156
/// One extracted filter value for an operational write, staged for insertion
/// into the `operational_filter_values` table. Exactly one of `string_value`
/// / `integer_value` is populated, matching the declared field type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Set for `String`-typed fields; `None` otherwise.
    string_value: Option<String>,
    // Set for `Integer`/`Timestamp`-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1163
1164fn extract_operational_filter_values(
1165    filter_fields: &[OperationalFilterField],
1166    payload_json: &str,
1167) -> Vec<OperationalFilterValueRow> {
1168    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1169        return Vec::new();
1170    };
1171    let Some(object) = parsed.as_object() else {
1172        return Vec::new();
1173    };
1174
1175    filter_fields
1176        .iter()
1177        .filter_map(|field| {
1178            let value = object.get(&field.name)?;
1179            match field.field_type {
1180                OperationalFilterFieldType::String => {
1181                    value
1182                        .as_str()
1183                        .map(|string_value| OperationalFilterValueRow {
1184                            field_name: field.name.clone(),
1185                            string_value: Some(string_value.to_owned()),
1186                            integer_value: None,
1187                        })
1188                }
1189                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1190                    value
1191                        .as_i64()
1192                        .map(|integer_value| OperationalFilterValueRow {
1193                            field_name: field.name.clone(),
1194                            string_value: None,
1195                            integer_value: Some(integer_value),
1196                        })
1197                }
1198            }
1199        })
1200        .collect()
1201}
1202
/// Full pipeline for one submitted write: resolve chunk FTS rows, property
/// FTS rows, and operational-collection metadata (read-only queries against
/// `conn`, run before any transaction), then apply everything atomically via
/// `apply_write`. Any resolution error aborts before the transaction opens.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1212
/// Update `last_accessed_at` metadata for a batch of nodes and record one
/// provenance event per node, inside a single immediate transaction.
///
/// Duplicate logical ids in the request are collapsed (first occurrence
/// wins, order preserved). Every id must refer to an active (non-superseded)
/// node; otherwise the whole batch is rejected with
/// [`EngineError::InvalidWrite`] and nothing is committed.
fn apply_touch_last_accessed(
    conn: &mut rusqlite::Connection,
    request: &LastAccessTouchRequest,
) -> Result<LastAccessTouchReport, EngineError> {
    // De-duplicate while preserving the caller's ordering.
    let mut seen = std::collections::HashSet::new();
    let logical_ids = request
        .logical_ids
        .iter()
        .filter(|logical_id| seen.insert(logical_id.as_str()))
        .cloned()
        .collect::<Vec<_>>();
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Validate the entire batch before writing anything so a bad id cannot
    // leave a partially-touched set behind.
    for logical_id in &logical_ids {
        let exists = tx
            .query_row(
                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
                params![logical_id],
                |row| row.get::<_, i64>(0),
            )
            .optional()?
            .is_some();
        if !exists {
            return Err(EngineError::InvalidWrite(format!(
                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
            )));
        }
    }

    // Scope block: the cached statements borrow `tx`, so they must be
    // dropped before `tx.commit()` can take the transaction by value.
    {
        // ?2 feeds both last_accessed_at and updated_at (same touch instant).
        let mut upsert_metadata = tx.prepare_cached(
            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
             VALUES (?1, ?2, ?2) \
             ON CONFLICT(logical_id) DO UPDATE SET \
                 last_accessed_at = excluded.last_accessed_at, \
                 updated_at = excluded.updated_at",
        )?;
        let mut insert_provenance = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
        )?;
        for logical_id in &logical_ids {
            upsert_metadata.execute(params![logical_id, request.touched_at])?;
            // metadata_json is a tiny fixed-shape object, built by hand to
            // avoid a serde round-trip.
            insert_provenance.execute(params![
                new_id(),
                logical_id,
                request.source_ref.as_deref(),
                format!("{{\"touched_at\":{}}}", request.touched_at),
            ])?;
        }
    }

    tx.commit()?;
    Ok(LastAccessTouchReport {
        touched_logical_ids: logical_ids.len(),
        touched_at: request.touched_at,
    })
}
1271
1272fn ensure_operational_collections_writable(
1273    tx: &rusqlite::Transaction<'_>,
1274    prepared: &PreparedWrite,
1275) -> Result<(), EngineError> {
1276    for collection in prepared.operational_collection_kinds.keys() {
1277        let disabled_at: Option<Option<i64>> = tx
1278            .query_row(
1279                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1280                params![collection],
1281                |row| row.get::<_, Option<i64>>(0),
1282            )
1283            .optional()?;
1284        match disabled_at {
1285            Some(Some(_)) => {
1286                return Err(EngineError::InvalidWrite(format!(
1287                    "operational collection '{collection}' is disabled"
1288                )));
1289            }
1290            Some(None) => {}
1291            None => {
1292                return Err(EngineError::InvalidWrite(format!(
1293                    "operational collection '{collection}' is not registered"
1294                )));
1295            }
1296        }
1297    }
1298    Ok(())
1299}
1300
1301fn validate_operational_writes_against_live_contracts(
1302    tx: &rusqlite::Transaction<'_>,
1303    prepared: &PreparedWrite,
1304) -> Result<Vec<String>, EngineError> {
1305    let mut collection_validation_contracts =
1306        HashMap::<String, Option<OperationalValidationContract>>::new();
1307    for collection in prepared.operational_collection_kinds.keys() {
1308        let validation_json: String = tx
1309            .query_row(
1310                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1311                params![collection],
1312                |row| row.get(0),
1313            )
1314            .map_err(EngineError::Sqlite)?;
1315        let validation_contract = parse_operational_validation_contract(&validation_json)
1316            .map_err(EngineError::InvalidWrite)?;
1317        collection_validation_contracts.insert(collection.clone(), validation_contract);
1318    }
1319
1320    let mut warnings = Vec::new();
1321    for write in &prepared.operational_writes {
1322        if let Some(Some(contract)) =
1323            collection_validation_contracts.get(operational_write_collection(write))
1324            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1325        {
1326            warnings.push(warning);
1327        }
1328    }
1329
1330    Ok(warnings)
1331}
1332
1333fn load_live_operational_secondary_indexes(
1334    tx: &rusqlite::Transaction<'_>,
1335    prepared: &PreparedWrite,
1336) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1337    let mut collection_indexes = HashMap::new();
1338    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1339        let secondary_indexes_json: String = tx
1340            .query_row(
1341                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1342                params![collection],
1343                |row| row.get(0),
1344            )
1345            .map_err(EngineError::Sqlite)?;
1346        let indexes =
1347            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1348                .map_err(EngineError::InvalidWrite)?;
1349        collection_indexes.insert(collection.clone(), indexes);
1350    }
1351    Ok(collection_indexes)
1352}
1353
1354fn check_operational_write_against_contract(
1355    write: &OperationalWrite,
1356    contract: &OperationalValidationContract,
1357) -> Result<Option<String>, EngineError> {
1358    if contract.mode == OperationalValidationMode::Disabled {
1359        return Ok(None);
1360    }
1361
1362    let (payload_json, collection, record_key) = match write {
1363        OperationalWrite::Append {
1364            collection,
1365            record_key,
1366            payload_json,
1367            ..
1368        }
1369        | OperationalWrite::Put {
1370            collection,
1371            record_key,
1372            payload_json,
1373            ..
1374        } => (
1375            payload_json.as_str(),
1376            collection.as_str(),
1377            record_key.as_str(),
1378        ),
1379        OperationalWrite::Delete { .. } => return Ok(None),
1380    };
1381
1382    match validate_operational_payload_against_contract(contract, payload_json) {
1383        Ok(()) => Ok(None),
1384        Err(message) => match contract.mode {
1385            OperationalValidationMode::Disabled => Ok(None),
1386            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1387                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1388                kind = operational_write_kind(write)
1389            ))),
1390            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1391                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1392                kind = operational_write_kind(write)
1393            ))),
1394        },
1395    }
1396}
1397
1398#[allow(clippy::too_many_lines)]
1399fn apply_write(
1400    conn: &mut rusqlite::Connection,
1401    prepared: &mut PreparedWrite,
1402) -> Result<WriteReceipt, EngineError> {
1403    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1404
1405    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
1406    // preserve chunks/vec for possible restore, mark superseded, record audit event.
1407    {
1408        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1409        let mut del_prop_fts =
1410            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
1411        let mut sup_node = tx.prepare_cached(
1412            "UPDATE nodes SET superseded_at = unixepoch() \
1413             WHERE logical_id = ?1 AND superseded_at IS NULL",
1414        )?;
1415        let mut ins_event = tx.prepare_cached(
1416            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1417             VALUES (?1, 'node_retire', ?2, ?3)",
1418        )?;
1419        for retire in &prepared.node_retires {
1420            del_fts.execute(params![retire.logical_id])?;
1421            del_prop_fts.execute(params![retire.logical_id])?;
1422            sup_node.execute(params![retire.logical_id])?;
1423            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1424        }
1425    }
1426
1427    // Edge retires: mark superseded, record audit event.
1428    {
1429        let mut sup_edge = tx.prepare_cached(
1430            "UPDATE edges SET superseded_at = unixepoch() \
1431             WHERE logical_id = ?1 AND superseded_at IS NULL",
1432        )?;
1433        let mut ins_event = tx.prepare_cached(
1434            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1435             VALUES (?1, 'edge_retire', ?2, ?3)",
1436        )?;
1437        for retire in &prepared.edge_retires {
1438            sup_edge.execute(params![retire.logical_id])?;
1439            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1440        }
1441    }
1442
1443    // Node inserts (with optional upsert + chunk-policy handling).
1444    {
1445        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1446        let mut del_prop_fts =
1447            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
1448        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1449        let mut sup_node = tx.prepare_cached(
1450            "UPDATE nodes SET superseded_at = unixepoch() \
1451             WHERE logical_id = ?1 AND superseded_at IS NULL",
1452        )?;
1453        let mut ins_node = tx.prepare_cached(
1454            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1455             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1456        )?;
1457        #[cfg(feature = "sqlite-vec")]
1458        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
1459                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
1460        #[cfg(feature = "sqlite-vec")]
1461        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
1462            Ok(stmt) => Some(stmt),
1463            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
1464            Err(e) => return Err(e.into()),
1465        };
1466        for node in &prepared.nodes {
1467            if node.upsert {
1468                // Property FTS rows are always replaced on upsert since properties change.
1469                del_prop_fts.execute(params![node.logical_id])?;
1470                if node.chunk_policy == ChunkPolicy::Replace {
1471                    #[cfg(feature = "sqlite-vec")]
1472                    if let Some(ref mut stmt) = del_vec {
1473                        stmt.execute(params![node.logical_id])?;
1474                    }
1475                    del_fts.execute(params![node.logical_id])?;
1476                    del_chunks.execute(params![node.logical_id])?;
1477                }
1478                sup_node.execute(params![node.logical_id])?;
1479            }
1480            ins_node.execute(params![
1481                node.row_id,
1482                node.logical_id,
1483                node.kind,
1484                node.properties,
1485                node.source_ref,
1486                node.content_ref,
1487            ])?;
1488        }
1489    }
1490
1491    // Edge inserts (with optional upsert).
1492    {
1493        let mut sup_edge = tx.prepare_cached(
1494            "UPDATE edges SET superseded_at = unixepoch() \
1495             WHERE logical_id = ?1 AND superseded_at IS NULL",
1496        )?;
1497        let mut ins_edge = tx.prepare_cached(
1498            "INSERT INTO edges \
1499             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1500             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1501        )?;
1502        for edge in &prepared.edges {
1503            if edge.upsert {
1504                sup_edge.execute(params![edge.logical_id])?;
1505            }
1506            ins_edge.execute(params![
1507                edge.row_id,
1508                edge.logical_id,
1509                edge.source_logical_id,
1510                edge.target_logical_id,
1511                edge.kind,
1512                edge.properties,
1513                edge.source_ref,
1514            ])?;
1515        }
1516    }
1517
1518    // Chunk inserts.
1519    {
1520        let mut ins_chunk = tx.prepare_cached(
1521            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
1522             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1523        )?;
1524        for chunk in &prepared.chunks {
1525            ins_chunk.execute(params![
1526                chunk.id,
1527                chunk.node_logical_id,
1528                chunk.text_content,
1529                chunk.byte_start,
1530                chunk.byte_end,
1531                chunk.content_hash,
1532            ])?;
1533        }
1534    }
1535
1536    // Run inserts (with optional upsert).
1537    {
1538        let mut sup_run = tx.prepare_cached(
1539            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1540        )?;
1541        let mut ins_run = tx.prepare_cached(
1542            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
1543             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1544        )?;
1545        for run in &prepared.runs {
1546            if run.upsert
1547                && let Some(ref prior_id) = run.supersedes_id
1548            {
1549                sup_run.execute(params![prior_id])?;
1550            }
1551            ins_run.execute(params![
1552                run.id,
1553                run.kind,
1554                run.status,
1555                run.properties,
1556                run.source_ref
1557            ])?;
1558        }
1559    }
1560
1561    // Step inserts (with optional upsert).
1562    {
1563        let mut sup_step = tx.prepare_cached(
1564            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1565        )?;
1566        let mut ins_step = tx.prepare_cached(
1567            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
1568             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1569        )?;
1570        for step in &prepared.steps {
1571            if step.upsert
1572                && let Some(ref prior_id) = step.supersedes_id
1573            {
1574                sup_step.execute(params![prior_id])?;
1575            }
1576            ins_step.execute(params![
1577                step.id,
1578                step.run_id,
1579                step.kind,
1580                step.status,
1581                step.properties,
1582                step.source_ref,
1583            ])?;
1584        }
1585    }
1586
1587    // Action inserts (with optional upsert).
1588    {
1589        let mut sup_action = tx.prepare_cached(
1590            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1591        )?;
1592        let mut ins_action = tx.prepare_cached(
1593            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
1594             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1595        )?;
1596        for action in &prepared.actions {
1597            if action.upsert
1598                && let Some(ref prior_id) = action.supersedes_id
1599            {
1600                sup_action.execute(params![prior_id])?;
1601            }
1602            ins_action.execute(params![
1603                action.id,
1604                action.step_id,
1605                action.kind,
1606                action.status,
1607                action.properties,
1608                action.source_ref,
1609            ])?;
1610        }
1611    }
1612
1613    // Operational mutation log writes and latest-state current rows.
1614    {
1615        ensure_operational_collections_writable(&tx, prepared)?;
1616        prepared.operational_validation_warnings =
1617            validate_operational_writes_against_live_contracts(&tx, prepared)?;
1618        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
1619
1620        let mut next_mutation_order: i64 = tx.query_row(
1621            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
1622            [],
1623            |row| row.get(0),
1624        )?;
1625        let mut ins_mutation = tx.prepare_cached(
1626            "INSERT INTO operational_mutations \
1627             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1628             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1629        )?;
1630        let mut ins_filter_value = tx.prepare_cached(
1631            "INSERT INTO operational_filter_values \
1632             (mutation_id, collection_name, field_name, string_value, integer_value) \
1633             VALUES (?1, ?2, ?3, ?4, ?5)",
1634        )?;
1635        let mut upsert_current = tx.prepare_cached(
1636            "INSERT INTO operational_current \
1637             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1638             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
1639             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
1640                 payload_json = excluded.payload_json, \
1641                 updated_at = excluded.updated_at, \
1642                 last_mutation_id = excluded.last_mutation_id",
1643        )?;
1644        let mut del_current = tx.prepare_cached(
1645            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
1646        )?;
1647        let mut del_current_secondary_indexes = tx.prepare_cached(
1648            "DELETE FROM operational_secondary_index_entries \
1649             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
1650        )?;
1651        let mut ins_secondary_index = tx.prepare_cached(
1652            "INSERT INTO operational_secondary_index_entries \
1653             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
1654              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
1655             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1656        )?;
1657        let mut current_row_stmt = tx.prepare_cached(
1658            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
1659             WHERE collection_name = ?1 AND record_key = ?2",
1660        )?;
1661
1662        for write in &prepared.operational_writes {
1663            let collection = operational_write_collection(write);
1664            let record_key = operational_write_record_key(write);
1665            let mutation_id = new_id();
1666            next_mutation_order += 1;
1667            let payload_json = operational_write_payload(write);
1668            ins_mutation.execute(params![
1669                &mutation_id,
1670                collection,
1671                record_key,
1672                operational_write_kind(write),
1673                payload_json,
1674                operational_write_source_ref(write),
1675                next_mutation_order,
1676            ])?;
1677            if let Some(indexes) = collection_secondary_indexes.get(collection) {
1678                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
1679                    ins_secondary_index.execute(params![
1680                        collection,
1681                        entry.index_name,
1682                        "mutation",
1683                        &mutation_id,
1684                        record_key,
1685                        entry.sort_timestamp,
1686                        entry.slot1_text,
1687                        entry.slot1_integer,
1688                        entry.slot2_text,
1689                        entry.slot2_integer,
1690                        entry.slot3_text,
1691                        entry.slot3_integer,
1692                    ])?;
1693                }
1694            }
1695            if let Some(filter_fields) = prepared
1696                .operational_collection_filter_fields
1697                .get(collection)
1698            {
1699                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
1700                    ins_filter_value.execute(params![
1701                        &mutation_id,
1702                        collection,
1703                        filter_value.field_name,
1704                        filter_value.string_value,
1705                        filter_value.integer_value,
1706                    ])?;
1707                }
1708            }
1709
1710            if prepared.operational_collection_kinds.get(collection)
1711                == Some(&OperationalCollectionKind::LatestState)
1712            {
1713                del_current_secondary_indexes.execute(params![collection, record_key])?;
1714                match write {
1715                    OperationalWrite::Put { payload_json, .. } => {
1716                        upsert_current.execute(params![
1717                            collection,
1718                            record_key,
1719                            payload_json,
1720                            &mutation_id,
1721                        ])?;
1722                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
1723                            let (current_payload_json, updated_at, last_mutation_id): (
1724                                String,
1725                                i64,
1726                                String,
1727                            ) = current_row_stmt
1728                                .query_row(params![collection, record_key], |row| {
1729                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1730                                })?;
1731                            for entry in extract_secondary_index_entries_for_current(
1732                                indexes,
1733                                &current_payload_json,
1734                                updated_at,
1735                            ) {
1736                                ins_secondary_index.execute(params![
1737                                    collection,
1738                                    entry.index_name,
1739                                    "current",
1740                                    last_mutation_id.as_str(),
1741                                    record_key,
1742                                    entry.sort_timestamp,
1743                                    entry.slot1_text,
1744                                    entry.slot1_integer,
1745                                    entry.slot2_text,
1746                                    entry.slot2_integer,
1747                                    entry.slot3_text,
1748                                    entry.slot3_integer,
1749                                ])?;
1750                            }
1751                        }
1752                    }
1753                    OperationalWrite::Delete { .. } => {
1754                        del_current.execute(params![collection, record_key])?;
1755                    }
1756                    OperationalWrite::Append { .. } => {}
1757                }
1758            }
1759        }
1760    }
1761
1762    // FTS row inserts.
1763    {
1764        let mut ins_fts = tx.prepare_cached(
1765            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
1766             VALUES (?1, ?2, ?3, ?4)",
1767        )?;
1768        for fts_row in &prepared.required_fts_rows {
1769            ins_fts.execute(params![
1770                fts_row.chunk_id,
1771                fts_row.node_logical_id,
1772                fts_row.kind,
1773                fts_row.text_content,
1774            ])?;
1775        }
1776    }
1777
1778    // Property FTS row inserts.
1779    if !prepared.required_property_fts_rows.is_empty() {
1780        let mut ins_prop_fts = tx.prepare_cached(
1781            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
1782             VALUES (?1, ?2, ?3)",
1783        )?;
1784        for row in &prepared.required_property_fts_rows {
1785            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
1786        }
1787    }
1788
1789    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
1790    #[cfg(feature = "sqlite-vec")]
1791    {
1792        match tx
1793            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
1794        {
1795            Ok(mut ins_vec) => {
1796                for vi in &prepared.vec_inserts {
1797                    let bytes: Vec<u8> =
1798                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
1799                    ins_vec.execute(params![vi.chunk_id, bytes])?;
1800                }
1801            }
1802            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
1803                // vec profile absent: vec inserts are silently skipped.
1804            }
1805            Err(e) => return Err(e.into()),
1806        }
1807    }
1808
1809    tx.commit()?;
1810
1811    let provenance_warnings: Vec<String> = prepared
1812        .nodes
1813        .iter()
1814        .filter(|node| node.source_ref.is_none())
1815        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
1816        .chain(
1817            prepared
1818                .node_retires
1819                .iter()
1820                .filter(|r| r.source_ref.is_none())
1821                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
1822        )
1823        .chain(
1824            prepared
1825                .edges
1826                .iter()
1827                .filter(|e| e.source_ref.is_none())
1828                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
1829        )
1830        .chain(
1831            prepared
1832                .edge_retires
1833                .iter()
1834                .filter(|r| r.source_ref.is_none())
1835                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
1836        )
1837        .chain(
1838            prepared
1839                .runs
1840                .iter()
1841                .filter(|r| r.source_ref.is_none())
1842                .map(|r| format!("run '{}' has no source_ref", r.id)),
1843        )
1844        .chain(
1845            prepared
1846                .steps
1847                .iter()
1848                .filter(|s| s.source_ref.is_none())
1849                .map(|s| format!("step '{}' has no source_ref", s.id)),
1850        )
1851        .chain(
1852            prepared
1853                .actions
1854                .iter()
1855                .filter(|a| a.source_ref.is_none())
1856                .map(|a| format!("action '{}' has no source_ref", a.id)),
1857        )
1858        .chain(
1859            prepared
1860                .operational_writes
1861                .iter()
1862                .filter(|write| operational_write_source_ref(write).is_none())
1863                .map(|write| {
1864                    format!(
1865                        "operational {} '{}:{}' has no source_ref",
1866                        operational_write_kind(write),
1867                        operational_write_collection(write),
1868                        operational_write_record_key(write)
1869                    )
1870                }),
1871        )
1872        .collect();
1873
1874    let mut warnings = provenance_warnings.clone();
1875    warnings.extend(prepared.operational_validation_warnings.clone());
1876
1877    Ok(WriteReceipt {
1878        label: prepared.label.clone(),
1879        optional_backfill_count: prepared.optional_backfills.len(),
1880        warnings,
1881        provenance_warnings,
1882    })
1883}
1884
1885fn operational_write_collection(write: &OperationalWrite) -> &str {
1886    match write {
1887        OperationalWrite::Append { collection, .. }
1888        | OperationalWrite::Put { collection, .. }
1889        | OperationalWrite::Delete { collection, .. } => collection,
1890    }
1891}
1892
1893fn operational_write_record_key(write: &OperationalWrite) -> &str {
1894    match write {
1895        OperationalWrite::Append { record_key, .. }
1896        | OperationalWrite::Put { record_key, .. }
1897        | OperationalWrite::Delete { record_key, .. } => record_key,
1898    }
1899}
1900
1901fn operational_write_kind(write: &OperationalWrite) -> &'static str {
1902    match write {
1903        OperationalWrite::Append { .. } => "append",
1904        OperationalWrite::Put { .. } => "put",
1905        OperationalWrite::Delete { .. } => "delete",
1906    }
1907}
1908
1909fn operational_write_payload(write: &OperationalWrite) -> &str {
1910    match write {
1911        OperationalWrite::Append { payload_json, .. }
1912        | OperationalWrite::Put { payload_json, .. } => payload_json,
1913        OperationalWrite::Delete { .. } => "null",
1914    }
1915}
1916
1917fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
1918    match write {
1919        OperationalWrite::Append { source_ref, .. }
1920        | OperationalWrite::Put { source_ref, .. }
1921        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
1922    }
1923}
1924
1925#[cfg(test)]
1926#[allow(clippy::expect_used)]
1927mod tests {
1928    use std::sync::Arc;
1929
1930    use fathomdb_schema::SchemaManager;
1931    use tempfile::NamedTempFile;
1932
1933    use super::{apply_write, prepare_write, resolve_operational_writes};
1934    use crate::{
1935        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
1936        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
1937        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
1938        projection::ProjectionTarget,
1939    };
1940
1941    #[test]
1942    fn writer_executes_runtime_table_rows() {
1943        let db = NamedTempFile::new().expect("temporary db");
1944        let writer = WriterActor::start(
1945            db.path(),
1946            Arc::new(SchemaManager::new()),
1947            ProvenanceMode::Warn,
1948            Arc::new(TelemetryCounters::default()),
1949        )
1950        .expect("writer");
1951
1952        let receipt = writer
1953            .submit(WriteRequest {
1954                label: "runtime".to_owned(),
1955                nodes: vec![],
1956                node_retires: vec![],
1957                edges: vec![],
1958                edge_retires: vec![],
1959                chunks: vec![],
1960                runs: vec![RunInsert {
1961                    id: "run-1".to_owned(),
1962                    kind: "session".to_owned(),
1963                    status: "completed".to_owned(),
1964                    properties: "{}".to_owned(),
1965                    source_ref: Some("src-1".to_owned()),
1966                    upsert: false,
1967                    supersedes_id: None,
1968                }],
1969                steps: vec![StepInsert {
1970                    id: "step-1".to_owned(),
1971                    run_id: "run-1".to_owned(),
1972                    kind: "llm".to_owned(),
1973                    status: "completed".to_owned(),
1974                    properties: "{}".to_owned(),
1975                    source_ref: Some("src-1".to_owned()),
1976                    upsert: false,
1977                    supersedes_id: None,
1978                }],
1979                actions: vec![ActionInsert {
1980                    id: "action-1".to_owned(),
1981                    step_id: "step-1".to_owned(),
1982                    kind: "emit".to_owned(),
1983                    status: "completed".to_owned(),
1984                    properties: "{}".to_owned(),
1985                    source_ref: Some("src-1".to_owned()),
1986                    upsert: false,
1987                    supersedes_id: None,
1988                }],
1989                optional_backfills: vec![],
1990                vec_inserts: vec![],
1991                operational_writes: vec![],
1992            })
1993            .expect("write receipt");
1994
1995        assert_eq!(receipt.label, "runtime");
1996    }
1997
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        // A `Put` against a latest-state collection must land in both the
        // append-only mutation log and the `operational_current` snapshot,
        // alongside an unrelated canonical node write in the same request.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection registry: writes to undeclared collections are
        // rejected, so the test target must exist first.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the database
        // (presumably to avoid contending for the SQLite file — confirm).
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit one canonical node insert plus one operational Put in the
        // same write request.
        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Verify all three side effects via a fresh read connection.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // 1) The canonical node row was written.
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // 2) Exactly one mutation-log entry was recorded for the Put.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // 3) The latest-state snapshot holds the submitted payload.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2077
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        // With a validation contract whose mode is "disabled", a payload that
        // would otherwise violate the contract must still be written.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a collection with a strict contract (required enum "status",
        // no additional properties) — but mode "disabled" turns it off.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the database.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit a payload that violates the (disabled) contract: unknown
        // key "bogus", missing required "status".
        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The invalid payload must have been stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2131
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        // With a "report_only" validation contract, an invalid payload is
        // still written, but the receipt must carry a validation warning
        // (distinct from provenance warnings).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a collection whose contract requires status in {"ok","failed"}
        // but only reports violations instead of rejecting them.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the database.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the contract's enum for "status".
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // source_ref was supplied, so no provenance warnings — only the
        // single validation warning should be present.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the payload must have been written verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2196
2197    #[test]
2198    fn writer_rejects_operational_write_for_missing_collection() {
2199        let db = NamedTempFile::new().expect("temporary db");
2200        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2201        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2202        drop(conn);
2203        let writer = WriterActor::start(
2204            db.path(),
2205            Arc::new(SchemaManager::new()),
2206            ProvenanceMode::Warn,
2207            Arc::new(TelemetryCounters::default()),
2208        )
2209        .expect("writer");
2210
2211        let result = writer.submit(WriteRequest {
2212            label: "missing-operational-collection".to_owned(),
2213            nodes: vec![],
2214            node_retires: vec![],
2215            edges: vec![],
2216            edge_retires: vec![],
2217            chunks: vec![],
2218            runs: vec![],
2219            steps: vec![],
2220            actions: vec![],
2221            optional_backfills: vec![],
2222            vec_inserts: vec![],
2223            operational_writes: vec![OperationalWrite::Put {
2224                collection: "connector_health".to_owned(),
2225                record_key: "gmail".to_owned(),
2226                payload_json: r#"{"status":"ok"}"#.to_owned(),
2227                source_ref: Some("src-1".to_owned()),
2228            }],
2229        });
2230
2231        assert!(
2232            matches!(result, Err(EngineError::InvalidWrite(_))),
2233            "missing operational collection must return InvalidWrite"
2234        );
2235    }
2236
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        // An `Append` against an append-only-log collection must be recorded
        // in the mutation log but must NOT create an `operational_current`
        // row (that table is only maintained for latest-state collections).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an append-only-log collection for the write to target.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the database.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History: the append is in the mutation log with kind "append" and
        // the submitted payload.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // No latest-state snapshot row may exist for an append-only log.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2300
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        // With an "enforce" validation contract, an invalid append must be
        // rejected as InvalidWrite, and the rejection must leave no partial
        // state behind (no mutation rows, no filter-value rows).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an append-only-log collection that has both a filter field on
        // "status" and an enforcing contract restricting it to {"ok","failed"}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the database.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the contract's enum for "status".
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // The whole transaction must have rolled back: no mutation-log rows
        // and no filter-value rows for the collection.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2365
2366    #[test]
2367    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2368        let db = NamedTempFile::new().expect("temporary db");
2369        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2370        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2371        conn.execute(
2372            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2373             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2374            [],
2375        )
2376        .expect("seed collection");
2377        drop(conn);
2378        let writer = WriterActor::start(
2379            db.path(),
2380            Arc::new(SchemaManager::new()),
2381            ProvenanceMode::Warn,
2382            Arc::new(TelemetryCounters::default()),
2383        )
2384        .expect("writer");
2385
2386        writer
2387            .submit(WriteRequest {
2388                label: "put-operational".to_owned(),
2389                nodes: vec![],
2390                node_retires: vec![],
2391                edges: vec![],
2392                edge_retires: vec![],
2393                chunks: vec![],
2394                runs: vec![],
2395                steps: vec![],
2396                actions: vec![],
2397                optional_backfills: vec![],
2398                vec_inserts: vec![],
2399                operational_writes: vec![OperationalWrite::Put {
2400                    collection: "connector_health".to_owned(),
2401                    record_key: "gmail".to_owned(),
2402                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2403                    source_ref: Some("src-1".to_owned()),
2404                }],
2405            })
2406            .expect("put receipt");
2407
2408        writer
2409            .submit(WriteRequest {
2410                label: "delete-operational".to_owned(),
2411                nodes: vec![],
2412                node_retires: vec![],
2413                edges: vec![],
2414                edge_retires: vec![],
2415                chunks: vec![],
2416                runs: vec![],
2417                steps: vec![],
2418                actions: vec![],
2419                optional_backfills: vec![],
2420                vec_inserts: vec![],
2421                operational_writes: vec![OperationalWrite::Delete {
2422                    collection: "connector_health".to_owned(),
2423                    record_key: "gmail".to_owned(),
2424                    source_ref: Some("src-2".to_owned()),
2425                }],
2426            })
2427            .expect("delete receipt");
2428
2429        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2430        let mutation_kinds: Vec<String> = {
2431            let mut stmt = conn
2432                .prepare(
2433                    "SELECT op_kind FROM operational_mutations \
2434                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2435                     ORDER BY mutation_order ASC",
2436                )
2437                .expect("stmt");
2438            stmt.query_map([], |row| row.get(0))
2439                .expect("rows")
2440                .collect::<Result<_, _>>()
2441                .expect("collect")
2442        };
2443        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2444        let current_count: i64 = conn
2445            .query_row(
2446                "SELECT count(*) FROM operational_current \
2447                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2448                [],
2449                |row| row.get(0),
2450            )
2451            .expect("current count");
2452        assert_eq!(current_count, 0);
2453    }
2454
2455    #[test]
2456    fn writer_delete_bypasses_validation_contract() {
2457        let db = NamedTempFile::new().expect("temporary db");
2458        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2459        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2460        conn.execute(
2461            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2462             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2463            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2464        )
2465        .expect("seed collection");
2466        drop(conn);
2467        let writer = WriterActor::start(
2468            db.path(),
2469            Arc::new(SchemaManager::new()),
2470            ProvenanceMode::Warn,
2471            Arc::new(TelemetryCounters::default()),
2472        )
2473        .expect("writer");
2474
2475        writer
2476            .submit(WriteRequest {
2477                label: "valid-put".to_owned(),
2478                nodes: vec![],
2479                node_retires: vec![],
2480                edges: vec![],
2481                edge_retires: vec![],
2482                chunks: vec![],
2483                runs: vec![],
2484                steps: vec![],
2485                actions: vec![],
2486                optional_backfills: vec![],
2487                vec_inserts: vec![],
2488                operational_writes: vec![OperationalWrite::Put {
2489                    collection: "connector_health".to_owned(),
2490                    record_key: "gmail".to_owned(),
2491                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2492                    source_ref: Some("src-1".to_owned()),
2493                }],
2494            })
2495            .expect("put receipt");
2496        writer
2497            .submit(WriteRequest {
2498                label: "delete-after-put".to_owned(),
2499                nodes: vec![],
2500                node_retires: vec![],
2501                edges: vec![],
2502                edge_retires: vec![],
2503                chunks: vec![],
2504                runs: vec![],
2505                steps: vec![],
2506                actions: vec![],
2507                optional_backfills: vec![],
2508                vec_inserts: vec![],
2509                operational_writes: vec![OperationalWrite::Delete {
2510                    collection: "connector_health".to_owned(),
2511                    record_key: "gmail".to_owned(),
2512                    source_ref: Some("src-2".to_owned()),
2513                }],
2514            })
2515            .expect("delete receipt");
2516
2517        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2518        let current_count: i64 = conn
2519            .query_row(
2520                "SELECT count(*) FROM operational_current \
2521                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2522                [],
2523                |row| row.get(0),
2524            )
2525            .expect("current count");
2526        assert_eq!(current_count, 0);
2527    }
2528
2529    #[test]
2530    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
2531        let db = NamedTempFile::new().expect("temporary db");
2532        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2533        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2534        conn.execute(
2535            "INSERT INTO operational_collections \
2536             (name, kind, schema_json, retention_json, secondary_indexes_json) \
2537             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2538            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
2539        )
2540        .expect("seed collection");
2541        drop(conn);
2542        let writer = WriterActor::start(
2543            db.path(),
2544            Arc::new(SchemaManager::new()),
2545            ProvenanceMode::Warn,
2546            Arc::new(TelemetryCounters::default()),
2547        )
2548        .expect("writer");
2549
2550        writer
2551            .submit(WriteRequest {
2552                label: "secondary-index-put".to_owned(),
2553                nodes: vec![],
2554                node_retires: vec![],
2555                edges: vec![],
2556                edge_retires: vec![],
2557                chunks: vec![],
2558                runs: vec![],
2559                steps: vec![],
2560                actions: vec![],
2561                optional_backfills: vec![],
2562                vec_inserts: vec![],
2563                operational_writes: vec![OperationalWrite::Put {
2564                    collection: "connector_health".to_owned(),
2565                    record_key: "gmail".to_owned(),
2566                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
2567                        .to_owned(),
2568                    source_ref: Some("src-1".to_owned()),
2569                }],
2570            })
2571            .expect("put receipt");
2572
2573        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2574        let current_entry_count: i64 = conn
2575            .query_row(
2576                "SELECT count(*) FROM operational_secondary_index_entries \
2577                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2578                [],
2579                |row| row.get(0),
2580            )
2581            .expect("current secondary index count");
2582        assert_eq!(current_entry_count, 2);
2583        drop(conn);
2584
2585        writer
2586            .submit(WriteRequest {
2587                label: "secondary-index-delete".to_owned(),
2588                nodes: vec![],
2589                node_retires: vec![],
2590                edges: vec![],
2591                edge_retires: vec![],
2592                chunks: vec![],
2593                runs: vec![],
2594                steps: vec![],
2595                actions: vec![],
2596                optional_backfills: vec![],
2597                vec_inserts: vec![],
2598                operational_writes: vec![OperationalWrite::Delete {
2599                    collection: "connector_health".to_owned(),
2600                    record_key: "gmail".to_owned(),
2601                    source_ref: Some("src-2".to_owned()),
2602                }],
2603            })
2604            .expect("delete receipt");
2605
2606        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2607        let current_entry_count: i64 = conn
2608            .query_row(
2609                "SELECT count(*) FROM operational_secondary_index_entries \
2610                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2611                [],
2612                |row| row.get(0),
2613            )
2614            .expect("current secondary index count");
2615        assert_eq!(current_entry_count, 0);
2616    }
2617
2618    #[test]
2619    fn writer_latest_state_operational_writes_persist_mutation_order() {
2620        let db = NamedTempFile::new().expect("temporary db");
2621        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2622        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2623        conn.execute(
2624            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2625             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2626            [],
2627        )
2628        .expect("seed collection");
2629        drop(conn);
2630
2631        let writer = WriterActor::start(
2632            db.path(),
2633            Arc::new(SchemaManager::new()),
2634            ProvenanceMode::Warn,
2635            Arc::new(TelemetryCounters::default()),
2636        )
2637        .expect("writer");
2638
2639        writer
2640            .submit(WriteRequest {
2641                label: "ordered-operational-batch".to_owned(),
2642                nodes: vec![],
2643                node_retires: vec![],
2644                edges: vec![],
2645                edge_retires: vec![],
2646                chunks: vec![],
2647                runs: vec![],
2648                steps: vec![],
2649                actions: vec![],
2650                optional_backfills: vec![],
2651                vec_inserts: vec![],
2652                operational_writes: vec![
2653                    OperationalWrite::Put {
2654                        collection: "connector_health".to_owned(),
2655                        record_key: "gmail".to_owned(),
2656                        payload_json: r#"{"status":"old"}"#.to_owned(),
2657                        source_ref: Some("src-1".to_owned()),
2658                    },
2659                    OperationalWrite::Delete {
2660                        collection: "connector_health".to_owned(),
2661                        record_key: "gmail".to_owned(),
2662                        source_ref: Some("src-2".to_owned()),
2663                    },
2664                    OperationalWrite::Put {
2665                        collection: "connector_health".to_owned(),
2666                        record_key: "gmail".to_owned(),
2667                        payload_json: r#"{"status":"new"}"#.to_owned(),
2668                        source_ref: Some("src-3".to_owned()),
2669                    },
2670                ],
2671            })
2672            .expect("write receipt");
2673
2674        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2675        let rows: Vec<(String, i64)> = {
2676            let mut stmt = conn
2677                .prepare(
2678                    "SELECT op_kind, mutation_order FROM operational_mutations \
2679                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2680                     ORDER BY mutation_order ASC",
2681                )
2682                .expect("stmt");
2683            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
2684                .expect("rows")
2685                .collect::<Result<_, _>>()
2686                .expect("collect")
2687        };
2688        assert_eq!(
2689            rows,
2690            vec![
2691                ("put".to_owned(), 1),
2692                ("delete".to_owned(), 2),
2693                ("put".to_owned(), 3),
2694            ]
2695        );
2696        let payload: String = conn
2697            .query_row(
2698                "SELECT payload_json FROM operational_current \
2699                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2700                [],
2701                |row| row.get(0),
2702            )
2703            .expect("current payload");
2704        assert_eq!(payload, r#"{"status":"new"}"#);
2705    }
2706
2707    #[test]
2708    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
2709        let db = NamedTempFile::new().expect("temporary db");
2710        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
2711        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2712        conn.execute(
2713            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2714             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2715            [],
2716        )
2717        .expect("seed collection");
2718
2719        let request = WriteRequest {
2720            label: "disabled-race".to_owned(),
2721            nodes: vec![],
2722            node_retires: vec![],
2723            edges: vec![],
2724            edge_retires: vec![],
2725            chunks: vec![],
2726            runs: vec![],
2727            steps: vec![],
2728            actions: vec![],
2729            optional_backfills: vec![],
2730            vec_inserts: vec![],
2731            operational_writes: vec![OperationalWrite::Put {
2732                collection: "connector_health".to_owned(),
2733                record_key: "gmail".to_owned(),
2734                payload_json: r#"{"status":"ok"}"#.to_owned(),
2735                source_ref: Some("src-1".to_owned()),
2736            }],
2737        };
2738        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
2739        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
2740
2741        conn.execute(
2742            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
2743            [],
2744        )
2745        .expect("disable collection after preflight");
2746
2747        let error =
2748            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
2749        assert!(matches!(error, EngineError::InvalidWrite(_)));
2750        assert!(error.to_string().contains("is disabled"));
2751
2752        let mutation_count: i64 = conn
2753            .query_row(
2754                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2755                [],
2756                |row| row.get(0),
2757            )
2758            .expect("mutation count");
2759        assert_eq!(mutation_count, 0);
2760
2761        let current_count: i64 = conn
2762            .query_row(
2763                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2764                [],
2765                |row| row.get(0),
2766            )
2767            .expect("current count");
2768        assert_eq!(current_count, 0);
2769    }
2770
2771    #[test]
2772    fn writer_enforce_validation_rejects_invalid_put_atomically() {
2773        let db = NamedTempFile::new().expect("temporary db");
2774        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2775        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2776        conn.execute(
2777            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2778             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2779            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2780        )
2781        .expect("seed collection");
2782        drop(conn);
2783        let writer = WriterActor::start(
2784            db.path(),
2785            Arc::new(SchemaManager::new()),
2786            ProvenanceMode::Warn,
2787            Arc::new(TelemetryCounters::default()),
2788        )
2789        .expect("writer");
2790
2791        let error = writer
2792            .submit(WriteRequest {
2793                label: "invalid-put".to_owned(),
2794                nodes: vec![NodeInsert {
2795                    row_id: "row-1".to_owned(),
2796                    logical_id: "lg-1".to_owned(),
2797                    kind: "Meeting".to_owned(),
2798                    properties: "{}".to_owned(),
2799                    source_ref: Some("src-1".to_owned()),
2800                    upsert: false,
2801                    chunk_policy: ChunkPolicy::Preserve,
2802                    content_ref: None,
2803                }],
2804                node_retires: vec![],
2805                edges: vec![],
2806                edge_retires: vec![],
2807                chunks: vec![],
2808                runs: vec![],
2809                steps: vec![],
2810                actions: vec![],
2811                optional_backfills: vec![],
2812                vec_inserts: vec![],
2813                operational_writes: vec![OperationalWrite::Put {
2814                    collection: "connector_health".to_owned(),
2815                    record_key: "gmail".to_owned(),
2816                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2817                    source_ref: Some("src-1".to_owned()),
2818                }],
2819            })
2820            .expect_err("invalid put must reject");
2821        assert!(matches!(error, EngineError::InvalidWrite(_)));
2822        assert!(error.to_string().contains("must be one of"));
2823
2824        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2825        let node_count: i64 = conn
2826            .query_row(
2827                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2828                [],
2829                |row| row.get(0),
2830            )
2831            .expect("node count");
2832        assert_eq!(node_count, 0);
2833        let mutation_count: i64 = conn
2834            .query_row(
2835                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2836                [],
2837                |row| row.get(0),
2838            )
2839            .expect("mutation count");
2840        assert_eq!(mutation_count, 0);
2841        let current_count: i64 = conn
2842            .query_row(
2843                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2844                [],
2845                |row| row.get(0),
2846            )
2847            .expect("current count");
2848        assert_eq!(current_count, 0);
2849    }
2850
2851    #[test]
2852    fn writer_rejects_append_against_latest_state_collection() {
2853        let db = NamedTempFile::new().expect("temporary db");
2854        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2855        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2856        conn.execute(
2857            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2858             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2859            [],
2860        )
2861        .expect("seed collection");
2862        drop(conn);
2863        let writer = WriterActor::start(
2864            db.path(),
2865            Arc::new(SchemaManager::new()),
2866            ProvenanceMode::Warn,
2867            Arc::new(TelemetryCounters::default()),
2868        )
2869        .expect("writer");
2870
2871        let result = writer.submit(WriteRequest {
2872            label: "bad-append".to_owned(),
2873            nodes: vec![],
2874            node_retires: vec![],
2875            edges: vec![],
2876            edge_retires: vec![],
2877            chunks: vec![],
2878            runs: vec![],
2879            steps: vec![],
2880            actions: vec![],
2881            optional_backfills: vec![],
2882            vec_inserts: vec![],
2883            operational_writes: vec![OperationalWrite::Append {
2884                collection: "connector_health".to_owned(),
2885                record_key: "gmail".to_owned(),
2886                payload_json: r#"{"status":"ok"}"#.to_owned(),
2887                source_ref: Some("src-1".to_owned()),
2888            }],
2889        });
2890
2891        assert!(
2892            matches!(result, Err(EngineError::InvalidWrite(_))),
2893            "latest_state collection must reject Append"
2894        );
2895    }
2896
2897    #[test]
2898    fn writer_upsert_supersedes_prior_active_node() {
2899        let db = NamedTempFile::new().expect("temporary db");
2900        let writer = WriterActor::start(
2901            db.path(),
2902            Arc::new(SchemaManager::new()),
2903            ProvenanceMode::Warn,
2904            Arc::new(TelemetryCounters::default()),
2905        )
2906        .expect("writer");
2907
2908        writer
2909            .submit(WriteRequest {
2910                label: "v1".to_owned(),
2911                nodes: vec![NodeInsert {
2912                    row_id: "row-1".to_owned(),
2913                    logical_id: "lg-1".to_owned(),
2914                    kind: "Meeting".to_owned(),
2915                    properties: r#"{"version":1}"#.to_owned(),
2916                    source_ref: Some("src-1".to_owned()),
2917                    upsert: false,
2918                    chunk_policy: ChunkPolicy::Preserve,
2919                    content_ref: None,
2920                }],
2921                node_retires: vec![],
2922                edges: vec![],
2923                edge_retires: vec![],
2924                chunks: vec![],
2925                runs: vec![],
2926                steps: vec![],
2927                actions: vec![],
2928                optional_backfills: vec![],
2929                vec_inserts: vec![],
2930                operational_writes: vec![],
2931            })
2932            .expect("v1 write");
2933
2934        writer
2935            .submit(WriteRequest {
2936                label: "v2".to_owned(),
2937                nodes: vec![NodeInsert {
2938                    row_id: "row-2".to_owned(),
2939                    logical_id: "lg-1".to_owned(),
2940                    kind: "Meeting".to_owned(),
2941                    properties: r#"{"version":2}"#.to_owned(),
2942                    source_ref: Some("src-2".to_owned()),
2943                    upsert: true,
2944                    chunk_policy: ChunkPolicy::Preserve,
2945                    content_ref: None,
2946                }],
2947                node_retires: vec![],
2948                edges: vec![],
2949                edge_retires: vec![],
2950                chunks: vec![],
2951                runs: vec![],
2952                steps: vec![],
2953                actions: vec![],
2954                optional_backfills: vec![],
2955                vec_inserts: vec![],
2956                operational_writes: vec![],
2957            })
2958            .expect("v2 upsert write");
2959
2960        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2961        let (active_row_id, props): (String, String) = conn
2962            .query_row(
2963                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
2964                [],
2965                |row| Ok((row.get(0)?, row.get(1)?)),
2966            )
2967            .expect("active row");
2968        assert_eq!(active_row_id, "row-2");
2969        assert!(props.contains("\"version\":2"));
2970
2971        let superseded: i64 = conn
2972            .query_row(
2973                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
2974                [],
2975                |row| row.get(0),
2976            )
2977            .expect("superseded count");
2978        assert_eq!(superseded, 1);
2979    }
2980
2981    #[test]
2982    fn writer_inserts_edge_between_two_nodes() {
2983        let db = NamedTempFile::new().expect("temporary db");
2984        let writer = WriterActor::start(
2985            db.path(),
2986            Arc::new(SchemaManager::new()),
2987            ProvenanceMode::Warn,
2988            Arc::new(TelemetryCounters::default()),
2989        )
2990        .expect("writer");
2991
2992        writer
2993            .submit(WriteRequest {
2994                label: "nodes-and-edge".to_owned(),
2995                nodes: vec![
2996                    NodeInsert {
2997                        row_id: "row-meeting".to_owned(),
2998                        logical_id: "meeting-1".to_owned(),
2999                        kind: "Meeting".to_owned(),
3000                        properties: "{}".to_owned(),
3001                        source_ref: Some("src-1".to_owned()),
3002                        upsert: false,
3003                        chunk_policy: ChunkPolicy::Preserve,
3004                        content_ref: None,
3005                    },
3006                    NodeInsert {
3007                        row_id: "row-task".to_owned(),
3008                        logical_id: "task-1".to_owned(),
3009                        kind: "Task".to_owned(),
3010                        properties: "{}".to_owned(),
3011                        source_ref: Some("src-1".to_owned()),
3012                        upsert: false,
3013                        chunk_policy: ChunkPolicy::Preserve,
3014                        content_ref: None,
3015                    },
3016                ],
3017                node_retires: vec![],
3018                edges: vec![EdgeInsert {
3019                    row_id: "edge-1".to_owned(),
3020                    logical_id: "edge-lg-1".to_owned(),
3021                    source_logical_id: "meeting-1".to_owned(),
3022                    target_logical_id: "task-1".to_owned(),
3023                    kind: "HAS_TASK".to_owned(),
3024                    properties: "{}".to_owned(),
3025                    source_ref: Some("src-1".to_owned()),
3026                    upsert: false,
3027                }],
3028                edge_retires: vec![],
3029                chunks: vec![],
3030                runs: vec![],
3031                steps: vec![],
3032                actions: vec![],
3033                optional_backfills: vec![],
3034                vec_inserts: vec![],
3035                operational_writes: vec![],
3036            })
3037            .expect("write receipt");
3038
3039        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3040        let (src, tgt, kind): (String, String, String) = conn
3041            .query_row(
3042                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3043                [],
3044                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3045            )
3046            .expect("edge row");
3047        assert_eq!(src, "meeting-1");
3048        assert_eq!(tgt, "task-1");
3049        assert_eq!(kind, "HAS_TASK");
3050    }
3051
3052    #[test]
3053    #[allow(clippy::too_many_lines)]
3054    fn writer_upsert_supersedes_prior_active_edge() {
3055        let db = NamedTempFile::new().expect("temporary db");
3056        let writer = WriterActor::start(
3057            db.path(),
3058            Arc::new(SchemaManager::new()),
3059            ProvenanceMode::Warn,
3060            Arc::new(TelemetryCounters::default()),
3061        )
3062        .expect("writer");
3063
3064        // Write two nodes
3065        writer
3066            .submit(WriteRequest {
3067                label: "nodes".to_owned(),
3068                nodes: vec![
3069                    NodeInsert {
3070                        row_id: "row-a".to_owned(),
3071                        logical_id: "node-a".to_owned(),
3072                        kind: "Meeting".to_owned(),
3073                        properties: "{}".to_owned(),
3074                        source_ref: Some("src-1".to_owned()),
3075                        upsert: false,
3076                        chunk_policy: ChunkPolicy::Preserve,
3077                        content_ref: None,
3078                    },
3079                    NodeInsert {
3080                        row_id: "row-b".to_owned(),
3081                        logical_id: "node-b".to_owned(),
3082                        kind: "Task".to_owned(),
3083                        properties: "{}".to_owned(),
3084                        source_ref: Some("src-1".to_owned()),
3085                        upsert: false,
3086                        chunk_policy: ChunkPolicy::Preserve,
3087                        content_ref: None,
3088                    },
3089                ],
3090                node_retires: vec![],
3091                edges: vec![],
3092                edge_retires: vec![],
3093                chunks: vec![],
3094                runs: vec![],
3095                steps: vec![],
3096                actions: vec![],
3097                optional_backfills: vec![],
3098                vec_inserts: vec![],
3099                operational_writes: vec![],
3100            })
3101            .expect("nodes write");
3102
3103        // Write v1 edge
3104        writer
3105            .submit(WriteRequest {
3106                label: "edge-v1".to_owned(),
3107                nodes: vec![],
3108                node_retires: vec![],
3109                edges: vec![EdgeInsert {
3110                    row_id: "edge-row-1".to_owned(),
3111                    logical_id: "edge-lg-1".to_owned(),
3112                    source_logical_id: "node-a".to_owned(),
3113                    target_logical_id: "node-b".to_owned(),
3114                    kind: "HAS_TASK".to_owned(),
3115                    properties: r#"{"weight":1}"#.to_owned(),
3116                    source_ref: Some("src-1".to_owned()),
3117                    upsert: false,
3118                }],
3119                edge_retires: vec![],
3120                chunks: vec![],
3121                runs: vec![],
3122                steps: vec![],
3123                actions: vec![],
3124                optional_backfills: vec![],
3125                vec_inserts: vec![],
3126                operational_writes: vec![],
3127            })
3128            .expect("edge v1 write");
3129
3130        // Upsert v2 edge
3131        writer
3132            .submit(WriteRequest {
3133                label: "edge-v2".to_owned(),
3134                nodes: vec![],
3135                node_retires: vec![],
3136                edges: vec![EdgeInsert {
3137                    row_id: "edge-row-2".to_owned(),
3138                    logical_id: "edge-lg-1".to_owned(),
3139                    source_logical_id: "node-a".to_owned(),
3140                    target_logical_id: "node-b".to_owned(),
3141                    kind: "HAS_TASK".to_owned(),
3142                    properties: r#"{"weight":2}"#.to_owned(),
3143                    source_ref: Some("src-2".to_owned()),
3144                    upsert: true,
3145                }],
3146                edge_retires: vec![],
3147                chunks: vec![],
3148                runs: vec![],
3149                steps: vec![],
3150                actions: vec![],
3151                optional_backfills: vec![],
3152                vec_inserts: vec![],
3153                operational_writes: vec![],
3154            })
3155            .expect("edge v2 upsert");
3156
3157        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3158        let (active_row_id, props): (String, String) = conn
3159            .query_row(
3160                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3161                [],
3162                |row| Ok((row.get(0)?, row.get(1)?)),
3163            )
3164            .expect("active edge");
3165        assert_eq!(active_row_id, "edge-row-2");
3166        assert!(props.contains("\"weight\":2"));
3167
3168        let superseded: i64 = conn
3169            .query_row(
3170                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3171                [],
3172                |row| row.get(0),
3173            )
3174            .expect("superseded count");
3175        assert_eq!(superseded, 1);
3176    }
3177
3178    #[test]
3179    fn writer_fts_rows_are_written_to_database() {
3180        let db = NamedTempFile::new().expect("temporary db");
3181        let writer = WriterActor::start(
3182            db.path(),
3183            Arc::new(SchemaManager::new()),
3184            ProvenanceMode::Warn,
3185            Arc::new(TelemetryCounters::default()),
3186        )
3187        .expect("writer");
3188
3189        writer
3190            .submit(WriteRequest {
3191                label: "seed".to_owned(),
3192                nodes: vec![NodeInsert {
3193                    row_id: "row-1".to_owned(),
3194                    logical_id: "logical-1".to_owned(),
3195                    kind: "Meeting".to_owned(),
3196                    properties: "{}".to_owned(),
3197                    source_ref: Some("src-1".to_owned()),
3198                    upsert: false,
3199                    chunk_policy: ChunkPolicy::Preserve,
3200                    content_ref: None,
3201                }],
3202                node_retires: vec![],
3203                edges: vec![],
3204                edge_retires: vec![],
3205                chunks: vec![ChunkInsert {
3206                    id: "chunk-1".to_owned(),
3207                    node_logical_id: "logical-1".to_owned(),
3208                    text_content: "budget discussion".to_owned(),
3209                    byte_start: None,
3210                    byte_end: None,
3211                    content_hash: None,
3212                }],
3213                runs: vec![],
3214                steps: vec![],
3215                actions: vec![],
3216                optional_backfills: vec![],
3217                vec_inserts: vec![],
3218                operational_writes: vec![],
3219            })
3220            .expect("write receipt");
3221
3222        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3223        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3224            conn.query_row(
3225                "SELECT chunk_id, node_logical_id, kind, text_content \
3226                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3227                [],
3228                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3229            )
3230            .expect("fts row");
3231        assert_eq!(chunk_id, "chunk-1");
3232        assert_eq!(node_logical_id, "logical-1");
3233        assert_eq!(kind, "Meeting");
3234        assert_eq!(text_content, "budget discussion");
3235    }
3236
3237    #[test]
3238    fn writer_receipt_warns_on_nodes_without_source_ref() {
3239        let db = NamedTempFile::new().expect("temporary db");
3240        let writer = WriterActor::start(
3241            db.path(),
3242            Arc::new(SchemaManager::new()),
3243            ProvenanceMode::Warn,
3244            Arc::new(TelemetryCounters::default()),
3245        )
3246        .expect("writer");
3247
3248        let receipt = writer
3249            .submit(WriteRequest {
3250                label: "no-source".to_owned(),
3251                nodes: vec![NodeInsert {
3252                    row_id: "row-1".to_owned(),
3253                    logical_id: "logical-1".to_owned(),
3254                    kind: "Meeting".to_owned(),
3255                    properties: "{}".to_owned(),
3256                    source_ref: None,
3257                    upsert: false,
3258                    chunk_policy: ChunkPolicy::Preserve,
3259                    content_ref: None,
3260                }],
3261                node_retires: vec![],
3262                edges: vec![],
3263                edge_retires: vec![],
3264                chunks: vec![],
3265                runs: vec![],
3266                steps: vec![],
3267                actions: vec![],
3268                optional_backfills: vec![],
3269                vec_inserts: vec![],
3270                operational_writes: vec![],
3271            })
3272            .expect("write receipt");
3273
3274        assert_eq!(receipt.provenance_warnings.len(), 1);
3275        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3276    }
3277
3278    #[test]
3279    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3280        let db = NamedTempFile::new().expect("temporary db");
3281        let writer = WriterActor::start(
3282            db.path(),
3283            Arc::new(SchemaManager::new()),
3284            ProvenanceMode::Warn,
3285            Arc::new(TelemetryCounters::default()),
3286        )
3287        .expect("writer");
3288
3289        let receipt = writer
3290            .submit(WriteRequest {
3291                label: "with-source".to_owned(),
3292                nodes: vec![NodeInsert {
3293                    row_id: "row-1".to_owned(),
3294                    logical_id: "logical-1".to_owned(),
3295                    kind: "Meeting".to_owned(),
3296                    properties: "{}".to_owned(),
3297                    source_ref: Some("src-1".to_owned()),
3298                    upsert: false,
3299                    chunk_policy: ChunkPolicy::Preserve,
3300                    content_ref: None,
3301                }],
3302                node_retires: vec![],
3303                edges: vec![],
3304                edge_retires: vec![],
3305                chunks: vec![],
3306                runs: vec![],
3307                steps: vec![],
3308                actions: vec![],
3309                optional_backfills: vec![],
3310                vec_inserts: vec![],
3311                operational_writes: vec![],
3312            })
3313            .expect("write receipt");
3314
3315        assert!(receipt.provenance_warnings.is_empty());
3316    }
3317
3318    #[test]
3319    fn writer_accepts_chunk_for_pre_existing_node() {
3320        let db = NamedTempFile::new().expect("temporary db");
3321        let writer = WriterActor::start(
3322            db.path(),
3323            Arc::new(SchemaManager::new()),
3324            ProvenanceMode::Warn,
3325            Arc::new(TelemetryCounters::default()),
3326        )
3327        .expect("writer");
3328
3329        // Request 1: submit node only
3330        writer
3331            .submit(WriteRequest {
3332                label: "r1".to_owned(),
3333                nodes: vec![NodeInsert {
3334                    row_id: "row-1".to_owned(),
3335                    logical_id: "logical-1".to_owned(),
3336                    kind: "Meeting".to_owned(),
3337                    properties: "{}".to_owned(),
3338                    source_ref: Some("src-1".to_owned()),
3339                    upsert: false,
3340                    chunk_policy: ChunkPolicy::Preserve,
3341                    content_ref: None,
3342                }],
3343                node_retires: vec![],
3344                edges: vec![],
3345                edge_retires: vec![],
3346                chunks: vec![],
3347                runs: vec![],
3348                steps: vec![],
3349                actions: vec![],
3350                optional_backfills: vec![],
3351                vec_inserts: vec![],
3352                operational_writes: vec![],
3353            })
3354            .expect("r1 write");
3355
3356        // Request 2: submit chunk for pre-existing node
3357        writer
3358            .submit(WriteRequest {
3359                label: "r2".to_owned(),
3360                nodes: vec![],
3361                node_retires: vec![],
3362                edges: vec![],
3363                edge_retires: vec![],
3364                chunks: vec![ChunkInsert {
3365                    id: "chunk-1".to_owned(),
3366                    node_logical_id: "logical-1".to_owned(),
3367                    text_content: "budget discussion".to_owned(),
3368                    byte_start: None,
3369                    byte_end: None,
3370                    content_hash: None,
3371                }],
3372                runs: vec![],
3373                steps: vec![],
3374                actions: vec![],
3375                optional_backfills: vec![],
3376                vec_inserts: vec![],
3377                operational_writes: vec![],
3378            })
3379            .expect("r2 write — chunk for pre-existing node");
3380
3381        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3382        let count: i64 = conn
3383            .query_row(
3384                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3385                [],
3386                |row| row.get(0),
3387            )
3388            .expect("fts count");
3389        assert_eq!(
3390            count, 1,
3391            "FTS row must exist for chunk attached to pre-existing node"
3392        );
3393    }
3394
3395    #[test]
3396    fn writer_rejects_chunk_for_completely_unknown_node() {
3397        let db = NamedTempFile::new().expect("temporary db");
3398        let writer = WriterActor::start(
3399            db.path(),
3400            Arc::new(SchemaManager::new()),
3401            ProvenanceMode::Warn,
3402            Arc::new(TelemetryCounters::default()),
3403        )
3404        .expect("writer");
3405
3406        let result = writer.submit(WriteRequest {
3407            label: "bad".to_owned(),
3408            nodes: vec![],
3409            node_retires: vec![],
3410            edges: vec![],
3411            edge_retires: vec![],
3412            chunks: vec![ChunkInsert {
3413                id: "chunk-1".to_owned(),
3414                node_logical_id: "nonexistent".to_owned(),
3415                text_content: "some text".to_owned(),
3416                byte_start: None,
3417                byte_end: None,
3418                content_hash: None,
3419            }],
3420            runs: vec![],
3421            steps: vec![],
3422            actions: vec![],
3423            optional_backfills: vec![],
3424            vec_inserts: vec![],
3425            operational_writes: vec![],
3426        });
3427
3428        assert!(
3429            matches!(result, Err(EngineError::InvalidWrite(_))),
3430            "completely unknown node must return InvalidWrite"
3431        );
3432    }
3433
3434    #[test]
3435    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3436        let db = NamedTempFile::new().expect("temporary db");
3437        let writer = WriterActor::start(
3438            db.path(),
3439            Arc::new(SchemaManager::new()),
3440            ProvenanceMode::Warn,
3441            Arc::new(TelemetryCounters::default()),
3442        )
3443        .expect("writer");
3444
3445        let receipt = writer
3446            .submit(WriteRequest {
3447                label: "seed".to_owned(),
3448                nodes: vec![NodeInsert {
3449                    row_id: "row-1".to_owned(),
3450                    logical_id: "logical-1".to_owned(),
3451                    kind: "Meeting".to_owned(),
3452                    properties: "{}".to_owned(),
3453                    source_ref: None,
3454                    upsert: false,
3455                    chunk_policy: ChunkPolicy::Preserve,
3456                    content_ref: None,
3457                }],
3458                node_retires: vec![],
3459                edges: vec![],
3460                edge_retires: vec![],
3461                chunks: vec![ChunkInsert {
3462                    id: "chunk-1".to_owned(),
3463                    node_logical_id: "logical-1".to_owned(),
3464                    text_content: "budget discussion".to_owned(),
3465                    byte_start: None,
3466                    byte_end: None,
3467                    content_hash: None,
3468                }],
3469                runs: vec![],
3470                steps: vec![],
3471                actions: vec![],
3472                optional_backfills: vec![],
3473                vec_inserts: vec![],
3474                operational_writes: vec![],
3475            })
3476            .expect("write receipt");
3477
3478        assert_eq!(receipt.label, "seed");
3479    }
3480
3481    #[test]
3482    fn writer_node_retire_supersedes_active_node() {
3483        let db = NamedTempFile::new().expect("temporary db");
3484        let writer = WriterActor::start(
3485            db.path(),
3486            Arc::new(SchemaManager::new()),
3487            ProvenanceMode::Warn,
3488            Arc::new(TelemetryCounters::default()),
3489        )
3490        .expect("writer");
3491
3492        writer
3493            .submit(WriteRequest {
3494                label: "seed".to_owned(),
3495                nodes: vec![NodeInsert {
3496                    row_id: "row-1".to_owned(),
3497                    logical_id: "meeting-1".to_owned(),
3498                    kind: "Meeting".to_owned(),
3499                    properties: "{}".to_owned(),
3500                    source_ref: Some("src-1".to_owned()),
3501                    upsert: false,
3502                    chunk_policy: ChunkPolicy::Preserve,
3503                    content_ref: None,
3504                }],
3505                node_retires: vec![],
3506                edges: vec![],
3507                edge_retires: vec![],
3508                chunks: vec![],
3509                runs: vec![],
3510                steps: vec![],
3511                actions: vec![],
3512                optional_backfills: vec![],
3513                vec_inserts: vec![],
3514                operational_writes: vec![],
3515            })
3516            .expect("seed write");
3517
3518        writer
3519            .submit(WriteRequest {
3520                label: "retire".to_owned(),
3521                nodes: vec![],
3522                node_retires: vec![NodeRetire {
3523                    logical_id: "meeting-1".to_owned(),
3524                    source_ref: Some("src-2".to_owned()),
3525                }],
3526                edges: vec![],
3527                edge_retires: vec![],
3528                chunks: vec![],
3529                runs: vec![],
3530                steps: vec![],
3531                actions: vec![],
3532                optional_backfills: vec![],
3533                vec_inserts: vec![],
3534                operational_writes: vec![],
3535            })
3536            .expect("retire write");
3537
3538        let conn = rusqlite::Connection::open(db.path()).expect("open");
3539        let active: i64 = conn
3540            .query_row(
3541                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3542                [],
3543                |r| r.get(0),
3544            )
3545            .expect("count active");
3546        let historical: i64 = conn
3547            .query_row(
3548                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3549                [],
3550                |r| r.get(0),
3551            )
3552            .expect("count historical");
3553
3554        assert_eq!(active, 0, "active count must be 0 after retire");
3555        assert_eq!(historical, 1, "historical count must be 1 after retire");
3556    }
3557
3558    #[test]
3559    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3560        let db = NamedTempFile::new().expect("temporary db");
3561        let writer = WriterActor::start(
3562            db.path(),
3563            Arc::new(SchemaManager::new()),
3564            ProvenanceMode::Warn,
3565            Arc::new(TelemetryCounters::default()),
3566        )
3567        .expect("writer");
3568
3569        writer
3570            .submit(WriteRequest {
3571                label: "seed".to_owned(),
3572                nodes: vec![NodeInsert {
3573                    row_id: "row-1".to_owned(),
3574                    logical_id: "meeting-1".to_owned(),
3575                    kind: "Meeting".to_owned(),
3576                    properties: "{}".to_owned(),
3577                    source_ref: Some("src-1".to_owned()),
3578                    upsert: false,
3579                    chunk_policy: ChunkPolicy::Preserve,
3580                    content_ref: None,
3581                }],
3582                node_retires: vec![],
3583                edges: vec![],
3584                edge_retires: vec![],
3585                chunks: vec![ChunkInsert {
3586                    id: "chunk-1".to_owned(),
3587                    node_logical_id: "meeting-1".to_owned(),
3588                    text_content: "budget discussion".to_owned(),
3589                    byte_start: None,
3590                    byte_end: None,
3591                    content_hash: None,
3592                }],
3593                runs: vec![],
3594                steps: vec![],
3595                actions: vec![],
3596                optional_backfills: vec![],
3597                vec_inserts: vec![],
3598                operational_writes: vec![],
3599            })
3600            .expect("seed write");
3601
3602        writer
3603            .submit(WriteRequest {
3604                label: "retire".to_owned(),
3605                nodes: vec![],
3606                node_retires: vec![NodeRetire {
3607                    logical_id: "meeting-1".to_owned(),
3608                    source_ref: Some("src-2".to_owned()),
3609                }],
3610                edges: vec![],
3611                edge_retires: vec![],
3612                chunks: vec![],
3613                runs: vec![],
3614                steps: vec![],
3615                actions: vec![],
3616                optional_backfills: vec![],
3617                vec_inserts: vec![],
3618                operational_writes: vec![],
3619            })
3620            .expect("retire write");
3621
3622        let conn = rusqlite::Connection::open(db.path()).expect("open");
3623        let chunk_count: i64 = conn
3624            .query_row(
3625                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
3626                [],
3627                |r| r.get(0),
3628            )
3629            .expect("chunk count");
3630        let fts_count: i64 = conn
3631            .query_row(
3632                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
3633                [],
3634                |r| r.get(0),
3635            )
3636            .expect("fts count");
3637
3638        assert_eq!(
3639            chunk_count, 1,
3640            "chunks must remain after node retire so restore can re-establish content"
3641        );
3642        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
3643    }
3644
3645    #[test]
3646    fn writer_edge_retire_supersedes_active_edge() {
3647        let db = NamedTempFile::new().expect("temporary db");
3648        let writer = WriterActor::start(
3649            db.path(),
3650            Arc::new(SchemaManager::new()),
3651            ProvenanceMode::Warn,
3652            Arc::new(TelemetryCounters::default()),
3653        )
3654        .expect("writer");
3655
3656        writer
3657            .submit(WriteRequest {
3658                label: "seed".to_owned(),
3659                nodes: vec![
3660                    NodeInsert {
3661                        row_id: "row-a".to_owned(),
3662                        logical_id: "node-a".to_owned(),
3663                        kind: "Meeting".to_owned(),
3664                        properties: "{}".to_owned(),
3665                        source_ref: Some("src-1".to_owned()),
3666                        upsert: false,
3667                        chunk_policy: ChunkPolicy::Preserve,
3668                        content_ref: None,
3669                    },
3670                    NodeInsert {
3671                        row_id: "row-b".to_owned(),
3672                        logical_id: "node-b".to_owned(),
3673                        kind: "Task".to_owned(),
3674                        properties: "{}".to_owned(),
3675                        source_ref: Some("src-1".to_owned()),
3676                        upsert: false,
3677                        chunk_policy: ChunkPolicy::Preserve,
3678                        content_ref: None,
3679                    },
3680                ],
3681                node_retires: vec![],
3682                edges: vec![EdgeInsert {
3683                    row_id: "edge-1".to_owned(),
3684                    logical_id: "edge-lg-1".to_owned(),
3685                    source_logical_id: "node-a".to_owned(),
3686                    target_logical_id: "node-b".to_owned(),
3687                    kind: "HAS_TASK".to_owned(),
3688                    properties: "{}".to_owned(),
3689                    source_ref: Some("src-1".to_owned()),
3690                    upsert: false,
3691                }],
3692                edge_retires: vec![],
3693                chunks: vec![],
3694                runs: vec![],
3695                steps: vec![],
3696                actions: vec![],
3697                optional_backfills: vec![],
3698                vec_inserts: vec![],
3699                operational_writes: vec![],
3700            })
3701            .expect("seed write");
3702
3703        writer
3704            .submit(WriteRequest {
3705                label: "retire-edge".to_owned(),
3706                nodes: vec![],
3707                node_retires: vec![],
3708                edges: vec![],
3709                edge_retires: vec![EdgeRetire {
3710                    logical_id: "edge-lg-1".to_owned(),
3711                    source_ref: Some("src-2".to_owned()),
3712                }],
3713                chunks: vec![],
3714                runs: vec![],
3715                steps: vec![],
3716                actions: vec![],
3717                optional_backfills: vec![],
3718                vec_inserts: vec![],
3719                operational_writes: vec![],
3720            })
3721            .expect("retire edge write");
3722
3723        let conn = rusqlite::Connection::open(db.path()).expect("open");
3724        let active: i64 = conn
3725            .query_row(
3726                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3727                [],
3728                |r| r.get(0),
3729            )
3730            .expect("active edge count");
3731
3732        assert_eq!(active, 0, "active edge count must be 0 after retire");
3733    }
3734
3735    #[test]
3736    fn writer_retire_without_source_ref_emits_provenance_warning() {
3737        let db = NamedTempFile::new().expect("temporary db");
3738        let writer = WriterActor::start(
3739            db.path(),
3740            Arc::new(SchemaManager::new()),
3741            ProvenanceMode::Warn,
3742            Arc::new(TelemetryCounters::default()),
3743        )
3744        .expect("writer");
3745
3746        writer
3747            .submit(WriteRequest {
3748                label: "seed".to_owned(),
3749                nodes: vec![NodeInsert {
3750                    row_id: "row-1".to_owned(),
3751                    logical_id: "meeting-1".to_owned(),
3752                    kind: "Meeting".to_owned(),
3753                    properties: "{}".to_owned(),
3754                    source_ref: Some("src-1".to_owned()),
3755                    upsert: false,
3756                    chunk_policy: ChunkPolicy::Preserve,
3757                    content_ref: None,
3758                }],
3759                node_retires: vec![],
3760                edges: vec![],
3761                edge_retires: vec![],
3762                chunks: vec![],
3763                runs: vec![],
3764                steps: vec![],
3765                actions: vec![],
3766                optional_backfills: vec![],
3767                vec_inserts: vec![],
3768                operational_writes: vec![],
3769            })
3770            .expect("seed write");
3771
3772        let receipt = writer
3773            .submit(WriteRequest {
3774                label: "retire-no-src".to_owned(),
3775                nodes: vec![],
3776                node_retires: vec![NodeRetire {
3777                    logical_id: "meeting-1".to_owned(),
3778                    source_ref: None,
3779                }],
3780                edges: vec![],
3781                edge_retires: vec![],
3782                chunks: vec![],
3783                runs: vec![],
3784                steps: vec![],
3785                actions: vec![],
3786                optional_backfills: vec![],
3787                vec_inserts: vec![],
3788                operational_writes: vec![],
3789            })
3790            .expect("retire write");
3791
3792        assert!(
3793            !receipt.provenance_warnings.is_empty(),
3794            "retire without source_ref must emit a provenance warning"
3795        );
3796    }
3797
3798    #[test]
3799    #[allow(clippy::too_many_lines)]
3800    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
3801        let db = NamedTempFile::new().expect("temporary db");
3802        let writer = WriterActor::start(
3803            db.path(),
3804            Arc::new(SchemaManager::new()),
3805            ProvenanceMode::Warn,
3806            Arc::new(TelemetryCounters::default()),
3807        )
3808        .expect("writer");
3809
3810        writer
3811            .submit(WriteRequest {
3812                label: "v1".to_owned(),
3813                nodes: vec![NodeInsert {
3814                    row_id: "row-1".to_owned(),
3815                    logical_id: "meeting-1".to_owned(),
3816                    kind: "Meeting".to_owned(),
3817                    properties: "{}".to_owned(),
3818                    source_ref: Some("src-1".to_owned()),
3819                    upsert: false,
3820                    chunk_policy: ChunkPolicy::Preserve,
3821                    content_ref: None,
3822                }],
3823                node_retires: vec![],
3824                edges: vec![],
3825                edge_retires: vec![],
3826                chunks: vec![ChunkInsert {
3827                    id: "chunk-old".to_owned(),
3828                    node_logical_id: "meeting-1".to_owned(),
3829                    text_content: "old text".to_owned(),
3830                    byte_start: None,
3831                    byte_end: None,
3832                    content_hash: None,
3833                }],
3834                runs: vec![],
3835                steps: vec![],
3836                actions: vec![],
3837                optional_backfills: vec![],
3838                vec_inserts: vec![],
3839                operational_writes: vec![],
3840            })
3841            .expect("v1 write");
3842
3843        writer
3844            .submit(WriteRequest {
3845                label: "v2".to_owned(),
3846                nodes: vec![NodeInsert {
3847                    row_id: "row-2".to_owned(),
3848                    logical_id: "meeting-1".to_owned(),
3849                    kind: "Meeting".to_owned(),
3850                    properties: "{}".to_owned(),
3851                    source_ref: Some("src-2".to_owned()),
3852                    upsert: true,
3853                    chunk_policy: ChunkPolicy::Replace,
3854                    content_ref: None,
3855                }],
3856                node_retires: vec![],
3857                edges: vec![],
3858                edge_retires: vec![],
3859                chunks: vec![ChunkInsert {
3860                    id: "chunk-new".to_owned(),
3861                    node_logical_id: "meeting-1".to_owned(),
3862                    text_content: "new text".to_owned(),
3863                    byte_start: None,
3864                    byte_end: None,
3865                    content_hash: None,
3866                }],
3867                runs: vec![],
3868                steps: vec![],
3869                actions: vec![],
3870                optional_backfills: vec![],
3871                vec_inserts: vec![],
3872                operational_writes: vec![],
3873            })
3874            .expect("v2 write");
3875
3876        let conn = rusqlite::Connection::open(db.path()).expect("open");
3877        let old_chunk: i64 = conn
3878            .query_row(
3879                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3880                [],
3881                |r| r.get(0),
3882            )
3883            .expect("old chunk count");
3884        let new_chunk: i64 = conn
3885            .query_row(
3886                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
3887                [],
3888                |r| r.get(0),
3889            )
3890            .expect("new chunk count");
3891        let fts_old: i64 = conn
3892            .query_row(
3893                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
3894                [],
3895                |r| r.get(0),
3896            )
3897            .expect("old fts count");
3898
3899        assert_eq!(
3900            old_chunk, 0,
3901            "old chunk must be deleted by ChunkPolicy::Replace"
3902        );
3903        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
3904        assert_eq!(
3905            fts_old, 0,
3906            "old FTS row must be deleted by ChunkPolicy::Replace"
3907        );
3908    }
3909
3910    #[test]
3911    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
3912        let db = NamedTempFile::new().expect("temporary db");
3913        let writer = WriterActor::start(
3914            db.path(),
3915            Arc::new(SchemaManager::new()),
3916            ProvenanceMode::Warn,
3917            Arc::new(TelemetryCounters::default()),
3918        )
3919        .expect("writer");
3920
3921        writer
3922            .submit(WriteRequest {
3923                label: "v1".to_owned(),
3924                nodes: vec![NodeInsert {
3925                    row_id: "row-1".to_owned(),
3926                    logical_id: "meeting-1".to_owned(),
3927                    kind: "Meeting".to_owned(),
3928                    properties: "{}".to_owned(),
3929                    source_ref: Some("src-1".to_owned()),
3930                    upsert: false,
3931                    chunk_policy: ChunkPolicy::Preserve,
3932                    content_ref: None,
3933                }],
3934                node_retires: vec![],
3935                edges: vec![],
3936                edge_retires: vec![],
3937                chunks: vec![ChunkInsert {
3938                    id: "chunk-old".to_owned(),
3939                    node_logical_id: "meeting-1".to_owned(),
3940                    text_content: "old text".to_owned(),
3941                    byte_start: None,
3942                    byte_end: None,
3943                    content_hash: None,
3944                }],
3945                runs: vec![],
3946                steps: vec![],
3947                actions: vec![],
3948                optional_backfills: vec![],
3949                vec_inserts: vec![],
3950                operational_writes: vec![],
3951            })
3952            .expect("v1 write");
3953
3954        writer
3955            .submit(WriteRequest {
3956                label: "v2-props-only".to_owned(),
3957                nodes: vec![NodeInsert {
3958                    row_id: "row-2".to_owned(),
3959                    logical_id: "meeting-1".to_owned(),
3960                    kind: "Meeting".to_owned(),
3961                    properties: r#"{"status":"updated"}"#.to_owned(),
3962                    source_ref: Some("src-2".to_owned()),
3963                    upsert: true,
3964                    chunk_policy: ChunkPolicy::Preserve,
3965                    content_ref: None,
3966                }],
3967                node_retires: vec![],
3968                edges: vec![],
3969                edge_retires: vec![],
3970                chunks: vec![],
3971                runs: vec![],
3972                steps: vec![],
3973                actions: vec![],
3974                optional_backfills: vec![],
3975                vec_inserts: vec![],
3976                operational_writes: vec![],
3977            })
3978            .expect("v2 preserve write");
3979
3980        let conn = rusqlite::Connection::open(db.path()).expect("open");
3981        let old_chunk: i64 = conn
3982            .query_row(
3983                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3984                [],
3985                |r| r.get(0),
3986            )
3987            .expect("old chunk count");
3988
3989        assert_eq!(
3990            old_chunk, 1,
3991            "old chunk must be preserved by ChunkPolicy::Preserve"
3992        );
3993    }
3994
3995    #[test]
3996    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
3997        let db = NamedTempFile::new().expect("temporary db");
3998        let writer = WriterActor::start(
3999            db.path(),
4000            Arc::new(SchemaManager::new()),
4001            ProvenanceMode::Warn,
4002            Arc::new(TelemetryCounters::default()),
4003        )
4004        .expect("writer");
4005
4006        writer
4007            .submit(WriteRequest {
4008                label: "v1".to_owned(),
4009                nodes: vec![NodeInsert {
4010                    row_id: "row-1".to_owned(),
4011                    logical_id: "meeting-1".to_owned(),
4012                    kind: "Meeting".to_owned(),
4013                    properties: "{}".to_owned(),
4014                    source_ref: Some("src-1".to_owned()),
4015                    upsert: false,
4016                    chunk_policy: ChunkPolicy::Preserve,
4017                    content_ref: None,
4018                }],
4019                node_retires: vec![],
4020                edges: vec![],
4021                edge_retires: vec![],
4022                chunks: vec![ChunkInsert {
4023                    id: "chunk-existing".to_owned(),
4024                    node_logical_id: "meeting-1".to_owned(),
4025                    text_content: "existing text".to_owned(),
4026                    byte_start: None,
4027                    byte_end: None,
4028                    content_hash: None,
4029                }],
4030                runs: vec![],
4031                steps: vec![],
4032                actions: vec![],
4033                optional_backfills: vec![],
4034                vec_inserts: vec![],
4035                operational_writes: vec![],
4036            })
4037            .expect("v1 write");
4038
4039        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4040        writer
4041            .submit(WriteRequest {
4042                label: "insert-no-upsert".to_owned(),
4043                nodes: vec![NodeInsert {
4044                    row_id: "row-2".to_owned(),
4045                    logical_id: "meeting-2".to_owned(),
4046                    kind: "Meeting".to_owned(),
4047                    properties: "{}".to_owned(),
4048                    source_ref: Some("src-2".to_owned()),
4049                    upsert: false,
4050                    chunk_policy: ChunkPolicy::Replace,
4051                    content_ref: None,
4052                }],
4053                node_retires: vec![],
4054                edges: vec![],
4055                edge_retires: vec![],
4056                chunks: vec![],
4057                runs: vec![],
4058                steps: vec![],
4059                actions: vec![],
4060                optional_backfills: vec![],
4061                vec_inserts: vec![],
4062                operational_writes: vec![],
4063            })
4064            .expect("insert no-upsert write");
4065
4066        let conn = rusqlite::Connection::open(db.path()).expect("open");
4067        let existing_chunk: i64 = conn
4068            .query_row(
4069                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4070                [],
4071                |r| r.get(0),
4072            )
4073            .expect("chunk count");
4074
4075        assert_eq!(
4076            existing_chunk, 1,
4077            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4078        );
4079    }
4080
4081    #[test]
4082    fn writer_run_upsert_supersedes_prior_active_run() {
4083        let db = NamedTempFile::new().expect("temporary db");
4084        let writer = WriterActor::start(
4085            db.path(),
4086            Arc::new(SchemaManager::new()),
4087            ProvenanceMode::Warn,
4088            Arc::new(TelemetryCounters::default()),
4089        )
4090        .expect("writer");
4091
4092        writer
4093            .submit(WriteRequest {
4094                label: "v1".to_owned(),
4095                nodes: vec![],
4096                node_retires: vec![],
4097                edges: vec![],
4098                edge_retires: vec![],
4099                chunks: vec![],
4100                runs: vec![RunInsert {
4101                    id: "run-v1".to_owned(),
4102                    kind: "session".to_owned(),
4103                    status: "completed".to_owned(),
4104                    properties: "{}".to_owned(),
4105                    source_ref: Some("src-1".to_owned()),
4106                    upsert: false,
4107                    supersedes_id: None,
4108                }],
4109                steps: vec![],
4110                actions: vec![],
4111                optional_backfills: vec![],
4112                vec_inserts: vec![],
4113                operational_writes: vec![],
4114            })
4115            .expect("v1 run write");
4116
4117        writer
4118            .submit(WriteRequest {
4119                label: "v2".to_owned(),
4120                nodes: vec![],
4121                node_retires: vec![],
4122                edges: vec![],
4123                edge_retires: vec![],
4124                chunks: vec![],
4125                runs: vec![RunInsert {
4126                    id: "run-v2".to_owned(),
4127                    kind: "session".to_owned(),
4128                    status: "completed".to_owned(),
4129                    properties: "{}".to_owned(),
4130                    source_ref: Some("src-2".to_owned()),
4131                    upsert: true,
4132                    supersedes_id: Some("run-v1".to_owned()),
4133                }],
4134                steps: vec![],
4135                actions: vec![],
4136                optional_backfills: vec![],
4137                vec_inserts: vec![],
4138                operational_writes: vec![],
4139            })
4140            .expect("v2 run write");
4141
4142        let conn = rusqlite::Connection::open(db.path()).expect("open");
4143        let v1_historical: i64 = conn
4144            .query_row(
4145                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4146                [],
4147                |r| r.get(0),
4148            )
4149            .expect("v1 historical count");
4150        let v2_active: i64 = conn
4151            .query_row(
4152                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4153                [],
4154                |r| r.get(0),
4155            )
4156            .expect("v2 active count");
4157
4158        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4159        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4160    }
4161
4162    #[test]
4163    fn writer_step_upsert_supersedes_prior_active_step() {
4164        let db = NamedTempFile::new().expect("temporary db");
4165        let writer = WriterActor::start(
4166            db.path(),
4167            Arc::new(SchemaManager::new()),
4168            ProvenanceMode::Warn,
4169            Arc::new(TelemetryCounters::default()),
4170        )
4171        .expect("writer");
4172
4173        writer
4174            .submit(WriteRequest {
4175                label: "v1".to_owned(),
4176                nodes: vec![],
4177                node_retires: vec![],
4178                edges: vec![],
4179                edge_retires: vec![],
4180                chunks: vec![],
4181                runs: vec![RunInsert {
4182                    id: "run-1".to_owned(),
4183                    kind: "session".to_owned(),
4184                    status: "completed".to_owned(),
4185                    properties: "{}".to_owned(),
4186                    source_ref: Some("src-1".to_owned()),
4187                    upsert: false,
4188                    supersedes_id: None,
4189                }],
4190                steps: vec![StepInsert {
4191                    id: "step-v1".to_owned(),
4192                    run_id: "run-1".to_owned(),
4193                    kind: "llm".to_owned(),
4194                    status: "completed".to_owned(),
4195                    properties: "{}".to_owned(),
4196                    source_ref: Some("src-1".to_owned()),
4197                    upsert: false,
4198                    supersedes_id: None,
4199                }],
4200                actions: vec![],
4201                optional_backfills: vec![],
4202                vec_inserts: vec![],
4203                operational_writes: vec![],
4204            })
4205            .expect("v1 step write");
4206
4207        writer
4208            .submit(WriteRequest {
4209                label: "v2".to_owned(),
4210                nodes: vec![],
4211                node_retires: vec![],
4212                edges: vec![],
4213                edge_retires: vec![],
4214                chunks: vec![],
4215                runs: vec![],
4216                steps: vec![StepInsert {
4217                    id: "step-v2".to_owned(),
4218                    run_id: "run-1".to_owned(),
4219                    kind: "llm".to_owned(),
4220                    status: "completed".to_owned(),
4221                    properties: "{}".to_owned(),
4222                    source_ref: Some("src-2".to_owned()),
4223                    upsert: true,
4224                    supersedes_id: Some("step-v1".to_owned()),
4225                }],
4226                actions: vec![],
4227                optional_backfills: vec![],
4228                vec_inserts: vec![],
4229                operational_writes: vec![],
4230            })
4231            .expect("v2 step write");
4232
4233        let conn = rusqlite::Connection::open(db.path()).expect("open");
4234        let v1_historical: i64 = conn
4235            .query_row(
4236                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4237                [],
4238                |r| r.get(0),
4239            )
4240            .expect("v1 historical count");
4241        let v2_active: i64 = conn
4242            .query_row(
4243                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4244                [],
4245                |r| r.get(0),
4246            )
4247            .expect("v2 active count");
4248
4249        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4250        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4251    }
4252
4253    #[test]
4254    fn writer_action_upsert_supersedes_prior_active_action() {
4255        let db = NamedTempFile::new().expect("temporary db");
4256        let writer = WriterActor::start(
4257            db.path(),
4258            Arc::new(SchemaManager::new()),
4259            ProvenanceMode::Warn,
4260            Arc::new(TelemetryCounters::default()),
4261        )
4262        .expect("writer");
4263
4264        writer
4265            .submit(WriteRequest {
4266                label: "v1".to_owned(),
4267                nodes: vec![],
4268                node_retires: vec![],
4269                edges: vec![],
4270                edge_retires: vec![],
4271                chunks: vec![],
4272                runs: vec![RunInsert {
4273                    id: "run-1".to_owned(),
4274                    kind: "session".to_owned(),
4275                    status: "completed".to_owned(),
4276                    properties: "{}".to_owned(),
4277                    source_ref: Some("src-1".to_owned()),
4278                    upsert: false,
4279                    supersedes_id: None,
4280                }],
4281                steps: vec![StepInsert {
4282                    id: "step-1".to_owned(),
4283                    run_id: "run-1".to_owned(),
4284                    kind: "llm".to_owned(),
4285                    status: "completed".to_owned(),
4286                    properties: "{}".to_owned(),
4287                    source_ref: Some("src-1".to_owned()),
4288                    upsert: false,
4289                    supersedes_id: None,
4290                }],
4291                actions: vec![ActionInsert {
4292                    id: "action-v1".to_owned(),
4293                    step_id: "step-1".to_owned(),
4294                    kind: "emit".to_owned(),
4295                    status: "completed".to_owned(),
4296                    properties: "{}".to_owned(),
4297                    source_ref: Some("src-1".to_owned()),
4298                    upsert: false,
4299                    supersedes_id: None,
4300                }],
4301                optional_backfills: vec![],
4302                vec_inserts: vec![],
4303                operational_writes: vec![],
4304            })
4305            .expect("v1 action write");
4306
4307        writer
4308            .submit(WriteRequest {
4309                label: "v2".to_owned(),
4310                nodes: vec![],
4311                node_retires: vec![],
4312                edges: vec![],
4313                edge_retires: vec![],
4314                chunks: vec![],
4315                runs: vec![],
4316                steps: vec![],
4317                actions: vec![ActionInsert {
4318                    id: "action-v2".to_owned(),
4319                    step_id: "step-1".to_owned(),
4320                    kind: "emit".to_owned(),
4321                    status: "completed".to_owned(),
4322                    properties: "{}".to_owned(),
4323                    source_ref: Some("src-2".to_owned()),
4324                    upsert: true,
4325                    supersedes_id: Some("action-v1".to_owned()),
4326                }],
4327                optional_backfills: vec![],
4328                vec_inserts: vec![],
4329                operational_writes: vec![],
4330            })
4331            .expect("v2 action write");
4332
4333        let conn = rusqlite::Connection::open(db.path()).expect("open");
4334        let v1_historical: i64 = conn
4335            .query_row(
4336                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4337                [],
4338                |r| r.get(0),
4339            )
4340            .expect("v1 historical count");
4341        let v2_active: i64 = conn
4342            .query_row(
4343                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4344                [],
4345                |r| r.get(0),
4346            )
4347            .expect("v2 active count");
4348
4349        assert_eq!(
4350            v1_historical, 1,
4351            "action-v1 must be historical after upsert"
4352        );
4353        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4354    }
4355
4356    // P0: runtime upsert without supersedes_id must be rejected
4357
4358    #[test]
4359    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4360        let db = NamedTempFile::new().expect("temporary db");
4361        let writer = WriterActor::start(
4362            db.path(),
4363            Arc::new(SchemaManager::new()),
4364            ProvenanceMode::Warn,
4365            Arc::new(TelemetryCounters::default()),
4366        )
4367        .expect("writer");
4368
4369        let result = writer.submit(WriteRequest {
4370            label: "bad".to_owned(),
4371            nodes: vec![],
4372            node_retires: vec![],
4373            edges: vec![],
4374            edge_retires: vec![],
4375            chunks: vec![],
4376            runs: vec![RunInsert {
4377                id: "run-1".to_owned(),
4378                kind: "session".to_owned(),
4379                status: "completed".to_owned(),
4380                properties: "{}".to_owned(),
4381                source_ref: None,
4382                upsert: true,
4383                supersedes_id: None,
4384            }],
4385            steps: vec![],
4386            actions: vec![],
4387            optional_backfills: vec![],
4388            vec_inserts: vec![],
4389            operational_writes: vec![],
4390        });
4391
4392        assert!(
4393            matches!(result, Err(EngineError::InvalidWrite(_))),
4394            "run upsert=true without supersedes_id must return InvalidWrite"
4395        );
4396    }
4397
4398    #[test]
4399    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4400        let db = NamedTempFile::new().expect("temporary db");
4401        let writer = WriterActor::start(
4402            db.path(),
4403            Arc::new(SchemaManager::new()),
4404            ProvenanceMode::Warn,
4405            Arc::new(TelemetryCounters::default()),
4406        )
4407        .expect("writer");
4408
4409        let result = writer.submit(WriteRequest {
4410            label: "bad".to_owned(),
4411            nodes: vec![],
4412            node_retires: vec![],
4413            edges: vec![],
4414            edge_retires: vec![],
4415            chunks: vec![],
4416            runs: vec![],
4417            steps: vec![StepInsert {
4418                id: "step-1".to_owned(),
4419                run_id: "run-1".to_owned(),
4420                kind: "llm".to_owned(),
4421                status: "completed".to_owned(),
4422                properties: "{}".to_owned(),
4423                source_ref: None,
4424                upsert: true,
4425                supersedes_id: None,
4426            }],
4427            actions: vec![],
4428            optional_backfills: vec![],
4429            vec_inserts: vec![],
4430            operational_writes: vec![],
4431        });
4432
4433        assert!(
4434            matches!(result, Err(EngineError::InvalidWrite(_))),
4435            "step upsert=true without supersedes_id must return InvalidWrite"
4436        );
4437    }
4438
4439    #[test]
4440    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4441        let db = NamedTempFile::new().expect("temporary db");
4442        let writer = WriterActor::start(
4443            db.path(),
4444            Arc::new(SchemaManager::new()),
4445            ProvenanceMode::Warn,
4446            Arc::new(TelemetryCounters::default()),
4447        )
4448        .expect("writer");
4449
4450        let result = writer.submit(WriteRequest {
4451            label: "bad".to_owned(),
4452            nodes: vec![],
4453            node_retires: vec![],
4454            edges: vec![],
4455            edge_retires: vec![],
4456            chunks: vec![],
4457            runs: vec![],
4458            steps: vec![],
4459            actions: vec![ActionInsert {
4460                id: "action-1".to_owned(),
4461                step_id: "step-1".to_owned(),
4462                kind: "emit".to_owned(),
4463                status: "completed".to_owned(),
4464                properties: "{}".to_owned(),
4465                source_ref: None,
4466                upsert: true,
4467                supersedes_id: None,
4468            }],
4469            optional_backfills: vec![],
4470            vec_inserts: vec![],
4471            operational_writes: vec![],
4472        });
4473
4474        assert!(
4475            matches!(result, Err(EngineError::InvalidWrite(_))),
4476            "action upsert=true without supersedes_id must return InvalidWrite"
4477        );
4478    }
4479
4480    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4481
4482    #[test]
4483    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4484        let db = NamedTempFile::new().expect("temporary db");
4485        let writer = WriterActor::start(
4486            db.path(),
4487            Arc::new(SchemaManager::new()),
4488            ProvenanceMode::Warn,
4489            Arc::new(TelemetryCounters::default()),
4490        )
4491        .expect("writer");
4492
4493        let receipt = writer
4494            .submit(WriteRequest {
4495                label: "test".to_owned(),
4496                nodes: vec![
4497                    NodeInsert {
4498                        row_id: "row-a".to_owned(),
4499                        logical_id: "node-a".to_owned(),
4500                        kind: "Meeting".to_owned(),
4501                        properties: "{}".to_owned(),
4502                        source_ref: Some("src-1".to_owned()),
4503                        upsert: false,
4504                        chunk_policy: ChunkPolicy::Preserve,
4505                        content_ref: None,
4506                    },
4507                    NodeInsert {
4508                        row_id: "row-b".to_owned(),
4509                        logical_id: "node-b".to_owned(),
4510                        kind: "Task".to_owned(),
4511                        properties: "{}".to_owned(),
4512                        source_ref: Some("src-1".to_owned()),
4513                        upsert: false,
4514                        chunk_policy: ChunkPolicy::Preserve,
4515                        content_ref: None,
4516                    },
4517                ],
4518                node_retires: vec![],
4519                edges: vec![EdgeInsert {
4520                    row_id: "edge-1".to_owned(),
4521                    logical_id: "edge-lg-1".to_owned(),
4522                    source_logical_id: "node-a".to_owned(),
4523                    target_logical_id: "node-b".to_owned(),
4524                    kind: "HAS_TASK".to_owned(),
4525                    properties: "{}".to_owned(),
4526                    source_ref: None,
4527                    upsert: false,
4528                }],
4529                edge_retires: vec![],
4530                chunks: vec![],
4531                runs: vec![],
4532                steps: vec![],
4533                actions: vec![],
4534                optional_backfills: vec![],
4535                vec_inserts: vec![],
4536                operational_writes: vec![],
4537            })
4538            .expect("write");
4539
4540        assert!(
4541            !receipt.provenance_warnings.is_empty(),
4542            "edge insert without source_ref must emit a provenance warning"
4543        );
4544    }
4545
4546    #[test]
4547    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4548        let db = NamedTempFile::new().expect("temporary db");
4549        let writer = WriterActor::start(
4550            db.path(),
4551            Arc::new(SchemaManager::new()),
4552            ProvenanceMode::Warn,
4553            Arc::new(TelemetryCounters::default()),
4554        )
4555        .expect("writer");
4556
4557        let receipt = writer
4558            .submit(WriteRequest {
4559                label: "test".to_owned(),
4560                nodes: vec![],
4561                node_retires: vec![],
4562                edges: vec![],
4563                edge_retires: vec![],
4564                chunks: vec![],
4565                runs: vec![RunInsert {
4566                    id: "run-1".to_owned(),
4567                    kind: "session".to_owned(),
4568                    status: "completed".to_owned(),
4569                    properties: "{}".to_owned(),
4570                    source_ref: None,
4571                    upsert: false,
4572                    supersedes_id: None,
4573                }],
4574                steps: vec![],
4575                actions: vec![],
4576                optional_backfills: vec![],
4577                vec_inserts: vec![],
4578                operational_writes: vec![],
4579            })
4580            .expect("write");
4581
4582        assert!(
4583            !receipt.provenance_warnings.is_empty(),
4584            "run insert without source_ref must emit a provenance warning"
4585        );
4586    }
4587
4588    // P1c: retire a node AND submit chunks for the same logical_id in one request
4589
4590    #[test]
4591    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4592        let db = NamedTempFile::new().expect("temporary db");
4593        let writer = WriterActor::start(
4594            db.path(),
4595            Arc::new(SchemaManager::new()),
4596            ProvenanceMode::Warn,
4597            Arc::new(TelemetryCounters::default()),
4598        )
4599        .expect("writer");
4600
4601        // First seed the node so it exists
4602        writer
4603            .submit(WriteRequest {
4604                label: "seed".to_owned(),
4605                nodes: vec![NodeInsert {
4606                    row_id: "row-1".to_owned(),
4607                    logical_id: "meeting-1".to_owned(),
4608                    kind: "Meeting".to_owned(),
4609                    properties: "{}".to_owned(),
4610                    source_ref: Some("src-1".to_owned()),
4611                    upsert: false,
4612                    chunk_policy: ChunkPolicy::Preserve,
4613                    content_ref: None,
4614                }],
4615                node_retires: vec![],
4616                edges: vec![],
4617                edge_retires: vec![],
4618                chunks: vec![],
4619                runs: vec![],
4620                steps: vec![],
4621                actions: vec![],
4622                optional_backfills: vec![],
4623                vec_inserts: vec![],
4624                operational_writes: vec![],
4625            })
4626            .expect("seed write");
4627
4628        // Now try to retire it AND add a chunk for it in the same request
4629        let result = writer.submit(WriteRequest {
4630            label: "bad".to_owned(),
4631            nodes: vec![],
4632            node_retires: vec![NodeRetire {
4633                logical_id: "meeting-1".to_owned(),
4634                source_ref: Some("src-2".to_owned()),
4635            }],
4636            edges: vec![],
4637            edge_retires: vec![],
4638            chunks: vec![ChunkInsert {
4639                id: "chunk-bad".to_owned(),
4640                node_logical_id: "meeting-1".to_owned(),
4641                text_content: "some text".to_owned(),
4642                byte_start: None,
4643                byte_end: None,
4644                content_hash: None,
4645            }],
4646            runs: vec![],
4647            steps: vec![],
4648            actions: vec![],
4649            optional_backfills: vec![],
4650            vec_inserts: vec![],
4651            operational_writes: vec![],
4652        });
4653
4654        assert!(
4655            matches!(result, Err(EngineError::InvalidWrite(_))),
4656            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
4657        );
4658    }
4659
4660    // --- Item 1: prepare_cached batch insert ---
4661
4662    #[test]
4663    fn writer_batch_insert_multiple_nodes() {
4664        let db = NamedTempFile::new().expect("temporary db");
4665        let writer = WriterActor::start(
4666            db.path(),
4667            Arc::new(SchemaManager::new()),
4668            ProvenanceMode::Warn,
4669            Arc::new(TelemetryCounters::default()),
4670        )
4671        .expect("writer");
4672
4673        let nodes: Vec<NodeInsert> = (0..100)
4674            .map(|i| NodeInsert {
4675                row_id: format!("row-{i}"),
4676                logical_id: format!("lg-{i}"),
4677                kind: "Note".to_owned(),
4678                properties: "{}".to_owned(),
4679                source_ref: Some("batch-src".to_owned()),
4680                upsert: false,
4681                chunk_policy: ChunkPolicy::Preserve,
4682                content_ref: None,
4683            })
4684            .collect();
4685
4686        writer
4687            .submit(WriteRequest {
4688                label: "batch".to_owned(),
4689                nodes,
4690                node_retires: vec![],
4691                edges: vec![],
4692                edge_retires: vec![],
4693                chunks: vec![],
4694                runs: vec![],
4695                steps: vec![],
4696                actions: vec![],
4697                optional_backfills: vec![],
4698                vec_inserts: vec![],
4699                operational_writes: vec![],
4700            })
4701            .expect("batch write");
4702
4703        let conn = rusqlite::Connection::open(db.path()).expect("open");
4704        let count: i64 = conn
4705            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4706            .expect("count nodes");
4707        assert_eq!(
4708            count, 100,
4709            "all 100 nodes must be present after batch insert"
4710        );
4711    }
4712
4713    // --- Item 2: ID validation ---
4714
4715    #[test]
4716    fn prepare_write_rejects_empty_node_row_id() {
4717        let db = NamedTempFile::new().expect("temporary db");
4718        let writer = WriterActor::start(
4719            db.path(),
4720            Arc::new(SchemaManager::new()),
4721            ProvenanceMode::Warn,
4722            Arc::new(TelemetryCounters::default()),
4723        )
4724        .expect("writer");
4725
4726        let result = writer.submit(WriteRequest {
4727            label: "test".to_owned(),
4728            nodes: vec![NodeInsert {
4729                row_id: String::new(),
4730                logical_id: "lg-1".to_owned(),
4731                kind: "Note".to_owned(),
4732                properties: "{}".to_owned(),
4733                source_ref: None,
4734                upsert: false,
4735                chunk_policy: ChunkPolicy::Preserve,
4736                content_ref: None,
4737            }],
4738            node_retires: vec![],
4739            edges: vec![],
4740            edge_retires: vec![],
4741            chunks: vec![],
4742            runs: vec![],
4743            steps: vec![],
4744            actions: vec![],
4745            optional_backfills: vec![],
4746            vec_inserts: vec![],
4747            operational_writes: vec![],
4748        });
4749
4750        assert!(
4751            matches!(result, Err(EngineError::InvalidWrite(_))),
4752            "empty row_id must be rejected"
4753        );
4754    }
4755
4756    #[test]
4757    fn prepare_write_rejects_empty_node_logical_id() {
4758        let db = NamedTempFile::new().expect("temporary db");
4759        let writer = WriterActor::start(
4760            db.path(),
4761            Arc::new(SchemaManager::new()),
4762            ProvenanceMode::Warn,
4763            Arc::new(TelemetryCounters::default()),
4764        )
4765        .expect("writer");
4766
4767        let result = writer.submit(WriteRequest {
4768            label: "test".to_owned(),
4769            nodes: vec![NodeInsert {
4770                row_id: "row-1".to_owned(),
4771                logical_id: String::new(),
4772                kind: "Note".to_owned(),
4773                properties: "{}".to_owned(),
4774                source_ref: None,
4775                upsert: false,
4776                chunk_policy: ChunkPolicy::Preserve,
4777                content_ref: None,
4778            }],
4779            node_retires: vec![],
4780            edges: vec![],
4781            edge_retires: vec![],
4782            chunks: vec![],
4783            runs: vec![],
4784            steps: vec![],
4785            actions: vec![],
4786            optional_backfills: vec![],
4787            vec_inserts: vec![],
4788            operational_writes: vec![],
4789        });
4790
4791        assert!(
4792            matches!(result, Err(EngineError::InvalidWrite(_))),
4793            "empty logical_id must be rejected"
4794        );
4795    }
4796
4797    #[test]
4798    fn prepare_write_rejects_duplicate_row_ids_in_request() {
4799        let db = NamedTempFile::new().expect("temporary db");
4800        let writer = WriterActor::start(
4801            db.path(),
4802            Arc::new(SchemaManager::new()),
4803            ProvenanceMode::Warn,
4804            Arc::new(TelemetryCounters::default()),
4805        )
4806        .expect("writer");
4807
4808        let result = writer.submit(WriteRequest {
4809            label: "test".to_owned(),
4810            nodes: vec![
4811                NodeInsert {
4812                    row_id: "row-1".to_owned(),
4813                    logical_id: "lg-1".to_owned(),
4814                    kind: "Note".to_owned(),
4815                    properties: "{}".to_owned(),
4816                    source_ref: None,
4817                    upsert: false,
4818                    chunk_policy: ChunkPolicy::Preserve,
4819                    content_ref: None,
4820                },
4821                NodeInsert {
4822                    row_id: "row-1".to_owned(), // duplicate
4823                    logical_id: "lg-2".to_owned(),
4824                    kind: "Note".to_owned(),
4825                    properties: "{}".to_owned(),
4826                    source_ref: None,
4827                    upsert: false,
4828                    chunk_policy: ChunkPolicy::Preserve,
4829                    content_ref: None,
4830                },
4831            ],
4832            node_retires: vec![],
4833            edges: vec![],
4834            edge_retires: vec![],
4835            chunks: vec![],
4836            runs: vec![],
4837            steps: vec![],
4838            actions: vec![],
4839            optional_backfills: vec![],
4840            vec_inserts: vec![],
4841            operational_writes: vec![],
4842        });
4843
4844        assert!(
4845            matches!(result, Err(EngineError::InvalidWrite(_))),
4846            "duplicate row_id within request must be rejected"
4847        );
4848    }
4849
4850    #[test]
4851    fn prepare_write_rejects_empty_chunk_id() {
4852        let db = NamedTempFile::new().expect("temporary db");
4853        let writer = WriterActor::start(
4854            db.path(),
4855            Arc::new(SchemaManager::new()),
4856            ProvenanceMode::Warn,
4857            Arc::new(TelemetryCounters::default()),
4858        )
4859        .expect("writer");
4860
4861        let result = writer.submit(WriteRequest {
4862            label: "test".to_owned(),
4863            nodes: vec![NodeInsert {
4864                row_id: "row-1".to_owned(),
4865                logical_id: "lg-1".to_owned(),
4866                kind: "Note".to_owned(),
4867                properties: "{}".to_owned(),
4868                source_ref: None,
4869                upsert: false,
4870                chunk_policy: ChunkPolicy::Preserve,
4871                content_ref: None,
4872            }],
4873            node_retires: vec![],
4874            edges: vec![],
4875            edge_retires: vec![],
4876            chunks: vec![ChunkInsert {
4877                id: String::new(),
4878                node_logical_id: "lg-1".to_owned(),
4879                text_content: "some text".to_owned(),
4880                byte_start: None,
4881                byte_end: None,
4882                content_hash: None,
4883            }],
4884            runs: vec![],
4885            steps: vec![],
4886            actions: vec![],
4887            optional_backfills: vec![],
4888            vec_inserts: vec![],
4889            operational_writes: vec![],
4890        });
4891
4892        assert!(
4893            matches!(result, Err(EngineError::InvalidWrite(_))),
4894            "empty chunk id must be rejected"
4895        );
4896    }
4897
4898    // --- Item 4: provenance warning coverage tests ---
4899
4900    #[test]
4901    fn writer_receipt_warns_on_step_without_source_ref() {
4902        let db = NamedTempFile::new().expect("temporary db");
4903        let writer = WriterActor::start(
4904            db.path(),
4905            Arc::new(SchemaManager::new()),
4906            ProvenanceMode::Warn,
4907            Arc::new(TelemetryCounters::default()),
4908        )
4909        .expect("writer");
4910
4911        // seed a run first so step FK is satisfied
4912        writer
4913            .submit(WriteRequest {
4914                label: "seed-run".to_owned(),
4915                nodes: vec![],
4916                node_retires: vec![],
4917                edges: vec![],
4918                edge_retires: vec![],
4919                chunks: vec![],
4920                runs: vec![RunInsert {
4921                    id: "run-1".to_owned(),
4922                    kind: "session".to_owned(),
4923                    status: "active".to_owned(),
4924                    properties: "{}".to_owned(),
4925                    source_ref: Some("src-1".to_owned()),
4926                    upsert: false,
4927                    supersedes_id: None,
4928                }],
4929                steps: vec![],
4930                actions: vec![],
4931                optional_backfills: vec![],
4932                vec_inserts: vec![],
4933                operational_writes: vec![],
4934            })
4935            .expect("seed run");
4936
4937        let receipt = writer
4938            .submit(WriteRequest {
4939                label: "test".to_owned(),
4940                nodes: vec![],
4941                node_retires: vec![],
4942                edges: vec![],
4943                edge_retires: vec![],
4944                chunks: vec![],
4945                runs: vec![],
4946                steps: vec![StepInsert {
4947                    id: "step-1".to_owned(),
4948                    run_id: "run-1".to_owned(),
4949                    kind: "llm_call".to_owned(),
4950                    status: "completed".to_owned(),
4951                    properties: "{}".to_owned(),
4952                    source_ref: None,
4953                    upsert: false,
4954                    supersedes_id: None,
4955                }],
4956                actions: vec![],
4957                optional_backfills: vec![],
4958                vec_inserts: vec![],
4959                operational_writes: vec![],
4960            })
4961            .expect("write");
4962
4963        assert!(
4964            !receipt.provenance_warnings.is_empty(),
4965            "step insert without source_ref must emit a provenance warning"
4966        );
4967    }
4968
4969    #[test]
4970    fn writer_receipt_warns_on_action_without_source_ref() {
4971        let db = NamedTempFile::new().expect("temporary db");
4972        let writer = WriterActor::start(
4973            db.path(),
4974            Arc::new(SchemaManager::new()),
4975            ProvenanceMode::Warn,
4976            Arc::new(TelemetryCounters::default()),
4977        )
4978        .expect("writer");
4979
4980        // seed run and step so action FK is satisfied
4981        writer
4982            .submit(WriteRequest {
4983                label: "seed".to_owned(),
4984                nodes: vec![],
4985                node_retires: vec![],
4986                edges: vec![],
4987                edge_retires: vec![],
4988                chunks: vec![],
4989                runs: vec![RunInsert {
4990                    id: "run-1".to_owned(),
4991                    kind: "session".to_owned(),
4992                    status: "active".to_owned(),
4993                    properties: "{}".to_owned(),
4994                    source_ref: Some("src-1".to_owned()),
4995                    upsert: false,
4996                    supersedes_id: None,
4997                }],
4998                steps: vec![StepInsert {
4999                    id: "step-1".to_owned(),
5000                    run_id: "run-1".to_owned(),
5001                    kind: "llm_call".to_owned(),
5002                    status: "completed".to_owned(),
5003                    properties: "{}".to_owned(),
5004                    source_ref: Some("src-1".to_owned()),
5005                    upsert: false,
5006                    supersedes_id: None,
5007                }],
5008                actions: vec![],
5009                optional_backfills: vec![],
5010                vec_inserts: vec![],
5011                operational_writes: vec![],
5012            })
5013            .expect("seed");
5014
5015        let receipt = writer
5016            .submit(WriteRequest {
5017                label: "test".to_owned(),
5018                nodes: vec![],
5019                node_retires: vec![],
5020                edges: vec![],
5021                edge_retires: vec![],
5022                chunks: vec![],
5023                runs: vec![],
5024                steps: vec![],
5025                actions: vec![ActionInsert {
5026                    id: "action-1".to_owned(),
5027                    step_id: "step-1".to_owned(),
5028                    kind: "tool_call".to_owned(),
5029                    status: "completed".to_owned(),
5030                    properties: "{}".to_owned(),
5031                    source_ref: None,
5032                    upsert: false,
5033                    supersedes_id: None,
5034                }],
5035                optional_backfills: vec![],
5036                vec_inserts: vec![],
5037                operational_writes: vec![],
5038            })
5039            .expect("write");
5040
5041        assert!(
5042            !receipt.provenance_warnings.is_empty(),
5043            "action insert without source_ref must emit a provenance warning"
5044        );
5045    }
5046
5047    #[test]
5048    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5049        let db = NamedTempFile::new().expect("temporary db");
5050        let writer = WriterActor::start(
5051            db.path(),
5052            Arc::new(SchemaManager::new()),
5053            ProvenanceMode::Warn,
5054            Arc::new(TelemetryCounters::default()),
5055        )
5056        .expect("writer");
5057
5058        let receipt = writer
5059            .submit(WriteRequest {
5060                label: "test".to_owned(),
5061                nodes: vec![NodeInsert {
5062                    row_id: "row-1".to_owned(),
5063                    logical_id: "node-1".to_owned(),
5064                    kind: "Note".to_owned(),
5065                    properties: "{}".to_owned(),
5066                    source_ref: Some("src-1".to_owned()),
5067                    upsert: false,
5068                    chunk_policy: ChunkPolicy::Preserve,
5069                    content_ref: None,
5070                }],
5071                node_retires: vec![],
5072                edges: vec![],
5073                edge_retires: vec![],
5074                chunks: vec![],
5075                runs: vec![RunInsert {
5076                    id: "run-1".to_owned(),
5077                    kind: "session".to_owned(),
5078                    status: "active".to_owned(),
5079                    properties: "{}".to_owned(),
5080                    source_ref: Some("src-1".to_owned()),
5081                    upsert: false,
5082                    supersedes_id: None,
5083                }],
5084                steps: vec![StepInsert {
5085                    id: "step-1".to_owned(),
5086                    run_id: "run-1".to_owned(),
5087                    kind: "llm_call".to_owned(),
5088                    status: "completed".to_owned(),
5089                    properties: "{}".to_owned(),
5090                    source_ref: Some("src-1".to_owned()),
5091                    upsert: false,
5092                    supersedes_id: None,
5093                }],
5094                actions: vec![ActionInsert {
5095                    id: "action-1".to_owned(),
5096                    step_id: "step-1".to_owned(),
5097                    kind: "tool_call".to_owned(),
5098                    status: "completed".to_owned(),
5099                    properties: "{}".to_owned(),
5100                    source_ref: Some("src-1".to_owned()),
5101                    upsert: false,
5102                    supersedes_id: None,
5103                }],
5104                optional_backfills: vec![],
5105                vec_inserts: vec![],
5106                operational_writes: vec![],
5107            })
5108            .expect("write");
5109
5110        assert!(
5111            receipt.provenance_warnings.is_empty(),
5112            "no warnings expected when all types have source_ref; got: {:?}",
5113            receipt.provenance_warnings
5114        );
5115    }
5116
5117    // --- Item 4 Task 2: ProvenanceMode tests ---
5118
5119    #[test]
5120    fn default_provenance_mode_is_warn() {
5121        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5122        let db = NamedTempFile::new().expect("temporary db");
5123        let writer = WriterActor::start(
5124            db.path(),
5125            Arc::new(SchemaManager::new()),
5126            ProvenanceMode::default(),
5127            Arc::new(TelemetryCounters::default()),
5128        )
5129        .expect("writer");
5130
5131        let receipt = writer
5132            .submit(WriteRequest {
5133                label: "test".to_owned(),
5134                nodes: vec![NodeInsert {
5135                    row_id: "row-1".to_owned(),
5136                    logical_id: "node-1".to_owned(),
5137                    kind: "Note".to_owned(),
5138                    properties: "{}".to_owned(),
5139                    source_ref: None,
5140                    upsert: false,
5141                    chunk_policy: ChunkPolicy::Preserve,
5142                    content_ref: None,
5143                }],
5144                node_retires: vec![],
5145                edges: vec![],
5146                edge_retires: vec![],
5147                chunks: vec![],
5148                runs: vec![],
5149                steps: vec![],
5150                actions: vec![],
5151                optional_backfills: vec![],
5152                vec_inserts: vec![],
5153                operational_writes: vec![],
5154            })
5155            .expect("Warn mode must not reject missing source_ref");
5156
5157        assert!(
5158            !receipt.provenance_warnings.is_empty(),
5159            "Warn mode must emit a warning instead of rejecting"
5160        );
5161    }
5162
5163    #[test]
5164    fn require_mode_rejects_node_without_source_ref() {
5165        let db = NamedTempFile::new().expect("temporary db");
5166        let writer = WriterActor::start(
5167            db.path(),
5168            Arc::new(SchemaManager::new()),
5169            ProvenanceMode::Require,
5170            Arc::new(TelemetryCounters::default()),
5171        )
5172        .expect("writer");
5173
5174        let result = writer.submit(WriteRequest {
5175            label: "test".to_owned(),
5176            nodes: vec![NodeInsert {
5177                row_id: "row-1".to_owned(),
5178                logical_id: "node-1".to_owned(),
5179                kind: "Note".to_owned(),
5180                properties: "{}".to_owned(),
5181                source_ref: None,
5182                upsert: false,
5183                chunk_policy: ChunkPolicy::Preserve,
5184                content_ref: None,
5185            }],
5186            node_retires: vec![],
5187            edges: vec![],
5188            edge_retires: vec![],
5189            chunks: vec![],
5190            runs: vec![],
5191            steps: vec![],
5192            actions: vec![],
5193            optional_backfills: vec![],
5194            vec_inserts: vec![],
5195            operational_writes: vec![],
5196        });
5197
5198        assert!(
5199            matches!(result, Err(EngineError::InvalidWrite(_))),
5200            "Require mode must reject node without source_ref"
5201        );
5202    }
5203
5204    #[test]
5205    fn require_mode_accepts_node_with_source_ref() {
5206        let db = NamedTempFile::new().expect("temporary db");
5207        let writer = WriterActor::start(
5208            db.path(),
5209            Arc::new(SchemaManager::new()),
5210            ProvenanceMode::Require,
5211            Arc::new(TelemetryCounters::default()),
5212        )
5213        .expect("writer");
5214
5215        let result = writer.submit(WriteRequest {
5216            label: "test".to_owned(),
5217            nodes: vec![NodeInsert {
5218                row_id: "row-1".to_owned(),
5219                logical_id: "node-1".to_owned(),
5220                kind: "Note".to_owned(),
5221                properties: "{}".to_owned(),
5222                source_ref: Some("src-1".to_owned()),
5223                upsert: false,
5224                chunk_policy: ChunkPolicy::Preserve,
5225                content_ref: None,
5226            }],
5227            node_retires: vec![],
5228            edges: vec![],
5229            edge_retires: vec![],
5230            chunks: vec![],
5231            runs: vec![],
5232            steps: vec![],
5233            actions: vec![],
5234            optional_backfills: vec![],
5235            vec_inserts: vec![],
5236            operational_writes: vec![],
5237        });
5238
5239        assert!(
5240            result.is_ok(),
5241            "Require mode must accept node with source_ref"
5242        );
5243    }
5244
5245    #[test]
5246    fn require_mode_rejects_edge_without_source_ref() {
5247        let db = NamedTempFile::new().expect("temporary db");
5248        let writer = WriterActor::start(
5249            db.path(),
5250            Arc::new(SchemaManager::new()),
5251            ProvenanceMode::Require,
5252            Arc::new(TelemetryCounters::default()),
5253        )
5254        .expect("writer");
5255
5256        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5257        let result = writer.submit(WriteRequest {
5258            label: "test".to_owned(),
5259            nodes: vec![
5260                NodeInsert {
5261                    row_id: "row-a".to_owned(),
5262                    logical_id: "node-a".to_owned(),
5263                    kind: "Note".to_owned(),
5264                    properties: "{}".to_owned(),
5265                    source_ref: Some("src-1".to_owned()),
5266                    upsert: false,
5267                    chunk_policy: ChunkPolicy::Preserve,
5268                    content_ref: None,
5269                },
5270                NodeInsert {
5271                    row_id: "row-b".to_owned(),
5272                    logical_id: "node-b".to_owned(),
5273                    kind: "Note".to_owned(),
5274                    properties: "{}".to_owned(),
5275                    source_ref: Some("src-1".to_owned()),
5276                    upsert: false,
5277                    chunk_policy: ChunkPolicy::Preserve,
5278                    content_ref: None,
5279                },
5280            ],
5281            node_retires: vec![],
5282            edges: vec![EdgeInsert {
5283                row_id: "edge-row-1".to_owned(),
5284                logical_id: "edge-1".to_owned(),
5285                source_logical_id: "node-a".to_owned(),
5286                target_logical_id: "node-b".to_owned(),
5287                kind: "LINKS_TO".to_owned(),
5288                properties: "{}".to_owned(),
5289                source_ref: None,
5290                upsert: false,
5291            }],
5292            edge_retires: vec![],
5293            chunks: vec![],
5294            runs: vec![],
5295            steps: vec![],
5296            actions: vec![],
5297            optional_backfills: vec![],
5298            vec_inserts: vec![],
5299            operational_writes: vec![],
5300        });
5301
5302        assert!(
5303            matches!(result, Err(EngineError::InvalidWrite(_))),
5304            "Require mode must reject edge without source_ref"
5305        );
5306    }
5307
5308    // --- Item 5: FTS projection coverage tests ---
5309
5310    #[test]
5311    fn fts_row_has_correct_kind_from_co_submitted_node() {
5312        let db = NamedTempFile::new().expect("temporary db");
5313        let writer = WriterActor::start(
5314            db.path(),
5315            Arc::new(SchemaManager::new()),
5316            ProvenanceMode::Warn,
5317            Arc::new(TelemetryCounters::default()),
5318        )
5319        .expect("writer");
5320
5321        writer
5322            .submit(WriteRequest {
5323                label: "test".to_owned(),
5324                nodes: vec![NodeInsert {
5325                    row_id: "row-1".to_owned(),
5326                    logical_id: "node-1".to_owned(),
5327                    kind: "Meeting".to_owned(),
5328                    properties: "{}".to_owned(),
5329                    source_ref: Some("src-1".to_owned()),
5330                    upsert: false,
5331                    chunk_policy: ChunkPolicy::Preserve,
5332                    content_ref: None,
5333                }],
5334                node_retires: vec![],
5335                edges: vec![],
5336                edge_retires: vec![],
5337                chunks: vec![ChunkInsert {
5338                    id: "chunk-1".to_owned(),
5339                    node_logical_id: "node-1".to_owned(),
5340                    text_content: "some text".to_owned(),
5341                    byte_start: None,
5342                    byte_end: None,
5343                    content_hash: None,
5344                }],
5345                runs: vec![],
5346                steps: vec![],
5347                actions: vec![],
5348                optional_backfills: vec![],
5349                vec_inserts: vec![],
5350                operational_writes: vec![],
5351            })
5352            .expect("write");
5353
5354        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5355        let kind: String = conn
5356            .query_row(
5357                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5358                [],
5359                |row| row.get(0),
5360            )
5361            .expect("fts row");
5362
5363        assert_eq!(kind, "Meeting");
5364    }
5365
5366    #[test]
5367    fn fts_row_has_correct_text_content() {
5368        let db = NamedTempFile::new().expect("temporary db");
5369        let writer = WriterActor::start(
5370            db.path(),
5371            Arc::new(SchemaManager::new()),
5372            ProvenanceMode::Warn,
5373            Arc::new(TelemetryCounters::default()),
5374        )
5375        .expect("writer");
5376
5377        writer
5378            .submit(WriteRequest {
5379                label: "test".to_owned(),
5380                nodes: vec![NodeInsert {
5381                    row_id: "row-1".to_owned(),
5382                    logical_id: "node-1".to_owned(),
5383                    kind: "Note".to_owned(),
5384                    properties: "{}".to_owned(),
5385                    source_ref: Some("src-1".to_owned()),
5386                    upsert: false,
5387                    chunk_policy: ChunkPolicy::Preserve,
5388                    content_ref: None,
5389                }],
5390                node_retires: vec![],
5391                edges: vec![],
5392                edge_retires: vec![],
5393                chunks: vec![ChunkInsert {
5394                    id: "chunk-1".to_owned(),
5395                    node_logical_id: "node-1".to_owned(),
5396                    text_content: "exactly this text".to_owned(),
5397                    byte_start: None,
5398                    byte_end: None,
5399                    content_hash: None,
5400                }],
5401                runs: vec![],
5402                steps: vec![],
5403                actions: vec![],
5404                optional_backfills: vec![],
5405                vec_inserts: vec![],
5406                operational_writes: vec![],
5407            })
5408            .expect("write");
5409
5410        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5411        let text: String = conn
5412            .query_row(
5413                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5414                [],
5415                |row| row.get(0),
5416            )
5417            .expect("fts row");
5418
5419        assert_eq!(text, "exactly this text");
5420    }
5421
5422    #[test]
5423    fn fts_row_has_correct_kind_from_pre_existing_node() {
5424        let db = NamedTempFile::new().expect("temporary db");
5425        let writer = WriterActor::start(
5426            db.path(),
5427            Arc::new(SchemaManager::new()),
5428            ProvenanceMode::Warn,
5429            Arc::new(TelemetryCounters::default()),
5430        )
5431        .expect("writer");
5432
5433        // Request 1: node only
5434        writer
5435            .submit(WriteRequest {
5436                label: "r1".to_owned(),
5437                nodes: vec![NodeInsert {
5438                    row_id: "row-1".to_owned(),
5439                    logical_id: "node-1".to_owned(),
5440                    kind: "Document".to_owned(),
5441                    properties: "{}".to_owned(),
5442                    source_ref: Some("src-1".to_owned()),
5443                    upsert: false,
5444                    chunk_policy: ChunkPolicy::Preserve,
5445                    content_ref: None,
5446                }],
5447                node_retires: vec![],
5448                edges: vec![],
5449                edge_retires: vec![],
5450                chunks: vec![],
5451                runs: vec![],
5452                steps: vec![],
5453                actions: vec![],
5454                optional_backfills: vec![],
5455                vec_inserts: vec![],
5456                operational_writes: vec![],
5457            })
5458            .expect("r1 write");
5459
5460        // Request 2: chunk for pre-existing node
5461        writer
5462            .submit(WriteRequest {
5463                label: "r2".to_owned(),
5464                nodes: vec![],
5465                node_retires: vec![],
5466                edges: vec![],
5467                edge_retires: vec![],
5468                chunks: vec![ChunkInsert {
5469                    id: "chunk-1".to_owned(),
5470                    node_logical_id: "node-1".to_owned(),
5471                    text_content: "some text".to_owned(),
5472                    byte_start: None,
5473                    byte_end: None,
5474                    content_hash: None,
5475                }],
5476                runs: vec![],
5477                steps: vec![],
5478                actions: vec![],
5479                optional_backfills: vec![],
5480                vec_inserts: vec![],
5481                operational_writes: vec![],
5482            })
5483            .expect("r2 write");
5484
5485        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5486        let kind: String = conn
5487            .query_row(
5488                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5489                [],
5490                |row| row.get(0),
5491            )
5492            .expect("fts row");
5493
5494        assert_eq!(kind, "Document");
5495    }
5496
5497    #[test]
5498    fn fts_derives_rows_for_multiple_chunks_per_node() {
5499        let db = NamedTempFile::new().expect("temporary db");
5500        let writer = WriterActor::start(
5501            db.path(),
5502            Arc::new(SchemaManager::new()),
5503            ProvenanceMode::Warn,
5504            Arc::new(TelemetryCounters::default()),
5505        )
5506        .expect("writer");
5507
5508        writer
5509            .submit(WriteRequest {
5510                label: "test".to_owned(),
5511                nodes: vec![NodeInsert {
5512                    row_id: "row-1".to_owned(),
5513                    logical_id: "node-1".to_owned(),
5514                    kind: "Meeting".to_owned(),
5515                    properties: "{}".to_owned(),
5516                    source_ref: Some("src-1".to_owned()),
5517                    upsert: false,
5518                    chunk_policy: ChunkPolicy::Preserve,
5519                    content_ref: None,
5520                }],
5521                node_retires: vec![],
5522                edges: vec![],
5523                edge_retires: vec![],
5524                chunks: vec![
5525                    ChunkInsert {
5526                        id: "chunk-a".to_owned(),
5527                        node_logical_id: "node-1".to_owned(),
5528                        text_content: "intro".to_owned(),
5529                        byte_start: None,
5530                        byte_end: None,
5531                        content_hash: None,
5532                    },
5533                    ChunkInsert {
5534                        id: "chunk-b".to_owned(),
5535                        node_logical_id: "node-1".to_owned(),
5536                        text_content: "body".to_owned(),
5537                        byte_start: None,
5538                        byte_end: None,
5539                        content_hash: None,
5540                    },
5541                    ChunkInsert {
5542                        id: "chunk-c".to_owned(),
5543                        node_logical_id: "node-1".to_owned(),
5544                        text_content: "conclusion".to_owned(),
5545                        byte_start: None,
5546                        byte_end: None,
5547                        content_hash: None,
5548                    },
5549                ],
5550                runs: vec![],
5551                steps: vec![],
5552                actions: vec![],
5553                optional_backfills: vec![],
5554                vec_inserts: vec![],
5555                operational_writes: vec![],
5556            })
5557            .expect("write");
5558
5559        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5560        let count: i64 = conn
5561            .query_row(
5562                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5563                [],
5564                |row| row.get(0),
5565            )
5566            .expect("fts count");
5567
5568        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5569    }
5570
5571    #[test]
5572    fn fts_resolves_mixed_fast_and_db_paths() {
5573        let db = NamedTempFile::new().expect("temporary db");
5574        let writer = WriterActor::start(
5575            db.path(),
5576            Arc::new(SchemaManager::new()),
5577            ProvenanceMode::Warn,
5578            Arc::new(TelemetryCounters::default()),
5579        )
5580        .expect("writer");
5581
5582        // Seed pre-existing node
5583        writer
5584            .submit(WriteRequest {
5585                label: "seed".to_owned(),
5586                nodes: vec![NodeInsert {
5587                    row_id: "row-existing".to_owned(),
5588                    logical_id: "existing-node".to_owned(),
5589                    kind: "Archive".to_owned(),
5590                    properties: "{}".to_owned(),
5591                    source_ref: Some("src-1".to_owned()),
5592                    upsert: false,
5593                    chunk_policy: ChunkPolicy::Preserve,
5594                    content_ref: None,
5595                }],
5596                node_retires: vec![],
5597                edges: vec![],
5598                edge_retires: vec![],
5599                chunks: vec![],
5600                runs: vec![],
5601                steps: vec![],
5602                actions: vec![],
5603                optional_backfills: vec![],
5604                vec_inserts: vec![],
5605                operational_writes: vec![],
5606            })
5607            .expect("seed");
5608
5609        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
5610        writer
5611            .submit(WriteRequest {
5612                label: "mixed".to_owned(),
5613                nodes: vec![NodeInsert {
5614                    row_id: "row-new".to_owned(),
5615                    logical_id: "new-node".to_owned(),
5616                    kind: "Inbox".to_owned(),
5617                    properties: "{}".to_owned(),
5618                    source_ref: Some("src-2".to_owned()),
5619                    upsert: false,
5620                    chunk_policy: ChunkPolicy::Preserve,
5621                    content_ref: None,
5622                }],
5623                node_retires: vec![],
5624                edges: vec![],
5625                edge_retires: vec![],
5626                chunks: vec![
5627                    ChunkInsert {
5628                        id: "chunk-fast".to_owned(),
5629                        node_logical_id: "new-node".to_owned(),
5630                        text_content: "new content".to_owned(),
5631                        byte_start: None,
5632                        byte_end: None,
5633                        content_hash: None,
5634                    },
5635                    ChunkInsert {
5636                        id: "chunk-db".to_owned(),
5637                        node_logical_id: "existing-node".to_owned(),
5638                        text_content: "archive content".to_owned(),
5639                        byte_start: None,
5640                        byte_end: None,
5641                        content_hash: None,
5642                    },
5643                ],
5644                runs: vec![],
5645                steps: vec![],
5646                actions: vec![],
5647                optional_backfills: vec![],
5648                vec_inserts: vec![],
5649                operational_writes: vec![],
5650            })
5651            .expect("mixed write");
5652
5653        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5654        let fast_kind: String = conn
5655            .query_row(
5656                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
5657                [],
5658                |row| row.get(0),
5659            )
5660            .expect("fast path fts row");
5661        let db_kind: String = conn
5662            .query_row(
5663                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
5664                [],
5665                |row| row.get(0),
5666            )
5667            .expect("db path fts row");
5668
5669        assert_eq!(fast_kind, "Inbox");
5670        assert_eq!(db_kind, "Archive");
5671    }
5672
5673    #[test]
5674    fn prepare_write_rejects_empty_chunk_text() {
5675        let db = NamedTempFile::new().expect("temporary db");
5676        let writer = WriterActor::start(
5677            db.path(),
5678            Arc::new(SchemaManager::new()),
5679            ProvenanceMode::Warn,
5680            Arc::new(TelemetryCounters::default()),
5681        )
5682        .expect("writer");
5683
5684        let result = writer.submit(WriteRequest {
5685            label: "test".to_owned(),
5686            nodes: vec![NodeInsert {
5687                row_id: "row-1".to_owned(),
5688                logical_id: "node-1".to_owned(),
5689                kind: "Note".to_owned(),
5690                properties: "{}".to_owned(),
5691                source_ref: None,
5692                upsert: false,
5693                chunk_policy: ChunkPolicy::Preserve,
5694                content_ref: None,
5695            }],
5696            node_retires: vec![],
5697            edges: vec![],
5698            edge_retires: vec![],
5699            chunks: vec![ChunkInsert {
5700                id: "chunk-1".to_owned(),
5701                node_logical_id: "node-1".to_owned(),
5702                text_content: String::new(),
5703                byte_start: None,
5704                byte_end: None,
5705                content_hash: None,
5706            }],
5707            runs: vec![],
5708            steps: vec![],
5709            actions: vec![],
5710            optional_backfills: vec![],
5711            vec_inserts: vec![],
5712            operational_writes: vec![],
5713        });
5714
5715        assert!(
5716            matches!(result, Err(EngineError::InvalidWrite(_))),
5717            "empty text_content must be rejected"
5718        );
5719    }
5720
5721    #[test]
5722    fn receipt_reports_zero_backfills_when_none_submitted() {
5723        let db = NamedTempFile::new().expect("temporary db");
5724        let writer = WriterActor::start(
5725            db.path(),
5726            Arc::new(SchemaManager::new()),
5727            ProvenanceMode::Warn,
5728            Arc::new(TelemetryCounters::default()),
5729        )
5730        .expect("writer");
5731
5732        let receipt = writer
5733            .submit(WriteRequest {
5734                label: "test".to_owned(),
5735                nodes: vec![NodeInsert {
5736                    row_id: "row-1".to_owned(),
5737                    logical_id: "node-1".to_owned(),
5738                    kind: "Note".to_owned(),
5739                    properties: "{}".to_owned(),
5740                    source_ref: Some("src-1".to_owned()),
5741                    upsert: false,
5742                    chunk_policy: ChunkPolicy::Preserve,
5743                    content_ref: None,
5744                }],
5745                node_retires: vec![],
5746                edges: vec![],
5747                edge_retires: vec![],
5748                chunks: vec![],
5749                runs: vec![],
5750                steps: vec![],
5751                actions: vec![],
5752                optional_backfills: vec![],
5753                vec_inserts: vec![],
5754                operational_writes: vec![],
5755            })
5756            .expect("write");
5757
5758        assert_eq!(receipt.optional_backfill_count, 0);
5759    }
5760
5761    #[test]
5762    fn receipt_reports_correct_backfill_count() {
5763        let db = NamedTempFile::new().expect("temporary db");
5764        let writer = WriterActor::start(
5765            db.path(),
5766            Arc::new(SchemaManager::new()),
5767            ProvenanceMode::Warn,
5768            Arc::new(TelemetryCounters::default()),
5769        )
5770        .expect("writer");
5771
5772        let receipt = writer
5773            .submit(WriteRequest {
5774                label: "test".to_owned(),
5775                nodes: vec![NodeInsert {
5776                    row_id: "row-1".to_owned(),
5777                    logical_id: "node-1".to_owned(),
5778                    kind: "Note".to_owned(),
5779                    properties: "{}".to_owned(),
5780                    source_ref: Some("src-1".to_owned()),
5781                    upsert: false,
5782                    chunk_policy: ChunkPolicy::Preserve,
5783                    content_ref: None,
5784                }],
5785                node_retires: vec![],
5786                edges: vec![],
5787                edge_retires: vec![],
5788                chunks: vec![],
5789                runs: vec![],
5790                steps: vec![],
5791                actions: vec![],
5792                optional_backfills: vec![
5793                    OptionalProjectionTask {
5794                        target: ProjectionTarget::Fts,
5795                        payload: "p1".to_owned(),
5796                    },
5797                    OptionalProjectionTask {
5798                        target: ProjectionTarget::Vec,
5799                        payload: "p2".to_owned(),
5800                    },
5801                    OptionalProjectionTask {
5802                        target: ProjectionTarget::All,
5803                        payload: "p3".to_owned(),
5804                    },
5805                ],
5806                vec_inserts: vec![],
5807                operational_writes: vec![],
5808            })
5809            .expect("write");
5810
5811        assert_eq!(receipt.optional_backfill_count, 3);
5812    }
5813
5814    #[test]
5815    fn backfill_tasks_are_not_executed_during_write() {
5816        let db = NamedTempFile::new().expect("temporary db");
5817        let writer = WriterActor::start(
5818            db.path(),
5819            Arc::new(SchemaManager::new()),
5820            ProvenanceMode::Warn,
5821            Arc::new(TelemetryCounters::default()),
5822        )
5823        .expect("writer");
5824
5825        // Write a node + chunk. Submit a backfill task targeting FTS.
5826        // The write path must not create any extra FTS rows beyond the required one.
5827        writer
5828            .submit(WriteRequest {
5829                label: "test".to_owned(),
5830                nodes: vec![NodeInsert {
5831                    row_id: "row-1".to_owned(),
5832                    logical_id: "node-1".to_owned(),
5833                    kind: "Note".to_owned(),
5834                    properties: "{}".to_owned(),
5835                    source_ref: Some("src-1".to_owned()),
5836                    upsert: false,
5837                    chunk_policy: ChunkPolicy::Preserve,
5838                    content_ref: None,
5839                }],
5840                node_retires: vec![],
5841                edges: vec![],
5842                edge_retires: vec![],
5843                chunks: vec![ChunkInsert {
5844                    id: "chunk-1".to_owned(),
5845                    node_logical_id: "node-1".to_owned(),
5846                    text_content: "required text".to_owned(),
5847                    byte_start: None,
5848                    byte_end: None,
5849                    content_hash: None,
5850                }],
5851                runs: vec![],
5852                steps: vec![],
5853                actions: vec![],
5854                optional_backfills: vec![OptionalProjectionTask {
5855                    target: ProjectionTarget::Fts,
5856                    payload: "backfill-payload".to_owned(),
5857                }],
5858                vec_inserts: vec![],
5859                operational_writes: vec![],
5860            })
5861            .expect("write");
5862
5863        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5864        let count: i64 = conn
5865            .query_row(
5866                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5867                [],
5868                |row| row.get(0),
5869            )
5870            .expect("fts count");
5871
5872        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
5873    }
5874
5875    #[test]
5876    fn fts_row_uses_new_kind_after_node_replace() {
5877        let db = NamedTempFile::new().expect("temporary db");
5878        let writer = WriterActor::start(
5879            db.path(),
5880            Arc::new(SchemaManager::new()),
5881            ProvenanceMode::Warn,
5882            Arc::new(TelemetryCounters::default()),
5883        )
5884        .expect("writer");
5885
5886        // Write original node as "Note"
5887        writer
5888            .submit(WriteRequest {
5889                label: "v1".to_owned(),
5890                nodes: vec![NodeInsert {
5891                    row_id: "row-1".to_owned(),
5892                    logical_id: "node-1".to_owned(),
5893                    kind: "Note".to_owned(),
5894                    properties: "{}".to_owned(),
5895                    source_ref: Some("src-1".to_owned()),
5896                    upsert: false,
5897                    chunk_policy: ChunkPolicy::Preserve,
5898                    content_ref: None,
5899                }],
5900                node_retires: vec![],
5901                edges: vec![],
5902                edge_retires: vec![],
5903                chunks: vec![ChunkInsert {
5904                    id: "chunk-v1".to_owned(),
5905                    node_logical_id: "node-1".to_owned(),
5906                    text_content: "original".to_owned(),
5907                    byte_start: None,
5908                    byte_end: None,
5909                    content_hash: None,
5910                }],
5911                runs: vec![],
5912                steps: vec![],
5913                actions: vec![],
5914                optional_backfills: vec![],
5915                vec_inserts: vec![],
5916                operational_writes: vec![],
5917            })
5918            .expect("v1 write");
5919
5920        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
5921        writer
5922            .submit(WriteRequest {
5923                label: "v2".to_owned(),
5924                nodes: vec![NodeInsert {
5925                    row_id: "row-2".to_owned(),
5926                    logical_id: "node-1".to_owned(),
5927                    kind: "Meeting".to_owned(),
5928                    properties: "{}".to_owned(),
5929                    source_ref: Some("src-2".to_owned()),
5930                    upsert: true,
5931                    chunk_policy: ChunkPolicy::Replace,
5932                    content_ref: None,
5933                }],
5934                node_retires: vec![],
5935                edges: vec![],
5936                edge_retires: vec![],
5937                chunks: vec![ChunkInsert {
5938                    id: "chunk-v2".to_owned(),
5939                    node_logical_id: "node-1".to_owned(),
5940                    text_content: "updated".to_owned(),
5941                    byte_start: None,
5942                    byte_end: None,
5943                    content_hash: None,
5944                }],
5945                runs: vec![],
5946                steps: vec![],
5947                actions: vec![],
5948                optional_backfills: vec![],
5949                vec_inserts: vec![],
5950                operational_writes: vec![],
5951            })
5952            .expect("v2 write");
5953
5954        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5955
5956        // Old FTS row must be gone
5957        let old_count: i64 = conn
5958            .query_row(
5959                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
5960                [],
5961                |row| row.get(0),
5962            )
5963            .expect("old fts count");
5964        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
5965
5966        // New FTS row must use the new kind
5967        let new_kind: String = conn
5968            .query_row(
5969                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
5970                [],
5971                |row| row.get(0),
5972            )
5973            .expect("new fts row");
5974        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
5975    }
5976
5977    // --- Item 3: VecInsert tests ---
5978
5979    #[test]
5980    fn vec_insert_empty_chunk_id_is_rejected() {
5981        let db = NamedTempFile::new().expect("temporary db");
5982        let writer = WriterActor::start(
5983            db.path(),
5984            Arc::new(SchemaManager::new()),
5985            ProvenanceMode::Warn,
5986            Arc::new(TelemetryCounters::default()),
5987        )
5988        .expect("writer");
5989        let result = writer.submit(WriteRequest {
5990            label: "vec-test".to_owned(),
5991            nodes: vec![],
5992            node_retires: vec![],
5993            edges: vec![],
5994            edge_retires: vec![],
5995            chunks: vec![],
5996            runs: vec![],
5997            steps: vec![],
5998            actions: vec![],
5999            optional_backfills: vec![],
6000            vec_inserts: vec![VecInsert {
6001                chunk_id: String::new(),
6002                embedding: vec![0.1, 0.2, 0.3],
6003            }],
6004            operational_writes: vec![],
6005        });
6006        assert!(
6007            matches!(result, Err(EngineError::InvalidWrite(_))),
6008            "empty chunk_id in VecInsert must be rejected"
6009        );
6010    }
6011
6012    #[test]
6013    fn vec_insert_empty_embedding_is_rejected() {
6014        let db = NamedTempFile::new().expect("temporary db");
6015        let writer = WriterActor::start(
6016            db.path(),
6017            Arc::new(SchemaManager::new()),
6018            ProvenanceMode::Warn,
6019            Arc::new(TelemetryCounters::default()),
6020        )
6021        .expect("writer");
6022        let result = writer.submit(WriteRequest {
6023            label: "vec-test".to_owned(),
6024            nodes: vec![],
6025            node_retires: vec![],
6026            edges: vec![],
6027            edge_retires: vec![],
6028            chunks: vec![],
6029            runs: vec![],
6030            steps: vec![],
6031            actions: vec![],
6032            optional_backfills: vec![],
6033            vec_inserts: vec![VecInsert {
6034                chunk_id: "chunk-1".to_owned(),
6035                embedding: vec![],
6036            }],
6037            operational_writes: vec![],
6038        });
6039        assert!(
6040            matches!(result, Err(EngineError::InvalidWrite(_))),
6041            "empty embedding in VecInsert must be rejected"
6042        );
6043    }
6044
6045    #[test]
6046    fn vec_insert_noop_without_feature() {
6047        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6048        // (no error) but not write to any vec table.
6049        let db = NamedTempFile::new().expect("temporary db");
6050        let writer = WriterActor::start(
6051            db.path(),
6052            Arc::new(SchemaManager::new()),
6053            ProvenanceMode::Warn,
6054            Arc::new(TelemetryCounters::default()),
6055        )
6056        .expect("writer");
6057        let result = writer.submit(WriteRequest {
6058            label: "vec-noop".to_owned(),
6059            nodes: vec![],
6060            node_retires: vec![],
6061            edges: vec![],
6062            edge_retires: vec![],
6063            chunks: vec![],
6064            runs: vec![],
6065            steps: vec![],
6066            actions: vec![],
6067            optional_backfills: vec![],
6068            vec_inserts: vec![VecInsert {
6069                chunk_id: "chunk-noop".to_owned(),
6070                embedding: vec![1.0, 2.0, 3.0],
6071            }],
6072            operational_writes: vec![],
6073        });
6074        #[cfg(not(feature = "sqlite-vec"))]
6075        result.expect("noop VecInsert without feature must succeed");
6076        // The result variable is used above; silence unused warning for cfg-on path.
6077        #[cfg(feature = "sqlite-vec")]
6078        let _ = result;
6079    }
6080
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn node_retire_preserves_vec_rows_for_later_restore() {
        // Retiring a node must leave its vector rows in place so that a later
        // restore can re-establish vector search without re-embedding.
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the schema and a 3-dimensional vector profile on a
        // vec-capable connection. The scope drops this connection before the
        // writer opens the same database file.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        // The writer reuses the already-bootstrapped schema manager.
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert node + chunk + vec row
        writer
            .submit(WriteRequest {
                label: "setup".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-retire-vec".to_owned(),
                    logical_id: "node-retire-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-retire-vec".to_owned(),
                    node_logical_id: "node-retire-vec".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-retire-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("setup write");

        // Retire the node
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "node-retire-vec".to_owned(),
                    source_ref: Some("src".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // Verify through a fresh connection: the vec row must still exist.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
        );
    }
6176
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
        // Unlike node retirement, ChunkPolicy::Replace must delete the vector
        // rows of the chunks it replaces, while the replacement chunk's vector
        // row must be present afterwards.
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema + a 3-dimensional vector profile on a vec-capable
        // connection; the scope drops it before the writer opens the file.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert node + chunk-A + vec-A
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v1".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-A".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version one".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-A".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Upsert with Replace + chunk-B + vec-B
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v2".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-B".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version two".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-B".to_owned(),
                    embedding: vec![0.4, 0.5, 0.6],
                }],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Verify: chunk-A's vec row is gone; chunk-B's vec row is present.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count_a: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
                [],
                |row| row.get(0),
            )
            .expect("count A");
        let count_b: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
                [],
                |row| row.get(0),
            )
            .expect("count B");
        assert_eq!(
            count_a, 0,
            "old vec row (chunk-A) must be deleted on Replace"
        );
        assert_eq!(
            count_b, 1,
            "new vec row (chunk-B) must be present after Replace"
        );
    }
6299
6300    #[cfg(feature = "sqlite-vec")]
6301    #[test]
6302    fn vec_insert_is_persisted_when_feature_enabled() {
6303        use crate::sqlite::open_connection_with_vec;
6304
6305        let db = NamedTempFile::new().expect("temporary db");
6306        let schema_manager = Arc::new(SchemaManager::new());
6307
6308        // Open a vec-capable connection and bootstrap + ensure profile
6309        {
6310            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6311            schema_manager.bootstrap(&conn).expect("bootstrap");
6312            schema_manager
6313                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6314                .expect("ensure profile");
6315        }
6316
6317        let writer = WriterActor::start(
6318            db.path(),
6319            Arc::clone(&schema_manager),
6320            ProvenanceMode::Warn,
6321            Arc::new(TelemetryCounters::default()),
6322        )
6323        .expect("writer");
6324
6325        writer
6326            .submit(WriteRequest {
6327                label: "vec-insert".to_owned(),
6328                nodes: vec![],
6329                node_retires: vec![],
6330                edges: vec![],
6331                edge_retires: vec![],
6332                chunks: vec![],
6333                runs: vec![],
6334                steps: vec![],
6335                actions: vec![],
6336                optional_backfills: vec![],
6337                vec_inserts: vec![VecInsert {
6338                    chunk_id: "chunk-vec".to_owned(),
6339                    embedding: vec![0.1, 0.2, 0.3],
6340                }],
6341                operational_writes: vec![],
6342            })
6343            .expect("vec insert write");
6344
6345        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6346        let count: i64 = conn
6347            .query_row(
6348                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6349                [],
6350                |row| row.get(0),
6351            )
6352            .expect("count");
6353        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6354    }
6355
6356    // --- WriteRequest size validation tests ---
6357
6358    #[test]
6359    fn write_request_exceeding_node_limit_is_rejected() {
6360        let nodes: Vec<NodeInsert> = (0..10_001)
6361            .map(|i| NodeInsert {
6362                row_id: format!("row-{i}"),
6363                logical_id: format!("lg-{i}"),
6364                kind: "Note".to_owned(),
6365                properties: "{}".to_owned(),
6366                source_ref: None,
6367                upsert: false,
6368                chunk_policy: ChunkPolicy::Preserve,
6369                content_ref: None,
6370            })
6371            .collect();
6372
6373        let request = WriteRequest {
6374            label: "too-many-nodes".to_owned(),
6375            nodes,
6376            node_retires: vec![],
6377            edges: vec![],
6378            edge_retires: vec![],
6379            chunks: vec![],
6380            runs: vec![],
6381            steps: vec![],
6382            actions: vec![],
6383            optional_backfills: vec![],
6384            vec_inserts: vec![],
6385            operational_writes: vec![],
6386        };
6387
6388        let result = prepare_write(request, ProvenanceMode::Warn)
6389            .map(|_| ())
6390            .map_err(|e| format!("{e}"));
6391        assert!(
6392            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6393            "exceeding node limit must return InvalidWrite: got {result:?}"
6394        );
6395    }
6396
6397    #[test]
6398    fn write_request_exceeding_total_limit_is_rejected() {
6399        // Spread items across fields to exceed 100_000 total
6400        // without exceeding any single per-field limit.
6401        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6402        let request = WriteRequest {
6403            label: "too-many-total".to_owned(),
6404            nodes: (0..10_000)
6405                .map(|i| NodeInsert {
6406                    row_id: format!("row-{i}"),
6407                    logical_id: format!("lg-{i}"),
6408                    kind: "Note".to_owned(),
6409                    properties: "{}".to_owned(),
6410                    source_ref: None,
6411                    upsert: false,
6412                    chunk_policy: ChunkPolicy::Preserve,
6413                    content_ref: None,
6414                })
6415                .collect(),
6416            node_retires: vec![],
6417            edges: (0..10_000)
6418                .map(|i| EdgeInsert {
6419                    row_id: format!("edge-row-{i}"),
6420                    logical_id: format!("edge-lg-{i}"),
6421                    kind: "link".to_owned(),
6422                    source_logical_id: format!("lg-{i}"),
6423                    target_logical_id: format!("lg-{}", i + 1),
6424                    properties: "{}".to_owned(),
6425                    source_ref: None,
6426                    upsert: false,
6427                })
6428                .collect(),
6429            edge_retires: vec![],
6430            chunks: (0..50_000)
6431                .map(|i| ChunkInsert {
6432                    id: format!("chunk-{i}"),
6433                    node_logical_id: "lg-0".to_owned(),
6434                    text_content: "text".to_owned(),
6435                    byte_start: None,
6436                    byte_end: None,
6437                    content_hash: None,
6438                })
6439                .collect(),
6440            runs: vec![],
6441            steps: vec![],
6442            actions: vec![],
6443            optional_backfills: vec![],
6444            vec_inserts: (0..20_001)
6445                .map(|i| VecInsert {
6446                    chunk_id: format!("vec-chunk-{i}"),
6447                    embedding: vec![0.1],
6448                })
6449                .collect(),
6450            operational_writes: (0..10_000)
6451                .map(|i| OperationalWrite::Append {
6452                    collection: format!("col-{i}"),
6453                    record_key: format!("key-{i}"),
6454                    payload_json: "{}".to_owned(),
6455                    source_ref: None,
6456                })
6457                .collect(),
6458        };
6459
6460        let result = prepare_write(request, ProvenanceMode::Warn)
6461            .map(|_| ())
6462            .map_err(|e| format!("{e}"));
6463        assert!(
6464            matches!(result, Err(ref msg) if msg.contains("too many total items")),
6465            "exceeding total item limit must return InvalidWrite: got {result:?}"
6466        );
6467    }
6468
6469    #[test]
6470    fn write_request_within_limits_succeeds() {
6471        let db = NamedTempFile::new().expect("temporary db");
6472        let writer = WriterActor::start(
6473            db.path(),
6474            Arc::new(SchemaManager::new()),
6475            ProvenanceMode::Warn,
6476            Arc::new(TelemetryCounters::default()),
6477        )
6478        .expect("writer");
6479
6480        let result = writer.submit(WriteRequest {
6481            label: "within-limits".to_owned(),
6482            nodes: vec![NodeInsert {
6483                row_id: "row-1".to_owned(),
6484                logical_id: "lg-1".to_owned(),
6485                kind: "Note".to_owned(),
6486                properties: "{}".to_owned(),
6487                source_ref: None,
6488                upsert: false,
6489                chunk_policy: ChunkPolicy::Preserve,
6490                content_ref: None,
6491            }],
6492            node_retires: vec![],
6493            edges: vec![],
6494            edge_retires: vec![],
6495            chunks: vec![],
6496            runs: vec![],
6497            steps: vec![],
6498            actions: vec![],
6499            optional_backfills: vec![],
6500            vec_inserts: vec![],
6501            operational_writes: vec![],
6502        });
6503
6504        assert!(
6505            result.is_ok(),
6506            "write request within limits must succeed: got {result:?}"
6507        );
6508    }
6509
6510    #[test]
6511    fn property_fts_rows_created_on_node_insert() {
6512        let db = NamedTempFile::new().expect("temporary db");
6513        // Bootstrap and register a property schema before starting the writer.
6514        let schema = Arc::new(SchemaManager::new());
6515        {
6516            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6517            schema.bootstrap(&conn).expect("bootstrap");
6518            conn.execute(
6519                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6520                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6521                [],
6522            )
6523            .expect("register schema");
6524        }
6525        let writer = WriterActor::start(
6526            db.path(),
6527            Arc::clone(&schema),
6528            ProvenanceMode::Warn,
6529            Arc::new(TelemetryCounters::default()),
6530        )
6531        .expect("writer");
6532
6533        writer
6534            .submit(WriteRequest {
6535                label: "goal-insert".to_owned(),
6536                nodes: vec![NodeInsert {
6537                    row_id: "row-1".to_owned(),
6538                    logical_id: "goal-1".to_owned(),
6539                    kind: "Goal".to_owned(),
6540                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6541                        .to_owned(),
6542                    source_ref: Some("src-1".to_owned()),
6543                    upsert: false,
6544                    chunk_policy: ChunkPolicy::Preserve,
6545                    content_ref: None,
6546                }],
6547                node_retires: vec![],
6548                edges: vec![],
6549                edge_retires: vec![],
6550                chunks: vec![],
6551                runs: vec![],
6552                steps: vec![],
6553                actions: vec![],
6554                optional_backfills: vec![],
6555                vec_inserts: vec![],
6556                operational_writes: vec![],
6557            })
6558            .expect("write");
6559
6560        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6561        let text: String = conn
6562            .query_row(
6563                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6564                [],
6565                |row| row.get(0),
6566            )
6567            .expect("property FTS row must exist");
6568        assert_eq!(text, "Ship v2 Launch the redesign");
6569    }
6570
6571    #[test]
6572    fn property_fts_rows_replaced_on_upsert() {
6573        let db = NamedTempFile::new().expect("temporary db");
6574        let schema = Arc::new(SchemaManager::new());
6575        {
6576            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6577            schema.bootstrap(&conn).expect("bootstrap");
6578            conn.execute(
6579                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6580                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6581                [],
6582            )
6583            .expect("register schema");
6584        }
6585        let writer = WriterActor::start(
6586            db.path(),
6587            Arc::clone(&schema),
6588            ProvenanceMode::Warn,
6589            Arc::new(TelemetryCounters::default()),
6590        )
6591        .expect("writer");
6592
6593        // Initial insert.
6594        writer
6595            .submit(WriteRequest {
6596                label: "insert".to_owned(),
6597                nodes: vec![NodeInsert {
6598                    row_id: "row-1".to_owned(),
6599                    logical_id: "goal-1".to_owned(),
6600                    kind: "Goal".to_owned(),
6601                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6602                    source_ref: Some("src-1".to_owned()),
6603                    upsert: false,
6604                    chunk_policy: ChunkPolicy::Preserve,
6605                    content_ref: None,
6606                }],
6607                node_retires: vec![],
6608                edges: vec![],
6609                edge_retires: vec![],
6610                chunks: vec![],
6611                runs: vec![],
6612                steps: vec![],
6613                actions: vec![],
6614                optional_backfills: vec![],
6615                vec_inserts: vec![],
6616                operational_writes: vec![],
6617            })
6618            .expect("insert");
6619
6620        // Upsert with changed property.
6621        writer
6622            .submit(WriteRequest {
6623                label: "upsert".to_owned(),
6624                nodes: vec![NodeInsert {
6625                    row_id: "row-2".to_owned(),
6626                    logical_id: "goal-1".to_owned(),
6627                    kind: "Goal".to_owned(),
6628                    properties: r#"{"name":"Beta"}"#.to_owned(),
6629                    source_ref: Some("src-2".to_owned()),
6630                    upsert: true,
6631                    chunk_policy: ChunkPolicy::Preserve,
6632                    content_ref: None,
6633                }],
6634                node_retires: vec![],
6635                edges: vec![],
6636                edge_retires: vec![],
6637                chunks: vec![],
6638                runs: vec![],
6639                steps: vec![],
6640                actions: vec![],
6641                optional_backfills: vec![],
6642                vec_inserts: vec![],
6643                operational_writes: vec![],
6644            })
6645            .expect("upsert");
6646
6647        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6648        let count: i64 = conn
6649            .query_row(
6650                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6651                [],
6652                |row| row.get(0),
6653            )
6654            .expect("count");
6655        assert_eq!(
6656            count, 1,
6657            "must have exactly one property FTS row after upsert"
6658        );
6659
6660        let text: String = conn
6661            .query_row(
6662                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6663                [],
6664                |row| row.get(0),
6665            )
6666            .expect("text");
6667        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
6668    }
6669
6670    #[test]
6671    fn property_fts_rows_deleted_on_retire() {
6672        let db = NamedTempFile::new().expect("temporary db");
6673        let schema = Arc::new(SchemaManager::new());
6674        {
6675            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6676            schema.bootstrap(&conn).expect("bootstrap");
6677            conn.execute(
6678                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6679                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6680                [],
6681            )
6682            .expect("register schema");
6683        }
6684        let writer = WriterActor::start(
6685            db.path(),
6686            Arc::clone(&schema),
6687            ProvenanceMode::Warn,
6688            Arc::new(TelemetryCounters::default()),
6689        )
6690        .expect("writer");
6691
6692        // Insert a node.
6693        writer
6694            .submit(WriteRequest {
6695                label: "insert".to_owned(),
6696                nodes: vec![NodeInsert {
6697                    row_id: "row-1".to_owned(),
6698                    logical_id: "goal-1".to_owned(),
6699                    kind: "Goal".to_owned(),
6700                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6701                    source_ref: Some("src-1".to_owned()),
6702                    upsert: false,
6703                    chunk_policy: ChunkPolicy::Preserve,
6704                    content_ref: None,
6705                }],
6706                node_retires: vec![],
6707                edges: vec![],
6708                edge_retires: vec![],
6709                chunks: vec![],
6710                runs: vec![],
6711                steps: vec![],
6712                actions: vec![],
6713                optional_backfills: vec![],
6714                vec_inserts: vec![],
6715                operational_writes: vec![],
6716            })
6717            .expect("insert");
6718
6719        // Retire the node.
6720        writer
6721            .submit(WriteRequest {
6722                label: "retire".to_owned(),
6723                nodes: vec![],
6724                node_retires: vec![NodeRetire {
6725                    logical_id: "goal-1".to_owned(),
6726                    source_ref: Some("forget-1".to_owned()),
6727                }],
6728                edges: vec![],
6729                edge_retires: vec![],
6730                chunks: vec![],
6731                runs: vec![],
6732                steps: vec![],
6733                actions: vec![],
6734                optional_backfills: vec![],
6735                vec_inserts: vec![],
6736                operational_writes: vec![],
6737            })
6738            .expect("retire");
6739
6740        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6741        let count: i64 = conn
6742            .query_row(
6743                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6744                [],
6745                |row| row.get(0),
6746            )
6747            .expect("count");
6748        assert_eq!(count, 0, "property FTS row must be deleted on retire");
6749    }
6750
6751    #[test]
6752    fn no_property_fts_row_for_unregistered_kind() {
6753        let db = NamedTempFile::new().expect("temporary db");
6754        let schema = Arc::new(SchemaManager::new());
6755        {
6756            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6757            schema.bootstrap(&conn).expect("bootstrap");
6758            // No schema registered for "Note" kind.
6759        }
6760        let writer = WriterActor::start(
6761            db.path(),
6762            Arc::clone(&schema),
6763            ProvenanceMode::Warn,
6764            Arc::new(TelemetryCounters::default()),
6765        )
6766        .expect("writer");
6767
6768        writer
6769            .submit(WriteRequest {
6770                label: "insert".to_owned(),
6771                nodes: vec![NodeInsert {
6772                    row_id: "row-1".to_owned(),
6773                    logical_id: "note-1".to_owned(),
6774                    kind: "Note".to_owned(),
6775                    properties: r#"{"title":"hello"}"#.to_owned(),
6776                    source_ref: Some("src-1".to_owned()),
6777                    upsert: false,
6778                    chunk_policy: ChunkPolicy::Preserve,
6779                    content_ref: None,
6780                }],
6781                node_retires: vec![],
6782                edges: vec![],
6783                edge_retires: vec![],
6784                chunks: vec![],
6785                runs: vec![],
6786                steps: vec![],
6787                actions: vec![],
6788                optional_backfills: vec![],
6789                vec_inserts: vec![],
6790                operational_writes: vec![],
6791            })
6792            .expect("insert");
6793
6794        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6795        let count: i64 = conn
6796            .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
6797                row.get(0)
6798            })
6799            .expect("count");
6800        assert_eq!(count, 0, "no property FTS rows for unregistered kind");
6801    }
6802
6803    mod extract_json_path_tests {
6804        use super::super::extract_json_path;
6805        use serde_json::json;
6806
6807        #[test]
6808        fn string_value() {
6809            let v = json!({"name": "alice"});
6810            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
6811        }
6812
6813        #[test]
6814        fn number_value() {
6815            let v = json!({"age": 42});
6816            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
6817        }
6818
6819        #[test]
6820        fn bool_value() {
6821            let v = json!({"active": true});
6822            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
6823        }
6824
6825        #[test]
6826        fn null_value() {
6827            let v = json!({"x": null});
6828            assert!(extract_json_path(&v, "$.x").is_empty());
6829        }
6830
6831        #[test]
6832        fn missing_path() {
6833            let v = json!({"name": "a"});
6834            assert!(extract_json_path(&v, "$.missing").is_empty());
6835        }
6836
6837        #[test]
6838        fn nested_path() {
6839            let v = json!({"address": {"city": "NYC"}});
6840            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
6841        }
6842
6843        #[test]
6844        fn array_of_strings() {
6845            let v = json!({"tags": ["a", "b", "c"]});
6846            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
6847        }
6848
6849        #[test]
6850        fn array_mixed_scalars() {
6851            let v = json!({"vals": ["x", 1, true]});
6852            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
6853        }
6854
6855        #[test]
6856        fn array_only_objects_returns_empty() {
6857            let v = json!({"data": [{"k": "v"}]});
6858            assert!(extract_json_path(&v, "$.data").is_empty());
6859        }
6860
6861        #[test]
6862        fn array_mixed_objects_and_scalars() {
6863            let v = json!({"data": ["keep", {"skip": true}, "also"]});
6864            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
6865        }
6866
6867        #[test]
6868        fn object_returns_empty() {
6869            let v = json!({"meta": {"k": "v"}});
6870            assert!(extract_json_path(&v, "$.meta").is_empty());
6871        }
6872
6873        #[test]
6874        fn no_prefix_returns_empty() {
6875            let v = json!({"name": "a"});
6876            assert!(extract_json_path(&v, "name").is_empty());
6877        }
6878    }
6879}