1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection job carried through a write and reported back in the
/// receipt's `optional_backfill_count`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    // Which projection this task feeds.
    pub target: ProjectionTarget,
    // Opaque payload for the projection worker; format not interpreted here.
    pub payload: String,
}
31
/// How existing chunks are treated when a node is (re)written.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks (default).
    #[default]
    Preserve,
    /// Replace existing chunks.
    Replace,
}
41
/// Policy for writes that omit `source_ref`: warn (default) or reject the
/// whole request (see `check_require_provenance`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Missing provenance is tolerated.
    #[default]
    Warn,
    /// Missing provenance makes the write fail with `InvalidWrite`.
    Require,
}
52
/// One node to insert (or upsert) in a write request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    // Physical row identity; must be non-empty and unique within the request.
    pub row_id: String,
    // Stable logical identity; must be non-empty.
    pub logical_id: String,
    pub kind: String,
    // JSON-encoded properties blob (parsed during property-FTS extraction).
    pub properties: String,
    // Provenance; required when ProvenanceMode::Require is active.
    pub source_ref: Option<String>,
    pub upsert: bool,
    // What to do with the node's existing chunks on rewrite.
    pub chunk_policy: ChunkPolicy,
    pub content_ref: Option<String>,
}
69
/// One edge to insert (or upsert) in a write request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    // Physical row identity; must be non-empty and unique within the request
    // (shares the duplicate-check namespace with node row_ids).
    pub row_id: String,
    // Stable logical identity; must be non-empty.
    pub logical_id: String,
    pub source_logical_id: String,
    pub target_logical_id: String,
    pub kind: String,
    // JSON-encoded properties blob.
    pub properties: String,
    // Provenance; required when ProvenanceMode::Require is active.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
84
/// Retire (supersede) a node by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    // Provenance; required when ProvenanceMode::Require is active.
    pub source_ref: Option<String>,
}
91
/// Retire (supersede) an edge by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    // Provenance; required when ProvenanceMode::Require is active.
    pub source_ref: Option<String>,
}
98
/// A text chunk attached to a node. The owning node must be present in the
/// same request or already exist in the database (see `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    // Logical id of the owning node; must not also be retired in this request.
    pub node_logical_id: String,
    // Must be non-empty; empty chunks are rejected in prepare_write.
    pub text_content: String,
    // Optional byte span of the chunk within its source content.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}
110
/// An embedding vector for a chunk. No `Eq` because of the `f32` payload.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    // Must be non-empty; validated in prepare_write.
    pub embedding: Vec<f32>,
}
122
/// A single mutation against an operational collection. `Append` and `Put`
/// carry a JSON payload (must be non-empty); `Delete` carries only the key.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A runtime "run" record. When `upsert` is true, `supersedes_id` must be set
/// (enforced in prepare_write).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
156
/// A runtime "step" record belonging to a run. When `upsert` is true,
/// `supersedes_id` must be set (enforced in prepare_write).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    pub run_id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
169
/// A runtime "action" record belonging to a step. When `upsert` is true,
/// `supersedes_id` must be set (enforced in prepare_write).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    pub step_id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
182
// Per-category item caps for a single WriteRequest, enforced by
// `validate_request_size` before the request reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined cap across node and edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined cap across runs, steps, and actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of all item categories in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long `recv_with_timeout` waits for the writer thread's reply before
// returning `WriterTimedOut`. Note the write may still commit after timeout.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of mutations submitted to the writer actor as one unit.
/// Validated and normalized by `prepare_write` before being queued.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    // Human-readable label, echoed in the receipt and in logs.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    // Deferred projection work; passed through untouched to PreparedWrite.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Outcome of a successfully applied write, returned to the submitter.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    pub label: String,
    // Number of optional projection backfills scheduled by this write.
    pub optional_backfill_count: usize,
    pub warnings: Vec<String>,
    // Warnings specifically about missing provenance (ProvenanceMode::Warn).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to bump the last-accessed timestamp on a set of nodes.
/// Must contain at least one non-blank logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    pub logical_ids: Vec<String>,
    // Timestamp to record; unit/epoch determined by callers — not interpreted here.
    pub touched_at: i64,
    // Provenance; required when ProvenanceMode::Require is active.
    pub source_ref: Option<String>,
}
235
/// Result of a last-accessed touch: how many logical ids were touched and at
/// what timestamp.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
242
/// One resolved row destined for the chunk FTS projection, produced by
/// `resolve_fts_rows` (chunk joined with its owning node's kind).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    // Kind of the owning node, taken from this request or looked up in the DB.
    kind: String,
    text_content: String,
}
250
/// One resolved row for the property-FTS projection, produced by
/// `resolve_property_fts_rows` from a node's JSON properties.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    // Combined extracted text (scalar paths joined, then recursive blob).
    text_content: String,
    // Byte offsets of each recursive leaf within `text_content`.
    positions: Vec<PositionEntry>,
    // (fts column name, column text) pairs; populated only when the schema
    // assigns per-path weights.
    columns: Vec<(String, String)>,
}
261
// Maximum JSON nesting depth walked during recursive property extraction;
// deeper subtrees are skipped and counted in ExtractStats::depth_cap_hit.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

// Byte budget for the extracted text blob; once a leaf would exceed it,
// extraction stops and ExtractStats::byte_cap_reached is set.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

// Sentinel token joined between extracted leaves so phrase queries do not
// match across leaf boundaries. Kept as a single unlikely word.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
304
/// How a configured property path is extracted for FTS.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Extract only the scalar (or array-of-scalars) value at the exact path.
    #[default]
    Scalar,
    /// Walk the whole subtree under the path, emitting every scalar leaf.
    Recursive,
}
317
/// One configured JSON path in a property-FTS schema.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PropertyPathEntry {
    // JSON path in "$.a.b" form.
    pub path: String,
    pub mode: PropertyPathMode,
    // Optional per-path weight; presence triggers per-column extraction.
    pub weight: Option<f32>,
}
326
// Manual Eq opt-in: `weight: Option<f32>` blocks the derive. NOTE(review):
// this asserts weights are never NaN — confirm schema parsing guarantees that.
impl Eq for PropertyPathEntry {}
329
330impl PropertyPathEntry {
331 pub(crate) fn scalar(path: impl Into<String>) -> Self {
332 Self {
333 path: path.into(),
334 mode: PropertyPathMode::Scalar,
335 weight: None,
336 }
337 }
338
339 #[cfg(test)]
340 pub(crate) fn recursive(path: impl Into<String>) -> Self {
341 Self {
342 path: path.into(),
343 mode: PropertyPathMode::Recursive,
344 weight: None,
345 }
346 }
347}
348
/// Per-node-kind configuration for property-FTS extraction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    // Paths to extract, in order.
    pub paths: Vec<PropertyPathEntry>,
    // Separator used when joining scalar-path values.
    pub separator: String,
    // Exact paths whose subtrees are skipped during recursive extraction.
    pub exclude_paths: Vec<String>,
}
368
/// Byte span of one extracted leaf inside the combined text blob, tagged with
/// the JSON path it came from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    pub start_offset: usize,
    pub end_offset: usize,
    pub leaf_path: String,
}
378
/// Guardrail counters accumulated during recursive property extraction;
/// logged when any cap was hit.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    // Number of subtrees skipped for exceeding MAX_RECURSIVE_DEPTH.
    pub depth_cap_hit: usize,
    // True once the MAX_EXTRACTED_BYTES budget stopped extraction.
    pub byte_cap_reached: bool,
    // Number of subtrees skipped because their path was excluded.
    pub excluded_subtree: usize,
}
391
392impl ExtractStats {
393 fn merge(&mut self, other: ExtractStats) {
394 self.depth_cap_hit += other.depth_cap_hit;
395 self.byte_cap_reached |= other.byte_cap_reached;
396 self.excluded_subtree += other.excluded_subtree;
397 }
398}
399
/// A validated `WriteRequest` plus the caches the writer thread fills in
/// while resolving FTS projections and operational metadata.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for the nodes in this request; lets FTS resolution
    // avoid a DB lookup for nodes submitted alongside their chunks.
    node_kinds: HashMap<String, String>,
    // Filled by resolve_fts_rows / resolve_property_fts_rows on the writer thread.
    required_fts_rows: Vec<FtsProjectionRow>,
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
427
/// Messages sent from `WriterActor` to the writer thread. Each carries a
/// one-shot reply channel for the result.
enum WriteMessage {
    Submit {
        // Boxed to keep the enum (and channel slots) small.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
438
/// Handle to the single writer thread. All writes are serialized through a
/// bounded channel to one dedicated thread owning the SQLite connection.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so Drop can drop the sender *before* joining the thread,
    // closing the channel and letting the writer loop exit.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // Taken in Drop to join; None afterwards.
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Kept alive for the actor's lifetime; the writer thread holds its own clone.
    _telemetry: Arc<TelemetryCounters>,
}
451
impl WriterActor {
    /// Spawn the writer thread and return a handle to it.
    ///
    /// The thread opens its own connection to `path` and bootstraps the
    /// schema; failures there surface on the first submitted message, not
    /// here. Errors here are limited to thread spawn failure.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: submitters block once 256 messages are queued.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// True while the writer thread has not finished.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fail fast with `WriterRejected` if the writer thread has exited.
    /// (Best-effort: the thread can still exit between this check and send.)
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validate `request` on the caller's thread, queue it, and block for the
    /// writer's reply (bounded by WRITER_REPLY_TIMEOUT).
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        // Validation happens here so invalid requests never reach the writer.
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validate and queue a last-accessed touch, blocking for the reply
    /// (bounded by WRITER_REPLY_TIMEOUT).
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
535
// Fallback notice when the tracing feature is disabled: without trace_warn!
// output there would otherwise be no visible record of the suppressed panic.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
541
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Drop the sender first: closing the channel ends the writer loop's
        // `for message in receiver` so the join below can complete.
        // SAFETY: `sender` is dropped exactly once, here; `self` is never
        // used again after Drop runs.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Already unwinding: re-panicking would abort the
                        // process, so just log the writer thread's panic.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Propagate the writer thread's panic to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
571
572fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
574 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
575 .map_err(|error| match error {
576 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
577 "write timed out waiting for writer thread reply — the write may still commit"
578 .to_owned(),
579 ),
580 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
581 })
582 .and_then(|result| result)
583}
584
585fn prepare_touch_last_accessed(
586 request: &LastAccessTouchRequest,
587 mode: ProvenanceMode,
588) -> Result<(), EngineError> {
589 if request.logical_ids.is_empty() {
590 return Err(EngineError::InvalidWrite(
591 "touch_last_accessed requires at least one logical_id".to_owned(),
592 ));
593 }
594 for logical_id in &request.logical_ids {
595 if logical_id.trim().is_empty() {
596 return Err(EngineError::InvalidWrite(
597 "touch_last_accessed requires non-empty logical_ids".to_owned(),
598 ));
599 }
600 }
601 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
602 return Err(EngineError::InvalidWrite(
603 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
604 .to_owned(),
605 ));
606 }
607 Ok(())
608}
609
/// Under `ProvenanceMode::Require`, collect a description of every item in
/// the request missing `source_ref` and fail with one combined `InvalidWrite`
/// listing them all (rather than failing on the first).
fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
    // One chained pass over every provenance-carrying category, in a fixed
    // order so the error message is deterministic.
    let missing: Vec<String> = request
        .nodes
        .iter()
        .filter(|n| n.source_ref.is_none())
        .map(|n| format!("node '{}'", n.logical_id))
        .chain(
            request
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}'", r.logical_id)),
        )
        .chain(
            request
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}'", e.logical_id)),
        )
        .chain(
            request
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}'", r.logical_id)),
        )
        .chain(
            request
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}'", r.id)),
        )
        .chain(
            request
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}'", s.id)),
        )
        .chain(
            request
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}'", a.id)),
        )
        .chain(
            request
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}'",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    if missing.is_empty() {
        Ok(())
    } else {
        Err(EngineError::InvalidWrite(format!(
            "ProvenanceMode::Require: missing source_ref on: {}",
            missing.join(", ")
        )))
    }
}
683
684fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
685 if request.nodes.len() > MAX_NODES {
686 return Err(EngineError::InvalidWrite(format!(
687 "too many nodes: {} exceeds limit of {MAX_NODES}",
688 request.nodes.len()
689 )));
690 }
691 if request.edges.len() > MAX_EDGES {
692 return Err(EngineError::InvalidWrite(format!(
693 "too many edges: {} exceeds limit of {MAX_EDGES}",
694 request.edges.len()
695 )));
696 }
697 if request.chunks.len() > MAX_CHUNKS {
698 return Err(EngineError::InvalidWrite(format!(
699 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
700 request.chunks.len()
701 )));
702 }
703 let retires = request.node_retires.len() + request.edge_retires.len();
704 if retires > MAX_RETIRES {
705 return Err(EngineError::InvalidWrite(format!(
706 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
707 )));
708 }
709 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
710 if runtime_items > MAX_RUNTIME_ITEMS {
711 return Err(EngineError::InvalidWrite(format!(
712 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
713 )));
714 }
715 if request.operational_writes.len() > MAX_OPERATIONAL {
716 return Err(EngineError::InvalidWrite(format!(
717 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
718 request.operational_writes.len()
719 )));
720 }
721 let total = request.nodes.len()
722 + request.node_retires.len()
723 + request.edges.len()
724 + request.edge_retires.len()
725 + request.chunks.len()
726 + request.runs.len()
727 + request.steps.len()
728 + request.actions.len()
729 + request.vec_inserts.len()
730 + request.operational_writes.len();
731 if total > MAX_TOTAL_ITEMS {
732 return Err(EngineError::InvalidWrite(format!(
733 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
734 )));
735 }
736 Ok(())
737}
738
/// Validate a `WriteRequest` and convert it into a `PreparedWrite` ready for
/// the writer thread.
///
/// Checks, in order: size caps; non-empty identifiers per item; non-empty
/// payloads/embeddings; duplicate row_ids (nodes and edges share one
/// namespace); provenance (only under `ProvenanceMode::Require`); and the
/// upsert/supersedes_id invariant for runtime items. Resolution caches
/// (operational metadata, FTS rows) are left empty — they are filled on the
/// writer thread where the DB connection lives.
#[allow(clippy::too_many_lines)]
fn prepare_write(
    request: WriteRequest,
    mode: ProvenanceMode,
) -> Result<PreparedWrite, EngineError> {
    validate_request_size(&request)?;

    // Identifier presence checks, one category at a time.
    for node in &request.nodes {
        if node.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty row_id".to_owned(),
            ));
        }
        if node.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for edge in &request.edges {
        if edge.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty row_id".to_owned(),
            ));
        }
        if edge.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for chunk in &request.chunks {
        if chunk.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ChunkInsert has empty id".to_owned(),
            ));
        }
        // Empty chunks would produce useless FTS rows, so they are rejected.
        if chunk.text_content.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' has empty text_content; empty chunks are not allowed",
                chunk.id
            )));
        }
    }
    for run in &request.runs {
        if run.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "RunInsert has empty id".to_owned(),
            ));
        }
    }
    for step in &request.steps {
        if step.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "StepInsert has empty id".to_owned(),
            ));
        }
    }
    for action in &request.actions {
        if action.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ActionInsert has empty id".to_owned(),
            ));
        }
    }
    for vi in &request.vec_inserts {
        if vi.chunk_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "VecInsert has empty chunk_id".to_owned(),
            ));
        }
        if vi.embedding.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "VecInsert for chunk '{}' has empty embedding",
                vi.chunk_id
            )));
        }
    }
    for operational in &request.operational_writes {
        if operational_write_collection(operational).is_empty() {
            return Err(EngineError::InvalidWrite(
                "OperationalWrite has empty collection".to_owned(),
            ));
        }
        if operational_write_record_key(operational).is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "OperationalWrite for collection '{}' has empty record_key",
                operational_write_collection(operational)
            )));
        }
        // Append/Put carry a payload that must be present; Delete does not.
        match operational {
            OperationalWrite::Append { payload_json, .. }
            | OperationalWrite::Put { payload_json, .. } => {
                if payload_json.is_empty() {
                    return Err(EngineError::InvalidWrite(format!(
                        "OperationalWrite {} '{}:{}' has empty payload_json",
                        operational_write_kind(operational),
                        operational_write_collection(operational),
                        operational_write_record_key(operational)
                    )));
                }
            }
            OperationalWrite::Delete { .. } => {}
        }
    }

    {
        // row_ids must be unique across nodes AND edges (shared namespace).
        let mut seen = std::collections::HashSet::new();
        for node in &request.nodes {
            if !seen.insert(node.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    node.row_id
                )));
            }
        }
        for edge in &request.edges {
            if !seen.insert(edge.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    edge.row_id
                )));
            }
        }
    }

    if mode == ProvenanceMode::Require {
        check_require_provenance(&request)?;
    }

    // Runtime upserts must name the record they supersede.
    for run in &request.runs {
        if run.upsert && run.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "run '{}': upsert=true requires supersedes_id to be set",
                run.id
            )));
        }
    }
    for step in &request.steps {
        if step.upsert && step.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "step '{}': upsert=true requires supersedes_id to be set",
                step.id
            )));
        }
    }
    for action in &request.actions {
        if action.upsert && action.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "action '{}': upsert=true requires supersedes_id to be set",
                action.id
            )));
        }
    }

    // Cache logical_id -> kind so FTS resolution can skip DB lookups for
    // nodes submitted in this same request.
    let node_kinds = request
        .nodes
        .iter()
        .map(|node| (node.logical_id.clone(), node.kind.clone()))
        .collect::<HashMap<_, _>>();

    Ok(PreparedWrite {
        label: request.label,
        nodes: request.nodes,
        node_retires: request.node_retires,
        edges: request.edges,
        edge_retires: request.edge_retires,
        chunks: request.chunks,
        runs: request.runs,
        steps: request.steps,
        actions: request.actions,
        vec_inserts: request.vec_inserts,
        operational_writes: request.operational_writes,
        operational_collection_kinds: HashMap::new(),
        operational_collection_filter_fields: HashMap::new(),
        operational_validation_warnings: Vec::new(),
        node_kinds,
        required_fts_rows: Vec::new(),
        required_property_fts_rows: Vec::new(),
        optional_backfills: request.optional_backfills,
    })
}
925
/// Body of the dedicated writer thread: owns the SQLite connection and
/// serially applies every queued message until the channel closes.
///
/// If opening the connection or bootstrapping the schema fails, every queued
/// and future message is rejected via `reject_all` and the thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Loop ends when the WriterActor drops its sender (channel disconnects).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists with the tracing feature; presumably
                // trace_info! compiles its args out otherwise — confirm.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer thread.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Ignore send failure: the submitter may have timed out.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort rollback in case the panic left an open
                    // transaction on the connection; errors ignored.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Counted as a write event with zero projected rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1008
1009fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1010 for message in receiver {
1011 match message {
1012 WriteMessage::Submit { reply, .. } => {
1013 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1014 }
1015 WriteMessage::TouchLastAccessed { reply, .. } => {
1016 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1017 }
1018 }
1019 }
1020}
1021
/// Resolve each chunk's owning-node kind and stage the chunk FTS rows in
/// `prepared.required_fts_rows`.
///
/// Runs on the writer thread (needs the DB connection). Rejects chunks whose
/// node is being retired in the same request, and chunks whose node is
/// neither in the request nor already live in the database.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // A chunk must not reference a node retired by this same request.
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Prefer the kind cached from this request; otherwise fall back to a
        // lookup of the current (non-superseded) node row.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
1083
/// Extract property-FTS text for every inserted node whose kind has a
/// configured `PropertyFtsSchema`, staging rows in
/// `prepared.required_property_fts_rows`.
///
/// Runs on the writer thread. Nodes without a schema, or whose extraction
/// yields no text, produce no row. Guardrail stats (depth/byte caps, excluded
/// subtrees) are aggregated and logged once per request.
fn resolve_property_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    if prepared.nodes.is_empty() {
        return Ok(());
    }

    // Schemas keyed by node kind; loaded from the DB (helper defined elsewhere).
    let schemas: HashMap<String, PropertyFtsSchema> =
        load_fts_property_schemas(conn)?.into_iter().collect();

    if schemas.is_empty() {
        return Ok(());
    }

    let mut combined_stats = ExtractStats::default();
    for node in &prepared.nodes {
        let Some(schema) = schemas.get(&node.kind) else {
            continue;
        };
        // Unparseable properties degrade to Value::Null (extracts nothing)
        // rather than failing the write.
        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
        let (text_content, positions, stats) = extract_property_fts(&props, schema);
        combined_stats.merge(stats);
        if let Some(text_content) = text_content {
            // Per-column extraction only matters when weights are configured.
            let columns = if has_weights {
                extract_property_fts_columns(&props, schema)
            } else {
                Vec::new()
            };
            prepared
                .required_property_fts_rows
                .push(FtsPropertyProjectionRow {
                    node_logical_id: node.logical_id.clone(),
                    kind: node.kind.clone(),
                    text_content,
                    positions,
                    columns,
                });
        }
    }
    if combined_stats != ExtractStats::default() {
        trace_debug!(
            depth_cap_hit = combined_stats.depth_cap_hit,
            byte_cap_reached = combined_stats.byte_cap_reached,
            excluded_subtree = combined_stats.excluded_subtree,
            "property fts recursive extraction guardrails engaged"
        );
    }
    trace_debug!(
        property_fts_rows = prepared.required_property_fts_rows.len(),
        nodes_processed = prepared.nodes.len(),
        "property fts row resolution completed"
    );
    Ok(())
}
1142
1143pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1147 let Some(path) = path.strip_prefix("$.") else {
1148 return Vec::new();
1149 };
1150 let mut current = value;
1151 for segment in path.split('.') {
1152 match current.get(segment) {
1153 Some(v) => current = v,
1154 None => return Vec::new(),
1155 }
1156 }
1157 match current {
1158 serde_json::Value::String(s) => vec![s.clone()],
1159 serde_json::Value::Number(n) => vec![n.to_string()],
1160 serde_json::Value::Bool(b) => vec![b.to_string()],
1161 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1162 serde_json::Value::Array(arr) => arr
1163 .iter()
1164 .filter_map(|v| match v {
1165 serde_json::Value::String(s) => Some(s.clone()),
1166 serde_json::Value::Number(n) => Some(n.to_string()),
1167 serde_json::Value::Bool(b) => Some(b.to_string()),
1168 _ => None,
1169 })
1170 .collect(),
1171 }
1172}
1173
/// Build the combined property-FTS text for one node's properties.
///
/// Scalar paths are extracted with `extract_json_path` and joined with the
/// schema separator; recursive paths are walked by `RecursiveWalker`, which
/// records byte positions per leaf. When both parts exist, the scalar text
/// comes first, then `LEAF_SEPARATOR`, then the recursive blob — and the
/// recorded positions are rebased onto the combined string.
///
/// Returns `(combined_text, leaf_positions, guardrail_stats)`; the text is
/// `None` when nothing was extracted.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };

    let mut scalar_parts: Vec<String> = Vec::new();

    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                let root = resolve_path_root(props, &entry.path);
                if let Some(root) = root {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }

    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };

    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        (None, false) => Some(walker.blob.clone()),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            // The recursive blob is appended after `s` + LEAF_SEPARATOR, so
            // every recorded position shifts right by that many bytes.
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };

    (combined, walker.positions, walker.stats)
}
1252
/// Extract per-path column text for weighted property-FTS schemas.
///
/// Emits one `(column_name, text)` pair per schema path, in schema order —
/// including paths that extract nothing (empty text). Column names come from
/// `fathomdb_schema::fts_column_name`. Positions from the per-column walkers
/// are discarded; only the blob text is kept.
pub(crate) fn extract_property_fts_columns(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> Vec<(String, String)> {
    let mut result = Vec::new();
    for entry in &schema.paths {
        let is_recursive = matches!(entry.mode, PropertyPathMode::Recursive);
        let column_name = fathomdb_schema::fts_column_name(&entry.path, is_recursive);
        let text = match entry.mode {
            PropertyPathMode::Scalar => {
                let parts = extract_json_path(props, &entry.path);
                parts.join(&schema.separator)
            }
            PropertyPathMode::Recursive => {
                // Fresh walker per path so each column's byte budget and
                // guardrails are independent.
                let mut walker = RecursiveWalker {
                    blob: String::new(),
                    positions: Vec::new(),
                    stats: ExtractStats::default(),
                    exclude_paths: schema.exclude_paths.clone(),
                    stopped: false,
                };
                if let Some(root) = resolve_path_root(props, &entry.path) {
                    walker.walk(&entry.path, root, 0);
                }
                walker.blob
            }
        };
        result.push((column_name, text));
    }
    result
}
1287
1288fn resolve_path_root<'a>(
1289 value: &'a serde_json::Value,
1290 path: &str,
1291) -> Option<&'a serde_json::Value> {
1292 let stripped = path.strip_prefix("$.")?;
1293 let mut current = value;
1294 for segment in stripped.split('.') {
1295 current = current.get(segment)?;
1296 }
1297 Some(current)
1298}
1299
/// Stateful depth-first walker that concatenates every scalar leaf of a JSON
/// subtree into `blob`, separated by `LEAF_SEPARATOR`, recording byte
/// positions and guardrail stats as it goes.
struct RecursiveWalker {
    // Accumulated leaf text.
    blob: String,
    // Byte span + JSON path of each emitted leaf within `blob`.
    positions: Vec<PositionEntry>,
    stats: ExtractStats,
    // Exact paths whose subtrees are skipped entirely.
    exclude_paths: Vec<String>,
    // Set once the MAX_EXTRACTED_BYTES budget is hit; all further work stops.
    stopped: bool,
}
1310
1311impl RecursiveWalker {
    /// Depth-first walk of `value` at `current_path`, emitting scalar leaves.
    ///
    /// Excluded paths prune their whole subtree; objects and arrays deeper
    /// than `MAX_RECURSIVE_DEPTH` are skipped (counted in stats); object keys
    /// are visited in sorted order for deterministic output; array elements
    /// get "[idx]" appended to their path. Stops early once `stopped` is set
    /// by the byte cap.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sorted keys keep the emitted blob deterministic.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }
1357
1358 fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
1359 if self.stopped {
1360 return;
1361 }
1362 if value.is_empty() {
1363 return;
1364 }
1365 let sep_len = if self.blob.is_empty() {
1369 0
1370 } else {
1371 LEAF_SEPARATOR.len()
1372 };
1373 let projected_len = self.blob.len() + sep_len + value.len();
1374 if projected_len > MAX_EXTRACTED_BYTES {
1375 self.stats.byte_cap_reached = true;
1376 self.stopped = true;
1377 return;
1378 }
1379 if !self.blob.is_empty() {
1380 self.blob.push_str(LEAF_SEPARATOR);
1381 }
1382 let start_offset = self.blob.len();
1383 self.blob.push_str(value);
1384 let end_offset = self.blob.len();
1385 self.positions.push(PositionEntry {
1386 start_offset,
1387 end_offset,
1388 leaf_path: leaf_path.to_owned(),
1389 });
1390 }
1391}
1392
1393pub(crate) fn load_fts_property_schemas(
1399 conn: &rusqlite::Connection,
1400) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1401 let mut stmt =
1402 conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1403 stmt.query_map([], |row| {
1404 let kind: String = row.get(0)?;
1405 let paths_json: String = row.get(1)?;
1406 let separator: String = row.get(2)?;
1407 let schema = parse_property_schema_json(&paths_json, &separator);
1408 Ok((kind, schema))
1409 })?
1410 .collect::<Result<Vec<_>, _>>()
1411}
1412
1413pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1414 let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1415 let mut paths = Vec::new();
1416 let mut exclude_paths: Vec<String> = Vec::new();
1417
1418 let path_values: Vec<serde_json::Value> = match value {
1419 serde_json::Value::Array(arr) => arr,
1420 serde_json::Value::Object(map) => {
1421 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1422 exclude_paths = excl
1423 .iter()
1424 .filter_map(|v| v.as_str().map(str::to_owned))
1425 .collect();
1426 }
1427 match map.get("paths") {
1428 Some(serde_json::Value::Array(arr)) => arr.clone(),
1429 _ => Vec::new(),
1430 }
1431 }
1432 _ => Vec::new(),
1433 };
1434
1435 for entry in path_values {
1436 match entry {
1437 serde_json::Value::String(path) => {
1438 paths.push(PropertyPathEntry::scalar(path));
1439 }
1440 serde_json::Value::Object(map) => {
1441 let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1442 continue;
1443 };
1444 let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1445 PropertyPathMode::Scalar,
1446 |m| match m {
1447 "recursive" => PropertyPathMode::Recursive,
1448 _ => PropertyPathMode::Scalar,
1449 },
1450 );
1451 #[allow(clippy::cast_possible_truncation)]
1452 let weight = map
1453 .get("weight")
1454 .and_then(serde_json::Value::as_f64)
1455 .map(|w| w as f32);
1456 paths.push(PropertyPathEntry {
1457 path: path.to_owned(),
1458 mode,
1459 weight,
1460 });
1461 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1462 for p in excl {
1463 if let Some(s) = p.as_str() {
1464 exclude_paths.push(s.to_owned());
1465 }
1466 }
1467 }
1468 }
1469 _ => {}
1470 }
1471 }
1472
1473 PropertyFtsSchema {
1474 paths,
1475 separator: separator.to_owned(),
1476 exclude_paths,
1477 }
1478}
1479
/// Preflight-resolves the batch's operational writes against collection
/// metadata read outside the write transaction.
///
/// For each distinct collection this loads (once) its kind, filter
/// fields, and validation contract; rejects writes to unregistered or
/// disabled collections; enforces that the write op matches the
/// collection kind (Append for append_only_log, Put/Delete for
/// latest_state); and runs contract validation. Resolved kinds and
/// filter fields are stashed on `prepared` for use at apply time.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Load collection metadata only on first encounter; later writes
        // to the same collection reuse the cached entries.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Each collection kind accepts only its matching op shapes.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Outer Some = contract JSON parsed; inner Some = a contract is set.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            // Report-only warnings are deliberately discarded here; they are
            // collected later against the live contracts inside the write
            // transaction. Enforce-mode failures still abort via `?`.
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1547
1548fn parse_operational_filter_fields(
1549 filter_fields_json: &str,
1550) -> Result<Vec<OperationalFilterField>, EngineError> {
1551 let fields: Vec<OperationalFilterField> =
1552 serde_json::from_str(filter_fields_json).map_err(|error| {
1553 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1554 })?;
1555 let mut seen = std::collections::HashSet::new();
1556 for field in &fields {
1557 if field.name.trim().is_empty() {
1558 return Err(EngineError::InvalidWrite(
1559 "filter_fields_json field names must not be empty".to_owned(),
1560 ));
1561 }
1562 if !seen.insert(field.name.as_str()) {
1563 return Err(EngineError::InvalidWrite(format!(
1564 "filter_fields_json contains duplicate field '{}'",
1565 field.name
1566 )));
1567 }
1568 if field.modes.is_empty() {
1569 return Err(EngineError::InvalidWrite(format!(
1570 "filter_fields_json field '{}' must declare at least one mode",
1571 field.name
1572 )));
1573 }
1574 if field.modes.contains(&OperationalFilterMode::Prefix)
1575 && field.field_type != OperationalFilterFieldType::String
1576 {
1577 return Err(EngineError::InvalidWrite(format!(
1578 "filter field '{}' only supports prefix for string types",
1579 field.name
1580 )));
1581 }
1582 }
1583 Ok(fields)
1584}
1585
/// One extracted filter value destined for `operational_filter_values`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Populated for string-typed fields; None otherwise.
    string_value: Option<String>,
    // Populated for integer/timestamp-typed fields; None otherwise.
    integer_value: Option<i64>,
}
1592
1593fn extract_operational_filter_values(
1594 filter_fields: &[OperationalFilterField],
1595 payload_json: &str,
1596) -> Vec<OperationalFilterValueRow> {
1597 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1598 return Vec::new();
1599 };
1600 let Some(object) = parsed.as_object() else {
1601 return Vec::new();
1602 };
1603
1604 filter_fields
1605 .iter()
1606 .filter_map(|field| {
1607 let value = object.get(&field.name)?;
1608 match field.field_type {
1609 OperationalFilterFieldType::String => {
1610 value
1611 .as_str()
1612 .map(|string_value| OperationalFilterValueRow {
1613 field_name: field.name.clone(),
1614 string_value: Some(string_value.to_owned()),
1615 integer_value: None,
1616 })
1617 }
1618 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1619 value
1620 .as_i64()
1621 .map(|integer_value| OperationalFilterValueRow {
1622 field_name: field.name.clone(),
1623 string_value: None,
1624 integer_value: Some(integer_value),
1625 })
1626 }
1627 }
1628 })
1629 .collect()
1630}
1631
/// Resolves all derived rows for a prepared write, then applies it.
///
/// Runs FTS resolution, property-FTS resolution, and operational-write
/// resolution (in that order) before delegating to `apply_write` for
/// the transactional commit.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1641
1642fn apply_touch_last_accessed(
1643 conn: &mut rusqlite::Connection,
1644 request: &LastAccessTouchRequest,
1645) -> Result<LastAccessTouchReport, EngineError> {
1646 let mut seen = std::collections::HashSet::new();
1647 let logical_ids = request
1648 .logical_ids
1649 .iter()
1650 .filter(|logical_id| seen.insert(logical_id.as_str()))
1651 .cloned()
1652 .collect::<Vec<_>>();
1653 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1654
1655 for logical_id in &logical_ids {
1656 let exists = tx
1657 .query_row(
1658 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1659 params![logical_id],
1660 |row| row.get::<_, i64>(0),
1661 )
1662 .optional()?
1663 .is_some();
1664 if !exists {
1665 return Err(EngineError::InvalidWrite(format!(
1666 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1667 )));
1668 }
1669 }
1670
1671 {
1672 let mut upsert_metadata = tx.prepare_cached(
1673 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1674 VALUES (?1, ?2, ?2) \
1675 ON CONFLICT(logical_id) DO UPDATE SET \
1676 last_accessed_at = excluded.last_accessed_at, \
1677 updated_at = excluded.updated_at",
1678 )?;
1679 let mut insert_provenance = tx.prepare_cached(
1680 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1681 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1682 )?;
1683 for logical_id in &logical_ids {
1684 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1685 insert_provenance.execute(params![
1686 new_id(),
1687 logical_id,
1688 request.source_ref.as_deref(),
1689 format!("{{\"touched_at\":{}}}", request.touched_at),
1690 ])?;
1691 }
1692 }
1693
1694 tx.commit()?;
1695 Ok(LastAccessTouchReport {
1696 touched_logical_ids: logical_ids.len(),
1697 touched_at: request.touched_at,
1698 })
1699}
1700
1701fn ensure_operational_collections_writable(
1702 tx: &rusqlite::Transaction<'_>,
1703 prepared: &PreparedWrite,
1704) -> Result<(), EngineError> {
1705 for collection in prepared.operational_collection_kinds.keys() {
1706 let disabled_at: Option<Option<i64>> = tx
1707 .query_row(
1708 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1709 params![collection],
1710 |row| row.get::<_, Option<i64>>(0),
1711 )
1712 .optional()?;
1713 match disabled_at {
1714 Some(Some(_)) => {
1715 return Err(EngineError::InvalidWrite(format!(
1716 "operational collection '{collection}' is disabled"
1717 )));
1718 }
1719 Some(None) => {}
1720 None => {
1721 return Err(EngineError::InvalidWrite(format!(
1722 "operational collection '{collection}' is not registered"
1723 )));
1724 }
1725 }
1726 }
1727 Ok(())
1728}
1729
1730fn validate_operational_writes_against_live_contracts(
1731 tx: &rusqlite::Transaction<'_>,
1732 prepared: &PreparedWrite,
1733) -> Result<Vec<String>, EngineError> {
1734 let mut collection_validation_contracts =
1735 HashMap::<String, Option<OperationalValidationContract>>::new();
1736 for collection in prepared.operational_collection_kinds.keys() {
1737 let validation_json: String = tx
1738 .query_row(
1739 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1740 params![collection],
1741 |row| row.get(0),
1742 )
1743 .map_err(EngineError::Sqlite)?;
1744 let validation_contract = parse_operational_validation_contract(&validation_json)
1745 .map_err(EngineError::InvalidWrite)?;
1746 collection_validation_contracts.insert(collection.clone(), validation_contract);
1747 }
1748
1749 let mut warnings = Vec::new();
1750 for write in &prepared.operational_writes {
1751 if let Some(Some(contract)) =
1752 collection_validation_contracts.get(operational_write_collection(write))
1753 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1754 {
1755 warnings.push(warning);
1756 }
1757 }
1758
1759 Ok(warnings)
1760}
1761
1762fn load_live_operational_secondary_indexes(
1763 tx: &rusqlite::Transaction<'_>,
1764 prepared: &PreparedWrite,
1765) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1766 let mut collection_indexes = HashMap::new();
1767 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1768 let secondary_indexes_json: String = tx
1769 .query_row(
1770 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1771 params![collection],
1772 |row| row.get(0),
1773 )
1774 .map_err(EngineError::Sqlite)?;
1775 let indexes =
1776 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1777 .map_err(EngineError::InvalidWrite)?;
1778 collection_indexes.insert(collection.clone(), indexes);
1779 }
1780 Ok(collection_indexes)
1781}
1782
1783fn check_operational_write_against_contract(
1784 write: &OperationalWrite,
1785 contract: &OperationalValidationContract,
1786) -> Result<Option<String>, EngineError> {
1787 if contract.mode == OperationalValidationMode::Disabled {
1788 return Ok(None);
1789 }
1790
1791 let (payload_json, collection, record_key) = match write {
1792 OperationalWrite::Append {
1793 collection,
1794 record_key,
1795 payload_json,
1796 ..
1797 }
1798 | OperationalWrite::Put {
1799 collection,
1800 record_key,
1801 payload_json,
1802 ..
1803 } => (
1804 payload_json.as_str(),
1805 collection.as_str(),
1806 record_key.as_str(),
1807 ),
1808 OperationalWrite::Delete { .. } => return Ok(None),
1809 };
1810
1811 match validate_operational_payload_against_contract(contract, payload_json) {
1812 Ok(()) => Ok(None),
1813 Err(message) => match contract.mode {
1814 OperationalValidationMode::Disabled => Ok(None),
1815 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1816 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1817 kind = operational_write_kind(write)
1818 ))),
1819 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1820 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1821 kind = operational_write_kind(write)
1822 ))),
1823 },
1824 }
1825}
1826
1827#[allow(clippy::too_many_lines)]
1828fn apply_write(
1829 conn: &mut rusqlite::Connection,
1830 prepared: &mut PreparedWrite,
1831) -> Result<WriteReceipt, EngineError> {
1832 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1833
1834 {
1837 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1838 let mut del_prop_positions = tx
1839 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1840 let mut del_staging = tx.prepare_cached(
1843 "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1844 )?;
1845 let mut sup_node = tx.prepare_cached(
1846 "UPDATE nodes SET superseded_at = unixepoch() \
1847 WHERE logical_id = ?1 AND superseded_at IS NULL",
1848 )?;
1849 let mut ins_event = tx.prepare_cached(
1850 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1851 VALUES (?1, 'node_retire', ?2, ?3)",
1852 )?;
1853 for retire in &prepared.node_retires {
1854 del_fts.execute(params![retire.logical_id])?;
1855 let kind_opt: Option<String> = tx
1857 .query_row(
1858 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1859 params![retire.logical_id],
1860 |r| r.get(0),
1861 )
1862 .optional()?;
1863 if let Some(kind) = kind_opt {
1864 let table = fathomdb_schema::fts_kind_table_name(&kind);
1865 let table_exists: bool = tx
1867 .query_row(
1868 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1869 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1870 rusqlite::params![table],
1871 |r| r.get::<_, i64>(0),
1872 )
1873 .unwrap_or(0)
1874 > 0;
1875 if table_exists {
1876 tx.execute(
1877 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1878 params![retire.logical_id],
1879 )?;
1880 }
1881 }
1882 del_prop_positions.execute(params![retire.logical_id])?;
1883 del_staging.execute(params![retire.logical_id])?;
1884 sup_node.execute(params![retire.logical_id])?;
1885 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1886 }
1887 }
1888
1889 {
1891 let mut sup_edge = tx.prepare_cached(
1892 "UPDATE edges SET superseded_at = unixepoch() \
1893 WHERE logical_id = ?1 AND superseded_at IS NULL",
1894 )?;
1895 let mut ins_event = tx.prepare_cached(
1896 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1897 VALUES (?1, 'edge_retire', ?2, ?3)",
1898 )?;
1899 for retire in &prepared.edge_retires {
1900 sup_edge.execute(params![retire.logical_id])?;
1901 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1902 }
1903 }
1904
1905 {
1907 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1908 let mut del_prop_positions = tx
1909 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1910 let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1911 let mut sup_node = tx.prepare_cached(
1912 "UPDATE nodes SET superseded_at = unixepoch() \
1913 WHERE logical_id = ?1 AND superseded_at IS NULL",
1914 )?;
1915 let mut ins_node = tx.prepare_cached(
1916 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1917 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1918 )?;
1919 for node in &prepared.nodes {
1920 if node.upsert {
1921 let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1924 let table_exists: bool = tx
1926 .query_row(
1927 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1928 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1929 rusqlite::params![table],
1930 |r| r.get::<_, i64>(0),
1931 )
1932 .unwrap_or(0)
1933 > 0;
1934 if table_exists {
1935 tx.execute(
1936 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1937 params![node.logical_id],
1938 )?;
1939 }
1940 del_prop_positions.execute(params![node.logical_id])?;
1941 if node.chunk_policy == ChunkPolicy::Replace {
1942 #[cfg(feature = "sqlite-vec")]
1944 {
1945 let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
1946 let del_sql = format!(
1947 "DELETE FROM {vec_table} WHERE chunk_id IN \
1948 (SELECT id FROM chunks WHERE node_logical_id = ?1)"
1949 );
1950 match tx.execute(&del_sql, params![node.logical_id]) {
1951 Ok(_) => {}
1952 Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
1953 Err(e) => return Err(e.into()),
1954 }
1955 }
1956 del_fts.execute(params![node.logical_id])?;
1957 del_chunks.execute(params![node.logical_id])?;
1958 }
1959 sup_node.execute(params![node.logical_id])?;
1960 }
1961 ins_node.execute(params![
1962 node.row_id,
1963 node.logical_id,
1964 node.kind,
1965 node.properties,
1966 node.source_ref,
1967 node.content_ref,
1968 ])?;
1969 }
1970 }
1971
1972 {
1974 let mut sup_edge = tx.prepare_cached(
1975 "UPDATE edges SET superseded_at = unixepoch() \
1976 WHERE logical_id = ?1 AND superseded_at IS NULL",
1977 )?;
1978 let mut ins_edge = tx.prepare_cached(
1979 "INSERT INTO edges \
1980 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1981 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1982 )?;
1983 for edge in &prepared.edges {
1984 if edge.upsert {
1985 sup_edge.execute(params![edge.logical_id])?;
1986 }
1987 ins_edge.execute(params![
1988 edge.row_id,
1989 edge.logical_id,
1990 edge.source_logical_id,
1991 edge.target_logical_id,
1992 edge.kind,
1993 edge.properties,
1994 edge.source_ref,
1995 ])?;
1996 }
1997 }
1998
1999 {
2001 let mut ins_chunk = tx.prepare_cached(
2002 "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
2003 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2004 )?;
2005 for chunk in &prepared.chunks {
2006 ins_chunk.execute(params![
2007 chunk.id,
2008 chunk.node_logical_id,
2009 chunk.text_content,
2010 chunk.byte_start,
2011 chunk.byte_end,
2012 chunk.content_hash,
2013 ])?;
2014 }
2015 }
2016
2017 {
2019 let mut sup_run = tx.prepare_cached(
2020 "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2021 )?;
2022 let mut ins_run = tx.prepare_cached(
2023 "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
2024 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
2025 )?;
2026 for run in &prepared.runs {
2027 if run.upsert
2028 && let Some(ref prior_id) = run.supersedes_id
2029 {
2030 sup_run.execute(params![prior_id])?;
2031 }
2032 ins_run.execute(params![
2033 run.id,
2034 run.kind,
2035 run.status,
2036 run.properties,
2037 run.source_ref
2038 ])?;
2039 }
2040 }
2041
2042 {
2044 let mut sup_step = tx.prepare_cached(
2045 "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2046 )?;
2047 let mut ins_step = tx.prepare_cached(
2048 "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
2049 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2050 )?;
2051 for step in &prepared.steps {
2052 if step.upsert
2053 && let Some(ref prior_id) = step.supersedes_id
2054 {
2055 sup_step.execute(params![prior_id])?;
2056 }
2057 ins_step.execute(params![
2058 step.id,
2059 step.run_id,
2060 step.kind,
2061 step.status,
2062 step.properties,
2063 step.source_ref,
2064 ])?;
2065 }
2066 }
2067
2068 {
2070 let mut sup_action = tx.prepare_cached(
2071 "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2072 )?;
2073 let mut ins_action = tx.prepare_cached(
2074 "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
2075 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2076 )?;
2077 for action in &prepared.actions {
2078 if action.upsert
2079 && let Some(ref prior_id) = action.supersedes_id
2080 {
2081 sup_action.execute(params![prior_id])?;
2082 }
2083 ins_action.execute(params![
2084 action.id,
2085 action.step_id,
2086 action.kind,
2087 action.status,
2088 action.properties,
2089 action.source_ref,
2090 ])?;
2091 }
2092 }
2093
2094 {
2096 ensure_operational_collections_writable(&tx, prepared)?;
2097 prepared.operational_validation_warnings =
2098 validate_operational_writes_against_live_contracts(&tx, prepared)?;
2099 let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
2100
2101 let mut next_mutation_order: i64 = tx.query_row(
2102 "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
2103 [],
2104 |row| row.get(0),
2105 )?;
2106 let mut ins_mutation = tx.prepare_cached(
2107 "INSERT INTO operational_mutations \
2108 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2109 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
2110 )?;
2111 let mut ins_filter_value = tx.prepare_cached(
2112 "INSERT INTO operational_filter_values \
2113 (mutation_id, collection_name, field_name, string_value, integer_value) \
2114 VALUES (?1, ?2, ?3, ?4, ?5)",
2115 )?;
2116 let mut upsert_current = tx.prepare_cached(
2117 "INSERT INTO operational_current \
2118 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2119 VALUES (?1, ?2, ?3, unixepoch(), ?4) \
2120 ON CONFLICT(collection_name, record_key) DO UPDATE SET \
2121 payload_json = excluded.payload_json, \
2122 updated_at = excluded.updated_at, \
2123 last_mutation_id = excluded.last_mutation_id",
2124 )?;
2125 let mut del_current = tx.prepare_cached(
2126 "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
2127 )?;
2128 let mut del_current_secondary_indexes = tx.prepare_cached(
2129 "DELETE FROM operational_secondary_index_entries \
2130 WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
2131 )?;
2132 let mut ins_secondary_index = tx.prepare_cached(
2133 "INSERT INTO operational_secondary_index_entries \
2134 (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
2135 slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
2136 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2137 )?;
2138 let mut current_row_stmt = tx.prepare_cached(
2139 "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2140 WHERE collection_name = ?1 AND record_key = ?2",
2141 )?;
2142
2143 for write in &prepared.operational_writes {
2144 let collection = operational_write_collection(write);
2145 let record_key = operational_write_record_key(write);
2146 let mutation_id = new_id();
2147 next_mutation_order += 1;
2148 let payload_json = operational_write_payload(write);
2149 ins_mutation.execute(params![
2150 &mutation_id,
2151 collection,
2152 record_key,
2153 operational_write_kind(write),
2154 payload_json,
2155 operational_write_source_ref(write),
2156 next_mutation_order,
2157 ])?;
2158 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2159 for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2160 ins_secondary_index.execute(params![
2161 collection,
2162 entry.index_name,
2163 "mutation",
2164 &mutation_id,
2165 record_key,
2166 entry.sort_timestamp,
2167 entry.slot1_text,
2168 entry.slot1_integer,
2169 entry.slot2_text,
2170 entry.slot2_integer,
2171 entry.slot3_text,
2172 entry.slot3_integer,
2173 ])?;
2174 }
2175 }
2176 if let Some(filter_fields) = prepared
2177 .operational_collection_filter_fields
2178 .get(collection)
2179 {
2180 for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2181 ins_filter_value.execute(params![
2182 &mutation_id,
2183 collection,
2184 filter_value.field_name,
2185 filter_value.string_value,
2186 filter_value.integer_value,
2187 ])?;
2188 }
2189 }
2190
2191 if prepared.operational_collection_kinds.get(collection)
2192 == Some(&OperationalCollectionKind::LatestState)
2193 {
2194 del_current_secondary_indexes.execute(params![collection, record_key])?;
2195 match write {
2196 OperationalWrite::Put { payload_json, .. } => {
2197 upsert_current.execute(params![
2198 collection,
2199 record_key,
2200 payload_json,
2201 &mutation_id,
2202 ])?;
2203 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2204 let (current_payload_json, updated_at, last_mutation_id): (
2205 String,
2206 i64,
2207 String,
2208 ) = current_row_stmt
2209 .query_row(params![collection, record_key], |row| {
2210 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2211 })?;
2212 for entry in extract_secondary_index_entries_for_current(
2213 indexes,
2214 ¤t_payload_json,
2215 updated_at,
2216 ) {
2217 ins_secondary_index.execute(params![
2218 collection,
2219 entry.index_name,
2220 "current",
2221 last_mutation_id.as_str(),
2222 record_key,
2223 entry.sort_timestamp,
2224 entry.slot1_text,
2225 entry.slot1_integer,
2226 entry.slot2_text,
2227 entry.slot2_integer,
2228 entry.slot3_text,
2229 entry.slot3_integer,
2230 ])?;
2231 }
2232 }
2233 }
2234 OperationalWrite::Delete { .. } => {
2235 del_current.execute(params![collection, record_key])?;
2236 }
2237 OperationalWrite::Append { .. } => {}
2238 }
2239 }
2240 }
2241 }
2242
2243 {
2245 let mut ins_fts = tx.prepare_cached(
2246 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2247 VALUES (?1, ?2, ?3, ?4)",
2248 )?;
2249 for fts_row in &prepared.required_fts_rows {
2250 ins_fts.execute(params![
2251 fts_row.chunk_id,
2252 fts_row.node_logical_id,
2253 fts_row.kind,
2254 fts_row.text_content,
2255 ])?;
2256 }
2257 }
2258
2259 if !prepared.required_property_fts_rows.is_empty() {
2261 let mut ins_positions = tx.prepare_cached(
2262 "INSERT INTO fts_node_property_positions \
2263 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2264 VALUES (?1, ?2, ?3, ?4, ?5)",
2265 )?;
2266 let mut del_positions = tx
2269 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2270 for row in &prepared.required_property_fts_rows {
2271 del_positions.execute(params![row.node_logical_id])?;
2272 let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2274 if row.columns.is_empty() {
2275 tx.execute_batch(&format!(
2278 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
2279 node_logical_id UNINDEXED, text_content, \
2280 tokenize = '{DEFAULT_FTS_TOKENIZER}'\
2281 )"
2282 ))?;
2283 tx.prepare(&format!(
2284 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2285 ))?
2286 .execute(params![row.node_logical_id, row.text_content])?;
2287 } else {
2288 let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2290 let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2291 .map(|i| format!("?{i}"))
2292 .collect();
2293 let sql = format!(
2294 "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2295 cols = col_names.join(", "),
2296 placeholders = placeholders.join(", "),
2297 );
2298 let mut stmt = tx.prepare(&sql)?;
2299 stmt.execute(rusqlite::params_from_iter(
2300 std::iter::once(row.node_logical_id.as_str())
2301 .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2302 ))?;
2303 }
2304 for pos in &row.positions {
2305 ins_positions.execute(params![
2306 row.node_logical_id,
2307 row.kind,
2308 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2309 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2310 pos.leaf_path,
2311 ])?;
2312 }
2313 }
2314 }
2315
2316 if !prepared.required_property_fts_rows.is_empty() {
2321 let mut ins_staging_simple = tx.prepare_cached(
2322 "INSERT INTO fts_property_rebuild_staging \
2323 (kind, node_logical_id, text_content) \
2324 VALUES (?1, ?2, ?3) \
2325 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2326 SET text_content = excluded.text_content",
2327 )?;
2328 let mut ins_staging_columns = tx.prepare_cached(
2329 "INSERT INTO fts_property_rebuild_staging \
2330 (kind, node_logical_id, text_content, columns_json) \
2331 VALUES (?1, ?2, ?3, ?4) \
2332 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2333 SET text_content = excluded.text_content, \
2334 columns_json = excluded.columns_json",
2335 )?;
2336 let mut check_rebuild = tx.prepare_cached(
2337 "SELECT 1 FROM fts_property_rebuild_state \
2338 WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2339 )?;
2340 for row in &prepared.required_property_fts_rows {
2341 let in_rebuild: bool = check_rebuild
2342 .query_row(params![row.kind], |_| Ok(true))
2343 .optional()?
2344 .unwrap_or(false);
2345 if in_rebuild {
2346 if row.columns.is_empty() {
2347 ins_staging_simple.execute(params![
2348 row.kind,
2349 row.node_logical_id,
2350 row.text_content
2351 ])?;
2352 } else {
2353 let json_map: serde_json::Map<String, serde_json::Value> = row
2354 .columns
2355 .iter()
2356 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2357 .collect();
2358 let columns_json =
2359 serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2360 ins_staging_columns.execute(params![
2361 row.kind,
2362 row.node_logical_id,
2363 row.text_content,
2364 columns_json
2365 ])?;
2366 }
2367 }
2368 }
2369 }
2370
2371 #[cfg(feature = "sqlite-vec")]
2374 {
2375 let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
2377 .chunks
2378 .iter()
2379 .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
2380 .collect();
2381
2382 for vi in &prepared.vec_inserts {
2383 let kind: Option<String> = chunk_to_node
2385 .get(vi.chunk_id.as_str())
2386 .and_then(|lid| prepared.node_kinds.get(*lid))
2387 .cloned()
2388 .or_else(|| {
2389 tx.query_row(
2390 "SELECT n.kind FROM chunks c \
2391 JOIN nodes n ON n.logical_id = c.node_logical_id \
2392 WHERE c.id = ?1 LIMIT 1",
2393 rusqlite::params![vi.chunk_id],
2394 |row| row.get::<_, String>(0),
2395 )
2396 .optional()
2397 .unwrap_or(None)
2398 });
2399
2400 let Some(kind) = kind else {
2401 continue;
2403 };
2404
2405 let table_name = fathomdb_schema::vec_kind_table_name(&kind);
2406 let dimension = vi.embedding.len();
2407 let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2408 tx.execute_batch(&format!(
2410 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
2411 chunk_id TEXT PRIMARY KEY,\
2412 embedding float[{dimension}]\
2413 )"
2414 ))?;
2415 tx.execute(
2416 &format!(
2417 "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
2418 ),
2419 rusqlite::params![vi.chunk_id, bytes],
2420 )?;
2421 }
2422 }
2423
2424 tx.commit()?;
2425
2426 let provenance_warnings: Vec<String> = prepared
2427 .nodes
2428 .iter()
2429 .filter(|node| node.source_ref.is_none())
2430 .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2431 .chain(
2432 prepared
2433 .node_retires
2434 .iter()
2435 .filter(|r| r.source_ref.is_none())
2436 .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2437 )
2438 .chain(
2439 prepared
2440 .edges
2441 .iter()
2442 .filter(|e| e.source_ref.is_none())
2443 .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2444 )
2445 .chain(
2446 prepared
2447 .edge_retires
2448 .iter()
2449 .filter(|r| r.source_ref.is_none())
2450 .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2451 )
2452 .chain(
2453 prepared
2454 .runs
2455 .iter()
2456 .filter(|r| r.source_ref.is_none())
2457 .map(|r| format!("run '{}' has no source_ref", r.id)),
2458 )
2459 .chain(
2460 prepared
2461 .steps
2462 .iter()
2463 .filter(|s| s.source_ref.is_none())
2464 .map(|s| format!("step '{}' has no source_ref", s.id)),
2465 )
2466 .chain(
2467 prepared
2468 .actions
2469 .iter()
2470 .filter(|a| a.source_ref.is_none())
2471 .map(|a| format!("action '{}' has no source_ref", a.id)),
2472 )
2473 .chain(
2474 prepared
2475 .operational_writes
2476 .iter()
2477 .filter(|write| operational_write_source_ref(write).is_none())
2478 .map(|write| {
2479 format!(
2480 "operational {} '{}:{}' has no source_ref",
2481 operational_write_kind(write),
2482 operational_write_collection(write),
2483 operational_write_record_key(write)
2484 )
2485 }),
2486 )
2487 .collect();
2488
2489 let mut warnings = provenance_warnings.clone();
2490 warnings.extend(prepared.operational_validation_warnings.clone());
2491
2492 Ok(WriteReceipt {
2493 label: prepared.label.clone(),
2494 optional_backfill_count: prepared.optional_backfills.len(),
2495 warnings,
2496 provenance_warnings,
2497 })
2498}
2499
2500fn operational_write_collection(write: &OperationalWrite) -> &str {
2501 match write {
2502 OperationalWrite::Append { collection, .. }
2503 | OperationalWrite::Put { collection, .. }
2504 | OperationalWrite::Delete { collection, .. } => collection,
2505 }
2506}
2507
2508fn operational_write_record_key(write: &OperationalWrite) -> &str {
2509 match write {
2510 OperationalWrite::Append { record_key, .. }
2511 | OperationalWrite::Put { record_key, .. }
2512 | OperationalWrite::Delete { record_key, .. } => record_key,
2513 }
2514}
2515
2516fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2517 match write {
2518 OperationalWrite::Append { .. } => "append",
2519 OperationalWrite::Put { .. } => "put",
2520 OperationalWrite::Delete { .. } => "delete",
2521 }
2522}
2523
2524fn operational_write_payload(write: &OperationalWrite) -> &str {
2525 match write {
2526 OperationalWrite::Append { payload_json, .. }
2527 | OperationalWrite::Put { payload_json, .. } => payload_json,
2528 OperationalWrite::Delete { .. } => "null",
2529 }
2530}
2531
2532fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2533 match write {
2534 OperationalWrite::Append { source_ref, .. }
2535 | OperationalWrite::Put { source_ref, .. }
2536 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2537 }
2538}
2539
2540#[cfg(test)]
2541#[allow(clippy::expect_used)]
2542mod tests {
2543 use std::sync::Arc;
2544
2545 use fathomdb_schema::SchemaManager;
2546 use tempfile::NamedTempFile;
2547
2548 use super::{apply_write, prepare_write, resolve_operational_writes};
2549 use crate::{
2550 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2551 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2552 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2553 projection::ProjectionTarget,
2554 };
2555
    /// Smoke test: a batch that touches only the runtime tables
    /// (run -> step -> action, each child referencing its parent id) is
    /// accepted by the writer and acknowledged with the submitted label.
    #[test]
    fn writer_executes_runtime_table_rows() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // Step belongs to the run above ...
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // ... and the action belongs to the step.
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        // The receipt echoes the request label back to the caller.
        assert_eq!(receipt.label, "runtime");
    }
2612
    /// A Put against a seeded `latest_state` collection must land in both
    /// `operational_mutations` (append-only history) and `operational_current`
    /// (latest value), while an ordinary node insert in the same batch is
    /// applied alongside it.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Register the collection up front; operational writes target
        // collections declared in `operational_collections`.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer opens the same file.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Verify effects with a fresh connection: node row, one history
        // mutation, and the current payload all present.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2692
    /// With a validation contract in `mode: "disabled"`, a payload that
    /// violates the contract (unknown field, missing required `status`) is
    /// still written to `operational_current` unchanged.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection with a contract that would reject this payload
        // if the mode were anything other than "disabled".
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload does not satisfy the seeded contract at all.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2746
    /// With a `mode: "report_only"` contract, an invalid payload is persisted
    /// anyway, but the receipt carries exactly one validation warning (and no
    /// provenance warnings, since a source_ref is supplied).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" is outside the contract's enum ["ok","failed"].
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the invalid payload is the stored current row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2811
    /// An operational write targeting a collection that was never registered
    /// in `operational_collections` must be rejected with
    /// `EngineError::InvalidWrite`.
    #[test]
    fn writer_rejects_operational_write_for_missing_collection() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Note: no collection row is seeded here.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "missing-operational-collection".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "missing operational collection must return InvalidWrite"
        );
    }
2851
    /// For an `append_only_log` collection, an Append records a mutation row
    /// (op_kind = "append" with the payload) but creates no row in
    /// `operational_current` — appends have no "latest value" semantics.
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History: exactly one "append" mutation carrying the payload.
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // State: no current row was created for the appended key.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2915
    /// With a `mode: "enforce"` contract, an invalid Append is rejected with
    /// `InvalidWrite` and must leave no trace: neither a mutation row nor any
    /// derived filter-value rows are written.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Collection declares both a filter field ("status", exact mode) and
        // an enforcing validation contract on that same field.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" violates the contract's enum ["ok","failed"].
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Rejection must be side-effect free: no history, no filter values.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2980
    /// Put followed by Delete (in two separate batches): the Delete removes
    /// the `operational_current` row, while `operational_mutations` keeps both
    /// operations in order ("put" then "delete").
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish a current row for ('connector_health','gmail').
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History preserved in mutation_order: put first, delete second.
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // Current state is gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3069
    /// Deletes carry no payload, so an enforcing validation contract must not
    /// block them: a valid Put followed by a Delete succeeds and clears the
    /// current row even though the contract is in `mode: "enforce"`.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put a payload that satisfies the contract.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // Delete the same key; this must not be validated against the contract.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3143
    /// Secondary index entries for a `latest_state` collection follow the
    /// lifecycle of the current row: a Put producing values for both declared
    /// indexes (one field index + one composite index) creates two `current`
    /// entries, and a Delete removes them again.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Two indexes: "status_current" over a single field, and
        // "tenant_category" over the (tenant, category) composite.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload supplies values for both indexes.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // After the Put: one 'current' entry per declared index.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // After the Delete: all 'current' index entries are cleared.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3232
    /// Several operational writes to the same key within one batch are
    /// recorded with increasing `mutation_order` (1, 2, 3 ...), and the final
    /// write in the batch determines the `operational_current` payload.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Put -> Delete -> Put on the same key, all in one batch.
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        // Batch order is preserved exactly as submitted.
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The last Put in the batch wins the current state.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3321
    /// Simulates a check-then-act race: the collection passes preflight
    /// (`resolve_operational_writes`) while enabled, then is disabled before
    /// `apply_write` runs. The transaction must re-check the disabled flag,
    /// reject with `InvalidWrite` mentioning "is disabled", and leave no
    /// mutation or current rows behind.
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight resolves against the still-enabled collection.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection between preflight and apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // Nothing was committed: no history row ...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // ... and no current row either.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3385
3386 #[test]
3387 fn writer_enforce_validation_rejects_invalid_put_atomically() {
3388 let db = NamedTempFile::new().expect("temporary db");
3389 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3390 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3391 conn.execute(
3392 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3393 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3394 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3395 )
3396 .expect("seed collection");
3397 drop(conn);
3398 let writer = WriterActor::start(
3399 db.path(),
3400 Arc::new(SchemaManager::new()),
3401 ProvenanceMode::Warn,
3402 Arc::new(TelemetryCounters::default()),
3403 )
3404 .expect("writer");
3405
3406 let error = writer
3407 .submit(WriteRequest {
3408 label: "invalid-put".to_owned(),
3409 nodes: vec![NodeInsert {
3410 row_id: "row-1".to_owned(),
3411 logical_id: "lg-1".to_owned(),
3412 kind: "Meeting".to_owned(),
3413 properties: "{}".to_owned(),
3414 source_ref: Some("src-1".to_owned()),
3415 upsert: false,
3416 chunk_policy: ChunkPolicy::Preserve,
3417 content_ref: None,
3418 }],
3419 node_retires: vec![],
3420 edges: vec![],
3421 edge_retires: vec![],
3422 chunks: vec![],
3423 runs: vec![],
3424 steps: vec![],
3425 actions: vec![],
3426 optional_backfills: vec![],
3427 vec_inserts: vec![],
3428 operational_writes: vec![OperationalWrite::Put {
3429 collection: "connector_health".to_owned(),
3430 record_key: "gmail".to_owned(),
3431 payload_json: r#"{"status":"bogus"}"#.to_owned(),
3432 source_ref: Some("src-1".to_owned()),
3433 }],
3434 })
3435 .expect_err("invalid put must reject");
3436 assert!(matches!(error, EngineError::InvalidWrite(_)));
3437 assert!(error.to_string().contains("must be one of"));
3438
3439 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3440 let node_count: i64 = conn
3441 .query_row(
3442 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3443 [],
3444 |row| row.get(0),
3445 )
3446 .expect("node count");
3447 assert_eq!(node_count, 0);
3448 let mutation_count: i64 = conn
3449 .query_row(
3450 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3451 [],
3452 |row| row.get(0),
3453 )
3454 .expect("mutation count");
3455 assert_eq!(mutation_count, 0);
3456 let current_count: i64 = conn
3457 .query_row(
3458 "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3459 [],
3460 |row| row.get(0),
3461 )
3462 .expect("current count");
3463 assert_eq!(current_count, 0);
3464 }
3465
3466 #[test]
3467 fn writer_rejects_append_against_latest_state_collection() {
3468 let db = NamedTempFile::new().expect("temporary db");
3469 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3470 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3471 conn.execute(
3472 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3473 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3474 [],
3475 )
3476 .expect("seed collection");
3477 drop(conn);
3478 let writer = WriterActor::start(
3479 db.path(),
3480 Arc::new(SchemaManager::new()),
3481 ProvenanceMode::Warn,
3482 Arc::new(TelemetryCounters::default()),
3483 )
3484 .expect("writer");
3485
3486 let result = writer.submit(WriteRequest {
3487 label: "bad-append".to_owned(),
3488 nodes: vec![],
3489 node_retires: vec![],
3490 edges: vec![],
3491 edge_retires: vec![],
3492 chunks: vec![],
3493 runs: vec![],
3494 steps: vec![],
3495 actions: vec![],
3496 optional_backfills: vec![],
3497 vec_inserts: vec![],
3498 operational_writes: vec![OperationalWrite::Append {
3499 collection: "connector_health".to_owned(),
3500 record_key: "gmail".to_owned(),
3501 payload_json: r#"{"status":"ok"}"#.to_owned(),
3502 source_ref: Some("src-1".to_owned()),
3503 }],
3504 });
3505
3506 assert!(
3507 matches!(result, Err(EngineError::InvalidWrite(_))),
3508 "latest_state collection must reject Append"
3509 );
3510 }
3511
3512 #[test]
3513 fn writer_upsert_supersedes_prior_active_node() {
3514 let db = NamedTempFile::new().expect("temporary db");
3515 let writer = WriterActor::start(
3516 db.path(),
3517 Arc::new(SchemaManager::new()),
3518 ProvenanceMode::Warn,
3519 Arc::new(TelemetryCounters::default()),
3520 )
3521 .expect("writer");
3522
3523 writer
3524 .submit(WriteRequest {
3525 label: "v1".to_owned(),
3526 nodes: vec![NodeInsert {
3527 row_id: "row-1".to_owned(),
3528 logical_id: "lg-1".to_owned(),
3529 kind: "Meeting".to_owned(),
3530 properties: r#"{"version":1}"#.to_owned(),
3531 source_ref: Some("src-1".to_owned()),
3532 upsert: false,
3533 chunk_policy: ChunkPolicy::Preserve,
3534 content_ref: None,
3535 }],
3536 node_retires: vec![],
3537 edges: vec![],
3538 edge_retires: vec![],
3539 chunks: vec![],
3540 runs: vec![],
3541 steps: vec![],
3542 actions: vec![],
3543 optional_backfills: vec![],
3544 vec_inserts: vec![],
3545 operational_writes: vec![],
3546 })
3547 .expect("v1 write");
3548
3549 writer
3550 .submit(WriteRequest {
3551 label: "v2".to_owned(),
3552 nodes: vec![NodeInsert {
3553 row_id: "row-2".to_owned(),
3554 logical_id: "lg-1".to_owned(),
3555 kind: "Meeting".to_owned(),
3556 properties: r#"{"version":2}"#.to_owned(),
3557 source_ref: Some("src-2".to_owned()),
3558 upsert: true,
3559 chunk_policy: ChunkPolicy::Preserve,
3560 content_ref: None,
3561 }],
3562 node_retires: vec![],
3563 edges: vec![],
3564 edge_retires: vec![],
3565 chunks: vec![],
3566 runs: vec![],
3567 steps: vec![],
3568 actions: vec![],
3569 optional_backfills: vec![],
3570 vec_inserts: vec![],
3571 operational_writes: vec![],
3572 })
3573 .expect("v2 upsert write");
3574
3575 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3576 let (active_row_id, props): (String, String) = conn
3577 .query_row(
3578 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3579 [],
3580 |row| Ok((row.get(0)?, row.get(1)?)),
3581 )
3582 .expect("active row");
3583 assert_eq!(active_row_id, "row-2");
3584 assert!(props.contains("\"version\":2"));
3585
3586 let superseded: i64 = conn
3587 .query_row(
3588 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3589 [],
3590 |row| row.get(0),
3591 )
3592 .expect("superseded count");
3593 assert_eq!(superseded, 1);
3594 }
3595
3596 #[test]
3597 fn writer_inserts_edge_between_two_nodes() {
3598 let db = NamedTempFile::new().expect("temporary db");
3599 let writer = WriterActor::start(
3600 db.path(),
3601 Arc::new(SchemaManager::new()),
3602 ProvenanceMode::Warn,
3603 Arc::new(TelemetryCounters::default()),
3604 )
3605 .expect("writer");
3606
3607 writer
3608 .submit(WriteRequest {
3609 label: "nodes-and-edge".to_owned(),
3610 nodes: vec![
3611 NodeInsert {
3612 row_id: "row-meeting".to_owned(),
3613 logical_id: "meeting-1".to_owned(),
3614 kind: "Meeting".to_owned(),
3615 properties: "{}".to_owned(),
3616 source_ref: Some("src-1".to_owned()),
3617 upsert: false,
3618 chunk_policy: ChunkPolicy::Preserve,
3619 content_ref: None,
3620 },
3621 NodeInsert {
3622 row_id: "row-task".to_owned(),
3623 logical_id: "task-1".to_owned(),
3624 kind: "Task".to_owned(),
3625 properties: "{}".to_owned(),
3626 source_ref: Some("src-1".to_owned()),
3627 upsert: false,
3628 chunk_policy: ChunkPolicy::Preserve,
3629 content_ref: None,
3630 },
3631 ],
3632 node_retires: vec![],
3633 edges: vec![EdgeInsert {
3634 row_id: "edge-1".to_owned(),
3635 logical_id: "edge-lg-1".to_owned(),
3636 source_logical_id: "meeting-1".to_owned(),
3637 target_logical_id: "task-1".to_owned(),
3638 kind: "HAS_TASK".to_owned(),
3639 properties: "{}".to_owned(),
3640 source_ref: Some("src-1".to_owned()),
3641 upsert: false,
3642 }],
3643 edge_retires: vec![],
3644 chunks: vec![],
3645 runs: vec![],
3646 steps: vec![],
3647 actions: vec![],
3648 optional_backfills: vec![],
3649 vec_inserts: vec![],
3650 operational_writes: vec![],
3651 })
3652 .expect("write receipt");
3653
3654 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3655 let (src, tgt, kind): (String, String, String) = conn
3656 .query_row(
3657 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3658 [],
3659 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3660 )
3661 .expect("edge row");
3662 assert_eq!(src, "meeting-1");
3663 assert_eq!(tgt, "task-1");
3664 assert_eq!(kind, "HAS_TASK");
3665 }
3666
3667 #[test]
3668 #[allow(clippy::too_many_lines)]
3669 fn writer_upsert_supersedes_prior_active_edge() {
3670 let db = NamedTempFile::new().expect("temporary db");
3671 let writer = WriterActor::start(
3672 db.path(),
3673 Arc::new(SchemaManager::new()),
3674 ProvenanceMode::Warn,
3675 Arc::new(TelemetryCounters::default()),
3676 )
3677 .expect("writer");
3678
3679 writer
3681 .submit(WriteRequest {
3682 label: "nodes".to_owned(),
3683 nodes: vec![
3684 NodeInsert {
3685 row_id: "row-a".to_owned(),
3686 logical_id: "node-a".to_owned(),
3687 kind: "Meeting".to_owned(),
3688 properties: "{}".to_owned(),
3689 source_ref: Some("src-1".to_owned()),
3690 upsert: false,
3691 chunk_policy: ChunkPolicy::Preserve,
3692 content_ref: None,
3693 },
3694 NodeInsert {
3695 row_id: "row-b".to_owned(),
3696 logical_id: "node-b".to_owned(),
3697 kind: "Task".to_owned(),
3698 properties: "{}".to_owned(),
3699 source_ref: Some("src-1".to_owned()),
3700 upsert: false,
3701 chunk_policy: ChunkPolicy::Preserve,
3702 content_ref: None,
3703 },
3704 ],
3705 node_retires: vec![],
3706 edges: vec![],
3707 edge_retires: vec![],
3708 chunks: vec![],
3709 runs: vec![],
3710 steps: vec![],
3711 actions: vec![],
3712 optional_backfills: vec![],
3713 vec_inserts: vec![],
3714 operational_writes: vec![],
3715 })
3716 .expect("nodes write");
3717
3718 writer
3720 .submit(WriteRequest {
3721 label: "edge-v1".to_owned(),
3722 nodes: vec![],
3723 node_retires: vec![],
3724 edges: vec![EdgeInsert {
3725 row_id: "edge-row-1".to_owned(),
3726 logical_id: "edge-lg-1".to_owned(),
3727 source_logical_id: "node-a".to_owned(),
3728 target_logical_id: "node-b".to_owned(),
3729 kind: "HAS_TASK".to_owned(),
3730 properties: r#"{"weight":1}"#.to_owned(),
3731 source_ref: Some("src-1".to_owned()),
3732 upsert: false,
3733 }],
3734 edge_retires: vec![],
3735 chunks: vec![],
3736 runs: vec![],
3737 steps: vec![],
3738 actions: vec![],
3739 optional_backfills: vec![],
3740 vec_inserts: vec![],
3741 operational_writes: vec![],
3742 })
3743 .expect("edge v1 write");
3744
3745 writer
3747 .submit(WriteRequest {
3748 label: "edge-v2".to_owned(),
3749 nodes: vec![],
3750 node_retires: vec![],
3751 edges: vec![EdgeInsert {
3752 row_id: "edge-row-2".to_owned(),
3753 logical_id: "edge-lg-1".to_owned(),
3754 source_logical_id: "node-a".to_owned(),
3755 target_logical_id: "node-b".to_owned(),
3756 kind: "HAS_TASK".to_owned(),
3757 properties: r#"{"weight":2}"#.to_owned(),
3758 source_ref: Some("src-2".to_owned()),
3759 upsert: true,
3760 }],
3761 edge_retires: vec![],
3762 chunks: vec![],
3763 runs: vec![],
3764 steps: vec![],
3765 actions: vec![],
3766 optional_backfills: vec![],
3767 vec_inserts: vec![],
3768 operational_writes: vec![],
3769 })
3770 .expect("edge v2 upsert");
3771
3772 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3773 let (active_row_id, props): (String, String) = conn
3774 .query_row(
3775 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3776 [],
3777 |row| Ok((row.get(0)?, row.get(1)?)),
3778 )
3779 .expect("active edge");
3780 assert_eq!(active_row_id, "edge-row-2");
3781 assert!(props.contains("\"weight\":2"));
3782
3783 let superseded: i64 = conn
3784 .query_row(
3785 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3786 [],
3787 |row| row.get(0),
3788 )
3789 .expect("superseded count");
3790 assert_eq!(superseded, 1);
3791 }
3792
3793 #[test]
3794 fn writer_fts_rows_are_written_to_database() {
3795 let db = NamedTempFile::new().expect("temporary db");
3796 let writer = WriterActor::start(
3797 db.path(),
3798 Arc::new(SchemaManager::new()),
3799 ProvenanceMode::Warn,
3800 Arc::new(TelemetryCounters::default()),
3801 )
3802 .expect("writer");
3803
3804 writer
3805 .submit(WriteRequest {
3806 label: "seed".to_owned(),
3807 nodes: vec![NodeInsert {
3808 row_id: "row-1".to_owned(),
3809 logical_id: "logical-1".to_owned(),
3810 kind: "Meeting".to_owned(),
3811 properties: "{}".to_owned(),
3812 source_ref: Some("src-1".to_owned()),
3813 upsert: false,
3814 chunk_policy: ChunkPolicy::Preserve,
3815 content_ref: None,
3816 }],
3817 node_retires: vec![],
3818 edges: vec![],
3819 edge_retires: vec![],
3820 chunks: vec![ChunkInsert {
3821 id: "chunk-1".to_owned(),
3822 node_logical_id: "logical-1".to_owned(),
3823 text_content: "budget discussion".to_owned(),
3824 byte_start: None,
3825 byte_end: None,
3826 content_hash: None,
3827 }],
3828 runs: vec![],
3829 steps: vec![],
3830 actions: vec![],
3831 optional_backfills: vec![],
3832 vec_inserts: vec![],
3833 operational_writes: vec![],
3834 })
3835 .expect("write receipt");
3836
3837 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3838 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3839 conn.query_row(
3840 "SELECT chunk_id, node_logical_id, kind, text_content \
3841 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3842 [],
3843 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3844 )
3845 .expect("fts row");
3846 assert_eq!(chunk_id, "chunk-1");
3847 assert_eq!(node_logical_id, "logical-1");
3848 assert_eq!(kind, "Meeting");
3849 assert_eq!(text_content, "budget discussion");
3850 }
3851
3852 #[test]
3853 fn writer_receipt_warns_on_nodes_without_source_ref() {
3854 let db = NamedTempFile::new().expect("temporary db");
3855 let writer = WriterActor::start(
3856 db.path(),
3857 Arc::new(SchemaManager::new()),
3858 ProvenanceMode::Warn,
3859 Arc::new(TelemetryCounters::default()),
3860 )
3861 .expect("writer");
3862
3863 let receipt = writer
3864 .submit(WriteRequest {
3865 label: "no-source".to_owned(),
3866 nodes: vec![NodeInsert {
3867 row_id: "row-1".to_owned(),
3868 logical_id: "logical-1".to_owned(),
3869 kind: "Meeting".to_owned(),
3870 properties: "{}".to_owned(),
3871 source_ref: None,
3872 upsert: false,
3873 chunk_policy: ChunkPolicy::Preserve,
3874 content_ref: None,
3875 }],
3876 node_retires: vec![],
3877 edges: vec![],
3878 edge_retires: vec![],
3879 chunks: vec![],
3880 runs: vec![],
3881 steps: vec![],
3882 actions: vec![],
3883 optional_backfills: vec![],
3884 vec_inserts: vec![],
3885 operational_writes: vec![],
3886 })
3887 .expect("write receipt");
3888
3889 assert_eq!(receipt.provenance_warnings.len(), 1);
3890 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3891 }
3892
3893 #[test]
3894 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3895 let db = NamedTempFile::new().expect("temporary db");
3896 let writer = WriterActor::start(
3897 db.path(),
3898 Arc::new(SchemaManager::new()),
3899 ProvenanceMode::Warn,
3900 Arc::new(TelemetryCounters::default()),
3901 )
3902 .expect("writer");
3903
3904 let receipt = writer
3905 .submit(WriteRequest {
3906 label: "with-source".to_owned(),
3907 nodes: vec![NodeInsert {
3908 row_id: "row-1".to_owned(),
3909 logical_id: "logical-1".to_owned(),
3910 kind: "Meeting".to_owned(),
3911 properties: "{}".to_owned(),
3912 source_ref: Some("src-1".to_owned()),
3913 upsert: false,
3914 chunk_policy: ChunkPolicy::Preserve,
3915 content_ref: None,
3916 }],
3917 node_retires: vec![],
3918 edges: vec![],
3919 edge_retires: vec![],
3920 chunks: vec![],
3921 runs: vec![],
3922 steps: vec![],
3923 actions: vec![],
3924 optional_backfills: vec![],
3925 vec_inserts: vec![],
3926 operational_writes: vec![],
3927 })
3928 .expect("write receipt");
3929
3930 assert!(receipt.provenance_warnings.is_empty());
3931 }
3932
3933 #[test]
3934 fn writer_accepts_chunk_for_pre_existing_node() {
3935 let db = NamedTempFile::new().expect("temporary db");
3936 let writer = WriterActor::start(
3937 db.path(),
3938 Arc::new(SchemaManager::new()),
3939 ProvenanceMode::Warn,
3940 Arc::new(TelemetryCounters::default()),
3941 )
3942 .expect("writer");
3943
3944 writer
3946 .submit(WriteRequest {
3947 label: "r1".to_owned(),
3948 nodes: vec![NodeInsert {
3949 row_id: "row-1".to_owned(),
3950 logical_id: "logical-1".to_owned(),
3951 kind: "Meeting".to_owned(),
3952 properties: "{}".to_owned(),
3953 source_ref: Some("src-1".to_owned()),
3954 upsert: false,
3955 chunk_policy: ChunkPolicy::Preserve,
3956 content_ref: None,
3957 }],
3958 node_retires: vec![],
3959 edges: vec![],
3960 edge_retires: vec![],
3961 chunks: vec![],
3962 runs: vec![],
3963 steps: vec![],
3964 actions: vec![],
3965 optional_backfills: vec![],
3966 vec_inserts: vec![],
3967 operational_writes: vec![],
3968 })
3969 .expect("r1 write");
3970
3971 writer
3973 .submit(WriteRequest {
3974 label: "r2".to_owned(),
3975 nodes: vec![],
3976 node_retires: vec![],
3977 edges: vec![],
3978 edge_retires: vec![],
3979 chunks: vec![ChunkInsert {
3980 id: "chunk-1".to_owned(),
3981 node_logical_id: "logical-1".to_owned(),
3982 text_content: "budget discussion".to_owned(),
3983 byte_start: None,
3984 byte_end: None,
3985 content_hash: None,
3986 }],
3987 runs: vec![],
3988 steps: vec![],
3989 actions: vec![],
3990 optional_backfills: vec![],
3991 vec_inserts: vec![],
3992 operational_writes: vec![],
3993 })
3994 .expect("r2 write — chunk for pre-existing node");
3995
3996 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3997 let count: i64 = conn
3998 .query_row(
3999 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
4000 [],
4001 |row| row.get(0),
4002 )
4003 .expect("fts count");
4004 assert_eq!(
4005 count, 1,
4006 "FTS row must exist for chunk attached to pre-existing node"
4007 );
4008 }
4009
4010 #[test]
4011 fn writer_rejects_chunk_for_completely_unknown_node() {
4012 let db = NamedTempFile::new().expect("temporary db");
4013 let writer = WriterActor::start(
4014 db.path(),
4015 Arc::new(SchemaManager::new()),
4016 ProvenanceMode::Warn,
4017 Arc::new(TelemetryCounters::default()),
4018 )
4019 .expect("writer");
4020
4021 let result = writer.submit(WriteRequest {
4022 label: "bad".to_owned(),
4023 nodes: vec![],
4024 node_retires: vec![],
4025 edges: vec![],
4026 edge_retires: vec![],
4027 chunks: vec![ChunkInsert {
4028 id: "chunk-1".to_owned(),
4029 node_logical_id: "nonexistent".to_owned(),
4030 text_content: "some text".to_owned(),
4031 byte_start: None,
4032 byte_end: None,
4033 content_hash: None,
4034 }],
4035 runs: vec![],
4036 steps: vec![],
4037 actions: vec![],
4038 optional_backfills: vec![],
4039 vec_inserts: vec![],
4040 operational_writes: vec![],
4041 });
4042
4043 assert!(
4044 matches!(result, Err(EngineError::InvalidWrite(_))),
4045 "completely unknown node must return InvalidWrite"
4046 );
4047 }
4048
4049 #[test]
4050 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
4051 let db = NamedTempFile::new().expect("temporary db");
4052 let writer = WriterActor::start(
4053 db.path(),
4054 Arc::new(SchemaManager::new()),
4055 ProvenanceMode::Warn,
4056 Arc::new(TelemetryCounters::default()),
4057 )
4058 .expect("writer");
4059
4060 let receipt = writer
4061 .submit(WriteRequest {
4062 label: "seed".to_owned(),
4063 nodes: vec![NodeInsert {
4064 row_id: "row-1".to_owned(),
4065 logical_id: "logical-1".to_owned(),
4066 kind: "Meeting".to_owned(),
4067 properties: "{}".to_owned(),
4068 source_ref: None,
4069 upsert: false,
4070 chunk_policy: ChunkPolicy::Preserve,
4071 content_ref: None,
4072 }],
4073 node_retires: vec![],
4074 edges: vec![],
4075 edge_retires: vec![],
4076 chunks: vec![ChunkInsert {
4077 id: "chunk-1".to_owned(),
4078 node_logical_id: "logical-1".to_owned(),
4079 text_content: "budget discussion".to_owned(),
4080 byte_start: None,
4081 byte_end: None,
4082 content_hash: None,
4083 }],
4084 runs: vec![],
4085 steps: vec![],
4086 actions: vec![],
4087 optional_backfills: vec![],
4088 vec_inserts: vec![],
4089 operational_writes: vec![],
4090 })
4091 .expect("write receipt");
4092
4093 assert_eq!(receipt.label, "seed");
4094 }
4095
4096 #[test]
4097 fn writer_node_retire_supersedes_active_node() {
4098 let db = NamedTempFile::new().expect("temporary db");
4099 let writer = WriterActor::start(
4100 db.path(),
4101 Arc::new(SchemaManager::new()),
4102 ProvenanceMode::Warn,
4103 Arc::new(TelemetryCounters::default()),
4104 )
4105 .expect("writer");
4106
4107 writer
4108 .submit(WriteRequest {
4109 label: "seed".to_owned(),
4110 nodes: vec![NodeInsert {
4111 row_id: "row-1".to_owned(),
4112 logical_id: "meeting-1".to_owned(),
4113 kind: "Meeting".to_owned(),
4114 properties: "{}".to_owned(),
4115 source_ref: Some("src-1".to_owned()),
4116 upsert: false,
4117 chunk_policy: ChunkPolicy::Preserve,
4118 content_ref: None,
4119 }],
4120 node_retires: vec![],
4121 edges: vec![],
4122 edge_retires: vec![],
4123 chunks: vec![],
4124 runs: vec![],
4125 steps: vec![],
4126 actions: vec![],
4127 optional_backfills: vec![],
4128 vec_inserts: vec![],
4129 operational_writes: vec![],
4130 })
4131 .expect("seed write");
4132
4133 writer
4134 .submit(WriteRequest {
4135 label: "retire".to_owned(),
4136 nodes: vec![],
4137 node_retires: vec![NodeRetire {
4138 logical_id: "meeting-1".to_owned(),
4139 source_ref: Some("src-2".to_owned()),
4140 }],
4141 edges: vec![],
4142 edge_retires: vec![],
4143 chunks: vec![],
4144 runs: vec![],
4145 steps: vec![],
4146 actions: vec![],
4147 optional_backfills: vec![],
4148 vec_inserts: vec![],
4149 operational_writes: vec![],
4150 })
4151 .expect("retire write");
4152
4153 let conn = rusqlite::Connection::open(db.path()).expect("open");
4154 let active: i64 = conn
4155 .query_row(
4156 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
4157 [],
4158 |r| r.get(0),
4159 )
4160 .expect("count active");
4161 let historical: i64 = conn
4162 .query_row(
4163 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
4164 [],
4165 |r| r.get(0),
4166 )
4167 .expect("count historical");
4168
4169 assert_eq!(active, 0, "active count must be 0 after retire");
4170 assert_eq!(historical, 1, "historical count must be 1 after retire");
4171 }
4172
4173 #[test]
4174 fn writer_node_retire_preserves_chunks_and_clears_fts() {
4175 let db = NamedTempFile::new().expect("temporary db");
4176 let writer = WriterActor::start(
4177 db.path(),
4178 Arc::new(SchemaManager::new()),
4179 ProvenanceMode::Warn,
4180 Arc::new(TelemetryCounters::default()),
4181 )
4182 .expect("writer");
4183
4184 writer
4185 .submit(WriteRequest {
4186 label: "seed".to_owned(),
4187 nodes: vec![NodeInsert {
4188 row_id: "row-1".to_owned(),
4189 logical_id: "meeting-1".to_owned(),
4190 kind: "Meeting".to_owned(),
4191 properties: "{}".to_owned(),
4192 source_ref: Some("src-1".to_owned()),
4193 upsert: false,
4194 chunk_policy: ChunkPolicy::Preserve,
4195 content_ref: None,
4196 }],
4197 node_retires: vec![],
4198 edges: vec![],
4199 edge_retires: vec![],
4200 chunks: vec![ChunkInsert {
4201 id: "chunk-1".to_owned(),
4202 node_logical_id: "meeting-1".to_owned(),
4203 text_content: "budget discussion".to_owned(),
4204 byte_start: None,
4205 byte_end: None,
4206 content_hash: None,
4207 }],
4208 runs: vec![],
4209 steps: vec![],
4210 actions: vec![],
4211 optional_backfills: vec![],
4212 vec_inserts: vec![],
4213 operational_writes: vec![],
4214 })
4215 .expect("seed write");
4216
4217 writer
4218 .submit(WriteRequest {
4219 label: "retire".to_owned(),
4220 nodes: vec![],
4221 node_retires: vec![NodeRetire {
4222 logical_id: "meeting-1".to_owned(),
4223 source_ref: Some("src-2".to_owned()),
4224 }],
4225 edges: vec![],
4226 edge_retires: vec![],
4227 chunks: vec![],
4228 runs: vec![],
4229 steps: vec![],
4230 actions: vec![],
4231 optional_backfills: vec![],
4232 vec_inserts: vec![],
4233 operational_writes: vec![],
4234 })
4235 .expect("retire write");
4236
4237 let conn = rusqlite::Connection::open(db.path()).expect("open");
4238 let chunk_count: i64 = conn
4239 .query_row(
4240 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4241 [],
4242 |r| r.get(0),
4243 )
4244 .expect("chunk count");
4245 let fts_count: i64 = conn
4246 .query_row(
4247 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4248 [],
4249 |r| r.get(0),
4250 )
4251 .expect("fts count");
4252
4253 assert_eq!(
4254 chunk_count, 1,
4255 "chunks must remain after node retire so restore can re-establish content"
4256 );
4257 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4258 }
4259
4260 #[test]
4261 fn writer_edge_retire_supersedes_active_edge() {
4262 let db = NamedTempFile::new().expect("temporary db");
4263 let writer = WriterActor::start(
4264 db.path(),
4265 Arc::new(SchemaManager::new()),
4266 ProvenanceMode::Warn,
4267 Arc::new(TelemetryCounters::default()),
4268 )
4269 .expect("writer");
4270
4271 writer
4272 .submit(WriteRequest {
4273 label: "seed".to_owned(),
4274 nodes: vec![
4275 NodeInsert {
4276 row_id: "row-a".to_owned(),
4277 logical_id: "node-a".to_owned(),
4278 kind: "Meeting".to_owned(),
4279 properties: "{}".to_owned(),
4280 source_ref: Some("src-1".to_owned()),
4281 upsert: false,
4282 chunk_policy: ChunkPolicy::Preserve,
4283 content_ref: None,
4284 },
4285 NodeInsert {
4286 row_id: "row-b".to_owned(),
4287 logical_id: "node-b".to_owned(),
4288 kind: "Task".to_owned(),
4289 properties: "{}".to_owned(),
4290 source_ref: Some("src-1".to_owned()),
4291 upsert: false,
4292 chunk_policy: ChunkPolicy::Preserve,
4293 content_ref: None,
4294 },
4295 ],
4296 node_retires: vec![],
4297 edges: vec![EdgeInsert {
4298 row_id: "edge-1".to_owned(),
4299 logical_id: "edge-lg-1".to_owned(),
4300 source_logical_id: "node-a".to_owned(),
4301 target_logical_id: "node-b".to_owned(),
4302 kind: "HAS_TASK".to_owned(),
4303 properties: "{}".to_owned(),
4304 source_ref: Some("src-1".to_owned()),
4305 upsert: false,
4306 }],
4307 edge_retires: vec![],
4308 chunks: vec![],
4309 runs: vec![],
4310 steps: vec![],
4311 actions: vec![],
4312 optional_backfills: vec![],
4313 vec_inserts: vec![],
4314 operational_writes: vec![],
4315 })
4316 .expect("seed write");
4317
4318 writer
4319 .submit(WriteRequest {
4320 label: "retire-edge".to_owned(),
4321 nodes: vec![],
4322 node_retires: vec![],
4323 edges: vec![],
4324 edge_retires: vec![EdgeRetire {
4325 logical_id: "edge-lg-1".to_owned(),
4326 source_ref: Some("src-2".to_owned()),
4327 }],
4328 chunks: vec![],
4329 runs: vec![],
4330 steps: vec![],
4331 actions: vec![],
4332 optional_backfills: vec![],
4333 vec_inserts: vec![],
4334 operational_writes: vec![],
4335 })
4336 .expect("retire edge write");
4337
4338 let conn = rusqlite::Connection::open(db.path()).expect("open");
4339 let active: i64 = conn
4340 .query_row(
4341 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4342 [],
4343 |r| r.get(0),
4344 )
4345 .expect("active edge count");
4346
4347 assert_eq!(active, 0, "active edge count must be 0 after retire");
4348 }
4349
4350 #[test]
4351 fn writer_retire_without_source_ref_emits_provenance_warning() {
4352 let db = NamedTempFile::new().expect("temporary db");
4353 let writer = WriterActor::start(
4354 db.path(),
4355 Arc::new(SchemaManager::new()),
4356 ProvenanceMode::Warn,
4357 Arc::new(TelemetryCounters::default()),
4358 )
4359 .expect("writer");
4360
4361 writer
4362 .submit(WriteRequest {
4363 label: "seed".to_owned(),
4364 nodes: vec![NodeInsert {
4365 row_id: "row-1".to_owned(),
4366 logical_id: "meeting-1".to_owned(),
4367 kind: "Meeting".to_owned(),
4368 properties: "{}".to_owned(),
4369 source_ref: Some("src-1".to_owned()),
4370 upsert: false,
4371 chunk_policy: ChunkPolicy::Preserve,
4372 content_ref: None,
4373 }],
4374 node_retires: vec![],
4375 edges: vec![],
4376 edge_retires: vec![],
4377 chunks: vec![],
4378 runs: vec![],
4379 steps: vec![],
4380 actions: vec![],
4381 optional_backfills: vec![],
4382 vec_inserts: vec![],
4383 operational_writes: vec![],
4384 })
4385 .expect("seed write");
4386
4387 let receipt = writer
4388 .submit(WriteRequest {
4389 label: "retire-no-src".to_owned(),
4390 nodes: vec![],
4391 node_retires: vec![NodeRetire {
4392 logical_id: "meeting-1".to_owned(),
4393 source_ref: None,
4394 }],
4395 edges: vec![],
4396 edge_retires: vec![],
4397 chunks: vec![],
4398 runs: vec![],
4399 steps: vec![],
4400 actions: vec![],
4401 optional_backfills: vec![],
4402 vec_inserts: vec![],
4403 operational_writes: vec![],
4404 })
4405 .expect("retire write");
4406
4407 assert!(
4408 !receipt.provenance_warnings.is_empty(),
4409 "retire without source_ref must emit a provenance warning"
4410 );
4411 }
4412
4413 #[test]
4414 #[allow(clippy::too_many_lines)]
4415 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4416 let db = NamedTempFile::new().expect("temporary db");
4417 let writer = WriterActor::start(
4418 db.path(),
4419 Arc::new(SchemaManager::new()),
4420 ProvenanceMode::Warn,
4421 Arc::new(TelemetryCounters::default()),
4422 )
4423 .expect("writer");
4424
4425 writer
4426 .submit(WriteRequest {
4427 label: "v1".to_owned(),
4428 nodes: vec![NodeInsert {
4429 row_id: "row-1".to_owned(),
4430 logical_id: "meeting-1".to_owned(),
4431 kind: "Meeting".to_owned(),
4432 properties: "{}".to_owned(),
4433 source_ref: Some("src-1".to_owned()),
4434 upsert: false,
4435 chunk_policy: ChunkPolicy::Preserve,
4436 content_ref: None,
4437 }],
4438 node_retires: vec![],
4439 edges: vec![],
4440 edge_retires: vec![],
4441 chunks: vec![ChunkInsert {
4442 id: "chunk-old".to_owned(),
4443 node_logical_id: "meeting-1".to_owned(),
4444 text_content: "old text".to_owned(),
4445 byte_start: None,
4446 byte_end: None,
4447 content_hash: None,
4448 }],
4449 runs: vec![],
4450 steps: vec![],
4451 actions: vec![],
4452 optional_backfills: vec![],
4453 vec_inserts: vec![],
4454 operational_writes: vec![],
4455 })
4456 .expect("v1 write");
4457
4458 writer
4459 .submit(WriteRequest {
4460 label: "v2".to_owned(),
4461 nodes: vec![NodeInsert {
4462 row_id: "row-2".to_owned(),
4463 logical_id: "meeting-1".to_owned(),
4464 kind: "Meeting".to_owned(),
4465 properties: "{}".to_owned(),
4466 source_ref: Some("src-2".to_owned()),
4467 upsert: true,
4468 chunk_policy: ChunkPolicy::Replace,
4469 content_ref: None,
4470 }],
4471 node_retires: vec![],
4472 edges: vec![],
4473 edge_retires: vec![],
4474 chunks: vec![ChunkInsert {
4475 id: "chunk-new".to_owned(),
4476 node_logical_id: "meeting-1".to_owned(),
4477 text_content: "new text".to_owned(),
4478 byte_start: None,
4479 byte_end: None,
4480 content_hash: None,
4481 }],
4482 runs: vec![],
4483 steps: vec![],
4484 actions: vec![],
4485 optional_backfills: vec![],
4486 vec_inserts: vec![],
4487 operational_writes: vec![],
4488 })
4489 .expect("v2 write");
4490
4491 let conn = rusqlite::Connection::open(db.path()).expect("open");
4492 let old_chunk: i64 = conn
4493 .query_row(
4494 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4495 [],
4496 |r| r.get(0),
4497 )
4498 .expect("old chunk count");
4499 let new_chunk: i64 = conn
4500 .query_row(
4501 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4502 [],
4503 |r| r.get(0),
4504 )
4505 .expect("new chunk count");
4506 let fts_old: i64 = conn
4507 .query_row(
4508 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4509 [],
4510 |r| r.get(0),
4511 )
4512 .expect("old fts count");
4513
4514 assert_eq!(
4515 old_chunk, 0,
4516 "old chunk must be deleted by ChunkPolicy::Replace"
4517 );
4518 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4519 assert_eq!(
4520 fts_old, 0,
4521 "old FTS row must be deleted by ChunkPolicy::Replace"
4522 );
4523 }
4524
4525 #[test]
4526 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4527 let db = NamedTempFile::new().expect("temporary db");
4528 let writer = WriterActor::start(
4529 db.path(),
4530 Arc::new(SchemaManager::new()),
4531 ProvenanceMode::Warn,
4532 Arc::new(TelemetryCounters::default()),
4533 )
4534 .expect("writer");
4535
4536 writer
4537 .submit(WriteRequest {
4538 label: "v1".to_owned(),
4539 nodes: vec![NodeInsert {
4540 row_id: "row-1".to_owned(),
4541 logical_id: "meeting-1".to_owned(),
4542 kind: "Meeting".to_owned(),
4543 properties: "{}".to_owned(),
4544 source_ref: Some("src-1".to_owned()),
4545 upsert: false,
4546 chunk_policy: ChunkPolicy::Preserve,
4547 content_ref: None,
4548 }],
4549 node_retires: vec![],
4550 edges: vec![],
4551 edge_retires: vec![],
4552 chunks: vec![ChunkInsert {
4553 id: "chunk-old".to_owned(),
4554 node_logical_id: "meeting-1".to_owned(),
4555 text_content: "old text".to_owned(),
4556 byte_start: None,
4557 byte_end: None,
4558 content_hash: None,
4559 }],
4560 runs: vec![],
4561 steps: vec![],
4562 actions: vec![],
4563 optional_backfills: vec![],
4564 vec_inserts: vec![],
4565 operational_writes: vec![],
4566 })
4567 .expect("v1 write");
4568
4569 writer
4570 .submit(WriteRequest {
4571 label: "v2-props-only".to_owned(),
4572 nodes: vec![NodeInsert {
4573 row_id: "row-2".to_owned(),
4574 logical_id: "meeting-1".to_owned(),
4575 kind: "Meeting".to_owned(),
4576 properties: r#"{"status":"updated"}"#.to_owned(),
4577 source_ref: Some("src-2".to_owned()),
4578 upsert: true,
4579 chunk_policy: ChunkPolicy::Preserve,
4580 content_ref: None,
4581 }],
4582 node_retires: vec![],
4583 edges: vec![],
4584 edge_retires: vec![],
4585 chunks: vec![],
4586 runs: vec![],
4587 steps: vec![],
4588 actions: vec![],
4589 optional_backfills: vec![],
4590 vec_inserts: vec![],
4591 operational_writes: vec![],
4592 })
4593 .expect("v2 preserve write");
4594
4595 let conn = rusqlite::Connection::open(db.path()).expect("open");
4596 let old_chunk: i64 = conn
4597 .query_row(
4598 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4599 [],
4600 |r| r.get(0),
4601 )
4602 .expect("old chunk count");
4603
4604 assert_eq!(
4605 old_chunk, 1,
4606 "old chunk must be preserved by ChunkPolicy::Preserve"
4607 );
4608 }
4609
4610 #[test]
4611 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4612 let db = NamedTempFile::new().expect("temporary db");
4613 let writer = WriterActor::start(
4614 db.path(),
4615 Arc::new(SchemaManager::new()),
4616 ProvenanceMode::Warn,
4617 Arc::new(TelemetryCounters::default()),
4618 )
4619 .expect("writer");
4620
4621 writer
4622 .submit(WriteRequest {
4623 label: "v1".to_owned(),
4624 nodes: vec![NodeInsert {
4625 row_id: "row-1".to_owned(),
4626 logical_id: "meeting-1".to_owned(),
4627 kind: "Meeting".to_owned(),
4628 properties: "{}".to_owned(),
4629 source_ref: Some("src-1".to_owned()),
4630 upsert: false,
4631 chunk_policy: ChunkPolicy::Preserve,
4632 content_ref: None,
4633 }],
4634 node_retires: vec![],
4635 edges: vec![],
4636 edge_retires: vec![],
4637 chunks: vec![ChunkInsert {
4638 id: "chunk-existing".to_owned(),
4639 node_logical_id: "meeting-1".to_owned(),
4640 text_content: "existing text".to_owned(),
4641 byte_start: None,
4642 byte_end: None,
4643 content_hash: None,
4644 }],
4645 runs: vec![],
4646 steps: vec![],
4647 actions: vec![],
4648 optional_backfills: vec![],
4649 vec_inserts: vec![],
4650 operational_writes: vec![],
4651 })
4652 .expect("v1 write");
4653
4654 writer
4656 .submit(WriteRequest {
4657 label: "insert-no-upsert".to_owned(),
4658 nodes: vec![NodeInsert {
4659 row_id: "row-2".to_owned(),
4660 logical_id: "meeting-2".to_owned(),
4661 kind: "Meeting".to_owned(),
4662 properties: "{}".to_owned(),
4663 source_ref: Some("src-2".to_owned()),
4664 upsert: false,
4665 chunk_policy: ChunkPolicy::Replace,
4666 content_ref: None,
4667 }],
4668 node_retires: vec![],
4669 edges: vec![],
4670 edge_retires: vec![],
4671 chunks: vec![],
4672 runs: vec![],
4673 steps: vec![],
4674 actions: vec![],
4675 optional_backfills: vec![],
4676 vec_inserts: vec![],
4677 operational_writes: vec![],
4678 })
4679 .expect("insert no-upsert write");
4680
4681 let conn = rusqlite::Connection::open(db.path()).expect("open");
4682 let existing_chunk: i64 = conn
4683 .query_row(
4684 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4685 [],
4686 |r| r.get(0),
4687 )
4688 .expect("chunk count");
4689
4690 assert_eq!(
4691 existing_chunk, 1,
4692 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4693 );
4694 }
4695
4696 #[test]
4697 fn writer_run_upsert_supersedes_prior_active_run() {
4698 let db = NamedTempFile::new().expect("temporary db");
4699 let writer = WriterActor::start(
4700 db.path(),
4701 Arc::new(SchemaManager::new()),
4702 ProvenanceMode::Warn,
4703 Arc::new(TelemetryCounters::default()),
4704 )
4705 .expect("writer");
4706
4707 writer
4708 .submit(WriteRequest {
4709 label: "v1".to_owned(),
4710 nodes: vec![],
4711 node_retires: vec![],
4712 edges: vec![],
4713 edge_retires: vec![],
4714 chunks: vec![],
4715 runs: vec![RunInsert {
4716 id: "run-v1".to_owned(),
4717 kind: "session".to_owned(),
4718 status: "completed".to_owned(),
4719 properties: "{}".to_owned(),
4720 source_ref: Some("src-1".to_owned()),
4721 upsert: false,
4722 supersedes_id: None,
4723 }],
4724 steps: vec![],
4725 actions: vec![],
4726 optional_backfills: vec![],
4727 vec_inserts: vec![],
4728 operational_writes: vec![],
4729 })
4730 .expect("v1 run write");
4731
4732 writer
4733 .submit(WriteRequest {
4734 label: "v2".to_owned(),
4735 nodes: vec![],
4736 node_retires: vec![],
4737 edges: vec![],
4738 edge_retires: vec![],
4739 chunks: vec![],
4740 runs: vec![RunInsert {
4741 id: "run-v2".to_owned(),
4742 kind: "session".to_owned(),
4743 status: "completed".to_owned(),
4744 properties: "{}".to_owned(),
4745 source_ref: Some("src-2".to_owned()),
4746 upsert: true,
4747 supersedes_id: Some("run-v1".to_owned()),
4748 }],
4749 steps: vec![],
4750 actions: vec![],
4751 optional_backfills: vec![],
4752 vec_inserts: vec![],
4753 operational_writes: vec![],
4754 })
4755 .expect("v2 run write");
4756
4757 let conn = rusqlite::Connection::open(db.path()).expect("open");
4758 let v1_historical: i64 = conn
4759 .query_row(
4760 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4761 [],
4762 |r| r.get(0),
4763 )
4764 .expect("v1 historical count");
4765 let v2_active: i64 = conn
4766 .query_row(
4767 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4768 [],
4769 |r| r.get(0),
4770 )
4771 .expect("v2 active count");
4772
4773 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4774 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4775 }
4776
4777 #[test]
4778 fn writer_step_upsert_supersedes_prior_active_step() {
4779 let db = NamedTempFile::new().expect("temporary db");
4780 let writer = WriterActor::start(
4781 db.path(),
4782 Arc::new(SchemaManager::new()),
4783 ProvenanceMode::Warn,
4784 Arc::new(TelemetryCounters::default()),
4785 )
4786 .expect("writer");
4787
4788 writer
4789 .submit(WriteRequest {
4790 label: "v1".to_owned(),
4791 nodes: vec![],
4792 node_retires: vec![],
4793 edges: vec![],
4794 edge_retires: vec![],
4795 chunks: vec![],
4796 runs: vec![RunInsert {
4797 id: "run-1".to_owned(),
4798 kind: "session".to_owned(),
4799 status: "completed".to_owned(),
4800 properties: "{}".to_owned(),
4801 source_ref: Some("src-1".to_owned()),
4802 upsert: false,
4803 supersedes_id: None,
4804 }],
4805 steps: vec![StepInsert {
4806 id: "step-v1".to_owned(),
4807 run_id: "run-1".to_owned(),
4808 kind: "llm".to_owned(),
4809 status: "completed".to_owned(),
4810 properties: "{}".to_owned(),
4811 source_ref: Some("src-1".to_owned()),
4812 upsert: false,
4813 supersedes_id: None,
4814 }],
4815 actions: vec![],
4816 optional_backfills: vec![],
4817 vec_inserts: vec![],
4818 operational_writes: vec![],
4819 })
4820 .expect("v1 step write");
4821
4822 writer
4823 .submit(WriteRequest {
4824 label: "v2".to_owned(),
4825 nodes: vec![],
4826 node_retires: vec![],
4827 edges: vec![],
4828 edge_retires: vec![],
4829 chunks: vec![],
4830 runs: vec![],
4831 steps: vec![StepInsert {
4832 id: "step-v2".to_owned(),
4833 run_id: "run-1".to_owned(),
4834 kind: "llm".to_owned(),
4835 status: "completed".to_owned(),
4836 properties: "{}".to_owned(),
4837 source_ref: Some("src-2".to_owned()),
4838 upsert: true,
4839 supersedes_id: Some("step-v1".to_owned()),
4840 }],
4841 actions: vec![],
4842 optional_backfills: vec![],
4843 vec_inserts: vec![],
4844 operational_writes: vec![],
4845 })
4846 .expect("v2 step write");
4847
4848 let conn = rusqlite::Connection::open(db.path()).expect("open");
4849 let v1_historical: i64 = conn
4850 .query_row(
4851 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4852 [],
4853 |r| r.get(0),
4854 )
4855 .expect("v1 historical count");
4856 let v2_active: i64 = conn
4857 .query_row(
4858 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4859 [],
4860 |r| r.get(0),
4861 )
4862 .expect("v2 active count");
4863
4864 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4865 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4866 }
4867
4868 #[test]
4869 fn writer_action_upsert_supersedes_prior_active_action() {
4870 let db = NamedTempFile::new().expect("temporary db");
4871 let writer = WriterActor::start(
4872 db.path(),
4873 Arc::new(SchemaManager::new()),
4874 ProvenanceMode::Warn,
4875 Arc::new(TelemetryCounters::default()),
4876 )
4877 .expect("writer");
4878
4879 writer
4880 .submit(WriteRequest {
4881 label: "v1".to_owned(),
4882 nodes: vec![],
4883 node_retires: vec![],
4884 edges: vec![],
4885 edge_retires: vec![],
4886 chunks: vec![],
4887 runs: vec![RunInsert {
4888 id: "run-1".to_owned(),
4889 kind: "session".to_owned(),
4890 status: "completed".to_owned(),
4891 properties: "{}".to_owned(),
4892 source_ref: Some("src-1".to_owned()),
4893 upsert: false,
4894 supersedes_id: None,
4895 }],
4896 steps: vec![StepInsert {
4897 id: "step-1".to_owned(),
4898 run_id: "run-1".to_owned(),
4899 kind: "llm".to_owned(),
4900 status: "completed".to_owned(),
4901 properties: "{}".to_owned(),
4902 source_ref: Some("src-1".to_owned()),
4903 upsert: false,
4904 supersedes_id: None,
4905 }],
4906 actions: vec![ActionInsert {
4907 id: "action-v1".to_owned(),
4908 step_id: "step-1".to_owned(),
4909 kind: "emit".to_owned(),
4910 status: "completed".to_owned(),
4911 properties: "{}".to_owned(),
4912 source_ref: Some("src-1".to_owned()),
4913 upsert: false,
4914 supersedes_id: None,
4915 }],
4916 optional_backfills: vec![],
4917 vec_inserts: vec![],
4918 operational_writes: vec![],
4919 })
4920 .expect("v1 action write");
4921
4922 writer
4923 .submit(WriteRequest {
4924 label: "v2".to_owned(),
4925 nodes: vec![],
4926 node_retires: vec![],
4927 edges: vec![],
4928 edge_retires: vec![],
4929 chunks: vec![],
4930 runs: vec![],
4931 steps: vec![],
4932 actions: vec![ActionInsert {
4933 id: "action-v2".to_owned(),
4934 step_id: "step-1".to_owned(),
4935 kind: "emit".to_owned(),
4936 status: "completed".to_owned(),
4937 properties: "{}".to_owned(),
4938 source_ref: Some("src-2".to_owned()),
4939 upsert: true,
4940 supersedes_id: Some("action-v1".to_owned()),
4941 }],
4942 optional_backfills: vec![],
4943 vec_inserts: vec![],
4944 operational_writes: vec![],
4945 })
4946 .expect("v2 action write");
4947
4948 let conn = rusqlite::Connection::open(db.path()).expect("open");
4949 let v1_historical: i64 = conn
4950 .query_row(
4951 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4952 [],
4953 |r| r.get(0),
4954 )
4955 .expect("v1 historical count");
4956 let v2_active: i64 = conn
4957 .query_row(
4958 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4959 [],
4960 |r| r.get(0),
4961 )
4962 .expect("v2 active count");
4963
4964 assert_eq!(
4965 v1_historical, 1,
4966 "action-v1 must be historical after upsert"
4967 );
4968 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4969 }
4970
4971 #[test]
4974 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4975 let db = NamedTempFile::new().expect("temporary db");
4976 let writer = WriterActor::start(
4977 db.path(),
4978 Arc::new(SchemaManager::new()),
4979 ProvenanceMode::Warn,
4980 Arc::new(TelemetryCounters::default()),
4981 )
4982 .expect("writer");
4983
4984 let result = writer.submit(WriteRequest {
4985 label: "bad".to_owned(),
4986 nodes: vec![],
4987 node_retires: vec![],
4988 edges: vec![],
4989 edge_retires: vec![],
4990 chunks: vec![],
4991 runs: vec![RunInsert {
4992 id: "run-1".to_owned(),
4993 kind: "session".to_owned(),
4994 status: "completed".to_owned(),
4995 properties: "{}".to_owned(),
4996 source_ref: None,
4997 upsert: true,
4998 supersedes_id: None,
4999 }],
5000 steps: vec![],
5001 actions: vec![],
5002 optional_backfills: vec![],
5003 vec_inserts: vec![],
5004 operational_writes: vec![],
5005 });
5006
5007 assert!(
5008 matches!(result, Err(EngineError::InvalidWrite(_))),
5009 "run upsert=true without supersedes_id must return InvalidWrite"
5010 );
5011 }
5012
5013 #[test]
5014 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
5015 let db = NamedTempFile::new().expect("temporary db");
5016 let writer = WriterActor::start(
5017 db.path(),
5018 Arc::new(SchemaManager::new()),
5019 ProvenanceMode::Warn,
5020 Arc::new(TelemetryCounters::default()),
5021 )
5022 .expect("writer");
5023
5024 let result = writer.submit(WriteRequest {
5025 label: "bad".to_owned(),
5026 nodes: vec![],
5027 node_retires: vec![],
5028 edges: vec![],
5029 edge_retires: vec![],
5030 chunks: vec![],
5031 runs: vec![],
5032 steps: vec![StepInsert {
5033 id: "step-1".to_owned(),
5034 run_id: "run-1".to_owned(),
5035 kind: "llm".to_owned(),
5036 status: "completed".to_owned(),
5037 properties: "{}".to_owned(),
5038 source_ref: None,
5039 upsert: true,
5040 supersedes_id: None,
5041 }],
5042 actions: vec![],
5043 optional_backfills: vec![],
5044 vec_inserts: vec![],
5045 operational_writes: vec![],
5046 });
5047
5048 assert!(
5049 matches!(result, Err(EngineError::InvalidWrite(_))),
5050 "step upsert=true without supersedes_id must return InvalidWrite"
5051 );
5052 }
5053
5054 #[test]
5055 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5056 let db = NamedTempFile::new().expect("temporary db");
5057 let writer = WriterActor::start(
5058 db.path(),
5059 Arc::new(SchemaManager::new()),
5060 ProvenanceMode::Warn,
5061 Arc::new(TelemetryCounters::default()),
5062 )
5063 .expect("writer");
5064
5065 let result = writer.submit(WriteRequest {
5066 label: "bad".to_owned(),
5067 nodes: vec![],
5068 node_retires: vec![],
5069 edges: vec![],
5070 edge_retires: vec![],
5071 chunks: vec![],
5072 runs: vec![],
5073 steps: vec![],
5074 actions: vec![ActionInsert {
5075 id: "action-1".to_owned(),
5076 step_id: "step-1".to_owned(),
5077 kind: "emit".to_owned(),
5078 status: "completed".to_owned(),
5079 properties: "{}".to_owned(),
5080 source_ref: None,
5081 upsert: true,
5082 supersedes_id: None,
5083 }],
5084 optional_backfills: vec![],
5085 vec_inserts: vec![],
5086 operational_writes: vec![],
5087 });
5088
5089 assert!(
5090 matches!(result, Err(EngineError::InvalidWrite(_))),
5091 "action upsert=true without supersedes_id must return InvalidWrite"
5092 );
5093 }
5094
5095 #[test]
5098 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5099 let db = NamedTempFile::new().expect("temporary db");
5100 let writer = WriterActor::start(
5101 db.path(),
5102 Arc::new(SchemaManager::new()),
5103 ProvenanceMode::Warn,
5104 Arc::new(TelemetryCounters::default()),
5105 )
5106 .expect("writer");
5107
5108 let receipt = writer
5109 .submit(WriteRequest {
5110 label: "test".to_owned(),
5111 nodes: vec![
5112 NodeInsert {
5113 row_id: "row-a".to_owned(),
5114 logical_id: "node-a".to_owned(),
5115 kind: "Meeting".to_owned(),
5116 properties: "{}".to_owned(),
5117 source_ref: Some("src-1".to_owned()),
5118 upsert: false,
5119 chunk_policy: ChunkPolicy::Preserve,
5120 content_ref: None,
5121 },
5122 NodeInsert {
5123 row_id: "row-b".to_owned(),
5124 logical_id: "node-b".to_owned(),
5125 kind: "Task".to_owned(),
5126 properties: "{}".to_owned(),
5127 source_ref: Some("src-1".to_owned()),
5128 upsert: false,
5129 chunk_policy: ChunkPolicy::Preserve,
5130 content_ref: None,
5131 },
5132 ],
5133 node_retires: vec![],
5134 edges: vec![EdgeInsert {
5135 row_id: "edge-1".to_owned(),
5136 logical_id: "edge-lg-1".to_owned(),
5137 source_logical_id: "node-a".to_owned(),
5138 target_logical_id: "node-b".to_owned(),
5139 kind: "HAS_TASK".to_owned(),
5140 properties: "{}".to_owned(),
5141 source_ref: None,
5142 upsert: false,
5143 }],
5144 edge_retires: vec![],
5145 chunks: vec![],
5146 runs: vec![],
5147 steps: vec![],
5148 actions: vec![],
5149 optional_backfills: vec![],
5150 vec_inserts: vec![],
5151 operational_writes: vec![],
5152 })
5153 .expect("write");
5154
5155 assert!(
5156 !receipt.provenance_warnings.is_empty(),
5157 "edge insert without source_ref must emit a provenance warning"
5158 );
5159 }
5160
5161 #[test]
5162 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5163 let db = NamedTempFile::new().expect("temporary db");
5164 let writer = WriterActor::start(
5165 db.path(),
5166 Arc::new(SchemaManager::new()),
5167 ProvenanceMode::Warn,
5168 Arc::new(TelemetryCounters::default()),
5169 )
5170 .expect("writer");
5171
5172 let receipt = writer
5173 .submit(WriteRequest {
5174 label: "test".to_owned(),
5175 nodes: vec![],
5176 node_retires: vec![],
5177 edges: vec![],
5178 edge_retires: vec![],
5179 chunks: vec![],
5180 runs: vec![RunInsert {
5181 id: "run-1".to_owned(),
5182 kind: "session".to_owned(),
5183 status: "completed".to_owned(),
5184 properties: "{}".to_owned(),
5185 source_ref: None,
5186 upsert: false,
5187 supersedes_id: None,
5188 }],
5189 steps: vec![],
5190 actions: vec![],
5191 optional_backfills: vec![],
5192 vec_inserts: vec![],
5193 operational_writes: vec![],
5194 })
5195 .expect("write");
5196
5197 assert!(
5198 !receipt.provenance_warnings.is_empty(),
5199 "run insert without source_ref must emit a provenance warning"
5200 );
5201 }
5202
5203 #[test]
5206 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5207 let db = NamedTempFile::new().expect("temporary db");
5208 let writer = WriterActor::start(
5209 db.path(),
5210 Arc::new(SchemaManager::new()),
5211 ProvenanceMode::Warn,
5212 Arc::new(TelemetryCounters::default()),
5213 )
5214 .expect("writer");
5215
5216 writer
5218 .submit(WriteRequest {
5219 label: "seed".to_owned(),
5220 nodes: vec![NodeInsert {
5221 row_id: "row-1".to_owned(),
5222 logical_id: "meeting-1".to_owned(),
5223 kind: "Meeting".to_owned(),
5224 properties: "{}".to_owned(),
5225 source_ref: Some("src-1".to_owned()),
5226 upsert: false,
5227 chunk_policy: ChunkPolicy::Preserve,
5228 content_ref: None,
5229 }],
5230 node_retires: vec![],
5231 edges: vec![],
5232 edge_retires: vec![],
5233 chunks: vec![],
5234 runs: vec![],
5235 steps: vec![],
5236 actions: vec![],
5237 optional_backfills: vec![],
5238 vec_inserts: vec![],
5239 operational_writes: vec![],
5240 })
5241 .expect("seed write");
5242
5243 let result = writer.submit(WriteRequest {
5245 label: "bad".to_owned(),
5246 nodes: vec![],
5247 node_retires: vec![NodeRetire {
5248 logical_id: "meeting-1".to_owned(),
5249 source_ref: Some("src-2".to_owned()),
5250 }],
5251 edges: vec![],
5252 edge_retires: vec![],
5253 chunks: vec![ChunkInsert {
5254 id: "chunk-bad".to_owned(),
5255 node_logical_id: "meeting-1".to_owned(),
5256 text_content: "some text".to_owned(),
5257 byte_start: None,
5258 byte_end: None,
5259 content_hash: None,
5260 }],
5261 runs: vec![],
5262 steps: vec![],
5263 actions: vec![],
5264 optional_backfills: vec![],
5265 vec_inserts: vec![],
5266 operational_writes: vec![],
5267 });
5268
5269 assert!(
5270 matches!(result, Err(EngineError::InvalidWrite(_))),
5271 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5272 );
5273 }
5274
5275 #[test]
5278 fn writer_batch_insert_multiple_nodes() {
5279 let db = NamedTempFile::new().expect("temporary db");
5280 let writer = WriterActor::start(
5281 db.path(),
5282 Arc::new(SchemaManager::new()),
5283 ProvenanceMode::Warn,
5284 Arc::new(TelemetryCounters::default()),
5285 )
5286 .expect("writer");
5287
5288 let nodes: Vec<NodeInsert> = (0..100)
5289 .map(|i| NodeInsert {
5290 row_id: format!("row-{i}"),
5291 logical_id: format!("lg-{i}"),
5292 kind: "Note".to_owned(),
5293 properties: "{}".to_owned(),
5294 source_ref: Some("batch-src".to_owned()),
5295 upsert: false,
5296 chunk_policy: ChunkPolicy::Preserve,
5297 content_ref: None,
5298 })
5299 .collect();
5300
5301 writer
5302 .submit(WriteRequest {
5303 label: "batch".to_owned(),
5304 nodes,
5305 node_retires: vec![],
5306 edges: vec![],
5307 edge_retires: vec![],
5308 chunks: vec![],
5309 runs: vec![],
5310 steps: vec![],
5311 actions: vec![],
5312 optional_backfills: vec![],
5313 vec_inserts: vec![],
5314 operational_writes: vec![],
5315 })
5316 .expect("batch write");
5317
5318 let conn = rusqlite::Connection::open(db.path()).expect("open");
5319 let count: i64 = conn
5320 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5321 .expect("count nodes");
5322 assert_eq!(
5323 count, 100,
5324 "all 100 nodes must be present after batch insert"
5325 );
5326 }
5327
5328 #[test]
5331 fn prepare_write_rejects_empty_node_row_id() {
5332 let db = NamedTempFile::new().expect("temporary db");
5333 let writer = WriterActor::start(
5334 db.path(),
5335 Arc::new(SchemaManager::new()),
5336 ProvenanceMode::Warn,
5337 Arc::new(TelemetryCounters::default()),
5338 )
5339 .expect("writer");
5340
5341 let result = writer.submit(WriteRequest {
5342 label: "test".to_owned(),
5343 nodes: vec![NodeInsert {
5344 row_id: String::new(),
5345 logical_id: "lg-1".to_owned(),
5346 kind: "Note".to_owned(),
5347 properties: "{}".to_owned(),
5348 source_ref: None,
5349 upsert: false,
5350 chunk_policy: ChunkPolicy::Preserve,
5351 content_ref: None,
5352 }],
5353 node_retires: vec![],
5354 edges: vec![],
5355 edge_retires: vec![],
5356 chunks: vec![],
5357 runs: vec![],
5358 steps: vec![],
5359 actions: vec![],
5360 optional_backfills: vec![],
5361 vec_inserts: vec![],
5362 operational_writes: vec![],
5363 });
5364
5365 assert!(
5366 matches!(result, Err(EngineError::InvalidWrite(_))),
5367 "empty row_id must be rejected"
5368 );
5369 }
5370
5371 #[test]
5372 fn prepare_write_rejects_empty_node_logical_id() {
5373 let db = NamedTempFile::new().expect("temporary db");
5374 let writer = WriterActor::start(
5375 db.path(),
5376 Arc::new(SchemaManager::new()),
5377 ProvenanceMode::Warn,
5378 Arc::new(TelemetryCounters::default()),
5379 )
5380 .expect("writer");
5381
5382 let result = writer.submit(WriteRequest {
5383 label: "test".to_owned(),
5384 nodes: vec![NodeInsert {
5385 row_id: "row-1".to_owned(),
5386 logical_id: String::new(),
5387 kind: "Note".to_owned(),
5388 properties: "{}".to_owned(),
5389 source_ref: None,
5390 upsert: false,
5391 chunk_policy: ChunkPolicy::Preserve,
5392 content_ref: None,
5393 }],
5394 node_retires: vec![],
5395 edges: vec![],
5396 edge_retires: vec![],
5397 chunks: vec![],
5398 runs: vec![],
5399 steps: vec![],
5400 actions: vec![],
5401 optional_backfills: vec![],
5402 vec_inserts: vec![],
5403 operational_writes: vec![],
5404 });
5405
5406 assert!(
5407 matches!(result, Err(EngineError::InvalidWrite(_))),
5408 "empty logical_id must be rejected"
5409 );
5410 }
5411
5412 #[test]
5413 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5414 let db = NamedTempFile::new().expect("temporary db");
5415 let writer = WriterActor::start(
5416 db.path(),
5417 Arc::new(SchemaManager::new()),
5418 ProvenanceMode::Warn,
5419 Arc::new(TelemetryCounters::default()),
5420 )
5421 .expect("writer");
5422
5423 let result = writer.submit(WriteRequest {
5424 label: "test".to_owned(),
5425 nodes: vec![
5426 NodeInsert {
5427 row_id: "row-1".to_owned(),
5428 logical_id: "lg-1".to_owned(),
5429 kind: "Note".to_owned(),
5430 properties: "{}".to_owned(),
5431 source_ref: None,
5432 upsert: false,
5433 chunk_policy: ChunkPolicy::Preserve,
5434 content_ref: None,
5435 },
5436 NodeInsert {
5437 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5439 kind: "Note".to_owned(),
5440 properties: "{}".to_owned(),
5441 source_ref: None,
5442 upsert: false,
5443 chunk_policy: ChunkPolicy::Preserve,
5444 content_ref: None,
5445 },
5446 ],
5447 node_retires: vec![],
5448 edges: vec![],
5449 edge_retires: vec![],
5450 chunks: vec![],
5451 runs: vec![],
5452 steps: vec![],
5453 actions: vec![],
5454 optional_backfills: vec![],
5455 vec_inserts: vec![],
5456 operational_writes: vec![],
5457 });
5458
5459 assert!(
5460 matches!(result, Err(EngineError::InvalidWrite(_))),
5461 "duplicate row_id within request must be rejected"
5462 );
5463 }
5464
5465 #[test]
5466 fn prepare_write_rejects_empty_chunk_id() {
5467 let db = NamedTempFile::new().expect("temporary db");
5468 let writer = WriterActor::start(
5469 db.path(),
5470 Arc::new(SchemaManager::new()),
5471 ProvenanceMode::Warn,
5472 Arc::new(TelemetryCounters::default()),
5473 )
5474 .expect("writer");
5475
5476 let result = writer.submit(WriteRequest {
5477 label: "test".to_owned(),
5478 nodes: vec![NodeInsert {
5479 row_id: "row-1".to_owned(),
5480 logical_id: "lg-1".to_owned(),
5481 kind: "Note".to_owned(),
5482 properties: "{}".to_owned(),
5483 source_ref: None,
5484 upsert: false,
5485 chunk_policy: ChunkPolicy::Preserve,
5486 content_ref: None,
5487 }],
5488 node_retires: vec![],
5489 edges: vec![],
5490 edge_retires: vec![],
5491 chunks: vec![ChunkInsert {
5492 id: String::new(),
5493 node_logical_id: "lg-1".to_owned(),
5494 text_content: "some text".to_owned(),
5495 byte_start: None,
5496 byte_end: None,
5497 content_hash: None,
5498 }],
5499 runs: vec![],
5500 steps: vec![],
5501 actions: vec![],
5502 optional_backfills: vec![],
5503 vec_inserts: vec![],
5504 operational_writes: vec![],
5505 });
5506
5507 assert!(
5508 matches!(result, Err(EngineError::InvalidWrite(_))),
5509 "empty chunk id must be rejected"
5510 );
5511 }
5512
5513 #[test]
5516 fn writer_receipt_warns_on_step_without_source_ref() {
5517 let db = NamedTempFile::new().expect("temporary db");
5518 let writer = WriterActor::start(
5519 db.path(),
5520 Arc::new(SchemaManager::new()),
5521 ProvenanceMode::Warn,
5522 Arc::new(TelemetryCounters::default()),
5523 )
5524 .expect("writer");
5525
5526 writer
5528 .submit(WriteRequest {
5529 label: "seed-run".to_owned(),
5530 nodes: vec![],
5531 node_retires: vec![],
5532 edges: vec![],
5533 edge_retires: vec![],
5534 chunks: vec![],
5535 runs: vec![RunInsert {
5536 id: "run-1".to_owned(),
5537 kind: "session".to_owned(),
5538 status: "active".to_owned(),
5539 properties: "{}".to_owned(),
5540 source_ref: Some("src-1".to_owned()),
5541 upsert: false,
5542 supersedes_id: None,
5543 }],
5544 steps: vec![],
5545 actions: vec![],
5546 optional_backfills: vec![],
5547 vec_inserts: vec![],
5548 operational_writes: vec![],
5549 })
5550 .expect("seed run");
5551
5552 let receipt = writer
5553 .submit(WriteRequest {
5554 label: "test".to_owned(),
5555 nodes: vec![],
5556 node_retires: vec![],
5557 edges: vec![],
5558 edge_retires: vec![],
5559 chunks: vec![],
5560 runs: vec![],
5561 steps: vec![StepInsert {
5562 id: "step-1".to_owned(),
5563 run_id: "run-1".to_owned(),
5564 kind: "llm_call".to_owned(),
5565 status: "completed".to_owned(),
5566 properties: "{}".to_owned(),
5567 source_ref: None,
5568 upsert: false,
5569 supersedes_id: None,
5570 }],
5571 actions: vec![],
5572 optional_backfills: vec![],
5573 vec_inserts: vec![],
5574 operational_writes: vec![],
5575 })
5576 .expect("write");
5577
5578 assert!(
5579 !receipt.provenance_warnings.is_empty(),
5580 "step insert without source_ref must emit a provenance warning"
5581 );
5582 }
5583
5584 #[test]
5585 fn writer_receipt_warns_on_action_without_source_ref() {
5586 let db = NamedTempFile::new().expect("temporary db");
5587 let writer = WriterActor::start(
5588 db.path(),
5589 Arc::new(SchemaManager::new()),
5590 ProvenanceMode::Warn,
5591 Arc::new(TelemetryCounters::default()),
5592 )
5593 .expect("writer");
5594
5595 writer
5597 .submit(WriteRequest {
5598 label: "seed".to_owned(),
5599 nodes: vec![],
5600 node_retires: vec![],
5601 edges: vec![],
5602 edge_retires: vec![],
5603 chunks: vec![],
5604 runs: vec![RunInsert {
5605 id: "run-1".to_owned(),
5606 kind: "session".to_owned(),
5607 status: "active".to_owned(),
5608 properties: "{}".to_owned(),
5609 source_ref: Some("src-1".to_owned()),
5610 upsert: false,
5611 supersedes_id: None,
5612 }],
5613 steps: vec![StepInsert {
5614 id: "step-1".to_owned(),
5615 run_id: "run-1".to_owned(),
5616 kind: "llm_call".to_owned(),
5617 status: "completed".to_owned(),
5618 properties: "{}".to_owned(),
5619 source_ref: Some("src-1".to_owned()),
5620 upsert: false,
5621 supersedes_id: None,
5622 }],
5623 actions: vec![],
5624 optional_backfills: vec![],
5625 vec_inserts: vec![],
5626 operational_writes: vec![],
5627 })
5628 .expect("seed");
5629
5630 let receipt = writer
5631 .submit(WriteRequest {
5632 label: "test".to_owned(),
5633 nodes: vec![],
5634 node_retires: vec![],
5635 edges: vec![],
5636 edge_retires: vec![],
5637 chunks: vec![],
5638 runs: vec![],
5639 steps: vec![],
5640 actions: vec![ActionInsert {
5641 id: "action-1".to_owned(),
5642 step_id: "step-1".to_owned(),
5643 kind: "tool_call".to_owned(),
5644 status: "completed".to_owned(),
5645 properties: "{}".to_owned(),
5646 source_ref: None,
5647 upsert: false,
5648 supersedes_id: None,
5649 }],
5650 optional_backfills: vec![],
5651 vec_inserts: vec![],
5652 operational_writes: vec![],
5653 })
5654 .expect("write");
5655
5656 assert!(
5657 !receipt.provenance_warnings.is_empty(),
5658 "action insert without source_ref must emit a provenance warning"
5659 );
5660 }
5661
5662 #[test]
5663 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5664 let db = NamedTempFile::new().expect("temporary db");
5665 let writer = WriterActor::start(
5666 db.path(),
5667 Arc::new(SchemaManager::new()),
5668 ProvenanceMode::Warn,
5669 Arc::new(TelemetryCounters::default()),
5670 )
5671 .expect("writer");
5672
5673 let receipt = writer
5674 .submit(WriteRequest {
5675 label: "test".to_owned(),
5676 nodes: vec![NodeInsert {
5677 row_id: "row-1".to_owned(),
5678 logical_id: "node-1".to_owned(),
5679 kind: "Note".to_owned(),
5680 properties: "{}".to_owned(),
5681 source_ref: Some("src-1".to_owned()),
5682 upsert: false,
5683 chunk_policy: ChunkPolicy::Preserve,
5684 content_ref: None,
5685 }],
5686 node_retires: vec![],
5687 edges: vec![],
5688 edge_retires: vec![],
5689 chunks: vec![],
5690 runs: vec![RunInsert {
5691 id: "run-1".to_owned(),
5692 kind: "session".to_owned(),
5693 status: "active".to_owned(),
5694 properties: "{}".to_owned(),
5695 source_ref: Some("src-1".to_owned()),
5696 upsert: false,
5697 supersedes_id: None,
5698 }],
5699 steps: vec![StepInsert {
5700 id: "step-1".to_owned(),
5701 run_id: "run-1".to_owned(),
5702 kind: "llm_call".to_owned(),
5703 status: "completed".to_owned(),
5704 properties: "{}".to_owned(),
5705 source_ref: Some("src-1".to_owned()),
5706 upsert: false,
5707 supersedes_id: None,
5708 }],
5709 actions: vec![ActionInsert {
5710 id: "action-1".to_owned(),
5711 step_id: "step-1".to_owned(),
5712 kind: "tool_call".to_owned(),
5713 status: "completed".to_owned(),
5714 properties: "{}".to_owned(),
5715 source_ref: Some("src-1".to_owned()),
5716 upsert: false,
5717 supersedes_id: None,
5718 }],
5719 optional_backfills: vec![],
5720 vec_inserts: vec![],
5721 operational_writes: vec![],
5722 })
5723 .expect("write");
5724
5725 assert!(
5726 receipt.provenance_warnings.is_empty(),
5727 "no warnings expected when all types have source_ref; got: {:?}",
5728 receipt.provenance_warnings
5729 );
5730 }
5731
5732 #[test]
5735 fn default_provenance_mode_is_warn() {
5736 let db = NamedTempFile::new().expect("temporary db");
5738 let writer = WriterActor::start(
5739 db.path(),
5740 Arc::new(SchemaManager::new()),
5741 ProvenanceMode::default(),
5742 Arc::new(TelemetryCounters::default()),
5743 )
5744 .expect("writer");
5745
5746 let receipt = writer
5747 .submit(WriteRequest {
5748 label: "test".to_owned(),
5749 nodes: vec![NodeInsert {
5750 row_id: "row-1".to_owned(),
5751 logical_id: "node-1".to_owned(),
5752 kind: "Note".to_owned(),
5753 properties: "{}".to_owned(),
5754 source_ref: None,
5755 upsert: false,
5756 chunk_policy: ChunkPolicy::Preserve,
5757 content_ref: None,
5758 }],
5759 node_retires: vec![],
5760 edges: vec![],
5761 edge_retires: vec![],
5762 chunks: vec![],
5763 runs: vec![],
5764 steps: vec![],
5765 actions: vec![],
5766 optional_backfills: vec![],
5767 vec_inserts: vec![],
5768 operational_writes: vec![],
5769 })
5770 .expect("Warn mode must not reject missing source_ref");
5771
5772 assert!(
5773 !receipt.provenance_warnings.is_empty(),
5774 "Warn mode must emit a warning instead of rejecting"
5775 );
5776 }
5777
5778 #[test]
5779 fn require_mode_rejects_node_without_source_ref() {
5780 let db = NamedTempFile::new().expect("temporary db");
5781 let writer = WriterActor::start(
5782 db.path(),
5783 Arc::new(SchemaManager::new()),
5784 ProvenanceMode::Require,
5785 Arc::new(TelemetryCounters::default()),
5786 )
5787 .expect("writer");
5788
5789 let result = writer.submit(WriteRequest {
5790 label: "test".to_owned(),
5791 nodes: vec![NodeInsert {
5792 row_id: "row-1".to_owned(),
5793 logical_id: "node-1".to_owned(),
5794 kind: "Note".to_owned(),
5795 properties: "{}".to_owned(),
5796 source_ref: None,
5797 upsert: false,
5798 chunk_policy: ChunkPolicy::Preserve,
5799 content_ref: None,
5800 }],
5801 node_retires: vec![],
5802 edges: vec![],
5803 edge_retires: vec![],
5804 chunks: vec![],
5805 runs: vec![],
5806 steps: vec![],
5807 actions: vec![],
5808 optional_backfills: vec![],
5809 vec_inserts: vec![],
5810 operational_writes: vec![],
5811 });
5812
5813 assert!(
5814 matches!(result, Err(EngineError::InvalidWrite(_))),
5815 "Require mode must reject node without source_ref"
5816 );
5817 }
5818
5819 #[test]
5820 fn require_mode_accepts_node_with_source_ref() {
5821 let db = NamedTempFile::new().expect("temporary db");
5822 let writer = WriterActor::start(
5823 db.path(),
5824 Arc::new(SchemaManager::new()),
5825 ProvenanceMode::Require,
5826 Arc::new(TelemetryCounters::default()),
5827 )
5828 .expect("writer");
5829
5830 let result = writer.submit(WriteRequest {
5831 label: "test".to_owned(),
5832 nodes: vec![NodeInsert {
5833 row_id: "row-1".to_owned(),
5834 logical_id: "node-1".to_owned(),
5835 kind: "Note".to_owned(),
5836 properties: "{}".to_owned(),
5837 source_ref: Some("src-1".to_owned()),
5838 upsert: false,
5839 chunk_policy: ChunkPolicy::Preserve,
5840 content_ref: None,
5841 }],
5842 node_retires: vec![],
5843 edges: vec![],
5844 edge_retires: vec![],
5845 chunks: vec![],
5846 runs: vec![],
5847 steps: vec![],
5848 actions: vec![],
5849 optional_backfills: vec![],
5850 vec_inserts: vec![],
5851 operational_writes: vec![],
5852 });
5853
5854 assert!(
5855 result.is_ok(),
5856 "Require mode must accept node with source_ref"
5857 );
5858 }
5859
5860 #[test]
5861 fn require_mode_rejects_edge_without_source_ref() {
5862 let db = NamedTempFile::new().expect("temporary db");
5863 let writer = WriterActor::start(
5864 db.path(),
5865 Arc::new(SchemaManager::new()),
5866 ProvenanceMode::Require,
5867 Arc::new(TelemetryCounters::default()),
5868 )
5869 .expect("writer");
5870
5871 let result = writer.submit(WriteRequest {
5873 label: "test".to_owned(),
5874 nodes: vec![
5875 NodeInsert {
5876 row_id: "row-a".to_owned(),
5877 logical_id: "node-a".to_owned(),
5878 kind: "Note".to_owned(),
5879 properties: "{}".to_owned(),
5880 source_ref: Some("src-1".to_owned()),
5881 upsert: false,
5882 chunk_policy: ChunkPolicy::Preserve,
5883 content_ref: None,
5884 },
5885 NodeInsert {
5886 row_id: "row-b".to_owned(),
5887 logical_id: "node-b".to_owned(),
5888 kind: "Note".to_owned(),
5889 properties: "{}".to_owned(),
5890 source_ref: Some("src-1".to_owned()),
5891 upsert: false,
5892 chunk_policy: ChunkPolicy::Preserve,
5893 content_ref: None,
5894 },
5895 ],
5896 node_retires: vec![],
5897 edges: vec![EdgeInsert {
5898 row_id: "edge-row-1".to_owned(),
5899 logical_id: "edge-1".to_owned(),
5900 source_logical_id: "node-a".to_owned(),
5901 target_logical_id: "node-b".to_owned(),
5902 kind: "LINKS_TO".to_owned(),
5903 properties: "{}".to_owned(),
5904 source_ref: None,
5905 upsert: false,
5906 }],
5907 edge_retires: vec![],
5908 chunks: vec![],
5909 runs: vec![],
5910 steps: vec![],
5911 actions: vec![],
5912 optional_backfills: vec![],
5913 vec_inserts: vec![],
5914 operational_writes: vec![],
5915 });
5916
5917 assert!(
5918 matches!(result, Err(EngineError::InvalidWrite(_))),
5919 "Require mode must reject edge without source_ref"
5920 );
5921 }
5922
5923 #[test]
5926 fn fts_row_has_correct_kind_from_co_submitted_node() {
5927 let db = NamedTempFile::new().expect("temporary db");
5928 let writer = WriterActor::start(
5929 db.path(),
5930 Arc::new(SchemaManager::new()),
5931 ProvenanceMode::Warn,
5932 Arc::new(TelemetryCounters::default()),
5933 )
5934 .expect("writer");
5935
5936 writer
5937 .submit(WriteRequest {
5938 label: "test".to_owned(),
5939 nodes: vec![NodeInsert {
5940 row_id: "row-1".to_owned(),
5941 logical_id: "node-1".to_owned(),
5942 kind: "Meeting".to_owned(),
5943 properties: "{}".to_owned(),
5944 source_ref: Some("src-1".to_owned()),
5945 upsert: false,
5946 chunk_policy: ChunkPolicy::Preserve,
5947 content_ref: None,
5948 }],
5949 node_retires: vec![],
5950 edges: vec![],
5951 edge_retires: vec![],
5952 chunks: vec![ChunkInsert {
5953 id: "chunk-1".to_owned(),
5954 node_logical_id: "node-1".to_owned(),
5955 text_content: "some text".to_owned(),
5956 byte_start: None,
5957 byte_end: None,
5958 content_hash: None,
5959 }],
5960 runs: vec![],
5961 steps: vec![],
5962 actions: vec![],
5963 optional_backfills: vec![],
5964 vec_inserts: vec![],
5965 operational_writes: vec![],
5966 })
5967 .expect("write");
5968
5969 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5970 let kind: String = conn
5971 .query_row(
5972 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5973 [],
5974 |row| row.get(0),
5975 )
5976 .expect("fts row");
5977
5978 assert_eq!(kind, "Meeting");
5979 }
5980
5981 #[test]
5982 fn fts_row_has_correct_text_content() {
5983 let db = NamedTempFile::new().expect("temporary db");
5984 let writer = WriterActor::start(
5985 db.path(),
5986 Arc::new(SchemaManager::new()),
5987 ProvenanceMode::Warn,
5988 Arc::new(TelemetryCounters::default()),
5989 )
5990 .expect("writer");
5991
5992 writer
5993 .submit(WriteRequest {
5994 label: "test".to_owned(),
5995 nodes: vec![NodeInsert {
5996 row_id: "row-1".to_owned(),
5997 logical_id: "node-1".to_owned(),
5998 kind: "Note".to_owned(),
5999 properties: "{}".to_owned(),
6000 source_ref: Some("src-1".to_owned()),
6001 upsert: false,
6002 chunk_policy: ChunkPolicy::Preserve,
6003 content_ref: None,
6004 }],
6005 node_retires: vec![],
6006 edges: vec![],
6007 edge_retires: vec![],
6008 chunks: vec![ChunkInsert {
6009 id: "chunk-1".to_owned(),
6010 node_logical_id: "node-1".to_owned(),
6011 text_content: "exactly this text".to_owned(),
6012 byte_start: None,
6013 byte_end: None,
6014 content_hash: None,
6015 }],
6016 runs: vec![],
6017 steps: vec![],
6018 actions: vec![],
6019 optional_backfills: vec![],
6020 vec_inserts: vec![],
6021 operational_writes: vec![],
6022 })
6023 .expect("write");
6024
6025 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6026 let text: String = conn
6027 .query_row(
6028 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6029 [],
6030 |row| row.get(0),
6031 )
6032 .expect("fts row");
6033
6034 assert_eq!(text, "exactly this text");
6035 }
6036
6037 #[test]
6038 fn fts_row_has_correct_kind_from_pre_existing_node() {
6039 let db = NamedTempFile::new().expect("temporary db");
6040 let writer = WriterActor::start(
6041 db.path(),
6042 Arc::new(SchemaManager::new()),
6043 ProvenanceMode::Warn,
6044 Arc::new(TelemetryCounters::default()),
6045 )
6046 .expect("writer");
6047
6048 writer
6050 .submit(WriteRequest {
6051 label: "r1".to_owned(),
6052 nodes: vec![NodeInsert {
6053 row_id: "row-1".to_owned(),
6054 logical_id: "node-1".to_owned(),
6055 kind: "Document".to_owned(),
6056 properties: "{}".to_owned(),
6057 source_ref: Some("src-1".to_owned()),
6058 upsert: false,
6059 chunk_policy: ChunkPolicy::Preserve,
6060 content_ref: None,
6061 }],
6062 node_retires: vec![],
6063 edges: vec![],
6064 edge_retires: vec![],
6065 chunks: vec![],
6066 runs: vec![],
6067 steps: vec![],
6068 actions: vec![],
6069 optional_backfills: vec![],
6070 vec_inserts: vec![],
6071 operational_writes: vec![],
6072 })
6073 .expect("r1 write");
6074
6075 writer
6077 .submit(WriteRequest {
6078 label: "r2".to_owned(),
6079 nodes: vec![],
6080 node_retires: vec![],
6081 edges: vec![],
6082 edge_retires: vec![],
6083 chunks: vec![ChunkInsert {
6084 id: "chunk-1".to_owned(),
6085 node_logical_id: "node-1".to_owned(),
6086 text_content: "some text".to_owned(),
6087 byte_start: None,
6088 byte_end: None,
6089 content_hash: None,
6090 }],
6091 runs: vec![],
6092 steps: vec![],
6093 actions: vec![],
6094 optional_backfills: vec![],
6095 vec_inserts: vec![],
6096 operational_writes: vec![],
6097 })
6098 .expect("r2 write");
6099
6100 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6101 let kind: String = conn
6102 .query_row(
6103 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6104 [],
6105 |row| row.get(0),
6106 )
6107 .expect("fts row");
6108
6109 assert_eq!(kind, "Document");
6110 }
6111
6112 #[test]
6113 fn fts_derives_rows_for_multiple_chunks_per_node() {
6114 let db = NamedTempFile::new().expect("temporary db");
6115 let writer = WriterActor::start(
6116 db.path(),
6117 Arc::new(SchemaManager::new()),
6118 ProvenanceMode::Warn,
6119 Arc::new(TelemetryCounters::default()),
6120 )
6121 .expect("writer");
6122
6123 writer
6124 .submit(WriteRequest {
6125 label: "test".to_owned(),
6126 nodes: vec![NodeInsert {
6127 row_id: "row-1".to_owned(),
6128 logical_id: "node-1".to_owned(),
6129 kind: "Meeting".to_owned(),
6130 properties: "{}".to_owned(),
6131 source_ref: Some("src-1".to_owned()),
6132 upsert: false,
6133 chunk_policy: ChunkPolicy::Preserve,
6134 content_ref: None,
6135 }],
6136 node_retires: vec![],
6137 edges: vec![],
6138 edge_retires: vec![],
6139 chunks: vec![
6140 ChunkInsert {
6141 id: "chunk-a".to_owned(),
6142 node_logical_id: "node-1".to_owned(),
6143 text_content: "intro".to_owned(),
6144 byte_start: None,
6145 byte_end: None,
6146 content_hash: None,
6147 },
6148 ChunkInsert {
6149 id: "chunk-b".to_owned(),
6150 node_logical_id: "node-1".to_owned(),
6151 text_content: "body".to_owned(),
6152 byte_start: None,
6153 byte_end: None,
6154 content_hash: None,
6155 },
6156 ChunkInsert {
6157 id: "chunk-c".to_owned(),
6158 node_logical_id: "node-1".to_owned(),
6159 text_content: "conclusion".to_owned(),
6160 byte_start: None,
6161 byte_end: None,
6162 content_hash: None,
6163 },
6164 ],
6165 runs: vec![],
6166 steps: vec![],
6167 actions: vec![],
6168 optional_backfills: vec![],
6169 vec_inserts: vec![],
6170 operational_writes: vec![],
6171 })
6172 .expect("write");
6173
6174 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6175 let count: i64 = conn
6176 .query_row(
6177 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6178 [],
6179 |row| row.get(0),
6180 )
6181 .expect("fts count");
6182
6183 assert_eq!(count, 3, "three chunks must produce three FTS rows");
6184 }
6185
6186 #[test]
6187 fn fts_resolves_mixed_fast_and_db_paths() {
6188 let db = NamedTempFile::new().expect("temporary db");
6189 let writer = WriterActor::start(
6190 db.path(),
6191 Arc::new(SchemaManager::new()),
6192 ProvenanceMode::Warn,
6193 Arc::new(TelemetryCounters::default()),
6194 )
6195 .expect("writer");
6196
6197 writer
6199 .submit(WriteRequest {
6200 label: "seed".to_owned(),
6201 nodes: vec![NodeInsert {
6202 row_id: "row-existing".to_owned(),
6203 logical_id: "existing-node".to_owned(),
6204 kind: "Archive".to_owned(),
6205 properties: "{}".to_owned(),
6206 source_ref: Some("src-1".to_owned()),
6207 upsert: false,
6208 chunk_policy: ChunkPolicy::Preserve,
6209 content_ref: None,
6210 }],
6211 node_retires: vec![],
6212 edges: vec![],
6213 edge_retires: vec![],
6214 chunks: vec![],
6215 runs: vec![],
6216 steps: vec![],
6217 actions: vec![],
6218 optional_backfills: vec![],
6219 vec_inserts: vec![],
6220 operational_writes: vec![],
6221 })
6222 .expect("seed");
6223
6224 writer
6226 .submit(WriteRequest {
6227 label: "mixed".to_owned(),
6228 nodes: vec![NodeInsert {
6229 row_id: "row-new".to_owned(),
6230 logical_id: "new-node".to_owned(),
6231 kind: "Inbox".to_owned(),
6232 properties: "{}".to_owned(),
6233 source_ref: Some("src-2".to_owned()),
6234 upsert: false,
6235 chunk_policy: ChunkPolicy::Preserve,
6236 content_ref: None,
6237 }],
6238 node_retires: vec![],
6239 edges: vec![],
6240 edge_retires: vec![],
6241 chunks: vec![
6242 ChunkInsert {
6243 id: "chunk-fast".to_owned(),
6244 node_logical_id: "new-node".to_owned(),
6245 text_content: "new content".to_owned(),
6246 byte_start: None,
6247 byte_end: None,
6248 content_hash: None,
6249 },
6250 ChunkInsert {
6251 id: "chunk-db".to_owned(),
6252 node_logical_id: "existing-node".to_owned(),
6253 text_content: "archive content".to_owned(),
6254 byte_start: None,
6255 byte_end: None,
6256 content_hash: None,
6257 },
6258 ],
6259 runs: vec![],
6260 steps: vec![],
6261 actions: vec![],
6262 optional_backfills: vec![],
6263 vec_inserts: vec![],
6264 operational_writes: vec![],
6265 })
6266 .expect("mixed write");
6267
6268 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6269 let fast_kind: String = conn
6270 .query_row(
6271 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6272 [],
6273 |row| row.get(0),
6274 )
6275 .expect("fast path fts row");
6276 let db_kind: String = conn
6277 .query_row(
6278 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6279 [],
6280 |row| row.get(0),
6281 )
6282 .expect("db path fts row");
6283
6284 assert_eq!(fast_kind, "Inbox");
6285 assert_eq!(db_kind, "Archive");
6286 }
6287
6288 #[test]
6289 fn prepare_write_rejects_empty_chunk_text() {
6290 let db = NamedTempFile::new().expect("temporary db");
6291 let writer = WriterActor::start(
6292 db.path(),
6293 Arc::new(SchemaManager::new()),
6294 ProvenanceMode::Warn,
6295 Arc::new(TelemetryCounters::default()),
6296 )
6297 .expect("writer");
6298
6299 let result = writer.submit(WriteRequest {
6300 label: "test".to_owned(),
6301 nodes: vec![NodeInsert {
6302 row_id: "row-1".to_owned(),
6303 logical_id: "node-1".to_owned(),
6304 kind: "Note".to_owned(),
6305 properties: "{}".to_owned(),
6306 source_ref: None,
6307 upsert: false,
6308 chunk_policy: ChunkPolicy::Preserve,
6309 content_ref: None,
6310 }],
6311 node_retires: vec![],
6312 edges: vec![],
6313 edge_retires: vec![],
6314 chunks: vec![ChunkInsert {
6315 id: "chunk-1".to_owned(),
6316 node_logical_id: "node-1".to_owned(),
6317 text_content: String::new(),
6318 byte_start: None,
6319 byte_end: None,
6320 content_hash: None,
6321 }],
6322 runs: vec![],
6323 steps: vec![],
6324 actions: vec![],
6325 optional_backfills: vec![],
6326 vec_inserts: vec![],
6327 operational_writes: vec![],
6328 });
6329
6330 assert!(
6331 matches!(result, Err(EngineError::InvalidWrite(_))),
6332 "empty text_content must be rejected"
6333 );
6334 }
6335
6336 #[test]
6337 fn receipt_reports_zero_backfills_when_none_submitted() {
6338 let db = NamedTempFile::new().expect("temporary db");
6339 let writer = WriterActor::start(
6340 db.path(),
6341 Arc::new(SchemaManager::new()),
6342 ProvenanceMode::Warn,
6343 Arc::new(TelemetryCounters::default()),
6344 )
6345 .expect("writer");
6346
6347 let receipt = writer
6348 .submit(WriteRequest {
6349 label: "test".to_owned(),
6350 nodes: vec![NodeInsert {
6351 row_id: "row-1".to_owned(),
6352 logical_id: "node-1".to_owned(),
6353 kind: "Note".to_owned(),
6354 properties: "{}".to_owned(),
6355 source_ref: Some("src-1".to_owned()),
6356 upsert: false,
6357 chunk_policy: ChunkPolicy::Preserve,
6358 content_ref: None,
6359 }],
6360 node_retires: vec![],
6361 edges: vec![],
6362 edge_retires: vec![],
6363 chunks: vec![],
6364 runs: vec![],
6365 steps: vec![],
6366 actions: vec![],
6367 optional_backfills: vec![],
6368 vec_inserts: vec![],
6369 operational_writes: vec![],
6370 })
6371 .expect("write");
6372
6373 assert_eq!(receipt.optional_backfill_count, 0);
6374 }
6375
6376 #[test]
6377 fn receipt_reports_correct_backfill_count() {
6378 let db = NamedTempFile::new().expect("temporary db");
6379 let writer = WriterActor::start(
6380 db.path(),
6381 Arc::new(SchemaManager::new()),
6382 ProvenanceMode::Warn,
6383 Arc::new(TelemetryCounters::default()),
6384 )
6385 .expect("writer");
6386
6387 let receipt = writer
6388 .submit(WriteRequest {
6389 label: "test".to_owned(),
6390 nodes: vec![NodeInsert {
6391 row_id: "row-1".to_owned(),
6392 logical_id: "node-1".to_owned(),
6393 kind: "Note".to_owned(),
6394 properties: "{}".to_owned(),
6395 source_ref: Some("src-1".to_owned()),
6396 upsert: false,
6397 chunk_policy: ChunkPolicy::Preserve,
6398 content_ref: None,
6399 }],
6400 node_retires: vec![],
6401 edges: vec![],
6402 edge_retires: vec![],
6403 chunks: vec![],
6404 runs: vec![],
6405 steps: vec![],
6406 actions: vec![],
6407 optional_backfills: vec![
6408 OptionalProjectionTask {
6409 target: ProjectionTarget::Fts,
6410 payload: "p1".to_owned(),
6411 },
6412 OptionalProjectionTask {
6413 target: ProjectionTarget::Vec,
6414 payload: "p2".to_owned(),
6415 },
6416 OptionalProjectionTask {
6417 target: ProjectionTarget::All,
6418 payload: "p3".to_owned(),
6419 },
6420 ],
6421 vec_inserts: vec![],
6422 operational_writes: vec![],
6423 })
6424 .expect("write");
6425
6426 assert_eq!(receipt.optional_backfill_count, 3);
6427 }
6428
6429 #[test]
6430 fn backfill_tasks_are_not_executed_during_write() {
6431 let db = NamedTempFile::new().expect("temporary db");
6432 let writer = WriterActor::start(
6433 db.path(),
6434 Arc::new(SchemaManager::new()),
6435 ProvenanceMode::Warn,
6436 Arc::new(TelemetryCounters::default()),
6437 )
6438 .expect("writer");
6439
6440 writer
6443 .submit(WriteRequest {
6444 label: "test".to_owned(),
6445 nodes: vec![NodeInsert {
6446 row_id: "row-1".to_owned(),
6447 logical_id: "node-1".to_owned(),
6448 kind: "Note".to_owned(),
6449 properties: "{}".to_owned(),
6450 source_ref: Some("src-1".to_owned()),
6451 upsert: false,
6452 chunk_policy: ChunkPolicy::Preserve,
6453 content_ref: None,
6454 }],
6455 node_retires: vec![],
6456 edges: vec![],
6457 edge_retires: vec![],
6458 chunks: vec![ChunkInsert {
6459 id: "chunk-1".to_owned(),
6460 node_logical_id: "node-1".to_owned(),
6461 text_content: "required text".to_owned(),
6462 byte_start: None,
6463 byte_end: None,
6464 content_hash: None,
6465 }],
6466 runs: vec![],
6467 steps: vec![],
6468 actions: vec![],
6469 optional_backfills: vec![OptionalProjectionTask {
6470 target: ProjectionTarget::Fts,
6471 payload: "backfill-payload".to_owned(),
6472 }],
6473 vec_inserts: vec![],
6474 operational_writes: vec![],
6475 })
6476 .expect("write");
6477
6478 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6479 let count: i64 = conn
6480 .query_row(
6481 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6482 [],
6483 |row| row.get(0),
6484 )
6485 .expect("fts count");
6486
6487 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6488 }
6489
6490 #[test]
6491 fn fts_row_uses_new_kind_after_node_replace() {
6492 let db = NamedTempFile::new().expect("temporary db");
6493 let writer = WriterActor::start(
6494 db.path(),
6495 Arc::new(SchemaManager::new()),
6496 ProvenanceMode::Warn,
6497 Arc::new(TelemetryCounters::default()),
6498 )
6499 .expect("writer");
6500
6501 writer
6503 .submit(WriteRequest {
6504 label: "v1".to_owned(),
6505 nodes: vec![NodeInsert {
6506 row_id: "row-1".to_owned(),
6507 logical_id: "node-1".to_owned(),
6508 kind: "Note".to_owned(),
6509 properties: "{}".to_owned(),
6510 source_ref: Some("src-1".to_owned()),
6511 upsert: false,
6512 chunk_policy: ChunkPolicy::Preserve,
6513 content_ref: None,
6514 }],
6515 node_retires: vec![],
6516 edges: vec![],
6517 edge_retires: vec![],
6518 chunks: vec![ChunkInsert {
6519 id: "chunk-v1".to_owned(),
6520 node_logical_id: "node-1".to_owned(),
6521 text_content: "original".to_owned(),
6522 byte_start: None,
6523 byte_end: None,
6524 content_hash: None,
6525 }],
6526 runs: vec![],
6527 steps: vec![],
6528 actions: vec![],
6529 optional_backfills: vec![],
6530 vec_inserts: vec![],
6531 operational_writes: vec![],
6532 })
6533 .expect("v1 write");
6534
6535 writer
6537 .submit(WriteRequest {
6538 label: "v2".to_owned(),
6539 nodes: vec![NodeInsert {
6540 row_id: "row-2".to_owned(),
6541 logical_id: "node-1".to_owned(),
6542 kind: "Meeting".to_owned(),
6543 properties: "{}".to_owned(),
6544 source_ref: Some("src-2".to_owned()),
6545 upsert: true,
6546 chunk_policy: ChunkPolicy::Replace,
6547 content_ref: None,
6548 }],
6549 node_retires: vec![],
6550 edges: vec![],
6551 edge_retires: vec![],
6552 chunks: vec![ChunkInsert {
6553 id: "chunk-v2".to_owned(),
6554 node_logical_id: "node-1".to_owned(),
6555 text_content: "updated".to_owned(),
6556 byte_start: None,
6557 byte_end: None,
6558 content_hash: None,
6559 }],
6560 runs: vec![],
6561 steps: vec![],
6562 actions: vec![],
6563 optional_backfills: vec![],
6564 vec_inserts: vec![],
6565 operational_writes: vec![],
6566 })
6567 .expect("v2 write");
6568
6569 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6570
6571 let old_count: i64 = conn
6573 .query_row(
6574 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6575 [],
6576 |row| row.get(0),
6577 )
6578 .expect("old fts count");
6579 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6580
6581 let new_kind: String = conn
6583 .query_row(
6584 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6585 [],
6586 |row| row.get(0),
6587 )
6588 .expect("new fts row");
6589 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6590 }
6591
6592 #[test]
6595 fn vec_insert_empty_chunk_id_is_rejected() {
6596 let db = NamedTempFile::new().expect("temporary db");
6597 let writer = WriterActor::start(
6598 db.path(),
6599 Arc::new(SchemaManager::new()),
6600 ProvenanceMode::Warn,
6601 Arc::new(TelemetryCounters::default()),
6602 )
6603 .expect("writer");
6604 let result = writer.submit(WriteRequest {
6605 label: "vec-test".to_owned(),
6606 nodes: vec![],
6607 node_retires: vec![],
6608 edges: vec![],
6609 edge_retires: vec![],
6610 chunks: vec![],
6611 runs: vec![],
6612 steps: vec![],
6613 actions: vec![],
6614 optional_backfills: vec![],
6615 vec_inserts: vec![VecInsert {
6616 chunk_id: String::new(),
6617 embedding: vec![0.1, 0.2, 0.3],
6618 }],
6619 operational_writes: vec![],
6620 });
6621 assert!(
6622 matches!(result, Err(EngineError::InvalidWrite(_))),
6623 "empty chunk_id in VecInsert must be rejected"
6624 );
6625 }
6626
6627 #[test]
6628 fn vec_insert_empty_embedding_is_rejected() {
6629 let db = NamedTempFile::new().expect("temporary db");
6630 let writer = WriterActor::start(
6631 db.path(),
6632 Arc::new(SchemaManager::new()),
6633 ProvenanceMode::Warn,
6634 Arc::new(TelemetryCounters::default()),
6635 )
6636 .expect("writer");
6637 let result = writer.submit(WriteRequest {
6638 label: "vec-test".to_owned(),
6639 nodes: vec![],
6640 node_retires: vec![],
6641 edges: vec![],
6642 edge_retires: vec![],
6643 chunks: vec![],
6644 runs: vec![],
6645 steps: vec![],
6646 actions: vec![],
6647 optional_backfills: vec![],
6648 vec_inserts: vec![VecInsert {
6649 chunk_id: "chunk-1".to_owned(),
6650 embedding: vec![],
6651 }],
6652 operational_writes: vec![],
6653 });
6654 assert!(
6655 matches!(result, Err(EngineError::InvalidWrite(_))),
6656 "empty embedding in VecInsert must be rejected"
6657 );
6658 }
6659
6660 #[test]
6661 fn vec_insert_noop_without_feature() {
6662 let db = NamedTempFile::new().expect("temporary db");
6665 let writer = WriterActor::start(
6666 db.path(),
6667 Arc::new(SchemaManager::new()),
6668 ProvenanceMode::Warn,
6669 Arc::new(TelemetryCounters::default()),
6670 )
6671 .expect("writer");
6672 let result = writer.submit(WriteRequest {
6673 label: "vec-noop".to_owned(),
6674 nodes: vec![],
6675 node_retires: vec![],
6676 edges: vec![],
6677 edge_retires: vec![],
6678 chunks: vec![],
6679 runs: vec![],
6680 steps: vec![],
6681 actions: vec![],
6682 optional_backfills: vec![],
6683 vec_inserts: vec![VecInsert {
6684 chunk_id: "chunk-noop".to_owned(),
6685 embedding: vec![1.0, 2.0, 3.0],
6686 }],
6687 operational_writes: vec![],
6688 });
6689 #[cfg(not(feature = "sqlite-vec"))]
6690 result.expect("noop VecInsert without feature must succeed");
6691 #[cfg(feature = "sqlite-vec")]
6693 let _ = result;
6694 }
6695
6696 #[cfg(feature = "sqlite-vec")]
6697 #[test]
6698 fn node_retire_preserves_vec_rows_for_later_restore() {
6699 use crate::sqlite::open_connection_with_vec;
6700
6701 let db = NamedTempFile::new().expect("temporary db");
6702 let schema_manager = Arc::new(SchemaManager::new());
6703
6704 {
6705 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6706 schema_manager.bootstrap(&conn).expect("bootstrap");
6707 }
6708
6709 let writer = WriterActor::start(
6710 db.path(),
6711 Arc::clone(&schema_manager),
6712 ProvenanceMode::Warn,
6713 Arc::new(TelemetryCounters::default()),
6714 )
6715 .expect("writer");
6716
6717 writer
6719 .submit(WriteRequest {
6720 label: "setup".to_owned(),
6721 nodes: vec![NodeInsert {
6722 row_id: "row-retire-vec".to_owned(),
6723 logical_id: "node-retire-vec".to_owned(),
6724 kind: "Doc".to_owned(),
6725 properties: "{}".to_owned(),
6726 source_ref: Some("src".to_owned()),
6727 upsert: false,
6728 chunk_policy: ChunkPolicy::Preserve,
6729 content_ref: None,
6730 }],
6731 node_retires: vec![],
6732 edges: vec![],
6733 edge_retires: vec![],
6734 chunks: vec![ChunkInsert {
6735 id: "chunk-retire-vec".to_owned(),
6736 node_logical_id: "node-retire-vec".to_owned(),
6737 text_content: "text".to_owned(),
6738 byte_start: None,
6739 byte_end: None,
6740 content_hash: None,
6741 }],
6742 runs: vec![],
6743 steps: vec![],
6744 actions: vec![],
6745 optional_backfills: vec![],
6746 vec_inserts: vec![VecInsert {
6747 chunk_id: "chunk-retire-vec".to_owned(),
6748 embedding: vec![0.1, 0.2, 0.3],
6749 }],
6750 operational_writes: vec![],
6751 })
6752 .expect("setup write");
6753
6754 writer
6756 .submit(WriteRequest {
6757 label: "retire".to_owned(),
6758 nodes: vec![],
6759 node_retires: vec![NodeRetire {
6760 logical_id: "node-retire-vec".to_owned(),
6761 source_ref: Some("src".to_owned()),
6762 }],
6763 edges: vec![],
6764 edge_retires: vec![],
6765 chunks: vec![],
6766 runs: vec![],
6767 steps: vec![],
6768 actions: vec![],
6769 optional_backfills: vec![],
6770 vec_inserts: vec![],
6771 operational_writes: vec![],
6772 })
6773 .expect("retire write");
6774
6775 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6776 let count: i64 = conn
6777 .query_row(
6778 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-retire-vec'",
6779 [],
6780 |row| row.get(0),
6781 )
6782 .expect("count");
6783 assert_eq!(
6784 count, 1,
6785 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6786 );
6787 }
6788
6789 #[cfg(feature = "sqlite-vec")]
6790 #[test]
6791 #[allow(clippy::too_many_lines)]
6792 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6793 use crate::sqlite::open_connection_with_vec;
6794
6795 let db = NamedTempFile::new().expect("temporary db");
6796 let schema_manager = Arc::new(SchemaManager::new());
6797
6798 {
6799 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6800 schema_manager.bootstrap(&conn).expect("bootstrap");
6801 }
6802
6803 let writer = WriterActor::start(
6804 db.path(),
6805 Arc::clone(&schema_manager),
6806 ProvenanceMode::Warn,
6807 Arc::new(TelemetryCounters::default()),
6808 )
6809 .expect("writer");
6810
6811 writer
6813 .submit(WriteRequest {
6814 label: "v1".to_owned(),
6815 nodes: vec![NodeInsert {
6816 row_id: "row-replace-v1".to_owned(),
6817 logical_id: "node-replace-vec".to_owned(),
6818 kind: "Doc".to_owned(),
6819 properties: "{}".to_owned(),
6820 source_ref: Some("src".to_owned()),
6821 upsert: false,
6822 chunk_policy: ChunkPolicy::Preserve,
6823 content_ref: None,
6824 }],
6825 node_retires: vec![],
6826 edges: vec![],
6827 edge_retires: vec![],
6828 chunks: vec![ChunkInsert {
6829 id: "chunk-replace-A".to_owned(),
6830 node_logical_id: "node-replace-vec".to_owned(),
6831 text_content: "version one".to_owned(),
6832 byte_start: None,
6833 byte_end: None,
6834 content_hash: None,
6835 }],
6836 runs: vec![],
6837 steps: vec![],
6838 actions: vec![],
6839 optional_backfills: vec![],
6840 vec_inserts: vec![VecInsert {
6841 chunk_id: "chunk-replace-A".to_owned(),
6842 embedding: vec![0.1, 0.2, 0.3],
6843 }],
6844 operational_writes: vec![],
6845 })
6846 .expect("v1 write");
6847
6848 writer
6850 .submit(WriteRequest {
6851 label: "v2".to_owned(),
6852 nodes: vec![NodeInsert {
6853 row_id: "row-replace-v2".to_owned(),
6854 logical_id: "node-replace-vec".to_owned(),
6855 kind: "Doc".to_owned(),
6856 properties: "{}".to_owned(),
6857 source_ref: Some("src".to_owned()),
6858 upsert: true,
6859 chunk_policy: ChunkPolicy::Replace,
6860 content_ref: None,
6861 }],
6862 node_retires: vec![],
6863 edges: vec![],
6864 edge_retires: vec![],
6865 chunks: vec![ChunkInsert {
6866 id: "chunk-replace-B".to_owned(),
6867 node_logical_id: "node-replace-vec".to_owned(),
6868 text_content: "version two".to_owned(),
6869 byte_start: None,
6870 byte_end: None,
6871 content_hash: None,
6872 }],
6873 runs: vec![],
6874 steps: vec![],
6875 actions: vec![],
6876 optional_backfills: vec![],
6877 vec_inserts: vec![VecInsert {
6878 chunk_id: "chunk-replace-B".to_owned(),
6879 embedding: vec![0.4, 0.5, 0.6],
6880 }],
6881 operational_writes: vec![],
6882 })
6883 .expect("v2 write");
6884
6885 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6886 let count_a: i64 = conn
6887 .query_row(
6888 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-A'",
6889 [],
6890 |row| row.get(0),
6891 )
6892 .expect("count A");
6893 let count_b: i64 = conn
6894 .query_row(
6895 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-B'",
6896 [],
6897 |row| row.get(0),
6898 )
6899 .expect("count B");
6900 assert_eq!(
6901 count_a, 0,
6902 "old vec row (chunk-A) must be deleted on Replace"
6903 );
6904 assert_eq!(
6905 count_b, 1,
6906 "new vec row (chunk-B) must be present after Replace"
6907 );
6908 }
6909
    // With the `sqlite-vec` feature compiled in, a VecInsert carried by a
    // write request must be persisted as a row in the per-kind virtual
    // vector table (vec_document for kind "Document").
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_insert_is_persisted_when_feature_enabled() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap through a vec-enabled connection in its own scope so the
        // connection is dropped before the writer opens the database file.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One node, one chunk, and one embedding pointing at that chunk.
        writer
            .submit(WriteRequest {
                label: "vec-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-vec".to_owned(),
                    logical_id: "node-vec".to_owned(),
                    kind: "Document".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("test".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-vec".to_owned(),
                    node_logical_id: "node-vec".to_owned(),
                    text_content: "test text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("vec insert write");

        // Verify through an independent plain connection: the vector row must
        // be durable in the database, not just visible to the writer.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_document WHERE chunk_id = 'chunk-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
    }
6979
6980 #[test]
6983 fn write_request_exceeding_node_limit_is_rejected() {
6984 let nodes: Vec<NodeInsert> = (0..10_001)
6985 .map(|i| NodeInsert {
6986 row_id: format!("row-{i}"),
6987 logical_id: format!("lg-{i}"),
6988 kind: "Note".to_owned(),
6989 properties: "{}".to_owned(),
6990 source_ref: None,
6991 upsert: false,
6992 chunk_policy: ChunkPolicy::Preserve,
6993 content_ref: None,
6994 })
6995 .collect();
6996
6997 let request = WriteRequest {
6998 label: "too-many-nodes".to_owned(),
6999 nodes,
7000 node_retires: vec![],
7001 edges: vec![],
7002 edge_retires: vec![],
7003 chunks: vec![],
7004 runs: vec![],
7005 steps: vec![],
7006 actions: vec![],
7007 optional_backfills: vec![],
7008 vec_inserts: vec![],
7009 operational_writes: vec![],
7010 };
7011
7012 let result = prepare_write(request, ProvenanceMode::Warn)
7013 .map(|_| ())
7014 .map_err(|e| format!("{e}"));
7015 assert!(
7016 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
7017 "exceeding node limit must return InvalidWrite: got {result:?}"
7018 );
7019 }
7020
7021 #[test]
7022 fn write_request_exceeding_total_limit_is_rejected() {
7023 let request = WriteRequest {
7027 label: "too-many-total".to_owned(),
7028 nodes: (0..10_000)
7029 .map(|i| NodeInsert {
7030 row_id: format!("row-{i}"),
7031 logical_id: format!("lg-{i}"),
7032 kind: "Note".to_owned(),
7033 properties: "{}".to_owned(),
7034 source_ref: None,
7035 upsert: false,
7036 chunk_policy: ChunkPolicy::Preserve,
7037 content_ref: None,
7038 })
7039 .collect(),
7040 node_retires: vec![],
7041 edges: (0..10_000)
7042 .map(|i| EdgeInsert {
7043 row_id: format!("edge-row-{i}"),
7044 logical_id: format!("edge-lg-{i}"),
7045 kind: "link".to_owned(),
7046 source_logical_id: format!("lg-{i}"),
7047 target_logical_id: format!("lg-{}", i + 1),
7048 properties: "{}".to_owned(),
7049 source_ref: None,
7050 upsert: false,
7051 })
7052 .collect(),
7053 edge_retires: vec![],
7054 chunks: (0..50_000)
7055 .map(|i| ChunkInsert {
7056 id: format!("chunk-{i}"),
7057 node_logical_id: "lg-0".to_owned(),
7058 text_content: "text".to_owned(),
7059 byte_start: None,
7060 byte_end: None,
7061 content_hash: None,
7062 })
7063 .collect(),
7064 runs: vec![],
7065 steps: vec![],
7066 actions: vec![],
7067 optional_backfills: vec![],
7068 vec_inserts: (0..20_001)
7069 .map(|i| VecInsert {
7070 chunk_id: format!("vec-chunk-{i}"),
7071 embedding: vec![0.1],
7072 })
7073 .collect(),
7074 operational_writes: (0..10_000)
7075 .map(|i| OperationalWrite::Append {
7076 collection: format!("col-{i}"),
7077 record_key: format!("key-{i}"),
7078 payload_json: "{}".to_owned(),
7079 source_ref: None,
7080 })
7081 .collect(),
7082 };
7083
7084 let result = prepare_write(request, ProvenanceMode::Warn)
7085 .map(|_| ())
7086 .map_err(|e| format!("{e}"));
7087 assert!(
7088 matches!(result, Err(ref msg) if msg.contains("too many total items")),
7089 "exceeding total item limit must return InvalidWrite: got {result:?}"
7090 );
7091 }
7092
7093 #[test]
7094 fn write_request_within_limits_succeeds() {
7095 let db = NamedTempFile::new().expect("temporary db");
7096 let writer = WriterActor::start(
7097 db.path(),
7098 Arc::new(SchemaManager::new()),
7099 ProvenanceMode::Warn,
7100 Arc::new(TelemetryCounters::default()),
7101 )
7102 .expect("writer");
7103
7104 let result = writer.submit(WriteRequest {
7105 label: "within-limits".to_owned(),
7106 nodes: vec![NodeInsert {
7107 row_id: "row-1".to_owned(),
7108 logical_id: "lg-1".to_owned(),
7109 kind: "Note".to_owned(),
7110 properties: "{}".to_owned(),
7111 source_ref: None,
7112 upsert: false,
7113 chunk_policy: ChunkPolicy::Preserve,
7114 content_ref: None,
7115 }],
7116 node_retires: vec![],
7117 edges: vec![],
7118 edge_retires: vec![],
7119 chunks: vec![],
7120 runs: vec![],
7121 steps: vec![],
7122 actions: vec![],
7123 optional_backfills: vec![],
7124 vec_inserts: vec![],
7125 operational_writes: vec![],
7126 });
7127
7128 assert!(
7129 result.is_ok(),
7130 "write request within limits must succeed: got {result:?}"
7131 );
7132 }
7133
    // Inserting a node of a kind registered in fts_property_schemas must
    // produce a property-FTS row whose text is the schema's JSON paths
    // joined by the configured separator.
    #[test]
    fn property_fts_rows_created_on_node_insert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap and register the 'Goal' property schema on a short-lived
        // connection; the scope drops it before the writer starts.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "goal-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Read back through an independent connection from the per-kind FTS
        // table derived from the kind name.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist");
        // "$.name" and "$.description" joined with the ' ' separator.
        assert_eq!(text, "Ship v2 Launch the redesign");
    }
7195
    // Upserting the same logical_id must replace — not duplicate — the
    // node's property-FTS row, and the row must carry the new text.
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap + schema registration on a scoped connection.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First version of goal-1: name "Alpha".
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Second write upserts the same logical_id with name "Beta".
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Exactly one FTS row may remain for the logical id.
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        // And its text must reflect the second write.
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
7295
    // Retiring a node must delete its property-FTS row so retired content
    // stops matching full-text searches.
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap + schema registration on a scoped connection.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a Goal node so an FTS row exists to be deleted.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the same logical id in a second request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
7377
7378 #[test]
7379 fn no_property_fts_row_for_unregistered_kind() {
7380 let db = NamedTempFile::new().expect("temporary db");
7381 let schema = Arc::new(SchemaManager::new());
7382 {
7383 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7384 schema.bootstrap(&conn).expect("bootstrap");
7385 }
7387 let writer = WriterActor::start(
7388 db.path(),
7389 Arc::clone(&schema),
7390 ProvenanceMode::Warn,
7391 Arc::new(TelemetryCounters::default()),
7392 )
7393 .expect("writer");
7394
7395 writer
7396 .submit(WriteRequest {
7397 label: "insert".to_owned(),
7398 nodes: vec![NodeInsert {
7399 row_id: "row-1".to_owned(),
7400 logical_id: "note-1".to_owned(),
7401 kind: "Note".to_owned(),
7402 properties: r#"{"title":"hello"}"#.to_owned(),
7403 source_ref: Some("src-1".to_owned()),
7404 upsert: false,
7405 chunk_policy: ChunkPolicy::Preserve,
7406 content_ref: None,
7407 }],
7408 node_retires: vec![],
7409 edges: vec![],
7410 edge_retires: vec![],
7411 chunks: vec![],
7412 runs: vec![],
7413 steps: vec![],
7414 actions: vec![],
7415 optional_backfills: vec![],
7416 vec_inserts: vec![],
7417 operational_writes: vec![],
7418 })
7419 .expect("insert");
7420
7421 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7422 let table = fathomdb_schema::fts_kind_table_name("Note");
7423 let exists: bool = conn
7424 .query_row(
7425 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
7426 rusqlite::params![table],
7427 |row| row.get::<_, i64>(0),
7428 )
7429 .map(|c| c > 0)
7430 .expect("exists check");
7431 assert!(
7432 !exists,
7433 "no per-kind FTS table should be created for unregistered kind"
7434 );
7435 }
7436
    // When the per-kind FTS5 table (fts_props_goal) already exists, a node
    // insert for that kind must write its property text into it.
    #[test]
    fn property_fts_write_goes_to_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap, register the property schema, and pre-create the
        // per-kind table, all on a scoped connection.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // The row must land in the pre-created per-kind table.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 1,
            "per-kind table fts_props_goal must have the row"
        );
    }
7510
    // Retiring a node must also remove its row from the pre-existing
    // per-kind FTS5 table, not only from the shared FTS structures.
    #[test]
    fn property_fts_retire_removes_from_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap, register the schema, and pre-create fts_props_goal on a
        // scoped connection.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a Goal node so a per-kind row exists to be removed.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the node in a follow-up request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 0,
            "per-kind table row must be removed on retire"
        );
    }
7602
7603 mod extract_json_path_tests {
7604 use super::super::extract_json_path;
7605 use serde_json::json;
7606
7607 #[test]
7608 fn string_value() {
7609 let v = json!({"name": "alice"});
7610 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7611 }
7612
7613 #[test]
7614 fn number_value() {
7615 let v = json!({"age": 42});
7616 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7617 }
7618
7619 #[test]
7620 fn bool_value() {
7621 let v = json!({"active": true});
7622 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7623 }
7624
7625 #[test]
7626 fn null_value() {
7627 let v = json!({"x": null});
7628 assert!(extract_json_path(&v, "$.x").is_empty());
7629 }
7630
7631 #[test]
7632 fn missing_path() {
7633 let v = json!({"name": "a"});
7634 assert!(extract_json_path(&v, "$.missing").is_empty());
7635 }
7636
7637 #[test]
7638 fn nested_path() {
7639 let v = json!({"address": {"city": "NYC"}});
7640 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7641 }
7642
7643 #[test]
7644 fn array_of_strings() {
7645 let v = json!({"tags": ["a", "b", "c"]});
7646 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7647 }
7648
7649 #[test]
7650 fn array_mixed_scalars() {
7651 let v = json!({"vals": ["x", 1, true]});
7652 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7653 }
7654
7655 #[test]
7656 fn array_only_objects_returns_empty() {
7657 let v = json!({"data": [{"k": "v"}]});
7658 assert!(extract_json_path(&v, "$.data").is_empty());
7659 }
7660
7661 #[test]
7662 fn array_mixed_objects_and_scalars() {
7663 let v = json!({"data": ["keep", {"skip": true}, "also"]});
7664 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7665 }
7666
7667 #[test]
7668 fn object_returns_empty() {
7669 let v = json!({"meta": {"k": "v"}});
7670 assert!(extract_json_path(&v, "$.meta").is_empty());
7671 }
7672
7673 #[test]
7674 fn no_prefix_returns_empty() {
7675 let v = json!({"name": "a"});
7676 assert!(extract_json_path(&v, "name").is_empty());
7677 }
7678 }
7679
7680 mod recursive_extraction_tests {
7681 use super::super::{
7682 LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7683 PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7684 };
7685 use serde_json::json;
7686
7687 fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7688 PropertyFtsSchema {
7689 paths,
7690 separator: " ".to_owned(),
7691 exclude_paths: Vec::new(),
7692 }
7693 }
7694
7695 fn schema_with_excludes(
7696 paths: Vec<PropertyPathEntry>,
7697 excludes: Vec<String>,
7698 ) -> PropertyFtsSchema {
7699 PropertyFtsSchema {
7700 paths,
7701 separator: " ".to_owned(),
7702 exclude_paths: excludes,
7703 }
7704 }
7705
7706 #[test]
7707 fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7708 let props = json!({"payload": {"b": "two", "a": "one"}});
7709 let (blob, positions, _stats) = extract_property_fts(
7710 &props,
7711 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7712 );
7713 let blob = blob.expect("blob emitted");
7714 let idx_one = blob.find("one").expect("contains 'one'");
7715 let idx_two = blob.find("two").expect("contains 'two'");
7716 assert!(
7717 idx_one < idx_two,
7718 "lex order: 'one' (key a) before 'two' (key b)"
7719 );
7720 assert_eq!(positions.len(), 2);
7721 assert_eq!(positions[0].leaf_path, "$.payload.a");
7722 assert_eq!(positions[1].leaf_path, "$.payload.b");
7723 }
7724
7725 #[test]
7726 fn recursive_extraction_walks_arrays_of_scalars() {
7727 let props = json!({"tags": ["red", "blue"]});
7728 let (_blob, positions, _stats) = extract_property_fts(
7729 &props,
7730 &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7731 );
7732 assert_eq!(positions.len(), 2);
7733 assert_eq!(positions[0].leaf_path, "$.tags[0]");
7734 assert_eq!(positions[1].leaf_path, "$.tags[1]");
7735 }
7736
7737 #[test]
7738 fn recursive_extraction_walks_arrays_of_objects() {
7739 let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7740 let (_blob, positions, _stats) = extract_property_fts(
7741 &props,
7742 &schema(vec![PropertyPathEntry::recursive("$.items")]),
7743 );
7744 assert_eq!(positions.len(), 2);
7745 assert_eq!(positions[0].leaf_path, "$.items[0].name");
7746 assert_eq!(positions[1].leaf_path, "$.items[1].name");
7747 }
7748
7749 #[test]
7750 fn recursive_extraction_stringifies_numbers_and_bools() {
7751 let props = json!({"root": {"n": 42, "ok": true}});
7752 let (blob, _positions, _stats) = extract_property_fts(
7753 &props,
7754 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7755 );
7756 let blob = blob.expect("blob emitted");
7757 assert!(blob.contains("42"));
7758 assert!(blob.contains("true"));
7759 }
7760
7761 #[test]
7762 fn recursive_extraction_skips_nulls_and_missing() {
7763 let props = json!({"root": {"x": null, "y": "present"}});
7764 let (blob, positions, _stats) = extract_property_fts(
7765 &props,
7766 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7767 );
7768 let blob = blob.expect("blob emitted");
7769 assert!(!blob.contains("null"));
7770 assert_eq!(positions.len(), 1);
7771 assert_eq!(positions[0].leaf_path, "$.root.y");
7772 }
7773
7774 #[test]
7775 fn recursive_extraction_respects_max_depth_guardrail() {
7776 let mut inner = json!("leaf-value");
7778 for _ in 0..10 {
7779 inner = json!({ "k": inner });
7780 }
7781 let props = json!({ "root": inner });
7782 let (blob, positions, stats) = extract_property_fts(
7783 &props,
7784 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7785 );
7786 assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7787 assert!(
7789 blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7790 "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7791 );
7792 let _ = positions;
7795 let _ = MAX_RECURSIVE_DEPTH;
7796 }
7797
7798 #[test]
7799 fn recursive_extraction_respects_max_bytes_guardrail() {
7800 let leaves: Vec<String> = (0..40)
7802 .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7803 .collect();
7804 let props = json!({ "root": leaves });
7805 let (blob, positions, stats) = extract_property_fts(
7806 &props,
7807 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7808 );
7809 assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7810 let blob = blob.expect("blob must still be emitted");
7811 assert!(
7812 blob.len() <= MAX_EXTRACTED_BYTES,
7813 "blob must not exceed MAX_EXTRACTED_BYTES"
7814 );
7815 for pos in &positions {
7817 assert!(pos.end_offset <= blob.len());
7818 let slice = &blob[pos.start_offset..pos.end_offset];
7819 assert!(!slice.is_empty());
7822 assert!(!slice.contains(LEAF_SEPARATOR));
7823 }
7824 assert!(!blob.ends_with(LEAF_SEPARATOR));
7826 }
7827
7828 #[test]
7829 fn recursive_extraction_respects_exclude_paths() {
7830 let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7831 let (blob, positions, stats) = extract_property_fts(
7832 &props,
7833 &schema_with_excludes(
7834 vec![PropertyPathEntry::recursive("$.payload")],
7835 vec!["$.payload.priv".to_owned()],
7836 ),
7837 );
7838 let blob = blob.expect("blob emitted");
7839 assert!(blob.contains("yes"));
7840 assert!(!blob.contains("no"));
7841 assert_eq!(positions.len(), 1);
7842 assert_eq!(positions[0].leaf_path, "$.payload.pub");
7843 assert!(stats.excluded_subtree > 0);
7844 }
7845
7846 #[test]
7847 fn position_map_entries_match_emitted_leaves_in_order() {
7848 let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7849 let (blob, positions, _stats) = extract_property_fts(
7850 &props,
7851 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7852 );
7853 let blob = blob.expect("blob emitted");
7854 assert_eq!(positions.len(), 3);
7855 assert_eq!(positions[0].leaf_path, "$.root.a");
7857 assert_eq!(positions[1].leaf_path, "$.root.b");
7858 assert_eq!(positions[2].leaf_path, "$.root.c");
7859 let mut prev_end: usize = 0;
7861 for (i, pos) in positions.iter().enumerate() {
7862 assert!(pos.start_offset >= prev_end);
7863 assert!(pos.end_offset > pos.start_offset);
7864 assert!(pos.end_offset <= blob.len());
7865 let slice = &blob[pos.start_offset..pos.end_offset];
7866 match i {
7867 0 => assert_eq!(slice, "alpha"),
7868 1 => assert_eq!(slice, "bravo"),
7869 2 => assert_eq!(slice, "charlie"),
7870 _ => unreachable!(),
7871 }
7872 prev_end = pos.end_offset;
7873 }
7874 }
7875
7876 #[test]
7877 fn scalar_only_schema_produces_empty_position_map() {
7878 let props = json!({"name": "alpha", "title": "beta"});
7879 let (blob, positions, _stats) = extract_property_fts(
7880 &props,
7881 &schema(vec![
7882 PropertyPathEntry::scalar("$.name"),
7883 PropertyPathEntry::scalar("$.title"),
7884 ]),
7885 );
7886 assert_eq!(blob.as_deref(), Some("alpha beta"));
7887 assert!(
7888 positions.is_empty(),
7889 "scalar-only schema must emit no position entries"
7890 );
7891 let _: Vec<PositionEntry> = positions;
7894 }
7895
7896 fn assert_unique_start_offsets(positions: &[PositionEntry]) {
7897 let mut seen = std::collections::HashSet::new();
7898 for pos in positions {
7899 let start = pos.start_offset;
7900 assert!(
7901 seen.insert(start),
7902 "duplicate start_offset {start} in positions {positions:?}"
7903 );
7904 }
7905 }
7906
7907 #[test]
7908 fn recursive_extraction_empty_then_nonempty_in_array() {
7909 let props = json!({"payload": {"xs": ["", "x"]}});
7910 let (combined, positions, _stats) = extract_property_fts(
7911 &props,
7912 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7913 );
7914 assert_eq!(combined.as_deref(), Some("x"));
7915 assert_eq!(positions.len(), 1);
7916 assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
7917 assert_eq!(positions[0].start_offset, 0);
7918 assert_eq!(positions[0].end_offset, 1);
7919 assert_unique_start_offsets(&positions);
7920 }
7921
7922 #[test]
7923 fn recursive_extraction_two_empties_then_nonempty_in_array() {
7924 let props = json!({"payload": {"xs": ["", "", "x"]}});
7925 let (combined, positions, _stats) = extract_property_fts(
7926 &props,
7927 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7928 );
7929 assert_eq!(combined.as_deref(), Some("x"));
7930 assert_eq!(positions.len(), 1);
7931 assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
7932 assert_eq!(positions[0].start_offset, 0);
7933 assert_eq!(positions[0].end_offset, 1);
7934 assert_unique_start_offsets(&positions);
7935 }
7936
7937 #[test]
7938 fn recursive_extraction_empty_then_nonempty_sibling_keys() {
7939 let props = json!({"payload": {"a": "", "b": "x"}});
7940 let (combined, positions, _stats) = extract_property_fts(
7941 &props,
7942 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7943 );
7944 assert_eq!(combined.as_deref(), Some("x"));
7945 assert_eq!(positions.len(), 1);
7946 assert_eq!(positions[0].leaf_path, "$.payload.b");
7947 assert_eq!(positions[0].start_offset, 0);
7948 assert_eq!(positions[0].end_offset, 1);
7949 assert_unique_start_offsets(&positions);
7950 }
7951
7952 #[test]
7953 fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
7954 let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
7955 let (combined, positions, _stats) = extract_property_fts(
7956 &props,
7957 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7958 );
7959 assert_eq!(combined.as_deref(), Some("x"));
7960 assert_eq!(positions.len(), 1);
7961 assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
7962 assert_eq!(positions[0].start_offset, 0);
7963 assert_eq!(positions[0].end_offset, 1);
7964 assert_unique_start_offsets(&positions);
7965 }
7966
7967 #[test]
7968 fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
7969 let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
7970 let (combined, positions, _stats) = extract_property_fts(
7971 &props,
7972 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7973 );
7974 assert_eq!(combined.as_deref(), Some("x"));
7975 assert_eq!(positions.len(), 1);
7976 assert_eq!(positions[0].leaf_path, "$.payload.b.c");
7977 assert_eq!(positions[0].start_offset, 0);
7978 assert_eq!(positions[0].end_offset, 1);
7979 assert_unique_start_offsets(&positions);
7980 }
7981
7982 #[test]
7983 fn recursive_extraction_all_empty_shapes_emit_no_positions() {
7984 let cases = vec![
7985 json!({"payload": {}}),
7986 json!({"payload": {"a": ""}}),
7987 json!({"payload": {"xs": []}}),
7988 json!({"payload": {"xs": [""]}}),
7989 json!({"payload": {"xs": ["", ""]}}),
7990 json!({"payload": {"xs": ["", "", ""]}}),
7991 ];
7992 for case in cases {
7993 let (combined, positions, _stats) = extract_property_fts(
7994 &case,
7995 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7996 );
7997 assert!(
7998 combined.is_none(),
7999 "all-empty payload {case:?} must produce no combined text, got {combined:?}"
8000 );
8001 assert!(
8002 positions.is_empty(),
8003 "all-empty payload {case:?} must produce no positions, got {positions:?}"
8004 );
8005 }
8006 }
8007
8008 #[test]
8009 fn recursive_extraction_nonempty_then_empty_then_nonempty() {
8010 let props = json!({"payload": {"xs": ["x", "", "y"]}});
8011 let (combined, positions, _stats) = extract_property_fts(
8012 &props,
8013 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8014 );
8015 let blob = combined.expect("blob emitted");
8016 assert_eq!(positions.len(), 2);
8017 assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
8018 assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
8019 assert_eq!(positions[0].start_offset, 0);
8020 assert_eq!(positions[0].end_offset, 1);
8021 assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
8025 assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
8026 assert_eq!(
8027 &blob[positions[0].start_offset..positions[0].end_offset],
8028 "x"
8029 );
8030 assert_eq!(
8031 &blob[positions[1].start_offset..positions[1].end_offset],
8032 "y"
8033 );
8034 assert_unique_start_offsets(&positions);
8035 }
8036
8037 #[test]
8038 fn recursive_extraction_null_leaves_unchanged() {
8039 let props = json!({"payload": {"xs": [null, null]}});
8040 let (combined, positions, _stats) = extract_property_fts(
8041 &props,
8042 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8043 );
8044 assert!(combined.is_none());
8045 assert!(positions.is_empty());
8046 }
8047 }
8048
8049 mod parse_property_schema_json_tests {
8050 use super::super::parse_property_schema_json;
8051
8052 #[test]
8053 fn parse_property_schema_json_preserves_weight() {
8054 let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8055 let schema = parse_property_schema_json(json, " ");
8056 assert_eq!(schema.paths.len(), 2);
8057 let title = &schema.paths[0];
8058 assert_eq!(title.path, "$.title");
8059 assert_eq!(title.weight, Some(2.0_f32));
8060 let body = &schema.paths[1];
8061 assert_eq!(body.path, "$.body");
8062 assert_eq!(body.weight, None);
8063 }
8064 }
8065
8066 mod extract_property_fts_columns_tests {
8069 use serde_json::json;
8070
8071 use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8072
8073 fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8074 PropertyFtsSchema {
8075 paths,
8076 separator: " ".to_owned(),
8077 exclude_paths: Vec::new(),
8078 }
8079 }
8080
8081 #[test]
8082 fn extract_property_fts_columns_returns_per_spec_text() {
8083 let props = json!({"title": "Hello", "body": "World"});
8084 let mut title_entry = PropertyPathEntry::scalar("$.title");
8085 title_entry.weight = Some(2.0);
8086 let mut body_entry = PropertyPathEntry::scalar("$.body");
8087 body_entry.weight = Some(1.0);
8088 let schema = weighted_schema(vec![title_entry, body_entry]);
8089 let cols = extract_property_fts_columns(&props, &schema);
8090 assert_eq!(cols.len(), 2);
8091 assert_eq!(cols[0].0, "title");
8092 assert_eq!(cols[0].1, "Hello");
8093 assert_eq!(cols[1].0, "body");
8094 assert_eq!(cols[1].1, "World");
8095 }
8096
8097 #[test]
8098 fn extract_property_fts_columns_missing_field_returns_empty_string() {
8099 let props = json!({"title": "Hello"});
8100 let mut title_entry = PropertyPathEntry::scalar("$.title");
8101 title_entry.weight = Some(2.0);
8102 let mut body_entry = PropertyPathEntry::scalar("$.body");
8103 body_entry.weight = Some(1.0);
8104 let schema = weighted_schema(vec![title_entry, body_entry]);
8105 let cols = extract_property_fts_columns(&props, &schema);
8106 assert_eq!(cols.len(), 2);
8107 assert_eq!(cols[0].1, "Hello");
8108 assert_eq!(cols[1].1, "", "missing field yields empty string");
8109 }
8110 }
8111
    #[test]
    fn writer_inserts_per_column_for_weighted_schema() {
        // End-to-end: a kind with a weighted FTS property schema registered in
        // fts_property_schemas gets its node text written into the per-kind
        // FTS5 table as one column per schema path, not as a single blob.
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Scoped so this setup connection is dropped before the WriterActor
        // opens its own connection to the same file.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema_manager.bootstrap(&conn).expect("bootstrap");

            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);

            // Register a weighted two-path schema for kind "Article".
            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES (?1, ?2, ' ')",
                rusqlite::params!["Article", paths_json],
            )
            .expect("register schema");

            // Pre-create the per-kind FTS5 table with one indexed column per
            // schema path plus an unindexed logical-id column.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
                 node_logical_id UNINDEXED, {title_col}, {body_col}, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )"
            ))
            .expect("create weighted per-kind table");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit a single Article node; everything else in the request is empty.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "article-1".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Fresh connection to verify what the writer committed.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);

        // Exactly one FTS row for the inserted node.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind FTS table must have the row");

        // Each property landed in its own column, untouched by the other.
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM fts_props_article \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello World",
            "title column must have correct value"
        );
        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
    }
8213}