Skip to main content

fathomdb_engine/
writer.rs

1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Tasks are carried through on the [`WriteRequest`] and counted in
/// [`WriteReceipt::optional_backfill_count`]; execution semantics are owned by
/// the projection layer, not this module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only meaningful when [`NodeInsert::upsert`] is true; a fresh insert has no
/// pre-existing chunks to reconcile.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforcement happens during request preparation (`check_require_provenance`,
/// `prepare_touch_last_accessed`) on the caller's thread, before anything is
/// sent to the writer thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// ID of this row (version); must be non-empty and unique among the
    /// node/edge row IDs in the same request.
    pub row_id: String,
    /// Stable identity shared by all versions of the node; must be non-empty.
    pub logical_id: String,
    /// Node kind; also consulted by `resolve_fts_rows` when projecting this
    /// node's chunks into the FTS index.
    pub kind: String,
    /// Serialized properties payload (stored as-is; not validated here).
    pub properties: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
}
67
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// ID of this row (version); must be non-empty and unique among the
    /// node/edge row IDs in the same request.
    pub row_id: String,
    /// Stable identity shared by all versions of the edge; must be non-empty.
    pub logical_id: String,
    /// Logical ID of the edge's source node.
    pub source_logical_id: String,
    /// Logical ID of the edge's target node.
    pub target_logical_id: String,
    /// Edge kind.
    pub kind: String,
    /// Serialized properties payload (stored as-is; not validated here).
    pub properties: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
82
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Logical ID of the node to retire.  Inserting a chunk for the same node
    /// in the same request is rejected (see `resolve_fts_rows`).
    pub logical_id: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
}
89
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Logical ID of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
}
96
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk ID; must be non-empty.
    pub id: String,
    /// Logical ID of the owning node — either pre-existing in the database or
    /// co-submitted in the same request; it must not be a node retired in the
    /// same request (see `resolve_fts_rows`).
    pub node_logical_id: String,
    /// Chunk text; empty text is rejected during `prepare_write`.
    pub text_content: String,
    /// Optional byte range for the chunk (semantics owned by the chunking
    /// pipeline; not validated here).
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
}
106
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
///
/// No `Eq` derive: `f32` is not `Eq`.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// ID of the chunk this embedding belongs to; must be non-empty.
    pub chunk_id: String,
    /// Embedding vector; must be non-empty.
    pub embedding: Vec<f32>,
}
118
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty for every variant, and
/// `payload_json` must be non-empty for `Append`/`Put` (all enforced in
/// `prepare_write`).  `Delete` carries no payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record under `record_key` in `collection`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put a record at `record_key` in `collection`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete the record at `record_key` in `collection`.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
140
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run ID; must be non-empty.
    pub id: String,
    /// Run kind.
    pub kind: String,
    /// Run status.
    pub status: String,
    /// Serialized properties payload (stored as-is; not validated here).
    pub properties: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// ID of the run this version replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
152
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step ID; must be non-empty.
    pub id: String,
    /// ID of the run this step belongs to.
    pub run_id: String,
    /// Step kind.
    pub kind: String,
    /// Step status.
    pub status: String,
    /// Serialized properties payload (stored as-is; not validated here).
    pub properties: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// ID of the step this version replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
165
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action ID; must be non-empty.
    pub id: String,
    /// ID of the step this action belongs to.
    pub step_id: String,
    /// Action kind.
    pub kind: String,
    /// Action status.
    pub status: String,
    /// Serialized properties payload (stored as-is; not validated here).
    pub properties: String,
    /// Provenance reference; under [`ProvenanceMode::Require`] its absence
    /// rejects the whole request.
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// ID of the action this version replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
178
// --- Write-request size limits ---
// Per-category caps enforced by `validate_request_size`; any request that
// exceeds a cap is rejected with `EngineError::InvalidWrite` on the caller's
// thread, before it reaches the writer.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined cap across node retires and edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined cap across runs, steps, and actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of every item category (vec inserts included).
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
195
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label carried through to the [`WriteReceipt`] and used
    /// in writer-thread log lines.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfill tasks recorded alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    /// Operational-collection mutations applied in the same transaction.
    pub operational_writes: Vec<OperationalWrite>,
}
214
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label of the originating [`WriteRequest`].
    pub label: String,
    /// Number of deferred projection backfill tasks recorded with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-provenance warnings emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
223
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch; must be non-empty and every entry must be non-blank
    /// (enforced in `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to record (`i64`; unit defined by the schema — not validated here).
    pub touched_at: i64,
    /// Provenance reference; required when [`ProvenanceMode::Require`] is active.
    pub source_ref: Option<String>,
}
231
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical IDs affected.
    pub touched_logical_ids: usize,
    /// Timestamp that was recorded.
    pub touched_at: i64,
}
238
/// A chunk plus the kind of its owning node, ready to project into the FTS
/// index.  Resolved by `resolve_fts_rows` before the write transaction begins.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node, taken from the request or looked up in the DB.
    kind: String,
    text_content: String,
}
246
/// Validated, writer-thread-ready form of a [`WriteRequest`].
///
/// Built by `prepare_write` on the caller's thread.  The operational metadata
/// maps and `required_fts_rows` start empty and are filled in later on the
/// writer thread.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Collection name → kind; populated on the writer thread.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Collection name → filter fields; populated on the writer thread.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Warnings from operational payload validation, surfaced on the receipt.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
272
/// Messages carried on the writer thread's channel.  Each variant pairs the
/// work item with a one-shot `reply` sender for the outcome.
enum WriteMessage {
    /// Apply a validated write batch.
    Submit {
        /// Boxed so the channel message stays small.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` timestamps for a batch of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
283
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Wrapped in `ManuallyDrop` so `Drop` can close the channel explicitly
    /// before joining the thread (see the `Drop` impl).
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// `Some` until `Drop` takes it to join the writer thread.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied when preparing every submitted request.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
296
297impl WriterActor {
298    /// # Errors
299    /// Returns [`EngineError`] if the writer thread cannot be spawned.
300    pub fn start(
301        path: impl AsRef<Path>,
302        schema_manager: Arc<SchemaManager>,
303        provenance_mode: ProvenanceMode,
304        telemetry: Arc<TelemetryCounters>,
305    ) -> Result<Self, EngineError> {
306        let database_path = path.as_ref().to_path_buf();
307        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
308
309        let writer_telemetry = Arc::clone(&telemetry);
310        let handle = thread::Builder::new()
311            .name("fathomdb-writer".to_owned())
312            .spawn(move || {
313                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
314            })
315            .map_err(EngineError::Io)?;
316
317        Ok(Self {
318            sender: ManuallyDrop::new(sender),
319            thread_handle: Some(handle),
320            provenance_mode,
321            _telemetry: telemetry,
322        })
323    }
324
325    /// Returns `true` if the writer thread is still running.
326    fn is_thread_alive(&self) -> bool {
327        self.thread_handle
328            .as_ref()
329            .is_some_and(|h| !h.is_finished())
330    }
331
332    /// Returns `WriterRejected` if the writer thread has exited.
333    fn check_thread_alive(&self) -> Result<(), EngineError> {
334        if self.is_thread_alive() {
335            Ok(())
336        } else {
337            Err(EngineError::WriterRejected(
338                "writer thread has exited".to_owned(),
339            ))
340        }
341    }
342
343    /// # Errors
344    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
345    /// down, or the underlying `SQLite` transaction fails.
346    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
347        self.check_thread_alive()?;
348        let prepared = prepare_write(request, self.provenance_mode)?;
349        let (reply_tx, reply_rx) = mpsc::channel();
350        self.sender
351            .send(WriteMessage::Submit {
352                prepared: Box::new(prepared),
353                reply: reply_tx,
354            })
355            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
356
357        recv_with_timeout(&reply_rx)
358    }
359
360    /// # Errors
361    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
362    /// or the underlying `SQLite` transaction fails.
363    pub fn touch_last_accessed(
364        &self,
365        request: LastAccessTouchRequest,
366    ) -> Result<LastAccessTouchReport, EngineError> {
367        self.check_thread_alive()?;
368        prepare_touch_last_accessed(&request, self.provenance_mode)?;
369        let (reply_tx, reply_rx) = mpsc::channel();
370        self.sender
371            .send(WriteMessage::TouchLastAccessed {
372                request,
373                reply: reply_tx,
374            })
375            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
376
377        recv_with_timeout(&reply_rx)
378    }
379}
380
/// Fallback shutdown notice used when the `tracing` feature is disabled.
///
/// `Drop` suppresses a writer-thread panic when the current thread is already
/// unwinding (a double panic would abort); this makes sure the suppressed
/// panic is at least visible on stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
386
387impl Drop for WriterActor {
388    fn drop(&mut self) {
389        // Phase 1: close the channel so the writer thread's `for msg in receiver`
390        // loop exits after finishing any in-progress message.
391        // Must happen BEFORE join to avoid deadlock.
392        //
393        // SAFETY: drop is called exactly once, and no method accesses the sender
394        // after drop begins.
395        unsafe { ManuallyDrop::drop(&mut self.sender) };
396
397        // Phase 2: join the writer thread to ensure the SQLite connection closes.
398        if let Some(handle) = self.thread_handle.take() {
399            match handle.join() {
400                Ok(()) => {}
401                Err(payload) => {
402                    if std::thread::panicking() {
403                        trace_warn!(
404                            "writer thread panicked during shutdown (suppressed: already panicking)"
405                        );
406                        #[cfg(not(feature = "tracing"))]
407                        stderr_panic_notice();
408                    } else {
409                        std::panic::resume_unwind(payload);
410                    }
411                }
412            }
413        }
414    }
415}
416
417/// Wait for a reply from the writer thread, with a timeout.
418fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
419    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
420        .map_err(|error| match error {
421            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
422                "write timed out waiting for writer thread reply — the write may still commit"
423                    .to_owned(),
424            ),
425            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
426        })
427        .and_then(|result| result)
428}
429
430fn prepare_touch_last_accessed(
431    request: &LastAccessTouchRequest,
432    mode: ProvenanceMode,
433) -> Result<(), EngineError> {
434    if request.logical_ids.is_empty() {
435        return Err(EngineError::InvalidWrite(
436            "touch_last_accessed requires at least one logical_id".to_owned(),
437        ));
438    }
439    for logical_id in &request.logical_ids {
440        if logical_id.trim().is_empty() {
441            return Err(EngineError::InvalidWrite(
442                "touch_last_accessed requires non-empty logical_ids".to_owned(),
443            ));
444        }
445    }
446    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
447        return Err(EngineError::InvalidWrite(
448            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
449                .to_owned(),
450        ));
451    }
452    Ok(())
453}
454
455fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
456    let missing: Vec<String> = request
457        .nodes
458        .iter()
459        .filter(|n| n.source_ref.is_none())
460        .map(|n| format!("node '{}'", n.logical_id))
461        .chain(
462            request
463                .node_retires
464                .iter()
465                .filter(|r| r.source_ref.is_none())
466                .map(|r| format!("node retire '{}'", r.logical_id)),
467        )
468        .chain(
469            request
470                .edges
471                .iter()
472                .filter(|e| e.source_ref.is_none())
473                .map(|e| format!("edge '{}'", e.logical_id)),
474        )
475        .chain(
476            request
477                .edge_retires
478                .iter()
479                .filter(|r| r.source_ref.is_none())
480                .map(|r| format!("edge retire '{}'", r.logical_id)),
481        )
482        .chain(
483            request
484                .runs
485                .iter()
486                .filter(|r| r.source_ref.is_none())
487                .map(|r| format!("run '{}'", r.id)),
488        )
489        .chain(
490            request
491                .steps
492                .iter()
493                .filter(|s| s.source_ref.is_none())
494                .map(|s| format!("step '{}'", s.id)),
495        )
496        .chain(
497            request
498                .actions
499                .iter()
500                .filter(|a| a.source_ref.is_none())
501                .map(|a| format!("action '{}'", a.id)),
502        )
503        .chain(
504            request
505                .operational_writes
506                .iter()
507                .filter(|write| operational_write_source_ref(write).is_none())
508                .map(|write| {
509                    format!(
510                        "operational {} '{}:{}'",
511                        operational_write_kind(write),
512                        operational_write_collection(write),
513                        operational_write_record_key(write)
514                    )
515                }),
516        )
517        .collect();
518
519    if missing.is_empty() {
520        Ok(())
521    } else {
522        Err(EngineError::InvalidWrite(format!(
523            "ProvenanceMode::Require: missing source_ref on: {}",
524            missing.join(", ")
525        )))
526    }
527}
528
529fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
530    if request.nodes.len() > MAX_NODES {
531        return Err(EngineError::InvalidWrite(format!(
532            "too many nodes: {} exceeds limit of {MAX_NODES}",
533            request.nodes.len()
534        )));
535    }
536    if request.edges.len() > MAX_EDGES {
537        return Err(EngineError::InvalidWrite(format!(
538            "too many edges: {} exceeds limit of {MAX_EDGES}",
539            request.edges.len()
540        )));
541    }
542    if request.chunks.len() > MAX_CHUNKS {
543        return Err(EngineError::InvalidWrite(format!(
544            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
545            request.chunks.len()
546        )));
547    }
548    let retires = request.node_retires.len() + request.edge_retires.len();
549    if retires > MAX_RETIRES {
550        return Err(EngineError::InvalidWrite(format!(
551            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
552        )));
553    }
554    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
555    if runtime_items > MAX_RUNTIME_ITEMS {
556        return Err(EngineError::InvalidWrite(format!(
557            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
558        )));
559    }
560    if request.operational_writes.len() > MAX_OPERATIONAL {
561        return Err(EngineError::InvalidWrite(format!(
562            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
563            request.operational_writes.len()
564        )));
565    }
566    let total = request.nodes.len()
567        + request.node_retires.len()
568        + request.edges.len()
569        + request.edge_retires.len()
570        + request.chunks.len()
571        + request.runs.len()
572        + request.steps.len()
573        + request.actions.len()
574        + request.vec_inserts.len()
575        + request.operational_writes.len();
576    if total > MAX_TOTAL_ITEMS {
577        return Err(EngineError::InvalidWrite(format!(
578            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
579        )));
580    }
581    Ok(())
582}
583
584#[allow(clippy::too_many_lines)]
585fn prepare_write(
586    request: WriteRequest,
587    mode: ProvenanceMode,
588) -> Result<PreparedWrite, EngineError> {
589    validate_request_size(&request)?;
590
591    // --- ID validation: reject empty IDs ---
592    for node in &request.nodes {
593        if node.row_id.is_empty() {
594            return Err(EngineError::InvalidWrite(
595                "NodeInsert has empty row_id".to_owned(),
596            ));
597        }
598        if node.logical_id.is_empty() {
599            return Err(EngineError::InvalidWrite(
600                "NodeInsert has empty logical_id".to_owned(),
601            ));
602        }
603    }
604    for edge in &request.edges {
605        if edge.row_id.is_empty() {
606            return Err(EngineError::InvalidWrite(
607                "EdgeInsert has empty row_id".to_owned(),
608            ));
609        }
610        if edge.logical_id.is_empty() {
611            return Err(EngineError::InvalidWrite(
612                "EdgeInsert has empty logical_id".to_owned(),
613            ));
614        }
615    }
616    for chunk in &request.chunks {
617        if chunk.id.is_empty() {
618            return Err(EngineError::InvalidWrite(
619                "ChunkInsert has empty id".to_owned(),
620            ));
621        }
622        if chunk.text_content.is_empty() {
623            return Err(EngineError::InvalidWrite(format!(
624                "chunk '{}' has empty text_content; empty chunks are not allowed",
625                chunk.id
626            )));
627        }
628    }
629    for run in &request.runs {
630        if run.id.is_empty() {
631            return Err(EngineError::InvalidWrite(
632                "RunInsert has empty id".to_owned(),
633            ));
634        }
635    }
636    for step in &request.steps {
637        if step.id.is_empty() {
638            return Err(EngineError::InvalidWrite(
639                "StepInsert has empty id".to_owned(),
640            ));
641        }
642    }
643    for action in &request.actions {
644        if action.id.is_empty() {
645            return Err(EngineError::InvalidWrite(
646                "ActionInsert has empty id".to_owned(),
647            ));
648        }
649    }
650    for vi in &request.vec_inserts {
651        if vi.chunk_id.is_empty() {
652            return Err(EngineError::InvalidWrite(
653                "VecInsert has empty chunk_id".to_owned(),
654            ));
655        }
656        if vi.embedding.is_empty() {
657            return Err(EngineError::InvalidWrite(format!(
658                "VecInsert for chunk '{}' has empty embedding",
659                vi.chunk_id
660            )));
661        }
662    }
663    for operational in &request.operational_writes {
664        if operational_write_collection(operational).is_empty() {
665            return Err(EngineError::InvalidWrite(
666                "OperationalWrite has empty collection".to_owned(),
667            ));
668        }
669        if operational_write_record_key(operational).is_empty() {
670            return Err(EngineError::InvalidWrite(format!(
671                "OperationalWrite for collection '{}' has empty record_key",
672                operational_write_collection(operational)
673            )));
674        }
675        match operational {
676            OperationalWrite::Append { payload_json, .. }
677            | OperationalWrite::Put { payload_json, .. } => {
678                if payload_json.is_empty() {
679                    return Err(EngineError::InvalidWrite(format!(
680                        "OperationalWrite {} '{}:{}' has empty payload_json",
681                        operational_write_kind(operational),
682                        operational_write_collection(operational),
683                        operational_write_record_key(operational)
684                    )));
685                }
686            }
687            OperationalWrite::Delete { .. } => {}
688        }
689    }
690
691    // --- ID validation: reject duplicate row_ids within the request ---
692    {
693        let mut seen = std::collections::HashSet::new();
694        for node in &request.nodes {
695            if !seen.insert(node.row_id.as_str()) {
696                return Err(EngineError::InvalidWrite(format!(
697                    "duplicate row_id '{}' within the same WriteRequest",
698                    node.row_id
699                )));
700            }
701        }
702        for edge in &request.edges {
703            if !seen.insert(edge.row_id.as_str()) {
704                return Err(EngineError::InvalidWrite(format!(
705                    "duplicate row_id '{}' within the same WriteRequest",
706                    edge.row_id
707                )));
708            }
709        }
710    }
711
712    // --- ProvenanceMode::Require enforcement ---
713    if mode == ProvenanceMode::Require {
714        check_require_provenance(&request)?;
715    }
716
717    // --- Runtime table upsert validation ---
718    for run in &request.runs {
719        if run.upsert && run.supersedes_id.is_none() {
720            return Err(EngineError::InvalidWrite(format!(
721                "run '{}': upsert=true requires supersedes_id to be set",
722                run.id
723            )));
724        }
725    }
726    for step in &request.steps {
727        if step.upsert && step.supersedes_id.is_none() {
728            return Err(EngineError::InvalidWrite(format!(
729                "step '{}': upsert=true requires supersedes_id to be set",
730                step.id
731            )));
732        }
733    }
734    for action in &request.actions {
735        if action.upsert && action.supersedes_id.is_none() {
736            return Err(EngineError::InvalidWrite(format!(
737                "action '{}': upsert=true requires supersedes_id to be set",
738                action.id
739            )));
740        }
741    }
742
743    let node_kinds = request
744        .nodes
745        .iter()
746        .map(|node| (node.logical_id.clone(), node.kind.clone()))
747        .collect::<HashMap<_, _>>();
748
749    Ok(PreparedWrite {
750        label: request.label,
751        nodes: request.nodes,
752        node_retires: request.node_retires,
753        edges: request.edges,
754        edge_retires: request.edge_retires,
755        chunks: request.chunks,
756        runs: request.runs,
757        steps: request.steps,
758        actions: request.actions,
759        vec_inserts: request.vec_inserts,
760        operational_writes: request.operational_writes,
761        operational_collection_kinds: HashMap::new(),
762        operational_collection_filter_fields: HashMap::new(),
763        operational_validation_warnings: Vec::new(),
764        node_kinds,
765        required_fts_rows: Vec::new(),
766        optional_backfills: request.optional_backfills,
767    })
768}
769
/// Body of the dedicated writer thread: owns the single `SQLite` connection
/// and applies queued messages serially until the channel closes.
///
/// If the connection cannot be opened or the schema bootstrap fails, every
/// queued and future message is rejected via `reject_all` and the thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Drain the channel so every waiting caller gets WriterRejected
            // instead of a hang/timeout.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Loop ends when all senders are dropped (see `WriterActor::drop`).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists under the tracing feature; it is consumed
                // inside the `trace_info!` invocation below.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // catch_unwind keeps a panic in one write from killing the
                // thread and stranding every later caller.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    // `error` is unused when tracing is compiled out.
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Send failure means the caller timed out and dropped the
                    // receiver (see WRITER_REPLY_TIMEOUT); safe to ignore.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches count as a write event but mutate no graph rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
852
853fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
854    for message in receiver {
855        match message {
856            WriteMessage::Submit { reply, .. } => {
857                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
858            }
859            WriteMessage::TouchLastAccessed { reply, .. } => {
860                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
861            }
862        }
863    }
864}
865
866/// Resolve FTS projection rows before the write transaction begins.
867///
868/// For each chunk in the prepared write, determines the node `kind` needed
869/// to populate the FTS index. If the node was co-submitted in the same
870/// request its kind is taken from `prepared.node_kinds` without a DB query.
871/// If the node is pre-existing it is looked up in the database. If the node
872/// cannot be found in either place, an `InvalidWrite` error is returned.
873fn resolve_fts_rows(
874    conn: &rusqlite::Connection,
875    prepared: &mut PreparedWrite,
876) -> Result<(), EngineError> {
877    let retiring_ids: std::collections::HashSet<&str> = prepared
878        .node_retires
879        .iter()
880        .map(|r| r.logical_id.as_str())
881        .collect();
882    for chunk in &prepared.chunks {
883        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
884            return Err(EngineError::InvalidWrite(format!(
885                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
886                 WriteRequest; retire and chunk insertion for the same node must not be combined",
887                chunk.id, chunk.node_logical_id
888            )));
889        }
890    }
891    for chunk in &prepared.chunks {
892        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
893            k.clone()
894        } else {
895            match conn.query_row(
896                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
897                params![chunk.node_logical_id],
898                |row| row.get::<_, String>(0),
899            ) {
900                Ok(kind) => kind,
901                Err(rusqlite::Error::QueryReturnedNoRows) => {
902                    return Err(EngineError::InvalidWrite(format!(
903                        "chunk '{}' references node_logical_id '{}' that is not present in this \
904                         write request or the database \
905                         (v1 limitation: chunks and their nodes must be submitted together or the \
906                         node must already exist)",
907                        chunk.id, chunk.node_logical_id
908                    )));
909                }
910                Err(e) => return Err(EngineError::Sqlite(e)),
911            }
912        };
913        prepared.required_fts_rows.push(FtsProjectionRow {
914            chunk_id: chunk.id.clone(),
915            node_logical_id: chunk.node_logical_id.clone(),
916            kind,
917            text_content: chunk.text_content.clone(),
918        });
919    }
920    trace_debug!(
921        fts_rows = prepared.required_fts_rows.len(),
922        chunks_processed = prepared.chunks.len(),
923        "fts row resolution completed"
924    );
925    Ok(())
926}
927
928fn resolve_operational_writes(
929    conn: &rusqlite::Connection,
930    prepared: &mut PreparedWrite,
931) -> Result<(), EngineError> {
932    let mut collection_kinds = HashMap::new();
933    let mut collection_filter_fields = HashMap::new();
934    let mut collection_validation_contracts = HashMap::new();
935    for write in &prepared.operational_writes {
936        let collection = operational_write_collection(write);
937        if !collection_kinds.contains_key(collection) {
938            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
939                .query_row(
940                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
941                    params![collection],
942                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
943                )
944                .optional()
945                .map_err(EngineError::Sqlite)?;
946            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
947                .ok_or_else(|| {
948                    EngineError::InvalidWrite(format!(
949                        "operational collection '{collection}' is not registered"
950                    ))
951                })?;
952            if disabled_at.is_some() {
953                return Err(EngineError::InvalidWrite(format!(
954                    "operational collection '{collection}' is disabled"
955                )));
956            }
957            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
958                .map_err(EngineError::InvalidWrite)?;
959            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
960            let validation_contract = parse_operational_validation_contract(&validation_json)
961                .map_err(EngineError::InvalidWrite)?;
962            collection_kinds.insert(collection.to_owned(), kind);
963            collection_filter_fields.insert(collection.to_owned(), filter_fields);
964            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
965        }
966
967        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
968            EngineError::InvalidWrite("missing operational collection kind".to_owned())
969        })?;
970        match (kind, write) {
971            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
972            | (
973                OperationalCollectionKind::LatestState,
974                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
975            ) => {}
976            (OperationalCollectionKind::AppendOnlyLog, _) => {
977                return Err(EngineError::InvalidWrite(format!(
978                    "operational collection '{collection}' is append_only_log and only accepts Append"
979                )));
980            }
981            (OperationalCollectionKind::LatestState, _) => {
982                return Err(EngineError::InvalidWrite(format!(
983                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
984                )));
985            }
986        }
987        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
988            let _ = check_operational_write_against_contract(write, contract)?;
989        }
990    }
991    prepared.operational_collection_kinds = collection_kinds;
992    prepared.operational_collection_filter_fields = collection_filter_fields;
993    Ok(())
994}
995
996fn parse_operational_filter_fields(
997    filter_fields_json: &str,
998) -> Result<Vec<OperationalFilterField>, EngineError> {
999    let fields: Vec<OperationalFilterField> =
1000        serde_json::from_str(filter_fields_json).map_err(|error| {
1001            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1002        })?;
1003    let mut seen = std::collections::HashSet::new();
1004    for field in &fields {
1005        if field.name.trim().is_empty() {
1006            return Err(EngineError::InvalidWrite(
1007                "filter_fields_json field names must not be empty".to_owned(),
1008            ));
1009        }
1010        if !seen.insert(field.name.as_str()) {
1011            return Err(EngineError::InvalidWrite(format!(
1012                "filter_fields_json contains duplicate field '{}'",
1013                field.name
1014            )));
1015        }
1016        if field.modes.is_empty() {
1017            return Err(EngineError::InvalidWrite(format!(
1018                "filter_fields_json field '{}' must declare at least one mode",
1019                field.name
1020            )));
1021        }
1022        if field.modes.contains(&OperationalFilterMode::Prefix)
1023            && field.field_type != OperationalFilterFieldType::String
1024        {
1025            return Err(EngineError::InvalidWrite(format!(
1026                "filter field '{}' only supports prefix for string types",
1027                field.name
1028            )));
1029        }
1030    }
1031    Ok(fields)
1032}
1033
/// One extracted filter value for a single operational mutation, destined
/// for the `operational_filter_values` table.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Set for string-typed fields; `None` otherwise.
    string_value: Option<String>,
    // Set for integer/timestamp-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1040
1041fn extract_operational_filter_values(
1042    filter_fields: &[OperationalFilterField],
1043    payload_json: &str,
1044) -> Vec<OperationalFilterValueRow> {
1045    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1046        return Vec::new();
1047    };
1048    let Some(object) = parsed.as_object() else {
1049        return Vec::new();
1050    };
1051
1052    filter_fields
1053        .iter()
1054        .filter_map(|field| {
1055            let value = object.get(&field.name)?;
1056            match field.field_type {
1057                OperationalFilterFieldType::String => {
1058                    value
1059                        .as_str()
1060                        .map(|string_value| OperationalFilterValueRow {
1061                            field_name: field.name.clone(),
1062                            string_value: Some(string_value.to_owned()),
1063                            integer_value: None,
1064                        })
1065                }
1066                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1067                    value
1068                        .as_i64()
1069                        .map(|integer_value| OperationalFilterValueRow {
1070                            field_name: field.name.clone(),
1071                            string_value: None,
1072                            integer_value: Some(integer_value),
1073                        })
1074                }
1075            }
1076        })
1077        .collect()
1078}
1079
/// Run the pre-transaction resolution passes, then apply the write.
///
/// FTS rows and operational collection metadata are resolved with read-only
/// queries first; `apply_write` then opens the write transaction.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1088
1089fn apply_touch_last_accessed(
1090    conn: &mut rusqlite::Connection,
1091    request: &LastAccessTouchRequest,
1092) -> Result<LastAccessTouchReport, EngineError> {
1093    let mut seen = std::collections::HashSet::new();
1094    let logical_ids = request
1095        .logical_ids
1096        .iter()
1097        .filter(|logical_id| seen.insert(logical_id.as_str()))
1098        .cloned()
1099        .collect::<Vec<_>>();
1100    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1101
1102    for logical_id in &logical_ids {
1103        let exists = tx
1104            .query_row(
1105                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1106                params![logical_id],
1107                |row| row.get::<_, i64>(0),
1108            )
1109            .optional()?
1110            .is_some();
1111        if !exists {
1112            return Err(EngineError::InvalidWrite(format!(
1113                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1114            )));
1115        }
1116    }
1117
1118    {
1119        let mut upsert_metadata = tx.prepare_cached(
1120            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1121             VALUES (?1, ?2, ?2) \
1122             ON CONFLICT(logical_id) DO UPDATE SET \
1123                 last_accessed_at = excluded.last_accessed_at, \
1124                 updated_at = excluded.updated_at",
1125        )?;
1126        let mut insert_provenance = tx.prepare_cached(
1127            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1128             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1129        )?;
1130        for logical_id in &logical_ids {
1131            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1132            insert_provenance.execute(params![
1133                new_id(),
1134                logical_id,
1135                request.source_ref.as_deref(),
1136                format!("{{\"touched_at\":{}}}", request.touched_at),
1137            ])?;
1138        }
1139    }
1140
1141    tx.commit()?;
1142    Ok(LastAccessTouchReport {
1143        touched_logical_ids: logical_ids.len(),
1144        touched_at: request.touched_at,
1145    })
1146}
1147
1148fn ensure_operational_collections_writable(
1149    tx: &rusqlite::Transaction<'_>,
1150    prepared: &PreparedWrite,
1151) -> Result<(), EngineError> {
1152    for collection in prepared.operational_collection_kinds.keys() {
1153        let disabled_at: Option<Option<i64>> = tx
1154            .query_row(
1155                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1156                params![collection],
1157                |row| row.get::<_, Option<i64>>(0),
1158            )
1159            .optional()?;
1160        match disabled_at {
1161            Some(Some(_)) => {
1162                return Err(EngineError::InvalidWrite(format!(
1163                    "operational collection '{collection}' is disabled"
1164                )));
1165            }
1166            Some(None) => {}
1167            None => {
1168                return Err(EngineError::InvalidWrite(format!(
1169                    "operational collection '{collection}' is not registered"
1170                )));
1171            }
1172        }
1173    }
1174    Ok(())
1175}
1176
1177fn validate_operational_writes_against_live_contracts(
1178    tx: &rusqlite::Transaction<'_>,
1179    prepared: &PreparedWrite,
1180) -> Result<Vec<String>, EngineError> {
1181    let mut collection_validation_contracts =
1182        HashMap::<String, Option<OperationalValidationContract>>::new();
1183    for collection in prepared.operational_collection_kinds.keys() {
1184        let validation_json: String = tx
1185            .query_row(
1186                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1187                params![collection],
1188                |row| row.get(0),
1189            )
1190            .map_err(EngineError::Sqlite)?;
1191        let validation_contract = parse_operational_validation_contract(&validation_json)
1192            .map_err(EngineError::InvalidWrite)?;
1193        collection_validation_contracts.insert(collection.clone(), validation_contract);
1194    }
1195
1196    let mut warnings = Vec::new();
1197    for write in &prepared.operational_writes {
1198        if let Some(Some(contract)) =
1199            collection_validation_contracts.get(operational_write_collection(write))
1200            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1201        {
1202            warnings.push(warning);
1203        }
1204    }
1205
1206    Ok(warnings)
1207}
1208
1209fn load_live_operational_secondary_indexes(
1210    tx: &rusqlite::Transaction<'_>,
1211    prepared: &PreparedWrite,
1212) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1213    let mut collection_indexes = HashMap::new();
1214    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1215        let secondary_indexes_json: String = tx
1216            .query_row(
1217                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1218                params![collection],
1219                |row| row.get(0),
1220            )
1221            .map_err(EngineError::Sqlite)?;
1222        let indexes =
1223            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1224                .map_err(EngineError::InvalidWrite)?;
1225        collection_indexes.insert(collection.clone(), indexes);
1226    }
1227    Ok(collection_indexes)
1228}
1229
1230fn check_operational_write_against_contract(
1231    write: &OperationalWrite,
1232    contract: &OperationalValidationContract,
1233) -> Result<Option<String>, EngineError> {
1234    if contract.mode == OperationalValidationMode::Disabled {
1235        return Ok(None);
1236    }
1237
1238    let (payload_json, collection, record_key) = match write {
1239        OperationalWrite::Append {
1240            collection,
1241            record_key,
1242            payload_json,
1243            ..
1244        }
1245        | OperationalWrite::Put {
1246            collection,
1247            record_key,
1248            payload_json,
1249            ..
1250        } => (
1251            payload_json.as_str(),
1252            collection.as_str(),
1253            record_key.as_str(),
1254        ),
1255        OperationalWrite::Delete { .. } => return Ok(None),
1256    };
1257
1258    match validate_operational_payload_against_contract(contract, payload_json) {
1259        Ok(()) => Ok(None),
1260        Err(message) => match contract.mode {
1261            OperationalValidationMode::Disabled => Ok(None),
1262            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1263                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1264                kind = operational_write_kind(write)
1265            ))),
1266            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1267                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1268                kind = operational_write_kind(write)
1269            ))),
1270        },
1271    }
1272}
1273
1274#[allow(clippy::too_many_lines)]
1275fn apply_write(
1276    conn: &mut rusqlite::Connection,
1277    prepared: &mut PreparedWrite,
1278) -> Result<WriteReceipt, EngineError> {
1279    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1280
1281    // Node retires: clear rebuildable FTS rows, preserve chunks/vec for possible restore,
1282    // mark superseded, record audit event.
1283    {
1284        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1285        let mut sup_node = tx.prepare_cached(
1286            "UPDATE nodes SET superseded_at = unixepoch() \
1287             WHERE logical_id = ?1 AND superseded_at IS NULL",
1288        )?;
1289        let mut ins_event = tx.prepare_cached(
1290            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1291             VALUES (?1, 'node_retire', ?2, ?3)",
1292        )?;
1293        for retire in &prepared.node_retires {
1294            del_fts.execute(params![retire.logical_id])?;
1295            sup_node.execute(params![retire.logical_id])?;
1296            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1297        }
1298    }
1299
1300    // Edge retires: mark superseded, record audit event.
1301    {
1302        let mut sup_edge = tx.prepare_cached(
1303            "UPDATE edges SET superseded_at = unixepoch() \
1304             WHERE logical_id = ?1 AND superseded_at IS NULL",
1305        )?;
1306        let mut ins_event = tx.prepare_cached(
1307            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1308             VALUES (?1, 'edge_retire', ?2, ?3)",
1309        )?;
1310        for retire in &prepared.edge_retires {
1311            sup_edge.execute(params![retire.logical_id])?;
1312            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1313        }
1314    }
1315
1316    // Node inserts (with optional upsert + chunk-policy handling).
1317    {
1318        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1319        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1320        let mut sup_node = tx.prepare_cached(
1321            "UPDATE nodes SET superseded_at = unixepoch() \
1322             WHERE logical_id = ?1 AND superseded_at IS NULL",
1323        )?;
1324        let mut ins_node = tx.prepare_cached(
1325            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1326             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1327        )?;
1328        #[cfg(feature = "sqlite-vec")]
1329        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
1330                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
1331        #[cfg(feature = "sqlite-vec")]
1332        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
1333            Ok(stmt) => Some(stmt),
1334            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
1335            Err(e) => return Err(e.into()),
1336        };
1337        for node in &prepared.nodes {
1338            if node.upsert {
1339                if node.chunk_policy == ChunkPolicy::Replace {
1340                    #[cfg(feature = "sqlite-vec")]
1341                    if let Some(ref mut stmt) = del_vec {
1342                        stmt.execute(params![node.logical_id])?;
1343                    }
1344                    del_fts.execute(params![node.logical_id])?;
1345                    del_chunks.execute(params![node.logical_id])?;
1346                }
1347                sup_node.execute(params![node.logical_id])?;
1348            }
1349            ins_node.execute(params![
1350                node.row_id,
1351                node.logical_id,
1352                node.kind,
1353                node.properties,
1354                node.source_ref,
1355            ])?;
1356        }
1357    }
1358
1359    // Edge inserts (with optional upsert).
1360    {
1361        let mut sup_edge = tx.prepare_cached(
1362            "UPDATE edges SET superseded_at = unixepoch() \
1363             WHERE logical_id = ?1 AND superseded_at IS NULL",
1364        )?;
1365        let mut ins_edge = tx.prepare_cached(
1366            "INSERT INTO edges \
1367             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1368             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1369        )?;
1370        for edge in &prepared.edges {
1371            if edge.upsert {
1372                sup_edge.execute(params![edge.logical_id])?;
1373            }
1374            ins_edge.execute(params![
1375                edge.row_id,
1376                edge.logical_id,
1377                edge.source_logical_id,
1378                edge.target_logical_id,
1379                edge.kind,
1380                edge.properties,
1381                edge.source_ref,
1382            ])?;
1383        }
1384    }
1385
1386    // Chunk inserts.
1387    {
1388        let mut ins_chunk = tx.prepare_cached(
1389            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at) \
1390             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch())",
1391        )?;
1392        for chunk in &prepared.chunks {
1393            ins_chunk.execute(params![
1394                chunk.id,
1395                chunk.node_logical_id,
1396                chunk.text_content,
1397                chunk.byte_start,
1398                chunk.byte_end,
1399            ])?;
1400        }
1401    }
1402
1403    // Run inserts (with optional upsert).
1404    {
1405        let mut sup_run = tx.prepare_cached(
1406            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1407        )?;
1408        let mut ins_run = tx.prepare_cached(
1409            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
1410             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1411        )?;
1412        for run in &prepared.runs {
1413            if run.upsert
1414                && let Some(ref prior_id) = run.supersedes_id
1415            {
1416                sup_run.execute(params![prior_id])?;
1417            }
1418            ins_run.execute(params![
1419                run.id,
1420                run.kind,
1421                run.status,
1422                run.properties,
1423                run.source_ref
1424            ])?;
1425        }
1426    }
1427
1428    // Step inserts (with optional upsert).
1429    {
1430        let mut sup_step = tx.prepare_cached(
1431            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1432        )?;
1433        let mut ins_step = tx.prepare_cached(
1434            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
1435             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1436        )?;
1437        for step in &prepared.steps {
1438            if step.upsert
1439                && let Some(ref prior_id) = step.supersedes_id
1440            {
1441                sup_step.execute(params![prior_id])?;
1442            }
1443            ins_step.execute(params![
1444                step.id,
1445                step.run_id,
1446                step.kind,
1447                step.status,
1448                step.properties,
1449                step.source_ref,
1450            ])?;
1451        }
1452    }
1453
1454    // Action inserts (with optional upsert).
1455    {
1456        let mut sup_action = tx.prepare_cached(
1457            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1458        )?;
1459        let mut ins_action = tx.prepare_cached(
1460            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
1461             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1462        )?;
1463        for action in &prepared.actions {
1464            if action.upsert
1465                && let Some(ref prior_id) = action.supersedes_id
1466            {
1467                sup_action.execute(params![prior_id])?;
1468            }
1469            ins_action.execute(params![
1470                action.id,
1471                action.step_id,
1472                action.kind,
1473                action.status,
1474                action.properties,
1475                action.source_ref,
1476            ])?;
1477        }
1478    }
1479
1480    // Operational mutation log writes and latest-state current rows.
1481    {
1482        ensure_operational_collections_writable(&tx, prepared)?;
1483        prepared.operational_validation_warnings =
1484            validate_operational_writes_against_live_contracts(&tx, prepared)?;
1485        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
1486
1487        let mut next_mutation_order: i64 = tx.query_row(
1488            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
1489            [],
1490            |row| row.get(0),
1491        )?;
1492        let mut ins_mutation = tx.prepare_cached(
1493            "INSERT INTO operational_mutations \
1494             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1495             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1496        )?;
1497        let mut ins_filter_value = tx.prepare_cached(
1498            "INSERT INTO operational_filter_values \
1499             (mutation_id, collection_name, field_name, string_value, integer_value) \
1500             VALUES (?1, ?2, ?3, ?4, ?5)",
1501        )?;
1502        let mut upsert_current = tx.prepare_cached(
1503            "INSERT INTO operational_current \
1504             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1505             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
1506             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
1507                 payload_json = excluded.payload_json, \
1508                 updated_at = excluded.updated_at, \
1509                 last_mutation_id = excluded.last_mutation_id",
1510        )?;
1511        let mut del_current = tx.prepare_cached(
1512            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
1513        )?;
1514        let mut del_current_secondary_indexes = tx.prepare_cached(
1515            "DELETE FROM operational_secondary_index_entries \
1516             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
1517        )?;
1518        let mut ins_secondary_index = tx.prepare_cached(
1519            "INSERT INTO operational_secondary_index_entries \
1520             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
1521              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
1522             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1523        )?;
1524        let mut current_row_stmt = tx.prepare_cached(
1525            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
1526             WHERE collection_name = ?1 AND record_key = ?2",
1527        )?;
1528
1529        for write in &prepared.operational_writes {
1530            let collection = operational_write_collection(write);
1531            let record_key = operational_write_record_key(write);
1532            let mutation_id = new_id();
1533            next_mutation_order += 1;
1534            let payload_json = operational_write_payload(write);
1535            ins_mutation.execute(params![
1536                &mutation_id,
1537                collection,
1538                record_key,
1539                operational_write_kind(write),
1540                payload_json,
1541                operational_write_source_ref(write),
1542                next_mutation_order,
1543            ])?;
1544            if let Some(indexes) = collection_secondary_indexes.get(collection) {
1545                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
1546                    ins_secondary_index.execute(params![
1547                        collection,
1548                        entry.index_name,
1549                        "mutation",
1550                        &mutation_id,
1551                        record_key,
1552                        entry.sort_timestamp,
1553                        entry.slot1_text,
1554                        entry.slot1_integer,
1555                        entry.slot2_text,
1556                        entry.slot2_integer,
1557                        entry.slot3_text,
1558                        entry.slot3_integer,
1559                    ])?;
1560                }
1561            }
1562            if let Some(filter_fields) = prepared
1563                .operational_collection_filter_fields
1564                .get(collection)
1565            {
1566                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
1567                    ins_filter_value.execute(params![
1568                        &mutation_id,
1569                        collection,
1570                        filter_value.field_name,
1571                        filter_value.string_value,
1572                        filter_value.integer_value,
1573                    ])?;
1574                }
1575            }
1576
1577            if prepared.operational_collection_kinds.get(collection)
1578                == Some(&OperationalCollectionKind::LatestState)
1579            {
1580                del_current_secondary_indexes.execute(params![collection, record_key])?;
1581                match write {
1582                    OperationalWrite::Put { payload_json, .. } => {
1583                        upsert_current.execute(params![
1584                            collection,
1585                            record_key,
1586                            payload_json,
1587                            &mutation_id,
1588                        ])?;
1589                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
1590                            let (current_payload_json, updated_at, last_mutation_id): (
1591                                String,
1592                                i64,
1593                                String,
1594                            ) = current_row_stmt
1595                                .query_row(params![collection, record_key], |row| {
1596                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1597                                })?;
1598                            for entry in extract_secondary_index_entries_for_current(
1599                                indexes,
1600                                &current_payload_json,
1601                                updated_at,
1602                            ) {
1603                                ins_secondary_index.execute(params![
1604                                    collection,
1605                                    entry.index_name,
1606                                    "current",
1607                                    last_mutation_id.as_str(),
1608                                    record_key,
1609                                    entry.sort_timestamp,
1610                                    entry.slot1_text,
1611                                    entry.slot1_integer,
1612                                    entry.slot2_text,
1613                                    entry.slot2_integer,
1614                                    entry.slot3_text,
1615                                    entry.slot3_integer,
1616                                ])?;
1617                            }
1618                        }
1619                    }
1620                    OperationalWrite::Delete { .. } => {
1621                        del_current.execute(params![collection, record_key])?;
1622                    }
1623                    OperationalWrite::Append { .. } => {}
1624                }
1625            }
1626        }
1627    }
1628
1629    // FTS row inserts.
1630    {
1631        let mut ins_fts = tx.prepare_cached(
1632            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
1633             VALUES (?1, ?2, ?3, ?4)",
1634        )?;
1635        for fts_row in &prepared.required_fts_rows {
1636            ins_fts.execute(params![
1637                fts_row.chunk_id,
1638                fts_row.node_logical_id,
1639                fts_row.kind,
1640                fts_row.text_content,
1641            ])?;
1642        }
1643    }
1644
1645    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
1646    #[cfg(feature = "sqlite-vec")]
1647    {
1648        match tx
1649            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
1650        {
1651            Ok(mut ins_vec) => {
1652                for vi in &prepared.vec_inserts {
1653                    let bytes: Vec<u8> =
1654                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
1655                    ins_vec.execute(params![vi.chunk_id, bytes])?;
1656                }
1657            }
1658            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
1659                // vec profile absent: vec inserts are silently skipped.
1660            }
1661            Err(e) => return Err(e.into()),
1662        }
1663    }
1664
1665    tx.commit()?;
1666
1667    let provenance_warnings: Vec<String> = prepared
1668        .nodes
1669        .iter()
1670        .filter(|node| node.source_ref.is_none())
1671        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
1672        .chain(
1673            prepared
1674                .node_retires
1675                .iter()
1676                .filter(|r| r.source_ref.is_none())
1677                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
1678        )
1679        .chain(
1680            prepared
1681                .edges
1682                .iter()
1683                .filter(|e| e.source_ref.is_none())
1684                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
1685        )
1686        .chain(
1687            prepared
1688                .edge_retires
1689                .iter()
1690                .filter(|r| r.source_ref.is_none())
1691                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
1692        )
1693        .chain(
1694            prepared
1695                .runs
1696                .iter()
1697                .filter(|r| r.source_ref.is_none())
1698                .map(|r| format!("run '{}' has no source_ref", r.id)),
1699        )
1700        .chain(
1701            prepared
1702                .steps
1703                .iter()
1704                .filter(|s| s.source_ref.is_none())
1705                .map(|s| format!("step '{}' has no source_ref", s.id)),
1706        )
1707        .chain(
1708            prepared
1709                .actions
1710                .iter()
1711                .filter(|a| a.source_ref.is_none())
1712                .map(|a| format!("action '{}' has no source_ref", a.id)),
1713        )
1714        .chain(
1715            prepared
1716                .operational_writes
1717                .iter()
1718                .filter(|write| operational_write_source_ref(write).is_none())
1719                .map(|write| {
1720                    format!(
1721                        "operational {} '{}:{}' has no source_ref",
1722                        operational_write_kind(write),
1723                        operational_write_collection(write),
1724                        operational_write_record_key(write)
1725                    )
1726                }),
1727        )
1728        .collect();
1729
1730    let mut warnings = provenance_warnings.clone();
1731    warnings.extend(prepared.operational_validation_warnings.clone());
1732
1733    Ok(WriteReceipt {
1734        label: prepared.label.clone(),
1735        optional_backfill_count: prepared.optional_backfills.len(),
1736        warnings,
1737        provenance_warnings,
1738    })
1739}
1740
1741fn operational_write_collection(write: &OperationalWrite) -> &str {
1742    match write {
1743        OperationalWrite::Append { collection, .. }
1744        | OperationalWrite::Put { collection, .. }
1745        | OperationalWrite::Delete { collection, .. } => collection,
1746    }
1747}
1748
1749fn operational_write_record_key(write: &OperationalWrite) -> &str {
1750    match write {
1751        OperationalWrite::Append { record_key, .. }
1752        | OperationalWrite::Put { record_key, .. }
1753        | OperationalWrite::Delete { record_key, .. } => record_key,
1754    }
1755}
1756
1757fn operational_write_kind(write: &OperationalWrite) -> &'static str {
1758    match write {
1759        OperationalWrite::Append { .. } => "append",
1760        OperationalWrite::Put { .. } => "put",
1761        OperationalWrite::Delete { .. } => "delete",
1762    }
1763}
1764
1765fn operational_write_payload(write: &OperationalWrite) -> &str {
1766    match write {
1767        OperationalWrite::Append { payload_json, .. }
1768        | OperationalWrite::Put { payload_json, .. } => payload_json,
1769        OperationalWrite::Delete { .. } => "null",
1770    }
1771}
1772
1773fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
1774    match write {
1775        OperationalWrite::Append { source_ref, .. }
1776        | OperationalWrite::Put { source_ref, .. }
1777        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
1778    }
1779}
1780
1781#[cfg(test)]
1782#[allow(clippy::expect_used)]
1783mod tests {
1784    use std::sync::Arc;
1785
1786    use fathomdb_schema::SchemaManager;
1787    use tempfile::NamedTempFile;
1788
1789    use super::{apply_write, prepare_write, resolve_operational_writes};
1790    use crate::{
1791        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
1792        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
1793        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
1794        projection::ProjectionTarget,
1795    };
1796
1797    #[test]
1798    fn writer_executes_runtime_table_rows() {
1799        let db = NamedTempFile::new().expect("temporary db");
1800        let writer = WriterActor::start(
1801            db.path(),
1802            Arc::new(SchemaManager::new()),
1803            ProvenanceMode::Warn,
1804            Arc::new(TelemetryCounters::default()),
1805        )
1806        .expect("writer");
1807
1808        let receipt = writer
1809            .submit(WriteRequest {
1810                label: "runtime".to_owned(),
1811                nodes: vec![],
1812                node_retires: vec![],
1813                edges: vec![],
1814                edge_retires: vec![],
1815                chunks: vec![],
1816                runs: vec![RunInsert {
1817                    id: "run-1".to_owned(),
1818                    kind: "session".to_owned(),
1819                    status: "completed".to_owned(),
1820                    properties: "{}".to_owned(),
1821                    source_ref: Some("src-1".to_owned()),
1822                    upsert: false,
1823                    supersedes_id: None,
1824                }],
1825                steps: vec![StepInsert {
1826                    id: "step-1".to_owned(),
1827                    run_id: "run-1".to_owned(),
1828                    kind: "llm".to_owned(),
1829                    status: "completed".to_owned(),
1830                    properties: "{}".to_owned(),
1831                    source_ref: Some("src-1".to_owned()),
1832                    upsert: false,
1833                    supersedes_id: None,
1834                }],
1835                actions: vec![ActionInsert {
1836                    id: "action-1".to_owned(),
1837                    step_id: "step-1".to_owned(),
1838                    kind: "emit".to_owned(),
1839                    status: "completed".to_owned(),
1840                    properties: "{}".to_owned(),
1841                    source_ref: Some("src-1".to_owned()),
1842                    upsert: false,
1843                    supersedes_id: None,
1844                }],
1845                optional_backfills: vec![],
1846                vec_inserts: vec![],
1847                operational_writes: vec![],
1848            })
1849            .expect("write receipt");
1850
1851        assert_eq!(receipt.label, "runtime");
1852    }
1853
1854    #[test]
1855    fn writer_put_operational_write_updates_current_and_mutations() {
1856        let db = NamedTempFile::new().expect("temporary db");
1857        let conn = rusqlite::Connection::open(db.path()).expect("conn");
1858        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
1859        conn.execute(
1860            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
1861             VALUES ('connector_health', 'latest_state', '{}', '{}')",
1862            [],
1863        )
1864        .expect("seed collection");
1865        drop(conn);
1866        let writer = WriterActor::start(
1867            db.path(),
1868            Arc::new(SchemaManager::new()),
1869            ProvenanceMode::Warn,
1870            Arc::new(TelemetryCounters::default()),
1871        )
1872        .expect("writer");
1873
1874        writer
1875            .submit(WriteRequest {
1876                label: "node-and-operational".to_owned(),
1877                nodes: vec![NodeInsert {
1878                    row_id: "row-1".to_owned(),
1879                    logical_id: "lg-1".to_owned(),
1880                    kind: "Meeting".to_owned(),
1881                    properties: "{}".to_owned(),
1882                    source_ref: Some("src-1".to_owned()),
1883                    upsert: false,
1884                    chunk_policy: ChunkPolicy::Preserve,
1885                }],
1886                node_retires: vec![],
1887                edges: vec![],
1888                edge_retires: vec![],
1889                chunks: vec![],
1890                runs: vec![],
1891                steps: vec![],
1892                actions: vec![],
1893                optional_backfills: vec![],
1894                vec_inserts: vec![],
1895                operational_writes: vec![OperationalWrite::Put {
1896                    collection: "connector_health".to_owned(),
1897                    record_key: "gmail".to_owned(),
1898                    payload_json: r#"{"status":"ok"}"#.to_owned(),
1899                    source_ref: Some("src-1".to_owned()),
1900                }],
1901            })
1902            .expect("write receipt");
1903
1904        let conn = rusqlite::Connection::open(db.path()).expect("conn");
1905        let node_count: i64 = conn
1906            .query_row(
1907                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
1908                [],
1909                |row| row.get(0),
1910            )
1911            .expect("node count");
1912        assert_eq!(node_count, 1);
1913        let mutation_count: i64 = conn
1914            .query_row(
1915                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
1916                 AND record_key = 'gmail'",
1917                [],
1918                |row| row.get(0),
1919            )
1920            .expect("mutation count");
1921        assert_eq!(mutation_count, 1);
1922        let payload: String = conn
1923            .query_row(
1924                "SELECT payload_json FROM operational_current \
1925                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
1926                [],
1927                |row| row.get(0),
1928            )
1929            .expect("current payload");
1930        assert_eq!(payload, r#"{"status":"ok"}"#);
1931    }
1932
1933    #[test]
1934    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
1935        let db = NamedTempFile::new().expect("temporary db");
1936        let conn = rusqlite::Connection::open(db.path()).expect("conn");
1937        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
1938        conn.execute(
1939            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
1940             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
1941            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
1942        )
1943        .expect("seed collection");
1944        drop(conn);
1945        let writer = WriterActor::start(
1946            db.path(),
1947            Arc::new(SchemaManager::new()),
1948            ProvenanceMode::Warn,
1949            Arc::new(TelemetryCounters::default()),
1950        )
1951        .expect("writer");
1952
1953        writer
1954            .submit(WriteRequest {
1955                label: "disabled-validation".to_owned(),
1956                nodes: vec![],
1957                node_retires: vec![],
1958                edges: vec![],
1959                edge_retires: vec![],
1960                chunks: vec![],
1961                runs: vec![],
1962                steps: vec![],
1963                actions: vec![],
1964                optional_backfills: vec![],
1965                vec_inserts: vec![],
1966                operational_writes: vec![OperationalWrite::Put {
1967                    collection: "connector_health".to_owned(),
1968                    record_key: "gmail".to_owned(),
1969                    payload_json: r#"{"bogus":true}"#.to_owned(),
1970                    source_ref: Some("src-1".to_owned()),
1971                }],
1972            })
1973            .expect("write receipt");
1974
1975        let conn = rusqlite::Connection::open(db.path()).expect("conn");
1976        let payload: String = conn
1977            .query_row(
1978                "SELECT payload_json FROM operational_current \
1979                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
1980                [],
1981                |row| row.get(0),
1982            )
1983            .expect("current payload");
1984        assert_eq!(payload, r#"{"bogus":true}"#);
1985    }
1986
1987    #[test]
1988    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
1989        let db = NamedTempFile::new().expect("temporary db");
1990        let conn = rusqlite::Connection::open(db.path()).expect("conn");
1991        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
1992        conn.execute(
1993            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
1994             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
1995            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
1996        )
1997        .expect("seed collection");
1998        drop(conn);
1999        let writer = WriterActor::start(
2000            db.path(),
2001            Arc::new(SchemaManager::new()),
2002            ProvenanceMode::Warn,
2003            Arc::new(TelemetryCounters::default()),
2004        )
2005        .expect("writer");
2006
2007        let receipt = writer
2008            .submit(WriteRequest {
2009                label: "report-only-validation".to_owned(),
2010                nodes: vec![],
2011                node_retires: vec![],
2012                edges: vec![],
2013                edge_retires: vec![],
2014                chunks: vec![],
2015                runs: vec![],
2016                steps: vec![],
2017                actions: vec![],
2018                optional_backfills: vec![],
2019                vec_inserts: vec![],
2020                operational_writes: vec![OperationalWrite::Put {
2021                    collection: "connector_health".to_owned(),
2022                    record_key: "gmail".to_owned(),
2023                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2024                    source_ref: Some("src-1".to_owned()),
2025                }],
2026            })
2027            .expect("report_only write should succeed");
2028
2029        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
2030        assert_eq!(receipt.warnings.len(), 1);
2031        assert!(
2032            receipt.warnings[0].contains("connector_health"),
2033            "warning should identify collection"
2034        );
2035        assert!(
2036            receipt.warnings[0].contains("must be one of"),
2037            "warning should explain validation failure"
2038        );
2039
2040        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2041        let payload: String = conn
2042            .query_row(
2043                "SELECT payload_json FROM operational_current \
2044                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2045                [],
2046                |row| row.get(0),
2047            )
2048            .expect("current payload");
2049        assert_eq!(payload, r#"{"status":"bogus"}"#);
2050    }
2051
2052    #[test]
2053    fn writer_rejects_operational_write_for_missing_collection() {
2054        let db = NamedTempFile::new().expect("temporary db");
2055        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2056        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2057        drop(conn);
2058        let writer = WriterActor::start(
2059            db.path(),
2060            Arc::new(SchemaManager::new()),
2061            ProvenanceMode::Warn,
2062            Arc::new(TelemetryCounters::default()),
2063        )
2064        .expect("writer");
2065
2066        let result = writer.submit(WriteRequest {
2067            label: "missing-operational-collection".to_owned(),
2068            nodes: vec![],
2069            node_retires: vec![],
2070            edges: vec![],
2071            edge_retires: vec![],
2072            chunks: vec![],
2073            runs: vec![],
2074            steps: vec![],
2075            actions: vec![],
2076            optional_backfills: vec![],
2077            vec_inserts: vec![],
2078            operational_writes: vec![OperationalWrite::Put {
2079                collection: "connector_health".to_owned(),
2080                record_key: "gmail".to_owned(),
2081                payload_json: r#"{"status":"ok"}"#.to_owned(),
2082                source_ref: Some("src-1".to_owned()),
2083            }],
2084        });
2085
2086        assert!(
2087            matches!(result, Err(EngineError::InvalidWrite(_))),
2088            "missing operational collection must return InvalidWrite"
2089        );
2090    }
2091
2092    #[test]
2093    fn writer_append_operational_write_records_history_without_current_row() {
2094        let db = NamedTempFile::new().expect("temporary db");
2095        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2096        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2097        conn.execute(
2098            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2099             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2100            [],
2101        )
2102        .expect("seed collection");
2103        drop(conn);
2104        let writer = WriterActor::start(
2105            db.path(),
2106            Arc::new(SchemaManager::new()),
2107            ProvenanceMode::Warn,
2108            Arc::new(TelemetryCounters::default()),
2109        )
2110        .expect("writer");
2111
2112        writer
2113            .submit(WriteRequest {
2114                label: "append-operational".to_owned(),
2115                nodes: vec![],
2116                node_retires: vec![],
2117                edges: vec![],
2118                edge_retires: vec![],
2119                chunks: vec![],
2120                runs: vec![],
2121                steps: vec![],
2122                actions: vec![],
2123                optional_backfills: vec![],
2124                vec_inserts: vec![],
2125                operational_writes: vec![OperationalWrite::Append {
2126                    collection: "audit_log".to_owned(),
2127                    record_key: "evt-1".to_owned(),
2128                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2129                    source_ref: Some("src-1".to_owned()),
2130                }],
2131            })
2132            .expect("write receipt");
2133
2134        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2135        let mutation: (String, String) = conn
2136            .query_row(
2137                "SELECT op_kind, payload_json FROM operational_mutations \
2138                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2139                [],
2140                |row| Ok((row.get(0)?, row.get(1)?)),
2141            )
2142            .expect("mutation row");
2143        assert_eq!(mutation.0, "append");
2144        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2145        let current_count: i64 = conn
2146            .query_row(
2147                "SELECT count(*) FROM operational_current \
2148                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2149                [],
2150                |row| row.get(0),
2151            )
2152            .expect("current count");
2153        assert_eq!(current_count, 0);
2154    }
2155
2156    #[test]
2157    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2158        let db = NamedTempFile::new().expect("temporary db");
2159        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2160        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2161        conn.execute(
2162            "INSERT INTO operational_collections \
2163             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2164             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2165                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2166            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2167        )
2168        .expect("seed collection");
2169        drop(conn);
2170        let writer = WriterActor::start(
2171            db.path(),
2172            Arc::new(SchemaManager::new()),
2173            ProvenanceMode::Warn,
2174            Arc::new(TelemetryCounters::default()),
2175        )
2176        .expect("writer");
2177
2178        let error = writer
2179            .submit(WriteRequest {
2180                label: "invalid-append".to_owned(),
2181                nodes: vec![],
2182                node_retires: vec![],
2183                edges: vec![],
2184                edge_retires: vec![],
2185                chunks: vec![],
2186                runs: vec![],
2187                steps: vec![],
2188                actions: vec![],
2189                optional_backfills: vec![],
2190                vec_inserts: vec![],
2191                operational_writes: vec![OperationalWrite::Append {
2192                    collection: "audit_log".to_owned(),
2193                    record_key: "evt-1".to_owned(),
2194                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2195                    source_ref: Some("src-1".to_owned()),
2196                }],
2197            })
2198            .expect_err("invalid append must reject");
2199        assert!(matches!(error, EngineError::InvalidWrite(_)));
2200        assert!(error.to_string().contains("must be one of"));
2201
2202        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2203        let mutation_count: i64 = conn
2204            .query_row(
2205                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2206                [],
2207                |row| row.get(0),
2208            )
2209            .expect("mutation count");
2210        assert_eq!(mutation_count, 0);
2211        let filter_count: i64 = conn
2212            .query_row(
2213                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2214                [],
2215                |row| row.get(0),
2216            )
2217            .expect("filter count");
2218        assert_eq!(filter_count, 0);
2219    }
2220
2221    #[test]
2222    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2223        let db = NamedTempFile::new().expect("temporary db");
2224        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2225        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2226        conn.execute(
2227            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2228             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2229            [],
2230        )
2231        .expect("seed collection");
2232        drop(conn);
2233        let writer = WriterActor::start(
2234            db.path(),
2235            Arc::new(SchemaManager::new()),
2236            ProvenanceMode::Warn,
2237            Arc::new(TelemetryCounters::default()),
2238        )
2239        .expect("writer");
2240
2241        writer
2242            .submit(WriteRequest {
2243                label: "put-operational".to_owned(),
2244                nodes: vec![],
2245                node_retires: vec![],
2246                edges: vec![],
2247                edge_retires: vec![],
2248                chunks: vec![],
2249                runs: vec![],
2250                steps: vec![],
2251                actions: vec![],
2252                optional_backfills: vec![],
2253                vec_inserts: vec![],
2254                operational_writes: vec![OperationalWrite::Put {
2255                    collection: "connector_health".to_owned(),
2256                    record_key: "gmail".to_owned(),
2257                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2258                    source_ref: Some("src-1".to_owned()),
2259                }],
2260            })
2261            .expect("put receipt");
2262
2263        writer
2264            .submit(WriteRequest {
2265                label: "delete-operational".to_owned(),
2266                nodes: vec![],
2267                node_retires: vec![],
2268                edges: vec![],
2269                edge_retires: vec![],
2270                chunks: vec![],
2271                runs: vec![],
2272                steps: vec![],
2273                actions: vec![],
2274                optional_backfills: vec![],
2275                vec_inserts: vec![],
2276                operational_writes: vec![OperationalWrite::Delete {
2277                    collection: "connector_health".to_owned(),
2278                    record_key: "gmail".to_owned(),
2279                    source_ref: Some("src-2".to_owned()),
2280                }],
2281            })
2282            .expect("delete receipt");
2283
2284        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2285        let mutation_kinds: Vec<String> = {
2286            let mut stmt = conn
2287                .prepare(
2288                    "SELECT op_kind FROM operational_mutations \
2289                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2290                     ORDER BY mutation_order ASC",
2291                )
2292                .expect("stmt");
2293            stmt.query_map([], |row| row.get(0))
2294                .expect("rows")
2295                .collect::<Result<_, _>>()
2296                .expect("collect")
2297        };
2298        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2299        let current_count: i64 = conn
2300            .query_row(
2301                "SELECT count(*) FROM operational_current \
2302                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2303                [],
2304                |row| row.get(0),
2305            )
2306            .expect("current count");
2307        assert_eq!(current_count, 0);
2308    }
2309
2310    #[test]
2311    fn writer_delete_bypasses_validation_contract() {
2312        let db = NamedTempFile::new().expect("temporary db");
2313        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2314        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2315        conn.execute(
2316            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2317             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2318            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2319        )
2320        .expect("seed collection");
2321        drop(conn);
2322        let writer = WriterActor::start(
2323            db.path(),
2324            Arc::new(SchemaManager::new()),
2325            ProvenanceMode::Warn,
2326            Arc::new(TelemetryCounters::default()),
2327        )
2328        .expect("writer");
2329
2330        writer
2331            .submit(WriteRequest {
2332                label: "valid-put".to_owned(),
2333                nodes: vec![],
2334                node_retires: vec![],
2335                edges: vec![],
2336                edge_retires: vec![],
2337                chunks: vec![],
2338                runs: vec![],
2339                steps: vec![],
2340                actions: vec![],
2341                optional_backfills: vec![],
2342                vec_inserts: vec![],
2343                operational_writes: vec![OperationalWrite::Put {
2344                    collection: "connector_health".to_owned(),
2345                    record_key: "gmail".to_owned(),
2346                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2347                    source_ref: Some("src-1".to_owned()),
2348                }],
2349            })
2350            .expect("put receipt");
2351        writer
2352            .submit(WriteRequest {
2353                label: "delete-after-put".to_owned(),
2354                nodes: vec![],
2355                node_retires: vec![],
2356                edges: vec![],
2357                edge_retires: vec![],
2358                chunks: vec![],
2359                runs: vec![],
2360                steps: vec![],
2361                actions: vec![],
2362                optional_backfills: vec![],
2363                vec_inserts: vec![],
2364                operational_writes: vec![OperationalWrite::Delete {
2365                    collection: "connector_health".to_owned(),
2366                    record_key: "gmail".to_owned(),
2367                    source_ref: Some("src-2".to_owned()),
2368                }],
2369            })
2370            .expect("delete receipt");
2371
2372        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2373        let current_count: i64 = conn
2374            .query_row(
2375                "SELECT count(*) FROM operational_current \
2376                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2377                [],
2378                |row| row.get(0),
2379            )
2380            .expect("current count");
2381        assert_eq!(current_count, 0);
2382    }
2383
2384    #[test]
2385    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
2386        let db = NamedTempFile::new().expect("temporary db");
2387        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2388        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2389        conn.execute(
2390            "INSERT INTO operational_collections \
2391             (name, kind, schema_json, retention_json, secondary_indexes_json) \
2392             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2393            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
2394        )
2395        .expect("seed collection");
2396        drop(conn);
2397        let writer = WriterActor::start(
2398            db.path(),
2399            Arc::new(SchemaManager::new()),
2400            ProvenanceMode::Warn,
2401            Arc::new(TelemetryCounters::default()),
2402        )
2403        .expect("writer");
2404
2405        writer
2406            .submit(WriteRequest {
2407                label: "secondary-index-put".to_owned(),
2408                nodes: vec![],
2409                node_retires: vec![],
2410                edges: vec![],
2411                edge_retires: vec![],
2412                chunks: vec![],
2413                runs: vec![],
2414                steps: vec![],
2415                actions: vec![],
2416                optional_backfills: vec![],
2417                vec_inserts: vec![],
2418                operational_writes: vec![OperationalWrite::Put {
2419                    collection: "connector_health".to_owned(),
2420                    record_key: "gmail".to_owned(),
2421                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
2422                        .to_owned(),
2423                    source_ref: Some("src-1".to_owned()),
2424                }],
2425            })
2426            .expect("put receipt");
2427
2428        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2429        let current_entry_count: i64 = conn
2430            .query_row(
2431                "SELECT count(*) FROM operational_secondary_index_entries \
2432                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2433                [],
2434                |row| row.get(0),
2435            )
2436            .expect("current secondary index count");
2437        assert_eq!(current_entry_count, 2);
2438        drop(conn);
2439
2440        writer
2441            .submit(WriteRequest {
2442                label: "secondary-index-delete".to_owned(),
2443                nodes: vec![],
2444                node_retires: vec![],
2445                edges: vec![],
2446                edge_retires: vec![],
2447                chunks: vec![],
2448                runs: vec![],
2449                steps: vec![],
2450                actions: vec![],
2451                optional_backfills: vec![],
2452                vec_inserts: vec![],
2453                operational_writes: vec![OperationalWrite::Delete {
2454                    collection: "connector_health".to_owned(),
2455                    record_key: "gmail".to_owned(),
2456                    source_ref: Some("src-2".to_owned()),
2457                }],
2458            })
2459            .expect("delete receipt");
2460
2461        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2462        let current_entry_count: i64 = conn
2463            .query_row(
2464                "SELECT count(*) FROM operational_secondary_index_entries \
2465                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2466                [],
2467                |row| row.get(0),
2468            )
2469            .expect("current secondary index count");
2470        assert_eq!(current_entry_count, 0);
2471    }
2472
2473    #[test]
2474    fn writer_latest_state_operational_writes_persist_mutation_order() {
2475        let db = NamedTempFile::new().expect("temporary db");
2476        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2477        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2478        conn.execute(
2479            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2480             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2481            [],
2482        )
2483        .expect("seed collection");
2484        drop(conn);
2485
2486        let writer = WriterActor::start(
2487            db.path(),
2488            Arc::new(SchemaManager::new()),
2489            ProvenanceMode::Warn,
2490            Arc::new(TelemetryCounters::default()),
2491        )
2492        .expect("writer");
2493
2494        writer
2495            .submit(WriteRequest {
2496                label: "ordered-operational-batch".to_owned(),
2497                nodes: vec![],
2498                node_retires: vec![],
2499                edges: vec![],
2500                edge_retires: vec![],
2501                chunks: vec![],
2502                runs: vec![],
2503                steps: vec![],
2504                actions: vec![],
2505                optional_backfills: vec![],
2506                vec_inserts: vec![],
2507                operational_writes: vec![
2508                    OperationalWrite::Put {
2509                        collection: "connector_health".to_owned(),
2510                        record_key: "gmail".to_owned(),
2511                        payload_json: r#"{"status":"old"}"#.to_owned(),
2512                        source_ref: Some("src-1".to_owned()),
2513                    },
2514                    OperationalWrite::Delete {
2515                        collection: "connector_health".to_owned(),
2516                        record_key: "gmail".to_owned(),
2517                        source_ref: Some("src-2".to_owned()),
2518                    },
2519                    OperationalWrite::Put {
2520                        collection: "connector_health".to_owned(),
2521                        record_key: "gmail".to_owned(),
2522                        payload_json: r#"{"status":"new"}"#.to_owned(),
2523                        source_ref: Some("src-3".to_owned()),
2524                    },
2525                ],
2526            })
2527            .expect("write receipt");
2528
2529        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2530        let rows: Vec<(String, i64)> = {
2531            let mut stmt = conn
2532                .prepare(
2533                    "SELECT op_kind, mutation_order FROM operational_mutations \
2534                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2535                     ORDER BY mutation_order ASC",
2536                )
2537                .expect("stmt");
2538            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
2539                .expect("rows")
2540                .collect::<Result<_, _>>()
2541                .expect("collect")
2542        };
2543        assert_eq!(
2544            rows,
2545            vec![
2546                ("put".to_owned(), 1),
2547                ("delete".to_owned(), 2),
2548                ("put".to_owned(), 3),
2549            ]
2550        );
2551        let payload: String = conn
2552            .query_row(
2553                "SELECT payload_json FROM operational_current \
2554                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2555                [],
2556                |row| row.get(0),
2557            )
2558            .expect("current payload");
2559        assert_eq!(payload, r#"{"status":"new"}"#);
2560    }
2561
2562    #[test]
2563    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
2564        let db = NamedTempFile::new().expect("temporary db");
2565        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
2566        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2567        conn.execute(
2568            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2569             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2570            [],
2571        )
2572        .expect("seed collection");
2573
2574        let request = WriteRequest {
2575            label: "disabled-race".to_owned(),
2576            nodes: vec![],
2577            node_retires: vec![],
2578            edges: vec![],
2579            edge_retires: vec![],
2580            chunks: vec![],
2581            runs: vec![],
2582            steps: vec![],
2583            actions: vec![],
2584            optional_backfills: vec![],
2585            vec_inserts: vec![],
2586            operational_writes: vec![OperationalWrite::Put {
2587                collection: "connector_health".to_owned(),
2588                record_key: "gmail".to_owned(),
2589                payload_json: r#"{"status":"ok"}"#.to_owned(),
2590                source_ref: Some("src-1".to_owned()),
2591            }],
2592        };
2593        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
2594        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
2595
2596        conn.execute(
2597            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
2598            [],
2599        )
2600        .expect("disable collection after preflight");
2601
2602        let error =
2603            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
2604        assert!(matches!(error, EngineError::InvalidWrite(_)));
2605        assert!(error.to_string().contains("is disabled"));
2606
2607        let mutation_count: i64 = conn
2608            .query_row(
2609                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2610                [],
2611                |row| row.get(0),
2612            )
2613            .expect("mutation count");
2614        assert_eq!(mutation_count, 0);
2615
2616        let current_count: i64 = conn
2617            .query_row(
2618                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2619                [],
2620                |row| row.get(0),
2621            )
2622            .expect("current count");
2623        assert_eq!(current_count, 0);
2624    }
2625
2626    #[test]
2627    fn writer_enforce_validation_rejects_invalid_put_atomically() {
2628        let db = NamedTempFile::new().expect("temporary db");
2629        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2630        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2631        conn.execute(
2632            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2633             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2634            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2635        )
2636        .expect("seed collection");
2637        drop(conn);
2638        let writer = WriterActor::start(
2639            db.path(),
2640            Arc::new(SchemaManager::new()),
2641            ProvenanceMode::Warn,
2642            Arc::new(TelemetryCounters::default()),
2643        )
2644        .expect("writer");
2645
2646        let error = writer
2647            .submit(WriteRequest {
2648                label: "invalid-put".to_owned(),
2649                nodes: vec![NodeInsert {
2650                    row_id: "row-1".to_owned(),
2651                    logical_id: "lg-1".to_owned(),
2652                    kind: "Meeting".to_owned(),
2653                    properties: "{}".to_owned(),
2654                    source_ref: Some("src-1".to_owned()),
2655                    upsert: false,
2656                    chunk_policy: ChunkPolicy::Preserve,
2657                }],
2658                node_retires: vec![],
2659                edges: vec![],
2660                edge_retires: vec![],
2661                chunks: vec![],
2662                runs: vec![],
2663                steps: vec![],
2664                actions: vec![],
2665                optional_backfills: vec![],
2666                vec_inserts: vec![],
2667                operational_writes: vec![OperationalWrite::Put {
2668                    collection: "connector_health".to_owned(),
2669                    record_key: "gmail".to_owned(),
2670                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2671                    source_ref: Some("src-1".to_owned()),
2672                }],
2673            })
2674            .expect_err("invalid put must reject");
2675        assert!(matches!(error, EngineError::InvalidWrite(_)));
2676        assert!(error.to_string().contains("must be one of"));
2677
2678        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2679        let node_count: i64 = conn
2680            .query_row(
2681                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2682                [],
2683                |row| row.get(0),
2684            )
2685            .expect("node count");
2686        assert_eq!(node_count, 0);
2687        let mutation_count: i64 = conn
2688            .query_row(
2689                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2690                [],
2691                |row| row.get(0),
2692            )
2693            .expect("mutation count");
2694        assert_eq!(mutation_count, 0);
2695        let current_count: i64 = conn
2696            .query_row(
2697                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2698                [],
2699                |row| row.get(0),
2700            )
2701            .expect("current count");
2702        assert_eq!(current_count, 0);
2703    }
2704
2705    #[test]
2706    fn writer_rejects_append_against_latest_state_collection() {
2707        let db = NamedTempFile::new().expect("temporary db");
2708        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2709        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2710        conn.execute(
2711            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2712             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2713            [],
2714        )
2715        .expect("seed collection");
2716        drop(conn);
2717        let writer = WriterActor::start(
2718            db.path(),
2719            Arc::new(SchemaManager::new()),
2720            ProvenanceMode::Warn,
2721            Arc::new(TelemetryCounters::default()),
2722        )
2723        .expect("writer");
2724
2725        let result = writer.submit(WriteRequest {
2726            label: "bad-append".to_owned(),
2727            nodes: vec![],
2728            node_retires: vec![],
2729            edges: vec![],
2730            edge_retires: vec![],
2731            chunks: vec![],
2732            runs: vec![],
2733            steps: vec![],
2734            actions: vec![],
2735            optional_backfills: vec![],
2736            vec_inserts: vec![],
2737            operational_writes: vec![OperationalWrite::Append {
2738                collection: "connector_health".to_owned(),
2739                record_key: "gmail".to_owned(),
2740                payload_json: r#"{"status":"ok"}"#.to_owned(),
2741                source_ref: Some("src-1".to_owned()),
2742            }],
2743        });
2744
2745        assert!(
2746            matches!(result, Err(EngineError::InvalidWrite(_))),
2747            "latest_state collection must reject Append"
2748        );
2749    }
2750
2751    #[test]
2752    fn writer_upsert_supersedes_prior_active_node() {
2753        let db = NamedTempFile::new().expect("temporary db");
2754        let writer = WriterActor::start(
2755            db.path(),
2756            Arc::new(SchemaManager::new()),
2757            ProvenanceMode::Warn,
2758            Arc::new(TelemetryCounters::default()),
2759        )
2760        .expect("writer");
2761
2762        writer
2763            .submit(WriteRequest {
2764                label: "v1".to_owned(),
2765                nodes: vec![NodeInsert {
2766                    row_id: "row-1".to_owned(),
2767                    logical_id: "lg-1".to_owned(),
2768                    kind: "Meeting".to_owned(),
2769                    properties: r#"{"version":1}"#.to_owned(),
2770                    source_ref: Some("src-1".to_owned()),
2771                    upsert: false,
2772                    chunk_policy: ChunkPolicy::Preserve,
2773                }],
2774                node_retires: vec![],
2775                edges: vec![],
2776                edge_retires: vec![],
2777                chunks: vec![],
2778                runs: vec![],
2779                steps: vec![],
2780                actions: vec![],
2781                optional_backfills: vec![],
2782                vec_inserts: vec![],
2783                operational_writes: vec![],
2784            })
2785            .expect("v1 write");
2786
2787        writer
2788            .submit(WriteRequest {
2789                label: "v2".to_owned(),
2790                nodes: vec![NodeInsert {
2791                    row_id: "row-2".to_owned(),
2792                    logical_id: "lg-1".to_owned(),
2793                    kind: "Meeting".to_owned(),
2794                    properties: r#"{"version":2}"#.to_owned(),
2795                    source_ref: Some("src-2".to_owned()),
2796                    upsert: true,
2797                    chunk_policy: ChunkPolicy::Preserve,
2798                }],
2799                node_retires: vec![],
2800                edges: vec![],
2801                edge_retires: vec![],
2802                chunks: vec![],
2803                runs: vec![],
2804                steps: vec![],
2805                actions: vec![],
2806                optional_backfills: vec![],
2807                vec_inserts: vec![],
2808                operational_writes: vec![],
2809            })
2810            .expect("v2 upsert write");
2811
2812        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2813        let (active_row_id, props): (String, String) = conn
2814            .query_row(
2815                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
2816                [],
2817                |row| Ok((row.get(0)?, row.get(1)?)),
2818            )
2819            .expect("active row");
2820        assert_eq!(active_row_id, "row-2");
2821        assert!(props.contains("\"version\":2"));
2822
2823        let superseded: i64 = conn
2824            .query_row(
2825                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
2826                [],
2827                |row| row.get(0),
2828            )
2829            .expect("superseded count");
2830        assert_eq!(superseded, 1);
2831    }
2832
2833    #[test]
2834    fn writer_inserts_edge_between_two_nodes() {
2835        let db = NamedTempFile::new().expect("temporary db");
2836        let writer = WriterActor::start(
2837            db.path(),
2838            Arc::new(SchemaManager::new()),
2839            ProvenanceMode::Warn,
2840            Arc::new(TelemetryCounters::default()),
2841        )
2842        .expect("writer");
2843
2844        writer
2845            .submit(WriteRequest {
2846                label: "nodes-and-edge".to_owned(),
2847                nodes: vec![
2848                    NodeInsert {
2849                        row_id: "row-meeting".to_owned(),
2850                        logical_id: "meeting-1".to_owned(),
2851                        kind: "Meeting".to_owned(),
2852                        properties: "{}".to_owned(),
2853                        source_ref: Some("src-1".to_owned()),
2854                        upsert: false,
2855                        chunk_policy: ChunkPolicy::Preserve,
2856                    },
2857                    NodeInsert {
2858                        row_id: "row-task".to_owned(),
2859                        logical_id: "task-1".to_owned(),
2860                        kind: "Task".to_owned(),
2861                        properties: "{}".to_owned(),
2862                        source_ref: Some("src-1".to_owned()),
2863                        upsert: false,
2864                        chunk_policy: ChunkPolicy::Preserve,
2865                    },
2866                ],
2867                node_retires: vec![],
2868                edges: vec![EdgeInsert {
2869                    row_id: "edge-1".to_owned(),
2870                    logical_id: "edge-lg-1".to_owned(),
2871                    source_logical_id: "meeting-1".to_owned(),
2872                    target_logical_id: "task-1".to_owned(),
2873                    kind: "HAS_TASK".to_owned(),
2874                    properties: "{}".to_owned(),
2875                    source_ref: Some("src-1".to_owned()),
2876                    upsert: false,
2877                }],
2878                edge_retires: vec![],
2879                chunks: vec![],
2880                runs: vec![],
2881                steps: vec![],
2882                actions: vec![],
2883                optional_backfills: vec![],
2884                vec_inserts: vec![],
2885                operational_writes: vec![],
2886            })
2887            .expect("write receipt");
2888
2889        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2890        let (src, tgt, kind): (String, String, String) = conn
2891            .query_row(
2892                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
2893                [],
2894                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2895            )
2896            .expect("edge row");
2897        assert_eq!(src, "meeting-1");
2898        assert_eq!(tgt, "task-1");
2899        assert_eq!(kind, "HAS_TASK");
2900    }
2901
2902    #[test]
2903    #[allow(clippy::too_many_lines)]
2904    fn writer_upsert_supersedes_prior_active_edge() {
2905        let db = NamedTempFile::new().expect("temporary db");
2906        let writer = WriterActor::start(
2907            db.path(),
2908            Arc::new(SchemaManager::new()),
2909            ProvenanceMode::Warn,
2910            Arc::new(TelemetryCounters::default()),
2911        )
2912        .expect("writer");
2913
2914        // Write two nodes
2915        writer
2916            .submit(WriteRequest {
2917                label: "nodes".to_owned(),
2918                nodes: vec![
2919                    NodeInsert {
2920                        row_id: "row-a".to_owned(),
2921                        logical_id: "node-a".to_owned(),
2922                        kind: "Meeting".to_owned(),
2923                        properties: "{}".to_owned(),
2924                        source_ref: Some("src-1".to_owned()),
2925                        upsert: false,
2926                        chunk_policy: ChunkPolicy::Preserve,
2927                    },
2928                    NodeInsert {
2929                        row_id: "row-b".to_owned(),
2930                        logical_id: "node-b".to_owned(),
2931                        kind: "Task".to_owned(),
2932                        properties: "{}".to_owned(),
2933                        source_ref: Some("src-1".to_owned()),
2934                        upsert: false,
2935                        chunk_policy: ChunkPolicy::Preserve,
2936                    },
2937                ],
2938                node_retires: vec![],
2939                edges: vec![],
2940                edge_retires: vec![],
2941                chunks: vec![],
2942                runs: vec![],
2943                steps: vec![],
2944                actions: vec![],
2945                optional_backfills: vec![],
2946                vec_inserts: vec![],
2947                operational_writes: vec![],
2948            })
2949            .expect("nodes write");
2950
2951        // Write v1 edge
2952        writer
2953            .submit(WriteRequest {
2954                label: "edge-v1".to_owned(),
2955                nodes: vec![],
2956                node_retires: vec![],
2957                edges: vec![EdgeInsert {
2958                    row_id: "edge-row-1".to_owned(),
2959                    logical_id: "edge-lg-1".to_owned(),
2960                    source_logical_id: "node-a".to_owned(),
2961                    target_logical_id: "node-b".to_owned(),
2962                    kind: "HAS_TASK".to_owned(),
2963                    properties: r#"{"weight":1}"#.to_owned(),
2964                    source_ref: Some("src-1".to_owned()),
2965                    upsert: false,
2966                }],
2967                edge_retires: vec![],
2968                chunks: vec![],
2969                runs: vec![],
2970                steps: vec![],
2971                actions: vec![],
2972                optional_backfills: vec![],
2973                vec_inserts: vec![],
2974                operational_writes: vec![],
2975            })
2976            .expect("edge v1 write");
2977
2978        // Upsert v2 edge
2979        writer
2980            .submit(WriteRequest {
2981                label: "edge-v2".to_owned(),
2982                nodes: vec![],
2983                node_retires: vec![],
2984                edges: vec![EdgeInsert {
2985                    row_id: "edge-row-2".to_owned(),
2986                    logical_id: "edge-lg-1".to_owned(),
2987                    source_logical_id: "node-a".to_owned(),
2988                    target_logical_id: "node-b".to_owned(),
2989                    kind: "HAS_TASK".to_owned(),
2990                    properties: r#"{"weight":2}"#.to_owned(),
2991                    source_ref: Some("src-2".to_owned()),
2992                    upsert: true,
2993                }],
2994                edge_retires: vec![],
2995                chunks: vec![],
2996                runs: vec![],
2997                steps: vec![],
2998                actions: vec![],
2999                optional_backfills: vec![],
3000                vec_inserts: vec![],
3001                operational_writes: vec![],
3002            })
3003            .expect("edge v2 upsert");
3004
3005        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3006        let (active_row_id, props): (String, String) = conn
3007            .query_row(
3008                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3009                [],
3010                |row| Ok((row.get(0)?, row.get(1)?)),
3011            )
3012            .expect("active edge");
3013        assert_eq!(active_row_id, "edge-row-2");
3014        assert!(props.contains("\"weight\":2"));
3015
3016        let superseded: i64 = conn
3017            .query_row(
3018                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3019                [],
3020                |row| row.get(0),
3021            )
3022            .expect("superseded count");
3023        assert_eq!(superseded, 1);
3024    }
3025
3026    #[test]
3027    fn writer_fts_rows_are_written_to_database() {
3028        let db = NamedTempFile::new().expect("temporary db");
3029        let writer = WriterActor::start(
3030            db.path(),
3031            Arc::new(SchemaManager::new()),
3032            ProvenanceMode::Warn,
3033            Arc::new(TelemetryCounters::default()),
3034        )
3035        .expect("writer");
3036
3037        writer
3038            .submit(WriteRequest {
3039                label: "seed".to_owned(),
3040                nodes: vec![NodeInsert {
3041                    row_id: "row-1".to_owned(),
3042                    logical_id: "logical-1".to_owned(),
3043                    kind: "Meeting".to_owned(),
3044                    properties: "{}".to_owned(),
3045                    source_ref: Some("src-1".to_owned()),
3046                    upsert: false,
3047                    chunk_policy: ChunkPolicy::Preserve,
3048                }],
3049                node_retires: vec![],
3050                edges: vec![],
3051                edge_retires: vec![],
3052                chunks: vec![ChunkInsert {
3053                    id: "chunk-1".to_owned(),
3054                    node_logical_id: "logical-1".to_owned(),
3055                    text_content: "budget discussion".to_owned(),
3056                    byte_start: None,
3057                    byte_end: None,
3058                }],
3059                runs: vec![],
3060                steps: vec![],
3061                actions: vec![],
3062                optional_backfills: vec![],
3063                vec_inserts: vec![],
3064                operational_writes: vec![],
3065            })
3066            .expect("write receipt");
3067
3068        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3069        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3070            conn.query_row(
3071                "SELECT chunk_id, node_logical_id, kind, text_content \
3072                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3073                [],
3074                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3075            )
3076            .expect("fts row");
3077        assert_eq!(chunk_id, "chunk-1");
3078        assert_eq!(node_logical_id, "logical-1");
3079        assert_eq!(kind, "Meeting");
3080        assert_eq!(text_content, "budget discussion");
3081    }
3082
3083    #[test]
3084    fn writer_receipt_warns_on_nodes_without_source_ref() {
3085        let db = NamedTempFile::new().expect("temporary db");
3086        let writer = WriterActor::start(
3087            db.path(),
3088            Arc::new(SchemaManager::new()),
3089            ProvenanceMode::Warn,
3090            Arc::new(TelemetryCounters::default()),
3091        )
3092        .expect("writer");
3093
3094        let receipt = writer
3095            .submit(WriteRequest {
3096                label: "no-source".to_owned(),
3097                nodes: vec![NodeInsert {
3098                    row_id: "row-1".to_owned(),
3099                    logical_id: "logical-1".to_owned(),
3100                    kind: "Meeting".to_owned(),
3101                    properties: "{}".to_owned(),
3102                    source_ref: None,
3103                    upsert: false,
3104                    chunk_policy: ChunkPolicy::Preserve,
3105                }],
3106                node_retires: vec![],
3107                edges: vec![],
3108                edge_retires: vec![],
3109                chunks: vec![],
3110                runs: vec![],
3111                steps: vec![],
3112                actions: vec![],
3113                optional_backfills: vec![],
3114                vec_inserts: vec![],
3115                operational_writes: vec![],
3116            })
3117            .expect("write receipt");
3118
3119        assert_eq!(receipt.provenance_warnings.len(), 1);
3120        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3121    }
3122
3123    #[test]
3124    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3125        let db = NamedTempFile::new().expect("temporary db");
3126        let writer = WriterActor::start(
3127            db.path(),
3128            Arc::new(SchemaManager::new()),
3129            ProvenanceMode::Warn,
3130            Arc::new(TelemetryCounters::default()),
3131        )
3132        .expect("writer");
3133
3134        let receipt = writer
3135            .submit(WriteRequest {
3136                label: "with-source".to_owned(),
3137                nodes: vec![NodeInsert {
3138                    row_id: "row-1".to_owned(),
3139                    logical_id: "logical-1".to_owned(),
3140                    kind: "Meeting".to_owned(),
3141                    properties: "{}".to_owned(),
3142                    source_ref: Some("src-1".to_owned()),
3143                    upsert: false,
3144                    chunk_policy: ChunkPolicy::Preserve,
3145                }],
3146                node_retires: vec![],
3147                edges: vec![],
3148                edge_retires: vec![],
3149                chunks: vec![],
3150                runs: vec![],
3151                steps: vec![],
3152                actions: vec![],
3153                optional_backfills: vec![],
3154                vec_inserts: vec![],
3155                operational_writes: vec![],
3156            })
3157            .expect("write receipt");
3158
3159        assert!(receipt.provenance_warnings.is_empty());
3160    }
3161
3162    #[test]
3163    fn writer_accepts_chunk_for_pre_existing_node() {
3164        let db = NamedTempFile::new().expect("temporary db");
3165        let writer = WriterActor::start(
3166            db.path(),
3167            Arc::new(SchemaManager::new()),
3168            ProvenanceMode::Warn,
3169            Arc::new(TelemetryCounters::default()),
3170        )
3171        .expect("writer");
3172
3173        // Request 1: submit node only
3174        writer
3175            .submit(WriteRequest {
3176                label: "r1".to_owned(),
3177                nodes: vec![NodeInsert {
3178                    row_id: "row-1".to_owned(),
3179                    logical_id: "logical-1".to_owned(),
3180                    kind: "Meeting".to_owned(),
3181                    properties: "{}".to_owned(),
3182                    source_ref: Some("src-1".to_owned()),
3183                    upsert: false,
3184                    chunk_policy: ChunkPolicy::Preserve,
3185                }],
3186                node_retires: vec![],
3187                edges: vec![],
3188                edge_retires: vec![],
3189                chunks: vec![],
3190                runs: vec![],
3191                steps: vec![],
3192                actions: vec![],
3193                optional_backfills: vec![],
3194                vec_inserts: vec![],
3195                operational_writes: vec![],
3196            })
3197            .expect("r1 write");
3198
3199        // Request 2: submit chunk for pre-existing node
3200        writer
3201            .submit(WriteRequest {
3202                label: "r2".to_owned(),
3203                nodes: vec![],
3204                node_retires: vec![],
3205                edges: vec![],
3206                edge_retires: vec![],
3207                chunks: vec![ChunkInsert {
3208                    id: "chunk-1".to_owned(),
3209                    node_logical_id: "logical-1".to_owned(),
3210                    text_content: "budget discussion".to_owned(),
3211                    byte_start: None,
3212                    byte_end: None,
3213                }],
3214                runs: vec![],
3215                steps: vec![],
3216                actions: vec![],
3217                optional_backfills: vec![],
3218                vec_inserts: vec![],
3219                operational_writes: vec![],
3220            })
3221            .expect("r2 write — chunk for pre-existing node");
3222
3223        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3224        let count: i64 = conn
3225            .query_row(
3226                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3227                [],
3228                |row| row.get(0),
3229            )
3230            .expect("fts count");
3231        assert_eq!(
3232            count, 1,
3233            "FTS row must exist for chunk attached to pre-existing node"
3234        );
3235    }
3236
3237    #[test]
3238    fn writer_rejects_chunk_for_completely_unknown_node() {
3239        let db = NamedTempFile::new().expect("temporary db");
3240        let writer = WriterActor::start(
3241            db.path(),
3242            Arc::new(SchemaManager::new()),
3243            ProvenanceMode::Warn,
3244            Arc::new(TelemetryCounters::default()),
3245        )
3246        .expect("writer");
3247
3248        let result = writer.submit(WriteRequest {
3249            label: "bad".to_owned(),
3250            nodes: vec![],
3251            node_retires: vec![],
3252            edges: vec![],
3253            edge_retires: vec![],
3254            chunks: vec![ChunkInsert {
3255                id: "chunk-1".to_owned(),
3256                node_logical_id: "nonexistent".to_owned(),
3257                text_content: "some text".to_owned(),
3258                byte_start: None,
3259                byte_end: None,
3260            }],
3261            runs: vec![],
3262            steps: vec![],
3263            actions: vec![],
3264            optional_backfills: vec![],
3265            vec_inserts: vec![],
3266            operational_writes: vec![],
3267        });
3268
3269        assert!(
3270            matches!(result, Err(EngineError::InvalidWrite(_))),
3271            "completely unknown node must return InvalidWrite"
3272        );
3273    }
3274
3275    #[test]
3276    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3277        let db = NamedTempFile::new().expect("temporary db");
3278        let writer = WriterActor::start(
3279            db.path(),
3280            Arc::new(SchemaManager::new()),
3281            ProvenanceMode::Warn,
3282            Arc::new(TelemetryCounters::default()),
3283        )
3284        .expect("writer");
3285
3286        let receipt = writer
3287            .submit(WriteRequest {
3288                label: "seed".to_owned(),
3289                nodes: vec![NodeInsert {
3290                    row_id: "row-1".to_owned(),
3291                    logical_id: "logical-1".to_owned(),
3292                    kind: "Meeting".to_owned(),
3293                    properties: "{}".to_owned(),
3294                    source_ref: None,
3295                    upsert: false,
3296                    chunk_policy: ChunkPolicy::Preserve,
3297                }],
3298                node_retires: vec![],
3299                edges: vec![],
3300                edge_retires: vec![],
3301                chunks: vec![ChunkInsert {
3302                    id: "chunk-1".to_owned(),
3303                    node_logical_id: "logical-1".to_owned(),
3304                    text_content: "budget discussion".to_owned(),
3305                    byte_start: None,
3306                    byte_end: None,
3307                }],
3308                runs: vec![],
3309                steps: vec![],
3310                actions: vec![],
3311                optional_backfills: vec![],
3312                vec_inserts: vec![],
3313                operational_writes: vec![],
3314            })
3315            .expect("write receipt");
3316
3317        assert_eq!(receipt.label, "seed");
3318    }
3319
3320    #[test]
3321    fn writer_node_retire_supersedes_active_node() {
3322        let db = NamedTempFile::new().expect("temporary db");
3323        let writer = WriterActor::start(
3324            db.path(),
3325            Arc::new(SchemaManager::new()),
3326            ProvenanceMode::Warn,
3327            Arc::new(TelemetryCounters::default()),
3328        )
3329        .expect("writer");
3330
3331        writer
3332            .submit(WriteRequest {
3333                label: "seed".to_owned(),
3334                nodes: vec![NodeInsert {
3335                    row_id: "row-1".to_owned(),
3336                    logical_id: "meeting-1".to_owned(),
3337                    kind: "Meeting".to_owned(),
3338                    properties: "{}".to_owned(),
3339                    source_ref: Some("src-1".to_owned()),
3340                    upsert: false,
3341                    chunk_policy: ChunkPolicy::Preserve,
3342                }],
3343                node_retires: vec![],
3344                edges: vec![],
3345                edge_retires: vec![],
3346                chunks: vec![],
3347                runs: vec![],
3348                steps: vec![],
3349                actions: vec![],
3350                optional_backfills: vec![],
3351                vec_inserts: vec![],
3352                operational_writes: vec![],
3353            })
3354            .expect("seed write");
3355
3356        writer
3357            .submit(WriteRequest {
3358                label: "retire".to_owned(),
3359                nodes: vec![],
3360                node_retires: vec![NodeRetire {
3361                    logical_id: "meeting-1".to_owned(),
3362                    source_ref: Some("src-2".to_owned()),
3363                }],
3364                edges: vec![],
3365                edge_retires: vec![],
3366                chunks: vec![],
3367                runs: vec![],
3368                steps: vec![],
3369                actions: vec![],
3370                optional_backfills: vec![],
3371                vec_inserts: vec![],
3372                operational_writes: vec![],
3373            })
3374            .expect("retire write");
3375
3376        let conn = rusqlite::Connection::open(db.path()).expect("open");
3377        let active: i64 = conn
3378            .query_row(
3379                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3380                [],
3381                |r| r.get(0),
3382            )
3383            .expect("count active");
3384        let historical: i64 = conn
3385            .query_row(
3386                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3387                [],
3388                |r| r.get(0),
3389            )
3390            .expect("count historical");
3391
3392        assert_eq!(active, 0, "active count must be 0 after retire");
3393        assert_eq!(historical, 1, "historical count must be 1 after retire");
3394    }
3395
3396    #[test]
3397    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3398        let db = NamedTempFile::new().expect("temporary db");
3399        let writer = WriterActor::start(
3400            db.path(),
3401            Arc::new(SchemaManager::new()),
3402            ProvenanceMode::Warn,
3403            Arc::new(TelemetryCounters::default()),
3404        )
3405        .expect("writer");
3406
3407        writer
3408            .submit(WriteRequest {
3409                label: "seed".to_owned(),
3410                nodes: vec![NodeInsert {
3411                    row_id: "row-1".to_owned(),
3412                    logical_id: "meeting-1".to_owned(),
3413                    kind: "Meeting".to_owned(),
3414                    properties: "{}".to_owned(),
3415                    source_ref: Some("src-1".to_owned()),
3416                    upsert: false,
3417                    chunk_policy: ChunkPolicy::Preserve,
3418                }],
3419                node_retires: vec![],
3420                edges: vec![],
3421                edge_retires: vec![],
3422                chunks: vec![ChunkInsert {
3423                    id: "chunk-1".to_owned(),
3424                    node_logical_id: "meeting-1".to_owned(),
3425                    text_content: "budget discussion".to_owned(),
3426                    byte_start: None,
3427                    byte_end: None,
3428                }],
3429                runs: vec![],
3430                steps: vec![],
3431                actions: vec![],
3432                optional_backfills: vec![],
3433                vec_inserts: vec![],
3434                operational_writes: vec![],
3435            })
3436            .expect("seed write");
3437
3438        writer
3439            .submit(WriteRequest {
3440                label: "retire".to_owned(),
3441                nodes: vec![],
3442                node_retires: vec![NodeRetire {
3443                    logical_id: "meeting-1".to_owned(),
3444                    source_ref: Some("src-2".to_owned()),
3445                }],
3446                edges: vec![],
3447                edge_retires: vec![],
3448                chunks: vec![],
3449                runs: vec![],
3450                steps: vec![],
3451                actions: vec![],
3452                optional_backfills: vec![],
3453                vec_inserts: vec![],
3454                operational_writes: vec![],
3455            })
3456            .expect("retire write");
3457
3458        let conn = rusqlite::Connection::open(db.path()).expect("open");
3459        let chunk_count: i64 = conn
3460            .query_row(
3461                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
3462                [],
3463                |r| r.get(0),
3464            )
3465            .expect("chunk count");
3466        let fts_count: i64 = conn
3467            .query_row(
3468                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
3469                [],
3470                |r| r.get(0),
3471            )
3472            .expect("fts count");
3473
3474        assert_eq!(
3475            chunk_count, 1,
3476            "chunks must remain after node retire so restore can re-establish content"
3477        );
3478        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
3479    }
3480
3481    #[test]
3482    fn writer_edge_retire_supersedes_active_edge() {
3483        let db = NamedTempFile::new().expect("temporary db");
3484        let writer = WriterActor::start(
3485            db.path(),
3486            Arc::new(SchemaManager::new()),
3487            ProvenanceMode::Warn,
3488            Arc::new(TelemetryCounters::default()),
3489        )
3490        .expect("writer");
3491
3492        writer
3493            .submit(WriteRequest {
3494                label: "seed".to_owned(),
3495                nodes: vec![
3496                    NodeInsert {
3497                        row_id: "row-a".to_owned(),
3498                        logical_id: "node-a".to_owned(),
3499                        kind: "Meeting".to_owned(),
3500                        properties: "{}".to_owned(),
3501                        source_ref: Some("src-1".to_owned()),
3502                        upsert: false,
3503                        chunk_policy: ChunkPolicy::Preserve,
3504                    },
3505                    NodeInsert {
3506                        row_id: "row-b".to_owned(),
3507                        logical_id: "node-b".to_owned(),
3508                        kind: "Task".to_owned(),
3509                        properties: "{}".to_owned(),
3510                        source_ref: Some("src-1".to_owned()),
3511                        upsert: false,
3512                        chunk_policy: ChunkPolicy::Preserve,
3513                    },
3514                ],
3515                node_retires: vec![],
3516                edges: vec![EdgeInsert {
3517                    row_id: "edge-1".to_owned(),
3518                    logical_id: "edge-lg-1".to_owned(),
3519                    source_logical_id: "node-a".to_owned(),
3520                    target_logical_id: "node-b".to_owned(),
3521                    kind: "HAS_TASK".to_owned(),
3522                    properties: "{}".to_owned(),
3523                    source_ref: Some("src-1".to_owned()),
3524                    upsert: false,
3525                }],
3526                edge_retires: vec![],
3527                chunks: vec![],
3528                runs: vec![],
3529                steps: vec![],
3530                actions: vec![],
3531                optional_backfills: vec![],
3532                vec_inserts: vec![],
3533                operational_writes: vec![],
3534            })
3535            .expect("seed write");
3536
3537        writer
3538            .submit(WriteRequest {
3539                label: "retire-edge".to_owned(),
3540                nodes: vec![],
3541                node_retires: vec![],
3542                edges: vec![],
3543                edge_retires: vec![EdgeRetire {
3544                    logical_id: "edge-lg-1".to_owned(),
3545                    source_ref: Some("src-2".to_owned()),
3546                }],
3547                chunks: vec![],
3548                runs: vec![],
3549                steps: vec![],
3550                actions: vec![],
3551                optional_backfills: vec![],
3552                vec_inserts: vec![],
3553                operational_writes: vec![],
3554            })
3555            .expect("retire edge write");
3556
3557        let conn = rusqlite::Connection::open(db.path()).expect("open");
3558        let active: i64 = conn
3559            .query_row(
3560                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3561                [],
3562                |r| r.get(0),
3563            )
3564            .expect("active edge count");
3565
3566        assert_eq!(active, 0, "active edge count must be 0 after retire");
3567    }
3568
3569    #[test]
3570    fn writer_retire_without_source_ref_emits_provenance_warning() {
3571        let db = NamedTempFile::new().expect("temporary db");
3572        let writer = WriterActor::start(
3573            db.path(),
3574            Arc::new(SchemaManager::new()),
3575            ProvenanceMode::Warn,
3576            Arc::new(TelemetryCounters::default()),
3577        )
3578        .expect("writer");
3579
3580        writer
3581            .submit(WriteRequest {
3582                label: "seed".to_owned(),
3583                nodes: vec![NodeInsert {
3584                    row_id: "row-1".to_owned(),
3585                    logical_id: "meeting-1".to_owned(),
3586                    kind: "Meeting".to_owned(),
3587                    properties: "{}".to_owned(),
3588                    source_ref: Some("src-1".to_owned()),
3589                    upsert: false,
3590                    chunk_policy: ChunkPolicy::Preserve,
3591                }],
3592                node_retires: vec![],
3593                edges: vec![],
3594                edge_retires: vec![],
3595                chunks: vec![],
3596                runs: vec![],
3597                steps: vec![],
3598                actions: vec![],
3599                optional_backfills: vec![],
3600                vec_inserts: vec![],
3601                operational_writes: vec![],
3602            })
3603            .expect("seed write");
3604
3605        let receipt = writer
3606            .submit(WriteRequest {
3607                label: "retire-no-src".to_owned(),
3608                nodes: vec![],
3609                node_retires: vec![NodeRetire {
3610                    logical_id: "meeting-1".to_owned(),
3611                    source_ref: None,
3612                }],
3613                edges: vec![],
3614                edge_retires: vec![],
3615                chunks: vec![],
3616                runs: vec![],
3617                steps: vec![],
3618                actions: vec![],
3619                optional_backfills: vec![],
3620                vec_inserts: vec![],
3621                operational_writes: vec![],
3622            })
3623            .expect("retire write");
3624
3625        assert!(
3626            !receipt.provenance_warnings.is_empty(),
3627            "retire without source_ref must emit a provenance warning"
3628        );
3629    }
3630
3631    #[test]
3632    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
3633        let db = NamedTempFile::new().expect("temporary db");
3634        let writer = WriterActor::start(
3635            db.path(),
3636            Arc::new(SchemaManager::new()),
3637            ProvenanceMode::Warn,
3638            Arc::new(TelemetryCounters::default()),
3639        )
3640        .expect("writer");
3641
3642        writer
3643            .submit(WriteRequest {
3644                label: "v1".to_owned(),
3645                nodes: vec![NodeInsert {
3646                    row_id: "row-1".to_owned(),
3647                    logical_id: "meeting-1".to_owned(),
3648                    kind: "Meeting".to_owned(),
3649                    properties: "{}".to_owned(),
3650                    source_ref: Some("src-1".to_owned()),
3651                    upsert: false,
3652                    chunk_policy: ChunkPolicy::Preserve,
3653                }],
3654                node_retires: vec![],
3655                edges: vec![],
3656                edge_retires: vec![],
3657                chunks: vec![ChunkInsert {
3658                    id: "chunk-old".to_owned(),
3659                    node_logical_id: "meeting-1".to_owned(),
3660                    text_content: "old text".to_owned(),
3661                    byte_start: None,
3662                    byte_end: None,
3663                }],
3664                runs: vec![],
3665                steps: vec![],
3666                actions: vec![],
3667                optional_backfills: vec![],
3668                vec_inserts: vec![],
3669                operational_writes: vec![],
3670            })
3671            .expect("v1 write");
3672
3673        writer
3674            .submit(WriteRequest {
3675                label: "v2".to_owned(),
3676                nodes: vec![NodeInsert {
3677                    row_id: "row-2".to_owned(),
3678                    logical_id: "meeting-1".to_owned(),
3679                    kind: "Meeting".to_owned(),
3680                    properties: "{}".to_owned(),
3681                    source_ref: Some("src-2".to_owned()),
3682                    upsert: true,
3683                    chunk_policy: ChunkPolicy::Replace,
3684                }],
3685                node_retires: vec![],
3686                edges: vec![],
3687                edge_retires: vec![],
3688                chunks: vec![ChunkInsert {
3689                    id: "chunk-new".to_owned(),
3690                    node_logical_id: "meeting-1".to_owned(),
3691                    text_content: "new text".to_owned(),
3692                    byte_start: None,
3693                    byte_end: None,
3694                }],
3695                runs: vec![],
3696                steps: vec![],
3697                actions: vec![],
3698                optional_backfills: vec![],
3699                vec_inserts: vec![],
3700                operational_writes: vec![],
3701            })
3702            .expect("v2 write");
3703
3704        let conn = rusqlite::Connection::open(db.path()).expect("open");
3705        let old_chunk: i64 = conn
3706            .query_row(
3707                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3708                [],
3709                |r| r.get(0),
3710            )
3711            .expect("old chunk count");
3712        let new_chunk: i64 = conn
3713            .query_row(
3714                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
3715                [],
3716                |r| r.get(0),
3717            )
3718            .expect("new chunk count");
3719        let fts_old: i64 = conn
3720            .query_row(
3721                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
3722                [],
3723                |r| r.get(0),
3724            )
3725            .expect("old fts count");
3726
3727        assert_eq!(
3728            old_chunk, 0,
3729            "old chunk must be deleted by ChunkPolicy::Replace"
3730        );
3731        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
3732        assert_eq!(
3733            fts_old, 0,
3734            "old FTS row must be deleted by ChunkPolicy::Replace"
3735        );
3736    }
3737
3738    #[test]
3739    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
3740        let db = NamedTempFile::new().expect("temporary db");
3741        let writer = WriterActor::start(
3742            db.path(),
3743            Arc::new(SchemaManager::new()),
3744            ProvenanceMode::Warn,
3745            Arc::new(TelemetryCounters::default()),
3746        )
3747        .expect("writer");
3748
3749        writer
3750            .submit(WriteRequest {
3751                label: "v1".to_owned(),
3752                nodes: vec![NodeInsert {
3753                    row_id: "row-1".to_owned(),
3754                    logical_id: "meeting-1".to_owned(),
3755                    kind: "Meeting".to_owned(),
3756                    properties: "{}".to_owned(),
3757                    source_ref: Some("src-1".to_owned()),
3758                    upsert: false,
3759                    chunk_policy: ChunkPolicy::Preserve,
3760                }],
3761                node_retires: vec![],
3762                edges: vec![],
3763                edge_retires: vec![],
3764                chunks: vec![ChunkInsert {
3765                    id: "chunk-old".to_owned(),
3766                    node_logical_id: "meeting-1".to_owned(),
3767                    text_content: "old text".to_owned(),
3768                    byte_start: None,
3769                    byte_end: None,
3770                }],
3771                runs: vec![],
3772                steps: vec![],
3773                actions: vec![],
3774                optional_backfills: vec![],
3775                vec_inserts: vec![],
3776                operational_writes: vec![],
3777            })
3778            .expect("v1 write");
3779
3780        writer
3781            .submit(WriteRequest {
3782                label: "v2-props-only".to_owned(),
3783                nodes: vec![NodeInsert {
3784                    row_id: "row-2".to_owned(),
3785                    logical_id: "meeting-1".to_owned(),
3786                    kind: "Meeting".to_owned(),
3787                    properties: r#"{"status":"updated"}"#.to_owned(),
3788                    source_ref: Some("src-2".to_owned()),
3789                    upsert: true,
3790                    chunk_policy: ChunkPolicy::Preserve,
3791                }],
3792                node_retires: vec![],
3793                edges: vec![],
3794                edge_retires: vec![],
3795                chunks: vec![],
3796                runs: vec![],
3797                steps: vec![],
3798                actions: vec![],
3799                optional_backfills: vec![],
3800                vec_inserts: vec![],
3801                operational_writes: vec![],
3802            })
3803            .expect("v2 preserve write");
3804
3805        let conn = rusqlite::Connection::open(db.path()).expect("open");
3806        let old_chunk: i64 = conn
3807            .query_row(
3808                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3809                [],
3810                |r| r.get(0),
3811            )
3812            .expect("old chunk count");
3813
3814        assert_eq!(
3815            old_chunk, 1,
3816            "old chunk must be preserved by ChunkPolicy::Preserve"
3817        );
3818    }
3819
3820    #[test]
3821    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
3822        let db = NamedTempFile::new().expect("temporary db");
3823        let writer = WriterActor::start(
3824            db.path(),
3825            Arc::new(SchemaManager::new()),
3826            ProvenanceMode::Warn,
3827            Arc::new(TelemetryCounters::default()),
3828        )
3829        .expect("writer");
3830
3831        writer
3832            .submit(WriteRequest {
3833                label: "v1".to_owned(),
3834                nodes: vec![NodeInsert {
3835                    row_id: "row-1".to_owned(),
3836                    logical_id: "meeting-1".to_owned(),
3837                    kind: "Meeting".to_owned(),
3838                    properties: "{}".to_owned(),
3839                    source_ref: Some("src-1".to_owned()),
3840                    upsert: false,
3841                    chunk_policy: ChunkPolicy::Preserve,
3842                }],
3843                node_retires: vec![],
3844                edges: vec![],
3845                edge_retires: vec![],
3846                chunks: vec![ChunkInsert {
3847                    id: "chunk-existing".to_owned(),
3848                    node_logical_id: "meeting-1".to_owned(),
3849                    text_content: "existing text".to_owned(),
3850                    byte_start: None,
3851                    byte_end: None,
3852                }],
3853                runs: vec![],
3854                steps: vec![],
3855                actions: vec![],
3856                optional_backfills: vec![],
3857                vec_inserts: vec![],
3858                operational_writes: vec![],
3859            })
3860            .expect("v1 write");
3861
3862        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
3863        writer
3864            .submit(WriteRequest {
3865                label: "insert-no-upsert".to_owned(),
3866                nodes: vec![NodeInsert {
3867                    row_id: "row-2".to_owned(),
3868                    logical_id: "meeting-2".to_owned(),
3869                    kind: "Meeting".to_owned(),
3870                    properties: "{}".to_owned(),
3871                    source_ref: Some("src-2".to_owned()),
3872                    upsert: false,
3873                    chunk_policy: ChunkPolicy::Replace,
3874                }],
3875                node_retires: vec![],
3876                edges: vec![],
3877                edge_retires: vec![],
3878                chunks: vec![],
3879                runs: vec![],
3880                steps: vec![],
3881                actions: vec![],
3882                optional_backfills: vec![],
3883                vec_inserts: vec![],
3884                operational_writes: vec![],
3885            })
3886            .expect("insert no-upsert write");
3887
3888        let conn = rusqlite::Connection::open(db.path()).expect("open");
3889        let existing_chunk: i64 = conn
3890            .query_row(
3891                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
3892                [],
3893                |r| r.get(0),
3894            )
3895            .expect("chunk count");
3896
3897        assert_eq!(
3898            existing_chunk, 1,
3899            "ChunkPolicy::Replace without upsert must not delete existing chunks"
3900        );
3901    }
3902
3903    #[test]
3904    fn writer_run_upsert_supersedes_prior_active_run() {
3905        let db = NamedTempFile::new().expect("temporary db");
3906        let writer = WriterActor::start(
3907            db.path(),
3908            Arc::new(SchemaManager::new()),
3909            ProvenanceMode::Warn,
3910            Arc::new(TelemetryCounters::default()),
3911        )
3912        .expect("writer");
3913
3914        writer
3915            .submit(WriteRequest {
3916                label: "v1".to_owned(),
3917                nodes: vec![],
3918                node_retires: vec![],
3919                edges: vec![],
3920                edge_retires: vec![],
3921                chunks: vec![],
3922                runs: vec![RunInsert {
3923                    id: "run-v1".to_owned(),
3924                    kind: "session".to_owned(),
3925                    status: "completed".to_owned(),
3926                    properties: "{}".to_owned(),
3927                    source_ref: Some("src-1".to_owned()),
3928                    upsert: false,
3929                    supersedes_id: None,
3930                }],
3931                steps: vec![],
3932                actions: vec![],
3933                optional_backfills: vec![],
3934                vec_inserts: vec![],
3935                operational_writes: vec![],
3936            })
3937            .expect("v1 run write");
3938
3939        writer
3940            .submit(WriteRequest {
3941                label: "v2".to_owned(),
3942                nodes: vec![],
3943                node_retires: vec![],
3944                edges: vec![],
3945                edge_retires: vec![],
3946                chunks: vec![],
3947                runs: vec![RunInsert {
3948                    id: "run-v2".to_owned(),
3949                    kind: "session".to_owned(),
3950                    status: "completed".to_owned(),
3951                    properties: "{}".to_owned(),
3952                    source_ref: Some("src-2".to_owned()),
3953                    upsert: true,
3954                    supersedes_id: Some("run-v1".to_owned()),
3955                }],
3956                steps: vec![],
3957                actions: vec![],
3958                optional_backfills: vec![],
3959                vec_inserts: vec![],
3960                operational_writes: vec![],
3961            })
3962            .expect("v2 run write");
3963
3964        let conn = rusqlite::Connection::open(db.path()).expect("open");
3965        let v1_historical: i64 = conn
3966            .query_row(
3967                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
3968                [],
3969                |r| r.get(0),
3970            )
3971            .expect("v1 historical count");
3972        let v2_active: i64 = conn
3973            .query_row(
3974                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
3975                [],
3976                |r| r.get(0),
3977            )
3978            .expect("v2 active count");
3979
3980        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
3981        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
3982    }
3983
3984    #[test]
3985    fn writer_step_upsert_supersedes_prior_active_step() {
3986        let db = NamedTempFile::new().expect("temporary db");
3987        let writer = WriterActor::start(
3988            db.path(),
3989            Arc::new(SchemaManager::new()),
3990            ProvenanceMode::Warn,
3991            Arc::new(TelemetryCounters::default()),
3992        )
3993        .expect("writer");
3994
3995        writer
3996            .submit(WriteRequest {
3997                label: "v1".to_owned(),
3998                nodes: vec![],
3999                node_retires: vec![],
4000                edges: vec![],
4001                edge_retires: vec![],
4002                chunks: vec![],
4003                runs: vec![RunInsert {
4004                    id: "run-1".to_owned(),
4005                    kind: "session".to_owned(),
4006                    status: "completed".to_owned(),
4007                    properties: "{}".to_owned(),
4008                    source_ref: Some("src-1".to_owned()),
4009                    upsert: false,
4010                    supersedes_id: None,
4011                }],
4012                steps: vec![StepInsert {
4013                    id: "step-v1".to_owned(),
4014                    run_id: "run-1".to_owned(),
4015                    kind: "llm".to_owned(),
4016                    status: "completed".to_owned(),
4017                    properties: "{}".to_owned(),
4018                    source_ref: Some("src-1".to_owned()),
4019                    upsert: false,
4020                    supersedes_id: None,
4021                }],
4022                actions: vec![],
4023                optional_backfills: vec![],
4024                vec_inserts: vec![],
4025                operational_writes: vec![],
4026            })
4027            .expect("v1 step write");
4028
4029        writer
4030            .submit(WriteRequest {
4031                label: "v2".to_owned(),
4032                nodes: vec![],
4033                node_retires: vec![],
4034                edges: vec![],
4035                edge_retires: vec![],
4036                chunks: vec![],
4037                runs: vec![],
4038                steps: vec![StepInsert {
4039                    id: "step-v2".to_owned(),
4040                    run_id: "run-1".to_owned(),
4041                    kind: "llm".to_owned(),
4042                    status: "completed".to_owned(),
4043                    properties: "{}".to_owned(),
4044                    source_ref: Some("src-2".to_owned()),
4045                    upsert: true,
4046                    supersedes_id: Some("step-v1".to_owned()),
4047                }],
4048                actions: vec![],
4049                optional_backfills: vec![],
4050                vec_inserts: vec![],
4051                operational_writes: vec![],
4052            })
4053            .expect("v2 step write");
4054
4055        let conn = rusqlite::Connection::open(db.path()).expect("open");
4056        let v1_historical: i64 = conn
4057            .query_row(
4058                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4059                [],
4060                |r| r.get(0),
4061            )
4062            .expect("v1 historical count");
4063        let v2_active: i64 = conn
4064            .query_row(
4065                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4066                [],
4067                |r| r.get(0),
4068            )
4069            .expect("v2 active count");
4070
4071        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4072        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4073    }
4074
4075    #[test]
4076    fn writer_action_upsert_supersedes_prior_active_action() {
4077        let db = NamedTempFile::new().expect("temporary db");
4078        let writer = WriterActor::start(
4079            db.path(),
4080            Arc::new(SchemaManager::new()),
4081            ProvenanceMode::Warn,
4082            Arc::new(TelemetryCounters::default()),
4083        )
4084        .expect("writer");
4085
4086        writer
4087            .submit(WriteRequest {
4088                label: "v1".to_owned(),
4089                nodes: vec![],
4090                node_retires: vec![],
4091                edges: vec![],
4092                edge_retires: vec![],
4093                chunks: vec![],
4094                runs: vec![RunInsert {
4095                    id: "run-1".to_owned(),
4096                    kind: "session".to_owned(),
4097                    status: "completed".to_owned(),
4098                    properties: "{}".to_owned(),
4099                    source_ref: Some("src-1".to_owned()),
4100                    upsert: false,
4101                    supersedes_id: None,
4102                }],
4103                steps: vec![StepInsert {
4104                    id: "step-1".to_owned(),
4105                    run_id: "run-1".to_owned(),
4106                    kind: "llm".to_owned(),
4107                    status: "completed".to_owned(),
4108                    properties: "{}".to_owned(),
4109                    source_ref: Some("src-1".to_owned()),
4110                    upsert: false,
4111                    supersedes_id: None,
4112                }],
4113                actions: vec![ActionInsert {
4114                    id: "action-v1".to_owned(),
4115                    step_id: "step-1".to_owned(),
4116                    kind: "emit".to_owned(),
4117                    status: "completed".to_owned(),
4118                    properties: "{}".to_owned(),
4119                    source_ref: Some("src-1".to_owned()),
4120                    upsert: false,
4121                    supersedes_id: None,
4122                }],
4123                optional_backfills: vec![],
4124                vec_inserts: vec![],
4125                operational_writes: vec![],
4126            })
4127            .expect("v1 action write");
4128
4129        writer
4130            .submit(WriteRequest {
4131                label: "v2".to_owned(),
4132                nodes: vec![],
4133                node_retires: vec![],
4134                edges: vec![],
4135                edge_retires: vec![],
4136                chunks: vec![],
4137                runs: vec![],
4138                steps: vec![],
4139                actions: vec![ActionInsert {
4140                    id: "action-v2".to_owned(),
4141                    step_id: "step-1".to_owned(),
4142                    kind: "emit".to_owned(),
4143                    status: "completed".to_owned(),
4144                    properties: "{}".to_owned(),
4145                    source_ref: Some("src-2".to_owned()),
4146                    upsert: true,
4147                    supersedes_id: Some("action-v1".to_owned()),
4148                }],
4149                optional_backfills: vec![],
4150                vec_inserts: vec![],
4151                operational_writes: vec![],
4152            })
4153            .expect("v2 action write");
4154
4155        let conn = rusqlite::Connection::open(db.path()).expect("open");
4156        let v1_historical: i64 = conn
4157            .query_row(
4158                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4159                [],
4160                |r| r.get(0),
4161            )
4162            .expect("v1 historical count");
4163        let v2_active: i64 = conn
4164            .query_row(
4165                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4166                [],
4167                |r| r.get(0),
4168            )
4169            .expect("v2 active count");
4170
4171        assert_eq!(
4172            v1_historical, 1,
4173            "action-v1 must be historical after upsert"
4174        );
4175        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4176    }
4177
4178    // P0: runtime upsert without supersedes_id must be rejected
4179
4180    #[test]
4181    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4182        let db = NamedTempFile::new().expect("temporary db");
4183        let writer = WriterActor::start(
4184            db.path(),
4185            Arc::new(SchemaManager::new()),
4186            ProvenanceMode::Warn,
4187            Arc::new(TelemetryCounters::default()),
4188        )
4189        .expect("writer");
4190
4191        let result = writer.submit(WriteRequest {
4192            label: "bad".to_owned(),
4193            nodes: vec![],
4194            node_retires: vec![],
4195            edges: vec![],
4196            edge_retires: vec![],
4197            chunks: vec![],
4198            runs: vec![RunInsert {
4199                id: "run-1".to_owned(),
4200                kind: "session".to_owned(),
4201                status: "completed".to_owned(),
4202                properties: "{}".to_owned(),
4203                source_ref: None,
4204                upsert: true,
4205                supersedes_id: None,
4206            }],
4207            steps: vec![],
4208            actions: vec![],
4209            optional_backfills: vec![],
4210            vec_inserts: vec![],
4211            operational_writes: vec![],
4212        });
4213
4214        assert!(
4215            matches!(result, Err(EngineError::InvalidWrite(_))),
4216            "run upsert=true without supersedes_id must return InvalidWrite"
4217        );
4218    }
4219
4220    #[test]
4221    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4222        let db = NamedTempFile::new().expect("temporary db");
4223        let writer = WriterActor::start(
4224            db.path(),
4225            Arc::new(SchemaManager::new()),
4226            ProvenanceMode::Warn,
4227            Arc::new(TelemetryCounters::default()),
4228        )
4229        .expect("writer");
4230
4231        let result = writer.submit(WriteRequest {
4232            label: "bad".to_owned(),
4233            nodes: vec![],
4234            node_retires: vec![],
4235            edges: vec![],
4236            edge_retires: vec![],
4237            chunks: vec![],
4238            runs: vec![],
4239            steps: vec![StepInsert {
4240                id: "step-1".to_owned(),
4241                run_id: "run-1".to_owned(),
4242                kind: "llm".to_owned(),
4243                status: "completed".to_owned(),
4244                properties: "{}".to_owned(),
4245                source_ref: None,
4246                upsert: true,
4247                supersedes_id: None,
4248            }],
4249            actions: vec![],
4250            optional_backfills: vec![],
4251            vec_inserts: vec![],
4252            operational_writes: vec![],
4253        });
4254
4255        assert!(
4256            matches!(result, Err(EngineError::InvalidWrite(_))),
4257            "step upsert=true without supersedes_id must return InvalidWrite"
4258        );
4259    }
4260
4261    #[test]
4262    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4263        let db = NamedTempFile::new().expect("temporary db");
4264        let writer = WriterActor::start(
4265            db.path(),
4266            Arc::new(SchemaManager::new()),
4267            ProvenanceMode::Warn,
4268            Arc::new(TelemetryCounters::default()),
4269        )
4270        .expect("writer");
4271
4272        let result = writer.submit(WriteRequest {
4273            label: "bad".to_owned(),
4274            nodes: vec![],
4275            node_retires: vec![],
4276            edges: vec![],
4277            edge_retires: vec![],
4278            chunks: vec![],
4279            runs: vec![],
4280            steps: vec![],
4281            actions: vec![ActionInsert {
4282                id: "action-1".to_owned(),
4283                step_id: "step-1".to_owned(),
4284                kind: "emit".to_owned(),
4285                status: "completed".to_owned(),
4286                properties: "{}".to_owned(),
4287                source_ref: None,
4288                upsert: true,
4289                supersedes_id: None,
4290            }],
4291            optional_backfills: vec![],
4292            vec_inserts: vec![],
4293            operational_writes: vec![],
4294        });
4295
4296        assert!(
4297            matches!(result, Err(EngineError::InvalidWrite(_))),
4298            "action upsert=true without supersedes_id must return InvalidWrite"
4299        );
4300    }
4301
4302    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4303
4304    #[test]
4305    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4306        let db = NamedTempFile::new().expect("temporary db");
4307        let writer = WriterActor::start(
4308            db.path(),
4309            Arc::new(SchemaManager::new()),
4310            ProvenanceMode::Warn,
4311            Arc::new(TelemetryCounters::default()),
4312        )
4313        .expect("writer");
4314
4315        let receipt = writer
4316            .submit(WriteRequest {
4317                label: "test".to_owned(),
4318                nodes: vec![
4319                    NodeInsert {
4320                        row_id: "row-a".to_owned(),
4321                        logical_id: "node-a".to_owned(),
4322                        kind: "Meeting".to_owned(),
4323                        properties: "{}".to_owned(),
4324                        source_ref: Some("src-1".to_owned()),
4325                        upsert: false,
4326                        chunk_policy: ChunkPolicy::Preserve,
4327                    },
4328                    NodeInsert {
4329                        row_id: "row-b".to_owned(),
4330                        logical_id: "node-b".to_owned(),
4331                        kind: "Task".to_owned(),
4332                        properties: "{}".to_owned(),
4333                        source_ref: Some("src-1".to_owned()),
4334                        upsert: false,
4335                        chunk_policy: ChunkPolicy::Preserve,
4336                    },
4337                ],
4338                node_retires: vec![],
4339                edges: vec![EdgeInsert {
4340                    row_id: "edge-1".to_owned(),
4341                    logical_id: "edge-lg-1".to_owned(),
4342                    source_logical_id: "node-a".to_owned(),
4343                    target_logical_id: "node-b".to_owned(),
4344                    kind: "HAS_TASK".to_owned(),
4345                    properties: "{}".to_owned(),
4346                    source_ref: None,
4347                    upsert: false,
4348                }],
4349                edge_retires: vec![],
4350                chunks: vec![],
4351                runs: vec![],
4352                steps: vec![],
4353                actions: vec![],
4354                optional_backfills: vec![],
4355                vec_inserts: vec![],
4356                operational_writes: vec![],
4357            })
4358            .expect("write");
4359
4360        assert!(
4361            !receipt.provenance_warnings.is_empty(),
4362            "edge insert without source_ref must emit a provenance warning"
4363        );
4364    }
4365
4366    #[test]
4367    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4368        let db = NamedTempFile::new().expect("temporary db");
4369        let writer = WriterActor::start(
4370            db.path(),
4371            Arc::new(SchemaManager::new()),
4372            ProvenanceMode::Warn,
4373            Arc::new(TelemetryCounters::default()),
4374        )
4375        .expect("writer");
4376
4377        let receipt = writer
4378            .submit(WriteRequest {
4379                label: "test".to_owned(),
4380                nodes: vec![],
4381                node_retires: vec![],
4382                edges: vec![],
4383                edge_retires: vec![],
4384                chunks: vec![],
4385                runs: vec![RunInsert {
4386                    id: "run-1".to_owned(),
4387                    kind: "session".to_owned(),
4388                    status: "completed".to_owned(),
4389                    properties: "{}".to_owned(),
4390                    source_ref: None,
4391                    upsert: false,
4392                    supersedes_id: None,
4393                }],
4394                steps: vec![],
4395                actions: vec![],
4396                optional_backfills: vec![],
4397                vec_inserts: vec![],
4398                operational_writes: vec![],
4399            })
4400            .expect("write");
4401
4402        assert!(
4403            !receipt.provenance_warnings.is_empty(),
4404            "run insert without source_ref must emit a provenance warning"
4405        );
4406    }
4407
4408    // P1c: retire a node AND submit chunks for the same logical_id in one request
4409
4410    #[test]
4411    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4412        let db = NamedTempFile::new().expect("temporary db");
4413        let writer = WriterActor::start(
4414            db.path(),
4415            Arc::new(SchemaManager::new()),
4416            ProvenanceMode::Warn,
4417            Arc::new(TelemetryCounters::default()),
4418        )
4419        .expect("writer");
4420
4421        // First seed the node so it exists
4422        writer
4423            .submit(WriteRequest {
4424                label: "seed".to_owned(),
4425                nodes: vec![NodeInsert {
4426                    row_id: "row-1".to_owned(),
4427                    logical_id: "meeting-1".to_owned(),
4428                    kind: "Meeting".to_owned(),
4429                    properties: "{}".to_owned(),
4430                    source_ref: Some("src-1".to_owned()),
4431                    upsert: false,
4432                    chunk_policy: ChunkPolicy::Preserve,
4433                }],
4434                node_retires: vec![],
4435                edges: vec![],
4436                edge_retires: vec![],
4437                chunks: vec![],
4438                runs: vec![],
4439                steps: vec![],
4440                actions: vec![],
4441                optional_backfills: vec![],
4442                vec_inserts: vec![],
4443                operational_writes: vec![],
4444            })
4445            .expect("seed write");
4446
4447        // Now try to retire it AND add a chunk for it in the same request
4448        let result = writer.submit(WriteRequest {
4449            label: "bad".to_owned(),
4450            nodes: vec![],
4451            node_retires: vec![NodeRetire {
4452                logical_id: "meeting-1".to_owned(),
4453                source_ref: Some("src-2".to_owned()),
4454            }],
4455            edges: vec![],
4456            edge_retires: vec![],
4457            chunks: vec![ChunkInsert {
4458                id: "chunk-bad".to_owned(),
4459                node_logical_id: "meeting-1".to_owned(),
4460                text_content: "some text".to_owned(),
4461                byte_start: None,
4462                byte_end: None,
4463            }],
4464            runs: vec![],
4465            steps: vec![],
4466            actions: vec![],
4467            optional_backfills: vec![],
4468            vec_inserts: vec![],
4469            operational_writes: vec![],
4470        });
4471
4472        assert!(
4473            matches!(result, Err(EngineError::InvalidWrite(_))),
4474            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
4475        );
4476    }
4477
4478    // --- Item 1: prepare_cached batch insert ---
4479
4480    #[test]
4481    fn writer_batch_insert_multiple_nodes() {
4482        let db = NamedTempFile::new().expect("temporary db");
4483        let writer = WriterActor::start(
4484            db.path(),
4485            Arc::new(SchemaManager::new()),
4486            ProvenanceMode::Warn,
4487            Arc::new(TelemetryCounters::default()),
4488        )
4489        .expect("writer");
4490
4491        let nodes: Vec<NodeInsert> = (0..100)
4492            .map(|i| NodeInsert {
4493                row_id: format!("row-{i}"),
4494                logical_id: format!("lg-{i}"),
4495                kind: "Note".to_owned(),
4496                properties: "{}".to_owned(),
4497                source_ref: Some("batch-src".to_owned()),
4498                upsert: false,
4499                chunk_policy: ChunkPolicy::Preserve,
4500            })
4501            .collect();
4502
4503        writer
4504            .submit(WriteRequest {
4505                label: "batch".to_owned(),
4506                nodes,
4507                node_retires: vec![],
4508                edges: vec![],
4509                edge_retires: vec![],
4510                chunks: vec![],
4511                runs: vec![],
4512                steps: vec![],
4513                actions: vec![],
4514                optional_backfills: vec![],
4515                vec_inserts: vec![],
4516                operational_writes: vec![],
4517            })
4518            .expect("batch write");
4519
4520        let conn = rusqlite::Connection::open(db.path()).expect("open");
4521        let count: i64 = conn
4522            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4523            .expect("count nodes");
4524        assert_eq!(
4525            count, 100,
4526            "all 100 nodes must be present after batch insert"
4527        );
4528    }
4529
4530    // --- Item 2: ID validation ---
4531
4532    #[test]
4533    fn prepare_write_rejects_empty_node_row_id() {
4534        let db = NamedTempFile::new().expect("temporary db");
4535        let writer = WriterActor::start(
4536            db.path(),
4537            Arc::new(SchemaManager::new()),
4538            ProvenanceMode::Warn,
4539            Arc::new(TelemetryCounters::default()),
4540        )
4541        .expect("writer");
4542
4543        let result = writer.submit(WriteRequest {
4544            label: "test".to_owned(),
4545            nodes: vec![NodeInsert {
4546                row_id: String::new(),
4547                logical_id: "lg-1".to_owned(),
4548                kind: "Note".to_owned(),
4549                properties: "{}".to_owned(),
4550                source_ref: None,
4551                upsert: false,
4552                chunk_policy: ChunkPolicy::Preserve,
4553            }],
4554            node_retires: vec![],
4555            edges: vec![],
4556            edge_retires: vec![],
4557            chunks: vec![],
4558            runs: vec![],
4559            steps: vec![],
4560            actions: vec![],
4561            optional_backfills: vec![],
4562            vec_inserts: vec![],
4563            operational_writes: vec![],
4564        });
4565
4566        assert!(
4567            matches!(result, Err(EngineError::InvalidWrite(_))),
4568            "empty row_id must be rejected"
4569        );
4570    }
4571
4572    #[test]
4573    fn prepare_write_rejects_empty_node_logical_id() {
4574        let db = NamedTempFile::new().expect("temporary db");
4575        let writer = WriterActor::start(
4576            db.path(),
4577            Arc::new(SchemaManager::new()),
4578            ProvenanceMode::Warn,
4579            Arc::new(TelemetryCounters::default()),
4580        )
4581        .expect("writer");
4582
4583        let result = writer.submit(WriteRequest {
4584            label: "test".to_owned(),
4585            nodes: vec![NodeInsert {
4586                row_id: "row-1".to_owned(),
4587                logical_id: String::new(),
4588                kind: "Note".to_owned(),
4589                properties: "{}".to_owned(),
4590                source_ref: None,
4591                upsert: false,
4592                chunk_policy: ChunkPolicy::Preserve,
4593            }],
4594            node_retires: vec![],
4595            edges: vec![],
4596            edge_retires: vec![],
4597            chunks: vec![],
4598            runs: vec![],
4599            steps: vec![],
4600            actions: vec![],
4601            optional_backfills: vec![],
4602            vec_inserts: vec![],
4603            operational_writes: vec![],
4604        });
4605
4606        assert!(
4607            matches!(result, Err(EngineError::InvalidWrite(_))),
4608            "empty logical_id must be rejected"
4609        );
4610    }
4611
4612    #[test]
4613    fn prepare_write_rejects_duplicate_row_ids_in_request() {
4614        let db = NamedTempFile::new().expect("temporary db");
4615        let writer = WriterActor::start(
4616            db.path(),
4617            Arc::new(SchemaManager::new()),
4618            ProvenanceMode::Warn,
4619            Arc::new(TelemetryCounters::default()),
4620        )
4621        .expect("writer");
4622
4623        let result = writer.submit(WriteRequest {
4624            label: "test".to_owned(),
4625            nodes: vec![
4626                NodeInsert {
4627                    row_id: "row-1".to_owned(),
4628                    logical_id: "lg-1".to_owned(),
4629                    kind: "Note".to_owned(),
4630                    properties: "{}".to_owned(),
4631                    source_ref: None,
4632                    upsert: false,
4633                    chunk_policy: ChunkPolicy::Preserve,
4634                },
4635                NodeInsert {
4636                    row_id: "row-1".to_owned(), // duplicate
4637                    logical_id: "lg-2".to_owned(),
4638                    kind: "Note".to_owned(),
4639                    properties: "{}".to_owned(),
4640                    source_ref: None,
4641                    upsert: false,
4642                    chunk_policy: ChunkPolicy::Preserve,
4643                },
4644            ],
4645            node_retires: vec![],
4646            edges: vec![],
4647            edge_retires: vec![],
4648            chunks: vec![],
4649            runs: vec![],
4650            steps: vec![],
4651            actions: vec![],
4652            optional_backfills: vec![],
4653            vec_inserts: vec![],
4654            operational_writes: vec![],
4655        });
4656
4657        assert!(
4658            matches!(result, Err(EngineError::InvalidWrite(_))),
4659            "duplicate row_id within request must be rejected"
4660        );
4661    }
4662
4663    #[test]
4664    fn prepare_write_rejects_empty_chunk_id() {
4665        let db = NamedTempFile::new().expect("temporary db");
4666        let writer = WriterActor::start(
4667            db.path(),
4668            Arc::new(SchemaManager::new()),
4669            ProvenanceMode::Warn,
4670            Arc::new(TelemetryCounters::default()),
4671        )
4672        .expect("writer");
4673
4674        let result = writer.submit(WriteRequest {
4675            label: "test".to_owned(),
4676            nodes: vec![NodeInsert {
4677                row_id: "row-1".to_owned(),
4678                logical_id: "lg-1".to_owned(),
4679                kind: "Note".to_owned(),
4680                properties: "{}".to_owned(),
4681                source_ref: None,
4682                upsert: false,
4683                chunk_policy: ChunkPolicy::Preserve,
4684            }],
4685            node_retires: vec![],
4686            edges: vec![],
4687            edge_retires: vec![],
4688            chunks: vec![ChunkInsert {
4689                id: String::new(),
4690                node_logical_id: "lg-1".to_owned(),
4691                text_content: "some text".to_owned(),
4692                byte_start: None,
4693                byte_end: None,
4694            }],
4695            runs: vec![],
4696            steps: vec![],
4697            actions: vec![],
4698            optional_backfills: vec![],
4699            vec_inserts: vec![],
4700            operational_writes: vec![],
4701        });
4702
4703        assert!(
4704            matches!(result, Err(EngineError::InvalidWrite(_))),
4705            "empty chunk id must be rejected"
4706        );
4707    }
4708
4709    // --- Item 4: provenance warning coverage tests ---
4710
4711    #[test]
4712    fn writer_receipt_warns_on_step_without_source_ref() {
4713        let db = NamedTempFile::new().expect("temporary db");
4714        let writer = WriterActor::start(
4715            db.path(),
4716            Arc::new(SchemaManager::new()),
4717            ProvenanceMode::Warn,
4718            Arc::new(TelemetryCounters::default()),
4719        )
4720        .expect("writer");
4721
4722        // seed a run first so step FK is satisfied
4723        writer
4724            .submit(WriteRequest {
4725                label: "seed-run".to_owned(),
4726                nodes: vec![],
4727                node_retires: vec![],
4728                edges: vec![],
4729                edge_retires: vec![],
4730                chunks: vec![],
4731                runs: vec![RunInsert {
4732                    id: "run-1".to_owned(),
4733                    kind: "session".to_owned(),
4734                    status: "active".to_owned(),
4735                    properties: "{}".to_owned(),
4736                    source_ref: Some("src-1".to_owned()),
4737                    upsert: false,
4738                    supersedes_id: None,
4739                }],
4740                steps: vec![],
4741                actions: vec![],
4742                optional_backfills: vec![],
4743                vec_inserts: vec![],
4744                operational_writes: vec![],
4745            })
4746            .expect("seed run");
4747
4748        let receipt = writer
4749            .submit(WriteRequest {
4750                label: "test".to_owned(),
4751                nodes: vec![],
4752                node_retires: vec![],
4753                edges: vec![],
4754                edge_retires: vec![],
4755                chunks: vec![],
4756                runs: vec![],
4757                steps: vec![StepInsert {
4758                    id: "step-1".to_owned(),
4759                    run_id: "run-1".to_owned(),
4760                    kind: "llm_call".to_owned(),
4761                    status: "completed".to_owned(),
4762                    properties: "{}".to_owned(),
4763                    source_ref: None,
4764                    upsert: false,
4765                    supersedes_id: None,
4766                }],
4767                actions: vec![],
4768                optional_backfills: vec![],
4769                vec_inserts: vec![],
4770                operational_writes: vec![],
4771            })
4772            .expect("write");
4773
4774        assert!(
4775            !receipt.provenance_warnings.is_empty(),
4776            "step insert without source_ref must emit a provenance warning"
4777        );
4778    }
4779
4780    #[test]
4781    fn writer_receipt_warns_on_action_without_source_ref() {
4782        let db = NamedTempFile::new().expect("temporary db");
4783        let writer = WriterActor::start(
4784            db.path(),
4785            Arc::new(SchemaManager::new()),
4786            ProvenanceMode::Warn,
4787            Arc::new(TelemetryCounters::default()),
4788        )
4789        .expect("writer");
4790
4791        // seed run and step so action FK is satisfied
4792        writer
4793            .submit(WriteRequest {
4794                label: "seed".to_owned(),
4795                nodes: vec![],
4796                node_retires: vec![],
4797                edges: vec![],
4798                edge_retires: vec![],
4799                chunks: vec![],
4800                runs: vec![RunInsert {
4801                    id: "run-1".to_owned(),
4802                    kind: "session".to_owned(),
4803                    status: "active".to_owned(),
4804                    properties: "{}".to_owned(),
4805                    source_ref: Some("src-1".to_owned()),
4806                    upsert: false,
4807                    supersedes_id: None,
4808                }],
4809                steps: vec![StepInsert {
4810                    id: "step-1".to_owned(),
4811                    run_id: "run-1".to_owned(),
4812                    kind: "llm_call".to_owned(),
4813                    status: "completed".to_owned(),
4814                    properties: "{}".to_owned(),
4815                    source_ref: Some("src-1".to_owned()),
4816                    upsert: false,
4817                    supersedes_id: None,
4818                }],
4819                actions: vec![],
4820                optional_backfills: vec![],
4821                vec_inserts: vec![],
4822                operational_writes: vec![],
4823            })
4824            .expect("seed");
4825
4826        let receipt = writer
4827            .submit(WriteRequest {
4828                label: "test".to_owned(),
4829                nodes: vec![],
4830                node_retires: vec![],
4831                edges: vec![],
4832                edge_retires: vec![],
4833                chunks: vec![],
4834                runs: vec![],
4835                steps: vec![],
4836                actions: vec![ActionInsert {
4837                    id: "action-1".to_owned(),
4838                    step_id: "step-1".to_owned(),
4839                    kind: "tool_call".to_owned(),
4840                    status: "completed".to_owned(),
4841                    properties: "{}".to_owned(),
4842                    source_ref: None,
4843                    upsert: false,
4844                    supersedes_id: None,
4845                }],
4846                optional_backfills: vec![],
4847                vec_inserts: vec![],
4848                operational_writes: vec![],
4849            })
4850            .expect("write");
4851
4852        assert!(
4853            !receipt.provenance_warnings.is_empty(),
4854            "action insert without source_ref must emit a provenance warning"
4855        );
4856    }
4857
4858    #[test]
4859    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
4860        let db = NamedTempFile::new().expect("temporary db");
4861        let writer = WriterActor::start(
4862            db.path(),
4863            Arc::new(SchemaManager::new()),
4864            ProvenanceMode::Warn,
4865            Arc::new(TelemetryCounters::default()),
4866        )
4867        .expect("writer");
4868
4869        let receipt = writer
4870            .submit(WriteRequest {
4871                label: "test".to_owned(),
4872                nodes: vec![NodeInsert {
4873                    row_id: "row-1".to_owned(),
4874                    logical_id: "node-1".to_owned(),
4875                    kind: "Note".to_owned(),
4876                    properties: "{}".to_owned(),
4877                    source_ref: Some("src-1".to_owned()),
4878                    upsert: false,
4879                    chunk_policy: ChunkPolicy::Preserve,
4880                }],
4881                node_retires: vec![],
4882                edges: vec![],
4883                edge_retires: vec![],
4884                chunks: vec![],
4885                runs: vec![RunInsert {
4886                    id: "run-1".to_owned(),
4887                    kind: "session".to_owned(),
4888                    status: "active".to_owned(),
4889                    properties: "{}".to_owned(),
4890                    source_ref: Some("src-1".to_owned()),
4891                    upsert: false,
4892                    supersedes_id: None,
4893                }],
4894                steps: vec![StepInsert {
4895                    id: "step-1".to_owned(),
4896                    run_id: "run-1".to_owned(),
4897                    kind: "llm_call".to_owned(),
4898                    status: "completed".to_owned(),
4899                    properties: "{}".to_owned(),
4900                    source_ref: Some("src-1".to_owned()),
4901                    upsert: false,
4902                    supersedes_id: None,
4903                }],
4904                actions: vec![ActionInsert {
4905                    id: "action-1".to_owned(),
4906                    step_id: "step-1".to_owned(),
4907                    kind: "tool_call".to_owned(),
4908                    status: "completed".to_owned(),
4909                    properties: "{}".to_owned(),
4910                    source_ref: Some("src-1".to_owned()),
4911                    upsert: false,
4912                    supersedes_id: None,
4913                }],
4914                optional_backfills: vec![],
4915                vec_inserts: vec![],
4916                operational_writes: vec![],
4917            })
4918            .expect("write");
4919
4920        assert!(
4921            receipt.provenance_warnings.is_empty(),
4922            "no warnings expected when all types have source_ref; got: {:?}",
4923            receipt.provenance_warnings
4924        );
4925    }
4926
4927    // --- Item 4 Task 2: ProvenanceMode tests ---
4928
4929    #[test]
4930    fn default_provenance_mode_is_warn() {
4931        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
4932        let db = NamedTempFile::new().expect("temporary db");
4933        let writer = WriterActor::start(
4934            db.path(),
4935            Arc::new(SchemaManager::new()),
4936            ProvenanceMode::default(),
4937            Arc::new(TelemetryCounters::default()),
4938        )
4939        .expect("writer");
4940
4941        let receipt = writer
4942            .submit(WriteRequest {
4943                label: "test".to_owned(),
4944                nodes: vec![NodeInsert {
4945                    row_id: "row-1".to_owned(),
4946                    logical_id: "node-1".to_owned(),
4947                    kind: "Note".to_owned(),
4948                    properties: "{}".to_owned(),
4949                    source_ref: None,
4950                    upsert: false,
4951                    chunk_policy: ChunkPolicy::Preserve,
4952                }],
4953                node_retires: vec![],
4954                edges: vec![],
4955                edge_retires: vec![],
4956                chunks: vec![],
4957                runs: vec![],
4958                steps: vec![],
4959                actions: vec![],
4960                optional_backfills: vec![],
4961                vec_inserts: vec![],
4962                operational_writes: vec![],
4963            })
4964            .expect("Warn mode must not reject missing source_ref");
4965
4966        assert!(
4967            !receipt.provenance_warnings.is_empty(),
4968            "Warn mode must emit a warning instead of rejecting"
4969        );
4970    }
4971
4972    #[test]
4973    fn require_mode_rejects_node_without_source_ref() {
4974        let db = NamedTempFile::new().expect("temporary db");
4975        let writer = WriterActor::start(
4976            db.path(),
4977            Arc::new(SchemaManager::new()),
4978            ProvenanceMode::Require,
4979            Arc::new(TelemetryCounters::default()),
4980        )
4981        .expect("writer");
4982
4983        let result = writer.submit(WriteRequest {
4984            label: "test".to_owned(),
4985            nodes: vec![NodeInsert {
4986                row_id: "row-1".to_owned(),
4987                logical_id: "node-1".to_owned(),
4988                kind: "Note".to_owned(),
4989                properties: "{}".to_owned(),
4990                source_ref: None,
4991                upsert: false,
4992                chunk_policy: ChunkPolicy::Preserve,
4993            }],
4994            node_retires: vec![],
4995            edges: vec![],
4996            edge_retires: vec![],
4997            chunks: vec![],
4998            runs: vec![],
4999            steps: vec![],
5000            actions: vec![],
5001            optional_backfills: vec![],
5002            vec_inserts: vec![],
5003            operational_writes: vec![],
5004        });
5005
5006        assert!(
5007            matches!(result, Err(EngineError::InvalidWrite(_))),
5008            "Require mode must reject node without source_ref"
5009        );
5010    }
5011
5012    #[test]
5013    fn require_mode_accepts_node_with_source_ref() {
5014        let db = NamedTempFile::new().expect("temporary db");
5015        let writer = WriterActor::start(
5016            db.path(),
5017            Arc::new(SchemaManager::new()),
5018            ProvenanceMode::Require,
5019            Arc::new(TelemetryCounters::default()),
5020        )
5021        .expect("writer");
5022
5023        let result = writer.submit(WriteRequest {
5024            label: "test".to_owned(),
5025            nodes: vec![NodeInsert {
5026                row_id: "row-1".to_owned(),
5027                logical_id: "node-1".to_owned(),
5028                kind: "Note".to_owned(),
5029                properties: "{}".to_owned(),
5030                source_ref: Some("src-1".to_owned()),
5031                upsert: false,
5032                chunk_policy: ChunkPolicy::Preserve,
5033            }],
5034            node_retires: vec![],
5035            edges: vec![],
5036            edge_retires: vec![],
5037            chunks: vec![],
5038            runs: vec![],
5039            steps: vec![],
5040            actions: vec![],
5041            optional_backfills: vec![],
5042            vec_inserts: vec![],
5043            operational_writes: vec![],
5044        });
5045
5046        assert!(
5047            result.is_ok(),
5048            "Require mode must accept node with source_ref"
5049        );
5050    }
5051
5052    #[test]
5053    fn require_mode_rejects_edge_without_source_ref() {
5054        let db = NamedTempFile::new().expect("temporary db");
5055        let writer = WriterActor::start(
5056            db.path(),
5057            Arc::new(SchemaManager::new()),
5058            ProvenanceMode::Require,
5059            Arc::new(TelemetryCounters::default()),
5060        )
5061        .expect("writer");
5062
5063        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5064        let result = writer.submit(WriteRequest {
5065            label: "test".to_owned(),
5066            nodes: vec![
5067                NodeInsert {
5068                    row_id: "row-a".to_owned(),
5069                    logical_id: "node-a".to_owned(),
5070                    kind: "Note".to_owned(),
5071                    properties: "{}".to_owned(),
5072                    source_ref: Some("src-1".to_owned()),
5073                    upsert: false,
5074                    chunk_policy: ChunkPolicy::Preserve,
5075                },
5076                NodeInsert {
5077                    row_id: "row-b".to_owned(),
5078                    logical_id: "node-b".to_owned(),
5079                    kind: "Note".to_owned(),
5080                    properties: "{}".to_owned(),
5081                    source_ref: Some("src-1".to_owned()),
5082                    upsert: false,
5083                    chunk_policy: ChunkPolicy::Preserve,
5084                },
5085            ],
5086            node_retires: vec![],
5087            edges: vec![EdgeInsert {
5088                row_id: "edge-row-1".to_owned(),
5089                logical_id: "edge-1".to_owned(),
5090                source_logical_id: "node-a".to_owned(),
5091                target_logical_id: "node-b".to_owned(),
5092                kind: "LINKS_TO".to_owned(),
5093                properties: "{}".to_owned(),
5094                source_ref: None,
5095                upsert: false,
5096            }],
5097            edge_retires: vec![],
5098            chunks: vec![],
5099            runs: vec![],
5100            steps: vec![],
5101            actions: vec![],
5102            optional_backfills: vec![],
5103            vec_inserts: vec![],
5104            operational_writes: vec![],
5105        });
5106
5107        assert!(
5108            matches!(result, Err(EngineError::InvalidWrite(_))),
5109            "Require mode must reject edge without source_ref"
5110        );
5111    }
5112
5113    // --- Item 5: FTS projection coverage tests ---
5114
5115    #[test]
5116    fn fts_row_has_correct_kind_from_co_submitted_node() {
5117        let db = NamedTempFile::new().expect("temporary db");
5118        let writer = WriterActor::start(
5119            db.path(),
5120            Arc::new(SchemaManager::new()),
5121            ProvenanceMode::Warn,
5122            Arc::new(TelemetryCounters::default()),
5123        )
5124        .expect("writer");
5125
5126        writer
5127            .submit(WriteRequest {
5128                label: "test".to_owned(),
5129                nodes: vec![NodeInsert {
5130                    row_id: "row-1".to_owned(),
5131                    logical_id: "node-1".to_owned(),
5132                    kind: "Meeting".to_owned(),
5133                    properties: "{}".to_owned(),
5134                    source_ref: Some("src-1".to_owned()),
5135                    upsert: false,
5136                    chunk_policy: ChunkPolicy::Preserve,
5137                }],
5138                node_retires: vec![],
5139                edges: vec![],
5140                edge_retires: vec![],
5141                chunks: vec![ChunkInsert {
5142                    id: "chunk-1".to_owned(),
5143                    node_logical_id: "node-1".to_owned(),
5144                    text_content: "some text".to_owned(),
5145                    byte_start: None,
5146                    byte_end: None,
5147                }],
5148                runs: vec![],
5149                steps: vec![],
5150                actions: vec![],
5151                optional_backfills: vec![],
5152                vec_inserts: vec![],
5153                operational_writes: vec![],
5154            })
5155            .expect("write");
5156
5157        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5158        let kind: String = conn
5159            .query_row(
5160                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5161                [],
5162                |row| row.get(0),
5163            )
5164            .expect("fts row");
5165
5166        assert_eq!(kind, "Meeting");
5167    }
5168
5169    #[test]
5170    fn fts_row_has_correct_text_content() {
5171        let db = NamedTempFile::new().expect("temporary db");
5172        let writer = WriterActor::start(
5173            db.path(),
5174            Arc::new(SchemaManager::new()),
5175            ProvenanceMode::Warn,
5176            Arc::new(TelemetryCounters::default()),
5177        )
5178        .expect("writer");
5179
5180        writer
5181            .submit(WriteRequest {
5182                label: "test".to_owned(),
5183                nodes: vec![NodeInsert {
5184                    row_id: "row-1".to_owned(),
5185                    logical_id: "node-1".to_owned(),
5186                    kind: "Note".to_owned(),
5187                    properties: "{}".to_owned(),
5188                    source_ref: Some("src-1".to_owned()),
5189                    upsert: false,
5190                    chunk_policy: ChunkPolicy::Preserve,
5191                }],
5192                node_retires: vec![],
5193                edges: vec![],
5194                edge_retires: vec![],
5195                chunks: vec![ChunkInsert {
5196                    id: "chunk-1".to_owned(),
5197                    node_logical_id: "node-1".to_owned(),
5198                    text_content: "exactly this text".to_owned(),
5199                    byte_start: None,
5200                    byte_end: None,
5201                }],
5202                runs: vec![],
5203                steps: vec![],
5204                actions: vec![],
5205                optional_backfills: vec![],
5206                vec_inserts: vec![],
5207                operational_writes: vec![],
5208            })
5209            .expect("write");
5210
5211        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5212        let text: String = conn
5213            .query_row(
5214                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5215                [],
5216                |row| row.get(0),
5217            )
5218            .expect("fts row");
5219
5220        assert_eq!(text, "exactly this text");
5221    }
5222
5223    #[test]
5224    fn fts_row_has_correct_kind_from_pre_existing_node() {
5225        let db = NamedTempFile::new().expect("temporary db");
5226        let writer = WriterActor::start(
5227            db.path(),
5228            Arc::new(SchemaManager::new()),
5229            ProvenanceMode::Warn,
5230            Arc::new(TelemetryCounters::default()),
5231        )
5232        .expect("writer");
5233
5234        // Request 1: node only
5235        writer
5236            .submit(WriteRequest {
5237                label: "r1".to_owned(),
5238                nodes: vec![NodeInsert {
5239                    row_id: "row-1".to_owned(),
5240                    logical_id: "node-1".to_owned(),
5241                    kind: "Document".to_owned(),
5242                    properties: "{}".to_owned(),
5243                    source_ref: Some("src-1".to_owned()),
5244                    upsert: false,
5245                    chunk_policy: ChunkPolicy::Preserve,
5246                }],
5247                node_retires: vec![],
5248                edges: vec![],
5249                edge_retires: vec![],
5250                chunks: vec![],
5251                runs: vec![],
5252                steps: vec![],
5253                actions: vec![],
5254                optional_backfills: vec![],
5255                vec_inserts: vec![],
5256                operational_writes: vec![],
5257            })
5258            .expect("r1 write");
5259
5260        // Request 2: chunk for pre-existing node
5261        writer
5262            .submit(WriteRequest {
5263                label: "r2".to_owned(),
5264                nodes: vec![],
5265                node_retires: vec![],
5266                edges: vec![],
5267                edge_retires: vec![],
5268                chunks: vec![ChunkInsert {
5269                    id: "chunk-1".to_owned(),
5270                    node_logical_id: "node-1".to_owned(),
5271                    text_content: "some text".to_owned(),
5272                    byte_start: None,
5273                    byte_end: None,
5274                }],
5275                runs: vec![],
5276                steps: vec![],
5277                actions: vec![],
5278                optional_backfills: vec![],
5279                vec_inserts: vec![],
5280                operational_writes: vec![],
5281            })
5282            .expect("r2 write");
5283
5284        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5285        let kind: String = conn
5286            .query_row(
5287                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5288                [],
5289                |row| row.get(0),
5290            )
5291            .expect("fts row");
5292
5293        assert_eq!(kind, "Document");
5294    }
5295
5296    #[test]
5297    fn fts_derives_rows_for_multiple_chunks_per_node() {
5298        let db = NamedTempFile::new().expect("temporary db");
5299        let writer = WriterActor::start(
5300            db.path(),
5301            Arc::new(SchemaManager::new()),
5302            ProvenanceMode::Warn,
5303            Arc::new(TelemetryCounters::default()),
5304        )
5305        .expect("writer");
5306
5307        writer
5308            .submit(WriteRequest {
5309                label: "test".to_owned(),
5310                nodes: vec![NodeInsert {
5311                    row_id: "row-1".to_owned(),
5312                    logical_id: "node-1".to_owned(),
5313                    kind: "Meeting".to_owned(),
5314                    properties: "{}".to_owned(),
5315                    source_ref: Some("src-1".to_owned()),
5316                    upsert: false,
5317                    chunk_policy: ChunkPolicy::Preserve,
5318                }],
5319                node_retires: vec![],
5320                edges: vec![],
5321                edge_retires: vec![],
5322                chunks: vec![
5323                    ChunkInsert {
5324                        id: "chunk-a".to_owned(),
5325                        node_logical_id: "node-1".to_owned(),
5326                        text_content: "intro".to_owned(),
5327                        byte_start: None,
5328                        byte_end: None,
5329                    },
5330                    ChunkInsert {
5331                        id: "chunk-b".to_owned(),
5332                        node_logical_id: "node-1".to_owned(),
5333                        text_content: "body".to_owned(),
5334                        byte_start: None,
5335                        byte_end: None,
5336                    },
5337                    ChunkInsert {
5338                        id: "chunk-c".to_owned(),
5339                        node_logical_id: "node-1".to_owned(),
5340                        text_content: "conclusion".to_owned(),
5341                        byte_start: None,
5342                        byte_end: None,
5343                    },
5344                ],
5345                runs: vec![],
5346                steps: vec![],
5347                actions: vec![],
5348                optional_backfills: vec![],
5349                vec_inserts: vec![],
5350                operational_writes: vec![],
5351            })
5352            .expect("write");
5353
5354        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5355        let count: i64 = conn
5356            .query_row(
5357                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5358                [],
5359                |row| row.get(0),
5360            )
5361            .expect("fts count");
5362
5363        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5364    }
5365
5366    #[test]
5367    fn fts_resolves_mixed_fast_and_db_paths() {
5368        let db = NamedTempFile::new().expect("temporary db");
5369        let writer = WriterActor::start(
5370            db.path(),
5371            Arc::new(SchemaManager::new()),
5372            ProvenanceMode::Warn,
5373            Arc::new(TelemetryCounters::default()),
5374        )
5375        .expect("writer");
5376
5377        // Seed pre-existing node
5378        writer
5379            .submit(WriteRequest {
5380                label: "seed".to_owned(),
5381                nodes: vec![NodeInsert {
5382                    row_id: "row-existing".to_owned(),
5383                    logical_id: "existing-node".to_owned(),
5384                    kind: "Archive".to_owned(),
5385                    properties: "{}".to_owned(),
5386                    source_ref: Some("src-1".to_owned()),
5387                    upsert: false,
5388                    chunk_policy: ChunkPolicy::Preserve,
5389                }],
5390                node_retires: vec![],
5391                edges: vec![],
5392                edge_retires: vec![],
5393                chunks: vec![],
5394                runs: vec![],
5395                steps: vec![],
5396                actions: vec![],
5397                optional_backfills: vec![],
5398                vec_inserts: vec![],
5399                operational_writes: vec![],
5400            })
5401            .expect("seed");
5402
5403        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
5404        writer
5405            .submit(WriteRequest {
5406                label: "mixed".to_owned(),
5407                nodes: vec![NodeInsert {
5408                    row_id: "row-new".to_owned(),
5409                    logical_id: "new-node".to_owned(),
5410                    kind: "Inbox".to_owned(),
5411                    properties: "{}".to_owned(),
5412                    source_ref: Some("src-2".to_owned()),
5413                    upsert: false,
5414                    chunk_policy: ChunkPolicy::Preserve,
5415                }],
5416                node_retires: vec![],
5417                edges: vec![],
5418                edge_retires: vec![],
5419                chunks: vec![
5420                    ChunkInsert {
5421                        id: "chunk-fast".to_owned(),
5422                        node_logical_id: "new-node".to_owned(),
5423                        text_content: "new content".to_owned(),
5424                        byte_start: None,
5425                        byte_end: None,
5426                    },
5427                    ChunkInsert {
5428                        id: "chunk-db".to_owned(),
5429                        node_logical_id: "existing-node".to_owned(),
5430                        text_content: "archive content".to_owned(),
5431                        byte_start: None,
5432                        byte_end: None,
5433                    },
5434                ],
5435                runs: vec![],
5436                steps: vec![],
5437                actions: vec![],
5438                optional_backfills: vec![],
5439                vec_inserts: vec![],
5440                operational_writes: vec![],
5441            })
5442            .expect("mixed write");
5443
5444        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5445        let fast_kind: String = conn
5446            .query_row(
5447                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
5448                [],
5449                |row| row.get(0),
5450            )
5451            .expect("fast path fts row");
5452        let db_kind: String = conn
5453            .query_row(
5454                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
5455                [],
5456                |row| row.get(0),
5457            )
5458            .expect("db path fts row");
5459
5460        assert_eq!(fast_kind, "Inbox");
5461        assert_eq!(db_kind, "Archive");
5462    }
5463
5464    #[test]
5465    fn prepare_write_rejects_empty_chunk_text() {
5466        let db = NamedTempFile::new().expect("temporary db");
5467        let writer = WriterActor::start(
5468            db.path(),
5469            Arc::new(SchemaManager::new()),
5470            ProvenanceMode::Warn,
5471            Arc::new(TelemetryCounters::default()),
5472        )
5473        .expect("writer");
5474
5475        let result = writer.submit(WriteRequest {
5476            label: "test".to_owned(),
5477            nodes: vec![NodeInsert {
5478                row_id: "row-1".to_owned(),
5479                logical_id: "node-1".to_owned(),
5480                kind: "Note".to_owned(),
5481                properties: "{}".to_owned(),
5482                source_ref: None,
5483                upsert: false,
5484                chunk_policy: ChunkPolicy::Preserve,
5485            }],
5486            node_retires: vec![],
5487            edges: vec![],
5488            edge_retires: vec![],
5489            chunks: vec![ChunkInsert {
5490                id: "chunk-1".to_owned(),
5491                node_logical_id: "node-1".to_owned(),
5492                text_content: String::new(),
5493                byte_start: None,
5494                byte_end: None,
5495            }],
5496            runs: vec![],
5497            steps: vec![],
5498            actions: vec![],
5499            optional_backfills: vec![],
5500            vec_inserts: vec![],
5501            operational_writes: vec![],
5502        });
5503
5504        assert!(
5505            matches!(result, Err(EngineError::InvalidWrite(_))),
5506            "empty text_content must be rejected"
5507        );
5508    }
5509
5510    #[test]
5511    fn receipt_reports_zero_backfills_when_none_submitted() {
5512        let db = NamedTempFile::new().expect("temporary db");
5513        let writer = WriterActor::start(
5514            db.path(),
5515            Arc::new(SchemaManager::new()),
5516            ProvenanceMode::Warn,
5517            Arc::new(TelemetryCounters::default()),
5518        )
5519        .expect("writer");
5520
5521        let receipt = writer
5522            .submit(WriteRequest {
5523                label: "test".to_owned(),
5524                nodes: vec![NodeInsert {
5525                    row_id: "row-1".to_owned(),
5526                    logical_id: "node-1".to_owned(),
5527                    kind: "Note".to_owned(),
5528                    properties: "{}".to_owned(),
5529                    source_ref: Some("src-1".to_owned()),
5530                    upsert: false,
5531                    chunk_policy: ChunkPolicy::Preserve,
5532                }],
5533                node_retires: vec![],
5534                edges: vec![],
5535                edge_retires: vec![],
5536                chunks: vec![],
5537                runs: vec![],
5538                steps: vec![],
5539                actions: vec![],
5540                optional_backfills: vec![],
5541                vec_inserts: vec![],
5542                operational_writes: vec![],
5543            })
5544            .expect("write");
5545
5546        assert_eq!(receipt.optional_backfill_count, 0);
5547    }
5548
5549    #[test]
5550    fn receipt_reports_correct_backfill_count() {
5551        let db = NamedTempFile::new().expect("temporary db");
5552        let writer = WriterActor::start(
5553            db.path(),
5554            Arc::new(SchemaManager::new()),
5555            ProvenanceMode::Warn,
5556            Arc::new(TelemetryCounters::default()),
5557        )
5558        .expect("writer");
5559
5560        let receipt = writer
5561            .submit(WriteRequest {
5562                label: "test".to_owned(),
5563                nodes: vec![NodeInsert {
5564                    row_id: "row-1".to_owned(),
5565                    logical_id: "node-1".to_owned(),
5566                    kind: "Note".to_owned(),
5567                    properties: "{}".to_owned(),
5568                    source_ref: Some("src-1".to_owned()),
5569                    upsert: false,
5570                    chunk_policy: ChunkPolicy::Preserve,
5571                }],
5572                node_retires: vec![],
5573                edges: vec![],
5574                edge_retires: vec![],
5575                chunks: vec![],
5576                runs: vec![],
5577                steps: vec![],
5578                actions: vec![],
5579                optional_backfills: vec![
5580                    OptionalProjectionTask {
5581                        target: ProjectionTarget::Fts,
5582                        payload: "p1".to_owned(),
5583                    },
5584                    OptionalProjectionTask {
5585                        target: ProjectionTarget::Vec,
5586                        payload: "p2".to_owned(),
5587                    },
5588                    OptionalProjectionTask {
5589                        target: ProjectionTarget::All,
5590                        payload: "p3".to_owned(),
5591                    },
5592                ],
5593                vec_inserts: vec![],
5594                operational_writes: vec![],
5595            })
5596            .expect("write");
5597
5598        assert_eq!(receipt.optional_backfill_count, 3);
5599    }
5600
5601    #[test]
5602    fn backfill_tasks_are_not_executed_during_write() {
5603        let db = NamedTempFile::new().expect("temporary db");
5604        let writer = WriterActor::start(
5605            db.path(),
5606            Arc::new(SchemaManager::new()),
5607            ProvenanceMode::Warn,
5608            Arc::new(TelemetryCounters::default()),
5609        )
5610        .expect("writer");
5611
5612        // Write a node + chunk. Submit a backfill task targeting FTS.
5613        // The write path must not create any extra FTS rows beyond the required one.
5614        writer
5615            .submit(WriteRequest {
5616                label: "test".to_owned(),
5617                nodes: vec![NodeInsert {
5618                    row_id: "row-1".to_owned(),
5619                    logical_id: "node-1".to_owned(),
5620                    kind: "Note".to_owned(),
5621                    properties: "{}".to_owned(),
5622                    source_ref: Some("src-1".to_owned()),
5623                    upsert: false,
5624                    chunk_policy: ChunkPolicy::Preserve,
5625                }],
5626                node_retires: vec![],
5627                edges: vec![],
5628                edge_retires: vec![],
5629                chunks: vec![ChunkInsert {
5630                    id: "chunk-1".to_owned(),
5631                    node_logical_id: "node-1".to_owned(),
5632                    text_content: "required text".to_owned(),
5633                    byte_start: None,
5634                    byte_end: None,
5635                }],
5636                runs: vec![],
5637                steps: vec![],
5638                actions: vec![],
5639                optional_backfills: vec![OptionalProjectionTask {
5640                    target: ProjectionTarget::Fts,
5641                    payload: "backfill-payload".to_owned(),
5642                }],
5643                vec_inserts: vec![],
5644                operational_writes: vec![],
5645            })
5646            .expect("write");
5647
5648        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5649        let count: i64 = conn
5650            .query_row(
5651                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5652                [],
5653                |row| row.get(0),
5654            )
5655            .expect("fts count");
5656
5657        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
5658    }
5659
5660    #[test]
5661    fn fts_row_uses_new_kind_after_node_replace() {
5662        let db = NamedTempFile::new().expect("temporary db");
5663        let writer = WriterActor::start(
5664            db.path(),
5665            Arc::new(SchemaManager::new()),
5666            ProvenanceMode::Warn,
5667            Arc::new(TelemetryCounters::default()),
5668        )
5669        .expect("writer");
5670
5671        // Write original node as "Note"
5672        writer
5673            .submit(WriteRequest {
5674                label: "v1".to_owned(),
5675                nodes: vec![NodeInsert {
5676                    row_id: "row-1".to_owned(),
5677                    logical_id: "node-1".to_owned(),
5678                    kind: "Note".to_owned(),
5679                    properties: "{}".to_owned(),
5680                    source_ref: Some("src-1".to_owned()),
5681                    upsert: false,
5682                    chunk_policy: ChunkPolicy::Preserve,
5683                }],
5684                node_retires: vec![],
5685                edges: vec![],
5686                edge_retires: vec![],
5687                chunks: vec![ChunkInsert {
5688                    id: "chunk-v1".to_owned(),
5689                    node_logical_id: "node-1".to_owned(),
5690                    text_content: "original".to_owned(),
5691                    byte_start: None,
5692                    byte_end: None,
5693                }],
5694                runs: vec![],
5695                steps: vec![],
5696                actions: vec![],
5697                optional_backfills: vec![],
5698                vec_inserts: vec![],
5699                operational_writes: vec![],
5700            })
5701            .expect("v1 write");
5702
5703        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
5704        writer
5705            .submit(WriteRequest {
5706                label: "v2".to_owned(),
5707                nodes: vec![NodeInsert {
5708                    row_id: "row-2".to_owned(),
5709                    logical_id: "node-1".to_owned(),
5710                    kind: "Meeting".to_owned(),
5711                    properties: "{}".to_owned(),
5712                    source_ref: Some("src-2".to_owned()),
5713                    upsert: true,
5714                    chunk_policy: ChunkPolicy::Replace,
5715                }],
5716                node_retires: vec![],
5717                edges: vec![],
5718                edge_retires: vec![],
5719                chunks: vec![ChunkInsert {
5720                    id: "chunk-v2".to_owned(),
5721                    node_logical_id: "node-1".to_owned(),
5722                    text_content: "updated".to_owned(),
5723                    byte_start: None,
5724                    byte_end: None,
5725                }],
5726                runs: vec![],
5727                steps: vec![],
5728                actions: vec![],
5729                optional_backfills: vec![],
5730                vec_inserts: vec![],
5731                operational_writes: vec![],
5732            })
5733            .expect("v2 write");
5734
5735        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5736
5737        // Old FTS row must be gone
5738        let old_count: i64 = conn
5739            .query_row(
5740                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
5741                [],
5742                |row| row.get(0),
5743            )
5744            .expect("old fts count");
5745        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
5746
5747        // New FTS row must use the new kind
5748        let new_kind: String = conn
5749            .query_row(
5750                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
5751                [],
5752                |row| row.get(0),
5753            )
5754            .expect("new fts row");
5755        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
5756    }
5757
    // --- VecInsert validation and persistence tests ---
5759
5760    #[test]
5761    fn vec_insert_empty_chunk_id_is_rejected() {
5762        let db = NamedTempFile::new().expect("temporary db");
5763        let writer = WriterActor::start(
5764            db.path(),
5765            Arc::new(SchemaManager::new()),
5766            ProvenanceMode::Warn,
5767            Arc::new(TelemetryCounters::default()),
5768        )
5769        .expect("writer");
5770        let result = writer.submit(WriteRequest {
5771            label: "vec-test".to_owned(),
5772            nodes: vec![],
5773            node_retires: vec![],
5774            edges: vec![],
5775            edge_retires: vec![],
5776            chunks: vec![],
5777            runs: vec![],
5778            steps: vec![],
5779            actions: vec![],
5780            optional_backfills: vec![],
5781            vec_inserts: vec![VecInsert {
5782                chunk_id: String::new(),
5783                embedding: vec![0.1, 0.2, 0.3],
5784            }],
5785            operational_writes: vec![],
5786        });
5787        assert!(
5788            matches!(result, Err(EngineError::InvalidWrite(_))),
5789            "empty chunk_id in VecInsert must be rejected"
5790        );
5791    }
5792
5793    #[test]
5794    fn vec_insert_empty_embedding_is_rejected() {
5795        let db = NamedTempFile::new().expect("temporary db");
5796        let writer = WriterActor::start(
5797            db.path(),
5798            Arc::new(SchemaManager::new()),
5799            ProvenanceMode::Warn,
5800            Arc::new(TelemetryCounters::default()),
5801        )
5802        .expect("writer");
5803        let result = writer.submit(WriteRequest {
5804            label: "vec-test".to_owned(),
5805            nodes: vec![],
5806            node_retires: vec![],
5807            edges: vec![],
5808            edge_retires: vec![],
5809            chunks: vec![],
5810            runs: vec![],
5811            steps: vec![],
5812            actions: vec![],
5813            optional_backfills: vec![],
5814            vec_inserts: vec![VecInsert {
5815                chunk_id: "chunk-1".to_owned(),
5816                embedding: vec![],
5817            }],
5818            operational_writes: vec![],
5819        });
5820        assert!(
5821            matches!(result, Err(EngineError::InvalidWrite(_))),
5822            "empty embedding in VecInsert must be rejected"
5823        );
5824    }
5825
5826    #[test]
5827    fn vec_insert_noop_without_feature() {
5828        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
5829        // (no error) but not write to any vec table.
5830        let db = NamedTempFile::new().expect("temporary db");
5831        let writer = WriterActor::start(
5832            db.path(),
5833            Arc::new(SchemaManager::new()),
5834            ProvenanceMode::Warn,
5835            Arc::new(TelemetryCounters::default()),
5836        )
5837        .expect("writer");
5838        let result = writer.submit(WriteRequest {
5839            label: "vec-noop".to_owned(),
5840            nodes: vec![],
5841            node_retires: vec![],
5842            edges: vec![],
5843            edge_retires: vec![],
5844            chunks: vec![],
5845            runs: vec![],
5846            steps: vec![],
5847            actions: vec![],
5848            optional_backfills: vec![],
5849            vec_inserts: vec![VecInsert {
5850                chunk_id: "chunk-noop".to_owned(),
5851                embedding: vec![1.0, 2.0, 3.0],
5852            }],
5853            operational_writes: vec![],
5854        });
5855        #[cfg(not(feature = "sqlite-vec"))]
5856        result.expect("noop VecInsert without feature must succeed");
5857        // The result variable is used above; silence unused warning for cfg-on path.
5858        #[cfg(feature = "sqlite-vec")]
5859        let _ = result;
5860    }
5861
5862    #[cfg(feature = "sqlite-vec")]
5863    #[test]
5864    fn node_retire_preserves_vec_rows_for_later_restore() {
5865        use crate::sqlite::open_connection_with_vec;
5866
5867        let db = NamedTempFile::new().expect("temporary db");
5868        let schema_manager = Arc::new(SchemaManager::new());
5869
5870        {
5871            let conn = open_connection_with_vec(db.path()).expect("vec connection");
5872            schema_manager.bootstrap(&conn).expect("bootstrap");
5873            schema_manager
5874                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
5875                .expect("ensure profile");
5876        }
5877
5878        let writer = WriterActor::start(
5879            db.path(),
5880            Arc::clone(&schema_manager),
5881            ProvenanceMode::Warn,
5882            Arc::new(TelemetryCounters::default()),
5883        )
5884        .expect("writer");
5885
5886        // Insert node + chunk + vec row
5887        writer
5888            .submit(WriteRequest {
5889                label: "setup".to_owned(),
5890                nodes: vec![NodeInsert {
5891                    row_id: "row-retire-vec".to_owned(),
5892                    logical_id: "node-retire-vec".to_owned(),
5893                    kind: "Doc".to_owned(),
5894                    properties: "{}".to_owned(),
5895                    source_ref: Some("src".to_owned()),
5896                    upsert: false,
5897                    chunk_policy: ChunkPolicy::Preserve,
5898                }],
5899                node_retires: vec![],
5900                edges: vec![],
5901                edge_retires: vec![],
5902                chunks: vec![ChunkInsert {
5903                    id: "chunk-retire-vec".to_owned(),
5904                    node_logical_id: "node-retire-vec".to_owned(),
5905                    text_content: "text".to_owned(),
5906                    byte_start: None,
5907                    byte_end: None,
5908                }],
5909                runs: vec![],
5910                steps: vec![],
5911                actions: vec![],
5912                optional_backfills: vec![],
5913                vec_inserts: vec![VecInsert {
5914                    chunk_id: "chunk-retire-vec".to_owned(),
5915                    embedding: vec![0.1, 0.2, 0.3],
5916                }],
5917                operational_writes: vec![],
5918            })
5919            .expect("setup write");
5920
5921        // Retire the node
5922        writer
5923            .submit(WriteRequest {
5924                label: "retire".to_owned(),
5925                nodes: vec![],
5926                node_retires: vec![NodeRetire {
5927                    logical_id: "node-retire-vec".to_owned(),
5928                    source_ref: Some("src".to_owned()),
5929                }],
5930                edges: vec![],
5931                edge_retires: vec![],
5932                chunks: vec![],
5933                runs: vec![],
5934                steps: vec![],
5935                actions: vec![],
5936                optional_backfills: vec![],
5937                vec_inserts: vec![],
5938                operational_writes: vec![],
5939            })
5940            .expect("retire write");
5941
5942        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5943        let count: i64 = conn
5944            .query_row(
5945                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
5946                [],
5947                |row| row.get(0),
5948            )
5949            .expect("count");
5950        assert_eq!(
5951            count, 1,
5952            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
5953        );
5954    }
5955
5956    #[cfg(feature = "sqlite-vec")]
5957    #[test]
5958    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
5959        use crate::sqlite::open_connection_with_vec;
5960
5961        let db = NamedTempFile::new().expect("temporary db");
5962        let schema_manager = Arc::new(SchemaManager::new());
5963
5964        {
5965            let conn = open_connection_with_vec(db.path()).expect("vec connection");
5966            schema_manager.bootstrap(&conn).expect("bootstrap");
5967            schema_manager
5968                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
5969                .expect("ensure profile");
5970        }
5971
5972        let writer = WriterActor::start(
5973            db.path(),
5974            Arc::clone(&schema_manager),
5975            ProvenanceMode::Warn,
5976            Arc::new(TelemetryCounters::default()),
5977        )
5978        .expect("writer");
5979
5980        // Insert node + chunk-A + vec-A
5981        writer
5982            .submit(WriteRequest {
5983                label: "v1".to_owned(),
5984                nodes: vec![NodeInsert {
5985                    row_id: "row-replace-v1".to_owned(),
5986                    logical_id: "node-replace-vec".to_owned(),
5987                    kind: "Doc".to_owned(),
5988                    properties: "{}".to_owned(),
5989                    source_ref: Some("src".to_owned()),
5990                    upsert: false,
5991                    chunk_policy: ChunkPolicy::Preserve,
5992                }],
5993                node_retires: vec![],
5994                edges: vec![],
5995                edge_retires: vec![],
5996                chunks: vec![ChunkInsert {
5997                    id: "chunk-replace-A".to_owned(),
5998                    node_logical_id: "node-replace-vec".to_owned(),
5999                    text_content: "version one".to_owned(),
6000                    byte_start: None,
6001                    byte_end: None,
6002                }],
6003                runs: vec![],
6004                steps: vec![],
6005                actions: vec![],
6006                optional_backfills: vec![],
6007                vec_inserts: vec![VecInsert {
6008                    chunk_id: "chunk-replace-A".to_owned(),
6009                    embedding: vec![0.1, 0.2, 0.3],
6010                }],
6011                operational_writes: vec![],
6012            })
6013            .expect("v1 write");
6014
6015        // Upsert with Replace + chunk-B + vec-B
6016        writer
6017            .submit(WriteRequest {
6018                label: "v2".to_owned(),
6019                nodes: vec![NodeInsert {
6020                    row_id: "row-replace-v2".to_owned(),
6021                    logical_id: "node-replace-vec".to_owned(),
6022                    kind: "Doc".to_owned(),
6023                    properties: "{}".to_owned(),
6024                    source_ref: Some("src".to_owned()),
6025                    upsert: true,
6026                    chunk_policy: ChunkPolicy::Replace,
6027                }],
6028                node_retires: vec![],
6029                edges: vec![],
6030                edge_retires: vec![],
6031                chunks: vec![ChunkInsert {
6032                    id: "chunk-replace-B".to_owned(),
6033                    node_logical_id: "node-replace-vec".to_owned(),
6034                    text_content: "version two".to_owned(),
6035                    byte_start: None,
6036                    byte_end: None,
6037                }],
6038                runs: vec![],
6039                steps: vec![],
6040                actions: vec![],
6041                optional_backfills: vec![],
6042                vec_inserts: vec![VecInsert {
6043                    chunk_id: "chunk-replace-B".to_owned(),
6044                    embedding: vec![0.4, 0.5, 0.6],
6045                }],
6046                operational_writes: vec![],
6047            })
6048            .expect("v2 write");
6049
6050        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6051        let count_a: i64 = conn
6052            .query_row(
6053                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6054                [],
6055                |row| row.get(0),
6056            )
6057            .expect("count A");
6058        let count_b: i64 = conn
6059            .query_row(
6060                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6061                [],
6062                |row| row.get(0),
6063            )
6064            .expect("count B");
6065        assert_eq!(
6066            count_a, 0,
6067            "old vec row (chunk-A) must be deleted on Replace"
6068        );
6069        assert_eq!(
6070            count_b, 1,
6071            "new vec row (chunk-B) must be present after Replace"
6072        );
6073    }
6074
6075    #[cfg(feature = "sqlite-vec")]
6076    #[test]
6077    fn vec_insert_is_persisted_when_feature_enabled() {
6078        use crate::sqlite::open_connection_with_vec;
6079
6080        let db = NamedTempFile::new().expect("temporary db");
6081        let schema_manager = Arc::new(SchemaManager::new());
6082
6083        // Open a vec-capable connection and bootstrap + ensure profile
6084        {
6085            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6086            schema_manager.bootstrap(&conn).expect("bootstrap");
6087            schema_manager
6088                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6089                .expect("ensure profile");
6090        }
6091
6092        let writer = WriterActor::start(
6093            db.path(),
6094            Arc::clone(&schema_manager),
6095            ProvenanceMode::Warn,
6096            Arc::new(TelemetryCounters::default()),
6097        )
6098        .expect("writer");
6099
6100        writer
6101            .submit(WriteRequest {
6102                label: "vec-insert".to_owned(),
6103                nodes: vec![],
6104                node_retires: vec![],
6105                edges: vec![],
6106                edge_retires: vec![],
6107                chunks: vec![],
6108                runs: vec![],
6109                steps: vec![],
6110                actions: vec![],
6111                optional_backfills: vec![],
6112                vec_inserts: vec![VecInsert {
6113                    chunk_id: "chunk-vec".to_owned(),
6114                    embedding: vec![0.1, 0.2, 0.3],
6115                }],
6116                operational_writes: vec![],
6117            })
6118            .expect("vec insert write");
6119
6120        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6121        let count: i64 = conn
6122            .query_row(
6123                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6124                [],
6125                |row| row.get(0),
6126            )
6127            .expect("count");
6128        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6129    }
6130
6131    // --- WriteRequest size validation tests ---
6132
6133    #[test]
6134    fn write_request_exceeding_node_limit_is_rejected() {
6135        let nodes: Vec<NodeInsert> = (0..10_001)
6136            .map(|i| NodeInsert {
6137                row_id: format!("row-{i}"),
6138                logical_id: format!("lg-{i}"),
6139                kind: "Note".to_owned(),
6140                properties: "{}".to_owned(),
6141                source_ref: None,
6142                upsert: false,
6143                chunk_policy: ChunkPolicy::Preserve,
6144            })
6145            .collect();
6146
6147        let request = WriteRequest {
6148            label: "too-many-nodes".to_owned(),
6149            nodes,
6150            node_retires: vec![],
6151            edges: vec![],
6152            edge_retires: vec![],
6153            chunks: vec![],
6154            runs: vec![],
6155            steps: vec![],
6156            actions: vec![],
6157            optional_backfills: vec![],
6158            vec_inserts: vec![],
6159            operational_writes: vec![],
6160        };
6161
6162        let result = prepare_write(request, ProvenanceMode::Warn)
6163            .map(|_| ())
6164            .map_err(|e| format!("{e}"));
6165        assert!(
6166            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6167            "exceeding node limit must return InvalidWrite: got {result:?}"
6168        );
6169    }
6170
6171    #[test]
6172    fn write_request_exceeding_total_limit_is_rejected() {
6173        // Spread items across fields to exceed 100_000 total
6174        // without exceeding any single per-field limit.
6175        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6176        let request = WriteRequest {
6177            label: "too-many-total".to_owned(),
6178            nodes: (0..10_000)
6179                .map(|i| NodeInsert {
6180                    row_id: format!("row-{i}"),
6181                    logical_id: format!("lg-{i}"),
6182                    kind: "Note".to_owned(),
6183                    properties: "{}".to_owned(),
6184                    source_ref: None,
6185                    upsert: false,
6186                    chunk_policy: ChunkPolicy::Preserve,
6187                })
6188                .collect(),
6189            node_retires: vec![],
6190            edges: (0..10_000)
6191                .map(|i| EdgeInsert {
6192                    row_id: format!("edge-row-{i}"),
6193                    logical_id: format!("edge-lg-{i}"),
6194                    kind: "link".to_owned(),
6195                    source_logical_id: format!("lg-{i}"),
6196                    target_logical_id: format!("lg-{}", i + 1),
6197                    properties: "{}".to_owned(),
6198                    source_ref: None,
6199                    upsert: false,
6200                })
6201                .collect(),
6202            edge_retires: vec![],
6203            chunks: (0..50_000)
6204                .map(|i| ChunkInsert {
6205                    id: format!("chunk-{i}"),
6206                    node_logical_id: "lg-0".to_owned(),
6207                    text_content: "text".to_owned(),
6208                    byte_start: None,
6209                    byte_end: None,
6210                })
6211                .collect(),
6212            runs: vec![],
6213            steps: vec![],
6214            actions: vec![],
6215            optional_backfills: vec![],
6216            vec_inserts: (0..20_001)
6217                .map(|i| VecInsert {
6218                    chunk_id: format!("vec-chunk-{i}"),
6219                    embedding: vec![0.1],
6220                })
6221                .collect(),
6222            operational_writes: (0..10_000)
6223                .map(|i| OperationalWrite::Append {
6224                    collection: format!("col-{i}"),
6225                    record_key: format!("key-{i}"),
6226                    payload_json: "{}".to_owned(),
6227                    source_ref: None,
6228                })
6229                .collect(),
6230        };
6231
6232        let result = prepare_write(request, ProvenanceMode::Warn)
6233            .map(|_| ())
6234            .map_err(|e| format!("{e}"));
6235        assert!(
6236            matches!(result, Err(ref msg) if msg.contains("too many total items")),
6237            "exceeding total item limit must return InvalidWrite: got {result:?}"
6238        );
6239    }
6240
6241    #[test]
6242    fn write_request_within_limits_succeeds() {
6243        let db = NamedTempFile::new().expect("temporary db");
6244        let writer = WriterActor::start(
6245            db.path(),
6246            Arc::new(SchemaManager::new()),
6247            ProvenanceMode::Warn,
6248            Arc::new(TelemetryCounters::default()),
6249        )
6250        .expect("writer");
6251
6252        let result = writer.submit(WriteRequest {
6253            label: "within-limits".to_owned(),
6254            nodes: vec![NodeInsert {
6255                row_id: "row-1".to_owned(),
6256                logical_id: "lg-1".to_owned(),
6257                kind: "Note".to_owned(),
6258                properties: "{}".to_owned(),
6259                source_ref: None,
6260                upsert: false,
6261                chunk_policy: ChunkPolicy::Preserve,
6262            }],
6263            node_retires: vec![],
6264            edges: vec![],
6265            edge_retires: vec![],
6266            chunks: vec![],
6267            runs: vec![],
6268            steps: vec![],
6269            actions: vec![],
6270            optional_backfills: vec![],
6271            vec_inserts: vec![],
6272            operational_writes: vec![],
6273        });
6274
6275        assert!(
6276            result.is_ok(),
6277            "write request within limits must succeed: got {result:?}"
6278        );
6279    }
6280}