1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
23#[derive(Clone, Debug, PartialEq, Eq)]
/// A deferred projection task carried through a [`WriteRequest`] and counted
/// in [`WriteReceipt::optional_backfill_count`].
///
/// NOTE(review): named "optional backfill" — presumably applied best-effort
/// after the primary write; confirm against the apply path.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Projection the task targets.
    pub target: ProjectionTarget,
    /// Opaque payload for the projection; format depends on `target`.
    pub payload: String,
}
31
/// Per-node policy for existing chunks when a node is written.
///
/// NOTE(review): semantics inferred from variant names — confirm against the
/// writer's apply path.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Leave existing chunks untouched (default).
    #[default]
    Preserve,
    /// Replace existing chunks.
    Replace,
}
41
/// How missing `source_ref` provenance on incoming writes is treated.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Tolerate missing provenance (default); see
    /// [`WriteReceipt::provenance_warnings`].
    #[default]
    Warn,
    /// Reject the whole request with `EngineError::InvalidWrite` when any
    /// item lacks a `source_ref` (enforced by `check_require_provenance`).
    Require,
}
52
/// One node insert within a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable logical identity of the node; must be non-empty.
    pub logical_id: String,
    /// Node kind; used to look up property-FTS schemas.
    pub kind: String,
    /// JSON-encoded properties (parsed with serde_json during FTS extraction).
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// Whether an existing node with the same logical id is superseded.
    pub upsert: bool,
    /// What happens to the node's existing chunks on upsert.
    pub chunk_policy: ChunkPolicy,
    /// Optional reference to external content — TODO confirm semantics.
    pub content_ref: Option<String>,
}
69
/// One edge insert within a [`WriteRequest`]; endpoints are addressed by
/// node logical ids.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row id; must be non-empty and unique within the request
    /// (shares the uniqueness namespace with node row ids).
    pub row_id: String,
    /// Stable logical identity of the edge; must be non-empty.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    /// Edge kind.
    pub kind: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// Whether an existing edge with the same logical id is superseded.
    pub upsert: bool,
}
84
/// Retires (supersedes) a node by logical id. A retire must not be combined
/// with chunk inserts for the same node in one request (see
/// `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Logical id of the node to retire.
    pub logical_id: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// Retires (supersedes) an edge by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Logical id of the edge to retire.
    pub logical_id: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// One text chunk attached to a node. The target node must either appear in
/// the same request or already exist in the database (v1 limitation enforced
/// in `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk id; must be non-empty.
    pub id: String,
    /// Logical id of the owning node.
    pub node_logical_id: String,
    /// Text body; must be non-empty. Projected into the FTS index.
    pub text_content: String,
    /// Optional byte range of the chunk within its source document —
    /// TODO confirm the reference frame.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional content hash — presumably for change detection; verify.
    pub content_hash: Option<String>,
}
110
/// An embedding vector for a chunk. Only `PartialEq` (no `Eq`) because the
/// embedding holds `f32`s.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Chunk the embedding belongs to; must be non-empty.
    pub chunk_id: String,
    /// Embedding values; must be non-empty.
    pub embedding: Vec<f32>,
}
122
/// A single mutation against an operational collection. `collection` and
/// `record_key` must be non-empty; `payload_json` must be non-empty for
/// `Append` and `Put` (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record under `record_key`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put (insert-or-replace) the record at `record_key`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete the record at `record_key`; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A runtime "run" record. When `upsert` is true, `supersedes_id` must be set
/// (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run id; must be non-empty.
    pub id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the run this one supersedes; mandatory when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A runtime "step" record belonging to a run. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step id; must be non-empty.
    pub id: String,
    /// Owning run id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the step this one supersedes; mandatory when `upsert` is true.
    pub supersedes_id: Option<String>,
}
169
/// A runtime "action" record belonging to a step. When `upsert` is true,
/// `supersedes_id` must be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action id; must be non-empty.
    pub id: String,
    /// Owning step id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the action this one supersedes; mandatory when `upsert` is true.
    pub supersedes_id: Option<String>,
}
182
// Per-category caps on a single WriteRequest, enforced by
// `validate_request_size` before anything reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Node retires + edge retires combined.
const MAX_RETIRES: usize = 10_000;
// Runs + steps + actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of all item categories in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long callers block waiting for the writer thread's reply before
// returning EngineError::WriterTimedOut. On timeout the write may still
// commit — the writer keeps processing even after the caller gives up.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batched unit of work submitted to [`WriterActor::submit`]. All contained
/// items are validated together in `prepare_write` and applied by the single
/// writer thread. Only `PartialEq` (no `Eq`) because `vec_inserts` carries
/// `f32` embeddings.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label echoed back on the receipt and in logs.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection tasks; not counted toward the total item cap.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Result of a successful [`WriterActor::submit`] call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label copied from the originating [`WriteRequest`].
    pub label: String,
    /// Number of optional backfill tasks associated with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Provenance-related warnings (relevant under [`ProvenanceMode::Warn`]).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to bump last-accessed timestamps for a set of nodes. Requires at
/// least one non-empty logical id, and a `source_ref` under
/// [`ProvenanceMode::Require`] (see `prepare_touch_last_accessed`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Logical ids to touch; each must be non-blank.
    pub logical_ids: Vec<String>,
    /// Timestamp to record — unit (seconds vs millis) not visible here; TODO confirm.
    pub touched_at: i64,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
235
/// Outcome of a [`LastAccessTouchRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// How many logical ids were actually touched.
    pub touched_logical_ids: usize,
    /// The timestamp that was recorded.
    pub touched_at: i64,
}
242
/// One row destined for the chunk FTS projection, resolved by
/// `resolve_fts_rows` from a [`ChunkInsert`] plus its node's kind.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node (from this request or the database).
    kind: String,
    text_content: String,
}
250
/// One row destined for the property FTS projection, produced by
/// `resolve_property_fts_rows` via `extract_property_fts`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Combined extracted text (scalar parts plus recursive leaf blob).
    text_content: String,
    /// Byte offsets into `text_content` mapping each leaf back to its JSON path.
    positions: Vec<PositionEntry>,
}
258
// Maximum JSON nesting depth walked by RecursiveWalker; deeper subtrees are
// dropped and counted in ExtractStats::depth_cap_hit.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

// Byte cap on the recursive leaf blob; once a leaf would push past this the
// walker stops and sets ExtractStats::byte_cap_reached.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

// Joined between extracted leaves. An unlikely sentinel token (padded with
// spaces) so FTS phrase queries cannot match across leaf boundaries.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// How a property path is extracted for FTS.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Extract only the scalar (or array-of-scalars) value at the path.
    #[default]
    Scalar,
    /// Walk the entire subtree rooted at the path, emitting every leaf.
    Recursive,
}
314
/// A single extraction rule within a [`PropertyFtsSchema`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    /// JSON path in `$.a.b` form (must start with `$.`).
    pub path: String,
    /// Scalar vs recursive extraction.
    pub mode: PropertyPathMode,
}
321
322impl PropertyPathEntry {
323 pub(crate) fn scalar(path: impl Into<String>) -> Self {
324 Self {
325 path: path.into(),
326 mode: PropertyPathMode::Scalar,
327 }
328 }
329
330 #[cfg(test)]
331 pub(crate) fn recursive(path: impl Into<String>) -> Self {
332 Self {
333 path: path.into(),
334 mode: PropertyPathMode::Recursive,
335 }
336 }
337}
338
/// Per-kind configuration for projecting node properties into FTS, loaded
/// from the `fts_property_schemas` table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Extraction rules, applied in order.
    pub paths: Vec<PropertyPathEntry>,
    /// Joiner for scalar parts (recursive leaves always use `LEAF_SEPARATOR`).
    pub separator: String,
    /// Exact paths whose subtrees are skipped during recursive walks.
    pub exclude_paths: Vec<String>,
}
358
/// Maps a byte range of the extracted text blob back to the JSON leaf path
/// it came from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset into the combined text.
    pub start_offset: usize,
    /// Exclusive end byte offset.
    pub end_offset: usize,
    /// Path of the originating leaf, e.g. `$.a.b[0]`.
    pub leaf_path: String,
}
368
/// Counters recording which extraction guardrails fired during a walk.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Subtrees dropped because they exceeded `MAX_RECURSIVE_DEPTH`.
    pub depth_cap_hit: usize,
    /// True once the blob would exceed `MAX_EXTRACTED_BYTES`.
    pub byte_cap_reached: bool,
    /// Subtrees skipped because their path was in `exclude_paths`.
    pub excluded_subtree: usize,
}
381
382impl ExtractStats {
383 fn merge(&mut self, other: ExtractStats) {
384 self.depth_cap_hit += other.depth_cap_hit;
385 self.byte_cap_reached |= other.byte_cap_reached;
386 self.excluded_subtree += other.excluded_subtree;
387 }
388}
389
/// A [`WriteRequest`] after caller-side validation (`prepare_write`), plus
/// scratch fields the writer thread fills in while resolving projections.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Resolved on the writer thread; empty after prepare_write.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes in this request, so chunk/kind resolution
    // can avoid a database lookup (see resolve_fts_rows).
    node_kinds: HashMap<String, String>,
    // Filled by resolve_fts_rows / resolve_property_fts_rows.
    required_fts_rows: Vec<FtsProjectionRow>,
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages sent from [`WriterActor`] handles to the writer thread. Each
/// carries a one-shot reply channel the caller blocks on (with timeout).
enum WriteMessage {
    /// Apply a validated write; `PreparedWrite` is boxed to keep the message
    /// small on the channel.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Bump last-accessed timestamps.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Handle to the single dedicated writer thread. All mutations are serialized
/// through one bounded channel into one SQLite connection.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so Drop can drop the sender first (closing the channel and
    // ending the writer loop) before joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // Option so Drop can take() and join the handle.
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Kept alive for the writer thread's lifetime; not read directly here.
    _telemetry: Arc<TelemetryCounters>,
}
441
impl WriterActor {
    /// Spawns the writer thread and returns a handle to it.
    ///
    /// The thread opens its own connection to `path` and bootstraps the
    /// schema; those failures surface on the first submitted request, not
    /// here. Only thread-spawn failure is returned from `start`.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: senders block once 256 messages queue up,
        // giving natural backpressure on writers.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// True while the writer thread has not exited.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fails fast with `WriterRejected` if the writer thread is gone, so
    /// callers don't block on a channel nobody drains.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validates `request` on the calling thread, then sends it to the
    /// writer thread and waits (bounded by `WRITER_REPLY_TIMEOUT`) for the
    /// receipt. On timeout the write may still commit.
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validates and forwards a last-accessed touch to the writer thread,
    /// waiting for its report with the same timeout policy as `submit`.
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
525
/// Shutdown fallback when the `tracing` feature is off: with no subscriber,
/// stderr is the only place to report that the writer thread panicked.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
531
impl Drop for WriterActor {
    /// Shuts the writer down: drop the sender so the writer loop's receiver
    /// iteration ends, then join the thread. A panic on the writer thread is
    /// re-raised here — unless we are already panicking, in which case it is
    /// logged and suppressed to avoid a double-panic abort.
    fn drop(&mut self) {
        // SAFETY: the sender is dropped exactly once, here, and is never
        // touched again (self is being dropped). Dropping it closes the
        // channel so writer_loop can exit before we join below.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
561
562fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565 .map_err(|error| match error {
566 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567 "write timed out waiting for writer thread reply — the write may still commit"
568 .to_owned(),
569 ),
570 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571 })
572 .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576 request: &LastAccessTouchRequest,
577 mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579 if request.logical_ids.is_empty() {
580 return Err(EngineError::InvalidWrite(
581 "touch_last_accessed requires at least one logical_id".to_owned(),
582 ));
583 }
584 for logical_id in &request.logical_ids {
585 if logical_id.trim().is_empty() {
586 return Err(EngineError::InvalidWrite(
587 "touch_last_accessed requires non-empty logical_ids".to_owned(),
588 ));
589 }
590 }
591 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592 return Err(EngineError::InvalidWrite(
593 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594 .to_owned(),
595 ));
596 }
597 Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601 let missing: Vec<String> = request
602 .nodes
603 .iter()
604 .filter(|n| n.source_ref.is_none())
605 .map(|n| format!("node '{}'", n.logical_id))
606 .chain(
607 request
608 .node_retires
609 .iter()
610 .filter(|r| r.source_ref.is_none())
611 .map(|r| format!("node retire '{}'", r.logical_id)),
612 )
613 .chain(
614 request
615 .edges
616 .iter()
617 .filter(|e| e.source_ref.is_none())
618 .map(|e| format!("edge '{}'", e.logical_id)),
619 )
620 .chain(
621 request
622 .edge_retires
623 .iter()
624 .filter(|r| r.source_ref.is_none())
625 .map(|r| format!("edge retire '{}'", r.logical_id)),
626 )
627 .chain(
628 request
629 .runs
630 .iter()
631 .filter(|r| r.source_ref.is_none())
632 .map(|r| format!("run '{}'", r.id)),
633 )
634 .chain(
635 request
636 .steps
637 .iter()
638 .filter(|s| s.source_ref.is_none())
639 .map(|s| format!("step '{}'", s.id)),
640 )
641 .chain(
642 request
643 .actions
644 .iter()
645 .filter(|a| a.source_ref.is_none())
646 .map(|a| format!("action '{}'", a.id)),
647 )
648 .chain(
649 request
650 .operational_writes
651 .iter()
652 .filter(|write| operational_write_source_ref(write).is_none())
653 .map(|write| {
654 format!(
655 "operational {} '{}:{}'",
656 operational_write_kind(write),
657 operational_write_collection(write),
658 operational_write_record_key(write)
659 )
660 }),
661 )
662 .collect();
663
664 if missing.is_empty() {
665 Ok(())
666 } else {
667 Err(EngineError::InvalidWrite(format!(
668 "ProvenanceMode::Require: missing source_ref on: {}",
669 missing.join(", ")
670 )))
671 }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675 if request.nodes.len() > MAX_NODES {
676 return Err(EngineError::InvalidWrite(format!(
677 "too many nodes: {} exceeds limit of {MAX_NODES}",
678 request.nodes.len()
679 )));
680 }
681 if request.edges.len() > MAX_EDGES {
682 return Err(EngineError::InvalidWrite(format!(
683 "too many edges: {} exceeds limit of {MAX_EDGES}",
684 request.edges.len()
685 )));
686 }
687 if request.chunks.len() > MAX_CHUNKS {
688 return Err(EngineError::InvalidWrite(format!(
689 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690 request.chunks.len()
691 )));
692 }
693 let retires = request.node_retires.len() + request.edge_retires.len();
694 if retires > MAX_RETIRES {
695 return Err(EngineError::InvalidWrite(format!(
696 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697 )));
698 }
699 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700 if runtime_items > MAX_RUNTIME_ITEMS {
701 return Err(EngineError::InvalidWrite(format!(
702 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703 )));
704 }
705 if request.operational_writes.len() > MAX_OPERATIONAL {
706 return Err(EngineError::InvalidWrite(format!(
707 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708 request.operational_writes.len()
709 )));
710 }
711 let total = request.nodes.len()
712 + request.node_retires.len()
713 + request.edges.len()
714 + request.edge_retires.len()
715 + request.chunks.len()
716 + request.runs.len()
717 + request.steps.len()
718 + request.actions.len()
719 + request.vec_inserts.len()
720 + request.operational_writes.len();
721 if total > MAX_TOTAL_ITEMS {
722 return Err(EngineError::InvalidWrite(format!(
723 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724 )));
725 }
726 Ok(())
727}
728
729#[allow(clippy::too_many_lines)]
730fn prepare_write(
731 request: WriteRequest,
732 mode: ProvenanceMode,
733) -> Result<PreparedWrite, EngineError> {
734 validate_request_size(&request)?;
735
736 for node in &request.nodes {
738 if node.row_id.is_empty() {
739 return Err(EngineError::InvalidWrite(
740 "NodeInsert has empty row_id".to_owned(),
741 ));
742 }
743 if node.logical_id.is_empty() {
744 return Err(EngineError::InvalidWrite(
745 "NodeInsert has empty logical_id".to_owned(),
746 ));
747 }
748 }
749 for edge in &request.edges {
750 if edge.row_id.is_empty() {
751 return Err(EngineError::InvalidWrite(
752 "EdgeInsert has empty row_id".to_owned(),
753 ));
754 }
755 if edge.logical_id.is_empty() {
756 return Err(EngineError::InvalidWrite(
757 "EdgeInsert has empty logical_id".to_owned(),
758 ));
759 }
760 }
761 for chunk in &request.chunks {
762 if chunk.id.is_empty() {
763 return Err(EngineError::InvalidWrite(
764 "ChunkInsert has empty id".to_owned(),
765 ));
766 }
767 if chunk.text_content.is_empty() {
768 return Err(EngineError::InvalidWrite(format!(
769 "chunk '{}' has empty text_content; empty chunks are not allowed",
770 chunk.id
771 )));
772 }
773 }
774 for run in &request.runs {
775 if run.id.is_empty() {
776 return Err(EngineError::InvalidWrite(
777 "RunInsert has empty id".to_owned(),
778 ));
779 }
780 }
781 for step in &request.steps {
782 if step.id.is_empty() {
783 return Err(EngineError::InvalidWrite(
784 "StepInsert has empty id".to_owned(),
785 ));
786 }
787 }
788 for action in &request.actions {
789 if action.id.is_empty() {
790 return Err(EngineError::InvalidWrite(
791 "ActionInsert has empty id".to_owned(),
792 ));
793 }
794 }
795 for vi in &request.vec_inserts {
796 if vi.chunk_id.is_empty() {
797 return Err(EngineError::InvalidWrite(
798 "VecInsert has empty chunk_id".to_owned(),
799 ));
800 }
801 if vi.embedding.is_empty() {
802 return Err(EngineError::InvalidWrite(format!(
803 "VecInsert for chunk '{}' has empty embedding",
804 vi.chunk_id
805 )));
806 }
807 }
808 for operational in &request.operational_writes {
809 if operational_write_collection(operational).is_empty() {
810 return Err(EngineError::InvalidWrite(
811 "OperationalWrite has empty collection".to_owned(),
812 ));
813 }
814 if operational_write_record_key(operational).is_empty() {
815 return Err(EngineError::InvalidWrite(format!(
816 "OperationalWrite for collection '{}' has empty record_key",
817 operational_write_collection(operational)
818 )));
819 }
820 match operational {
821 OperationalWrite::Append { payload_json, .. }
822 | OperationalWrite::Put { payload_json, .. } => {
823 if payload_json.is_empty() {
824 return Err(EngineError::InvalidWrite(format!(
825 "OperationalWrite {} '{}:{}' has empty payload_json",
826 operational_write_kind(operational),
827 operational_write_collection(operational),
828 operational_write_record_key(operational)
829 )));
830 }
831 }
832 OperationalWrite::Delete { .. } => {}
833 }
834 }
835
836 {
838 let mut seen = std::collections::HashSet::new();
839 for node in &request.nodes {
840 if !seen.insert(node.row_id.as_str()) {
841 return Err(EngineError::InvalidWrite(format!(
842 "duplicate row_id '{}' within the same WriteRequest",
843 node.row_id
844 )));
845 }
846 }
847 for edge in &request.edges {
848 if !seen.insert(edge.row_id.as_str()) {
849 return Err(EngineError::InvalidWrite(format!(
850 "duplicate row_id '{}' within the same WriteRequest",
851 edge.row_id
852 )));
853 }
854 }
855 }
856
857 if mode == ProvenanceMode::Require {
859 check_require_provenance(&request)?;
860 }
861
862 for run in &request.runs {
864 if run.upsert && run.supersedes_id.is_none() {
865 return Err(EngineError::InvalidWrite(format!(
866 "run '{}': upsert=true requires supersedes_id to be set",
867 run.id
868 )));
869 }
870 }
871 for step in &request.steps {
872 if step.upsert && step.supersedes_id.is_none() {
873 return Err(EngineError::InvalidWrite(format!(
874 "step '{}': upsert=true requires supersedes_id to be set",
875 step.id
876 )));
877 }
878 }
879 for action in &request.actions {
880 if action.upsert && action.supersedes_id.is_none() {
881 return Err(EngineError::InvalidWrite(format!(
882 "action '{}': upsert=true requires supersedes_id to be set",
883 action.id
884 )));
885 }
886 }
887
888 let node_kinds = request
889 .nodes
890 .iter()
891 .map(|node| (node.logical_id.clone(), node.kind.clone()))
892 .collect::<HashMap<_, _>>();
893
894 Ok(PreparedWrite {
895 label: request.label,
896 nodes: request.nodes,
897 node_retires: request.node_retires,
898 edges: request.edges,
899 edge_retires: request.edge_retires,
900 chunks: request.chunks,
901 runs: request.runs,
902 steps: request.steps,
903 actions: request.actions,
904 vec_inserts: request.vec_inserts,
905 operational_writes: request.operational_writes,
906 operational_collection_kinds: HashMap::new(),
907 operational_collection_filter_fields: HashMap::new(),
908 operational_validation_warnings: Vec::new(),
909 node_kinds,
910 required_fts_rows: Vec::new(),
911 required_property_fts_rows: Vec::new(),
912 optional_backfills: request.optional_backfills,
913 })
914}
915
/// Body of the dedicated writer thread: opens the connection, bootstraps the
/// schema, then applies messages serially until the channel closes (all
/// `WriterActor` handles dropped).
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    // If setup fails, drain the channel rejecting everything rather than
    // letting senders block/time out against a dead loop.
    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Blocks on recv; the loop ends when every sender has been dropped.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists with tracing on; trace_info! below must
                // compile away its arguments when the feature is off.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write can't kill the thread
                // and silently strand later callers.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: if the panic left a transaction open, roll
                    // it back; errors ignored (there may be no open txn).
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Count the operation, not rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000 for message in receiver {
1001 match message {
1002 WriteMessage::Submit { reply, .. } => {
1003 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004 }
1005 WriteMessage::TouchLastAccessed { reply, .. } => {
1006 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007 }
1008 }
1009 }
1010}
1011
1012fn resolve_fts_rows(
1020 conn: &rusqlite::Connection,
1021 prepared: &mut PreparedWrite,
1022) -> Result<(), EngineError> {
1023 let retiring_ids: std::collections::HashSet<&str> = prepared
1024 .node_retires
1025 .iter()
1026 .map(|r| r.logical_id.as_str())
1027 .collect();
1028 for chunk in &prepared.chunks {
1029 if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1030 return Err(EngineError::InvalidWrite(format!(
1031 "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1032 WriteRequest; retire and chunk insertion for the same node must not be combined",
1033 chunk.id, chunk.node_logical_id
1034 )));
1035 }
1036 }
1037 for chunk in &prepared.chunks {
1038 let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1039 k.clone()
1040 } else {
1041 match conn.query_row(
1042 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1043 params![chunk.node_logical_id],
1044 |row| row.get::<_, String>(0),
1045 ) {
1046 Ok(kind) => kind,
1047 Err(rusqlite::Error::QueryReturnedNoRows) => {
1048 return Err(EngineError::InvalidWrite(format!(
1049 "chunk '{}' references node_logical_id '{}' that is not present in this \
1050 write request or the database \
1051 (v1 limitation: chunks and their nodes must be submitted together or the \
1052 node must already exist)",
1053 chunk.id, chunk.node_logical_id
1054 )));
1055 }
1056 Err(e) => return Err(EngineError::Sqlite(e)),
1057 }
1058 };
1059 prepared.required_fts_rows.push(FtsProjectionRow {
1060 chunk_id: chunk.id.clone(),
1061 node_logical_id: chunk.node_logical_id.clone(),
1062 kind,
1063 text_content: chunk.text_content.clone(),
1064 });
1065 }
1066 trace_debug!(
1067 fts_rows = prepared.required_fts_rows.len(),
1068 chunks_processed = prepared.chunks.len(),
1069 "fts row resolution completed"
1070 );
1071 Ok(())
1072}
1073
1074fn resolve_property_fts_rows(
1077 conn: &rusqlite::Connection,
1078 prepared: &mut PreparedWrite,
1079) -> Result<(), EngineError> {
1080 if prepared.nodes.is_empty() {
1081 return Ok(());
1082 }
1083
1084 let schemas: HashMap<String, PropertyFtsSchema> =
1085 load_fts_property_schemas(conn)?.into_iter().collect();
1086
1087 if schemas.is_empty() {
1088 return Ok(());
1089 }
1090
1091 let mut combined_stats = ExtractStats::default();
1092 for node in &prepared.nodes {
1093 let Some(schema) = schemas.get(&node.kind) else {
1094 continue;
1095 };
1096 let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1097 let (text_content, positions, stats) = extract_property_fts(&props, schema);
1098 combined_stats.merge(stats);
1099 if let Some(text_content) = text_content {
1100 prepared
1101 .required_property_fts_rows
1102 .push(FtsPropertyProjectionRow {
1103 node_logical_id: node.logical_id.clone(),
1104 kind: node.kind.clone(),
1105 text_content,
1106 positions,
1107 });
1108 }
1109 }
1110 if combined_stats != ExtractStats::default() {
1111 trace_debug!(
1112 depth_cap_hit = combined_stats.depth_cap_hit,
1113 byte_cap_reached = combined_stats.byte_cap_reached,
1114 excluded_subtree = combined_stats.excluded_subtree,
1115 "property fts recursive extraction guardrails engaged"
1116 );
1117 }
1118 trace_debug!(
1119 property_fts_rows = prepared.required_property_fts_rows.len(),
1120 nodes_processed = prepared.nodes.len(),
1121 "property fts row resolution completed"
1122 );
1123 Ok(())
1124}
1125
1126pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130 let Some(path) = path.strip_prefix("$.") else {
1131 return Vec::new();
1132 };
1133 let mut current = value;
1134 for segment in path.split('.') {
1135 match current.get(segment) {
1136 Some(v) => current = v,
1137 None => return Vec::new(),
1138 }
1139 }
1140 match current {
1141 serde_json::Value::String(s) => vec![s.clone()],
1142 serde_json::Value::Number(n) => vec![n.to_string()],
1143 serde_json::Value::Bool(b) => vec![b.to_string()],
1144 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145 serde_json::Value::Array(arr) => arr
1146 .iter()
1147 .filter_map(|v| match v {
1148 serde_json::Value::String(s) => Some(s.clone()),
1149 serde_json::Value::Number(n) => Some(n.to_string()),
1150 serde_json::Value::Bool(b) => Some(b.to_string()),
1151 _ => None,
1152 })
1153 .collect(),
1154 }
1155}
1156
/// Extracts FTS text from `props` per `schema`, returning the combined text
/// (None when nothing was extracted), leaf positions into that text, and the
/// guardrail stats accumulated during recursive walks.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };

    let mut scalar_parts: Vec<String> = Vec::new();

    // Scalar rules accumulate plain strings; recursive rules feed the walker,
    // which records positions as it goes.
    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                let root = resolve_path_root(props, &entry.path);
                if let Some(root) = root {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }

    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };

    // Combine scalar text and the recursive blob. When both exist, the blob
    // is appended after LEAF_SEPARATOR, so every walker position must be
    // rebased by the scalar prefix length plus the separator.
    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        (None, false) => Some(walker.blob.clone()),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };

    (combined, walker.positions, walker.stats)
}
1235
1236fn resolve_path_root<'a>(
1237 value: &'a serde_json::Value,
1238 path: &str,
1239) -> Option<&'a serde_json::Value> {
1240 let stripped = path.strip_prefix("$.")?;
1241 let mut current = value;
1242 for segment in stripped.split('.') {
1243 current = current.get(segment)?;
1244 }
1245 Some(current)
1246}
1247
/// Accumulator for recursive property extraction: builds the leaf text blob,
/// records leaf positions, and enforces depth/byte guardrails.
struct RecursiveWalker {
    /// Extracted leaves joined by `LEAF_SEPARATOR`.
    blob: String,
    /// Byte range of each leaf within `blob` plus its JSON path.
    positions: Vec<PositionEntry>,
    /// Guardrail counters for this walk.
    stats: ExtractStats,
    /// Exact paths whose subtrees are skipped entirely.
    exclude_paths: Vec<String>,
    /// Set once the byte cap is hit; all further emission stops.
    stopped: bool,
}
1258
impl RecursiveWalker {
    /// Depth-first walk of `value` at `current_path`, emitting scalar leaves.
    /// Objects iterate keys in sorted order so output is deterministic;
    /// array elements get `path[idx]` child paths. `depth` counts container
    /// nesting — descending past MAX_RECURSIVE_DEPTH drops the subtree.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        // Exclusion is by exact path match on the container/leaf itself.
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sorted keys for deterministic blob/position output.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Appends one non-empty leaf value to the blob (separator-prefixed after
    /// the first leaf) and records its byte range. If appending would push
    /// the blob past MAX_EXTRACTED_BYTES, nothing is appended and the walk is
    /// stopped for good — the leaf is dropped rather than truncated.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        if value.is_empty() {
            return;
        }
        // Separator only between leaves, never before the first one.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Positions cover the leaf text only, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1340
1341pub(crate) fn load_fts_property_schemas(
1347 conn: &rusqlite::Connection,
1348) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1349 let mut stmt =
1350 conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1351 stmt.query_map([], |row| {
1352 let kind: String = row.get(0)?;
1353 let paths_json: String = row.get(1)?;
1354 let separator: String = row.get(2)?;
1355 let schema = parse_property_schema_json(&paths_json, &separator);
1356 Ok((kind, schema))
1357 })?
1358 .collect::<Result<Vec<_>, _>>()
1359}
1360
1361pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1362 let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1363 let mut paths = Vec::new();
1364 let mut exclude_paths: Vec<String> = Vec::new();
1365
1366 let path_values: Vec<serde_json::Value> = match value {
1367 serde_json::Value::Array(arr) => arr,
1368 serde_json::Value::Object(map) => {
1369 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1370 exclude_paths = excl
1371 .iter()
1372 .filter_map(|v| v.as_str().map(str::to_owned))
1373 .collect();
1374 }
1375 match map.get("paths") {
1376 Some(serde_json::Value::Array(arr)) => arr.clone(),
1377 _ => Vec::new(),
1378 }
1379 }
1380 _ => Vec::new(),
1381 };
1382
1383 for entry in path_values {
1384 match entry {
1385 serde_json::Value::String(path) => {
1386 paths.push(PropertyPathEntry::scalar(path));
1387 }
1388 serde_json::Value::Object(map) => {
1389 let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1390 continue;
1391 };
1392 let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1393 PropertyPathMode::Scalar,
1394 |m| match m {
1395 "recursive" => PropertyPathMode::Recursive,
1396 _ => PropertyPathMode::Scalar,
1397 },
1398 );
1399 paths.push(PropertyPathEntry {
1400 path: path.to_owned(),
1401 mode,
1402 });
1403 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1404 for p in excl {
1405 if let Some(s) = p.as_str() {
1406 exclude_paths.push(s.to_owned());
1407 }
1408 }
1409 }
1410 }
1411 _ => {}
1412 }
1413 }
1414
1415 PropertyFtsSchema {
1416 paths,
1417 separator: separator.to_owned(),
1418 exclude_paths,
1419 }
1420}
1421
/// Resolves operational-collection metadata for every operational write in
/// `prepared` and validates each write against it.
///
/// For each distinct collection, its kind, filter-field declarations, and
/// validation contract are loaded once from `operational_collections`;
/// writes against unregistered or disabled collections are rejected. Each
/// write's operation must then match the collection kind (append-only logs
/// accept only `Append`; latest-state collections accept only `Put`/`Delete`),
/// and the validation contract is applied — enforce-mode violations abort
/// via `?`, while report-only warnings are discarded here (they are
/// re-collected inside the write transaction by
/// `validate_operational_writes_against_live_contracts`).
///
/// On success, the resolved kinds and filter fields are stored back onto
/// `prepared` for use by `apply_write`.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Load each collection's metadata at most once per batch.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Reject operations that do not fit the collection's kind.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Enforce-mode violations propagate as errors; report-only warnings
        // are intentionally dropped at this stage.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1489
1490fn parse_operational_filter_fields(
1491 filter_fields_json: &str,
1492) -> Result<Vec<OperationalFilterField>, EngineError> {
1493 let fields: Vec<OperationalFilterField> =
1494 serde_json::from_str(filter_fields_json).map_err(|error| {
1495 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1496 })?;
1497 let mut seen = std::collections::HashSet::new();
1498 for field in &fields {
1499 if field.name.trim().is_empty() {
1500 return Err(EngineError::InvalidWrite(
1501 "filter_fields_json field names must not be empty".to_owned(),
1502 ));
1503 }
1504 if !seen.insert(field.name.as_str()) {
1505 return Err(EngineError::InvalidWrite(format!(
1506 "filter_fields_json contains duplicate field '{}'",
1507 field.name
1508 )));
1509 }
1510 if field.modes.is_empty() {
1511 return Err(EngineError::InvalidWrite(format!(
1512 "filter_fields_json field '{}' must declare at least one mode",
1513 field.name
1514 )));
1515 }
1516 if field.modes.contains(&OperationalFilterMode::Prefix)
1517 && field.field_type != OperationalFilterFieldType::String
1518 {
1519 return Err(EngineError::InvalidWrite(format!(
1520 "filter field '{}' only supports prefix for string types",
1521 field.name
1522 )));
1523 }
1524 }
1525 Ok(fields)
1526}
1527
/// One extracted filter value destined for `operational_filter_values`.
/// Exactly one of `string_value`/`integer_value` is populated, depending
/// on the declared filter-field type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Set for string-typed fields.
    string_value: Option<String>,
    // Set for integer- and timestamp-typed fields.
    integer_value: Option<i64>,
}
1534
1535fn extract_operational_filter_values(
1536 filter_fields: &[OperationalFilterField],
1537 payload_json: &str,
1538) -> Vec<OperationalFilterValueRow> {
1539 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1540 return Vec::new();
1541 };
1542 let Some(object) = parsed.as_object() else {
1543 return Vec::new();
1544 };
1545
1546 filter_fields
1547 .iter()
1548 .filter_map(|field| {
1549 let value = object.get(&field.name)?;
1550 match field.field_type {
1551 OperationalFilterFieldType::String => {
1552 value
1553 .as_str()
1554 .map(|string_value| OperationalFilterValueRow {
1555 field_name: field.name.clone(),
1556 string_value: Some(string_value.to_owned()),
1557 integer_value: None,
1558 })
1559 }
1560 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1561 value
1562 .as_i64()
1563 .map(|integer_value| OperationalFilterValueRow {
1564 field_name: field.name.clone(),
1565 string_value: None,
1566 integer_value: Some(integer_value),
1567 })
1568 }
1569 }
1570 })
1571 .collect()
1572}
1573
/// Runs the resolution phase (chunk FTS rows, property FTS rows, and
/// operational-collection metadata) against the live database, then applies
/// the prepared write atomically via `apply_write`.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1583
/// Updates `last_accessed_at` for a batch of node logical ids inside one
/// immediate transaction.
///
/// Duplicate ids in the request are collapsed (first occurrence kept).
/// Every id must refer to an active (non-superseded) node or the whole
/// batch fails before any row is written. Each touch upserts
/// `node_access_metadata` and records a 'node_last_accessed_touched'
/// provenance event.
fn apply_touch_last_accessed(
    conn: &mut rusqlite::Connection,
    request: &LastAccessTouchRequest,
) -> Result<LastAccessTouchReport, EngineError> {
    // Deduplicate while preserving the request's order.
    let mut seen = std::collections::HashSet::new();
    let logical_ids = request
        .logical_ids
        .iter()
        .filter(|logical_id| seen.insert(logical_id.as_str()))
        .cloned()
        .collect::<Vec<_>>();
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Validate every id up front so the write is all-or-nothing.
    for logical_id in &logical_ids {
        let exists = tx
            .query_row(
                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
                params![logical_id],
                |row| row.get::<_, i64>(0),
            )
            .optional()?
            .is_some();
        if !exists {
            return Err(EngineError::InvalidWrite(format!(
                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
            )));
        }
    }

    // Scope the cached statements so they are dropped before commit.
    {
        let mut upsert_metadata = tx.prepare_cached(
            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
             VALUES (?1, ?2, ?2) \
             ON CONFLICT(logical_id) DO UPDATE SET \
                 last_accessed_at = excluded.last_accessed_at, \
                 updated_at = excluded.updated_at",
        )?;
        let mut insert_provenance = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
        )?;
        for logical_id in &logical_ids {
            upsert_metadata.execute(params![logical_id, request.touched_at])?;
            insert_provenance.execute(params![
                new_id(),
                logical_id,
                request.source_ref.as_deref(),
                format!("{{\"touched_at\":{}}}", request.touched_at),
            ])?;
        }
    }

    tx.commit()?;
    Ok(LastAccessTouchReport {
        touched_logical_ids: logical_ids.len(),
        touched_at: request.touched_at,
    })
}
1642
1643fn ensure_operational_collections_writable(
1644 tx: &rusqlite::Transaction<'_>,
1645 prepared: &PreparedWrite,
1646) -> Result<(), EngineError> {
1647 for collection in prepared.operational_collection_kinds.keys() {
1648 let disabled_at: Option<Option<i64>> = tx
1649 .query_row(
1650 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1651 params![collection],
1652 |row| row.get::<_, Option<i64>>(0),
1653 )
1654 .optional()?;
1655 match disabled_at {
1656 Some(Some(_)) => {
1657 return Err(EngineError::InvalidWrite(format!(
1658 "operational collection '{collection}' is disabled"
1659 )));
1660 }
1661 Some(None) => {}
1662 None => {
1663 return Err(EngineError::InvalidWrite(format!(
1664 "operational collection '{collection}' is not registered"
1665 )));
1666 }
1667 }
1668 }
1669 Ok(())
1670}
1671
1672fn validate_operational_writes_against_live_contracts(
1673 tx: &rusqlite::Transaction<'_>,
1674 prepared: &PreparedWrite,
1675) -> Result<Vec<String>, EngineError> {
1676 let mut collection_validation_contracts =
1677 HashMap::<String, Option<OperationalValidationContract>>::new();
1678 for collection in prepared.operational_collection_kinds.keys() {
1679 let validation_json: String = tx
1680 .query_row(
1681 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1682 params![collection],
1683 |row| row.get(0),
1684 )
1685 .map_err(EngineError::Sqlite)?;
1686 let validation_contract = parse_operational_validation_contract(&validation_json)
1687 .map_err(EngineError::InvalidWrite)?;
1688 collection_validation_contracts.insert(collection.clone(), validation_contract);
1689 }
1690
1691 let mut warnings = Vec::new();
1692 for write in &prepared.operational_writes {
1693 if let Some(Some(contract)) =
1694 collection_validation_contracts.get(operational_write_collection(write))
1695 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1696 {
1697 warnings.push(warning);
1698 }
1699 }
1700
1701 Ok(warnings)
1702}
1703
1704fn load_live_operational_secondary_indexes(
1705 tx: &rusqlite::Transaction<'_>,
1706 prepared: &PreparedWrite,
1707) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1708 let mut collection_indexes = HashMap::new();
1709 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1710 let secondary_indexes_json: String = tx
1711 .query_row(
1712 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1713 params![collection],
1714 |row| row.get(0),
1715 )
1716 .map_err(EngineError::Sqlite)?;
1717 let indexes =
1718 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1719 .map_err(EngineError::InvalidWrite)?;
1720 collection_indexes.insert(collection.clone(), indexes);
1721 }
1722 Ok(collection_indexes)
1723}
1724
1725fn check_operational_write_against_contract(
1726 write: &OperationalWrite,
1727 contract: &OperationalValidationContract,
1728) -> Result<Option<String>, EngineError> {
1729 if contract.mode == OperationalValidationMode::Disabled {
1730 return Ok(None);
1731 }
1732
1733 let (payload_json, collection, record_key) = match write {
1734 OperationalWrite::Append {
1735 collection,
1736 record_key,
1737 payload_json,
1738 ..
1739 }
1740 | OperationalWrite::Put {
1741 collection,
1742 record_key,
1743 payload_json,
1744 ..
1745 } => (
1746 payload_json.as_str(),
1747 collection.as_str(),
1748 record_key.as_str(),
1749 ),
1750 OperationalWrite::Delete { .. } => return Ok(None),
1751 };
1752
1753 match validate_operational_payload_against_contract(contract, payload_json) {
1754 Ok(()) => Ok(None),
1755 Err(message) => match contract.mode {
1756 OperationalValidationMode::Disabled => Ok(None),
1757 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1758 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1759 kind = operational_write_kind(write)
1760 ))),
1761 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1762 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1763 kind = operational_write_kind(write)
1764 ))),
1765 },
1766 }
1767}
1768
/// Applies a fully-resolved `PreparedWrite` atomically inside a single
/// immediate transaction: retires, node/edge/chunk/runtime-table inserts,
/// operational-collection mutations, FTS maintenance, and (feature-gated)
/// vector-index rows. Returns a receipt carrying provenance and
/// operational-validation warnings.
///
/// Statement order inside the transaction is load-bearing: derived rows
/// (FTS, positions, chunks, vector entries) are deleted before their owning
/// rows are superseded, and `operational_current` is read back only after
/// it has been upserted.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: drop all FTS/position/staging rows for the logical id,
    // supersede the active node row, and record a provenance event.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_staging = tx.prepare_cached(
            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
        )?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            del_staging.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: supersede the active edge row and record provenance.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts: on upsert, clear property-FTS rows; additionally under
    // ChunkPolicy::Replace, clear chunk-derived FTS/vector/chunk rows. The
    // prior active row (if any) is superseded before the new version is
    // inserted.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        // Vector rows must be deleted before their chunks; tolerate the vec
        // table being absent (feature present but table not created).
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
             (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts: on upsert, supersede the prior active edge first.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts (new chunks only; replacement deletes happened above).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts: on upsert, supersede the explicitly referenced prior row.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts: same supersede-then-insert pattern as runs.
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts: same supersede-then-insert pattern as runs/steps.
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational writes: re-check collection state and contracts against
    // the live transaction (they may have changed since resolution), append
    // every write to the mutation log with its filter values and
    // secondary-index entries, and maintain `operational_current` for
    // latest_state collections.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Mutation order continues from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
                 payload_json = excluded.payload_json, \
                 updated_at = excluded.updated_at, \
                 last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            // Every write — including deletes — lands in the mutation log.
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Per-mutation secondary-index entries.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Declared filter values extracted from the payload.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // latest_state collections also maintain the current-row table
            // and its 'current' secondary-index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Read back the row just upserted so index
                            // entries use the stored updated_at/mutation id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // Chunk-level FTS rows resolved before the transaction began.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property FTS rows: replace the node's leaf-position rows, then insert
    // the concatenated blob and its per-leaf positions.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    // Offsets are usize; clamp to i64::MAX rather than fail.
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // While a property-FTS rebuild for a kind is in flight, mirror the new
    // rows into the staging table so the rebuild can catch up.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_staging = tx.prepare_cached(
            "INSERT INTO fts_property_rebuild_staging \
             (kind, node_logical_id, text_content) \
             VALUES (?1, ?2, ?3) \
             ON CONFLICT(kind, node_logical_id) DO UPDATE \
             SET text_content = excluded.text_content",
        )?;
        let mut check_rebuild = tx.prepare_cached(
            "SELECT 1 FROM fts_property_rebuild_state \
             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
        )?;
        for row in &prepared.required_property_fts_rows {
            let in_rebuild: bool = check_rebuild
                .query_row(params![row.kind], |_| Ok(true))
                .optional()?
                .unwrap_or(false);
            if in_rebuild {
                ins_staging.execute(params![row.kind, row.node_logical_id, row.text_content])?;
            }
        }
    }

    // Vector-index rows (feature-gated); silently skip when the vec table
    // does not exist, mirroring the delete path above.
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    // Embeddings are stored as little-endian f32 bytes.
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Warn about every entity written without a source_ref; warnings are
    // collected after commit since they never abort the write.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // The combined warning list also carries report-only validation notes.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2313
2314fn operational_write_collection(write: &OperationalWrite) -> &str {
2315 match write {
2316 OperationalWrite::Append { collection, .. }
2317 | OperationalWrite::Put { collection, .. }
2318 | OperationalWrite::Delete { collection, .. } => collection,
2319 }
2320}
2321
2322fn operational_write_record_key(write: &OperationalWrite) -> &str {
2323 match write {
2324 OperationalWrite::Append { record_key, .. }
2325 | OperationalWrite::Put { record_key, .. }
2326 | OperationalWrite::Delete { record_key, .. } => record_key,
2327 }
2328}
2329
2330fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2331 match write {
2332 OperationalWrite::Append { .. } => "append",
2333 OperationalWrite::Put { .. } => "put",
2334 OperationalWrite::Delete { .. } => "delete",
2335 }
2336}
2337
2338fn operational_write_payload(write: &OperationalWrite) -> &str {
2339 match write {
2340 OperationalWrite::Append { payload_json, .. }
2341 | OperationalWrite::Put { payload_json, .. } => payload_json,
2342 OperationalWrite::Delete { .. } => "null",
2343 }
2344}
2345
2346fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2347 match write {
2348 OperationalWrite::Append { source_ref, .. }
2349 | OperationalWrite::Put { source_ref, .. }
2350 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2351 }
2352}
2353
2354#[cfg(test)]
2355#[allow(clippy::expect_used)]
2356mod tests {
2357 use std::sync::Arc;
2358
2359 use fathomdb_schema::SchemaManager;
2360 use tempfile::NamedTempFile;
2361
2362 use super::{apply_write, prepare_write, resolve_operational_writes};
2363 use crate::{
2364 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2365 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2366 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2367 projection::ProjectionTarget,
2368 };
2369
2370 #[test]
2371 fn writer_executes_runtime_table_rows() {
2372 let db = NamedTempFile::new().expect("temporary db");
2373 let writer = WriterActor::start(
2374 db.path(),
2375 Arc::new(SchemaManager::new()),
2376 ProvenanceMode::Warn,
2377 Arc::new(TelemetryCounters::default()),
2378 )
2379 .expect("writer");
2380
2381 let receipt = writer
2382 .submit(WriteRequest {
2383 label: "runtime".to_owned(),
2384 nodes: vec![],
2385 node_retires: vec![],
2386 edges: vec![],
2387 edge_retires: vec![],
2388 chunks: vec![],
2389 runs: vec![RunInsert {
2390 id: "run-1".to_owned(),
2391 kind: "session".to_owned(),
2392 status: "completed".to_owned(),
2393 properties: "{}".to_owned(),
2394 source_ref: Some("src-1".to_owned()),
2395 upsert: false,
2396 supersedes_id: None,
2397 }],
2398 steps: vec![StepInsert {
2399 id: "step-1".to_owned(),
2400 run_id: "run-1".to_owned(),
2401 kind: "llm".to_owned(),
2402 status: "completed".to_owned(),
2403 properties: "{}".to_owned(),
2404 source_ref: Some("src-1".to_owned()),
2405 upsert: false,
2406 supersedes_id: None,
2407 }],
2408 actions: vec![ActionInsert {
2409 id: "action-1".to_owned(),
2410 step_id: "step-1".to_owned(),
2411 kind: "emit".to_owned(),
2412 status: "completed".to_owned(),
2413 properties: "{}".to_owned(),
2414 source_ref: Some("src-1".to_owned()),
2415 upsert: false,
2416 supersedes_id: None,
2417 }],
2418 optional_backfills: vec![],
2419 vec_inserts: vec![],
2420 operational_writes: vec![],
2421 })
2422 .expect("write receipt");
2423
2424 assert_eq!(receipt.label, "runtime");
2425 }
2426
    /// A `Put` against a `latest_state` collection, batched with a node
    /// insert, must land all three effects: the node row, a row in
    /// `operational_mutations`, and the record in `operational_current`.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection registry; operational writes against unknown
        // collections are rejected (see the missing-collection test).
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Reopen the database and verify all three effects persisted.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2506
    /// With the collection's validation contract in `disabled` mode, a
    /// payload that violates the declared schema is still accepted and
    /// stored verbatim in `operational_current`.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract declares a required `status` enum field but mode=disabled,
        // so the contract must be ignored entirely.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // `bogus` violates additional_properties=false and omits the
                // required `status` field — accepted anyway under `disabled`.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2560
    /// In `report_only` validation mode an invalid payload is persisted, but
    /// the receipt must carry exactly one validation warning (and no
    /// provenance warnings, since a source_ref was supplied).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" is outside the declared enum ["ok","failed"].
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the invalid payload was stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2625
2626 #[test]
2627 fn writer_rejects_operational_write_for_missing_collection() {
2628 let db = NamedTempFile::new().expect("temporary db");
2629 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2630 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2631 drop(conn);
2632 let writer = WriterActor::start(
2633 db.path(),
2634 Arc::new(SchemaManager::new()),
2635 ProvenanceMode::Warn,
2636 Arc::new(TelemetryCounters::default()),
2637 )
2638 .expect("writer");
2639
2640 let result = writer.submit(WriteRequest {
2641 label: "missing-operational-collection".to_owned(),
2642 nodes: vec![],
2643 node_retires: vec![],
2644 edges: vec![],
2645 edge_retires: vec![],
2646 chunks: vec![],
2647 runs: vec![],
2648 steps: vec![],
2649 actions: vec![],
2650 optional_backfills: vec![],
2651 vec_inserts: vec![],
2652 operational_writes: vec![OperationalWrite::Put {
2653 collection: "connector_health".to_owned(),
2654 record_key: "gmail".to_owned(),
2655 payload_json: r#"{"status":"ok"}"#.to_owned(),
2656 source_ref: Some("src-1".to_owned()),
2657 }],
2658 });
2659
2660 assert!(
2661 matches!(result, Err(EngineError::InvalidWrite(_))),
2662 "missing operational collection must return InvalidWrite"
2663 );
2664 }
2665
    /// An `Append` against an `append_only_log` collection records history in
    /// `operational_mutations` but must NOT materialize a row in
    /// `operational_current` (that table is reserved for latest_state).
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History row exists with the right kind and payload...
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // ...but no current-state row was created.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2729
    /// In `enforce` validation mode an invalid append must be rejected as
    /// `InvalidWrite`, and the rejection must leave zero rows behind in both
    /// `operational_mutations` and `operational_filter_values`.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Collection declares both a filter field and an enforce-mode
        // validation contract on `status`.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" is outside the declared enum ["ok","failed"].
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // The rejected write left no partial state behind.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2794
    /// A `Delete` following a `Put` removes the `operational_current` row but
    /// preserves the full mutation history (put then delete) in order.
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish a current row for connector_health/gmail.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same record.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History retains both mutations in submission order.
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current-state row is gone.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2883
    /// Deletes carry no payload, so an enforce-mode validation contract must
    /// not block them: put a valid record, then delete it successfully.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Enforce-mode contract on `status` — the strictest configuration.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Valid put passes the contract and creates the current row.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The delete must succeed even though it cannot satisfy the
        // required `status` field (it has no payload to validate).
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2957
    /// Secondary index maintenance on latest_state collections: a put creates
    /// one 'current' index entry per declared index (field + composite = 2),
    /// and a delete removes them all.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Two indexes: a single-field index on `status` and a composite
        // index on (tenant, category).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload carries values for all three indexed fields.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per declared index after the put.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Delete clears every 'current' index entry for the record.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3046
    /// Multiple operational writes to the same record inside one batch must
    /// receive monotonically increasing `mutation_order` values, and the
    /// last write in the batch wins in `operational_current`.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Single batch: put -> delete -> put, all against the same key.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History preserves both the kinds and the 1-based ordering.
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // Last-write-wins: the final put's payload is current.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3135
    /// Race-condition guard: `resolve_operational_writes` may succeed while
    /// the collection is enabled, but if the collection is disabled before
    /// `apply_write` runs, the transaction must re-check and reject with
    /// `InvalidWrite` — leaving no rows behind.
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Drive the pipeline stages directly (instead of via WriterActor) so
        // the disable can be injected between preflight and apply.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Simulate a concurrent disable landing after the preflight check.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The aborted transaction left no partial state.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3199
    /// Atomicity across the whole batch: when an enforce-mode validation
    /// failure rejects the operational put, the node insert submitted in the
    /// same request must also be rolled back.
    #[test]
    fn writer_enforce_validation_rejects_invalid_put_atomically() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-put".to_owned(),
                // A valid node insert rides along with the invalid put; it
                // must NOT survive the rollback.
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid put must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Nothing from the batch persisted: no node, no history, no current.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 0);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3279
3280 #[test]
3281 fn writer_rejects_append_against_latest_state_collection() {
3282 let db = NamedTempFile::new().expect("temporary db");
3283 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3284 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3285 conn.execute(
3286 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3287 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3288 [],
3289 )
3290 .expect("seed collection");
3291 drop(conn);
3292 let writer = WriterActor::start(
3293 db.path(),
3294 Arc::new(SchemaManager::new()),
3295 ProvenanceMode::Warn,
3296 Arc::new(TelemetryCounters::default()),
3297 )
3298 .expect("writer");
3299
3300 let result = writer.submit(WriteRequest {
3301 label: "bad-append".to_owned(),
3302 nodes: vec![],
3303 node_retires: vec![],
3304 edges: vec![],
3305 edge_retires: vec![],
3306 chunks: vec![],
3307 runs: vec![],
3308 steps: vec![],
3309 actions: vec![],
3310 optional_backfills: vec![],
3311 vec_inserts: vec![],
3312 operational_writes: vec![OperationalWrite::Append {
3313 collection: "connector_health".to_owned(),
3314 record_key: "gmail".to_owned(),
3315 payload_json: r#"{"status":"ok"}"#.to_owned(),
3316 source_ref: Some("src-1".to_owned()),
3317 }],
3318 });
3319
3320 assert!(
3321 matches!(result, Err(EngineError::InvalidWrite(_))),
3322 "latest_state collection must reject Append"
3323 );
3324 }
3325
3326 #[test]
3327 fn writer_upsert_supersedes_prior_active_node() {
3328 let db = NamedTempFile::new().expect("temporary db");
3329 let writer = WriterActor::start(
3330 db.path(),
3331 Arc::new(SchemaManager::new()),
3332 ProvenanceMode::Warn,
3333 Arc::new(TelemetryCounters::default()),
3334 )
3335 .expect("writer");
3336
3337 writer
3338 .submit(WriteRequest {
3339 label: "v1".to_owned(),
3340 nodes: vec![NodeInsert {
3341 row_id: "row-1".to_owned(),
3342 logical_id: "lg-1".to_owned(),
3343 kind: "Meeting".to_owned(),
3344 properties: r#"{"version":1}"#.to_owned(),
3345 source_ref: Some("src-1".to_owned()),
3346 upsert: false,
3347 chunk_policy: ChunkPolicy::Preserve,
3348 content_ref: None,
3349 }],
3350 node_retires: vec![],
3351 edges: vec![],
3352 edge_retires: vec![],
3353 chunks: vec![],
3354 runs: vec![],
3355 steps: vec![],
3356 actions: vec![],
3357 optional_backfills: vec![],
3358 vec_inserts: vec![],
3359 operational_writes: vec![],
3360 })
3361 .expect("v1 write");
3362
3363 writer
3364 .submit(WriteRequest {
3365 label: "v2".to_owned(),
3366 nodes: vec![NodeInsert {
3367 row_id: "row-2".to_owned(),
3368 logical_id: "lg-1".to_owned(),
3369 kind: "Meeting".to_owned(),
3370 properties: r#"{"version":2}"#.to_owned(),
3371 source_ref: Some("src-2".to_owned()),
3372 upsert: true,
3373 chunk_policy: ChunkPolicy::Preserve,
3374 content_ref: None,
3375 }],
3376 node_retires: vec![],
3377 edges: vec![],
3378 edge_retires: vec![],
3379 chunks: vec![],
3380 runs: vec![],
3381 steps: vec![],
3382 actions: vec![],
3383 optional_backfills: vec![],
3384 vec_inserts: vec![],
3385 operational_writes: vec![],
3386 })
3387 .expect("v2 upsert write");
3388
3389 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3390 let (active_row_id, props): (String, String) = conn
3391 .query_row(
3392 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3393 [],
3394 |row| Ok((row.get(0)?, row.get(1)?)),
3395 )
3396 .expect("active row");
3397 assert_eq!(active_row_id, "row-2");
3398 assert!(props.contains("\"version\":2"));
3399
3400 let superseded: i64 = conn
3401 .query_row(
3402 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3403 [],
3404 |row| row.get(0),
3405 )
3406 .expect("superseded count");
3407 assert_eq!(superseded, 1);
3408 }
3409
3410 #[test]
3411 fn writer_inserts_edge_between_two_nodes() {
3412 let db = NamedTempFile::new().expect("temporary db");
3413 let writer = WriterActor::start(
3414 db.path(),
3415 Arc::new(SchemaManager::new()),
3416 ProvenanceMode::Warn,
3417 Arc::new(TelemetryCounters::default()),
3418 )
3419 .expect("writer");
3420
3421 writer
3422 .submit(WriteRequest {
3423 label: "nodes-and-edge".to_owned(),
3424 nodes: vec![
3425 NodeInsert {
3426 row_id: "row-meeting".to_owned(),
3427 logical_id: "meeting-1".to_owned(),
3428 kind: "Meeting".to_owned(),
3429 properties: "{}".to_owned(),
3430 source_ref: Some("src-1".to_owned()),
3431 upsert: false,
3432 chunk_policy: ChunkPolicy::Preserve,
3433 content_ref: None,
3434 },
3435 NodeInsert {
3436 row_id: "row-task".to_owned(),
3437 logical_id: "task-1".to_owned(),
3438 kind: "Task".to_owned(),
3439 properties: "{}".to_owned(),
3440 source_ref: Some("src-1".to_owned()),
3441 upsert: false,
3442 chunk_policy: ChunkPolicy::Preserve,
3443 content_ref: None,
3444 },
3445 ],
3446 node_retires: vec![],
3447 edges: vec![EdgeInsert {
3448 row_id: "edge-1".to_owned(),
3449 logical_id: "edge-lg-1".to_owned(),
3450 source_logical_id: "meeting-1".to_owned(),
3451 target_logical_id: "task-1".to_owned(),
3452 kind: "HAS_TASK".to_owned(),
3453 properties: "{}".to_owned(),
3454 source_ref: Some("src-1".to_owned()),
3455 upsert: false,
3456 }],
3457 edge_retires: vec![],
3458 chunks: vec![],
3459 runs: vec![],
3460 steps: vec![],
3461 actions: vec![],
3462 optional_backfills: vec![],
3463 vec_inserts: vec![],
3464 operational_writes: vec![],
3465 })
3466 .expect("write receipt");
3467
3468 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3469 let (src, tgt, kind): (String, String, String) = conn
3470 .query_row(
3471 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3472 [],
3473 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3474 )
3475 .expect("edge row");
3476 assert_eq!(src, "meeting-1");
3477 assert_eq!(tgt, "task-1");
3478 assert_eq!(kind, "HAS_TASK");
3479 }
3480
3481 #[test]
3482 #[allow(clippy::too_many_lines)]
3483 fn writer_upsert_supersedes_prior_active_edge() {
3484 let db = NamedTempFile::new().expect("temporary db");
3485 let writer = WriterActor::start(
3486 db.path(),
3487 Arc::new(SchemaManager::new()),
3488 ProvenanceMode::Warn,
3489 Arc::new(TelemetryCounters::default()),
3490 )
3491 .expect("writer");
3492
3493 writer
3495 .submit(WriteRequest {
3496 label: "nodes".to_owned(),
3497 nodes: vec![
3498 NodeInsert {
3499 row_id: "row-a".to_owned(),
3500 logical_id: "node-a".to_owned(),
3501 kind: "Meeting".to_owned(),
3502 properties: "{}".to_owned(),
3503 source_ref: Some("src-1".to_owned()),
3504 upsert: false,
3505 chunk_policy: ChunkPolicy::Preserve,
3506 content_ref: None,
3507 },
3508 NodeInsert {
3509 row_id: "row-b".to_owned(),
3510 logical_id: "node-b".to_owned(),
3511 kind: "Task".to_owned(),
3512 properties: "{}".to_owned(),
3513 source_ref: Some("src-1".to_owned()),
3514 upsert: false,
3515 chunk_policy: ChunkPolicy::Preserve,
3516 content_ref: None,
3517 },
3518 ],
3519 node_retires: vec![],
3520 edges: vec![],
3521 edge_retires: vec![],
3522 chunks: vec![],
3523 runs: vec![],
3524 steps: vec![],
3525 actions: vec![],
3526 optional_backfills: vec![],
3527 vec_inserts: vec![],
3528 operational_writes: vec![],
3529 })
3530 .expect("nodes write");
3531
3532 writer
3534 .submit(WriteRequest {
3535 label: "edge-v1".to_owned(),
3536 nodes: vec![],
3537 node_retires: vec![],
3538 edges: vec![EdgeInsert {
3539 row_id: "edge-row-1".to_owned(),
3540 logical_id: "edge-lg-1".to_owned(),
3541 source_logical_id: "node-a".to_owned(),
3542 target_logical_id: "node-b".to_owned(),
3543 kind: "HAS_TASK".to_owned(),
3544 properties: r#"{"weight":1}"#.to_owned(),
3545 source_ref: Some("src-1".to_owned()),
3546 upsert: false,
3547 }],
3548 edge_retires: vec![],
3549 chunks: vec![],
3550 runs: vec![],
3551 steps: vec![],
3552 actions: vec![],
3553 optional_backfills: vec![],
3554 vec_inserts: vec![],
3555 operational_writes: vec![],
3556 })
3557 .expect("edge v1 write");
3558
3559 writer
3561 .submit(WriteRequest {
3562 label: "edge-v2".to_owned(),
3563 nodes: vec![],
3564 node_retires: vec![],
3565 edges: vec![EdgeInsert {
3566 row_id: "edge-row-2".to_owned(),
3567 logical_id: "edge-lg-1".to_owned(),
3568 source_logical_id: "node-a".to_owned(),
3569 target_logical_id: "node-b".to_owned(),
3570 kind: "HAS_TASK".to_owned(),
3571 properties: r#"{"weight":2}"#.to_owned(),
3572 source_ref: Some("src-2".to_owned()),
3573 upsert: true,
3574 }],
3575 edge_retires: vec![],
3576 chunks: vec![],
3577 runs: vec![],
3578 steps: vec![],
3579 actions: vec![],
3580 optional_backfills: vec![],
3581 vec_inserts: vec![],
3582 operational_writes: vec![],
3583 })
3584 .expect("edge v2 upsert");
3585
3586 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3587 let (active_row_id, props): (String, String) = conn
3588 .query_row(
3589 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3590 [],
3591 |row| Ok((row.get(0)?, row.get(1)?)),
3592 )
3593 .expect("active edge");
3594 assert_eq!(active_row_id, "edge-row-2");
3595 assert!(props.contains("\"weight\":2"));
3596
3597 let superseded: i64 = conn
3598 .query_row(
3599 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3600 [],
3601 |row| row.get(0),
3602 )
3603 .expect("superseded count");
3604 assert_eq!(superseded, 1);
3605 }
3606
3607 #[test]
3608 fn writer_fts_rows_are_written_to_database() {
3609 let db = NamedTempFile::new().expect("temporary db");
3610 let writer = WriterActor::start(
3611 db.path(),
3612 Arc::new(SchemaManager::new()),
3613 ProvenanceMode::Warn,
3614 Arc::new(TelemetryCounters::default()),
3615 )
3616 .expect("writer");
3617
3618 writer
3619 .submit(WriteRequest {
3620 label: "seed".to_owned(),
3621 nodes: vec![NodeInsert {
3622 row_id: "row-1".to_owned(),
3623 logical_id: "logical-1".to_owned(),
3624 kind: "Meeting".to_owned(),
3625 properties: "{}".to_owned(),
3626 source_ref: Some("src-1".to_owned()),
3627 upsert: false,
3628 chunk_policy: ChunkPolicy::Preserve,
3629 content_ref: None,
3630 }],
3631 node_retires: vec![],
3632 edges: vec![],
3633 edge_retires: vec![],
3634 chunks: vec![ChunkInsert {
3635 id: "chunk-1".to_owned(),
3636 node_logical_id: "logical-1".to_owned(),
3637 text_content: "budget discussion".to_owned(),
3638 byte_start: None,
3639 byte_end: None,
3640 content_hash: None,
3641 }],
3642 runs: vec![],
3643 steps: vec![],
3644 actions: vec![],
3645 optional_backfills: vec![],
3646 vec_inserts: vec![],
3647 operational_writes: vec![],
3648 })
3649 .expect("write receipt");
3650
3651 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3652 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3653 conn.query_row(
3654 "SELECT chunk_id, node_logical_id, kind, text_content \
3655 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3656 [],
3657 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3658 )
3659 .expect("fts row");
3660 assert_eq!(chunk_id, "chunk-1");
3661 assert_eq!(node_logical_id, "logical-1");
3662 assert_eq!(kind, "Meeting");
3663 assert_eq!(text_content, "budget discussion");
3664 }
3665
3666 #[test]
3667 fn writer_receipt_warns_on_nodes_without_source_ref() {
3668 let db = NamedTempFile::new().expect("temporary db");
3669 let writer = WriterActor::start(
3670 db.path(),
3671 Arc::new(SchemaManager::new()),
3672 ProvenanceMode::Warn,
3673 Arc::new(TelemetryCounters::default()),
3674 )
3675 .expect("writer");
3676
3677 let receipt = writer
3678 .submit(WriteRequest {
3679 label: "no-source".to_owned(),
3680 nodes: vec![NodeInsert {
3681 row_id: "row-1".to_owned(),
3682 logical_id: "logical-1".to_owned(),
3683 kind: "Meeting".to_owned(),
3684 properties: "{}".to_owned(),
3685 source_ref: None,
3686 upsert: false,
3687 chunk_policy: ChunkPolicy::Preserve,
3688 content_ref: None,
3689 }],
3690 node_retires: vec![],
3691 edges: vec![],
3692 edge_retires: vec![],
3693 chunks: vec![],
3694 runs: vec![],
3695 steps: vec![],
3696 actions: vec![],
3697 optional_backfills: vec![],
3698 vec_inserts: vec![],
3699 operational_writes: vec![],
3700 })
3701 .expect("write receipt");
3702
3703 assert_eq!(receipt.provenance_warnings.len(), 1);
3704 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3705 }
3706
3707 #[test]
3708 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3709 let db = NamedTempFile::new().expect("temporary db");
3710 let writer = WriterActor::start(
3711 db.path(),
3712 Arc::new(SchemaManager::new()),
3713 ProvenanceMode::Warn,
3714 Arc::new(TelemetryCounters::default()),
3715 )
3716 .expect("writer");
3717
3718 let receipt = writer
3719 .submit(WriteRequest {
3720 label: "with-source".to_owned(),
3721 nodes: vec![NodeInsert {
3722 row_id: "row-1".to_owned(),
3723 logical_id: "logical-1".to_owned(),
3724 kind: "Meeting".to_owned(),
3725 properties: "{}".to_owned(),
3726 source_ref: Some("src-1".to_owned()),
3727 upsert: false,
3728 chunk_policy: ChunkPolicy::Preserve,
3729 content_ref: None,
3730 }],
3731 node_retires: vec![],
3732 edges: vec![],
3733 edge_retires: vec![],
3734 chunks: vec![],
3735 runs: vec![],
3736 steps: vec![],
3737 actions: vec![],
3738 optional_backfills: vec![],
3739 vec_inserts: vec![],
3740 operational_writes: vec![],
3741 })
3742 .expect("write receipt");
3743
3744 assert!(receipt.provenance_warnings.is_empty());
3745 }
3746
3747 #[test]
3748 fn writer_accepts_chunk_for_pre_existing_node() {
3749 let db = NamedTempFile::new().expect("temporary db");
3750 let writer = WriterActor::start(
3751 db.path(),
3752 Arc::new(SchemaManager::new()),
3753 ProvenanceMode::Warn,
3754 Arc::new(TelemetryCounters::default()),
3755 )
3756 .expect("writer");
3757
3758 writer
3760 .submit(WriteRequest {
3761 label: "r1".to_owned(),
3762 nodes: vec![NodeInsert {
3763 row_id: "row-1".to_owned(),
3764 logical_id: "logical-1".to_owned(),
3765 kind: "Meeting".to_owned(),
3766 properties: "{}".to_owned(),
3767 source_ref: Some("src-1".to_owned()),
3768 upsert: false,
3769 chunk_policy: ChunkPolicy::Preserve,
3770 content_ref: None,
3771 }],
3772 node_retires: vec![],
3773 edges: vec![],
3774 edge_retires: vec![],
3775 chunks: vec![],
3776 runs: vec![],
3777 steps: vec![],
3778 actions: vec![],
3779 optional_backfills: vec![],
3780 vec_inserts: vec![],
3781 operational_writes: vec![],
3782 })
3783 .expect("r1 write");
3784
3785 writer
3787 .submit(WriteRequest {
3788 label: "r2".to_owned(),
3789 nodes: vec![],
3790 node_retires: vec![],
3791 edges: vec![],
3792 edge_retires: vec![],
3793 chunks: vec![ChunkInsert {
3794 id: "chunk-1".to_owned(),
3795 node_logical_id: "logical-1".to_owned(),
3796 text_content: "budget discussion".to_owned(),
3797 byte_start: None,
3798 byte_end: None,
3799 content_hash: None,
3800 }],
3801 runs: vec![],
3802 steps: vec![],
3803 actions: vec![],
3804 optional_backfills: vec![],
3805 vec_inserts: vec![],
3806 operational_writes: vec![],
3807 })
3808 .expect("r2 write — chunk for pre-existing node");
3809
3810 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3811 let count: i64 = conn
3812 .query_row(
3813 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3814 [],
3815 |row| row.get(0),
3816 )
3817 .expect("fts count");
3818 assert_eq!(
3819 count, 1,
3820 "FTS row must exist for chunk attached to pre-existing node"
3821 );
3822 }
3823
3824 #[test]
3825 fn writer_rejects_chunk_for_completely_unknown_node() {
3826 let db = NamedTempFile::new().expect("temporary db");
3827 let writer = WriterActor::start(
3828 db.path(),
3829 Arc::new(SchemaManager::new()),
3830 ProvenanceMode::Warn,
3831 Arc::new(TelemetryCounters::default()),
3832 )
3833 .expect("writer");
3834
3835 let result = writer.submit(WriteRequest {
3836 label: "bad".to_owned(),
3837 nodes: vec![],
3838 node_retires: vec![],
3839 edges: vec![],
3840 edge_retires: vec![],
3841 chunks: vec![ChunkInsert {
3842 id: "chunk-1".to_owned(),
3843 node_logical_id: "nonexistent".to_owned(),
3844 text_content: "some text".to_owned(),
3845 byte_start: None,
3846 byte_end: None,
3847 content_hash: None,
3848 }],
3849 runs: vec![],
3850 steps: vec![],
3851 actions: vec![],
3852 optional_backfills: vec![],
3853 vec_inserts: vec![],
3854 operational_writes: vec![],
3855 });
3856
3857 assert!(
3858 matches!(result, Err(EngineError::InvalidWrite(_))),
3859 "completely unknown node must return InvalidWrite"
3860 );
3861 }
3862
3863 #[test]
3864 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3865 let db = NamedTempFile::new().expect("temporary db");
3866 let writer = WriterActor::start(
3867 db.path(),
3868 Arc::new(SchemaManager::new()),
3869 ProvenanceMode::Warn,
3870 Arc::new(TelemetryCounters::default()),
3871 )
3872 .expect("writer");
3873
3874 let receipt = writer
3875 .submit(WriteRequest {
3876 label: "seed".to_owned(),
3877 nodes: vec![NodeInsert {
3878 row_id: "row-1".to_owned(),
3879 logical_id: "logical-1".to_owned(),
3880 kind: "Meeting".to_owned(),
3881 properties: "{}".to_owned(),
3882 source_ref: None,
3883 upsert: false,
3884 chunk_policy: ChunkPolicy::Preserve,
3885 content_ref: None,
3886 }],
3887 node_retires: vec![],
3888 edges: vec![],
3889 edge_retires: vec![],
3890 chunks: vec![ChunkInsert {
3891 id: "chunk-1".to_owned(),
3892 node_logical_id: "logical-1".to_owned(),
3893 text_content: "budget discussion".to_owned(),
3894 byte_start: None,
3895 byte_end: None,
3896 content_hash: None,
3897 }],
3898 runs: vec![],
3899 steps: vec![],
3900 actions: vec![],
3901 optional_backfills: vec![],
3902 vec_inserts: vec![],
3903 operational_writes: vec![],
3904 })
3905 .expect("write receipt");
3906
3907 assert_eq!(receipt.label, "seed");
3908 }
3909
3910 #[test]
3911 fn writer_node_retire_supersedes_active_node() {
3912 let db = NamedTempFile::new().expect("temporary db");
3913 let writer = WriterActor::start(
3914 db.path(),
3915 Arc::new(SchemaManager::new()),
3916 ProvenanceMode::Warn,
3917 Arc::new(TelemetryCounters::default()),
3918 )
3919 .expect("writer");
3920
3921 writer
3922 .submit(WriteRequest {
3923 label: "seed".to_owned(),
3924 nodes: vec![NodeInsert {
3925 row_id: "row-1".to_owned(),
3926 logical_id: "meeting-1".to_owned(),
3927 kind: "Meeting".to_owned(),
3928 properties: "{}".to_owned(),
3929 source_ref: Some("src-1".to_owned()),
3930 upsert: false,
3931 chunk_policy: ChunkPolicy::Preserve,
3932 content_ref: None,
3933 }],
3934 node_retires: vec![],
3935 edges: vec![],
3936 edge_retires: vec![],
3937 chunks: vec![],
3938 runs: vec![],
3939 steps: vec![],
3940 actions: vec![],
3941 optional_backfills: vec![],
3942 vec_inserts: vec![],
3943 operational_writes: vec![],
3944 })
3945 .expect("seed write");
3946
3947 writer
3948 .submit(WriteRequest {
3949 label: "retire".to_owned(),
3950 nodes: vec![],
3951 node_retires: vec![NodeRetire {
3952 logical_id: "meeting-1".to_owned(),
3953 source_ref: Some("src-2".to_owned()),
3954 }],
3955 edges: vec![],
3956 edge_retires: vec![],
3957 chunks: vec![],
3958 runs: vec![],
3959 steps: vec![],
3960 actions: vec![],
3961 optional_backfills: vec![],
3962 vec_inserts: vec![],
3963 operational_writes: vec![],
3964 })
3965 .expect("retire write");
3966
3967 let conn = rusqlite::Connection::open(db.path()).expect("open");
3968 let active: i64 = conn
3969 .query_row(
3970 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3971 [],
3972 |r| r.get(0),
3973 )
3974 .expect("count active");
3975 let historical: i64 = conn
3976 .query_row(
3977 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3978 [],
3979 |r| r.get(0),
3980 )
3981 .expect("count historical");
3982
3983 assert_eq!(active, 0, "active count must be 0 after retire");
3984 assert_eq!(historical, 1, "historical count must be 1 after retire");
3985 }
3986
3987 #[test]
3988 fn writer_node_retire_preserves_chunks_and_clears_fts() {
3989 let db = NamedTempFile::new().expect("temporary db");
3990 let writer = WriterActor::start(
3991 db.path(),
3992 Arc::new(SchemaManager::new()),
3993 ProvenanceMode::Warn,
3994 Arc::new(TelemetryCounters::default()),
3995 )
3996 .expect("writer");
3997
3998 writer
3999 .submit(WriteRequest {
4000 label: "seed".to_owned(),
4001 nodes: vec![NodeInsert {
4002 row_id: "row-1".to_owned(),
4003 logical_id: "meeting-1".to_owned(),
4004 kind: "Meeting".to_owned(),
4005 properties: "{}".to_owned(),
4006 source_ref: Some("src-1".to_owned()),
4007 upsert: false,
4008 chunk_policy: ChunkPolicy::Preserve,
4009 content_ref: None,
4010 }],
4011 node_retires: vec![],
4012 edges: vec![],
4013 edge_retires: vec![],
4014 chunks: vec![ChunkInsert {
4015 id: "chunk-1".to_owned(),
4016 node_logical_id: "meeting-1".to_owned(),
4017 text_content: "budget discussion".to_owned(),
4018 byte_start: None,
4019 byte_end: None,
4020 content_hash: None,
4021 }],
4022 runs: vec![],
4023 steps: vec![],
4024 actions: vec![],
4025 optional_backfills: vec![],
4026 vec_inserts: vec![],
4027 operational_writes: vec![],
4028 })
4029 .expect("seed write");
4030
4031 writer
4032 .submit(WriteRequest {
4033 label: "retire".to_owned(),
4034 nodes: vec![],
4035 node_retires: vec![NodeRetire {
4036 logical_id: "meeting-1".to_owned(),
4037 source_ref: Some("src-2".to_owned()),
4038 }],
4039 edges: vec![],
4040 edge_retires: vec![],
4041 chunks: vec![],
4042 runs: vec![],
4043 steps: vec![],
4044 actions: vec![],
4045 optional_backfills: vec![],
4046 vec_inserts: vec![],
4047 operational_writes: vec![],
4048 })
4049 .expect("retire write");
4050
4051 let conn = rusqlite::Connection::open(db.path()).expect("open");
4052 let chunk_count: i64 = conn
4053 .query_row(
4054 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4055 [],
4056 |r| r.get(0),
4057 )
4058 .expect("chunk count");
4059 let fts_count: i64 = conn
4060 .query_row(
4061 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4062 [],
4063 |r| r.get(0),
4064 )
4065 .expect("fts count");
4066
4067 assert_eq!(
4068 chunk_count, 1,
4069 "chunks must remain after node retire so restore can re-establish content"
4070 );
4071 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4072 }
4073
4074 #[test]
4075 fn writer_edge_retire_supersedes_active_edge() {
4076 let db = NamedTempFile::new().expect("temporary db");
4077 let writer = WriterActor::start(
4078 db.path(),
4079 Arc::new(SchemaManager::new()),
4080 ProvenanceMode::Warn,
4081 Arc::new(TelemetryCounters::default()),
4082 )
4083 .expect("writer");
4084
4085 writer
4086 .submit(WriteRequest {
4087 label: "seed".to_owned(),
4088 nodes: vec![
4089 NodeInsert {
4090 row_id: "row-a".to_owned(),
4091 logical_id: "node-a".to_owned(),
4092 kind: "Meeting".to_owned(),
4093 properties: "{}".to_owned(),
4094 source_ref: Some("src-1".to_owned()),
4095 upsert: false,
4096 chunk_policy: ChunkPolicy::Preserve,
4097 content_ref: None,
4098 },
4099 NodeInsert {
4100 row_id: "row-b".to_owned(),
4101 logical_id: "node-b".to_owned(),
4102 kind: "Task".to_owned(),
4103 properties: "{}".to_owned(),
4104 source_ref: Some("src-1".to_owned()),
4105 upsert: false,
4106 chunk_policy: ChunkPolicy::Preserve,
4107 content_ref: None,
4108 },
4109 ],
4110 node_retires: vec![],
4111 edges: vec![EdgeInsert {
4112 row_id: "edge-1".to_owned(),
4113 logical_id: "edge-lg-1".to_owned(),
4114 source_logical_id: "node-a".to_owned(),
4115 target_logical_id: "node-b".to_owned(),
4116 kind: "HAS_TASK".to_owned(),
4117 properties: "{}".to_owned(),
4118 source_ref: Some("src-1".to_owned()),
4119 upsert: false,
4120 }],
4121 edge_retires: vec![],
4122 chunks: vec![],
4123 runs: vec![],
4124 steps: vec![],
4125 actions: vec![],
4126 optional_backfills: vec![],
4127 vec_inserts: vec![],
4128 operational_writes: vec![],
4129 })
4130 .expect("seed write");
4131
4132 writer
4133 .submit(WriteRequest {
4134 label: "retire-edge".to_owned(),
4135 nodes: vec![],
4136 node_retires: vec![],
4137 edges: vec![],
4138 edge_retires: vec![EdgeRetire {
4139 logical_id: "edge-lg-1".to_owned(),
4140 source_ref: Some("src-2".to_owned()),
4141 }],
4142 chunks: vec![],
4143 runs: vec![],
4144 steps: vec![],
4145 actions: vec![],
4146 optional_backfills: vec![],
4147 vec_inserts: vec![],
4148 operational_writes: vec![],
4149 })
4150 .expect("retire edge write");
4151
4152 let conn = rusqlite::Connection::open(db.path()).expect("open");
4153 let active: i64 = conn
4154 .query_row(
4155 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4156 [],
4157 |r| r.get(0),
4158 )
4159 .expect("active edge count");
4160
4161 assert_eq!(active, 0, "active edge count must be 0 after retire");
4162 }
4163
4164 #[test]
4165 fn writer_retire_without_source_ref_emits_provenance_warning() {
4166 let db = NamedTempFile::new().expect("temporary db");
4167 let writer = WriterActor::start(
4168 db.path(),
4169 Arc::new(SchemaManager::new()),
4170 ProvenanceMode::Warn,
4171 Arc::new(TelemetryCounters::default()),
4172 )
4173 .expect("writer");
4174
4175 writer
4176 .submit(WriteRequest {
4177 label: "seed".to_owned(),
4178 nodes: vec![NodeInsert {
4179 row_id: "row-1".to_owned(),
4180 logical_id: "meeting-1".to_owned(),
4181 kind: "Meeting".to_owned(),
4182 properties: "{}".to_owned(),
4183 source_ref: Some("src-1".to_owned()),
4184 upsert: false,
4185 chunk_policy: ChunkPolicy::Preserve,
4186 content_ref: None,
4187 }],
4188 node_retires: vec![],
4189 edges: vec![],
4190 edge_retires: vec![],
4191 chunks: vec![],
4192 runs: vec![],
4193 steps: vec![],
4194 actions: vec![],
4195 optional_backfills: vec![],
4196 vec_inserts: vec![],
4197 operational_writes: vec![],
4198 })
4199 .expect("seed write");
4200
4201 let receipt = writer
4202 .submit(WriteRequest {
4203 label: "retire-no-src".to_owned(),
4204 nodes: vec![],
4205 node_retires: vec![NodeRetire {
4206 logical_id: "meeting-1".to_owned(),
4207 source_ref: None,
4208 }],
4209 edges: vec![],
4210 edge_retires: vec![],
4211 chunks: vec![],
4212 runs: vec![],
4213 steps: vec![],
4214 actions: vec![],
4215 optional_backfills: vec![],
4216 vec_inserts: vec![],
4217 operational_writes: vec![],
4218 })
4219 .expect("retire write");
4220
4221 assert!(
4222 !receipt.provenance_warnings.is_empty(),
4223 "retire without source_ref must emit a provenance warning"
4224 );
4225 }
4226
4227 #[test]
4228 #[allow(clippy::too_many_lines)]
4229 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4230 let db = NamedTempFile::new().expect("temporary db");
4231 let writer = WriterActor::start(
4232 db.path(),
4233 Arc::new(SchemaManager::new()),
4234 ProvenanceMode::Warn,
4235 Arc::new(TelemetryCounters::default()),
4236 )
4237 .expect("writer");
4238
4239 writer
4240 .submit(WriteRequest {
4241 label: "v1".to_owned(),
4242 nodes: vec![NodeInsert {
4243 row_id: "row-1".to_owned(),
4244 logical_id: "meeting-1".to_owned(),
4245 kind: "Meeting".to_owned(),
4246 properties: "{}".to_owned(),
4247 source_ref: Some("src-1".to_owned()),
4248 upsert: false,
4249 chunk_policy: ChunkPolicy::Preserve,
4250 content_ref: None,
4251 }],
4252 node_retires: vec![],
4253 edges: vec![],
4254 edge_retires: vec![],
4255 chunks: vec![ChunkInsert {
4256 id: "chunk-old".to_owned(),
4257 node_logical_id: "meeting-1".to_owned(),
4258 text_content: "old text".to_owned(),
4259 byte_start: None,
4260 byte_end: None,
4261 content_hash: None,
4262 }],
4263 runs: vec![],
4264 steps: vec![],
4265 actions: vec![],
4266 optional_backfills: vec![],
4267 vec_inserts: vec![],
4268 operational_writes: vec![],
4269 })
4270 .expect("v1 write");
4271
4272 writer
4273 .submit(WriteRequest {
4274 label: "v2".to_owned(),
4275 nodes: vec![NodeInsert {
4276 row_id: "row-2".to_owned(),
4277 logical_id: "meeting-1".to_owned(),
4278 kind: "Meeting".to_owned(),
4279 properties: "{}".to_owned(),
4280 source_ref: Some("src-2".to_owned()),
4281 upsert: true,
4282 chunk_policy: ChunkPolicy::Replace,
4283 content_ref: None,
4284 }],
4285 node_retires: vec![],
4286 edges: vec![],
4287 edge_retires: vec![],
4288 chunks: vec![ChunkInsert {
4289 id: "chunk-new".to_owned(),
4290 node_logical_id: "meeting-1".to_owned(),
4291 text_content: "new text".to_owned(),
4292 byte_start: None,
4293 byte_end: None,
4294 content_hash: None,
4295 }],
4296 runs: vec![],
4297 steps: vec![],
4298 actions: vec![],
4299 optional_backfills: vec![],
4300 vec_inserts: vec![],
4301 operational_writes: vec![],
4302 })
4303 .expect("v2 write");
4304
4305 let conn = rusqlite::Connection::open(db.path()).expect("open");
4306 let old_chunk: i64 = conn
4307 .query_row(
4308 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4309 [],
4310 |r| r.get(0),
4311 )
4312 .expect("old chunk count");
4313 let new_chunk: i64 = conn
4314 .query_row(
4315 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4316 [],
4317 |r| r.get(0),
4318 )
4319 .expect("new chunk count");
4320 let fts_old: i64 = conn
4321 .query_row(
4322 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4323 [],
4324 |r| r.get(0),
4325 )
4326 .expect("old fts count");
4327
4328 assert_eq!(
4329 old_chunk, 0,
4330 "old chunk must be deleted by ChunkPolicy::Replace"
4331 );
4332 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4333 assert_eq!(
4334 fts_old, 0,
4335 "old FTS row must be deleted by ChunkPolicy::Replace"
4336 );
4337 }
4338
4339 #[test]
4340 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4341 let db = NamedTempFile::new().expect("temporary db");
4342 let writer = WriterActor::start(
4343 db.path(),
4344 Arc::new(SchemaManager::new()),
4345 ProvenanceMode::Warn,
4346 Arc::new(TelemetryCounters::default()),
4347 )
4348 .expect("writer");
4349
4350 writer
4351 .submit(WriteRequest {
4352 label: "v1".to_owned(),
4353 nodes: vec![NodeInsert {
4354 row_id: "row-1".to_owned(),
4355 logical_id: "meeting-1".to_owned(),
4356 kind: "Meeting".to_owned(),
4357 properties: "{}".to_owned(),
4358 source_ref: Some("src-1".to_owned()),
4359 upsert: false,
4360 chunk_policy: ChunkPolicy::Preserve,
4361 content_ref: None,
4362 }],
4363 node_retires: vec![],
4364 edges: vec![],
4365 edge_retires: vec![],
4366 chunks: vec![ChunkInsert {
4367 id: "chunk-old".to_owned(),
4368 node_logical_id: "meeting-1".to_owned(),
4369 text_content: "old text".to_owned(),
4370 byte_start: None,
4371 byte_end: None,
4372 content_hash: None,
4373 }],
4374 runs: vec![],
4375 steps: vec![],
4376 actions: vec![],
4377 optional_backfills: vec![],
4378 vec_inserts: vec![],
4379 operational_writes: vec![],
4380 })
4381 .expect("v1 write");
4382
4383 writer
4384 .submit(WriteRequest {
4385 label: "v2-props-only".to_owned(),
4386 nodes: vec![NodeInsert {
4387 row_id: "row-2".to_owned(),
4388 logical_id: "meeting-1".to_owned(),
4389 kind: "Meeting".to_owned(),
4390 properties: r#"{"status":"updated"}"#.to_owned(),
4391 source_ref: Some("src-2".to_owned()),
4392 upsert: true,
4393 chunk_policy: ChunkPolicy::Preserve,
4394 content_ref: None,
4395 }],
4396 node_retires: vec![],
4397 edges: vec![],
4398 edge_retires: vec![],
4399 chunks: vec![],
4400 runs: vec![],
4401 steps: vec![],
4402 actions: vec![],
4403 optional_backfills: vec![],
4404 vec_inserts: vec![],
4405 operational_writes: vec![],
4406 })
4407 .expect("v2 preserve write");
4408
4409 let conn = rusqlite::Connection::open(db.path()).expect("open");
4410 let old_chunk: i64 = conn
4411 .query_row(
4412 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4413 [],
4414 |r| r.get(0),
4415 )
4416 .expect("old chunk count");
4417
4418 assert_eq!(
4419 old_chunk, 1,
4420 "old chunk must be preserved by ChunkPolicy::Preserve"
4421 );
4422 }
4423
4424 #[test]
4425 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4426 let db = NamedTempFile::new().expect("temporary db");
4427 let writer = WriterActor::start(
4428 db.path(),
4429 Arc::new(SchemaManager::new()),
4430 ProvenanceMode::Warn,
4431 Arc::new(TelemetryCounters::default()),
4432 )
4433 .expect("writer");
4434
4435 writer
4436 .submit(WriteRequest {
4437 label: "v1".to_owned(),
4438 nodes: vec![NodeInsert {
4439 row_id: "row-1".to_owned(),
4440 logical_id: "meeting-1".to_owned(),
4441 kind: "Meeting".to_owned(),
4442 properties: "{}".to_owned(),
4443 source_ref: Some("src-1".to_owned()),
4444 upsert: false,
4445 chunk_policy: ChunkPolicy::Preserve,
4446 content_ref: None,
4447 }],
4448 node_retires: vec![],
4449 edges: vec![],
4450 edge_retires: vec![],
4451 chunks: vec![ChunkInsert {
4452 id: "chunk-existing".to_owned(),
4453 node_logical_id: "meeting-1".to_owned(),
4454 text_content: "existing text".to_owned(),
4455 byte_start: None,
4456 byte_end: None,
4457 content_hash: None,
4458 }],
4459 runs: vec![],
4460 steps: vec![],
4461 actions: vec![],
4462 optional_backfills: vec![],
4463 vec_inserts: vec![],
4464 operational_writes: vec![],
4465 })
4466 .expect("v1 write");
4467
4468 writer
4470 .submit(WriteRequest {
4471 label: "insert-no-upsert".to_owned(),
4472 nodes: vec![NodeInsert {
4473 row_id: "row-2".to_owned(),
4474 logical_id: "meeting-2".to_owned(),
4475 kind: "Meeting".to_owned(),
4476 properties: "{}".to_owned(),
4477 source_ref: Some("src-2".to_owned()),
4478 upsert: false,
4479 chunk_policy: ChunkPolicy::Replace,
4480 content_ref: None,
4481 }],
4482 node_retires: vec![],
4483 edges: vec![],
4484 edge_retires: vec![],
4485 chunks: vec![],
4486 runs: vec![],
4487 steps: vec![],
4488 actions: vec![],
4489 optional_backfills: vec![],
4490 vec_inserts: vec![],
4491 operational_writes: vec![],
4492 })
4493 .expect("insert no-upsert write");
4494
4495 let conn = rusqlite::Connection::open(db.path()).expect("open");
4496 let existing_chunk: i64 = conn
4497 .query_row(
4498 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4499 [],
4500 |r| r.get(0),
4501 )
4502 .expect("chunk count");
4503
4504 assert_eq!(
4505 existing_chunk, 1,
4506 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4507 );
4508 }
4509
4510 #[test]
4511 fn writer_run_upsert_supersedes_prior_active_run() {
4512 let db = NamedTempFile::new().expect("temporary db");
4513 let writer = WriterActor::start(
4514 db.path(),
4515 Arc::new(SchemaManager::new()),
4516 ProvenanceMode::Warn,
4517 Arc::new(TelemetryCounters::default()),
4518 )
4519 .expect("writer");
4520
4521 writer
4522 .submit(WriteRequest {
4523 label: "v1".to_owned(),
4524 nodes: vec![],
4525 node_retires: vec![],
4526 edges: vec![],
4527 edge_retires: vec![],
4528 chunks: vec![],
4529 runs: vec![RunInsert {
4530 id: "run-v1".to_owned(),
4531 kind: "session".to_owned(),
4532 status: "completed".to_owned(),
4533 properties: "{}".to_owned(),
4534 source_ref: Some("src-1".to_owned()),
4535 upsert: false,
4536 supersedes_id: None,
4537 }],
4538 steps: vec![],
4539 actions: vec![],
4540 optional_backfills: vec![],
4541 vec_inserts: vec![],
4542 operational_writes: vec![],
4543 })
4544 .expect("v1 run write");
4545
4546 writer
4547 .submit(WriteRequest {
4548 label: "v2".to_owned(),
4549 nodes: vec![],
4550 node_retires: vec![],
4551 edges: vec![],
4552 edge_retires: vec![],
4553 chunks: vec![],
4554 runs: vec![RunInsert {
4555 id: "run-v2".to_owned(),
4556 kind: "session".to_owned(),
4557 status: "completed".to_owned(),
4558 properties: "{}".to_owned(),
4559 source_ref: Some("src-2".to_owned()),
4560 upsert: true,
4561 supersedes_id: Some("run-v1".to_owned()),
4562 }],
4563 steps: vec![],
4564 actions: vec![],
4565 optional_backfills: vec![],
4566 vec_inserts: vec![],
4567 operational_writes: vec![],
4568 })
4569 .expect("v2 run write");
4570
4571 let conn = rusqlite::Connection::open(db.path()).expect("open");
4572 let v1_historical: i64 = conn
4573 .query_row(
4574 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4575 [],
4576 |r| r.get(0),
4577 )
4578 .expect("v1 historical count");
4579 let v2_active: i64 = conn
4580 .query_row(
4581 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4582 [],
4583 |r| r.get(0),
4584 )
4585 .expect("v2 active count");
4586
4587 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4588 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4589 }
4590
4591 #[test]
4592 fn writer_step_upsert_supersedes_prior_active_step() {
4593 let db = NamedTempFile::new().expect("temporary db");
4594 let writer = WriterActor::start(
4595 db.path(),
4596 Arc::new(SchemaManager::new()),
4597 ProvenanceMode::Warn,
4598 Arc::new(TelemetryCounters::default()),
4599 )
4600 .expect("writer");
4601
4602 writer
4603 .submit(WriteRequest {
4604 label: "v1".to_owned(),
4605 nodes: vec![],
4606 node_retires: vec![],
4607 edges: vec![],
4608 edge_retires: vec![],
4609 chunks: vec![],
4610 runs: vec![RunInsert {
4611 id: "run-1".to_owned(),
4612 kind: "session".to_owned(),
4613 status: "completed".to_owned(),
4614 properties: "{}".to_owned(),
4615 source_ref: Some("src-1".to_owned()),
4616 upsert: false,
4617 supersedes_id: None,
4618 }],
4619 steps: vec![StepInsert {
4620 id: "step-v1".to_owned(),
4621 run_id: "run-1".to_owned(),
4622 kind: "llm".to_owned(),
4623 status: "completed".to_owned(),
4624 properties: "{}".to_owned(),
4625 source_ref: Some("src-1".to_owned()),
4626 upsert: false,
4627 supersedes_id: None,
4628 }],
4629 actions: vec![],
4630 optional_backfills: vec![],
4631 vec_inserts: vec![],
4632 operational_writes: vec![],
4633 })
4634 .expect("v1 step write");
4635
4636 writer
4637 .submit(WriteRequest {
4638 label: "v2".to_owned(),
4639 nodes: vec![],
4640 node_retires: vec![],
4641 edges: vec![],
4642 edge_retires: vec![],
4643 chunks: vec![],
4644 runs: vec![],
4645 steps: vec![StepInsert {
4646 id: "step-v2".to_owned(),
4647 run_id: "run-1".to_owned(),
4648 kind: "llm".to_owned(),
4649 status: "completed".to_owned(),
4650 properties: "{}".to_owned(),
4651 source_ref: Some("src-2".to_owned()),
4652 upsert: true,
4653 supersedes_id: Some("step-v1".to_owned()),
4654 }],
4655 actions: vec![],
4656 optional_backfills: vec![],
4657 vec_inserts: vec![],
4658 operational_writes: vec![],
4659 })
4660 .expect("v2 step write");
4661
4662 let conn = rusqlite::Connection::open(db.path()).expect("open");
4663 let v1_historical: i64 = conn
4664 .query_row(
4665 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4666 [],
4667 |r| r.get(0),
4668 )
4669 .expect("v1 historical count");
4670 let v2_active: i64 = conn
4671 .query_row(
4672 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4673 [],
4674 |r| r.get(0),
4675 )
4676 .expect("v2 active count");
4677
4678 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4679 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4680 }
4681
4682 #[test]
4683 fn writer_action_upsert_supersedes_prior_active_action() {
4684 let db = NamedTempFile::new().expect("temporary db");
4685 let writer = WriterActor::start(
4686 db.path(),
4687 Arc::new(SchemaManager::new()),
4688 ProvenanceMode::Warn,
4689 Arc::new(TelemetryCounters::default()),
4690 )
4691 .expect("writer");
4692
4693 writer
4694 .submit(WriteRequest {
4695 label: "v1".to_owned(),
4696 nodes: vec![],
4697 node_retires: vec![],
4698 edges: vec![],
4699 edge_retires: vec![],
4700 chunks: vec![],
4701 runs: vec![RunInsert {
4702 id: "run-1".to_owned(),
4703 kind: "session".to_owned(),
4704 status: "completed".to_owned(),
4705 properties: "{}".to_owned(),
4706 source_ref: Some("src-1".to_owned()),
4707 upsert: false,
4708 supersedes_id: None,
4709 }],
4710 steps: vec![StepInsert {
4711 id: "step-1".to_owned(),
4712 run_id: "run-1".to_owned(),
4713 kind: "llm".to_owned(),
4714 status: "completed".to_owned(),
4715 properties: "{}".to_owned(),
4716 source_ref: Some("src-1".to_owned()),
4717 upsert: false,
4718 supersedes_id: None,
4719 }],
4720 actions: vec![ActionInsert {
4721 id: "action-v1".to_owned(),
4722 step_id: "step-1".to_owned(),
4723 kind: "emit".to_owned(),
4724 status: "completed".to_owned(),
4725 properties: "{}".to_owned(),
4726 source_ref: Some("src-1".to_owned()),
4727 upsert: false,
4728 supersedes_id: None,
4729 }],
4730 optional_backfills: vec![],
4731 vec_inserts: vec![],
4732 operational_writes: vec![],
4733 })
4734 .expect("v1 action write");
4735
4736 writer
4737 .submit(WriteRequest {
4738 label: "v2".to_owned(),
4739 nodes: vec![],
4740 node_retires: vec![],
4741 edges: vec![],
4742 edge_retires: vec![],
4743 chunks: vec![],
4744 runs: vec![],
4745 steps: vec![],
4746 actions: vec![ActionInsert {
4747 id: "action-v2".to_owned(),
4748 step_id: "step-1".to_owned(),
4749 kind: "emit".to_owned(),
4750 status: "completed".to_owned(),
4751 properties: "{}".to_owned(),
4752 source_ref: Some("src-2".to_owned()),
4753 upsert: true,
4754 supersedes_id: Some("action-v1".to_owned()),
4755 }],
4756 optional_backfills: vec![],
4757 vec_inserts: vec![],
4758 operational_writes: vec![],
4759 })
4760 .expect("v2 action write");
4761
4762 let conn = rusqlite::Connection::open(db.path()).expect("open");
4763 let v1_historical: i64 = conn
4764 .query_row(
4765 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4766 [],
4767 |r| r.get(0),
4768 )
4769 .expect("v1 historical count");
4770 let v2_active: i64 = conn
4771 .query_row(
4772 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4773 [],
4774 |r| r.get(0),
4775 )
4776 .expect("v2 active count");
4777
4778 assert_eq!(
4779 v1_historical, 1,
4780 "action-v1 must be historical after upsert"
4781 );
4782 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4783 }
4784
4785 #[test]
4788 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4789 let db = NamedTempFile::new().expect("temporary db");
4790 let writer = WriterActor::start(
4791 db.path(),
4792 Arc::new(SchemaManager::new()),
4793 ProvenanceMode::Warn,
4794 Arc::new(TelemetryCounters::default()),
4795 )
4796 .expect("writer");
4797
4798 let result = writer.submit(WriteRequest {
4799 label: "bad".to_owned(),
4800 nodes: vec![],
4801 node_retires: vec![],
4802 edges: vec![],
4803 edge_retires: vec![],
4804 chunks: vec![],
4805 runs: vec![RunInsert {
4806 id: "run-1".to_owned(),
4807 kind: "session".to_owned(),
4808 status: "completed".to_owned(),
4809 properties: "{}".to_owned(),
4810 source_ref: None,
4811 upsert: true,
4812 supersedes_id: None,
4813 }],
4814 steps: vec![],
4815 actions: vec![],
4816 optional_backfills: vec![],
4817 vec_inserts: vec![],
4818 operational_writes: vec![],
4819 });
4820
4821 assert!(
4822 matches!(result, Err(EngineError::InvalidWrite(_))),
4823 "run upsert=true without supersedes_id must return InvalidWrite"
4824 );
4825 }
4826
4827 #[test]
4828 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4829 let db = NamedTempFile::new().expect("temporary db");
4830 let writer = WriterActor::start(
4831 db.path(),
4832 Arc::new(SchemaManager::new()),
4833 ProvenanceMode::Warn,
4834 Arc::new(TelemetryCounters::default()),
4835 )
4836 .expect("writer");
4837
4838 let result = writer.submit(WriteRequest {
4839 label: "bad".to_owned(),
4840 nodes: vec![],
4841 node_retires: vec![],
4842 edges: vec![],
4843 edge_retires: vec![],
4844 chunks: vec![],
4845 runs: vec![],
4846 steps: vec![StepInsert {
4847 id: "step-1".to_owned(),
4848 run_id: "run-1".to_owned(),
4849 kind: "llm".to_owned(),
4850 status: "completed".to_owned(),
4851 properties: "{}".to_owned(),
4852 source_ref: None,
4853 upsert: true,
4854 supersedes_id: None,
4855 }],
4856 actions: vec![],
4857 optional_backfills: vec![],
4858 vec_inserts: vec![],
4859 operational_writes: vec![],
4860 });
4861
4862 assert!(
4863 matches!(result, Err(EngineError::InvalidWrite(_))),
4864 "step upsert=true without supersedes_id must return InvalidWrite"
4865 );
4866 }
4867
4868 #[test]
4869 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4870 let db = NamedTempFile::new().expect("temporary db");
4871 let writer = WriterActor::start(
4872 db.path(),
4873 Arc::new(SchemaManager::new()),
4874 ProvenanceMode::Warn,
4875 Arc::new(TelemetryCounters::default()),
4876 )
4877 .expect("writer");
4878
4879 let result = writer.submit(WriteRequest {
4880 label: "bad".to_owned(),
4881 nodes: vec![],
4882 node_retires: vec![],
4883 edges: vec![],
4884 edge_retires: vec![],
4885 chunks: vec![],
4886 runs: vec![],
4887 steps: vec![],
4888 actions: vec![ActionInsert {
4889 id: "action-1".to_owned(),
4890 step_id: "step-1".to_owned(),
4891 kind: "emit".to_owned(),
4892 status: "completed".to_owned(),
4893 properties: "{}".to_owned(),
4894 source_ref: None,
4895 upsert: true,
4896 supersedes_id: None,
4897 }],
4898 optional_backfills: vec![],
4899 vec_inserts: vec![],
4900 operational_writes: vec![],
4901 });
4902
4903 assert!(
4904 matches!(result, Err(EngineError::InvalidWrite(_))),
4905 "action upsert=true without supersedes_id must return InvalidWrite"
4906 );
4907 }
4908
4909 #[test]
4912 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4913 let db = NamedTempFile::new().expect("temporary db");
4914 let writer = WriterActor::start(
4915 db.path(),
4916 Arc::new(SchemaManager::new()),
4917 ProvenanceMode::Warn,
4918 Arc::new(TelemetryCounters::default()),
4919 )
4920 .expect("writer");
4921
4922 let receipt = writer
4923 .submit(WriteRequest {
4924 label: "test".to_owned(),
4925 nodes: vec![
4926 NodeInsert {
4927 row_id: "row-a".to_owned(),
4928 logical_id: "node-a".to_owned(),
4929 kind: "Meeting".to_owned(),
4930 properties: "{}".to_owned(),
4931 source_ref: Some("src-1".to_owned()),
4932 upsert: false,
4933 chunk_policy: ChunkPolicy::Preserve,
4934 content_ref: None,
4935 },
4936 NodeInsert {
4937 row_id: "row-b".to_owned(),
4938 logical_id: "node-b".to_owned(),
4939 kind: "Task".to_owned(),
4940 properties: "{}".to_owned(),
4941 source_ref: Some("src-1".to_owned()),
4942 upsert: false,
4943 chunk_policy: ChunkPolicy::Preserve,
4944 content_ref: None,
4945 },
4946 ],
4947 node_retires: vec![],
4948 edges: vec![EdgeInsert {
4949 row_id: "edge-1".to_owned(),
4950 logical_id: "edge-lg-1".to_owned(),
4951 source_logical_id: "node-a".to_owned(),
4952 target_logical_id: "node-b".to_owned(),
4953 kind: "HAS_TASK".to_owned(),
4954 properties: "{}".to_owned(),
4955 source_ref: None,
4956 upsert: false,
4957 }],
4958 edge_retires: vec![],
4959 chunks: vec![],
4960 runs: vec![],
4961 steps: vec![],
4962 actions: vec![],
4963 optional_backfills: vec![],
4964 vec_inserts: vec![],
4965 operational_writes: vec![],
4966 })
4967 .expect("write");
4968
4969 assert!(
4970 !receipt.provenance_warnings.is_empty(),
4971 "edge insert without source_ref must emit a provenance warning"
4972 );
4973 }
4974
4975 #[test]
4976 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4977 let db = NamedTempFile::new().expect("temporary db");
4978 let writer = WriterActor::start(
4979 db.path(),
4980 Arc::new(SchemaManager::new()),
4981 ProvenanceMode::Warn,
4982 Arc::new(TelemetryCounters::default()),
4983 )
4984 .expect("writer");
4985
4986 let receipt = writer
4987 .submit(WriteRequest {
4988 label: "test".to_owned(),
4989 nodes: vec![],
4990 node_retires: vec![],
4991 edges: vec![],
4992 edge_retires: vec![],
4993 chunks: vec![],
4994 runs: vec![RunInsert {
4995 id: "run-1".to_owned(),
4996 kind: "session".to_owned(),
4997 status: "completed".to_owned(),
4998 properties: "{}".to_owned(),
4999 source_ref: None,
5000 upsert: false,
5001 supersedes_id: None,
5002 }],
5003 steps: vec![],
5004 actions: vec![],
5005 optional_backfills: vec![],
5006 vec_inserts: vec![],
5007 operational_writes: vec![],
5008 })
5009 .expect("write");
5010
5011 assert!(
5012 !receipt.provenance_warnings.is_empty(),
5013 "run insert without source_ref must emit a provenance warning"
5014 );
5015 }
5016
5017 #[test]
5020 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5021 let db = NamedTempFile::new().expect("temporary db");
5022 let writer = WriterActor::start(
5023 db.path(),
5024 Arc::new(SchemaManager::new()),
5025 ProvenanceMode::Warn,
5026 Arc::new(TelemetryCounters::default()),
5027 )
5028 .expect("writer");
5029
5030 writer
5032 .submit(WriteRequest {
5033 label: "seed".to_owned(),
5034 nodes: vec![NodeInsert {
5035 row_id: "row-1".to_owned(),
5036 logical_id: "meeting-1".to_owned(),
5037 kind: "Meeting".to_owned(),
5038 properties: "{}".to_owned(),
5039 source_ref: Some("src-1".to_owned()),
5040 upsert: false,
5041 chunk_policy: ChunkPolicy::Preserve,
5042 content_ref: None,
5043 }],
5044 node_retires: vec![],
5045 edges: vec![],
5046 edge_retires: vec![],
5047 chunks: vec![],
5048 runs: vec![],
5049 steps: vec![],
5050 actions: vec![],
5051 optional_backfills: vec![],
5052 vec_inserts: vec![],
5053 operational_writes: vec![],
5054 })
5055 .expect("seed write");
5056
5057 let result = writer.submit(WriteRequest {
5059 label: "bad".to_owned(),
5060 nodes: vec![],
5061 node_retires: vec![NodeRetire {
5062 logical_id: "meeting-1".to_owned(),
5063 source_ref: Some("src-2".to_owned()),
5064 }],
5065 edges: vec![],
5066 edge_retires: vec![],
5067 chunks: vec![ChunkInsert {
5068 id: "chunk-bad".to_owned(),
5069 node_logical_id: "meeting-1".to_owned(),
5070 text_content: "some text".to_owned(),
5071 byte_start: None,
5072 byte_end: None,
5073 content_hash: None,
5074 }],
5075 runs: vec![],
5076 steps: vec![],
5077 actions: vec![],
5078 optional_backfills: vec![],
5079 vec_inserts: vec![],
5080 operational_writes: vec![],
5081 });
5082
5083 assert!(
5084 matches!(result, Err(EngineError::InvalidWrite(_))),
5085 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5086 );
5087 }
5088
5089 #[test]
5092 fn writer_batch_insert_multiple_nodes() {
5093 let db = NamedTempFile::new().expect("temporary db");
5094 let writer = WriterActor::start(
5095 db.path(),
5096 Arc::new(SchemaManager::new()),
5097 ProvenanceMode::Warn,
5098 Arc::new(TelemetryCounters::default()),
5099 )
5100 .expect("writer");
5101
5102 let nodes: Vec<NodeInsert> = (0..100)
5103 .map(|i| NodeInsert {
5104 row_id: format!("row-{i}"),
5105 logical_id: format!("lg-{i}"),
5106 kind: "Note".to_owned(),
5107 properties: "{}".to_owned(),
5108 source_ref: Some("batch-src".to_owned()),
5109 upsert: false,
5110 chunk_policy: ChunkPolicy::Preserve,
5111 content_ref: None,
5112 })
5113 .collect();
5114
5115 writer
5116 .submit(WriteRequest {
5117 label: "batch".to_owned(),
5118 nodes,
5119 node_retires: vec![],
5120 edges: vec![],
5121 edge_retires: vec![],
5122 chunks: vec![],
5123 runs: vec![],
5124 steps: vec![],
5125 actions: vec![],
5126 optional_backfills: vec![],
5127 vec_inserts: vec![],
5128 operational_writes: vec![],
5129 })
5130 .expect("batch write");
5131
5132 let conn = rusqlite::Connection::open(db.path()).expect("open");
5133 let count: i64 = conn
5134 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5135 .expect("count nodes");
5136 assert_eq!(
5137 count, 100,
5138 "all 100 nodes must be present after batch insert"
5139 );
5140 }
5141
5142 #[test]
5145 fn prepare_write_rejects_empty_node_row_id() {
5146 let db = NamedTempFile::new().expect("temporary db");
5147 let writer = WriterActor::start(
5148 db.path(),
5149 Arc::new(SchemaManager::new()),
5150 ProvenanceMode::Warn,
5151 Arc::new(TelemetryCounters::default()),
5152 )
5153 .expect("writer");
5154
5155 let result = writer.submit(WriteRequest {
5156 label: "test".to_owned(),
5157 nodes: vec![NodeInsert {
5158 row_id: String::new(),
5159 logical_id: "lg-1".to_owned(),
5160 kind: "Note".to_owned(),
5161 properties: "{}".to_owned(),
5162 source_ref: None,
5163 upsert: false,
5164 chunk_policy: ChunkPolicy::Preserve,
5165 content_ref: None,
5166 }],
5167 node_retires: vec![],
5168 edges: vec![],
5169 edge_retires: vec![],
5170 chunks: vec![],
5171 runs: vec![],
5172 steps: vec![],
5173 actions: vec![],
5174 optional_backfills: vec![],
5175 vec_inserts: vec![],
5176 operational_writes: vec![],
5177 });
5178
5179 assert!(
5180 matches!(result, Err(EngineError::InvalidWrite(_))),
5181 "empty row_id must be rejected"
5182 );
5183 }
5184
5185 #[test]
5186 fn prepare_write_rejects_empty_node_logical_id() {
5187 let db = NamedTempFile::new().expect("temporary db");
5188 let writer = WriterActor::start(
5189 db.path(),
5190 Arc::new(SchemaManager::new()),
5191 ProvenanceMode::Warn,
5192 Arc::new(TelemetryCounters::default()),
5193 )
5194 .expect("writer");
5195
5196 let result = writer.submit(WriteRequest {
5197 label: "test".to_owned(),
5198 nodes: vec![NodeInsert {
5199 row_id: "row-1".to_owned(),
5200 logical_id: String::new(),
5201 kind: "Note".to_owned(),
5202 properties: "{}".to_owned(),
5203 source_ref: None,
5204 upsert: false,
5205 chunk_policy: ChunkPolicy::Preserve,
5206 content_ref: None,
5207 }],
5208 node_retires: vec![],
5209 edges: vec![],
5210 edge_retires: vec![],
5211 chunks: vec![],
5212 runs: vec![],
5213 steps: vec![],
5214 actions: vec![],
5215 optional_backfills: vec![],
5216 vec_inserts: vec![],
5217 operational_writes: vec![],
5218 });
5219
5220 assert!(
5221 matches!(result, Err(EngineError::InvalidWrite(_))),
5222 "empty logical_id must be rejected"
5223 );
5224 }
5225
5226 #[test]
5227 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5228 let db = NamedTempFile::new().expect("temporary db");
5229 let writer = WriterActor::start(
5230 db.path(),
5231 Arc::new(SchemaManager::new()),
5232 ProvenanceMode::Warn,
5233 Arc::new(TelemetryCounters::default()),
5234 )
5235 .expect("writer");
5236
5237 let result = writer.submit(WriteRequest {
5238 label: "test".to_owned(),
5239 nodes: vec![
5240 NodeInsert {
5241 row_id: "row-1".to_owned(),
5242 logical_id: "lg-1".to_owned(),
5243 kind: "Note".to_owned(),
5244 properties: "{}".to_owned(),
5245 source_ref: None,
5246 upsert: false,
5247 chunk_policy: ChunkPolicy::Preserve,
5248 content_ref: None,
5249 },
5250 NodeInsert {
5251 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5253 kind: "Note".to_owned(),
5254 properties: "{}".to_owned(),
5255 source_ref: None,
5256 upsert: false,
5257 chunk_policy: ChunkPolicy::Preserve,
5258 content_ref: None,
5259 },
5260 ],
5261 node_retires: vec![],
5262 edges: vec![],
5263 edge_retires: vec![],
5264 chunks: vec![],
5265 runs: vec![],
5266 steps: vec![],
5267 actions: vec![],
5268 optional_backfills: vec![],
5269 vec_inserts: vec![],
5270 operational_writes: vec![],
5271 });
5272
5273 assert!(
5274 matches!(result, Err(EngineError::InvalidWrite(_))),
5275 "duplicate row_id within request must be rejected"
5276 );
5277 }
5278
5279 #[test]
5280 fn prepare_write_rejects_empty_chunk_id() {
5281 let db = NamedTempFile::new().expect("temporary db");
5282 let writer = WriterActor::start(
5283 db.path(),
5284 Arc::new(SchemaManager::new()),
5285 ProvenanceMode::Warn,
5286 Arc::new(TelemetryCounters::default()),
5287 )
5288 .expect("writer");
5289
5290 let result = writer.submit(WriteRequest {
5291 label: "test".to_owned(),
5292 nodes: vec![NodeInsert {
5293 row_id: "row-1".to_owned(),
5294 logical_id: "lg-1".to_owned(),
5295 kind: "Note".to_owned(),
5296 properties: "{}".to_owned(),
5297 source_ref: None,
5298 upsert: false,
5299 chunk_policy: ChunkPolicy::Preserve,
5300 content_ref: None,
5301 }],
5302 node_retires: vec![],
5303 edges: vec![],
5304 edge_retires: vec![],
5305 chunks: vec![ChunkInsert {
5306 id: String::new(),
5307 node_logical_id: "lg-1".to_owned(),
5308 text_content: "some text".to_owned(),
5309 byte_start: None,
5310 byte_end: None,
5311 content_hash: None,
5312 }],
5313 runs: vec![],
5314 steps: vec![],
5315 actions: vec![],
5316 optional_backfills: vec![],
5317 vec_inserts: vec![],
5318 operational_writes: vec![],
5319 });
5320
5321 assert!(
5322 matches!(result, Err(EngineError::InvalidWrite(_))),
5323 "empty chunk id must be rejected"
5324 );
5325 }
5326
5327 #[test]
5330 fn writer_receipt_warns_on_step_without_source_ref() {
5331 let db = NamedTempFile::new().expect("temporary db");
5332 let writer = WriterActor::start(
5333 db.path(),
5334 Arc::new(SchemaManager::new()),
5335 ProvenanceMode::Warn,
5336 Arc::new(TelemetryCounters::default()),
5337 )
5338 .expect("writer");
5339
5340 writer
5342 .submit(WriteRequest {
5343 label: "seed-run".to_owned(),
5344 nodes: vec![],
5345 node_retires: vec![],
5346 edges: vec![],
5347 edge_retires: vec![],
5348 chunks: vec![],
5349 runs: vec![RunInsert {
5350 id: "run-1".to_owned(),
5351 kind: "session".to_owned(),
5352 status: "active".to_owned(),
5353 properties: "{}".to_owned(),
5354 source_ref: Some("src-1".to_owned()),
5355 upsert: false,
5356 supersedes_id: None,
5357 }],
5358 steps: vec![],
5359 actions: vec![],
5360 optional_backfills: vec![],
5361 vec_inserts: vec![],
5362 operational_writes: vec![],
5363 })
5364 .expect("seed run");
5365
5366 let receipt = writer
5367 .submit(WriteRequest {
5368 label: "test".to_owned(),
5369 nodes: vec![],
5370 node_retires: vec![],
5371 edges: vec![],
5372 edge_retires: vec![],
5373 chunks: vec![],
5374 runs: vec![],
5375 steps: vec![StepInsert {
5376 id: "step-1".to_owned(),
5377 run_id: "run-1".to_owned(),
5378 kind: "llm_call".to_owned(),
5379 status: "completed".to_owned(),
5380 properties: "{}".to_owned(),
5381 source_ref: None,
5382 upsert: false,
5383 supersedes_id: None,
5384 }],
5385 actions: vec![],
5386 optional_backfills: vec![],
5387 vec_inserts: vec![],
5388 operational_writes: vec![],
5389 })
5390 .expect("write");
5391
5392 assert!(
5393 !receipt.provenance_warnings.is_empty(),
5394 "step insert without source_ref must emit a provenance warning"
5395 );
5396 }
5397
5398 #[test]
5399 fn writer_receipt_warns_on_action_without_source_ref() {
5400 let db = NamedTempFile::new().expect("temporary db");
5401 let writer = WriterActor::start(
5402 db.path(),
5403 Arc::new(SchemaManager::new()),
5404 ProvenanceMode::Warn,
5405 Arc::new(TelemetryCounters::default()),
5406 )
5407 .expect("writer");
5408
5409 writer
5411 .submit(WriteRequest {
5412 label: "seed".to_owned(),
5413 nodes: vec![],
5414 node_retires: vec![],
5415 edges: vec![],
5416 edge_retires: vec![],
5417 chunks: vec![],
5418 runs: vec![RunInsert {
5419 id: "run-1".to_owned(),
5420 kind: "session".to_owned(),
5421 status: "active".to_owned(),
5422 properties: "{}".to_owned(),
5423 source_ref: Some("src-1".to_owned()),
5424 upsert: false,
5425 supersedes_id: None,
5426 }],
5427 steps: vec![StepInsert {
5428 id: "step-1".to_owned(),
5429 run_id: "run-1".to_owned(),
5430 kind: "llm_call".to_owned(),
5431 status: "completed".to_owned(),
5432 properties: "{}".to_owned(),
5433 source_ref: Some("src-1".to_owned()),
5434 upsert: false,
5435 supersedes_id: None,
5436 }],
5437 actions: vec![],
5438 optional_backfills: vec![],
5439 vec_inserts: vec![],
5440 operational_writes: vec![],
5441 })
5442 .expect("seed");
5443
5444 let receipt = writer
5445 .submit(WriteRequest {
5446 label: "test".to_owned(),
5447 nodes: vec![],
5448 node_retires: vec![],
5449 edges: vec![],
5450 edge_retires: vec![],
5451 chunks: vec![],
5452 runs: vec![],
5453 steps: vec![],
5454 actions: vec![ActionInsert {
5455 id: "action-1".to_owned(),
5456 step_id: "step-1".to_owned(),
5457 kind: "tool_call".to_owned(),
5458 status: "completed".to_owned(),
5459 properties: "{}".to_owned(),
5460 source_ref: None,
5461 upsert: false,
5462 supersedes_id: None,
5463 }],
5464 optional_backfills: vec![],
5465 vec_inserts: vec![],
5466 operational_writes: vec![],
5467 })
5468 .expect("write");
5469
5470 assert!(
5471 !receipt.provenance_warnings.is_empty(),
5472 "action insert without source_ref must emit a provenance warning"
5473 );
5474 }
5475
5476 #[test]
5477 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5478 let db = NamedTempFile::new().expect("temporary db");
5479 let writer = WriterActor::start(
5480 db.path(),
5481 Arc::new(SchemaManager::new()),
5482 ProvenanceMode::Warn,
5483 Arc::new(TelemetryCounters::default()),
5484 )
5485 .expect("writer");
5486
5487 let receipt = writer
5488 .submit(WriteRequest {
5489 label: "test".to_owned(),
5490 nodes: vec![NodeInsert {
5491 row_id: "row-1".to_owned(),
5492 logical_id: "node-1".to_owned(),
5493 kind: "Note".to_owned(),
5494 properties: "{}".to_owned(),
5495 source_ref: Some("src-1".to_owned()),
5496 upsert: false,
5497 chunk_policy: ChunkPolicy::Preserve,
5498 content_ref: None,
5499 }],
5500 node_retires: vec![],
5501 edges: vec![],
5502 edge_retires: vec![],
5503 chunks: vec![],
5504 runs: vec![RunInsert {
5505 id: "run-1".to_owned(),
5506 kind: "session".to_owned(),
5507 status: "active".to_owned(),
5508 properties: "{}".to_owned(),
5509 source_ref: Some("src-1".to_owned()),
5510 upsert: false,
5511 supersedes_id: None,
5512 }],
5513 steps: vec![StepInsert {
5514 id: "step-1".to_owned(),
5515 run_id: "run-1".to_owned(),
5516 kind: "llm_call".to_owned(),
5517 status: "completed".to_owned(),
5518 properties: "{}".to_owned(),
5519 source_ref: Some("src-1".to_owned()),
5520 upsert: false,
5521 supersedes_id: None,
5522 }],
5523 actions: vec![ActionInsert {
5524 id: "action-1".to_owned(),
5525 step_id: "step-1".to_owned(),
5526 kind: "tool_call".to_owned(),
5527 status: "completed".to_owned(),
5528 properties: "{}".to_owned(),
5529 source_ref: Some("src-1".to_owned()),
5530 upsert: false,
5531 supersedes_id: None,
5532 }],
5533 optional_backfills: vec![],
5534 vec_inserts: vec![],
5535 operational_writes: vec![],
5536 })
5537 .expect("write");
5538
5539 assert!(
5540 receipt.provenance_warnings.is_empty(),
5541 "no warnings expected when all types have source_ref; got: {:?}",
5542 receipt.provenance_warnings
5543 );
5544 }
5545
5546 #[test]
5549 fn default_provenance_mode_is_warn() {
5550 let db = NamedTempFile::new().expect("temporary db");
5552 let writer = WriterActor::start(
5553 db.path(),
5554 Arc::new(SchemaManager::new()),
5555 ProvenanceMode::default(),
5556 Arc::new(TelemetryCounters::default()),
5557 )
5558 .expect("writer");
5559
5560 let receipt = writer
5561 .submit(WriteRequest {
5562 label: "test".to_owned(),
5563 nodes: vec![NodeInsert {
5564 row_id: "row-1".to_owned(),
5565 logical_id: "node-1".to_owned(),
5566 kind: "Note".to_owned(),
5567 properties: "{}".to_owned(),
5568 source_ref: None,
5569 upsert: false,
5570 chunk_policy: ChunkPolicy::Preserve,
5571 content_ref: None,
5572 }],
5573 node_retires: vec![],
5574 edges: vec![],
5575 edge_retires: vec![],
5576 chunks: vec![],
5577 runs: vec![],
5578 steps: vec![],
5579 actions: vec![],
5580 optional_backfills: vec![],
5581 vec_inserts: vec![],
5582 operational_writes: vec![],
5583 })
5584 .expect("Warn mode must not reject missing source_ref");
5585
5586 assert!(
5587 !receipt.provenance_warnings.is_empty(),
5588 "Warn mode must emit a warning instead of rejecting"
5589 );
5590 }
5591
5592 #[test]
5593 fn require_mode_rejects_node_without_source_ref() {
5594 let db = NamedTempFile::new().expect("temporary db");
5595 let writer = WriterActor::start(
5596 db.path(),
5597 Arc::new(SchemaManager::new()),
5598 ProvenanceMode::Require,
5599 Arc::new(TelemetryCounters::default()),
5600 )
5601 .expect("writer");
5602
5603 let result = writer.submit(WriteRequest {
5604 label: "test".to_owned(),
5605 nodes: vec![NodeInsert {
5606 row_id: "row-1".to_owned(),
5607 logical_id: "node-1".to_owned(),
5608 kind: "Note".to_owned(),
5609 properties: "{}".to_owned(),
5610 source_ref: None,
5611 upsert: false,
5612 chunk_policy: ChunkPolicy::Preserve,
5613 content_ref: None,
5614 }],
5615 node_retires: vec![],
5616 edges: vec![],
5617 edge_retires: vec![],
5618 chunks: vec![],
5619 runs: vec![],
5620 steps: vec![],
5621 actions: vec![],
5622 optional_backfills: vec![],
5623 vec_inserts: vec![],
5624 operational_writes: vec![],
5625 });
5626
5627 assert!(
5628 matches!(result, Err(EngineError::InvalidWrite(_))),
5629 "Require mode must reject node without source_ref"
5630 );
5631 }
5632
5633 #[test]
5634 fn require_mode_accepts_node_with_source_ref() {
5635 let db = NamedTempFile::new().expect("temporary db");
5636 let writer = WriterActor::start(
5637 db.path(),
5638 Arc::new(SchemaManager::new()),
5639 ProvenanceMode::Require,
5640 Arc::new(TelemetryCounters::default()),
5641 )
5642 .expect("writer");
5643
5644 let result = writer.submit(WriteRequest {
5645 label: "test".to_owned(),
5646 nodes: vec![NodeInsert {
5647 row_id: "row-1".to_owned(),
5648 logical_id: "node-1".to_owned(),
5649 kind: "Note".to_owned(),
5650 properties: "{}".to_owned(),
5651 source_ref: Some("src-1".to_owned()),
5652 upsert: false,
5653 chunk_policy: ChunkPolicy::Preserve,
5654 content_ref: None,
5655 }],
5656 node_retires: vec![],
5657 edges: vec![],
5658 edge_retires: vec![],
5659 chunks: vec![],
5660 runs: vec![],
5661 steps: vec![],
5662 actions: vec![],
5663 optional_backfills: vec![],
5664 vec_inserts: vec![],
5665 operational_writes: vec![],
5666 });
5667
5668 assert!(
5669 result.is_ok(),
5670 "Require mode must accept node with source_ref"
5671 );
5672 }
5673
5674 #[test]
5675 fn require_mode_rejects_edge_without_source_ref() {
5676 let db = NamedTempFile::new().expect("temporary db");
5677 let writer = WriterActor::start(
5678 db.path(),
5679 Arc::new(SchemaManager::new()),
5680 ProvenanceMode::Require,
5681 Arc::new(TelemetryCounters::default()),
5682 )
5683 .expect("writer");
5684
5685 let result = writer.submit(WriteRequest {
5687 label: "test".to_owned(),
5688 nodes: vec![
5689 NodeInsert {
5690 row_id: "row-a".to_owned(),
5691 logical_id: "node-a".to_owned(),
5692 kind: "Note".to_owned(),
5693 properties: "{}".to_owned(),
5694 source_ref: Some("src-1".to_owned()),
5695 upsert: false,
5696 chunk_policy: ChunkPolicy::Preserve,
5697 content_ref: None,
5698 },
5699 NodeInsert {
5700 row_id: "row-b".to_owned(),
5701 logical_id: "node-b".to_owned(),
5702 kind: "Note".to_owned(),
5703 properties: "{}".to_owned(),
5704 source_ref: Some("src-1".to_owned()),
5705 upsert: false,
5706 chunk_policy: ChunkPolicy::Preserve,
5707 content_ref: None,
5708 },
5709 ],
5710 node_retires: vec![],
5711 edges: vec![EdgeInsert {
5712 row_id: "edge-row-1".to_owned(),
5713 logical_id: "edge-1".to_owned(),
5714 source_logical_id: "node-a".to_owned(),
5715 target_logical_id: "node-b".to_owned(),
5716 kind: "LINKS_TO".to_owned(),
5717 properties: "{}".to_owned(),
5718 source_ref: None,
5719 upsert: false,
5720 }],
5721 edge_retires: vec![],
5722 chunks: vec![],
5723 runs: vec![],
5724 steps: vec![],
5725 actions: vec![],
5726 optional_backfills: vec![],
5727 vec_inserts: vec![],
5728 operational_writes: vec![],
5729 });
5730
5731 assert!(
5732 matches!(result, Err(EngineError::InvalidWrite(_))),
5733 "Require mode must reject edge without source_ref"
5734 );
5735 }
5736
5737 #[test]
5740 fn fts_row_has_correct_kind_from_co_submitted_node() {
5741 let db = NamedTempFile::new().expect("temporary db");
5742 let writer = WriterActor::start(
5743 db.path(),
5744 Arc::new(SchemaManager::new()),
5745 ProvenanceMode::Warn,
5746 Arc::new(TelemetryCounters::default()),
5747 )
5748 .expect("writer");
5749
5750 writer
5751 .submit(WriteRequest {
5752 label: "test".to_owned(),
5753 nodes: vec![NodeInsert {
5754 row_id: "row-1".to_owned(),
5755 logical_id: "node-1".to_owned(),
5756 kind: "Meeting".to_owned(),
5757 properties: "{}".to_owned(),
5758 source_ref: Some("src-1".to_owned()),
5759 upsert: false,
5760 chunk_policy: ChunkPolicy::Preserve,
5761 content_ref: None,
5762 }],
5763 node_retires: vec![],
5764 edges: vec![],
5765 edge_retires: vec![],
5766 chunks: vec![ChunkInsert {
5767 id: "chunk-1".to_owned(),
5768 node_logical_id: "node-1".to_owned(),
5769 text_content: "some text".to_owned(),
5770 byte_start: None,
5771 byte_end: None,
5772 content_hash: None,
5773 }],
5774 runs: vec![],
5775 steps: vec![],
5776 actions: vec![],
5777 optional_backfills: vec![],
5778 vec_inserts: vec![],
5779 operational_writes: vec![],
5780 })
5781 .expect("write");
5782
5783 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5784 let kind: String = conn
5785 .query_row(
5786 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5787 [],
5788 |row| row.get(0),
5789 )
5790 .expect("fts row");
5791
5792 assert_eq!(kind, "Meeting");
5793 }
5794
5795 #[test]
5796 fn fts_row_has_correct_text_content() {
5797 let db = NamedTempFile::new().expect("temporary db");
5798 let writer = WriterActor::start(
5799 db.path(),
5800 Arc::new(SchemaManager::new()),
5801 ProvenanceMode::Warn,
5802 Arc::new(TelemetryCounters::default()),
5803 )
5804 .expect("writer");
5805
5806 writer
5807 .submit(WriteRequest {
5808 label: "test".to_owned(),
5809 nodes: vec![NodeInsert {
5810 row_id: "row-1".to_owned(),
5811 logical_id: "node-1".to_owned(),
5812 kind: "Note".to_owned(),
5813 properties: "{}".to_owned(),
5814 source_ref: Some("src-1".to_owned()),
5815 upsert: false,
5816 chunk_policy: ChunkPolicy::Preserve,
5817 content_ref: None,
5818 }],
5819 node_retires: vec![],
5820 edges: vec![],
5821 edge_retires: vec![],
5822 chunks: vec![ChunkInsert {
5823 id: "chunk-1".to_owned(),
5824 node_logical_id: "node-1".to_owned(),
5825 text_content: "exactly this text".to_owned(),
5826 byte_start: None,
5827 byte_end: None,
5828 content_hash: None,
5829 }],
5830 runs: vec![],
5831 steps: vec![],
5832 actions: vec![],
5833 optional_backfills: vec![],
5834 vec_inserts: vec![],
5835 operational_writes: vec![],
5836 })
5837 .expect("write");
5838
5839 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5840 let text: String = conn
5841 .query_row(
5842 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5843 [],
5844 |row| row.get(0),
5845 )
5846 .expect("fts row");
5847
5848 assert_eq!(text, "exactly this text");
5849 }
5850
5851 #[test]
5852 fn fts_row_has_correct_kind_from_pre_existing_node() {
5853 let db = NamedTempFile::new().expect("temporary db");
5854 let writer = WriterActor::start(
5855 db.path(),
5856 Arc::new(SchemaManager::new()),
5857 ProvenanceMode::Warn,
5858 Arc::new(TelemetryCounters::default()),
5859 )
5860 .expect("writer");
5861
5862 writer
5864 .submit(WriteRequest {
5865 label: "r1".to_owned(),
5866 nodes: vec![NodeInsert {
5867 row_id: "row-1".to_owned(),
5868 logical_id: "node-1".to_owned(),
5869 kind: "Document".to_owned(),
5870 properties: "{}".to_owned(),
5871 source_ref: Some("src-1".to_owned()),
5872 upsert: false,
5873 chunk_policy: ChunkPolicy::Preserve,
5874 content_ref: None,
5875 }],
5876 node_retires: vec![],
5877 edges: vec![],
5878 edge_retires: vec![],
5879 chunks: vec![],
5880 runs: vec![],
5881 steps: vec![],
5882 actions: vec![],
5883 optional_backfills: vec![],
5884 vec_inserts: vec![],
5885 operational_writes: vec![],
5886 })
5887 .expect("r1 write");
5888
5889 writer
5891 .submit(WriteRequest {
5892 label: "r2".to_owned(),
5893 nodes: vec![],
5894 node_retires: vec![],
5895 edges: vec![],
5896 edge_retires: vec![],
5897 chunks: vec![ChunkInsert {
5898 id: "chunk-1".to_owned(),
5899 node_logical_id: "node-1".to_owned(),
5900 text_content: "some text".to_owned(),
5901 byte_start: None,
5902 byte_end: None,
5903 content_hash: None,
5904 }],
5905 runs: vec![],
5906 steps: vec![],
5907 actions: vec![],
5908 optional_backfills: vec![],
5909 vec_inserts: vec![],
5910 operational_writes: vec![],
5911 })
5912 .expect("r2 write");
5913
5914 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5915 let kind: String = conn
5916 .query_row(
5917 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5918 [],
5919 |row| row.get(0),
5920 )
5921 .expect("fts row");
5922
5923 assert_eq!(kind, "Document");
5924 }
5925
5926 #[test]
5927 fn fts_derives_rows_for_multiple_chunks_per_node() {
5928 let db = NamedTempFile::new().expect("temporary db");
5929 let writer = WriterActor::start(
5930 db.path(),
5931 Arc::new(SchemaManager::new()),
5932 ProvenanceMode::Warn,
5933 Arc::new(TelemetryCounters::default()),
5934 )
5935 .expect("writer");
5936
5937 writer
5938 .submit(WriteRequest {
5939 label: "test".to_owned(),
5940 nodes: vec![NodeInsert {
5941 row_id: "row-1".to_owned(),
5942 logical_id: "node-1".to_owned(),
5943 kind: "Meeting".to_owned(),
5944 properties: "{}".to_owned(),
5945 source_ref: Some("src-1".to_owned()),
5946 upsert: false,
5947 chunk_policy: ChunkPolicy::Preserve,
5948 content_ref: None,
5949 }],
5950 node_retires: vec![],
5951 edges: vec![],
5952 edge_retires: vec![],
5953 chunks: vec![
5954 ChunkInsert {
5955 id: "chunk-a".to_owned(),
5956 node_logical_id: "node-1".to_owned(),
5957 text_content: "intro".to_owned(),
5958 byte_start: None,
5959 byte_end: None,
5960 content_hash: None,
5961 },
5962 ChunkInsert {
5963 id: "chunk-b".to_owned(),
5964 node_logical_id: "node-1".to_owned(),
5965 text_content: "body".to_owned(),
5966 byte_start: None,
5967 byte_end: None,
5968 content_hash: None,
5969 },
5970 ChunkInsert {
5971 id: "chunk-c".to_owned(),
5972 node_logical_id: "node-1".to_owned(),
5973 text_content: "conclusion".to_owned(),
5974 byte_start: None,
5975 byte_end: None,
5976 content_hash: None,
5977 },
5978 ],
5979 runs: vec![],
5980 steps: vec![],
5981 actions: vec![],
5982 optional_backfills: vec![],
5983 vec_inserts: vec![],
5984 operational_writes: vec![],
5985 })
5986 .expect("write");
5987
5988 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5989 let count: i64 = conn
5990 .query_row(
5991 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5992 [],
5993 |row| row.get(0),
5994 )
5995 .expect("fts count");
5996
5997 assert_eq!(count, 3, "three chunks must produce three FTS rows");
5998 }
5999
6000 #[test]
6001 fn fts_resolves_mixed_fast_and_db_paths() {
6002 let db = NamedTempFile::new().expect("temporary db");
6003 let writer = WriterActor::start(
6004 db.path(),
6005 Arc::new(SchemaManager::new()),
6006 ProvenanceMode::Warn,
6007 Arc::new(TelemetryCounters::default()),
6008 )
6009 .expect("writer");
6010
6011 writer
6013 .submit(WriteRequest {
6014 label: "seed".to_owned(),
6015 nodes: vec![NodeInsert {
6016 row_id: "row-existing".to_owned(),
6017 logical_id: "existing-node".to_owned(),
6018 kind: "Archive".to_owned(),
6019 properties: "{}".to_owned(),
6020 source_ref: Some("src-1".to_owned()),
6021 upsert: false,
6022 chunk_policy: ChunkPolicy::Preserve,
6023 content_ref: None,
6024 }],
6025 node_retires: vec![],
6026 edges: vec![],
6027 edge_retires: vec![],
6028 chunks: vec![],
6029 runs: vec![],
6030 steps: vec![],
6031 actions: vec![],
6032 optional_backfills: vec![],
6033 vec_inserts: vec![],
6034 operational_writes: vec![],
6035 })
6036 .expect("seed");
6037
6038 writer
6040 .submit(WriteRequest {
6041 label: "mixed".to_owned(),
6042 nodes: vec![NodeInsert {
6043 row_id: "row-new".to_owned(),
6044 logical_id: "new-node".to_owned(),
6045 kind: "Inbox".to_owned(),
6046 properties: "{}".to_owned(),
6047 source_ref: Some("src-2".to_owned()),
6048 upsert: false,
6049 chunk_policy: ChunkPolicy::Preserve,
6050 content_ref: None,
6051 }],
6052 node_retires: vec![],
6053 edges: vec![],
6054 edge_retires: vec![],
6055 chunks: vec![
6056 ChunkInsert {
6057 id: "chunk-fast".to_owned(),
6058 node_logical_id: "new-node".to_owned(),
6059 text_content: "new content".to_owned(),
6060 byte_start: None,
6061 byte_end: None,
6062 content_hash: None,
6063 },
6064 ChunkInsert {
6065 id: "chunk-db".to_owned(),
6066 node_logical_id: "existing-node".to_owned(),
6067 text_content: "archive content".to_owned(),
6068 byte_start: None,
6069 byte_end: None,
6070 content_hash: None,
6071 },
6072 ],
6073 runs: vec![],
6074 steps: vec![],
6075 actions: vec![],
6076 optional_backfills: vec![],
6077 vec_inserts: vec![],
6078 operational_writes: vec![],
6079 })
6080 .expect("mixed write");
6081
6082 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6083 let fast_kind: String = conn
6084 .query_row(
6085 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6086 [],
6087 |row| row.get(0),
6088 )
6089 .expect("fast path fts row");
6090 let db_kind: String = conn
6091 .query_row(
6092 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6093 [],
6094 |row| row.get(0),
6095 )
6096 .expect("db path fts row");
6097
6098 assert_eq!(fast_kind, "Inbox");
6099 assert_eq!(db_kind, "Archive");
6100 }
6101
6102 #[test]
6103 fn prepare_write_rejects_empty_chunk_text() {
6104 let db = NamedTempFile::new().expect("temporary db");
6105 let writer = WriterActor::start(
6106 db.path(),
6107 Arc::new(SchemaManager::new()),
6108 ProvenanceMode::Warn,
6109 Arc::new(TelemetryCounters::default()),
6110 )
6111 .expect("writer");
6112
6113 let result = writer.submit(WriteRequest {
6114 label: "test".to_owned(),
6115 nodes: vec![NodeInsert {
6116 row_id: "row-1".to_owned(),
6117 logical_id: "node-1".to_owned(),
6118 kind: "Note".to_owned(),
6119 properties: "{}".to_owned(),
6120 source_ref: None,
6121 upsert: false,
6122 chunk_policy: ChunkPolicy::Preserve,
6123 content_ref: None,
6124 }],
6125 node_retires: vec![],
6126 edges: vec![],
6127 edge_retires: vec![],
6128 chunks: vec![ChunkInsert {
6129 id: "chunk-1".to_owned(),
6130 node_logical_id: "node-1".to_owned(),
6131 text_content: String::new(),
6132 byte_start: None,
6133 byte_end: None,
6134 content_hash: None,
6135 }],
6136 runs: vec![],
6137 steps: vec![],
6138 actions: vec![],
6139 optional_backfills: vec![],
6140 vec_inserts: vec![],
6141 operational_writes: vec![],
6142 });
6143
6144 assert!(
6145 matches!(result, Err(EngineError::InvalidWrite(_))),
6146 "empty text_content must be rejected"
6147 );
6148 }
6149
6150 #[test]
6151 fn receipt_reports_zero_backfills_when_none_submitted() {
6152 let db = NamedTempFile::new().expect("temporary db");
6153 let writer = WriterActor::start(
6154 db.path(),
6155 Arc::new(SchemaManager::new()),
6156 ProvenanceMode::Warn,
6157 Arc::new(TelemetryCounters::default()),
6158 )
6159 .expect("writer");
6160
6161 let receipt = writer
6162 .submit(WriteRequest {
6163 label: "test".to_owned(),
6164 nodes: vec![NodeInsert {
6165 row_id: "row-1".to_owned(),
6166 logical_id: "node-1".to_owned(),
6167 kind: "Note".to_owned(),
6168 properties: "{}".to_owned(),
6169 source_ref: Some("src-1".to_owned()),
6170 upsert: false,
6171 chunk_policy: ChunkPolicy::Preserve,
6172 content_ref: None,
6173 }],
6174 node_retires: vec![],
6175 edges: vec![],
6176 edge_retires: vec![],
6177 chunks: vec![],
6178 runs: vec![],
6179 steps: vec![],
6180 actions: vec![],
6181 optional_backfills: vec![],
6182 vec_inserts: vec![],
6183 operational_writes: vec![],
6184 })
6185 .expect("write");
6186
6187 assert_eq!(receipt.optional_backfill_count, 0);
6188 }
6189
6190 #[test]
6191 fn receipt_reports_correct_backfill_count() {
6192 let db = NamedTempFile::new().expect("temporary db");
6193 let writer = WriterActor::start(
6194 db.path(),
6195 Arc::new(SchemaManager::new()),
6196 ProvenanceMode::Warn,
6197 Arc::new(TelemetryCounters::default()),
6198 )
6199 .expect("writer");
6200
6201 let receipt = writer
6202 .submit(WriteRequest {
6203 label: "test".to_owned(),
6204 nodes: vec![NodeInsert {
6205 row_id: "row-1".to_owned(),
6206 logical_id: "node-1".to_owned(),
6207 kind: "Note".to_owned(),
6208 properties: "{}".to_owned(),
6209 source_ref: Some("src-1".to_owned()),
6210 upsert: false,
6211 chunk_policy: ChunkPolicy::Preserve,
6212 content_ref: None,
6213 }],
6214 node_retires: vec![],
6215 edges: vec![],
6216 edge_retires: vec![],
6217 chunks: vec![],
6218 runs: vec![],
6219 steps: vec![],
6220 actions: vec![],
6221 optional_backfills: vec![
6222 OptionalProjectionTask {
6223 target: ProjectionTarget::Fts,
6224 payload: "p1".to_owned(),
6225 },
6226 OptionalProjectionTask {
6227 target: ProjectionTarget::Vec,
6228 payload: "p2".to_owned(),
6229 },
6230 OptionalProjectionTask {
6231 target: ProjectionTarget::All,
6232 payload: "p3".to_owned(),
6233 },
6234 ],
6235 vec_inserts: vec![],
6236 operational_writes: vec![],
6237 })
6238 .expect("write");
6239
6240 assert_eq!(receipt.optional_backfill_count, 3);
6241 }
6242
6243 #[test]
6244 fn backfill_tasks_are_not_executed_during_write() {
6245 let db = NamedTempFile::new().expect("temporary db");
6246 let writer = WriterActor::start(
6247 db.path(),
6248 Arc::new(SchemaManager::new()),
6249 ProvenanceMode::Warn,
6250 Arc::new(TelemetryCounters::default()),
6251 )
6252 .expect("writer");
6253
6254 writer
6257 .submit(WriteRequest {
6258 label: "test".to_owned(),
6259 nodes: vec![NodeInsert {
6260 row_id: "row-1".to_owned(),
6261 logical_id: "node-1".to_owned(),
6262 kind: "Note".to_owned(),
6263 properties: "{}".to_owned(),
6264 source_ref: Some("src-1".to_owned()),
6265 upsert: false,
6266 chunk_policy: ChunkPolicy::Preserve,
6267 content_ref: None,
6268 }],
6269 node_retires: vec![],
6270 edges: vec![],
6271 edge_retires: vec![],
6272 chunks: vec![ChunkInsert {
6273 id: "chunk-1".to_owned(),
6274 node_logical_id: "node-1".to_owned(),
6275 text_content: "required text".to_owned(),
6276 byte_start: None,
6277 byte_end: None,
6278 content_hash: None,
6279 }],
6280 runs: vec![],
6281 steps: vec![],
6282 actions: vec![],
6283 optional_backfills: vec![OptionalProjectionTask {
6284 target: ProjectionTarget::Fts,
6285 payload: "backfill-payload".to_owned(),
6286 }],
6287 vec_inserts: vec![],
6288 operational_writes: vec![],
6289 })
6290 .expect("write");
6291
6292 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6293 let count: i64 = conn
6294 .query_row(
6295 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6296 [],
6297 |row| row.get(0),
6298 )
6299 .expect("fts count");
6300
6301 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6302 }
6303
6304 #[test]
6305 fn fts_row_uses_new_kind_after_node_replace() {
6306 let db = NamedTempFile::new().expect("temporary db");
6307 let writer = WriterActor::start(
6308 db.path(),
6309 Arc::new(SchemaManager::new()),
6310 ProvenanceMode::Warn,
6311 Arc::new(TelemetryCounters::default()),
6312 )
6313 .expect("writer");
6314
6315 writer
6317 .submit(WriteRequest {
6318 label: "v1".to_owned(),
6319 nodes: vec![NodeInsert {
6320 row_id: "row-1".to_owned(),
6321 logical_id: "node-1".to_owned(),
6322 kind: "Note".to_owned(),
6323 properties: "{}".to_owned(),
6324 source_ref: Some("src-1".to_owned()),
6325 upsert: false,
6326 chunk_policy: ChunkPolicy::Preserve,
6327 content_ref: None,
6328 }],
6329 node_retires: vec![],
6330 edges: vec![],
6331 edge_retires: vec![],
6332 chunks: vec![ChunkInsert {
6333 id: "chunk-v1".to_owned(),
6334 node_logical_id: "node-1".to_owned(),
6335 text_content: "original".to_owned(),
6336 byte_start: None,
6337 byte_end: None,
6338 content_hash: None,
6339 }],
6340 runs: vec![],
6341 steps: vec![],
6342 actions: vec![],
6343 optional_backfills: vec![],
6344 vec_inserts: vec![],
6345 operational_writes: vec![],
6346 })
6347 .expect("v1 write");
6348
6349 writer
6351 .submit(WriteRequest {
6352 label: "v2".to_owned(),
6353 nodes: vec![NodeInsert {
6354 row_id: "row-2".to_owned(),
6355 logical_id: "node-1".to_owned(),
6356 kind: "Meeting".to_owned(),
6357 properties: "{}".to_owned(),
6358 source_ref: Some("src-2".to_owned()),
6359 upsert: true,
6360 chunk_policy: ChunkPolicy::Replace,
6361 content_ref: None,
6362 }],
6363 node_retires: vec![],
6364 edges: vec![],
6365 edge_retires: vec![],
6366 chunks: vec![ChunkInsert {
6367 id: "chunk-v2".to_owned(),
6368 node_logical_id: "node-1".to_owned(),
6369 text_content: "updated".to_owned(),
6370 byte_start: None,
6371 byte_end: None,
6372 content_hash: None,
6373 }],
6374 runs: vec![],
6375 steps: vec![],
6376 actions: vec![],
6377 optional_backfills: vec![],
6378 vec_inserts: vec![],
6379 operational_writes: vec![],
6380 })
6381 .expect("v2 write");
6382
6383 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6384
6385 let old_count: i64 = conn
6387 .query_row(
6388 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6389 [],
6390 |row| row.get(0),
6391 )
6392 .expect("old fts count");
6393 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6394
6395 let new_kind: String = conn
6397 .query_row(
6398 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6399 [],
6400 |row| row.get(0),
6401 )
6402 .expect("new fts row");
6403 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6404 }
6405
6406 #[test]
6409 fn vec_insert_empty_chunk_id_is_rejected() {
6410 let db = NamedTempFile::new().expect("temporary db");
6411 let writer = WriterActor::start(
6412 db.path(),
6413 Arc::new(SchemaManager::new()),
6414 ProvenanceMode::Warn,
6415 Arc::new(TelemetryCounters::default()),
6416 )
6417 .expect("writer");
6418 let result = writer.submit(WriteRequest {
6419 label: "vec-test".to_owned(),
6420 nodes: vec![],
6421 node_retires: vec![],
6422 edges: vec![],
6423 edge_retires: vec![],
6424 chunks: vec![],
6425 runs: vec![],
6426 steps: vec![],
6427 actions: vec![],
6428 optional_backfills: vec![],
6429 vec_inserts: vec![VecInsert {
6430 chunk_id: String::new(),
6431 embedding: vec![0.1, 0.2, 0.3],
6432 }],
6433 operational_writes: vec![],
6434 });
6435 assert!(
6436 matches!(result, Err(EngineError::InvalidWrite(_))),
6437 "empty chunk_id in VecInsert must be rejected"
6438 );
6439 }
6440
6441 #[test]
6442 fn vec_insert_empty_embedding_is_rejected() {
6443 let db = NamedTempFile::new().expect("temporary db");
6444 let writer = WriterActor::start(
6445 db.path(),
6446 Arc::new(SchemaManager::new()),
6447 ProvenanceMode::Warn,
6448 Arc::new(TelemetryCounters::default()),
6449 )
6450 .expect("writer");
6451 let result = writer.submit(WriteRequest {
6452 label: "vec-test".to_owned(),
6453 nodes: vec![],
6454 node_retires: vec![],
6455 edges: vec![],
6456 edge_retires: vec![],
6457 chunks: vec![],
6458 runs: vec![],
6459 steps: vec![],
6460 actions: vec![],
6461 optional_backfills: vec![],
6462 vec_inserts: vec![VecInsert {
6463 chunk_id: "chunk-1".to_owned(),
6464 embedding: vec![],
6465 }],
6466 operational_writes: vec![],
6467 });
6468 assert!(
6469 matches!(result, Err(EngineError::InvalidWrite(_))),
6470 "empty embedding in VecInsert must be rejected"
6471 );
6472 }
6473
6474 #[test]
6475 fn vec_insert_noop_without_feature() {
6476 let db = NamedTempFile::new().expect("temporary db");
6479 let writer = WriterActor::start(
6480 db.path(),
6481 Arc::new(SchemaManager::new()),
6482 ProvenanceMode::Warn,
6483 Arc::new(TelemetryCounters::default()),
6484 )
6485 .expect("writer");
6486 let result = writer.submit(WriteRequest {
6487 label: "vec-noop".to_owned(),
6488 nodes: vec![],
6489 node_retires: vec![],
6490 edges: vec![],
6491 edge_retires: vec![],
6492 chunks: vec![],
6493 runs: vec![],
6494 steps: vec![],
6495 actions: vec![],
6496 optional_backfills: vec![],
6497 vec_inserts: vec![VecInsert {
6498 chunk_id: "chunk-noop".to_owned(),
6499 embedding: vec![1.0, 2.0, 3.0],
6500 }],
6501 operational_writes: vec![],
6502 });
6503 #[cfg(not(feature = "sqlite-vec"))]
6504 result.expect("noop VecInsert without feature must succeed");
6505 #[cfg(feature = "sqlite-vec")]
6507 let _ = result;
6508 }
6509
6510 #[cfg(feature = "sqlite-vec")]
6511 #[test]
6512 fn node_retire_preserves_vec_rows_for_later_restore() {
6513 use crate::sqlite::open_connection_with_vec;
6514
6515 let db = NamedTempFile::new().expect("temporary db");
6516 let schema_manager = Arc::new(SchemaManager::new());
6517
6518 {
6519 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6520 schema_manager.bootstrap(&conn).expect("bootstrap");
6521 schema_manager
6522 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6523 .expect("ensure profile");
6524 }
6525
6526 let writer = WriterActor::start(
6527 db.path(),
6528 Arc::clone(&schema_manager),
6529 ProvenanceMode::Warn,
6530 Arc::new(TelemetryCounters::default()),
6531 )
6532 .expect("writer");
6533
6534 writer
6536 .submit(WriteRequest {
6537 label: "setup".to_owned(),
6538 nodes: vec![NodeInsert {
6539 row_id: "row-retire-vec".to_owned(),
6540 logical_id: "node-retire-vec".to_owned(),
6541 kind: "Doc".to_owned(),
6542 properties: "{}".to_owned(),
6543 source_ref: Some("src".to_owned()),
6544 upsert: false,
6545 chunk_policy: ChunkPolicy::Preserve,
6546 content_ref: None,
6547 }],
6548 node_retires: vec![],
6549 edges: vec![],
6550 edge_retires: vec![],
6551 chunks: vec![ChunkInsert {
6552 id: "chunk-retire-vec".to_owned(),
6553 node_logical_id: "node-retire-vec".to_owned(),
6554 text_content: "text".to_owned(),
6555 byte_start: None,
6556 byte_end: None,
6557 content_hash: None,
6558 }],
6559 runs: vec![],
6560 steps: vec![],
6561 actions: vec![],
6562 optional_backfills: vec![],
6563 vec_inserts: vec![VecInsert {
6564 chunk_id: "chunk-retire-vec".to_owned(),
6565 embedding: vec![0.1, 0.2, 0.3],
6566 }],
6567 operational_writes: vec![],
6568 })
6569 .expect("setup write");
6570
6571 writer
6573 .submit(WriteRequest {
6574 label: "retire".to_owned(),
6575 nodes: vec![],
6576 node_retires: vec![NodeRetire {
6577 logical_id: "node-retire-vec".to_owned(),
6578 source_ref: Some("src".to_owned()),
6579 }],
6580 edges: vec![],
6581 edge_retires: vec![],
6582 chunks: vec![],
6583 runs: vec![],
6584 steps: vec![],
6585 actions: vec![],
6586 optional_backfills: vec![],
6587 vec_inserts: vec![],
6588 operational_writes: vec![],
6589 })
6590 .expect("retire write");
6591
6592 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6593 let count: i64 = conn
6594 .query_row(
6595 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6596 [],
6597 |row| row.get(0),
6598 )
6599 .expect("count");
6600 assert_eq!(
6601 count, 1,
6602 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6603 );
6604 }
6605
6606 #[cfg(feature = "sqlite-vec")]
6607 #[test]
6608 #[allow(clippy::too_many_lines)]
6609 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6610 use crate::sqlite::open_connection_with_vec;
6611
6612 let db = NamedTempFile::new().expect("temporary db");
6613 let schema_manager = Arc::new(SchemaManager::new());
6614
6615 {
6616 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6617 schema_manager.bootstrap(&conn).expect("bootstrap");
6618 schema_manager
6619 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6620 .expect("ensure profile");
6621 }
6622
6623 let writer = WriterActor::start(
6624 db.path(),
6625 Arc::clone(&schema_manager),
6626 ProvenanceMode::Warn,
6627 Arc::new(TelemetryCounters::default()),
6628 )
6629 .expect("writer");
6630
6631 writer
6633 .submit(WriteRequest {
6634 label: "v1".to_owned(),
6635 nodes: vec![NodeInsert {
6636 row_id: "row-replace-v1".to_owned(),
6637 logical_id: "node-replace-vec".to_owned(),
6638 kind: "Doc".to_owned(),
6639 properties: "{}".to_owned(),
6640 source_ref: Some("src".to_owned()),
6641 upsert: false,
6642 chunk_policy: ChunkPolicy::Preserve,
6643 content_ref: None,
6644 }],
6645 node_retires: vec![],
6646 edges: vec![],
6647 edge_retires: vec![],
6648 chunks: vec![ChunkInsert {
6649 id: "chunk-replace-A".to_owned(),
6650 node_logical_id: "node-replace-vec".to_owned(),
6651 text_content: "version one".to_owned(),
6652 byte_start: None,
6653 byte_end: None,
6654 content_hash: None,
6655 }],
6656 runs: vec![],
6657 steps: vec![],
6658 actions: vec![],
6659 optional_backfills: vec![],
6660 vec_inserts: vec![VecInsert {
6661 chunk_id: "chunk-replace-A".to_owned(),
6662 embedding: vec![0.1, 0.2, 0.3],
6663 }],
6664 operational_writes: vec![],
6665 })
6666 .expect("v1 write");
6667
6668 writer
6670 .submit(WriteRequest {
6671 label: "v2".to_owned(),
6672 nodes: vec![NodeInsert {
6673 row_id: "row-replace-v2".to_owned(),
6674 logical_id: "node-replace-vec".to_owned(),
6675 kind: "Doc".to_owned(),
6676 properties: "{}".to_owned(),
6677 source_ref: Some("src".to_owned()),
6678 upsert: true,
6679 chunk_policy: ChunkPolicy::Replace,
6680 content_ref: None,
6681 }],
6682 node_retires: vec![],
6683 edges: vec![],
6684 edge_retires: vec![],
6685 chunks: vec![ChunkInsert {
6686 id: "chunk-replace-B".to_owned(),
6687 node_logical_id: "node-replace-vec".to_owned(),
6688 text_content: "version two".to_owned(),
6689 byte_start: None,
6690 byte_end: None,
6691 content_hash: None,
6692 }],
6693 runs: vec![],
6694 steps: vec![],
6695 actions: vec![],
6696 optional_backfills: vec![],
6697 vec_inserts: vec![VecInsert {
6698 chunk_id: "chunk-replace-B".to_owned(),
6699 embedding: vec![0.4, 0.5, 0.6],
6700 }],
6701 operational_writes: vec![],
6702 })
6703 .expect("v2 write");
6704
6705 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6706 let count_a: i64 = conn
6707 .query_row(
6708 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6709 [],
6710 |row| row.get(0),
6711 )
6712 .expect("count A");
6713 let count_b: i64 = conn
6714 .query_row(
6715 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6716 [],
6717 |row| row.get(0),
6718 )
6719 .expect("count B");
6720 assert_eq!(
6721 count_a, 0,
6722 "old vec row (chunk-A) must be deleted on Replace"
6723 );
6724 assert_eq!(
6725 count_b, 1,
6726 "new vec row (chunk-B) must be present after Replace"
6727 );
6728 }
6729
6730 #[cfg(feature = "sqlite-vec")]
6731 #[test]
6732 fn vec_insert_is_persisted_when_feature_enabled() {
6733 use crate::sqlite::open_connection_with_vec;
6734
6735 let db = NamedTempFile::new().expect("temporary db");
6736 let schema_manager = Arc::new(SchemaManager::new());
6737
6738 {
6740 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6741 schema_manager.bootstrap(&conn).expect("bootstrap");
6742 schema_manager
6743 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6744 .expect("ensure profile");
6745 }
6746
6747 let writer = WriterActor::start(
6748 db.path(),
6749 Arc::clone(&schema_manager),
6750 ProvenanceMode::Warn,
6751 Arc::new(TelemetryCounters::default()),
6752 )
6753 .expect("writer");
6754
6755 writer
6756 .submit(WriteRequest {
6757 label: "vec-insert".to_owned(),
6758 nodes: vec![],
6759 node_retires: vec![],
6760 edges: vec![],
6761 edge_retires: vec![],
6762 chunks: vec![],
6763 runs: vec![],
6764 steps: vec![],
6765 actions: vec![],
6766 optional_backfills: vec![],
6767 vec_inserts: vec![VecInsert {
6768 chunk_id: "chunk-vec".to_owned(),
6769 embedding: vec![0.1, 0.2, 0.3],
6770 }],
6771 operational_writes: vec![],
6772 })
6773 .expect("vec insert write");
6774
6775 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6776 let count: i64 = conn
6777 .query_row(
6778 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6779 [],
6780 |row| row.get(0),
6781 )
6782 .expect("count");
6783 assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6784 }
6785
6786 #[test]
6789 fn write_request_exceeding_node_limit_is_rejected() {
6790 let nodes: Vec<NodeInsert> = (0..10_001)
6791 .map(|i| NodeInsert {
6792 row_id: format!("row-{i}"),
6793 logical_id: format!("lg-{i}"),
6794 kind: "Note".to_owned(),
6795 properties: "{}".to_owned(),
6796 source_ref: None,
6797 upsert: false,
6798 chunk_policy: ChunkPolicy::Preserve,
6799 content_ref: None,
6800 })
6801 .collect();
6802
6803 let request = WriteRequest {
6804 label: "too-many-nodes".to_owned(),
6805 nodes,
6806 node_retires: vec![],
6807 edges: vec![],
6808 edge_retires: vec![],
6809 chunks: vec![],
6810 runs: vec![],
6811 steps: vec![],
6812 actions: vec![],
6813 optional_backfills: vec![],
6814 vec_inserts: vec![],
6815 operational_writes: vec![],
6816 };
6817
6818 let result = prepare_write(request, ProvenanceMode::Warn)
6819 .map(|_| ())
6820 .map_err(|e| format!("{e}"));
6821 assert!(
6822 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6823 "exceeding node limit must return InvalidWrite: got {result:?}"
6824 );
6825 }
6826
6827 #[test]
6828 fn write_request_exceeding_total_limit_is_rejected() {
6829 let request = WriteRequest {
6833 label: "too-many-total".to_owned(),
6834 nodes: (0..10_000)
6835 .map(|i| NodeInsert {
6836 row_id: format!("row-{i}"),
6837 logical_id: format!("lg-{i}"),
6838 kind: "Note".to_owned(),
6839 properties: "{}".to_owned(),
6840 source_ref: None,
6841 upsert: false,
6842 chunk_policy: ChunkPolicy::Preserve,
6843 content_ref: None,
6844 })
6845 .collect(),
6846 node_retires: vec![],
6847 edges: (0..10_000)
6848 .map(|i| EdgeInsert {
6849 row_id: format!("edge-row-{i}"),
6850 logical_id: format!("edge-lg-{i}"),
6851 kind: "link".to_owned(),
6852 source_logical_id: format!("lg-{i}"),
6853 target_logical_id: format!("lg-{}", i + 1),
6854 properties: "{}".to_owned(),
6855 source_ref: None,
6856 upsert: false,
6857 })
6858 .collect(),
6859 edge_retires: vec![],
6860 chunks: (0..50_000)
6861 .map(|i| ChunkInsert {
6862 id: format!("chunk-{i}"),
6863 node_logical_id: "lg-0".to_owned(),
6864 text_content: "text".to_owned(),
6865 byte_start: None,
6866 byte_end: None,
6867 content_hash: None,
6868 })
6869 .collect(),
6870 runs: vec![],
6871 steps: vec![],
6872 actions: vec![],
6873 optional_backfills: vec![],
6874 vec_inserts: (0..20_001)
6875 .map(|i| VecInsert {
6876 chunk_id: format!("vec-chunk-{i}"),
6877 embedding: vec![0.1],
6878 })
6879 .collect(),
6880 operational_writes: (0..10_000)
6881 .map(|i| OperationalWrite::Append {
6882 collection: format!("col-{i}"),
6883 record_key: format!("key-{i}"),
6884 payload_json: "{}".to_owned(),
6885 source_ref: None,
6886 })
6887 .collect(),
6888 };
6889
6890 let result = prepare_write(request, ProvenanceMode::Warn)
6891 .map(|_| ())
6892 .map_err(|e| format!("{e}"));
6893 assert!(
6894 matches!(result, Err(ref msg) if msg.contains("too many total items")),
6895 "exceeding total item limit must return InvalidWrite: got {result:?}"
6896 );
6897 }
6898
6899 #[test]
6900 fn write_request_within_limits_succeeds() {
6901 let db = NamedTempFile::new().expect("temporary db");
6902 let writer = WriterActor::start(
6903 db.path(),
6904 Arc::new(SchemaManager::new()),
6905 ProvenanceMode::Warn,
6906 Arc::new(TelemetryCounters::default()),
6907 )
6908 .expect("writer");
6909
6910 let result = writer.submit(WriteRequest {
6911 label: "within-limits".to_owned(),
6912 nodes: vec![NodeInsert {
6913 row_id: "row-1".to_owned(),
6914 logical_id: "lg-1".to_owned(),
6915 kind: "Note".to_owned(),
6916 properties: "{}".to_owned(),
6917 source_ref: None,
6918 upsert: false,
6919 chunk_policy: ChunkPolicy::Preserve,
6920 content_ref: None,
6921 }],
6922 node_retires: vec![],
6923 edges: vec![],
6924 edge_retires: vec![],
6925 chunks: vec![],
6926 runs: vec![],
6927 steps: vec![],
6928 actions: vec![],
6929 optional_backfills: vec![],
6930 vec_inserts: vec![],
6931 operational_writes: vec![],
6932 });
6933
6934 assert!(
6935 result.is_ok(),
6936 "write request within limits must succeed: got {result:?}"
6937 );
6938 }
6939
6940 #[test]
6941 fn property_fts_rows_created_on_node_insert() {
6942 let db = NamedTempFile::new().expect("temporary db");
6943 let schema = Arc::new(SchemaManager::new());
6945 {
6946 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6947 schema.bootstrap(&conn).expect("bootstrap");
6948 conn.execute(
6949 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6950 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6951 [],
6952 )
6953 .expect("register schema");
6954 }
6955 let writer = WriterActor::start(
6956 db.path(),
6957 Arc::clone(&schema),
6958 ProvenanceMode::Warn,
6959 Arc::new(TelemetryCounters::default()),
6960 )
6961 .expect("writer");
6962
6963 writer
6964 .submit(WriteRequest {
6965 label: "goal-insert".to_owned(),
6966 nodes: vec![NodeInsert {
6967 row_id: "row-1".to_owned(),
6968 logical_id: "goal-1".to_owned(),
6969 kind: "Goal".to_owned(),
6970 properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6971 .to_owned(),
6972 source_ref: Some("src-1".to_owned()),
6973 upsert: false,
6974 chunk_policy: ChunkPolicy::Preserve,
6975 content_ref: None,
6976 }],
6977 node_retires: vec![],
6978 edges: vec![],
6979 edge_retires: vec![],
6980 chunks: vec![],
6981 runs: vec![],
6982 steps: vec![],
6983 actions: vec![],
6984 optional_backfills: vec![],
6985 vec_inserts: vec![],
6986 operational_writes: vec![],
6987 })
6988 .expect("write");
6989
6990 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6991 let text: String = conn
6992 .query_row(
6993 "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6994 [],
6995 |row| row.get(0),
6996 )
6997 .expect("property FTS row must exist");
6998 assert_eq!(text, "Ship v2 Launch the redesign");
6999 }
7000
7001 #[test]
7002 fn property_fts_rows_replaced_on_upsert() {
7003 let db = NamedTempFile::new().expect("temporary db");
7004 let schema = Arc::new(SchemaManager::new());
7005 {
7006 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7007 schema.bootstrap(&conn).expect("bootstrap");
7008 conn.execute(
7009 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7010 VALUES ('Goal', '[\"$.name\"]', ' ')",
7011 [],
7012 )
7013 .expect("register schema");
7014 }
7015 let writer = WriterActor::start(
7016 db.path(),
7017 Arc::clone(&schema),
7018 ProvenanceMode::Warn,
7019 Arc::new(TelemetryCounters::default()),
7020 )
7021 .expect("writer");
7022
7023 writer
7025 .submit(WriteRequest {
7026 label: "insert".to_owned(),
7027 nodes: vec![NodeInsert {
7028 row_id: "row-1".to_owned(),
7029 logical_id: "goal-1".to_owned(),
7030 kind: "Goal".to_owned(),
7031 properties: r#"{"name":"Alpha"}"#.to_owned(),
7032 source_ref: Some("src-1".to_owned()),
7033 upsert: false,
7034 chunk_policy: ChunkPolicy::Preserve,
7035 content_ref: None,
7036 }],
7037 node_retires: vec![],
7038 edges: vec![],
7039 edge_retires: vec![],
7040 chunks: vec![],
7041 runs: vec![],
7042 steps: vec![],
7043 actions: vec![],
7044 optional_backfills: vec![],
7045 vec_inserts: vec![],
7046 operational_writes: vec![],
7047 })
7048 .expect("insert");
7049
7050 writer
7052 .submit(WriteRequest {
7053 label: "upsert".to_owned(),
7054 nodes: vec![NodeInsert {
7055 row_id: "row-2".to_owned(),
7056 logical_id: "goal-1".to_owned(),
7057 kind: "Goal".to_owned(),
7058 properties: r#"{"name":"Beta"}"#.to_owned(),
7059 source_ref: Some("src-2".to_owned()),
7060 upsert: true,
7061 chunk_policy: ChunkPolicy::Preserve,
7062 content_ref: None,
7063 }],
7064 node_retires: vec![],
7065 edges: vec![],
7066 edge_retires: vec![],
7067 chunks: vec![],
7068 runs: vec![],
7069 steps: vec![],
7070 actions: vec![],
7071 optional_backfills: vec![],
7072 vec_inserts: vec![],
7073 operational_writes: vec![],
7074 })
7075 .expect("upsert");
7076
7077 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7078 let count: i64 = conn
7079 .query_row(
7080 "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7081 [],
7082 |row| row.get(0),
7083 )
7084 .expect("count");
7085 assert_eq!(
7086 count, 1,
7087 "must have exactly one property FTS row after upsert"
7088 );
7089
7090 let text: String = conn
7091 .query_row(
7092 "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7093 [],
7094 |row| row.get(0),
7095 )
7096 .expect("text");
7097 assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7098 }
7099
7100 #[test]
7101 fn property_fts_rows_deleted_on_retire() {
7102 let db = NamedTempFile::new().expect("temporary db");
7103 let schema = Arc::new(SchemaManager::new());
7104 {
7105 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7106 schema.bootstrap(&conn).expect("bootstrap");
7107 conn.execute(
7108 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7109 VALUES ('Goal', '[\"$.name\"]', ' ')",
7110 [],
7111 )
7112 .expect("register schema");
7113 }
7114 let writer = WriterActor::start(
7115 db.path(),
7116 Arc::clone(&schema),
7117 ProvenanceMode::Warn,
7118 Arc::new(TelemetryCounters::default()),
7119 )
7120 .expect("writer");
7121
7122 writer
7124 .submit(WriteRequest {
7125 label: "insert".to_owned(),
7126 nodes: vec![NodeInsert {
7127 row_id: "row-1".to_owned(),
7128 logical_id: "goal-1".to_owned(),
7129 kind: "Goal".to_owned(),
7130 properties: r#"{"name":"Alpha"}"#.to_owned(),
7131 source_ref: Some("src-1".to_owned()),
7132 upsert: false,
7133 chunk_policy: ChunkPolicy::Preserve,
7134 content_ref: None,
7135 }],
7136 node_retires: vec![],
7137 edges: vec![],
7138 edge_retires: vec![],
7139 chunks: vec![],
7140 runs: vec![],
7141 steps: vec![],
7142 actions: vec![],
7143 optional_backfills: vec![],
7144 vec_inserts: vec![],
7145 operational_writes: vec![],
7146 })
7147 .expect("insert");
7148
7149 writer
7151 .submit(WriteRequest {
7152 label: "retire".to_owned(),
7153 nodes: vec![],
7154 node_retires: vec![NodeRetire {
7155 logical_id: "goal-1".to_owned(),
7156 source_ref: Some("forget-1".to_owned()),
7157 }],
7158 edges: vec![],
7159 edge_retires: vec![],
7160 chunks: vec![],
7161 runs: vec![],
7162 steps: vec![],
7163 actions: vec![],
7164 optional_backfills: vec![],
7165 vec_inserts: vec![],
7166 operational_writes: vec![],
7167 })
7168 .expect("retire");
7169
7170 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7171 let count: i64 = conn
7172 .query_row(
7173 "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7174 [],
7175 |row| row.get(0),
7176 )
7177 .expect("count");
7178 assert_eq!(count, 0, "property FTS row must be deleted on retire");
7179 }
7180
7181 #[test]
7182 fn no_property_fts_row_for_unregistered_kind() {
7183 let db = NamedTempFile::new().expect("temporary db");
7184 let schema = Arc::new(SchemaManager::new());
7185 {
7186 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7187 schema.bootstrap(&conn).expect("bootstrap");
7188 }
7190 let writer = WriterActor::start(
7191 db.path(),
7192 Arc::clone(&schema),
7193 ProvenanceMode::Warn,
7194 Arc::new(TelemetryCounters::default()),
7195 )
7196 .expect("writer");
7197
7198 writer
7199 .submit(WriteRequest {
7200 label: "insert".to_owned(),
7201 nodes: vec![NodeInsert {
7202 row_id: "row-1".to_owned(),
7203 logical_id: "note-1".to_owned(),
7204 kind: "Note".to_owned(),
7205 properties: r#"{"title":"hello"}"#.to_owned(),
7206 source_ref: Some("src-1".to_owned()),
7207 upsert: false,
7208 chunk_policy: ChunkPolicy::Preserve,
7209 content_ref: None,
7210 }],
7211 node_retires: vec![],
7212 edges: vec![],
7213 edge_retires: vec![],
7214 chunks: vec![],
7215 runs: vec![],
7216 steps: vec![],
7217 actions: vec![],
7218 optional_backfills: vec![],
7219 vec_inserts: vec![],
7220 operational_writes: vec![],
7221 })
7222 .expect("insert");
7223
7224 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7225 let count: i64 = conn
7226 .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7227 row.get(0)
7228 })
7229 .expect("count");
7230 assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7231 }
7232
7233 mod extract_json_path_tests {
7234 use super::super::extract_json_path;
7235 use serde_json::json;
7236
7237 #[test]
7238 fn string_value() {
7239 let v = json!({"name": "alice"});
7240 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7241 }
7242
7243 #[test]
7244 fn number_value() {
7245 let v = json!({"age": 42});
7246 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7247 }
7248
7249 #[test]
7250 fn bool_value() {
7251 let v = json!({"active": true});
7252 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7253 }
7254
7255 #[test]
7256 fn null_value() {
7257 let v = json!({"x": null});
7258 assert!(extract_json_path(&v, "$.x").is_empty());
7259 }
7260
7261 #[test]
7262 fn missing_path() {
7263 let v = json!({"name": "a"});
7264 assert!(extract_json_path(&v, "$.missing").is_empty());
7265 }
7266
7267 #[test]
7268 fn nested_path() {
7269 let v = json!({"address": {"city": "NYC"}});
7270 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7271 }
7272
7273 #[test]
7274 fn array_of_strings() {
7275 let v = json!({"tags": ["a", "b", "c"]});
7276 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7277 }
7278
7279 #[test]
7280 fn array_mixed_scalars() {
7281 let v = json!({"vals": ["x", 1, true]});
7282 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7283 }
7284
7285 #[test]
7286 fn array_only_objects_returns_empty() {
7287 let v = json!({"data": [{"k": "v"}]});
7288 assert!(extract_json_path(&v, "$.data").is_empty());
7289 }
7290
7291 #[test]
7292 fn array_mixed_objects_and_scalars() {
7293 let v = json!({"data": ["keep", {"skip": true}, "also"]});
7294 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7295 }
7296
7297 #[test]
7298 fn object_returns_empty() {
7299 let v = json!({"meta": {"k": "v"}});
7300 assert!(extract_json_path(&v, "$.meta").is_empty());
7301 }
7302
7303 #[test]
7304 fn no_prefix_returns_empty() {
7305 let v = json!({"name": "a"});
7306 assert!(extract_json_path(&v, "name").is_empty());
7307 }
7308 }
7309
    // Tests for recursive property extraction: blob assembly, the position
    // map (leaf path -> byte offsets into the blob), exclusion handling, and
    // the depth/byte guardrails. Offset assertions below depend on the exact
    // emission order and on LEAF_SEPARATOR placement between non-empty leaves.
    mod recursive_extraction_tests {
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        // Schema with the given paths, a single-space separator, no excludes.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        // Like `schema`, but with the given exclusion paths applied.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            // Keys are emitted in lexicographic key order, not insertion order.
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            // Array elements get indexed leaf paths like `$.tags[0]`.
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            // Objects inside arrays are descended into, combining index and
            // key segments in the leaf path.
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            // Null leaves emit neither blob text nor a position entry.
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Wrap a leaf in 10 nesting levels so the walk exceeds the cap
            // before reaching it.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Only the guardrail is under test; positions and the constant
            // are referenced to keep the imports exercised.
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            // ~40 * 4KB of leaf text comfortably exceeds the byte cap.
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            // Even after truncation, every recorded position must point at a
            // non-empty, separator-free slice inside the blob.
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            // An excluded subtree contributes nothing and bumps the stat.
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            // Entries must be non-overlapping, in-bounds, ordered, and each
            // must slice out exactly its leaf's text.
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            // Scalar (non-recursive) paths contribute blob text but never
            // position entries.
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            let _: Vec<PositionEntry> = positions;
        }

        // Fails if any two position entries share a start offset — guards
        // against separator bookkeeping bugs around empty leaves.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            // A leading empty string must not shift the next leaf's offsets
            // or leave a dangling separator.
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            // Every shape that contains no extractable text must yield
            // (None, []) rather than an empty-but-Some blob.
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            // A separator is placed only between emitted (non-empty) leaves,
            // so "y" starts at 1 ("x") + the separator's length.
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            // An array of only nulls behaves like an all-empty payload.
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7678}