1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
23#[derive(Clone, Debug, PartialEq, Eq)]
25pub struct OptionalProjectionTask {
26 pub target: ProjectionTarget,
28 pub payload: String,
30}
31
/// Policy for a node's existing chunks when the node is written again.
/// NOTE(review): semantics inferred from variant names — the apply path that
/// enforces this is outside this file; confirm before relying on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep the node's existing chunks (the default).
    #[default]
    Preserve,
    /// Drop the node's existing chunks in favor of newly submitted ones.
    Replace,
}
41
/// How missing provenance (`source_ref`) on write items is treated.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Missing `source_ref` is tolerated (default; surfaced via
    /// `WriteReceipt::provenance_warnings`).
    #[default]
    Warn,
    /// Missing `source_ref` on any item rejects the whole request
    /// (see `check_require_provenance` and `prepare_touch_last_accessed`).
    Require,
}
52
/// A node to insert. `prepare_write` requires `row_id` and `logical_id` to be
/// non-empty, and `row_id` to be unique across nodes and edges in a request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row identifier; unique within the request.
    pub row_id: String,
    /// Stable logical identifier; chunks reference their node by this id.
    pub logical_id: String,
    pub kind: String,
    /// JSON-encoded properties (parsed with `serde_json` for property FTS).
    pub properties: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// What happens to the node's existing chunks on rewrite.
    pub chunk_policy: ChunkPolicy,
    pub content_ref: Option<String>,
}
69
/// An edge to insert between two nodes (addressed by logical id).
/// `prepare_write` requires `row_id` and `logical_id` to be non-empty, and
/// `row_id` to be unique across nodes and edges in a request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row identifier; unique within the request.
    pub row_id: String,
    /// Stable logical identifier for the edge itself.
    pub logical_id: String,
    pub source_logical_id: String,
    pub target_logical_id: String,
    pub kind: String,
    /// JSON-encoded properties. NOTE(review): assumed JSON by analogy with
    /// `NodeInsert`; no parse of edge properties is visible in this file.
    pub properties: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
84
/// Retires the current version of a node, addressed by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
91
/// Retires the current version of an edge, addressed by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
98
/// A text chunk attached to a node. `text_content` must be non-empty
/// (`prepare_write`), and the referenced node must either be in the same
/// request or already exist in the database (`resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    /// Logical id of the owning node.
    pub node_logical_id: String,
    /// The chunk's text; also projected into FTS.
    pub text_content: String,
    /// Optional byte range of this chunk within its source content —
    /// presumably byte offsets; confirm against callers.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}
110
/// An embedding vector for a chunk. Derives `PartialEq` only (no `Eq`)
/// because of the `f32` payload.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    /// Must be non-empty (checked in `prepare_write`).
    pub embedding: Vec<f32>,
}
122
/// A write against an operational (non-graph) collection. All variants carry
/// a collection name and record key, both required non-empty by
/// `prepare_write`; `Append`/`Put` additionally require a non-empty
/// `payload_json`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to the collection.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put (write/overwrite) a record in the collection.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete a record from the collection.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A runtime "run" record. `upsert == true` requires `supersedes_id` to be
/// set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the run this one replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A runtime "step" record belonging to a run. `upsert == true` requires
/// `supersedes_id` to be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    /// Parent run id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the step this one replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
169
/// A runtime "action" record belonging to a step. `upsert == true` requires
/// `supersedes_id` to be set (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    /// Parent step id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the action this one replaces; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
182
// Per-request item caps enforced by `validate_request_size`.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
/// Cap on node retires + edge retires combined.
const MAX_RETIRES: usize = 10_000;
/// Cap on runs + steps + actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
/// Cap on the sum of all item kinds in a single request.
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit`/`touch_last_accessed` wait for the writer thread's reply
/// before returning `WriterTimedOut`; the write may still commit afterwards.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// One batch of writes submitted to the writer thread as a unit.
/// Derives `PartialEq` only (no `Eq`) because `vec_inserts` carries `f32`s.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label used in logs and echoed in the receipt.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection work carried through with the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Returned to the caller once a write request has been applied.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label of the request this receipt answers.
    pub label: String,
    /// Number of optional projection backfills that were queued.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-provenance notes (relevant under `ProvenanceMode::Warn`).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to bump last-accessed timestamps for a set of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Must be non-empty and contain no blank ids
    /// (see `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to record — presumably epoch-based; confirm at call sites.
    pub touched_at: i64,
    /// Optional provenance; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
235
/// Result of a last-accessed touch: how many ids were updated and with what
/// timestamp.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
242
/// One row destined for the chunk FTS projection, resolved by
/// `resolve_fts_rows` (chunk text plus its owning node's kind).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node (from this request or the database).
    kind: String,
    text_content: String,
}
250
/// One row destined for the property FTS projection, resolved by
/// `resolve_property_fts_rows` from a node's JSON properties.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Combined extracted text (scalar parts + recursive blob).
    text_content: String,
    /// Byte span of each extracted leaf within `text_content`.
    positions: Vec<PositionEntry>,
    /// Per-path `(column_name, text)` pairs; populated only when the schema
    /// assigns a weight to at least one path.
    columns: Vec<(String, String)>,
}
261
/// Maximum object/array nesting depth the recursive property walker descends
/// before recording a `depth_cap_hit` and stopping that branch.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Byte cap on the recursively extracted text blob; extraction stops once
/// appending the next leaf (plus separator) would exceed this.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Separator inserted between extracted leaves; the sentinel token is meant
/// to keep phrase matches from spanning leaf boundaries (per its name).
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
304
/// How a configured JSON path is extracted for property FTS.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Extract only the scalar (or array-of-scalars) value at the path
    /// (`extract_json_path`).
    #[default]
    Scalar,
    /// Recursively walk the subtree at the path, concatenating every leaf
    /// (`RecursiveWalker`).
    Recursive,
}
317
/// One extraction path inside a `PropertyFtsSchema`.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PropertyPathEntry {
    /// JSON path in `$.a.b` form.
    pub path: String,
    pub mode: PropertyPathMode,
    /// Optional ranking weight; any weighted path switches on per-column
    /// extraction in `resolve_property_fts_rows`.
    pub weight: Option<f32>,
}

// Manual `Eq` because `f32` is not `Eq`, so it cannot be derived.
// NOTE(review): this assertion is unsound if `weight` is ever NaN
// (NaN != NaN breaks reflexivity) — confirm weights are never NaN.
impl Eq for PropertyPathEntry {}
329
330impl PropertyPathEntry {
331 pub(crate) fn scalar(path: impl Into<String>) -> Self {
332 Self {
333 path: path.into(),
334 mode: PropertyPathMode::Scalar,
335 weight: None,
336 }
337 }
338
339 #[cfg(test)]
340 pub(crate) fn recursive(path: impl Into<String>) -> Self {
341 Self {
342 path: path.into(),
343 mode: PropertyPathMode::Recursive,
344 weight: None,
345 }
346 }
347}
348
/// Per-node-kind configuration for property FTS extraction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Paths to extract, in order.
    pub paths: Vec<PropertyPathEntry>,
    /// Separator used to join scalar values into one string.
    pub separator: String,
    /// Exact paths whose subtrees the recursive walker skips entirely.
    pub exclude_paths: Vec<String>,
}
368
/// Byte span of one extracted leaf inside the combined FTS text blob.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    pub start_offset: usize,
    pub end_offset: usize,
    /// JSON path of the leaf (array elements rendered as `path[idx]`).
    pub leaf_path: String,
}
378
/// Guardrail counters accumulated during recursive property extraction.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Times the depth cap (`MAX_RECURSIVE_DEPTH`) stopped a descent.
    pub depth_cap_hit: usize,
    /// Whether the byte cap (`MAX_EXTRACTED_BYTES`) truncated extraction.
    pub byte_cap_reached: bool,
    /// Subtrees skipped because their path was in `exclude_paths`.
    pub excluded_subtree: usize,
}
391
392impl ExtractStats {
393 fn merge(&mut self, other: ExtractStats) {
394 self.depth_cap_hit += other.depth_cap_hit;
395 self.byte_cap_reached |= other.byte_cap_reached;
396 self.excluded_subtree += other.excluded_subtree;
397 }
398}
399
/// A validated `WriteRequest` plus everything resolved before application.
/// Built by `prepare_write` on the caller's thread; the projection rows and
/// operational metadata start empty and are filled on the writer thread
/// (`resolve_fts_rows`, `resolve_property_fts_rows`, and the apply path).
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the `sqlite-vec` feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Per-collection metadata; starts empty — presumably resolved on the
    // writer thread (the resolver is outside this view).
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes in this request, so chunk FTS resolution
    // can avoid database lookups for nodes written alongside their chunks.
    node_kinds: HashMap<String, String>,
    // Filled by `resolve_fts_rows`.
    required_fts_rows: Vec<FtsProjectionRow>,
    // Filled by `resolve_property_fts_rows`.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
427
/// Messages sent from `WriterActor` handles to the writer thread; each
/// carries a one-shot reply channel for the result.
enum WriteMessage {
    /// Apply a prepared write. Boxed to keep the channel message small.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Bump last-accessed timestamps for a set of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
438
/// Handle to the single dedicated writer thread.
///
/// `sender` is wrapped in `ManuallyDrop` so `Drop` can drop it explicitly
/// first — closing the channel lets the writer loop drain and exit — before
/// joining `thread_handle`.
#[derive(Debug)]
pub struct WriterActor {
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied while preparing requests.
    provenance_mode: ProvenanceMode,
    /// Kept alive alongside the clone handed to the writer thread.
    _telemetry: Arc<TelemetryCounters>,
}
451
impl WriterActor {
    /// Spawn the writer thread and return a handle to it.
    ///
    /// The thread opens the database at `path` and bootstraps the schema
    /// before serving messages; failures there surface later as rejected
    /// writes, not as an error from `start`.
    ///
    /// # Errors
    /// `EngineError::Io` if the OS thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: submitters block once 256 messages are queued.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// Whether the writer thread is still running.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fail fast with `WriterRejected` if the writer thread has exited.
    /// Best-effort: the thread can still exit between this check and the
    /// subsequent send, which then fails on its own.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validate `request` on the calling thread, hand it to the writer, and
    /// block for the receipt.
    ///
    /// # Errors
    /// `InvalidWrite` from validation, `WriterRejected` if the writer thread
    /// is gone, or `WriterTimedOut` after `WRITER_REPLY_TIMEOUT` — in which
    /// case the write may still commit.
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validate a last-accessed touch, hand it to the writer thread, and
    /// block for the report. Same error surface as `submit`.
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
535
/// Shutdown notice fallback when the `tracing` feature is disabled; printing
/// straight to stderr is deliberate here, hence the clippy allow.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
541
impl Drop for WriterActor {
    /// Shut down the writer: close the channel, join the thread, and
    /// propagate a writer panic unless we are already unwinding (a double
    /// panic would abort the process).
    fn drop(&mut self) {
        // SAFETY: `sender` is dropped exactly once, here, and never touched
        // again. Dropping it disconnects the channel so the writer loop
        // drains its queue and exits, making the join below finite.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Already panicking: re-raising would abort, so just
                        // report the suppressed writer panic.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Surface the writer thread's panic to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
571
572fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
574 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
575 .map_err(|error| match error {
576 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
577 "write timed out waiting for writer thread reply — the write may still commit"
578 .to_owned(),
579 ),
580 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
581 })
582 .and_then(|result| result)
583}
584
585fn prepare_touch_last_accessed(
586 request: &LastAccessTouchRequest,
587 mode: ProvenanceMode,
588) -> Result<(), EngineError> {
589 if request.logical_ids.is_empty() {
590 return Err(EngineError::InvalidWrite(
591 "touch_last_accessed requires at least one logical_id".to_owned(),
592 ));
593 }
594 for logical_id in &request.logical_ids {
595 if logical_id.trim().is_empty() {
596 return Err(EngineError::InvalidWrite(
597 "touch_last_accessed requires non-empty logical_ids".to_owned(),
598 ));
599 }
600 }
601 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
602 return Err(EngineError::InvalidWrite(
603 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
604 .to_owned(),
605 ));
606 }
607 Ok(())
608}
609
610fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
611 let missing: Vec<String> = request
612 .nodes
613 .iter()
614 .filter(|n| n.source_ref.is_none())
615 .map(|n| format!("node '{}'", n.logical_id))
616 .chain(
617 request
618 .node_retires
619 .iter()
620 .filter(|r| r.source_ref.is_none())
621 .map(|r| format!("node retire '{}'", r.logical_id)),
622 )
623 .chain(
624 request
625 .edges
626 .iter()
627 .filter(|e| e.source_ref.is_none())
628 .map(|e| format!("edge '{}'", e.logical_id)),
629 )
630 .chain(
631 request
632 .edge_retires
633 .iter()
634 .filter(|r| r.source_ref.is_none())
635 .map(|r| format!("edge retire '{}'", r.logical_id)),
636 )
637 .chain(
638 request
639 .runs
640 .iter()
641 .filter(|r| r.source_ref.is_none())
642 .map(|r| format!("run '{}'", r.id)),
643 )
644 .chain(
645 request
646 .steps
647 .iter()
648 .filter(|s| s.source_ref.is_none())
649 .map(|s| format!("step '{}'", s.id)),
650 )
651 .chain(
652 request
653 .actions
654 .iter()
655 .filter(|a| a.source_ref.is_none())
656 .map(|a| format!("action '{}'", a.id)),
657 )
658 .chain(
659 request
660 .operational_writes
661 .iter()
662 .filter(|write| operational_write_source_ref(write).is_none())
663 .map(|write| {
664 format!(
665 "operational {} '{}:{}'",
666 operational_write_kind(write),
667 operational_write_collection(write),
668 operational_write_record_key(write)
669 )
670 }),
671 )
672 .collect();
673
674 if missing.is_empty() {
675 Ok(())
676 } else {
677 Err(EngineError::InvalidWrite(format!(
678 "ProvenanceMode::Require: missing source_ref on: {}",
679 missing.join(", ")
680 )))
681 }
682}
683
684fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
685 if request.nodes.len() > MAX_NODES {
686 return Err(EngineError::InvalidWrite(format!(
687 "too many nodes: {} exceeds limit of {MAX_NODES}",
688 request.nodes.len()
689 )));
690 }
691 if request.edges.len() > MAX_EDGES {
692 return Err(EngineError::InvalidWrite(format!(
693 "too many edges: {} exceeds limit of {MAX_EDGES}",
694 request.edges.len()
695 )));
696 }
697 if request.chunks.len() > MAX_CHUNKS {
698 return Err(EngineError::InvalidWrite(format!(
699 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
700 request.chunks.len()
701 )));
702 }
703 let retires = request.node_retires.len() + request.edge_retires.len();
704 if retires > MAX_RETIRES {
705 return Err(EngineError::InvalidWrite(format!(
706 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
707 )));
708 }
709 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
710 if runtime_items > MAX_RUNTIME_ITEMS {
711 return Err(EngineError::InvalidWrite(format!(
712 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
713 )));
714 }
715 if request.operational_writes.len() > MAX_OPERATIONAL {
716 return Err(EngineError::InvalidWrite(format!(
717 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
718 request.operational_writes.len()
719 )));
720 }
721 let total = request.nodes.len()
722 + request.node_retires.len()
723 + request.edges.len()
724 + request.edge_retires.len()
725 + request.chunks.len()
726 + request.runs.len()
727 + request.steps.len()
728 + request.actions.len()
729 + request.vec_inserts.len()
730 + request.operational_writes.len();
731 if total > MAX_TOTAL_ITEMS {
732 return Err(EngineError::InvalidWrite(format!(
733 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
734 )));
735 }
736 Ok(())
737}
738
/// Validate a `WriteRequest` on the caller's thread and package it as a
/// `PreparedWrite` for the writer thread.
///
/// Checks run in this order: size caps, non-empty identifiers/payloads,
/// row_id uniqueness, provenance (only under `ProvenanceMode::Require`), and
/// upsert/supersedes consistency for runtime items. Projection rows and
/// operational metadata are left empty here; they are resolved later on the
/// writer thread.
///
/// # Errors
/// `EngineError::InvalidWrite` describing the first violation encountered.
#[allow(clippy::too_many_lines)]
fn prepare_write(
    request: WriteRequest,
    mode: ProvenanceMode,
) -> Result<PreparedWrite, EngineError> {
    validate_request_size(&request)?;

    // Non-empty identifier/payload checks, one item kind at a time.
    for node in &request.nodes {
        if node.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty row_id".to_owned(),
            ));
        }
        if node.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for edge in &request.edges {
        if edge.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty row_id".to_owned(),
            ));
        }
        if edge.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for chunk in &request.chunks {
        if chunk.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ChunkInsert has empty id".to_owned(),
            ));
        }
        // An empty chunk would project a useless FTS row.
        if chunk.text_content.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' has empty text_content; empty chunks are not allowed",
                chunk.id
            )));
        }
    }
    for run in &request.runs {
        if run.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "RunInsert has empty id".to_owned(),
            ));
        }
    }
    for step in &request.steps {
        if step.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "StepInsert has empty id".to_owned(),
            ));
        }
    }
    for action in &request.actions {
        if action.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ActionInsert has empty id".to_owned(),
            ));
        }
    }
    for vi in &request.vec_inserts {
        if vi.chunk_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "VecInsert has empty chunk_id".to_owned(),
            ));
        }
        if vi.embedding.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "VecInsert for chunk '{}' has empty embedding",
                vi.chunk_id
            )));
        }
    }
    for operational in &request.operational_writes {
        if operational_write_collection(operational).is_empty() {
            return Err(EngineError::InvalidWrite(
                "OperationalWrite has empty collection".to_owned(),
            ));
        }
        if operational_write_record_key(operational).is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "OperationalWrite for collection '{}' has empty record_key",
                operational_write_collection(operational)
            )));
        }
        // Only Append/Put carry a payload; Delete has nothing to validate.
        match operational {
            OperationalWrite::Append { payload_json, .. }
            | OperationalWrite::Put { payload_json, .. } => {
                if payload_json.is_empty() {
                    return Err(EngineError::InvalidWrite(format!(
                        "OperationalWrite {} '{}:{}' has empty payload_json",
                        operational_write_kind(operational),
                        operational_write_collection(operational),
                        operational_write_record_key(operational)
                    )));
                }
            }
            OperationalWrite::Delete { .. } => {}
        }
    }

    {
        // row_ids share one namespace across nodes and edges: a duplicate in
        // either list (or across the two) rejects the request.
        let mut seen = std::collections::HashSet::new();
        for node in &request.nodes {
            if !seen.insert(node.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    node.row_id
                )));
            }
        }
        for edge in &request.edges {
            if !seen.insert(edge.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    edge.row_id
                )));
            }
        }
    }

    if mode == ProvenanceMode::Require {
        check_require_provenance(&request)?;
    }

    // Runtime upserts must name the record they supersede.
    for run in &request.runs {
        if run.upsert && run.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "run '{}': upsert=true requires supersedes_id to be set",
                run.id
            )));
        }
    }
    for step in &request.steps {
        if step.upsert && step.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "step '{}': upsert=true requires supersedes_id to be set",
                step.id
            )));
        }
    }
    for action in &request.actions {
        if action.upsert && action.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "action '{}': upsert=true requires supersedes_id to be set",
                action.id
            )));
        }
    }

    // logical_id -> kind for nodes in this request, so chunk FTS resolution
    // can skip database lookups for nodes written alongside their chunks.
    let node_kinds = request
        .nodes
        .iter()
        .map(|node| (node.logical_id.clone(), node.kind.clone()))
        .collect::<HashMap<_, _>>();

    Ok(PreparedWrite {
        label: request.label,
        nodes: request.nodes,
        node_retires: request.node_retires,
        edges: request.edges,
        edge_retires: request.edge_retires,
        chunks: request.chunks,
        runs: request.runs,
        steps: request.steps,
        actions: request.actions,
        vec_inserts: request.vec_inserts,
        operational_writes: request.operational_writes,
        operational_collection_kinds: HashMap::new(),
        operational_collection_filter_fields: HashMap::new(),
        operational_validation_warnings: Vec::new(),
        node_kinds,
        required_fts_rows: Vec::new(),
        required_property_fts_rows: Vec::new(),
        optional_backfills: request.optional_backfills,
    })
}
925
/// Main loop of the dedicated writer thread.
///
/// Opens the database and bootstraps the schema up front; if either fails,
/// every queued and future message is rejected rather than panicking. Then
/// serves messages until all senders are dropped. Panics raised inside
/// `resolve_and_apply` are caught so a single bad write cannot kill the
/// thread.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            // No connection means no write can ever succeed: drain and
            // reject everything, then exit.
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Iterating the receiver ends cleanly once every sender is dropped.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // NOTE(review): `start` exists only under the `tracing`
                // feature; the trace_info! below must compile away its
                // arguments when the feature is off.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Contain panics from the apply path so the writer survives.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // The caller may have timed out and dropped its receiver;
                    // a failed send is deliberately ignored.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: roll back whatever transaction the
                    // panicking write may have left open.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Record the write event without inflating the row count.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1008
1009fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1010 for message in receiver {
1011 match message {
1012 WriteMessage::Submit { reply, .. } => {
1013 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1014 }
1015 WriteMessage::TouchLastAccessed { reply, .. } => {
1016 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1017 }
1018 }
1019 }
1020}
1021
/// Build the FTS projection rows for every chunk in the prepared write.
///
/// Each chunk row carries its owning node's `kind`, taken from a node in the
/// same request (`node_kinds`) when available, otherwise from the current
/// (non-superseded) node row in the database. Chunks that point at a node
/// retired in the same request are rejected up front.
///
/// # Errors
/// `InvalidWrite` when a chunk targets a retired or unknown node; `Sqlite`
/// for any other query failure.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Prefer the kind of a node written in this same request; fall back
        // to the live row in the database.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
1083
/// Build property-FTS projection rows for every inserted node whose kind has
/// a configured `PropertyFtsSchema`.
///
/// Guardrail statistics from recursive extraction are aggregated across all
/// nodes and logged once if any guardrail engaged.
///
/// # Errors
/// Propagates failures from `load_fts_property_schemas`.
fn resolve_property_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    if prepared.nodes.is_empty() {
        return Ok(());
    }

    let schemas: HashMap<String, PropertyFtsSchema> =
        load_fts_property_schemas(conn)?.into_iter().collect();

    if schemas.is_empty() {
        return Ok(());
    }

    let mut combined_stats = ExtractStats::default();
    for node in &prepared.nodes {
        let Some(schema) = schemas.get(&node.kind) else {
            continue;
        };
        // Best-effort: invalid JSON degrades to Null, which extracts nothing
        // instead of failing the write.
        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
        let (text_content, positions, stats) = extract_property_fts(&props, schema);
        combined_stats.merge(stats);
        if let Some(text_content) = text_content {
            // Per-column extraction is only needed when weights are in play.
            let columns = if has_weights {
                extract_property_fts_columns(&props, schema)
            } else {
                Vec::new()
            };
            prepared
                .required_property_fts_rows
                .push(FtsPropertyProjectionRow {
                    node_logical_id: node.logical_id.clone(),
                    kind: node.kind.clone(),
                    text_content,
                    positions,
                    columns,
                });
        }
    }
    if combined_stats != ExtractStats::default() {
        trace_debug!(
            depth_cap_hit = combined_stats.depth_cap_hit,
            byte_cap_reached = combined_stats.byte_cap_reached,
            excluded_subtree = combined_stats.excluded_subtree,
            "property fts recursive extraction guardrails engaged"
        );
    }
    trace_debug!(
        property_fts_rows = prepared.required_property_fts_rows.len(),
        nodes_processed = prepared.nodes.len(),
        "property fts row resolution completed"
    );
    Ok(())
}
1142
1143pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1147 let Some(path) = path.strip_prefix("$.") else {
1148 return Vec::new();
1149 };
1150 let mut current = value;
1151 for segment in path.split('.') {
1152 match current.get(segment) {
1153 Some(v) => current = v,
1154 None => return Vec::new(),
1155 }
1156 }
1157 match current {
1158 serde_json::Value::String(s) => vec![s.clone()],
1159 serde_json::Value::Number(n) => vec![n.to_string()],
1160 serde_json::Value::Bool(b) => vec![b.to_string()],
1161 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1162 serde_json::Value::Array(arr) => arr
1163 .iter()
1164 .filter_map(|v| match v {
1165 serde_json::Value::String(s) => Some(s.clone()),
1166 serde_json::Value::Number(n) => Some(n.to_string()),
1167 serde_json::Value::Bool(b) => Some(b.to_string()),
1168 _ => None,
1169 })
1170 .collect(),
1171 }
1172}
1173
/// Combine scalar and recursive property extraction into one FTS text blob.
///
/// Scalar paths are joined with the schema's separator; recursive paths are
/// walked by `RecursiveWalker` under its depth/byte/exclusion guardrails.
/// When both parts exist, the scalar text comes first, then `LEAF_SEPARATOR`,
/// then the recursive blob, and every recorded leaf position is shifted so it
/// remains a valid byte span into the combined string.
///
/// Returns `(text, positions, stats)`; `text` is `None` when nothing was
/// extracted at all.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };

    let mut scalar_parts: Vec<String> = Vec::new();

    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                let root = resolve_path_root(props, &entry.path);
                if let Some(root) = root {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }

    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };

    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        (None, false) => Some(walker.blob.clone()),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            // Walker positions are relative to the blob; shift them past the
            // scalar prefix plus the separator inserted just below.
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };

    (combined, walker.positions, walker.stats)
}
1252
1253pub(crate) fn extract_property_fts_columns(
1257 props: &serde_json::Value,
1258 schema: &PropertyFtsSchema,
1259) -> Vec<(String, String)> {
1260 let mut result = Vec::new();
1261 for entry in &schema.paths {
1262 let is_recursive = matches!(entry.mode, PropertyPathMode::Recursive);
1263 let column_name = fathomdb_schema::fts_column_name(&entry.path, is_recursive);
1264 let text = match entry.mode {
1265 PropertyPathMode::Scalar => {
1266 let parts = extract_json_path(props, &entry.path);
1267 parts.join(&schema.separator)
1268 }
1269 PropertyPathMode::Recursive => {
1270 let mut walker = RecursiveWalker {
1271 blob: String::new(),
1272 positions: Vec::new(),
1273 stats: ExtractStats::default(),
1274 exclude_paths: schema.exclude_paths.clone(),
1275 stopped: false,
1276 };
1277 if let Some(root) = resolve_path_root(props, &entry.path) {
1278 walker.walk(&entry.path, root, 0);
1279 }
1280 walker.blob
1281 }
1282 };
1283 result.push((column_name, text));
1284 }
1285 result
1286}
1287
1288fn resolve_path_root<'a>(
1289 value: &'a serde_json::Value,
1290 path: &str,
1291) -> Option<&'a serde_json::Value> {
1292 let stripped = path.strip_prefix("$.")?;
1293 let mut current = value;
1294 for segment in stripped.split('.') {
1295 current = current.get(segment)?;
1296 }
1297 Some(current)
1298}
1299
/// Streaming extractor for recursive property paths: accumulates leaf text
/// into `blob` (leaves separated by `LEAF_SEPARATOR`) and records each
/// leaf's byte span and JSON path.
struct RecursiveWalker {
    blob: String,
    positions: Vec<PositionEntry>,
    stats: ExtractStats,
    /// Exact paths whose subtrees are skipped entirely.
    exclude_paths: Vec<String>,
    /// Set once the byte cap is hit; all further walking becomes a no-op.
    stopped: bool,
}
1310
impl RecursiveWalker {
    /// Depth-first traversal of `value`, emitting each scalar leaf
    /// (string, number, bool) into the blob. `current_path` is the JSON
    /// path of `value`; `depth` counts container nesting levels.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        // Exact-match exclusion prunes the whole subtree.
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            // Nulls contribute no text.
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys so the extracted blob is deterministic
                // regardless of JSON map ordering.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Appends one non-empty leaf value to the blob (separator-joined)
    /// and records its byte range. Stops the whole walk once appending
    /// would push the blob past `MAX_EXTRACTED_BYTES`.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        if value.is_empty() {
            return;
        }
        // The first leaf has no separator before it.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            // Hard stop: once the cap is hit, no further (even smaller)
            // leaves are emitted.
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Offsets exclude the separator: they span exactly the leaf text.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1392
1393pub(crate) fn load_fts_property_schemas(
1399 conn: &rusqlite::Connection,
1400) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1401 let mut stmt =
1402 conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1403 stmt.query_map([], |row| {
1404 let kind: String = row.get(0)?;
1405 let paths_json: String = row.get(1)?;
1406 let separator: String = row.get(2)?;
1407 let schema = parse_property_schema_json(&paths_json, &separator);
1408 Ok((kind, schema))
1409 })?
1410 .collect::<Result<Vec<_>, _>>()
1411}
1412
1413pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1414 let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1415 let mut paths = Vec::new();
1416 let mut exclude_paths: Vec<String> = Vec::new();
1417
1418 let path_values: Vec<serde_json::Value> = match value {
1419 serde_json::Value::Array(arr) => arr,
1420 serde_json::Value::Object(map) => {
1421 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1422 exclude_paths = excl
1423 .iter()
1424 .filter_map(|v| v.as_str().map(str::to_owned))
1425 .collect();
1426 }
1427 match map.get("paths") {
1428 Some(serde_json::Value::Array(arr)) => arr.clone(),
1429 _ => Vec::new(),
1430 }
1431 }
1432 _ => Vec::new(),
1433 };
1434
1435 for entry in path_values {
1436 match entry {
1437 serde_json::Value::String(path) => {
1438 paths.push(PropertyPathEntry::scalar(path));
1439 }
1440 serde_json::Value::Object(map) => {
1441 let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1442 continue;
1443 };
1444 let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1445 PropertyPathMode::Scalar,
1446 |m| match m {
1447 "recursive" => PropertyPathMode::Recursive,
1448 _ => PropertyPathMode::Scalar,
1449 },
1450 );
1451 #[allow(clippy::cast_possible_truncation)]
1452 let weight = map
1453 .get("weight")
1454 .and_then(serde_json::Value::as_f64)
1455 .map(|w| w as f32);
1456 paths.push(PropertyPathEntry {
1457 path: path.to_owned(),
1458 mode,
1459 weight,
1460 });
1461 if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1462 for p in excl {
1463 if let Some(s) = p.as_str() {
1464 exclude_paths.push(s.to_owned());
1465 }
1466 }
1467 }
1468 }
1469 _ => {}
1470 }
1471 }
1472
1473 PropertyFtsSchema {
1474 paths,
1475 separator: separator.to_owned(),
1476 exclude_paths,
1477 }
1478}
1479
/// Resolves queued operational writes against registered collection
/// metadata before the write transaction begins.
///
/// For each distinct collection referenced by the prepared writes this
/// loads — once — the collection's kind, filter-field declarations, and
/// validation contract from `operational_collections`, rejecting writes
/// that target unregistered or disabled collections. Each write is then
/// checked against the collection kind (append-only logs accept only
/// `Append`; latest-state accepts only `Put`/`Delete`) and its payload is
/// validated against the contract. The resolved kind and filter-field
/// maps are stored back on `prepared` for use by the apply phase.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Fetch collection metadata only the first time it is seen.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            // Missing row: the collection was never registered.
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Enforce the kind/operation pairing.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Enforce-mode contract failures error out here; report-only
        // warnings are intentionally discarded at this stage — they are
        // re-collected against live contracts inside the transaction.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1547
1548fn parse_operational_filter_fields(
1549 filter_fields_json: &str,
1550) -> Result<Vec<OperationalFilterField>, EngineError> {
1551 let fields: Vec<OperationalFilterField> =
1552 serde_json::from_str(filter_fields_json).map_err(|error| {
1553 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1554 })?;
1555 let mut seen = std::collections::HashSet::new();
1556 for field in &fields {
1557 if field.name.trim().is_empty() {
1558 return Err(EngineError::InvalidWrite(
1559 "filter_fields_json field names must not be empty".to_owned(),
1560 ));
1561 }
1562 if !seen.insert(field.name.as_str()) {
1563 return Err(EngineError::InvalidWrite(format!(
1564 "filter_fields_json contains duplicate field '{}'",
1565 field.name
1566 )));
1567 }
1568 if field.modes.is_empty() {
1569 return Err(EngineError::InvalidWrite(format!(
1570 "filter_fields_json field '{}' must declare at least one mode",
1571 field.name
1572 )));
1573 }
1574 if field.modes.contains(&OperationalFilterMode::Prefix)
1575 && field.field_type != OperationalFilterFieldType::String
1576 {
1577 return Err(EngineError::InvalidWrite(format!(
1578 "filter field '{}' only supports prefix for string types",
1579 field.name
1580 )));
1581 }
1582 }
1583 Ok(fields)
1584}
1585
/// One extracted filter value destined for the filter-value table.
/// Exactly one of `string_value` / `integer_value` is populated,
/// chosen by the declared field type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Set for String-typed fields.
    string_value: Option<String>,
    // Set for Integer/Timestamp-typed fields.
    integer_value: Option<i64>,
}
1592
1593fn extract_operational_filter_values(
1594 filter_fields: &[OperationalFilterField],
1595 payload_json: &str,
1596) -> Vec<OperationalFilterValueRow> {
1597 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1598 return Vec::new();
1599 };
1600 let Some(object) = parsed.as_object() else {
1601 return Vec::new();
1602 };
1603
1604 filter_fields
1605 .iter()
1606 .filter_map(|field| {
1607 let value = object.get(&field.name)?;
1608 match field.field_type {
1609 OperationalFilterFieldType::String => {
1610 value
1611 .as_str()
1612 .map(|string_value| OperationalFilterValueRow {
1613 field_name: field.name.clone(),
1614 string_value: Some(string_value.to_owned()),
1615 integer_value: None,
1616 })
1617 }
1618 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1619 value
1620 .as_i64()
1621 .map(|integer_value| OperationalFilterValueRow {
1622 field_name: field.name.clone(),
1623 string_value: None,
1624 integer_value: Some(integer_value),
1625 })
1626 }
1627 }
1628 })
1629 .collect()
1630}
1631
/// Resolves all derived rows for a prepared write — chunk FTS, property
/// FTS, and operational-collection metadata — then applies the write.
/// Resolution runs before the transaction that `apply_write` opens.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1641
1642fn apply_touch_last_accessed(
1643 conn: &mut rusqlite::Connection,
1644 request: &LastAccessTouchRequest,
1645) -> Result<LastAccessTouchReport, EngineError> {
1646 let mut seen = std::collections::HashSet::new();
1647 let logical_ids = request
1648 .logical_ids
1649 .iter()
1650 .filter(|logical_id| seen.insert(logical_id.as_str()))
1651 .cloned()
1652 .collect::<Vec<_>>();
1653 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1654
1655 for logical_id in &logical_ids {
1656 let exists = tx
1657 .query_row(
1658 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1659 params![logical_id],
1660 |row| row.get::<_, i64>(0),
1661 )
1662 .optional()?
1663 .is_some();
1664 if !exists {
1665 return Err(EngineError::InvalidWrite(format!(
1666 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1667 )));
1668 }
1669 }
1670
1671 {
1672 let mut upsert_metadata = tx.prepare_cached(
1673 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1674 VALUES (?1, ?2, ?2) \
1675 ON CONFLICT(logical_id) DO UPDATE SET \
1676 last_accessed_at = excluded.last_accessed_at, \
1677 updated_at = excluded.updated_at",
1678 )?;
1679 let mut insert_provenance = tx.prepare_cached(
1680 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1681 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1682 )?;
1683 for logical_id in &logical_ids {
1684 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1685 insert_provenance.execute(params![
1686 new_id(),
1687 logical_id,
1688 request.source_ref.as_deref(),
1689 format!("{{\"touched_at\":{}}}", request.touched_at),
1690 ])?;
1691 }
1692 }
1693
1694 tx.commit()?;
1695 Ok(LastAccessTouchReport {
1696 touched_logical_ids: logical_ids.len(),
1697 touched_at: request.touched_at,
1698 })
1699}
1700
1701fn ensure_operational_collections_writable(
1702 tx: &rusqlite::Transaction<'_>,
1703 prepared: &PreparedWrite,
1704) -> Result<(), EngineError> {
1705 for collection in prepared.operational_collection_kinds.keys() {
1706 let disabled_at: Option<Option<i64>> = tx
1707 .query_row(
1708 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1709 params![collection],
1710 |row| row.get::<_, Option<i64>>(0),
1711 )
1712 .optional()?;
1713 match disabled_at {
1714 Some(Some(_)) => {
1715 return Err(EngineError::InvalidWrite(format!(
1716 "operational collection '{collection}' is disabled"
1717 )));
1718 }
1719 Some(None) => {}
1720 None => {
1721 return Err(EngineError::InvalidWrite(format!(
1722 "operational collection '{collection}' is not registered"
1723 )));
1724 }
1725 }
1726 }
1727 Ok(())
1728}
1729
1730fn validate_operational_writes_against_live_contracts(
1731 tx: &rusqlite::Transaction<'_>,
1732 prepared: &PreparedWrite,
1733) -> Result<Vec<String>, EngineError> {
1734 let mut collection_validation_contracts =
1735 HashMap::<String, Option<OperationalValidationContract>>::new();
1736 for collection in prepared.operational_collection_kinds.keys() {
1737 let validation_json: String = tx
1738 .query_row(
1739 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1740 params![collection],
1741 |row| row.get(0),
1742 )
1743 .map_err(EngineError::Sqlite)?;
1744 let validation_contract = parse_operational_validation_contract(&validation_json)
1745 .map_err(EngineError::InvalidWrite)?;
1746 collection_validation_contracts.insert(collection.clone(), validation_contract);
1747 }
1748
1749 let mut warnings = Vec::new();
1750 for write in &prepared.operational_writes {
1751 if let Some(Some(contract)) =
1752 collection_validation_contracts.get(operational_write_collection(write))
1753 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1754 {
1755 warnings.push(warning);
1756 }
1757 }
1758
1759 Ok(warnings)
1760}
1761
1762fn load_live_operational_secondary_indexes(
1763 tx: &rusqlite::Transaction<'_>,
1764 prepared: &PreparedWrite,
1765) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1766 let mut collection_indexes = HashMap::new();
1767 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1768 let secondary_indexes_json: String = tx
1769 .query_row(
1770 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1771 params![collection],
1772 |row| row.get(0),
1773 )
1774 .map_err(EngineError::Sqlite)?;
1775 let indexes =
1776 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1777 .map_err(EngineError::InvalidWrite)?;
1778 collection_indexes.insert(collection.clone(), indexes);
1779 }
1780 Ok(collection_indexes)
1781}
1782
1783fn check_operational_write_against_contract(
1784 write: &OperationalWrite,
1785 contract: &OperationalValidationContract,
1786) -> Result<Option<String>, EngineError> {
1787 if contract.mode == OperationalValidationMode::Disabled {
1788 return Ok(None);
1789 }
1790
1791 let (payload_json, collection, record_key) = match write {
1792 OperationalWrite::Append {
1793 collection,
1794 record_key,
1795 payload_json,
1796 ..
1797 }
1798 | OperationalWrite::Put {
1799 collection,
1800 record_key,
1801 payload_json,
1802 ..
1803 } => (
1804 payload_json.as_str(),
1805 collection.as_str(),
1806 record_key.as_str(),
1807 ),
1808 OperationalWrite::Delete { .. } => return Ok(None),
1809 };
1810
1811 match validate_operational_payload_against_contract(contract, payload_json) {
1812 Ok(()) => Ok(None),
1813 Err(message) => match contract.mode {
1814 OperationalValidationMode::Disabled => Ok(None),
1815 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1816 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1817 kind = operational_write_kind(write)
1818 ))),
1819 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1820 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1821 kind = operational_write_kind(write)
1822 ))),
1823 },
1824 }
1825}
1826
1827#[allow(clippy::too_many_lines)]
1828fn apply_write(
1829 conn: &mut rusqlite::Connection,
1830 prepared: &mut PreparedWrite,
1831) -> Result<WriteReceipt, EngineError> {
1832 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1833
1834 {
1837 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1838 let mut del_prop_positions = tx
1839 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1840 let mut del_staging = tx.prepare_cached(
1843 "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1844 )?;
1845 let mut sup_node = tx.prepare_cached(
1846 "UPDATE nodes SET superseded_at = unixepoch() \
1847 WHERE logical_id = ?1 AND superseded_at IS NULL",
1848 )?;
1849 let mut ins_event = tx.prepare_cached(
1850 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1851 VALUES (?1, 'node_retire', ?2, ?3)",
1852 )?;
1853 for retire in &prepared.node_retires {
1854 del_fts.execute(params![retire.logical_id])?;
1855 let kind_opt: Option<String> = tx
1857 .query_row(
1858 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1859 params![retire.logical_id],
1860 |r| r.get(0),
1861 )
1862 .optional()?;
1863 if let Some(kind) = kind_opt {
1864 let table = fathomdb_schema::fts_kind_table_name(&kind);
1865 let table_exists: bool = tx
1867 .query_row(
1868 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1869 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1870 rusqlite::params![table],
1871 |r| r.get::<_, i64>(0),
1872 )
1873 .unwrap_or(0)
1874 > 0;
1875 if table_exists {
1876 tx.execute(
1877 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1878 params![retire.logical_id],
1879 )?;
1880 }
1881 }
1882 del_prop_positions.execute(params![retire.logical_id])?;
1883 del_staging.execute(params![retire.logical_id])?;
1884 sup_node.execute(params![retire.logical_id])?;
1885 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1886 }
1887 }
1888
1889 {
1891 let mut sup_edge = tx.prepare_cached(
1892 "UPDATE edges SET superseded_at = unixepoch() \
1893 WHERE logical_id = ?1 AND superseded_at IS NULL",
1894 )?;
1895 let mut ins_event = tx.prepare_cached(
1896 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1897 VALUES (?1, 'edge_retire', ?2, ?3)",
1898 )?;
1899 for retire in &prepared.edge_retires {
1900 sup_edge.execute(params![retire.logical_id])?;
1901 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1902 }
1903 }
1904
1905 {
1907 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1908 let mut del_prop_positions = tx
1909 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1910 let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1911 let mut sup_node = tx.prepare_cached(
1912 "UPDATE nodes SET superseded_at = unixepoch() \
1913 WHERE logical_id = ?1 AND superseded_at IS NULL",
1914 )?;
1915 let mut ins_node = tx.prepare_cached(
1916 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1917 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1918 )?;
1919 #[cfg(feature = "sqlite-vec")]
1920 let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
1921 (SELECT id FROM chunks WHERE node_logical_id = ?1)";
1922 #[cfg(feature = "sqlite-vec")]
1923 let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
1924 Ok(stmt) => Some(stmt),
1925 Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
1926 Err(e) => return Err(e.into()),
1927 };
1928 for node in &prepared.nodes {
1929 if node.upsert {
1930 let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1933 let table_exists: bool = tx
1935 .query_row(
1936 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1937 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1938 rusqlite::params![table],
1939 |r| r.get::<_, i64>(0),
1940 )
1941 .unwrap_or(0)
1942 > 0;
1943 if table_exists {
1944 tx.execute(
1945 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1946 params![node.logical_id],
1947 )?;
1948 }
1949 del_prop_positions.execute(params![node.logical_id])?;
1950 if node.chunk_policy == ChunkPolicy::Replace {
1951 #[cfg(feature = "sqlite-vec")]
1952 if let Some(ref mut stmt) = del_vec {
1953 stmt.execute(params![node.logical_id])?;
1954 }
1955 del_fts.execute(params![node.logical_id])?;
1956 del_chunks.execute(params![node.logical_id])?;
1957 }
1958 sup_node.execute(params![node.logical_id])?;
1959 }
1960 ins_node.execute(params![
1961 node.row_id,
1962 node.logical_id,
1963 node.kind,
1964 node.properties,
1965 node.source_ref,
1966 node.content_ref,
1967 ])?;
1968 }
1969 }
1970
1971 {
1973 let mut sup_edge = tx.prepare_cached(
1974 "UPDATE edges SET superseded_at = unixepoch() \
1975 WHERE logical_id = ?1 AND superseded_at IS NULL",
1976 )?;
1977 let mut ins_edge = tx.prepare_cached(
1978 "INSERT INTO edges \
1979 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1980 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1981 )?;
1982 for edge in &prepared.edges {
1983 if edge.upsert {
1984 sup_edge.execute(params![edge.logical_id])?;
1985 }
1986 ins_edge.execute(params![
1987 edge.row_id,
1988 edge.logical_id,
1989 edge.source_logical_id,
1990 edge.target_logical_id,
1991 edge.kind,
1992 edge.properties,
1993 edge.source_ref,
1994 ])?;
1995 }
1996 }
1997
1998 {
2000 let mut ins_chunk = tx.prepare_cached(
2001 "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
2002 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2003 )?;
2004 for chunk in &prepared.chunks {
2005 ins_chunk.execute(params![
2006 chunk.id,
2007 chunk.node_logical_id,
2008 chunk.text_content,
2009 chunk.byte_start,
2010 chunk.byte_end,
2011 chunk.content_hash,
2012 ])?;
2013 }
2014 }
2015
2016 {
2018 let mut sup_run = tx.prepare_cached(
2019 "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2020 )?;
2021 let mut ins_run = tx.prepare_cached(
2022 "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
2023 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
2024 )?;
2025 for run in &prepared.runs {
2026 if run.upsert
2027 && let Some(ref prior_id) = run.supersedes_id
2028 {
2029 sup_run.execute(params![prior_id])?;
2030 }
2031 ins_run.execute(params![
2032 run.id,
2033 run.kind,
2034 run.status,
2035 run.properties,
2036 run.source_ref
2037 ])?;
2038 }
2039 }
2040
2041 {
2043 let mut sup_step = tx.prepare_cached(
2044 "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2045 )?;
2046 let mut ins_step = tx.prepare_cached(
2047 "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
2048 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2049 )?;
2050 for step in &prepared.steps {
2051 if step.upsert
2052 && let Some(ref prior_id) = step.supersedes_id
2053 {
2054 sup_step.execute(params![prior_id])?;
2055 }
2056 ins_step.execute(params![
2057 step.id,
2058 step.run_id,
2059 step.kind,
2060 step.status,
2061 step.properties,
2062 step.source_ref,
2063 ])?;
2064 }
2065 }
2066
2067 {
2069 let mut sup_action = tx.prepare_cached(
2070 "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2071 )?;
2072 let mut ins_action = tx.prepare_cached(
2073 "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
2074 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2075 )?;
2076 for action in &prepared.actions {
2077 if action.upsert
2078 && let Some(ref prior_id) = action.supersedes_id
2079 {
2080 sup_action.execute(params![prior_id])?;
2081 }
2082 ins_action.execute(params![
2083 action.id,
2084 action.step_id,
2085 action.kind,
2086 action.status,
2087 action.properties,
2088 action.source_ref,
2089 ])?;
2090 }
2091 }
2092
2093 {
2095 ensure_operational_collections_writable(&tx, prepared)?;
2096 prepared.operational_validation_warnings =
2097 validate_operational_writes_against_live_contracts(&tx, prepared)?;
2098 let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
2099
2100 let mut next_mutation_order: i64 = tx.query_row(
2101 "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
2102 [],
2103 |row| row.get(0),
2104 )?;
2105 let mut ins_mutation = tx.prepare_cached(
2106 "INSERT INTO operational_mutations \
2107 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2108 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
2109 )?;
2110 let mut ins_filter_value = tx.prepare_cached(
2111 "INSERT INTO operational_filter_values \
2112 (mutation_id, collection_name, field_name, string_value, integer_value) \
2113 VALUES (?1, ?2, ?3, ?4, ?5)",
2114 )?;
2115 let mut upsert_current = tx.prepare_cached(
2116 "INSERT INTO operational_current \
2117 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2118 VALUES (?1, ?2, ?3, unixepoch(), ?4) \
2119 ON CONFLICT(collection_name, record_key) DO UPDATE SET \
2120 payload_json = excluded.payload_json, \
2121 updated_at = excluded.updated_at, \
2122 last_mutation_id = excluded.last_mutation_id",
2123 )?;
2124 let mut del_current = tx.prepare_cached(
2125 "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
2126 )?;
2127 let mut del_current_secondary_indexes = tx.prepare_cached(
2128 "DELETE FROM operational_secondary_index_entries \
2129 WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
2130 )?;
2131 let mut ins_secondary_index = tx.prepare_cached(
2132 "INSERT INTO operational_secondary_index_entries \
2133 (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
2134 slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
2135 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2136 )?;
2137 let mut current_row_stmt = tx.prepare_cached(
2138 "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2139 WHERE collection_name = ?1 AND record_key = ?2",
2140 )?;
2141
2142 for write in &prepared.operational_writes {
2143 let collection = operational_write_collection(write);
2144 let record_key = operational_write_record_key(write);
2145 let mutation_id = new_id();
2146 next_mutation_order += 1;
2147 let payload_json = operational_write_payload(write);
2148 ins_mutation.execute(params![
2149 &mutation_id,
2150 collection,
2151 record_key,
2152 operational_write_kind(write),
2153 payload_json,
2154 operational_write_source_ref(write),
2155 next_mutation_order,
2156 ])?;
2157 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2158 for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2159 ins_secondary_index.execute(params![
2160 collection,
2161 entry.index_name,
2162 "mutation",
2163 &mutation_id,
2164 record_key,
2165 entry.sort_timestamp,
2166 entry.slot1_text,
2167 entry.slot1_integer,
2168 entry.slot2_text,
2169 entry.slot2_integer,
2170 entry.slot3_text,
2171 entry.slot3_integer,
2172 ])?;
2173 }
2174 }
2175 if let Some(filter_fields) = prepared
2176 .operational_collection_filter_fields
2177 .get(collection)
2178 {
2179 for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2180 ins_filter_value.execute(params![
2181 &mutation_id,
2182 collection,
2183 filter_value.field_name,
2184 filter_value.string_value,
2185 filter_value.integer_value,
2186 ])?;
2187 }
2188 }
2189
2190 if prepared.operational_collection_kinds.get(collection)
2191 == Some(&OperationalCollectionKind::LatestState)
2192 {
2193 del_current_secondary_indexes.execute(params![collection, record_key])?;
2194 match write {
2195 OperationalWrite::Put { payload_json, .. } => {
2196 upsert_current.execute(params![
2197 collection,
2198 record_key,
2199 payload_json,
2200 &mutation_id,
2201 ])?;
2202 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2203 let (current_payload_json, updated_at, last_mutation_id): (
2204 String,
2205 i64,
2206 String,
2207 ) = current_row_stmt
2208 .query_row(params![collection, record_key], |row| {
2209 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2210 })?;
2211 for entry in extract_secondary_index_entries_for_current(
2212 indexes,
2213 ¤t_payload_json,
2214 updated_at,
2215 ) {
2216 ins_secondary_index.execute(params![
2217 collection,
2218 entry.index_name,
2219 "current",
2220 last_mutation_id.as_str(),
2221 record_key,
2222 entry.sort_timestamp,
2223 entry.slot1_text,
2224 entry.slot1_integer,
2225 entry.slot2_text,
2226 entry.slot2_integer,
2227 entry.slot3_text,
2228 entry.slot3_integer,
2229 ])?;
2230 }
2231 }
2232 }
2233 OperationalWrite::Delete { .. } => {
2234 del_current.execute(params![collection, record_key])?;
2235 }
2236 OperationalWrite::Append { .. } => {}
2237 }
2238 }
2239 }
2240 }
2241
2242 {
2244 let mut ins_fts = tx.prepare_cached(
2245 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2246 VALUES (?1, ?2, ?3, ?4)",
2247 )?;
2248 for fts_row in &prepared.required_fts_rows {
2249 ins_fts.execute(params![
2250 fts_row.chunk_id,
2251 fts_row.node_logical_id,
2252 fts_row.kind,
2253 fts_row.text_content,
2254 ])?;
2255 }
2256 }
2257
2258 if !prepared.required_property_fts_rows.is_empty() {
2260 let mut ins_positions = tx.prepare_cached(
2261 "INSERT INTO fts_node_property_positions \
2262 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2263 VALUES (?1, ?2, ?3, ?4, ?5)",
2264 )?;
2265 let mut del_positions = tx
2268 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2269 for row in &prepared.required_property_fts_rows {
2270 del_positions.execute(params![row.node_logical_id])?;
2271 let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2273 if row.columns.is_empty() {
2274 tx.execute_batch(&format!(
2277 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
2278 node_logical_id UNINDEXED, text_content, \
2279 tokenize = '{DEFAULT_FTS_TOKENIZER}'\
2280 )"
2281 ))?;
2282 tx.prepare(&format!(
2283 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2284 ))?
2285 .execute(params![row.node_logical_id, row.text_content])?;
2286 } else {
2287 let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2289 let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2290 .map(|i| format!("?{i}"))
2291 .collect();
2292 let sql = format!(
2293 "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2294 cols = col_names.join(", "),
2295 placeholders = placeholders.join(", "),
2296 );
2297 let mut stmt = tx.prepare(&sql)?;
2298 stmt.execute(rusqlite::params_from_iter(
2299 std::iter::once(row.node_logical_id.as_str())
2300 .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2301 ))?;
2302 }
2303 for pos in &row.positions {
2304 ins_positions.execute(params![
2305 row.node_logical_id,
2306 row.kind,
2307 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2308 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2309 pos.leaf_path,
2310 ])?;
2311 }
2312 }
2313 }
2314
2315 if !prepared.required_property_fts_rows.is_empty() {
2320 let mut ins_staging_simple = tx.prepare_cached(
2321 "INSERT INTO fts_property_rebuild_staging \
2322 (kind, node_logical_id, text_content) \
2323 VALUES (?1, ?2, ?3) \
2324 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2325 SET text_content = excluded.text_content",
2326 )?;
2327 let mut ins_staging_columns = tx.prepare_cached(
2328 "INSERT INTO fts_property_rebuild_staging \
2329 (kind, node_logical_id, text_content, columns_json) \
2330 VALUES (?1, ?2, ?3, ?4) \
2331 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2332 SET text_content = excluded.text_content, \
2333 columns_json = excluded.columns_json",
2334 )?;
2335 let mut check_rebuild = tx.prepare_cached(
2336 "SELECT 1 FROM fts_property_rebuild_state \
2337 WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2338 )?;
2339 for row in &prepared.required_property_fts_rows {
2340 let in_rebuild: bool = check_rebuild
2341 .query_row(params![row.kind], |_| Ok(true))
2342 .optional()?
2343 .unwrap_or(false);
2344 if in_rebuild {
2345 if row.columns.is_empty() {
2346 ins_staging_simple.execute(params![
2347 row.kind,
2348 row.node_logical_id,
2349 row.text_content
2350 ])?;
2351 } else {
2352 let json_map: serde_json::Map<String, serde_json::Value> = row
2353 .columns
2354 .iter()
2355 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2356 .collect();
2357 let columns_json =
2358 serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2359 ins_staging_columns.execute(params![
2360 row.kind,
2361 row.node_logical_id,
2362 row.text_content,
2363 columns_json
2364 ])?;
2365 }
2366 }
2367 }
2368 }
2369
2370 #[cfg(feature = "sqlite-vec")]
2372 {
2373 match tx
2374 .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
2375 {
2376 Ok(mut ins_vec) => {
2377 for vi in &prepared.vec_inserts {
2378 let bytes: Vec<u8> =
2379 vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2380 ins_vec.execute(params![vi.chunk_id, bytes])?;
2381 }
2382 }
2383 Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
2384 }
2386 Err(e) => return Err(e.into()),
2387 }
2388 }
2389
2390 tx.commit()?;
2391
2392 let provenance_warnings: Vec<String> = prepared
2393 .nodes
2394 .iter()
2395 .filter(|node| node.source_ref.is_none())
2396 .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2397 .chain(
2398 prepared
2399 .node_retires
2400 .iter()
2401 .filter(|r| r.source_ref.is_none())
2402 .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2403 )
2404 .chain(
2405 prepared
2406 .edges
2407 .iter()
2408 .filter(|e| e.source_ref.is_none())
2409 .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2410 )
2411 .chain(
2412 prepared
2413 .edge_retires
2414 .iter()
2415 .filter(|r| r.source_ref.is_none())
2416 .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2417 )
2418 .chain(
2419 prepared
2420 .runs
2421 .iter()
2422 .filter(|r| r.source_ref.is_none())
2423 .map(|r| format!("run '{}' has no source_ref", r.id)),
2424 )
2425 .chain(
2426 prepared
2427 .steps
2428 .iter()
2429 .filter(|s| s.source_ref.is_none())
2430 .map(|s| format!("step '{}' has no source_ref", s.id)),
2431 )
2432 .chain(
2433 prepared
2434 .actions
2435 .iter()
2436 .filter(|a| a.source_ref.is_none())
2437 .map(|a| format!("action '{}' has no source_ref", a.id)),
2438 )
2439 .chain(
2440 prepared
2441 .operational_writes
2442 .iter()
2443 .filter(|write| operational_write_source_ref(write).is_none())
2444 .map(|write| {
2445 format!(
2446 "operational {} '{}:{}' has no source_ref",
2447 operational_write_kind(write),
2448 operational_write_collection(write),
2449 operational_write_record_key(write)
2450 )
2451 }),
2452 )
2453 .collect();
2454
2455 let mut warnings = provenance_warnings.clone();
2456 warnings.extend(prepared.operational_validation_warnings.clone());
2457
2458 Ok(WriteReceipt {
2459 label: prepared.label.clone(),
2460 optional_backfill_count: prepared.optional_backfills.len(),
2461 warnings,
2462 provenance_warnings,
2463 })
2464}
2465
2466fn operational_write_collection(write: &OperationalWrite) -> &str {
2467 match write {
2468 OperationalWrite::Append { collection, .. }
2469 | OperationalWrite::Put { collection, .. }
2470 | OperationalWrite::Delete { collection, .. } => collection,
2471 }
2472}
2473
2474fn operational_write_record_key(write: &OperationalWrite) -> &str {
2475 match write {
2476 OperationalWrite::Append { record_key, .. }
2477 | OperationalWrite::Put { record_key, .. }
2478 | OperationalWrite::Delete { record_key, .. } => record_key,
2479 }
2480}
2481
2482fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2483 match write {
2484 OperationalWrite::Append { .. } => "append",
2485 OperationalWrite::Put { .. } => "put",
2486 OperationalWrite::Delete { .. } => "delete",
2487 }
2488}
2489
2490fn operational_write_payload(write: &OperationalWrite) -> &str {
2491 match write {
2492 OperationalWrite::Append { payload_json, .. }
2493 | OperationalWrite::Put { payload_json, .. } => payload_json,
2494 OperationalWrite::Delete { .. } => "null",
2495 }
2496}
2497
2498fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2499 match write {
2500 OperationalWrite::Append { source_ref, .. }
2501 | OperationalWrite::Put { source_ref, .. }
2502 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2503 }
2504}
2505
2506#[cfg(test)]
2507#[allow(clippy::expect_used)]
2508mod tests {
2509 use std::sync::Arc;
2510
2511 use fathomdb_schema::SchemaManager;
2512 use tempfile::NamedTempFile;
2513
2514 use super::{apply_write, prepare_write, resolve_operational_writes};
2515 use crate::{
2516 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2517 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2518 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2519 projection::ProjectionTarget,
2520 };
2521
2522 #[test]
2523 fn writer_executes_runtime_table_rows() {
2524 let db = NamedTempFile::new().expect("temporary db");
2525 let writer = WriterActor::start(
2526 db.path(),
2527 Arc::new(SchemaManager::new()),
2528 ProvenanceMode::Warn,
2529 Arc::new(TelemetryCounters::default()),
2530 )
2531 .expect("writer");
2532
2533 let receipt = writer
2534 .submit(WriteRequest {
2535 label: "runtime".to_owned(),
2536 nodes: vec![],
2537 node_retires: vec![],
2538 edges: vec![],
2539 edge_retires: vec![],
2540 chunks: vec![],
2541 runs: vec![RunInsert {
2542 id: "run-1".to_owned(),
2543 kind: "session".to_owned(),
2544 status: "completed".to_owned(),
2545 properties: "{}".to_owned(),
2546 source_ref: Some("src-1".to_owned()),
2547 upsert: false,
2548 supersedes_id: None,
2549 }],
2550 steps: vec![StepInsert {
2551 id: "step-1".to_owned(),
2552 run_id: "run-1".to_owned(),
2553 kind: "llm".to_owned(),
2554 status: "completed".to_owned(),
2555 properties: "{}".to_owned(),
2556 source_ref: Some("src-1".to_owned()),
2557 upsert: false,
2558 supersedes_id: None,
2559 }],
2560 actions: vec![ActionInsert {
2561 id: "action-1".to_owned(),
2562 step_id: "step-1".to_owned(),
2563 kind: "emit".to_owned(),
2564 status: "completed".to_owned(),
2565 properties: "{}".to_owned(),
2566 source_ref: Some("src-1".to_owned()),
2567 upsert: false,
2568 supersedes_id: None,
2569 }],
2570 optional_backfills: vec![],
2571 vec_inserts: vec![],
2572 operational_writes: vec![],
2573 })
2574 .expect("write receipt");
2575
2576 assert_eq!(receipt.label, "runtime");
2577 }
2578
    // A Put operational write, batched alongside a regular node insert, must
    // land in both the mutation history (`operational_mutations`) and the
    // latest-state table (`operational_current`).
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        // Seed the target collection directly via SQL; operational writes are
        // only accepted for registered collections.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection before the writer actor opens the file.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One graph node plus one operational Put in the same batch.
        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Verify via a fresh connection: node row, one mutation, and the
        // latest-state payload stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2658
    // A validation contract whose mode is "disabled" must be ignored
    // entirely: payloads violating the declared fields are still written.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a collection with a contract requiring `status` in {ok,failed},
        // but with `"mode":"disabled"`.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // `bogus` is undeclared and `status` is missing — still accepted.
        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The invalid payload was persisted verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2712
    // "report_only" validation: an invalid payload is still written, but the
    // violation is surfaced as a receipt warning (not a provenance warning).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract allows only status in {ok, failed}, mode "report_only".
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum constraint on `status`.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // The violation must appear in `warnings` only; the write has a
        // source_ref, so `provenance_warnings` stays empty.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the payload was stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2777
2778 #[test]
2779 fn writer_rejects_operational_write_for_missing_collection() {
2780 let db = NamedTempFile::new().expect("temporary db");
2781 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2782 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2783 drop(conn);
2784 let writer = WriterActor::start(
2785 db.path(),
2786 Arc::new(SchemaManager::new()),
2787 ProvenanceMode::Warn,
2788 Arc::new(TelemetryCounters::default()),
2789 )
2790 .expect("writer");
2791
2792 let result = writer.submit(WriteRequest {
2793 label: "missing-operational-collection".to_owned(),
2794 nodes: vec![],
2795 node_retires: vec![],
2796 edges: vec![],
2797 edge_retires: vec![],
2798 chunks: vec![],
2799 runs: vec![],
2800 steps: vec![],
2801 actions: vec![],
2802 optional_backfills: vec![],
2803 vec_inserts: vec![],
2804 operational_writes: vec![OperationalWrite::Put {
2805 collection: "connector_health".to_owned(),
2806 record_key: "gmail".to_owned(),
2807 payload_json: r#"{"status":"ok"}"#.to_owned(),
2808 source_ref: Some("src-1".to_owned()),
2809 }],
2810 });
2811
2812 assert!(
2813 matches!(result, Err(EngineError::InvalidWrite(_))),
2814 "missing operational collection must return InvalidWrite"
2815 );
2816 }
2817
    // Append writes to an append_only_log collection must record a mutation
    // row but never materialize anything in `operational_current`.
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Note the collection kind: 'append_only_log', not 'latest_state'.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History carries the append with its payload...
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // ...but no latest-state row exists for the key.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2881
    // "enforce" validation: an invalid append is rejected outright, and the
    // rejection must leave no rows behind — neither mutation history nor
    // derived filter values.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Collection declares both a filter field on `status` and an
        // enforcing contract restricting `status` to {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum constraint, so submit must fail.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Atomicity check: nothing was written as a side effect.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2946
    // Put followed by Delete (in separate batches): the latest-state row is
    // removed, while the mutation history retains both operations in order.
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish a current row for 'gmail'.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History keeps both operations, ordered by mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The latest-state row is gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3035
    // Deletes carry no payload, so an enforcing validation contract must not
    // block them: a valid Put followed by a Delete both succeed.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Enforcing contract on `status`; would reject any invalid payload.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Contract-conforming Put establishes the current row.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The Delete has no payload to validate and must succeed despite the
        // enforce-mode contract.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // The current row is gone after the delete.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3109
    // Secondary index maintenance for latest_state collections: a Put creates
    // one entry per declared index (a single-field index and a composite
    // index here), and a Delete removes them again.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Two indexes: `status_current` (latest_state_field on `status`) and
        // `tenant_category` (latest_state_composite over tenant + category).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Payload carries all three indexed fields.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per index → two 'current' entries after the Put.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // The Delete clears both index entries for the record.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3198
    // Multiple operational writes for the same key inside one batch must be
    // applied in submission order: mutation_order is 1, 2, 3 and the final
    // Put wins in `operational_current`.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put → Delete → Put for the same record key, all in one request.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        // History reflects submission order with consecutive mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The last Put in the batch determines the current payload.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3287
    // Race simulation: the collection is disabled AFTER the preflight
    // `resolve_operational_writes` pass but BEFORE `apply_write`. apply_write
    // must re-check the disabled flag inside its transaction, reject the
    // write, and leave no rows behind.
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight succeeds while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Simulate a concurrent disable between preflight and apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejected transaction must not have persisted anything.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3351
/// With an `enforce` validation contract, an invalid operational put must fail
/// the whole write atomically: neither graph rows nor operational rows commit.
#[test]
fn writer_enforce_validation_rejects_invalid_put_atomically() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let seed_conn = rusqlite::Connection::open(db_file.path()).expect("conn");
    SchemaManager::new().bootstrap(&seed_conn).expect("bootstrap");
    seed_conn
        .execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
    drop(seed_conn);

    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let error = actor
        .submit(WriteRequest {
            label: "invalid-put".to_owned(),
            nodes: vec![NodeInsert {
                row_id: "row-1".to_owned(),
                logical_id: "lg-1".to_owned(),
                kind: "Meeting".to_owned(),
                properties: "{}".to_owned(),
                source_ref: Some("src-1".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                // "bogus" lies outside the contract's enum of allowed statuses.
                payload_json: r#"{"status":"bogus"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        })
        .expect_err("invalid put must reject");
    assert!(matches!(error, EngineError::InvalidWrite(_)));
    assert!(error.to_string().contains("must be one of"));

    // Atomicity: the node that shared the rejected request must be absent too.
    let verify_conn = rusqlite::Connection::open(db_file.path()).expect("conn");
    let count = |sql: &str, label: &str| -> i64 {
        verify_conn.query_row(sql, [], |r| r.get(0)).expect(label)
    };
    assert_eq!(
        count("SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'", "node count"),
        0
    );
    assert_eq!(
        count(
            "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
            "mutation count",
        ),
        0
    );
    assert_eq!(
        count(
            "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
            "current count",
        ),
        0
    );
}
3431
/// `latest_state` collections only accept `Put`; an `Append` against one
/// must come back as `InvalidWrite`.
#[test]
fn writer_rejects_append_against_latest_state_collection() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let seed_conn = rusqlite::Connection::open(db_file.path()).expect("conn");
    SchemaManager::new().bootstrap(&seed_conn).expect("bootstrap");
    seed_conn
        .execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
    drop(seed_conn);

    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "bad-append".to_owned(),
        nodes: vec![],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![OperationalWrite::Append {
            collection: "connector_health".to_owned(),
            record_key: "gmail".to_owned(),
            payload_json: r#"{"status":"ok"}"#.to_owned(),
            source_ref: Some("src-1".to_owned()),
        }],
    };
    let result = actor.submit(request);

    assert!(
        matches!(result, Err(EngineError::InvalidWrite(_))),
        "latest_state collection must reject Append"
    );
}
3477
/// An upsert on the same logical id must supersede the previously active
/// node row while keeping the old row around as history.
#[test]
fn writer_upsert_supersedes_prior_active_node() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // Both versions target logical id "lg-1"; only payload details differ.
    let version_request =
        |label: &str, row_id: &str, properties: &str, source_ref: &str, upsert: bool| WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: row_id.to_owned(),
                logical_id: "lg-1".to_owned(),
                kind: "Meeting".to_owned(),
                properties: properties.to_owned(),
                source_ref: Some(source_ref.to_owned()),
                upsert,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        };

    actor
        .submit(version_request("v1", "row-1", r#"{"version":1}"#, "src-1", false))
        .expect("v1 write");
    actor
        .submit(version_request("v2", "row-2", r#"{"version":2}"#, "src-2", true))
        .expect("v2 upsert write");

    let connection = rusqlite::Connection::open(db_file.path()).expect("conn");
    let (active_row_id, props): (String, String) = connection
        .query_row(
            "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
            [],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .expect("active row");
    assert_eq!(active_row_id, "row-2");
    assert!(props.contains("\"version\":2"));

    let superseded: i64 = connection
        .query_row(
            "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
            [],
            |r| r.get(0),
        )
        .expect("superseded count");
    assert_eq!(superseded, 1);
}
3561
/// Two nodes plus an edge between them in a single request must land as a
/// fully-populated edge row.
#[test]
fn writer_inserts_edge_between_two_nodes() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // Shared shape for both endpoint nodes.
    let node = |row_id: &str, logical_id: &str, kind: &str| NodeInsert {
        row_id: row_id.to_owned(),
        logical_id: logical_id.to_owned(),
        kind: kind.to_owned(),
        properties: "{}".to_owned(),
        source_ref: Some("src-1".to_owned()),
        upsert: false,
        chunk_policy: ChunkPolicy::Preserve,
        content_ref: None,
    };

    actor
        .submit(WriteRequest {
            label: "nodes-and-edge".to_owned(),
            nodes: vec![
                node("row-meeting", "meeting-1", "Meeting"),
                node("row-task", "task-1", "Task"),
            ],
            node_retires: vec![],
            edges: vec![EdgeInsert {
                row_id: "edge-1".to_owned(),
                logical_id: "edge-lg-1".to_owned(),
                source_logical_id: "meeting-1".to_owned(),
                target_logical_id: "task-1".to_owned(),
                kind: "HAS_TASK".to_owned(),
                properties: "{}".to_owned(),
                source_ref: Some("src-1".to_owned()),
                upsert: false,
            }],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("write receipt");

    let connection = rusqlite::Connection::open(db_file.path()).expect("conn");
    let (src, tgt, kind): (String, String, String) = connection
        .query_row(
            "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
            [],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .expect("edge row");
    assert_eq!(src, "meeting-1");
    assert_eq!(tgt, "task-1");
    assert_eq!(kind, "HAS_TASK");
}
3632
/// Upserting an edge with the same logical id must supersede the previously
/// active edge row while keeping the old row as history.
#[test]
#[allow(clippy::too_many_lines)]
fn writer_upsert_supersedes_prior_active_edge() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // Endpoint nodes share everything but their identity and kind.
    let node = |row_id: &str, logical_id: &str, kind: &str| NodeInsert {
        row_id: row_id.to_owned(),
        logical_id: logical_id.to_owned(),
        kind: kind.to_owned(),
        properties: "{}".to_owned(),
        source_ref: Some("src-1".to_owned()),
        upsert: false,
        chunk_policy: ChunkPolicy::Preserve,
        content_ref: None,
    };

    actor
        .submit(WriteRequest {
            label: "nodes".to_owned(),
            nodes: vec![
                node("row-a", "node-a", "Meeting"),
                node("row-b", "node-b", "Task"),
            ],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("nodes write");

    // Both edge versions target logical id "edge-lg-1".
    let edge_version =
        |label: &str, row_id: &str, properties: &str, source_ref: &str, upsert: bool| WriteRequest {
            label: label.to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![EdgeInsert {
                row_id: row_id.to_owned(),
                logical_id: "edge-lg-1".to_owned(),
                source_logical_id: "node-a".to_owned(),
                target_logical_id: "node-b".to_owned(),
                kind: "HAS_TASK".to_owned(),
                properties: properties.to_owned(),
                source_ref: Some(source_ref.to_owned()),
                upsert,
            }],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        };

    actor
        .submit(edge_version("edge-v1", "edge-row-1", r#"{"weight":1}"#, "src-1", false))
        .expect("edge v1 write");
    actor
        .submit(edge_version("edge-v2", "edge-row-2", r#"{"weight":2}"#, "src-2", true))
        .expect("edge v2 upsert");

    let connection = rusqlite::Connection::open(db_file.path()).expect("conn");
    let (active_row_id, props): (String, String) = connection
        .query_row(
            "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
            [],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .expect("active edge");
    assert_eq!(active_row_id, "edge-row-2");
    assert!(props.contains("\"weight\":2"));

    let superseded: i64 = connection
        .query_row(
            "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
            [],
            |r| r.get(0),
        )
        .expect("superseded count");
    assert_eq!(superseded, 1);
}
3758
/// A chunk submitted alongside its node must become a searchable FTS row
/// carrying the node's kind and the chunk text.
#[test]
fn writer_fts_rows_are_written_to_database() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "seed".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "logical-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: "chunk-1".to_owned(),
            node_logical_id: "logical-1".to_owned(),
            text_content: "budget discussion".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(request).expect("write receipt");

    let connection = rusqlite::Connection::open(db_file.path()).expect("conn");
    let row: (String, String, String, String) = connection
        .query_row(
            "SELECT chunk_id, node_logical_id, kind, text_content \
             FROM fts_nodes WHERE chunk_id = 'chunk-1'",
            [],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .expect("fts row");
    assert_eq!(row.0, "chunk-1");
    assert_eq!(row.1, "logical-1");
    assert_eq!(row.2, "Meeting");
    assert_eq!(row.3, "budget discussion");
}
3817
/// In Warn mode a node without `source_ref` is accepted, but the receipt must
/// carry exactly one provenance warning naming the node's logical id.
#[test]
fn writer_receipt_warns_on_nodes_without_source_ref() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "no-source".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "logical-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            // Deliberately missing provenance.
            source_ref: None,
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    let receipt = actor.submit(request).expect("write receipt");

    assert_eq!(receipt.provenance_warnings.len(), 1);
    assert!(receipt.provenance_warnings[0].contains("logical-1"));
}
3858
/// When every node carries a `source_ref`, the receipt must have no
/// provenance warnings at all.
#[test]
fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "with-source".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "logical-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    let receipt = actor.submit(request).expect("write receipt");

    assert!(receipt.provenance_warnings.is_empty());
}
3898
/// A chunk may arrive in a later request than its node; the FTS row must
/// still be written for it.
#[test]
fn writer_accepts_chunk_for_pre_existing_node() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // First request: node only.
    let node_request = WriteRequest {
        label: "r1".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "logical-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(node_request).expect("r1 write");

    // Second request: chunk only, referencing the node created above.
    let chunk_request = WriteRequest {
        label: "r2".to_owned(),
        nodes: vec![],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: "chunk-1".to_owned(),
            node_logical_id: "logical-1".to_owned(),
            text_content: "budget discussion".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor
        .submit(chunk_request)
        .expect("r2 write — chunk for pre-existing node");

    let connection = rusqlite::Connection::open(db_file.path()).expect("conn");
    let count: i64 = connection
        .query_row(
            "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
            [],
            |r| r.get(0),
        )
        .expect("fts count");
    assert_eq!(
        count, 1,
        "FTS row must exist for chunk attached to pre-existing node"
    );
}
3975
/// A chunk pointing at a logical id that exists nowhere in the database must
/// be rejected with `InvalidWrite`.
#[test]
fn writer_rejects_chunk_for_completely_unknown_node() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "bad".to_owned(),
        nodes: vec![],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: "chunk-1".to_owned(),
            node_logical_id: "nonexistent".to_owned(),
            text_content: "some text".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    let result = actor.submit(request);

    assert!(
        matches!(result, Err(EngineError::InvalidWrite(_))),
        "completely unknown node must return InvalidWrite"
    );
}
4014
/// End-to-end smoke: a typed node plus chunk goes through the writer and the
/// receipt echoes the request label.
#[test]
fn writer_executes_typed_nodes_chunks_and_derived_projections() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let request = WriteRequest {
        label: "seed".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "logical-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: None,
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: "chunk-1".to_owned(),
            node_logical_id: "logical-1".to_owned(),
            text_content: "budget discussion".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    let receipt = actor.submit(request).expect("write receipt");

    assert_eq!(receipt.label, "seed");
}
4061
/// Retiring a node must leave zero active rows and exactly one superseded
/// (historical) row for its logical id.
#[test]
fn writer_node_retire_supersedes_active_node() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let seed = WriteRequest {
        label: "seed".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "meeting-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(seed).expect("seed write");

    let retire = WriteRequest {
        label: "retire".to_owned(),
        nodes: vec![],
        node_retires: vec![NodeRetire {
            logical_id: "meeting-1".to_owned(),
            source_ref: Some("src-2".to_owned()),
        }],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(retire).expect("retire write");

    let connection = rusqlite::Connection::open(db_file.path()).expect("open");
    let active: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
            [],
            |row| row.get(0),
        )
        .expect("count active");
    let historical: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
            [],
            |row| row.get(0),
        )
        .expect("count historical");

    assert_eq!(active, 0, "active count must be 0 after retire");
    assert_eq!(historical, 1, "historical count must be 1 after retire");
}
4138
/// Retiring a node must delete its FTS rows (no stale search hits) while
/// leaving the chunks table intact so a later restore can rebuild content.
#[test]
fn writer_node_retire_preserves_chunks_and_clears_fts() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let seed = WriteRequest {
        label: "seed".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "meeting-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: "chunk-1".to_owned(),
            node_logical_id: "meeting-1".to_owned(),
            text_content: "budget discussion".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(seed).expect("seed write");

    let retire = WriteRequest {
        label: "retire".to_owned(),
        nodes: vec![],
        node_retires: vec![NodeRetire {
            logical_id: "meeting-1".to_owned(),
            source_ref: Some("src-2".to_owned()),
        }],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(retire).expect("retire write");

    let connection = rusqlite::Connection::open(db_file.path()).expect("open");
    let chunk_count: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
            [],
            |row| row.get(0),
        )
        .expect("chunk count");
    let fts_count: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
            [],
            |row| row.get(0),
        )
        .expect("fts count");

    assert_eq!(
        chunk_count, 1,
        "chunks must remain after node retire so restore can re-establish content"
    );
    assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
}
4225
/// Retiring an edge by logical id must leave no active edge row behind.
#[test]
fn writer_edge_retire_supersedes_active_edge() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // Endpoint nodes share everything but their identity and kind.
    let node = |row_id: &str, logical_id: &str, kind: &str| NodeInsert {
        row_id: row_id.to_owned(),
        logical_id: logical_id.to_owned(),
        kind: kind.to_owned(),
        properties: "{}".to_owned(),
        source_ref: Some("src-1".to_owned()),
        upsert: false,
        chunk_policy: ChunkPolicy::Preserve,
        content_ref: None,
    };

    actor
        .submit(WriteRequest {
            label: "seed".to_owned(),
            nodes: vec![
                node("row-a", "node-a", "Meeting"),
                node("row-b", "node-b", "Task"),
            ],
            node_retires: vec![],
            edges: vec![EdgeInsert {
                row_id: "edge-1".to_owned(),
                logical_id: "edge-lg-1".to_owned(),
                source_logical_id: "node-a".to_owned(),
                target_logical_id: "node-b".to_owned(),
                kind: "HAS_TASK".to_owned(),
                properties: "{}".to_owned(),
                source_ref: Some("src-1".to_owned()),
                upsert: false,
            }],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("seed write");

    actor
        .submit(WriteRequest {
            label: "retire-edge".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![EdgeRetire {
                logical_id: "edge-lg-1".to_owned(),
                source_ref: Some("src-2".to_owned()),
            }],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("retire edge write");

    let connection = rusqlite::Connection::open(db_file.path()).expect("open");
    let active: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
            [],
            |row| row.get(0),
        )
        .expect("active edge count");

    assert_eq!(active, 0, "active edge count must be 0 after retire");
}
4315
/// A retire lacking `source_ref` must still succeed in Warn mode, but the
/// receipt has to carry at least one provenance warning.
#[test]
fn writer_retire_without_source_ref_emits_provenance_warning() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    let seed = WriteRequest {
        label: "seed".to_owned(),
        nodes: vec![NodeInsert {
            row_id: "row-1".to_owned(),
            logical_id: "meeting-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some("src-1".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    actor.submit(seed).expect("seed write");

    let retire = WriteRequest {
        label: "retire-no-src".to_owned(),
        nodes: vec![],
        node_retires: vec![NodeRetire {
            logical_id: "meeting-1".to_owned(),
            // Deliberately missing provenance.
            source_ref: None,
        }],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };
    let receipt = actor.submit(retire).expect("retire write");

    assert!(
        !receipt.provenance_warnings.is_empty(),
        "retire without source_ref must emit a provenance warning"
    );
}
4378
/// An upsert carrying `ChunkPolicy::Replace` must delete the node's previous
/// chunks and their FTS rows, keeping only the newly submitted chunk.
#[test]
#[allow(clippy::too_many_lines)]
fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
    let db_file = NamedTempFile::new().expect("temporary db");
    let actor = WriterActor::start(
        db_file.path(),
        Arc::new(SchemaManager::new()),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");

    // Both versions target logical id "meeting-1" and bring one chunk each.
    let versioned = |label: &str,
                     row_id: &str,
                     source_ref: &str,
                     upsert: bool,
                     chunk_policy: ChunkPolicy,
                     chunk_id: &str,
                     text: &str| WriteRequest {
        label: label.to_owned(),
        nodes: vec![NodeInsert {
            row_id: row_id.to_owned(),
            logical_id: "meeting-1".to_owned(),
            kind: "Meeting".to_owned(),
            properties: "{}".to_owned(),
            source_ref: Some(source_ref.to_owned()),
            upsert,
            chunk_policy,
            content_ref: None,
        }],
        node_retires: vec![],
        edges: vec![],
        edge_retires: vec![],
        chunks: vec![ChunkInsert {
            id: chunk_id.to_owned(),
            node_logical_id: "meeting-1".to_owned(),
            text_content: text.to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }],
        runs: vec![],
        steps: vec![],
        actions: vec![],
        optional_backfills: vec![],
        vec_inserts: vec![],
        operational_writes: vec![],
    };

    actor
        .submit(versioned(
            "v1",
            "row-1",
            "src-1",
            false,
            ChunkPolicy::Preserve,
            "chunk-old",
            "old text",
        ))
        .expect("v1 write");
    actor
        .submit(versioned(
            "v2",
            "row-2",
            "src-2",
            true,
            ChunkPolicy::Replace,
            "chunk-new",
            "new text",
        ))
        .expect("v2 write");

    let connection = rusqlite::Connection::open(db_file.path()).expect("open");
    let old_chunk: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
            [],
            |row| row.get(0),
        )
        .expect("old chunk count");
    let new_chunk: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
            [],
            |row| row.get(0),
        )
        .expect("new chunk count");
    let fts_old: i64 = connection
        .query_row(
            "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
            [],
            |row| row.get(0),
        )
        .expect("old fts count");

    assert_eq!(
        old_chunk, 0,
        "old chunk must be deleted by ChunkPolicy::Replace"
    );
    assert_eq!(new_chunk, 1, "new chunk must exist after replace");
    assert_eq!(
        fts_old, 0,
        "old FTS row must be deleted by ChunkPolicy::Replace"
    );
}
4490
4491 #[test]
4492 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4493 let db = NamedTempFile::new().expect("temporary db");
4494 let writer = WriterActor::start(
4495 db.path(),
4496 Arc::new(SchemaManager::new()),
4497 ProvenanceMode::Warn,
4498 Arc::new(TelemetryCounters::default()),
4499 )
4500 .expect("writer");
4501
4502 writer
4503 .submit(WriteRequest {
4504 label: "v1".to_owned(),
4505 nodes: vec![NodeInsert {
4506 row_id: "row-1".to_owned(),
4507 logical_id: "meeting-1".to_owned(),
4508 kind: "Meeting".to_owned(),
4509 properties: "{}".to_owned(),
4510 source_ref: Some("src-1".to_owned()),
4511 upsert: false,
4512 chunk_policy: ChunkPolicy::Preserve,
4513 content_ref: None,
4514 }],
4515 node_retires: vec![],
4516 edges: vec![],
4517 edge_retires: vec![],
4518 chunks: vec![ChunkInsert {
4519 id: "chunk-old".to_owned(),
4520 node_logical_id: "meeting-1".to_owned(),
4521 text_content: "old text".to_owned(),
4522 byte_start: None,
4523 byte_end: None,
4524 content_hash: None,
4525 }],
4526 runs: vec![],
4527 steps: vec![],
4528 actions: vec![],
4529 optional_backfills: vec![],
4530 vec_inserts: vec![],
4531 operational_writes: vec![],
4532 })
4533 .expect("v1 write");
4534
4535 writer
4536 .submit(WriteRequest {
4537 label: "v2-props-only".to_owned(),
4538 nodes: vec![NodeInsert {
4539 row_id: "row-2".to_owned(),
4540 logical_id: "meeting-1".to_owned(),
4541 kind: "Meeting".to_owned(),
4542 properties: r#"{"status":"updated"}"#.to_owned(),
4543 source_ref: Some("src-2".to_owned()),
4544 upsert: true,
4545 chunk_policy: ChunkPolicy::Preserve,
4546 content_ref: None,
4547 }],
4548 node_retires: vec![],
4549 edges: vec![],
4550 edge_retires: vec![],
4551 chunks: vec![],
4552 runs: vec![],
4553 steps: vec![],
4554 actions: vec![],
4555 optional_backfills: vec![],
4556 vec_inserts: vec![],
4557 operational_writes: vec![],
4558 })
4559 .expect("v2 preserve write");
4560
4561 let conn = rusqlite::Connection::open(db.path()).expect("open");
4562 let old_chunk: i64 = conn
4563 .query_row(
4564 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4565 [],
4566 |r| r.get(0),
4567 )
4568 .expect("old chunk count");
4569
4570 assert_eq!(
4571 old_chunk, 1,
4572 "old chunk must be preserved by ChunkPolicy::Preserve"
4573 );
4574 }
4575
4576 #[test]
4577 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4578 let db = NamedTempFile::new().expect("temporary db");
4579 let writer = WriterActor::start(
4580 db.path(),
4581 Arc::new(SchemaManager::new()),
4582 ProvenanceMode::Warn,
4583 Arc::new(TelemetryCounters::default()),
4584 )
4585 .expect("writer");
4586
4587 writer
4588 .submit(WriteRequest {
4589 label: "v1".to_owned(),
4590 nodes: vec![NodeInsert {
4591 row_id: "row-1".to_owned(),
4592 logical_id: "meeting-1".to_owned(),
4593 kind: "Meeting".to_owned(),
4594 properties: "{}".to_owned(),
4595 source_ref: Some("src-1".to_owned()),
4596 upsert: false,
4597 chunk_policy: ChunkPolicy::Preserve,
4598 content_ref: None,
4599 }],
4600 node_retires: vec![],
4601 edges: vec![],
4602 edge_retires: vec![],
4603 chunks: vec![ChunkInsert {
4604 id: "chunk-existing".to_owned(),
4605 node_logical_id: "meeting-1".to_owned(),
4606 text_content: "existing text".to_owned(),
4607 byte_start: None,
4608 byte_end: None,
4609 content_hash: None,
4610 }],
4611 runs: vec![],
4612 steps: vec![],
4613 actions: vec![],
4614 optional_backfills: vec![],
4615 vec_inserts: vec![],
4616 operational_writes: vec![],
4617 })
4618 .expect("v1 write");
4619
4620 writer
4622 .submit(WriteRequest {
4623 label: "insert-no-upsert".to_owned(),
4624 nodes: vec![NodeInsert {
4625 row_id: "row-2".to_owned(),
4626 logical_id: "meeting-2".to_owned(),
4627 kind: "Meeting".to_owned(),
4628 properties: "{}".to_owned(),
4629 source_ref: Some("src-2".to_owned()),
4630 upsert: false,
4631 chunk_policy: ChunkPolicy::Replace,
4632 content_ref: None,
4633 }],
4634 node_retires: vec![],
4635 edges: vec![],
4636 edge_retires: vec![],
4637 chunks: vec![],
4638 runs: vec![],
4639 steps: vec![],
4640 actions: vec![],
4641 optional_backfills: vec![],
4642 vec_inserts: vec![],
4643 operational_writes: vec![],
4644 })
4645 .expect("insert no-upsert write");
4646
4647 let conn = rusqlite::Connection::open(db.path()).expect("open");
4648 let existing_chunk: i64 = conn
4649 .query_row(
4650 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4651 [],
4652 |r| r.get(0),
4653 )
4654 .expect("chunk count");
4655
4656 assert_eq!(
4657 existing_chunk, 1,
4658 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4659 );
4660 }
4661
4662 #[test]
4663 fn writer_run_upsert_supersedes_prior_active_run() {
4664 let db = NamedTempFile::new().expect("temporary db");
4665 let writer = WriterActor::start(
4666 db.path(),
4667 Arc::new(SchemaManager::new()),
4668 ProvenanceMode::Warn,
4669 Arc::new(TelemetryCounters::default()),
4670 )
4671 .expect("writer");
4672
4673 writer
4674 .submit(WriteRequest {
4675 label: "v1".to_owned(),
4676 nodes: vec![],
4677 node_retires: vec![],
4678 edges: vec![],
4679 edge_retires: vec![],
4680 chunks: vec![],
4681 runs: vec![RunInsert {
4682 id: "run-v1".to_owned(),
4683 kind: "session".to_owned(),
4684 status: "completed".to_owned(),
4685 properties: "{}".to_owned(),
4686 source_ref: Some("src-1".to_owned()),
4687 upsert: false,
4688 supersedes_id: None,
4689 }],
4690 steps: vec![],
4691 actions: vec![],
4692 optional_backfills: vec![],
4693 vec_inserts: vec![],
4694 operational_writes: vec![],
4695 })
4696 .expect("v1 run write");
4697
4698 writer
4699 .submit(WriteRequest {
4700 label: "v2".to_owned(),
4701 nodes: vec![],
4702 node_retires: vec![],
4703 edges: vec![],
4704 edge_retires: vec![],
4705 chunks: vec![],
4706 runs: vec![RunInsert {
4707 id: "run-v2".to_owned(),
4708 kind: "session".to_owned(),
4709 status: "completed".to_owned(),
4710 properties: "{}".to_owned(),
4711 source_ref: Some("src-2".to_owned()),
4712 upsert: true,
4713 supersedes_id: Some("run-v1".to_owned()),
4714 }],
4715 steps: vec![],
4716 actions: vec![],
4717 optional_backfills: vec![],
4718 vec_inserts: vec![],
4719 operational_writes: vec![],
4720 })
4721 .expect("v2 run write");
4722
4723 let conn = rusqlite::Connection::open(db.path()).expect("open");
4724 let v1_historical: i64 = conn
4725 .query_row(
4726 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4727 [],
4728 |r| r.get(0),
4729 )
4730 .expect("v1 historical count");
4731 let v2_active: i64 = conn
4732 .query_row(
4733 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4734 [],
4735 |r| r.get(0),
4736 )
4737 .expect("v2 active count");
4738
4739 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4740 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4741 }
4742
4743 #[test]
4744 fn writer_step_upsert_supersedes_prior_active_step() {
4745 let db = NamedTempFile::new().expect("temporary db");
4746 let writer = WriterActor::start(
4747 db.path(),
4748 Arc::new(SchemaManager::new()),
4749 ProvenanceMode::Warn,
4750 Arc::new(TelemetryCounters::default()),
4751 )
4752 .expect("writer");
4753
4754 writer
4755 .submit(WriteRequest {
4756 label: "v1".to_owned(),
4757 nodes: vec![],
4758 node_retires: vec![],
4759 edges: vec![],
4760 edge_retires: vec![],
4761 chunks: vec![],
4762 runs: vec![RunInsert {
4763 id: "run-1".to_owned(),
4764 kind: "session".to_owned(),
4765 status: "completed".to_owned(),
4766 properties: "{}".to_owned(),
4767 source_ref: Some("src-1".to_owned()),
4768 upsert: false,
4769 supersedes_id: None,
4770 }],
4771 steps: vec![StepInsert {
4772 id: "step-v1".to_owned(),
4773 run_id: "run-1".to_owned(),
4774 kind: "llm".to_owned(),
4775 status: "completed".to_owned(),
4776 properties: "{}".to_owned(),
4777 source_ref: Some("src-1".to_owned()),
4778 upsert: false,
4779 supersedes_id: None,
4780 }],
4781 actions: vec![],
4782 optional_backfills: vec![],
4783 vec_inserts: vec![],
4784 operational_writes: vec![],
4785 })
4786 .expect("v1 step write");
4787
4788 writer
4789 .submit(WriteRequest {
4790 label: "v2".to_owned(),
4791 nodes: vec![],
4792 node_retires: vec![],
4793 edges: vec![],
4794 edge_retires: vec![],
4795 chunks: vec![],
4796 runs: vec![],
4797 steps: vec![StepInsert {
4798 id: "step-v2".to_owned(),
4799 run_id: "run-1".to_owned(),
4800 kind: "llm".to_owned(),
4801 status: "completed".to_owned(),
4802 properties: "{}".to_owned(),
4803 source_ref: Some("src-2".to_owned()),
4804 upsert: true,
4805 supersedes_id: Some("step-v1".to_owned()),
4806 }],
4807 actions: vec![],
4808 optional_backfills: vec![],
4809 vec_inserts: vec![],
4810 operational_writes: vec![],
4811 })
4812 .expect("v2 step write");
4813
4814 let conn = rusqlite::Connection::open(db.path()).expect("open");
4815 let v1_historical: i64 = conn
4816 .query_row(
4817 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4818 [],
4819 |r| r.get(0),
4820 )
4821 .expect("v1 historical count");
4822 let v2_active: i64 = conn
4823 .query_row(
4824 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4825 [],
4826 |r| r.get(0),
4827 )
4828 .expect("v2 active count");
4829
4830 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4831 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4832 }
4833
4834 #[test]
4835 fn writer_action_upsert_supersedes_prior_active_action() {
4836 let db = NamedTempFile::new().expect("temporary db");
4837 let writer = WriterActor::start(
4838 db.path(),
4839 Arc::new(SchemaManager::new()),
4840 ProvenanceMode::Warn,
4841 Arc::new(TelemetryCounters::default()),
4842 )
4843 .expect("writer");
4844
4845 writer
4846 .submit(WriteRequest {
4847 label: "v1".to_owned(),
4848 nodes: vec![],
4849 node_retires: vec![],
4850 edges: vec![],
4851 edge_retires: vec![],
4852 chunks: vec![],
4853 runs: vec![RunInsert {
4854 id: "run-1".to_owned(),
4855 kind: "session".to_owned(),
4856 status: "completed".to_owned(),
4857 properties: "{}".to_owned(),
4858 source_ref: Some("src-1".to_owned()),
4859 upsert: false,
4860 supersedes_id: None,
4861 }],
4862 steps: vec![StepInsert {
4863 id: "step-1".to_owned(),
4864 run_id: "run-1".to_owned(),
4865 kind: "llm".to_owned(),
4866 status: "completed".to_owned(),
4867 properties: "{}".to_owned(),
4868 source_ref: Some("src-1".to_owned()),
4869 upsert: false,
4870 supersedes_id: None,
4871 }],
4872 actions: vec![ActionInsert {
4873 id: "action-v1".to_owned(),
4874 step_id: "step-1".to_owned(),
4875 kind: "emit".to_owned(),
4876 status: "completed".to_owned(),
4877 properties: "{}".to_owned(),
4878 source_ref: Some("src-1".to_owned()),
4879 upsert: false,
4880 supersedes_id: None,
4881 }],
4882 optional_backfills: vec![],
4883 vec_inserts: vec![],
4884 operational_writes: vec![],
4885 })
4886 .expect("v1 action write");
4887
4888 writer
4889 .submit(WriteRequest {
4890 label: "v2".to_owned(),
4891 nodes: vec![],
4892 node_retires: vec![],
4893 edges: vec![],
4894 edge_retires: vec![],
4895 chunks: vec![],
4896 runs: vec![],
4897 steps: vec![],
4898 actions: vec![ActionInsert {
4899 id: "action-v2".to_owned(),
4900 step_id: "step-1".to_owned(),
4901 kind: "emit".to_owned(),
4902 status: "completed".to_owned(),
4903 properties: "{}".to_owned(),
4904 source_ref: Some("src-2".to_owned()),
4905 upsert: true,
4906 supersedes_id: Some("action-v1".to_owned()),
4907 }],
4908 optional_backfills: vec![],
4909 vec_inserts: vec![],
4910 operational_writes: vec![],
4911 })
4912 .expect("v2 action write");
4913
4914 let conn = rusqlite::Connection::open(db.path()).expect("open");
4915 let v1_historical: i64 = conn
4916 .query_row(
4917 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4918 [],
4919 |r| r.get(0),
4920 )
4921 .expect("v1 historical count");
4922 let v2_active: i64 = conn
4923 .query_row(
4924 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4925 [],
4926 |r| r.get(0),
4927 )
4928 .expect("v2 active count");
4929
4930 assert_eq!(
4931 v1_historical, 1,
4932 "action-v1 must be historical after upsert"
4933 );
4934 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4935 }
4936
4937 #[test]
4940 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4941 let db = NamedTempFile::new().expect("temporary db");
4942 let writer = WriterActor::start(
4943 db.path(),
4944 Arc::new(SchemaManager::new()),
4945 ProvenanceMode::Warn,
4946 Arc::new(TelemetryCounters::default()),
4947 )
4948 .expect("writer");
4949
4950 let result = writer.submit(WriteRequest {
4951 label: "bad".to_owned(),
4952 nodes: vec![],
4953 node_retires: vec![],
4954 edges: vec![],
4955 edge_retires: vec![],
4956 chunks: vec![],
4957 runs: vec![RunInsert {
4958 id: "run-1".to_owned(),
4959 kind: "session".to_owned(),
4960 status: "completed".to_owned(),
4961 properties: "{}".to_owned(),
4962 source_ref: None,
4963 upsert: true,
4964 supersedes_id: None,
4965 }],
4966 steps: vec![],
4967 actions: vec![],
4968 optional_backfills: vec![],
4969 vec_inserts: vec![],
4970 operational_writes: vec![],
4971 });
4972
4973 assert!(
4974 matches!(result, Err(EngineError::InvalidWrite(_))),
4975 "run upsert=true without supersedes_id must return InvalidWrite"
4976 );
4977 }
4978
4979 #[test]
4980 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4981 let db = NamedTempFile::new().expect("temporary db");
4982 let writer = WriterActor::start(
4983 db.path(),
4984 Arc::new(SchemaManager::new()),
4985 ProvenanceMode::Warn,
4986 Arc::new(TelemetryCounters::default()),
4987 )
4988 .expect("writer");
4989
4990 let result = writer.submit(WriteRequest {
4991 label: "bad".to_owned(),
4992 nodes: vec![],
4993 node_retires: vec![],
4994 edges: vec![],
4995 edge_retires: vec![],
4996 chunks: vec![],
4997 runs: vec![],
4998 steps: vec![StepInsert {
4999 id: "step-1".to_owned(),
5000 run_id: "run-1".to_owned(),
5001 kind: "llm".to_owned(),
5002 status: "completed".to_owned(),
5003 properties: "{}".to_owned(),
5004 source_ref: None,
5005 upsert: true,
5006 supersedes_id: None,
5007 }],
5008 actions: vec![],
5009 optional_backfills: vec![],
5010 vec_inserts: vec![],
5011 operational_writes: vec![],
5012 });
5013
5014 assert!(
5015 matches!(result, Err(EngineError::InvalidWrite(_))),
5016 "step upsert=true without supersedes_id must return InvalidWrite"
5017 );
5018 }
5019
5020 #[test]
5021 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5022 let db = NamedTempFile::new().expect("temporary db");
5023 let writer = WriterActor::start(
5024 db.path(),
5025 Arc::new(SchemaManager::new()),
5026 ProvenanceMode::Warn,
5027 Arc::new(TelemetryCounters::default()),
5028 )
5029 .expect("writer");
5030
5031 let result = writer.submit(WriteRequest {
5032 label: "bad".to_owned(),
5033 nodes: vec![],
5034 node_retires: vec![],
5035 edges: vec![],
5036 edge_retires: vec![],
5037 chunks: vec![],
5038 runs: vec![],
5039 steps: vec![],
5040 actions: vec![ActionInsert {
5041 id: "action-1".to_owned(),
5042 step_id: "step-1".to_owned(),
5043 kind: "emit".to_owned(),
5044 status: "completed".to_owned(),
5045 properties: "{}".to_owned(),
5046 source_ref: None,
5047 upsert: true,
5048 supersedes_id: None,
5049 }],
5050 optional_backfills: vec![],
5051 vec_inserts: vec![],
5052 operational_writes: vec![],
5053 });
5054
5055 assert!(
5056 matches!(result, Err(EngineError::InvalidWrite(_))),
5057 "action upsert=true without supersedes_id must return InvalidWrite"
5058 );
5059 }
5060
5061 #[test]
5064 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5065 let db = NamedTempFile::new().expect("temporary db");
5066 let writer = WriterActor::start(
5067 db.path(),
5068 Arc::new(SchemaManager::new()),
5069 ProvenanceMode::Warn,
5070 Arc::new(TelemetryCounters::default()),
5071 )
5072 .expect("writer");
5073
5074 let receipt = writer
5075 .submit(WriteRequest {
5076 label: "test".to_owned(),
5077 nodes: vec![
5078 NodeInsert {
5079 row_id: "row-a".to_owned(),
5080 logical_id: "node-a".to_owned(),
5081 kind: "Meeting".to_owned(),
5082 properties: "{}".to_owned(),
5083 source_ref: Some("src-1".to_owned()),
5084 upsert: false,
5085 chunk_policy: ChunkPolicy::Preserve,
5086 content_ref: None,
5087 },
5088 NodeInsert {
5089 row_id: "row-b".to_owned(),
5090 logical_id: "node-b".to_owned(),
5091 kind: "Task".to_owned(),
5092 properties: "{}".to_owned(),
5093 source_ref: Some("src-1".to_owned()),
5094 upsert: false,
5095 chunk_policy: ChunkPolicy::Preserve,
5096 content_ref: None,
5097 },
5098 ],
5099 node_retires: vec![],
5100 edges: vec![EdgeInsert {
5101 row_id: "edge-1".to_owned(),
5102 logical_id: "edge-lg-1".to_owned(),
5103 source_logical_id: "node-a".to_owned(),
5104 target_logical_id: "node-b".to_owned(),
5105 kind: "HAS_TASK".to_owned(),
5106 properties: "{}".to_owned(),
5107 source_ref: None,
5108 upsert: false,
5109 }],
5110 edge_retires: vec![],
5111 chunks: vec![],
5112 runs: vec![],
5113 steps: vec![],
5114 actions: vec![],
5115 optional_backfills: vec![],
5116 vec_inserts: vec![],
5117 operational_writes: vec![],
5118 })
5119 .expect("write");
5120
5121 assert!(
5122 !receipt.provenance_warnings.is_empty(),
5123 "edge insert without source_ref must emit a provenance warning"
5124 );
5125 }
5126
5127 #[test]
5128 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5129 let db = NamedTempFile::new().expect("temporary db");
5130 let writer = WriterActor::start(
5131 db.path(),
5132 Arc::new(SchemaManager::new()),
5133 ProvenanceMode::Warn,
5134 Arc::new(TelemetryCounters::default()),
5135 )
5136 .expect("writer");
5137
5138 let receipt = writer
5139 .submit(WriteRequest {
5140 label: "test".to_owned(),
5141 nodes: vec![],
5142 node_retires: vec![],
5143 edges: vec![],
5144 edge_retires: vec![],
5145 chunks: vec![],
5146 runs: vec![RunInsert {
5147 id: "run-1".to_owned(),
5148 kind: "session".to_owned(),
5149 status: "completed".to_owned(),
5150 properties: "{}".to_owned(),
5151 source_ref: None,
5152 upsert: false,
5153 supersedes_id: None,
5154 }],
5155 steps: vec![],
5156 actions: vec![],
5157 optional_backfills: vec![],
5158 vec_inserts: vec![],
5159 operational_writes: vec![],
5160 })
5161 .expect("write");
5162
5163 assert!(
5164 !receipt.provenance_warnings.is_empty(),
5165 "run insert without source_ref must emit a provenance warning"
5166 );
5167 }
5168
5169 #[test]
5172 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5173 let db = NamedTempFile::new().expect("temporary db");
5174 let writer = WriterActor::start(
5175 db.path(),
5176 Arc::new(SchemaManager::new()),
5177 ProvenanceMode::Warn,
5178 Arc::new(TelemetryCounters::default()),
5179 )
5180 .expect("writer");
5181
5182 writer
5184 .submit(WriteRequest {
5185 label: "seed".to_owned(),
5186 nodes: vec![NodeInsert {
5187 row_id: "row-1".to_owned(),
5188 logical_id: "meeting-1".to_owned(),
5189 kind: "Meeting".to_owned(),
5190 properties: "{}".to_owned(),
5191 source_ref: Some("src-1".to_owned()),
5192 upsert: false,
5193 chunk_policy: ChunkPolicy::Preserve,
5194 content_ref: None,
5195 }],
5196 node_retires: vec![],
5197 edges: vec![],
5198 edge_retires: vec![],
5199 chunks: vec![],
5200 runs: vec![],
5201 steps: vec![],
5202 actions: vec![],
5203 optional_backfills: vec![],
5204 vec_inserts: vec![],
5205 operational_writes: vec![],
5206 })
5207 .expect("seed write");
5208
5209 let result = writer.submit(WriteRequest {
5211 label: "bad".to_owned(),
5212 nodes: vec![],
5213 node_retires: vec![NodeRetire {
5214 logical_id: "meeting-1".to_owned(),
5215 source_ref: Some("src-2".to_owned()),
5216 }],
5217 edges: vec![],
5218 edge_retires: vec![],
5219 chunks: vec![ChunkInsert {
5220 id: "chunk-bad".to_owned(),
5221 node_logical_id: "meeting-1".to_owned(),
5222 text_content: "some text".to_owned(),
5223 byte_start: None,
5224 byte_end: None,
5225 content_hash: None,
5226 }],
5227 runs: vec![],
5228 steps: vec![],
5229 actions: vec![],
5230 optional_backfills: vec![],
5231 vec_inserts: vec![],
5232 operational_writes: vec![],
5233 });
5234
5235 assert!(
5236 matches!(result, Err(EngineError::InvalidWrite(_))),
5237 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5238 );
5239 }
5240
5241 #[test]
5244 fn writer_batch_insert_multiple_nodes() {
5245 let db = NamedTempFile::new().expect("temporary db");
5246 let writer = WriterActor::start(
5247 db.path(),
5248 Arc::new(SchemaManager::new()),
5249 ProvenanceMode::Warn,
5250 Arc::new(TelemetryCounters::default()),
5251 )
5252 .expect("writer");
5253
5254 let nodes: Vec<NodeInsert> = (0..100)
5255 .map(|i| NodeInsert {
5256 row_id: format!("row-{i}"),
5257 logical_id: format!("lg-{i}"),
5258 kind: "Note".to_owned(),
5259 properties: "{}".to_owned(),
5260 source_ref: Some("batch-src".to_owned()),
5261 upsert: false,
5262 chunk_policy: ChunkPolicy::Preserve,
5263 content_ref: None,
5264 })
5265 .collect();
5266
5267 writer
5268 .submit(WriteRequest {
5269 label: "batch".to_owned(),
5270 nodes,
5271 node_retires: vec![],
5272 edges: vec![],
5273 edge_retires: vec![],
5274 chunks: vec![],
5275 runs: vec![],
5276 steps: vec![],
5277 actions: vec![],
5278 optional_backfills: vec![],
5279 vec_inserts: vec![],
5280 operational_writes: vec![],
5281 })
5282 .expect("batch write");
5283
5284 let conn = rusqlite::Connection::open(db.path()).expect("open");
5285 let count: i64 = conn
5286 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5287 .expect("count nodes");
5288 assert_eq!(
5289 count, 100,
5290 "all 100 nodes must be present after batch insert"
5291 );
5292 }
5293
5294 #[test]
5297 fn prepare_write_rejects_empty_node_row_id() {
5298 let db = NamedTempFile::new().expect("temporary db");
5299 let writer = WriterActor::start(
5300 db.path(),
5301 Arc::new(SchemaManager::new()),
5302 ProvenanceMode::Warn,
5303 Arc::new(TelemetryCounters::default()),
5304 )
5305 .expect("writer");
5306
5307 let result = writer.submit(WriteRequest {
5308 label: "test".to_owned(),
5309 nodes: vec![NodeInsert {
5310 row_id: String::new(),
5311 logical_id: "lg-1".to_owned(),
5312 kind: "Note".to_owned(),
5313 properties: "{}".to_owned(),
5314 source_ref: None,
5315 upsert: false,
5316 chunk_policy: ChunkPolicy::Preserve,
5317 content_ref: None,
5318 }],
5319 node_retires: vec![],
5320 edges: vec![],
5321 edge_retires: vec![],
5322 chunks: vec![],
5323 runs: vec![],
5324 steps: vec![],
5325 actions: vec![],
5326 optional_backfills: vec![],
5327 vec_inserts: vec![],
5328 operational_writes: vec![],
5329 });
5330
5331 assert!(
5332 matches!(result, Err(EngineError::InvalidWrite(_))),
5333 "empty row_id must be rejected"
5334 );
5335 }
5336
5337 #[test]
5338 fn prepare_write_rejects_empty_node_logical_id() {
5339 let db = NamedTempFile::new().expect("temporary db");
5340 let writer = WriterActor::start(
5341 db.path(),
5342 Arc::new(SchemaManager::new()),
5343 ProvenanceMode::Warn,
5344 Arc::new(TelemetryCounters::default()),
5345 )
5346 .expect("writer");
5347
5348 let result = writer.submit(WriteRequest {
5349 label: "test".to_owned(),
5350 nodes: vec![NodeInsert {
5351 row_id: "row-1".to_owned(),
5352 logical_id: String::new(),
5353 kind: "Note".to_owned(),
5354 properties: "{}".to_owned(),
5355 source_ref: None,
5356 upsert: false,
5357 chunk_policy: ChunkPolicy::Preserve,
5358 content_ref: None,
5359 }],
5360 node_retires: vec![],
5361 edges: vec![],
5362 edge_retires: vec![],
5363 chunks: vec![],
5364 runs: vec![],
5365 steps: vec![],
5366 actions: vec![],
5367 optional_backfills: vec![],
5368 vec_inserts: vec![],
5369 operational_writes: vec![],
5370 });
5371
5372 assert!(
5373 matches!(result, Err(EngineError::InvalidWrite(_))),
5374 "empty logical_id must be rejected"
5375 );
5376 }
5377
5378 #[test]
5379 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5380 let db = NamedTempFile::new().expect("temporary db");
5381 let writer = WriterActor::start(
5382 db.path(),
5383 Arc::new(SchemaManager::new()),
5384 ProvenanceMode::Warn,
5385 Arc::new(TelemetryCounters::default()),
5386 )
5387 .expect("writer");
5388
5389 let result = writer.submit(WriteRequest {
5390 label: "test".to_owned(),
5391 nodes: vec![
5392 NodeInsert {
5393 row_id: "row-1".to_owned(),
5394 logical_id: "lg-1".to_owned(),
5395 kind: "Note".to_owned(),
5396 properties: "{}".to_owned(),
5397 source_ref: None,
5398 upsert: false,
5399 chunk_policy: ChunkPolicy::Preserve,
5400 content_ref: None,
5401 },
5402 NodeInsert {
5403 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5405 kind: "Note".to_owned(),
5406 properties: "{}".to_owned(),
5407 source_ref: None,
5408 upsert: false,
5409 chunk_policy: ChunkPolicy::Preserve,
5410 content_ref: None,
5411 },
5412 ],
5413 node_retires: vec![],
5414 edges: vec![],
5415 edge_retires: vec![],
5416 chunks: vec![],
5417 runs: vec![],
5418 steps: vec![],
5419 actions: vec![],
5420 optional_backfills: vec![],
5421 vec_inserts: vec![],
5422 operational_writes: vec![],
5423 });
5424
5425 assert!(
5426 matches!(result, Err(EngineError::InvalidWrite(_))),
5427 "duplicate row_id within request must be rejected"
5428 );
5429 }
5430
5431 #[test]
5432 fn prepare_write_rejects_empty_chunk_id() {
5433 let db = NamedTempFile::new().expect("temporary db");
5434 let writer = WriterActor::start(
5435 db.path(),
5436 Arc::new(SchemaManager::new()),
5437 ProvenanceMode::Warn,
5438 Arc::new(TelemetryCounters::default()),
5439 )
5440 .expect("writer");
5441
5442 let result = writer.submit(WriteRequest {
5443 label: "test".to_owned(),
5444 nodes: vec![NodeInsert {
5445 row_id: "row-1".to_owned(),
5446 logical_id: "lg-1".to_owned(),
5447 kind: "Note".to_owned(),
5448 properties: "{}".to_owned(),
5449 source_ref: None,
5450 upsert: false,
5451 chunk_policy: ChunkPolicy::Preserve,
5452 content_ref: None,
5453 }],
5454 node_retires: vec![],
5455 edges: vec![],
5456 edge_retires: vec![],
5457 chunks: vec![ChunkInsert {
5458 id: String::new(),
5459 node_logical_id: "lg-1".to_owned(),
5460 text_content: "some text".to_owned(),
5461 byte_start: None,
5462 byte_end: None,
5463 content_hash: None,
5464 }],
5465 runs: vec![],
5466 steps: vec![],
5467 actions: vec![],
5468 optional_backfills: vec![],
5469 vec_inserts: vec![],
5470 operational_writes: vec![],
5471 });
5472
5473 assert!(
5474 matches!(result, Err(EngineError::InvalidWrite(_))),
5475 "empty chunk id must be rejected"
5476 );
5477 }
5478
5479 #[test]
5482 fn writer_receipt_warns_on_step_without_source_ref() {
5483 let db = NamedTempFile::new().expect("temporary db");
5484 let writer = WriterActor::start(
5485 db.path(),
5486 Arc::new(SchemaManager::new()),
5487 ProvenanceMode::Warn,
5488 Arc::new(TelemetryCounters::default()),
5489 )
5490 .expect("writer");
5491
5492 writer
5494 .submit(WriteRequest {
5495 label: "seed-run".to_owned(),
5496 nodes: vec![],
5497 node_retires: vec![],
5498 edges: vec![],
5499 edge_retires: vec![],
5500 chunks: vec![],
5501 runs: vec![RunInsert {
5502 id: "run-1".to_owned(),
5503 kind: "session".to_owned(),
5504 status: "active".to_owned(),
5505 properties: "{}".to_owned(),
5506 source_ref: Some("src-1".to_owned()),
5507 upsert: false,
5508 supersedes_id: None,
5509 }],
5510 steps: vec![],
5511 actions: vec![],
5512 optional_backfills: vec![],
5513 vec_inserts: vec![],
5514 operational_writes: vec![],
5515 })
5516 .expect("seed run");
5517
5518 let receipt = writer
5519 .submit(WriteRequest {
5520 label: "test".to_owned(),
5521 nodes: vec![],
5522 node_retires: vec![],
5523 edges: vec![],
5524 edge_retires: vec![],
5525 chunks: vec![],
5526 runs: vec![],
5527 steps: vec![StepInsert {
5528 id: "step-1".to_owned(),
5529 run_id: "run-1".to_owned(),
5530 kind: "llm_call".to_owned(),
5531 status: "completed".to_owned(),
5532 properties: "{}".to_owned(),
5533 source_ref: None,
5534 upsert: false,
5535 supersedes_id: None,
5536 }],
5537 actions: vec![],
5538 optional_backfills: vec![],
5539 vec_inserts: vec![],
5540 operational_writes: vec![],
5541 })
5542 .expect("write");
5543
5544 assert!(
5545 !receipt.provenance_warnings.is_empty(),
5546 "step insert without source_ref must emit a provenance warning"
5547 );
5548 }
5549
5550 #[test]
5551 fn writer_receipt_warns_on_action_without_source_ref() {
5552 let db = NamedTempFile::new().expect("temporary db");
5553 let writer = WriterActor::start(
5554 db.path(),
5555 Arc::new(SchemaManager::new()),
5556 ProvenanceMode::Warn,
5557 Arc::new(TelemetryCounters::default()),
5558 )
5559 .expect("writer");
5560
5561 writer
5563 .submit(WriteRequest {
5564 label: "seed".to_owned(),
5565 nodes: vec![],
5566 node_retires: vec![],
5567 edges: vec![],
5568 edge_retires: vec![],
5569 chunks: vec![],
5570 runs: vec![RunInsert {
5571 id: "run-1".to_owned(),
5572 kind: "session".to_owned(),
5573 status: "active".to_owned(),
5574 properties: "{}".to_owned(),
5575 source_ref: Some("src-1".to_owned()),
5576 upsert: false,
5577 supersedes_id: None,
5578 }],
5579 steps: vec![StepInsert {
5580 id: "step-1".to_owned(),
5581 run_id: "run-1".to_owned(),
5582 kind: "llm_call".to_owned(),
5583 status: "completed".to_owned(),
5584 properties: "{}".to_owned(),
5585 source_ref: Some("src-1".to_owned()),
5586 upsert: false,
5587 supersedes_id: None,
5588 }],
5589 actions: vec![],
5590 optional_backfills: vec![],
5591 vec_inserts: vec![],
5592 operational_writes: vec![],
5593 })
5594 .expect("seed");
5595
5596 let receipt = writer
5597 .submit(WriteRequest {
5598 label: "test".to_owned(),
5599 nodes: vec![],
5600 node_retires: vec![],
5601 edges: vec![],
5602 edge_retires: vec![],
5603 chunks: vec![],
5604 runs: vec![],
5605 steps: vec![],
5606 actions: vec![ActionInsert {
5607 id: "action-1".to_owned(),
5608 step_id: "step-1".to_owned(),
5609 kind: "tool_call".to_owned(),
5610 status: "completed".to_owned(),
5611 properties: "{}".to_owned(),
5612 source_ref: None,
5613 upsert: false,
5614 supersedes_id: None,
5615 }],
5616 optional_backfills: vec![],
5617 vec_inserts: vec![],
5618 operational_writes: vec![],
5619 })
5620 .expect("write");
5621
5622 assert!(
5623 !receipt.provenance_warnings.is_empty(),
5624 "action insert without source_ref must emit a provenance warning"
5625 );
5626 }
5627
5628 #[test]
5629 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5630 let db = NamedTempFile::new().expect("temporary db");
5631 let writer = WriterActor::start(
5632 db.path(),
5633 Arc::new(SchemaManager::new()),
5634 ProvenanceMode::Warn,
5635 Arc::new(TelemetryCounters::default()),
5636 )
5637 .expect("writer");
5638
5639 let receipt = writer
5640 .submit(WriteRequest {
5641 label: "test".to_owned(),
5642 nodes: vec![NodeInsert {
5643 row_id: "row-1".to_owned(),
5644 logical_id: "node-1".to_owned(),
5645 kind: "Note".to_owned(),
5646 properties: "{}".to_owned(),
5647 source_ref: Some("src-1".to_owned()),
5648 upsert: false,
5649 chunk_policy: ChunkPolicy::Preserve,
5650 content_ref: None,
5651 }],
5652 node_retires: vec![],
5653 edges: vec![],
5654 edge_retires: vec![],
5655 chunks: vec![],
5656 runs: vec![RunInsert {
5657 id: "run-1".to_owned(),
5658 kind: "session".to_owned(),
5659 status: "active".to_owned(),
5660 properties: "{}".to_owned(),
5661 source_ref: Some("src-1".to_owned()),
5662 upsert: false,
5663 supersedes_id: None,
5664 }],
5665 steps: vec![StepInsert {
5666 id: "step-1".to_owned(),
5667 run_id: "run-1".to_owned(),
5668 kind: "llm_call".to_owned(),
5669 status: "completed".to_owned(),
5670 properties: "{}".to_owned(),
5671 source_ref: Some("src-1".to_owned()),
5672 upsert: false,
5673 supersedes_id: None,
5674 }],
5675 actions: vec![ActionInsert {
5676 id: "action-1".to_owned(),
5677 step_id: "step-1".to_owned(),
5678 kind: "tool_call".to_owned(),
5679 status: "completed".to_owned(),
5680 properties: "{}".to_owned(),
5681 source_ref: Some("src-1".to_owned()),
5682 upsert: false,
5683 supersedes_id: None,
5684 }],
5685 optional_backfills: vec![],
5686 vec_inserts: vec![],
5687 operational_writes: vec![],
5688 })
5689 .expect("write");
5690
5691 assert!(
5692 receipt.provenance_warnings.is_empty(),
5693 "no warnings expected when all types have source_ref; got: {:?}",
5694 receipt.provenance_warnings
5695 );
5696 }
5697
5698 #[test]
5701 fn default_provenance_mode_is_warn() {
5702 let db = NamedTempFile::new().expect("temporary db");
5704 let writer = WriterActor::start(
5705 db.path(),
5706 Arc::new(SchemaManager::new()),
5707 ProvenanceMode::default(),
5708 Arc::new(TelemetryCounters::default()),
5709 )
5710 .expect("writer");
5711
5712 let receipt = writer
5713 .submit(WriteRequest {
5714 label: "test".to_owned(),
5715 nodes: vec![NodeInsert {
5716 row_id: "row-1".to_owned(),
5717 logical_id: "node-1".to_owned(),
5718 kind: "Note".to_owned(),
5719 properties: "{}".to_owned(),
5720 source_ref: None,
5721 upsert: false,
5722 chunk_policy: ChunkPolicy::Preserve,
5723 content_ref: None,
5724 }],
5725 node_retires: vec![],
5726 edges: vec![],
5727 edge_retires: vec![],
5728 chunks: vec![],
5729 runs: vec![],
5730 steps: vec![],
5731 actions: vec![],
5732 optional_backfills: vec![],
5733 vec_inserts: vec![],
5734 operational_writes: vec![],
5735 })
5736 .expect("Warn mode must not reject missing source_ref");
5737
5738 assert!(
5739 !receipt.provenance_warnings.is_empty(),
5740 "Warn mode must emit a warning instead of rejecting"
5741 );
5742 }
5743
5744 #[test]
5745 fn require_mode_rejects_node_without_source_ref() {
5746 let db = NamedTempFile::new().expect("temporary db");
5747 let writer = WriterActor::start(
5748 db.path(),
5749 Arc::new(SchemaManager::new()),
5750 ProvenanceMode::Require,
5751 Arc::new(TelemetryCounters::default()),
5752 )
5753 .expect("writer");
5754
5755 let result = writer.submit(WriteRequest {
5756 label: "test".to_owned(),
5757 nodes: vec![NodeInsert {
5758 row_id: "row-1".to_owned(),
5759 logical_id: "node-1".to_owned(),
5760 kind: "Note".to_owned(),
5761 properties: "{}".to_owned(),
5762 source_ref: None,
5763 upsert: false,
5764 chunk_policy: ChunkPolicy::Preserve,
5765 content_ref: None,
5766 }],
5767 node_retires: vec![],
5768 edges: vec![],
5769 edge_retires: vec![],
5770 chunks: vec![],
5771 runs: vec![],
5772 steps: vec![],
5773 actions: vec![],
5774 optional_backfills: vec![],
5775 vec_inserts: vec![],
5776 operational_writes: vec![],
5777 });
5778
5779 assert!(
5780 matches!(result, Err(EngineError::InvalidWrite(_))),
5781 "Require mode must reject node without source_ref"
5782 );
5783 }
5784
5785 #[test]
5786 fn require_mode_accepts_node_with_source_ref() {
5787 let db = NamedTempFile::new().expect("temporary db");
5788 let writer = WriterActor::start(
5789 db.path(),
5790 Arc::new(SchemaManager::new()),
5791 ProvenanceMode::Require,
5792 Arc::new(TelemetryCounters::default()),
5793 )
5794 .expect("writer");
5795
5796 let result = writer.submit(WriteRequest {
5797 label: "test".to_owned(),
5798 nodes: vec![NodeInsert {
5799 row_id: "row-1".to_owned(),
5800 logical_id: "node-1".to_owned(),
5801 kind: "Note".to_owned(),
5802 properties: "{}".to_owned(),
5803 source_ref: Some("src-1".to_owned()),
5804 upsert: false,
5805 chunk_policy: ChunkPolicy::Preserve,
5806 content_ref: None,
5807 }],
5808 node_retires: vec![],
5809 edges: vec![],
5810 edge_retires: vec![],
5811 chunks: vec![],
5812 runs: vec![],
5813 steps: vec![],
5814 actions: vec![],
5815 optional_backfills: vec![],
5816 vec_inserts: vec![],
5817 operational_writes: vec![],
5818 });
5819
5820 assert!(
5821 result.is_ok(),
5822 "Require mode must accept node with source_ref"
5823 );
5824 }
5825
5826 #[test]
5827 fn require_mode_rejects_edge_without_source_ref() {
5828 let db = NamedTempFile::new().expect("temporary db");
5829 let writer = WriterActor::start(
5830 db.path(),
5831 Arc::new(SchemaManager::new()),
5832 ProvenanceMode::Require,
5833 Arc::new(TelemetryCounters::default()),
5834 )
5835 .expect("writer");
5836
5837 let result = writer.submit(WriteRequest {
5839 label: "test".to_owned(),
5840 nodes: vec![
5841 NodeInsert {
5842 row_id: "row-a".to_owned(),
5843 logical_id: "node-a".to_owned(),
5844 kind: "Note".to_owned(),
5845 properties: "{}".to_owned(),
5846 source_ref: Some("src-1".to_owned()),
5847 upsert: false,
5848 chunk_policy: ChunkPolicy::Preserve,
5849 content_ref: None,
5850 },
5851 NodeInsert {
5852 row_id: "row-b".to_owned(),
5853 logical_id: "node-b".to_owned(),
5854 kind: "Note".to_owned(),
5855 properties: "{}".to_owned(),
5856 source_ref: Some("src-1".to_owned()),
5857 upsert: false,
5858 chunk_policy: ChunkPolicy::Preserve,
5859 content_ref: None,
5860 },
5861 ],
5862 node_retires: vec![],
5863 edges: vec![EdgeInsert {
5864 row_id: "edge-row-1".to_owned(),
5865 logical_id: "edge-1".to_owned(),
5866 source_logical_id: "node-a".to_owned(),
5867 target_logical_id: "node-b".to_owned(),
5868 kind: "LINKS_TO".to_owned(),
5869 properties: "{}".to_owned(),
5870 source_ref: None,
5871 upsert: false,
5872 }],
5873 edge_retires: vec![],
5874 chunks: vec![],
5875 runs: vec![],
5876 steps: vec![],
5877 actions: vec![],
5878 optional_backfills: vec![],
5879 vec_inserts: vec![],
5880 operational_writes: vec![],
5881 });
5882
5883 assert!(
5884 matches!(result, Err(EngineError::InvalidWrite(_))),
5885 "Require mode must reject edge without source_ref"
5886 );
5887 }
5888
5889 #[test]
5892 fn fts_row_has_correct_kind_from_co_submitted_node() {
5893 let db = NamedTempFile::new().expect("temporary db");
5894 let writer = WriterActor::start(
5895 db.path(),
5896 Arc::new(SchemaManager::new()),
5897 ProvenanceMode::Warn,
5898 Arc::new(TelemetryCounters::default()),
5899 )
5900 .expect("writer");
5901
5902 writer
5903 .submit(WriteRequest {
5904 label: "test".to_owned(),
5905 nodes: vec![NodeInsert {
5906 row_id: "row-1".to_owned(),
5907 logical_id: "node-1".to_owned(),
5908 kind: "Meeting".to_owned(),
5909 properties: "{}".to_owned(),
5910 source_ref: Some("src-1".to_owned()),
5911 upsert: false,
5912 chunk_policy: ChunkPolicy::Preserve,
5913 content_ref: None,
5914 }],
5915 node_retires: vec![],
5916 edges: vec![],
5917 edge_retires: vec![],
5918 chunks: vec![ChunkInsert {
5919 id: "chunk-1".to_owned(),
5920 node_logical_id: "node-1".to_owned(),
5921 text_content: "some text".to_owned(),
5922 byte_start: None,
5923 byte_end: None,
5924 content_hash: None,
5925 }],
5926 runs: vec![],
5927 steps: vec![],
5928 actions: vec![],
5929 optional_backfills: vec![],
5930 vec_inserts: vec![],
5931 operational_writes: vec![],
5932 })
5933 .expect("write");
5934
5935 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5936 let kind: String = conn
5937 .query_row(
5938 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5939 [],
5940 |row| row.get(0),
5941 )
5942 .expect("fts row");
5943
5944 assert_eq!(kind, "Meeting");
5945 }
5946
5947 #[test]
5948 fn fts_row_has_correct_text_content() {
5949 let db = NamedTempFile::new().expect("temporary db");
5950 let writer = WriterActor::start(
5951 db.path(),
5952 Arc::new(SchemaManager::new()),
5953 ProvenanceMode::Warn,
5954 Arc::new(TelemetryCounters::default()),
5955 )
5956 .expect("writer");
5957
5958 writer
5959 .submit(WriteRequest {
5960 label: "test".to_owned(),
5961 nodes: vec![NodeInsert {
5962 row_id: "row-1".to_owned(),
5963 logical_id: "node-1".to_owned(),
5964 kind: "Note".to_owned(),
5965 properties: "{}".to_owned(),
5966 source_ref: Some("src-1".to_owned()),
5967 upsert: false,
5968 chunk_policy: ChunkPolicy::Preserve,
5969 content_ref: None,
5970 }],
5971 node_retires: vec![],
5972 edges: vec![],
5973 edge_retires: vec![],
5974 chunks: vec![ChunkInsert {
5975 id: "chunk-1".to_owned(),
5976 node_logical_id: "node-1".to_owned(),
5977 text_content: "exactly this text".to_owned(),
5978 byte_start: None,
5979 byte_end: None,
5980 content_hash: None,
5981 }],
5982 runs: vec![],
5983 steps: vec![],
5984 actions: vec![],
5985 optional_backfills: vec![],
5986 vec_inserts: vec![],
5987 operational_writes: vec![],
5988 })
5989 .expect("write");
5990
5991 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5992 let text: String = conn
5993 .query_row(
5994 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5995 [],
5996 |row| row.get(0),
5997 )
5998 .expect("fts row");
5999
6000 assert_eq!(text, "exactly this text");
6001 }
6002
6003 #[test]
6004 fn fts_row_has_correct_kind_from_pre_existing_node() {
6005 let db = NamedTempFile::new().expect("temporary db");
6006 let writer = WriterActor::start(
6007 db.path(),
6008 Arc::new(SchemaManager::new()),
6009 ProvenanceMode::Warn,
6010 Arc::new(TelemetryCounters::default()),
6011 )
6012 .expect("writer");
6013
6014 writer
6016 .submit(WriteRequest {
6017 label: "r1".to_owned(),
6018 nodes: vec![NodeInsert {
6019 row_id: "row-1".to_owned(),
6020 logical_id: "node-1".to_owned(),
6021 kind: "Document".to_owned(),
6022 properties: "{}".to_owned(),
6023 source_ref: Some("src-1".to_owned()),
6024 upsert: false,
6025 chunk_policy: ChunkPolicy::Preserve,
6026 content_ref: None,
6027 }],
6028 node_retires: vec![],
6029 edges: vec![],
6030 edge_retires: vec![],
6031 chunks: vec![],
6032 runs: vec![],
6033 steps: vec![],
6034 actions: vec![],
6035 optional_backfills: vec![],
6036 vec_inserts: vec![],
6037 operational_writes: vec![],
6038 })
6039 .expect("r1 write");
6040
6041 writer
6043 .submit(WriteRequest {
6044 label: "r2".to_owned(),
6045 nodes: vec![],
6046 node_retires: vec![],
6047 edges: vec![],
6048 edge_retires: vec![],
6049 chunks: vec![ChunkInsert {
6050 id: "chunk-1".to_owned(),
6051 node_logical_id: "node-1".to_owned(),
6052 text_content: "some text".to_owned(),
6053 byte_start: None,
6054 byte_end: None,
6055 content_hash: None,
6056 }],
6057 runs: vec![],
6058 steps: vec![],
6059 actions: vec![],
6060 optional_backfills: vec![],
6061 vec_inserts: vec![],
6062 operational_writes: vec![],
6063 })
6064 .expect("r2 write");
6065
6066 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6067 let kind: String = conn
6068 .query_row(
6069 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6070 [],
6071 |row| row.get(0),
6072 )
6073 .expect("fts row");
6074
6075 assert_eq!(kind, "Document");
6076 }
6077
6078 #[test]
6079 fn fts_derives_rows_for_multiple_chunks_per_node() {
6080 let db = NamedTempFile::new().expect("temporary db");
6081 let writer = WriterActor::start(
6082 db.path(),
6083 Arc::new(SchemaManager::new()),
6084 ProvenanceMode::Warn,
6085 Arc::new(TelemetryCounters::default()),
6086 )
6087 .expect("writer");
6088
6089 writer
6090 .submit(WriteRequest {
6091 label: "test".to_owned(),
6092 nodes: vec![NodeInsert {
6093 row_id: "row-1".to_owned(),
6094 logical_id: "node-1".to_owned(),
6095 kind: "Meeting".to_owned(),
6096 properties: "{}".to_owned(),
6097 source_ref: Some("src-1".to_owned()),
6098 upsert: false,
6099 chunk_policy: ChunkPolicy::Preserve,
6100 content_ref: None,
6101 }],
6102 node_retires: vec![],
6103 edges: vec![],
6104 edge_retires: vec![],
6105 chunks: vec![
6106 ChunkInsert {
6107 id: "chunk-a".to_owned(),
6108 node_logical_id: "node-1".to_owned(),
6109 text_content: "intro".to_owned(),
6110 byte_start: None,
6111 byte_end: None,
6112 content_hash: None,
6113 },
6114 ChunkInsert {
6115 id: "chunk-b".to_owned(),
6116 node_logical_id: "node-1".to_owned(),
6117 text_content: "body".to_owned(),
6118 byte_start: None,
6119 byte_end: None,
6120 content_hash: None,
6121 },
6122 ChunkInsert {
6123 id: "chunk-c".to_owned(),
6124 node_logical_id: "node-1".to_owned(),
6125 text_content: "conclusion".to_owned(),
6126 byte_start: None,
6127 byte_end: None,
6128 content_hash: None,
6129 },
6130 ],
6131 runs: vec![],
6132 steps: vec![],
6133 actions: vec![],
6134 optional_backfills: vec![],
6135 vec_inserts: vec![],
6136 operational_writes: vec![],
6137 })
6138 .expect("write");
6139
6140 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6141 let count: i64 = conn
6142 .query_row(
6143 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6144 [],
6145 |row| row.get(0),
6146 )
6147 .expect("fts count");
6148
6149 assert_eq!(count, 3, "three chunks must produce three FTS rows");
6150 }
6151
6152 #[test]
6153 fn fts_resolves_mixed_fast_and_db_paths() {
6154 let db = NamedTempFile::new().expect("temporary db");
6155 let writer = WriterActor::start(
6156 db.path(),
6157 Arc::new(SchemaManager::new()),
6158 ProvenanceMode::Warn,
6159 Arc::new(TelemetryCounters::default()),
6160 )
6161 .expect("writer");
6162
6163 writer
6165 .submit(WriteRequest {
6166 label: "seed".to_owned(),
6167 nodes: vec![NodeInsert {
6168 row_id: "row-existing".to_owned(),
6169 logical_id: "existing-node".to_owned(),
6170 kind: "Archive".to_owned(),
6171 properties: "{}".to_owned(),
6172 source_ref: Some("src-1".to_owned()),
6173 upsert: false,
6174 chunk_policy: ChunkPolicy::Preserve,
6175 content_ref: None,
6176 }],
6177 node_retires: vec![],
6178 edges: vec![],
6179 edge_retires: vec![],
6180 chunks: vec![],
6181 runs: vec![],
6182 steps: vec![],
6183 actions: vec![],
6184 optional_backfills: vec![],
6185 vec_inserts: vec![],
6186 operational_writes: vec![],
6187 })
6188 .expect("seed");
6189
6190 writer
6192 .submit(WriteRequest {
6193 label: "mixed".to_owned(),
6194 nodes: vec![NodeInsert {
6195 row_id: "row-new".to_owned(),
6196 logical_id: "new-node".to_owned(),
6197 kind: "Inbox".to_owned(),
6198 properties: "{}".to_owned(),
6199 source_ref: Some("src-2".to_owned()),
6200 upsert: false,
6201 chunk_policy: ChunkPolicy::Preserve,
6202 content_ref: None,
6203 }],
6204 node_retires: vec![],
6205 edges: vec![],
6206 edge_retires: vec![],
6207 chunks: vec![
6208 ChunkInsert {
6209 id: "chunk-fast".to_owned(),
6210 node_logical_id: "new-node".to_owned(),
6211 text_content: "new content".to_owned(),
6212 byte_start: None,
6213 byte_end: None,
6214 content_hash: None,
6215 },
6216 ChunkInsert {
6217 id: "chunk-db".to_owned(),
6218 node_logical_id: "existing-node".to_owned(),
6219 text_content: "archive content".to_owned(),
6220 byte_start: None,
6221 byte_end: None,
6222 content_hash: None,
6223 },
6224 ],
6225 runs: vec![],
6226 steps: vec![],
6227 actions: vec![],
6228 optional_backfills: vec![],
6229 vec_inserts: vec![],
6230 operational_writes: vec![],
6231 })
6232 .expect("mixed write");
6233
6234 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6235 let fast_kind: String = conn
6236 .query_row(
6237 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6238 [],
6239 |row| row.get(0),
6240 )
6241 .expect("fast path fts row");
6242 let db_kind: String = conn
6243 .query_row(
6244 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6245 [],
6246 |row| row.get(0),
6247 )
6248 .expect("db path fts row");
6249
6250 assert_eq!(fast_kind, "Inbox");
6251 assert_eq!(db_kind, "Archive");
6252 }
6253
6254 #[test]
6255 fn prepare_write_rejects_empty_chunk_text() {
6256 let db = NamedTempFile::new().expect("temporary db");
6257 let writer = WriterActor::start(
6258 db.path(),
6259 Arc::new(SchemaManager::new()),
6260 ProvenanceMode::Warn,
6261 Arc::new(TelemetryCounters::default()),
6262 )
6263 .expect("writer");
6264
6265 let result = writer.submit(WriteRequest {
6266 label: "test".to_owned(),
6267 nodes: vec![NodeInsert {
6268 row_id: "row-1".to_owned(),
6269 logical_id: "node-1".to_owned(),
6270 kind: "Note".to_owned(),
6271 properties: "{}".to_owned(),
6272 source_ref: None,
6273 upsert: false,
6274 chunk_policy: ChunkPolicy::Preserve,
6275 content_ref: None,
6276 }],
6277 node_retires: vec![],
6278 edges: vec![],
6279 edge_retires: vec![],
6280 chunks: vec![ChunkInsert {
6281 id: "chunk-1".to_owned(),
6282 node_logical_id: "node-1".to_owned(),
6283 text_content: String::new(),
6284 byte_start: None,
6285 byte_end: None,
6286 content_hash: None,
6287 }],
6288 runs: vec![],
6289 steps: vec![],
6290 actions: vec![],
6291 optional_backfills: vec![],
6292 vec_inserts: vec![],
6293 operational_writes: vec![],
6294 });
6295
6296 assert!(
6297 matches!(result, Err(EngineError::InvalidWrite(_))),
6298 "empty text_content must be rejected"
6299 );
6300 }
6301
6302 #[test]
6303 fn receipt_reports_zero_backfills_when_none_submitted() {
6304 let db = NamedTempFile::new().expect("temporary db");
6305 let writer = WriterActor::start(
6306 db.path(),
6307 Arc::new(SchemaManager::new()),
6308 ProvenanceMode::Warn,
6309 Arc::new(TelemetryCounters::default()),
6310 )
6311 .expect("writer");
6312
6313 let receipt = writer
6314 .submit(WriteRequest {
6315 label: "test".to_owned(),
6316 nodes: vec![NodeInsert {
6317 row_id: "row-1".to_owned(),
6318 logical_id: "node-1".to_owned(),
6319 kind: "Note".to_owned(),
6320 properties: "{}".to_owned(),
6321 source_ref: Some("src-1".to_owned()),
6322 upsert: false,
6323 chunk_policy: ChunkPolicy::Preserve,
6324 content_ref: None,
6325 }],
6326 node_retires: vec![],
6327 edges: vec![],
6328 edge_retires: vec![],
6329 chunks: vec![],
6330 runs: vec![],
6331 steps: vec![],
6332 actions: vec![],
6333 optional_backfills: vec![],
6334 vec_inserts: vec![],
6335 operational_writes: vec![],
6336 })
6337 .expect("write");
6338
6339 assert_eq!(receipt.optional_backfill_count, 0);
6340 }
6341
6342 #[test]
6343 fn receipt_reports_correct_backfill_count() {
6344 let db = NamedTempFile::new().expect("temporary db");
6345 let writer = WriterActor::start(
6346 db.path(),
6347 Arc::new(SchemaManager::new()),
6348 ProvenanceMode::Warn,
6349 Arc::new(TelemetryCounters::default()),
6350 )
6351 .expect("writer");
6352
6353 let receipt = writer
6354 .submit(WriteRequest {
6355 label: "test".to_owned(),
6356 nodes: vec![NodeInsert {
6357 row_id: "row-1".to_owned(),
6358 logical_id: "node-1".to_owned(),
6359 kind: "Note".to_owned(),
6360 properties: "{}".to_owned(),
6361 source_ref: Some("src-1".to_owned()),
6362 upsert: false,
6363 chunk_policy: ChunkPolicy::Preserve,
6364 content_ref: None,
6365 }],
6366 node_retires: vec![],
6367 edges: vec![],
6368 edge_retires: vec![],
6369 chunks: vec![],
6370 runs: vec![],
6371 steps: vec![],
6372 actions: vec![],
6373 optional_backfills: vec![
6374 OptionalProjectionTask {
6375 target: ProjectionTarget::Fts,
6376 payload: "p1".to_owned(),
6377 },
6378 OptionalProjectionTask {
6379 target: ProjectionTarget::Vec,
6380 payload: "p2".to_owned(),
6381 },
6382 OptionalProjectionTask {
6383 target: ProjectionTarget::All,
6384 payload: "p3".to_owned(),
6385 },
6386 ],
6387 vec_inserts: vec![],
6388 operational_writes: vec![],
6389 })
6390 .expect("write");
6391
6392 assert_eq!(receipt.optional_backfill_count, 3);
6393 }
6394
6395 #[test]
6396 fn backfill_tasks_are_not_executed_during_write() {
6397 let db = NamedTempFile::new().expect("temporary db");
6398 let writer = WriterActor::start(
6399 db.path(),
6400 Arc::new(SchemaManager::new()),
6401 ProvenanceMode::Warn,
6402 Arc::new(TelemetryCounters::default()),
6403 )
6404 .expect("writer");
6405
6406 writer
6409 .submit(WriteRequest {
6410 label: "test".to_owned(),
6411 nodes: vec![NodeInsert {
6412 row_id: "row-1".to_owned(),
6413 logical_id: "node-1".to_owned(),
6414 kind: "Note".to_owned(),
6415 properties: "{}".to_owned(),
6416 source_ref: Some("src-1".to_owned()),
6417 upsert: false,
6418 chunk_policy: ChunkPolicy::Preserve,
6419 content_ref: None,
6420 }],
6421 node_retires: vec![],
6422 edges: vec![],
6423 edge_retires: vec![],
6424 chunks: vec![ChunkInsert {
6425 id: "chunk-1".to_owned(),
6426 node_logical_id: "node-1".to_owned(),
6427 text_content: "required text".to_owned(),
6428 byte_start: None,
6429 byte_end: None,
6430 content_hash: None,
6431 }],
6432 runs: vec![],
6433 steps: vec![],
6434 actions: vec![],
6435 optional_backfills: vec![OptionalProjectionTask {
6436 target: ProjectionTarget::Fts,
6437 payload: "backfill-payload".to_owned(),
6438 }],
6439 vec_inserts: vec![],
6440 operational_writes: vec![],
6441 })
6442 .expect("write");
6443
6444 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6445 let count: i64 = conn
6446 .query_row(
6447 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6448 [],
6449 |row| row.get(0),
6450 )
6451 .expect("fts count");
6452
6453 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6454 }
6455
6456 #[test]
6457 fn fts_row_uses_new_kind_after_node_replace() {
6458 let db = NamedTempFile::new().expect("temporary db");
6459 let writer = WriterActor::start(
6460 db.path(),
6461 Arc::new(SchemaManager::new()),
6462 ProvenanceMode::Warn,
6463 Arc::new(TelemetryCounters::default()),
6464 )
6465 .expect("writer");
6466
6467 writer
6469 .submit(WriteRequest {
6470 label: "v1".to_owned(),
6471 nodes: vec![NodeInsert {
6472 row_id: "row-1".to_owned(),
6473 logical_id: "node-1".to_owned(),
6474 kind: "Note".to_owned(),
6475 properties: "{}".to_owned(),
6476 source_ref: Some("src-1".to_owned()),
6477 upsert: false,
6478 chunk_policy: ChunkPolicy::Preserve,
6479 content_ref: None,
6480 }],
6481 node_retires: vec![],
6482 edges: vec![],
6483 edge_retires: vec![],
6484 chunks: vec![ChunkInsert {
6485 id: "chunk-v1".to_owned(),
6486 node_logical_id: "node-1".to_owned(),
6487 text_content: "original".to_owned(),
6488 byte_start: None,
6489 byte_end: None,
6490 content_hash: None,
6491 }],
6492 runs: vec![],
6493 steps: vec![],
6494 actions: vec![],
6495 optional_backfills: vec![],
6496 vec_inserts: vec![],
6497 operational_writes: vec![],
6498 })
6499 .expect("v1 write");
6500
6501 writer
6503 .submit(WriteRequest {
6504 label: "v2".to_owned(),
6505 nodes: vec![NodeInsert {
6506 row_id: "row-2".to_owned(),
6507 logical_id: "node-1".to_owned(),
6508 kind: "Meeting".to_owned(),
6509 properties: "{}".to_owned(),
6510 source_ref: Some("src-2".to_owned()),
6511 upsert: true,
6512 chunk_policy: ChunkPolicy::Replace,
6513 content_ref: None,
6514 }],
6515 node_retires: vec![],
6516 edges: vec![],
6517 edge_retires: vec![],
6518 chunks: vec![ChunkInsert {
6519 id: "chunk-v2".to_owned(),
6520 node_logical_id: "node-1".to_owned(),
6521 text_content: "updated".to_owned(),
6522 byte_start: None,
6523 byte_end: None,
6524 content_hash: None,
6525 }],
6526 runs: vec![],
6527 steps: vec![],
6528 actions: vec![],
6529 optional_backfills: vec![],
6530 vec_inserts: vec![],
6531 operational_writes: vec![],
6532 })
6533 .expect("v2 write");
6534
6535 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6536
6537 let old_count: i64 = conn
6539 .query_row(
6540 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6541 [],
6542 |row| row.get(0),
6543 )
6544 .expect("old fts count");
6545 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6546
6547 let new_kind: String = conn
6549 .query_row(
6550 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6551 [],
6552 |row| row.get(0),
6553 )
6554 .expect("new fts row");
6555 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6556 }
6557
6558 #[test]
6561 fn vec_insert_empty_chunk_id_is_rejected() {
6562 let db = NamedTempFile::new().expect("temporary db");
6563 let writer = WriterActor::start(
6564 db.path(),
6565 Arc::new(SchemaManager::new()),
6566 ProvenanceMode::Warn,
6567 Arc::new(TelemetryCounters::default()),
6568 )
6569 .expect("writer");
6570 let result = writer.submit(WriteRequest {
6571 label: "vec-test".to_owned(),
6572 nodes: vec![],
6573 node_retires: vec![],
6574 edges: vec![],
6575 edge_retires: vec![],
6576 chunks: vec![],
6577 runs: vec![],
6578 steps: vec![],
6579 actions: vec![],
6580 optional_backfills: vec![],
6581 vec_inserts: vec![VecInsert {
6582 chunk_id: String::new(),
6583 embedding: vec![0.1, 0.2, 0.3],
6584 }],
6585 operational_writes: vec![],
6586 });
6587 assert!(
6588 matches!(result, Err(EngineError::InvalidWrite(_))),
6589 "empty chunk_id in VecInsert must be rejected"
6590 );
6591 }
6592
6593 #[test]
6594 fn vec_insert_empty_embedding_is_rejected() {
6595 let db = NamedTempFile::new().expect("temporary db");
6596 let writer = WriterActor::start(
6597 db.path(),
6598 Arc::new(SchemaManager::new()),
6599 ProvenanceMode::Warn,
6600 Arc::new(TelemetryCounters::default()),
6601 )
6602 .expect("writer");
6603 let result = writer.submit(WriteRequest {
6604 label: "vec-test".to_owned(),
6605 nodes: vec![],
6606 node_retires: vec![],
6607 edges: vec![],
6608 edge_retires: vec![],
6609 chunks: vec![],
6610 runs: vec![],
6611 steps: vec![],
6612 actions: vec![],
6613 optional_backfills: vec![],
6614 vec_inserts: vec![VecInsert {
6615 chunk_id: "chunk-1".to_owned(),
6616 embedding: vec![],
6617 }],
6618 operational_writes: vec![],
6619 });
6620 assert!(
6621 matches!(result, Err(EngineError::InvalidWrite(_))),
6622 "empty embedding in VecInsert must be rejected"
6623 );
6624 }
6625
6626 #[test]
6627 fn vec_insert_noop_without_feature() {
6628 let db = NamedTempFile::new().expect("temporary db");
6631 let writer = WriterActor::start(
6632 db.path(),
6633 Arc::new(SchemaManager::new()),
6634 ProvenanceMode::Warn,
6635 Arc::new(TelemetryCounters::default()),
6636 )
6637 .expect("writer");
6638 let result = writer.submit(WriteRequest {
6639 label: "vec-noop".to_owned(),
6640 nodes: vec![],
6641 node_retires: vec![],
6642 edges: vec![],
6643 edge_retires: vec![],
6644 chunks: vec![],
6645 runs: vec![],
6646 steps: vec![],
6647 actions: vec![],
6648 optional_backfills: vec![],
6649 vec_inserts: vec![VecInsert {
6650 chunk_id: "chunk-noop".to_owned(),
6651 embedding: vec![1.0, 2.0, 3.0],
6652 }],
6653 operational_writes: vec![],
6654 });
6655 #[cfg(not(feature = "sqlite-vec"))]
6656 result.expect("noop VecInsert without feature must succeed");
6657 #[cfg(feature = "sqlite-vec")]
6659 let _ = result;
6660 }
6661
6662 #[cfg(feature = "sqlite-vec")]
6663 #[test]
6664 fn node_retire_preserves_vec_rows_for_later_restore() {
6665 use crate::sqlite::open_connection_with_vec;
6666
6667 let db = NamedTempFile::new().expect("temporary db");
6668 let schema_manager = Arc::new(SchemaManager::new());
6669
6670 {
6671 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6672 schema_manager.bootstrap(&conn).expect("bootstrap");
6673 schema_manager
6674 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6675 .expect("ensure profile");
6676 }
6677
6678 let writer = WriterActor::start(
6679 db.path(),
6680 Arc::clone(&schema_manager),
6681 ProvenanceMode::Warn,
6682 Arc::new(TelemetryCounters::default()),
6683 )
6684 .expect("writer");
6685
6686 writer
6688 .submit(WriteRequest {
6689 label: "setup".to_owned(),
6690 nodes: vec![NodeInsert {
6691 row_id: "row-retire-vec".to_owned(),
6692 logical_id: "node-retire-vec".to_owned(),
6693 kind: "Doc".to_owned(),
6694 properties: "{}".to_owned(),
6695 source_ref: Some("src".to_owned()),
6696 upsert: false,
6697 chunk_policy: ChunkPolicy::Preserve,
6698 content_ref: None,
6699 }],
6700 node_retires: vec![],
6701 edges: vec![],
6702 edge_retires: vec![],
6703 chunks: vec![ChunkInsert {
6704 id: "chunk-retire-vec".to_owned(),
6705 node_logical_id: "node-retire-vec".to_owned(),
6706 text_content: "text".to_owned(),
6707 byte_start: None,
6708 byte_end: None,
6709 content_hash: None,
6710 }],
6711 runs: vec![],
6712 steps: vec![],
6713 actions: vec![],
6714 optional_backfills: vec![],
6715 vec_inserts: vec![VecInsert {
6716 chunk_id: "chunk-retire-vec".to_owned(),
6717 embedding: vec![0.1, 0.2, 0.3],
6718 }],
6719 operational_writes: vec![],
6720 })
6721 .expect("setup write");
6722
6723 writer
6725 .submit(WriteRequest {
6726 label: "retire".to_owned(),
6727 nodes: vec![],
6728 node_retires: vec![NodeRetire {
6729 logical_id: "node-retire-vec".to_owned(),
6730 source_ref: Some("src".to_owned()),
6731 }],
6732 edges: vec![],
6733 edge_retires: vec![],
6734 chunks: vec![],
6735 runs: vec![],
6736 steps: vec![],
6737 actions: vec![],
6738 optional_backfills: vec![],
6739 vec_inserts: vec![],
6740 operational_writes: vec![],
6741 })
6742 .expect("retire write");
6743
6744 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6745 let count: i64 = conn
6746 .query_row(
6747 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6748 [],
6749 |row| row.get(0),
6750 )
6751 .expect("count");
6752 assert_eq!(
6753 count, 1,
6754 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6755 );
6756 }
6757
6758 #[cfg(feature = "sqlite-vec")]
6759 #[test]
6760 #[allow(clippy::too_many_lines)]
6761 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6762 use crate::sqlite::open_connection_with_vec;
6763
6764 let db = NamedTempFile::new().expect("temporary db");
6765 let schema_manager = Arc::new(SchemaManager::new());
6766
6767 {
6768 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6769 schema_manager.bootstrap(&conn).expect("bootstrap");
6770 schema_manager
6771 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6772 .expect("ensure profile");
6773 }
6774
6775 let writer = WriterActor::start(
6776 db.path(),
6777 Arc::clone(&schema_manager),
6778 ProvenanceMode::Warn,
6779 Arc::new(TelemetryCounters::default()),
6780 )
6781 .expect("writer");
6782
6783 writer
6785 .submit(WriteRequest {
6786 label: "v1".to_owned(),
6787 nodes: vec![NodeInsert {
6788 row_id: "row-replace-v1".to_owned(),
6789 logical_id: "node-replace-vec".to_owned(),
6790 kind: "Doc".to_owned(),
6791 properties: "{}".to_owned(),
6792 source_ref: Some("src".to_owned()),
6793 upsert: false,
6794 chunk_policy: ChunkPolicy::Preserve,
6795 content_ref: None,
6796 }],
6797 node_retires: vec![],
6798 edges: vec![],
6799 edge_retires: vec![],
6800 chunks: vec![ChunkInsert {
6801 id: "chunk-replace-A".to_owned(),
6802 node_logical_id: "node-replace-vec".to_owned(),
6803 text_content: "version one".to_owned(),
6804 byte_start: None,
6805 byte_end: None,
6806 content_hash: None,
6807 }],
6808 runs: vec![],
6809 steps: vec![],
6810 actions: vec![],
6811 optional_backfills: vec![],
6812 vec_inserts: vec![VecInsert {
6813 chunk_id: "chunk-replace-A".to_owned(),
6814 embedding: vec![0.1, 0.2, 0.3],
6815 }],
6816 operational_writes: vec![],
6817 })
6818 .expect("v1 write");
6819
6820 writer
6822 .submit(WriteRequest {
6823 label: "v2".to_owned(),
6824 nodes: vec![NodeInsert {
6825 row_id: "row-replace-v2".to_owned(),
6826 logical_id: "node-replace-vec".to_owned(),
6827 kind: "Doc".to_owned(),
6828 properties: "{}".to_owned(),
6829 source_ref: Some("src".to_owned()),
6830 upsert: true,
6831 chunk_policy: ChunkPolicy::Replace,
6832 content_ref: None,
6833 }],
6834 node_retires: vec![],
6835 edges: vec![],
6836 edge_retires: vec![],
6837 chunks: vec![ChunkInsert {
6838 id: "chunk-replace-B".to_owned(),
6839 node_logical_id: "node-replace-vec".to_owned(),
6840 text_content: "version two".to_owned(),
6841 byte_start: None,
6842 byte_end: None,
6843 content_hash: None,
6844 }],
6845 runs: vec![],
6846 steps: vec![],
6847 actions: vec![],
6848 optional_backfills: vec![],
6849 vec_inserts: vec![VecInsert {
6850 chunk_id: "chunk-replace-B".to_owned(),
6851 embedding: vec![0.4, 0.5, 0.6],
6852 }],
6853 operational_writes: vec![],
6854 })
6855 .expect("v2 write");
6856
6857 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6858 let count_a: i64 = conn
6859 .query_row(
6860 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6861 [],
6862 |row| row.get(0),
6863 )
6864 .expect("count A");
6865 let count_b: i64 = conn
6866 .query_row(
6867 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6868 [],
6869 |row| row.get(0),
6870 )
6871 .expect("count B");
6872 assert_eq!(
6873 count_a, 0,
6874 "old vec row (chunk-A) must be deleted on Replace"
6875 );
6876 assert_eq!(
6877 count_b, 1,
6878 "new vec row (chunk-B) must be present after Replace"
6879 );
6880 }
6881
6882 #[cfg(feature = "sqlite-vec")]
6883 #[test]
6884 fn vec_insert_is_persisted_when_feature_enabled() {
6885 use crate::sqlite::open_connection_with_vec;
6886
6887 let db = NamedTempFile::new().expect("temporary db");
6888 let schema_manager = Arc::new(SchemaManager::new());
6889
6890 {
6892 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6893 schema_manager.bootstrap(&conn).expect("bootstrap");
6894 schema_manager
6895 .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6896 .expect("ensure profile");
6897 }
6898
6899 let writer = WriterActor::start(
6900 db.path(),
6901 Arc::clone(&schema_manager),
6902 ProvenanceMode::Warn,
6903 Arc::new(TelemetryCounters::default()),
6904 )
6905 .expect("writer");
6906
6907 writer
6908 .submit(WriteRequest {
6909 label: "vec-insert".to_owned(),
6910 nodes: vec![],
6911 node_retires: vec![],
6912 edges: vec![],
6913 edge_retires: vec![],
6914 chunks: vec![],
6915 runs: vec![],
6916 steps: vec![],
6917 actions: vec![],
6918 optional_backfills: vec![],
6919 vec_inserts: vec![VecInsert {
6920 chunk_id: "chunk-vec".to_owned(),
6921 embedding: vec![0.1, 0.2, 0.3],
6922 }],
6923 operational_writes: vec![],
6924 })
6925 .expect("vec insert write");
6926
6927 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6928 let count: i64 = conn
6929 .query_row(
6930 "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6931 [],
6932 |row| row.get(0),
6933 )
6934 .expect("count");
6935 assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6936 }
6937
6938 #[test]
6941 fn write_request_exceeding_node_limit_is_rejected() {
6942 let nodes: Vec<NodeInsert> = (0..10_001)
6943 .map(|i| NodeInsert {
6944 row_id: format!("row-{i}"),
6945 logical_id: format!("lg-{i}"),
6946 kind: "Note".to_owned(),
6947 properties: "{}".to_owned(),
6948 source_ref: None,
6949 upsert: false,
6950 chunk_policy: ChunkPolicy::Preserve,
6951 content_ref: None,
6952 })
6953 .collect();
6954
6955 let request = WriteRequest {
6956 label: "too-many-nodes".to_owned(),
6957 nodes,
6958 node_retires: vec![],
6959 edges: vec![],
6960 edge_retires: vec![],
6961 chunks: vec![],
6962 runs: vec![],
6963 steps: vec![],
6964 actions: vec![],
6965 optional_backfills: vec![],
6966 vec_inserts: vec![],
6967 operational_writes: vec![],
6968 };
6969
6970 let result = prepare_write(request, ProvenanceMode::Warn)
6971 .map(|_| ())
6972 .map_err(|e| format!("{e}"));
6973 assert!(
6974 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6975 "exceeding node limit must return InvalidWrite: got {result:?}"
6976 );
6977 }
6978
    /// Each collection stays at or below its own per-type limit, but the grand
    /// total (10k nodes + 10k edges + 50k chunks + 20,001 vec inserts + 10k
    /// operational writes = 100,001 items) trips the aggregate guardrail —
    /// presumably a 100,000-item total cap; confirm against `prepare_write`.
    #[test]
    fn write_request_exceeding_total_limit_is_rejected() {
        let request = WriteRequest {
            label: "too-many-total".to_owned(),
            nodes: (0..10_000)
                .map(|i| NodeInsert {
                    row_id: format!("row-{i}"),
                    logical_id: format!("lg-{i}"),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                })
                .collect(),
            node_retires: vec![],
            edges: (0..10_000)
                .map(|i| EdgeInsert {
                    row_id: format!("edge-row-{i}"),
                    logical_id: format!("edge-lg-{i}"),
                    kind: "link".to_owned(),
                    source_logical_id: format!("lg-{i}"),
                    target_logical_id: format!("lg-{}", i + 1),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                })
                .collect(),
            edge_retires: vec![],
            chunks: (0..50_000)
                .map(|i| ChunkInsert {
                    id: format!("chunk-{i}"),
                    node_logical_id: "lg-0".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                })
                .collect(),
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            // The extra item pushing the sum past the limit lives here.
            vec_inserts: (0..20_001)
                .map(|i| VecInsert {
                    chunk_id: format!("vec-chunk-{i}"),
                    embedding: vec![0.1],
                })
                .collect(),
            operational_writes: (0..10_000)
                .map(|i| OperationalWrite::Append {
                    collection: format!("col-{i}"),
                    record_key: format!("key-{i}"),
                    payload_json: "{}".to_owned(),
                    source_ref: None,
                })
                .collect(),
        };

        // Flatten to a printable Result so the error text can be matched.
        let result = prepare_write(request, ProvenanceMode::Warn)
            .map(|_| ())
            .map_err(|e| format!("{e}"));
        assert!(
            matches!(result, Err(ref msg) if msg.contains("too many total items")),
            "exceeding total item limit must return InvalidWrite: got {result:?}"
        );
    }
7050
7051 #[test]
7052 fn write_request_within_limits_succeeds() {
7053 let db = NamedTempFile::new().expect("temporary db");
7054 let writer = WriterActor::start(
7055 db.path(),
7056 Arc::new(SchemaManager::new()),
7057 ProvenanceMode::Warn,
7058 Arc::new(TelemetryCounters::default()),
7059 )
7060 .expect("writer");
7061
7062 let result = writer.submit(WriteRequest {
7063 label: "within-limits".to_owned(),
7064 nodes: vec![NodeInsert {
7065 row_id: "row-1".to_owned(),
7066 logical_id: "lg-1".to_owned(),
7067 kind: "Note".to_owned(),
7068 properties: "{}".to_owned(),
7069 source_ref: None,
7070 upsert: false,
7071 chunk_policy: ChunkPolicy::Preserve,
7072 content_ref: None,
7073 }],
7074 node_retires: vec![],
7075 edges: vec![],
7076 edge_retires: vec![],
7077 chunks: vec![],
7078 runs: vec![],
7079 steps: vec![],
7080 actions: vec![],
7081 optional_backfills: vec![],
7082 vec_inserts: vec![],
7083 operational_writes: vec![],
7084 });
7085
7086 assert!(
7087 result.is_ok(),
7088 "write request within limits must succeed: got {result:?}"
7089 );
7090 }
7091
    /// Inserting a node whose kind has a registered FTS property schema must
    /// produce a property-FTS row whose text is the extracted JSON-path values
    /// joined with the schema's separator.
    #[test]
    fn property_fts_rows_created_on_node_insert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a two-path schema ($.name, $.description) joined by a space,
        // on a scoped setup connection.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "goal-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist");
        // "$.name" then "$.description" joined by the ' ' separator.
        assert_eq!(text, "Ship v2 Launch the redesign");
    }
7153
    /// Upserting the same logical id must replace — not duplicate — the
    /// property-FTS row, and the stored text must reflect the new properties.
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Single-path schema: only $.name is indexed.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: plain insert with name "Alpha".
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Second write: upsert the same logical id with name "Beta".
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
7253
    /// Retiring a node must delete its property-FTS row so retired content is
    /// no longer full-text searchable.
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Single-path schema: only $.name is indexed.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a node so there is an FTS row to retire.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the same logical id in a follow-up request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
7335
7336 #[test]
7337 fn no_property_fts_row_for_unregistered_kind() {
7338 let db = NamedTempFile::new().expect("temporary db");
7339 let schema = Arc::new(SchemaManager::new());
7340 {
7341 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7342 schema.bootstrap(&conn).expect("bootstrap");
7343 }
7345 let writer = WriterActor::start(
7346 db.path(),
7347 Arc::clone(&schema),
7348 ProvenanceMode::Warn,
7349 Arc::new(TelemetryCounters::default()),
7350 )
7351 .expect("writer");
7352
7353 writer
7354 .submit(WriteRequest {
7355 label: "insert".to_owned(),
7356 nodes: vec![NodeInsert {
7357 row_id: "row-1".to_owned(),
7358 logical_id: "note-1".to_owned(),
7359 kind: "Note".to_owned(),
7360 properties: r#"{"title":"hello"}"#.to_owned(),
7361 source_ref: Some("src-1".to_owned()),
7362 upsert: false,
7363 chunk_policy: ChunkPolicy::Preserve,
7364 content_ref: None,
7365 }],
7366 node_retires: vec![],
7367 edges: vec![],
7368 edge_retires: vec![],
7369 chunks: vec![],
7370 runs: vec![],
7371 steps: vec![],
7372 actions: vec![],
7373 optional_backfills: vec![],
7374 vec_inserts: vec![],
7375 operational_writes: vec![],
7376 })
7377 .expect("insert");
7378
7379 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7380 let table = fathomdb_schema::fts_kind_table_name("Note");
7381 let exists: bool = conn
7382 .query_row(
7383 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
7384 rusqlite::params![table],
7385 |row| row.get::<_, i64>(0),
7386 )
7387 .map(|c| c > 0)
7388 .expect("exists check");
7389 assert!(
7390 !exists,
7391 "no per-kind FTS table should be created for unregistered kind"
7392 );
7393 }
7394
    /// When the per-kind FTS table (fts_props_goal) already exists, a write for
    /// that kind must be routed into it.
    #[test]
    fn property_fts_write_goes_to_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            // Register the schema and pre-create the per-kind fts5 table the
            // writer is expected to target.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 1,
            "per-kind table fts_props_goal must have the row"
        );
    }
7468
    /// Retiring a node must also remove its row from the pre-existing per-kind
    /// FTS table (fts_props_goal), not just the generic index.
    #[test]
    fn property_fts_retire_removes_from_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            // Register the schema and pre-create the per-kind fts5 table.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a node so the per-kind table gains a row.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the same logical id in a follow-up request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 0,
            "per-kind table row must be removed on retire"
        );
    }
7560
    /// Unit tests for `extract_json_path`: scalar stringification, array
    /// flattening, object/null suppression, and path-miss behavior.
    mod extract_json_path_tests {
        use super::super::extract_json_path;
        use serde_json::json;

        #[test]
        fn string_value() {
            let v = json!({"name": "alice"});
            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
        }

        // Numbers and bools are stringified; nulls yield nothing.
        #[test]
        fn number_value() {
            let v = json!({"age": 42});
            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
        }

        #[test]
        fn bool_value() {
            let v = json!({"active": true});
            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
        }

        #[test]
        fn null_value() {
            let v = json!({"x": null});
            assert!(extract_json_path(&v, "$.x").is_empty());
        }

        #[test]
        fn missing_path() {
            let v = json!({"name": "a"});
            assert!(extract_json_path(&v, "$.missing").is_empty());
        }

        #[test]
        fn nested_path() {
            let v = json!({"address": {"city": "NYC"}});
            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
        }

        // Arrays yield their scalar elements in order; object elements are skipped.
        #[test]
        fn array_of_strings() {
            let v = json!({"tags": ["a", "b", "c"]});
            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
        }

        #[test]
        fn array_mixed_scalars() {
            let v = json!({"vals": ["x", 1, true]});
            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
        }

        #[test]
        fn array_only_objects_returns_empty() {
            let v = json!({"data": [{"k": "v"}]});
            assert!(extract_json_path(&v, "$.data").is_empty());
        }

        #[test]
        fn array_mixed_objects_and_scalars() {
            let v = json!({"data": ["keep", {"skip": true}, "also"]});
            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
        }

        #[test]
        fn object_returns_empty() {
            let v = json!({"meta": {"k": "v"}});
            assert!(extract_json_path(&v, "$.meta").is_empty());
        }

        // Paths must start with the "$." prefix to match anything.
        #[test]
        fn no_prefix_returns_empty() {
            let v = json!({"name": "a"});
            assert!(extract_json_path(&v, "name").is_empty());
        }
    }
7637
    /// Unit tests for `extract_property_fts` in recursive mode: traversal
    /// order, guardrails (depth/byte caps), exclude paths, and the position
    /// map that relates each emitted leaf back to its JSON path and byte span
    /// in the combined blob.
    mod recursive_extraction_tests {
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        // Schema with the given paths, a single-space separator, no excludes.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        // Same as `schema` but with an explicit exclude-path list.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        // Object keys are visited in lexicographic order regardless of the
        // order they appear in the JSON source.
        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        // Build a value nested 10 objects deep; leaves past MAX_RECURSIVE_DEPTH
        // must be dropped and counted in stats.depth_cap_hit.
        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        // ~40 * 4KB leaves overflow the byte cap; the blob is truncated but
        // every position entry must still point at a valid, separator-free span.
        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        // Excluded subtrees are skipped entirely and counted in stats.
        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        // Position entries appear in emission order, are non-overlapping, and
        // each span slices back to exactly the leaf's text.
        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        // Non-recursive (scalar) paths contribute text but no position entries.
        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            let _: Vec<PositionEntry> = positions;
        }

        // Shared invariant: no two position entries may share a start offset.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        // The following tests pin down offset bookkeeping around empty-string
        // leaves: empties must not shift or duplicate subsequent offsets.
        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        // Every all-empty shape must yield neither combined text nor positions.
        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        // An interior empty leaf contributes no separator: the second leaf
        // starts exactly one LEAF_SEPARATOR after the first.
        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
8006
8007 mod parse_property_schema_json_tests {
8008 use super::super::parse_property_schema_json;
8009
8010 #[test]
8011 fn parse_property_schema_json_preserves_weight() {
8012 let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8013 let schema = parse_property_schema_json(json, " ");
8014 assert_eq!(schema.paths.len(), 2);
8015 let title = &schema.paths[0];
8016 assert_eq!(title.path, "$.title");
8017 assert_eq!(title.weight, Some(2.0_f32));
8018 let body = &schema.paths[1];
8019 assert_eq!(body.path, "$.body");
8020 assert_eq!(body.weight, None);
8021 }
8022 }
8023
8024 mod extract_property_fts_columns_tests {
8027 use serde_json::json;
8028
8029 use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8030
8031 fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8032 PropertyFtsSchema {
8033 paths,
8034 separator: " ".to_owned(),
8035 exclude_paths: Vec::new(),
8036 }
8037 }
8038
8039 #[test]
8040 fn extract_property_fts_columns_returns_per_spec_text() {
8041 let props = json!({"title": "Hello", "body": "World"});
8042 let mut title_entry = PropertyPathEntry::scalar("$.title");
8043 title_entry.weight = Some(2.0);
8044 let mut body_entry = PropertyPathEntry::scalar("$.body");
8045 body_entry.weight = Some(1.0);
8046 let schema = weighted_schema(vec![title_entry, body_entry]);
8047 let cols = extract_property_fts_columns(&props, &schema);
8048 assert_eq!(cols.len(), 2);
8049 assert_eq!(cols[0].0, "title");
8050 assert_eq!(cols[0].1, "Hello");
8051 assert_eq!(cols[1].0, "body");
8052 assert_eq!(cols[1].1, "World");
8053 }
8054
8055 #[test]
8056 fn extract_property_fts_columns_missing_field_returns_empty_string() {
8057 let props = json!({"title": "Hello"});
8058 let mut title_entry = PropertyPathEntry::scalar("$.title");
8059 title_entry.weight = Some(2.0);
8060 let mut body_entry = PropertyPathEntry::scalar("$.body");
8061 body_entry.weight = Some(1.0);
8062 let schema = weighted_schema(vec![title_entry, body_entry]);
8063 let cols = extract_property_fts_columns(&props, &schema);
8064 assert_eq!(cols.len(), 2);
8065 assert_eq!(cols[0].1, "Hello");
8066 assert_eq!(cols[1].1, "", "missing field yields empty string");
8067 }
8068 }
8069
8070 #[test]
8073 fn writer_inserts_per_column_for_weighted_schema() {
8074 use fathomdb_schema::fts_column_name;
8075
8076 let db = NamedTempFile::new().expect("temporary db");
8077 let schema_manager = Arc::new(SchemaManager::new());
8078
8079 {
8081 let conn = rusqlite::Connection::open(db.path()).expect("conn");
8082 schema_manager.bootstrap(&conn).expect("bootstrap");
8083
8084 let title_col = fts_column_name("$.title", false);
8085 let body_col = fts_column_name("$.body", false);
8086
8087 let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
8089 conn.execute(
8090 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
8091 VALUES (?1, ?2, ' ')",
8092 rusqlite::params!["Article", paths_json],
8093 )
8094 .expect("register schema");
8095
8096 conn.execute_batch(&format!(
8098 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
8099 node_logical_id UNINDEXED, {title_col}, {body_col}, \
8100 tokenize = 'porter unicode61 remove_diacritics 2'\
8101 )"
8102 ))
8103 .expect("create weighted per-kind table");
8104 }
8105
8106 let writer = WriterActor::start(
8107 db.path(),
8108 Arc::clone(&schema_manager),
8109 ProvenanceMode::Warn,
8110 Arc::new(TelemetryCounters::default()),
8111 )
8112 .expect("writer");
8113
8114 writer
8115 .submit(WriteRequest {
8116 label: "insert".to_owned(),
8117 nodes: vec![NodeInsert {
8118 row_id: "row-1".to_owned(),
8119 logical_id: "article-1".to_owned(),
8120 kind: "Article".to_owned(),
8121 properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
8122 source_ref: Some("src-1".to_owned()),
8123 upsert: false,
8124 chunk_policy: ChunkPolicy::Preserve,
8125 content_ref: None,
8126 }],
8127 node_retires: vec![],
8128 edges: vec![],
8129 edge_retires: vec![],
8130 chunks: vec![],
8131 runs: vec![],
8132 steps: vec![],
8133 actions: vec![],
8134 optional_backfills: vec![],
8135 vec_inserts: vec![],
8136 operational_writes: vec![],
8137 })
8138 .expect("write");
8139
8140 let conn = rusqlite::Connection::open(db.path()).expect("conn");
8141 let title_col = fts_column_name("$.title", false);
8142 let body_col = fts_column_name("$.body", false);
8143
8144 let count: i64 = conn
8146 .query_row(
8147 "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
8148 [],
8149 |r| r.get(0),
8150 )
8151 .expect("count");
8152 assert_eq!(count, 1, "per-kind FTS table must have the row");
8153
8154 let (title_val, body_val): (String, String) = conn
8156 .query_row(
8157 &format!(
8158 "SELECT {title_col}, {body_col} FROM fts_props_article \
8159 WHERE node_logical_id = 'article-1'"
8160 ),
8161 [],
8162 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
8163 )
8164 .expect("select per-column");
8165 assert_eq!(
8166 title_val, "Hello World",
8167 "title column must have correct value"
8168 );
8169 assert_eq!(body_val, "Foo Bar", "body column must have correct value");
8170 }
8171}