1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A projection task carried on `WriteRequest::optional_backfills` and passed
/// through to `PreparedWrite::optional_backfills` unchanged by `prepare_write`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection this task feeds.
    pub target: ProjectionTarget,
    /// Serialized task payload; format depends on the target — TODO confirm at the apply site.
    pub payload: String,
}
31
/// Chunk handling policy attached to a `NodeInsert`.
///
/// NOTE(review): the apply-side semantics live outside this chunk; the names
/// suggest `Preserve` keeps a node's existing chunks while `Replace` rewrites
/// them — confirm against the writer's apply path.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    #[default]
    Preserve,
    Replace,
}
41
/// How missing `source_ref` provenance is treated.
///
/// `Require` makes any item without a `source_ref` a hard `InvalidWrite` error
/// (see `check_require_provenance` / `prepare_touch_last_accessed`); `Warn` is
/// the default and, per `WriteReceipt::provenance_warnings`, presumably surfaces
/// warnings instead — confirm at the apply site.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    #[default]
    Warn,
    Require,
}
52
/// One node row to insert (or upsert) in a `WriteRequest`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable logical identity; must be non-empty. Chunks reference nodes by this id.
    pub logical_id: String,
    pub kind: String,
    /// Serialized properties — presumably JSON; not parsed in this module.
    pub properties: String,
    /// Provenance reference; required when `ProvenanceMode::Require` is active.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// What to do with the node's existing chunks on rewrite.
    pub chunk_policy: ChunkPolicy,
}
67
/// One edge row to insert (or upsert), keyed by the logical ids of its endpoints.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row id; must be non-empty and unique within the request
    /// (shares the uniqueness namespace with node row_ids).
    pub row_id: String,
    /// Stable logical identity; must be non-empty.
    pub logical_id: String,
    pub source_logical_id: String,
    pub target_logical_id: String,
    pub kind: String,
    /// Serialized properties — presumably JSON; not parsed in this module.
    pub properties: String,
    /// Provenance reference; required when `ProvenanceMode::Require` is active.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
82
/// Request to retire (supersede) the active node with this logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Provenance reference; required when `ProvenanceMode::Require` is active.
    pub source_ref: Option<String>,
}
89
/// Request to retire (supersede) the active edge with this logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Provenance reference; required when `ProvenanceMode::Require` is active.
    pub source_ref: Option<String>,
}
96
/// One text chunk attached to a node. The node must either be in the same
/// request or already active in the database (see `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    /// Logical id of the owning node; may not be retired in the same request.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty (enforced in `prepare_write`).
    pub text_content: String,
    /// Optional byte span of the chunk within its source — TODO confirm units/origin.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
}
106
/// An embedding vector for a chunk. Not `Eq` because of the `f32` payload;
/// only applied when the `sqlite-vec` feature is enabled (see `PreparedWrite`).
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    /// Must be non-empty (enforced in `prepare_write`).
    pub embedding: Vec<f32>,
}
118
/// A write against an operational collection.
///
/// `resolve_operational_writes` enforces that `Append` is only valid for
/// `append_only_log` collections and `Put`/`Delete` only for `latest_state`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to an append-only log collection.
    Append {
        collection: String,
        record_key: String,
        /// Must be non-empty; validated against the collection's contract.
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Set the latest state for a record in a latest-state collection.
    Put {
        collection: String,
        record_key: String,
        /// Must be non-empty; validated against the collection's contract.
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove a record from a latest-state collection.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
140
/// A runtime "run" record. When `upsert` is true, `supersedes_id` must be set
/// (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties — presumably JSON; not parsed in this module.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the prior row this run supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
152
/// A runtime "step" record belonging to a run. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    /// Parent run id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties — presumably JSON; not parsed in this module.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the prior row this step supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
165
/// A runtime "action" record belonging to a step. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    /// Parent step id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// Serialized properties — presumably JSON; not parsed in this module.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the prior row this action supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
178
// Per-request size caps enforced by `validate_request_size`; a request that
// exceeds any of them is rejected before it reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined cap across node and edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined cap across runs, steps, and actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of all item kinds in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long callers wait for the writer thread's reply before returning
// `WriterTimedOut`; the write may still commit after the caller gives up.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
195
/// A batch of writes submitted to `WriterActor::submit` and applied atomically
/// by the writer thread. Not `Eq` because `VecInsert` carries `f32`s.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label used in logs and echoed back in the receipt.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Projection tasks passed through to the apply path.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
214
/// Result of a successful `WriterActor::submit`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Echo of `WriteRequest::label`.
    pub label: String,
    pub optional_backfill_count: usize,
    /// Non-fatal issues found while applying (e.g. live-contract validation warnings).
    pub warnings: Vec<String>,
    /// Items that lacked `source_ref` under `ProvenanceMode::Warn` — presumably; confirm at the apply site.
    pub provenance_warnings: Vec<String>,
}
223
/// Request to stamp `last_accessed_at` on a set of active nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Must be non-empty and contain no blank ids; duplicates are de-duplicated at apply time.
    pub logical_ids: Vec<String>,
    /// Timestamp written verbatim into `node_access_metadata` — units not enforced here.
    pub touched_at: i64,
    /// Provenance reference; required when `ProvenanceMode::Require` is active.
    pub source_ref: Option<String>,
}
231
/// Result of a successful `touch_last_accessed` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of distinct logical ids actually touched (after de-duplication).
    pub touched_logical_ids: usize,
    /// Echo of `LastAccessTouchRequest::touched_at`.
    pub touched_at: i64,
}
238
/// One row destined for the full-text-search projection, resolved from a
/// `ChunkInsert` plus its owning node's kind (see `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node, taken from the request or the database.
    kind: String,
    text_content: String,
}
246
/// A validated `WriteRequest`, enriched in stages: `prepare_write` builds the
/// base (on the caller's thread), then on the writer thread `resolve_fts_rows`
/// fills `required_fts_rows` and `resolve_operational_writes` fills the
/// operational collection maps before the transaction is applied.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed by the sqlite-vec apply path; dead code otherwise.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Per-collection kind cache, filled by `resolve_operational_writes`.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    // Per-collection filter-field declarations, filled by `resolve_operational_writes`.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    // Warnings collected during validation; empty after `prepare_write`.
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes in this request, so chunk FTS rows can be
    // resolved without a database lookup when node and chunk arrive together.
    node_kinds: HashMap<String, String>,
    // FTS rows resolved from `chunks`, filled by `resolve_fts_rows`.
    required_fts_rows: Vec<FtsProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
272
/// Messages sent from `WriterActor` to the writer thread. Each carries a
/// one-shot reply sender the writer answers on.
enum WriteMessage {
    Submit {
        // Boxed: `PreparedWrite` is a large struct; keeps the queued message small.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
283
/// Handle to the single dedicated writer thread. All writes funnel through the
/// bounded channel in `sender`; `Drop` closes the channel and joins the thread.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so `Drop` can drop the sender (disconnecting the channel and
    // ending the writer loop) *before* joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // Option so `Drop` can `take()` and join the handle.
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Held only to keep the counters alive alongside the writer thread's clone.
    _telemetry: Arc<TelemetryCounters>,
}
296
297impl WriterActor {
298 pub fn start(
301 path: impl AsRef<Path>,
302 schema_manager: Arc<SchemaManager>,
303 provenance_mode: ProvenanceMode,
304 telemetry: Arc<TelemetryCounters>,
305 ) -> Result<Self, EngineError> {
306 let database_path = path.as_ref().to_path_buf();
307 let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
308
309 let writer_telemetry = Arc::clone(&telemetry);
310 let handle = thread::Builder::new()
311 .name("fathomdb-writer".to_owned())
312 .spawn(move || {
313 writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
314 })
315 .map_err(EngineError::Io)?;
316
317 Ok(Self {
318 sender: ManuallyDrop::new(sender),
319 thread_handle: Some(handle),
320 provenance_mode,
321 _telemetry: telemetry,
322 })
323 }
324
325 fn is_thread_alive(&self) -> bool {
327 self.thread_handle
328 .as_ref()
329 .is_some_and(|h| !h.is_finished())
330 }
331
332 fn check_thread_alive(&self) -> Result<(), EngineError> {
334 if self.is_thread_alive() {
335 Ok(())
336 } else {
337 Err(EngineError::WriterRejected(
338 "writer thread has exited".to_owned(),
339 ))
340 }
341 }
342
343 pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
347 self.check_thread_alive()?;
348 let prepared = prepare_write(request, self.provenance_mode)?;
349 let (reply_tx, reply_rx) = mpsc::channel();
350 self.sender
351 .send(WriteMessage::Submit {
352 prepared: Box::new(prepared),
353 reply: reply_tx,
354 })
355 .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
356
357 recv_with_timeout(&reply_rx)
358 }
359
360 pub fn touch_last_accessed(
364 &self,
365 request: LastAccessTouchRequest,
366 ) -> Result<LastAccessTouchReport, EngineError> {
367 self.check_thread_alive()?;
368 prepare_touch_last_accessed(&request, self.provenance_mode)?;
369 let (reply_tx, reply_rx) = mpsc::channel();
370 self.sender
371 .send(WriteMessage::TouchLastAccessed {
372 request,
373 reply: reply_tx,
374 })
375 .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
376
377 recv_with_timeout(&reply_rx)
378 }
379}
380
// Fallback notice when the `tracing` feature is off: `trace_warn!` presumably
// compiles to nothing then, so `Drop` also prints directly to stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
386
impl Drop for WriterActor {
    fn drop(&mut self) {
        // SAFETY: `sender` is dropped exactly once, here; no other code path
        // drops it, and it is never used again after this line. Dropping it
        // disconnects the channel so the writer loop's receiver iteration ends.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Join the writer thread so outstanding writes finish before we return.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Propagating a second panic while already unwinding
                        // would abort the process — log and swallow instead.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Re-raise the writer thread's panic on this thread.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
416
417fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
419 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
420 .map_err(|error| match error {
421 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
422 "write timed out waiting for writer thread reply — the write may still commit"
423 .to_owned(),
424 ),
425 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
426 })
427 .and_then(|result| result)
428}
429
430fn prepare_touch_last_accessed(
431 request: &LastAccessTouchRequest,
432 mode: ProvenanceMode,
433) -> Result<(), EngineError> {
434 if request.logical_ids.is_empty() {
435 return Err(EngineError::InvalidWrite(
436 "touch_last_accessed requires at least one logical_id".to_owned(),
437 ));
438 }
439 for logical_id in &request.logical_ids {
440 if logical_id.trim().is_empty() {
441 return Err(EngineError::InvalidWrite(
442 "touch_last_accessed requires non-empty logical_ids".to_owned(),
443 ));
444 }
445 }
446 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
447 return Err(EngineError::InvalidWrite(
448 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
449 .to_owned(),
450 ));
451 }
452 Ok(())
453}
454
455fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
456 let missing: Vec<String> = request
457 .nodes
458 .iter()
459 .filter(|n| n.source_ref.is_none())
460 .map(|n| format!("node '{}'", n.logical_id))
461 .chain(
462 request
463 .node_retires
464 .iter()
465 .filter(|r| r.source_ref.is_none())
466 .map(|r| format!("node retire '{}'", r.logical_id)),
467 )
468 .chain(
469 request
470 .edges
471 .iter()
472 .filter(|e| e.source_ref.is_none())
473 .map(|e| format!("edge '{}'", e.logical_id)),
474 )
475 .chain(
476 request
477 .edge_retires
478 .iter()
479 .filter(|r| r.source_ref.is_none())
480 .map(|r| format!("edge retire '{}'", r.logical_id)),
481 )
482 .chain(
483 request
484 .runs
485 .iter()
486 .filter(|r| r.source_ref.is_none())
487 .map(|r| format!("run '{}'", r.id)),
488 )
489 .chain(
490 request
491 .steps
492 .iter()
493 .filter(|s| s.source_ref.is_none())
494 .map(|s| format!("step '{}'", s.id)),
495 )
496 .chain(
497 request
498 .actions
499 .iter()
500 .filter(|a| a.source_ref.is_none())
501 .map(|a| format!("action '{}'", a.id)),
502 )
503 .chain(
504 request
505 .operational_writes
506 .iter()
507 .filter(|write| operational_write_source_ref(write).is_none())
508 .map(|write| {
509 format!(
510 "operational {} '{}:{}'",
511 operational_write_kind(write),
512 operational_write_collection(write),
513 operational_write_record_key(write)
514 )
515 }),
516 )
517 .collect();
518
519 if missing.is_empty() {
520 Ok(())
521 } else {
522 Err(EngineError::InvalidWrite(format!(
523 "ProvenanceMode::Require: missing source_ref on: {}",
524 missing.join(", ")
525 )))
526 }
527}
528
529fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
530 if request.nodes.len() > MAX_NODES {
531 return Err(EngineError::InvalidWrite(format!(
532 "too many nodes: {} exceeds limit of {MAX_NODES}",
533 request.nodes.len()
534 )));
535 }
536 if request.edges.len() > MAX_EDGES {
537 return Err(EngineError::InvalidWrite(format!(
538 "too many edges: {} exceeds limit of {MAX_EDGES}",
539 request.edges.len()
540 )));
541 }
542 if request.chunks.len() > MAX_CHUNKS {
543 return Err(EngineError::InvalidWrite(format!(
544 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
545 request.chunks.len()
546 )));
547 }
548 let retires = request.node_retires.len() + request.edge_retires.len();
549 if retires > MAX_RETIRES {
550 return Err(EngineError::InvalidWrite(format!(
551 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
552 )));
553 }
554 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
555 if runtime_items > MAX_RUNTIME_ITEMS {
556 return Err(EngineError::InvalidWrite(format!(
557 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
558 )));
559 }
560 if request.operational_writes.len() > MAX_OPERATIONAL {
561 return Err(EngineError::InvalidWrite(format!(
562 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
563 request.operational_writes.len()
564 )));
565 }
566 let total = request.nodes.len()
567 + request.node_retires.len()
568 + request.edges.len()
569 + request.edge_retires.len()
570 + request.chunks.len()
571 + request.runs.len()
572 + request.steps.len()
573 + request.actions.len()
574 + request.vec_inserts.len()
575 + request.operational_writes.len();
576 if total > MAX_TOTAL_ITEMS {
577 return Err(EngineError::InvalidWrite(format!(
578 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
579 )));
580 }
581 Ok(())
582}
583
584#[allow(clippy::too_many_lines)]
585fn prepare_write(
586 request: WriteRequest,
587 mode: ProvenanceMode,
588) -> Result<PreparedWrite, EngineError> {
589 validate_request_size(&request)?;
590
591 for node in &request.nodes {
593 if node.row_id.is_empty() {
594 return Err(EngineError::InvalidWrite(
595 "NodeInsert has empty row_id".to_owned(),
596 ));
597 }
598 if node.logical_id.is_empty() {
599 return Err(EngineError::InvalidWrite(
600 "NodeInsert has empty logical_id".to_owned(),
601 ));
602 }
603 }
604 for edge in &request.edges {
605 if edge.row_id.is_empty() {
606 return Err(EngineError::InvalidWrite(
607 "EdgeInsert has empty row_id".to_owned(),
608 ));
609 }
610 if edge.logical_id.is_empty() {
611 return Err(EngineError::InvalidWrite(
612 "EdgeInsert has empty logical_id".to_owned(),
613 ));
614 }
615 }
616 for chunk in &request.chunks {
617 if chunk.id.is_empty() {
618 return Err(EngineError::InvalidWrite(
619 "ChunkInsert has empty id".to_owned(),
620 ));
621 }
622 if chunk.text_content.is_empty() {
623 return Err(EngineError::InvalidWrite(format!(
624 "chunk '{}' has empty text_content; empty chunks are not allowed",
625 chunk.id
626 )));
627 }
628 }
629 for run in &request.runs {
630 if run.id.is_empty() {
631 return Err(EngineError::InvalidWrite(
632 "RunInsert has empty id".to_owned(),
633 ));
634 }
635 }
636 for step in &request.steps {
637 if step.id.is_empty() {
638 return Err(EngineError::InvalidWrite(
639 "StepInsert has empty id".to_owned(),
640 ));
641 }
642 }
643 for action in &request.actions {
644 if action.id.is_empty() {
645 return Err(EngineError::InvalidWrite(
646 "ActionInsert has empty id".to_owned(),
647 ));
648 }
649 }
650 for vi in &request.vec_inserts {
651 if vi.chunk_id.is_empty() {
652 return Err(EngineError::InvalidWrite(
653 "VecInsert has empty chunk_id".to_owned(),
654 ));
655 }
656 if vi.embedding.is_empty() {
657 return Err(EngineError::InvalidWrite(format!(
658 "VecInsert for chunk '{}' has empty embedding",
659 vi.chunk_id
660 )));
661 }
662 }
663 for operational in &request.operational_writes {
664 if operational_write_collection(operational).is_empty() {
665 return Err(EngineError::InvalidWrite(
666 "OperationalWrite has empty collection".to_owned(),
667 ));
668 }
669 if operational_write_record_key(operational).is_empty() {
670 return Err(EngineError::InvalidWrite(format!(
671 "OperationalWrite for collection '{}' has empty record_key",
672 operational_write_collection(operational)
673 )));
674 }
675 match operational {
676 OperationalWrite::Append { payload_json, .. }
677 | OperationalWrite::Put { payload_json, .. } => {
678 if payload_json.is_empty() {
679 return Err(EngineError::InvalidWrite(format!(
680 "OperationalWrite {} '{}:{}' has empty payload_json",
681 operational_write_kind(operational),
682 operational_write_collection(operational),
683 operational_write_record_key(operational)
684 )));
685 }
686 }
687 OperationalWrite::Delete { .. } => {}
688 }
689 }
690
691 {
693 let mut seen = std::collections::HashSet::new();
694 for node in &request.nodes {
695 if !seen.insert(node.row_id.as_str()) {
696 return Err(EngineError::InvalidWrite(format!(
697 "duplicate row_id '{}' within the same WriteRequest",
698 node.row_id
699 )));
700 }
701 }
702 for edge in &request.edges {
703 if !seen.insert(edge.row_id.as_str()) {
704 return Err(EngineError::InvalidWrite(format!(
705 "duplicate row_id '{}' within the same WriteRequest",
706 edge.row_id
707 )));
708 }
709 }
710 }
711
712 if mode == ProvenanceMode::Require {
714 check_require_provenance(&request)?;
715 }
716
717 for run in &request.runs {
719 if run.upsert && run.supersedes_id.is_none() {
720 return Err(EngineError::InvalidWrite(format!(
721 "run '{}': upsert=true requires supersedes_id to be set",
722 run.id
723 )));
724 }
725 }
726 for step in &request.steps {
727 if step.upsert && step.supersedes_id.is_none() {
728 return Err(EngineError::InvalidWrite(format!(
729 "step '{}': upsert=true requires supersedes_id to be set",
730 step.id
731 )));
732 }
733 }
734 for action in &request.actions {
735 if action.upsert && action.supersedes_id.is_none() {
736 return Err(EngineError::InvalidWrite(format!(
737 "action '{}': upsert=true requires supersedes_id to be set",
738 action.id
739 )));
740 }
741 }
742
743 let node_kinds = request
744 .nodes
745 .iter()
746 .map(|node| (node.logical_id.clone(), node.kind.clone()))
747 .collect::<HashMap<_, _>>();
748
749 Ok(PreparedWrite {
750 label: request.label,
751 nodes: request.nodes,
752 node_retires: request.node_retires,
753 edges: request.edges,
754 edge_retires: request.edge_retires,
755 chunks: request.chunks,
756 runs: request.runs,
757 steps: request.steps,
758 actions: request.actions,
759 vec_inserts: request.vec_inserts,
760 operational_writes: request.operational_writes,
761 operational_collection_kinds: HashMap::new(),
762 operational_collection_filter_fields: HashMap::new(),
763 operational_validation_warnings: Vec::new(),
764 node_kinds,
765 required_fts_rows: Vec::new(),
766 optional_backfills: request.optional_backfills,
767 })
768}
769
/// Body of the dedicated writer thread: open the database, bootstrap the
/// schema, then serve `WriteMessage`s until every sender is dropped.
///
/// If setup fails, every queued and future message is answered with
/// `WriterRejected` via `reject_all` and the thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Iteration ends when the channel disconnects (all senders dropped).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` exists only with tracing enabled; it is consumed
                // solely inside the trace_info! below, which presumably
                // compiles away otherwise.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot kill the writer
                // thread and strand every later caller.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Caller may have timed out and dropped the receiver; ignore.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: clear any transaction the panicked apply
                    // left open; harmless if none is active.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches count as write activity but add no rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
852
853fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
854 for message in receiver {
855 match message {
856 WriteMessage::Submit { reply, .. } => {
857 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
858 }
859 WriteMessage::TouchLastAccessed { reply, .. } => {
860 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
861 }
862 }
863 }
864}
865
/// Resolve each chunk into an `FtsProjectionRow` by finding the owning node's
/// kind, preferring nodes in this request over a database lookup.
///
/// Two separate passes are intentional: all retire-conflict errors surface
/// before any missing-node errors.
///
/// # Errors
/// `InvalidWrite` if a chunk's node is being retired in the same request or
/// cannot be found at all; `Sqlite` on other query failures.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Cheap path: the node arrived in this same request.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            // Fall back to the active (non-superseded) node in the database.
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
927
/// Look up each referenced operational collection once, check every write's
/// verb against the collection kind, and pre-validate payloads against the
/// collection's contract. Fills the collection caches on `prepared`.
///
/// # Errors
/// `InvalidWrite` for unregistered/disabled collections, verb/kind mismatches,
/// or contract violations; `Sqlite` on query failures.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Fetch and parse each collection's metadata only on first encounter.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // The cache was populated above, so this lookup cannot miss in practice.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Verb must match the collection kind.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            // Hard failures propagate via `?`; the warning (if any) is dropped
            // here — presumably because the apply path re-validates against the
            // live contract and collects warnings there
            // (see `validate_operational_writes_against_live_contracts`).
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
995
996fn parse_operational_filter_fields(
997 filter_fields_json: &str,
998) -> Result<Vec<OperationalFilterField>, EngineError> {
999 let fields: Vec<OperationalFilterField> =
1000 serde_json::from_str(filter_fields_json).map_err(|error| {
1001 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1002 })?;
1003 let mut seen = std::collections::HashSet::new();
1004 for field in &fields {
1005 if field.name.trim().is_empty() {
1006 return Err(EngineError::InvalidWrite(
1007 "filter_fields_json field names must not be empty".to_owned(),
1008 ));
1009 }
1010 if !seen.insert(field.name.as_str()) {
1011 return Err(EngineError::InvalidWrite(format!(
1012 "filter_fields_json contains duplicate field '{}'",
1013 field.name
1014 )));
1015 }
1016 if field.modes.is_empty() {
1017 return Err(EngineError::InvalidWrite(format!(
1018 "filter_fields_json field '{}' must declare at least one mode",
1019 field.name
1020 )));
1021 }
1022 if field.modes.contains(&OperationalFilterMode::Prefix)
1023 && field.field_type != OperationalFilterFieldType::String
1024 {
1025 return Err(EngineError::InvalidWrite(format!(
1026 "filter field '{}' only supports prefix for string types",
1027 field.name
1028 )));
1029 }
1030 }
1031 Ok(fields)
1032}
1033
/// One extracted filter value for an operational record: exactly one of
/// `string_value` / `integer_value` is set, matching the declared field type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    field_name: String,
    string_value: Option<String>,
    integer_value: Option<i64>,
}
1040
1041fn extract_operational_filter_values(
1042 filter_fields: &[OperationalFilterField],
1043 payload_json: &str,
1044) -> Vec<OperationalFilterValueRow> {
1045 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1046 return Vec::new();
1047 };
1048 let Some(object) = parsed.as_object() else {
1049 return Vec::new();
1050 };
1051
1052 filter_fields
1053 .iter()
1054 .filter_map(|field| {
1055 let value = object.get(&field.name)?;
1056 match field.field_type {
1057 OperationalFilterFieldType::String => {
1058 value
1059 .as_str()
1060 .map(|string_value| OperationalFilterValueRow {
1061 field_name: field.name.clone(),
1062 string_value: Some(string_value.to_owned()),
1063 integer_value: None,
1064 })
1065 }
1066 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1067 value
1068 .as_i64()
1069 .map(|integer_value| OperationalFilterValueRow {
1070 field_name: field.name.clone(),
1071 string_value: None,
1072 integer_value: Some(integer_value),
1073 })
1074 }
1075 }
1076 })
1077 .collect()
1078}
1079
/// Writer-thread entry point for one submit: resolve FTS rows and operational
/// metadata first (read-only queries), then apply everything. A resolution
/// failure returns before `apply_write` opens its transaction.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1088
1089fn apply_touch_last_accessed(
1090 conn: &mut rusqlite::Connection,
1091 request: &LastAccessTouchRequest,
1092) -> Result<LastAccessTouchReport, EngineError> {
1093 let mut seen = std::collections::HashSet::new();
1094 let logical_ids = request
1095 .logical_ids
1096 .iter()
1097 .filter(|logical_id| seen.insert(logical_id.as_str()))
1098 .cloned()
1099 .collect::<Vec<_>>();
1100 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1101
1102 for logical_id in &logical_ids {
1103 let exists = tx
1104 .query_row(
1105 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1106 params![logical_id],
1107 |row| row.get::<_, i64>(0),
1108 )
1109 .optional()?
1110 .is_some();
1111 if !exists {
1112 return Err(EngineError::InvalidWrite(format!(
1113 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1114 )));
1115 }
1116 }
1117
1118 {
1119 let mut upsert_metadata = tx.prepare_cached(
1120 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1121 VALUES (?1, ?2, ?2) \
1122 ON CONFLICT(logical_id) DO UPDATE SET \
1123 last_accessed_at = excluded.last_accessed_at, \
1124 updated_at = excluded.updated_at",
1125 )?;
1126 let mut insert_provenance = tx.prepare_cached(
1127 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1128 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1129 )?;
1130 for logical_id in &logical_ids {
1131 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1132 insert_provenance.execute(params![
1133 new_id(),
1134 logical_id,
1135 request.source_ref.as_deref(),
1136 format!("{{\"touched_at\":{}}}", request.touched_at),
1137 ])?;
1138 }
1139 }
1140
1141 tx.commit()?;
1142 Ok(LastAccessTouchReport {
1143 touched_logical_ids: logical_ids.len(),
1144 touched_at: request.touched_at,
1145 })
1146}
1147
1148fn ensure_operational_collections_writable(
1149 tx: &rusqlite::Transaction<'_>,
1150 prepared: &PreparedWrite,
1151) -> Result<(), EngineError> {
1152 for collection in prepared.operational_collection_kinds.keys() {
1153 let disabled_at: Option<Option<i64>> = tx
1154 .query_row(
1155 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1156 params![collection],
1157 |row| row.get::<_, Option<i64>>(0),
1158 )
1159 .optional()?;
1160 match disabled_at {
1161 Some(Some(_)) => {
1162 return Err(EngineError::InvalidWrite(format!(
1163 "operational collection '{collection}' is disabled"
1164 )));
1165 }
1166 Some(None) => {}
1167 None => {
1168 return Err(EngineError::InvalidWrite(format!(
1169 "operational collection '{collection}' is not registered"
1170 )));
1171 }
1172 }
1173 }
1174 Ok(())
1175}
1176
1177fn validate_operational_writes_against_live_contracts(
1178 tx: &rusqlite::Transaction<'_>,
1179 prepared: &PreparedWrite,
1180) -> Result<Vec<String>, EngineError> {
1181 let mut collection_validation_contracts =
1182 HashMap::<String, Option<OperationalValidationContract>>::new();
1183 for collection in prepared.operational_collection_kinds.keys() {
1184 let validation_json: String = tx
1185 .query_row(
1186 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1187 params![collection],
1188 |row| row.get(0),
1189 )
1190 .map_err(EngineError::Sqlite)?;
1191 let validation_contract = parse_operational_validation_contract(&validation_json)
1192 .map_err(EngineError::InvalidWrite)?;
1193 collection_validation_contracts.insert(collection.clone(), validation_contract);
1194 }
1195
1196 let mut warnings = Vec::new();
1197 for write in &prepared.operational_writes {
1198 if let Some(Some(contract)) =
1199 collection_validation_contracts.get(operational_write_collection(write))
1200 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1201 {
1202 warnings.push(warning);
1203 }
1204 }
1205
1206 Ok(warnings)
1207}
1208
1209fn load_live_operational_secondary_indexes(
1210 tx: &rusqlite::Transaction<'_>,
1211 prepared: &PreparedWrite,
1212) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1213 let mut collection_indexes = HashMap::new();
1214 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1215 let secondary_indexes_json: String = tx
1216 .query_row(
1217 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1218 params![collection],
1219 |row| row.get(0),
1220 )
1221 .map_err(EngineError::Sqlite)?;
1222 let indexes =
1223 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1224 .map_err(EngineError::InvalidWrite)?;
1225 collection_indexes.insert(collection.clone(), indexes);
1226 }
1227 Ok(collection_indexes)
1228}
1229
1230fn check_operational_write_against_contract(
1231 write: &OperationalWrite,
1232 contract: &OperationalValidationContract,
1233) -> Result<Option<String>, EngineError> {
1234 if contract.mode == OperationalValidationMode::Disabled {
1235 return Ok(None);
1236 }
1237
1238 let (payload_json, collection, record_key) = match write {
1239 OperationalWrite::Append {
1240 collection,
1241 record_key,
1242 payload_json,
1243 ..
1244 }
1245 | OperationalWrite::Put {
1246 collection,
1247 record_key,
1248 payload_json,
1249 ..
1250 } => (
1251 payload_json.as_str(),
1252 collection.as_str(),
1253 record_key.as_str(),
1254 ),
1255 OperationalWrite::Delete { .. } => return Ok(None),
1256 };
1257
1258 match validate_operational_payload_against_contract(contract, payload_json) {
1259 Ok(()) => Ok(None),
1260 Err(message) => match contract.mode {
1261 OperationalValidationMode::Disabled => Ok(None),
1262 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1263 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1264 kind = operational_write_kind(write)
1265 ))),
1266 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1267 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1268 kind = operational_write_kind(write)
1269 ))),
1270 },
1271 }
1272}
1273
/// Applies a fully prepared write batch to the database inside a single
/// IMMEDIATE transaction.
///
/// Section order matters: retires run before inserts (so an upsert's
/// replacement row is not superseded), operational writes are validated
/// against the live collection registry/contracts before being recorded, and
/// derived rows (FTS, vectors) are written last. On any error the
/// transaction is dropped uncommitted.
///
/// # Errors
///
/// Propagates SQLite failures, and `EngineError::InvalidWrite` for missing or
/// disabled operational collections or enforce-mode contract violations.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: drop FTS rows, supersede the active node row, and record
    // a provenance event per retire.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: supersede the active edge row and record provenance.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts. An upsert first supersedes the prior active row; with
    // ChunkPolicy::Replace it also clears the node's chunks and their FTS
    // (and, under the sqlite-vec feature, vector) rows.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
             (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // The vec table may be absent in this deployment; treat that as
        // "nothing to delete" rather than an error.
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
            ])?;
        }
    }

    // Edge inserts: an upsert supersedes the prior active edge first.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts (plain appends; chunk replacement was handled above).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch())",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
            ])?;
        }
    }

    // Run inserts: an upsert supersedes only the explicit `supersedes_id`.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts: same supersede-by-id pattern as runs.
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts: same supersede-by-id pattern.
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational writes: validate against the live registry/contracts, then
    // record each mutation (with filter values and secondary-index entries)
    // and maintain the `operational_current` projection for latest-state
    // collections.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        // Report-only violations become receipt warnings; enforce-mode
        // violations abort the whole transaction here.
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Global monotonic ordering across all operational mutations.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
             payload_json = excluded.payload_json, \
             updated_at = excluded.updated_at, \
             last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
             slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Secondary-index entries keyed to this mutation row.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Extracted filter values for the collection's filterable fields.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // Latest-state collections also maintain a current-row projection
            // plus its 'current' secondary-index entries; append-only
            // collections only keep the mutation history.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row just upserted so index entries
                            // carry the stored updated_at / last_mutation_id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // Required FTS rows for the chunks written above.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Vector inserts, only when built with sqlite-vec. Embeddings are stored
    // as little-endian f32 bytes.
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // Vec table absent in this deployment: silently skip inserts.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Collect a provenance warning for every written row that lacked a
    // source_ref, across all entity kinds.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // Receipt warnings = provenance warnings + contract report-only warnings.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
1740
1741fn operational_write_collection(write: &OperationalWrite) -> &str {
1742 match write {
1743 OperationalWrite::Append { collection, .. }
1744 | OperationalWrite::Put { collection, .. }
1745 | OperationalWrite::Delete { collection, .. } => collection,
1746 }
1747}
1748
1749fn operational_write_record_key(write: &OperationalWrite) -> &str {
1750 match write {
1751 OperationalWrite::Append { record_key, .. }
1752 | OperationalWrite::Put { record_key, .. }
1753 | OperationalWrite::Delete { record_key, .. } => record_key,
1754 }
1755}
1756
1757fn operational_write_kind(write: &OperationalWrite) -> &'static str {
1758 match write {
1759 OperationalWrite::Append { .. } => "append",
1760 OperationalWrite::Put { .. } => "put",
1761 OperationalWrite::Delete { .. } => "delete",
1762 }
1763}
1764
1765fn operational_write_payload(write: &OperationalWrite) -> &str {
1766 match write {
1767 OperationalWrite::Append { payload_json, .. }
1768 | OperationalWrite::Put { payload_json, .. } => payload_json,
1769 OperationalWrite::Delete { .. } => "null",
1770 }
1771}
1772
1773fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
1774 match write {
1775 OperationalWrite::Append { source_ref, .. }
1776 | OperationalWrite::Put { source_ref, .. }
1777 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
1778 }
1779}
1780
1781#[cfg(test)]
1782#[allow(clippy::expect_used)]
1783mod tests {
1784 use std::sync::Arc;
1785
1786 use fathomdb_schema::SchemaManager;
1787 use tempfile::NamedTempFile;
1788
1789 use super::{apply_write, prepare_write, resolve_operational_writes};
1790 use crate::{
1791 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
1792 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
1793 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
1794 projection::ProjectionTarget,
1795 };
1796
    // A write containing only run/step/action rows goes through the writer
    // and yields a receipt carrying the submitted label.
    #[test]
    fn writer_executes_runtime_table_rows() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        assert_eq!(receipt.label, "runtime");
    }
1853
    // A Put on a registered latest_state collection records one mutation and
    // materializes the payload into operational_current, alongside a normal
    // node insert in the same batch.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Register the collection directly; the writer only validates against
        // rows already present in operational_collections.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
1932
    // With a contract whose mode is "disabled", a payload violating the
    // declared fields is still written through to operational_current.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    // Violates the contract's fields; accepted because the
                    // contract mode is "disabled".
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
1986
    // A report_only contract lets an invalid payload through but surfaces a
    // validation warning on the receipt (separate from provenance warnings).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    // "bogus" is outside the contract's enum for "status".
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // The invalid payload must still have been written.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2051
    // A write targeting a collection absent from operational_collections is
    // rejected with InvalidWrite (no collection was seeded here).
    #[test]
    fn writer_rejects_operational_write_for_missing_collection() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "missing-operational-collection".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "missing operational collection must return InvalidWrite"
        );
    }
2091
    // An Append on an append_only_log collection records a mutation row but
    // must not create a row in operational_current.
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2155
    // An enforce-mode contract rejects an invalid append with InvalidWrite,
    // and the aborted transaction leaves no mutation or filter-value rows.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // No partial state: the whole transaction must have rolled back.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2220
2221 #[test]
2222 fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2223 let db = NamedTempFile::new().expect("temporary db");
2224 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2225 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2226 conn.execute(
2227 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2228 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2229 [],
2230 )
2231 .expect("seed collection");
2232 drop(conn);
2233 let writer = WriterActor::start(
2234 db.path(),
2235 Arc::new(SchemaManager::new()),
2236 ProvenanceMode::Warn,
2237 Arc::new(TelemetryCounters::default()),
2238 )
2239 .expect("writer");
2240
2241 writer
2242 .submit(WriteRequest {
2243 label: "put-operational".to_owned(),
2244 nodes: vec![],
2245 node_retires: vec![],
2246 edges: vec![],
2247 edge_retires: vec![],
2248 chunks: vec![],
2249 runs: vec![],
2250 steps: vec![],
2251 actions: vec![],
2252 optional_backfills: vec![],
2253 vec_inserts: vec![],
2254 operational_writes: vec![OperationalWrite::Put {
2255 collection: "connector_health".to_owned(),
2256 record_key: "gmail".to_owned(),
2257 payload_json: r#"{"status":"ok"}"#.to_owned(),
2258 source_ref: Some("src-1".to_owned()),
2259 }],
2260 })
2261 .expect("put receipt");
2262
2263 writer
2264 .submit(WriteRequest {
2265 label: "delete-operational".to_owned(),
2266 nodes: vec![],
2267 node_retires: vec![],
2268 edges: vec![],
2269 edge_retires: vec![],
2270 chunks: vec![],
2271 runs: vec![],
2272 steps: vec![],
2273 actions: vec![],
2274 optional_backfills: vec![],
2275 vec_inserts: vec![],
2276 operational_writes: vec![OperationalWrite::Delete {
2277 collection: "connector_health".to_owned(),
2278 record_key: "gmail".to_owned(),
2279 source_ref: Some("src-2".to_owned()),
2280 }],
2281 })
2282 .expect("delete receipt");
2283
2284 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2285 let mutation_kinds: Vec<String> = {
2286 let mut stmt = conn
2287 .prepare(
2288 "SELECT op_kind FROM operational_mutations \
2289 WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2290 ORDER BY mutation_order ASC",
2291 )
2292 .expect("stmt");
2293 stmt.query_map([], |row| row.get(0))
2294 .expect("rows")
2295 .collect::<Result<_, _>>()
2296 .expect("collect")
2297 };
2298 assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2299 let current_count: i64 = conn
2300 .query_row(
2301 "SELECT count(*) FROM operational_current \
2302 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2303 [],
2304 |row| row.get(0),
2305 )
2306 .expect("current count");
2307 assert_eq!(current_count, 0);
2308 }
2309
2310 #[test]
2311 fn writer_delete_bypasses_validation_contract() {
2312 let db = NamedTempFile::new().expect("temporary db");
2313 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2314 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2315 conn.execute(
2316 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2317 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2318 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2319 )
2320 .expect("seed collection");
2321 drop(conn);
2322 let writer = WriterActor::start(
2323 db.path(),
2324 Arc::new(SchemaManager::new()),
2325 ProvenanceMode::Warn,
2326 Arc::new(TelemetryCounters::default()),
2327 )
2328 .expect("writer");
2329
2330 writer
2331 .submit(WriteRequest {
2332 label: "valid-put".to_owned(),
2333 nodes: vec![],
2334 node_retires: vec![],
2335 edges: vec![],
2336 edge_retires: vec![],
2337 chunks: vec![],
2338 runs: vec![],
2339 steps: vec![],
2340 actions: vec![],
2341 optional_backfills: vec![],
2342 vec_inserts: vec![],
2343 operational_writes: vec![OperationalWrite::Put {
2344 collection: "connector_health".to_owned(),
2345 record_key: "gmail".to_owned(),
2346 payload_json: r#"{"status":"ok"}"#.to_owned(),
2347 source_ref: Some("src-1".to_owned()),
2348 }],
2349 })
2350 .expect("put receipt");
2351 writer
2352 .submit(WriteRequest {
2353 label: "delete-after-put".to_owned(),
2354 nodes: vec![],
2355 node_retires: vec![],
2356 edges: vec![],
2357 edge_retires: vec![],
2358 chunks: vec![],
2359 runs: vec![],
2360 steps: vec![],
2361 actions: vec![],
2362 optional_backfills: vec![],
2363 vec_inserts: vec![],
2364 operational_writes: vec![OperationalWrite::Delete {
2365 collection: "connector_health".to_owned(),
2366 record_key: "gmail".to_owned(),
2367 source_ref: Some("src-2".to_owned()),
2368 }],
2369 })
2370 .expect("delete receipt");
2371
2372 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2373 let current_count: i64 = conn
2374 .query_row(
2375 "SELECT count(*) FROM operational_current \
2376 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2377 [],
2378 |row| row.get(0),
2379 )
2380 .expect("current count");
2381 assert_eq!(current_count, 0);
2382 }
2383
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        // A Put must materialize secondary-index entries for the current
        // row, and a Delete must remove them again.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a latest_state collection declaring two secondary indexes:
        // a single-field index on `status` and a composite index over
        // (`tenant`, `category`).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put a payload that populates both index definitions.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per declared index definition is expected (2 total).
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        // Close this inspection connection before submitting more writes.
        drop(conn);

        // Deleting the record must clear its current-index entries.
        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // No 'current' index entries may survive the delete.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
2472
2473 #[test]
2474 fn writer_latest_state_operational_writes_persist_mutation_order() {
2475 let db = NamedTempFile::new().expect("temporary db");
2476 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2477 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2478 conn.execute(
2479 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2480 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2481 [],
2482 )
2483 .expect("seed collection");
2484 drop(conn);
2485
2486 let writer = WriterActor::start(
2487 db.path(),
2488 Arc::new(SchemaManager::new()),
2489 ProvenanceMode::Warn,
2490 Arc::new(TelemetryCounters::default()),
2491 )
2492 .expect("writer");
2493
2494 writer
2495 .submit(WriteRequest {
2496 label: "ordered-operational-batch".to_owned(),
2497 nodes: vec![],
2498 node_retires: vec![],
2499 edges: vec![],
2500 edge_retires: vec![],
2501 chunks: vec![],
2502 runs: vec![],
2503 steps: vec![],
2504 actions: vec![],
2505 optional_backfills: vec![],
2506 vec_inserts: vec![],
2507 operational_writes: vec![
2508 OperationalWrite::Put {
2509 collection: "connector_health".to_owned(),
2510 record_key: "gmail".to_owned(),
2511 payload_json: r#"{"status":"old"}"#.to_owned(),
2512 source_ref: Some("src-1".to_owned()),
2513 },
2514 OperationalWrite::Delete {
2515 collection: "connector_health".to_owned(),
2516 record_key: "gmail".to_owned(),
2517 source_ref: Some("src-2".to_owned()),
2518 },
2519 OperationalWrite::Put {
2520 collection: "connector_health".to_owned(),
2521 record_key: "gmail".to_owned(),
2522 payload_json: r#"{"status":"new"}"#.to_owned(),
2523 source_ref: Some("src-3".to_owned()),
2524 },
2525 ],
2526 })
2527 .expect("write receipt");
2528
2529 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2530 let rows: Vec<(String, i64)> = {
2531 let mut stmt = conn
2532 .prepare(
2533 "SELECT op_kind, mutation_order FROM operational_mutations \
2534 WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2535 ORDER BY mutation_order ASC",
2536 )
2537 .expect("stmt");
2538 stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
2539 .expect("rows")
2540 .collect::<Result<_, _>>()
2541 .expect("collect")
2542 };
2543 assert_eq!(
2544 rows,
2545 vec![
2546 ("put".to_owned(), 1),
2547 ("delete".to_owned(), 2),
2548 ("put".to_owned(), 3),
2549 ]
2550 );
2551 let payload: String = conn
2552 .query_row(
2553 "SELECT payload_json FROM operational_current \
2554 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2555 [],
2556 |row| row.get(0),
2557 )
2558 .expect("current payload");
2559 assert_eq!(payload, r#"{"status":"new"}"#);
2560 }
2561
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        // Exercises the window between preflight resolution and the
        // transactional apply: disabling the collection in that window must
        // still cause apply_write to reject the batch.
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        // A single Put against the (still enabled) collection.
        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight succeeds while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection AFTER preflight but BEFORE apply — this is
        // the race the transactional re-check must catch.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        // apply_write must notice the disabled state inside its transaction.
        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejected batch must leave no mutation rows behind…
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // …and no current rows either.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2625
2626 #[test]
2627 fn writer_enforce_validation_rejects_invalid_put_atomically() {
2628 let db = NamedTempFile::new().expect("temporary db");
2629 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2630 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2631 conn.execute(
2632 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2633 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2634 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2635 )
2636 .expect("seed collection");
2637 drop(conn);
2638 let writer = WriterActor::start(
2639 db.path(),
2640 Arc::new(SchemaManager::new()),
2641 ProvenanceMode::Warn,
2642 Arc::new(TelemetryCounters::default()),
2643 )
2644 .expect("writer");
2645
2646 let error = writer
2647 .submit(WriteRequest {
2648 label: "invalid-put".to_owned(),
2649 nodes: vec![NodeInsert {
2650 row_id: "row-1".to_owned(),
2651 logical_id: "lg-1".to_owned(),
2652 kind: "Meeting".to_owned(),
2653 properties: "{}".to_owned(),
2654 source_ref: Some("src-1".to_owned()),
2655 upsert: false,
2656 chunk_policy: ChunkPolicy::Preserve,
2657 }],
2658 node_retires: vec![],
2659 edges: vec![],
2660 edge_retires: vec![],
2661 chunks: vec![],
2662 runs: vec![],
2663 steps: vec![],
2664 actions: vec![],
2665 optional_backfills: vec![],
2666 vec_inserts: vec![],
2667 operational_writes: vec![OperationalWrite::Put {
2668 collection: "connector_health".to_owned(),
2669 record_key: "gmail".to_owned(),
2670 payload_json: r#"{"status":"bogus"}"#.to_owned(),
2671 source_ref: Some("src-1".to_owned()),
2672 }],
2673 })
2674 .expect_err("invalid put must reject");
2675 assert!(matches!(error, EngineError::InvalidWrite(_)));
2676 assert!(error.to_string().contains("must be one of"));
2677
2678 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2679 let node_count: i64 = conn
2680 .query_row(
2681 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2682 [],
2683 |row| row.get(0),
2684 )
2685 .expect("node count");
2686 assert_eq!(node_count, 0);
2687 let mutation_count: i64 = conn
2688 .query_row(
2689 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2690 [],
2691 |row| row.get(0),
2692 )
2693 .expect("mutation count");
2694 assert_eq!(mutation_count, 0);
2695 let current_count: i64 = conn
2696 .query_row(
2697 "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2698 [],
2699 |row| row.get(0),
2700 )
2701 .expect("current count");
2702 assert_eq!(current_count, 0);
2703 }
2704
2705 #[test]
2706 fn writer_rejects_append_against_latest_state_collection() {
2707 let db = NamedTempFile::new().expect("temporary db");
2708 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2709 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2710 conn.execute(
2711 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2712 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2713 [],
2714 )
2715 .expect("seed collection");
2716 drop(conn);
2717 let writer = WriterActor::start(
2718 db.path(),
2719 Arc::new(SchemaManager::new()),
2720 ProvenanceMode::Warn,
2721 Arc::new(TelemetryCounters::default()),
2722 )
2723 .expect("writer");
2724
2725 let result = writer.submit(WriteRequest {
2726 label: "bad-append".to_owned(),
2727 nodes: vec![],
2728 node_retires: vec![],
2729 edges: vec![],
2730 edge_retires: vec![],
2731 chunks: vec![],
2732 runs: vec![],
2733 steps: vec![],
2734 actions: vec![],
2735 optional_backfills: vec![],
2736 vec_inserts: vec![],
2737 operational_writes: vec![OperationalWrite::Append {
2738 collection: "connector_health".to_owned(),
2739 record_key: "gmail".to_owned(),
2740 payload_json: r#"{"status":"ok"}"#.to_owned(),
2741 source_ref: Some("src-1".to_owned()),
2742 }],
2743 });
2744
2745 assert!(
2746 matches!(result, Err(EngineError::InvalidWrite(_))),
2747 "latest_state collection must reject Append"
2748 );
2749 }
2750
2751 #[test]
2752 fn writer_upsert_supersedes_prior_active_node() {
2753 let db = NamedTempFile::new().expect("temporary db");
2754 let writer = WriterActor::start(
2755 db.path(),
2756 Arc::new(SchemaManager::new()),
2757 ProvenanceMode::Warn,
2758 Arc::new(TelemetryCounters::default()),
2759 )
2760 .expect("writer");
2761
2762 writer
2763 .submit(WriteRequest {
2764 label: "v1".to_owned(),
2765 nodes: vec![NodeInsert {
2766 row_id: "row-1".to_owned(),
2767 logical_id: "lg-1".to_owned(),
2768 kind: "Meeting".to_owned(),
2769 properties: r#"{"version":1}"#.to_owned(),
2770 source_ref: Some("src-1".to_owned()),
2771 upsert: false,
2772 chunk_policy: ChunkPolicy::Preserve,
2773 }],
2774 node_retires: vec![],
2775 edges: vec![],
2776 edge_retires: vec![],
2777 chunks: vec![],
2778 runs: vec![],
2779 steps: vec![],
2780 actions: vec![],
2781 optional_backfills: vec![],
2782 vec_inserts: vec![],
2783 operational_writes: vec![],
2784 })
2785 .expect("v1 write");
2786
2787 writer
2788 .submit(WriteRequest {
2789 label: "v2".to_owned(),
2790 nodes: vec![NodeInsert {
2791 row_id: "row-2".to_owned(),
2792 logical_id: "lg-1".to_owned(),
2793 kind: "Meeting".to_owned(),
2794 properties: r#"{"version":2}"#.to_owned(),
2795 source_ref: Some("src-2".to_owned()),
2796 upsert: true,
2797 chunk_policy: ChunkPolicy::Preserve,
2798 }],
2799 node_retires: vec![],
2800 edges: vec![],
2801 edge_retires: vec![],
2802 chunks: vec![],
2803 runs: vec![],
2804 steps: vec![],
2805 actions: vec![],
2806 optional_backfills: vec![],
2807 vec_inserts: vec![],
2808 operational_writes: vec![],
2809 })
2810 .expect("v2 upsert write");
2811
2812 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2813 let (active_row_id, props): (String, String) = conn
2814 .query_row(
2815 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
2816 [],
2817 |row| Ok((row.get(0)?, row.get(1)?)),
2818 )
2819 .expect("active row");
2820 assert_eq!(active_row_id, "row-2");
2821 assert!(props.contains("\"version\":2"));
2822
2823 let superseded: i64 = conn
2824 .query_row(
2825 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
2826 [],
2827 |row| row.get(0),
2828 )
2829 .expect("superseded count");
2830 assert_eq!(superseded, 1);
2831 }
2832
2833 #[test]
2834 fn writer_inserts_edge_between_two_nodes() {
2835 let db = NamedTempFile::new().expect("temporary db");
2836 let writer = WriterActor::start(
2837 db.path(),
2838 Arc::new(SchemaManager::new()),
2839 ProvenanceMode::Warn,
2840 Arc::new(TelemetryCounters::default()),
2841 )
2842 .expect("writer");
2843
2844 writer
2845 .submit(WriteRequest {
2846 label: "nodes-and-edge".to_owned(),
2847 nodes: vec![
2848 NodeInsert {
2849 row_id: "row-meeting".to_owned(),
2850 logical_id: "meeting-1".to_owned(),
2851 kind: "Meeting".to_owned(),
2852 properties: "{}".to_owned(),
2853 source_ref: Some("src-1".to_owned()),
2854 upsert: false,
2855 chunk_policy: ChunkPolicy::Preserve,
2856 },
2857 NodeInsert {
2858 row_id: "row-task".to_owned(),
2859 logical_id: "task-1".to_owned(),
2860 kind: "Task".to_owned(),
2861 properties: "{}".to_owned(),
2862 source_ref: Some("src-1".to_owned()),
2863 upsert: false,
2864 chunk_policy: ChunkPolicy::Preserve,
2865 },
2866 ],
2867 node_retires: vec![],
2868 edges: vec![EdgeInsert {
2869 row_id: "edge-1".to_owned(),
2870 logical_id: "edge-lg-1".to_owned(),
2871 source_logical_id: "meeting-1".to_owned(),
2872 target_logical_id: "task-1".to_owned(),
2873 kind: "HAS_TASK".to_owned(),
2874 properties: "{}".to_owned(),
2875 source_ref: Some("src-1".to_owned()),
2876 upsert: false,
2877 }],
2878 edge_retires: vec![],
2879 chunks: vec![],
2880 runs: vec![],
2881 steps: vec![],
2882 actions: vec![],
2883 optional_backfills: vec![],
2884 vec_inserts: vec![],
2885 operational_writes: vec![],
2886 })
2887 .expect("write receipt");
2888
2889 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2890 let (src, tgt, kind): (String, String, String) = conn
2891 .query_row(
2892 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
2893 [],
2894 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2895 )
2896 .expect("edge row");
2897 assert_eq!(src, "meeting-1");
2898 assert_eq!(tgt, "task-1");
2899 assert_eq!(kind, "HAS_TASK");
2900 }
2901
2902 #[test]
2903 #[allow(clippy::too_many_lines)]
2904 fn writer_upsert_supersedes_prior_active_edge() {
2905 let db = NamedTempFile::new().expect("temporary db");
2906 let writer = WriterActor::start(
2907 db.path(),
2908 Arc::new(SchemaManager::new()),
2909 ProvenanceMode::Warn,
2910 Arc::new(TelemetryCounters::default()),
2911 )
2912 .expect("writer");
2913
2914 writer
2916 .submit(WriteRequest {
2917 label: "nodes".to_owned(),
2918 nodes: vec![
2919 NodeInsert {
2920 row_id: "row-a".to_owned(),
2921 logical_id: "node-a".to_owned(),
2922 kind: "Meeting".to_owned(),
2923 properties: "{}".to_owned(),
2924 source_ref: Some("src-1".to_owned()),
2925 upsert: false,
2926 chunk_policy: ChunkPolicy::Preserve,
2927 },
2928 NodeInsert {
2929 row_id: "row-b".to_owned(),
2930 logical_id: "node-b".to_owned(),
2931 kind: "Task".to_owned(),
2932 properties: "{}".to_owned(),
2933 source_ref: Some("src-1".to_owned()),
2934 upsert: false,
2935 chunk_policy: ChunkPolicy::Preserve,
2936 },
2937 ],
2938 node_retires: vec![],
2939 edges: vec![],
2940 edge_retires: vec![],
2941 chunks: vec![],
2942 runs: vec![],
2943 steps: vec![],
2944 actions: vec![],
2945 optional_backfills: vec![],
2946 vec_inserts: vec![],
2947 operational_writes: vec![],
2948 })
2949 .expect("nodes write");
2950
2951 writer
2953 .submit(WriteRequest {
2954 label: "edge-v1".to_owned(),
2955 nodes: vec![],
2956 node_retires: vec![],
2957 edges: vec![EdgeInsert {
2958 row_id: "edge-row-1".to_owned(),
2959 logical_id: "edge-lg-1".to_owned(),
2960 source_logical_id: "node-a".to_owned(),
2961 target_logical_id: "node-b".to_owned(),
2962 kind: "HAS_TASK".to_owned(),
2963 properties: r#"{"weight":1}"#.to_owned(),
2964 source_ref: Some("src-1".to_owned()),
2965 upsert: false,
2966 }],
2967 edge_retires: vec![],
2968 chunks: vec![],
2969 runs: vec![],
2970 steps: vec![],
2971 actions: vec![],
2972 optional_backfills: vec![],
2973 vec_inserts: vec![],
2974 operational_writes: vec![],
2975 })
2976 .expect("edge v1 write");
2977
2978 writer
2980 .submit(WriteRequest {
2981 label: "edge-v2".to_owned(),
2982 nodes: vec![],
2983 node_retires: vec![],
2984 edges: vec![EdgeInsert {
2985 row_id: "edge-row-2".to_owned(),
2986 logical_id: "edge-lg-1".to_owned(),
2987 source_logical_id: "node-a".to_owned(),
2988 target_logical_id: "node-b".to_owned(),
2989 kind: "HAS_TASK".to_owned(),
2990 properties: r#"{"weight":2}"#.to_owned(),
2991 source_ref: Some("src-2".to_owned()),
2992 upsert: true,
2993 }],
2994 edge_retires: vec![],
2995 chunks: vec![],
2996 runs: vec![],
2997 steps: vec![],
2998 actions: vec![],
2999 optional_backfills: vec![],
3000 vec_inserts: vec![],
3001 operational_writes: vec![],
3002 })
3003 .expect("edge v2 upsert");
3004
3005 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3006 let (active_row_id, props): (String, String) = conn
3007 .query_row(
3008 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3009 [],
3010 |row| Ok((row.get(0)?, row.get(1)?)),
3011 )
3012 .expect("active edge");
3013 assert_eq!(active_row_id, "edge-row-2");
3014 assert!(props.contains("\"weight\":2"));
3015
3016 let superseded: i64 = conn
3017 .query_row(
3018 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3019 [],
3020 |row| row.get(0),
3021 )
3022 .expect("superseded count");
3023 assert_eq!(superseded, 1);
3024 }
3025
3026 #[test]
3027 fn writer_fts_rows_are_written_to_database() {
3028 let db = NamedTempFile::new().expect("temporary db");
3029 let writer = WriterActor::start(
3030 db.path(),
3031 Arc::new(SchemaManager::new()),
3032 ProvenanceMode::Warn,
3033 Arc::new(TelemetryCounters::default()),
3034 )
3035 .expect("writer");
3036
3037 writer
3038 .submit(WriteRequest {
3039 label: "seed".to_owned(),
3040 nodes: vec![NodeInsert {
3041 row_id: "row-1".to_owned(),
3042 logical_id: "logical-1".to_owned(),
3043 kind: "Meeting".to_owned(),
3044 properties: "{}".to_owned(),
3045 source_ref: Some("src-1".to_owned()),
3046 upsert: false,
3047 chunk_policy: ChunkPolicy::Preserve,
3048 }],
3049 node_retires: vec![],
3050 edges: vec![],
3051 edge_retires: vec![],
3052 chunks: vec![ChunkInsert {
3053 id: "chunk-1".to_owned(),
3054 node_logical_id: "logical-1".to_owned(),
3055 text_content: "budget discussion".to_owned(),
3056 byte_start: None,
3057 byte_end: None,
3058 }],
3059 runs: vec![],
3060 steps: vec![],
3061 actions: vec![],
3062 optional_backfills: vec![],
3063 vec_inserts: vec![],
3064 operational_writes: vec![],
3065 })
3066 .expect("write receipt");
3067
3068 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3069 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3070 conn.query_row(
3071 "SELECT chunk_id, node_logical_id, kind, text_content \
3072 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3073 [],
3074 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3075 )
3076 .expect("fts row");
3077 assert_eq!(chunk_id, "chunk-1");
3078 assert_eq!(node_logical_id, "logical-1");
3079 assert_eq!(kind, "Meeting");
3080 assert_eq!(text_content, "budget discussion");
3081 }
3082
3083 #[test]
3084 fn writer_receipt_warns_on_nodes_without_source_ref() {
3085 let db = NamedTempFile::new().expect("temporary db");
3086 let writer = WriterActor::start(
3087 db.path(),
3088 Arc::new(SchemaManager::new()),
3089 ProvenanceMode::Warn,
3090 Arc::new(TelemetryCounters::default()),
3091 )
3092 .expect("writer");
3093
3094 let receipt = writer
3095 .submit(WriteRequest {
3096 label: "no-source".to_owned(),
3097 nodes: vec![NodeInsert {
3098 row_id: "row-1".to_owned(),
3099 logical_id: "logical-1".to_owned(),
3100 kind: "Meeting".to_owned(),
3101 properties: "{}".to_owned(),
3102 source_ref: None,
3103 upsert: false,
3104 chunk_policy: ChunkPolicy::Preserve,
3105 }],
3106 node_retires: vec![],
3107 edges: vec![],
3108 edge_retires: vec![],
3109 chunks: vec![],
3110 runs: vec![],
3111 steps: vec![],
3112 actions: vec![],
3113 optional_backfills: vec![],
3114 vec_inserts: vec![],
3115 operational_writes: vec![],
3116 })
3117 .expect("write receipt");
3118
3119 assert_eq!(receipt.provenance_warnings.len(), 1);
3120 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3121 }
3122
3123 #[test]
3124 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3125 let db = NamedTempFile::new().expect("temporary db");
3126 let writer = WriterActor::start(
3127 db.path(),
3128 Arc::new(SchemaManager::new()),
3129 ProvenanceMode::Warn,
3130 Arc::new(TelemetryCounters::default()),
3131 )
3132 .expect("writer");
3133
3134 let receipt = writer
3135 .submit(WriteRequest {
3136 label: "with-source".to_owned(),
3137 nodes: vec![NodeInsert {
3138 row_id: "row-1".to_owned(),
3139 logical_id: "logical-1".to_owned(),
3140 kind: "Meeting".to_owned(),
3141 properties: "{}".to_owned(),
3142 source_ref: Some("src-1".to_owned()),
3143 upsert: false,
3144 chunk_policy: ChunkPolicy::Preserve,
3145 }],
3146 node_retires: vec![],
3147 edges: vec![],
3148 edge_retires: vec![],
3149 chunks: vec![],
3150 runs: vec![],
3151 steps: vec![],
3152 actions: vec![],
3153 optional_backfills: vec![],
3154 vec_inserts: vec![],
3155 operational_writes: vec![],
3156 })
3157 .expect("write receipt");
3158
3159 assert!(receipt.provenance_warnings.is_empty());
3160 }
3161
    // A chunk may reference a node created by an *earlier* write request: the
    // writer must resolve the target node from the database, not only from the
    // current batch.
    #[test]
    fn writer_accepts_chunk_for_pre_existing_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // r1: create the node only — no chunks yet.
        writer
            .submit(WriteRequest {
                label: "r1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "logical-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r1 write");

        // r2: attach a chunk to the node created in r1; the node is absent
        // from this request's `nodes` list on purpose.
        writer
            .submit(WriteRequest {
                label: "r2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "logical-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r2 write — chunk for pre-existing node");

        // The chunk must have been indexed into the FTS table even though its
        // node came from a prior request.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(
            count, 1,
            "FTS row must exist for chunk attached to pre-existing node"
        );
    }
3236
    // A chunk targeting a logical id that exists neither in the batch nor in
    // the database must be rejected as an invalid write.
    #[test]
    fn writer_rejects_chunk_for_completely_unknown_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Single request: chunk only, pointing at a node that was never created.
        let result = writer.submit(WriteRequest {
            label: "bad".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![ChunkInsert {
                id: "chunk-1".to_owned(),
                node_logical_id: "nonexistent".to_owned(),
                text_content: "some text".to_owned(),
                byte_start: None,
                byte_end: None,
            }],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "completely unknown node must return InvalidWrite"
        );
    }
3274
    // Smoke test: a request combining a typed node and its chunk commits and
    // returns a receipt echoing the request label.
    #[test]
    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "logical-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "logical-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        // The receipt carries the request label back to the caller.
        assert_eq!(receipt.label, "seed");
    }
3319
    // Retiring a node must tombstone it (set superseded_at) rather than delete
    // it: zero active rows, one historical row.
    #[test]
    fn writer_node_retire_supersedes_active_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed one active node.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire it in a second request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // Inspect the raw table: the row must survive with superseded_at set.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("count active");
        let historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("count historical");

        assert_eq!(active, 0, "active count must be 0 after retire");
        assert_eq!(historical, 1, "historical count must be 1 after retire");
    }
3395
    // Retiring a node keeps its chunk rows (so a later restore can rebuild
    // content) but removes its FTS rows (so search no longer surfaces it).
    #[test]
    fn writer_node_retire_preserves_chunks_and_clears_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed a node together with one chunk (which also populates FTS).
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire the node.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // Chunks survive; FTS rows do not.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let chunk_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
                [],
                |r| r.get(0),
            )
            .expect("chunk count");
        let fts_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
                [],
                |r| r.get(0),
            )
            .expect("fts count");

        assert_eq!(
            chunk_count, 1,
            "chunks must remain after node retire so restore can re-establish content"
        );
        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
    }
3480
3481 #[test]
3482 fn writer_edge_retire_supersedes_active_edge() {
3483 let db = NamedTempFile::new().expect("temporary db");
3484 let writer = WriterActor::start(
3485 db.path(),
3486 Arc::new(SchemaManager::new()),
3487 ProvenanceMode::Warn,
3488 Arc::new(TelemetryCounters::default()),
3489 )
3490 .expect("writer");
3491
3492 writer
3493 .submit(WriteRequest {
3494 label: "seed".to_owned(),
3495 nodes: vec![
3496 NodeInsert {
3497 row_id: "row-a".to_owned(),
3498 logical_id: "node-a".to_owned(),
3499 kind: "Meeting".to_owned(),
3500 properties: "{}".to_owned(),
3501 source_ref: Some("src-1".to_owned()),
3502 upsert: false,
3503 chunk_policy: ChunkPolicy::Preserve,
3504 },
3505 NodeInsert {
3506 row_id: "row-b".to_owned(),
3507 logical_id: "node-b".to_owned(),
3508 kind: "Task".to_owned(),
3509 properties: "{}".to_owned(),
3510 source_ref: Some("src-1".to_owned()),
3511 upsert: false,
3512 chunk_policy: ChunkPolicy::Preserve,
3513 },
3514 ],
3515 node_retires: vec![],
3516 edges: vec![EdgeInsert {
3517 row_id: "edge-1".to_owned(),
3518 logical_id: "edge-lg-1".to_owned(),
3519 source_logical_id: "node-a".to_owned(),
3520 target_logical_id: "node-b".to_owned(),
3521 kind: "HAS_TASK".to_owned(),
3522 properties: "{}".to_owned(),
3523 source_ref: Some("src-1".to_owned()),
3524 upsert: false,
3525 }],
3526 edge_retires: vec![],
3527 chunks: vec![],
3528 runs: vec![],
3529 steps: vec![],
3530 actions: vec![],
3531 optional_backfills: vec![],
3532 vec_inserts: vec![],
3533 operational_writes: vec![],
3534 })
3535 .expect("seed write");
3536
3537 writer
3538 .submit(WriteRequest {
3539 label: "retire-edge".to_owned(),
3540 nodes: vec![],
3541 node_retires: vec![],
3542 edges: vec![],
3543 edge_retires: vec![EdgeRetire {
3544 logical_id: "edge-lg-1".to_owned(),
3545 source_ref: Some("src-2".to_owned()),
3546 }],
3547 chunks: vec![],
3548 runs: vec![],
3549 steps: vec![],
3550 actions: vec![],
3551 optional_backfills: vec![],
3552 vec_inserts: vec![],
3553 operational_writes: vec![],
3554 })
3555 .expect("retire edge write");
3556
3557 let conn = rusqlite::Connection::open(db.path()).expect("open");
3558 let active: i64 = conn
3559 .query_row(
3560 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3561 [],
3562 |r| r.get(0),
3563 )
3564 .expect("active edge count");
3565
3566 assert_eq!(active, 0, "active edge count must be 0 after retire");
3567 }
3568
    // In ProvenanceMode::Warn, a retire without a source_ref succeeds but the
    // receipt must carry a provenance warning.
    #[test]
    fn writer_retire_without_source_ref_emits_provenance_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed a node with full provenance.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire it with source_ref deliberately omitted.
        let receipt = writer
            .submit(WriteRequest {
                label: "retire-no-src".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: None,
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "retire without source_ref must emit a provenance warning"
        );
    }
3630
3631 #[test]
3632 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
3633 let db = NamedTempFile::new().expect("temporary db");
3634 let writer = WriterActor::start(
3635 db.path(),
3636 Arc::new(SchemaManager::new()),
3637 ProvenanceMode::Warn,
3638 Arc::new(TelemetryCounters::default()),
3639 )
3640 .expect("writer");
3641
3642 writer
3643 .submit(WriteRequest {
3644 label: "v1".to_owned(),
3645 nodes: vec![NodeInsert {
3646 row_id: "row-1".to_owned(),
3647 logical_id: "meeting-1".to_owned(),
3648 kind: "Meeting".to_owned(),
3649 properties: "{}".to_owned(),
3650 source_ref: Some("src-1".to_owned()),
3651 upsert: false,
3652 chunk_policy: ChunkPolicy::Preserve,
3653 }],
3654 node_retires: vec![],
3655 edges: vec![],
3656 edge_retires: vec![],
3657 chunks: vec![ChunkInsert {
3658 id: "chunk-old".to_owned(),
3659 node_logical_id: "meeting-1".to_owned(),
3660 text_content: "old text".to_owned(),
3661 byte_start: None,
3662 byte_end: None,
3663 }],
3664 runs: vec![],
3665 steps: vec![],
3666 actions: vec![],
3667 optional_backfills: vec![],
3668 vec_inserts: vec![],
3669 operational_writes: vec![],
3670 })
3671 .expect("v1 write");
3672
3673 writer
3674 .submit(WriteRequest {
3675 label: "v2".to_owned(),
3676 nodes: vec![NodeInsert {
3677 row_id: "row-2".to_owned(),
3678 logical_id: "meeting-1".to_owned(),
3679 kind: "Meeting".to_owned(),
3680 properties: "{}".to_owned(),
3681 source_ref: Some("src-2".to_owned()),
3682 upsert: true,
3683 chunk_policy: ChunkPolicy::Replace,
3684 }],
3685 node_retires: vec![],
3686 edges: vec![],
3687 edge_retires: vec![],
3688 chunks: vec![ChunkInsert {
3689 id: "chunk-new".to_owned(),
3690 node_logical_id: "meeting-1".to_owned(),
3691 text_content: "new text".to_owned(),
3692 byte_start: None,
3693 byte_end: None,
3694 }],
3695 runs: vec![],
3696 steps: vec![],
3697 actions: vec![],
3698 optional_backfills: vec![],
3699 vec_inserts: vec![],
3700 operational_writes: vec![],
3701 })
3702 .expect("v2 write");
3703
3704 let conn = rusqlite::Connection::open(db.path()).expect("open");
3705 let old_chunk: i64 = conn
3706 .query_row(
3707 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3708 [],
3709 |r| r.get(0),
3710 )
3711 .expect("old chunk count");
3712 let new_chunk: i64 = conn
3713 .query_row(
3714 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
3715 [],
3716 |r| r.get(0),
3717 )
3718 .expect("new chunk count");
3719 let fts_old: i64 = conn
3720 .query_row(
3721 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
3722 [],
3723 |r| r.get(0),
3724 )
3725 .expect("old fts count");
3726
3727 assert_eq!(
3728 old_chunk, 0,
3729 "old chunk must be deleted by ChunkPolicy::Replace"
3730 );
3731 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
3732 assert_eq!(
3733 fts_old, 0,
3734 "old FTS row must be deleted by ChunkPolicy::Replace"
3735 );
3736 }
3737
    // A properties-only upsert with ChunkPolicy::Preserve must not disturb
    // chunks written by earlier versions of the node.
    #[test]
    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node with one chunk.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-old".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "old text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: update properties only — no chunks in this request.
        writer
            .submit(WriteRequest {
                label: "v2-props-only".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"status":"updated"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 preserve write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let old_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
                [],
                |r| r.get(0),
            )
            .expect("old chunk count");

        assert_eq!(
            old_chunk, 1,
            "old chunk must be preserved by ChunkPolicy::Preserve"
        );
    }
3819
    // ChunkPolicy::Replace only has effect on upserts: a plain insert carrying
    // Replace must not delete any other node's existing chunks.
    #[test]
    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed meeting-1 with an existing chunk.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-existing".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "existing text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Insert a *different* node (upsert: false) that carries Replace.
        writer
            .submit(WriteRequest {
                label: "insert-no-upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "meeting-2".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Replace,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert no-upsert write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let existing_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
                [],
                |r| r.get(0),
            )
            .expect("chunk count");

        assert_eq!(
            existing_chunk, 1,
            "ChunkPolicy::Replace without upsert must not delete existing chunks"
        );
    }
3902
    // A run upsert that names its predecessor via supersedes_id must mark the
    // predecessor historical while the new run stays active.
    #[test]
    fn writer_run_upsert_supersedes_prior_active_run() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: initial run.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 run write");

        // v2: upsert that supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 run write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
    }
3983
    // Same supersession contract as runs, one level down: a step upsert with
    // supersedes_id tombstones the prior step within its run.
    #[test]
    fn writer_step_upsert_supersedes_prior_active_step() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: parent run plus initial step.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-v1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 step write");

        // v2: step upsert superseding step-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![StepInsert {
                    id: "step-v2".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("step-v1".to_owned()),
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 step write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
    }
4074
    // Same supersession contract for actions: an action upsert with
    // supersedes_id tombstones the prior action under its step.
    #[test]
    fn writer_action_upsert_supersedes_prior_active_action() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: full run → step → action chain.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-v1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 action write");

        // v2: action upsert superseding action-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![ActionInsert {
                    id: "action-v2".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("action-v1".to_owned()),
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 action write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(
            v1_historical, 1,
            "action-v1 must be historical after upsert"
        );
        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
    }
4177
    // upsert: true requires supersedes_id for runs — an ambiguous upsert with
    // no named predecessor must be rejected.
    #[test]
    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "bad".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![RunInsert {
                id: "run-1".to_owned(),
                kind: "session".to_owned(),
                status: "completed".to_owned(),
                properties: "{}".to_owned(),
                source_ref: None,
                upsert: true,
                supersedes_id: None,
            }],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "run upsert=true without supersedes_id must return InvalidWrite"
        );
    }
4219
    // Same validation for steps: upsert: true without supersedes_id is invalid.
    #[test]
    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "bad".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![StepInsert {
                id: "step-1".to_owned(),
                run_id: "run-1".to_owned(),
                kind: "llm".to_owned(),
                status: "completed".to_owned(),
                properties: "{}".to_owned(),
                source_ref: None,
                upsert: true,
                supersedes_id: None,
            }],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "step upsert=true without supersedes_id must return InvalidWrite"
        );
    }
4260
    // Same validation for actions: upsert: true without supersedes_id is invalid.
    #[test]
    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "bad".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![ActionInsert {
                id: "action-1".to_owned(),
                step_id: "step-1".to_owned(),
                kind: "emit".to_owned(),
                status: "completed".to_owned(),
                properties: "{}".to_owned(),
                source_ref: None,
                upsert: true,
                supersedes_id: None,
            }],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "action upsert=true without supersedes_id must return InvalidWrite"
        );
    }
4301
    // In Warn mode an edge without source_ref still commits, but the receipt
    // must flag the missing provenance.
    #[test]
    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Both endpoint nodes have provenance; only the edge lacks it.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![
                    NodeInsert {
                        row_id: "row-a".to_owned(),
                        logical_id: "node-a".to_owned(),
                        kind: "Meeting".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                    },
                    NodeInsert {
                        row_id: "row-b".to_owned(),
                        logical_id: "node-b".to_owned(),
                        kind: "Task".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                    },
                ],
                node_retires: vec![],
                edges: vec![EdgeInsert {
                    row_id: "edge-1".to_owned(),
                    logical_id: "edge-lg-1".to_owned(),
                    source_logical_id: "node-a".to_owned(),
                    target_logical_id: "node-b".to_owned(),
                    kind: "HAS_TASK".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                }],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "edge insert without source_ref must emit a provenance warning"
        );
    }
4365
    // Same provenance check for runs: no source_ref in Warn mode commits but
    // warns.
    #[test]
    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "run insert without source_ref must emit a provenance warning"
        );
    }
4407
4408 #[test]
4411 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4412 let db = NamedTempFile::new().expect("temporary db");
4413 let writer = WriterActor::start(
4414 db.path(),
4415 Arc::new(SchemaManager::new()),
4416 ProvenanceMode::Warn,
4417 Arc::new(TelemetryCounters::default()),
4418 )
4419 .expect("writer");
4420
4421 writer
4423 .submit(WriteRequest {
4424 label: "seed".to_owned(),
4425 nodes: vec![NodeInsert {
4426 row_id: "row-1".to_owned(),
4427 logical_id: "meeting-1".to_owned(),
4428 kind: "Meeting".to_owned(),
4429 properties: "{}".to_owned(),
4430 source_ref: Some("src-1".to_owned()),
4431 upsert: false,
4432 chunk_policy: ChunkPolicy::Preserve,
4433 }],
4434 node_retires: vec![],
4435 edges: vec![],
4436 edge_retires: vec![],
4437 chunks: vec![],
4438 runs: vec![],
4439 steps: vec![],
4440 actions: vec![],
4441 optional_backfills: vec![],
4442 vec_inserts: vec![],
4443 operational_writes: vec![],
4444 })
4445 .expect("seed write");
4446
4447 let result = writer.submit(WriteRequest {
4449 label: "bad".to_owned(),
4450 nodes: vec![],
4451 node_retires: vec![NodeRetire {
4452 logical_id: "meeting-1".to_owned(),
4453 source_ref: Some("src-2".to_owned()),
4454 }],
4455 edges: vec![],
4456 edge_retires: vec![],
4457 chunks: vec![ChunkInsert {
4458 id: "chunk-bad".to_owned(),
4459 node_logical_id: "meeting-1".to_owned(),
4460 text_content: "some text".to_owned(),
4461 byte_start: None,
4462 byte_end: None,
4463 }],
4464 runs: vec![],
4465 steps: vec![],
4466 actions: vec![],
4467 optional_backfills: vec![],
4468 vec_inserts: vec![],
4469 operational_writes: vec![],
4470 });
4471
4472 assert!(
4473 matches!(result, Err(EngineError::InvalidWrite(_))),
4474 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
4475 );
4476 }
4477
4478 #[test]
4481 fn writer_batch_insert_multiple_nodes() {
4482 let db = NamedTempFile::new().expect("temporary db");
4483 let writer = WriterActor::start(
4484 db.path(),
4485 Arc::new(SchemaManager::new()),
4486 ProvenanceMode::Warn,
4487 Arc::new(TelemetryCounters::default()),
4488 )
4489 .expect("writer");
4490
4491 let nodes: Vec<NodeInsert> = (0..100)
4492 .map(|i| NodeInsert {
4493 row_id: format!("row-{i}"),
4494 logical_id: format!("lg-{i}"),
4495 kind: "Note".to_owned(),
4496 properties: "{}".to_owned(),
4497 source_ref: Some("batch-src".to_owned()),
4498 upsert: false,
4499 chunk_policy: ChunkPolicy::Preserve,
4500 })
4501 .collect();
4502
4503 writer
4504 .submit(WriteRequest {
4505 label: "batch".to_owned(),
4506 nodes,
4507 node_retires: vec![],
4508 edges: vec![],
4509 edge_retires: vec![],
4510 chunks: vec![],
4511 runs: vec![],
4512 steps: vec![],
4513 actions: vec![],
4514 optional_backfills: vec![],
4515 vec_inserts: vec![],
4516 operational_writes: vec![],
4517 })
4518 .expect("batch write");
4519
4520 let conn = rusqlite::Connection::open(db.path()).expect("open");
4521 let count: i64 = conn
4522 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4523 .expect("count nodes");
4524 assert_eq!(
4525 count, 100,
4526 "all 100 nodes must be present after batch insert"
4527 );
4528 }
4529
4530 #[test]
4533 fn prepare_write_rejects_empty_node_row_id() {
4534 let db = NamedTempFile::new().expect("temporary db");
4535 let writer = WriterActor::start(
4536 db.path(),
4537 Arc::new(SchemaManager::new()),
4538 ProvenanceMode::Warn,
4539 Arc::new(TelemetryCounters::default()),
4540 )
4541 .expect("writer");
4542
4543 let result = writer.submit(WriteRequest {
4544 label: "test".to_owned(),
4545 nodes: vec![NodeInsert {
4546 row_id: String::new(),
4547 logical_id: "lg-1".to_owned(),
4548 kind: "Note".to_owned(),
4549 properties: "{}".to_owned(),
4550 source_ref: None,
4551 upsert: false,
4552 chunk_policy: ChunkPolicy::Preserve,
4553 }],
4554 node_retires: vec![],
4555 edges: vec![],
4556 edge_retires: vec![],
4557 chunks: vec![],
4558 runs: vec![],
4559 steps: vec![],
4560 actions: vec![],
4561 optional_backfills: vec![],
4562 vec_inserts: vec![],
4563 operational_writes: vec![],
4564 });
4565
4566 assert!(
4567 matches!(result, Err(EngineError::InvalidWrite(_))),
4568 "empty row_id must be rejected"
4569 );
4570 }
4571
4572 #[test]
4573 fn prepare_write_rejects_empty_node_logical_id() {
4574 let db = NamedTempFile::new().expect("temporary db");
4575 let writer = WriterActor::start(
4576 db.path(),
4577 Arc::new(SchemaManager::new()),
4578 ProvenanceMode::Warn,
4579 Arc::new(TelemetryCounters::default()),
4580 )
4581 .expect("writer");
4582
4583 let result = writer.submit(WriteRequest {
4584 label: "test".to_owned(),
4585 nodes: vec![NodeInsert {
4586 row_id: "row-1".to_owned(),
4587 logical_id: String::new(),
4588 kind: "Note".to_owned(),
4589 properties: "{}".to_owned(),
4590 source_ref: None,
4591 upsert: false,
4592 chunk_policy: ChunkPolicy::Preserve,
4593 }],
4594 node_retires: vec![],
4595 edges: vec![],
4596 edge_retires: vec![],
4597 chunks: vec![],
4598 runs: vec![],
4599 steps: vec![],
4600 actions: vec![],
4601 optional_backfills: vec![],
4602 vec_inserts: vec![],
4603 operational_writes: vec![],
4604 });
4605
4606 assert!(
4607 matches!(result, Err(EngineError::InvalidWrite(_))),
4608 "empty logical_id must be rejected"
4609 );
4610 }
4611
4612 #[test]
4613 fn prepare_write_rejects_duplicate_row_ids_in_request() {
4614 let db = NamedTempFile::new().expect("temporary db");
4615 let writer = WriterActor::start(
4616 db.path(),
4617 Arc::new(SchemaManager::new()),
4618 ProvenanceMode::Warn,
4619 Arc::new(TelemetryCounters::default()),
4620 )
4621 .expect("writer");
4622
4623 let result = writer.submit(WriteRequest {
4624 label: "test".to_owned(),
4625 nodes: vec![
4626 NodeInsert {
4627 row_id: "row-1".to_owned(),
4628 logical_id: "lg-1".to_owned(),
4629 kind: "Note".to_owned(),
4630 properties: "{}".to_owned(),
4631 source_ref: None,
4632 upsert: false,
4633 chunk_policy: ChunkPolicy::Preserve,
4634 },
4635 NodeInsert {
4636 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
4638 kind: "Note".to_owned(),
4639 properties: "{}".to_owned(),
4640 source_ref: None,
4641 upsert: false,
4642 chunk_policy: ChunkPolicy::Preserve,
4643 },
4644 ],
4645 node_retires: vec![],
4646 edges: vec![],
4647 edge_retires: vec![],
4648 chunks: vec![],
4649 runs: vec![],
4650 steps: vec![],
4651 actions: vec![],
4652 optional_backfills: vec![],
4653 vec_inserts: vec![],
4654 operational_writes: vec![],
4655 });
4656
4657 assert!(
4658 matches!(result, Err(EngineError::InvalidWrite(_))),
4659 "duplicate row_id within request must be rejected"
4660 );
4661 }
4662
4663 #[test]
4664 fn prepare_write_rejects_empty_chunk_id() {
4665 let db = NamedTempFile::new().expect("temporary db");
4666 let writer = WriterActor::start(
4667 db.path(),
4668 Arc::new(SchemaManager::new()),
4669 ProvenanceMode::Warn,
4670 Arc::new(TelemetryCounters::default()),
4671 )
4672 .expect("writer");
4673
4674 let result = writer.submit(WriteRequest {
4675 label: "test".to_owned(),
4676 nodes: vec![NodeInsert {
4677 row_id: "row-1".to_owned(),
4678 logical_id: "lg-1".to_owned(),
4679 kind: "Note".to_owned(),
4680 properties: "{}".to_owned(),
4681 source_ref: None,
4682 upsert: false,
4683 chunk_policy: ChunkPolicy::Preserve,
4684 }],
4685 node_retires: vec![],
4686 edges: vec![],
4687 edge_retires: vec![],
4688 chunks: vec![ChunkInsert {
4689 id: String::new(),
4690 node_logical_id: "lg-1".to_owned(),
4691 text_content: "some text".to_owned(),
4692 byte_start: None,
4693 byte_end: None,
4694 }],
4695 runs: vec![],
4696 steps: vec![],
4697 actions: vec![],
4698 optional_backfills: vec![],
4699 vec_inserts: vec![],
4700 operational_writes: vec![],
4701 });
4702
4703 assert!(
4704 matches!(result, Err(EngineError::InvalidWrite(_))),
4705 "empty chunk id must be rejected"
4706 );
4707 }
4708
4709 #[test]
4712 fn writer_receipt_warns_on_step_without_source_ref() {
4713 let db = NamedTempFile::new().expect("temporary db");
4714 let writer = WriterActor::start(
4715 db.path(),
4716 Arc::new(SchemaManager::new()),
4717 ProvenanceMode::Warn,
4718 Arc::new(TelemetryCounters::default()),
4719 )
4720 .expect("writer");
4721
4722 writer
4724 .submit(WriteRequest {
4725 label: "seed-run".to_owned(),
4726 nodes: vec![],
4727 node_retires: vec![],
4728 edges: vec![],
4729 edge_retires: vec![],
4730 chunks: vec![],
4731 runs: vec![RunInsert {
4732 id: "run-1".to_owned(),
4733 kind: "session".to_owned(),
4734 status: "active".to_owned(),
4735 properties: "{}".to_owned(),
4736 source_ref: Some("src-1".to_owned()),
4737 upsert: false,
4738 supersedes_id: None,
4739 }],
4740 steps: vec![],
4741 actions: vec![],
4742 optional_backfills: vec![],
4743 vec_inserts: vec![],
4744 operational_writes: vec![],
4745 })
4746 .expect("seed run");
4747
4748 let receipt = writer
4749 .submit(WriteRequest {
4750 label: "test".to_owned(),
4751 nodes: vec![],
4752 node_retires: vec![],
4753 edges: vec![],
4754 edge_retires: vec![],
4755 chunks: vec![],
4756 runs: vec![],
4757 steps: vec![StepInsert {
4758 id: "step-1".to_owned(),
4759 run_id: "run-1".to_owned(),
4760 kind: "llm_call".to_owned(),
4761 status: "completed".to_owned(),
4762 properties: "{}".to_owned(),
4763 source_ref: None,
4764 upsert: false,
4765 supersedes_id: None,
4766 }],
4767 actions: vec![],
4768 optional_backfills: vec![],
4769 vec_inserts: vec![],
4770 operational_writes: vec![],
4771 })
4772 .expect("write");
4773
4774 assert!(
4775 !receipt.provenance_warnings.is_empty(),
4776 "step insert without source_ref must emit a provenance warning"
4777 );
4778 }
4779
4780 #[test]
4781 fn writer_receipt_warns_on_action_without_source_ref() {
4782 let db = NamedTempFile::new().expect("temporary db");
4783 let writer = WriterActor::start(
4784 db.path(),
4785 Arc::new(SchemaManager::new()),
4786 ProvenanceMode::Warn,
4787 Arc::new(TelemetryCounters::default()),
4788 )
4789 .expect("writer");
4790
4791 writer
4793 .submit(WriteRequest {
4794 label: "seed".to_owned(),
4795 nodes: vec![],
4796 node_retires: vec![],
4797 edges: vec![],
4798 edge_retires: vec![],
4799 chunks: vec![],
4800 runs: vec![RunInsert {
4801 id: "run-1".to_owned(),
4802 kind: "session".to_owned(),
4803 status: "active".to_owned(),
4804 properties: "{}".to_owned(),
4805 source_ref: Some("src-1".to_owned()),
4806 upsert: false,
4807 supersedes_id: None,
4808 }],
4809 steps: vec![StepInsert {
4810 id: "step-1".to_owned(),
4811 run_id: "run-1".to_owned(),
4812 kind: "llm_call".to_owned(),
4813 status: "completed".to_owned(),
4814 properties: "{}".to_owned(),
4815 source_ref: Some("src-1".to_owned()),
4816 upsert: false,
4817 supersedes_id: None,
4818 }],
4819 actions: vec![],
4820 optional_backfills: vec![],
4821 vec_inserts: vec![],
4822 operational_writes: vec![],
4823 })
4824 .expect("seed");
4825
4826 let receipt = writer
4827 .submit(WriteRequest {
4828 label: "test".to_owned(),
4829 nodes: vec![],
4830 node_retires: vec![],
4831 edges: vec![],
4832 edge_retires: vec![],
4833 chunks: vec![],
4834 runs: vec![],
4835 steps: vec![],
4836 actions: vec![ActionInsert {
4837 id: "action-1".to_owned(),
4838 step_id: "step-1".to_owned(),
4839 kind: "tool_call".to_owned(),
4840 status: "completed".to_owned(),
4841 properties: "{}".to_owned(),
4842 source_ref: None,
4843 upsert: false,
4844 supersedes_id: None,
4845 }],
4846 optional_backfills: vec![],
4847 vec_inserts: vec![],
4848 operational_writes: vec![],
4849 })
4850 .expect("write");
4851
4852 assert!(
4853 !receipt.provenance_warnings.is_empty(),
4854 "action insert without source_ref must emit a provenance warning"
4855 );
4856 }
4857
4858 #[test]
4859 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
4860 let db = NamedTempFile::new().expect("temporary db");
4861 let writer = WriterActor::start(
4862 db.path(),
4863 Arc::new(SchemaManager::new()),
4864 ProvenanceMode::Warn,
4865 Arc::new(TelemetryCounters::default()),
4866 )
4867 .expect("writer");
4868
4869 let receipt = writer
4870 .submit(WriteRequest {
4871 label: "test".to_owned(),
4872 nodes: vec![NodeInsert {
4873 row_id: "row-1".to_owned(),
4874 logical_id: "node-1".to_owned(),
4875 kind: "Note".to_owned(),
4876 properties: "{}".to_owned(),
4877 source_ref: Some("src-1".to_owned()),
4878 upsert: false,
4879 chunk_policy: ChunkPolicy::Preserve,
4880 }],
4881 node_retires: vec![],
4882 edges: vec![],
4883 edge_retires: vec![],
4884 chunks: vec![],
4885 runs: vec![RunInsert {
4886 id: "run-1".to_owned(),
4887 kind: "session".to_owned(),
4888 status: "active".to_owned(),
4889 properties: "{}".to_owned(),
4890 source_ref: Some("src-1".to_owned()),
4891 upsert: false,
4892 supersedes_id: None,
4893 }],
4894 steps: vec![StepInsert {
4895 id: "step-1".to_owned(),
4896 run_id: "run-1".to_owned(),
4897 kind: "llm_call".to_owned(),
4898 status: "completed".to_owned(),
4899 properties: "{}".to_owned(),
4900 source_ref: Some("src-1".to_owned()),
4901 upsert: false,
4902 supersedes_id: None,
4903 }],
4904 actions: vec![ActionInsert {
4905 id: "action-1".to_owned(),
4906 step_id: "step-1".to_owned(),
4907 kind: "tool_call".to_owned(),
4908 status: "completed".to_owned(),
4909 properties: "{}".to_owned(),
4910 source_ref: Some("src-1".to_owned()),
4911 upsert: false,
4912 supersedes_id: None,
4913 }],
4914 optional_backfills: vec![],
4915 vec_inserts: vec![],
4916 operational_writes: vec![],
4917 })
4918 .expect("write");
4919
4920 assert!(
4921 receipt.provenance_warnings.is_empty(),
4922 "no warnings expected when all types have source_ref; got: {:?}",
4923 receipt.provenance_warnings
4924 );
4925 }
4926
4927 #[test]
4930 fn default_provenance_mode_is_warn() {
4931 let db = NamedTempFile::new().expect("temporary db");
4933 let writer = WriterActor::start(
4934 db.path(),
4935 Arc::new(SchemaManager::new()),
4936 ProvenanceMode::default(),
4937 Arc::new(TelemetryCounters::default()),
4938 )
4939 .expect("writer");
4940
4941 let receipt = writer
4942 .submit(WriteRequest {
4943 label: "test".to_owned(),
4944 nodes: vec![NodeInsert {
4945 row_id: "row-1".to_owned(),
4946 logical_id: "node-1".to_owned(),
4947 kind: "Note".to_owned(),
4948 properties: "{}".to_owned(),
4949 source_ref: None,
4950 upsert: false,
4951 chunk_policy: ChunkPolicy::Preserve,
4952 }],
4953 node_retires: vec![],
4954 edges: vec![],
4955 edge_retires: vec![],
4956 chunks: vec![],
4957 runs: vec![],
4958 steps: vec![],
4959 actions: vec![],
4960 optional_backfills: vec![],
4961 vec_inserts: vec![],
4962 operational_writes: vec![],
4963 })
4964 .expect("Warn mode must not reject missing source_ref");
4965
4966 assert!(
4967 !receipt.provenance_warnings.is_empty(),
4968 "Warn mode must emit a warning instead of rejecting"
4969 );
4970 }
4971
4972 #[test]
4973 fn require_mode_rejects_node_without_source_ref() {
4974 let db = NamedTempFile::new().expect("temporary db");
4975 let writer = WriterActor::start(
4976 db.path(),
4977 Arc::new(SchemaManager::new()),
4978 ProvenanceMode::Require,
4979 Arc::new(TelemetryCounters::default()),
4980 )
4981 .expect("writer");
4982
4983 let result = writer.submit(WriteRequest {
4984 label: "test".to_owned(),
4985 nodes: vec![NodeInsert {
4986 row_id: "row-1".to_owned(),
4987 logical_id: "node-1".to_owned(),
4988 kind: "Note".to_owned(),
4989 properties: "{}".to_owned(),
4990 source_ref: None,
4991 upsert: false,
4992 chunk_policy: ChunkPolicy::Preserve,
4993 }],
4994 node_retires: vec![],
4995 edges: vec![],
4996 edge_retires: vec![],
4997 chunks: vec![],
4998 runs: vec![],
4999 steps: vec![],
5000 actions: vec![],
5001 optional_backfills: vec![],
5002 vec_inserts: vec![],
5003 operational_writes: vec![],
5004 });
5005
5006 assert!(
5007 matches!(result, Err(EngineError::InvalidWrite(_))),
5008 "Require mode must reject node without source_ref"
5009 );
5010 }
5011
5012 #[test]
5013 fn require_mode_accepts_node_with_source_ref() {
5014 let db = NamedTempFile::new().expect("temporary db");
5015 let writer = WriterActor::start(
5016 db.path(),
5017 Arc::new(SchemaManager::new()),
5018 ProvenanceMode::Require,
5019 Arc::new(TelemetryCounters::default()),
5020 )
5021 .expect("writer");
5022
5023 let result = writer.submit(WriteRequest {
5024 label: "test".to_owned(),
5025 nodes: vec![NodeInsert {
5026 row_id: "row-1".to_owned(),
5027 logical_id: "node-1".to_owned(),
5028 kind: "Note".to_owned(),
5029 properties: "{}".to_owned(),
5030 source_ref: Some("src-1".to_owned()),
5031 upsert: false,
5032 chunk_policy: ChunkPolicy::Preserve,
5033 }],
5034 node_retires: vec![],
5035 edges: vec![],
5036 edge_retires: vec![],
5037 chunks: vec![],
5038 runs: vec![],
5039 steps: vec![],
5040 actions: vec![],
5041 optional_backfills: vec![],
5042 vec_inserts: vec![],
5043 operational_writes: vec![],
5044 });
5045
5046 assert!(
5047 result.is_ok(),
5048 "Require mode must accept node with source_ref"
5049 );
5050 }
5051
5052 #[test]
5053 fn require_mode_rejects_edge_without_source_ref() {
5054 let db = NamedTempFile::new().expect("temporary db");
5055 let writer = WriterActor::start(
5056 db.path(),
5057 Arc::new(SchemaManager::new()),
5058 ProvenanceMode::Require,
5059 Arc::new(TelemetryCounters::default()),
5060 )
5061 .expect("writer");
5062
5063 let result = writer.submit(WriteRequest {
5065 label: "test".to_owned(),
5066 nodes: vec![
5067 NodeInsert {
5068 row_id: "row-a".to_owned(),
5069 logical_id: "node-a".to_owned(),
5070 kind: "Note".to_owned(),
5071 properties: "{}".to_owned(),
5072 source_ref: Some("src-1".to_owned()),
5073 upsert: false,
5074 chunk_policy: ChunkPolicy::Preserve,
5075 },
5076 NodeInsert {
5077 row_id: "row-b".to_owned(),
5078 logical_id: "node-b".to_owned(),
5079 kind: "Note".to_owned(),
5080 properties: "{}".to_owned(),
5081 source_ref: Some("src-1".to_owned()),
5082 upsert: false,
5083 chunk_policy: ChunkPolicy::Preserve,
5084 },
5085 ],
5086 node_retires: vec![],
5087 edges: vec![EdgeInsert {
5088 row_id: "edge-row-1".to_owned(),
5089 logical_id: "edge-1".to_owned(),
5090 source_logical_id: "node-a".to_owned(),
5091 target_logical_id: "node-b".to_owned(),
5092 kind: "LINKS_TO".to_owned(),
5093 properties: "{}".to_owned(),
5094 source_ref: None,
5095 upsert: false,
5096 }],
5097 edge_retires: vec![],
5098 chunks: vec![],
5099 runs: vec![],
5100 steps: vec![],
5101 actions: vec![],
5102 optional_backfills: vec![],
5103 vec_inserts: vec![],
5104 operational_writes: vec![],
5105 });
5106
5107 assert!(
5108 matches!(result, Err(EngineError::InvalidWrite(_))),
5109 "Require mode must reject edge without source_ref"
5110 );
5111 }
5112
5113 #[test]
5116 fn fts_row_has_correct_kind_from_co_submitted_node() {
5117 let db = NamedTempFile::new().expect("temporary db");
5118 let writer = WriterActor::start(
5119 db.path(),
5120 Arc::new(SchemaManager::new()),
5121 ProvenanceMode::Warn,
5122 Arc::new(TelemetryCounters::default()),
5123 )
5124 .expect("writer");
5125
5126 writer
5127 .submit(WriteRequest {
5128 label: "test".to_owned(),
5129 nodes: vec![NodeInsert {
5130 row_id: "row-1".to_owned(),
5131 logical_id: "node-1".to_owned(),
5132 kind: "Meeting".to_owned(),
5133 properties: "{}".to_owned(),
5134 source_ref: Some("src-1".to_owned()),
5135 upsert: false,
5136 chunk_policy: ChunkPolicy::Preserve,
5137 }],
5138 node_retires: vec![],
5139 edges: vec![],
5140 edge_retires: vec![],
5141 chunks: vec![ChunkInsert {
5142 id: "chunk-1".to_owned(),
5143 node_logical_id: "node-1".to_owned(),
5144 text_content: "some text".to_owned(),
5145 byte_start: None,
5146 byte_end: None,
5147 }],
5148 runs: vec![],
5149 steps: vec![],
5150 actions: vec![],
5151 optional_backfills: vec![],
5152 vec_inserts: vec![],
5153 operational_writes: vec![],
5154 })
5155 .expect("write");
5156
5157 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5158 let kind: String = conn
5159 .query_row(
5160 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5161 [],
5162 |row| row.get(0),
5163 )
5164 .expect("fts row");
5165
5166 assert_eq!(kind, "Meeting");
5167 }
5168
5169 #[test]
5170 fn fts_row_has_correct_text_content() {
5171 let db = NamedTempFile::new().expect("temporary db");
5172 let writer = WriterActor::start(
5173 db.path(),
5174 Arc::new(SchemaManager::new()),
5175 ProvenanceMode::Warn,
5176 Arc::new(TelemetryCounters::default()),
5177 )
5178 .expect("writer");
5179
5180 writer
5181 .submit(WriteRequest {
5182 label: "test".to_owned(),
5183 nodes: vec![NodeInsert {
5184 row_id: "row-1".to_owned(),
5185 logical_id: "node-1".to_owned(),
5186 kind: "Note".to_owned(),
5187 properties: "{}".to_owned(),
5188 source_ref: Some("src-1".to_owned()),
5189 upsert: false,
5190 chunk_policy: ChunkPolicy::Preserve,
5191 }],
5192 node_retires: vec![],
5193 edges: vec![],
5194 edge_retires: vec![],
5195 chunks: vec![ChunkInsert {
5196 id: "chunk-1".to_owned(),
5197 node_logical_id: "node-1".to_owned(),
5198 text_content: "exactly this text".to_owned(),
5199 byte_start: None,
5200 byte_end: None,
5201 }],
5202 runs: vec![],
5203 steps: vec![],
5204 actions: vec![],
5205 optional_backfills: vec![],
5206 vec_inserts: vec![],
5207 operational_writes: vec![],
5208 })
5209 .expect("write");
5210
5211 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5212 let text: String = conn
5213 .query_row(
5214 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5215 [],
5216 |row| row.get(0),
5217 )
5218 .expect("fts row");
5219
5220 assert_eq!(text, "exactly this text");
5221 }
5222
5223 #[test]
5224 fn fts_row_has_correct_kind_from_pre_existing_node() {
5225 let db = NamedTempFile::new().expect("temporary db");
5226 let writer = WriterActor::start(
5227 db.path(),
5228 Arc::new(SchemaManager::new()),
5229 ProvenanceMode::Warn,
5230 Arc::new(TelemetryCounters::default()),
5231 )
5232 .expect("writer");
5233
5234 writer
5236 .submit(WriteRequest {
5237 label: "r1".to_owned(),
5238 nodes: vec![NodeInsert {
5239 row_id: "row-1".to_owned(),
5240 logical_id: "node-1".to_owned(),
5241 kind: "Document".to_owned(),
5242 properties: "{}".to_owned(),
5243 source_ref: Some("src-1".to_owned()),
5244 upsert: false,
5245 chunk_policy: ChunkPolicy::Preserve,
5246 }],
5247 node_retires: vec![],
5248 edges: vec![],
5249 edge_retires: vec![],
5250 chunks: vec![],
5251 runs: vec![],
5252 steps: vec![],
5253 actions: vec![],
5254 optional_backfills: vec![],
5255 vec_inserts: vec![],
5256 operational_writes: vec![],
5257 })
5258 .expect("r1 write");
5259
5260 writer
5262 .submit(WriteRequest {
5263 label: "r2".to_owned(),
5264 nodes: vec![],
5265 node_retires: vec![],
5266 edges: vec![],
5267 edge_retires: vec![],
5268 chunks: vec![ChunkInsert {
5269 id: "chunk-1".to_owned(),
5270 node_logical_id: "node-1".to_owned(),
5271 text_content: "some text".to_owned(),
5272 byte_start: None,
5273 byte_end: None,
5274 }],
5275 runs: vec![],
5276 steps: vec![],
5277 actions: vec![],
5278 optional_backfills: vec![],
5279 vec_inserts: vec![],
5280 operational_writes: vec![],
5281 })
5282 .expect("r2 write");
5283
5284 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5285 let kind: String = conn
5286 .query_row(
5287 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5288 [],
5289 |row| row.get(0),
5290 )
5291 .expect("fts row");
5292
5293 assert_eq!(kind, "Document");
5294 }
5295
5296 #[test]
5297 fn fts_derives_rows_for_multiple_chunks_per_node() {
5298 let db = NamedTempFile::new().expect("temporary db");
5299 let writer = WriterActor::start(
5300 db.path(),
5301 Arc::new(SchemaManager::new()),
5302 ProvenanceMode::Warn,
5303 Arc::new(TelemetryCounters::default()),
5304 )
5305 .expect("writer");
5306
5307 writer
5308 .submit(WriteRequest {
5309 label: "test".to_owned(),
5310 nodes: vec![NodeInsert {
5311 row_id: "row-1".to_owned(),
5312 logical_id: "node-1".to_owned(),
5313 kind: "Meeting".to_owned(),
5314 properties: "{}".to_owned(),
5315 source_ref: Some("src-1".to_owned()),
5316 upsert: false,
5317 chunk_policy: ChunkPolicy::Preserve,
5318 }],
5319 node_retires: vec![],
5320 edges: vec![],
5321 edge_retires: vec![],
5322 chunks: vec![
5323 ChunkInsert {
5324 id: "chunk-a".to_owned(),
5325 node_logical_id: "node-1".to_owned(),
5326 text_content: "intro".to_owned(),
5327 byte_start: None,
5328 byte_end: None,
5329 },
5330 ChunkInsert {
5331 id: "chunk-b".to_owned(),
5332 node_logical_id: "node-1".to_owned(),
5333 text_content: "body".to_owned(),
5334 byte_start: None,
5335 byte_end: None,
5336 },
5337 ChunkInsert {
5338 id: "chunk-c".to_owned(),
5339 node_logical_id: "node-1".to_owned(),
5340 text_content: "conclusion".to_owned(),
5341 byte_start: None,
5342 byte_end: None,
5343 },
5344 ],
5345 runs: vec![],
5346 steps: vec![],
5347 actions: vec![],
5348 optional_backfills: vec![],
5349 vec_inserts: vec![],
5350 operational_writes: vec![],
5351 })
5352 .expect("write");
5353
5354 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5355 let count: i64 = conn
5356 .query_row(
5357 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5358 [],
5359 |row| row.get(0),
5360 )
5361 .expect("fts count");
5362
5363 assert_eq!(count, 3, "three chunks must produce three FTS rows");
5364 }
5365
5366 #[test]
5367 fn fts_resolves_mixed_fast_and_db_paths() {
5368 let db = NamedTempFile::new().expect("temporary db");
5369 let writer = WriterActor::start(
5370 db.path(),
5371 Arc::new(SchemaManager::new()),
5372 ProvenanceMode::Warn,
5373 Arc::new(TelemetryCounters::default()),
5374 )
5375 .expect("writer");
5376
5377 writer
5379 .submit(WriteRequest {
5380 label: "seed".to_owned(),
5381 nodes: vec![NodeInsert {
5382 row_id: "row-existing".to_owned(),
5383 logical_id: "existing-node".to_owned(),
5384 kind: "Archive".to_owned(),
5385 properties: "{}".to_owned(),
5386 source_ref: Some("src-1".to_owned()),
5387 upsert: false,
5388 chunk_policy: ChunkPolicy::Preserve,
5389 }],
5390 node_retires: vec![],
5391 edges: vec![],
5392 edge_retires: vec![],
5393 chunks: vec![],
5394 runs: vec![],
5395 steps: vec![],
5396 actions: vec![],
5397 optional_backfills: vec![],
5398 vec_inserts: vec![],
5399 operational_writes: vec![],
5400 })
5401 .expect("seed");
5402
5403 writer
5405 .submit(WriteRequest {
5406 label: "mixed".to_owned(),
5407 nodes: vec![NodeInsert {
5408 row_id: "row-new".to_owned(),
5409 logical_id: "new-node".to_owned(),
5410 kind: "Inbox".to_owned(),
5411 properties: "{}".to_owned(),
5412 source_ref: Some("src-2".to_owned()),
5413 upsert: false,
5414 chunk_policy: ChunkPolicy::Preserve,
5415 }],
5416 node_retires: vec![],
5417 edges: vec![],
5418 edge_retires: vec![],
5419 chunks: vec![
5420 ChunkInsert {
5421 id: "chunk-fast".to_owned(),
5422 node_logical_id: "new-node".to_owned(),
5423 text_content: "new content".to_owned(),
5424 byte_start: None,
5425 byte_end: None,
5426 },
5427 ChunkInsert {
5428 id: "chunk-db".to_owned(),
5429 node_logical_id: "existing-node".to_owned(),
5430 text_content: "archive content".to_owned(),
5431 byte_start: None,
5432 byte_end: None,
5433 },
5434 ],
5435 runs: vec![],
5436 steps: vec![],
5437 actions: vec![],
5438 optional_backfills: vec![],
5439 vec_inserts: vec![],
5440 operational_writes: vec![],
5441 })
5442 .expect("mixed write");
5443
5444 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5445 let fast_kind: String = conn
5446 .query_row(
5447 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
5448 [],
5449 |row| row.get(0),
5450 )
5451 .expect("fast path fts row");
5452 let db_kind: String = conn
5453 .query_row(
5454 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
5455 [],
5456 |row| row.get(0),
5457 )
5458 .expect("db path fts row");
5459
5460 assert_eq!(fast_kind, "Inbox");
5461 assert_eq!(db_kind, "Archive");
5462 }
5463
5464 #[test]
5465 fn prepare_write_rejects_empty_chunk_text() {
5466 let db = NamedTempFile::new().expect("temporary db");
5467 let writer = WriterActor::start(
5468 db.path(),
5469 Arc::new(SchemaManager::new()),
5470 ProvenanceMode::Warn,
5471 Arc::new(TelemetryCounters::default()),
5472 )
5473 .expect("writer");
5474
5475 let result = writer.submit(WriteRequest {
5476 label: "test".to_owned(),
5477 nodes: vec![NodeInsert {
5478 row_id: "row-1".to_owned(),
5479 logical_id: "node-1".to_owned(),
5480 kind: "Note".to_owned(),
5481 properties: "{}".to_owned(),
5482 source_ref: None,
5483 upsert: false,
5484 chunk_policy: ChunkPolicy::Preserve,
5485 }],
5486 node_retires: vec![],
5487 edges: vec![],
5488 edge_retires: vec![],
5489 chunks: vec![ChunkInsert {
5490 id: "chunk-1".to_owned(),
5491 node_logical_id: "node-1".to_owned(),
5492 text_content: String::new(),
5493 byte_start: None,
5494 byte_end: None,
5495 }],
5496 runs: vec![],
5497 steps: vec![],
5498 actions: vec![],
5499 optional_backfills: vec![],
5500 vec_inserts: vec![],
5501 operational_writes: vec![],
5502 });
5503
5504 assert!(
5505 matches!(result, Err(EngineError::InvalidWrite(_))),
5506 "empty text_content must be rejected"
5507 );
5508 }
5509
5510 #[test]
5511 fn receipt_reports_zero_backfills_when_none_submitted() {
5512 let db = NamedTempFile::new().expect("temporary db");
5513 let writer = WriterActor::start(
5514 db.path(),
5515 Arc::new(SchemaManager::new()),
5516 ProvenanceMode::Warn,
5517 Arc::new(TelemetryCounters::default()),
5518 )
5519 .expect("writer");
5520
5521 let receipt = writer
5522 .submit(WriteRequest {
5523 label: "test".to_owned(),
5524 nodes: vec![NodeInsert {
5525 row_id: "row-1".to_owned(),
5526 logical_id: "node-1".to_owned(),
5527 kind: "Note".to_owned(),
5528 properties: "{}".to_owned(),
5529 source_ref: Some("src-1".to_owned()),
5530 upsert: false,
5531 chunk_policy: ChunkPolicy::Preserve,
5532 }],
5533 node_retires: vec![],
5534 edges: vec![],
5535 edge_retires: vec![],
5536 chunks: vec![],
5537 runs: vec![],
5538 steps: vec![],
5539 actions: vec![],
5540 optional_backfills: vec![],
5541 vec_inserts: vec![],
5542 operational_writes: vec![],
5543 })
5544 .expect("write");
5545
5546 assert_eq!(receipt.optional_backfill_count, 0);
5547 }
5548
5549 #[test]
5550 fn receipt_reports_correct_backfill_count() {
5551 let db = NamedTempFile::new().expect("temporary db");
5552 let writer = WriterActor::start(
5553 db.path(),
5554 Arc::new(SchemaManager::new()),
5555 ProvenanceMode::Warn,
5556 Arc::new(TelemetryCounters::default()),
5557 )
5558 .expect("writer");
5559
5560 let receipt = writer
5561 .submit(WriteRequest {
5562 label: "test".to_owned(),
5563 nodes: vec![NodeInsert {
5564 row_id: "row-1".to_owned(),
5565 logical_id: "node-1".to_owned(),
5566 kind: "Note".to_owned(),
5567 properties: "{}".to_owned(),
5568 source_ref: Some("src-1".to_owned()),
5569 upsert: false,
5570 chunk_policy: ChunkPolicy::Preserve,
5571 }],
5572 node_retires: vec![],
5573 edges: vec![],
5574 edge_retires: vec![],
5575 chunks: vec![],
5576 runs: vec![],
5577 steps: vec![],
5578 actions: vec![],
5579 optional_backfills: vec![
5580 OptionalProjectionTask {
5581 target: ProjectionTarget::Fts,
5582 payload: "p1".to_owned(),
5583 },
5584 OptionalProjectionTask {
5585 target: ProjectionTarget::Vec,
5586 payload: "p2".to_owned(),
5587 },
5588 OptionalProjectionTask {
5589 target: ProjectionTarget::All,
5590 payload: "p3".to_owned(),
5591 },
5592 ],
5593 vec_inserts: vec![],
5594 operational_writes: vec![],
5595 })
5596 .expect("write");
5597
5598 assert_eq!(receipt.optional_backfill_count, 3);
5599 }
5600
5601 #[test]
5602 fn backfill_tasks_are_not_executed_during_write() {
5603 let db = NamedTempFile::new().expect("temporary db");
5604 let writer = WriterActor::start(
5605 db.path(),
5606 Arc::new(SchemaManager::new()),
5607 ProvenanceMode::Warn,
5608 Arc::new(TelemetryCounters::default()),
5609 )
5610 .expect("writer");
5611
5612 writer
5615 .submit(WriteRequest {
5616 label: "test".to_owned(),
5617 nodes: vec![NodeInsert {
5618 row_id: "row-1".to_owned(),
5619 logical_id: "node-1".to_owned(),
5620 kind: "Note".to_owned(),
5621 properties: "{}".to_owned(),
5622 source_ref: Some("src-1".to_owned()),
5623 upsert: false,
5624 chunk_policy: ChunkPolicy::Preserve,
5625 }],
5626 node_retires: vec![],
5627 edges: vec![],
5628 edge_retires: vec![],
5629 chunks: vec![ChunkInsert {
5630 id: "chunk-1".to_owned(),
5631 node_logical_id: "node-1".to_owned(),
5632 text_content: "required text".to_owned(),
5633 byte_start: None,
5634 byte_end: None,
5635 }],
5636 runs: vec![],
5637 steps: vec![],
5638 actions: vec![],
5639 optional_backfills: vec![OptionalProjectionTask {
5640 target: ProjectionTarget::Fts,
5641 payload: "backfill-payload".to_owned(),
5642 }],
5643 vec_inserts: vec![],
5644 operational_writes: vec![],
5645 })
5646 .expect("write");
5647
5648 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5649 let count: i64 = conn
5650 .query_row(
5651 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5652 [],
5653 |row| row.get(0),
5654 )
5655 .expect("fts count");
5656
5657 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
5658 }
5659
5660 #[test]
5661 fn fts_row_uses_new_kind_after_node_replace() {
5662 let db = NamedTempFile::new().expect("temporary db");
5663 let writer = WriterActor::start(
5664 db.path(),
5665 Arc::new(SchemaManager::new()),
5666 ProvenanceMode::Warn,
5667 Arc::new(TelemetryCounters::default()),
5668 )
5669 .expect("writer");
5670
5671 writer
5673 .submit(WriteRequest {
5674 label: "v1".to_owned(),
5675 nodes: vec![NodeInsert {
5676 row_id: "row-1".to_owned(),
5677 logical_id: "node-1".to_owned(),
5678 kind: "Note".to_owned(),
5679 properties: "{}".to_owned(),
5680 source_ref: Some("src-1".to_owned()),
5681 upsert: false,
5682 chunk_policy: ChunkPolicy::Preserve,
5683 }],
5684 node_retires: vec![],
5685 edges: vec![],
5686 edge_retires: vec![],
5687 chunks: vec![ChunkInsert {
5688 id: "chunk-v1".to_owned(),
5689 node_logical_id: "node-1".to_owned(),
5690 text_content: "original".to_owned(),
5691 byte_start: None,
5692 byte_end: None,
5693 }],
5694 runs: vec![],
5695 steps: vec![],
5696 actions: vec![],
5697 optional_backfills: vec![],
5698 vec_inserts: vec![],
5699 operational_writes: vec![],
5700 })
5701 .expect("v1 write");
5702
5703 writer
5705 .submit(WriteRequest {
5706 label: "v2".to_owned(),
5707 nodes: vec![NodeInsert {
5708 row_id: "row-2".to_owned(),
5709 logical_id: "node-1".to_owned(),
5710 kind: "Meeting".to_owned(),
5711 properties: "{}".to_owned(),
5712 source_ref: Some("src-2".to_owned()),
5713 upsert: true,
5714 chunk_policy: ChunkPolicy::Replace,
5715 }],
5716 node_retires: vec![],
5717 edges: vec![],
5718 edge_retires: vec![],
5719 chunks: vec![ChunkInsert {
5720 id: "chunk-v2".to_owned(),
5721 node_logical_id: "node-1".to_owned(),
5722 text_content: "updated".to_owned(),
5723 byte_start: None,
5724 byte_end: None,
5725 }],
5726 runs: vec![],
5727 steps: vec![],
5728 actions: vec![],
5729 optional_backfills: vec![],
5730 vec_inserts: vec![],
5731 operational_writes: vec![],
5732 })
5733 .expect("v2 write");
5734
5735 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5736
5737 let old_count: i64 = conn
5739 .query_row(
5740 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
5741 [],
5742 |row| row.get(0),
5743 )
5744 .expect("old fts count");
5745 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
5746
5747 let new_kind: String = conn
5749 .query_row(
5750 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
5751 [],
5752 |row| row.get(0),
5753 )
5754 .expect("new fts row");
5755 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
5756 }
5757
5758 #[test]
5761 fn vec_insert_empty_chunk_id_is_rejected() {
5762 let db = NamedTempFile::new().expect("temporary db");
5763 let writer = WriterActor::start(
5764 db.path(),
5765 Arc::new(SchemaManager::new()),
5766 ProvenanceMode::Warn,
5767 Arc::new(TelemetryCounters::default()),
5768 )
5769 .expect("writer");
5770 let result = writer.submit(WriteRequest {
5771 label: "vec-test".to_owned(),
5772 nodes: vec![],
5773 node_retires: vec![],
5774 edges: vec![],
5775 edge_retires: vec![],
5776 chunks: vec![],
5777 runs: vec![],
5778 steps: vec![],
5779 actions: vec![],
5780 optional_backfills: vec![],
5781 vec_inserts: vec![VecInsert {
5782 chunk_id: String::new(),
5783 embedding: vec![0.1, 0.2, 0.3],
5784 }],
5785 operational_writes: vec![],
5786 });
5787 assert!(
5788 matches!(result, Err(EngineError::InvalidWrite(_))),
5789 "empty chunk_id in VecInsert must be rejected"
5790 );
5791 }
5792
5793 #[test]
5794 fn vec_insert_empty_embedding_is_rejected() {
5795 let db = NamedTempFile::new().expect("temporary db");
5796 let writer = WriterActor::start(
5797 db.path(),
5798 Arc::new(SchemaManager::new()),
5799 ProvenanceMode::Warn,
5800 Arc::new(TelemetryCounters::default()),
5801 )
5802 .expect("writer");
5803 let result = writer.submit(WriteRequest {
5804 label: "vec-test".to_owned(),
5805 nodes: vec![],
5806 node_retires: vec![],
5807 edges: vec![],
5808 edge_retires: vec![],
5809 chunks: vec![],
5810 runs: vec![],
5811 steps: vec![],
5812 actions: vec![],
5813 optional_backfills: vec![],
5814 vec_inserts: vec![VecInsert {
5815 chunk_id: "chunk-1".to_owned(),
5816 embedding: vec![],
5817 }],
5818 operational_writes: vec![],
5819 });
5820 assert!(
5821 matches!(result, Err(EngineError::InvalidWrite(_))),
5822 "empty embedding in VecInsert must be rejected"
5823 );
5824 }
5825
5826 #[test]
5827 fn vec_insert_noop_without_feature() {
5828 let db = NamedTempFile::new().expect("temporary db");
5831 let writer = WriterActor::start(
5832 db.path(),
5833 Arc::new(SchemaManager::new()),
5834 ProvenanceMode::Warn,
5835 Arc::new(TelemetryCounters::default()),
5836 )
5837 .expect("writer");
5838 let result = writer.submit(WriteRequest {
5839 label: "vec-noop".to_owned(),
5840 nodes: vec![],
5841 node_retires: vec![],
5842 edges: vec![],
5843 edge_retires: vec![],
5844 chunks: vec![],
5845 runs: vec![],
5846 steps: vec![],
5847 actions: vec![],
5848 optional_backfills: vec![],
5849 vec_inserts: vec![VecInsert {
5850 chunk_id: "chunk-noop".to_owned(),
5851 embedding: vec![1.0, 2.0, 3.0],
5852 }],
5853 operational_writes: vec![],
5854 });
5855 #[cfg(not(feature = "sqlite-vec"))]
5856 result.expect("noop VecInsert without feature must succeed");
5857 #[cfg(feature = "sqlite-vec")]
5859 let _ = result;
5860 }
5861
5862 #[cfg(feature = "sqlite-vec")]
5863 #[test]
5864 fn node_retire_preserves_vec_rows_for_later_restore() {
5865 use crate::sqlite::open_connection_with_vec;
5866
5867 let db = NamedTempFile::new().expect("temporary db");
5868 let schema_manager = Arc::new(SchemaManager::new());
5869
5870 {
5871 let conn = open_connection_with_vec(db.path()).expect("vec connection");
5872 schema_manager.bootstrap(&conn).expect("bootstrap");
5873 schema_manager
5874 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
5875 .expect("ensure profile");
5876 }
5877
5878 let writer = WriterActor::start(
5879 db.path(),
5880 Arc::clone(&schema_manager),
5881 ProvenanceMode::Warn,
5882 Arc::new(TelemetryCounters::default()),
5883 )
5884 .expect("writer");
5885
5886 writer
5888 .submit(WriteRequest {
5889 label: "setup".to_owned(),
5890 nodes: vec![NodeInsert {
5891 row_id: "row-retire-vec".to_owned(),
5892 logical_id: "node-retire-vec".to_owned(),
5893 kind: "Doc".to_owned(),
5894 properties: "{}".to_owned(),
5895 source_ref: Some("src".to_owned()),
5896 upsert: false,
5897 chunk_policy: ChunkPolicy::Preserve,
5898 }],
5899 node_retires: vec![],
5900 edges: vec![],
5901 edge_retires: vec![],
5902 chunks: vec![ChunkInsert {
5903 id: "chunk-retire-vec".to_owned(),
5904 node_logical_id: "node-retire-vec".to_owned(),
5905 text_content: "text".to_owned(),
5906 byte_start: None,
5907 byte_end: None,
5908 }],
5909 runs: vec![],
5910 steps: vec![],
5911 actions: vec![],
5912 optional_backfills: vec![],
5913 vec_inserts: vec![VecInsert {
5914 chunk_id: "chunk-retire-vec".to_owned(),
5915 embedding: vec![0.1, 0.2, 0.3],
5916 }],
5917 operational_writes: vec![],
5918 })
5919 .expect("setup write");
5920
5921 writer
5923 .submit(WriteRequest {
5924 label: "retire".to_owned(),
5925 nodes: vec![],
5926 node_retires: vec![NodeRetire {
5927 logical_id: "node-retire-vec".to_owned(),
5928 source_ref: Some("src".to_owned()),
5929 }],
5930 edges: vec![],
5931 edge_retires: vec![],
5932 chunks: vec![],
5933 runs: vec![],
5934 steps: vec![],
5935 actions: vec![],
5936 optional_backfills: vec![],
5937 vec_inserts: vec![],
5938 operational_writes: vec![],
5939 })
5940 .expect("retire write");
5941
5942 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5943 let count: i64 = conn
5944 .query_row(
5945 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
5946 [],
5947 |row| row.get(0),
5948 )
5949 .expect("count");
5950 assert_eq!(
5951 count, 1,
5952 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
5953 );
5954 }
5955
5956 #[cfg(feature = "sqlite-vec")]
5957 #[test]
5958 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
5959 use crate::sqlite::open_connection_with_vec;
5960
5961 let db = NamedTempFile::new().expect("temporary db");
5962 let schema_manager = Arc::new(SchemaManager::new());
5963
5964 {
5965 let conn = open_connection_with_vec(db.path()).expect("vec connection");
5966 schema_manager.bootstrap(&conn).expect("bootstrap");
5967 schema_manager
5968 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
5969 .expect("ensure profile");
5970 }
5971
5972 let writer = WriterActor::start(
5973 db.path(),
5974 Arc::clone(&schema_manager),
5975 ProvenanceMode::Warn,
5976 Arc::new(TelemetryCounters::default()),
5977 )
5978 .expect("writer");
5979
5980 writer
5982 .submit(WriteRequest {
5983 label: "v1".to_owned(),
5984 nodes: vec![NodeInsert {
5985 row_id: "row-replace-v1".to_owned(),
5986 logical_id: "node-replace-vec".to_owned(),
5987 kind: "Doc".to_owned(),
5988 properties: "{}".to_owned(),
5989 source_ref: Some("src".to_owned()),
5990 upsert: false,
5991 chunk_policy: ChunkPolicy::Preserve,
5992 }],
5993 node_retires: vec![],
5994 edges: vec![],
5995 edge_retires: vec![],
5996 chunks: vec![ChunkInsert {
5997 id: "chunk-replace-A".to_owned(),
5998 node_logical_id: "node-replace-vec".to_owned(),
5999 text_content: "version one".to_owned(),
6000 byte_start: None,
6001 byte_end: None,
6002 }],
6003 runs: vec![],
6004 steps: vec![],
6005 actions: vec![],
6006 optional_backfills: vec![],
6007 vec_inserts: vec![VecInsert {
6008 chunk_id: "chunk-replace-A".to_owned(),
6009 embedding: vec![0.1, 0.2, 0.3],
6010 }],
6011 operational_writes: vec![],
6012 })
6013 .expect("v1 write");
6014
6015 writer
6017 .submit(WriteRequest {
6018 label: "v2".to_owned(),
6019 nodes: vec![NodeInsert {
6020 row_id: "row-replace-v2".to_owned(),
6021 logical_id: "node-replace-vec".to_owned(),
6022 kind: "Doc".to_owned(),
6023 properties: "{}".to_owned(),
6024 source_ref: Some("src".to_owned()),
6025 upsert: true,
6026 chunk_policy: ChunkPolicy::Replace,
6027 }],
6028 node_retires: vec![],
6029 edges: vec![],
6030 edge_retires: vec![],
6031 chunks: vec![ChunkInsert {
6032 id: "chunk-replace-B".to_owned(),
6033 node_logical_id: "node-replace-vec".to_owned(),
6034 text_content: "version two".to_owned(),
6035 byte_start: None,
6036 byte_end: None,
6037 }],
6038 runs: vec![],
6039 steps: vec![],
6040 actions: vec![],
6041 optional_backfills: vec![],
6042 vec_inserts: vec![VecInsert {
6043 chunk_id: "chunk-replace-B".to_owned(),
6044 embedding: vec![0.4, 0.5, 0.6],
6045 }],
6046 operational_writes: vec![],
6047 })
6048 .expect("v2 write");
6049
6050 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6051 let count_a: i64 = conn
6052 .query_row(
6053 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6054 [],
6055 |row| row.get(0),
6056 )
6057 .expect("count A");
6058 let count_b: i64 = conn
6059 .query_row(
6060 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6061 [],
6062 |row| row.get(0),
6063 )
6064 .expect("count B");
6065 assert_eq!(
6066 count_a, 0,
6067 "old vec row (chunk-A) must be deleted on Replace"
6068 );
6069 assert_eq!(
6070 count_b, 1,
6071 "new vec row (chunk-B) must be present after Replace"
6072 );
6073 }
6074
6075 #[cfg(feature = "sqlite-vec")]
6076 #[test]
6077 fn vec_insert_is_persisted_when_feature_enabled() {
6078 use crate::sqlite::open_connection_with_vec;
6079
6080 let db = NamedTempFile::new().expect("temporary db");
6081 let schema_manager = Arc::new(SchemaManager::new());
6082
6083 {
6085 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6086 schema_manager.bootstrap(&conn).expect("bootstrap");
6087 schema_manager
6088 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6089 .expect("ensure profile");
6090 }
6091
6092 let writer = WriterActor::start(
6093 db.path(),
6094 Arc::clone(&schema_manager),
6095 ProvenanceMode::Warn,
6096 Arc::new(TelemetryCounters::default()),
6097 )
6098 .expect("writer");
6099
6100 writer
6101 .submit(WriteRequest {
6102 label: "vec-insert".to_owned(),
6103 nodes: vec![],
6104 node_retires: vec![],
6105 edges: vec![],
6106 edge_retires: vec![],
6107 chunks: vec![],
6108 runs: vec![],
6109 steps: vec![],
6110 actions: vec![],
6111 optional_backfills: vec![],
6112 vec_inserts: vec![VecInsert {
6113 chunk_id: "chunk-vec".to_owned(),
6114 embedding: vec![0.1, 0.2, 0.3],
6115 }],
6116 operational_writes: vec![],
6117 })
6118 .expect("vec insert write");
6119
6120 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6121 let count: i64 = conn
6122 .query_row(
6123 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6124 [],
6125 |row| row.get(0),
6126 )
6127 .expect("count");
6128 assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6129 }
6130
6131 #[test]
6134 fn write_request_exceeding_node_limit_is_rejected() {
6135 let nodes: Vec<NodeInsert> = (0..10_001)
6136 .map(|i| NodeInsert {
6137 row_id: format!("row-{i}"),
6138 logical_id: format!("lg-{i}"),
6139 kind: "Note".to_owned(),
6140 properties: "{}".to_owned(),
6141 source_ref: None,
6142 upsert: false,
6143 chunk_policy: ChunkPolicy::Preserve,
6144 })
6145 .collect();
6146
6147 let request = WriteRequest {
6148 label: "too-many-nodes".to_owned(),
6149 nodes,
6150 node_retires: vec![],
6151 edges: vec![],
6152 edge_retires: vec![],
6153 chunks: vec![],
6154 runs: vec![],
6155 steps: vec![],
6156 actions: vec![],
6157 optional_backfills: vec![],
6158 vec_inserts: vec![],
6159 operational_writes: vec![],
6160 };
6161
6162 let result = prepare_write(request, ProvenanceMode::Warn)
6163 .map(|_| ())
6164 .map_err(|e| format!("{e}"));
6165 assert!(
6166 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6167 "exceeding node limit must return InvalidWrite: got {result:?}"
6168 );
6169 }
6170
6171 #[test]
6172 fn write_request_exceeding_total_limit_is_rejected() {
6173 let request = WriteRequest {
6177 label: "too-many-total".to_owned(),
6178 nodes: (0..10_000)
6179 .map(|i| NodeInsert {
6180 row_id: format!("row-{i}"),
6181 logical_id: format!("lg-{i}"),
6182 kind: "Note".to_owned(),
6183 properties: "{}".to_owned(),
6184 source_ref: None,
6185 upsert: false,
6186 chunk_policy: ChunkPolicy::Preserve,
6187 })
6188 .collect(),
6189 node_retires: vec![],
6190 edges: (0..10_000)
6191 .map(|i| EdgeInsert {
6192 row_id: format!("edge-row-{i}"),
6193 logical_id: format!("edge-lg-{i}"),
6194 kind: "link".to_owned(),
6195 source_logical_id: format!("lg-{i}"),
6196 target_logical_id: format!("lg-{}", i + 1),
6197 properties: "{}".to_owned(),
6198 source_ref: None,
6199 upsert: false,
6200 })
6201 .collect(),
6202 edge_retires: vec![],
6203 chunks: (0..50_000)
6204 .map(|i| ChunkInsert {
6205 id: format!("chunk-{i}"),
6206 node_logical_id: "lg-0".to_owned(),
6207 text_content: "text".to_owned(),
6208 byte_start: None,
6209 byte_end: None,
6210 })
6211 .collect(),
6212 runs: vec![],
6213 steps: vec![],
6214 actions: vec![],
6215 optional_backfills: vec![],
6216 vec_inserts: (0..20_001)
6217 .map(|i| VecInsert {
6218 chunk_id: format!("vec-chunk-{i}"),
6219 embedding: vec![0.1],
6220 })
6221 .collect(),
6222 operational_writes: (0..10_000)
6223 .map(|i| OperationalWrite::Append {
6224 collection: format!("col-{i}"),
6225 record_key: format!("key-{i}"),
6226 payload_json: "{}".to_owned(),
6227 source_ref: None,
6228 })
6229 .collect(),
6230 };
6231
6232 let result = prepare_write(request, ProvenanceMode::Warn)
6233 .map(|_| ())
6234 .map_err(|e| format!("{e}"));
6235 assert!(
6236 matches!(result, Err(ref msg) if msg.contains("too many total items")),
6237 "exceeding total item limit must return InvalidWrite: got {result:?}"
6238 );
6239 }
6240
6241 #[test]
6242 fn write_request_within_limits_succeeds() {
6243 let db = NamedTempFile::new().expect("temporary db");
6244 let writer = WriterActor::start(
6245 db.path(),
6246 Arc::new(SchemaManager::new()),
6247 ProvenanceMode::Warn,
6248 Arc::new(TelemetryCounters::default()),
6249 )
6250 .expect("writer");
6251
6252 let result = writer.submit(WriteRequest {
6253 label: "within-limits".to_owned(),
6254 nodes: vec![NodeInsert {
6255 row_id: "row-1".to_owned(),
6256 logical_id: "lg-1".to_owned(),
6257 kind: "Note".to_owned(),
6258 properties: "{}".to_owned(),
6259 source_ref: None,
6260 upsert: false,
6261 chunk_policy: ChunkPolicy::Preserve,
6262 }],
6263 node_retires: vec![],
6264 edges: vec![],
6265 edge_retires: vec![],
6266 chunks: vec![],
6267 runs: vec![],
6268 steps: vec![],
6269 actions: vec![],
6270 optional_backfills: vec![],
6271 vec_inserts: vec![],
6272 operational_writes: vec![],
6273 });
6274
6275 assert!(
6276 result.is_ok(),
6277 "write request within limits must succeed: got {result:?}"
6278 );
6279 }
6280}