1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred, non-blocking projection task carried through a write and
/// reported back via `WriteReceipt::optional_backfill_count`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Projection the payload should be applied to.
    pub target: ProjectionTarget,
    /// Opaque payload handed to the projection target.
    pub payload: String,
}
31
/// Policy for a node's existing chunks when the node is written again
/// (enforced downstream of `prepare_write`; see `NodeInsert::chunk_policy`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks (default).
    #[default]
    Preserve,
    /// Replace existing chunks.
    Replace,
}
41
/// Policy for writes that carry no `source_ref`.
///
/// Under `Require`, such writes are rejected up front (see
/// `check_require_provenance` and `prepare_touch_last_accessed`); `Warn` is
/// the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    #[default]
    Warn,
    Require,
}
52
/// Insert (or upsert) of a single graph node row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row key; must be non-empty and unique within the request
    /// (shared namespace with edge row ids — see `prepare_write`).
    pub row_id: String,
    /// Stable node identity across versions; must be non-empty.
    pub logical_id: String,
    /// Node kind; used to look up property-FTS schemas and chunk FTS rows.
    pub kind: String,
    /// JSON-encoded properties (parsed with serde_json during projection).
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// How existing chunks for this node are treated on rewrite.
    pub chunk_policy: ChunkPolicy,
    pub content_ref: Option<String>,
}
69
/// Insert (or upsert) of a single graph edge row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row key; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable edge identity across versions; must be non-empty.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    pub kind: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
84
/// Retires the node identified by `logical_id`. A retire and a chunk insert
/// for the same node must not appear in one request (see `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
91
/// Retires the edge identified by `logical_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
98
/// A text chunk attached to a node; projected into chunk FTS.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    /// Owning node; must be inserted in the same request or already exist in
    /// the database (see `resolve_fts_rows`).
    pub node_logical_id: String,
    /// Chunk text; must be non-empty (enforced by `prepare_write`).
    pub text_content: String,
    // NOTE(review): presumably the chunk's byte range within its source
    // content — not validated in this module; confirm against callers.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}
110
/// An embedding vector for a chunk. No `Eq`: `f32` is not `Eq`.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Chunk the embedding belongs to; must be non-empty.
    pub chunk_id: String,
    /// Embedding values; must be non-empty (enforced by `prepare_write`).
    pub embedding: Vec<f32>,
}
122
/// One write against an operational collection, addressed by
/// `(collection, record_key)`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a payload under the record key (payload must be non-empty).
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put a payload at the record key (payload must be non-empty).
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete the record at the record key; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// Insert (or upsert) of a runtime "run" record.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// When `true`, `supersedes_id` must be set (enforced by `prepare_write`).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
156
/// Insert (or upsert) of a runtime "step" record belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    /// Parent run id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// When `true`, `supersedes_id` must be set (enforced by `prepare_write`).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
169
/// Insert (or upsert) of a runtime "action" record belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    /// Parent step id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// When `true`, `supersedes_id` must be set (enforced by `prepare_write`).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
182
// Per-request item limits, enforced by `validate_request_size` before a
// request is queued for the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined cap for node + edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined cap for runs + steps + actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit`/`touch_last_accessed` wait for the writer thread's reply
/// before returning `WriterTimedOut` (the write may still commit).
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of writes handed to the writer thread as one unit.
/// No `Eq`: `vec_inserts` contains `f32` embeddings.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Caller-supplied label, echoed in logs and the `WriteReceipt`.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    // NOTE(review): not counted toward MAX_TOTAL_ITEMS in
    // `validate_request_size` — confirm that is intentional.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Outcome returned to the caller after a write request is applied.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Echo of `WriteRequest::label`.
    pub label: String,
    /// Number of optional projection tasks carried by the request.
    pub optional_backfill_count: usize,
    pub warnings: Vec<String>,
    /// Warnings about items missing `source_ref` (ProvenanceMode::Warn).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to bump the last-accessed timestamp for a batch of logical ids.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Must be non-empty, with no blank entries (see
    /// `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    pub touched_at: i64,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
235
/// Result of a last-accessed touch: how many ids were updated, and when.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
242
/// One chunk-FTS projection row, resolved per chunk by `resolve_fts_rows`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node (from the request or the `nodes` table).
    kind: String,
    text_content: String,
}
250
/// One property-FTS projection row, produced by `resolve_property_fts_rows`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Combined extracted text (scalars + recursive leaves).
    text_content: String,
    /// Byte spans of recursive leaves within `text_content`.
    positions: Vec<PositionEntry>,
}
258
/// Maximum nesting depth walked during recursive property extraction; deeper
/// objects/arrays are skipped and counted in `ExtractStats::depth_cap_hit`.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Byte cap on the concatenated recursive-extraction blob; once a leaf would
/// push past this, extraction stops (`ExtractStats::byte_cap_reached`).
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

// Sentinel token joined between extracted leaves — presumably so FTS phrase
// queries cannot match across leaf boundaries.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// How a configured property path is extracted for FTS.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Take the value at the path as scalar(s); arrays yield their scalar
    /// elements (see `extract_json_path`).
    #[default]
    Scalar,
    /// Walk the whole subtree under the path, emitting every scalar leaf
    /// (see `RecursiveWalker`).
    Recursive,
}
314
/// One configured extraction path (`$.`-prefixed) plus its mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    pub path: String,
    pub mode: PropertyPathMode,
}
321
322impl PropertyPathEntry {
323 pub(crate) fn scalar(path: impl Into<String>) -> Self {
324 Self {
325 path: path.into(),
326 mode: PropertyPathMode::Scalar,
327 }
328 }
329
330 #[cfg(test)]
331 pub(crate) fn recursive(path: impl Into<String>) -> Self {
332 Self {
333 path: path.into(),
334 mode: PropertyPathMode::Recursive,
335 }
336 }
337}
338
/// Per-kind configuration driving property-FTS extraction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Paths to extract, each scalar or recursive.
    pub paths: Vec<PropertyPathEntry>,
    /// Separator used when joining scalar values.
    pub separator: String,
    /// Exact paths whose subtrees are skipped during recursive walks.
    pub exclude_paths: Vec<String>,
}
358
/// Byte span of one extracted leaf inside the combined FTS text, plus the
/// JSON path it came from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    pub start_offset: usize,
    pub end_offset: usize,
    pub leaf_path: String,
}
368
/// Counters for the extraction guardrails that fired during a walk.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Containers skipped because `MAX_RECURSIVE_DEPTH` was reached.
    pub depth_cap_hit: usize,
    /// Whether extraction stopped at `MAX_EXTRACTED_BYTES`.
    pub byte_cap_reached: bool,
    /// Subtrees skipped because their path was excluded.
    pub excluded_subtree: usize,
}
381
382impl ExtractStats {
383 fn merge(&mut self, other: ExtractStats) {
384 self.depth_cap_hit += other.depth_cap_hit;
385 self.byte_cap_reached |= other.byte_cap_reached;
386 self.excluded_subtree += other.excluded_subtree;
387 }
388}
389
/// A validated `WriteRequest` plus per-request state that is resolved on the
/// writer thread (FTS projection rows, operational collection metadata).
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Initialized empty by `prepare_write`; presumably filled during
    // operational resolution on the writer thread (not visible here).
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // kind per logical_id for nodes in this request; lets `resolve_fts_rows`
    // avoid a DB lookup for nodes written in the same batch.
    node_kinds: HashMap<String, String>,
    // Filled by `resolve_fts_rows`.
    required_fts_rows: Vec<FtsProjectionRow>,
    // Filled by `resolve_property_fts_rows`.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages handled by the writer thread; each carries a one-shot reply
/// channel the caller blocks on (with a timeout — see `recv_with_timeout`).
enum WriteMessage {
    /// A validated write, boxed to keep the message small on the channel.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Bulk last-accessed timestamp update for a batch of logical ids.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Handle to the single dedicated writer thread; all writes are serialized
/// through its channel.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so `Drop` can drop the sender *before* joining the
    // thread — disconnecting the channel is what makes the loop exit.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // Taken (set to None) when the thread is joined in `Drop`.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied when validating incoming requests.
    provenance_mode: ProvenanceMode,
    // Retained for lifetime only; the writer thread holds its own clone.
    _telemetry: Arc<TelemetryCounters>,
}
441
impl WriterActor {
    /// Spawns the dedicated writer thread and returns the actor handle.
    ///
    /// The thread opens its own connection at `path` and runs the schema
    /// bootstrap before serving requests; failures in either step cause all
    /// queued requests to be rejected (see `writer_loop`).
    ///
    /// # Errors
    /// `EngineError::Io` if the OS thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: senders block once 256 messages are in flight.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// Returns `true` while the writer thread has not finished.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fails fast with `WriterRejected` if the writer thread has exited.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validates `request`, queues it on the writer thread, and waits up to
    /// `WRITER_REPLY_TIMEOUT` for the receipt.
    ///
    /// # Errors
    /// `InvalidWrite` on validation failure, `WriterRejected` if the thread
    /// is gone, `WriterTimedOut` if no reply arrives in time (the write may
    /// still commit).
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validates and queues a last-accessed touch, waiting for the report
    /// with the same timeout semantics as `submit`.
    ///
    /// # Errors
    /// Same error classes as `submit`.
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
525
/// Fallback notice when the tracing feature is off: make a suppressed writer
/// panic visible on stderr instead of disappearing silently.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
531
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Drop the sender first: disconnecting the channel lets the writer
        // loop drain and exit, so the join below can complete.
        // SAFETY: the sender is dropped exactly once, here, and never used
        // again (`self` is being dropped).
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Already unwinding: re-panicking would abort the
                        // process, so only record the writer's panic.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Propagate the writer thread's panic to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
561
562fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565 .map_err(|error| match error {
566 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567 "write timed out waiting for writer thread reply — the write may still commit"
568 .to_owned(),
569 ),
570 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571 })
572 .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576 request: &LastAccessTouchRequest,
577 mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579 if request.logical_ids.is_empty() {
580 return Err(EngineError::InvalidWrite(
581 "touch_last_accessed requires at least one logical_id".to_owned(),
582 ));
583 }
584 for logical_id in &request.logical_ids {
585 if logical_id.trim().is_empty() {
586 return Err(EngineError::InvalidWrite(
587 "touch_last_accessed requires non-empty logical_ids".to_owned(),
588 ));
589 }
590 }
591 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592 return Err(EngineError::InvalidWrite(
593 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594 .to_owned(),
595 ));
596 }
597 Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601 let missing: Vec<String> = request
602 .nodes
603 .iter()
604 .filter(|n| n.source_ref.is_none())
605 .map(|n| format!("node '{}'", n.logical_id))
606 .chain(
607 request
608 .node_retires
609 .iter()
610 .filter(|r| r.source_ref.is_none())
611 .map(|r| format!("node retire '{}'", r.logical_id)),
612 )
613 .chain(
614 request
615 .edges
616 .iter()
617 .filter(|e| e.source_ref.is_none())
618 .map(|e| format!("edge '{}'", e.logical_id)),
619 )
620 .chain(
621 request
622 .edge_retires
623 .iter()
624 .filter(|r| r.source_ref.is_none())
625 .map(|r| format!("edge retire '{}'", r.logical_id)),
626 )
627 .chain(
628 request
629 .runs
630 .iter()
631 .filter(|r| r.source_ref.is_none())
632 .map(|r| format!("run '{}'", r.id)),
633 )
634 .chain(
635 request
636 .steps
637 .iter()
638 .filter(|s| s.source_ref.is_none())
639 .map(|s| format!("step '{}'", s.id)),
640 )
641 .chain(
642 request
643 .actions
644 .iter()
645 .filter(|a| a.source_ref.is_none())
646 .map(|a| format!("action '{}'", a.id)),
647 )
648 .chain(
649 request
650 .operational_writes
651 .iter()
652 .filter(|write| operational_write_source_ref(write).is_none())
653 .map(|write| {
654 format!(
655 "operational {} '{}:{}'",
656 operational_write_kind(write),
657 operational_write_collection(write),
658 operational_write_record_key(write)
659 )
660 }),
661 )
662 .collect();
663
664 if missing.is_empty() {
665 Ok(())
666 } else {
667 Err(EngineError::InvalidWrite(format!(
668 "ProvenanceMode::Require: missing source_ref on: {}",
669 missing.join(", ")
670 )))
671 }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675 if request.nodes.len() > MAX_NODES {
676 return Err(EngineError::InvalidWrite(format!(
677 "too many nodes: {} exceeds limit of {MAX_NODES}",
678 request.nodes.len()
679 )));
680 }
681 if request.edges.len() > MAX_EDGES {
682 return Err(EngineError::InvalidWrite(format!(
683 "too many edges: {} exceeds limit of {MAX_EDGES}",
684 request.edges.len()
685 )));
686 }
687 if request.chunks.len() > MAX_CHUNKS {
688 return Err(EngineError::InvalidWrite(format!(
689 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690 request.chunks.len()
691 )));
692 }
693 let retires = request.node_retires.len() + request.edge_retires.len();
694 if retires > MAX_RETIRES {
695 return Err(EngineError::InvalidWrite(format!(
696 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697 )));
698 }
699 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700 if runtime_items > MAX_RUNTIME_ITEMS {
701 return Err(EngineError::InvalidWrite(format!(
702 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703 )));
704 }
705 if request.operational_writes.len() > MAX_OPERATIONAL {
706 return Err(EngineError::InvalidWrite(format!(
707 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708 request.operational_writes.len()
709 )));
710 }
711 let total = request.nodes.len()
712 + request.node_retires.len()
713 + request.edges.len()
714 + request.edge_retires.len()
715 + request.chunks.len()
716 + request.runs.len()
717 + request.steps.len()
718 + request.actions.len()
719 + request.vec_inserts.len()
720 + request.operational_writes.len();
721 if total > MAX_TOTAL_ITEMS {
722 return Err(EngineError::InvalidWrite(format!(
723 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724 )));
725 }
726 Ok(())
727}
728
729#[allow(clippy::too_many_lines)]
730fn prepare_write(
731 request: WriteRequest,
732 mode: ProvenanceMode,
733) -> Result<PreparedWrite, EngineError> {
734 validate_request_size(&request)?;
735
736 for node in &request.nodes {
738 if node.row_id.is_empty() {
739 return Err(EngineError::InvalidWrite(
740 "NodeInsert has empty row_id".to_owned(),
741 ));
742 }
743 if node.logical_id.is_empty() {
744 return Err(EngineError::InvalidWrite(
745 "NodeInsert has empty logical_id".to_owned(),
746 ));
747 }
748 }
749 for edge in &request.edges {
750 if edge.row_id.is_empty() {
751 return Err(EngineError::InvalidWrite(
752 "EdgeInsert has empty row_id".to_owned(),
753 ));
754 }
755 if edge.logical_id.is_empty() {
756 return Err(EngineError::InvalidWrite(
757 "EdgeInsert has empty logical_id".to_owned(),
758 ));
759 }
760 }
761 for chunk in &request.chunks {
762 if chunk.id.is_empty() {
763 return Err(EngineError::InvalidWrite(
764 "ChunkInsert has empty id".to_owned(),
765 ));
766 }
767 if chunk.text_content.is_empty() {
768 return Err(EngineError::InvalidWrite(format!(
769 "chunk '{}' has empty text_content; empty chunks are not allowed",
770 chunk.id
771 )));
772 }
773 }
774 for run in &request.runs {
775 if run.id.is_empty() {
776 return Err(EngineError::InvalidWrite(
777 "RunInsert has empty id".to_owned(),
778 ));
779 }
780 }
781 for step in &request.steps {
782 if step.id.is_empty() {
783 return Err(EngineError::InvalidWrite(
784 "StepInsert has empty id".to_owned(),
785 ));
786 }
787 }
788 for action in &request.actions {
789 if action.id.is_empty() {
790 return Err(EngineError::InvalidWrite(
791 "ActionInsert has empty id".to_owned(),
792 ));
793 }
794 }
795 for vi in &request.vec_inserts {
796 if vi.chunk_id.is_empty() {
797 return Err(EngineError::InvalidWrite(
798 "VecInsert has empty chunk_id".to_owned(),
799 ));
800 }
801 if vi.embedding.is_empty() {
802 return Err(EngineError::InvalidWrite(format!(
803 "VecInsert for chunk '{}' has empty embedding",
804 vi.chunk_id
805 )));
806 }
807 }
808 for operational in &request.operational_writes {
809 if operational_write_collection(operational).is_empty() {
810 return Err(EngineError::InvalidWrite(
811 "OperationalWrite has empty collection".to_owned(),
812 ));
813 }
814 if operational_write_record_key(operational).is_empty() {
815 return Err(EngineError::InvalidWrite(format!(
816 "OperationalWrite for collection '{}' has empty record_key",
817 operational_write_collection(operational)
818 )));
819 }
820 match operational {
821 OperationalWrite::Append { payload_json, .. }
822 | OperationalWrite::Put { payload_json, .. } => {
823 if payload_json.is_empty() {
824 return Err(EngineError::InvalidWrite(format!(
825 "OperationalWrite {} '{}:{}' has empty payload_json",
826 operational_write_kind(operational),
827 operational_write_collection(operational),
828 operational_write_record_key(operational)
829 )));
830 }
831 }
832 OperationalWrite::Delete { .. } => {}
833 }
834 }
835
836 {
838 let mut seen = std::collections::HashSet::new();
839 for node in &request.nodes {
840 if !seen.insert(node.row_id.as_str()) {
841 return Err(EngineError::InvalidWrite(format!(
842 "duplicate row_id '{}' within the same WriteRequest",
843 node.row_id
844 )));
845 }
846 }
847 for edge in &request.edges {
848 if !seen.insert(edge.row_id.as_str()) {
849 return Err(EngineError::InvalidWrite(format!(
850 "duplicate row_id '{}' within the same WriteRequest",
851 edge.row_id
852 )));
853 }
854 }
855 }
856
857 if mode == ProvenanceMode::Require {
859 check_require_provenance(&request)?;
860 }
861
862 for run in &request.runs {
864 if run.upsert && run.supersedes_id.is_none() {
865 return Err(EngineError::InvalidWrite(format!(
866 "run '{}': upsert=true requires supersedes_id to be set",
867 run.id
868 )));
869 }
870 }
871 for step in &request.steps {
872 if step.upsert && step.supersedes_id.is_none() {
873 return Err(EngineError::InvalidWrite(format!(
874 "step '{}': upsert=true requires supersedes_id to be set",
875 step.id
876 )));
877 }
878 }
879 for action in &request.actions {
880 if action.upsert && action.supersedes_id.is_none() {
881 return Err(EngineError::InvalidWrite(format!(
882 "action '{}': upsert=true requires supersedes_id to be set",
883 action.id
884 )));
885 }
886 }
887
888 let node_kinds = request
889 .nodes
890 .iter()
891 .map(|node| (node.logical_id.clone(), node.kind.clone()))
892 .collect::<HashMap<_, _>>();
893
894 Ok(PreparedWrite {
895 label: request.label,
896 nodes: request.nodes,
897 node_retires: request.node_retires,
898 edges: request.edges,
899 edge_retires: request.edge_retires,
900 chunks: request.chunks,
901 runs: request.runs,
902 steps: request.steps,
903 actions: request.actions,
904 vec_inserts: request.vec_inserts,
905 operational_writes: request.operational_writes,
906 operational_collection_kinds: HashMap::new(),
907 operational_collection_filter_fields: HashMap::new(),
908 operational_validation_warnings: Vec::new(),
909 node_kinds,
910 required_fts_rows: Vec::new(),
911 required_property_fts_rows: Vec::new(),
912 optional_backfills: request.optional_backfills,
913 })
914}
915
/// Body of the dedicated writer thread: owns the sole write connection and
/// serially applies every message from `receiver` until all senders drop.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    // If the connection cannot be opened, answer every queued and future
    // request with the same rejection and exit the thread.
    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Ends when the channel disconnects (the actor handle is dropped).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot kill the writer
                // thread for all subsequent callers.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    // `error` is only read by the tracing macro.
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: clear any transaction the panic left open.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000 for message in receiver {
1001 match message {
1002 WriteMessage::Submit { reply, .. } => {
1003 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004 }
1005 WriteMessage::TouchLastAccessed { reply, .. } => {
1006 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007 }
1008 }
1009 }
1010}
1011
/// Builds the chunk FTS projection rows for this write.
///
/// Each chunk needs its owning node's kind: taken from the request's own
/// `node_kinds` map when the node is written in the same batch, otherwise
/// looked up from the current (non-superseded) row in `nodes`.
///
/// # Errors
/// `InvalidWrite` when a chunk targets a node retired in the same request,
/// or a node that exists neither in the request nor the database;
/// `Sqlite` for any other query failure.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // Combining a retire with chunk inserts for the same node is rejected
    // outright — the chunk would attach to a row being superseded.
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Prefer the in-request kind; fall back to the database.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                        write request or the database \
                        (v1 limitation: chunks and their nodes must be submitted together or the \
                        node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
1073
1074fn resolve_property_fts_rows(
1077 conn: &rusqlite::Connection,
1078 prepared: &mut PreparedWrite,
1079) -> Result<(), EngineError> {
1080 if prepared.nodes.is_empty() {
1081 return Ok(());
1082 }
1083
1084 let schemas: HashMap<String, PropertyFtsSchema> =
1085 load_fts_property_schemas(conn)?.into_iter().collect();
1086
1087 if schemas.is_empty() {
1088 return Ok(());
1089 }
1090
1091 let mut combined_stats = ExtractStats::default();
1092 for node in &prepared.nodes {
1093 let Some(schema) = schemas.get(&node.kind) else {
1094 continue;
1095 };
1096 let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1097 let (text_content, positions, stats) = extract_property_fts(&props, schema);
1098 combined_stats.merge(stats);
1099 if let Some(text_content) = text_content {
1100 prepared
1101 .required_property_fts_rows
1102 .push(FtsPropertyProjectionRow {
1103 node_logical_id: node.logical_id.clone(),
1104 kind: node.kind.clone(),
1105 text_content,
1106 positions,
1107 });
1108 }
1109 }
1110 if combined_stats != ExtractStats::default() {
1111 trace_debug!(
1112 depth_cap_hit = combined_stats.depth_cap_hit,
1113 byte_cap_reached = combined_stats.byte_cap_reached,
1114 excluded_subtree = combined_stats.excluded_subtree,
1115 "property fts recursive extraction guardrails engaged"
1116 );
1117 }
1118 trace_debug!(
1119 property_fts_rows = prepared.required_property_fts_rows.len(),
1120 nodes_processed = prepared.nodes.len(),
1121 "property fts row resolution completed"
1122 );
1123 Ok(())
1124}
1125
1126pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130 let Some(path) = path.strip_prefix("$.") else {
1131 return Vec::new();
1132 };
1133 let mut current = value;
1134 for segment in path.split('.') {
1135 match current.get(segment) {
1136 Some(v) => current = v,
1137 None => return Vec::new(),
1138 }
1139 }
1140 match current {
1141 serde_json::Value::String(s) => vec![s.clone()],
1142 serde_json::Value::Number(n) => vec![n.to_string()],
1143 serde_json::Value::Bool(b) => vec![b.to_string()],
1144 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145 serde_json::Value::Array(arr) => arr
1146 .iter()
1147 .filter_map(|v| match v {
1148 serde_json::Value::String(s) => Some(s.clone()),
1149 serde_json::Value::Number(n) => Some(n.to_string()),
1150 serde_json::Value::Bool(b) => Some(b.to_string()),
1151 _ => None,
1152 })
1153 .collect(),
1154 }
1155}
1156
1157pub(crate) fn extract_property_fts(
1177 props: &serde_json::Value,
1178 schema: &PropertyFtsSchema,
1179) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
1180 let mut walker = RecursiveWalker {
1181 blob: String::new(),
1182 positions: Vec::new(),
1183 stats: ExtractStats::default(),
1184 exclude_paths: schema.exclude_paths.clone(),
1185 stopped: false,
1186 };
1187
1188 let mut scalar_parts: Vec<String> = Vec::new();
1189
1190 for entry in &schema.paths {
1191 match entry.mode {
1192 PropertyPathMode::Scalar => {
1193 scalar_parts.extend(extract_json_path(props, &entry.path));
1194 }
1195 PropertyPathMode::Recursive => {
1196 let root = resolve_path_root(props, &entry.path);
1197 if let Some(root) = root {
1198 walker.walk(&entry.path, root, 0);
1199 }
1200 }
1201 }
1202 }
1203
1204 let scalar_text = if scalar_parts.is_empty() {
1210 None
1211 } else {
1212 Some(scalar_parts.join(&schema.separator))
1213 };
1214
1215 let combined = match (scalar_text, walker.blob.is_empty()) {
1216 (None, true) => None,
1217 (None, false) => Some(walker.blob.clone()),
1218 (Some(s), true) => Some(s),
1219 (Some(mut s), false) => {
1220 let offset = s.len() + LEAF_SEPARATOR.len();
1223 for pos in &mut walker.positions {
1224 pos.start_offset += offset;
1225 pos.end_offset += offset;
1226 }
1227 s.push_str(LEAF_SEPARATOR);
1228 s.push_str(&walker.blob);
1229 Some(s)
1230 }
1231 };
1232
1233 (combined, walker.positions, walker.stats)
1234}
1235
1236fn resolve_path_root<'a>(
1237 value: &'a serde_json::Value,
1238 path: &str,
1239) -> Option<&'a serde_json::Value> {
1240 let stripped = path.strip_prefix("$.")?;
1241 let mut current = value;
1242 for segment in stripped.split('.') {
1243 current = current.get(segment)?;
1244 }
1245 Some(current)
1246}
1247
/// Accumulator for recursive property-FTS extraction.
struct RecursiveWalker {
    /// Concatenated leaf text, leaves joined by `LEAF_SEPARATOR`.
    blob: String,
    /// Byte span of each emitted leaf within `blob`, plus its JSON path.
    positions: Vec<PositionEntry>,
    /// Guardrail counters (depth cap, byte cap, exclusions).
    stats: ExtractStats,
    /// Exact JSON paths whose subtrees are skipped entirely.
    exclude_paths: Vec<String>,
    /// Set once the byte cap is hit; suppresses all further emission.
    stopped: bool,
}
1258
impl RecursiveWalker {
    /// Depth-first walk of `value`, emitting every scalar leaf.
    ///
    /// Excluded paths prune whole subtrees; objects/arrays at or beyond
    /// `MAX_RECURSIVE_DEPTH` are skipped (counted in `depth_cap_hit`).
    /// Object keys are visited in sorted order so output is deterministic;
    /// array elements get `path[idx]` child paths.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys for deterministic extraction order.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Appends one non-empty leaf to the blob and records its byte span.
    ///
    /// If appending (separator + value) would push the blob past
    /// `MAX_EXTRACTED_BYTES`, sets `stopped` and drops the leaf — the cap is
    /// never exceeded, so the last accepted leaf stays intact.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        if value.is_empty() {
            return;
        }
        // No separator before the very first leaf.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Span covers the value only, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1340
1341pub(crate) fn load_fts_property_schemas(
1347 conn: &rusqlite::Connection,
1348) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1349 let mut stmt =
1350 conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1351 stmt.query_map([], |row| {
1352 let kind: String = row.get(0)?;
1353 let paths_json: String = row.get(1)?;
1354 let separator: String = row.get(2)?;
1355 let schema = parse_property_schema_json(&paths_json, &separator);
1356 Ok((kind, schema))
1357 })?
1358 .collect::<Result<Vec<_>, _>>()
1359}
1360
1361pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1362 let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1363 let mut paths = Vec::new();
1364 let mut exclude_paths: Vec<String> = Vec::new();
1365
1366 let path_values: Vec<serde_json::Value> = match value {
1367 serde_json::Value::Array(arr) => arr,
1368 serde_json::Value::Object(map) => {
1369 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1370 exclude_paths = excl
1371 .iter()
1372 .filter_map(|v| v.as_str().map(str::to_owned))
1373 .collect();
1374 }
1375 match map.get("paths") {
1376 Some(serde_json::Value::Array(arr)) => arr.clone(),
1377 _ => Vec::new(),
1378 }
1379 }
1380 _ => Vec::new(),
1381 };
1382
1383 for entry in path_values {
1384 match entry {
1385 serde_json::Value::String(path) => {
1386 paths.push(PropertyPathEntry::scalar(path));
1387 }
1388 serde_json::Value::Object(map) => {
1389 let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1390 continue;
1391 };
1392 let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1393 PropertyPathMode::Scalar,
1394 |m| match m {
1395 "recursive" => PropertyPathMode::Recursive,
1396 _ => PropertyPathMode::Scalar,
1397 },
1398 );
1399 paths.push(PropertyPathEntry {
1400 path: path.to_owned(),
1401 mode,
1402 });
1403 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1404 for p in excl {
1405 if let Some(s) = p.as_str() {
1406 exclude_paths.push(s.to_owned());
1407 }
1408 }
1409 }
1410 }
1411 _ => {}
1412 }
1413 }
1414
1415 PropertyFtsSchema {
1416 paths,
1417 separator: separator.to_owned(),
1418 exclude_paths,
1419 }
1420}
1421
1422fn resolve_operational_writes(
1423 conn: &rusqlite::Connection,
1424 prepared: &mut PreparedWrite,
1425) -> Result<(), EngineError> {
1426 let mut collection_kinds = HashMap::new();
1427 let mut collection_filter_fields = HashMap::new();
1428 let mut collection_validation_contracts = HashMap::new();
1429 for write in &prepared.operational_writes {
1430 let collection = operational_write_collection(write);
1431 if !collection_kinds.contains_key(collection) {
1432 let maybe_row: Option<(String, Option<i64>, String, String)> = conn
1433 .query_row(
1434 "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
1435 params![collection],
1436 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
1437 )
1438 .optional()
1439 .map_err(EngineError::Sqlite)?;
1440 let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
1441 .ok_or_else(|| {
1442 EngineError::InvalidWrite(format!(
1443 "operational collection '{collection}' is not registered"
1444 ))
1445 })?;
1446 if disabled_at.is_some() {
1447 return Err(EngineError::InvalidWrite(format!(
1448 "operational collection '{collection}' is disabled"
1449 )));
1450 }
1451 let kind = OperationalCollectionKind::try_from(kind_text.as_str())
1452 .map_err(EngineError::InvalidWrite)?;
1453 let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
1454 let validation_contract = parse_operational_validation_contract(&validation_json)
1455 .map_err(EngineError::InvalidWrite)?;
1456 collection_kinds.insert(collection.to_owned(), kind);
1457 collection_filter_fields.insert(collection.to_owned(), filter_fields);
1458 collection_validation_contracts.insert(collection.to_owned(), validation_contract);
1459 }
1460
1461 let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
1462 EngineError::InvalidWrite("missing operational collection kind".to_owned())
1463 })?;
1464 match (kind, write) {
1465 (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
1466 | (
1467 OperationalCollectionKind::LatestState,
1468 OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
1469 ) => {}
1470 (OperationalCollectionKind::AppendOnlyLog, _) => {
1471 return Err(EngineError::InvalidWrite(format!(
1472 "operational collection '{collection}' is append_only_log and only accepts Append"
1473 )));
1474 }
1475 (OperationalCollectionKind::LatestState, _) => {
1476 return Err(EngineError::InvalidWrite(format!(
1477 "operational collection '{collection}' is latest_state and only accepts Put/Delete"
1478 )));
1479 }
1480 }
1481 if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
1482 let _ = check_operational_write_against_contract(write, contract)?;
1483 }
1484 }
1485 prepared.operational_collection_kinds = collection_kinds;
1486 prepared.operational_collection_filter_fields = collection_filter_fields;
1487 Ok(())
1488}
1489
1490fn parse_operational_filter_fields(
1491 filter_fields_json: &str,
1492) -> Result<Vec<OperationalFilterField>, EngineError> {
1493 let fields: Vec<OperationalFilterField> =
1494 serde_json::from_str(filter_fields_json).map_err(|error| {
1495 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1496 })?;
1497 let mut seen = std::collections::HashSet::new();
1498 for field in &fields {
1499 if field.name.trim().is_empty() {
1500 return Err(EngineError::InvalidWrite(
1501 "filter_fields_json field names must not be empty".to_owned(),
1502 ));
1503 }
1504 if !seen.insert(field.name.as_str()) {
1505 return Err(EngineError::InvalidWrite(format!(
1506 "filter_fields_json contains duplicate field '{}'",
1507 field.name
1508 )));
1509 }
1510 if field.modes.is_empty() {
1511 return Err(EngineError::InvalidWrite(format!(
1512 "filter_fields_json field '{}' must declare at least one mode",
1513 field.name
1514 )));
1515 }
1516 if field.modes.contains(&OperationalFilterMode::Prefix)
1517 && field.field_type != OperationalFilterFieldType::String
1518 {
1519 return Err(EngineError::InvalidWrite(format!(
1520 "filter field '{}' only supports prefix for string types",
1521 field.name
1522 )));
1523 }
1524 }
1525 Ok(fields)
1526}
1527
/// One extracted filter value destined for `operational_filter_values`;
/// exactly one of the two value columns is populated, matching the field's
/// declared type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the filter field this value belongs to.
    field_name: String,
    // Set for string-typed fields.
    string_value: Option<String>,
    // Set for integer/timestamp-typed fields.
    integer_value: Option<i64>,
}
1534
1535fn extract_operational_filter_values(
1536 filter_fields: &[OperationalFilterField],
1537 payload_json: &str,
1538) -> Vec<OperationalFilterValueRow> {
1539 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1540 return Vec::new();
1541 };
1542 let Some(object) = parsed.as_object() else {
1543 return Vec::new();
1544 };
1545
1546 filter_fields
1547 .iter()
1548 .filter_map(|field| {
1549 let value = object.get(&field.name)?;
1550 match field.field_type {
1551 OperationalFilterFieldType::String => {
1552 value
1553 .as_str()
1554 .map(|string_value| OperationalFilterValueRow {
1555 field_name: field.name.clone(),
1556 string_value: Some(string_value.to_owned()),
1557 integer_value: None,
1558 })
1559 }
1560 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1561 value
1562 .as_i64()
1563 .map(|integer_value| OperationalFilterValueRow {
1564 field_name: field.name.clone(),
1565 string_value: None,
1566 integer_value: Some(integer_value),
1567 })
1568 }
1569 }
1570 })
1571 .collect()
1572}
1573
/// Resolves every derived row set for a prepared write (chunk FTS rows,
/// property FTS rows, operational collection metadata), then applies the
/// whole batch in a single transaction via `apply_write`.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1583
1584fn apply_touch_last_accessed(
1585 conn: &mut rusqlite::Connection,
1586 request: &LastAccessTouchRequest,
1587) -> Result<LastAccessTouchReport, EngineError> {
1588 let mut seen = std::collections::HashSet::new();
1589 let logical_ids = request
1590 .logical_ids
1591 .iter()
1592 .filter(|logical_id| seen.insert(logical_id.as_str()))
1593 .cloned()
1594 .collect::<Vec<_>>();
1595 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1596
1597 for logical_id in &logical_ids {
1598 let exists = tx
1599 .query_row(
1600 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1601 params![logical_id],
1602 |row| row.get::<_, i64>(0),
1603 )
1604 .optional()?
1605 .is_some();
1606 if !exists {
1607 return Err(EngineError::InvalidWrite(format!(
1608 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1609 )));
1610 }
1611 }
1612
1613 {
1614 let mut upsert_metadata = tx.prepare_cached(
1615 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1616 VALUES (?1, ?2, ?2) \
1617 ON CONFLICT(logical_id) DO UPDATE SET \
1618 last_accessed_at = excluded.last_accessed_at, \
1619 updated_at = excluded.updated_at",
1620 )?;
1621 let mut insert_provenance = tx.prepare_cached(
1622 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1623 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1624 )?;
1625 for logical_id in &logical_ids {
1626 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1627 insert_provenance.execute(params![
1628 new_id(),
1629 logical_id,
1630 request.source_ref.as_deref(),
1631 format!("{{\"touched_at\":{}}}", request.touched_at),
1632 ])?;
1633 }
1634 }
1635
1636 tx.commit()?;
1637 Ok(LastAccessTouchReport {
1638 touched_logical_ids: logical_ids.len(),
1639 touched_at: request.touched_at,
1640 })
1641}
1642
1643fn ensure_operational_collections_writable(
1644 tx: &rusqlite::Transaction<'_>,
1645 prepared: &PreparedWrite,
1646) -> Result<(), EngineError> {
1647 for collection in prepared.operational_collection_kinds.keys() {
1648 let disabled_at: Option<Option<i64>> = tx
1649 .query_row(
1650 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1651 params![collection],
1652 |row| row.get::<_, Option<i64>>(0),
1653 )
1654 .optional()?;
1655 match disabled_at {
1656 Some(Some(_)) => {
1657 return Err(EngineError::InvalidWrite(format!(
1658 "operational collection '{collection}' is disabled"
1659 )));
1660 }
1661 Some(None) => {}
1662 None => {
1663 return Err(EngineError::InvalidWrite(format!(
1664 "operational collection '{collection}' is not registered"
1665 )));
1666 }
1667 }
1668 }
1669 Ok(())
1670}
1671
1672fn validate_operational_writes_against_live_contracts(
1673 tx: &rusqlite::Transaction<'_>,
1674 prepared: &PreparedWrite,
1675) -> Result<Vec<String>, EngineError> {
1676 let mut collection_validation_contracts =
1677 HashMap::<String, Option<OperationalValidationContract>>::new();
1678 for collection in prepared.operational_collection_kinds.keys() {
1679 let validation_json: String = tx
1680 .query_row(
1681 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1682 params![collection],
1683 |row| row.get(0),
1684 )
1685 .map_err(EngineError::Sqlite)?;
1686 let validation_contract = parse_operational_validation_contract(&validation_json)
1687 .map_err(EngineError::InvalidWrite)?;
1688 collection_validation_contracts.insert(collection.clone(), validation_contract);
1689 }
1690
1691 let mut warnings = Vec::new();
1692 for write in &prepared.operational_writes {
1693 if let Some(Some(contract)) =
1694 collection_validation_contracts.get(operational_write_collection(write))
1695 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1696 {
1697 warnings.push(warning);
1698 }
1699 }
1700
1701 Ok(warnings)
1702}
1703
1704fn load_live_operational_secondary_indexes(
1705 tx: &rusqlite::Transaction<'_>,
1706 prepared: &PreparedWrite,
1707) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1708 let mut collection_indexes = HashMap::new();
1709 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1710 let secondary_indexes_json: String = tx
1711 .query_row(
1712 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1713 params![collection],
1714 |row| row.get(0),
1715 )
1716 .map_err(EngineError::Sqlite)?;
1717 let indexes =
1718 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1719 .map_err(EngineError::InvalidWrite)?;
1720 collection_indexes.insert(collection.clone(), indexes);
1721 }
1722 Ok(collection_indexes)
1723}
1724
1725fn check_operational_write_against_contract(
1726 write: &OperationalWrite,
1727 contract: &OperationalValidationContract,
1728) -> Result<Option<String>, EngineError> {
1729 if contract.mode == OperationalValidationMode::Disabled {
1730 return Ok(None);
1731 }
1732
1733 let (payload_json, collection, record_key) = match write {
1734 OperationalWrite::Append {
1735 collection,
1736 record_key,
1737 payload_json,
1738 ..
1739 }
1740 | OperationalWrite::Put {
1741 collection,
1742 record_key,
1743 payload_json,
1744 ..
1745 } => (
1746 payload_json.as_str(),
1747 collection.as_str(),
1748 record_key.as_str(),
1749 ),
1750 OperationalWrite::Delete { .. } => return Ok(None),
1751 };
1752
1753 match validate_operational_payload_against_contract(contract, payload_json) {
1754 Ok(()) => Ok(None),
1755 Err(message) => match contract.mode {
1756 OperationalValidationMode::Disabled => Ok(None),
1757 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1758 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1759 kind = operational_write_kind(write)
1760 ))),
1761 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1762 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1763 kind = operational_write_kind(write)
1764 ))),
1765 },
1766 }
1767}
1768
/// Applies a fully-resolved `PreparedWrite` inside a single IMMEDIATE
/// transaction, then builds the `WriteReceipt`.
///
/// Order inside the transaction: node retires, edge retires, node inserts,
/// edge inserts, chunks, runs, steps, actions, operational writes, chunk
/// FTS rows, property FTS rows, and finally (feature-gated) vector rows.
/// Any `?` error aborts the whole batch when `tx` drops uncommitted.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: drop derived FTS/property rows, supersede the active
    // node version, and record a provenance event per retire.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: supersede and record provenance (edges carry no FTS rows).
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts. Upserts always clear property-FTS rows; with
    // ChunkPolicy::Replace they additionally drop chunk FTS rows, chunks,
    // and (if present) vector rows before superseding the prior version.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // The vec table only exists when the sqlite-vec schema is installed;
        // its absence is tolerated rather than treated as an error.
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts: upserts supersede the prior active version first.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts (text spans attached to nodes).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run rows: an upsert supersedes an explicitly named prior row, if any.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step rows (same supersede-then-insert shape as runs).
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action rows (same shape as steps).
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational writes: re-check collection state inside the transaction,
    // append one mutation row per write, maintain filter-value and
    // secondary-index rows, and keep `operational_current` in sync for
    // latest_state collections.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Global monotonically increasing order across all collections.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
             payload_json = excluded.payload_json, \
             updated_at = excluded.updated_at, \
             last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
             slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // 'mutation'-subject secondary index entries for this write.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Filter values extracted from the payload for this mutation.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // latest_state collections also maintain `operational_current`
            // plus its 'current'-subject secondary index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row just upserted so the index
                            // entries carry the stored updated_at and
                            // last_mutation_id values.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // Chunk-level FTS rows resolved earlier in the pipeline.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property-level FTS rows plus their per-leaf position spans. Positions
    // are deleted first so re-extraction replaces rather than accumulates.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    // Offsets are usize in memory; clamp to i64::MAX on the
                    // (practically impossible) overflow instead of panicking.
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // Vector rows (optional feature): a missing vec table is tolerated.
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    // Embeddings are serialized as little-endian f32 bytes.
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // Schema without the vec table: skip vector writes entirely.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Provenance warnings for every row written without a source_ref.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // Combined warnings keep provenance warnings first, followed by the
    // live-contract validation warnings gathered in the transaction.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2280
2281fn operational_write_collection(write: &OperationalWrite) -> &str {
2282 match write {
2283 OperationalWrite::Append { collection, .. }
2284 | OperationalWrite::Put { collection, .. }
2285 | OperationalWrite::Delete { collection, .. } => collection,
2286 }
2287}
2288
2289fn operational_write_record_key(write: &OperationalWrite) -> &str {
2290 match write {
2291 OperationalWrite::Append { record_key, .. }
2292 | OperationalWrite::Put { record_key, .. }
2293 | OperationalWrite::Delete { record_key, .. } => record_key,
2294 }
2295}
2296
/// Stable op-kind label stored in `operational_mutations.op_kind` and used
/// in validation warning/error messages.
fn operational_write_kind(write: &OperationalWrite) -> &'static str {
    match write {
        OperationalWrite::Append { .. } => "append",
        OperationalWrite::Put { .. } => "put",
        OperationalWrite::Delete { .. } => "delete",
    }
}
2304
2305fn operational_write_payload(write: &OperationalWrite) -> &str {
2306 match write {
2307 OperationalWrite::Append { payload_json, .. }
2308 | OperationalWrite::Put { payload_json, .. } => payload_json,
2309 OperationalWrite::Delete { .. } => "null",
2310 }
2311}
2312
2313fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2314 match write {
2315 OperationalWrite::Append { source_ref, .. }
2316 | OperationalWrite::Put { source_ref, .. }
2317 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2318 }
2319}
2320
2321#[cfg(test)]
2322#[allow(clippy::expect_used)]
2323mod tests {
2324 use std::sync::Arc;
2325
2326 use fathomdb_schema::SchemaManager;
2327 use tempfile::NamedTempFile;
2328
2329 use super::{apply_write, prepare_write, resolve_operational_writes};
2330 use crate::{
2331 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2332 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2333 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2334 projection::ProjectionTarget,
2335 };
2336
2337 #[test]
2338 fn writer_executes_runtime_table_rows() {
2339 let db = NamedTempFile::new().expect("temporary db");
2340 let writer = WriterActor::start(
2341 db.path(),
2342 Arc::new(SchemaManager::new()),
2343 ProvenanceMode::Warn,
2344 Arc::new(TelemetryCounters::default()),
2345 )
2346 .expect("writer");
2347
2348 let receipt = writer
2349 .submit(WriteRequest {
2350 label: "runtime".to_owned(),
2351 nodes: vec![],
2352 node_retires: vec![],
2353 edges: vec![],
2354 edge_retires: vec![],
2355 chunks: vec![],
2356 runs: vec![RunInsert {
2357 id: "run-1".to_owned(),
2358 kind: "session".to_owned(),
2359 status: "completed".to_owned(),
2360 properties: "{}".to_owned(),
2361 source_ref: Some("src-1".to_owned()),
2362 upsert: false,
2363 supersedes_id: None,
2364 }],
2365 steps: vec![StepInsert {
2366 id: "step-1".to_owned(),
2367 run_id: "run-1".to_owned(),
2368 kind: "llm".to_owned(),
2369 status: "completed".to_owned(),
2370 properties: "{}".to_owned(),
2371 source_ref: Some("src-1".to_owned()),
2372 upsert: false,
2373 supersedes_id: None,
2374 }],
2375 actions: vec![ActionInsert {
2376 id: "action-1".to_owned(),
2377 step_id: "step-1".to_owned(),
2378 kind: "emit".to_owned(),
2379 status: "completed".to_owned(),
2380 properties: "{}".to_owned(),
2381 source_ref: Some("src-1".to_owned()),
2382 upsert: false,
2383 supersedes_id: None,
2384 }],
2385 optional_backfills: vec![],
2386 vec_inserts: vec![],
2387 operational_writes: vec![],
2388 })
2389 .expect("write receipt");
2390
2391 assert_eq!(receipt.label, "runtime");
2392 }
2393
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        // A Put against a latest_state collection must both record a
        // history row in operational_mutations and upsert the row in
        // operational_current, while a node insert in the same batch
        // still lands.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the target collection; writes to unknown collections are
        // rejected (see writer_rejects_operational_write_for_missing_collection).
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The node from the same batch was persisted.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // Exactly one history row was recorded for the Put.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // The current-state row reflects the Put payload verbatim.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2473
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        // With a validation contract whose mode is "disabled", a payload
        // that violates the contract (unknown field, missing required
        // field) must still be written to operational_current untouched.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a collection with a contract that would reject this payload
        // if it were enforced ("status" is required, no extra properties).
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Payload deliberately violates the (disabled) contract.
        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The invalid payload was written through unchanged.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2527
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        // "report_only" validation must let the invalid payload through
        // but surface the contract violation as a receipt warning
        // (distinct from provenance warnings, which must stay empty).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the contract's enum ["ok","failed"].
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // The source_ref was supplied, so no provenance warnings; the
        // single warning is the validation report.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the payload was still written verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2592
2593 #[test]
2594 fn writer_rejects_operational_write_for_missing_collection() {
2595 let db = NamedTempFile::new().expect("temporary db");
2596 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2597 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2598 drop(conn);
2599 let writer = WriterActor::start(
2600 db.path(),
2601 Arc::new(SchemaManager::new()),
2602 ProvenanceMode::Warn,
2603 Arc::new(TelemetryCounters::default()),
2604 )
2605 .expect("writer");
2606
2607 let result = writer.submit(WriteRequest {
2608 label: "missing-operational-collection".to_owned(),
2609 nodes: vec![],
2610 node_retires: vec![],
2611 edges: vec![],
2612 edge_retires: vec![],
2613 chunks: vec![],
2614 runs: vec![],
2615 steps: vec![],
2616 actions: vec![],
2617 optional_backfills: vec![],
2618 vec_inserts: vec![],
2619 operational_writes: vec![OperationalWrite::Put {
2620 collection: "connector_health".to_owned(),
2621 record_key: "gmail".to_owned(),
2622 payload_json: r#"{"status":"ok"}"#.to_owned(),
2623 source_ref: Some("src-1".to_owned()),
2624 }],
2625 });
2626
2627 assert!(
2628 matches!(result, Err(EngineError::InvalidWrite(_))),
2629 "missing operational collection must return InvalidWrite"
2630 );
2631 }
2632
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        // Append against an append_only_log collection must write a
        // history row to operational_mutations but must NOT create a row
        // in operational_current (that table is for latest_state only).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History row exists with the append kind and original payload.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // No current-state row for an append-only collection.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2696
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        // "enforce" validation must turn an invalid append into an
        // InvalidWrite error and leave NO trace: no mutation history and
        // no operational_filter_values rows (the collection declares a
        // filter field, so derived filter rows would otherwise appear).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum constraint ["ok","failed"].
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Nothing may have been persisted for the rejected write.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2761
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        // Put then Delete: the current-state row must be gone afterwards,
        // while operational_mutations keeps both operations in order.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish current state via Put.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same record key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History preserves both operations in mutation order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // Current-state row was removed by the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2850
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        // A Delete carries no payload, so even an "enforce" validation
        // contract must not block it — otherwise records could never be
        // removed once a stricter contract was installed.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed current state with a payload that satisfies the contract.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The delete must succeed despite enforce-mode validation.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // The current-state row is gone.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2924
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        // The collection declares two secondary indexes (one single-field,
        // one composite). A Put whose payload supplies all indexed fields
        // must produce one 'current' index entry per index; a Delete must
        // remove them again.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Payload carries status, tenant, and category — covering both
        // declared indexes.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per index: status_current + tenant_category.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Delete cleared both 'current' index entries.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3013
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        // Three writes to the same record key inside ONE batch must be
        // persisted with consecutive mutation_order values (1, 2, 3), and
        // the final Put must win in operational_current.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put -> Delete -> Put, all targeting the 'gmail' record key.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        // History carries submission order as 1-based mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The last Put in the batch determines current state.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3102
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        // Race simulation: the collection is enabled when the write is
        // prepared/resolved, then disabled before apply_write runs. The
        // apply step must re-check disabled state inside the transaction
        // and reject without persisting anything.
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight passes while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection between preflight and apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // Rejection left no mutation history ...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // ... and no current-state row.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3166
    #[test]
    fn writer_enforce_validation_rejects_invalid_put_atomically() {
        // An enforce-mode validation failure must roll back the ENTIRE
        // batch: the node insert submitted alongside the invalid Put must
        // not be persisted either.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Batch mixes a valid node insert with an invalid operational Put.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-put".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid put must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Atomicity: the node from the same batch was also rolled back.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 0);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3246
3247 #[test]
3248 fn writer_rejects_append_against_latest_state_collection() {
3249 let db = NamedTempFile::new().expect("temporary db");
3250 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3251 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3252 conn.execute(
3253 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3254 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3255 [],
3256 )
3257 .expect("seed collection");
3258 drop(conn);
3259 let writer = WriterActor::start(
3260 db.path(),
3261 Arc::new(SchemaManager::new()),
3262 ProvenanceMode::Warn,
3263 Arc::new(TelemetryCounters::default()),
3264 )
3265 .expect("writer");
3266
3267 let result = writer.submit(WriteRequest {
3268 label: "bad-append".to_owned(),
3269 nodes: vec![],
3270 node_retires: vec![],
3271 edges: vec![],
3272 edge_retires: vec![],
3273 chunks: vec![],
3274 runs: vec![],
3275 steps: vec![],
3276 actions: vec![],
3277 optional_backfills: vec![],
3278 vec_inserts: vec![],
3279 operational_writes: vec![OperationalWrite::Append {
3280 collection: "connector_health".to_owned(),
3281 record_key: "gmail".to_owned(),
3282 payload_json: r#"{"status":"ok"}"#.to_owned(),
3283 source_ref: Some("src-1".to_owned()),
3284 }],
3285 });
3286
3287 assert!(
3288 matches!(result, Err(EngineError::InvalidWrite(_))),
3289 "latest_state collection must reject Append"
3290 );
3291 }
3292
    #[test]
    fn writer_upsert_supersedes_prior_active_node() {
        // Two writes to the same logical_id: the second, submitted with
        // upsert=true, must become the single active row and mark the
        // first row as superseded (superseded_at set) rather than
        // deleting it.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Version 1: plain insert.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"version":1}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Version 2: upsert against the same logical_id.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"version":2}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 upsert write");

        // Exactly one active (non-superseded) row: the v2 row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let (active_row_id, props): (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("active row");
        assert_eq!(active_row_id, "row-2");
        assert!(props.contains("\"version\":2"));

        // The v1 row is retained with superseded_at populated.
        let superseded: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
                [],
                |row| row.get(0),
            )
            .expect("superseded count");
        assert_eq!(superseded, 1);
    }
3376
3377 #[test]
3378 fn writer_inserts_edge_between_two_nodes() {
3379 let db = NamedTempFile::new().expect("temporary db");
3380 let writer = WriterActor::start(
3381 db.path(),
3382 Arc::new(SchemaManager::new()),
3383 ProvenanceMode::Warn,
3384 Arc::new(TelemetryCounters::default()),
3385 )
3386 .expect("writer");
3387
3388 writer
3389 .submit(WriteRequest {
3390 label: "nodes-and-edge".to_owned(),
3391 nodes: vec![
3392 NodeInsert {
3393 row_id: "row-meeting".to_owned(),
3394 logical_id: "meeting-1".to_owned(),
3395 kind: "Meeting".to_owned(),
3396 properties: "{}".to_owned(),
3397 source_ref: Some("src-1".to_owned()),
3398 upsert: false,
3399 chunk_policy: ChunkPolicy::Preserve,
3400 content_ref: None,
3401 },
3402 NodeInsert {
3403 row_id: "row-task".to_owned(),
3404 logical_id: "task-1".to_owned(),
3405 kind: "Task".to_owned(),
3406 properties: "{}".to_owned(),
3407 source_ref: Some("src-1".to_owned()),
3408 upsert: false,
3409 chunk_policy: ChunkPolicy::Preserve,
3410 content_ref: None,
3411 },
3412 ],
3413 node_retires: vec![],
3414 edges: vec![EdgeInsert {
3415 row_id: "edge-1".to_owned(),
3416 logical_id: "edge-lg-1".to_owned(),
3417 source_logical_id: "meeting-1".to_owned(),
3418 target_logical_id: "task-1".to_owned(),
3419 kind: "HAS_TASK".to_owned(),
3420 properties: "{}".to_owned(),
3421 source_ref: Some("src-1".to_owned()),
3422 upsert: false,
3423 }],
3424 edge_retires: vec![],
3425 chunks: vec![],
3426 runs: vec![],
3427 steps: vec![],
3428 actions: vec![],
3429 optional_backfills: vec![],
3430 vec_inserts: vec![],
3431 operational_writes: vec![],
3432 })
3433 .expect("write receipt");
3434
3435 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3436 let (src, tgt, kind): (String, String, String) = conn
3437 .query_row(
3438 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3439 [],
3440 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3441 )
3442 .expect("edge row");
3443 assert_eq!(src, "meeting-1");
3444 assert_eq!(tgt, "task-1");
3445 assert_eq!(kind, "HAS_TASK");
3446 }
3447
3448 #[test]
3449 #[allow(clippy::too_many_lines)]
3450 fn writer_upsert_supersedes_prior_active_edge() {
3451 let db = NamedTempFile::new().expect("temporary db");
3452 let writer = WriterActor::start(
3453 db.path(),
3454 Arc::new(SchemaManager::new()),
3455 ProvenanceMode::Warn,
3456 Arc::new(TelemetryCounters::default()),
3457 )
3458 .expect("writer");
3459
3460 writer
3462 .submit(WriteRequest {
3463 label: "nodes".to_owned(),
3464 nodes: vec![
3465 NodeInsert {
3466 row_id: "row-a".to_owned(),
3467 logical_id: "node-a".to_owned(),
3468 kind: "Meeting".to_owned(),
3469 properties: "{}".to_owned(),
3470 source_ref: Some("src-1".to_owned()),
3471 upsert: false,
3472 chunk_policy: ChunkPolicy::Preserve,
3473 content_ref: None,
3474 },
3475 NodeInsert {
3476 row_id: "row-b".to_owned(),
3477 logical_id: "node-b".to_owned(),
3478 kind: "Task".to_owned(),
3479 properties: "{}".to_owned(),
3480 source_ref: Some("src-1".to_owned()),
3481 upsert: false,
3482 chunk_policy: ChunkPolicy::Preserve,
3483 content_ref: None,
3484 },
3485 ],
3486 node_retires: vec![],
3487 edges: vec![],
3488 edge_retires: vec![],
3489 chunks: vec![],
3490 runs: vec![],
3491 steps: vec![],
3492 actions: vec![],
3493 optional_backfills: vec![],
3494 vec_inserts: vec![],
3495 operational_writes: vec![],
3496 })
3497 .expect("nodes write");
3498
3499 writer
3501 .submit(WriteRequest {
3502 label: "edge-v1".to_owned(),
3503 nodes: vec![],
3504 node_retires: vec![],
3505 edges: vec![EdgeInsert {
3506 row_id: "edge-row-1".to_owned(),
3507 logical_id: "edge-lg-1".to_owned(),
3508 source_logical_id: "node-a".to_owned(),
3509 target_logical_id: "node-b".to_owned(),
3510 kind: "HAS_TASK".to_owned(),
3511 properties: r#"{"weight":1}"#.to_owned(),
3512 source_ref: Some("src-1".to_owned()),
3513 upsert: false,
3514 }],
3515 edge_retires: vec![],
3516 chunks: vec![],
3517 runs: vec![],
3518 steps: vec![],
3519 actions: vec![],
3520 optional_backfills: vec![],
3521 vec_inserts: vec![],
3522 operational_writes: vec![],
3523 })
3524 .expect("edge v1 write");
3525
3526 writer
3528 .submit(WriteRequest {
3529 label: "edge-v2".to_owned(),
3530 nodes: vec![],
3531 node_retires: vec![],
3532 edges: vec![EdgeInsert {
3533 row_id: "edge-row-2".to_owned(),
3534 logical_id: "edge-lg-1".to_owned(),
3535 source_logical_id: "node-a".to_owned(),
3536 target_logical_id: "node-b".to_owned(),
3537 kind: "HAS_TASK".to_owned(),
3538 properties: r#"{"weight":2}"#.to_owned(),
3539 source_ref: Some("src-2".to_owned()),
3540 upsert: true,
3541 }],
3542 edge_retires: vec![],
3543 chunks: vec![],
3544 runs: vec![],
3545 steps: vec![],
3546 actions: vec![],
3547 optional_backfills: vec![],
3548 vec_inserts: vec![],
3549 operational_writes: vec![],
3550 })
3551 .expect("edge v2 upsert");
3552
3553 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3554 let (active_row_id, props): (String, String) = conn
3555 .query_row(
3556 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3557 [],
3558 |row| Ok((row.get(0)?, row.get(1)?)),
3559 )
3560 .expect("active edge");
3561 assert_eq!(active_row_id, "edge-row-2");
3562 assert!(props.contains("\"weight\":2"));
3563
3564 let superseded: i64 = conn
3565 .query_row(
3566 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3567 [],
3568 |row| row.get(0),
3569 )
3570 .expect("superseded count");
3571 assert_eq!(superseded, 1);
3572 }
3573
3574 #[test]
3575 fn writer_fts_rows_are_written_to_database() {
3576 let db = NamedTempFile::new().expect("temporary db");
3577 let writer = WriterActor::start(
3578 db.path(),
3579 Arc::new(SchemaManager::new()),
3580 ProvenanceMode::Warn,
3581 Arc::new(TelemetryCounters::default()),
3582 )
3583 .expect("writer");
3584
3585 writer
3586 .submit(WriteRequest {
3587 label: "seed".to_owned(),
3588 nodes: vec![NodeInsert {
3589 row_id: "row-1".to_owned(),
3590 logical_id: "logical-1".to_owned(),
3591 kind: "Meeting".to_owned(),
3592 properties: "{}".to_owned(),
3593 source_ref: Some("src-1".to_owned()),
3594 upsert: false,
3595 chunk_policy: ChunkPolicy::Preserve,
3596 content_ref: None,
3597 }],
3598 node_retires: vec![],
3599 edges: vec![],
3600 edge_retires: vec![],
3601 chunks: vec![ChunkInsert {
3602 id: "chunk-1".to_owned(),
3603 node_logical_id: "logical-1".to_owned(),
3604 text_content: "budget discussion".to_owned(),
3605 byte_start: None,
3606 byte_end: None,
3607 content_hash: None,
3608 }],
3609 runs: vec![],
3610 steps: vec![],
3611 actions: vec![],
3612 optional_backfills: vec![],
3613 vec_inserts: vec![],
3614 operational_writes: vec![],
3615 })
3616 .expect("write receipt");
3617
3618 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3619 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3620 conn.query_row(
3621 "SELECT chunk_id, node_logical_id, kind, text_content \
3622 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3623 [],
3624 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3625 )
3626 .expect("fts row");
3627 assert_eq!(chunk_id, "chunk-1");
3628 assert_eq!(node_logical_id, "logical-1");
3629 assert_eq!(kind, "Meeting");
3630 assert_eq!(text_content, "budget discussion");
3631 }
3632
3633 #[test]
3634 fn writer_receipt_warns_on_nodes_without_source_ref() {
3635 let db = NamedTempFile::new().expect("temporary db");
3636 let writer = WriterActor::start(
3637 db.path(),
3638 Arc::new(SchemaManager::new()),
3639 ProvenanceMode::Warn,
3640 Arc::new(TelemetryCounters::default()),
3641 )
3642 .expect("writer");
3643
3644 let receipt = writer
3645 .submit(WriteRequest {
3646 label: "no-source".to_owned(),
3647 nodes: vec![NodeInsert {
3648 row_id: "row-1".to_owned(),
3649 logical_id: "logical-1".to_owned(),
3650 kind: "Meeting".to_owned(),
3651 properties: "{}".to_owned(),
3652 source_ref: None,
3653 upsert: false,
3654 chunk_policy: ChunkPolicy::Preserve,
3655 content_ref: None,
3656 }],
3657 node_retires: vec![],
3658 edges: vec![],
3659 edge_retires: vec![],
3660 chunks: vec![],
3661 runs: vec![],
3662 steps: vec![],
3663 actions: vec![],
3664 optional_backfills: vec![],
3665 vec_inserts: vec![],
3666 operational_writes: vec![],
3667 })
3668 .expect("write receipt");
3669
3670 assert_eq!(receipt.provenance_warnings.len(), 1);
3671 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3672 }
3673
3674 #[test]
3675 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3676 let db = NamedTempFile::new().expect("temporary db");
3677 let writer = WriterActor::start(
3678 db.path(),
3679 Arc::new(SchemaManager::new()),
3680 ProvenanceMode::Warn,
3681 Arc::new(TelemetryCounters::default()),
3682 )
3683 .expect("writer");
3684
3685 let receipt = writer
3686 .submit(WriteRequest {
3687 label: "with-source".to_owned(),
3688 nodes: vec![NodeInsert {
3689 row_id: "row-1".to_owned(),
3690 logical_id: "logical-1".to_owned(),
3691 kind: "Meeting".to_owned(),
3692 properties: "{}".to_owned(),
3693 source_ref: Some("src-1".to_owned()),
3694 upsert: false,
3695 chunk_policy: ChunkPolicy::Preserve,
3696 content_ref: None,
3697 }],
3698 node_retires: vec![],
3699 edges: vec![],
3700 edge_retires: vec![],
3701 chunks: vec![],
3702 runs: vec![],
3703 steps: vec![],
3704 actions: vec![],
3705 optional_backfills: vec![],
3706 vec_inserts: vec![],
3707 operational_writes: vec![],
3708 })
3709 .expect("write receipt");
3710
3711 assert!(receipt.provenance_warnings.is_empty());
3712 }
3713
3714 #[test]
3715 fn writer_accepts_chunk_for_pre_existing_node() {
3716 let db = NamedTempFile::new().expect("temporary db");
3717 let writer = WriterActor::start(
3718 db.path(),
3719 Arc::new(SchemaManager::new()),
3720 ProvenanceMode::Warn,
3721 Arc::new(TelemetryCounters::default()),
3722 )
3723 .expect("writer");
3724
3725 writer
3727 .submit(WriteRequest {
3728 label: "r1".to_owned(),
3729 nodes: vec![NodeInsert {
3730 row_id: "row-1".to_owned(),
3731 logical_id: "logical-1".to_owned(),
3732 kind: "Meeting".to_owned(),
3733 properties: "{}".to_owned(),
3734 source_ref: Some("src-1".to_owned()),
3735 upsert: false,
3736 chunk_policy: ChunkPolicy::Preserve,
3737 content_ref: None,
3738 }],
3739 node_retires: vec![],
3740 edges: vec![],
3741 edge_retires: vec![],
3742 chunks: vec![],
3743 runs: vec![],
3744 steps: vec![],
3745 actions: vec![],
3746 optional_backfills: vec![],
3747 vec_inserts: vec![],
3748 operational_writes: vec![],
3749 })
3750 .expect("r1 write");
3751
3752 writer
3754 .submit(WriteRequest {
3755 label: "r2".to_owned(),
3756 nodes: vec![],
3757 node_retires: vec![],
3758 edges: vec![],
3759 edge_retires: vec![],
3760 chunks: vec![ChunkInsert {
3761 id: "chunk-1".to_owned(),
3762 node_logical_id: "logical-1".to_owned(),
3763 text_content: "budget discussion".to_owned(),
3764 byte_start: None,
3765 byte_end: None,
3766 content_hash: None,
3767 }],
3768 runs: vec![],
3769 steps: vec![],
3770 actions: vec![],
3771 optional_backfills: vec![],
3772 vec_inserts: vec![],
3773 operational_writes: vec![],
3774 })
3775 .expect("r2 write — chunk for pre-existing node");
3776
3777 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3778 let count: i64 = conn
3779 .query_row(
3780 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3781 [],
3782 |row| row.get(0),
3783 )
3784 .expect("fts count");
3785 assert_eq!(
3786 count, 1,
3787 "FTS row must exist for chunk attached to pre-existing node"
3788 );
3789 }
3790
3791 #[test]
3792 fn writer_rejects_chunk_for_completely_unknown_node() {
3793 let db = NamedTempFile::new().expect("temporary db");
3794 let writer = WriterActor::start(
3795 db.path(),
3796 Arc::new(SchemaManager::new()),
3797 ProvenanceMode::Warn,
3798 Arc::new(TelemetryCounters::default()),
3799 )
3800 .expect("writer");
3801
3802 let result = writer.submit(WriteRequest {
3803 label: "bad".to_owned(),
3804 nodes: vec![],
3805 node_retires: vec![],
3806 edges: vec![],
3807 edge_retires: vec![],
3808 chunks: vec![ChunkInsert {
3809 id: "chunk-1".to_owned(),
3810 node_logical_id: "nonexistent".to_owned(),
3811 text_content: "some text".to_owned(),
3812 byte_start: None,
3813 byte_end: None,
3814 content_hash: None,
3815 }],
3816 runs: vec![],
3817 steps: vec![],
3818 actions: vec![],
3819 optional_backfills: vec![],
3820 vec_inserts: vec![],
3821 operational_writes: vec![],
3822 });
3823
3824 assert!(
3825 matches!(result, Err(EngineError::InvalidWrite(_))),
3826 "completely unknown node must return InvalidWrite"
3827 );
3828 }
3829
3830 #[test]
3831 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3832 let db = NamedTempFile::new().expect("temporary db");
3833 let writer = WriterActor::start(
3834 db.path(),
3835 Arc::new(SchemaManager::new()),
3836 ProvenanceMode::Warn,
3837 Arc::new(TelemetryCounters::default()),
3838 )
3839 .expect("writer");
3840
3841 let receipt = writer
3842 .submit(WriteRequest {
3843 label: "seed".to_owned(),
3844 nodes: vec![NodeInsert {
3845 row_id: "row-1".to_owned(),
3846 logical_id: "logical-1".to_owned(),
3847 kind: "Meeting".to_owned(),
3848 properties: "{}".to_owned(),
3849 source_ref: None,
3850 upsert: false,
3851 chunk_policy: ChunkPolicy::Preserve,
3852 content_ref: None,
3853 }],
3854 node_retires: vec![],
3855 edges: vec![],
3856 edge_retires: vec![],
3857 chunks: vec![ChunkInsert {
3858 id: "chunk-1".to_owned(),
3859 node_logical_id: "logical-1".to_owned(),
3860 text_content: "budget discussion".to_owned(),
3861 byte_start: None,
3862 byte_end: None,
3863 content_hash: None,
3864 }],
3865 runs: vec![],
3866 steps: vec![],
3867 actions: vec![],
3868 optional_backfills: vec![],
3869 vec_inserts: vec![],
3870 operational_writes: vec![],
3871 })
3872 .expect("write receipt");
3873
3874 assert_eq!(receipt.label, "seed");
3875 }
3876
3877 #[test]
3878 fn writer_node_retire_supersedes_active_node() {
3879 let db = NamedTempFile::new().expect("temporary db");
3880 let writer = WriterActor::start(
3881 db.path(),
3882 Arc::new(SchemaManager::new()),
3883 ProvenanceMode::Warn,
3884 Arc::new(TelemetryCounters::default()),
3885 )
3886 .expect("writer");
3887
3888 writer
3889 .submit(WriteRequest {
3890 label: "seed".to_owned(),
3891 nodes: vec![NodeInsert {
3892 row_id: "row-1".to_owned(),
3893 logical_id: "meeting-1".to_owned(),
3894 kind: "Meeting".to_owned(),
3895 properties: "{}".to_owned(),
3896 source_ref: Some("src-1".to_owned()),
3897 upsert: false,
3898 chunk_policy: ChunkPolicy::Preserve,
3899 content_ref: None,
3900 }],
3901 node_retires: vec![],
3902 edges: vec![],
3903 edge_retires: vec![],
3904 chunks: vec![],
3905 runs: vec![],
3906 steps: vec![],
3907 actions: vec![],
3908 optional_backfills: vec![],
3909 vec_inserts: vec![],
3910 operational_writes: vec![],
3911 })
3912 .expect("seed write");
3913
3914 writer
3915 .submit(WriteRequest {
3916 label: "retire".to_owned(),
3917 nodes: vec![],
3918 node_retires: vec![NodeRetire {
3919 logical_id: "meeting-1".to_owned(),
3920 source_ref: Some("src-2".to_owned()),
3921 }],
3922 edges: vec![],
3923 edge_retires: vec![],
3924 chunks: vec![],
3925 runs: vec![],
3926 steps: vec![],
3927 actions: vec![],
3928 optional_backfills: vec![],
3929 vec_inserts: vec![],
3930 operational_writes: vec![],
3931 })
3932 .expect("retire write");
3933
3934 let conn = rusqlite::Connection::open(db.path()).expect("open");
3935 let active: i64 = conn
3936 .query_row(
3937 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3938 [],
3939 |r| r.get(0),
3940 )
3941 .expect("count active");
3942 let historical: i64 = conn
3943 .query_row(
3944 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3945 [],
3946 |r| r.get(0),
3947 )
3948 .expect("count historical");
3949
3950 assert_eq!(active, 0, "active count must be 0 after retire");
3951 assert_eq!(historical, 1, "historical count must be 1 after retire");
3952 }
3953
3954 #[test]
3955 fn writer_node_retire_preserves_chunks_and_clears_fts() {
3956 let db = NamedTempFile::new().expect("temporary db");
3957 let writer = WriterActor::start(
3958 db.path(),
3959 Arc::new(SchemaManager::new()),
3960 ProvenanceMode::Warn,
3961 Arc::new(TelemetryCounters::default()),
3962 )
3963 .expect("writer");
3964
3965 writer
3966 .submit(WriteRequest {
3967 label: "seed".to_owned(),
3968 nodes: vec![NodeInsert {
3969 row_id: "row-1".to_owned(),
3970 logical_id: "meeting-1".to_owned(),
3971 kind: "Meeting".to_owned(),
3972 properties: "{}".to_owned(),
3973 source_ref: Some("src-1".to_owned()),
3974 upsert: false,
3975 chunk_policy: ChunkPolicy::Preserve,
3976 content_ref: None,
3977 }],
3978 node_retires: vec![],
3979 edges: vec![],
3980 edge_retires: vec![],
3981 chunks: vec![ChunkInsert {
3982 id: "chunk-1".to_owned(),
3983 node_logical_id: "meeting-1".to_owned(),
3984 text_content: "budget discussion".to_owned(),
3985 byte_start: None,
3986 byte_end: None,
3987 content_hash: None,
3988 }],
3989 runs: vec![],
3990 steps: vec![],
3991 actions: vec![],
3992 optional_backfills: vec![],
3993 vec_inserts: vec![],
3994 operational_writes: vec![],
3995 })
3996 .expect("seed write");
3997
3998 writer
3999 .submit(WriteRequest {
4000 label: "retire".to_owned(),
4001 nodes: vec![],
4002 node_retires: vec![NodeRetire {
4003 logical_id: "meeting-1".to_owned(),
4004 source_ref: Some("src-2".to_owned()),
4005 }],
4006 edges: vec![],
4007 edge_retires: vec![],
4008 chunks: vec![],
4009 runs: vec![],
4010 steps: vec![],
4011 actions: vec![],
4012 optional_backfills: vec![],
4013 vec_inserts: vec![],
4014 operational_writes: vec![],
4015 })
4016 .expect("retire write");
4017
4018 let conn = rusqlite::Connection::open(db.path()).expect("open");
4019 let chunk_count: i64 = conn
4020 .query_row(
4021 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4022 [],
4023 |r| r.get(0),
4024 )
4025 .expect("chunk count");
4026 let fts_count: i64 = conn
4027 .query_row(
4028 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4029 [],
4030 |r| r.get(0),
4031 )
4032 .expect("fts count");
4033
4034 assert_eq!(
4035 chunk_count, 1,
4036 "chunks must remain after node retire so restore can re-establish content"
4037 );
4038 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4039 }
4040
4041 #[test]
4042 fn writer_edge_retire_supersedes_active_edge() {
4043 let db = NamedTempFile::new().expect("temporary db");
4044 let writer = WriterActor::start(
4045 db.path(),
4046 Arc::new(SchemaManager::new()),
4047 ProvenanceMode::Warn,
4048 Arc::new(TelemetryCounters::default()),
4049 )
4050 .expect("writer");
4051
4052 writer
4053 .submit(WriteRequest {
4054 label: "seed".to_owned(),
4055 nodes: vec![
4056 NodeInsert {
4057 row_id: "row-a".to_owned(),
4058 logical_id: "node-a".to_owned(),
4059 kind: "Meeting".to_owned(),
4060 properties: "{}".to_owned(),
4061 source_ref: Some("src-1".to_owned()),
4062 upsert: false,
4063 chunk_policy: ChunkPolicy::Preserve,
4064 content_ref: None,
4065 },
4066 NodeInsert {
4067 row_id: "row-b".to_owned(),
4068 logical_id: "node-b".to_owned(),
4069 kind: "Task".to_owned(),
4070 properties: "{}".to_owned(),
4071 source_ref: Some("src-1".to_owned()),
4072 upsert: false,
4073 chunk_policy: ChunkPolicy::Preserve,
4074 content_ref: None,
4075 },
4076 ],
4077 node_retires: vec![],
4078 edges: vec![EdgeInsert {
4079 row_id: "edge-1".to_owned(),
4080 logical_id: "edge-lg-1".to_owned(),
4081 source_logical_id: "node-a".to_owned(),
4082 target_logical_id: "node-b".to_owned(),
4083 kind: "HAS_TASK".to_owned(),
4084 properties: "{}".to_owned(),
4085 source_ref: Some("src-1".to_owned()),
4086 upsert: false,
4087 }],
4088 edge_retires: vec![],
4089 chunks: vec![],
4090 runs: vec![],
4091 steps: vec![],
4092 actions: vec![],
4093 optional_backfills: vec![],
4094 vec_inserts: vec![],
4095 operational_writes: vec![],
4096 })
4097 .expect("seed write");
4098
4099 writer
4100 .submit(WriteRequest {
4101 label: "retire-edge".to_owned(),
4102 nodes: vec![],
4103 node_retires: vec![],
4104 edges: vec![],
4105 edge_retires: vec![EdgeRetire {
4106 logical_id: "edge-lg-1".to_owned(),
4107 source_ref: Some("src-2".to_owned()),
4108 }],
4109 chunks: vec![],
4110 runs: vec![],
4111 steps: vec![],
4112 actions: vec![],
4113 optional_backfills: vec![],
4114 vec_inserts: vec![],
4115 operational_writes: vec![],
4116 })
4117 .expect("retire edge write");
4118
4119 let conn = rusqlite::Connection::open(db.path()).expect("open");
4120 let active: i64 = conn
4121 .query_row(
4122 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4123 [],
4124 |r| r.get(0),
4125 )
4126 .expect("active edge count");
4127
4128 assert_eq!(active, 0, "active edge count must be 0 after retire");
4129 }
4130
4131 #[test]
4132 fn writer_retire_without_source_ref_emits_provenance_warning() {
4133 let db = NamedTempFile::new().expect("temporary db");
4134 let writer = WriterActor::start(
4135 db.path(),
4136 Arc::new(SchemaManager::new()),
4137 ProvenanceMode::Warn,
4138 Arc::new(TelemetryCounters::default()),
4139 )
4140 .expect("writer");
4141
4142 writer
4143 .submit(WriteRequest {
4144 label: "seed".to_owned(),
4145 nodes: vec![NodeInsert {
4146 row_id: "row-1".to_owned(),
4147 logical_id: "meeting-1".to_owned(),
4148 kind: "Meeting".to_owned(),
4149 properties: "{}".to_owned(),
4150 source_ref: Some("src-1".to_owned()),
4151 upsert: false,
4152 chunk_policy: ChunkPolicy::Preserve,
4153 content_ref: None,
4154 }],
4155 node_retires: vec![],
4156 edges: vec![],
4157 edge_retires: vec![],
4158 chunks: vec![],
4159 runs: vec![],
4160 steps: vec![],
4161 actions: vec![],
4162 optional_backfills: vec![],
4163 vec_inserts: vec![],
4164 operational_writes: vec![],
4165 })
4166 .expect("seed write");
4167
4168 let receipt = writer
4169 .submit(WriteRequest {
4170 label: "retire-no-src".to_owned(),
4171 nodes: vec![],
4172 node_retires: vec![NodeRetire {
4173 logical_id: "meeting-1".to_owned(),
4174 source_ref: None,
4175 }],
4176 edges: vec![],
4177 edge_retires: vec![],
4178 chunks: vec![],
4179 runs: vec![],
4180 steps: vec![],
4181 actions: vec![],
4182 optional_backfills: vec![],
4183 vec_inserts: vec![],
4184 operational_writes: vec![],
4185 })
4186 .expect("retire write");
4187
4188 assert!(
4189 !receipt.provenance_warnings.is_empty(),
4190 "retire without source_ref must emit a provenance warning"
4191 );
4192 }
4193
4194 #[test]
4195 #[allow(clippy::too_many_lines)]
4196 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4197 let db = NamedTempFile::new().expect("temporary db");
4198 let writer = WriterActor::start(
4199 db.path(),
4200 Arc::new(SchemaManager::new()),
4201 ProvenanceMode::Warn,
4202 Arc::new(TelemetryCounters::default()),
4203 )
4204 .expect("writer");
4205
4206 writer
4207 .submit(WriteRequest {
4208 label: "v1".to_owned(),
4209 nodes: vec![NodeInsert {
4210 row_id: "row-1".to_owned(),
4211 logical_id: "meeting-1".to_owned(),
4212 kind: "Meeting".to_owned(),
4213 properties: "{}".to_owned(),
4214 source_ref: Some("src-1".to_owned()),
4215 upsert: false,
4216 chunk_policy: ChunkPolicy::Preserve,
4217 content_ref: None,
4218 }],
4219 node_retires: vec![],
4220 edges: vec![],
4221 edge_retires: vec![],
4222 chunks: vec![ChunkInsert {
4223 id: "chunk-old".to_owned(),
4224 node_logical_id: "meeting-1".to_owned(),
4225 text_content: "old text".to_owned(),
4226 byte_start: None,
4227 byte_end: None,
4228 content_hash: None,
4229 }],
4230 runs: vec![],
4231 steps: vec![],
4232 actions: vec![],
4233 optional_backfills: vec![],
4234 vec_inserts: vec![],
4235 operational_writes: vec![],
4236 })
4237 .expect("v1 write");
4238
4239 writer
4240 .submit(WriteRequest {
4241 label: "v2".to_owned(),
4242 nodes: vec![NodeInsert {
4243 row_id: "row-2".to_owned(),
4244 logical_id: "meeting-1".to_owned(),
4245 kind: "Meeting".to_owned(),
4246 properties: "{}".to_owned(),
4247 source_ref: Some("src-2".to_owned()),
4248 upsert: true,
4249 chunk_policy: ChunkPolicy::Replace,
4250 content_ref: None,
4251 }],
4252 node_retires: vec![],
4253 edges: vec![],
4254 edge_retires: vec![],
4255 chunks: vec![ChunkInsert {
4256 id: "chunk-new".to_owned(),
4257 node_logical_id: "meeting-1".to_owned(),
4258 text_content: "new text".to_owned(),
4259 byte_start: None,
4260 byte_end: None,
4261 content_hash: None,
4262 }],
4263 runs: vec![],
4264 steps: vec![],
4265 actions: vec![],
4266 optional_backfills: vec![],
4267 vec_inserts: vec![],
4268 operational_writes: vec![],
4269 })
4270 .expect("v2 write");
4271
4272 let conn = rusqlite::Connection::open(db.path()).expect("open");
4273 let old_chunk: i64 = conn
4274 .query_row(
4275 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4276 [],
4277 |r| r.get(0),
4278 )
4279 .expect("old chunk count");
4280 let new_chunk: i64 = conn
4281 .query_row(
4282 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4283 [],
4284 |r| r.get(0),
4285 )
4286 .expect("new chunk count");
4287 let fts_old: i64 = conn
4288 .query_row(
4289 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4290 [],
4291 |r| r.get(0),
4292 )
4293 .expect("old fts count");
4294
4295 assert_eq!(
4296 old_chunk, 0,
4297 "old chunk must be deleted by ChunkPolicy::Replace"
4298 );
4299 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4300 assert_eq!(
4301 fts_old, 0,
4302 "old FTS row must be deleted by ChunkPolicy::Replace"
4303 );
4304 }
4305
4306 #[test]
4307 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4308 let db = NamedTempFile::new().expect("temporary db");
4309 let writer = WriterActor::start(
4310 db.path(),
4311 Arc::new(SchemaManager::new()),
4312 ProvenanceMode::Warn,
4313 Arc::new(TelemetryCounters::default()),
4314 )
4315 .expect("writer");
4316
4317 writer
4318 .submit(WriteRequest {
4319 label: "v1".to_owned(),
4320 nodes: vec![NodeInsert {
4321 row_id: "row-1".to_owned(),
4322 logical_id: "meeting-1".to_owned(),
4323 kind: "Meeting".to_owned(),
4324 properties: "{}".to_owned(),
4325 source_ref: Some("src-1".to_owned()),
4326 upsert: false,
4327 chunk_policy: ChunkPolicy::Preserve,
4328 content_ref: None,
4329 }],
4330 node_retires: vec![],
4331 edges: vec![],
4332 edge_retires: vec![],
4333 chunks: vec![ChunkInsert {
4334 id: "chunk-old".to_owned(),
4335 node_logical_id: "meeting-1".to_owned(),
4336 text_content: "old text".to_owned(),
4337 byte_start: None,
4338 byte_end: None,
4339 content_hash: None,
4340 }],
4341 runs: vec![],
4342 steps: vec![],
4343 actions: vec![],
4344 optional_backfills: vec![],
4345 vec_inserts: vec![],
4346 operational_writes: vec![],
4347 })
4348 .expect("v1 write");
4349
4350 writer
4351 .submit(WriteRequest {
4352 label: "v2-props-only".to_owned(),
4353 nodes: vec![NodeInsert {
4354 row_id: "row-2".to_owned(),
4355 logical_id: "meeting-1".to_owned(),
4356 kind: "Meeting".to_owned(),
4357 properties: r#"{"status":"updated"}"#.to_owned(),
4358 source_ref: Some("src-2".to_owned()),
4359 upsert: true,
4360 chunk_policy: ChunkPolicy::Preserve,
4361 content_ref: None,
4362 }],
4363 node_retires: vec![],
4364 edges: vec![],
4365 edge_retires: vec![],
4366 chunks: vec![],
4367 runs: vec![],
4368 steps: vec![],
4369 actions: vec![],
4370 optional_backfills: vec![],
4371 vec_inserts: vec![],
4372 operational_writes: vec![],
4373 })
4374 .expect("v2 preserve write");
4375
4376 let conn = rusqlite::Connection::open(db.path()).expect("open");
4377 let old_chunk: i64 = conn
4378 .query_row(
4379 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4380 [],
4381 |r| r.get(0),
4382 )
4383 .expect("old chunk count");
4384
4385 assert_eq!(
4386 old_chunk, 1,
4387 "old chunk must be preserved by ChunkPolicy::Preserve"
4388 );
4389 }
4390
4391 #[test]
4392 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4393 let db = NamedTempFile::new().expect("temporary db");
4394 let writer = WriterActor::start(
4395 db.path(),
4396 Arc::new(SchemaManager::new()),
4397 ProvenanceMode::Warn,
4398 Arc::new(TelemetryCounters::default()),
4399 )
4400 .expect("writer");
4401
4402 writer
4403 .submit(WriteRequest {
4404 label: "v1".to_owned(),
4405 nodes: vec![NodeInsert {
4406 row_id: "row-1".to_owned(),
4407 logical_id: "meeting-1".to_owned(),
4408 kind: "Meeting".to_owned(),
4409 properties: "{}".to_owned(),
4410 source_ref: Some("src-1".to_owned()),
4411 upsert: false,
4412 chunk_policy: ChunkPolicy::Preserve,
4413 content_ref: None,
4414 }],
4415 node_retires: vec![],
4416 edges: vec![],
4417 edge_retires: vec![],
4418 chunks: vec![ChunkInsert {
4419 id: "chunk-existing".to_owned(),
4420 node_logical_id: "meeting-1".to_owned(),
4421 text_content: "existing text".to_owned(),
4422 byte_start: None,
4423 byte_end: None,
4424 content_hash: None,
4425 }],
4426 runs: vec![],
4427 steps: vec![],
4428 actions: vec![],
4429 optional_backfills: vec![],
4430 vec_inserts: vec![],
4431 operational_writes: vec![],
4432 })
4433 .expect("v1 write");
4434
4435 writer
4437 .submit(WriteRequest {
4438 label: "insert-no-upsert".to_owned(),
4439 nodes: vec![NodeInsert {
4440 row_id: "row-2".to_owned(),
4441 logical_id: "meeting-2".to_owned(),
4442 kind: "Meeting".to_owned(),
4443 properties: "{}".to_owned(),
4444 source_ref: Some("src-2".to_owned()),
4445 upsert: false,
4446 chunk_policy: ChunkPolicy::Replace,
4447 content_ref: None,
4448 }],
4449 node_retires: vec![],
4450 edges: vec![],
4451 edge_retires: vec![],
4452 chunks: vec![],
4453 runs: vec![],
4454 steps: vec![],
4455 actions: vec![],
4456 optional_backfills: vec![],
4457 vec_inserts: vec![],
4458 operational_writes: vec![],
4459 })
4460 .expect("insert no-upsert write");
4461
4462 let conn = rusqlite::Connection::open(db.path()).expect("open");
4463 let existing_chunk: i64 = conn
4464 .query_row(
4465 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4466 [],
4467 |r| r.get(0),
4468 )
4469 .expect("chunk count");
4470
4471 assert_eq!(
4472 existing_chunk, 1,
4473 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4474 );
4475 }
4476
4477 #[test]
4478 fn writer_run_upsert_supersedes_prior_active_run() {
4479 let db = NamedTempFile::new().expect("temporary db");
4480 let writer = WriterActor::start(
4481 db.path(),
4482 Arc::new(SchemaManager::new()),
4483 ProvenanceMode::Warn,
4484 Arc::new(TelemetryCounters::default()),
4485 )
4486 .expect("writer");
4487
4488 writer
4489 .submit(WriteRequest {
4490 label: "v1".to_owned(),
4491 nodes: vec![],
4492 node_retires: vec![],
4493 edges: vec![],
4494 edge_retires: vec![],
4495 chunks: vec![],
4496 runs: vec![RunInsert {
4497 id: "run-v1".to_owned(),
4498 kind: "session".to_owned(),
4499 status: "completed".to_owned(),
4500 properties: "{}".to_owned(),
4501 source_ref: Some("src-1".to_owned()),
4502 upsert: false,
4503 supersedes_id: None,
4504 }],
4505 steps: vec![],
4506 actions: vec![],
4507 optional_backfills: vec![],
4508 vec_inserts: vec![],
4509 operational_writes: vec![],
4510 })
4511 .expect("v1 run write");
4512
4513 writer
4514 .submit(WriteRequest {
4515 label: "v2".to_owned(),
4516 nodes: vec![],
4517 node_retires: vec![],
4518 edges: vec![],
4519 edge_retires: vec![],
4520 chunks: vec![],
4521 runs: vec![RunInsert {
4522 id: "run-v2".to_owned(),
4523 kind: "session".to_owned(),
4524 status: "completed".to_owned(),
4525 properties: "{}".to_owned(),
4526 source_ref: Some("src-2".to_owned()),
4527 upsert: true,
4528 supersedes_id: Some("run-v1".to_owned()),
4529 }],
4530 steps: vec![],
4531 actions: vec![],
4532 optional_backfills: vec![],
4533 vec_inserts: vec![],
4534 operational_writes: vec![],
4535 })
4536 .expect("v2 run write");
4537
4538 let conn = rusqlite::Connection::open(db.path()).expect("open");
4539 let v1_historical: i64 = conn
4540 .query_row(
4541 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4542 [],
4543 |r| r.get(0),
4544 )
4545 .expect("v1 historical count");
4546 let v2_active: i64 = conn
4547 .query_row(
4548 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4549 [],
4550 |r| r.get(0),
4551 )
4552 .expect("v2 active count");
4553
4554 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4555 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4556 }
4557
4558 #[test]
4559 fn writer_step_upsert_supersedes_prior_active_step() {
4560 let db = NamedTempFile::new().expect("temporary db");
4561 let writer = WriterActor::start(
4562 db.path(),
4563 Arc::new(SchemaManager::new()),
4564 ProvenanceMode::Warn,
4565 Arc::new(TelemetryCounters::default()),
4566 )
4567 .expect("writer");
4568
4569 writer
4570 .submit(WriteRequest {
4571 label: "v1".to_owned(),
4572 nodes: vec![],
4573 node_retires: vec![],
4574 edges: vec![],
4575 edge_retires: vec![],
4576 chunks: vec![],
4577 runs: vec![RunInsert {
4578 id: "run-1".to_owned(),
4579 kind: "session".to_owned(),
4580 status: "completed".to_owned(),
4581 properties: "{}".to_owned(),
4582 source_ref: Some("src-1".to_owned()),
4583 upsert: false,
4584 supersedes_id: None,
4585 }],
4586 steps: vec![StepInsert {
4587 id: "step-v1".to_owned(),
4588 run_id: "run-1".to_owned(),
4589 kind: "llm".to_owned(),
4590 status: "completed".to_owned(),
4591 properties: "{}".to_owned(),
4592 source_ref: Some("src-1".to_owned()),
4593 upsert: false,
4594 supersedes_id: None,
4595 }],
4596 actions: vec![],
4597 optional_backfills: vec![],
4598 vec_inserts: vec![],
4599 operational_writes: vec![],
4600 })
4601 .expect("v1 step write");
4602
4603 writer
4604 .submit(WriteRequest {
4605 label: "v2".to_owned(),
4606 nodes: vec![],
4607 node_retires: vec![],
4608 edges: vec![],
4609 edge_retires: vec![],
4610 chunks: vec![],
4611 runs: vec![],
4612 steps: vec![StepInsert {
4613 id: "step-v2".to_owned(),
4614 run_id: "run-1".to_owned(),
4615 kind: "llm".to_owned(),
4616 status: "completed".to_owned(),
4617 properties: "{}".to_owned(),
4618 source_ref: Some("src-2".to_owned()),
4619 upsert: true,
4620 supersedes_id: Some("step-v1".to_owned()),
4621 }],
4622 actions: vec![],
4623 optional_backfills: vec![],
4624 vec_inserts: vec![],
4625 operational_writes: vec![],
4626 })
4627 .expect("v2 step write");
4628
4629 let conn = rusqlite::Connection::open(db.path()).expect("open");
4630 let v1_historical: i64 = conn
4631 .query_row(
4632 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4633 [],
4634 |r| r.get(0),
4635 )
4636 .expect("v1 historical count");
4637 let v2_active: i64 = conn
4638 .query_row(
4639 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4640 [],
4641 |r| r.get(0),
4642 )
4643 .expect("v2 active count");
4644
4645 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4646 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4647 }
4648
4649 #[test]
4650 fn writer_action_upsert_supersedes_prior_active_action() {
4651 let db = NamedTempFile::new().expect("temporary db");
4652 let writer = WriterActor::start(
4653 db.path(),
4654 Arc::new(SchemaManager::new()),
4655 ProvenanceMode::Warn,
4656 Arc::new(TelemetryCounters::default()),
4657 )
4658 .expect("writer");
4659
4660 writer
4661 .submit(WriteRequest {
4662 label: "v1".to_owned(),
4663 nodes: vec![],
4664 node_retires: vec![],
4665 edges: vec![],
4666 edge_retires: vec![],
4667 chunks: vec![],
4668 runs: vec![RunInsert {
4669 id: "run-1".to_owned(),
4670 kind: "session".to_owned(),
4671 status: "completed".to_owned(),
4672 properties: "{}".to_owned(),
4673 source_ref: Some("src-1".to_owned()),
4674 upsert: false,
4675 supersedes_id: None,
4676 }],
4677 steps: vec![StepInsert {
4678 id: "step-1".to_owned(),
4679 run_id: "run-1".to_owned(),
4680 kind: "llm".to_owned(),
4681 status: "completed".to_owned(),
4682 properties: "{}".to_owned(),
4683 source_ref: Some("src-1".to_owned()),
4684 upsert: false,
4685 supersedes_id: None,
4686 }],
4687 actions: vec![ActionInsert {
4688 id: "action-v1".to_owned(),
4689 step_id: "step-1".to_owned(),
4690 kind: "emit".to_owned(),
4691 status: "completed".to_owned(),
4692 properties: "{}".to_owned(),
4693 source_ref: Some("src-1".to_owned()),
4694 upsert: false,
4695 supersedes_id: None,
4696 }],
4697 optional_backfills: vec![],
4698 vec_inserts: vec![],
4699 operational_writes: vec![],
4700 })
4701 .expect("v1 action write");
4702
4703 writer
4704 .submit(WriteRequest {
4705 label: "v2".to_owned(),
4706 nodes: vec![],
4707 node_retires: vec![],
4708 edges: vec![],
4709 edge_retires: vec![],
4710 chunks: vec![],
4711 runs: vec![],
4712 steps: vec![],
4713 actions: vec![ActionInsert {
4714 id: "action-v2".to_owned(),
4715 step_id: "step-1".to_owned(),
4716 kind: "emit".to_owned(),
4717 status: "completed".to_owned(),
4718 properties: "{}".to_owned(),
4719 source_ref: Some("src-2".to_owned()),
4720 upsert: true,
4721 supersedes_id: Some("action-v1".to_owned()),
4722 }],
4723 optional_backfills: vec![],
4724 vec_inserts: vec![],
4725 operational_writes: vec![],
4726 })
4727 .expect("v2 action write");
4728
4729 let conn = rusqlite::Connection::open(db.path()).expect("open");
4730 let v1_historical: i64 = conn
4731 .query_row(
4732 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4733 [],
4734 |r| r.get(0),
4735 )
4736 .expect("v1 historical count");
4737 let v2_active: i64 = conn
4738 .query_row(
4739 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4740 [],
4741 |r| r.get(0),
4742 )
4743 .expect("v2 active count");
4744
4745 assert_eq!(
4746 v1_historical, 1,
4747 "action-v1 must be historical after upsert"
4748 );
4749 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4750 }
4751
4752 #[test]
4755 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4756 let db = NamedTempFile::new().expect("temporary db");
4757 let writer = WriterActor::start(
4758 db.path(),
4759 Arc::new(SchemaManager::new()),
4760 ProvenanceMode::Warn,
4761 Arc::new(TelemetryCounters::default()),
4762 )
4763 .expect("writer");
4764
4765 let result = writer.submit(WriteRequest {
4766 label: "bad".to_owned(),
4767 nodes: vec![],
4768 node_retires: vec![],
4769 edges: vec![],
4770 edge_retires: vec![],
4771 chunks: vec![],
4772 runs: vec![RunInsert {
4773 id: "run-1".to_owned(),
4774 kind: "session".to_owned(),
4775 status: "completed".to_owned(),
4776 properties: "{}".to_owned(),
4777 source_ref: None,
4778 upsert: true,
4779 supersedes_id: None,
4780 }],
4781 steps: vec![],
4782 actions: vec![],
4783 optional_backfills: vec![],
4784 vec_inserts: vec![],
4785 operational_writes: vec![],
4786 });
4787
4788 assert!(
4789 matches!(result, Err(EngineError::InvalidWrite(_))),
4790 "run upsert=true without supersedes_id must return InvalidWrite"
4791 );
4792 }
4793
4794 #[test]
4795 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4796 let db = NamedTempFile::new().expect("temporary db");
4797 let writer = WriterActor::start(
4798 db.path(),
4799 Arc::new(SchemaManager::new()),
4800 ProvenanceMode::Warn,
4801 Arc::new(TelemetryCounters::default()),
4802 )
4803 .expect("writer");
4804
4805 let result = writer.submit(WriteRequest {
4806 label: "bad".to_owned(),
4807 nodes: vec![],
4808 node_retires: vec![],
4809 edges: vec![],
4810 edge_retires: vec![],
4811 chunks: vec![],
4812 runs: vec![],
4813 steps: vec![StepInsert {
4814 id: "step-1".to_owned(),
4815 run_id: "run-1".to_owned(),
4816 kind: "llm".to_owned(),
4817 status: "completed".to_owned(),
4818 properties: "{}".to_owned(),
4819 source_ref: None,
4820 upsert: true,
4821 supersedes_id: None,
4822 }],
4823 actions: vec![],
4824 optional_backfills: vec![],
4825 vec_inserts: vec![],
4826 operational_writes: vec![],
4827 });
4828
4829 assert!(
4830 matches!(result, Err(EngineError::InvalidWrite(_))),
4831 "step upsert=true without supersedes_id must return InvalidWrite"
4832 );
4833 }
4834
4835 #[test]
4836 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4837 let db = NamedTempFile::new().expect("temporary db");
4838 let writer = WriterActor::start(
4839 db.path(),
4840 Arc::new(SchemaManager::new()),
4841 ProvenanceMode::Warn,
4842 Arc::new(TelemetryCounters::default()),
4843 )
4844 .expect("writer");
4845
4846 let result = writer.submit(WriteRequest {
4847 label: "bad".to_owned(),
4848 nodes: vec![],
4849 node_retires: vec![],
4850 edges: vec![],
4851 edge_retires: vec![],
4852 chunks: vec![],
4853 runs: vec![],
4854 steps: vec![],
4855 actions: vec![ActionInsert {
4856 id: "action-1".to_owned(),
4857 step_id: "step-1".to_owned(),
4858 kind: "emit".to_owned(),
4859 status: "completed".to_owned(),
4860 properties: "{}".to_owned(),
4861 source_ref: None,
4862 upsert: true,
4863 supersedes_id: None,
4864 }],
4865 optional_backfills: vec![],
4866 vec_inserts: vec![],
4867 operational_writes: vec![],
4868 });
4869
4870 assert!(
4871 matches!(result, Err(EngineError::InvalidWrite(_))),
4872 "action upsert=true without supersedes_id must return InvalidWrite"
4873 );
4874 }
4875
4876 #[test]
4879 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4880 let db = NamedTempFile::new().expect("temporary db");
4881 let writer = WriterActor::start(
4882 db.path(),
4883 Arc::new(SchemaManager::new()),
4884 ProvenanceMode::Warn,
4885 Arc::new(TelemetryCounters::default()),
4886 )
4887 .expect("writer");
4888
4889 let receipt = writer
4890 .submit(WriteRequest {
4891 label: "test".to_owned(),
4892 nodes: vec![
4893 NodeInsert {
4894 row_id: "row-a".to_owned(),
4895 logical_id: "node-a".to_owned(),
4896 kind: "Meeting".to_owned(),
4897 properties: "{}".to_owned(),
4898 source_ref: Some("src-1".to_owned()),
4899 upsert: false,
4900 chunk_policy: ChunkPolicy::Preserve,
4901 content_ref: None,
4902 },
4903 NodeInsert {
4904 row_id: "row-b".to_owned(),
4905 logical_id: "node-b".to_owned(),
4906 kind: "Task".to_owned(),
4907 properties: "{}".to_owned(),
4908 source_ref: Some("src-1".to_owned()),
4909 upsert: false,
4910 chunk_policy: ChunkPolicy::Preserve,
4911 content_ref: None,
4912 },
4913 ],
4914 node_retires: vec![],
4915 edges: vec![EdgeInsert {
4916 row_id: "edge-1".to_owned(),
4917 logical_id: "edge-lg-1".to_owned(),
4918 source_logical_id: "node-a".to_owned(),
4919 target_logical_id: "node-b".to_owned(),
4920 kind: "HAS_TASK".to_owned(),
4921 properties: "{}".to_owned(),
4922 source_ref: None,
4923 upsert: false,
4924 }],
4925 edge_retires: vec![],
4926 chunks: vec![],
4927 runs: vec![],
4928 steps: vec![],
4929 actions: vec![],
4930 optional_backfills: vec![],
4931 vec_inserts: vec![],
4932 operational_writes: vec![],
4933 })
4934 .expect("write");
4935
4936 assert!(
4937 !receipt.provenance_warnings.is_empty(),
4938 "edge insert without source_ref must emit a provenance warning"
4939 );
4940 }
4941
4942 #[test]
4943 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4944 let db = NamedTempFile::new().expect("temporary db");
4945 let writer = WriterActor::start(
4946 db.path(),
4947 Arc::new(SchemaManager::new()),
4948 ProvenanceMode::Warn,
4949 Arc::new(TelemetryCounters::default()),
4950 )
4951 .expect("writer");
4952
4953 let receipt = writer
4954 .submit(WriteRequest {
4955 label: "test".to_owned(),
4956 nodes: vec![],
4957 node_retires: vec![],
4958 edges: vec![],
4959 edge_retires: vec![],
4960 chunks: vec![],
4961 runs: vec![RunInsert {
4962 id: "run-1".to_owned(),
4963 kind: "session".to_owned(),
4964 status: "completed".to_owned(),
4965 properties: "{}".to_owned(),
4966 source_ref: None,
4967 upsert: false,
4968 supersedes_id: None,
4969 }],
4970 steps: vec![],
4971 actions: vec![],
4972 optional_backfills: vec![],
4973 vec_inserts: vec![],
4974 operational_writes: vec![],
4975 })
4976 .expect("write");
4977
4978 assert!(
4979 !receipt.provenance_warnings.is_empty(),
4980 "run insert without source_ref must emit a provenance warning"
4981 );
4982 }
4983
4984 #[test]
4987 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4988 let db = NamedTempFile::new().expect("temporary db");
4989 let writer = WriterActor::start(
4990 db.path(),
4991 Arc::new(SchemaManager::new()),
4992 ProvenanceMode::Warn,
4993 Arc::new(TelemetryCounters::default()),
4994 )
4995 .expect("writer");
4996
4997 writer
4999 .submit(WriteRequest {
5000 label: "seed".to_owned(),
5001 nodes: vec![NodeInsert {
5002 row_id: "row-1".to_owned(),
5003 logical_id: "meeting-1".to_owned(),
5004 kind: "Meeting".to_owned(),
5005 properties: "{}".to_owned(),
5006 source_ref: Some("src-1".to_owned()),
5007 upsert: false,
5008 chunk_policy: ChunkPolicy::Preserve,
5009 content_ref: None,
5010 }],
5011 node_retires: vec![],
5012 edges: vec![],
5013 edge_retires: vec![],
5014 chunks: vec![],
5015 runs: vec![],
5016 steps: vec![],
5017 actions: vec![],
5018 optional_backfills: vec![],
5019 vec_inserts: vec![],
5020 operational_writes: vec![],
5021 })
5022 .expect("seed write");
5023
5024 let result = writer.submit(WriteRequest {
5026 label: "bad".to_owned(),
5027 nodes: vec![],
5028 node_retires: vec![NodeRetire {
5029 logical_id: "meeting-1".to_owned(),
5030 source_ref: Some("src-2".to_owned()),
5031 }],
5032 edges: vec![],
5033 edge_retires: vec![],
5034 chunks: vec![ChunkInsert {
5035 id: "chunk-bad".to_owned(),
5036 node_logical_id: "meeting-1".to_owned(),
5037 text_content: "some text".to_owned(),
5038 byte_start: None,
5039 byte_end: None,
5040 content_hash: None,
5041 }],
5042 runs: vec![],
5043 steps: vec![],
5044 actions: vec![],
5045 optional_backfills: vec![],
5046 vec_inserts: vec![],
5047 operational_writes: vec![],
5048 });
5049
5050 assert!(
5051 matches!(result, Err(EngineError::InvalidWrite(_))),
5052 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5053 );
5054 }
5055
5056 #[test]
5059 fn writer_batch_insert_multiple_nodes() {
5060 let db = NamedTempFile::new().expect("temporary db");
5061 let writer = WriterActor::start(
5062 db.path(),
5063 Arc::new(SchemaManager::new()),
5064 ProvenanceMode::Warn,
5065 Arc::new(TelemetryCounters::default()),
5066 )
5067 .expect("writer");
5068
5069 let nodes: Vec<NodeInsert> = (0..100)
5070 .map(|i| NodeInsert {
5071 row_id: format!("row-{i}"),
5072 logical_id: format!("lg-{i}"),
5073 kind: "Note".to_owned(),
5074 properties: "{}".to_owned(),
5075 source_ref: Some("batch-src".to_owned()),
5076 upsert: false,
5077 chunk_policy: ChunkPolicy::Preserve,
5078 content_ref: None,
5079 })
5080 .collect();
5081
5082 writer
5083 .submit(WriteRequest {
5084 label: "batch".to_owned(),
5085 nodes,
5086 node_retires: vec![],
5087 edges: vec![],
5088 edge_retires: vec![],
5089 chunks: vec![],
5090 runs: vec![],
5091 steps: vec![],
5092 actions: vec![],
5093 optional_backfills: vec![],
5094 vec_inserts: vec![],
5095 operational_writes: vec![],
5096 })
5097 .expect("batch write");
5098
5099 let conn = rusqlite::Connection::open(db.path()).expect("open");
5100 let count: i64 = conn
5101 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5102 .expect("count nodes");
5103 assert_eq!(
5104 count, 100,
5105 "all 100 nodes must be present after batch insert"
5106 );
5107 }
5108
5109 #[test]
5112 fn prepare_write_rejects_empty_node_row_id() {
5113 let db = NamedTempFile::new().expect("temporary db");
5114 let writer = WriterActor::start(
5115 db.path(),
5116 Arc::new(SchemaManager::new()),
5117 ProvenanceMode::Warn,
5118 Arc::new(TelemetryCounters::default()),
5119 )
5120 .expect("writer");
5121
5122 let result = writer.submit(WriteRequest {
5123 label: "test".to_owned(),
5124 nodes: vec![NodeInsert {
5125 row_id: String::new(),
5126 logical_id: "lg-1".to_owned(),
5127 kind: "Note".to_owned(),
5128 properties: "{}".to_owned(),
5129 source_ref: None,
5130 upsert: false,
5131 chunk_policy: ChunkPolicy::Preserve,
5132 content_ref: None,
5133 }],
5134 node_retires: vec![],
5135 edges: vec![],
5136 edge_retires: vec![],
5137 chunks: vec![],
5138 runs: vec![],
5139 steps: vec![],
5140 actions: vec![],
5141 optional_backfills: vec![],
5142 vec_inserts: vec![],
5143 operational_writes: vec![],
5144 });
5145
5146 assert!(
5147 matches!(result, Err(EngineError::InvalidWrite(_))),
5148 "empty row_id must be rejected"
5149 );
5150 }
5151
5152 #[test]
5153 fn prepare_write_rejects_empty_node_logical_id() {
5154 let db = NamedTempFile::new().expect("temporary db");
5155 let writer = WriterActor::start(
5156 db.path(),
5157 Arc::new(SchemaManager::new()),
5158 ProvenanceMode::Warn,
5159 Arc::new(TelemetryCounters::default()),
5160 )
5161 .expect("writer");
5162
5163 let result = writer.submit(WriteRequest {
5164 label: "test".to_owned(),
5165 nodes: vec![NodeInsert {
5166 row_id: "row-1".to_owned(),
5167 logical_id: String::new(),
5168 kind: "Note".to_owned(),
5169 properties: "{}".to_owned(),
5170 source_ref: None,
5171 upsert: false,
5172 chunk_policy: ChunkPolicy::Preserve,
5173 content_ref: None,
5174 }],
5175 node_retires: vec![],
5176 edges: vec![],
5177 edge_retires: vec![],
5178 chunks: vec![],
5179 runs: vec![],
5180 steps: vec![],
5181 actions: vec![],
5182 optional_backfills: vec![],
5183 vec_inserts: vec![],
5184 operational_writes: vec![],
5185 });
5186
5187 assert!(
5188 matches!(result, Err(EngineError::InvalidWrite(_))),
5189 "empty logical_id must be rejected"
5190 );
5191 }
5192
5193 #[test]
5194 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5195 let db = NamedTempFile::new().expect("temporary db");
5196 let writer = WriterActor::start(
5197 db.path(),
5198 Arc::new(SchemaManager::new()),
5199 ProvenanceMode::Warn,
5200 Arc::new(TelemetryCounters::default()),
5201 )
5202 .expect("writer");
5203
5204 let result = writer.submit(WriteRequest {
5205 label: "test".to_owned(),
5206 nodes: vec![
5207 NodeInsert {
5208 row_id: "row-1".to_owned(),
5209 logical_id: "lg-1".to_owned(),
5210 kind: "Note".to_owned(),
5211 properties: "{}".to_owned(),
5212 source_ref: None,
5213 upsert: false,
5214 chunk_policy: ChunkPolicy::Preserve,
5215 content_ref: None,
5216 },
5217 NodeInsert {
5218 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5220 kind: "Note".to_owned(),
5221 properties: "{}".to_owned(),
5222 source_ref: None,
5223 upsert: false,
5224 chunk_policy: ChunkPolicy::Preserve,
5225 content_ref: None,
5226 },
5227 ],
5228 node_retires: vec![],
5229 edges: vec![],
5230 edge_retires: vec![],
5231 chunks: vec![],
5232 runs: vec![],
5233 steps: vec![],
5234 actions: vec![],
5235 optional_backfills: vec![],
5236 vec_inserts: vec![],
5237 operational_writes: vec![],
5238 });
5239
5240 assert!(
5241 matches!(result, Err(EngineError::InvalidWrite(_))),
5242 "duplicate row_id within request must be rejected"
5243 );
5244 }
5245
5246 #[test]
5247 fn prepare_write_rejects_empty_chunk_id() {
5248 let db = NamedTempFile::new().expect("temporary db");
5249 let writer = WriterActor::start(
5250 db.path(),
5251 Arc::new(SchemaManager::new()),
5252 ProvenanceMode::Warn,
5253 Arc::new(TelemetryCounters::default()),
5254 )
5255 .expect("writer");
5256
5257 let result = writer.submit(WriteRequest {
5258 label: "test".to_owned(),
5259 nodes: vec![NodeInsert {
5260 row_id: "row-1".to_owned(),
5261 logical_id: "lg-1".to_owned(),
5262 kind: "Note".to_owned(),
5263 properties: "{}".to_owned(),
5264 source_ref: None,
5265 upsert: false,
5266 chunk_policy: ChunkPolicy::Preserve,
5267 content_ref: None,
5268 }],
5269 node_retires: vec![],
5270 edges: vec![],
5271 edge_retires: vec![],
5272 chunks: vec![ChunkInsert {
5273 id: String::new(),
5274 node_logical_id: "lg-1".to_owned(),
5275 text_content: "some text".to_owned(),
5276 byte_start: None,
5277 byte_end: None,
5278 content_hash: None,
5279 }],
5280 runs: vec![],
5281 steps: vec![],
5282 actions: vec![],
5283 optional_backfills: vec![],
5284 vec_inserts: vec![],
5285 operational_writes: vec![],
5286 });
5287
5288 assert!(
5289 matches!(result, Err(EngineError::InvalidWrite(_))),
5290 "empty chunk id must be rejected"
5291 );
5292 }
5293
5294 #[test]
5297 fn writer_receipt_warns_on_step_without_source_ref() {
5298 let db = NamedTempFile::new().expect("temporary db");
5299 let writer = WriterActor::start(
5300 db.path(),
5301 Arc::new(SchemaManager::new()),
5302 ProvenanceMode::Warn,
5303 Arc::new(TelemetryCounters::default()),
5304 )
5305 .expect("writer");
5306
5307 writer
5309 .submit(WriteRequest {
5310 label: "seed-run".to_owned(),
5311 nodes: vec![],
5312 node_retires: vec![],
5313 edges: vec![],
5314 edge_retires: vec![],
5315 chunks: vec![],
5316 runs: vec![RunInsert {
5317 id: "run-1".to_owned(),
5318 kind: "session".to_owned(),
5319 status: "active".to_owned(),
5320 properties: "{}".to_owned(),
5321 source_ref: Some("src-1".to_owned()),
5322 upsert: false,
5323 supersedes_id: None,
5324 }],
5325 steps: vec![],
5326 actions: vec![],
5327 optional_backfills: vec![],
5328 vec_inserts: vec![],
5329 operational_writes: vec![],
5330 })
5331 .expect("seed run");
5332
5333 let receipt = writer
5334 .submit(WriteRequest {
5335 label: "test".to_owned(),
5336 nodes: vec![],
5337 node_retires: vec![],
5338 edges: vec![],
5339 edge_retires: vec![],
5340 chunks: vec![],
5341 runs: vec![],
5342 steps: vec![StepInsert {
5343 id: "step-1".to_owned(),
5344 run_id: "run-1".to_owned(),
5345 kind: "llm_call".to_owned(),
5346 status: "completed".to_owned(),
5347 properties: "{}".to_owned(),
5348 source_ref: None,
5349 upsert: false,
5350 supersedes_id: None,
5351 }],
5352 actions: vec![],
5353 optional_backfills: vec![],
5354 vec_inserts: vec![],
5355 operational_writes: vec![],
5356 })
5357 .expect("write");
5358
5359 assert!(
5360 !receipt.provenance_warnings.is_empty(),
5361 "step insert without source_ref must emit a provenance warning"
5362 );
5363 }
5364
5365 #[test]
5366 fn writer_receipt_warns_on_action_without_source_ref() {
5367 let db = NamedTempFile::new().expect("temporary db");
5368 let writer = WriterActor::start(
5369 db.path(),
5370 Arc::new(SchemaManager::new()),
5371 ProvenanceMode::Warn,
5372 Arc::new(TelemetryCounters::default()),
5373 )
5374 .expect("writer");
5375
5376 writer
5378 .submit(WriteRequest {
5379 label: "seed".to_owned(),
5380 nodes: vec![],
5381 node_retires: vec![],
5382 edges: vec![],
5383 edge_retires: vec![],
5384 chunks: vec![],
5385 runs: vec![RunInsert {
5386 id: "run-1".to_owned(),
5387 kind: "session".to_owned(),
5388 status: "active".to_owned(),
5389 properties: "{}".to_owned(),
5390 source_ref: Some("src-1".to_owned()),
5391 upsert: false,
5392 supersedes_id: None,
5393 }],
5394 steps: vec![StepInsert {
5395 id: "step-1".to_owned(),
5396 run_id: "run-1".to_owned(),
5397 kind: "llm_call".to_owned(),
5398 status: "completed".to_owned(),
5399 properties: "{}".to_owned(),
5400 source_ref: Some("src-1".to_owned()),
5401 upsert: false,
5402 supersedes_id: None,
5403 }],
5404 actions: vec![],
5405 optional_backfills: vec![],
5406 vec_inserts: vec![],
5407 operational_writes: vec![],
5408 })
5409 .expect("seed");
5410
5411 let receipt = writer
5412 .submit(WriteRequest {
5413 label: "test".to_owned(),
5414 nodes: vec![],
5415 node_retires: vec![],
5416 edges: vec![],
5417 edge_retires: vec![],
5418 chunks: vec![],
5419 runs: vec![],
5420 steps: vec![],
5421 actions: vec![ActionInsert {
5422 id: "action-1".to_owned(),
5423 step_id: "step-1".to_owned(),
5424 kind: "tool_call".to_owned(),
5425 status: "completed".to_owned(),
5426 properties: "{}".to_owned(),
5427 source_ref: None,
5428 upsert: false,
5429 supersedes_id: None,
5430 }],
5431 optional_backfills: vec![],
5432 vec_inserts: vec![],
5433 operational_writes: vec![],
5434 })
5435 .expect("write");
5436
5437 assert!(
5438 !receipt.provenance_warnings.is_empty(),
5439 "action insert without source_ref must emit a provenance warning"
5440 );
5441 }
5442
5443 #[test]
5444 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5445 let db = NamedTempFile::new().expect("temporary db");
5446 let writer = WriterActor::start(
5447 db.path(),
5448 Arc::new(SchemaManager::new()),
5449 ProvenanceMode::Warn,
5450 Arc::new(TelemetryCounters::default()),
5451 )
5452 .expect("writer");
5453
5454 let receipt = writer
5455 .submit(WriteRequest {
5456 label: "test".to_owned(),
5457 nodes: vec![NodeInsert {
5458 row_id: "row-1".to_owned(),
5459 logical_id: "node-1".to_owned(),
5460 kind: "Note".to_owned(),
5461 properties: "{}".to_owned(),
5462 source_ref: Some("src-1".to_owned()),
5463 upsert: false,
5464 chunk_policy: ChunkPolicy::Preserve,
5465 content_ref: None,
5466 }],
5467 node_retires: vec![],
5468 edges: vec![],
5469 edge_retires: vec![],
5470 chunks: vec![],
5471 runs: vec![RunInsert {
5472 id: "run-1".to_owned(),
5473 kind: "session".to_owned(),
5474 status: "active".to_owned(),
5475 properties: "{}".to_owned(),
5476 source_ref: Some("src-1".to_owned()),
5477 upsert: false,
5478 supersedes_id: None,
5479 }],
5480 steps: vec![StepInsert {
5481 id: "step-1".to_owned(),
5482 run_id: "run-1".to_owned(),
5483 kind: "llm_call".to_owned(),
5484 status: "completed".to_owned(),
5485 properties: "{}".to_owned(),
5486 source_ref: Some("src-1".to_owned()),
5487 upsert: false,
5488 supersedes_id: None,
5489 }],
5490 actions: vec![ActionInsert {
5491 id: "action-1".to_owned(),
5492 step_id: "step-1".to_owned(),
5493 kind: "tool_call".to_owned(),
5494 status: "completed".to_owned(),
5495 properties: "{}".to_owned(),
5496 source_ref: Some("src-1".to_owned()),
5497 upsert: false,
5498 supersedes_id: None,
5499 }],
5500 optional_backfills: vec![],
5501 vec_inserts: vec![],
5502 operational_writes: vec![],
5503 })
5504 .expect("write");
5505
5506 assert!(
5507 receipt.provenance_warnings.is_empty(),
5508 "no warnings expected when all types have source_ref; got: {:?}",
5509 receipt.provenance_warnings
5510 );
5511 }
5512
5513 #[test]
5516 fn default_provenance_mode_is_warn() {
5517 let db = NamedTempFile::new().expect("temporary db");
5519 let writer = WriterActor::start(
5520 db.path(),
5521 Arc::new(SchemaManager::new()),
5522 ProvenanceMode::default(),
5523 Arc::new(TelemetryCounters::default()),
5524 )
5525 .expect("writer");
5526
5527 let receipt = writer
5528 .submit(WriteRequest {
5529 label: "test".to_owned(),
5530 nodes: vec![NodeInsert {
5531 row_id: "row-1".to_owned(),
5532 logical_id: "node-1".to_owned(),
5533 kind: "Note".to_owned(),
5534 properties: "{}".to_owned(),
5535 source_ref: None,
5536 upsert: false,
5537 chunk_policy: ChunkPolicy::Preserve,
5538 content_ref: None,
5539 }],
5540 node_retires: vec![],
5541 edges: vec![],
5542 edge_retires: vec![],
5543 chunks: vec![],
5544 runs: vec![],
5545 steps: vec![],
5546 actions: vec![],
5547 optional_backfills: vec![],
5548 vec_inserts: vec![],
5549 operational_writes: vec![],
5550 })
5551 .expect("Warn mode must not reject missing source_ref");
5552
5553 assert!(
5554 !receipt.provenance_warnings.is_empty(),
5555 "Warn mode must emit a warning instead of rejecting"
5556 );
5557 }
5558
5559 #[test]
5560 fn require_mode_rejects_node_without_source_ref() {
5561 let db = NamedTempFile::new().expect("temporary db");
5562 let writer = WriterActor::start(
5563 db.path(),
5564 Arc::new(SchemaManager::new()),
5565 ProvenanceMode::Require,
5566 Arc::new(TelemetryCounters::default()),
5567 )
5568 .expect("writer");
5569
5570 let result = writer.submit(WriteRequest {
5571 label: "test".to_owned(),
5572 nodes: vec![NodeInsert {
5573 row_id: "row-1".to_owned(),
5574 logical_id: "node-1".to_owned(),
5575 kind: "Note".to_owned(),
5576 properties: "{}".to_owned(),
5577 source_ref: None,
5578 upsert: false,
5579 chunk_policy: ChunkPolicy::Preserve,
5580 content_ref: None,
5581 }],
5582 node_retires: vec![],
5583 edges: vec![],
5584 edge_retires: vec![],
5585 chunks: vec![],
5586 runs: vec![],
5587 steps: vec![],
5588 actions: vec![],
5589 optional_backfills: vec![],
5590 vec_inserts: vec![],
5591 operational_writes: vec![],
5592 });
5593
5594 assert!(
5595 matches!(result, Err(EngineError::InvalidWrite(_))),
5596 "Require mode must reject node without source_ref"
5597 );
5598 }
5599
5600 #[test]
5601 fn require_mode_accepts_node_with_source_ref() {
5602 let db = NamedTempFile::new().expect("temporary db");
5603 let writer = WriterActor::start(
5604 db.path(),
5605 Arc::new(SchemaManager::new()),
5606 ProvenanceMode::Require,
5607 Arc::new(TelemetryCounters::default()),
5608 )
5609 .expect("writer");
5610
5611 let result = writer.submit(WriteRequest {
5612 label: "test".to_owned(),
5613 nodes: vec![NodeInsert {
5614 row_id: "row-1".to_owned(),
5615 logical_id: "node-1".to_owned(),
5616 kind: "Note".to_owned(),
5617 properties: "{}".to_owned(),
5618 source_ref: Some("src-1".to_owned()),
5619 upsert: false,
5620 chunk_policy: ChunkPolicy::Preserve,
5621 content_ref: None,
5622 }],
5623 node_retires: vec![],
5624 edges: vec![],
5625 edge_retires: vec![],
5626 chunks: vec![],
5627 runs: vec![],
5628 steps: vec![],
5629 actions: vec![],
5630 optional_backfills: vec![],
5631 vec_inserts: vec![],
5632 operational_writes: vec![],
5633 });
5634
5635 assert!(
5636 result.is_ok(),
5637 "Require mode must accept node with source_ref"
5638 );
5639 }
5640
5641 #[test]
5642 fn require_mode_rejects_edge_without_source_ref() {
5643 let db = NamedTempFile::new().expect("temporary db");
5644 let writer = WriterActor::start(
5645 db.path(),
5646 Arc::new(SchemaManager::new()),
5647 ProvenanceMode::Require,
5648 Arc::new(TelemetryCounters::default()),
5649 )
5650 .expect("writer");
5651
5652 let result = writer.submit(WriteRequest {
5654 label: "test".to_owned(),
5655 nodes: vec![
5656 NodeInsert {
5657 row_id: "row-a".to_owned(),
5658 logical_id: "node-a".to_owned(),
5659 kind: "Note".to_owned(),
5660 properties: "{}".to_owned(),
5661 source_ref: Some("src-1".to_owned()),
5662 upsert: false,
5663 chunk_policy: ChunkPolicy::Preserve,
5664 content_ref: None,
5665 },
5666 NodeInsert {
5667 row_id: "row-b".to_owned(),
5668 logical_id: "node-b".to_owned(),
5669 kind: "Note".to_owned(),
5670 properties: "{}".to_owned(),
5671 source_ref: Some("src-1".to_owned()),
5672 upsert: false,
5673 chunk_policy: ChunkPolicy::Preserve,
5674 content_ref: None,
5675 },
5676 ],
5677 node_retires: vec![],
5678 edges: vec![EdgeInsert {
5679 row_id: "edge-row-1".to_owned(),
5680 logical_id: "edge-1".to_owned(),
5681 source_logical_id: "node-a".to_owned(),
5682 target_logical_id: "node-b".to_owned(),
5683 kind: "LINKS_TO".to_owned(),
5684 properties: "{}".to_owned(),
5685 source_ref: None,
5686 upsert: false,
5687 }],
5688 edge_retires: vec![],
5689 chunks: vec![],
5690 runs: vec![],
5691 steps: vec![],
5692 actions: vec![],
5693 optional_backfills: vec![],
5694 vec_inserts: vec![],
5695 operational_writes: vec![],
5696 });
5697
5698 assert!(
5699 matches!(result, Err(EngineError::InvalidWrite(_))),
5700 "Require mode must reject edge without source_ref"
5701 );
5702 }
5703
5704 #[test]
5707 fn fts_row_has_correct_kind_from_co_submitted_node() {
5708 let db = NamedTempFile::new().expect("temporary db");
5709 let writer = WriterActor::start(
5710 db.path(),
5711 Arc::new(SchemaManager::new()),
5712 ProvenanceMode::Warn,
5713 Arc::new(TelemetryCounters::default()),
5714 )
5715 .expect("writer");
5716
5717 writer
5718 .submit(WriteRequest {
5719 label: "test".to_owned(),
5720 nodes: vec![NodeInsert {
5721 row_id: "row-1".to_owned(),
5722 logical_id: "node-1".to_owned(),
5723 kind: "Meeting".to_owned(),
5724 properties: "{}".to_owned(),
5725 source_ref: Some("src-1".to_owned()),
5726 upsert: false,
5727 chunk_policy: ChunkPolicy::Preserve,
5728 content_ref: None,
5729 }],
5730 node_retires: vec![],
5731 edges: vec![],
5732 edge_retires: vec![],
5733 chunks: vec![ChunkInsert {
5734 id: "chunk-1".to_owned(),
5735 node_logical_id: "node-1".to_owned(),
5736 text_content: "some text".to_owned(),
5737 byte_start: None,
5738 byte_end: None,
5739 content_hash: None,
5740 }],
5741 runs: vec![],
5742 steps: vec![],
5743 actions: vec![],
5744 optional_backfills: vec![],
5745 vec_inserts: vec![],
5746 operational_writes: vec![],
5747 })
5748 .expect("write");
5749
5750 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5751 let kind: String = conn
5752 .query_row(
5753 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5754 [],
5755 |row| row.get(0),
5756 )
5757 .expect("fts row");
5758
5759 assert_eq!(kind, "Meeting");
5760 }
5761
5762 #[test]
5763 fn fts_row_has_correct_text_content() {
5764 let db = NamedTempFile::new().expect("temporary db");
5765 let writer = WriterActor::start(
5766 db.path(),
5767 Arc::new(SchemaManager::new()),
5768 ProvenanceMode::Warn,
5769 Arc::new(TelemetryCounters::default()),
5770 )
5771 .expect("writer");
5772
5773 writer
5774 .submit(WriteRequest {
5775 label: "test".to_owned(),
5776 nodes: vec![NodeInsert {
5777 row_id: "row-1".to_owned(),
5778 logical_id: "node-1".to_owned(),
5779 kind: "Note".to_owned(),
5780 properties: "{}".to_owned(),
5781 source_ref: Some("src-1".to_owned()),
5782 upsert: false,
5783 chunk_policy: ChunkPolicy::Preserve,
5784 content_ref: None,
5785 }],
5786 node_retires: vec![],
5787 edges: vec![],
5788 edge_retires: vec![],
5789 chunks: vec![ChunkInsert {
5790 id: "chunk-1".to_owned(),
5791 node_logical_id: "node-1".to_owned(),
5792 text_content: "exactly this text".to_owned(),
5793 byte_start: None,
5794 byte_end: None,
5795 content_hash: None,
5796 }],
5797 runs: vec![],
5798 steps: vec![],
5799 actions: vec![],
5800 optional_backfills: vec![],
5801 vec_inserts: vec![],
5802 operational_writes: vec![],
5803 })
5804 .expect("write");
5805
5806 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5807 let text: String = conn
5808 .query_row(
5809 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5810 [],
5811 |row| row.get(0),
5812 )
5813 .expect("fts row");
5814
5815 assert_eq!(text, "exactly this text");
5816 }
5817
5818 #[test]
5819 fn fts_row_has_correct_kind_from_pre_existing_node() {
5820 let db = NamedTempFile::new().expect("temporary db");
5821 let writer = WriterActor::start(
5822 db.path(),
5823 Arc::new(SchemaManager::new()),
5824 ProvenanceMode::Warn,
5825 Arc::new(TelemetryCounters::default()),
5826 )
5827 .expect("writer");
5828
5829 writer
5831 .submit(WriteRequest {
5832 label: "r1".to_owned(),
5833 nodes: vec![NodeInsert {
5834 row_id: "row-1".to_owned(),
5835 logical_id: "node-1".to_owned(),
5836 kind: "Document".to_owned(),
5837 properties: "{}".to_owned(),
5838 source_ref: Some("src-1".to_owned()),
5839 upsert: false,
5840 chunk_policy: ChunkPolicy::Preserve,
5841 content_ref: None,
5842 }],
5843 node_retires: vec![],
5844 edges: vec![],
5845 edge_retires: vec![],
5846 chunks: vec![],
5847 runs: vec![],
5848 steps: vec![],
5849 actions: vec![],
5850 optional_backfills: vec![],
5851 vec_inserts: vec![],
5852 operational_writes: vec![],
5853 })
5854 .expect("r1 write");
5855
5856 writer
5858 .submit(WriteRequest {
5859 label: "r2".to_owned(),
5860 nodes: vec![],
5861 node_retires: vec![],
5862 edges: vec![],
5863 edge_retires: vec![],
5864 chunks: vec![ChunkInsert {
5865 id: "chunk-1".to_owned(),
5866 node_logical_id: "node-1".to_owned(),
5867 text_content: "some text".to_owned(),
5868 byte_start: None,
5869 byte_end: None,
5870 content_hash: None,
5871 }],
5872 runs: vec![],
5873 steps: vec![],
5874 actions: vec![],
5875 optional_backfills: vec![],
5876 vec_inserts: vec![],
5877 operational_writes: vec![],
5878 })
5879 .expect("r2 write");
5880
5881 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5882 let kind: String = conn
5883 .query_row(
5884 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5885 [],
5886 |row| row.get(0),
5887 )
5888 .expect("fts row");
5889
5890 assert_eq!(kind, "Document");
5891 }
5892
5893 #[test]
5894 fn fts_derives_rows_for_multiple_chunks_per_node() {
5895 let db = NamedTempFile::new().expect("temporary db");
5896 let writer = WriterActor::start(
5897 db.path(),
5898 Arc::new(SchemaManager::new()),
5899 ProvenanceMode::Warn,
5900 Arc::new(TelemetryCounters::default()),
5901 )
5902 .expect("writer");
5903
5904 writer
5905 .submit(WriteRequest {
5906 label: "test".to_owned(),
5907 nodes: vec![NodeInsert {
5908 row_id: "row-1".to_owned(),
5909 logical_id: "node-1".to_owned(),
5910 kind: "Meeting".to_owned(),
5911 properties: "{}".to_owned(),
5912 source_ref: Some("src-1".to_owned()),
5913 upsert: false,
5914 chunk_policy: ChunkPolicy::Preserve,
5915 content_ref: None,
5916 }],
5917 node_retires: vec![],
5918 edges: vec![],
5919 edge_retires: vec![],
5920 chunks: vec![
5921 ChunkInsert {
5922 id: "chunk-a".to_owned(),
5923 node_logical_id: "node-1".to_owned(),
5924 text_content: "intro".to_owned(),
5925 byte_start: None,
5926 byte_end: None,
5927 content_hash: None,
5928 },
5929 ChunkInsert {
5930 id: "chunk-b".to_owned(),
5931 node_logical_id: "node-1".to_owned(),
5932 text_content: "body".to_owned(),
5933 byte_start: None,
5934 byte_end: None,
5935 content_hash: None,
5936 },
5937 ChunkInsert {
5938 id: "chunk-c".to_owned(),
5939 node_logical_id: "node-1".to_owned(),
5940 text_content: "conclusion".to_owned(),
5941 byte_start: None,
5942 byte_end: None,
5943 content_hash: None,
5944 },
5945 ],
5946 runs: vec![],
5947 steps: vec![],
5948 actions: vec![],
5949 optional_backfills: vec![],
5950 vec_inserts: vec![],
5951 operational_writes: vec![],
5952 })
5953 .expect("write");
5954
5955 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5956 let count: i64 = conn
5957 .query_row(
5958 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5959 [],
5960 |row| row.get(0),
5961 )
5962 .expect("fts count");
5963
5964 assert_eq!(count, 3, "three chunks must produce three FTS rows");
5965 }
5966
5967 #[test]
5968 fn fts_resolves_mixed_fast_and_db_paths() {
5969 let db = NamedTempFile::new().expect("temporary db");
5970 let writer = WriterActor::start(
5971 db.path(),
5972 Arc::new(SchemaManager::new()),
5973 ProvenanceMode::Warn,
5974 Arc::new(TelemetryCounters::default()),
5975 )
5976 .expect("writer");
5977
5978 writer
5980 .submit(WriteRequest {
5981 label: "seed".to_owned(),
5982 nodes: vec![NodeInsert {
5983 row_id: "row-existing".to_owned(),
5984 logical_id: "existing-node".to_owned(),
5985 kind: "Archive".to_owned(),
5986 properties: "{}".to_owned(),
5987 source_ref: Some("src-1".to_owned()),
5988 upsert: false,
5989 chunk_policy: ChunkPolicy::Preserve,
5990 content_ref: None,
5991 }],
5992 node_retires: vec![],
5993 edges: vec![],
5994 edge_retires: vec![],
5995 chunks: vec![],
5996 runs: vec![],
5997 steps: vec![],
5998 actions: vec![],
5999 optional_backfills: vec![],
6000 vec_inserts: vec![],
6001 operational_writes: vec![],
6002 })
6003 .expect("seed");
6004
6005 writer
6007 .submit(WriteRequest {
6008 label: "mixed".to_owned(),
6009 nodes: vec![NodeInsert {
6010 row_id: "row-new".to_owned(),
6011 logical_id: "new-node".to_owned(),
6012 kind: "Inbox".to_owned(),
6013 properties: "{}".to_owned(),
6014 source_ref: Some("src-2".to_owned()),
6015 upsert: false,
6016 chunk_policy: ChunkPolicy::Preserve,
6017 content_ref: None,
6018 }],
6019 node_retires: vec![],
6020 edges: vec![],
6021 edge_retires: vec![],
6022 chunks: vec![
6023 ChunkInsert {
6024 id: "chunk-fast".to_owned(),
6025 node_logical_id: "new-node".to_owned(),
6026 text_content: "new content".to_owned(),
6027 byte_start: None,
6028 byte_end: None,
6029 content_hash: None,
6030 },
6031 ChunkInsert {
6032 id: "chunk-db".to_owned(),
6033 node_logical_id: "existing-node".to_owned(),
6034 text_content: "archive content".to_owned(),
6035 byte_start: None,
6036 byte_end: None,
6037 content_hash: None,
6038 },
6039 ],
6040 runs: vec![],
6041 steps: vec![],
6042 actions: vec![],
6043 optional_backfills: vec![],
6044 vec_inserts: vec![],
6045 operational_writes: vec![],
6046 })
6047 .expect("mixed write");
6048
6049 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6050 let fast_kind: String = conn
6051 .query_row(
6052 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6053 [],
6054 |row| row.get(0),
6055 )
6056 .expect("fast path fts row");
6057 let db_kind: String = conn
6058 .query_row(
6059 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6060 [],
6061 |row| row.get(0),
6062 )
6063 .expect("db path fts row");
6064
6065 assert_eq!(fast_kind, "Inbox");
6066 assert_eq!(db_kind, "Archive");
6067 }
6068
6069 #[test]
6070 fn prepare_write_rejects_empty_chunk_text() {
6071 let db = NamedTempFile::new().expect("temporary db");
6072 let writer = WriterActor::start(
6073 db.path(),
6074 Arc::new(SchemaManager::new()),
6075 ProvenanceMode::Warn,
6076 Arc::new(TelemetryCounters::default()),
6077 )
6078 .expect("writer");
6079
6080 let result = writer.submit(WriteRequest {
6081 label: "test".to_owned(),
6082 nodes: vec![NodeInsert {
6083 row_id: "row-1".to_owned(),
6084 logical_id: "node-1".to_owned(),
6085 kind: "Note".to_owned(),
6086 properties: "{}".to_owned(),
6087 source_ref: None,
6088 upsert: false,
6089 chunk_policy: ChunkPolicy::Preserve,
6090 content_ref: None,
6091 }],
6092 node_retires: vec![],
6093 edges: vec![],
6094 edge_retires: vec![],
6095 chunks: vec![ChunkInsert {
6096 id: "chunk-1".to_owned(),
6097 node_logical_id: "node-1".to_owned(),
6098 text_content: String::new(),
6099 byte_start: None,
6100 byte_end: None,
6101 content_hash: None,
6102 }],
6103 runs: vec![],
6104 steps: vec![],
6105 actions: vec![],
6106 optional_backfills: vec![],
6107 vec_inserts: vec![],
6108 operational_writes: vec![],
6109 });
6110
6111 assert!(
6112 matches!(result, Err(EngineError::InvalidWrite(_))),
6113 "empty text_content must be rejected"
6114 );
6115 }
6116
6117 #[test]
6118 fn receipt_reports_zero_backfills_when_none_submitted() {
6119 let db = NamedTempFile::new().expect("temporary db");
6120 let writer = WriterActor::start(
6121 db.path(),
6122 Arc::new(SchemaManager::new()),
6123 ProvenanceMode::Warn,
6124 Arc::new(TelemetryCounters::default()),
6125 )
6126 .expect("writer");
6127
6128 let receipt = writer
6129 .submit(WriteRequest {
6130 label: "test".to_owned(),
6131 nodes: vec![NodeInsert {
6132 row_id: "row-1".to_owned(),
6133 logical_id: "node-1".to_owned(),
6134 kind: "Note".to_owned(),
6135 properties: "{}".to_owned(),
6136 source_ref: Some("src-1".to_owned()),
6137 upsert: false,
6138 chunk_policy: ChunkPolicy::Preserve,
6139 content_ref: None,
6140 }],
6141 node_retires: vec![],
6142 edges: vec![],
6143 edge_retires: vec![],
6144 chunks: vec![],
6145 runs: vec![],
6146 steps: vec![],
6147 actions: vec![],
6148 optional_backfills: vec![],
6149 vec_inserts: vec![],
6150 operational_writes: vec![],
6151 })
6152 .expect("write");
6153
6154 assert_eq!(receipt.optional_backfill_count, 0);
6155 }
6156
6157 #[test]
6158 fn receipt_reports_correct_backfill_count() {
6159 let db = NamedTempFile::new().expect("temporary db");
6160 let writer = WriterActor::start(
6161 db.path(),
6162 Arc::new(SchemaManager::new()),
6163 ProvenanceMode::Warn,
6164 Arc::new(TelemetryCounters::default()),
6165 )
6166 .expect("writer");
6167
6168 let receipt = writer
6169 .submit(WriteRequest {
6170 label: "test".to_owned(),
6171 nodes: vec![NodeInsert {
6172 row_id: "row-1".to_owned(),
6173 logical_id: "node-1".to_owned(),
6174 kind: "Note".to_owned(),
6175 properties: "{}".to_owned(),
6176 source_ref: Some("src-1".to_owned()),
6177 upsert: false,
6178 chunk_policy: ChunkPolicy::Preserve,
6179 content_ref: None,
6180 }],
6181 node_retires: vec![],
6182 edges: vec![],
6183 edge_retires: vec![],
6184 chunks: vec![],
6185 runs: vec![],
6186 steps: vec![],
6187 actions: vec![],
6188 optional_backfills: vec![
6189 OptionalProjectionTask {
6190 target: ProjectionTarget::Fts,
6191 payload: "p1".to_owned(),
6192 },
6193 OptionalProjectionTask {
6194 target: ProjectionTarget::Vec,
6195 payload: "p2".to_owned(),
6196 },
6197 OptionalProjectionTask {
6198 target: ProjectionTarget::All,
6199 payload: "p3".to_owned(),
6200 },
6201 ],
6202 vec_inserts: vec![],
6203 operational_writes: vec![],
6204 })
6205 .expect("write");
6206
6207 assert_eq!(receipt.optional_backfill_count, 3);
6208 }
6209
6210 #[test]
6211 fn backfill_tasks_are_not_executed_during_write() {
6212 let db = NamedTempFile::new().expect("temporary db");
6213 let writer = WriterActor::start(
6214 db.path(),
6215 Arc::new(SchemaManager::new()),
6216 ProvenanceMode::Warn,
6217 Arc::new(TelemetryCounters::default()),
6218 )
6219 .expect("writer");
6220
6221 writer
6224 .submit(WriteRequest {
6225 label: "test".to_owned(),
6226 nodes: vec![NodeInsert {
6227 row_id: "row-1".to_owned(),
6228 logical_id: "node-1".to_owned(),
6229 kind: "Note".to_owned(),
6230 properties: "{}".to_owned(),
6231 source_ref: Some("src-1".to_owned()),
6232 upsert: false,
6233 chunk_policy: ChunkPolicy::Preserve,
6234 content_ref: None,
6235 }],
6236 node_retires: vec![],
6237 edges: vec![],
6238 edge_retires: vec![],
6239 chunks: vec![ChunkInsert {
6240 id: "chunk-1".to_owned(),
6241 node_logical_id: "node-1".to_owned(),
6242 text_content: "required text".to_owned(),
6243 byte_start: None,
6244 byte_end: None,
6245 content_hash: None,
6246 }],
6247 runs: vec![],
6248 steps: vec![],
6249 actions: vec![],
6250 optional_backfills: vec![OptionalProjectionTask {
6251 target: ProjectionTarget::Fts,
6252 payload: "backfill-payload".to_owned(),
6253 }],
6254 vec_inserts: vec![],
6255 operational_writes: vec![],
6256 })
6257 .expect("write");
6258
6259 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6260 let count: i64 = conn
6261 .query_row(
6262 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6263 [],
6264 |row| row.get(0),
6265 )
6266 .expect("fts count");
6267
6268 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6269 }
6270
6271 #[test]
6272 fn fts_row_uses_new_kind_after_node_replace() {
6273 let db = NamedTempFile::new().expect("temporary db");
6274 let writer = WriterActor::start(
6275 db.path(),
6276 Arc::new(SchemaManager::new()),
6277 ProvenanceMode::Warn,
6278 Arc::new(TelemetryCounters::default()),
6279 )
6280 .expect("writer");
6281
6282 writer
6284 .submit(WriteRequest {
6285 label: "v1".to_owned(),
6286 nodes: vec![NodeInsert {
6287 row_id: "row-1".to_owned(),
6288 logical_id: "node-1".to_owned(),
6289 kind: "Note".to_owned(),
6290 properties: "{}".to_owned(),
6291 source_ref: Some("src-1".to_owned()),
6292 upsert: false,
6293 chunk_policy: ChunkPolicy::Preserve,
6294 content_ref: None,
6295 }],
6296 node_retires: vec![],
6297 edges: vec![],
6298 edge_retires: vec![],
6299 chunks: vec![ChunkInsert {
6300 id: "chunk-v1".to_owned(),
6301 node_logical_id: "node-1".to_owned(),
6302 text_content: "original".to_owned(),
6303 byte_start: None,
6304 byte_end: None,
6305 content_hash: None,
6306 }],
6307 runs: vec![],
6308 steps: vec![],
6309 actions: vec![],
6310 optional_backfills: vec![],
6311 vec_inserts: vec![],
6312 operational_writes: vec![],
6313 })
6314 .expect("v1 write");
6315
6316 writer
6318 .submit(WriteRequest {
6319 label: "v2".to_owned(),
6320 nodes: vec![NodeInsert {
6321 row_id: "row-2".to_owned(),
6322 logical_id: "node-1".to_owned(),
6323 kind: "Meeting".to_owned(),
6324 properties: "{}".to_owned(),
6325 source_ref: Some("src-2".to_owned()),
6326 upsert: true,
6327 chunk_policy: ChunkPolicy::Replace,
6328 content_ref: None,
6329 }],
6330 node_retires: vec![],
6331 edges: vec![],
6332 edge_retires: vec![],
6333 chunks: vec![ChunkInsert {
6334 id: "chunk-v2".to_owned(),
6335 node_logical_id: "node-1".to_owned(),
6336 text_content: "updated".to_owned(),
6337 byte_start: None,
6338 byte_end: None,
6339 content_hash: None,
6340 }],
6341 runs: vec![],
6342 steps: vec![],
6343 actions: vec![],
6344 optional_backfills: vec![],
6345 vec_inserts: vec![],
6346 operational_writes: vec![],
6347 })
6348 .expect("v2 write");
6349
6350 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6351
6352 let old_count: i64 = conn
6354 .query_row(
6355 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6356 [],
6357 |row| row.get(0),
6358 )
6359 .expect("old fts count");
6360 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6361
6362 let new_kind: String = conn
6364 .query_row(
6365 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6366 [],
6367 |row| row.get(0),
6368 )
6369 .expect("new fts row");
6370 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6371 }
6372
6373 #[test]
6376 fn vec_insert_empty_chunk_id_is_rejected() {
6377 let db = NamedTempFile::new().expect("temporary db");
6378 let writer = WriterActor::start(
6379 db.path(),
6380 Arc::new(SchemaManager::new()),
6381 ProvenanceMode::Warn,
6382 Arc::new(TelemetryCounters::default()),
6383 )
6384 .expect("writer");
6385 let result = writer.submit(WriteRequest {
6386 label: "vec-test".to_owned(),
6387 nodes: vec![],
6388 node_retires: vec![],
6389 edges: vec![],
6390 edge_retires: vec![],
6391 chunks: vec![],
6392 runs: vec![],
6393 steps: vec![],
6394 actions: vec![],
6395 optional_backfills: vec![],
6396 vec_inserts: vec![VecInsert {
6397 chunk_id: String::new(),
6398 embedding: vec![0.1, 0.2, 0.3],
6399 }],
6400 operational_writes: vec![],
6401 });
6402 assert!(
6403 matches!(result, Err(EngineError::InvalidWrite(_))),
6404 "empty chunk_id in VecInsert must be rejected"
6405 );
6406 }
6407
6408 #[test]
6409 fn vec_insert_empty_embedding_is_rejected() {
6410 let db = NamedTempFile::new().expect("temporary db");
6411 let writer = WriterActor::start(
6412 db.path(),
6413 Arc::new(SchemaManager::new()),
6414 ProvenanceMode::Warn,
6415 Arc::new(TelemetryCounters::default()),
6416 )
6417 .expect("writer");
6418 let result = writer.submit(WriteRequest {
6419 label: "vec-test".to_owned(),
6420 nodes: vec![],
6421 node_retires: vec![],
6422 edges: vec![],
6423 edge_retires: vec![],
6424 chunks: vec![],
6425 runs: vec![],
6426 steps: vec![],
6427 actions: vec![],
6428 optional_backfills: vec![],
6429 vec_inserts: vec![VecInsert {
6430 chunk_id: "chunk-1".to_owned(),
6431 embedding: vec![],
6432 }],
6433 operational_writes: vec![],
6434 });
6435 assert!(
6436 matches!(result, Err(EngineError::InvalidWrite(_))),
6437 "empty embedding in VecInsert must be rejected"
6438 );
6439 }
6440
6441 #[test]
6442 fn vec_insert_noop_without_feature() {
6443 let db = NamedTempFile::new().expect("temporary db");
6446 let writer = WriterActor::start(
6447 db.path(),
6448 Arc::new(SchemaManager::new()),
6449 ProvenanceMode::Warn,
6450 Arc::new(TelemetryCounters::default()),
6451 )
6452 .expect("writer");
6453 let result = writer.submit(WriteRequest {
6454 label: "vec-noop".to_owned(),
6455 nodes: vec![],
6456 node_retires: vec![],
6457 edges: vec![],
6458 edge_retires: vec![],
6459 chunks: vec![],
6460 runs: vec![],
6461 steps: vec![],
6462 actions: vec![],
6463 optional_backfills: vec![],
6464 vec_inserts: vec![VecInsert {
6465 chunk_id: "chunk-noop".to_owned(),
6466 embedding: vec![1.0, 2.0, 3.0],
6467 }],
6468 operational_writes: vec![],
6469 });
6470 #[cfg(not(feature = "sqlite-vec"))]
6471 result.expect("noop VecInsert without feature must succeed");
6472 #[cfg(feature = "sqlite-vec")]
6474 let _ = result;
6475 }
6476
6477 #[cfg(feature = "sqlite-vec")]
6478 #[test]
6479 fn node_retire_preserves_vec_rows_for_later_restore() {
6480 use crate::sqlite::open_connection_with_vec;
6481
6482 let db = NamedTempFile::new().expect("temporary db");
6483 let schema_manager = Arc::new(SchemaManager::new());
6484
6485 {
6486 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6487 schema_manager.bootstrap(&conn).expect("bootstrap");
6488 schema_manager
6489 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6490 .expect("ensure profile");
6491 }
6492
6493 let writer = WriterActor::start(
6494 db.path(),
6495 Arc::clone(&schema_manager),
6496 ProvenanceMode::Warn,
6497 Arc::new(TelemetryCounters::default()),
6498 )
6499 .expect("writer");
6500
6501 writer
6503 .submit(WriteRequest {
6504 label: "setup".to_owned(),
6505 nodes: vec![NodeInsert {
6506 row_id: "row-retire-vec".to_owned(),
6507 logical_id: "node-retire-vec".to_owned(),
6508 kind: "Doc".to_owned(),
6509 properties: "{}".to_owned(),
6510 source_ref: Some("src".to_owned()),
6511 upsert: false,
6512 chunk_policy: ChunkPolicy::Preserve,
6513 content_ref: None,
6514 }],
6515 node_retires: vec![],
6516 edges: vec![],
6517 edge_retires: vec![],
6518 chunks: vec![ChunkInsert {
6519 id: "chunk-retire-vec".to_owned(),
6520 node_logical_id: "node-retire-vec".to_owned(),
6521 text_content: "text".to_owned(),
6522 byte_start: None,
6523 byte_end: None,
6524 content_hash: None,
6525 }],
6526 runs: vec![],
6527 steps: vec![],
6528 actions: vec![],
6529 optional_backfills: vec![],
6530 vec_inserts: vec![VecInsert {
6531 chunk_id: "chunk-retire-vec".to_owned(),
6532 embedding: vec![0.1, 0.2, 0.3],
6533 }],
6534 operational_writes: vec![],
6535 })
6536 .expect("setup write");
6537
6538 writer
6540 .submit(WriteRequest {
6541 label: "retire".to_owned(),
6542 nodes: vec![],
6543 node_retires: vec![NodeRetire {
6544 logical_id: "node-retire-vec".to_owned(),
6545 source_ref: Some("src".to_owned()),
6546 }],
6547 edges: vec![],
6548 edge_retires: vec![],
6549 chunks: vec![],
6550 runs: vec![],
6551 steps: vec![],
6552 actions: vec![],
6553 optional_backfills: vec![],
6554 vec_inserts: vec![],
6555 operational_writes: vec![],
6556 })
6557 .expect("retire write");
6558
6559 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6560 let count: i64 = conn
6561 .query_row(
6562 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6563 [],
6564 |row| row.get(0),
6565 )
6566 .expect("count");
6567 assert_eq!(
6568 count, 1,
6569 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6570 );
6571 }
6572
6573 #[cfg(feature = "sqlite-vec")]
6574 #[test]
6575 #[allow(clippy::too_many_lines)]
6576 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6577 use crate::sqlite::open_connection_with_vec;
6578
6579 let db = NamedTempFile::new().expect("temporary db");
6580 let schema_manager = Arc::new(SchemaManager::new());
6581
6582 {
6583 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6584 schema_manager.bootstrap(&conn).expect("bootstrap");
6585 schema_manager
6586 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6587 .expect("ensure profile");
6588 }
6589
6590 let writer = WriterActor::start(
6591 db.path(),
6592 Arc::clone(&schema_manager),
6593 ProvenanceMode::Warn,
6594 Arc::new(TelemetryCounters::default()),
6595 )
6596 .expect("writer");
6597
6598 writer
6600 .submit(WriteRequest {
6601 label: "v1".to_owned(),
6602 nodes: vec![NodeInsert {
6603 row_id: "row-replace-v1".to_owned(),
6604 logical_id: "node-replace-vec".to_owned(),
6605 kind: "Doc".to_owned(),
6606 properties: "{}".to_owned(),
6607 source_ref: Some("src".to_owned()),
6608 upsert: false,
6609 chunk_policy: ChunkPolicy::Preserve,
6610 content_ref: None,
6611 }],
6612 node_retires: vec![],
6613 edges: vec![],
6614 edge_retires: vec![],
6615 chunks: vec![ChunkInsert {
6616 id: "chunk-replace-A".to_owned(),
6617 node_logical_id: "node-replace-vec".to_owned(),
6618 text_content: "version one".to_owned(),
6619 byte_start: None,
6620 byte_end: None,
6621 content_hash: None,
6622 }],
6623 runs: vec![],
6624 steps: vec![],
6625 actions: vec![],
6626 optional_backfills: vec![],
6627 vec_inserts: vec![VecInsert {
6628 chunk_id: "chunk-replace-A".to_owned(),
6629 embedding: vec![0.1, 0.2, 0.3],
6630 }],
6631 operational_writes: vec![],
6632 })
6633 .expect("v1 write");
6634
6635 writer
6637 .submit(WriteRequest {
6638 label: "v2".to_owned(),
6639 nodes: vec![NodeInsert {
6640 row_id: "row-replace-v2".to_owned(),
6641 logical_id: "node-replace-vec".to_owned(),
6642 kind: "Doc".to_owned(),
6643 properties: "{}".to_owned(),
6644 source_ref: Some("src".to_owned()),
6645 upsert: true,
6646 chunk_policy: ChunkPolicy::Replace,
6647 content_ref: None,
6648 }],
6649 node_retires: vec![],
6650 edges: vec![],
6651 edge_retires: vec![],
6652 chunks: vec![ChunkInsert {
6653 id: "chunk-replace-B".to_owned(),
6654 node_logical_id: "node-replace-vec".to_owned(),
6655 text_content: "version two".to_owned(),
6656 byte_start: None,
6657 byte_end: None,
6658 content_hash: None,
6659 }],
6660 runs: vec![],
6661 steps: vec![],
6662 actions: vec![],
6663 optional_backfills: vec![],
6664 vec_inserts: vec![VecInsert {
6665 chunk_id: "chunk-replace-B".to_owned(),
6666 embedding: vec![0.4, 0.5, 0.6],
6667 }],
6668 operational_writes: vec![],
6669 })
6670 .expect("v2 write");
6671
6672 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6673 let count_a: i64 = conn
6674 .query_row(
6675 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6676 [],
6677 |row| row.get(0),
6678 )
6679 .expect("count A");
6680 let count_b: i64 = conn
6681 .query_row(
6682 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6683 [],
6684 |row| row.get(0),
6685 )
6686 .expect("count B");
6687 assert_eq!(
6688 count_a, 0,
6689 "old vec row (chunk-A) must be deleted on Replace"
6690 );
6691 assert_eq!(
6692 count_b, 1,
6693 "new vec row (chunk-B) must be present after Replace"
6694 );
6695 }
6696
6697 #[cfg(feature = "sqlite-vec")]
6698 #[test]
6699 fn vec_insert_is_persisted_when_feature_enabled() {
6700 use crate::sqlite::open_connection_with_vec;
6701
6702 let db = NamedTempFile::new().expect("temporary db");
6703 let schema_manager = Arc::new(SchemaManager::new());
6704
6705 {
6707 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6708 schema_manager.bootstrap(&conn).expect("bootstrap");
6709 schema_manager
6710 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6711 .expect("ensure profile");
6712 }
6713
6714 let writer = WriterActor::start(
6715 db.path(),
6716 Arc::clone(&schema_manager),
6717 ProvenanceMode::Warn,
6718 Arc::new(TelemetryCounters::default()),
6719 )
6720 .expect("writer");
6721
6722 writer
6723 .submit(WriteRequest {
6724 label: "vec-insert".to_owned(),
6725 nodes: vec![],
6726 node_retires: vec![],
6727 edges: vec![],
6728 edge_retires: vec![],
6729 chunks: vec![],
6730 runs: vec![],
6731 steps: vec![],
6732 actions: vec![],
6733 optional_backfills: vec![],
6734 vec_inserts: vec![VecInsert {
6735 chunk_id: "chunk-vec".to_owned(),
6736 embedding: vec![0.1, 0.2, 0.3],
6737 }],
6738 operational_writes: vec![],
6739 })
6740 .expect("vec insert write");
6741
6742 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6743 let count: i64 = conn
6744 .query_row(
6745 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6746 [],
6747 |row| row.get(0),
6748 )
6749 .expect("count");
6750 assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6751 }
6752
6753 #[test]
6756 fn write_request_exceeding_node_limit_is_rejected() {
6757 let nodes: Vec<NodeInsert> = (0..10_001)
6758 .map(|i| NodeInsert {
6759 row_id: format!("row-{i}"),
6760 logical_id: format!("lg-{i}"),
6761 kind: "Note".to_owned(),
6762 properties: "{}".to_owned(),
6763 source_ref: None,
6764 upsert: false,
6765 chunk_policy: ChunkPolicy::Preserve,
6766 content_ref: None,
6767 })
6768 .collect();
6769
6770 let request = WriteRequest {
6771 label: "too-many-nodes".to_owned(),
6772 nodes,
6773 node_retires: vec![],
6774 edges: vec![],
6775 edge_retires: vec![],
6776 chunks: vec![],
6777 runs: vec![],
6778 steps: vec![],
6779 actions: vec![],
6780 optional_backfills: vec![],
6781 vec_inserts: vec![],
6782 operational_writes: vec![],
6783 };
6784
6785 let result = prepare_write(request, ProvenanceMode::Warn)
6786 .map(|_| ())
6787 .map_err(|e| format!("{e}"));
6788 assert!(
6789 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6790 "exceeding node limit must return InvalidWrite: got {result:?}"
6791 );
6792 }
6793
6794 #[test]
6795 fn write_request_exceeding_total_limit_is_rejected() {
6796 let request = WriteRequest {
6800 label: "too-many-total".to_owned(),
6801 nodes: (0..10_000)
6802 .map(|i| NodeInsert {
6803 row_id: format!("row-{i}"),
6804 logical_id: format!("lg-{i}"),
6805 kind: "Note".to_owned(),
6806 properties: "{}".to_owned(),
6807 source_ref: None,
6808 upsert: false,
6809 chunk_policy: ChunkPolicy::Preserve,
6810 content_ref: None,
6811 })
6812 .collect(),
6813 node_retires: vec![],
6814 edges: (0..10_000)
6815 .map(|i| EdgeInsert {
6816 row_id: format!("edge-row-{i}"),
6817 logical_id: format!("edge-lg-{i}"),
6818 kind: "link".to_owned(),
6819 source_logical_id: format!("lg-{i}"),
6820 target_logical_id: format!("lg-{}", i + 1),
6821 properties: "{}".to_owned(),
6822 source_ref: None,
6823 upsert: false,
6824 })
6825 .collect(),
6826 edge_retires: vec![],
6827 chunks: (0..50_000)
6828 .map(|i| ChunkInsert {
6829 id: format!("chunk-{i}"),
6830 node_logical_id: "lg-0".to_owned(),
6831 text_content: "text".to_owned(),
6832 byte_start: None,
6833 byte_end: None,
6834 content_hash: None,
6835 })
6836 .collect(),
6837 runs: vec![],
6838 steps: vec![],
6839 actions: vec![],
6840 optional_backfills: vec![],
6841 vec_inserts: (0..20_001)
6842 .map(|i| VecInsert {
6843 chunk_id: format!("vec-chunk-{i}"),
6844 embedding: vec![0.1],
6845 })
6846 .collect(),
6847 operational_writes: (0..10_000)
6848 .map(|i| OperationalWrite::Append {
6849 collection: format!("col-{i}"),
6850 record_key: format!("key-{i}"),
6851 payload_json: "{}".to_owned(),
6852 source_ref: None,
6853 })
6854 .collect(),
6855 };
6856
6857 let result = prepare_write(request, ProvenanceMode::Warn)
6858 .map(|_| ())
6859 .map_err(|e| format!("{e}"));
6860 assert!(
6861 matches!(result, Err(ref msg) if msg.contains("too many total items")),
6862 "exceeding total item limit must return InvalidWrite: got {result:?}"
6863 );
6864 }
6865
6866 #[test]
6867 fn write_request_within_limits_succeeds() {
6868 let db = NamedTempFile::new().expect("temporary db");
6869 let writer = WriterActor::start(
6870 db.path(),
6871 Arc::new(SchemaManager::new()),
6872 ProvenanceMode::Warn,
6873 Arc::new(TelemetryCounters::default()),
6874 )
6875 .expect("writer");
6876
6877 let result = writer.submit(WriteRequest {
6878 label: "within-limits".to_owned(),
6879 nodes: vec![NodeInsert {
6880 row_id: "row-1".to_owned(),
6881 logical_id: "lg-1".to_owned(),
6882 kind: "Note".to_owned(),
6883 properties: "{}".to_owned(),
6884 source_ref: None,
6885 upsert: false,
6886 chunk_policy: ChunkPolicy::Preserve,
6887 content_ref: None,
6888 }],
6889 node_retires: vec![],
6890 edges: vec![],
6891 edge_retires: vec![],
6892 chunks: vec![],
6893 runs: vec![],
6894 steps: vec![],
6895 actions: vec![],
6896 optional_backfills: vec![],
6897 vec_inserts: vec![],
6898 operational_writes: vec![],
6899 });
6900
6901 assert!(
6902 result.is_ok(),
6903 "write request within limits must succeed: got {result:?}"
6904 );
6905 }
6906
6907 #[test]
6908 fn property_fts_rows_created_on_node_insert() {
6909 let db = NamedTempFile::new().expect("temporary db");
6910 let schema = Arc::new(SchemaManager::new());
6912 {
6913 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6914 schema.bootstrap(&conn).expect("bootstrap");
6915 conn.execute(
6916 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6917 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6918 [],
6919 )
6920 .expect("register schema");
6921 }
6922 let writer = WriterActor::start(
6923 db.path(),
6924 Arc::clone(&schema),
6925 ProvenanceMode::Warn,
6926 Arc::new(TelemetryCounters::default()),
6927 )
6928 .expect("writer");
6929
6930 writer
6931 .submit(WriteRequest {
6932 label: "goal-insert".to_owned(),
6933 nodes: vec![NodeInsert {
6934 row_id: "row-1".to_owned(),
6935 logical_id: "goal-1".to_owned(),
6936 kind: "Goal".to_owned(),
6937 properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6938 .to_owned(),
6939 source_ref: Some("src-1".to_owned()),
6940 upsert: false,
6941 chunk_policy: ChunkPolicy::Preserve,
6942 content_ref: None,
6943 }],
6944 node_retires: vec![],
6945 edges: vec![],
6946 edge_retires: vec![],
6947 chunks: vec![],
6948 runs: vec![],
6949 steps: vec![],
6950 actions: vec![],
6951 optional_backfills: vec![],
6952 vec_inserts: vec![],
6953 operational_writes: vec![],
6954 })
6955 .expect("write");
6956
6957 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6958 let text: String = conn
6959 .query_row(
6960 "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6961 [],
6962 |row| row.get(0),
6963 )
6964 .expect("property FTS row must exist");
6965 assert_eq!(text, "Ship v2 Launch the redesign");
6966 }
6967
    /// Upserting a node with the same logical id must replace — not
    /// duplicate — its property-FTS row, and the indexed text must reflect
    /// the new properties.
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            // Index only $.name for the Goal kind.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: plain insert of goal-1 with name "Alpha".
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Second write: upsert the same logical id (new row_id) with name
        // "Beta"; this must overwrite the existing FTS row.
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        // One row, not two: the upsert replaced the original entry.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        // And the surviving row carries the post-upsert text.
        let text: String = conn
            .query_row(
                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
7066
    /// Retiring a node must also delete its property-FTS row so stale text
    /// does not remain searchable.
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            // Index only $.name for the Goal kind.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: insert goal-1 so a property-FTS row exists.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Second write: retire the same logical id.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        // The retire must have removed the FTS row for goal-1.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
7147
7148 #[test]
7149 fn no_property_fts_row_for_unregistered_kind() {
7150 let db = NamedTempFile::new().expect("temporary db");
7151 let schema = Arc::new(SchemaManager::new());
7152 {
7153 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7154 schema.bootstrap(&conn).expect("bootstrap");
7155 }
7157 let writer = WriterActor::start(
7158 db.path(),
7159 Arc::clone(&schema),
7160 ProvenanceMode::Warn,
7161 Arc::new(TelemetryCounters::default()),
7162 )
7163 .expect("writer");
7164
7165 writer
7166 .submit(WriteRequest {
7167 label: "insert".to_owned(),
7168 nodes: vec![NodeInsert {
7169 row_id: "row-1".to_owned(),
7170 logical_id: "note-1".to_owned(),
7171 kind: "Note".to_owned(),
7172 properties: r#"{"title":"hello"}"#.to_owned(),
7173 source_ref: Some("src-1".to_owned()),
7174 upsert: false,
7175 chunk_policy: ChunkPolicy::Preserve,
7176 content_ref: None,
7177 }],
7178 node_retires: vec![],
7179 edges: vec![],
7180 edge_retires: vec![],
7181 chunks: vec![],
7182 runs: vec![],
7183 steps: vec![],
7184 actions: vec![],
7185 optional_backfills: vec![],
7186 vec_inserts: vec![],
7187 operational_writes: vec![],
7188 })
7189 .expect("insert");
7190
7191 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7192 let count: i64 = conn
7193 .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7194 row.get(0)
7195 })
7196 .expect("count");
7197 assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7198 }
7199
    /// Unit tests for `extract_json_path`, which resolves a `$.`-prefixed
    /// dotted path against a JSON document and flattens the match into a
    /// list of scalar strings (objects and nulls are dropped).
    mod extract_json_path_tests {
        use super::super::extract_json_path;
        use serde_json::json;

        #[test]
        fn string_value() {
            let v = json!({"name": "alice"});
            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
        }

        #[test]
        fn number_value() {
            // Numbers are stringified.
            let v = json!({"age": 42});
            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
        }

        #[test]
        fn bool_value() {
            // Booleans are stringified.
            let v = json!({"active": true});
            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
        }

        #[test]
        fn null_value() {
            // Nulls yield no output rather than the string "null".
            let v = json!({"x": null});
            assert!(extract_json_path(&v, "$.x").is_empty());
        }

        #[test]
        fn missing_path() {
            let v = json!({"name": "a"});
            assert!(extract_json_path(&v, "$.missing").is_empty());
        }

        #[test]
        fn nested_path() {
            // Dotted paths descend through nested objects.
            let v = json!({"address": {"city": "NYC"}});
            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
        }

        #[test]
        fn array_of_strings() {
            // Arrays flatten to one entry per scalar element.
            let v = json!({"tags": ["a", "b", "c"]});
            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
        }

        #[test]
        fn array_mixed_scalars() {
            let v = json!({"vals": ["x", 1, true]});
            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
        }

        #[test]
        fn array_only_objects_returns_empty() {
            // Non-scalar array elements are skipped, not recursed into.
            let v = json!({"data": [{"k": "v"}]});
            assert!(extract_json_path(&v, "$.data").is_empty());
        }

        #[test]
        fn array_mixed_objects_and_scalars() {
            let v = json!({"data": ["keep", {"skip": true}, "also"]});
            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
        }

        #[test]
        fn object_returns_empty() {
            // A path landing on an object yields nothing.
            let v = json!({"meta": {"k": "v"}});
            assert!(extract_json_path(&v, "$.meta").is_empty());
        }

        #[test]
        fn no_prefix_returns_empty() {
            // Paths must start with the `$.` prefix to match anything.
            let v = json!({"name": "a"});
            assert!(extract_json_path(&v, "name").is_empty());
        }
    }
7276
    /// Tests for `extract_property_fts` recursive extraction: walking a
    /// JSON subtree into a single text blob plus a `PositionEntry` map
    /// (leaf path + byte offsets into the blob), including the depth and
    /// byte-size guardrails, exclude paths, and empty-leaf handling.
    mod recursive_extraction_tests {
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        // Schema with a single-space separator and no exclusions.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        // Same as `schema`, but with explicit exclude paths.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            // Keys are visited in lexicographic order regardless of the
            // order they appear in the JSON source.
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            // Array elements get bracketed index suffixes in leaf paths.
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            // The walk descends into objects nested inside arrays.
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            // Null leaves emit neither text nor a position entry.
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Build an object nested 10 levels deep with the only scalar
            // at the bottom, then check the depth cap stops the walk.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            // ~160 KiB of leaf text to overflow the byte cap.
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            // Every recorded position must still point at a valid,
            // separator-free, non-empty slice of the truncated blob.
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            // Excluded subtrees contribute no text and no positions, and
            // the exclusion is counted in stats.
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            // Offsets must be monotone, non-overlapping, in-bounds, and
            // slice out exactly the leaf text.
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            // Position entries are a recursive-path feature only.
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            let _: Vec<PositionEntry> = positions;
        }

        // Helper: no two position entries may start at the same offset.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            // Empty string leaves are dropped and must not shift offsets.
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            // Every all-empty shape must yield neither text nor positions.
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            // An interior empty leaf is skipped: exactly one separator
            // sits between the two surviving leaves.
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            // An array of nulls behaves like an empty payload.
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7645}