1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred (best-effort) projection job carried through a write and
/// reported back in the receipt's `optional_backfill_count`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection this task feeds.
    pub target: ProjectionTarget,
    /// Opaque payload for the projection; interpreted downstream.
    pub payload: String,
}
31
/// How a node write treats chunks already attached to that node.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks (the default).
    #[default]
    Preserve,
    /// Drop existing chunks in favor of the incoming ones.
    Replace,
}
41
/// Engine-wide policy for items submitted without a `source_ref`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Accept the write; missing provenance is surfaced as warnings
    /// (see `WriteReceipt::provenance_warnings`). The default.
    #[default]
    Warn,
    /// Reject the whole write if any item lacks a `source_ref`
    /// (enforced by `check_require_provenance`).
    Require,
}
52
/// One node to insert (or upsert) in a `WriteRequest`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable logical identity of the node; must be non-empty.
    pub logical_id: String,
    /// Node kind; also keys the property-FTS schema lookup.
    pub kind: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// What happens to the node's existing chunks on upsert.
    pub chunk_policy: ChunkPolicy,
    pub content_ref: Option<String>,
}
69
/// One edge to insert (or upsert) in a `WriteRequest`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable logical identity of the edge; must be non-empty.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    pub kind: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
84
/// Request to retire (supersede) a node by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
91
/// Request to retire (supersede) an edge by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
98
/// One text chunk attached to a node; feeds the chunk FTS projection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    /// Node this chunk belongs to; the node must be in the same request or
    /// already live in the database (enforced in `resolve_fts_rows`).
    pub node_logical_id: String,
    /// Chunk text; must be non-empty.
    pub text_content: String,
    /// Optional byte span of the chunk within its source document.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}
110
/// An embedding vector for a chunk (no `Eq`: contains floats).
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    /// Embedding values; must be non-empty.
    pub embedding: Vec<f32>,
}
122
/// A write against an operational (key/value-style) collection.
///
/// `Append` and `Put` carry a JSON payload; `Delete` does not. All three
/// identify the record by `collection` + `record_key` (both must be
/// non-empty; see `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A runtime "run" record. When `upsert` is true, `supersedes_id` must be
/// set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the run this one supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A runtime "step" record belonging to a run. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    /// Parent run's id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the step this one supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
169
/// A runtime "action" record belonging to a step. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    /// Parent step's id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties blob.
    pub properties: String,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the action this one supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
182
// Per-category caps on a single WriteRequest, enforced up front by
// `validate_request_size` before anything reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined cap across node retires + edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined cap across runs + steps + actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of every item category in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long a caller waits for the writer thread's reply before getting
// `WriterTimedOut`; the write itself may still commit after the timeout.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A complete batch submitted to `WriterActor::submit`; validated and
/// normalized into a `PreparedWrite` on the caller's thread.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable tag echoed in logs and the receipt.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Best-effort projection work to run alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Returned to the caller once the writer thread finishes a submit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Echo of `WriteRequest::label`.
    pub label: String,
    /// Number of optional backfill tasks carried by the write.
    pub optional_backfill_count: usize,
    pub warnings: Vec<String>,
    /// Warnings about items lacking `source_ref` (ProvenanceMode::Warn).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to bump last-accessed timestamps for a set of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Must be non-empty, with no blank ids (see
    /// `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to record for every id.
    pub touched_at: i64,
    /// Provenance reference; required under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
235
/// Result of a last-accessed touch.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// How many logical ids were actually touched.
    pub touched_logical_ids: usize,
    /// Echo of the timestamp that was recorded.
    pub touched_at: i64,
}
242
/// One row destined for the chunk FTS projection, resolved in
/// `resolve_fts_rows` (node kind comes from the request or the DB).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node.
    kind: String,
    text_content: String,
}
250
/// One row destined for the property FTS projection, built by
/// `resolve_property_fts_rows` from a node's JSON properties.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Concatenated extracted text (scalar paths + recursive leaves).
    text_content: String,
    /// Byte spans mapping regions of `text_content` back to leaf paths.
    positions: Vec<PositionEntry>,
}
258
// Maximum object/array nesting the recursive property-FTS walker descends
// before counting a `depth_cap_hit` and pruning the subtree.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

// Byte budget for one node's extracted property-FTS blob; the walker stops
// as soon as appending the next leaf would exceed it (`byte_cap_reached`).
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

// Separator written between extracted leaves so phrase matches cannot
// span a leaf boundary.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// How a property path contributes to the property-FTS blob.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Extract only the scalar value(s) at the exact path (the default).
    #[default]
    Scalar,
    /// Recursively walk the subtree rooted at the path, emitting every
    /// scalar leaf (bounded by depth/byte caps).
    Recursive,
}
314
/// A single `$.a.b`-style path plus its extraction mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    pub path: String,
    pub mode: PropertyPathMode,
}
321
322impl PropertyPathEntry {
323 pub(crate) fn scalar(path: impl Into<String>) -> Self {
324 Self {
325 path: path.into(),
326 mode: PropertyPathMode::Scalar,
327 }
328 }
329
330 #[cfg(test)]
331 pub(crate) fn recursive(path: impl Into<String>) -> Self {
332 Self {
333 path: path.into(),
334 mode: PropertyPathMode::Recursive,
335 }
336 }
337}
338
/// Per-kind configuration for property-FTS extraction, loaded from the
/// `fts_property_schemas` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Paths to extract, each with its own mode.
    pub paths: Vec<PropertyPathEntry>,
    /// Joiner used between scalar-path values.
    pub separator: String,
    /// Exact paths whose subtrees the recursive walker skips.
    pub exclude_paths: Vec<String>,
}
358
/// Maps a byte span of the extracted blob back to the JSON leaf path that
/// produced it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive byte offset where the leaf's text starts.
    pub start_offset: usize,
    /// Exclusive byte offset where the leaf's text ends.
    pub end_offset: usize,
    /// Path of the leaf, e.g. `$.a.b[0]`.
    pub leaf_path: String,
}
368
/// Counters recording which guardrails fired during recursive extraction.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Times the depth cap pruned a subtree.
    pub depth_cap_hit: usize,
    /// True once the byte budget stopped extraction.
    pub byte_cap_reached: bool,
    /// Times an excluded path pruned a subtree.
    pub excluded_subtree: usize,
}
381
382impl ExtractStats {
383 fn merge(&mut self, other: ExtractStats) {
384 self.depth_cap_hit += other.depth_cap_hit;
385 self.byte_cap_reached |= other.byte_cap_reached;
386 self.excluded_subtree += other.excluded_subtree;
387 }
388}
389
/// A validated `WriteRequest` plus resolution state filled in on the
/// writer thread (FTS rows, operational schema lookups, warnings).
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes in this request; lets FTS resolution
    // avoid a DB lookup for nodes submitted alongside their chunks.
    node_kinds: HashMap<String, String>,
    // Chunk FTS rows resolved by `resolve_fts_rows`.
    required_fts_rows: Vec<FtsProjectionRow>,
    // Property FTS rows resolved by `resolve_property_fts_rows`.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages accepted by the writer thread. Each carries a reply channel;
/// `PreparedWrite` is boxed to keep the message small.
enum WriteMessage {
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Handle to the single dedicated writer thread.
///
/// `sender` is wrapped in `ManuallyDrop` so `Drop` can drop it explicitly
/// *before* joining the thread — disconnecting the channel is what makes
/// `writer_loop` exit.
#[derive(Debug)]
pub struct WriterActor {
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    _telemetry: Arc<TelemetryCounters>,
}
441
442impl WriterActor {
443 pub fn start(
446 path: impl AsRef<Path>,
447 schema_manager: Arc<SchemaManager>,
448 provenance_mode: ProvenanceMode,
449 telemetry: Arc<TelemetryCounters>,
450 ) -> Result<Self, EngineError> {
451 let database_path = path.as_ref().to_path_buf();
452 let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
453
454 let writer_telemetry = Arc::clone(&telemetry);
455 let handle = thread::Builder::new()
456 .name("fathomdb-writer".to_owned())
457 .spawn(move || {
458 writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
459 })
460 .map_err(EngineError::Io)?;
461
462 Ok(Self {
463 sender: ManuallyDrop::new(sender),
464 thread_handle: Some(handle),
465 provenance_mode,
466 _telemetry: telemetry,
467 })
468 }
469
470 fn is_thread_alive(&self) -> bool {
472 self.thread_handle
473 .as_ref()
474 .is_some_and(|h| !h.is_finished())
475 }
476
477 fn check_thread_alive(&self) -> Result<(), EngineError> {
479 if self.is_thread_alive() {
480 Ok(())
481 } else {
482 Err(EngineError::WriterRejected(
483 "writer thread has exited".to_owned(),
484 ))
485 }
486 }
487
488 pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
492 self.check_thread_alive()?;
493 let prepared = prepare_write(request, self.provenance_mode)?;
494 let (reply_tx, reply_rx) = mpsc::channel();
495 self.sender
496 .send(WriteMessage::Submit {
497 prepared: Box::new(prepared),
498 reply: reply_tx,
499 })
500 .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
501
502 recv_with_timeout(&reply_rx)
503 }
504
505 pub fn touch_last_accessed(
509 &self,
510 request: LastAccessTouchRequest,
511 ) -> Result<LastAccessTouchReport, EngineError> {
512 self.check_thread_alive()?;
513 prepare_touch_last_accessed(&request, self.provenance_mode)?;
514 let (reply_tx, reply_rx) = mpsc::channel();
515 self.sender
516 .send(WriteMessage::TouchLastAccessed {
517 request,
518 reply: reply_tx,
519 })
520 .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
521
522 recv_with_timeout(&reply_rx)
523 }
524}
525
/// Fallback notice when the tracing feature is off: `trace_warn!` has no
/// backend, so write the shutdown-panic note straight to stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
531
532impl Drop for WriterActor {
533 fn drop(&mut self) {
534 unsafe { ManuallyDrop::drop(&mut self.sender) };
541
542 if let Some(handle) = self.thread_handle.take() {
544 match handle.join() {
545 Ok(()) => {}
546 Err(payload) => {
547 if std::thread::panicking() {
548 trace_warn!(
549 "writer thread panicked during shutdown (suppressed: already panicking)"
550 );
551 #[cfg(not(feature = "tracing"))]
552 stderr_panic_notice();
553 } else {
554 std::panic::resume_unwind(payload);
555 }
556 }
557 }
558 }
559 }
560}
561
562fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565 .map_err(|error| match error {
566 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567 "write timed out waiting for writer thread reply — the write may still commit"
568 .to_owned(),
569 ),
570 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571 })
572 .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576 request: &LastAccessTouchRequest,
577 mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579 if request.logical_ids.is_empty() {
580 return Err(EngineError::InvalidWrite(
581 "touch_last_accessed requires at least one logical_id".to_owned(),
582 ));
583 }
584 for logical_id in &request.logical_ids {
585 if logical_id.trim().is_empty() {
586 return Err(EngineError::InvalidWrite(
587 "touch_last_accessed requires non-empty logical_ids".to_owned(),
588 ));
589 }
590 }
591 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592 return Err(EngineError::InvalidWrite(
593 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594 .to_owned(),
595 ));
596 }
597 Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601 let missing: Vec<String> = request
602 .nodes
603 .iter()
604 .filter(|n| n.source_ref.is_none())
605 .map(|n| format!("node '{}'", n.logical_id))
606 .chain(
607 request
608 .node_retires
609 .iter()
610 .filter(|r| r.source_ref.is_none())
611 .map(|r| format!("node retire '{}'", r.logical_id)),
612 )
613 .chain(
614 request
615 .edges
616 .iter()
617 .filter(|e| e.source_ref.is_none())
618 .map(|e| format!("edge '{}'", e.logical_id)),
619 )
620 .chain(
621 request
622 .edge_retires
623 .iter()
624 .filter(|r| r.source_ref.is_none())
625 .map(|r| format!("edge retire '{}'", r.logical_id)),
626 )
627 .chain(
628 request
629 .runs
630 .iter()
631 .filter(|r| r.source_ref.is_none())
632 .map(|r| format!("run '{}'", r.id)),
633 )
634 .chain(
635 request
636 .steps
637 .iter()
638 .filter(|s| s.source_ref.is_none())
639 .map(|s| format!("step '{}'", s.id)),
640 )
641 .chain(
642 request
643 .actions
644 .iter()
645 .filter(|a| a.source_ref.is_none())
646 .map(|a| format!("action '{}'", a.id)),
647 )
648 .chain(
649 request
650 .operational_writes
651 .iter()
652 .filter(|write| operational_write_source_ref(write).is_none())
653 .map(|write| {
654 format!(
655 "operational {} '{}:{}'",
656 operational_write_kind(write),
657 operational_write_collection(write),
658 operational_write_record_key(write)
659 )
660 }),
661 )
662 .collect();
663
664 if missing.is_empty() {
665 Ok(())
666 } else {
667 Err(EngineError::InvalidWrite(format!(
668 "ProvenanceMode::Require: missing source_ref on: {}",
669 missing.join(", ")
670 )))
671 }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675 if request.nodes.len() > MAX_NODES {
676 return Err(EngineError::InvalidWrite(format!(
677 "too many nodes: {} exceeds limit of {MAX_NODES}",
678 request.nodes.len()
679 )));
680 }
681 if request.edges.len() > MAX_EDGES {
682 return Err(EngineError::InvalidWrite(format!(
683 "too many edges: {} exceeds limit of {MAX_EDGES}",
684 request.edges.len()
685 )));
686 }
687 if request.chunks.len() > MAX_CHUNKS {
688 return Err(EngineError::InvalidWrite(format!(
689 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690 request.chunks.len()
691 )));
692 }
693 let retires = request.node_retires.len() + request.edge_retires.len();
694 if retires > MAX_RETIRES {
695 return Err(EngineError::InvalidWrite(format!(
696 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697 )));
698 }
699 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700 if runtime_items > MAX_RUNTIME_ITEMS {
701 return Err(EngineError::InvalidWrite(format!(
702 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703 )));
704 }
705 if request.operational_writes.len() > MAX_OPERATIONAL {
706 return Err(EngineError::InvalidWrite(format!(
707 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708 request.operational_writes.len()
709 )));
710 }
711 let total = request.nodes.len()
712 + request.node_retires.len()
713 + request.edges.len()
714 + request.edge_retires.len()
715 + request.chunks.len()
716 + request.runs.len()
717 + request.steps.len()
718 + request.actions.len()
719 + request.vec_inserts.len()
720 + request.operational_writes.len();
721 if total > MAX_TOTAL_ITEMS {
722 return Err(EngineError::InvalidWrite(format!(
723 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724 )));
725 }
726 Ok(())
727}
728
/// Caller-side validation and normalization of a `WriteRequest` into a
/// `PreparedWrite`. Checks, in order: size caps, per-item field
/// invariants, row-id uniqueness, provenance (Require mode only), and
/// runtime upsert/supersedes consistency. Resolution fields
/// (FTS rows, operational schema maps) start empty and are filled in on
/// the writer thread.
#[allow(clippy::too_many_lines)]
fn prepare_write(
    request: WriteRequest,
    mode: ProvenanceMode,
) -> Result<PreparedWrite, EngineError> {
    validate_request_size(&request)?;

    // Field invariants: ids must be non-empty so rows are addressable.
    for node in &request.nodes {
        if node.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty row_id".to_owned(),
            ));
        }
        if node.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for edge in &request.edges {
        if edge.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty row_id".to_owned(),
            ));
        }
        if edge.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for chunk in &request.chunks {
        if chunk.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ChunkInsert has empty id".to_owned(),
            ));
        }
        if chunk.text_content.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' has empty text_content; empty chunks are not allowed",
                chunk.id
            )));
        }
    }
    for run in &request.runs {
        if run.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "RunInsert has empty id".to_owned(),
            ));
        }
    }
    for step in &request.steps {
        if step.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "StepInsert has empty id".to_owned(),
            ));
        }
    }
    for action in &request.actions {
        if action.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ActionInsert has empty id".to_owned(),
            ));
        }
    }
    for vi in &request.vec_inserts {
        if vi.chunk_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "VecInsert has empty chunk_id".to_owned(),
            ));
        }
        if vi.embedding.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "VecInsert for chunk '{}' has empty embedding",
                vi.chunk_id
            )));
        }
    }
    for operational in &request.operational_writes {
        if operational_write_collection(operational).is_empty() {
            return Err(EngineError::InvalidWrite(
                "OperationalWrite has empty collection".to_owned(),
            ));
        }
        if operational_write_record_key(operational).is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "OperationalWrite for collection '{}' has empty record_key",
                operational_write_collection(operational)
            )));
        }
        // Append/Put must carry a payload; Delete has none by design.
        match operational {
            OperationalWrite::Append { payload_json, .. }
            | OperationalWrite::Put { payload_json, .. } => {
                if payload_json.is_empty() {
                    return Err(EngineError::InvalidWrite(format!(
                        "OperationalWrite {} '{}:{}' has empty payload_json",
                        operational_write_kind(operational),
                        operational_write_collection(operational),
                        operational_write_record_key(operational)
                    )));
                }
            }
            OperationalWrite::Delete { .. } => {}
        }
    }

    {
        // Row ids share one namespace across nodes and edges: a node and
        // an edge may not reuse the same row_id within a request.
        let mut seen = std::collections::HashSet::new();
        for node in &request.nodes {
            if !seen.insert(node.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    node.row_id
                )));
            }
        }
        for edge in &request.edges {
            if !seen.insert(edge.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    edge.row_id
                )));
            }
        }
    }

    if mode == ProvenanceMode::Require {
        check_require_provenance(&request)?;
    }

    // Runtime upserts must name the record they supersede.
    for run in &request.runs {
        if run.upsert && run.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "run '{}': upsert=true requires supersedes_id to be set",
                run.id
            )));
        }
    }
    for step in &request.steps {
        if step.upsert && step.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "step '{}': upsert=true requires supersedes_id to be set",
                step.id
            )));
        }
    }
    for action in &request.actions {
        if action.upsert && action.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "action '{}': upsert=true requires supersedes_id to be set",
                action.id
            )));
        }
    }

    // logical_id -> kind map so FTS resolution can skip DB lookups for
    // nodes submitted in this same request.
    let node_kinds = request
        .nodes
        .iter()
        .map(|node| (node.logical_id.clone(), node.kind.clone()))
        .collect::<HashMap<_, _>>();

    Ok(PreparedWrite {
        label: request.label,
        nodes: request.nodes,
        node_retires: request.node_retires,
        edges: request.edges,
        edge_retires: request.edge_retires,
        chunks: request.chunks,
        runs: request.runs,
        steps: request.steps,
        actions: request.actions,
        vec_inserts: request.vec_inserts,
        operational_writes: request.operational_writes,
        operational_collection_kinds: HashMap::new(),
        operational_collection_filter_fields: HashMap::new(),
        operational_validation_warnings: Vec::new(),
        node_kinds,
        required_fts_rows: Vec::new(),
        required_property_fts_rows: Vec::new(),
        optional_backfills: request.optional_backfills,
    })
}
915
/// Body of the dedicated writer thread: opens its own SQLite connection,
/// bootstraps the schema, then serves messages until the channel closes
/// (which happens when `WriterActor` is dropped).
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    // Setup failures reject every queued and future message with the
    // setup error instead of leaving callers to time out.
    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists when tracing is compiled in; its sole
                // use below is inside trace_info!.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer
                // thread and strand every later caller.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: clear any transaction the panicked write
                    // left open; an error here is deliberately ignored.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Counts the write event without inflating row totals.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000 for message in receiver {
1001 match message {
1002 WriteMessage::Submit { reply, .. } => {
1003 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004 }
1005 WriteMessage::TouchLastAccessed { reply, .. } => {
1006 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007 }
1008 }
1009 }
1010}
1011
/// Builds the chunk FTS projection rows for a prepared write.
///
/// Rejects chunks whose node is being retired in the same request, then
/// resolves each chunk's node kind — from the request itself when the node
/// is co-submitted, otherwise from the live `nodes` table.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Prefer the kind of a node submitted in this same request; fall
        // back to the current (non-superseded) row in the database.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
1073
/// Builds property-FTS projection rows for nodes whose kind has a schema
/// registered in `fts_property_schemas`. Nodes with no schema, or whose
/// extraction yields no text, produce no row.
fn resolve_property_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    if prepared.nodes.is_empty() {
        return Ok(());
    }

    let schemas: HashMap<String, PropertyFtsSchema> =
        load_fts_property_schemas(conn)?.into_iter().collect();

    if schemas.is_empty() {
        return Ok(());
    }

    let mut combined_stats = ExtractStats::default();
    for node in &prepared.nodes {
        let Some(schema) = schemas.get(&node.kind) else {
            continue;
        };
        // Unparseable properties degrade to JSON null (extracts nothing)
        // rather than failing the whole write.
        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
        let (text_content, positions, stats) = extract_property_fts(&props, schema);
        combined_stats.merge(stats);
        if let Some(text_content) = text_content {
            prepared
                .required_property_fts_rows
                .push(FtsPropertyProjectionRow {
                    node_logical_id: node.logical_id.clone(),
                    kind: node.kind.clone(),
                    text_content,
                    positions,
                });
        }
    }
    // Only log guardrail stats when at least one cap/exclusion fired.
    if combined_stats != ExtractStats::default() {
        trace_debug!(
            depth_cap_hit = combined_stats.depth_cap_hit,
            byte_cap_reached = combined_stats.byte_cap_reached,
            excluded_subtree = combined_stats.excluded_subtree,
            "property fts recursive extraction guardrails engaged"
        );
    }
    trace_debug!(
        property_fts_rows = prepared.required_property_fts_rows.len(),
        nodes_processed = prepared.nodes.len(),
        "property fts row resolution completed"
    );
    Ok(())
}
1125
1126pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130 let Some(path) = path.strip_prefix("$.") else {
1131 return Vec::new();
1132 };
1133 let mut current = value;
1134 for segment in path.split('.') {
1135 match current.get(segment) {
1136 Some(v) => current = v,
1137 None => return Vec::new(),
1138 }
1139 }
1140 match current {
1141 serde_json::Value::String(s) => vec![s.clone()],
1142 serde_json::Value::Number(n) => vec![n.to_string()],
1143 serde_json::Value::Bool(b) => vec![b.to_string()],
1144 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145 serde_json::Value::Array(arr) => arr
1146 .iter()
1147 .filter_map(|v| match v {
1148 serde_json::Value::String(s) => Some(s.clone()),
1149 serde_json::Value::Number(n) => Some(n.to_string()),
1150 serde_json::Value::Bool(b) => Some(b.to_string()),
1151 _ => None,
1152 })
1153 .collect(),
1154 }
1155}
1156
/// Extracts a node's property-FTS text from `props` per `schema`.
///
/// Scalar paths are joined with the schema's separator; recursive paths
/// are walked by `RecursiveWalker` (depth/byte caps, exclusions). Returns
/// the combined text (None when nothing was extracted), position entries
/// for the recursive leaves, and the guardrail stats.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };

    let mut scalar_parts: Vec<String> = Vec::new();

    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                let root = resolve_path_root(props, &entry.path);
                if let Some(root) = root {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }

    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };

    // Combine scalar text and the recursive blob. When both exist, the
    // blob is appended after one LEAF_SEPARATOR, so every recorded
    // position must be shifted by the scalar prefix + separator length.
    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        (None, false) => Some(walker.blob.clone()),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };

    (combined, walker.positions, walker.stats)
}
1235
1236fn resolve_path_root<'a>(
1237 value: &'a serde_json::Value,
1238 path: &str,
1239) -> Option<&'a serde_json::Value> {
1240 let stripped = path.strip_prefix("$.")?;
1241 let mut current = value;
1242 for segment in stripped.split('.') {
1243 current = current.get(segment)?;
1244 }
1245 Some(current)
1246}
1247
/// Accumulator for recursive property-FTS extraction.
struct RecursiveWalker {
    /// Extracted leaves joined by `LEAF_SEPARATOR`.
    blob: String,
    /// Byte spans of each leaf within `blob`.
    positions: Vec<PositionEntry>,
    /// Guardrail counters for this walk.
    stats: ExtractStats,
    /// Exact paths whose subtrees are skipped.
    exclude_paths: Vec<String>,
    /// Set once the byte budget is hit; all further work is skipped.
    stopped: bool,
}
1258
impl RecursiveWalker {
    /// Depth-first walk of `value`, emitting every scalar leaf.
    ///
    /// Object keys are visited in sorted order so output is deterministic;
    /// excluded paths and the depth cap prune whole subtrees (counted in
    /// `stats`); `stopped` short-circuits everything once the byte budget
    /// is exhausted.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sorted keys keep the blob (and positions) deterministic.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    // Array elements get `path[idx]`-style leaf paths.
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Appends one leaf value to the blob (separator-prefixed after the
    /// first) and records its byte span. If appending would exceed
    /// `MAX_EXTRACTED_BYTES`, sets `byte_cap_reached` and stops the walk
    /// without emitting a partial leaf.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Span covers only the leaf's own text, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1337
1338pub(crate) fn load_fts_property_schemas(
1344 conn: &rusqlite::Connection,
1345) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1346 let mut stmt =
1347 conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1348 stmt.query_map([], |row| {
1349 let kind: String = row.get(0)?;
1350 let paths_json: String = row.get(1)?;
1351 let separator: String = row.get(2)?;
1352 let schema = parse_property_schema_json(&paths_json, &separator);
1353 Ok((kind, schema))
1354 })?
1355 .collect::<Result<Vec<_>, _>>()
1356}
1357
1358pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1359 let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1360 let mut paths = Vec::new();
1361 let mut exclude_paths: Vec<String> = Vec::new();
1362
1363 let path_values: Vec<serde_json::Value> = match value {
1364 serde_json::Value::Array(arr) => arr,
1365 serde_json::Value::Object(map) => {
1366 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1367 exclude_paths = excl
1368 .iter()
1369 .filter_map(|v| v.as_str().map(str::to_owned))
1370 .collect();
1371 }
1372 match map.get("paths") {
1373 Some(serde_json::Value::Array(arr)) => arr.clone(),
1374 _ => Vec::new(),
1375 }
1376 }
1377 _ => Vec::new(),
1378 };
1379
1380 for entry in path_values {
1381 match entry {
1382 serde_json::Value::String(path) => {
1383 paths.push(PropertyPathEntry::scalar(path));
1384 }
1385 serde_json::Value::Object(map) => {
1386 let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1387 continue;
1388 };
1389 let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1390 PropertyPathMode::Scalar,
1391 |m| match m {
1392 "recursive" => PropertyPathMode::Recursive,
1393 _ => PropertyPathMode::Scalar,
1394 },
1395 );
1396 paths.push(PropertyPathEntry {
1397 path: path.to_owned(),
1398 mode,
1399 });
1400 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1401 for p in excl {
1402 if let Some(s) = p.as_str() {
1403 exclude_paths.push(s.to_owned());
1404 }
1405 }
1406 }
1407 }
1408 _ => {}
1409 }
1410 }
1411
1412 PropertyFtsSchema {
1413 paths,
1414 separator: separator.to_owned(),
1415 exclude_paths,
1416 }
1417}
1418
/// Resolves collection metadata for every operational write in `prepared`
/// and validates each write against its collection's registration.
///
/// Each distinct collection is looked up at most once (cached in local maps):
/// its kind, its filter-field declarations, and its validation contract.
/// Writes are rejected when the collection is unregistered or disabled, or
/// when the op does not fit the kind (append_only_log accepts only Append;
/// latest_state accepts only Put/Delete). On success the resolved kind and
/// filter-field maps are stored back onto `prepared` for `apply_write`.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Populate the per-collection caches on first sight of a collection.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // Enforce op/kind compatibility for this write.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Enforce-mode contract failures abort here via `?`. Report-only
        // warnings are deliberately discarded: apply_write re-validates
        // against the live contracts inside the transaction and collects
        // warnings there.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1486
1487fn parse_operational_filter_fields(
1488 filter_fields_json: &str,
1489) -> Result<Vec<OperationalFilterField>, EngineError> {
1490 let fields: Vec<OperationalFilterField> =
1491 serde_json::from_str(filter_fields_json).map_err(|error| {
1492 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1493 })?;
1494 let mut seen = std::collections::HashSet::new();
1495 for field in &fields {
1496 if field.name.trim().is_empty() {
1497 return Err(EngineError::InvalidWrite(
1498 "filter_fields_json field names must not be empty".to_owned(),
1499 ));
1500 }
1501 if !seen.insert(field.name.as_str()) {
1502 return Err(EngineError::InvalidWrite(format!(
1503 "filter_fields_json contains duplicate field '{}'",
1504 field.name
1505 )));
1506 }
1507 if field.modes.is_empty() {
1508 return Err(EngineError::InvalidWrite(format!(
1509 "filter_fields_json field '{}' must declare at least one mode",
1510 field.name
1511 )));
1512 }
1513 if field.modes.contains(&OperationalFilterMode::Prefix)
1514 && field.field_type != OperationalFilterFieldType::String
1515 {
1516 return Err(EngineError::InvalidWrite(format!(
1517 "filter field '{}' only supports prefix for string types",
1518 field.name
1519 )));
1520 }
1521 }
1522 Ok(fields)
1523}
1524
/// One extracted filter value destined for the `operational_filter_values`
/// table; exactly one of `string_value`/`integer_value` is populated,
/// matching the field's declared type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    field_name: String,
    string_value: Option<String>,
    integer_value: Option<i64>,
}
1531
1532fn extract_operational_filter_values(
1533 filter_fields: &[OperationalFilterField],
1534 payload_json: &str,
1535) -> Vec<OperationalFilterValueRow> {
1536 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1537 return Vec::new();
1538 };
1539 let Some(object) = parsed.as_object() else {
1540 return Vec::new();
1541 };
1542
1543 filter_fields
1544 .iter()
1545 .filter_map(|field| {
1546 let value = object.get(&field.name)?;
1547 match field.field_type {
1548 OperationalFilterFieldType::String => {
1549 value
1550 .as_str()
1551 .map(|string_value| OperationalFilterValueRow {
1552 field_name: field.name.clone(),
1553 string_value: Some(string_value.to_owned()),
1554 integer_value: None,
1555 })
1556 }
1557 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1558 value
1559 .as_i64()
1560 .map(|integer_value| OperationalFilterValueRow {
1561 field_name: field.name.clone(),
1562 string_value: None,
1563 integer_value: Some(integer_value),
1564 })
1565 }
1566 }
1567 })
1568 .collect()
1569}
1570
/// Runs the full write pipeline: resolve FTS rows, property-FTS rows, and
/// operational collection metadata against `conn`, then apply everything in
/// a single transaction via `apply_write`.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1580
/// Updates `node_access_metadata.last_accessed_at` for the requested logical
/// ids and records one `node_last_accessed_touched` provenance event per id.
///
/// Duplicate ids in the request are collapsed (first occurrence wins). Every
/// id must resolve to an active (non-superseded) node; validation happens
/// before any write, and the whole batch runs inside one IMMEDIATE
/// transaction, so the outcome is all-or-nothing.
fn apply_touch_last_accessed(
    conn: &mut rusqlite::Connection,
    request: &LastAccessTouchRequest,
) -> Result<LastAccessTouchReport, EngineError> {
    // De-duplicate while preserving the caller's ordering.
    let mut seen = std::collections::HashSet::new();
    let logical_ids = request
        .logical_ids
        .iter()
        .filter(|logical_id| seen.insert(logical_id.as_str()))
        .cloned()
        .collect::<Vec<_>>();
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Validate all ids up front so no partial metadata is written.
    for logical_id in &logical_ids {
        let exists = tx
            .query_row(
                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
                params![logical_id],
                |row| row.get::<_, i64>(0),
            )
            .optional()?
            .is_some();
        if !exists {
            return Err(EngineError::InvalidWrite(format!(
                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
            )));
        }
    }

    // Inner scope so the cached statements drop before tx.commit().
    {
        // ?2 is bound to both columns so last_accessed_at == updated_at.
        let mut upsert_metadata = tx.prepare_cached(
            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
             VALUES (?1, ?2, ?2) \
             ON CONFLICT(logical_id) DO UPDATE SET \
             last_accessed_at = excluded.last_accessed_at, \
             updated_at = excluded.updated_at",
        )?;
        let mut insert_provenance = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
        )?;
        for logical_id in &logical_ids {
            upsert_metadata.execute(params![logical_id, request.touched_at])?;
            insert_provenance.execute(params![
                new_id(),
                logical_id,
                request.source_ref.as_deref(),
                format!("{{\"touched_at\":{}}}", request.touched_at),
            ])?;
        }
    }

    tx.commit()?;
    Ok(LastAccessTouchReport {
        touched_logical_ids: logical_ids.len(),
        touched_at: request.touched_at,
    })
}
1639
1640fn ensure_operational_collections_writable(
1641 tx: &rusqlite::Transaction<'_>,
1642 prepared: &PreparedWrite,
1643) -> Result<(), EngineError> {
1644 for collection in prepared.operational_collection_kinds.keys() {
1645 let disabled_at: Option<Option<i64>> = tx
1646 .query_row(
1647 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1648 params![collection],
1649 |row| row.get::<_, Option<i64>>(0),
1650 )
1651 .optional()?;
1652 match disabled_at {
1653 Some(Some(_)) => {
1654 return Err(EngineError::InvalidWrite(format!(
1655 "operational collection '{collection}' is disabled"
1656 )));
1657 }
1658 Some(None) => {}
1659 None => {
1660 return Err(EngineError::InvalidWrite(format!(
1661 "operational collection '{collection}' is not registered"
1662 )));
1663 }
1664 }
1665 }
1666 Ok(())
1667}
1668
1669fn validate_operational_writes_against_live_contracts(
1670 tx: &rusqlite::Transaction<'_>,
1671 prepared: &PreparedWrite,
1672) -> Result<Vec<String>, EngineError> {
1673 let mut collection_validation_contracts =
1674 HashMap::<String, Option<OperationalValidationContract>>::new();
1675 for collection in prepared.operational_collection_kinds.keys() {
1676 let validation_json: String = tx
1677 .query_row(
1678 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1679 params![collection],
1680 |row| row.get(0),
1681 )
1682 .map_err(EngineError::Sqlite)?;
1683 let validation_contract = parse_operational_validation_contract(&validation_json)
1684 .map_err(EngineError::InvalidWrite)?;
1685 collection_validation_contracts.insert(collection.clone(), validation_contract);
1686 }
1687
1688 let mut warnings = Vec::new();
1689 for write in &prepared.operational_writes {
1690 if let Some(Some(contract)) =
1691 collection_validation_contracts.get(operational_write_collection(write))
1692 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1693 {
1694 warnings.push(warning);
1695 }
1696 }
1697
1698 Ok(warnings)
1699}
1700
1701fn load_live_operational_secondary_indexes(
1702 tx: &rusqlite::Transaction<'_>,
1703 prepared: &PreparedWrite,
1704) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1705 let mut collection_indexes = HashMap::new();
1706 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1707 let secondary_indexes_json: String = tx
1708 .query_row(
1709 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1710 params![collection],
1711 |row| row.get(0),
1712 )
1713 .map_err(EngineError::Sqlite)?;
1714 let indexes =
1715 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1716 .map_err(EngineError::InvalidWrite)?;
1717 collection_indexes.insert(collection.clone(), indexes);
1718 }
1719 Ok(collection_indexes)
1720}
1721
1722fn check_operational_write_against_contract(
1723 write: &OperationalWrite,
1724 contract: &OperationalValidationContract,
1725) -> Result<Option<String>, EngineError> {
1726 if contract.mode == OperationalValidationMode::Disabled {
1727 return Ok(None);
1728 }
1729
1730 let (payload_json, collection, record_key) = match write {
1731 OperationalWrite::Append {
1732 collection,
1733 record_key,
1734 payload_json,
1735 ..
1736 }
1737 | OperationalWrite::Put {
1738 collection,
1739 record_key,
1740 payload_json,
1741 ..
1742 } => (
1743 payload_json.as_str(),
1744 collection.as_str(),
1745 record_key.as_str(),
1746 ),
1747 OperationalWrite::Delete { .. } => return Ok(None),
1748 };
1749
1750 match validate_operational_payload_against_contract(contract, payload_json) {
1751 Ok(()) => Ok(None),
1752 Err(message) => match contract.mode {
1753 OperationalValidationMode::Disabled => Ok(None),
1754 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1755 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1756 kind = operational_write_kind(write)
1757 ))),
1758 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1759 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1760 kind = operational_write_kind(write)
1761 ))),
1762 },
1763 }
1764}
1765
/// Applies every prepared mutation inside a single IMMEDIATE transaction and
/// builds the caller-facing `WriteReceipt`.
///
/// Statement ordering inside the transaction is deliberate: retires run
/// before inserts so upserts start from a clean slate; operational writes
/// are re-validated against the live collection rows; derived search rows
/// (FTS, property FTS, vectors) go in last. Provenance warnings are computed
/// after commit from the prepared data alone.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: drop derived FTS rows, supersede the active node
    // version, and record a 'node_retire' provenance event.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: supersede the active edge version and log provenance.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts. Upserts always refresh property-FTS rows; chunk-backed
    // data (chunks, fts_nodes, vectors) is cleared only under
    // ChunkPolicy::Replace, otherwise existing chunks are preserved.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // A missing vec table is tolerated (vectors are optional).
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts; upserts supersede the prior active version first.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts (after node inserts so their parent nodes exist).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts; upserts supersede an explicitly named prior row.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts (same supersede-by-id pattern as runs).
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts (same supersede-by-id pattern as runs/steps).
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational collection writes: re-validate against live collection
    // rows, append to the mutation log, maintain filter values, secondary
    // index entries, and the latest-state 'current' table.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Monotonic ordering counter continued from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
             payload_json = excluded.payload_json, \
             updated_at = excluded.updated_at, \
             last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            // Every write (append/put/delete) lands in the mutation log.
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Per-mutation secondary index entries.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Filter values extracted from the payload for declared fields.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // latest_state collections additionally maintain the 'current'
            // row and its 'current'-scoped secondary index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Read back the row just upserted so index rows
                            // reflect the stored updated_at/last_mutation_id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // Chunk-level FTS rows resolved during prepare.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property FTS rows plus their per-leaf offset positions; stale
    // positions for each node are cleared before fresh ones are inserted.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    // usize offsets are stored as i64; saturate on overflow.
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // Vector inserts (f32 embeddings serialized little-endian); a missing
    // vec table means the feature is unavailable and inserts are skipped.
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Provenance warnings: one per record of any kind missing a source_ref.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // The receipt's `warnings` field carries provenance warnings followed by
    // contract validation warnings; `provenance_warnings` is also exposed
    // separately.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2277
2278fn operational_write_collection(write: &OperationalWrite) -> &str {
2279 match write {
2280 OperationalWrite::Append { collection, .. }
2281 | OperationalWrite::Put { collection, .. }
2282 | OperationalWrite::Delete { collection, .. } => collection,
2283 }
2284}
2285
2286fn operational_write_record_key(write: &OperationalWrite) -> &str {
2287 match write {
2288 OperationalWrite::Append { record_key, .. }
2289 | OperationalWrite::Put { record_key, .. }
2290 | OperationalWrite::Delete { record_key, .. } => record_key,
2291 }
2292}
2293
/// Stable textual op kind stored in `operational_mutations.op_kind` and used
/// in user-facing diagnostics.
fn operational_write_kind(write: &OperationalWrite) -> &'static str {
    match write {
        OperationalWrite::Append { .. } => "append",
        OperationalWrite::Put { .. } => "put",
        OperationalWrite::Delete { .. } => "delete",
    }
}
2301
2302fn operational_write_payload(write: &OperationalWrite) -> &str {
2303 match write {
2304 OperationalWrite::Append { payload_json, .. }
2305 | OperationalWrite::Put { payload_json, .. } => payload_json,
2306 OperationalWrite::Delete { .. } => "null",
2307 }
2308}
2309
2310fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2311 match write {
2312 OperationalWrite::Append { source_ref, .. }
2313 | OperationalWrite::Put { source_ref, .. }
2314 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2315 }
2316}
2317
2318#[cfg(test)]
2319#[allow(clippy::expect_used)]
2320mod tests {
2321 use std::sync::Arc;
2322
2323 use fathomdb_schema::SchemaManager;
2324 use tempfile::NamedTempFile;
2325
2326 use super::{apply_write, prepare_write, resolve_operational_writes};
2327 use crate::{
2328 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2329 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2330 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2331 projection::ProjectionTarget,
2332 };
2333
    // A batch containing one run, one step, and one action (all with
    // source_refs) must be accepted and echo the request label in the
    // receipt.
    #[test]
    fn writer_executes_runtime_table_rows() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // step-1 belongs to run-1 and action-1 belongs to step-1, forming a
        // complete run -> step -> action chain in a single batch.
        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        assert_eq!(receipt.label, "runtime");
    }
2390
    // A batch mixing a graph node insert with an operational Put must land
    // both: the node row, one mutation history row, and the latest payload
    // in operational_current.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        // Seed the collection registry directly over SQL, then drop the
        // connection so the writer actor can open the database itself.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Reopen the database and verify all three side effects of the batch.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2470
    // With validation_json mode "disabled", a payload that violates the
    // declared contract ({"bogus":true} against a required "status" enum)
    // must still be written through to operational_current.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        // Seed a collection whose contract is present but disabled.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The contract-violating payload must have been stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2524
    // With validation mode "report_only", an invalid payload must still be
    // persisted, but the receipt must carry exactly one validation warning
    // (and no provenance warnings, since a source_ref is supplied).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the contract's ["ok","failed"] enum.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the payload must have been written.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2589
2590 #[test]
2591 fn writer_rejects_operational_write_for_missing_collection() {
2592 let db = NamedTempFile::new().expect("temporary db");
2593 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2594 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2595 drop(conn);
2596 let writer = WriterActor::start(
2597 db.path(),
2598 Arc::new(SchemaManager::new()),
2599 ProvenanceMode::Warn,
2600 Arc::new(TelemetryCounters::default()),
2601 )
2602 .expect("writer");
2603
2604 let result = writer.submit(WriteRequest {
2605 label: "missing-operational-collection".to_owned(),
2606 nodes: vec![],
2607 node_retires: vec![],
2608 edges: vec![],
2609 edge_retires: vec![],
2610 chunks: vec![],
2611 runs: vec![],
2612 steps: vec![],
2613 actions: vec![],
2614 optional_backfills: vec![],
2615 vec_inserts: vec![],
2616 operational_writes: vec![OperationalWrite::Put {
2617 collection: "connector_health".to_owned(),
2618 record_key: "gmail".to_owned(),
2619 payload_json: r#"{"status":"ok"}"#.to_owned(),
2620 source_ref: Some("src-1".to_owned()),
2621 }],
2622 });
2623
2624 assert!(
2625 matches!(result, Err(EngineError::InvalidWrite(_))),
2626 "missing operational collection must return InvalidWrite"
2627 );
2628 }
2629
    // An Append against an append_only_log collection must write a mutation
    // history row but never materialize anything in operational_current.
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History row exists with the append kind and verbatim payload...
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // ...but no current-state row was created for an append-only log.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2693
    // With validation mode "enforce", an invalid Append must fail with
    // InvalidWrite and leave zero rows behind — neither mutation history
    // nor derived filter values may survive the rollback.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        // Seed a collection with both filter fields and an enforced contract.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the ["ok","failed"] enum, so submit must error.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Nothing may have been persisted by the rejected batch.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2758
    // Put followed by Delete (in separate batches) must leave both ops in
    // the mutation history, in order, while clearing operational_current.
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish the 'gmail' record via Put.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same record.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History keeps both operations ordered by mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current-state row must be gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2847
    // Deletes carry no payload, so an enforced validation contract must not
    // block them: a valid Put followed by a Delete must both succeed and
    // leave operational_current empty.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        // Seed a collection with an enforced contract on 'status'.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The Put passes validation ("ok" is in the enum).
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The Delete has no payload to validate and must also succeed.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2921
    // A latest_state collection with two secondary indexes (one single-field,
    // one composite) must gain one 'current' index entry per index on Put,
    // and lose both again on Delete.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The payload carries every field both indexes need.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per configured index after the Put.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Deleting the record must also purge its 'current' index entries.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3010
    // Three mutations for the same key submitted in ONE batch must get
    // monotonically increasing mutation_order values (1, 2, 3), and the
    // last Put must win in operational_current.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put(old) -> Delete -> Put(new), all targeting 'gmail' in one batch.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        // History preserves submission order via mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The final Put in the batch determines the current payload.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3099
    // Simulates a race: the collection is disabled AFTER preflight
    // resolution but BEFORE apply. apply_write must re-check inside the
    // transaction, reject with InvalidWrite, and roll back cleanly.
    // Drives prepare_write/resolve_operational_writes/apply_write directly
    // instead of going through the WriterActor, so the disable can be
    // interleaved between the two phases.
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight sees the collection as enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection between preflight and apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejected apply must leave no partial rows behind.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3163
    // A batch mixing a valid node insert with a contract-violating Put must
    // fail atomically: the node must NOT be written either, and no
    // operational rows may survive.
    #[test]
    fn writer_enforce_validation_rejects_invalid_put_atomically() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The node insert alone would be valid; the op write poisons the batch.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-put".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid put must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // All-or-nothing: the node, the mutation, and the current row are
        // all absent after the rollback.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 0);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3243
3244 #[test]
3245 fn writer_rejects_append_against_latest_state_collection() {
3246 let db = NamedTempFile::new().expect("temporary db");
3247 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3248 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3249 conn.execute(
3250 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3251 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3252 [],
3253 )
3254 .expect("seed collection");
3255 drop(conn);
3256 let writer = WriterActor::start(
3257 db.path(),
3258 Arc::new(SchemaManager::new()),
3259 ProvenanceMode::Warn,
3260 Arc::new(TelemetryCounters::default()),
3261 )
3262 .expect("writer");
3263
3264 let result = writer.submit(WriteRequest {
3265 label: "bad-append".to_owned(),
3266 nodes: vec![],
3267 node_retires: vec![],
3268 edges: vec![],
3269 edge_retires: vec![],
3270 chunks: vec![],
3271 runs: vec![],
3272 steps: vec![],
3273 actions: vec![],
3274 optional_backfills: vec![],
3275 vec_inserts: vec![],
3276 operational_writes: vec![OperationalWrite::Append {
3277 collection: "connector_health".to_owned(),
3278 record_key: "gmail".to_owned(),
3279 payload_json: r#"{"status":"ok"}"#.to_owned(),
3280 source_ref: Some("src-1".to_owned()),
3281 }],
3282 });
3283
3284 assert!(
3285 matches!(result, Err(EngineError::InvalidWrite(_))),
3286 "latest_state collection must reject Append"
3287 );
3288 }
3289
    // Inserting logical id 'lg-1' twice — the second time with upsert=true —
    // must leave row-2 as the single active row and mark row-1 superseded.
    #[test]
    fn writer_upsert_supersedes_prior_active_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First version: plain insert (upsert: false).
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"version":1}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Second version: same logical_id with upsert: true.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"version":2}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 upsert write");

        // Exactly one active (superseded_at IS NULL) row remains: row-2.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let (active_row_id, props): (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("active row");
        assert_eq!(active_row_id, "row-2");
        assert!(props.contains("\"version\":2"));

        // The original row is retained but stamped as superseded.
        let superseded: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
                [],
                |row| row.get(0),
            )
            .expect("superseded count");
        assert_eq!(superseded, 1);
    }
3373
3374 #[test]
3375 fn writer_inserts_edge_between_two_nodes() {
3376 let db = NamedTempFile::new().expect("temporary db");
3377 let writer = WriterActor::start(
3378 db.path(),
3379 Arc::new(SchemaManager::new()),
3380 ProvenanceMode::Warn,
3381 Arc::new(TelemetryCounters::default()),
3382 )
3383 .expect("writer");
3384
3385 writer
3386 .submit(WriteRequest {
3387 label: "nodes-and-edge".to_owned(),
3388 nodes: vec![
3389 NodeInsert {
3390 row_id: "row-meeting".to_owned(),
3391 logical_id: "meeting-1".to_owned(),
3392 kind: "Meeting".to_owned(),
3393 properties: "{}".to_owned(),
3394 source_ref: Some("src-1".to_owned()),
3395 upsert: false,
3396 chunk_policy: ChunkPolicy::Preserve,
3397 content_ref: None,
3398 },
3399 NodeInsert {
3400 row_id: "row-task".to_owned(),
3401 logical_id: "task-1".to_owned(),
3402 kind: "Task".to_owned(),
3403 properties: "{}".to_owned(),
3404 source_ref: Some("src-1".to_owned()),
3405 upsert: false,
3406 chunk_policy: ChunkPolicy::Preserve,
3407 content_ref: None,
3408 },
3409 ],
3410 node_retires: vec![],
3411 edges: vec![EdgeInsert {
3412 row_id: "edge-1".to_owned(),
3413 logical_id: "edge-lg-1".to_owned(),
3414 source_logical_id: "meeting-1".to_owned(),
3415 target_logical_id: "task-1".to_owned(),
3416 kind: "HAS_TASK".to_owned(),
3417 properties: "{}".to_owned(),
3418 source_ref: Some("src-1".to_owned()),
3419 upsert: false,
3420 }],
3421 edge_retires: vec![],
3422 chunks: vec![],
3423 runs: vec![],
3424 steps: vec![],
3425 actions: vec![],
3426 optional_backfills: vec![],
3427 vec_inserts: vec![],
3428 operational_writes: vec![],
3429 })
3430 .expect("write receipt");
3431
3432 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3433 let (src, tgt, kind): (String, String, String) = conn
3434 .query_row(
3435 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3436 [],
3437 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3438 )
3439 .expect("edge row");
3440 assert_eq!(src, "meeting-1");
3441 assert_eq!(tgt, "task-1");
3442 assert_eq!(kind, "HAS_TASK");
3443 }
3444
3445 #[test]
3446 #[allow(clippy::too_many_lines)]
3447 fn writer_upsert_supersedes_prior_active_edge() {
3448 let db = NamedTempFile::new().expect("temporary db");
3449 let writer = WriterActor::start(
3450 db.path(),
3451 Arc::new(SchemaManager::new()),
3452 ProvenanceMode::Warn,
3453 Arc::new(TelemetryCounters::default()),
3454 )
3455 .expect("writer");
3456
3457 writer
3459 .submit(WriteRequest {
3460 label: "nodes".to_owned(),
3461 nodes: vec![
3462 NodeInsert {
3463 row_id: "row-a".to_owned(),
3464 logical_id: "node-a".to_owned(),
3465 kind: "Meeting".to_owned(),
3466 properties: "{}".to_owned(),
3467 source_ref: Some("src-1".to_owned()),
3468 upsert: false,
3469 chunk_policy: ChunkPolicy::Preserve,
3470 content_ref: None,
3471 },
3472 NodeInsert {
3473 row_id: "row-b".to_owned(),
3474 logical_id: "node-b".to_owned(),
3475 kind: "Task".to_owned(),
3476 properties: "{}".to_owned(),
3477 source_ref: Some("src-1".to_owned()),
3478 upsert: false,
3479 chunk_policy: ChunkPolicy::Preserve,
3480 content_ref: None,
3481 },
3482 ],
3483 node_retires: vec![],
3484 edges: vec![],
3485 edge_retires: vec![],
3486 chunks: vec![],
3487 runs: vec![],
3488 steps: vec![],
3489 actions: vec![],
3490 optional_backfills: vec![],
3491 vec_inserts: vec![],
3492 operational_writes: vec![],
3493 })
3494 .expect("nodes write");
3495
3496 writer
3498 .submit(WriteRequest {
3499 label: "edge-v1".to_owned(),
3500 nodes: vec![],
3501 node_retires: vec![],
3502 edges: vec![EdgeInsert {
3503 row_id: "edge-row-1".to_owned(),
3504 logical_id: "edge-lg-1".to_owned(),
3505 source_logical_id: "node-a".to_owned(),
3506 target_logical_id: "node-b".to_owned(),
3507 kind: "HAS_TASK".to_owned(),
3508 properties: r#"{"weight":1}"#.to_owned(),
3509 source_ref: Some("src-1".to_owned()),
3510 upsert: false,
3511 }],
3512 edge_retires: vec![],
3513 chunks: vec![],
3514 runs: vec![],
3515 steps: vec![],
3516 actions: vec![],
3517 optional_backfills: vec![],
3518 vec_inserts: vec![],
3519 operational_writes: vec![],
3520 })
3521 .expect("edge v1 write");
3522
3523 writer
3525 .submit(WriteRequest {
3526 label: "edge-v2".to_owned(),
3527 nodes: vec![],
3528 node_retires: vec![],
3529 edges: vec![EdgeInsert {
3530 row_id: "edge-row-2".to_owned(),
3531 logical_id: "edge-lg-1".to_owned(),
3532 source_logical_id: "node-a".to_owned(),
3533 target_logical_id: "node-b".to_owned(),
3534 kind: "HAS_TASK".to_owned(),
3535 properties: r#"{"weight":2}"#.to_owned(),
3536 source_ref: Some("src-2".to_owned()),
3537 upsert: true,
3538 }],
3539 edge_retires: vec![],
3540 chunks: vec![],
3541 runs: vec![],
3542 steps: vec![],
3543 actions: vec![],
3544 optional_backfills: vec![],
3545 vec_inserts: vec![],
3546 operational_writes: vec![],
3547 })
3548 .expect("edge v2 upsert");
3549
3550 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3551 let (active_row_id, props): (String, String) = conn
3552 .query_row(
3553 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3554 [],
3555 |row| Ok((row.get(0)?, row.get(1)?)),
3556 )
3557 .expect("active edge");
3558 assert_eq!(active_row_id, "edge-row-2");
3559 assert!(props.contains("\"weight\":2"));
3560
3561 let superseded: i64 = conn
3562 .query_row(
3563 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3564 [],
3565 |row| row.get(0),
3566 )
3567 .expect("superseded count");
3568 assert_eq!(superseded, 1);
3569 }
3570
3571 #[test]
3572 fn writer_fts_rows_are_written_to_database() {
3573 let db = NamedTempFile::new().expect("temporary db");
3574 let writer = WriterActor::start(
3575 db.path(),
3576 Arc::new(SchemaManager::new()),
3577 ProvenanceMode::Warn,
3578 Arc::new(TelemetryCounters::default()),
3579 )
3580 .expect("writer");
3581
3582 writer
3583 .submit(WriteRequest {
3584 label: "seed".to_owned(),
3585 nodes: vec![NodeInsert {
3586 row_id: "row-1".to_owned(),
3587 logical_id: "logical-1".to_owned(),
3588 kind: "Meeting".to_owned(),
3589 properties: "{}".to_owned(),
3590 source_ref: Some("src-1".to_owned()),
3591 upsert: false,
3592 chunk_policy: ChunkPolicy::Preserve,
3593 content_ref: None,
3594 }],
3595 node_retires: vec![],
3596 edges: vec![],
3597 edge_retires: vec![],
3598 chunks: vec![ChunkInsert {
3599 id: "chunk-1".to_owned(),
3600 node_logical_id: "logical-1".to_owned(),
3601 text_content: "budget discussion".to_owned(),
3602 byte_start: None,
3603 byte_end: None,
3604 content_hash: None,
3605 }],
3606 runs: vec![],
3607 steps: vec![],
3608 actions: vec![],
3609 optional_backfills: vec![],
3610 vec_inserts: vec![],
3611 operational_writes: vec![],
3612 })
3613 .expect("write receipt");
3614
3615 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3616 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3617 conn.query_row(
3618 "SELECT chunk_id, node_logical_id, kind, text_content \
3619 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3620 [],
3621 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3622 )
3623 .expect("fts row");
3624 assert_eq!(chunk_id, "chunk-1");
3625 assert_eq!(node_logical_id, "logical-1");
3626 assert_eq!(kind, "Meeting");
3627 assert_eq!(text_content, "budget discussion");
3628 }
3629
3630 #[test]
3631 fn writer_receipt_warns_on_nodes_without_source_ref() {
3632 let db = NamedTempFile::new().expect("temporary db");
3633 let writer = WriterActor::start(
3634 db.path(),
3635 Arc::new(SchemaManager::new()),
3636 ProvenanceMode::Warn,
3637 Arc::new(TelemetryCounters::default()),
3638 )
3639 .expect("writer");
3640
3641 let receipt = writer
3642 .submit(WriteRequest {
3643 label: "no-source".to_owned(),
3644 nodes: vec![NodeInsert {
3645 row_id: "row-1".to_owned(),
3646 logical_id: "logical-1".to_owned(),
3647 kind: "Meeting".to_owned(),
3648 properties: "{}".to_owned(),
3649 source_ref: None,
3650 upsert: false,
3651 chunk_policy: ChunkPolicy::Preserve,
3652 content_ref: None,
3653 }],
3654 node_retires: vec![],
3655 edges: vec![],
3656 edge_retires: vec![],
3657 chunks: vec![],
3658 runs: vec![],
3659 steps: vec![],
3660 actions: vec![],
3661 optional_backfills: vec![],
3662 vec_inserts: vec![],
3663 operational_writes: vec![],
3664 })
3665 .expect("write receipt");
3666
3667 assert_eq!(receipt.provenance_warnings.len(), 1);
3668 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3669 }
3670
3671 #[test]
3672 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3673 let db = NamedTempFile::new().expect("temporary db");
3674 let writer = WriterActor::start(
3675 db.path(),
3676 Arc::new(SchemaManager::new()),
3677 ProvenanceMode::Warn,
3678 Arc::new(TelemetryCounters::default()),
3679 )
3680 .expect("writer");
3681
3682 let receipt = writer
3683 .submit(WriteRequest {
3684 label: "with-source".to_owned(),
3685 nodes: vec![NodeInsert {
3686 row_id: "row-1".to_owned(),
3687 logical_id: "logical-1".to_owned(),
3688 kind: "Meeting".to_owned(),
3689 properties: "{}".to_owned(),
3690 source_ref: Some("src-1".to_owned()),
3691 upsert: false,
3692 chunk_policy: ChunkPolicy::Preserve,
3693 content_ref: None,
3694 }],
3695 node_retires: vec![],
3696 edges: vec![],
3697 edge_retires: vec![],
3698 chunks: vec![],
3699 runs: vec![],
3700 steps: vec![],
3701 actions: vec![],
3702 optional_backfills: vec![],
3703 vec_inserts: vec![],
3704 operational_writes: vec![],
3705 })
3706 .expect("write receipt");
3707
3708 assert!(receipt.provenance_warnings.is_empty());
3709 }
3710
3711 #[test]
3712 fn writer_accepts_chunk_for_pre_existing_node() {
3713 let db = NamedTempFile::new().expect("temporary db");
3714 let writer = WriterActor::start(
3715 db.path(),
3716 Arc::new(SchemaManager::new()),
3717 ProvenanceMode::Warn,
3718 Arc::new(TelemetryCounters::default()),
3719 )
3720 .expect("writer");
3721
3722 writer
3724 .submit(WriteRequest {
3725 label: "r1".to_owned(),
3726 nodes: vec![NodeInsert {
3727 row_id: "row-1".to_owned(),
3728 logical_id: "logical-1".to_owned(),
3729 kind: "Meeting".to_owned(),
3730 properties: "{}".to_owned(),
3731 source_ref: Some("src-1".to_owned()),
3732 upsert: false,
3733 chunk_policy: ChunkPolicy::Preserve,
3734 content_ref: None,
3735 }],
3736 node_retires: vec![],
3737 edges: vec![],
3738 edge_retires: vec![],
3739 chunks: vec![],
3740 runs: vec![],
3741 steps: vec![],
3742 actions: vec![],
3743 optional_backfills: vec![],
3744 vec_inserts: vec![],
3745 operational_writes: vec![],
3746 })
3747 .expect("r1 write");
3748
3749 writer
3751 .submit(WriteRequest {
3752 label: "r2".to_owned(),
3753 nodes: vec![],
3754 node_retires: vec![],
3755 edges: vec![],
3756 edge_retires: vec![],
3757 chunks: vec![ChunkInsert {
3758 id: "chunk-1".to_owned(),
3759 node_logical_id: "logical-1".to_owned(),
3760 text_content: "budget discussion".to_owned(),
3761 byte_start: None,
3762 byte_end: None,
3763 content_hash: None,
3764 }],
3765 runs: vec![],
3766 steps: vec![],
3767 actions: vec![],
3768 optional_backfills: vec![],
3769 vec_inserts: vec![],
3770 operational_writes: vec![],
3771 })
3772 .expect("r2 write — chunk for pre-existing node");
3773
3774 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3775 let count: i64 = conn
3776 .query_row(
3777 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3778 [],
3779 |row| row.get(0),
3780 )
3781 .expect("fts count");
3782 assert_eq!(
3783 count, 1,
3784 "FTS row must exist for chunk attached to pre-existing node"
3785 );
3786 }
3787
3788 #[test]
3789 fn writer_rejects_chunk_for_completely_unknown_node() {
3790 let db = NamedTempFile::new().expect("temporary db");
3791 let writer = WriterActor::start(
3792 db.path(),
3793 Arc::new(SchemaManager::new()),
3794 ProvenanceMode::Warn,
3795 Arc::new(TelemetryCounters::default()),
3796 )
3797 .expect("writer");
3798
3799 let result = writer.submit(WriteRequest {
3800 label: "bad".to_owned(),
3801 nodes: vec![],
3802 node_retires: vec![],
3803 edges: vec![],
3804 edge_retires: vec![],
3805 chunks: vec![ChunkInsert {
3806 id: "chunk-1".to_owned(),
3807 node_logical_id: "nonexistent".to_owned(),
3808 text_content: "some text".to_owned(),
3809 byte_start: None,
3810 byte_end: None,
3811 content_hash: None,
3812 }],
3813 runs: vec![],
3814 steps: vec![],
3815 actions: vec![],
3816 optional_backfills: vec![],
3817 vec_inserts: vec![],
3818 operational_writes: vec![],
3819 });
3820
3821 assert!(
3822 matches!(result, Err(EngineError::InvalidWrite(_))),
3823 "completely unknown node must return InvalidWrite"
3824 );
3825 }
3826
3827 #[test]
3828 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3829 let db = NamedTempFile::new().expect("temporary db");
3830 let writer = WriterActor::start(
3831 db.path(),
3832 Arc::new(SchemaManager::new()),
3833 ProvenanceMode::Warn,
3834 Arc::new(TelemetryCounters::default()),
3835 )
3836 .expect("writer");
3837
3838 let receipt = writer
3839 .submit(WriteRequest {
3840 label: "seed".to_owned(),
3841 nodes: vec![NodeInsert {
3842 row_id: "row-1".to_owned(),
3843 logical_id: "logical-1".to_owned(),
3844 kind: "Meeting".to_owned(),
3845 properties: "{}".to_owned(),
3846 source_ref: None,
3847 upsert: false,
3848 chunk_policy: ChunkPolicy::Preserve,
3849 content_ref: None,
3850 }],
3851 node_retires: vec![],
3852 edges: vec![],
3853 edge_retires: vec![],
3854 chunks: vec![ChunkInsert {
3855 id: "chunk-1".to_owned(),
3856 node_logical_id: "logical-1".to_owned(),
3857 text_content: "budget discussion".to_owned(),
3858 byte_start: None,
3859 byte_end: None,
3860 content_hash: None,
3861 }],
3862 runs: vec![],
3863 steps: vec![],
3864 actions: vec![],
3865 optional_backfills: vec![],
3866 vec_inserts: vec![],
3867 operational_writes: vec![],
3868 })
3869 .expect("write receipt");
3870
3871 assert_eq!(receipt.label, "seed");
3872 }
3873
3874 #[test]
3875 fn writer_node_retire_supersedes_active_node() {
3876 let db = NamedTempFile::new().expect("temporary db");
3877 let writer = WriterActor::start(
3878 db.path(),
3879 Arc::new(SchemaManager::new()),
3880 ProvenanceMode::Warn,
3881 Arc::new(TelemetryCounters::default()),
3882 )
3883 .expect("writer");
3884
3885 writer
3886 .submit(WriteRequest {
3887 label: "seed".to_owned(),
3888 nodes: vec![NodeInsert {
3889 row_id: "row-1".to_owned(),
3890 logical_id: "meeting-1".to_owned(),
3891 kind: "Meeting".to_owned(),
3892 properties: "{}".to_owned(),
3893 source_ref: Some("src-1".to_owned()),
3894 upsert: false,
3895 chunk_policy: ChunkPolicy::Preserve,
3896 content_ref: None,
3897 }],
3898 node_retires: vec![],
3899 edges: vec![],
3900 edge_retires: vec![],
3901 chunks: vec![],
3902 runs: vec![],
3903 steps: vec![],
3904 actions: vec![],
3905 optional_backfills: vec![],
3906 vec_inserts: vec![],
3907 operational_writes: vec![],
3908 })
3909 .expect("seed write");
3910
3911 writer
3912 .submit(WriteRequest {
3913 label: "retire".to_owned(),
3914 nodes: vec![],
3915 node_retires: vec![NodeRetire {
3916 logical_id: "meeting-1".to_owned(),
3917 source_ref: Some("src-2".to_owned()),
3918 }],
3919 edges: vec![],
3920 edge_retires: vec![],
3921 chunks: vec![],
3922 runs: vec![],
3923 steps: vec![],
3924 actions: vec![],
3925 optional_backfills: vec![],
3926 vec_inserts: vec![],
3927 operational_writes: vec![],
3928 })
3929 .expect("retire write");
3930
3931 let conn = rusqlite::Connection::open(db.path()).expect("open");
3932 let active: i64 = conn
3933 .query_row(
3934 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3935 [],
3936 |r| r.get(0),
3937 )
3938 .expect("count active");
3939 let historical: i64 = conn
3940 .query_row(
3941 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3942 [],
3943 |r| r.get(0),
3944 )
3945 .expect("count historical");
3946
3947 assert_eq!(active, 0, "active count must be 0 after retire");
3948 assert_eq!(historical, 1, "historical count must be 1 after retire");
3949 }
3950
3951 #[test]
3952 fn writer_node_retire_preserves_chunks_and_clears_fts() {
3953 let db = NamedTempFile::new().expect("temporary db");
3954 let writer = WriterActor::start(
3955 db.path(),
3956 Arc::new(SchemaManager::new()),
3957 ProvenanceMode::Warn,
3958 Arc::new(TelemetryCounters::default()),
3959 )
3960 .expect("writer");
3961
3962 writer
3963 .submit(WriteRequest {
3964 label: "seed".to_owned(),
3965 nodes: vec![NodeInsert {
3966 row_id: "row-1".to_owned(),
3967 logical_id: "meeting-1".to_owned(),
3968 kind: "Meeting".to_owned(),
3969 properties: "{}".to_owned(),
3970 source_ref: Some("src-1".to_owned()),
3971 upsert: false,
3972 chunk_policy: ChunkPolicy::Preserve,
3973 content_ref: None,
3974 }],
3975 node_retires: vec![],
3976 edges: vec![],
3977 edge_retires: vec![],
3978 chunks: vec![ChunkInsert {
3979 id: "chunk-1".to_owned(),
3980 node_logical_id: "meeting-1".to_owned(),
3981 text_content: "budget discussion".to_owned(),
3982 byte_start: None,
3983 byte_end: None,
3984 content_hash: None,
3985 }],
3986 runs: vec![],
3987 steps: vec![],
3988 actions: vec![],
3989 optional_backfills: vec![],
3990 vec_inserts: vec![],
3991 operational_writes: vec![],
3992 })
3993 .expect("seed write");
3994
3995 writer
3996 .submit(WriteRequest {
3997 label: "retire".to_owned(),
3998 nodes: vec![],
3999 node_retires: vec![NodeRetire {
4000 logical_id: "meeting-1".to_owned(),
4001 source_ref: Some("src-2".to_owned()),
4002 }],
4003 edges: vec![],
4004 edge_retires: vec![],
4005 chunks: vec![],
4006 runs: vec![],
4007 steps: vec![],
4008 actions: vec![],
4009 optional_backfills: vec![],
4010 vec_inserts: vec![],
4011 operational_writes: vec![],
4012 })
4013 .expect("retire write");
4014
4015 let conn = rusqlite::Connection::open(db.path()).expect("open");
4016 let chunk_count: i64 = conn
4017 .query_row(
4018 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4019 [],
4020 |r| r.get(0),
4021 )
4022 .expect("chunk count");
4023 let fts_count: i64 = conn
4024 .query_row(
4025 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4026 [],
4027 |r| r.get(0),
4028 )
4029 .expect("fts count");
4030
4031 assert_eq!(
4032 chunk_count, 1,
4033 "chunks must remain after node retire so restore can re-establish content"
4034 );
4035 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4036 }
4037
4038 #[test]
4039 fn writer_edge_retire_supersedes_active_edge() {
4040 let db = NamedTempFile::new().expect("temporary db");
4041 let writer = WriterActor::start(
4042 db.path(),
4043 Arc::new(SchemaManager::new()),
4044 ProvenanceMode::Warn,
4045 Arc::new(TelemetryCounters::default()),
4046 )
4047 .expect("writer");
4048
4049 writer
4050 .submit(WriteRequest {
4051 label: "seed".to_owned(),
4052 nodes: vec![
4053 NodeInsert {
4054 row_id: "row-a".to_owned(),
4055 logical_id: "node-a".to_owned(),
4056 kind: "Meeting".to_owned(),
4057 properties: "{}".to_owned(),
4058 source_ref: Some("src-1".to_owned()),
4059 upsert: false,
4060 chunk_policy: ChunkPolicy::Preserve,
4061 content_ref: None,
4062 },
4063 NodeInsert {
4064 row_id: "row-b".to_owned(),
4065 logical_id: "node-b".to_owned(),
4066 kind: "Task".to_owned(),
4067 properties: "{}".to_owned(),
4068 source_ref: Some("src-1".to_owned()),
4069 upsert: false,
4070 chunk_policy: ChunkPolicy::Preserve,
4071 content_ref: None,
4072 },
4073 ],
4074 node_retires: vec![],
4075 edges: vec![EdgeInsert {
4076 row_id: "edge-1".to_owned(),
4077 logical_id: "edge-lg-1".to_owned(),
4078 source_logical_id: "node-a".to_owned(),
4079 target_logical_id: "node-b".to_owned(),
4080 kind: "HAS_TASK".to_owned(),
4081 properties: "{}".to_owned(),
4082 source_ref: Some("src-1".to_owned()),
4083 upsert: false,
4084 }],
4085 edge_retires: vec![],
4086 chunks: vec![],
4087 runs: vec![],
4088 steps: vec![],
4089 actions: vec![],
4090 optional_backfills: vec![],
4091 vec_inserts: vec![],
4092 operational_writes: vec![],
4093 })
4094 .expect("seed write");
4095
4096 writer
4097 .submit(WriteRequest {
4098 label: "retire-edge".to_owned(),
4099 nodes: vec![],
4100 node_retires: vec![],
4101 edges: vec![],
4102 edge_retires: vec![EdgeRetire {
4103 logical_id: "edge-lg-1".to_owned(),
4104 source_ref: Some("src-2".to_owned()),
4105 }],
4106 chunks: vec![],
4107 runs: vec![],
4108 steps: vec![],
4109 actions: vec![],
4110 optional_backfills: vec![],
4111 vec_inserts: vec![],
4112 operational_writes: vec![],
4113 })
4114 .expect("retire edge write");
4115
4116 let conn = rusqlite::Connection::open(db.path()).expect("open");
4117 let active: i64 = conn
4118 .query_row(
4119 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4120 [],
4121 |r| r.get(0),
4122 )
4123 .expect("active edge count");
4124
4125 assert_eq!(active, 0, "active edge count must be 0 after retire");
4126 }
4127
4128 #[test]
4129 fn writer_retire_without_source_ref_emits_provenance_warning() {
4130 let db = NamedTempFile::new().expect("temporary db");
4131 let writer = WriterActor::start(
4132 db.path(),
4133 Arc::new(SchemaManager::new()),
4134 ProvenanceMode::Warn,
4135 Arc::new(TelemetryCounters::default()),
4136 )
4137 .expect("writer");
4138
4139 writer
4140 .submit(WriteRequest {
4141 label: "seed".to_owned(),
4142 nodes: vec![NodeInsert {
4143 row_id: "row-1".to_owned(),
4144 logical_id: "meeting-1".to_owned(),
4145 kind: "Meeting".to_owned(),
4146 properties: "{}".to_owned(),
4147 source_ref: Some("src-1".to_owned()),
4148 upsert: false,
4149 chunk_policy: ChunkPolicy::Preserve,
4150 content_ref: None,
4151 }],
4152 node_retires: vec![],
4153 edges: vec![],
4154 edge_retires: vec![],
4155 chunks: vec![],
4156 runs: vec![],
4157 steps: vec![],
4158 actions: vec![],
4159 optional_backfills: vec![],
4160 vec_inserts: vec![],
4161 operational_writes: vec![],
4162 })
4163 .expect("seed write");
4164
4165 let receipt = writer
4166 .submit(WriteRequest {
4167 label: "retire-no-src".to_owned(),
4168 nodes: vec![],
4169 node_retires: vec![NodeRetire {
4170 logical_id: "meeting-1".to_owned(),
4171 source_ref: None,
4172 }],
4173 edges: vec![],
4174 edge_retires: vec![],
4175 chunks: vec![],
4176 runs: vec![],
4177 steps: vec![],
4178 actions: vec![],
4179 optional_backfills: vec![],
4180 vec_inserts: vec![],
4181 operational_writes: vec![],
4182 })
4183 .expect("retire write");
4184
4185 assert!(
4186 !receipt.provenance_warnings.is_empty(),
4187 "retire without source_ref must emit a provenance warning"
4188 );
4189 }
4190
4191 #[test]
4192 #[allow(clippy::too_many_lines)]
4193 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4194 let db = NamedTempFile::new().expect("temporary db");
4195 let writer = WriterActor::start(
4196 db.path(),
4197 Arc::new(SchemaManager::new()),
4198 ProvenanceMode::Warn,
4199 Arc::new(TelemetryCounters::default()),
4200 )
4201 .expect("writer");
4202
4203 writer
4204 .submit(WriteRequest {
4205 label: "v1".to_owned(),
4206 nodes: vec![NodeInsert {
4207 row_id: "row-1".to_owned(),
4208 logical_id: "meeting-1".to_owned(),
4209 kind: "Meeting".to_owned(),
4210 properties: "{}".to_owned(),
4211 source_ref: Some("src-1".to_owned()),
4212 upsert: false,
4213 chunk_policy: ChunkPolicy::Preserve,
4214 content_ref: None,
4215 }],
4216 node_retires: vec![],
4217 edges: vec![],
4218 edge_retires: vec![],
4219 chunks: vec![ChunkInsert {
4220 id: "chunk-old".to_owned(),
4221 node_logical_id: "meeting-1".to_owned(),
4222 text_content: "old text".to_owned(),
4223 byte_start: None,
4224 byte_end: None,
4225 content_hash: None,
4226 }],
4227 runs: vec![],
4228 steps: vec![],
4229 actions: vec![],
4230 optional_backfills: vec![],
4231 vec_inserts: vec![],
4232 operational_writes: vec![],
4233 })
4234 .expect("v1 write");
4235
4236 writer
4237 .submit(WriteRequest {
4238 label: "v2".to_owned(),
4239 nodes: vec![NodeInsert {
4240 row_id: "row-2".to_owned(),
4241 logical_id: "meeting-1".to_owned(),
4242 kind: "Meeting".to_owned(),
4243 properties: "{}".to_owned(),
4244 source_ref: Some("src-2".to_owned()),
4245 upsert: true,
4246 chunk_policy: ChunkPolicy::Replace,
4247 content_ref: None,
4248 }],
4249 node_retires: vec![],
4250 edges: vec![],
4251 edge_retires: vec![],
4252 chunks: vec![ChunkInsert {
4253 id: "chunk-new".to_owned(),
4254 node_logical_id: "meeting-1".to_owned(),
4255 text_content: "new text".to_owned(),
4256 byte_start: None,
4257 byte_end: None,
4258 content_hash: None,
4259 }],
4260 runs: vec![],
4261 steps: vec![],
4262 actions: vec![],
4263 optional_backfills: vec![],
4264 vec_inserts: vec![],
4265 operational_writes: vec![],
4266 })
4267 .expect("v2 write");
4268
4269 let conn = rusqlite::Connection::open(db.path()).expect("open");
4270 let old_chunk: i64 = conn
4271 .query_row(
4272 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4273 [],
4274 |r| r.get(0),
4275 )
4276 .expect("old chunk count");
4277 let new_chunk: i64 = conn
4278 .query_row(
4279 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4280 [],
4281 |r| r.get(0),
4282 )
4283 .expect("new chunk count");
4284 let fts_old: i64 = conn
4285 .query_row(
4286 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4287 [],
4288 |r| r.get(0),
4289 )
4290 .expect("old fts count");
4291
4292 assert_eq!(
4293 old_chunk, 0,
4294 "old chunk must be deleted by ChunkPolicy::Replace"
4295 );
4296 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4297 assert_eq!(
4298 fts_old, 0,
4299 "old FTS row must be deleted by ChunkPolicy::Replace"
4300 );
4301 }
4302
4303 #[test]
4304 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4305 let db = NamedTempFile::new().expect("temporary db");
4306 let writer = WriterActor::start(
4307 db.path(),
4308 Arc::new(SchemaManager::new()),
4309 ProvenanceMode::Warn,
4310 Arc::new(TelemetryCounters::default()),
4311 )
4312 .expect("writer");
4313
4314 writer
4315 .submit(WriteRequest {
4316 label: "v1".to_owned(),
4317 nodes: vec![NodeInsert {
4318 row_id: "row-1".to_owned(),
4319 logical_id: "meeting-1".to_owned(),
4320 kind: "Meeting".to_owned(),
4321 properties: "{}".to_owned(),
4322 source_ref: Some("src-1".to_owned()),
4323 upsert: false,
4324 chunk_policy: ChunkPolicy::Preserve,
4325 content_ref: None,
4326 }],
4327 node_retires: vec![],
4328 edges: vec![],
4329 edge_retires: vec![],
4330 chunks: vec![ChunkInsert {
4331 id: "chunk-old".to_owned(),
4332 node_logical_id: "meeting-1".to_owned(),
4333 text_content: "old text".to_owned(),
4334 byte_start: None,
4335 byte_end: None,
4336 content_hash: None,
4337 }],
4338 runs: vec![],
4339 steps: vec![],
4340 actions: vec![],
4341 optional_backfills: vec![],
4342 vec_inserts: vec![],
4343 operational_writes: vec![],
4344 })
4345 .expect("v1 write");
4346
4347 writer
4348 .submit(WriteRequest {
4349 label: "v2-props-only".to_owned(),
4350 nodes: vec![NodeInsert {
4351 row_id: "row-2".to_owned(),
4352 logical_id: "meeting-1".to_owned(),
4353 kind: "Meeting".to_owned(),
4354 properties: r#"{"status":"updated"}"#.to_owned(),
4355 source_ref: Some("src-2".to_owned()),
4356 upsert: true,
4357 chunk_policy: ChunkPolicy::Preserve,
4358 content_ref: None,
4359 }],
4360 node_retires: vec![],
4361 edges: vec![],
4362 edge_retires: vec![],
4363 chunks: vec![],
4364 runs: vec![],
4365 steps: vec![],
4366 actions: vec![],
4367 optional_backfills: vec![],
4368 vec_inserts: vec![],
4369 operational_writes: vec![],
4370 })
4371 .expect("v2 preserve write");
4372
4373 let conn = rusqlite::Connection::open(db.path()).expect("open");
4374 let old_chunk: i64 = conn
4375 .query_row(
4376 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4377 [],
4378 |r| r.get(0),
4379 )
4380 .expect("old chunk count");
4381
4382 assert_eq!(
4383 old_chunk, 1,
4384 "old chunk must be preserved by ChunkPolicy::Preserve"
4385 );
4386 }
4387
4388 #[test]
4389 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4390 let db = NamedTempFile::new().expect("temporary db");
4391 let writer = WriterActor::start(
4392 db.path(),
4393 Arc::new(SchemaManager::new()),
4394 ProvenanceMode::Warn,
4395 Arc::new(TelemetryCounters::default()),
4396 )
4397 .expect("writer");
4398
4399 writer
4400 .submit(WriteRequest {
4401 label: "v1".to_owned(),
4402 nodes: vec![NodeInsert {
4403 row_id: "row-1".to_owned(),
4404 logical_id: "meeting-1".to_owned(),
4405 kind: "Meeting".to_owned(),
4406 properties: "{}".to_owned(),
4407 source_ref: Some("src-1".to_owned()),
4408 upsert: false,
4409 chunk_policy: ChunkPolicy::Preserve,
4410 content_ref: None,
4411 }],
4412 node_retires: vec![],
4413 edges: vec![],
4414 edge_retires: vec![],
4415 chunks: vec![ChunkInsert {
4416 id: "chunk-existing".to_owned(),
4417 node_logical_id: "meeting-1".to_owned(),
4418 text_content: "existing text".to_owned(),
4419 byte_start: None,
4420 byte_end: None,
4421 content_hash: None,
4422 }],
4423 runs: vec![],
4424 steps: vec![],
4425 actions: vec![],
4426 optional_backfills: vec![],
4427 vec_inserts: vec![],
4428 operational_writes: vec![],
4429 })
4430 .expect("v1 write");
4431
4432 writer
4434 .submit(WriteRequest {
4435 label: "insert-no-upsert".to_owned(),
4436 nodes: vec![NodeInsert {
4437 row_id: "row-2".to_owned(),
4438 logical_id: "meeting-2".to_owned(),
4439 kind: "Meeting".to_owned(),
4440 properties: "{}".to_owned(),
4441 source_ref: Some("src-2".to_owned()),
4442 upsert: false,
4443 chunk_policy: ChunkPolicy::Replace,
4444 content_ref: None,
4445 }],
4446 node_retires: vec![],
4447 edges: vec![],
4448 edge_retires: vec![],
4449 chunks: vec![],
4450 runs: vec![],
4451 steps: vec![],
4452 actions: vec![],
4453 optional_backfills: vec![],
4454 vec_inserts: vec![],
4455 operational_writes: vec![],
4456 })
4457 .expect("insert no-upsert write");
4458
4459 let conn = rusqlite::Connection::open(db.path()).expect("open");
4460 let existing_chunk: i64 = conn
4461 .query_row(
4462 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4463 [],
4464 |r| r.get(0),
4465 )
4466 .expect("chunk count");
4467
4468 assert_eq!(
4469 existing_chunk, 1,
4470 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4471 );
4472 }
4473
4474 #[test]
4475 fn writer_run_upsert_supersedes_prior_active_run() {
4476 let db = NamedTempFile::new().expect("temporary db");
4477 let writer = WriterActor::start(
4478 db.path(),
4479 Arc::new(SchemaManager::new()),
4480 ProvenanceMode::Warn,
4481 Arc::new(TelemetryCounters::default()),
4482 )
4483 .expect("writer");
4484
4485 writer
4486 .submit(WriteRequest {
4487 label: "v1".to_owned(),
4488 nodes: vec![],
4489 node_retires: vec![],
4490 edges: vec![],
4491 edge_retires: vec![],
4492 chunks: vec![],
4493 runs: vec![RunInsert {
4494 id: "run-v1".to_owned(),
4495 kind: "session".to_owned(),
4496 status: "completed".to_owned(),
4497 properties: "{}".to_owned(),
4498 source_ref: Some("src-1".to_owned()),
4499 upsert: false,
4500 supersedes_id: None,
4501 }],
4502 steps: vec![],
4503 actions: vec![],
4504 optional_backfills: vec![],
4505 vec_inserts: vec![],
4506 operational_writes: vec![],
4507 })
4508 .expect("v1 run write");
4509
4510 writer
4511 .submit(WriteRequest {
4512 label: "v2".to_owned(),
4513 nodes: vec![],
4514 node_retires: vec![],
4515 edges: vec![],
4516 edge_retires: vec![],
4517 chunks: vec![],
4518 runs: vec![RunInsert {
4519 id: "run-v2".to_owned(),
4520 kind: "session".to_owned(),
4521 status: "completed".to_owned(),
4522 properties: "{}".to_owned(),
4523 source_ref: Some("src-2".to_owned()),
4524 upsert: true,
4525 supersedes_id: Some("run-v1".to_owned()),
4526 }],
4527 steps: vec![],
4528 actions: vec![],
4529 optional_backfills: vec![],
4530 vec_inserts: vec![],
4531 operational_writes: vec![],
4532 })
4533 .expect("v2 run write");
4534
4535 let conn = rusqlite::Connection::open(db.path()).expect("open");
4536 let v1_historical: i64 = conn
4537 .query_row(
4538 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4539 [],
4540 |r| r.get(0),
4541 )
4542 .expect("v1 historical count");
4543 let v2_active: i64 = conn
4544 .query_row(
4545 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4546 [],
4547 |r| r.get(0),
4548 )
4549 .expect("v2 active count");
4550
4551 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4552 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4553 }
4554
4555 #[test]
4556 fn writer_step_upsert_supersedes_prior_active_step() {
4557 let db = NamedTempFile::new().expect("temporary db");
4558 let writer = WriterActor::start(
4559 db.path(),
4560 Arc::new(SchemaManager::new()),
4561 ProvenanceMode::Warn,
4562 Arc::new(TelemetryCounters::default()),
4563 )
4564 .expect("writer");
4565
4566 writer
4567 .submit(WriteRequest {
4568 label: "v1".to_owned(),
4569 nodes: vec![],
4570 node_retires: vec![],
4571 edges: vec![],
4572 edge_retires: vec![],
4573 chunks: vec![],
4574 runs: vec![RunInsert {
4575 id: "run-1".to_owned(),
4576 kind: "session".to_owned(),
4577 status: "completed".to_owned(),
4578 properties: "{}".to_owned(),
4579 source_ref: Some("src-1".to_owned()),
4580 upsert: false,
4581 supersedes_id: None,
4582 }],
4583 steps: vec![StepInsert {
4584 id: "step-v1".to_owned(),
4585 run_id: "run-1".to_owned(),
4586 kind: "llm".to_owned(),
4587 status: "completed".to_owned(),
4588 properties: "{}".to_owned(),
4589 source_ref: Some("src-1".to_owned()),
4590 upsert: false,
4591 supersedes_id: None,
4592 }],
4593 actions: vec![],
4594 optional_backfills: vec![],
4595 vec_inserts: vec![],
4596 operational_writes: vec![],
4597 })
4598 .expect("v1 step write");
4599
4600 writer
4601 .submit(WriteRequest {
4602 label: "v2".to_owned(),
4603 nodes: vec![],
4604 node_retires: vec![],
4605 edges: vec![],
4606 edge_retires: vec![],
4607 chunks: vec![],
4608 runs: vec![],
4609 steps: vec![StepInsert {
4610 id: "step-v2".to_owned(),
4611 run_id: "run-1".to_owned(),
4612 kind: "llm".to_owned(),
4613 status: "completed".to_owned(),
4614 properties: "{}".to_owned(),
4615 source_ref: Some("src-2".to_owned()),
4616 upsert: true,
4617 supersedes_id: Some("step-v1".to_owned()),
4618 }],
4619 actions: vec![],
4620 optional_backfills: vec![],
4621 vec_inserts: vec![],
4622 operational_writes: vec![],
4623 })
4624 .expect("v2 step write");
4625
4626 let conn = rusqlite::Connection::open(db.path()).expect("open");
4627 let v1_historical: i64 = conn
4628 .query_row(
4629 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4630 [],
4631 |r| r.get(0),
4632 )
4633 .expect("v1 historical count");
4634 let v2_active: i64 = conn
4635 .query_row(
4636 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4637 [],
4638 |r| r.get(0),
4639 )
4640 .expect("v2 active count");
4641
4642 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4643 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4644 }
4645
4646 #[test]
4647 fn writer_action_upsert_supersedes_prior_active_action() {
4648 let db = NamedTempFile::new().expect("temporary db");
4649 let writer = WriterActor::start(
4650 db.path(),
4651 Arc::new(SchemaManager::new()),
4652 ProvenanceMode::Warn,
4653 Arc::new(TelemetryCounters::default()),
4654 )
4655 .expect("writer");
4656
4657 writer
4658 .submit(WriteRequest {
4659 label: "v1".to_owned(),
4660 nodes: vec![],
4661 node_retires: vec![],
4662 edges: vec![],
4663 edge_retires: vec![],
4664 chunks: vec![],
4665 runs: vec![RunInsert {
4666 id: "run-1".to_owned(),
4667 kind: "session".to_owned(),
4668 status: "completed".to_owned(),
4669 properties: "{}".to_owned(),
4670 source_ref: Some("src-1".to_owned()),
4671 upsert: false,
4672 supersedes_id: None,
4673 }],
4674 steps: vec![StepInsert {
4675 id: "step-1".to_owned(),
4676 run_id: "run-1".to_owned(),
4677 kind: "llm".to_owned(),
4678 status: "completed".to_owned(),
4679 properties: "{}".to_owned(),
4680 source_ref: Some("src-1".to_owned()),
4681 upsert: false,
4682 supersedes_id: None,
4683 }],
4684 actions: vec![ActionInsert {
4685 id: "action-v1".to_owned(),
4686 step_id: "step-1".to_owned(),
4687 kind: "emit".to_owned(),
4688 status: "completed".to_owned(),
4689 properties: "{}".to_owned(),
4690 source_ref: Some("src-1".to_owned()),
4691 upsert: false,
4692 supersedes_id: None,
4693 }],
4694 optional_backfills: vec![],
4695 vec_inserts: vec![],
4696 operational_writes: vec![],
4697 })
4698 .expect("v1 action write");
4699
4700 writer
4701 .submit(WriteRequest {
4702 label: "v2".to_owned(),
4703 nodes: vec![],
4704 node_retires: vec![],
4705 edges: vec![],
4706 edge_retires: vec![],
4707 chunks: vec![],
4708 runs: vec![],
4709 steps: vec![],
4710 actions: vec![ActionInsert {
4711 id: "action-v2".to_owned(),
4712 step_id: "step-1".to_owned(),
4713 kind: "emit".to_owned(),
4714 status: "completed".to_owned(),
4715 properties: "{}".to_owned(),
4716 source_ref: Some("src-2".to_owned()),
4717 upsert: true,
4718 supersedes_id: Some("action-v1".to_owned()),
4719 }],
4720 optional_backfills: vec![],
4721 vec_inserts: vec![],
4722 operational_writes: vec![],
4723 })
4724 .expect("v2 action write");
4725
4726 let conn = rusqlite::Connection::open(db.path()).expect("open");
4727 let v1_historical: i64 = conn
4728 .query_row(
4729 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4730 [],
4731 |r| r.get(0),
4732 )
4733 .expect("v1 historical count");
4734 let v2_active: i64 = conn
4735 .query_row(
4736 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4737 [],
4738 |r| r.get(0),
4739 )
4740 .expect("v2 active count");
4741
4742 assert_eq!(
4743 v1_historical, 1,
4744 "action-v1 must be historical after upsert"
4745 );
4746 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4747 }
4748
4749 #[test]
4752 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4753 let db = NamedTempFile::new().expect("temporary db");
4754 let writer = WriterActor::start(
4755 db.path(),
4756 Arc::new(SchemaManager::new()),
4757 ProvenanceMode::Warn,
4758 Arc::new(TelemetryCounters::default()),
4759 )
4760 .expect("writer");
4761
4762 let result = writer.submit(WriteRequest {
4763 label: "bad".to_owned(),
4764 nodes: vec![],
4765 node_retires: vec![],
4766 edges: vec![],
4767 edge_retires: vec![],
4768 chunks: vec![],
4769 runs: vec![RunInsert {
4770 id: "run-1".to_owned(),
4771 kind: "session".to_owned(),
4772 status: "completed".to_owned(),
4773 properties: "{}".to_owned(),
4774 source_ref: None,
4775 upsert: true,
4776 supersedes_id: None,
4777 }],
4778 steps: vec![],
4779 actions: vec![],
4780 optional_backfills: vec![],
4781 vec_inserts: vec![],
4782 operational_writes: vec![],
4783 });
4784
4785 assert!(
4786 matches!(result, Err(EngineError::InvalidWrite(_))),
4787 "run upsert=true without supersedes_id must return InvalidWrite"
4788 );
4789 }
4790
4791 #[test]
4792 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4793 let db = NamedTempFile::new().expect("temporary db");
4794 let writer = WriterActor::start(
4795 db.path(),
4796 Arc::new(SchemaManager::new()),
4797 ProvenanceMode::Warn,
4798 Arc::new(TelemetryCounters::default()),
4799 )
4800 .expect("writer");
4801
4802 let result = writer.submit(WriteRequest {
4803 label: "bad".to_owned(),
4804 nodes: vec![],
4805 node_retires: vec![],
4806 edges: vec![],
4807 edge_retires: vec![],
4808 chunks: vec![],
4809 runs: vec![],
4810 steps: vec![StepInsert {
4811 id: "step-1".to_owned(),
4812 run_id: "run-1".to_owned(),
4813 kind: "llm".to_owned(),
4814 status: "completed".to_owned(),
4815 properties: "{}".to_owned(),
4816 source_ref: None,
4817 upsert: true,
4818 supersedes_id: None,
4819 }],
4820 actions: vec![],
4821 optional_backfills: vec![],
4822 vec_inserts: vec![],
4823 operational_writes: vec![],
4824 });
4825
4826 assert!(
4827 matches!(result, Err(EngineError::InvalidWrite(_))),
4828 "step upsert=true without supersedes_id must return InvalidWrite"
4829 );
4830 }
4831
4832 #[test]
4833 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4834 let db = NamedTempFile::new().expect("temporary db");
4835 let writer = WriterActor::start(
4836 db.path(),
4837 Arc::new(SchemaManager::new()),
4838 ProvenanceMode::Warn,
4839 Arc::new(TelemetryCounters::default()),
4840 )
4841 .expect("writer");
4842
4843 let result = writer.submit(WriteRequest {
4844 label: "bad".to_owned(),
4845 nodes: vec![],
4846 node_retires: vec![],
4847 edges: vec![],
4848 edge_retires: vec![],
4849 chunks: vec![],
4850 runs: vec![],
4851 steps: vec![],
4852 actions: vec![ActionInsert {
4853 id: "action-1".to_owned(),
4854 step_id: "step-1".to_owned(),
4855 kind: "emit".to_owned(),
4856 status: "completed".to_owned(),
4857 properties: "{}".to_owned(),
4858 source_ref: None,
4859 upsert: true,
4860 supersedes_id: None,
4861 }],
4862 optional_backfills: vec![],
4863 vec_inserts: vec![],
4864 operational_writes: vec![],
4865 });
4866
4867 assert!(
4868 matches!(result, Err(EngineError::InvalidWrite(_))),
4869 "action upsert=true without supersedes_id must return InvalidWrite"
4870 );
4871 }
4872
4873 #[test]
4876 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4877 let db = NamedTempFile::new().expect("temporary db");
4878 let writer = WriterActor::start(
4879 db.path(),
4880 Arc::new(SchemaManager::new()),
4881 ProvenanceMode::Warn,
4882 Arc::new(TelemetryCounters::default()),
4883 )
4884 .expect("writer");
4885
4886 let receipt = writer
4887 .submit(WriteRequest {
4888 label: "test".to_owned(),
4889 nodes: vec![
4890 NodeInsert {
4891 row_id: "row-a".to_owned(),
4892 logical_id: "node-a".to_owned(),
4893 kind: "Meeting".to_owned(),
4894 properties: "{}".to_owned(),
4895 source_ref: Some("src-1".to_owned()),
4896 upsert: false,
4897 chunk_policy: ChunkPolicy::Preserve,
4898 content_ref: None,
4899 },
4900 NodeInsert {
4901 row_id: "row-b".to_owned(),
4902 logical_id: "node-b".to_owned(),
4903 kind: "Task".to_owned(),
4904 properties: "{}".to_owned(),
4905 source_ref: Some("src-1".to_owned()),
4906 upsert: false,
4907 chunk_policy: ChunkPolicy::Preserve,
4908 content_ref: None,
4909 },
4910 ],
4911 node_retires: vec![],
4912 edges: vec![EdgeInsert {
4913 row_id: "edge-1".to_owned(),
4914 logical_id: "edge-lg-1".to_owned(),
4915 source_logical_id: "node-a".to_owned(),
4916 target_logical_id: "node-b".to_owned(),
4917 kind: "HAS_TASK".to_owned(),
4918 properties: "{}".to_owned(),
4919 source_ref: None,
4920 upsert: false,
4921 }],
4922 edge_retires: vec![],
4923 chunks: vec![],
4924 runs: vec![],
4925 steps: vec![],
4926 actions: vec![],
4927 optional_backfills: vec![],
4928 vec_inserts: vec![],
4929 operational_writes: vec![],
4930 })
4931 .expect("write");
4932
4933 assert!(
4934 !receipt.provenance_warnings.is_empty(),
4935 "edge insert without source_ref must emit a provenance warning"
4936 );
4937 }
4938
4939 #[test]
4940 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4941 let db = NamedTempFile::new().expect("temporary db");
4942 let writer = WriterActor::start(
4943 db.path(),
4944 Arc::new(SchemaManager::new()),
4945 ProvenanceMode::Warn,
4946 Arc::new(TelemetryCounters::default()),
4947 )
4948 .expect("writer");
4949
4950 let receipt = writer
4951 .submit(WriteRequest {
4952 label: "test".to_owned(),
4953 nodes: vec![],
4954 node_retires: vec![],
4955 edges: vec![],
4956 edge_retires: vec![],
4957 chunks: vec![],
4958 runs: vec![RunInsert {
4959 id: "run-1".to_owned(),
4960 kind: "session".to_owned(),
4961 status: "completed".to_owned(),
4962 properties: "{}".to_owned(),
4963 source_ref: None,
4964 upsert: false,
4965 supersedes_id: None,
4966 }],
4967 steps: vec![],
4968 actions: vec![],
4969 optional_backfills: vec![],
4970 vec_inserts: vec![],
4971 operational_writes: vec![],
4972 })
4973 .expect("write");
4974
4975 assert!(
4976 !receipt.provenance_warnings.is_empty(),
4977 "run insert without source_ref must emit a provenance warning"
4978 );
4979 }
4980
4981 #[test]
4984 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4985 let db = NamedTempFile::new().expect("temporary db");
4986 let writer = WriterActor::start(
4987 db.path(),
4988 Arc::new(SchemaManager::new()),
4989 ProvenanceMode::Warn,
4990 Arc::new(TelemetryCounters::default()),
4991 )
4992 .expect("writer");
4993
4994 writer
4996 .submit(WriteRequest {
4997 label: "seed".to_owned(),
4998 nodes: vec![NodeInsert {
4999 row_id: "row-1".to_owned(),
5000 logical_id: "meeting-1".to_owned(),
5001 kind: "Meeting".to_owned(),
5002 properties: "{}".to_owned(),
5003 source_ref: Some("src-1".to_owned()),
5004 upsert: false,
5005 chunk_policy: ChunkPolicy::Preserve,
5006 content_ref: None,
5007 }],
5008 node_retires: vec![],
5009 edges: vec![],
5010 edge_retires: vec![],
5011 chunks: vec![],
5012 runs: vec![],
5013 steps: vec![],
5014 actions: vec![],
5015 optional_backfills: vec![],
5016 vec_inserts: vec![],
5017 operational_writes: vec![],
5018 })
5019 .expect("seed write");
5020
5021 let result = writer.submit(WriteRequest {
5023 label: "bad".to_owned(),
5024 nodes: vec![],
5025 node_retires: vec![NodeRetire {
5026 logical_id: "meeting-1".to_owned(),
5027 source_ref: Some("src-2".to_owned()),
5028 }],
5029 edges: vec![],
5030 edge_retires: vec![],
5031 chunks: vec![ChunkInsert {
5032 id: "chunk-bad".to_owned(),
5033 node_logical_id: "meeting-1".to_owned(),
5034 text_content: "some text".to_owned(),
5035 byte_start: None,
5036 byte_end: None,
5037 content_hash: None,
5038 }],
5039 runs: vec![],
5040 steps: vec![],
5041 actions: vec![],
5042 optional_backfills: vec![],
5043 vec_inserts: vec![],
5044 operational_writes: vec![],
5045 });
5046
5047 assert!(
5048 matches!(result, Err(EngineError::InvalidWrite(_))),
5049 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5050 );
5051 }
5052
5053 #[test]
5056 fn writer_batch_insert_multiple_nodes() {
5057 let db = NamedTempFile::new().expect("temporary db");
5058 let writer = WriterActor::start(
5059 db.path(),
5060 Arc::new(SchemaManager::new()),
5061 ProvenanceMode::Warn,
5062 Arc::new(TelemetryCounters::default()),
5063 )
5064 .expect("writer");
5065
5066 let nodes: Vec<NodeInsert> = (0..100)
5067 .map(|i| NodeInsert {
5068 row_id: format!("row-{i}"),
5069 logical_id: format!("lg-{i}"),
5070 kind: "Note".to_owned(),
5071 properties: "{}".to_owned(),
5072 source_ref: Some("batch-src".to_owned()),
5073 upsert: false,
5074 chunk_policy: ChunkPolicy::Preserve,
5075 content_ref: None,
5076 })
5077 .collect();
5078
5079 writer
5080 .submit(WriteRequest {
5081 label: "batch".to_owned(),
5082 nodes,
5083 node_retires: vec![],
5084 edges: vec![],
5085 edge_retires: vec![],
5086 chunks: vec![],
5087 runs: vec![],
5088 steps: vec![],
5089 actions: vec![],
5090 optional_backfills: vec![],
5091 vec_inserts: vec![],
5092 operational_writes: vec![],
5093 })
5094 .expect("batch write");
5095
5096 let conn = rusqlite::Connection::open(db.path()).expect("open");
5097 let count: i64 = conn
5098 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5099 .expect("count nodes");
5100 assert_eq!(
5101 count, 100,
5102 "all 100 nodes must be present after batch insert"
5103 );
5104 }
5105
5106 #[test]
5109 fn prepare_write_rejects_empty_node_row_id() {
5110 let db = NamedTempFile::new().expect("temporary db");
5111 let writer = WriterActor::start(
5112 db.path(),
5113 Arc::new(SchemaManager::new()),
5114 ProvenanceMode::Warn,
5115 Arc::new(TelemetryCounters::default()),
5116 )
5117 .expect("writer");
5118
5119 let result = writer.submit(WriteRequest {
5120 label: "test".to_owned(),
5121 nodes: vec![NodeInsert {
5122 row_id: String::new(),
5123 logical_id: "lg-1".to_owned(),
5124 kind: "Note".to_owned(),
5125 properties: "{}".to_owned(),
5126 source_ref: None,
5127 upsert: false,
5128 chunk_policy: ChunkPolicy::Preserve,
5129 content_ref: None,
5130 }],
5131 node_retires: vec![],
5132 edges: vec![],
5133 edge_retires: vec![],
5134 chunks: vec![],
5135 runs: vec![],
5136 steps: vec![],
5137 actions: vec![],
5138 optional_backfills: vec![],
5139 vec_inserts: vec![],
5140 operational_writes: vec![],
5141 });
5142
5143 assert!(
5144 matches!(result, Err(EngineError::InvalidWrite(_))),
5145 "empty row_id must be rejected"
5146 );
5147 }
5148
5149 #[test]
5150 fn prepare_write_rejects_empty_node_logical_id() {
5151 let db = NamedTempFile::new().expect("temporary db");
5152 let writer = WriterActor::start(
5153 db.path(),
5154 Arc::new(SchemaManager::new()),
5155 ProvenanceMode::Warn,
5156 Arc::new(TelemetryCounters::default()),
5157 )
5158 .expect("writer");
5159
5160 let result = writer.submit(WriteRequest {
5161 label: "test".to_owned(),
5162 nodes: vec![NodeInsert {
5163 row_id: "row-1".to_owned(),
5164 logical_id: String::new(),
5165 kind: "Note".to_owned(),
5166 properties: "{}".to_owned(),
5167 source_ref: None,
5168 upsert: false,
5169 chunk_policy: ChunkPolicy::Preserve,
5170 content_ref: None,
5171 }],
5172 node_retires: vec![],
5173 edges: vec![],
5174 edge_retires: vec![],
5175 chunks: vec![],
5176 runs: vec![],
5177 steps: vec![],
5178 actions: vec![],
5179 optional_backfills: vec![],
5180 vec_inserts: vec![],
5181 operational_writes: vec![],
5182 });
5183
5184 assert!(
5185 matches!(result, Err(EngineError::InvalidWrite(_))),
5186 "empty logical_id must be rejected"
5187 );
5188 }
5189
5190 #[test]
5191 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5192 let db = NamedTempFile::new().expect("temporary db");
5193 let writer = WriterActor::start(
5194 db.path(),
5195 Arc::new(SchemaManager::new()),
5196 ProvenanceMode::Warn,
5197 Arc::new(TelemetryCounters::default()),
5198 )
5199 .expect("writer");
5200
5201 let result = writer.submit(WriteRequest {
5202 label: "test".to_owned(),
5203 nodes: vec![
5204 NodeInsert {
5205 row_id: "row-1".to_owned(),
5206 logical_id: "lg-1".to_owned(),
5207 kind: "Note".to_owned(),
5208 properties: "{}".to_owned(),
5209 source_ref: None,
5210 upsert: false,
5211 chunk_policy: ChunkPolicy::Preserve,
5212 content_ref: None,
5213 },
5214 NodeInsert {
5215 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5217 kind: "Note".to_owned(),
5218 properties: "{}".to_owned(),
5219 source_ref: None,
5220 upsert: false,
5221 chunk_policy: ChunkPolicy::Preserve,
5222 content_ref: None,
5223 },
5224 ],
5225 node_retires: vec![],
5226 edges: vec![],
5227 edge_retires: vec![],
5228 chunks: vec![],
5229 runs: vec![],
5230 steps: vec![],
5231 actions: vec![],
5232 optional_backfills: vec![],
5233 vec_inserts: vec![],
5234 operational_writes: vec![],
5235 });
5236
5237 assert!(
5238 matches!(result, Err(EngineError::InvalidWrite(_))),
5239 "duplicate row_id within request must be rejected"
5240 );
5241 }
5242
5243 #[test]
5244 fn prepare_write_rejects_empty_chunk_id() {
5245 let db = NamedTempFile::new().expect("temporary db");
5246 let writer = WriterActor::start(
5247 db.path(),
5248 Arc::new(SchemaManager::new()),
5249 ProvenanceMode::Warn,
5250 Arc::new(TelemetryCounters::default()),
5251 )
5252 .expect("writer");
5253
5254 let result = writer.submit(WriteRequest {
5255 label: "test".to_owned(),
5256 nodes: vec![NodeInsert {
5257 row_id: "row-1".to_owned(),
5258 logical_id: "lg-1".to_owned(),
5259 kind: "Note".to_owned(),
5260 properties: "{}".to_owned(),
5261 source_ref: None,
5262 upsert: false,
5263 chunk_policy: ChunkPolicy::Preserve,
5264 content_ref: None,
5265 }],
5266 node_retires: vec![],
5267 edges: vec![],
5268 edge_retires: vec![],
5269 chunks: vec![ChunkInsert {
5270 id: String::new(),
5271 node_logical_id: "lg-1".to_owned(),
5272 text_content: "some text".to_owned(),
5273 byte_start: None,
5274 byte_end: None,
5275 content_hash: None,
5276 }],
5277 runs: vec![],
5278 steps: vec![],
5279 actions: vec![],
5280 optional_backfills: vec![],
5281 vec_inserts: vec![],
5282 operational_writes: vec![],
5283 });
5284
5285 assert!(
5286 matches!(result, Err(EngineError::InvalidWrite(_))),
5287 "empty chunk id must be rejected"
5288 );
5289 }
5290
5291 #[test]
5294 fn writer_receipt_warns_on_step_without_source_ref() {
5295 let db = NamedTempFile::new().expect("temporary db");
5296 let writer = WriterActor::start(
5297 db.path(),
5298 Arc::new(SchemaManager::new()),
5299 ProvenanceMode::Warn,
5300 Arc::new(TelemetryCounters::default()),
5301 )
5302 .expect("writer");
5303
5304 writer
5306 .submit(WriteRequest {
5307 label: "seed-run".to_owned(),
5308 nodes: vec![],
5309 node_retires: vec![],
5310 edges: vec![],
5311 edge_retires: vec![],
5312 chunks: vec![],
5313 runs: vec![RunInsert {
5314 id: "run-1".to_owned(),
5315 kind: "session".to_owned(),
5316 status: "active".to_owned(),
5317 properties: "{}".to_owned(),
5318 source_ref: Some("src-1".to_owned()),
5319 upsert: false,
5320 supersedes_id: None,
5321 }],
5322 steps: vec![],
5323 actions: vec![],
5324 optional_backfills: vec![],
5325 vec_inserts: vec![],
5326 operational_writes: vec![],
5327 })
5328 .expect("seed run");
5329
5330 let receipt = writer
5331 .submit(WriteRequest {
5332 label: "test".to_owned(),
5333 nodes: vec![],
5334 node_retires: vec![],
5335 edges: vec![],
5336 edge_retires: vec![],
5337 chunks: vec![],
5338 runs: vec![],
5339 steps: vec![StepInsert {
5340 id: "step-1".to_owned(),
5341 run_id: "run-1".to_owned(),
5342 kind: "llm_call".to_owned(),
5343 status: "completed".to_owned(),
5344 properties: "{}".to_owned(),
5345 source_ref: None,
5346 upsert: false,
5347 supersedes_id: None,
5348 }],
5349 actions: vec![],
5350 optional_backfills: vec![],
5351 vec_inserts: vec![],
5352 operational_writes: vec![],
5353 })
5354 .expect("write");
5355
5356 assert!(
5357 !receipt.provenance_warnings.is_empty(),
5358 "step insert without source_ref must emit a provenance warning"
5359 );
5360 }
5361
5362 #[test]
5363 fn writer_receipt_warns_on_action_without_source_ref() {
5364 let db = NamedTempFile::new().expect("temporary db");
5365 let writer = WriterActor::start(
5366 db.path(),
5367 Arc::new(SchemaManager::new()),
5368 ProvenanceMode::Warn,
5369 Arc::new(TelemetryCounters::default()),
5370 )
5371 .expect("writer");
5372
5373 writer
5375 .submit(WriteRequest {
5376 label: "seed".to_owned(),
5377 nodes: vec![],
5378 node_retires: vec![],
5379 edges: vec![],
5380 edge_retires: vec![],
5381 chunks: vec![],
5382 runs: vec![RunInsert {
5383 id: "run-1".to_owned(),
5384 kind: "session".to_owned(),
5385 status: "active".to_owned(),
5386 properties: "{}".to_owned(),
5387 source_ref: Some("src-1".to_owned()),
5388 upsert: false,
5389 supersedes_id: None,
5390 }],
5391 steps: vec![StepInsert {
5392 id: "step-1".to_owned(),
5393 run_id: "run-1".to_owned(),
5394 kind: "llm_call".to_owned(),
5395 status: "completed".to_owned(),
5396 properties: "{}".to_owned(),
5397 source_ref: Some("src-1".to_owned()),
5398 upsert: false,
5399 supersedes_id: None,
5400 }],
5401 actions: vec![],
5402 optional_backfills: vec![],
5403 vec_inserts: vec![],
5404 operational_writes: vec![],
5405 })
5406 .expect("seed");
5407
5408 let receipt = writer
5409 .submit(WriteRequest {
5410 label: "test".to_owned(),
5411 nodes: vec![],
5412 node_retires: vec![],
5413 edges: vec![],
5414 edge_retires: vec![],
5415 chunks: vec![],
5416 runs: vec![],
5417 steps: vec![],
5418 actions: vec![ActionInsert {
5419 id: "action-1".to_owned(),
5420 step_id: "step-1".to_owned(),
5421 kind: "tool_call".to_owned(),
5422 status: "completed".to_owned(),
5423 properties: "{}".to_owned(),
5424 source_ref: None,
5425 upsert: false,
5426 supersedes_id: None,
5427 }],
5428 optional_backfills: vec![],
5429 vec_inserts: vec![],
5430 operational_writes: vec![],
5431 })
5432 .expect("write");
5433
5434 assert!(
5435 !receipt.provenance_warnings.is_empty(),
5436 "action insert without source_ref must emit a provenance warning"
5437 );
5438 }
5439
5440 #[test]
5441 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5442 let db = NamedTempFile::new().expect("temporary db");
5443 let writer = WriterActor::start(
5444 db.path(),
5445 Arc::new(SchemaManager::new()),
5446 ProvenanceMode::Warn,
5447 Arc::new(TelemetryCounters::default()),
5448 )
5449 .expect("writer");
5450
5451 let receipt = writer
5452 .submit(WriteRequest {
5453 label: "test".to_owned(),
5454 nodes: vec![NodeInsert {
5455 row_id: "row-1".to_owned(),
5456 logical_id: "node-1".to_owned(),
5457 kind: "Note".to_owned(),
5458 properties: "{}".to_owned(),
5459 source_ref: Some("src-1".to_owned()),
5460 upsert: false,
5461 chunk_policy: ChunkPolicy::Preserve,
5462 content_ref: None,
5463 }],
5464 node_retires: vec![],
5465 edges: vec![],
5466 edge_retires: vec![],
5467 chunks: vec![],
5468 runs: vec![RunInsert {
5469 id: "run-1".to_owned(),
5470 kind: "session".to_owned(),
5471 status: "active".to_owned(),
5472 properties: "{}".to_owned(),
5473 source_ref: Some("src-1".to_owned()),
5474 upsert: false,
5475 supersedes_id: None,
5476 }],
5477 steps: vec![StepInsert {
5478 id: "step-1".to_owned(),
5479 run_id: "run-1".to_owned(),
5480 kind: "llm_call".to_owned(),
5481 status: "completed".to_owned(),
5482 properties: "{}".to_owned(),
5483 source_ref: Some("src-1".to_owned()),
5484 upsert: false,
5485 supersedes_id: None,
5486 }],
5487 actions: vec![ActionInsert {
5488 id: "action-1".to_owned(),
5489 step_id: "step-1".to_owned(),
5490 kind: "tool_call".to_owned(),
5491 status: "completed".to_owned(),
5492 properties: "{}".to_owned(),
5493 source_ref: Some("src-1".to_owned()),
5494 upsert: false,
5495 supersedes_id: None,
5496 }],
5497 optional_backfills: vec![],
5498 vec_inserts: vec![],
5499 operational_writes: vec![],
5500 })
5501 .expect("write");
5502
5503 assert!(
5504 receipt.provenance_warnings.is_empty(),
5505 "no warnings expected when all types have source_ref; got: {:?}",
5506 receipt.provenance_warnings
5507 );
5508 }
5509
5510 #[test]
5513 fn default_provenance_mode_is_warn() {
5514 let db = NamedTempFile::new().expect("temporary db");
5516 let writer = WriterActor::start(
5517 db.path(),
5518 Arc::new(SchemaManager::new()),
5519 ProvenanceMode::default(),
5520 Arc::new(TelemetryCounters::default()),
5521 )
5522 .expect("writer");
5523
5524 let receipt = writer
5525 .submit(WriteRequest {
5526 label: "test".to_owned(),
5527 nodes: vec![NodeInsert {
5528 row_id: "row-1".to_owned(),
5529 logical_id: "node-1".to_owned(),
5530 kind: "Note".to_owned(),
5531 properties: "{}".to_owned(),
5532 source_ref: None,
5533 upsert: false,
5534 chunk_policy: ChunkPolicy::Preserve,
5535 content_ref: None,
5536 }],
5537 node_retires: vec![],
5538 edges: vec![],
5539 edge_retires: vec![],
5540 chunks: vec![],
5541 runs: vec![],
5542 steps: vec![],
5543 actions: vec![],
5544 optional_backfills: vec![],
5545 vec_inserts: vec![],
5546 operational_writes: vec![],
5547 })
5548 .expect("Warn mode must not reject missing source_ref");
5549
5550 assert!(
5551 !receipt.provenance_warnings.is_empty(),
5552 "Warn mode must emit a warning instead of rejecting"
5553 );
5554 }
5555
5556 #[test]
5557 fn require_mode_rejects_node_without_source_ref() {
5558 let db = NamedTempFile::new().expect("temporary db");
5559 let writer = WriterActor::start(
5560 db.path(),
5561 Arc::new(SchemaManager::new()),
5562 ProvenanceMode::Require,
5563 Arc::new(TelemetryCounters::default()),
5564 )
5565 .expect("writer");
5566
5567 let result = writer.submit(WriteRequest {
5568 label: "test".to_owned(),
5569 nodes: vec![NodeInsert {
5570 row_id: "row-1".to_owned(),
5571 logical_id: "node-1".to_owned(),
5572 kind: "Note".to_owned(),
5573 properties: "{}".to_owned(),
5574 source_ref: None,
5575 upsert: false,
5576 chunk_policy: ChunkPolicy::Preserve,
5577 content_ref: None,
5578 }],
5579 node_retires: vec![],
5580 edges: vec![],
5581 edge_retires: vec![],
5582 chunks: vec![],
5583 runs: vec![],
5584 steps: vec![],
5585 actions: vec![],
5586 optional_backfills: vec![],
5587 vec_inserts: vec![],
5588 operational_writes: vec![],
5589 });
5590
5591 assert!(
5592 matches!(result, Err(EngineError::InvalidWrite(_))),
5593 "Require mode must reject node without source_ref"
5594 );
5595 }
5596
5597 #[test]
5598 fn require_mode_accepts_node_with_source_ref() {
5599 let db = NamedTempFile::new().expect("temporary db");
5600 let writer = WriterActor::start(
5601 db.path(),
5602 Arc::new(SchemaManager::new()),
5603 ProvenanceMode::Require,
5604 Arc::new(TelemetryCounters::default()),
5605 )
5606 .expect("writer");
5607
5608 let result = writer.submit(WriteRequest {
5609 label: "test".to_owned(),
5610 nodes: vec![NodeInsert {
5611 row_id: "row-1".to_owned(),
5612 logical_id: "node-1".to_owned(),
5613 kind: "Note".to_owned(),
5614 properties: "{}".to_owned(),
5615 source_ref: Some("src-1".to_owned()),
5616 upsert: false,
5617 chunk_policy: ChunkPolicy::Preserve,
5618 content_ref: None,
5619 }],
5620 node_retires: vec![],
5621 edges: vec![],
5622 edge_retires: vec![],
5623 chunks: vec![],
5624 runs: vec![],
5625 steps: vec![],
5626 actions: vec![],
5627 optional_backfills: vec![],
5628 vec_inserts: vec![],
5629 operational_writes: vec![],
5630 });
5631
5632 assert!(
5633 result.is_ok(),
5634 "Require mode must accept node with source_ref"
5635 );
5636 }
5637
5638 #[test]
5639 fn require_mode_rejects_edge_without_source_ref() {
5640 let db = NamedTempFile::new().expect("temporary db");
5641 let writer = WriterActor::start(
5642 db.path(),
5643 Arc::new(SchemaManager::new()),
5644 ProvenanceMode::Require,
5645 Arc::new(TelemetryCounters::default()),
5646 )
5647 .expect("writer");
5648
5649 let result = writer.submit(WriteRequest {
5651 label: "test".to_owned(),
5652 nodes: vec![
5653 NodeInsert {
5654 row_id: "row-a".to_owned(),
5655 logical_id: "node-a".to_owned(),
5656 kind: "Note".to_owned(),
5657 properties: "{}".to_owned(),
5658 source_ref: Some("src-1".to_owned()),
5659 upsert: false,
5660 chunk_policy: ChunkPolicy::Preserve,
5661 content_ref: None,
5662 },
5663 NodeInsert {
5664 row_id: "row-b".to_owned(),
5665 logical_id: "node-b".to_owned(),
5666 kind: "Note".to_owned(),
5667 properties: "{}".to_owned(),
5668 source_ref: Some("src-1".to_owned()),
5669 upsert: false,
5670 chunk_policy: ChunkPolicy::Preserve,
5671 content_ref: None,
5672 },
5673 ],
5674 node_retires: vec![],
5675 edges: vec![EdgeInsert {
5676 row_id: "edge-row-1".to_owned(),
5677 logical_id: "edge-1".to_owned(),
5678 source_logical_id: "node-a".to_owned(),
5679 target_logical_id: "node-b".to_owned(),
5680 kind: "LINKS_TO".to_owned(),
5681 properties: "{}".to_owned(),
5682 source_ref: None,
5683 upsert: false,
5684 }],
5685 edge_retires: vec![],
5686 chunks: vec![],
5687 runs: vec![],
5688 steps: vec![],
5689 actions: vec![],
5690 optional_backfills: vec![],
5691 vec_inserts: vec![],
5692 operational_writes: vec![],
5693 });
5694
5695 assert!(
5696 matches!(result, Err(EngineError::InvalidWrite(_))),
5697 "Require mode must reject edge without source_ref"
5698 );
5699 }
5700
5701 #[test]
5704 fn fts_row_has_correct_kind_from_co_submitted_node() {
5705 let db = NamedTempFile::new().expect("temporary db");
5706 let writer = WriterActor::start(
5707 db.path(),
5708 Arc::new(SchemaManager::new()),
5709 ProvenanceMode::Warn,
5710 Arc::new(TelemetryCounters::default()),
5711 )
5712 .expect("writer");
5713
5714 writer
5715 .submit(WriteRequest {
5716 label: "test".to_owned(),
5717 nodes: vec![NodeInsert {
5718 row_id: "row-1".to_owned(),
5719 logical_id: "node-1".to_owned(),
5720 kind: "Meeting".to_owned(),
5721 properties: "{}".to_owned(),
5722 source_ref: Some("src-1".to_owned()),
5723 upsert: false,
5724 chunk_policy: ChunkPolicy::Preserve,
5725 content_ref: None,
5726 }],
5727 node_retires: vec![],
5728 edges: vec![],
5729 edge_retires: vec![],
5730 chunks: vec![ChunkInsert {
5731 id: "chunk-1".to_owned(),
5732 node_logical_id: "node-1".to_owned(),
5733 text_content: "some text".to_owned(),
5734 byte_start: None,
5735 byte_end: None,
5736 content_hash: None,
5737 }],
5738 runs: vec![],
5739 steps: vec![],
5740 actions: vec![],
5741 optional_backfills: vec![],
5742 vec_inserts: vec![],
5743 operational_writes: vec![],
5744 })
5745 .expect("write");
5746
5747 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5748 let kind: String = conn
5749 .query_row(
5750 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5751 [],
5752 |row| row.get(0),
5753 )
5754 .expect("fts row");
5755
5756 assert_eq!(kind, "Meeting");
5757 }
5758
5759 #[test]
5760 fn fts_row_has_correct_text_content() {
5761 let db = NamedTempFile::new().expect("temporary db");
5762 let writer = WriterActor::start(
5763 db.path(),
5764 Arc::new(SchemaManager::new()),
5765 ProvenanceMode::Warn,
5766 Arc::new(TelemetryCounters::default()),
5767 )
5768 .expect("writer");
5769
5770 writer
5771 .submit(WriteRequest {
5772 label: "test".to_owned(),
5773 nodes: vec![NodeInsert {
5774 row_id: "row-1".to_owned(),
5775 logical_id: "node-1".to_owned(),
5776 kind: "Note".to_owned(),
5777 properties: "{}".to_owned(),
5778 source_ref: Some("src-1".to_owned()),
5779 upsert: false,
5780 chunk_policy: ChunkPolicy::Preserve,
5781 content_ref: None,
5782 }],
5783 node_retires: vec![],
5784 edges: vec![],
5785 edge_retires: vec![],
5786 chunks: vec![ChunkInsert {
5787 id: "chunk-1".to_owned(),
5788 node_logical_id: "node-1".to_owned(),
5789 text_content: "exactly this text".to_owned(),
5790 byte_start: None,
5791 byte_end: None,
5792 content_hash: None,
5793 }],
5794 runs: vec![],
5795 steps: vec![],
5796 actions: vec![],
5797 optional_backfills: vec![],
5798 vec_inserts: vec![],
5799 operational_writes: vec![],
5800 })
5801 .expect("write");
5802
5803 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5804 let text: String = conn
5805 .query_row(
5806 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5807 [],
5808 |row| row.get(0),
5809 )
5810 .expect("fts row");
5811
5812 assert_eq!(text, "exactly this text");
5813 }
5814
    /// When a chunk arrives in a later write than its owning node, the FTS
    /// projection must resolve the node's kind from the database rather
    /// than from the (empty) co-submitted node set.
    #[test]
    fn fts_row_has_correct_kind_from_pre_existing_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: persist the node alone, with no chunks.
        writer
            .submit(WriteRequest {
                label: "r1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Document".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r1 write");

        // Second write: only the chunk; its node must be looked up in the DB.
        writer
            .submit(WriteRequest {
                label: "r2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "some text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r2 write");

        // Inspect the projection directly through a raw connection.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts row");

        assert_eq!(kind, "Document");
    }
5889
    /// Every chunk of a node must yield its own FTS row: three chunks on
    /// one node produce exactly three rows.
    #[test]
    fn fts_derives_rows_for_multiple_chunks_per_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Single node with three distinct chunks in the same request.
        writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![
                    ChunkInsert {
                        id: "chunk-a".to_owned(),
                        node_logical_id: "node-1".to_owned(),
                        text_content: "intro".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                    ChunkInsert {
                        id: "chunk-b".to_owned(),
                        node_logical_id: "node-1".to_owned(),
                        text_content: "body".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                    ChunkInsert {
                        id: "chunk-c".to_owned(),
                        node_logical_id: "node-1".to_owned(),
                        text_content: "conclusion".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                ],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Count the FTS rows derived for the node.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");

        assert_eq!(count, 3, "three chunks must produce three FTS rows");
    }
5963
    /// One request may carry chunks for a node submitted in the same
    /// request and chunks for a node already persisted; both resolution
    /// paths must yield the correct kind in the FTS projection.
    #[test]
    fn fts_resolves_mixed_fast_and_db_paths() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed write: persist "existing-node" so a later chunk for it must
        // be resolved from the database, not from the request.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-existing".to_owned(),
                    logical_id: "existing-node".to_owned(),
                    kind: "Archive".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed");

        // Mixed write: "chunk-fast" targets the co-submitted node,
        // "chunk-db" targets the pre-existing one.
        writer
            .submit(WriteRequest {
                label: "mixed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-new".to_owned(),
                    logical_id: "new-node".to_owned(),
                    kind: "Inbox".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![
                    ChunkInsert {
                        id: "chunk-fast".to_owned(),
                        node_logical_id: "new-node".to_owned(),
                        text_content: "new content".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                    ChunkInsert {
                        id: "chunk-db".to_owned(),
                        node_logical_id: "existing-node".to_owned(),
                        text_content: "archive content".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                ],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("mixed write");

        // Each chunk's FTS row must carry its own node's kind.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let fast_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
                [],
                |row| row.get(0),
            )
            .expect("fast path fts row");
        let db_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
                [],
                |row| row.get(0),
            )
            .expect("db path fts row");

        assert_eq!(fast_kind, "Inbox");
        assert_eq!(db_kind, "Archive");
    }
6065
6066 #[test]
6067 fn prepare_write_rejects_empty_chunk_text() {
6068 let db = NamedTempFile::new().expect("temporary db");
6069 let writer = WriterActor::start(
6070 db.path(),
6071 Arc::new(SchemaManager::new()),
6072 ProvenanceMode::Warn,
6073 Arc::new(TelemetryCounters::default()),
6074 )
6075 .expect("writer");
6076
6077 let result = writer.submit(WriteRequest {
6078 label: "test".to_owned(),
6079 nodes: vec![NodeInsert {
6080 row_id: "row-1".to_owned(),
6081 logical_id: "node-1".to_owned(),
6082 kind: "Note".to_owned(),
6083 properties: "{}".to_owned(),
6084 source_ref: None,
6085 upsert: false,
6086 chunk_policy: ChunkPolicy::Preserve,
6087 content_ref: None,
6088 }],
6089 node_retires: vec![],
6090 edges: vec![],
6091 edge_retires: vec![],
6092 chunks: vec![ChunkInsert {
6093 id: "chunk-1".to_owned(),
6094 node_logical_id: "node-1".to_owned(),
6095 text_content: String::new(),
6096 byte_start: None,
6097 byte_end: None,
6098 content_hash: None,
6099 }],
6100 runs: vec![],
6101 steps: vec![],
6102 actions: vec![],
6103 optional_backfills: vec![],
6104 vec_inserts: vec![],
6105 operational_writes: vec![],
6106 });
6107
6108 assert!(
6109 matches!(result, Err(EngineError::InvalidWrite(_))),
6110 "empty text_content must be rejected"
6111 );
6112 }
6113
6114 #[test]
6115 fn receipt_reports_zero_backfills_when_none_submitted() {
6116 let db = NamedTempFile::new().expect("temporary db");
6117 let writer = WriterActor::start(
6118 db.path(),
6119 Arc::new(SchemaManager::new()),
6120 ProvenanceMode::Warn,
6121 Arc::new(TelemetryCounters::default()),
6122 )
6123 .expect("writer");
6124
6125 let receipt = writer
6126 .submit(WriteRequest {
6127 label: "test".to_owned(),
6128 nodes: vec![NodeInsert {
6129 row_id: "row-1".to_owned(),
6130 logical_id: "node-1".to_owned(),
6131 kind: "Note".to_owned(),
6132 properties: "{}".to_owned(),
6133 source_ref: Some("src-1".to_owned()),
6134 upsert: false,
6135 chunk_policy: ChunkPolicy::Preserve,
6136 content_ref: None,
6137 }],
6138 node_retires: vec![],
6139 edges: vec![],
6140 edge_retires: vec![],
6141 chunks: vec![],
6142 runs: vec![],
6143 steps: vec![],
6144 actions: vec![],
6145 optional_backfills: vec![],
6146 vec_inserts: vec![],
6147 operational_writes: vec![],
6148 })
6149 .expect("write");
6150
6151 assert_eq!(receipt.optional_backfill_count, 0);
6152 }
6153
6154 #[test]
6155 fn receipt_reports_correct_backfill_count() {
6156 let db = NamedTempFile::new().expect("temporary db");
6157 let writer = WriterActor::start(
6158 db.path(),
6159 Arc::new(SchemaManager::new()),
6160 ProvenanceMode::Warn,
6161 Arc::new(TelemetryCounters::default()),
6162 )
6163 .expect("writer");
6164
6165 let receipt = writer
6166 .submit(WriteRequest {
6167 label: "test".to_owned(),
6168 nodes: vec![NodeInsert {
6169 row_id: "row-1".to_owned(),
6170 logical_id: "node-1".to_owned(),
6171 kind: "Note".to_owned(),
6172 properties: "{}".to_owned(),
6173 source_ref: Some("src-1".to_owned()),
6174 upsert: false,
6175 chunk_policy: ChunkPolicy::Preserve,
6176 content_ref: None,
6177 }],
6178 node_retires: vec![],
6179 edges: vec![],
6180 edge_retires: vec![],
6181 chunks: vec![],
6182 runs: vec![],
6183 steps: vec![],
6184 actions: vec![],
6185 optional_backfills: vec![
6186 OptionalProjectionTask {
6187 target: ProjectionTarget::Fts,
6188 payload: "p1".to_owned(),
6189 },
6190 OptionalProjectionTask {
6191 target: ProjectionTarget::Vec,
6192 payload: "p2".to_owned(),
6193 },
6194 OptionalProjectionTask {
6195 target: ProjectionTarget::All,
6196 payload: "p3".to_owned(),
6197 },
6198 ],
6199 vec_inserts: vec![],
6200 operational_writes: vec![],
6201 })
6202 .expect("write");
6203
6204 assert_eq!(receipt.optional_backfill_count, 3);
6205 }
6206
    /// Optional backfill tasks are recorded with the write but must not be
    /// executed inside it: only the directly submitted chunk may produce
    /// an FTS row.
    #[test]
    fn backfill_tasks_are_not_executed_during_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One real chunk plus one FTS backfill task in the same request.
        writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "required text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![OptionalProjectionTask {
                    target: ProjectionTarget::Fts,
                    payload: "backfill-payload".to_owned(),
                }],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Exactly one FTS row — from the chunk, not the backfill task.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");

        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
    }
6267
    /// Upserting a node with `ChunkPolicy::Replace` must drop the FTS rows
    /// of the old chunks and derive new rows carrying the updated kind.
    #[test]
    fn fts_row_uses_new_kind_after_node_replace() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node-1 as a "Note" with chunk-v1.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-v1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "original".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: same logical id, new row id, upsert as a "Meeting" and
        // replace the chunk set with chunk-v2.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-v2".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "updated".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");

        // The replaced chunk's FTS row must be gone.
        let old_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
                [],
                |row| row.get(0),
            )
            .expect("old fts count");
        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");

        // The new chunk's FTS row must reflect the upserted kind.
        let new_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
                [],
                |row| row.get(0),
            )
            .expect("new fts row");
        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
    }
6369
6370 #[test]
6373 fn vec_insert_empty_chunk_id_is_rejected() {
6374 let db = NamedTempFile::new().expect("temporary db");
6375 let writer = WriterActor::start(
6376 db.path(),
6377 Arc::new(SchemaManager::new()),
6378 ProvenanceMode::Warn,
6379 Arc::new(TelemetryCounters::default()),
6380 )
6381 .expect("writer");
6382 let result = writer.submit(WriteRequest {
6383 label: "vec-test".to_owned(),
6384 nodes: vec![],
6385 node_retires: vec![],
6386 edges: vec![],
6387 edge_retires: vec![],
6388 chunks: vec![],
6389 runs: vec![],
6390 steps: vec![],
6391 actions: vec![],
6392 optional_backfills: vec![],
6393 vec_inserts: vec![VecInsert {
6394 chunk_id: String::new(),
6395 embedding: vec![0.1, 0.2, 0.3],
6396 }],
6397 operational_writes: vec![],
6398 });
6399 assert!(
6400 matches!(result, Err(EngineError::InvalidWrite(_))),
6401 "empty chunk_id in VecInsert must be rejected"
6402 );
6403 }
6404
6405 #[test]
6406 fn vec_insert_empty_embedding_is_rejected() {
6407 let db = NamedTempFile::new().expect("temporary db");
6408 let writer = WriterActor::start(
6409 db.path(),
6410 Arc::new(SchemaManager::new()),
6411 ProvenanceMode::Warn,
6412 Arc::new(TelemetryCounters::default()),
6413 )
6414 .expect("writer");
6415 let result = writer.submit(WriteRequest {
6416 label: "vec-test".to_owned(),
6417 nodes: vec![],
6418 node_retires: vec![],
6419 edges: vec![],
6420 edge_retires: vec![],
6421 chunks: vec![],
6422 runs: vec![],
6423 steps: vec![],
6424 actions: vec![],
6425 optional_backfills: vec![],
6426 vec_inserts: vec![VecInsert {
6427 chunk_id: "chunk-1".to_owned(),
6428 embedding: vec![],
6429 }],
6430 operational_writes: vec![],
6431 });
6432 assert!(
6433 matches!(result, Err(EngineError::InvalidWrite(_))),
6434 "empty embedding in VecInsert must be rejected"
6435 );
6436 }
6437
    /// Without the `sqlite-vec` feature compiled in, submitting a
    /// `VecInsert` must succeed as a harmless no-op rather than error.
    #[test]
    fn vec_insert_noop_without_feature() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");
        let result = writer.submit(WriteRequest {
            label: "vec-noop".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![VecInsert {
                chunk_id: "chunk-noop".to_owned(),
                embedding: vec![1.0, 2.0, 3.0],
            }],
            operational_writes: vec![],
        });
        // Without the feature, the insert must be a clean no-op.
        #[cfg(not(feature = "sqlite-vec"))]
        result.expect("noop VecInsert without feature must succeed");
        // With the feature, this test makes no claim about the outcome
        // (no vector profile was provisioned here), so the result is
        // deliberately ignored.
        #[cfg(feature = "sqlite-vec")]
        let _ = result;
    }
6473
    /// Retiring a node must NOT delete its vector rows: they stay in
    /// `vec_nodes_active` so a later restore can re-establish vector
    /// search behavior without re-embedding.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn node_retire_preserves_vec_rows_for_later_restore() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the schema and provision a 3-dimensional vector
        // profile before the writer starts; the connection is scoped so
        // it closes before the writer opens its own.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Setup write: node + chunk + embedding for that chunk.
        writer
            .submit(WriteRequest {
                label: "setup".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-retire-vec".to_owned(),
                    logical_id: "node-retire-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-retire-vec".to_owned(),
                    node_logical_id: "node-retire-vec".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-retire-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("setup write");

        // Retire the node in a second write.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "node-retire-vec".to_owned(),
                    source_ref: Some("src".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // The vector row must still be present after the retire.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
        );
    }
6569
    /// Upserting a node with `ChunkPolicy::Replace` must also delete the
    /// vector rows of the replaced chunks while keeping rows for the new
    /// chunk set.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema and a 3-dimensional vector profile up front;
        // the connection is scoped so it closes before the writer starts.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node with chunk-replace-A and its embedding.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v1".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-A".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version one".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-A".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upsert the same logical node with Replace, swapping the
        // chunk set for chunk-replace-B plus a fresh embedding.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v2".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-B".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version two".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-B".to_owned(),
                    embedding: vec![0.4, 0.5, 0.6],
                }],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Old vector row must be gone; new one must exist.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count_a: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
                [],
                |row| row.get(0),
            )
            .expect("count A");
        let count_b: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
                [],
                |row| row.get(0),
            )
            .expect("count B");
        assert_eq!(
            count_a, 0,
            "old vec row (chunk-A) must be deleted on Replace"
        );
        assert_eq!(
            count_b, 1,
            "new vec row (chunk-B) must be present after Replace"
        );
    }
6692
    /// With the `sqlite-vec` feature enabled and a vector profile
    /// provisioned, a `VecInsert` must land as a row in the profile's
    /// table (`vec_nodes_active`).
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_insert_is_persisted_when_feature_enabled() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema and a 3-dimensional vector profile; scoped so
        // the connection closes before the writer starts.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit only the vector insert — no node or chunk required.
        writer
            .submit(WriteRequest {
                label: "vec-insert".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("vec insert write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
    }
6748
6749 #[test]
6752 fn write_request_exceeding_node_limit_is_rejected() {
6753 let nodes: Vec<NodeInsert> = (0..10_001)
6754 .map(|i| NodeInsert {
6755 row_id: format!("row-{i}"),
6756 logical_id: format!("lg-{i}"),
6757 kind: "Note".to_owned(),
6758 properties: "{}".to_owned(),
6759 source_ref: None,
6760 upsert: false,
6761 chunk_policy: ChunkPolicy::Preserve,
6762 content_ref: None,
6763 })
6764 .collect();
6765
6766 let request = WriteRequest {
6767 label: "too-many-nodes".to_owned(),
6768 nodes,
6769 node_retires: vec![],
6770 edges: vec![],
6771 edge_retires: vec![],
6772 chunks: vec![],
6773 runs: vec![],
6774 steps: vec![],
6775 actions: vec![],
6776 optional_backfills: vec![],
6777 vec_inserts: vec![],
6778 operational_writes: vec![],
6779 };
6780
6781 let result = prepare_write(request, ProvenanceMode::Warn)
6782 .map(|_| ())
6783 .map_err(|e| format!("{e}"));
6784 assert!(
6785 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6786 "exceeding node limit must return InvalidWrite: got {result:?}"
6787 );
6788 }
6789
    /// Even when each individual collection stays within its own cap, the
    /// combined item count (10_000 nodes + 10_000 edges + 50_000 chunks +
    /// 20_001 vec inserts + 10_000 operational writes = 100_001) must trip
    /// the total-items limit in `prepare_write`.
    #[test]
    fn write_request_exceeding_total_limit_is_rejected() {
        let request = WriteRequest {
            label: "too-many-total".to_owned(),
            nodes: (0..10_000)
                .map(|i| NodeInsert {
                    row_id: format!("row-{i}"),
                    logical_id: format!("lg-{i}"),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                })
                .collect(),
            node_retires: vec![],
            edges: (0..10_000)
                .map(|i| EdgeInsert {
                    row_id: format!("edge-row-{i}"),
                    logical_id: format!("edge-lg-{i}"),
                    kind: "link".to_owned(),
                    source_logical_id: format!("lg-{i}"),
                    target_logical_id: format!("lg-{}", i + 1),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                })
                .collect(),
            edge_retires: vec![],
            chunks: (0..50_000)
                .map(|i| ChunkInsert {
                    id: format!("chunk-{i}"),
                    node_logical_id: "lg-0".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                })
                .collect(),
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: (0..20_001)
                .map(|i| VecInsert {
                    chunk_id: format!("vec-chunk-{i}"),
                    embedding: vec![0.1],
                })
                .collect(),
            operational_writes: (0..10_000)
                .map(|i| OperationalWrite::Append {
                    collection: format!("col-{i}"),
                    record_key: format!("key-{i}"),
                    payload_json: "{}".to_owned(),
                    source_ref: None,
                })
                .collect(),
        };

        // Validation alone must fail — no database involved.
        let result = prepare_write(request, ProvenanceMode::Warn)
            .map(|_| ())
            .map_err(|e| format!("{e}"));
        assert!(
            matches!(result, Err(ref msg) if msg.contains("too many total items")),
            "exceeding total item limit must return InvalidWrite: got {result:?}"
        );
    }
6861
6862 #[test]
6863 fn write_request_within_limits_succeeds() {
6864 let db = NamedTempFile::new().expect("temporary db");
6865 let writer = WriterActor::start(
6866 db.path(),
6867 Arc::new(SchemaManager::new()),
6868 ProvenanceMode::Warn,
6869 Arc::new(TelemetryCounters::default()),
6870 )
6871 .expect("writer");
6872
6873 let result = writer.submit(WriteRequest {
6874 label: "within-limits".to_owned(),
6875 nodes: vec![NodeInsert {
6876 row_id: "row-1".to_owned(),
6877 logical_id: "lg-1".to_owned(),
6878 kind: "Note".to_owned(),
6879 properties: "{}".to_owned(),
6880 source_ref: None,
6881 upsert: false,
6882 chunk_policy: ChunkPolicy::Preserve,
6883 content_ref: None,
6884 }],
6885 node_retires: vec![],
6886 edges: vec![],
6887 edge_retires: vec![],
6888 chunks: vec![],
6889 runs: vec![],
6890 steps: vec![],
6891 actions: vec![],
6892 optional_backfills: vec![],
6893 vec_inserts: vec![],
6894 operational_writes: vec![],
6895 });
6896
6897 assert!(
6898 result.is_ok(),
6899 "write request within limits must succeed: got {result:?}"
6900 );
6901 }
6902
6903 #[test]
6904 fn property_fts_rows_created_on_node_insert() {
6905 let db = NamedTempFile::new().expect("temporary db");
6906 let schema = Arc::new(SchemaManager::new());
6908 {
6909 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6910 schema.bootstrap(&conn).expect("bootstrap");
6911 conn.execute(
6912 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6913 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6914 [],
6915 )
6916 .expect("register schema");
6917 }
6918 let writer = WriterActor::start(
6919 db.path(),
6920 Arc::clone(&schema),
6921 ProvenanceMode::Warn,
6922 Arc::new(TelemetryCounters::default()),
6923 )
6924 .expect("writer");
6925
6926 writer
6927 .submit(WriteRequest {
6928 label: "goal-insert".to_owned(),
6929 nodes: vec![NodeInsert {
6930 row_id: "row-1".to_owned(),
6931 logical_id: "goal-1".to_owned(),
6932 kind: "Goal".to_owned(),
6933 properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6934 .to_owned(),
6935 source_ref: Some("src-1".to_owned()),
6936 upsert: false,
6937 chunk_policy: ChunkPolicy::Preserve,
6938 content_ref: None,
6939 }],
6940 node_retires: vec![],
6941 edges: vec![],
6942 edge_retires: vec![],
6943 chunks: vec![],
6944 runs: vec![],
6945 steps: vec![],
6946 actions: vec![],
6947 optional_backfills: vec![],
6948 vec_inserts: vec![],
6949 operational_writes: vec![],
6950 })
6951 .expect("write");
6952
6953 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6954 let text: String = conn
6955 .query_row(
6956 "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6957 [],
6958 |row| row.get(0),
6959 )
6960 .expect("property FTS row must exist");
6961 assert_eq!(text, "Ship v2 Launch the redesign");
6962 }
6963
6964 #[test]
6965 fn property_fts_rows_replaced_on_upsert() {
6966 let db = NamedTempFile::new().expect("temporary db");
6967 let schema = Arc::new(SchemaManager::new());
6968 {
6969 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6970 schema.bootstrap(&conn).expect("bootstrap");
6971 conn.execute(
6972 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6973 VALUES ('Goal', '[\"$.name\"]', ' ')",
6974 [],
6975 )
6976 .expect("register schema");
6977 }
6978 let writer = WriterActor::start(
6979 db.path(),
6980 Arc::clone(&schema),
6981 ProvenanceMode::Warn,
6982 Arc::new(TelemetryCounters::default()),
6983 )
6984 .expect("writer");
6985
6986 writer
6988 .submit(WriteRequest {
6989 label: "insert".to_owned(),
6990 nodes: vec![NodeInsert {
6991 row_id: "row-1".to_owned(),
6992 logical_id: "goal-1".to_owned(),
6993 kind: "Goal".to_owned(),
6994 properties: r#"{"name":"Alpha"}"#.to_owned(),
6995 source_ref: Some("src-1".to_owned()),
6996 upsert: false,
6997 chunk_policy: ChunkPolicy::Preserve,
6998 content_ref: None,
6999 }],
7000 node_retires: vec![],
7001 edges: vec![],
7002 edge_retires: vec![],
7003 chunks: vec![],
7004 runs: vec![],
7005 steps: vec![],
7006 actions: vec![],
7007 optional_backfills: vec![],
7008 vec_inserts: vec![],
7009 operational_writes: vec![],
7010 })
7011 .expect("insert");
7012
7013 writer
7015 .submit(WriteRequest {
7016 label: "upsert".to_owned(),
7017 nodes: vec![NodeInsert {
7018 row_id: "row-2".to_owned(),
7019 logical_id: "goal-1".to_owned(),
7020 kind: "Goal".to_owned(),
7021 properties: r#"{"name":"Beta"}"#.to_owned(),
7022 source_ref: Some("src-2".to_owned()),
7023 upsert: true,
7024 chunk_policy: ChunkPolicy::Preserve,
7025 content_ref: None,
7026 }],
7027 node_retires: vec![],
7028 edges: vec![],
7029 edge_retires: vec![],
7030 chunks: vec![],
7031 runs: vec![],
7032 steps: vec![],
7033 actions: vec![],
7034 optional_backfills: vec![],
7035 vec_inserts: vec![],
7036 operational_writes: vec![],
7037 })
7038 .expect("upsert");
7039
7040 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7041 let count: i64 = conn
7042 .query_row(
7043 "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7044 [],
7045 |row| row.get(0),
7046 )
7047 .expect("count");
7048 assert_eq!(
7049 count, 1,
7050 "must have exactly one property FTS row after upsert"
7051 );
7052
7053 let text: String = conn
7054 .query_row(
7055 "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7056 [],
7057 |row| row.get(0),
7058 )
7059 .expect("text");
7060 assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7061 }
7062
7063 #[test]
7064 fn property_fts_rows_deleted_on_retire() {
7065 let db = NamedTempFile::new().expect("temporary db");
7066 let schema = Arc::new(SchemaManager::new());
7067 {
7068 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7069 schema.bootstrap(&conn).expect("bootstrap");
7070 conn.execute(
7071 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7072 VALUES ('Goal', '[\"$.name\"]', ' ')",
7073 [],
7074 )
7075 .expect("register schema");
7076 }
7077 let writer = WriterActor::start(
7078 db.path(),
7079 Arc::clone(&schema),
7080 ProvenanceMode::Warn,
7081 Arc::new(TelemetryCounters::default()),
7082 )
7083 .expect("writer");
7084
7085 writer
7087 .submit(WriteRequest {
7088 label: "insert".to_owned(),
7089 nodes: vec![NodeInsert {
7090 row_id: "row-1".to_owned(),
7091 logical_id: "goal-1".to_owned(),
7092 kind: "Goal".to_owned(),
7093 properties: r#"{"name":"Alpha"}"#.to_owned(),
7094 source_ref: Some("src-1".to_owned()),
7095 upsert: false,
7096 chunk_policy: ChunkPolicy::Preserve,
7097 content_ref: None,
7098 }],
7099 node_retires: vec![],
7100 edges: vec![],
7101 edge_retires: vec![],
7102 chunks: vec![],
7103 runs: vec![],
7104 steps: vec![],
7105 actions: vec![],
7106 optional_backfills: vec![],
7107 vec_inserts: vec![],
7108 operational_writes: vec![],
7109 })
7110 .expect("insert");
7111
7112 writer
7114 .submit(WriteRequest {
7115 label: "retire".to_owned(),
7116 nodes: vec![],
7117 node_retires: vec![NodeRetire {
7118 logical_id: "goal-1".to_owned(),
7119 source_ref: Some("forget-1".to_owned()),
7120 }],
7121 edges: vec![],
7122 edge_retires: vec![],
7123 chunks: vec![],
7124 runs: vec![],
7125 steps: vec![],
7126 actions: vec![],
7127 optional_backfills: vec![],
7128 vec_inserts: vec![],
7129 operational_writes: vec![],
7130 })
7131 .expect("retire");
7132
7133 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7134 let count: i64 = conn
7135 .query_row(
7136 "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7137 [],
7138 |row| row.get(0),
7139 )
7140 .expect("count");
7141 assert_eq!(count, 0, "property FTS row must be deleted on retire");
7142 }
7143
7144 #[test]
7145 fn no_property_fts_row_for_unregistered_kind() {
7146 let db = NamedTempFile::new().expect("temporary db");
7147 let schema = Arc::new(SchemaManager::new());
7148 {
7149 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7150 schema.bootstrap(&conn).expect("bootstrap");
7151 }
7153 let writer = WriterActor::start(
7154 db.path(),
7155 Arc::clone(&schema),
7156 ProvenanceMode::Warn,
7157 Arc::new(TelemetryCounters::default()),
7158 )
7159 .expect("writer");
7160
7161 writer
7162 .submit(WriteRequest {
7163 label: "insert".to_owned(),
7164 nodes: vec![NodeInsert {
7165 row_id: "row-1".to_owned(),
7166 logical_id: "note-1".to_owned(),
7167 kind: "Note".to_owned(),
7168 properties: r#"{"title":"hello"}"#.to_owned(),
7169 source_ref: Some("src-1".to_owned()),
7170 upsert: false,
7171 chunk_policy: ChunkPolicy::Preserve,
7172 content_ref: None,
7173 }],
7174 node_retires: vec![],
7175 edges: vec![],
7176 edge_retires: vec![],
7177 chunks: vec![],
7178 runs: vec![],
7179 steps: vec![],
7180 actions: vec![],
7181 optional_backfills: vec![],
7182 vec_inserts: vec![],
7183 operational_writes: vec![],
7184 })
7185 .expect("insert");
7186
7187 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7188 let count: i64 = conn
7189 .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7190 row.get(0)
7191 })
7192 .expect("count");
7193 assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7194 }
7195
7196 mod extract_json_path_tests {
7197 use super::super::extract_json_path;
7198 use serde_json::json;
7199
7200 #[test]
7201 fn string_value() {
7202 let v = json!({"name": "alice"});
7203 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7204 }
7205
7206 #[test]
7207 fn number_value() {
7208 let v = json!({"age": 42});
7209 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7210 }
7211
7212 #[test]
7213 fn bool_value() {
7214 let v = json!({"active": true});
7215 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7216 }
7217
7218 #[test]
7219 fn null_value() {
7220 let v = json!({"x": null});
7221 assert!(extract_json_path(&v, "$.x").is_empty());
7222 }
7223
7224 #[test]
7225 fn missing_path() {
7226 let v = json!({"name": "a"});
7227 assert!(extract_json_path(&v, "$.missing").is_empty());
7228 }
7229
7230 #[test]
7231 fn nested_path() {
7232 let v = json!({"address": {"city": "NYC"}});
7233 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7234 }
7235
7236 #[test]
7237 fn array_of_strings() {
7238 let v = json!({"tags": ["a", "b", "c"]});
7239 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7240 }
7241
7242 #[test]
7243 fn array_mixed_scalars() {
7244 let v = json!({"vals": ["x", 1, true]});
7245 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7246 }
7247
7248 #[test]
7249 fn array_only_objects_returns_empty() {
7250 let v = json!({"data": [{"k": "v"}]});
7251 assert!(extract_json_path(&v, "$.data").is_empty());
7252 }
7253
7254 #[test]
7255 fn array_mixed_objects_and_scalars() {
7256 let v = json!({"data": ["keep", {"skip": true}, "also"]});
7257 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7258 }
7259
7260 #[test]
7261 fn object_returns_empty() {
7262 let v = json!({"meta": {"k": "v"}});
7263 assert!(extract_json_path(&v, "$.meta").is_empty());
7264 }
7265
7266 #[test]
7267 fn no_prefix_returns_empty() {
7268 let v = json!({"name": "a"});
7269 assert!(extract_json_path(&v, "name").is_empty());
7270 }
7271 }
7272
7273 mod recursive_extraction_tests {
7274 use super::super::{
7275 LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7276 PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7277 };
7278 use serde_json::json;
7279
7280 fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7281 PropertyFtsSchema {
7282 paths,
7283 separator: " ".to_owned(),
7284 exclude_paths: Vec::new(),
7285 }
7286 }
7287
7288 fn schema_with_excludes(
7289 paths: Vec<PropertyPathEntry>,
7290 excludes: Vec<String>,
7291 ) -> PropertyFtsSchema {
7292 PropertyFtsSchema {
7293 paths,
7294 separator: " ".to_owned(),
7295 exclude_paths: excludes,
7296 }
7297 }
7298
7299 #[test]
7300 fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7301 let props = json!({"payload": {"b": "two", "a": "one"}});
7302 let (blob, positions, _stats) = extract_property_fts(
7303 &props,
7304 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7305 );
7306 let blob = blob.expect("blob emitted");
7307 let idx_one = blob.find("one").expect("contains 'one'");
7308 let idx_two = blob.find("two").expect("contains 'two'");
7309 assert!(
7310 idx_one < idx_two,
7311 "lex order: 'one' (key a) before 'two' (key b)"
7312 );
7313 assert_eq!(positions.len(), 2);
7314 assert_eq!(positions[0].leaf_path, "$.payload.a");
7315 assert_eq!(positions[1].leaf_path, "$.payload.b");
7316 }
7317
7318 #[test]
7319 fn recursive_extraction_walks_arrays_of_scalars() {
7320 let props = json!({"tags": ["red", "blue"]});
7321 let (_blob, positions, _stats) = extract_property_fts(
7322 &props,
7323 &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7324 );
7325 assert_eq!(positions.len(), 2);
7326 assert_eq!(positions[0].leaf_path, "$.tags[0]");
7327 assert_eq!(positions[1].leaf_path, "$.tags[1]");
7328 }
7329
7330 #[test]
7331 fn recursive_extraction_walks_arrays_of_objects() {
7332 let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7333 let (_blob, positions, _stats) = extract_property_fts(
7334 &props,
7335 &schema(vec![PropertyPathEntry::recursive("$.items")]),
7336 );
7337 assert_eq!(positions.len(), 2);
7338 assert_eq!(positions[0].leaf_path, "$.items[0].name");
7339 assert_eq!(positions[1].leaf_path, "$.items[1].name");
7340 }
7341
7342 #[test]
7343 fn recursive_extraction_stringifies_numbers_and_bools() {
7344 let props = json!({"root": {"n": 42, "ok": true}});
7345 let (blob, _positions, _stats) = extract_property_fts(
7346 &props,
7347 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7348 );
7349 let blob = blob.expect("blob emitted");
7350 assert!(blob.contains("42"));
7351 assert!(blob.contains("true"));
7352 }
7353
7354 #[test]
7355 fn recursive_extraction_skips_nulls_and_missing() {
7356 let props = json!({"root": {"x": null, "y": "present"}});
7357 let (blob, positions, _stats) = extract_property_fts(
7358 &props,
7359 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7360 );
7361 let blob = blob.expect("blob emitted");
7362 assert!(!blob.contains("null"));
7363 assert_eq!(positions.len(), 1);
7364 assert_eq!(positions[0].leaf_path, "$.root.y");
7365 }
7366
7367 #[test]
7368 fn recursive_extraction_respects_max_depth_guardrail() {
7369 let mut inner = json!("leaf-value");
7371 for _ in 0..10 {
7372 inner = json!({ "k": inner });
7373 }
7374 let props = json!({ "root": inner });
7375 let (blob, positions, stats) = extract_property_fts(
7376 &props,
7377 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7378 );
7379 assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7380 assert!(
7382 blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7383 "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7384 );
7385 let _ = positions;
7388 let _ = MAX_RECURSIVE_DEPTH;
7389 }
7390
7391 #[test]
7392 fn recursive_extraction_respects_max_bytes_guardrail() {
7393 let leaves: Vec<String> = (0..40)
7395 .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7396 .collect();
7397 let props = json!({ "root": leaves });
7398 let (blob, positions, stats) = extract_property_fts(
7399 &props,
7400 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7401 );
7402 assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7403 let blob = blob.expect("blob must still be emitted");
7404 assert!(
7405 blob.len() <= MAX_EXTRACTED_BYTES,
7406 "blob must not exceed MAX_EXTRACTED_BYTES"
7407 );
7408 for pos in &positions {
7410 assert!(pos.end_offset <= blob.len());
7411 let slice = &blob[pos.start_offset..pos.end_offset];
7412 assert!(!slice.is_empty());
7415 assert!(!slice.contains(LEAF_SEPARATOR));
7416 }
7417 assert!(!blob.ends_with(LEAF_SEPARATOR));
7419 }
7420
7421 #[test]
7422 fn recursive_extraction_respects_exclude_paths() {
7423 let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7424 let (blob, positions, stats) = extract_property_fts(
7425 &props,
7426 &schema_with_excludes(
7427 vec![PropertyPathEntry::recursive("$.payload")],
7428 vec!["$.payload.priv".to_owned()],
7429 ),
7430 );
7431 let blob = blob.expect("blob emitted");
7432 assert!(blob.contains("yes"));
7433 assert!(!blob.contains("no"));
7434 assert_eq!(positions.len(), 1);
7435 assert_eq!(positions[0].leaf_path, "$.payload.pub");
7436 assert!(stats.excluded_subtree > 0);
7437 }
7438
7439 #[test]
7440 fn position_map_entries_match_emitted_leaves_in_order() {
7441 let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7442 let (blob, positions, _stats) = extract_property_fts(
7443 &props,
7444 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7445 );
7446 let blob = blob.expect("blob emitted");
7447 assert_eq!(positions.len(), 3);
7448 assert_eq!(positions[0].leaf_path, "$.root.a");
7450 assert_eq!(positions[1].leaf_path, "$.root.b");
7451 assert_eq!(positions[2].leaf_path, "$.root.c");
7452 let mut prev_end: usize = 0;
7454 for (i, pos) in positions.iter().enumerate() {
7455 assert!(pos.start_offset >= prev_end);
7456 assert!(pos.end_offset > pos.start_offset);
7457 assert!(pos.end_offset <= blob.len());
7458 let slice = &blob[pos.start_offset..pos.end_offset];
7459 match i {
7460 0 => assert_eq!(slice, "alpha"),
7461 1 => assert_eq!(slice, "bravo"),
7462 2 => assert_eq!(slice, "charlie"),
7463 _ => unreachable!(),
7464 }
7465 prev_end = pos.end_offset;
7466 }
7467 }
7468
7469 #[test]
7470 fn scalar_only_schema_produces_empty_position_map() {
7471 let props = json!({"name": "alpha", "title": "beta"});
7472 let (blob, positions, _stats) = extract_property_fts(
7473 &props,
7474 &schema(vec![
7475 PropertyPathEntry::scalar("$.name"),
7476 PropertyPathEntry::scalar("$.title"),
7477 ]),
7478 );
7479 assert_eq!(blob.as_deref(), Some("alpha beta"));
7480 assert!(
7481 positions.is_empty(),
7482 "scalar-only schema must emit no position entries"
7483 );
7484 let _: Vec<PositionEntry> = positions;
7487 }
7488 }
7489}