1#![allow(deprecated)]
6
7mod fts_extract;
8
9#[allow(unused_imports)]
16pub(crate) use fts_extract::{
17 ExtractStats, LEAF_SEPARATOR, PositionEntry, PropertyFtsSchema, PropertyPathMode,
18 extract_property_fts, extract_property_fts_columns, load_fts_property_schemas,
19 parse_property_schema_json,
20};
21
22#[cfg(test)]
24#[allow(unused_imports)]
25pub(crate) use fts_extract::{
26 MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PropertyPathEntry, extract_json_path,
27};
28
29use std::collections::HashMap;
30use std::mem::ManuallyDrop;
31use std::panic::AssertUnwindSafe;
32use std::path::Path;
33use std::sync::Arc;
34use std::sync::mpsc::{self, Sender, SyncSender};
35use std::thread;
36use std::time::Duration;
37
38use fathomdb_schema::SchemaManager;
39use rusqlite::{OptionalExtension, TransactionBehavior, params};
40
41use crate::operational::{
42 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
43 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
44 OperationalValidationMode, extract_secondary_index_entries_for_current,
45 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
46 parse_operational_validation_contract, validate_operational_payload_against_contract,
47};
48use crate::telemetry::TelemetryCounters;
49use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
50
/// A deferred (best-effort) projection task carried through a write and
/// reported back in the receipt's `optional_backfill_count`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection this task targets.
    pub target: ProjectionTarget,
    /// Serialized task payload; interpretation depends on `target`.
    pub payload: String,
}
59
/// How an upserted node's existing chunks are treated.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks (default).
    #[default]
    Preserve,
    /// Replace existing chunks with the ones in this write.
    Replace,
}
69
/// How strictly writes must carry a `source_ref`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Missing `source_ref` is tolerated (default).
    #[default]
    Warn,
    /// Any mutation without a `source_ref` is rejected as `InvalidWrite`.
    Require,
}
80
/// One node row to insert (or upsert) in a `WriteRequest`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    // Physical row id; must be unique within the request and non-empty.
    pub row_id: String,
    // Stable logical identity; chunk/edge references use this id.
    pub logical_id: String,
    pub kind: String,
    // JSON-encoded properties blob.
    pub properties: String,
    // Provenance; required under ProvenanceMode::Require.
    pub source_ref: Option<String>,
    pub upsert: bool,
    // What happens to existing chunks when this node is upserted.
    pub chunk_policy: ChunkPolicy,
    pub content_ref: Option<String>,
}
97
/// One edge row to insert (or upsert), linking two nodes by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    // Physical row id; shares the row-id uniqueness namespace with nodes.
    pub row_id: String,
    pub logical_id: String,
    pub source_logical_id: String,
    pub target_logical_id: String,
    pub kind: String,
    // JSON-encoded properties blob.
    pub properties: String,
    // Provenance; required under ProvenanceMode::Require.
    pub source_ref: Option<String>,
    pub upsert: bool,
}
112
/// Retire (supersede) the current version of a node by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    // Provenance; required under ProvenanceMode::Require.
    pub source_ref: Option<String>,
}
119
/// Retire (supersede) the current version of an edge by logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    // Provenance; required under ProvenanceMode::Require.
    pub source_ref: Option<String>,
}
126
/// One text chunk attached to a node; feeds the chunk FTS projection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    // Owning node; must be written in the same request or already exist.
    pub node_logical_id: String,
    // Must be non-empty — empty chunks are rejected at validation time.
    pub text_content: String,
    // Optional byte span of this chunk within the source content.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}
138
/// Raw vector-row insert for a chunk. Deprecated: this is the managed vector
/// projection's internal wire format (see the deprecation note below).
#[deprecated(
    since = "0.6.0",
    note = "raw VecInsert is the managed vector projection's internal wire format. Callers should configure embedding + configure_vec_kind and write nodes/chunks normally; the projection actor will produce vec rows automatically. This type will remain public for a transition window but may become crate-private in a future release."
)]
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    // Must be non-empty; dimensionality is not validated here.
    pub embedding: Vec<f32>,
}
154
/// A write against an operational (non-graph) collection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record; payload required (empty payload_json is rejected).
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put (insert-or-replace) a record; payload required.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete a record; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
176
/// One runtime "run" record.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    pub id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    // When true, supersedes_id must be set (validated in prepare_write).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
188
/// One runtime "step" record, belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    pub id: String,
    pub run_id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    // When true, supersedes_id must be set (validated in prepare_write).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
201
/// One runtime "action" record, belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    pub id: String,
    pub step_id: String,
    pub kind: String,
    pub status: String,
    // JSON-encoded properties blob.
    pub properties: String,
    pub source_ref: Option<String>,
    // When true, supersedes_id must be set (validated in prepare_write).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
214
// Per-section item caps for a single WriteRequest. Exceeding any of them
// fails validation on the caller's thread, before the writer is involved.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
const MAX_RETIRES: usize = 10_000;
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long a caller blocks for the writer thread's reply before giving up.
// A timeout does not cancel the write — it may still commit afterwards.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
231
/// A batch of mutations submitted to the writer as one unit.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label used in logs and the receipt.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
250
/// Result of a committed write, returned to the submitting caller.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    pub label: String,
    // Number of optional projection tasks carried by the request.
    pub optional_backfill_count: usize,
    pub warnings: Vec<String>,
    // Warnings about missing source_ref (ProvenanceMode::Warn).
    pub provenance_warnings: Vec<String>,
}
259
/// Request to bump the last-accessed timestamp on a set of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    // Must be non-empty, and each id must be non-blank.
    pub logical_ids: Vec<String>,
    pub touched_at: i64,
    // Provenance; required under ProvenanceMode::Require.
    pub source_ref: Option<String>,
}
267
/// Outcome of a last-access touch: how many ids were updated, and when.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
274
/// A chunk-FTS row derived from a ChunkInsert during write preparation.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    // Kind of the owning node (from the same request or the database).
    kind: String,
    text_content: String,
}
282
/// A property-FTS row extracted from a node's properties JSON.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    // Flattened extracted text for the unweighted FTS column.
    text_content: String,
    positions: Vec<PositionEntry>,
    // Per-column (name, text) pairs; only populated for weighted schemas.
    columns: Vec<(String, String)>,
}
293
/// A validated WriteRequest plus resolution state filled in on the writer
/// thread (FTS rows, operational metadata) before being applied.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is enabled.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Resolved on the writer thread from collection configuration.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes written in this batch (avoids DB lookups).
    node_kinds: HashMap<String, String>,
    // Resolved by resolve_fts_rows / resolve_property_fts_rows.
    required_fts_rows: Vec<FtsProjectionRow>,
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
321
/// Messages sent from caller threads to the single writer thread. Each
/// carries a one-shot reply channel the writer answers on.
enum WriteMessage {
    Submit {
        // Boxed to keep the message small on the channel.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
    ClaimVectorProjection {
        request: VectorProjectionClaimRequest,
        reply: Sender<Result<Vec<VectorWorkClaim>, EngineError>>,
    },
    ApplyVectorProjection {
        request: VectorProjectionApplyRequest,
        reply: Sender<Result<(), EngineError>>,
    },
}
340
/// Parameters for claiming pending vector-projection work.
#[derive(Clone, Debug)]
pub struct VectorProjectionClaimRequest {
    /// Only work items with priority >= this value are claimed.
    pub min_priority: i64,
    /// Maximum number of items to claim in one call.
    pub limit: usize,
}
351
/// One claimed vector-projection work item, now in the 'inflight' state.
#[derive(Clone, Debug)]
pub struct VectorWorkClaim {
    pub work_id: i64,
    pub kind: String,
    pub chunk_id: String,
    pub canonical_hash: String,
    pub embedding_profile_id: i64,
    pub priority: i64,
    /// Chunk text to embed; empty when `chunk_missing` is true.
    pub text_content: String,
    /// True when the referenced chunk row no longer exists.
    pub chunk_missing: bool,
}
374
/// Outcome report for a batch of claimed vector work.
#[derive(Debug, Default)]
pub struct VectorProjectionApplyRequest {
    /// Items that were embedded; their vec rows are written and the work rows removed.
    pub successes: Vec<VectorProjectionSuccess>,
    /// Items to drop permanently (marked 'discarded' with a reason).
    pub discards: Vec<VectorProjectionDiscard>,
    /// Work ids to return to 'pending' (attempt_count incremented).
    pub reverts: Vec<i64>,
    /// Error recorded on every reverted item.
    pub revert_error: Option<String>,
}
390
/// A successfully computed embedding for one claimed work item.
#[derive(Clone, Debug)]
pub struct VectorProjectionSuccess {
    pub work_id: i64,
    // Determines which per-kind vec table receives the row.
    pub kind: String,
    pub chunk_id: String,
    pub embedding: Vec<f32>,
}
403
/// A work item to discard permanently, with an optional reason recorded
/// in the work row's last_error column.
#[derive(Clone, Debug)]
pub struct VectorProjectionDiscard {
    pub work_id: i64,
    pub reason: Option<String>,
}
412
/// Handle to the single background writer thread. Cloneable callers talk to
/// it over a bounded channel; dropping the actor shuts the thread down.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so Drop can close the channel (ending the writer loop)
    // before joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Kept alive for the writer thread's lifetime.
    _telemetry: Arc<TelemetryCounters>,
}
425
impl WriterActor {
    /// Spawn the writer thread and return a handle to it.
    ///
    /// The thread opens its own connection to the database at `path` and
    /// bootstraps the schema; failures there surface as rejections on the
    /// first submitted message, not here.
    ///
    /// # Errors
    /// Returns `EngineError::Io` if the OS thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: callers block once 256 messages are queued.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    // True while the writer thread has not yet finished.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    // Fail fast with WriterRejected instead of sending into a dead channel.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validate `request` on the calling thread, queue it, and block for the
    /// writer's receipt (up to `WRITER_REPLY_TIMEOUT`).
    ///
    /// # Errors
    /// `InvalidWrite` for validation failures, `WriterRejected` if the writer
    /// is gone, `WriterTimedOut` if no reply arrives in time (the write may
    /// still commit).
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Queue a last-accessed timestamp bump and block for the report.
    ///
    /// # Errors
    /// Same failure modes as [`WriterActor::submit`].
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Claim pending vector-projection work items (marking them inflight).
    ///
    /// # Errors
    /// Same failure modes as [`WriterActor::submit`], minus validation.
    pub fn claim_vector_projection(
        &self,
        request: VectorProjectionClaimRequest,
    ) -> Result<Vec<VectorWorkClaim>, EngineError> {
        self.check_thread_alive()?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::ClaimVectorProjection {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
        recv_with_timeout(&reply_rx)
    }

    /// Report outcomes (successes/discards/reverts) for claimed vector work.
    ///
    /// # Errors
    /// Same failure modes as [`WriterActor::submit`], minus validation.
    pub fn apply_vector_projection(
        &self,
        request: VectorProjectionApplyRequest,
    ) -> Result<(), EngineError> {
        self.check_thread_alive()?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::ApplyVectorProjection {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
        recv_with_timeout(&reply_rx)
    }
}
554
/// Shutdown notice printed directly to stderr when the `tracing` feature is
/// off (presumably `trace_warn!` is a no-op in that configuration — see the
/// call site in `Drop for WriterActor`).
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
560
impl Drop for WriterActor {
    fn drop(&mut self) {
        // SAFETY: the sender is dropped exactly once, here, and never used
        // afterwards. Dropping it closes the channel, which ends the writer
        // loop's `for message in receiver` and lets the thread exit.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // Re-raising a panic while already panicking would abort
                    // the process, so only propagate when we are not.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
590
591fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
593 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
594 .map_err(|error| match error {
595 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
596 "write timed out waiting for writer thread reply — the write may still commit"
597 .to_owned(),
598 ),
599 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
600 })
601 .and_then(|result| result)
602}
603
604fn prepare_touch_last_accessed(
605 request: &LastAccessTouchRequest,
606 mode: ProvenanceMode,
607) -> Result<(), EngineError> {
608 if request.logical_ids.is_empty() {
609 return Err(EngineError::InvalidWrite(
610 "touch_last_accessed requires at least one logical_id".to_owned(),
611 ));
612 }
613 for logical_id in &request.logical_ids {
614 if logical_id.trim().is_empty() {
615 return Err(EngineError::InvalidWrite(
616 "touch_last_accessed requires non-empty logical_ids".to_owned(),
617 ));
618 }
619 }
620 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
621 return Err(EngineError::InvalidWrite(
622 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
623 .to_owned(),
624 ));
625 }
626 Ok(())
627}
628
629fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
630 let missing: Vec<String> = request
631 .nodes
632 .iter()
633 .filter(|n| n.source_ref.is_none())
634 .map(|n| format!("node '{}'", n.logical_id))
635 .chain(
636 request
637 .node_retires
638 .iter()
639 .filter(|r| r.source_ref.is_none())
640 .map(|r| format!("node retire '{}'", r.logical_id)),
641 )
642 .chain(
643 request
644 .edges
645 .iter()
646 .filter(|e| e.source_ref.is_none())
647 .map(|e| format!("edge '{}'", e.logical_id)),
648 )
649 .chain(
650 request
651 .edge_retires
652 .iter()
653 .filter(|r| r.source_ref.is_none())
654 .map(|r| format!("edge retire '{}'", r.logical_id)),
655 )
656 .chain(
657 request
658 .runs
659 .iter()
660 .filter(|r| r.source_ref.is_none())
661 .map(|r| format!("run '{}'", r.id)),
662 )
663 .chain(
664 request
665 .steps
666 .iter()
667 .filter(|s| s.source_ref.is_none())
668 .map(|s| format!("step '{}'", s.id)),
669 )
670 .chain(
671 request
672 .actions
673 .iter()
674 .filter(|a| a.source_ref.is_none())
675 .map(|a| format!("action '{}'", a.id)),
676 )
677 .chain(
678 request
679 .operational_writes
680 .iter()
681 .filter(|write| operational_write_source_ref(write).is_none())
682 .map(|write| {
683 format!(
684 "operational {} '{}:{}'",
685 operational_write_kind(write),
686 operational_write_collection(write),
687 operational_write_record_key(write)
688 )
689 }),
690 )
691 .collect();
692
693 if missing.is_empty() {
694 Ok(())
695 } else {
696 Err(EngineError::InvalidWrite(format!(
697 "ProvenanceMode::Require: missing source_ref on: {}",
698 missing.join(", ")
699 )))
700 }
701}
702
703fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
704 if request.nodes.len() > MAX_NODES {
705 return Err(EngineError::InvalidWrite(format!(
706 "too many nodes: {} exceeds limit of {MAX_NODES}",
707 request.nodes.len()
708 )));
709 }
710 if request.edges.len() > MAX_EDGES {
711 return Err(EngineError::InvalidWrite(format!(
712 "too many edges: {} exceeds limit of {MAX_EDGES}",
713 request.edges.len()
714 )));
715 }
716 if request.chunks.len() > MAX_CHUNKS {
717 return Err(EngineError::InvalidWrite(format!(
718 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
719 request.chunks.len()
720 )));
721 }
722 let retires = request.node_retires.len() + request.edge_retires.len();
723 if retires > MAX_RETIRES {
724 return Err(EngineError::InvalidWrite(format!(
725 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
726 )));
727 }
728 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
729 if runtime_items > MAX_RUNTIME_ITEMS {
730 return Err(EngineError::InvalidWrite(format!(
731 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
732 )));
733 }
734 if request.operational_writes.len() > MAX_OPERATIONAL {
735 return Err(EngineError::InvalidWrite(format!(
736 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
737 request.operational_writes.len()
738 )));
739 }
740 let total = request.nodes.len()
741 + request.node_retires.len()
742 + request.edges.len()
743 + request.edge_retires.len()
744 + request.chunks.len()
745 + request.runs.len()
746 + request.steps.len()
747 + request.actions.len()
748 + request.vec_inserts.len()
749 + request.operational_writes.len();
750 if total > MAX_TOTAL_ITEMS {
751 return Err(EngineError::InvalidWrite(format!(
752 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
753 )));
754 }
755 Ok(())
756}
757
758#[allow(clippy::too_many_lines)]
759fn prepare_write(
760 request: WriteRequest,
761 mode: ProvenanceMode,
762) -> Result<PreparedWrite, EngineError> {
763 validate_request_size(&request)?;
764
765 for node in &request.nodes {
767 if node.row_id.is_empty() {
768 return Err(EngineError::InvalidWrite(
769 "NodeInsert has empty row_id".to_owned(),
770 ));
771 }
772 if node.logical_id.is_empty() {
773 return Err(EngineError::InvalidWrite(
774 "NodeInsert has empty logical_id".to_owned(),
775 ));
776 }
777 }
778 for edge in &request.edges {
779 if edge.row_id.is_empty() {
780 return Err(EngineError::InvalidWrite(
781 "EdgeInsert has empty row_id".to_owned(),
782 ));
783 }
784 if edge.logical_id.is_empty() {
785 return Err(EngineError::InvalidWrite(
786 "EdgeInsert has empty logical_id".to_owned(),
787 ));
788 }
789 }
790 for chunk in &request.chunks {
791 if chunk.id.is_empty() {
792 return Err(EngineError::InvalidWrite(
793 "ChunkInsert has empty id".to_owned(),
794 ));
795 }
796 if chunk.text_content.is_empty() {
797 return Err(EngineError::InvalidWrite(format!(
798 "chunk '{}' has empty text_content; empty chunks are not allowed",
799 chunk.id
800 )));
801 }
802 }
803 for run in &request.runs {
804 if run.id.is_empty() {
805 return Err(EngineError::InvalidWrite(
806 "RunInsert has empty id".to_owned(),
807 ));
808 }
809 }
810 for step in &request.steps {
811 if step.id.is_empty() {
812 return Err(EngineError::InvalidWrite(
813 "StepInsert has empty id".to_owned(),
814 ));
815 }
816 }
817 for action in &request.actions {
818 if action.id.is_empty() {
819 return Err(EngineError::InvalidWrite(
820 "ActionInsert has empty id".to_owned(),
821 ));
822 }
823 }
824 for vi in &request.vec_inserts {
825 if vi.chunk_id.is_empty() {
826 return Err(EngineError::InvalidWrite(
827 "VecInsert has empty chunk_id".to_owned(),
828 ));
829 }
830 if vi.embedding.is_empty() {
831 return Err(EngineError::InvalidWrite(format!(
832 "VecInsert for chunk '{}' has empty embedding",
833 vi.chunk_id
834 )));
835 }
836 }
837 for operational in &request.operational_writes {
838 if operational_write_collection(operational).is_empty() {
839 return Err(EngineError::InvalidWrite(
840 "OperationalWrite has empty collection".to_owned(),
841 ));
842 }
843 if operational_write_record_key(operational).is_empty() {
844 return Err(EngineError::InvalidWrite(format!(
845 "OperationalWrite for collection '{}' has empty record_key",
846 operational_write_collection(operational)
847 )));
848 }
849 match operational {
850 OperationalWrite::Append { payload_json, .. }
851 | OperationalWrite::Put { payload_json, .. } => {
852 if payload_json.is_empty() {
853 return Err(EngineError::InvalidWrite(format!(
854 "OperationalWrite {} '{}:{}' has empty payload_json",
855 operational_write_kind(operational),
856 operational_write_collection(operational),
857 operational_write_record_key(operational)
858 )));
859 }
860 }
861 OperationalWrite::Delete { .. } => {}
862 }
863 }
864
865 {
867 let mut seen = std::collections::HashSet::new();
868 for node in &request.nodes {
869 if !seen.insert(node.row_id.as_str()) {
870 return Err(EngineError::InvalidWrite(format!(
871 "duplicate row_id '{}' within the same WriteRequest",
872 node.row_id
873 )));
874 }
875 }
876 for edge in &request.edges {
877 if !seen.insert(edge.row_id.as_str()) {
878 return Err(EngineError::InvalidWrite(format!(
879 "duplicate row_id '{}' within the same WriteRequest",
880 edge.row_id
881 )));
882 }
883 }
884 }
885
886 if mode == ProvenanceMode::Require {
888 check_require_provenance(&request)?;
889 }
890
891 for run in &request.runs {
893 if run.upsert && run.supersedes_id.is_none() {
894 return Err(EngineError::InvalidWrite(format!(
895 "run '{}': upsert=true requires supersedes_id to be set",
896 run.id
897 )));
898 }
899 }
900 for step in &request.steps {
901 if step.upsert && step.supersedes_id.is_none() {
902 return Err(EngineError::InvalidWrite(format!(
903 "step '{}': upsert=true requires supersedes_id to be set",
904 step.id
905 )));
906 }
907 }
908 for action in &request.actions {
909 if action.upsert && action.supersedes_id.is_none() {
910 return Err(EngineError::InvalidWrite(format!(
911 "action '{}': upsert=true requires supersedes_id to be set",
912 action.id
913 )));
914 }
915 }
916
917 let node_kinds = request
918 .nodes
919 .iter()
920 .map(|node| (node.logical_id.clone(), node.kind.clone()))
921 .collect::<HashMap<_, _>>();
922
923 Ok(PreparedWrite {
924 label: request.label,
925 nodes: request.nodes,
926 node_retires: request.node_retires,
927 edges: request.edges,
928 edge_retires: request.edge_retires,
929 chunks: request.chunks,
930 runs: request.runs,
931 steps: request.steps,
932 actions: request.actions,
933 vec_inserts: request.vec_inserts,
934 operational_writes: request.operational_writes,
935 operational_collection_kinds: HashMap::new(),
936 operational_collection_filter_fields: HashMap::new(),
937 operational_validation_warnings: Vec::new(),
938 node_kinds,
939 required_fts_rows: Vec::new(),
940 required_property_fts_rows: Vec::new(),
941 optional_backfills: request.optional_backfills,
942 })
943}
944
/// Body of the writer thread: open the database, bootstrap the schema, then
/// serve messages until the channel closes (all senders dropped).
///
/// Startup failures reject every queued message rather than blocking
/// callers; per-message panics are caught so one bad write cannot kill the
/// thread.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Answer every already/soon-queued message so callers unblock.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Ends when the last sender (the WriterActor) is dropped.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // NOTE(review): `start` only exists with the `tracing`
                // feature; its sole use below is inside `trace_info!`, which
                // presumably compiles away otherwise — confirm macro def.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so a single poisoned write cannot take down
                // the writer thread for all future callers.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort cleanup of any transaction left open by the
                    // panicking write; errors here are intentionally ignored.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Counts the operation without inflating the row count.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
            WriteMessage::ClaimVectorProjection { request, reply } => {
                let result = apply_claim_vector_projection(&mut conn, &request);
                let _ = reply.send(result);
            }
            WriteMessage::ApplyVectorProjection { request, reply } => {
                let result = apply_apply_vector_projection(&mut conn, schema_manager, &request);
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1035
1036fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1037 for message in receiver {
1038 match message {
1039 WriteMessage::Submit { reply, .. } => {
1040 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1041 }
1042 WriteMessage::TouchLastAccessed { reply, .. } => {
1043 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1044 }
1045 WriteMessage::ClaimVectorProjection { reply, .. } => {
1046 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1047 }
1048 WriteMessage::ApplyVectorProjection { reply, .. } => {
1049 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1050 }
1051 }
1052 }
1053}
1054
/// Claim up to `request.limit` pending vector-projection work items inside
/// one IMMEDIATE transaction: select by priority, flip each row to
/// 'inflight', then return the claims with their chunk text attached.
///
/// A claim whose chunk row no longer exists comes back with
/// `chunk_missing = true` and empty `text_content` so the caller can
/// discard it rather than embed nothing.
fn apply_claim_vector_projection(
    conn: &mut rusqlite::Connection,
    request: &VectorProjectionClaimRequest,
) -> Result<Vec<VectorWorkClaim>, EngineError> {
    // usize -> i64 for the SQL LIMIT; saturate rather than fail on overflow.
    let limit = i64::try_from(request.limit).unwrap_or(i64::MAX);
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Highest priority first; ties broken oldest-first, then by work_id.
    let ids: Vec<i64> = {
        let mut stmt = tx.prepare(
            "SELECT work_id FROM vector_projection_work \
             WHERE state = 'pending' AND priority >= ?1 \
             ORDER BY priority DESC, created_at ASC, work_id ASC \
             LIMIT ?2",
        )?;
        stmt.query_map(rusqlite::params![request.min_priority, limit], |r| {
            r.get::<_, i64>(0)
        })?
        .collect::<Result<Vec<_>, _>>()?
    };

    if ids.is_empty() {
        tx.commit()?;
        return Ok(Vec::new());
    }

    // Mark the selected rows inflight so concurrent claimers skip them.
    {
        let mut upd = tx.prepare(
            "UPDATE vector_projection_work \
             SET state = 'inflight', updated_at = unixepoch() \
             WHERE work_id = ?1",
        )?;
        for id in &ids {
            upd.execute(rusqlite::params![id])?;
        }
    }

    let mut claims: Vec<VectorWorkClaim> = Vec::with_capacity(ids.len());
    {
        // LEFT JOIN: a NULL text_content means the chunk row is gone.
        let mut sel = tx.prepare(
            "SELECT w.work_id, w.kind, w.chunk_id, w.canonical_hash, w.embedding_profile_id, \
                    w.priority, c.text_content \
             FROM vector_projection_work w \
             LEFT JOIN chunks c ON c.id = w.chunk_id \
             WHERE w.work_id = ?1",
        )?;
        for id in &ids {
            let row = sel.query_row(rusqlite::params![id], |r| {
                let text_opt: Option<String> = r.get(6)?;
                Ok(VectorWorkClaim {
                    work_id: r.get(0)?,
                    kind: r.get(1)?,
                    chunk_id: r.get(2)?,
                    canonical_hash: r.get(3)?,
                    embedding_profile_id: r.get(4)?,
                    priority: r.get(5)?,
                    chunk_missing: text_opt.is_none(),
                    text_content: text_opt.unwrap_or_default(),
                })
            })?;
            claims.push(row);
        }
    }

    tx.commit()?;
    Ok(claims)
}
1124
/// Apply the outcomes of externally computed embeddings to the work queue
/// in one IMMEDIATE transaction: write successes (when sqlite-vec is
/// compiled in), mark discards, and return reverts to 'pending'.
fn apply_apply_vector_projection(
    conn: &mut rusqlite::Connection,
    #[allow(unused_variables)] schema_manager: &Arc<SchemaManager>,
    request: &VectorProjectionApplyRequest,
) -> Result<(), EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    #[cfg(feature = "sqlite-vec")]
    {
        for success in &request.successes {
            // NOTE(review): table_name is interpolated into SQL; assumes
            // vec_kind_table_name returns a sanitized identifier — confirm.
            let table_name = fathomdb_schema::vec_kind_table_name(&success.kind);
            let dimension = success.embedding.len();
            // vec0 expects the embedding as packed little-endian f32 bytes.
            let bytes: Vec<u8> = success
                .embedding
                .iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            // Lazily create the per-kind virtual table on first use.
            tx.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                    chunk_id TEXT PRIMARY KEY,\
                    embedding float[{dimension}]\
                )"
            ))?;
            tx.execute(
                &format!(
                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
                ),
                rusqlite::params![success.chunk_id, bytes],
            )?;
            // Completed work is removed from the queue entirely.
            tx.execute(
                "DELETE FROM vector_projection_work WHERE work_id = ?1",
                rusqlite::params![success.work_id],
            )?;
        }
    }
    #[cfg(not(feature = "sqlite-vec"))]
    {
        // Without sqlite-vec there is nowhere to store the embedding; still
        // dequeue the completed work so it is not retried forever.
        for success in &request.successes {
            tx.execute(
                "DELETE FROM vector_projection_work WHERE work_id = ?1",
                rusqlite::params![success.work_id],
            )?;
        }
    }

    // Discards are kept (state = 'discarded') with the reason for audit.
    for discard in &request.discards {
        tx.execute(
            "UPDATE vector_projection_work \
             SET state = 'discarded', last_error = ?1, updated_at = unixepoch() \
             WHERE work_id = ?2",
            rusqlite::params![discard.reason, discard.work_id],
        )?;
    }

    // Reverts go back to 'pending' with an incremented attempt counter and
    // the shared revert_error recorded on each row.
    if !request.reverts.is_empty() {
        let err = request.revert_error.as_deref();
        for work_id in &request.reverts {
            tx.execute(
                "UPDATE vector_projection_work \
                 SET state = 'pending', \
                     attempt_count = attempt_count + 1, \
                     last_error = ?1, \
                     updated_at = unixepoch() \
                 WHERE work_id = ?2",
                rusqlite::params![err, work_id],
            )?;
        }
    }

    tx.commit()?;
    Ok(())
}
1200
1201fn resolve_fts_rows(
1209 conn: &rusqlite::Connection,
1210 prepared: &mut PreparedWrite,
1211) -> Result<(), EngineError> {
1212 let retiring_ids: std::collections::HashSet<&str> = prepared
1213 .node_retires
1214 .iter()
1215 .map(|r| r.logical_id.as_str())
1216 .collect();
1217 for chunk in &prepared.chunks {
1218 if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1219 return Err(EngineError::InvalidWrite(format!(
1220 "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1221 WriteRequest; retire and chunk insertion for the same node must not be combined",
1222 chunk.id, chunk.node_logical_id
1223 )));
1224 }
1225 }
1226 for chunk in &prepared.chunks {
1227 let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1228 k.clone()
1229 } else {
1230 match conn.query_row(
1231 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1232 params![chunk.node_logical_id],
1233 |row| row.get::<_, String>(0),
1234 ) {
1235 Ok(kind) => kind,
1236 Err(rusqlite::Error::QueryReturnedNoRows) => {
1237 return Err(EngineError::InvalidWrite(format!(
1238 "chunk '{}' references node_logical_id '{}' that is not present in this \
1239 write request or the database \
1240 (v1 limitation: chunks and their nodes must be submitted together or the \
1241 node must already exist)",
1242 chunk.id, chunk.node_logical_id
1243 )));
1244 }
1245 Err(e) => return Err(EngineError::Sqlite(e)),
1246 }
1247 };
1248 prepared.required_fts_rows.push(FtsProjectionRow {
1249 chunk_id: chunk.id.clone(),
1250 node_logical_id: chunk.node_logical_id.clone(),
1251 kind,
1252 text_content: chunk.text_content.clone(),
1253 });
1254 }
1255 trace_debug!(
1256 fts_rows = prepared.required_fts_rows.len(),
1257 chunks_processed = prepared.chunks.len(),
1258 "fts row resolution completed"
1259 );
1260 Ok(())
1261}
1262
1263fn resolve_property_fts_rows(
1266 conn: &rusqlite::Connection,
1267 prepared: &mut PreparedWrite,
1268) -> Result<(), EngineError> {
1269 if prepared.nodes.is_empty() {
1270 return Ok(());
1271 }
1272
1273 let schemas: HashMap<String, PropertyFtsSchema> =
1274 load_fts_property_schemas(conn)?.into_iter().collect();
1275
1276 if schemas.is_empty() {
1277 return Ok(());
1278 }
1279
1280 let mut combined_stats = ExtractStats::default();
1281 for node in &prepared.nodes {
1282 let Some(schema) = schemas.get(&node.kind) else {
1283 continue;
1284 };
1285 let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1286 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
1287 let (text_content, positions, stats) = extract_property_fts(&props, schema);
1288 combined_stats.merge(stats);
1289 if let Some(text_content) = text_content {
1290 let columns = if has_weights {
1291 extract_property_fts_columns(&props, schema)
1292 } else {
1293 Vec::new()
1294 };
1295 prepared
1296 .required_property_fts_rows
1297 .push(FtsPropertyProjectionRow {
1298 node_logical_id: node.logical_id.clone(),
1299 kind: node.kind.clone(),
1300 text_content,
1301 positions,
1302 columns,
1303 });
1304 }
1305 }
1306 if combined_stats != ExtractStats::default() {
1307 trace_debug!(
1308 depth_cap_hit = combined_stats.depth_cap_hit,
1309 byte_cap_reached = combined_stats.byte_cap_reached,
1310 excluded_subtree = combined_stats.excluded_subtree,
1311 "property fts recursive extraction guardrails engaged"
1312 );
1313 }
1314 trace_debug!(
1315 property_fts_rows = prepared.required_property_fts_rows.len(),
1316 nodes_processed = prepared.nodes.len(),
1317 "property fts row resolution completed"
1318 );
1319 Ok(())
1320}
1321
1322fn resolve_operational_writes(
1323 conn: &rusqlite::Connection,
1324 prepared: &mut PreparedWrite,
1325) -> Result<(), EngineError> {
1326 let mut collection_kinds = HashMap::new();
1327 let mut collection_filter_fields = HashMap::new();
1328 let mut collection_validation_contracts = HashMap::new();
1329 for write in &prepared.operational_writes {
1330 let collection = operational_write_collection(write);
1331 if !collection_kinds.contains_key(collection) {
1332 let maybe_row: Option<(String, Option<i64>, String, String)> = conn
1333 .query_row(
1334 "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
1335 params![collection],
1336 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
1337 )
1338 .optional()
1339 .map_err(EngineError::Sqlite)?;
1340 let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
1341 .ok_or_else(|| {
1342 EngineError::InvalidWrite(format!(
1343 "operational collection '{collection}' is not registered"
1344 ))
1345 })?;
1346 if disabled_at.is_some() {
1347 return Err(EngineError::InvalidWrite(format!(
1348 "operational collection '{collection}' is disabled"
1349 )));
1350 }
1351 let kind = OperationalCollectionKind::try_from(kind_text.as_str())
1352 .map_err(EngineError::InvalidWrite)?;
1353 let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
1354 let validation_contract = parse_operational_validation_contract(&validation_json)
1355 .map_err(EngineError::InvalidWrite)?;
1356 collection_kinds.insert(collection.to_owned(), kind);
1357 collection_filter_fields.insert(collection.to_owned(), filter_fields);
1358 collection_validation_contracts.insert(collection.to_owned(), validation_contract);
1359 }
1360
1361 let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
1362 EngineError::InvalidWrite("missing operational collection kind".to_owned())
1363 })?;
1364 match (kind, write) {
1365 (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
1366 | (
1367 OperationalCollectionKind::LatestState,
1368 OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
1369 ) => {}
1370 (OperationalCollectionKind::AppendOnlyLog, _) => {
1371 return Err(EngineError::InvalidWrite(format!(
1372 "operational collection '{collection}' is append_only_log and only accepts Append"
1373 )));
1374 }
1375 (OperationalCollectionKind::LatestState, _) => {
1376 return Err(EngineError::InvalidWrite(format!(
1377 "operational collection '{collection}' is latest_state and only accepts Put/Delete"
1378 )));
1379 }
1380 }
1381 if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
1382 let _ = check_operational_write_against_contract(write, contract)?;
1383 }
1384 }
1385 prepared.operational_collection_kinds = collection_kinds;
1386 prepared.operational_collection_filter_fields = collection_filter_fields;
1387 Ok(())
1388}
1389
1390fn parse_operational_filter_fields(
1391 filter_fields_json: &str,
1392) -> Result<Vec<OperationalFilterField>, EngineError> {
1393 let fields: Vec<OperationalFilterField> =
1394 serde_json::from_str(filter_fields_json).map_err(|error| {
1395 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1396 })?;
1397 let mut seen = std::collections::HashSet::new();
1398 for field in &fields {
1399 if field.name.trim().is_empty() {
1400 return Err(EngineError::InvalidWrite(
1401 "filter_fields_json field names must not be empty".to_owned(),
1402 ));
1403 }
1404 if !seen.insert(field.name.as_str()) {
1405 return Err(EngineError::InvalidWrite(format!(
1406 "filter_fields_json contains duplicate field '{}'",
1407 field.name
1408 )));
1409 }
1410 if field.modes.is_empty() {
1411 return Err(EngineError::InvalidWrite(format!(
1412 "filter_fields_json field '{}' must declare at least one mode",
1413 field.name
1414 )));
1415 }
1416 if field.modes.contains(&OperationalFilterMode::Prefix)
1417 && field.field_type != OperationalFilterFieldType::String
1418 {
1419 return Err(EngineError::InvalidWrite(format!(
1420 "filter field '{}' only supports prefix for string types",
1421 field.name
1422 )));
1423 }
1424 }
1425 Ok(fields)
1426}
1427
/// A single filter value extracted from an operational payload, staged for
/// insertion into the `operational_filter_values` table for one mutation.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Set for string-typed fields; `None` otherwise.
    string_value: Option<String>,
    // Set for integer/timestamp-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1434
1435fn extract_operational_filter_values(
1436 filter_fields: &[OperationalFilterField],
1437 payload_json: &str,
1438) -> Vec<OperationalFilterValueRow> {
1439 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1440 return Vec::new();
1441 };
1442 let Some(object) = parsed.as_object() else {
1443 return Vec::new();
1444 };
1445
1446 filter_fields
1447 .iter()
1448 .filter_map(|field| {
1449 let value = object.get(&field.name)?;
1450 match field.field_type {
1451 OperationalFilterFieldType::String => {
1452 value
1453 .as_str()
1454 .map(|string_value| OperationalFilterValueRow {
1455 field_name: field.name.clone(),
1456 string_value: Some(string_value.to_owned()),
1457 integer_value: None,
1458 })
1459 }
1460 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1461 value
1462 .as_i64()
1463 .map(|integer_value| OperationalFilterValueRow {
1464 field_name: field.name.clone(),
1465 string_value: None,
1466 integer_value: Some(integer_value),
1467 })
1468 }
1469 }
1470 })
1471 .collect()
1472}
1473
/// Run the full write pipeline for an already-prepared request: the three
/// resolution passes read the database and enrich `prepared` in place
/// (derived FTS rows, property-FTS rows, operational collection metadata),
/// then `apply_write` persists everything. Order matters: `apply_write`
/// consumes the rows and metadata the resolvers populate.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1483
1484fn apply_touch_last_accessed(
1485 conn: &mut rusqlite::Connection,
1486 request: &LastAccessTouchRequest,
1487) -> Result<LastAccessTouchReport, EngineError> {
1488 let mut seen = std::collections::HashSet::new();
1489 let logical_ids = request
1490 .logical_ids
1491 .iter()
1492 .filter(|logical_id| seen.insert(logical_id.as_str()))
1493 .cloned()
1494 .collect::<Vec<_>>();
1495 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1496
1497 for logical_id in &logical_ids {
1498 let exists = tx
1499 .query_row(
1500 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1501 params![logical_id],
1502 |row| row.get::<_, i64>(0),
1503 )
1504 .optional()?
1505 .is_some();
1506 if !exists {
1507 return Err(EngineError::InvalidWrite(format!(
1508 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1509 )));
1510 }
1511 }
1512
1513 {
1514 let mut upsert_metadata = tx.prepare_cached(
1515 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1516 VALUES (?1, ?2, ?2) \
1517 ON CONFLICT(logical_id) DO UPDATE SET \
1518 last_accessed_at = excluded.last_accessed_at, \
1519 updated_at = excluded.updated_at",
1520 )?;
1521 let mut insert_provenance = tx.prepare_cached(
1522 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1523 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1524 )?;
1525 for logical_id in &logical_ids {
1526 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1527 insert_provenance.execute(params![
1528 new_id(),
1529 logical_id,
1530 request.source_ref.as_deref(),
1531 format!("{{\"touched_at\":{}}}", request.touched_at),
1532 ])?;
1533 }
1534 }
1535
1536 tx.commit()?;
1537 Ok(LastAccessTouchReport {
1538 touched_logical_ids: logical_ids.len(),
1539 touched_at: request.touched_at,
1540 })
1541}
1542
1543fn ensure_operational_collections_writable(
1544 tx: &rusqlite::Transaction<'_>,
1545 prepared: &PreparedWrite,
1546) -> Result<(), EngineError> {
1547 for collection in prepared.operational_collection_kinds.keys() {
1548 let disabled_at: Option<Option<i64>> = tx
1549 .query_row(
1550 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1551 params![collection],
1552 |row| row.get::<_, Option<i64>>(0),
1553 )
1554 .optional()?;
1555 match disabled_at {
1556 Some(Some(_)) => {
1557 return Err(EngineError::InvalidWrite(format!(
1558 "operational collection '{collection}' is disabled"
1559 )));
1560 }
1561 Some(None) => {}
1562 None => {
1563 return Err(EngineError::InvalidWrite(format!(
1564 "operational collection '{collection}' is not registered"
1565 )));
1566 }
1567 }
1568 }
1569 Ok(())
1570}
1571
1572fn validate_operational_writes_against_live_contracts(
1573 tx: &rusqlite::Transaction<'_>,
1574 prepared: &PreparedWrite,
1575) -> Result<Vec<String>, EngineError> {
1576 let mut collection_validation_contracts =
1577 HashMap::<String, Option<OperationalValidationContract>>::new();
1578 for collection in prepared.operational_collection_kinds.keys() {
1579 let validation_json: String = tx
1580 .query_row(
1581 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1582 params![collection],
1583 |row| row.get(0),
1584 )
1585 .map_err(EngineError::Sqlite)?;
1586 let validation_contract = parse_operational_validation_contract(&validation_json)
1587 .map_err(EngineError::InvalidWrite)?;
1588 collection_validation_contracts.insert(collection.clone(), validation_contract);
1589 }
1590
1591 let mut warnings = Vec::new();
1592 for write in &prepared.operational_writes {
1593 if let Some(Some(contract)) =
1594 collection_validation_contracts.get(operational_write_collection(write))
1595 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1596 {
1597 warnings.push(warning);
1598 }
1599 }
1600
1601 Ok(warnings)
1602}
1603
1604fn load_live_operational_secondary_indexes(
1605 tx: &rusqlite::Transaction<'_>,
1606 prepared: &PreparedWrite,
1607) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1608 let mut collection_indexes = HashMap::new();
1609 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1610 let secondary_indexes_json: String = tx
1611 .query_row(
1612 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1613 params![collection],
1614 |row| row.get(0),
1615 )
1616 .map_err(EngineError::Sqlite)?;
1617 let indexes =
1618 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1619 .map_err(EngineError::InvalidWrite)?;
1620 collection_indexes.insert(collection.clone(), indexes);
1621 }
1622 Ok(collection_indexes)
1623}
1624
1625fn check_operational_write_against_contract(
1626 write: &OperationalWrite,
1627 contract: &OperationalValidationContract,
1628) -> Result<Option<String>, EngineError> {
1629 if contract.mode == OperationalValidationMode::Disabled {
1630 return Ok(None);
1631 }
1632
1633 let (payload_json, collection, record_key) = match write {
1634 OperationalWrite::Append {
1635 collection,
1636 record_key,
1637 payload_json,
1638 ..
1639 }
1640 | OperationalWrite::Put {
1641 collection,
1642 record_key,
1643 payload_json,
1644 ..
1645 } => (
1646 payload_json.as_str(),
1647 collection.as_str(),
1648 record_key.as_str(),
1649 ),
1650 OperationalWrite::Delete { .. } => return Ok(None),
1651 };
1652
1653 match validate_operational_payload_against_contract(contract, payload_json) {
1654 Ok(()) => Ok(None),
1655 Err(message) => match contract.mode {
1656 OperationalValidationMode::Disabled => Ok(None),
1657 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1658 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1659 kind = operational_write_kind(write)
1660 ))),
1661 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1662 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1663 kind = operational_write_kind(write)
1664 ))),
1665 },
1666 }
1667}
1668
1669#[allow(clippy::too_many_lines)]
1670fn apply_write(
1671 conn: &mut rusqlite::Connection,
1672 prepared: &mut PreparedWrite,
1673) -> Result<WriteReceipt, EngineError> {
1674 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1675
1676 {
1679 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1680 let mut del_prop_positions = tx
1681 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1682 let mut del_staging = tx.prepare_cached(
1685 "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1686 )?;
1687 let mut sup_node = tx.prepare_cached(
1688 "UPDATE nodes SET superseded_at = unixepoch() \
1689 WHERE logical_id = ?1 AND superseded_at IS NULL",
1690 )?;
1691 let mut ins_event = tx.prepare_cached(
1692 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1693 VALUES (?1, 'node_retire', ?2, ?3)",
1694 )?;
1695 for retire in &prepared.node_retires {
1696 del_fts.execute(params![retire.logical_id])?;
1697 let kind_opt: Option<String> = tx
1699 .query_row(
1700 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1701 params![retire.logical_id],
1702 |r| r.get(0),
1703 )
1704 .optional()?;
1705 if let Some(kind) = kind_opt {
1706 let table = fathomdb_schema::fts_kind_table_name(&kind);
1707 let table_exists: bool = tx
1709 .query_row(
1710 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1711 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1712 rusqlite::params![table],
1713 |r| r.get::<_, i64>(0),
1714 )
1715 .unwrap_or(0)
1716 > 0;
1717 if table_exists {
1718 tx.execute(
1719 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1720 params![retire.logical_id],
1721 )?;
1722 }
1723 }
1724 del_prop_positions.execute(params![retire.logical_id])?;
1725 del_staging.execute(params![retire.logical_id])?;
1726 tx.execute(
1729 "DELETE FROM vector_projection_work \
1730 WHERE state = 'pending' AND chunk_id IN \
1731 (SELECT id FROM chunks WHERE node_logical_id = ?1)",
1732 params![retire.logical_id],
1733 )?;
1734 sup_node.execute(params![retire.logical_id])?;
1735 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1736 }
1737 }
1738
1739 {
1741 let mut sup_edge = tx.prepare_cached(
1742 "UPDATE edges SET superseded_at = unixepoch() \
1743 WHERE logical_id = ?1 AND superseded_at IS NULL",
1744 )?;
1745 let mut ins_event = tx.prepare_cached(
1746 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1747 VALUES (?1, 'edge_retire', ?2, ?3)",
1748 )?;
1749 for retire in &prepared.edge_retires {
1750 sup_edge.execute(params![retire.logical_id])?;
1751 ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1752 }
1753 }
1754
1755 {
1757 let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1758 let mut del_prop_positions = tx
1759 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1760 let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1761 let mut sup_node = tx.prepare_cached(
1762 "UPDATE nodes SET superseded_at = unixepoch() \
1763 WHERE logical_id = ?1 AND superseded_at IS NULL",
1764 )?;
1765 let mut ins_node = tx.prepare_cached(
1766 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1767 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1768 )?;
1769 for node in &prepared.nodes {
1770 if node.upsert {
1771 let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1774 let table_exists: bool = tx
1776 .query_row(
1777 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1778 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1779 rusqlite::params![table],
1780 |r| r.get::<_, i64>(0),
1781 )
1782 .unwrap_or(0)
1783 > 0;
1784 if table_exists {
1785 tx.execute(
1786 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1787 params![node.logical_id],
1788 )?;
1789 }
1790 del_prop_positions.execute(params![node.logical_id])?;
1791 if node.chunk_policy == ChunkPolicy::Replace {
1792 #[cfg(feature = "sqlite-vec")]
1794 {
1795 let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
1796 let del_sql = format!(
1797 "DELETE FROM {vec_table} WHERE chunk_id IN \
1798 (SELECT id FROM chunks WHERE node_logical_id = ?1)"
1799 );
1800 match tx.execute(&del_sql, params![node.logical_id]) {
1801 Ok(_) => {}
1802 Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
1803 Err(e) => return Err(e.into()),
1804 }
1805 }
1806 del_fts.execute(params![node.logical_id])?;
1807 tx.execute(
1810 "DELETE FROM vector_projection_work \
1811 WHERE state = 'pending' AND chunk_id IN \
1812 (SELECT id FROM chunks WHERE node_logical_id = ?1)",
1813 params![node.logical_id],
1814 )?;
1815 del_chunks.execute(params![node.logical_id])?;
1816 }
1817 sup_node.execute(params![node.logical_id])?;
1818 }
1819 ins_node.execute(params![
1820 node.row_id,
1821 node.logical_id,
1822 node.kind,
1823 node.properties,
1824 node.source_ref,
1825 node.content_ref,
1826 ])?;
1827 }
1828 }
1829
1830 {
1832 let mut sup_edge = tx.prepare_cached(
1833 "UPDATE edges SET superseded_at = unixepoch() \
1834 WHERE logical_id = ?1 AND superseded_at IS NULL",
1835 )?;
1836 let mut ins_edge = tx.prepare_cached(
1837 "INSERT INTO edges \
1838 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1839 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1840 )?;
1841 for edge in &prepared.edges {
1842 if edge.upsert {
1843 sup_edge.execute(params![edge.logical_id])?;
1844 }
1845 ins_edge.execute(params![
1846 edge.row_id,
1847 edge.logical_id,
1848 edge.source_logical_id,
1849 edge.target_logical_id,
1850 edge.kind,
1851 edge.properties,
1852 edge.source_ref,
1853 ])?;
1854 }
1855 }
1856
1857 {
1859 let mut ins_chunk = tx.prepare_cached(
1860 "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
1861 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1862 )?;
1863 for chunk in &prepared.chunks {
1864 ins_chunk.execute(params![
1865 chunk.id,
1866 chunk.node_logical_id,
1867 chunk.text_content,
1868 chunk.byte_start,
1869 chunk.byte_end,
1870 chunk.content_hash,
1871 ])?;
1872 }
1873 }
1874
1875 maybe_enqueue_vector_projection_work(&tx, prepared)?;
1879
1880 {
1882 let mut sup_run = tx.prepare_cached(
1883 "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1884 )?;
1885 let mut ins_run = tx.prepare_cached(
1886 "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
1887 VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1888 )?;
1889 for run in &prepared.runs {
1890 if run.upsert
1891 && let Some(ref prior_id) = run.supersedes_id
1892 {
1893 sup_run.execute(params![prior_id])?;
1894 }
1895 ins_run.execute(params![
1896 run.id,
1897 run.kind,
1898 run.status,
1899 run.properties,
1900 run.source_ref
1901 ])?;
1902 }
1903 }
1904
1905 {
1907 let mut sup_step = tx.prepare_cached(
1908 "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1909 )?;
1910 let mut ins_step = tx.prepare_cached(
1911 "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
1912 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1913 )?;
1914 for step in &prepared.steps {
1915 if step.upsert
1916 && let Some(ref prior_id) = step.supersedes_id
1917 {
1918 sup_step.execute(params![prior_id])?;
1919 }
1920 ins_step.execute(params![
1921 step.id,
1922 step.run_id,
1923 step.kind,
1924 step.status,
1925 step.properties,
1926 step.source_ref,
1927 ])?;
1928 }
1929 }
1930
1931 {
1933 let mut sup_action = tx.prepare_cached(
1934 "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1935 )?;
1936 let mut ins_action = tx.prepare_cached(
1937 "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
1938 VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1939 )?;
1940 for action in &prepared.actions {
1941 if action.upsert
1942 && let Some(ref prior_id) = action.supersedes_id
1943 {
1944 sup_action.execute(params![prior_id])?;
1945 }
1946 ins_action.execute(params![
1947 action.id,
1948 action.step_id,
1949 action.kind,
1950 action.status,
1951 action.properties,
1952 action.source_ref,
1953 ])?;
1954 }
1955 }
1956
1957 {
1959 ensure_operational_collections_writable(&tx, prepared)?;
1960 prepared.operational_validation_warnings =
1961 validate_operational_writes_against_live_contracts(&tx, prepared)?;
1962 let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
1963
1964 let mut next_mutation_order: i64 = tx.query_row(
1965 "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
1966 [],
1967 |row| row.get(0),
1968 )?;
1969 let mut ins_mutation = tx.prepare_cached(
1970 "INSERT INTO operational_mutations \
1971 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1972 VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1973 )?;
1974 let mut ins_filter_value = tx.prepare_cached(
1975 "INSERT INTO operational_filter_values \
1976 (mutation_id, collection_name, field_name, string_value, integer_value) \
1977 VALUES (?1, ?2, ?3, ?4, ?5)",
1978 )?;
1979 let mut upsert_current = tx.prepare_cached(
1980 "INSERT INTO operational_current \
1981 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1982 VALUES (?1, ?2, ?3, unixepoch(), ?4) \
1983 ON CONFLICT(collection_name, record_key) DO UPDATE SET \
1984 payload_json = excluded.payload_json, \
1985 updated_at = excluded.updated_at, \
1986 last_mutation_id = excluded.last_mutation_id",
1987 )?;
1988 let mut del_current = tx.prepare_cached(
1989 "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
1990 )?;
1991 let mut del_current_secondary_indexes = tx.prepare_cached(
1992 "DELETE FROM operational_secondary_index_entries \
1993 WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
1994 )?;
1995 let mut ins_secondary_index = tx.prepare_cached(
1996 "INSERT INTO operational_secondary_index_entries \
1997 (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
1998 slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
1999 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2000 )?;
2001 let mut current_row_stmt = tx.prepare_cached(
2002 "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2003 WHERE collection_name = ?1 AND record_key = ?2",
2004 )?;
2005
2006 for write in &prepared.operational_writes {
2007 let collection = operational_write_collection(write);
2008 let record_key = operational_write_record_key(write);
2009 let mutation_id = new_id();
2010 next_mutation_order += 1;
2011 let payload_json = operational_write_payload(write);
2012 ins_mutation.execute(params![
2013 &mutation_id,
2014 collection,
2015 record_key,
2016 operational_write_kind(write),
2017 payload_json,
2018 operational_write_source_ref(write),
2019 next_mutation_order,
2020 ])?;
2021 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2022 for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2023 ins_secondary_index.execute(params![
2024 collection,
2025 entry.index_name,
2026 "mutation",
2027 &mutation_id,
2028 record_key,
2029 entry.sort_timestamp,
2030 entry.slot1_text,
2031 entry.slot1_integer,
2032 entry.slot2_text,
2033 entry.slot2_integer,
2034 entry.slot3_text,
2035 entry.slot3_integer,
2036 ])?;
2037 }
2038 }
2039 if let Some(filter_fields) = prepared
2040 .operational_collection_filter_fields
2041 .get(collection)
2042 {
2043 for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2044 ins_filter_value.execute(params![
2045 &mutation_id,
2046 collection,
2047 filter_value.field_name,
2048 filter_value.string_value,
2049 filter_value.integer_value,
2050 ])?;
2051 }
2052 }
2053
2054 if prepared.operational_collection_kinds.get(collection)
2055 == Some(&OperationalCollectionKind::LatestState)
2056 {
2057 del_current_secondary_indexes.execute(params![collection, record_key])?;
2058 match write {
2059 OperationalWrite::Put { payload_json, .. } => {
2060 upsert_current.execute(params![
2061 collection,
2062 record_key,
2063 payload_json,
2064 &mutation_id,
2065 ])?;
2066 if let Some(indexes) = collection_secondary_indexes.get(collection) {
2067 let (current_payload_json, updated_at, last_mutation_id): (
2068 String,
2069 i64,
2070 String,
2071 ) = current_row_stmt
2072 .query_row(params![collection, record_key], |row| {
2073 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2074 })?;
2075 for entry in extract_secondary_index_entries_for_current(
2076 indexes,
2077 ¤t_payload_json,
2078 updated_at,
2079 ) {
2080 ins_secondary_index.execute(params![
2081 collection,
2082 entry.index_name,
2083 "current",
2084 last_mutation_id.as_str(),
2085 record_key,
2086 entry.sort_timestamp,
2087 entry.slot1_text,
2088 entry.slot1_integer,
2089 entry.slot2_text,
2090 entry.slot2_integer,
2091 entry.slot3_text,
2092 entry.slot3_integer,
2093 ])?;
2094 }
2095 }
2096 }
2097 OperationalWrite::Delete { .. } => {
2098 del_current.execute(params![collection, record_key])?;
2099 }
2100 OperationalWrite::Append { .. } => {}
2101 }
2102 }
2103 }
2104 }
2105
2106 {
2108 let mut ins_fts = tx.prepare_cached(
2109 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2110 VALUES (?1, ?2, ?3, ?4)",
2111 )?;
2112 for fts_row in &prepared.required_fts_rows {
2113 ins_fts.execute(params![
2114 fts_row.chunk_id,
2115 fts_row.node_logical_id,
2116 fts_row.kind,
2117 fts_row.text_content,
2118 ])?;
2119 }
2120 }
2121
2122 if !prepared.required_property_fts_rows.is_empty() {
2124 let mut ins_positions = tx.prepare_cached(
2125 "INSERT INTO fts_node_property_positions \
2126 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2127 VALUES (?1, ?2, ?3, ?4, ?5)",
2128 )?;
2129 let mut del_positions = tx
2132 .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2133 for row in &prepared.required_property_fts_rows {
2134 del_positions.execute(params![row.node_logical_id])?;
2135 let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2137 ensure_property_fts_table_for_row(&tx, &row.kind, &row.columns)?;
2138 if row.columns.is_empty() {
2139 tx.prepare(&format!(
2140 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2141 ))?
2142 .execute(params![row.node_logical_id, row.text_content])?;
2143 } else {
2144 let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2146 let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2147 .map(|i| format!("?{i}"))
2148 .collect();
2149 let sql = format!(
2150 "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2151 cols = col_names.join(", "),
2152 placeholders = placeholders.join(", "),
2153 );
2154 let mut stmt = tx.prepare(&sql)?;
2155 stmt.execute(rusqlite::params_from_iter(
2156 std::iter::once(row.node_logical_id.as_str())
2157 .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2158 ))?;
2159 }
2160 for pos in &row.positions {
2161 ins_positions.execute(params![
2162 row.node_logical_id,
2163 row.kind,
2164 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2165 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2166 pos.leaf_path,
2167 ])?;
2168 }
2169 }
2170 }
2171
2172 if !prepared.required_property_fts_rows.is_empty() {
2177 let mut ins_staging_simple = tx.prepare_cached(
2178 "INSERT INTO fts_property_rebuild_staging \
2179 (kind, node_logical_id, text_content) \
2180 VALUES (?1, ?2, ?3) \
2181 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2182 SET text_content = excluded.text_content",
2183 )?;
2184 let mut ins_staging_columns = tx.prepare_cached(
2185 "INSERT INTO fts_property_rebuild_staging \
2186 (kind, node_logical_id, text_content, columns_json) \
2187 VALUES (?1, ?2, ?3, ?4) \
2188 ON CONFLICT(kind, node_logical_id) DO UPDATE \
2189 SET text_content = excluded.text_content, \
2190 columns_json = excluded.columns_json",
2191 )?;
2192 let mut check_rebuild = tx.prepare_cached(
2193 "SELECT 1 FROM fts_property_rebuild_state \
2194 WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2195 )?;
2196 for row in &prepared.required_property_fts_rows {
2197 let in_rebuild: bool = check_rebuild
2198 .query_row(params![row.kind], |_| Ok(true))
2199 .optional()?
2200 .unwrap_or(false);
2201 if in_rebuild {
2202 if row.columns.is_empty() {
2203 ins_staging_simple.execute(params![
2204 row.kind,
2205 row.node_logical_id,
2206 row.text_content
2207 ])?;
2208 } else {
2209 let json_map: serde_json::Map<String, serde_json::Value> = row
2210 .columns
2211 .iter()
2212 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2213 .collect();
2214 let columns_json =
2215 serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2216 ins_staging_columns.execute(params![
2217 row.kind,
2218 row.node_logical_id,
2219 row.text_content,
2220 columns_json
2221 ])?;
2222 }
2223 }
2224 }
2225 }
2226
2227 #[cfg(feature = "sqlite-vec")]
2230 {
2231 let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
2233 .chunks
2234 .iter()
2235 .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
2236 .collect();
2237
2238 for vi in &prepared.vec_inserts {
2239 let kind: Option<String> = chunk_to_node
2241 .get(vi.chunk_id.as_str())
2242 .and_then(|lid| prepared.node_kinds.get(*lid))
2243 .cloned()
2244 .or_else(|| {
2245 tx.query_row(
2246 "SELECT n.kind FROM chunks c \
2247 JOIN nodes n ON n.logical_id = c.node_logical_id \
2248 WHERE c.id = ?1 LIMIT 1",
2249 rusqlite::params![vi.chunk_id],
2250 |row| row.get::<_, String>(0),
2251 )
2252 .optional()
2253 .unwrap_or(None)
2254 });
2255
2256 let Some(kind) = kind else {
2257 continue;
2259 };
2260
2261 let table_name = fathomdb_schema::vec_kind_table_name(&kind);
2262 let dimension = vi.embedding.len();
2263 let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2264 tx.execute_batch(&format!(
2266 "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
2267 chunk_id TEXT PRIMARY KEY,\
2268 embedding float[{dimension}]\
2269 )"
2270 ))?;
2271 tx.execute(
2272 &format!(
2273 "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
2274 ),
2275 rusqlite::params![vi.chunk_id, bytes],
2276 )?;
2277 }
2278 }
2279
2280 tx.commit()?;
2281
2282 let provenance_warnings: Vec<String> = prepared
2283 .nodes
2284 .iter()
2285 .filter(|node| node.source_ref.is_none())
2286 .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2287 .chain(
2288 prepared
2289 .node_retires
2290 .iter()
2291 .filter(|r| r.source_ref.is_none())
2292 .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2293 )
2294 .chain(
2295 prepared
2296 .edges
2297 .iter()
2298 .filter(|e| e.source_ref.is_none())
2299 .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2300 )
2301 .chain(
2302 prepared
2303 .edge_retires
2304 .iter()
2305 .filter(|r| r.source_ref.is_none())
2306 .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2307 )
2308 .chain(
2309 prepared
2310 .runs
2311 .iter()
2312 .filter(|r| r.source_ref.is_none())
2313 .map(|r| format!("run '{}' has no source_ref", r.id)),
2314 )
2315 .chain(
2316 prepared
2317 .steps
2318 .iter()
2319 .filter(|s| s.source_ref.is_none())
2320 .map(|s| format!("step '{}' has no source_ref", s.id)),
2321 )
2322 .chain(
2323 prepared
2324 .actions
2325 .iter()
2326 .filter(|a| a.source_ref.is_none())
2327 .map(|a| format!("action '{}' has no source_ref", a.id)),
2328 )
2329 .chain(
2330 prepared
2331 .operational_writes
2332 .iter()
2333 .filter(|write| operational_write_source_ref(write).is_none())
2334 .map(|write| {
2335 format!(
2336 "operational {} '{}:{}' has no source_ref",
2337 operational_write_kind(write),
2338 operational_write_collection(write),
2339 operational_write_record_key(write)
2340 )
2341 }),
2342 )
2343 .collect();
2344
2345 let mut warnings = provenance_warnings.clone();
2346 warnings.extend(prepared.operational_validation_warnings.clone());
2347
2348 Ok(WriteReceipt {
2349 label: prepared.label.clone(),
2350 optional_backfill_count: prepared.optional_backfills.len(),
2351 warnings,
2352 provenance_warnings,
2353 })
2354}
2355
2356fn maybe_enqueue_vector_projection_work(
2367 tx: &rusqlite::Transaction<'_>,
2368 prepared: &PreparedWrite,
2369) -> Result<(), EngineError> {
2370 if prepared.chunks.is_empty() {
2371 return Ok(());
2372 }
2373
2374 let active_profile: Option<i64> = tx
2376 .query_row(
2377 "SELECT profile_id FROM vector_embedding_profiles WHERE active = 1",
2378 [],
2379 |row| row.get::<_, i64>(0),
2380 )
2381 .optional()?;
2382 let Some(profile_id) = active_profile else {
2383 return Ok(());
2384 };
2385
2386 let mut enabled_cache: HashMap<String, bool> = HashMap::new();
2388
2389 let mut is_kind_enabled = |kind: &str| -> Result<bool, EngineError> {
2390 if let Some(v) = enabled_cache.get(kind) {
2391 return Ok(*v);
2392 }
2393 let enabled: bool = tx
2394 .query_row(
2395 "SELECT 1 FROM vector_index_schemas WHERE kind = ?1 AND enabled = 1",
2396 params![kind],
2397 |_| Ok(true),
2398 )
2399 .optional()?
2400 .unwrap_or(false);
2401 enabled_cache.insert(kind.to_owned(), enabled);
2402 Ok(enabled)
2403 };
2404
2405 let mut ins_stmt = tx.prepare(
2406 "INSERT INTO vector_projection_work \
2407 (kind, node_logical_id, chunk_id, canonical_hash, priority, \
2408 embedding_profile_id, state, created_at, updated_at) \
2409 SELECT ?1, ?2, ?3, ?4, 1000, ?5, 'pending', unixepoch(), unixepoch() \
2410 WHERE NOT EXISTS ( \
2411 SELECT 1 FROM vector_projection_work \
2412 WHERE chunk_id = ?3 AND embedding_profile_id = ?5 AND state = 'pending' \
2413 )",
2414 )?;
2415 let mut promote_stmt = tx.prepare(
2416 "UPDATE vector_projection_work \
2417 SET priority = 1000, updated_at = unixepoch() \
2418 WHERE chunk_id = ?1 AND embedding_profile_id = ?2 AND state = 'pending' \
2419 AND priority < 1000",
2420 )?;
2421
2422 for chunk in &prepared.chunks {
2423 let kind_owned: Option<String> =
2426 if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
2427 Some(k.clone())
2428 } else {
2429 tx.query_row(
2430 "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
2431 params![chunk.node_logical_id],
2432 |row| row.get::<_, String>(0),
2433 )
2434 .optional()?
2435 };
2436 let Some(kind) = kind_owned else {
2437 continue;
2438 };
2439 if !is_kind_enabled(&kind)? {
2440 continue;
2441 }
2442 let canonical_hash = crate::admin::canonical_chunk_hash(&chunk.id, &chunk.text_content);
2443 let inserted = ins_stmt.execute(params![
2444 kind,
2445 chunk.node_logical_id,
2446 chunk.id,
2447 canonical_hash,
2448 profile_id,
2449 ])?;
2450 if inserted == 0 {
2451 promote_stmt.execute(params![chunk.id, profile_id])?;
2453 }
2454 }
2455 Ok(())
2456}
2457
2458fn ensure_property_fts_table_for_row(
2459 conn: &rusqlite::Connection,
2460 kind: &str,
2461 columns: &[(String, String)],
2462) -> Result<(), rusqlite::Error> {
2463 let table = fathomdb_schema::fts_kind_table_name(kind);
2464 let exists: bool = conn
2465 .query_row(
2466 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
2467 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
2468 rusqlite::params![table],
2469 |_| Ok(true),
2470 )
2471 .optional()?
2472 .unwrap_or(false);
2473 if exists {
2474 return Ok(());
2475 }
2476
2477 let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)
2478 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
2479 let tokenizer_sql = tokenizer.replace('\'', "''");
2480 let cols: Vec<String> = if columns.is_empty() {
2481 vec![
2482 "node_logical_id UNINDEXED".to_owned(),
2483 "text_content".to_owned(),
2484 ]
2485 } else {
2486 std::iter::once("node_logical_id UNINDEXED".to_owned())
2487 .chain(columns.iter().map(|(name, _)| name.clone()))
2488 .collect()
2489 };
2490 conn.execute_batch(&format!(
2491 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
2492 cols = cols.join(", "),
2493 ))?;
2494 Ok(())
2495}
2496
2497fn operational_write_collection(write: &OperationalWrite) -> &str {
2498 match write {
2499 OperationalWrite::Append { collection, .. }
2500 | OperationalWrite::Put { collection, .. }
2501 | OperationalWrite::Delete { collection, .. } => collection,
2502 }
2503}
2504
2505fn operational_write_record_key(write: &OperationalWrite) -> &str {
2506 match write {
2507 OperationalWrite::Append { record_key, .. }
2508 | OperationalWrite::Put { record_key, .. }
2509 | OperationalWrite::Delete { record_key, .. } => record_key,
2510 }
2511}
2512
2513fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2514 match write {
2515 OperationalWrite::Append { .. } => "append",
2516 OperationalWrite::Put { .. } => "put",
2517 OperationalWrite::Delete { .. } => "delete",
2518 }
2519}
2520
2521fn operational_write_payload(write: &OperationalWrite) -> &str {
2522 match write {
2523 OperationalWrite::Append { payload_json, .. }
2524 | OperationalWrite::Put { payload_json, .. } => payload_json,
2525 OperationalWrite::Delete { .. } => "null",
2526 }
2527}
2528
2529fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2530 match write {
2531 OperationalWrite::Append { source_ref, .. }
2532 | OperationalWrite::Put { source_ref, .. }
2533 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2534 }
2535}
2536
2537#[cfg(test)]
2538#[allow(clippy::expect_used)]
2539mod tests {
2540 use std::sync::Arc;
2541
2542 use fathomdb_schema::SchemaManager;
2543 use tempfile::NamedTempFile;
2544
2545 use super::{apply_write, prepare_write, resolve_operational_writes};
2546 use crate::{
2547 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2548 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2549 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2550 projection::ProjectionTarget,
2551 };
2552
    // Smoke test: a request containing only runtime-table rows (run, step,
    // action) commits successfully and the receipt echoes the request label.
    #[test]
    fn writer_executes_runtime_table_rows() {
        let db = NamedTempFile::new().expect("temporary db");
        // WriterActor bootstraps the schema on the fresh database file itself.
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // One row per runtime table, linked run -> step -> action.
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        assert_eq!(receipt.label, "runtime");
    }
    // A Put operational write must land in both the mutation history
    // (operational_mutations) and the materialized view (operational_current),
    // and graph writes in the same request must commit alongside it.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection registry; writes to unknown collections are rejected.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                // A graph node rides in the same request as the operational Put.
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // The graph node committed in the same transaction.
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // Exactly one mutation row recorded for the Put.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // The materialized current row reflects the Put payload verbatim.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2688
    // A validation contract with mode "disabled" must be ignored entirely:
    // a payload violating the declared fields is still accepted and stored.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract declares a required enum'd "status" field, but mode=disabled.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload lacks "status" and carries an undeclared key.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // The invalid payload was stored unchanged.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2742
    // In "report_only" mode an invalid payload is still persisted, but the
    // receipt must carry a warning describing the validation failure (and no
    // provenance warnings, since a source_ref is supplied).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" violates the enum ["ok","failed"] on "status".
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the payload was persisted verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2807
    // Writes addressed to a collection that was never registered must fail
    // with InvalidWrite instead of silently creating state.
    #[test]
    fn writer_rejects_operational_write_for_missing_collection() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Note: no row is inserted into operational_collections here.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "missing-operational-collection".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "missing operational collection must return InvalidWrite"
        );
    }
2847
    // Append-only collections record every event in the mutation history but
    // must never materialize a row in operational_current.
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Collection kind 'append_only_log' drives the no-current-row behavior.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History captured the append op and its payload.
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // No materialized current row for an append-only collection.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2911
    // In "enforce" mode an invalid append must be rejected with InvalidWrite
    // and leave zero side effects behind: neither a mutation row nor derived
    // filter values may survive the rollback.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Filter fields are declared too, so we can assert they were not populated.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" violates the enum ["ok","failed"] on "status".
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // No mutation history row leaked past the rejection.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        // No derived filter values leaked either.
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2976
    // Put followed by Delete: the mutation history must retain both ops in
    // order, while the materialized current row is removed by the delete.
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First request: materialize a current row via Put.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second request: delete the same record key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History retains both mutations, ordered by mutation_order.
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current row is gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3065
    // Deletes carry no payload, so even an "enforce" validation contract must
    // not be applied to them: the delete after a valid put must succeed.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Valid put passes the enforce contract.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // Delete has no payload to validate; it must not be rejected.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3139
    // Secondary index entries for a latest_state collection must track the
    // current row's lifecycle: a put with both indexed fields creates one
    // entry per index definition (2 here), and a delete removes them all.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Two index definitions: a single-field index and a composite index.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload supplies values for both index definitions.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // One 'current' entry per index definition after the put.
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // Delete clears every 'current' index entry for the record.
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3228
    // A single batch carrying put -> delete -> put for one record key must
    // persist strictly increasing mutation_order values (1, 2, 3), and the
    // final put must win in operational_current.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Three mutations of the same record key in one batch.
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // History preserves submission order via mutation_order.
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The last put in the batch is what operational_current reflects.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3317
3318 #[test]
3319 fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3320 let db = NamedTempFile::new().expect("temporary db");
3321 let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3322 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3323 conn.execute(
3324 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3325 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3326 [],
3327 )
3328 .expect("seed collection");
3329
3330 let request = WriteRequest {
3331 label: "disabled-race".to_owned(),
3332 nodes: vec![],
3333 node_retires: vec![],
3334 edges: vec![],
3335 edge_retires: vec![],
3336 chunks: vec![],
3337 runs: vec![],
3338 steps: vec![],
3339 actions: vec![],
3340 optional_backfills: vec![],
3341 vec_inserts: vec![],
3342 operational_writes: vec![OperationalWrite::Put {
3343 collection: "connector_health".to_owned(),
3344 record_key: "gmail".to_owned(),
3345 payload_json: r#"{"status":"ok"}"#.to_owned(),
3346 source_ref: Some("src-1".to_owned()),
3347 }],
3348 };
3349 let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3350 resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3351
3352 conn.execute(
3353 "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3354 [],
3355 )
3356 .expect("disable collection after preflight");
3357
3358 let error =
3359 apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3360 assert!(matches!(error, EngineError::InvalidWrite(_)));
3361 assert!(error.to_string().contains("is disabled"));
3362
3363 let mutation_count: i64 = conn
3364 .query_row(
3365 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3366 [],
3367 |row| row.get(0),
3368 )
3369 .expect("mutation count");
3370 assert_eq!(mutation_count, 0);
3371
3372 let current_count: i64 = conn
3373 .query_row(
3374 "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3375 [],
3376 |row| row.get(0),
3377 )
3378 .expect("current count");
3379 assert_eq!(current_count, 0);
3380 }
3381
3382 #[test]
3383 fn writer_enforce_validation_rejects_invalid_put_atomically() {
3384 let db = NamedTempFile::new().expect("temporary db");
3385 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3386 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3387 conn.execute(
3388 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3389 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3390 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3391 )
3392 .expect("seed collection");
3393 drop(conn);
3394 let writer = WriterActor::start(
3395 db.path(),
3396 Arc::new(SchemaManager::new()),
3397 ProvenanceMode::Warn,
3398 Arc::new(TelemetryCounters::default()),
3399 )
3400 .expect("writer");
3401
3402 let error = writer
3403 .submit(WriteRequest {
3404 label: "invalid-put".to_owned(),
3405 nodes: vec![NodeInsert {
3406 row_id: "row-1".to_owned(),
3407 logical_id: "lg-1".to_owned(),
3408 kind: "Meeting".to_owned(),
3409 properties: "{}".to_owned(),
3410 source_ref: Some("src-1".to_owned()),
3411 upsert: false,
3412 chunk_policy: ChunkPolicy::Preserve,
3413 content_ref: None,
3414 }],
3415 node_retires: vec![],
3416 edges: vec![],
3417 edge_retires: vec![],
3418 chunks: vec![],
3419 runs: vec![],
3420 steps: vec![],
3421 actions: vec![],
3422 optional_backfills: vec![],
3423 vec_inserts: vec![],
3424 operational_writes: vec![OperationalWrite::Put {
3425 collection: "connector_health".to_owned(),
3426 record_key: "gmail".to_owned(),
3427 payload_json: r#"{"status":"bogus"}"#.to_owned(),
3428 source_ref: Some("src-1".to_owned()),
3429 }],
3430 })
3431 .expect_err("invalid put must reject");
3432 assert!(matches!(error, EngineError::InvalidWrite(_)));
3433 assert!(error.to_string().contains("must be one of"));
3434
3435 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3436 let node_count: i64 = conn
3437 .query_row(
3438 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3439 [],
3440 |row| row.get(0),
3441 )
3442 .expect("node count");
3443 assert_eq!(node_count, 0);
3444 let mutation_count: i64 = conn
3445 .query_row(
3446 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3447 [],
3448 |row| row.get(0),
3449 )
3450 .expect("mutation count");
3451 assert_eq!(mutation_count, 0);
3452 let current_count: i64 = conn
3453 .query_row(
3454 "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3455 [],
3456 |row| row.get(0),
3457 )
3458 .expect("current count");
3459 assert_eq!(current_count, 0);
3460 }
3461
3462 #[test]
3463 fn writer_rejects_append_against_latest_state_collection() {
3464 let db = NamedTempFile::new().expect("temporary db");
3465 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3466 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3467 conn.execute(
3468 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3469 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3470 [],
3471 )
3472 .expect("seed collection");
3473 drop(conn);
3474 let writer = WriterActor::start(
3475 db.path(),
3476 Arc::new(SchemaManager::new()),
3477 ProvenanceMode::Warn,
3478 Arc::new(TelemetryCounters::default()),
3479 )
3480 .expect("writer");
3481
3482 let result = writer.submit(WriteRequest {
3483 label: "bad-append".to_owned(),
3484 nodes: vec![],
3485 node_retires: vec![],
3486 edges: vec![],
3487 edge_retires: vec![],
3488 chunks: vec![],
3489 runs: vec![],
3490 steps: vec![],
3491 actions: vec![],
3492 optional_backfills: vec![],
3493 vec_inserts: vec![],
3494 operational_writes: vec![OperationalWrite::Append {
3495 collection: "connector_health".to_owned(),
3496 record_key: "gmail".to_owned(),
3497 payload_json: r#"{"status":"ok"}"#.to_owned(),
3498 source_ref: Some("src-1".to_owned()),
3499 }],
3500 });
3501
3502 assert!(
3503 matches!(result, Err(EngineError::InvalidWrite(_))),
3504 "latest_state collection must reject Append"
3505 );
3506 }
3507
3508 #[test]
3509 fn writer_upsert_supersedes_prior_active_node() {
3510 let db = NamedTempFile::new().expect("temporary db");
3511 let writer = WriterActor::start(
3512 db.path(),
3513 Arc::new(SchemaManager::new()),
3514 ProvenanceMode::Warn,
3515 Arc::new(TelemetryCounters::default()),
3516 )
3517 .expect("writer");
3518
3519 writer
3520 .submit(WriteRequest {
3521 label: "v1".to_owned(),
3522 nodes: vec![NodeInsert {
3523 row_id: "row-1".to_owned(),
3524 logical_id: "lg-1".to_owned(),
3525 kind: "Meeting".to_owned(),
3526 properties: r#"{"version":1}"#.to_owned(),
3527 source_ref: Some("src-1".to_owned()),
3528 upsert: false,
3529 chunk_policy: ChunkPolicy::Preserve,
3530 content_ref: None,
3531 }],
3532 node_retires: vec![],
3533 edges: vec![],
3534 edge_retires: vec![],
3535 chunks: vec![],
3536 runs: vec![],
3537 steps: vec![],
3538 actions: vec![],
3539 optional_backfills: vec![],
3540 vec_inserts: vec![],
3541 operational_writes: vec![],
3542 })
3543 .expect("v1 write");
3544
3545 writer
3546 .submit(WriteRequest {
3547 label: "v2".to_owned(),
3548 nodes: vec![NodeInsert {
3549 row_id: "row-2".to_owned(),
3550 logical_id: "lg-1".to_owned(),
3551 kind: "Meeting".to_owned(),
3552 properties: r#"{"version":2}"#.to_owned(),
3553 source_ref: Some("src-2".to_owned()),
3554 upsert: true,
3555 chunk_policy: ChunkPolicy::Preserve,
3556 content_ref: None,
3557 }],
3558 node_retires: vec![],
3559 edges: vec![],
3560 edge_retires: vec![],
3561 chunks: vec![],
3562 runs: vec![],
3563 steps: vec![],
3564 actions: vec![],
3565 optional_backfills: vec![],
3566 vec_inserts: vec![],
3567 operational_writes: vec![],
3568 })
3569 .expect("v2 upsert write");
3570
3571 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3572 let (active_row_id, props): (String, String) = conn
3573 .query_row(
3574 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3575 [],
3576 |row| Ok((row.get(0)?, row.get(1)?)),
3577 )
3578 .expect("active row");
3579 assert_eq!(active_row_id, "row-2");
3580 assert!(props.contains("\"version\":2"));
3581
3582 let superseded: i64 = conn
3583 .query_row(
3584 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3585 [],
3586 |row| row.get(0),
3587 )
3588 .expect("superseded count");
3589 assert_eq!(superseded, 1);
3590 }
3591
3592 #[test]
3593 fn writer_inserts_edge_between_two_nodes() {
3594 let db = NamedTempFile::new().expect("temporary db");
3595 let writer = WriterActor::start(
3596 db.path(),
3597 Arc::new(SchemaManager::new()),
3598 ProvenanceMode::Warn,
3599 Arc::new(TelemetryCounters::default()),
3600 )
3601 .expect("writer");
3602
3603 writer
3604 .submit(WriteRequest {
3605 label: "nodes-and-edge".to_owned(),
3606 nodes: vec![
3607 NodeInsert {
3608 row_id: "row-meeting".to_owned(),
3609 logical_id: "meeting-1".to_owned(),
3610 kind: "Meeting".to_owned(),
3611 properties: "{}".to_owned(),
3612 source_ref: Some("src-1".to_owned()),
3613 upsert: false,
3614 chunk_policy: ChunkPolicy::Preserve,
3615 content_ref: None,
3616 },
3617 NodeInsert {
3618 row_id: "row-task".to_owned(),
3619 logical_id: "task-1".to_owned(),
3620 kind: "Task".to_owned(),
3621 properties: "{}".to_owned(),
3622 source_ref: Some("src-1".to_owned()),
3623 upsert: false,
3624 chunk_policy: ChunkPolicy::Preserve,
3625 content_ref: None,
3626 },
3627 ],
3628 node_retires: vec![],
3629 edges: vec![EdgeInsert {
3630 row_id: "edge-1".to_owned(),
3631 logical_id: "edge-lg-1".to_owned(),
3632 source_logical_id: "meeting-1".to_owned(),
3633 target_logical_id: "task-1".to_owned(),
3634 kind: "HAS_TASK".to_owned(),
3635 properties: "{}".to_owned(),
3636 source_ref: Some("src-1".to_owned()),
3637 upsert: false,
3638 }],
3639 edge_retires: vec![],
3640 chunks: vec![],
3641 runs: vec![],
3642 steps: vec![],
3643 actions: vec![],
3644 optional_backfills: vec![],
3645 vec_inserts: vec![],
3646 operational_writes: vec![],
3647 })
3648 .expect("write receipt");
3649
3650 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3651 let (src, tgt, kind): (String, String, String) = conn
3652 .query_row(
3653 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3654 [],
3655 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3656 )
3657 .expect("edge row");
3658 assert_eq!(src, "meeting-1");
3659 assert_eq!(tgt, "task-1");
3660 assert_eq!(kind, "HAS_TASK");
3661 }
3662
3663 #[test]
3664 #[allow(clippy::too_many_lines)]
3665 fn writer_upsert_supersedes_prior_active_edge() {
3666 let db = NamedTempFile::new().expect("temporary db");
3667 let writer = WriterActor::start(
3668 db.path(),
3669 Arc::new(SchemaManager::new()),
3670 ProvenanceMode::Warn,
3671 Arc::new(TelemetryCounters::default()),
3672 )
3673 .expect("writer");
3674
3675 writer
3677 .submit(WriteRequest {
3678 label: "nodes".to_owned(),
3679 nodes: vec![
3680 NodeInsert {
3681 row_id: "row-a".to_owned(),
3682 logical_id: "node-a".to_owned(),
3683 kind: "Meeting".to_owned(),
3684 properties: "{}".to_owned(),
3685 source_ref: Some("src-1".to_owned()),
3686 upsert: false,
3687 chunk_policy: ChunkPolicy::Preserve,
3688 content_ref: None,
3689 },
3690 NodeInsert {
3691 row_id: "row-b".to_owned(),
3692 logical_id: "node-b".to_owned(),
3693 kind: "Task".to_owned(),
3694 properties: "{}".to_owned(),
3695 source_ref: Some("src-1".to_owned()),
3696 upsert: false,
3697 chunk_policy: ChunkPolicy::Preserve,
3698 content_ref: None,
3699 },
3700 ],
3701 node_retires: vec![],
3702 edges: vec![],
3703 edge_retires: vec![],
3704 chunks: vec![],
3705 runs: vec![],
3706 steps: vec![],
3707 actions: vec![],
3708 optional_backfills: vec![],
3709 vec_inserts: vec![],
3710 operational_writes: vec![],
3711 })
3712 .expect("nodes write");
3713
3714 writer
3716 .submit(WriteRequest {
3717 label: "edge-v1".to_owned(),
3718 nodes: vec![],
3719 node_retires: vec![],
3720 edges: vec![EdgeInsert {
3721 row_id: "edge-row-1".to_owned(),
3722 logical_id: "edge-lg-1".to_owned(),
3723 source_logical_id: "node-a".to_owned(),
3724 target_logical_id: "node-b".to_owned(),
3725 kind: "HAS_TASK".to_owned(),
3726 properties: r#"{"weight":1}"#.to_owned(),
3727 source_ref: Some("src-1".to_owned()),
3728 upsert: false,
3729 }],
3730 edge_retires: vec![],
3731 chunks: vec![],
3732 runs: vec![],
3733 steps: vec![],
3734 actions: vec![],
3735 optional_backfills: vec![],
3736 vec_inserts: vec![],
3737 operational_writes: vec![],
3738 })
3739 .expect("edge v1 write");
3740
3741 writer
3743 .submit(WriteRequest {
3744 label: "edge-v2".to_owned(),
3745 nodes: vec![],
3746 node_retires: vec![],
3747 edges: vec![EdgeInsert {
3748 row_id: "edge-row-2".to_owned(),
3749 logical_id: "edge-lg-1".to_owned(),
3750 source_logical_id: "node-a".to_owned(),
3751 target_logical_id: "node-b".to_owned(),
3752 kind: "HAS_TASK".to_owned(),
3753 properties: r#"{"weight":2}"#.to_owned(),
3754 source_ref: Some("src-2".to_owned()),
3755 upsert: true,
3756 }],
3757 edge_retires: vec![],
3758 chunks: vec![],
3759 runs: vec![],
3760 steps: vec![],
3761 actions: vec![],
3762 optional_backfills: vec![],
3763 vec_inserts: vec![],
3764 operational_writes: vec![],
3765 })
3766 .expect("edge v2 upsert");
3767
3768 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3769 let (active_row_id, props): (String, String) = conn
3770 .query_row(
3771 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3772 [],
3773 |row| Ok((row.get(0)?, row.get(1)?)),
3774 )
3775 .expect("active edge");
3776 assert_eq!(active_row_id, "edge-row-2");
3777 assert!(props.contains("\"weight\":2"));
3778
3779 let superseded: i64 = conn
3780 .query_row(
3781 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3782 [],
3783 |row| row.get(0),
3784 )
3785 .expect("superseded count");
3786 assert_eq!(superseded, 1);
3787 }
3788
3789 #[test]
3790 fn writer_fts_rows_are_written_to_database() {
3791 let db = NamedTempFile::new().expect("temporary db");
3792 let writer = WriterActor::start(
3793 db.path(),
3794 Arc::new(SchemaManager::new()),
3795 ProvenanceMode::Warn,
3796 Arc::new(TelemetryCounters::default()),
3797 )
3798 .expect("writer");
3799
3800 writer
3801 .submit(WriteRequest {
3802 label: "seed".to_owned(),
3803 nodes: vec![NodeInsert {
3804 row_id: "row-1".to_owned(),
3805 logical_id: "logical-1".to_owned(),
3806 kind: "Meeting".to_owned(),
3807 properties: "{}".to_owned(),
3808 source_ref: Some("src-1".to_owned()),
3809 upsert: false,
3810 chunk_policy: ChunkPolicy::Preserve,
3811 content_ref: None,
3812 }],
3813 node_retires: vec![],
3814 edges: vec![],
3815 edge_retires: vec![],
3816 chunks: vec![ChunkInsert {
3817 id: "chunk-1".to_owned(),
3818 node_logical_id: "logical-1".to_owned(),
3819 text_content: "budget discussion".to_owned(),
3820 byte_start: None,
3821 byte_end: None,
3822 content_hash: None,
3823 }],
3824 runs: vec![],
3825 steps: vec![],
3826 actions: vec![],
3827 optional_backfills: vec![],
3828 vec_inserts: vec![],
3829 operational_writes: vec![],
3830 })
3831 .expect("write receipt");
3832
3833 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3834 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3835 conn.query_row(
3836 "SELECT chunk_id, node_logical_id, kind, text_content \
3837 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3838 [],
3839 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3840 )
3841 .expect("fts row");
3842 assert_eq!(chunk_id, "chunk-1");
3843 assert_eq!(node_logical_id, "logical-1");
3844 assert_eq!(kind, "Meeting");
3845 assert_eq!(text_content, "budget discussion");
3846 }
3847
3848 #[test]
3849 fn writer_receipt_warns_on_nodes_without_source_ref() {
3850 let db = NamedTempFile::new().expect("temporary db");
3851 let writer = WriterActor::start(
3852 db.path(),
3853 Arc::new(SchemaManager::new()),
3854 ProvenanceMode::Warn,
3855 Arc::new(TelemetryCounters::default()),
3856 )
3857 .expect("writer");
3858
3859 let receipt = writer
3860 .submit(WriteRequest {
3861 label: "no-source".to_owned(),
3862 nodes: vec![NodeInsert {
3863 row_id: "row-1".to_owned(),
3864 logical_id: "logical-1".to_owned(),
3865 kind: "Meeting".to_owned(),
3866 properties: "{}".to_owned(),
3867 source_ref: None,
3868 upsert: false,
3869 chunk_policy: ChunkPolicy::Preserve,
3870 content_ref: None,
3871 }],
3872 node_retires: vec![],
3873 edges: vec![],
3874 edge_retires: vec![],
3875 chunks: vec![],
3876 runs: vec![],
3877 steps: vec![],
3878 actions: vec![],
3879 optional_backfills: vec![],
3880 vec_inserts: vec![],
3881 operational_writes: vec![],
3882 })
3883 .expect("write receipt");
3884
3885 assert_eq!(receipt.provenance_warnings.len(), 1);
3886 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3887 }
3888
3889 #[test]
3890 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3891 let db = NamedTempFile::new().expect("temporary db");
3892 let writer = WriterActor::start(
3893 db.path(),
3894 Arc::new(SchemaManager::new()),
3895 ProvenanceMode::Warn,
3896 Arc::new(TelemetryCounters::default()),
3897 )
3898 .expect("writer");
3899
3900 let receipt = writer
3901 .submit(WriteRequest {
3902 label: "with-source".to_owned(),
3903 nodes: vec![NodeInsert {
3904 row_id: "row-1".to_owned(),
3905 logical_id: "logical-1".to_owned(),
3906 kind: "Meeting".to_owned(),
3907 properties: "{}".to_owned(),
3908 source_ref: Some("src-1".to_owned()),
3909 upsert: false,
3910 chunk_policy: ChunkPolicy::Preserve,
3911 content_ref: None,
3912 }],
3913 node_retires: vec![],
3914 edges: vec![],
3915 edge_retires: vec![],
3916 chunks: vec![],
3917 runs: vec![],
3918 steps: vec![],
3919 actions: vec![],
3920 optional_backfills: vec![],
3921 vec_inserts: vec![],
3922 operational_writes: vec![],
3923 })
3924 .expect("write receipt");
3925
3926 assert!(receipt.provenance_warnings.is_empty());
3927 }
3928
3929 #[test]
3930 fn writer_accepts_chunk_for_pre_existing_node() {
3931 let db = NamedTempFile::new().expect("temporary db");
3932 let writer = WriterActor::start(
3933 db.path(),
3934 Arc::new(SchemaManager::new()),
3935 ProvenanceMode::Warn,
3936 Arc::new(TelemetryCounters::default()),
3937 )
3938 .expect("writer");
3939
3940 writer
3942 .submit(WriteRequest {
3943 label: "r1".to_owned(),
3944 nodes: vec![NodeInsert {
3945 row_id: "row-1".to_owned(),
3946 logical_id: "logical-1".to_owned(),
3947 kind: "Meeting".to_owned(),
3948 properties: "{}".to_owned(),
3949 source_ref: Some("src-1".to_owned()),
3950 upsert: false,
3951 chunk_policy: ChunkPolicy::Preserve,
3952 content_ref: None,
3953 }],
3954 node_retires: vec![],
3955 edges: vec![],
3956 edge_retires: vec![],
3957 chunks: vec![],
3958 runs: vec![],
3959 steps: vec![],
3960 actions: vec![],
3961 optional_backfills: vec![],
3962 vec_inserts: vec![],
3963 operational_writes: vec![],
3964 })
3965 .expect("r1 write");
3966
3967 writer
3969 .submit(WriteRequest {
3970 label: "r2".to_owned(),
3971 nodes: vec![],
3972 node_retires: vec![],
3973 edges: vec![],
3974 edge_retires: vec![],
3975 chunks: vec![ChunkInsert {
3976 id: "chunk-1".to_owned(),
3977 node_logical_id: "logical-1".to_owned(),
3978 text_content: "budget discussion".to_owned(),
3979 byte_start: None,
3980 byte_end: None,
3981 content_hash: None,
3982 }],
3983 runs: vec![],
3984 steps: vec![],
3985 actions: vec![],
3986 optional_backfills: vec![],
3987 vec_inserts: vec![],
3988 operational_writes: vec![],
3989 })
3990 .expect("r2 write — chunk for pre-existing node");
3991
3992 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3993 let count: i64 = conn
3994 .query_row(
3995 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3996 [],
3997 |row| row.get(0),
3998 )
3999 .expect("fts count");
4000 assert_eq!(
4001 count, 1,
4002 "FTS row must exist for chunk attached to pre-existing node"
4003 );
4004 }
4005
4006 #[test]
4007 fn writer_rejects_chunk_for_completely_unknown_node() {
4008 let db = NamedTempFile::new().expect("temporary db");
4009 let writer = WriterActor::start(
4010 db.path(),
4011 Arc::new(SchemaManager::new()),
4012 ProvenanceMode::Warn,
4013 Arc::new(TelemetryCounters::default()),
4014 )
4015 .expect("writer");
4016
4017 let result = writer.submit(WriteRequest {
4018 label: "bad".to_owned(),
4019 nodes: vec![],
4020 node_retires: vec![],
4021 edges: vec![],
4022 edge_retires: vec![],
4023 chunks: vec![ChunkInsert {
4024 id: "chunk-1".to_owned(),
4025 node_logical_id: "nonexistent".to_owned(),
4026 text_content: "some text".to_owned(),
4027 byte_start: None,
4028 byte_end: None,
4029 content_hash: None,
4030 }],
4031 runs: vec![],
4032 steps: vec![],
4033 actions: vec![],
4034 optional_backfills: vec![],
4035 vec_inserts: vec![],
4036 operational_writes: vec![],
4037 });
4038
4039 assert!(
4040 matches!(result, Err(EngineError::InvalidWrite(_))),
4041 "completely unknown node must return InvalidWrite"
4042 );
4043 }
4044
4045 #[test]
4046 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
4047 let db = NamedTempFile::new().expect("temporary db");
4048 let writer = WriterActor::start(
4049 db.path(),
4050 Arc::new(SchemaManager::new()),
4051 ProvenanceMode::Warn,
4052 Arc::new(TelemetryCounters::default()),
4053 )
4054 .expect("writer");
4055
4056 let receipt = writer
4057 .submit(WriteRequest {
4058 label: "seed".to_owned(),
4059 nodes: vec![NodeInsert {
4060 row_id: "row-1".to_owned(),
4061 logical_id: "logical-1".to_owned(),
4062 kind: "Meeting".to_owned(),
4063 properties: "{}".to_owned(),
4064 source_ref: None,
4065 upsert: false,
4066 chunk_policy: ChunkPolicy::Preserve,
4067 content_ref: None,
4068 }],
4069 node_retires: vec![],
4070 edges: vec![],
4071 edge_retires: vec![],
4072 chunks: vec![ChunkInsert {
4073 id: "chunk-1".to_owned(),
4074 node_logical_id: "logical-1".to_owned(),
4075 text_content: "budget discussion".to_owned(),
4076 byte_start: None,
4077 byte_end: None,
4078 content_hash: None,
4079 }],
4080 runs: vec![],
4081 steps: vec![],
4082 actions: vec![],
4083 optional_backfills: vec![],
4084 vec_inserts: vec![],
4085 operational_writes: vec![],
4086 })
4087 .expect("write receipt");
4088
4089 assert_eq!(receipt.label, "seed");
4090 }
4091
4092 #[test]
4093 fn writer_node_retire_supersedes_active_node() {
4094 let db = NamedTempFile::new().expect("temporary db");
4095 let writer = WriterActor::start(
4096 db.path(),
4097 Arc::new(SchemaManager::new()),
4098 ProvenanceMode::Warn,
4099 Arc::new(TelemetryCounters::default()),
4100 )
4101 .expect("writer");
4102
4103 writer
4104 .submit(WriteRequest {
4105 label: "seed".to_owned(),
4106 nodes: vec![NodeInsert {
4107 row_id: "row-1".to_owned(),
4108 logical_id: "meeting-1".to_owned(),
4109 kind: "Meeting".to_owned(),
4110 properties: "{}".to_owned(),
4111 source_ref: Some("src-1".to_owned()),
4112 upsert: false,
4113 chunk_policy: ChunkPolicy::Preserve,
4114 content_ref: None,
4115 }],
4116 node_retires: vec![],
4117 edges: vec![],
4118 edge_retires: vec![],
4119 chunks: vec![],
4120 runs: vec![],
4121 steps: vec![],
4122 actions: vec![],
4123 optional_backfills: vec![],
4124 vec_inserts: vec![],
4125 operational_writes: vec![],
4126 })
4127 .expect("seed write");
4128
4129 writer
4130 .submit(WriteRequest {
4131 label: "retire".to_owned(),
4132 nodes: vec![],
4133 node_retires: vec![NodeRetire {
4134 logical_id: "meeting-1".to_owned(),
4135 source_ref: Some("src-2".to_owned()),
4136 }],
4137 edges: vec![],
4138 edge_retires: vec![],
4139 chunks: vec![],
4140 runs: vec![],
4141 steps: vec![],
4142 actions: vec![],
4143 optional_backfills: vec![],
4144 vec_inserts: vec![],
4145 operational_writes: vec![],
4146 })
4147 .expect("retire write");
4148
4149 let conn = rusqlite::Connection::open(db.path()).expect("open");
4150 let active: i64 = conn
4151 .query_row(
4152 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
4153 [],
4154 |r| r.get(0),
4155 )
4156 .expect("count active");
4157 let historical: i64 = conn
4158 .query_row(
4159 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
4160 [],
4161 |r| r.get(0),
4162 )
4163 .expect("count historical");
4164
4165 assert_eq!(active, 0, "active count must be 0 after retire");
4166 assert_eq!(historical, 1, "historical count must be 1 after retire");
4167 }
4168
4169 #[test]
4170 fn writer_node_retire_preserves_chunks_and_clears_fts() {
4171 let db = NamedTempFile::new().expect("temporary db");
4172 let writer = WriterActor::start(
4173 db.path(),
4174 Arc::new(SchemaManager::new()),
4175 ProvenanceMode::Warn,
4176 Arc::new(TelemetryCounters::default()),
4177 )
4178 .expect("writer");
4179
4180 writer
4181 .submit(WriteRequest {
4182 label: "seed".to_owned(),
4183 nodes: vec![NodeInsert {
4184 row_id: "row-1".to_owned(),
4185 logical_id: "meeting-1".to_owned(),
4186 kind: "Meeting".to_owned(),
4187 properties: "{}".to_owned(),
4188 source_ref: Some("src-1".to_owned()),
4189 upsert: false,
4190 chunk_policy: ChunkPolicy::Preserve,
4191 content_ref: None,
4192 }],
4193 node_retires: vec![],
4194 edges: vec![],
4195 edge_retires: vec![],
4196 chunks: vec![ChunkInsert {
4197 id: "chunk-1".to_owned(),
4198 node_logical_id: "meeting-1".to_owned(),
4199 text_content: "budget discussion".to_owned(),
4200 byte_start: None,
4201 byte_end: None,
4202 content_hash: None,
4203 }],
4204 runs: vec![],
4205 steps: vec![],
4206 actions: vec![],
4207 optional_backfills: vec![],
4208 vec_inserts: vec![],
4209 operational_writes: vec![],
4210 })
4211 .expect("seed write");
4212
4213 writer
4214 .submit(WriteRequest {
4215 label: "retire".to_owned(),
4216 nodes: vec![],
4217 node_retires: vec![NodeRetire {
4218 logical_id: "meeting-1".to_owned(),
4219 source_ref: Some("src-2".to_owned()),
4220 }],
4221 edges: vec![],
4222 edge_retires: vec![],
4223 chunks: vec![],
4224 runs: vec![],
4225 steps: vec![],
4226 actions: vec![],
4227 optional_backfills: vec![],
4228 vec_inserts: vec![],
4229 operational_writes: vec![],
4230 })
4231 .expect("retire write");
4232
4233 let conn = rusqlite::Connection::open(db.path()).expect("open");
4234 let chunk_count: i64 = conn
4235 .query_row(
4236 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4237 [],
4238 |r| r.get(0),
4239 )
4240 .expect("chunk count");
4241 let fts_count: i64 = conn
4242 .query_row(
4243 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4244 [],
4245 |r| r.get(0),
4246 )
4247 .expect("fts count");
4248
4249 assert_eq!(
4250 chunk_count, 1,
4251 "chunks must remain after node retire so restore can re-establish content"
4252 );
4253 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4254 }
4255
4256 #[test]
4257 fn writer_edge_retire_supersedes_active_edge() {
4258 let db = NamedTempFile::new().expect("temporary db");
4259 let writer = WriterActor::start(
4260 db.path(),
4261 Arc::new(SchemaManager::new()),
4262 ProvenanceMode::Warn,
4263 Arc::new(TelemetryCounters::default()),
4264 )
4265 .expect("writer");
4266
4267 writer
4268 .submit(WriteRequest {
4269 label: "seed".to_owned(),
4270 nodes: vec![
4271 NodeInsert {
4272 row_id: "row-a".to_owned(),
4273 logical_id: "node-a".to_owned(),
4274 kind: "Meeting".to_owned(),
4275 properties: "{}".to_owned(),
4276 source_ref: Some("src-1".to_owned()),
4277 upsert: false,
4278 chunk_policy: ChunkPolicy::Preserve,
4279 content_ref: None,
4280 },
4281 NodeInsert {
4282 row_id: "row-b".to_owned(),
4283 logical_id: "node-b".to_owned(),
4284 kind: "Task".to_owned(),
4285 properties: "{}".to_owned(),
4286 source_ref: Some("src-1".to_owned()),
4287 upsert: false,
4288 chunk_policy: ChunkPolicy::Preserve,
4289 content_ref: None,
4290 },
4291 ],
4292 node_retires: vec![],
4293 edges: vec![EdgeInsert {
4294 row_id: "edge-1".to_owned(),
4295 logical_id: "edge-lg-1".to_owned(),
4296 source_logical_id: "node-a".to_owned(),
4297 target_logical_id: "node-b".to_owned(),
4298 kind: "HAS_TASK".to_owned(),
4299 properties: "{}".to_owned(),
4300 source_ref: Some("src-1".to_owned()),
4301 upsert: false,
4302 }],
4303 edge_retires: vec![],
4304 chunks: vec![],
4305 runs: vec![],
4306 steps: vec![],
4307 actions: vec![],
4308 optional_backfills: vec![],
4309 vec_inserts: vec![],
4310 operational_writes: vec![],
4311 })
4312 .expect("seed write");
4313
4314 writer
4315 .submit(WriteRequest {
4316 label: "retire-edge".to_owned(),
4317 nodes: vec![],
4318 node_retires: vec![],
4319 edges: vec![],
4320 edge_retires: vec![EdgeRetire {
4321 logical_id: "edge-lg-1".to_owned(),
4322 source_ref: Some("src-2".to_owned()),
4323 }],
4324 chunks: vec![],
4325 runs: vec![],
4326 steps: vec![],
4327 actions: vec![],
4328 optional_backfills: vec![],
4329 vec_inserts: vec![],
4330 operational_writes: vec![],
4331 })
4332 .expect("retire edge write");
4333
4334 let conn = rusqlite::Connection::open(db.path()).expect("open");
4335 let active: i64 = conn
4336 .query_row(
4337 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4338 [],
4339 |r| r.get(0),
4340 )
4341 .expect("active edge count");
4342
4343 assert_eq!(active, 0, "active edge count must be 0 after retire");
4344 }
4345
4346 #[test]
4347 fn writer_retire_without_source_ref_emits_provenance_warning() {
4348 let db = NamedTempFile::new().expect("temporary db");
4349 let writer = WriterActor::start(
4350 db.path(),
4351 Arc::new(SchemaManager::new()),
4352 ProvenanceMode::Warn,
4353 Arc::new(TelemetryCounters::default()),
4354 )
4355 .expect("writer");
4356
4357 writer
4358 .submit(WriteRequest {
4359 label: "seed".to_owned(),
4360 nodes: vec![NodeInsert {
4361 row_id: "row-1".to_owned(),
4362 logical_id: "meeting-1".to_owned(),
4363 kind: "Meeting".to_owned(),
4364 properties: "{}".to_owned(),
4365 source_ref: Some("src-1".to_owned()),
4366 upsert: false,
4367 chunk_policy: ChunkPolicy::Preserve,
4368 content_ref: None,
4369 }],
4370 node_retires: vec![],
4371 edges: vec![],
4372 edge_retires: vec![],
4373 chunks: vec![],
4374 runs: vec![],
4375 steps: vec![],
4376 actions: vec![],
4377 optional_backfills: vec![],
4378 vec_inserts: vec![],
4379 operational_writes: vec![],
4380 })
4381 .expect("seed write");
4382
4383 let receipt = writer
4384 .submit(WriteRequest {
4385 label: "retire-no-src".to_owned(),
4386 nodes: vec![],
4387 node_retires: vec![NodeRetire {
4388 logical_id: "meeting-1".to_owned(),
4389 source_ref: None,
4390 }],
4391 edges: vec![],
4392 edge_retires: vec![],
4393 chunks: vec![],
4394 runs: vec![],
4395 steps: vec![],
4396 actions: vec![],
4397 optional_backfills: vec![],
4398 vec_inserts: vec![],
4399 operational_writes: vec![],
4400 })
4401 .expect("retire write");
4402
4403 assert!(
4404 !receipt.provenance_warnings.is_empty(),
4405 "retire without source_ref must emit a provenance warning"
4406 );
4407 }
4408
4409 #[test]
4410 #[allow(clippy::too_many_lines)]
4411 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4412 let db = NamedTempFile::new().expect("temporary db");
4413 let writer = WriterActor::start(
4414 db.path(),
4415 Arc::new(SchemaManager::new()),
4416 ProvenanceMode::Warn,
4417 Arc::new(TelemetryCounters::default()),
4418 )
4419 .expect("writer");
4420
4421 writer
4422 .submit(WriteRequest {
4423 label: "v1".to_owned(),
4424 nodes: vec![NodeInsert {
4425 row_id: "row-1".to_owned(),
4426 logical_id: "meeting-1".to_owned(),
4427 kind: "Meeting".to_owned(),
4428 properties: "{}".to_owned(),
4429 source_ref: Some("src-1".to_owned()),
4430 upsert: false,
4431 chunk_policy: ChunkPolicy::Preserve,
4432 content_ref: None,
4433 }],
4434 node_retires: vec![],
4435 edges: vec![],
4436 edge_retires: vec![],
4437 chunks: vec![ChunkInsert {
4438 id: "chunk-old".to_owned(),
4439 node_logical_id: "meeting-1".to_owned(),
4440 text_content: "old text".to_owned(),
4441 byte_start: None,
4442 byte_end: None,
4443 content_hash: None,
4444 }],
4445 runs: vec![],
4446 steps: vec![],
4447 actions: vec![],
4448 optional_backfills: vec![],
4449 vec_inserts: vec![],
4450 operational_writes: vec![],
4451 })
4452 .expect("v1 write");
4453
4454 writer
4455 .submit(WriteRequest {
4456 label: "v2".to_owned(),
4457 nodes: vec![NodeInsert {
4458 row_id: "row-2".to_owned(),
4459 logical_id: "meeting-1".to_owned(),
4460 kind: "Meeting".to_owned(),
4461 properties: "{}".to_owned(),
4462 source_ref: Some("src-2".to_owned()),
4463 upsert: true,
4464 chunk_policy: ChunkPolicy::Replace,
4465 content_ref: None,
4466 }],
4467 node_retires: vec![],
4468 edges: vec![],
4469 edge_retires: vec![],
4470 chunks: vec![ChunkInsert {
4471 id: "chunk-new".to_owned(),
4472 node_logical_id: "meeting-1".to_owned(),
4473 text_content: "new text".to_owned(),
4474 byte_start: None,
4475 byte_end: None,
4476 content_hash: None,
4477 }],
4478 runs: vec![],
4479 steps: vec![],
4480 actions: vec![],
4481 optional_backfills: vec![],
4482 vec_inserts: vec![],
4483 operational_writes: vec![],
4484 })
4485 .expect("v2 write");
4486
4487 let conn = rusqlite::Connection::open(db.path()).expect("open");
4488 let old_chunk: i64 = conn
4489 .query_row(
4490 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4491 [],
4492 |r| r.get(0),
4493 )
4494 .expect("old chunk count");
4495 let new_chunk: i64 = conn
4496 .query_row(
4497 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4498 [],
4499 |r| r.get(0),
4500 )
4501 .expect("new chunk count");
4502 let fts_old: i64 = conn
4503 .query_row(
4504 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4505 [],
4506 |r| r.get(0),
4507 )
4508 .expect("old fts count");
4509
4510 assert_eq!(
4511 old_chunk, 0,
4512 "old chunk must be deleted by ChunkPolicy::Replace"
4513 );
4514 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4515 assert_eq!(
4516 fts_old, 0,
4517 "old FTS row must be deleted by ChunkPolicy::Replace"
4518 );
4519 }
4520
4521 #[test]
4522 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4523 let db = NamedTempFile::new().expect("temporary db");
4524 let writer = WriterActor::start(
4525 db.path(),
4526 Arc::new(SchemaManager::new()),
4527 ProvenanceMode::Warn,
4528 Arc::new(TelemetryCounters::default()),
4529 )
4530 .expect("writer");
4531
4532 writer
4533 .submit(WriteRequest {
4534 label: "v1".to_owned(),
4535 nodes: vec![NodeInsert {
4536 row_id: "row-1".to_owned(),
4537 logical_id: "meeting-1".to_owned(),
4538 kind: "Meeting".to_owned(),
4539 properties: "{}".to_owned(),
4540 source_ref: Some("src-1".to_owned()),
4541 upsert: false,
4542 chunk_policy: ChunkPolicy::Preserve,
4543 content_ref: None,
4544 }],
4545 node_retires: vec![],
4546 edges: vec![],
4547 edge_retires: vec![],
4548 chunks: vec![ChunkInsert {
4549 id: "chunk-old".to_owned(),
4550 node_logical_id: "meeting-1".to_owned(),
4551 text_content: "old text".to_owned(),
4552 byte_start: None,
4553 byte_end: None,
4554 content_hash: None,
4555 }],
4556 runs: vec![],
4557 steps: vec![],
4558 actions: vec![],
4559 optional_backfills: vec![],
4560 vec_inserts: vec![],
4561 operational_writes: vec![],
4562 })
4563 .expect("v1 write");
4564
4565 writer
4566 .submit(WriteRequest {
4567 label: "v2-props-only".to_owned(),
4568 nodes: vec![NodeInsert {
4569 row_id: "row-2".to_owned(),
4570 logical_id: "meeting-1".to_owned(),
4571 kind: "Meeting".to_owned(),
4572 properties: r#"{"status":"updated"}"#.to_owned(),
4573 source_ref: Some("src-2".to_owned()),
4574 upsert: true,
4575 chunk_policy: ChunkPolicy::Preserve,
4576 content_ref: None,
4577 }],
4578 node_retires: vec![],
4579 edges: vec![],
4580 edge_retires: vec![],
4581 chunks: vec![],
4582 runs: vec![],
4583 steps: vec![],
4584 actions: vec![],
4585 optional_backfills: vec![],
4586 vec_inserts: vec![],
4587 operational_writes: vec![],
4588 })
4589 .expect("v2 preserve write");
4590
4591 let conn = rusqlite::Connection::open(db.path()).expect("open");
4592 let old_chunk: i64 = conn
4593 .query_row(
4594 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4595 [],
4596 |r| r.get(0),
4597 )
4598 .expect("old chunk count");
4599
4600 assert_eq!(
4601 old_chunk, 1,
4602 "old chunk must be preserved by ChunkPolicy::Preserve"
4603 );
4604 }
4605
4606 #[test]
4607 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4608 let db = NamedTempFile::new().expect("temporary db");
4609 let writer = WriterActor::start(
4610 db.path(),
4611 Arc::new(SchemaManager::new()),
4612 ProvenanceMode::Warn,
4613 Arc::new(TelemetryCounters::default()),
4614 )
4615 .expect("writer");
4616
4617 writer
4618 .submit(WriteRequest {
4619 label: "v1".to_owned(),
4620 nodes: vec![NodeInsert {
4621 row_id: "row-1".to_owned(),
4622 logical_id: "meeting-1".to_owned(),
4623 kind: "Meeting".to_owned(),
4624 properties: "{}".to_owned(),
4625 source_ref: Some("src-1".to_owned()),
4626 upsert: false,
4627 chunk_policy: ChunkPolicy::Preserve,
4628 content_ref: None,
4629 }],
4630 node_retires: vec![],
4631 edges: vec![],
4632 edge_retires: vec![],
4633 chunks: vec![ChunkInsert {
4634 id: "chunk-existing".to_owned(),
4635 node_logical_id: "meeting-1".to_owned(),
4636 text_content: "existing text".to_owned(),
4637 byte_start: None,
4638 byte_end: None,
4639 content_hash: None,
4640 }],
4641 runs: vec![],
4642 steps: vec![],
4643 actions: vec![],
4644 optional_backfills: vec![],
4645 vec_inserts: vec![],
4646 operational_writes: vec![],
4647 })
4648 .expect("v1 write");
4649
4650 writer
4652 .submit(WriteRequest {
4653 label: "insert-no-upsert".to_owned(),
4654 nodes: vec![NodeInsert {
4655 row_id: "row-2".to_owned(),
4656 logical_id: "meeting-2".to_owned(),
4657 kind: "Meeting".to_owned(),
4658 properties: "{}".to_owned(),
4659 source_ref: Some("src-2".to_owned()),
4660 upsert: false,
4661 chunk_policy: ChunkPolicy::Replace,
4662 content_ref: None,
4663 }],
4664 node_retires: vec![],
4665 edges: vec![],
4666 edge_retires: vec![],
4667 chunks: vec![],
4668 runs: vec![],
4669 steps: vec![],
4670 actions: vec![],
4671 optional_backfills: vec![],
4672 vec_inserts: vec![],
4673 operational_writes: vec![],
4674 })
4675 .expect("insert no-upsert write");
4676
4677 let conn = rusqlite::Connection::open(db.path()).expect("open");
4678 let existing_chunk: i64 = conn
4679 .query_row(
4680 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4681 [],
4682 |r| r.get(0),
4683 )
4684 .expect("chunk count");
4685
4686 assert_eq!(
4687 existing_chunk, 1,
4688 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4689 );
4690 }
4691
4692 #[test]
4693 fn writer_run_upsert_supersedes_prior_active_run() {
4694 let db = NamedTempFile::new().expect("temporary db");
4695 let writer = WriterActor::start(
4696 db.path(),
4697 Arc::new(SchemaManager::new()),
4698 ProvenanceMode::Warn,
4699 Arc::new(TelemetryCounters::default()),
4700 )
4701 .expect("writer");
4702
4703 writer
4704 .submit(WriteRequest {
4705 label: "v1".to_owned(),
4706 nodes: vec![],
4707 node_retires: vec![],
4708 edges: vec![],
4709 edge_retires: vec![],
4710 chunks: vec![],
4711 runs: vec![RunInsert {
4712 id: "run-v1".to_owned(),
4713 kind: "session".to_owned(),
4714 status: "completed".to_owned(),
4715 properties: "{}".to_owned(),
4716 source_ref: Some("src-1".to_owned()),
4717 upsert: false,
4718 supersedes_id: None,
4719 }],
4720 steps: vec![],
4721 actions: vec![],
4722 optional_backfills: vec![],
4723 vec_inserts: vec![],
4724 operational_writes: vec![],
4725 })
4726 .expect("v1 run write");
4727
4728 writer
4729 .submit(WriteRequest {
4730 label: "v2".to_owned(),
4731 nodes: vec![],
4732 node_retires: vec![],
4733 edges: vec![],
4734 edge_retires: vec![],
4735 chunks: vec![],
4736 runs: vec![RunInsert {
4737 id: "run-v2".to_owned(),
4738 kind: "session".to_owned(),
4739 status: "completed".to_owned(),
4740 properties: "{}".to_owned(),
4741 source_ref: Some("src-2".to_owned()),
4742 upsert: true,
4743 supersedes_id: Some("run-v1".to_owned()),
4744 }],
4745 steps: vec![],
4746 actions: vec![],
4747 optional_backfills: vec![],
4748 vec_inserts: vec![],
4749 operational_writes: vec![],
4750 })
4751 .expect("v2 run write");
4752
4753 let conn = rusqlite::Connection::open(db.path()).expect("open");
4754 let v1_historical: i64 = conn
4755 .query_row(
4756 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4757 [],
4758 |r| r.get(0),
4759 )
4760 .expect("v1 historical count");
4761 let v2_active: i64 = conn
4762 .query_row(
4763 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4764 [],
4765 |r| r.get(0),
4766 )
4767 .expect("v2 active count");
4768
4769 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4770 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4771 }
4772
4773 #[test]
4774 fn writer_step_upsert_supersedes_prior_active_step() {
4775 let db = NamedTempFile::new().expect("temporary db");
4776 let writer = WriterActor::start(
4777 db.path(),
4778 Arc::new(SchemaManager::new()),
4779 ProvenanceMode::Warn,
4780 Arc::new(TelemetryCounters::default()),
4781 )
4782 .expect("writer");
4783
4784 writer
4785 .submit(WriteRequest {
4786 label: "v1".to_owned(),
4787 nodes: vec![],
4788 node_retires: vec![],
4789 edges: vec![],
4790 edge_retires: vec![],
4791 chunks: vec![],
4792 runs: vec![RunInsert {
4793 id: "run-1".to_owned(),
4794 kind: "session".to_owned(),
4795 status: "completed".to_owned(),
4796 properties: "{}".to_owned(),
4797 source_ref: Some("src-1".to_owned()),
4798 upsert: false,
4799 supersedes_id: None,
4800 }],
4801 steps: vec![StepInsert {
4802 id: "step-v1".to_owned(),
4803 run_id: "run-1".to_owned(),
4804 kind: "llm".to_owned(),
4805 status: "completed".to_owned(),
4806 properties: "{}".to_owned(),
4807 source_ref: Some("src-1".to_owned()),
4808 upsert: false,
4809 supersedes_id: None,
4810 }],
4811 actions: vec![],
4812 optional_backfills: vec![],
4813 vec_inserts: vec![],
4814 operational_writes: vec![],
4815 })
4816 .expect("v1 step write");
4817
4818 writer
4819 .submit(WriteRequest {
4820 label: "v2".to_owned(),
4821 nodes: vec![],
4822 node_retires: vec![],
4823 edges: vec![],
4824 edge_retires: vec![],
4825 chunks: vec![],
4826 runs: vec![],
4827 steps: vec![StepInsert {
4828 id: "step-v2".to_owned(),
4829 run_id: "run-1".to_owned(),
4830 kind: "llm".to_owned(),
4831 status: "completed".to_owned(),
4832 properties: "{}".to_owned(),
4833 source_ref: Some("src-2".to_owned()),
4834 upsert: true,
4835 supersedes_id: Some("step-v1".to_owned()),
4836 }],
4837 actions: vec![],
4838 optional_backfills: vec![],
4839 vec_inserts: vec![],
4840 operational_writes: vec![],
4841 })
4842 .expect("v2 step write");
4843
4844 let conn = rusqlite::Connection::open(db.path()).expect("open");
4845 let v1_historical: i64 = conn
4846 .query_row(
4847 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4848 [],
4849 |r| r.get(0),
4850 )
4851 .expect("v1 historical count");
4852 let v2_active: i64 = conn
4853 .query_row(
4854 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4855 [],
4856 |r| r.get(0),
4857 )
4858 .expect("v2 active count");
4859
4860 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4861 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4862 }
4863
4864 #[test]
4865 fn writer_action_upsert_supersedes_prior_active_action() {
4866 let db = NamedTempFile::new().expect("temporary db");
4867 let writer = WriterActor::start(
4868 db.path(),
4869 Arc::new(SchemaManager::new()),
4870 ProvenanceMode::Warn,
4871 Arc::new(TelemetryCounters::default()),
4872 )
4873 .expect("writer");
4874
4875 writer
4876 .submit(WriteRequest {
4877 label: "v1".to_owned(),
4878 nodes: vec![],
4879 node_retires: vec![],
4880 edges: vec![],
4881 edge_retires: vec![],
4882 chunks: vec![],
4883 runs: vec![RunInsert {
4884 id: "run-1".to_owned(),
4885 kind: "session".to_owned(),
4886 status: "completed".to_owned(),
4887 properties: "{}".to_owned(),
4888 source_ref: Some("src-1".to_owned()),
4889 upsert: false,
4890 supersedes_id: None,
4891 }],
4892 steps: vec![StepInsert {
4893 id: "step-1".to_owned(),
4894 run_id: "run-1".to_owned(),
4895 kind: "llm".to_owned(),
4896 status: "completed".to_owned(),
4897 properties: "{}".to_owned(),
4898 source_ref: Some("src-1".to_owned()),
4899 upsert: false,
4900 supersedes_id: None,
4901 }],
4902 actions: vec![ActionInsert {
4903 id: "action-v1".to_owned(),
4904 step_id: "step-1".to_owned(),
4905 kind: "emit".to_owned(),
4906 status: "completed".to_owned(),
4907 properties: "{}".to_owned(),
4908 source_ref: Some("src-1".to_owned()),
4909 upsert: false,
4910 supersedes_id: None,
4911 }],
4912 optional_backfills: vec![],
4913 vec_inserts: vec![],
4914 operational_writes: vec![],
4915 })
4916 .expect("v1 action write");
4917
4918 writer
4919 .submit(WriteRequest {
4920 label: "v2".to_owned(),
4921 nodes: vec![],
4922 node_retires: vec![],
4923 edges: vec![],
4924 edge_retires: vec![],
4925 chunks: vec![],
4926 runs: vec![],
4927 steps: vec![],
4928 actions: vec![ActionInsert {
4929 id: "action-v2".to_owned(),
4930 step_id: "step-1".to_owned(),
4931 kind: "emit".to_owned(),
4932 status: "completed".to_owned(),
4933 properties: "{}".to_owned(),
4934 source_ref: Some("src-2".to_owned()),
4935 upsert: true,
4936 supersedes_id: Some("action-v1".to_owned()),
4937 }],
4938 optional_backfills: vec![],
4939 vec_inserts: vec![],
4940 operational_writes: vec![],
4941 })
4942 .expect("v2 action write");
4943
4944 let conn = rusqlite::Connection::open(db.path()).expect("open");
4945 let v1_historical: i64 = conn
4946 .query_row(
4947 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4948 [],
4949 |r| r.get(0),
4950 )
4951 .expect("v1 historical count");
4952 let v2_active: i64 = conn
4953 .query_row(
4954 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4955 [],
4956 |r| r.get(0),
4957 )
4958 .expect("v2 active count");
4959
4960 assert_eq!(
4961 v1_historical, 1,
4962 "action-v1 must be historical after upsert"
4963 );
4964 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4965 }
4966
4967 #[test]
4970 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4971 let db = NamedTempFile::new().expect("temporary db");
4972 let writer = WriterActor::start(
4973 db.path(),
4974 Arc::new(SchemaManager::new()),
4975 ProvenanceMode::Warn,
4976 Arc::new(TelemetryCounters::default()),
4977 )
4978 .expect("writer");
4979
4980 let result = writer.submit(WriteRequest {
4981 label: "bad".to_owned(),
4982 nodes: vec![],
4983 node_retires: vec![],
4984 edges: vec![],
4985 edge_retires: vec![],
4986 chunks: vec![],
4987 runs: vec![RunInsert {
4988 id: "run-1".to_owned(),
4989 kind: "session".to_owned(),
4990 status: "completed".to_owned(),
4991 properties: "{}".to_owned(),
4992 source_ref: None,
4993 upsert: true,
4994 supersedes_id: None,
4995 }],
4996 steps: vec![],
4997 actions: vec![],
4998 optional_backfills: vec![],
4999 vec_inserts: vec![],
5000 operational_writes: vec![],
5001 });
5002
5003 assert!(
5004 matches!(result, Err(EngineError::InvalidWrite(_))),
5005 "run upsert=true without supersedes_id must return InvalidWrite"
5006 );
5007 }
5008
5009 #[test]
5010 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
5011 let db = NamedTempFile::new().expect("temporary db");
5012 let writer = WriterActor::start(
5013 db.path(),
5014 Arc::new(SchemaManager::new()),
5015 ProvenanceMode::Warn,
5016 Arc::new(TelemetryCounters::default()),
5017 )
5018 .expect("writer");
5019
5020 let result = writer.submit(WriteRequest {
5021 label: "bad".to_owned(),
5022 nodes: vec![],
5023 node_retires: vec![],
5024 edges: vec![],
5025 edge_retires: vec![],
5026 chunks: vec![],
5027 runs: vec![],
5028 steps: vec![StepInsert {
5029 id: "step-1".to_owned(),
5030 run_id: "run-1".to_owned(),
5031 kind: "llm".to_owned(),
5032 status: "completed".to_owned(),
5033 properties: "{}".to_owned(),
5034 source_ref: None,
5035 upsert: true,
5036 supersedes_id: None,
5037 }],
5038 actions: vec![],
5039 optional_backfills: vec![],
5040 vec_inserts: vec![],
5041 operational_writes: vec![],
5042 });
5043
5044 assert!(
5045 matches!(result, Err(EngineError::InvalidWrite(_))),
5046 "step upsert=true without supersedes_id must return InvalidWrite"
5047 );
5048 }
5049
5050 #[test]
5051 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5052 let db = NamedTempFile::new().expect("temporary db");
5053 let writer = WriterActor::start(
5054 db.path(),
5055 Arc::new(SchemaManager::new()),
5056 ProvenanceMode::Warn,
5057 Arc::new(TelemetryCounters::default()),
5058 )
5059 .expect("writer");
5060
5061 let result = writer.submit(WriteRequest {
5062 label: "bad".to_owned(),
5063 nodes: vec![],
5064 node_retires: vec![],
5065 edges: vec![],
5066 edge_retires: vec![],
5067 chunks: vec![],
5068 runs: vec![],
5069 steps: vec![],
5070 actions: vec![ActionInsert {
5071 id: "action-1".to_owned(),
5072 step_id: "step-1".to_owned(),
5073 kind: "emit".to_owned(),
5074 status: "completed".to_owned(),
5075 properties: "{}".to_owned(),
5076 source_ref: None,
5077 upsert: true,
5078 supersedes_id: None,
5079 }],
5080 optional_backfills: vec![],
5081 vec_inserts: vec![],
5082 operational_writes: vec![],
5083 });
5084
5085 assert!(
5086 matches!(result, Err(EngineError::InvalidWrite(_))),
5087 "action upsert=true without supersedes_id must return InvalidWrite"
5088 );
5089 }
5090
5091 #[test]
5094 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5095 let db = NamedTempFile::new().expect("temporary db");
5096 let writer = WriterActor::start(
5097 db.path(),
5098 Arc::new(SchemaManager::new()),
5099 ProvenanceMode::Warn,
5100 Arc::new(TelemetryCounters::default()),
5101 )
5102 .expect("writer");
5103
5104 let receipt = writer
5105 .submit(WriteRequest {
5106 label: "test".to_owned(),
5107 nodes: vec![
5108 NodeInsert {
5109 row_id: "row-a".to_owned(),
5110 logical_id: "node-a".to_owned(),
5111 kind: "Meeting".to_owned(),
5112 properties: "{}".to_owned(),
5113 source_ref: Some("src-1".to_owned()),
5114 upsert: false,
5115 chunk_policy: ChunkPolicy::Preserve,
5116 content_ref: None,
5117 },
5118 NodeInsert {
5119 row_id: "row-b".to_owned(),
5120 logical_id: "node-b".to_owned(),
5121 kind: "Task".to_owned(),
5122 properties: "{}".to_owned(),
5123 source_ref: Some("src-1".to_owned()),
5124 upsert: false,
5125 chunk_policy: ChunkPolicy::Preserve,
5126 content_ref: None,
5127 },
5128 ],
5129 node_retires: vec![],
5130 edges: vec![EdgeInsert {
5131 row_id: "edge-1".to_owned(),
5132 logical_id: "edge-lg-1".to_owned(),
5133 source_logical_id: "node-a".to_owned(),
5134 target_logical_id: "node-b".to_owned(),
5135 kind: "HAS_TASK".to_owned(),
5136 properties: "{}".to_owned(),
5137 source_ref: None,
5138 upsert: false,
5139 }],
5140 edge_retires: vec![],
5141 chunks: vec![],
5142 runs: vec![],
5143 steps: vec![],
5144 actions: vec![],
5145 optional_backfills: vec![],
5146 vec_inserts: vec![],
5147 operational_writes: vec![],
5148 })
5149 .expect("write");
5150
5151 assert!(
5152 !receipt.provenance_warnings.is_empty(),
5153 "edge insert without source_ref must emit a provenance warning"
5154 );
5155 }
5156
5157 #[test]
5158 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5159 let db = NamedTempFile::new().expect("temporary db");
5160 let writer = WriterActor::start(
5161 db.path(),
5162 Arc::new(SchemaManager::new()),
5163 ProvenanceMode::Warn,
5164 Arc::new(TelemetryCounters::default()),
5165 )
5166 .expect("writer");
5167
5168 let receipt = writer
5169 .submit(WriteRequest {
5170 label: "test".to_owned(),
5171 nodes: vec![],
5172 node_retires: vec![],
5173 edges: vec![],
5174 edge_retires: vec![],
5175 chunks: vec![],
5176 runs: vec![RunInsert {
5177 id: "run-1".to_owned(),
5178 kind: "session".to_owned(),
5179 status: "completed".to_owned(),
5180 properties: "{}".to_owned(),
5181 source_ref: None,
5182 upsert: false,
5183 supersedes_id: None,
5184 }],
5185 steps: vec![],
5186 actions: vec![],
5187 optional_backfills: vec![],
5188 vec_inserts: vec![],
5189 operational_writes: vec![],
5190 })
5191 .expect("write");
5192
5193 assert!(
5194 !receipt.provenance_warnings.is_empty(),
5195 "run insert without source_ref must emit a provenance warning"
5196 );
5197 }
5198
5199 #[test]
5202 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5203 let db = NamedTempFile::new().expect("temporary db");
5204 let writer = WriterActor::start(
5205 db.path(),
5206 Arc::new(SchemaManager::new()),
5207 ProvenanceMode::Warn,
5208 Arc::new(TelemetryCounters::default()),
5209 )
5210 .expect("writer");
5211
5212 writer
5214 .submit(WriteRequest {
5215 label: "seed".to_owned(),
5216 nodes: vec![NodeInsert {
5217 row_id: "row-1".to_owned(),
5218 logical_id: "meeting-1".to_owned(),
5219 kind: "Meeting".to_owned(),
5220 properties: "{}".to_owned(),
5221 source_ref: Some("src-1".to_owned()),
5222 upsert: false,
5223 chunk_policy: ChunkPolicy::Preserve,
5224 content_ref: None,
5225 }],
5226 node_retires: vec![],
5227 edges: vec![],
5228 edge_retires: vec![],
5229 chunks: vec![],
5230 runs: vec![],
5231 steps: vec![],
5232 actions: vec![],
5233 optional_backfills: vec![],
5234 vec_inserts: vec![],
5235 operational_writes: vec![],
5236 })
5237 .expect("seed write");
5238
5239 let result = writer.submit(WriteRequest {
5241 label: "bad".to_owned(),
5242 nodes: vec![],
5243 node_retires: vec![NodeRetire {
5244 logical_id: "meeting-1".to_owned(),
5245 source_ref: Some("src-2".to_owned()),
5246 }],
5247 edges: vec![],
5248 edge_retires: vec![],
5249 chunks: vec![ChunkInsert {
5250 id: "chunk-bad".to_owned(),
5251 node_logical_id: "meeting-1".to_owned(),
5252 text_content: "some text".to_owned(),
5253 byte_start: None,
5254 byte_end: None,
5255 content_hash: None,
5256 }],
5257 runs: vec![],
5258 steps: vec![],
5259 actions: vec![],
5260 optional_backfills: vec![],
5261 vec_inserts: vec![],
5262 operational_writes: vec![],
5263 });
5264
5265 assert!(
5266 matches!(result, Err(EngineError::InvalidWrite(_))),
5267 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5268 );
5269 }
5270
5271 #[test]
5274 fn writer_batch_insert_multiple_nodes() {
5275 let db = NamedTempFile::new().expect("temporary db");
5276 let writer = WriterActor::start(
5277 db.path(),
5278 Arc::new(SchemaManager::new()),
5279 ProvenanceMode::Warn,
5280 Arc::new(TelemetryCounters::default()),
5281 )
5282 .expect("writer");
5283
5284 let nodes: Vec<NodeInsert> = (0..100)
5285 .map(|i| NodeInsert {
5286 row_id: format!("row-{i}"),
5287 logical_id: format!("lg-{i}"),
5288 kind: "Note".to_owned(),
5289 properties: "{}".to_owned(),
5290 source_ref: Some("batch-src".to_owned()),
5291 upsert: false,
5292 chunk_policy: ChunkPolicy::Preserve,
5293 content_ref: None,
5294 })
5295 .collect();
5296
5297 writer
5298 .submit(WriteRequest {
5299 label: "batch".to_owned(),
5300 nodes,
5301 node_retires: vec![],
5302 edges: vec![],
5303 edge_retires: vec![],
5304 chunks: vec![],
5305 runs: vec![],
5306 steps: vec![],
5307 actions: vec![],
5308 optional_backfills: vec![],
5309 vec_inserts: vec![],
5310 operational_writes: vec![],
5311 })
5312 .expect("batch write");
5313
5314 let conn = rusqlite::Connection::open(db.path()).expect("open");
5315 let count: i64 = conn
5316 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5317 .expect("count nodes");
5318 assert_eq!(
5319 count, 100,
5320 "all 100 nodes must be present after batch insert"
5321 );
5322 }
5323
5324 #[test]
5327 fn prepare_write_rejects_empty_node_row_id() {
5328 let db = NamedTempFile::new().expect("temporary db");
5329 let writer = WriterActor::start(
5330 db.path(),
5331 Arc::new(SchemaManager::new()),
5332 ProvenanceMode::Warn,
5333 Arc::new(TelemetryCounters::default()),
5334 )
5335 .expect("writer");
5336
5337 let result = writer.submit(WriteRequest {
5338 label: "test".to_owned(),
5339 nodes: vec![NodeInsert {
5340 row_id: String::new(),
5341 logical_id: "lg-1".to_owned(),
5342 kind: "Note".to_owned(),
5343 properties: "{}".to_owned(),
5344 source_ref: None,
5345 upsert: false,
5346 chunk_policy: ChunkPolicy::Preserve,
5347 content_ref: None,
5348 }],
5349 node_retires: vec![],
5350 edges: vec![],
5351 edge_retires: vec![],
5352 chunks: vec![],
5353 runs: vec![],
5354 steps: vec![],
5355 actions: vec![],
5356 optional_backfills: vec![],
5357 vec_inserts: vec![],
5358 operational_writes: vec![],
5359 });
5360
5361 assert!(
5362 matches!(result, Err(EngineError::InvalidWrite(_))),
5363 "empty row_id must be rejected"
5364 );
5365 }
5366
5367 #[test]
5368 fn prepare_write_rejects_empty_node_logical_id() {
5369 let db = NamedTempFile::new().expect("temporary db");
5370 let writer = WriterActor::start(
5371 db.path(),
5372 Arc::new(SchemaManager::new()),
5373 ProvenanceMode::Warn,
5374 Arc::new(TelemetryCounters::default()),
5375 )
5376 .expect("writer");
5377
5378 let result = writer.submit(WriteRequest {
5379 label: "test".to_owned(),
5380 nodes: vec![NodeInsert {
5381 row_id: "row-1".to_owned(),
5382 logical_id: String::new(),
5383 kind: "Note".to_owned(),
5384 properties: "{}".to_owned(),
5385 source_ref: None,
5386 upsert: false,
5387 chunk_policy: ChunkPolicy::Preserve,
5388 content_ref: None,
5389 }],
5390 node_retires: vec![],
5391 edges: vec![],
5392 edge_retires: vec![],
5393 chunks: vec![],
5394 runs: vec![],
5395 steps: vec![],
5396 actions: vec![],
5397 optional_backfills: vec![],
5398 vec_inserts: vec![],
5399 operational_writes: vec![],
5400 });
5401
5402 assert!(
5403 matches!(result, Err(EngineError::InvalidWrite(_))),
5404 "empty logical_id must be rejected"
5405 );
5406 }
5407
5408 #[test]
5409 fn prepare_write_rejects_duplicate_row_ids_in_request() {
5410 let db = NamedTempFile::new().expect("temporary db");
5411 let writer = WriterActor::start(
5412 db.path(),
5413 Arc::new(SchemaManager::new()),
5414 ProvenanceMode::Warn,
5415 Arc::new(TelemetryCounters::default()),
5416 )
5417 .expect("writer");
5418
5419 let result = writer.submit(WriteRequest {
5420 label: "test".to_owned(),
5421 nodes: vec![
5422 NodeInsert {
5423 row_id: "row-1".to_owned(),
5424 logical_id: "lg-1".to_owned(),
5425 kind: "Note".to_owned(),
5426 properties: "{}".to_owned(),
5427 source_ref: None,
5428 upsert: false,
5429 chunk_policy: ChunkPolicy::Preserve,
5430 content_ref: None,
5431 },
5432 NodeInsert {
5433 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
5435 kind: "Note".to_owned(),
5436 properties: "{}".to_owned(),
5437 source_ref: None,
5438 upsert: false,
5439 chunk_policy: ChunkPolicy::Preserve,
5440 content_ref: None,
5441 },
5442 ],
5443 node_retires: vec![],
5444 edges: vec![],
5445 edge_retires: vec![],
5446 chunks: vec![],
5447 runs: vec![],
5448 steps: vec![],
5449 actions: vec![],
5450 optional_backfills: vec![],
5451 vec_inserts: vec![],
5452 operational_writes: vec![],
5453 });
5454
5455 assert!(
5456 matches!(result, Err(EngineError::InvalidWrite(_))),
5457 "duplicate row_id within request must be rejected"
5458 );
5459 }
5460
5461 #[test]
5462 fn prepare_write_rejects_empty_chunk_id() {
5463 let db = NamedTempFile::new().expect("temporary db");
5464 let writer = WriterActor::start(
5465 db.path(),
5466 Arc::new(SchemaManager::new()),
5467 ProvenanceMode::Warn,
5468 Arc::new(TelemetryCounters::default()),
5469 )
5470 .expect("writer");
5471
5472 let result = writer.submit(WriteRequest {
5473 label: "test".to_owned(),
5474 nodes: vec![NodeInsert {
5475 row_id: "row-1".to_owned(),
5476 logical_id: "lg-1".to_owned(),
5477 kind: "Note".to_owned(),
5478 properties: "{}".to_owned(),
5479 source_ref: None,
5480 upsert: false,
5481 chunk_policy: ChunkPolicy::Preserve,
5482 content_ref: None,
5483 }],
5484 node_retires: vec![],
5485 edges: vec![],
5486 edge_retires: vec![],
5487 chunks: vec![ChunkInsert {
5488 id: String::new(),
5489 node_logical_id: "lg-1".to_owned(),
5490 text_content: "some text".to_owned(),
5491 byte_start: None,
5492 byte_end: None,
5493 content_hash: None,
5494 }],
5495 runs: vec![],
5496 steps: vec![],
5497 actions: vec![],
5498 optional_backfills: vec![],
5499 vec_inserts: vec![],
5500 operational_writes: vec![],
5501 });
5502
5503 assert!(
5504 matches!(result, Err(EngineError::InvalidWrite(_))),
5505 "empty chunk id must be rejected"
5506 );
5507 }
5508
5509 #[test]
5512 fn writer_receipt_warns_on_step_without_source_ref() {
5513 let db = NamedTempFile::new().expect("temporary db");
5514 let writer = WriterActor::start(
5515 db.path(),
5516 Arc::new(SchemaManager::new()),
5517 ProvenanceMode::Warn,
5518 Arc::new(TelemetryCounters::default()),
5519 )
5520 .expect("writer");
5521
5522 writer
5524 .submit(WriteRequest {
5525 label: "seed-run".to_owned(),
5526 nodes: vec![],
5527 node_retires: vec![],
5528 edges: vec![],
5529 edge_retires: vec![],
5530 chunks: vec![],
5531 runs: vec![RunInsert {
5532 id: "run-1".to_owned(),
5533 kind: "session".to_owned(),
5534 status: "active".to_owned(),
5535 properties: "{}".to_owned(),
5536 source_ref: Some("src-1".to_owned()),
5537 upsert: false,
5538 supersedes_id: None,
5539 }],
5540 steps: vec![],
5541 actions: vec![],
5542 optional_backfills: vec![],
5543 vec_inserts: vec![],
5544 operational_writes: vec![],
5545 })
5546 .expect("seed run");
5547
5548 let receipt = writer
5549 .submit(WriteRequest {
5550 label: "test".to_owned(),
5551 nodes: vec![],
5552 node_retires: vec![],
5553 edges: vec![],
5554 edge_retires: vec![],
5555 chunks: vec![],
5556 runs: vec![],
5557 steps: vec![StepInsert {
5558 id: "step-1".to_owned(),
5559 run_id: "run-1".to_owned(),
5560 kind: "llm_call".to_owned(),
5561 status: "completed".to_owned(),
5562 properties: "{}".to_owned(),
5563 source_ref: None,
5564 upsert: false,
5565 supersedes_id: None,
5566 }],
5567 actions: vec![],
5568 optional_backfills: vec![],
5569 vec_inserts: vec![],
5570 operational_writes: vec![],
5571 })
5572 .expect("write");
5573
5574 assert!(
5575 !receipt.provenance_warnings.is_empty(),
5576 "step insert without source_ref must emit a provenance warning"
5577 );
5578 }
5579
5580 #[test]
5581 fn writer_receipt_warns_on_action_without_source_ref() {
5582 let db = NamedTempFile::new().expect("temporary db");
5583 let writer = WriterActor::start(
5584 db.path(),
5585 Arc::new(SchemaManager::new()),
5586 ProvenanceMode::Warn,
5587 Arc::new(TelemetryCounters::default()),
5588 )
5589 .expect("writer");
5590
5591 writer
5593 .submit(WriteRequest {
5594 label: "seed".to_owned(),
5595 nodes: vec![],
5596 node_retires: vec![],
5597 edges: vec![],
5598 edge_retires: vec![],
5599 chunks: vec![],
5600 runs: vec![RunInsert {
5601 id: "run-1".to_owned(),
5602 kind: "session".to_owned(),
5603 status: "active".to_owned(),
5604 properties: "{}".to_owned(),
5605 source_ref: Some("src-1".to_owned()),
5606 upsert: false,
5607 supersedes_id: None,
5608 }],
5609 steps: vec![StepInsert {
5610 id: "step-1".to_owned(),
5611 run_id: "run-1".to_owned(),
5612 kind: "llm_call".to_owned(),
5613 status: "completed".to_owned(),
5614 properties: "{}".to_owned(),
5615 source_ref: Some("src-1".to_owned()),
5616 upsert: false,
5617 supersedes_id: None,
5618 }],
5619 actions: vec![],
5620 optional_backfills: vec![],
5621 vec_inserts: vec![],
5622 operational_writes: vec![],
5623 })
5624 .expect("seed");
5625
5626 let receipt = writer
5627 .submit(WriteRequest {
5628 label: "test".to_owned(),
5629 nodes: vec![],
5630 node_retires: vec![],
5631 edges: vec![],
5632 edge_retires: vec![],
5633 chunks: vec![],
5634 runs: vec![],
5635 steps: vec![],
5636 actions: vec![ActionInsert {
5637 id: "action-1".to_owned(),
5638 step_id: "step-1".to_owned(),
5639 kind: "tool_call".to_owned(),
5640 status: "completed".to_owned(),
5641 properties: "{}".to_owned(),
5642 source_ref: None,
5643 upsert: false,
5644 supersedes_id: None,
5645 }],
5646 optional_backfills: vec![],
5647 vec_inserts: vec![],
5648 operational_writes: vec![],
5649 })
5650 .expect("write");
5651
5652 assert!(
5653 !receipt.provenance_warnings.is_empty(),
5654 "action insert without source_ref must emit a provenance warning"
5655 );
5656 }
5657
5658 #[test]
5659 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5660 let db = NamedTempFile::new().expect("temporary db");
5661 let writer = WriterActor::start(
5662 db.path(),
5663 Arc::new(SchemaManager::new()),
5664 ProvenanceMode::Warn,
5665 Arc::new(TelemetryCounters::default()),
5666 )
5667 .expect("writer");
5668
5669 let receipt = writer
5670 .submit(WriteRequest {
5671 label: "test".to_owned(),
5672 nodes: vec![NodeInsert {
5673 row_id: "row-1".to_owned(),
5674 logical_id: "node-1".to_owned(),
5675 kind: "Note".to_owned(),
5676 properties: "{}".to_owned(),
5677 source_ref: Some("src-1".to_owned()),
5678 upsert: false,
5679 chunk_policy: ChunkPolicy::Preserve,
5680 content_ref: None,
5681 }],
5682 node_retires: vec![],
5683 edges: vec![],
5684 edge_retires: vec![],
5685 chunks: vec![],
5686 runs: vec![RunInsert {
5687 id: "run-1".to_owned(),
5688 kind: "session".to_owned(),
5689 status: "active".to_owned(),
5690 properties: "{}".to_owned(),
5691 source_ref: Some("src-1".to_owned()),
5692 upsert: false,
5693 supersedes_id: None,
5694 }],
5695 steps: vec![StepInsert {
5696 id: "step-1".to_owned(),
5697 run_id: "run-1".to_owned(),
5698 kind: "llm_call".to_owned(),
5699 status: "completed".to_owned(),
5700 properties: "{}".to_owned(),
5701 source_ref: Some("src-1".to_owned()),
5702 upsert: false,
5703 supersedes_id: None,
5704 }],
5705 actions: vec![ActionInsert {
5706 id: "action-1".to_owned(),
5707 step_id: "step-1".to_owned(),
5708 kind: "tool_call".to_owned(),
5709 status: "completed".to_owned(),
5710 properties: "{}".to_owned(),
5711 source_ref: Some("src-1".to_owned()),
5712 upsert: false,
5713 supersedes_id: None,
5714 }],
5715 optional_backfills: vec![],
5716 vec_inserts: vec![],
5717 operational_writes: vec![],
5718 })
5719 .expect("write");
5720
5721 assert!(
5722 receipt.provenance_warnings.is_empty(),
5723 "no warnings expected when all types have source_ref; got: {:?}",
5724 receipt.provenance_warnings
5725 );
5726 }
5727
5728 #[test]
5731 fn default_provenance_mode_is_warn() {
5732 let db = NamedTempFile::new().expect("temporary db");
5734 let writer = WriterActor::start(
5735 db.path(),
5736 Arc::new(SchemaManager::new()),
5737 ProvenanceMode::default(),
5738 Arc::new(TelemetryCounters::default()),
5739 )
5740 .expect("writer");
5741
5742 let receipt = writer
5743 .submit(WriteRequest {
5744 label: "test".to_owned(),
5745 nodes: vec![NodeInsert {
5746 row_id: "row-1".to_owned(),
5747 logical_id: "node-1".to_owned(),
5748 kind: "Note".to_owned(),
5749 properties: "{}".to_owned(),
5750 source_ref: None,
5751 upsert: false,
5752 chunk_policy: ChunkPolicy::Preserve,
5753 content_ref: None,
5754 }],
5755 node_retires: vec![],
5756 edges: vec![],
5757 edge_retires: vec![],
5758 chunks: vec![],
5759 runs: vec![],
5760 steps: vec![],
5761 actions: vec![],
5762 optional_backfills: vec![],
5763 vec_inserts: vec![],
5764 operational_writes: vec![],
5765 })
5766 .expect("Warn mode must not reject missing source_ref");
5767
5768 assert!(
5769 !receipt.provenance_warnings.is_empty(),
5770 "Warn mode must emit a warning instead of rejecting"
5771 );
5772 }
5773
5774 #[test]
5775 fn require_mode_rejects_node_without_source_ref() {
5776 let db = NamedTempFile::new().expect("temporary db");
5777 let writer = WriterActor::start(
5778 db.path(),
5779 Arc::new(SchemaManager::new()),
5780 ProvenanceMode::Require,
5781 Arc::new(TelemetryCounters::default()),
5782 )
5783 .expect("writer");
5784
5785 let result = writer.submit(WriteRequest {
5786 label: "test".to_owned(),
5787 nodes: vec![NodeInsert {
5788 row_id: "row-1".to_owned(),
5789 logical_id: "node-1".to_owned(),
5790 kind: "Note".to_owned(),
5791 properties: "{}".to_owned(),
5792 source_ref: None,
5793 upsert: false,
5794 chunk_policy: ChunkPolicy::Preserve,
5795 content_ref: None,
5796 }],
5797 node_retires: vec![],
5798 edges: vec![],
5799 edge_retires: vec![],
5800 chunks: vec![],
5801 runs: vec![],
5802 steps: vec![],
5803 actions: vec![],
5804 optional_backfills: vec![],
5805 vec_inserts: vec![],
5806 operational_writes: vec![],
5807 });
5808
5809 assert!(
5810 matches!(result, Err(EngineError::InvalidWrite(_))),
5811 "Require mode must reject node without source_ref"
5812 );
5813 }
5814
5815 #[test]
5816 fn require_mode_accepts_node_with_source_ref() {
5817 let db = NamedTempFile::new().expect("temporary db");
5818 let writer = WriterActor::start(
5819 db.path(),
5820 Arc::new(SchemaManager::new()),
5821 ProvenanceMode::Require,
5822 Arc::new(TelemetryCounters::default()),
5823 )
5824 .expect("writer");
5825
5826 let result = writer.submit(WriteRequest {
5827 label: "test".to_owned(),
5828 nodes: vec![NodeInsert {
5829 row_id: "row-1".to_owned(),
5830 logical_id: "node-1".to_owned(),
5831 kind: "Note".to_owned(),
5832 properties: "{}".to_owned(),
5833 source_ref: Some("src-1".to_owned()),
5834 upsert: false,
5835 chunk_policy: ChunkPolicy::Preserve,
5836 content_ref: None,
5837 }],
5838 node_retires: vec![],
5839 edges: vec![],
5840 edge_retires: vec![],
5841 chunks: vec![],
5842 runs: vec![],
5843 steps: vec![],
5844 actions: vec![],
5845 optional_backfills: vec![],
5846 vec_inserts: vec![],
5847 operational_writes: vec![],
5848 });
5849
5850 assert!(
5851 result.is_ok(),
5852 "Require mode must accept node with source_ref"
5853 );
5854 }
5855
5856 #[test]
5857 fn require_mode_rejects_edge_without_source_ref() {
5858 let db = NamedTempFile::new().expect("temporary db");
5859 let writer = WriterActor::start(
5860 db.path(),
5861 Arc::new(SchemaManager::new()),
5862 ProvenanceMode::Require,
5863 Arc::new(TelemetryCounters::default()),
5864 )
5865 .expect("writer");
5866
5867 let result = writer.submit(WriteRequest {
5869 label: "test".to_owned(),
5870 nodes: vec![
5871 NodeInsert {
5872 row_id: "row-a".to_owned(),
5873 logical_id: "node-a".to_owned(),
5874 kind: "Note".to_owned(),
5875 properties: "{}".to_owned(),
5876 source_ref: Some("src-1".to_owned()),
5877 upsert: false,
5878 chunk_policy: ChunkPolicy::Preserve,
5879 content_ref: None,
5880 },
5881 NodeInsert {
5882 row_id: "row-b".to_owned(),
5883 logical_id: "node-b".to_owned(),
5884 kind: "Note".to_owned(),
5885 properties: "{}".to_owned(),
5886 source_ref: Some("src-1".to_owned()),
5887 upsert: false,
5888 chunk_policy: ChunkPolicy::Preserve,
5889 content_ref: None,
5890 },
5891 ],
5892 node_retires: vec![],
5893 edges: vec![EdgeInsert {
5894 row_id: "edge-row-1".to_owned(),
5895 logical_id: "edge-1".to_owned(),
5896 source_logical_id: "node-a".to_owned(),
5897 target_logical_id: "node-b".to_owned(),
5898 kind: "LINKS_TO".to_owned(),
5899 properties: "{}".to_owned(),
5900 source_ref: None,
5901 upsert: false,
5902 }],
5903 edge_retires: vec![],
5904 chunks: vec![],
5905 runs: vec![],
5906 steps: vec![],
5907 actions: vec![],
5908 optional_backfills: vec![],
5909 vec_inserts: vec![],
5910 operational_writes: vec![],
5911 });
5912
5913 assert!(
5914 matches!(result, Err(EngineError::InvalidWrite(_))),
5915 "Require mode must reject edge without source_ref"
5916 );
5917 }
5918
5919 #[test]
5922 fn fts_row_has_correct_kind_from_co_submitted_node() {
5923 let db = NamedTempFile::new().expect("temporary db");
5924 let writer = WriterActor::start(
5925 db.path(),
5926 Arc::new(SchemaManager::new()),
5927 ProvenanceMode::Warn,
5928 Arc::new(TelemetryCounters::default()),
5929 )
5930 .expect("writer");
5931
5932 writer
5933 .submit(WriteRequest {
5934 label: "test".to_owned(),
5935 nodes: vec![NodeInsert {
5936 row_id: "row-1".to_owned(),
5937 logical_id: "node-1".to_owned(),
5938 kind: "Meeting".to_owned(),
5939 properties: "{}".to_owned(),
5940 source_ref: Some("src-1".to_owned()),
5941 upsert: false,
5942 chunk_policy: ChunkPolicy::Preserve,
5943 content_ref: None,
5944 }],
5945 node_retires: vec![],
5946 edges: vec![],
5947 edge_retires: vec![],
5948 chunks: vec![ChunkInsert {
5949 id: "chunk-1".to_owned(),
5950 node_logical_id: "node-1".to_owned(),
5951 text_content: "some text".to_owned(),
5952 byte_start: None,
5953 byte_end: None,
5954 content_hash: None,
5955 }],
5956 runs: vec![],
5957 steps: vec![],
5958 actions: vec![],
5959 optional_backfills: vec![],
5960 vec_inserts: vec![],
5961 operational_writes: vec![],
5962 })
5963 .expect("write");
5964
5965 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5966 let kind: String = conn
5967 .query_row(
5968 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5969 [],
5970 |row| row.get(0),
5971 )
5972 .expect("fts row");
5973
5974 assert_eq!(kind, "Meeting");
5975 }
5976
5977 #[test]
5978 fn fts_row_has_correct_text_content() {
5979 let db = NamedTempFile::new().expect("temporary db");
5980 let writer = WriterActor::start(
5981 db.path(),
5982 Arc::new(SchemaManager::new()),
5983 ProvenanceMode::Warn,
5984 Arc::new(TelemetryCounters::default()),
5985 )
5986 .expect("writer");
5987
5988 writer
5989 .submit(WriteRequest {
5990 label: "test".to_owned(),
5991 nodes: vec![NodeInsert {
5992 row_id: "row-1".to_owned(),
5993 logical_id: "node-1".to_owned(),
5994 kind: "Note".to_owned(),
5995 properties: "{}".to_owned(),
5996 source_ref: Some("src-1".to_owned()),
5997 upsert: false,
5998 chunk_policy: ChunkPolicy::Preserve,
5999 content_ref: None,
6000 }],
6001 node_retires: vec![],
6002 edges: vec![],
6003 edge_retires: vec![],
6004 chunks: vec![ChunkInsert {
6005 id: "chunk-1".to_owned(),
6006 node_logical_id: "node-1".to_owned(),
6007 text_content: "exactly this text".to_owned(),
6008 byte_start: None,
6009 byte_end: None,
6010 content_hash: None,
6011 }],
6012 runs: vec![],
6013 steps: vec![],
6014 actions: vec![],
6015 optional_backfills: vec![],
6016 vec_inserts: vec![],
6017 operational_writes: vec![],
6018 })
6019 .expect("write");
6020
6021 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6022 let text: String = conn
6023 .query_row(
6024 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6025 [],
6026 |row| row.get(0),
6027 )
6028 .expect("fts row");
6029
6030 assert_eq!(text, "exactly this text");
6031 }
6032
6033 #[test]
6034 fn fts_row_has_correct_kind_from_pre_existing_node() {
6035 let db = NamedTempFile::new().expect("temporary db");
6036 let writer = WriterActor::start(
6037 db.path(),
6038 Arc::new(SchemaManager::new()),
6039 ProvenanceMode::Warn,
6040 Arc::new(TelemetryCounters::default()),
6041 )
6042 .expect("writer");
6043
6044 writer
6046 .submit(WriteRequest {
6047 label: "r1".to_owned(),
6048 nodes: vec![NodeInsert {
6049 row_id: "row-1".to_owned(),
6050 logical_id: "node-1".to_owned(),
6051 kind: "Document".to_owned(),
6052 properties: "{}".to_owned(),
6053 source_ref: Some("src-1".to_owned()),
6054 upsert: false,
6055 chunk_policy: ChunkPolicy::Preserve,
6056 content_ref: None,
6057 }],
6058 node_retires: vec![],
6059 edges: vec![],
6060 edge_retires: vec![],
6061 chunks: vec![],
6062 runs: vec![],
6063 steps: vec![],
6064 actions: vec![],
6065 optional_backfills: vec![],
6066 vec_inserts: vec![],
6067 operational_writes: vec![],
6068 })
6069 .expect("r1 write");
6070
6071 writer
6073 .submit(WriteRequest {
6074 label: "r2".to_owned(),
6075 nodes: vec![],
6076 node_retires: vec![],
6077 edges: vec![],
6078 edge_retires: vec![],
6079 chunks: vec![ChunkInsert {
6080 id: "chunk-1".to_owned(),
6081 node_logical_id: "node-1".to_owned(),
6082 text_content: "some text".to_owned(),
6083 byte_start: None,
6084 byte_end: None,
6085 content_hash: None,
6086 }],
6087 runs: vec![],
6088 steps: vec![],
6089 actions: vec![],
6090 optional_backfills: vec![],
6091 vec_inserts: vec![],
6092 operational_writes: vec![],
6093 })
6094 .expect("r2 write");
6095
6096 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6097 let kind: String = conn
6098 .query_row(
6099 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6100 [],
6101 |row| row.get(0),
6102 )
6103 .expect("fts row");
6104
6105 assert_eq!(kind, "Document");
6106 }
6107
6108 #[test]
6109 fn fts_derives_rows_for_multiple_chunks_per_node() {
6110 let db = NamedTempFile::new().expect("temporary db");
6111 let writer = WriterActor::start(
6112 db.path(),
6113 Arc::new(SchemaManager::new()),
6114 ProvenanceMode::Warn,
6115 Arc::new(TelemetryCounters::default()),
6116 )
6117 .expect("writer");
6118
6119 writer
6120 .submit(WriteRequest {
6121 label: "test".to_owned(),
6122 nodes: vec![NodeInsert {
6123 row_id: "row-1".to_owned(),
6124 logical_id: "node-1".to_owned(),
6125 kind: "Meeting".to_owned(),
6126 properties: "{}".to_owned(),
6127 source_ref: Some("src-1".to_owned()),
6128 upsert: false,
6129 chunk_policy: ChunkPolicy::Preserve,
6130 content_ref: None,
6131 }],
6132 node_retires: vec![],
6133 edges: vec![],
6134 edge_retires: vec![],
6135 chunks: vec![
6136 ChunkInsert {
6137 id: "chunk-a".to_owned(),
6138 node_logical_id: "node-1".to_owned(),
6139 text_content: "intro".to_owned(),
6140 byte_start: None,
6141 byte_end: None,
6142 content_hash: None,
6143 },
6144 ChunkInsert {
6145 id: "chunk-b".to_owned(),
6146 node_logical_id: "node-1".to_owned(),
6147 text_content: "body".to_owned(),
6148 byte_start: None,
6149 byte_end: None,
6150 content_hash: None,
6151 },
6152 ChunkInsert {
6153 id: "chunk-c".to_owned(),
6154 node_logical_id: "node-1".to_owned(),
6155 text_content: "conclusion".to_owned(),
6156 byte_start: None,
6157 byte_end: None,
6158 content_hash: None,
6159 },
6160 ],
6161 runs: vec![],
6162 steps: vec![],
6163 actions: vec![],
6164 optional_backfills: vec![],
6165 vec_inserts: vec![],
6166 operational_writes: vec![],
6167 })
6168 .expect("write");
6169
6170 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6171 let count: i64 = conn
6172 .query_row(
6173 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6174 [],
6175 |row| row.get(0),
6176 )
6177 .expect("fts count");
6178
6179 assert_eq!(count, 3, "three chunks must produce three FTS rows");
6180 }
6181
6182 #[test]
6183 fn fts_resolves_mixed_fast_and_db_paths() {
6184 let db = NamedTempFile::new().expect("temporary db");
6185 let writer = WriterActor::start(
6186 db.path(),
6187 Arc::new(SchemaManager::new()),
6188 ProvenanceMode::Warn,
6189 Arc::new(TelemetryCounters::default()),
6190 )
6191 .expect("writer");
6192
6193 writer
6195 .submit(WriteRequest {
6196 label: "seed".to_owned(),
6197 nodes: vec![NodeInsert {
6198 row_id: "row-existing".to_owned(),
6199 logical_id: "existing-node".to_owned(),
6200 kind: "Archive".to_owned(),
6201 properties: "{}".to_owned(),
6202 source_ref: Some("src-1".to_owned()),
6203 upsert: false,
6204 chunk_policy: ChunkPolicy::Preserve,
6205 content_ref: None,
6206 }],
6207 node_retires: vec![],
6208 edges: vec![],
6209 edge_retires: vec![],
6210 chunks: vec![],
6211 runs: vec![],
6212 steps: vec![],
6213 actions: vec![],
6214 optional_backfills: vec![],
6215 vec_inserts: vec![],
6216 operational_writes: vec![],
6217 })
6218 .expect("seed");
6219
6220 writer
6222 .submit(WriteRequest {
6223 label: "mixed".to_owned(),
6224 nodes: vec![NodeInsert {
6225 row_id: "row-new".to_owned(),
6226 logical_id: "new-node".to_owned(),
6227 kind: "Inbox".to_owned(),
6228 properties: "{}".to_owned(),
6229 source_ref: Some("src-2".to_owned()),
6230 upsert: false,
6231 chunk_policy: ChunkPolicy::Preserve,
6232 content_ref: None,
6233 }],
6234 node_retires: vec![],
6235 edges: vec![],
6236 edge_retires: vec![],
6237 chunks: vec![
6238 ChunkInsert {
6239 id: "chunk-fast".to_owned(),
6240 node_logical_id: "new-node".to_owned(),
6241 text_content: "new content".to_owned(),
6242 byte_start: None,
6243 byte_end: None,
6244 content_hash: None,
6245 },
6246 ChunkInsert {
6247 id: "chunk-db".to_owned(),
6248 node_logical_id: "existing-node".to_owned(),
6249 text_content: "archive content".to_owned(),
6250 byte_start: None,
6251 byte_end: None,
6252 content_hash: None,
6253 },
6254 ],
6255 runs: vec![],
6256 steps: vec![],
6257 actions: vec![],
6258 optional_backfills: vec![],
6259 vec_inserts: vec![],
6260 operational_writes: vec![],
6261 })
6262 .expect("mixed write");
6263
6264 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6265 let fast_kind: String = conn
6266 .query_row(
6267 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6268 [],
6269 |row| row.get(0),
6270 )
6271 .expect("fast path fts row");
6272 let db_kind: String = conn
6273 .query_row(
6274 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6275 [],
6276 |row| row.get(0),
6277 )
6278 .expect("db path fts row");
6279
6280 assert_eq!(fast_kind, "Inbox");
6281 assert_eq!(db_kind, "Archive");
6282 }
6283
6284 #[test]
6285 fn prepare_write_rejects_empty_chunk_text() {
6286 let db = NamedTempFile::new().expect("temporary db");
6287 let writer = WriterActor::start(
6288 db.path(),
6289 Arc::new(SchemaManager::new()),
6290 ProvenanceMode::Warn,
6291 Arc::new(TelemetryCounters::default()),
6292 )
6293 .expect("writer");
6294
6295 let result = writer.submit(WriteRequest {
6296 label: "test".to_owned(),
6297 nodes: vec![NodeInsert {
6298 row_id: "row-1".to_owned(),
6299 logical_id: "node-1".to_owned(),
6300 kind: "Note".to_owned(),
6301 properties: "{}".to_owned(),
6302 source_ref: None,
6303 upsert: false,
6304 chunk_policy: ChunkPolicy::Preserve,
6305 content_ref: None,
6306 }],
6307 node_retires: vec![],
6308 edges: vec![],
6309 edge_retires: vec![],
6310 chunks: vec![ChunkInsert {
6311 id: "chunk-1".to_owned(),
6312 node_logical_id: "node-1".to_owned(),
6313 text_content: String::new(),
6314 byte_start: None,
6315 byte_end: None,
6316 content_hash: None,
6317 }],
6318 runs: vec![],
6319 steps: vec![],
6320 actions: vec![],
6321 optional_backfills: vec![],
6322 vec_inserts: vec![],
6323 operational_writes: vec![],
6324 });
6325
6326 assert!(
6327 matches!(result, Err(EngineError::InvalidWrite(_))),
6328 "empty text_content must be rejected"
6329 );
6330 }
6331
6332 #[test]
6333 fn receipt_reports_zero_backfills_when_none_submitted() {
6334 let db = NamedTempFile::new().expect("temporary db");
6335 let writer = WriterActor::start(
6336 db.path(),
6337 Arc::new(SchemaManager::new()),
6338 ProvenanceMode::Warn,
6339 Arc::new(TelemetryCounters::default()),
6340 )
6341 .expect("writer");
6342
6343 let receipt = writer
6344 .submit(WriteRequest {
6345 label: "test".to_owned(),
6346 nodes: vec![NodeInsert {
6347 row_id: "row-1".to_owned(),
6348 logical_id: "node-1".to_owned(),
6349 kind: "Note".to_owned(),
6350 properties: "{}".to_owned(),
6351 source_ref: Some("src-1".to_owned()),
6352 upsert: false,
6353 chunk_policy: ChunkPolicy::Preserve,
6354 content_ref: None,
6355 }],
6356 node_retires: vec![],
6357 edges: vec![],
6358 edge_retires: vec![],
6359 chunks: vec![],
6360 runs: vec![],
6361 steps: vec![],
6362 actions: vec![],
6363 optional_backfills: vec![],
6364 vec_inserts: vec![],
6365 operational_writes: vec![],
6366 })
6367 .expect("write");
6368
6369 assert_eq!(receipt.optional_backfill_count, 0);
6370 }
6371
6372 #[test]
6373 fn receipt_reports_correct_backfill_count() {
6374 let db = NamedTempFile::new().expect("temporary db");
6375 let writer = WriterActor::start(
6376 db.path(),
6377 Arc::new(SchemaManager::new()),
6378 ProvenanceMode::Warn,
6379 Arc::new(TelemetryCounters::default()),
6380 )
6381 .expect("writer");
6382
6383 let receipt = writer
6384 .submit(WriteRequest {
6385 label: "test".to_owned(),
6386 nodes: vec![NodeInsert {
6387 row_id: "row-1".to_owned(),
6388 logical_id: "node-1".to_owned(),
6389 kind: "Note".to_owned(),
6390 properties: "{}".to_owned(),
6391 source_ref: Some("src-1".to_owned()),
6392 upsert: false,
6393 chunk_policy: ChunkPolicy::Preserve,
6394 content_ref: None,
6395 }],
6396 node_retires: vec![],
6397 edges: vec![],
6398 edge_retires: vec![],
6399 chunks: vec![],
6400 runs: vec![],
6401 steps: vec![],
6402 actions: vec![],
6403 optional_backfills: vec![
6404 OptionalProjectionTask {
6405 target: ProjectionTarget::Fts,
6406 payload: "p1".to_owned(),
6407 },
6408 OptionalProjectionTask {
6409 target: ProjectionTarget::Vec,
6410 payload: "p2".to_owned(),
6411 },
6412 OptionalProjectionTask {
6413 target: ProjectionTarget::All,
6414 payload: "p3".to_owned(),
6415 },
6416 ],
6417 vec_inserts: vec![],
6418 operational_writes: vec![],
6419 })
6420 .expect("write");
6421
6422 assert_eq!(receipt.optional_backfill_count, 3);
6423 }
6424
6425 #[test]
6426 fn backfill_tasks_are_not_executed_during_write() {
6427 let db = NamedTempFile::new().expect("temporary db");
6428 let writer = WriterActor::start(
6429 db.path(),
6430 Arc::new(SchemaManager::new()),
6431 ProvenanceMode::Warn,
6432 Arc::new(TelemetryCounters::default()),
6433 )
6434 .expect("writer");
6435
6436 writer
6439 .submit(WriteRequest {
6440 label: "test".to_owned(),
6441 nodes: vec![NodeInsert {
6442 row_id: "row-1".to_owned(),
6443 logical_id: "node-1".to_owned(),
6444 kind: "Note".to_owned(),
6445 properties: "{}".to_owned(),
6446 source_ref: Some("src-1".to_owned()),
6447 upsert: false,
6448 chunk_policy: ChunkPolicy::Preserve,
6449 content_ref: None,
6450 }],
6451 node_retires: vec![],
6452 edges: vec![],
6453 edge_retires: vec![],
6454 chunks: vec![ChunkInsert {
6455 id: "chunk-1".to_owned(),
6456 node_logical_id: "node-1".to_owned(),
6457 text_content: "required text".to_owned(),
6458 byte_start: None,
6459 byte_end: None,
6460 content_hash: None,
6461 }],
6462 runs: vec![],
6463 steps: vec![],
6464 actions: vec![],
6465 optional_backfills: vec![OptionalProjectionTask {
6466 target: ProjectionTarget::Fts,
6467 payload: "backfill-payload".to_owned(),
6468 }],
6469 vec_inserts: vec![],
6470 operational_writes: vec![],
6471 })
6472 .expect("write");
6473
6474 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6475 let count: i64 = conn
6476 .query_row(
6477 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6478 [],
6479 |row| row.get(0),
6480 )
6481 .expect("fts count");
6482
6483 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6484 }
6485
6486 #[test]
6487 fn fts_row_uses_new_kind_after_node_replace() {
6488 let db = NamedTempFile::new().expect("temporary db");
6489 let writer = WriterActor::start(
6490 db.path(),
6491 Arc::new(SchemaManager::new()),
6492 ProvenanceMode::Warn,
6493 Arc::new(TelemetryCounters::default()),
6494 )
6495 .expect("writer");
6496
6497 writer
6499 .submit(WriteRequest {
6500 label: "v1".to_owned(),
6501 nodes: vec![NodeInsert {
6502 row_id: "row-1".to_owned(),
6503 logical_id: "node-1".to_owned(),
6504 kind: "Note".to_owned(),
6505 properties: "{}".to_owned(),
6506 source_ref: Some("src-1".to_owned()),
6507 upsert: false,
6508 chunk_policy: ChunkPolicy::Preserve,
6509 content_ref: None,
6510 }],
6511 node_retires: vec![],
6512 edges: vec![],
6513 edge_retires: vec![],
6514 chunks: vec![ChunkInsert {
6515 id: "chunk-v1".to_owned(),
6516 node_logical_id: "node-1".to_owned(),
6517 text_content: "original".to_owned(),
6518 byte_start: None,
6519 byte_end: None,
6520 content_hash: None,
6521 }],
6522 runs: vec![],
6523 steps: vec![],
6524 actions: vec![],
6525 optional_backfills: vec![],
6526 vec_inserts: vec![],
6527 operational_writes: vec![],
6528 })
6529 .expect("v1 write");
6530
6531 writer
6533 .submit(WriteRequest {
6534 label: "v2".to_owned(),
6535 nodes: vec![NodeInsert {
6536 row_id: "row-2".to_owned(),
6537 logical_id: "node-1".to_owned(),
6538 kind: "Meeting".to_owned(),
6539 properties: "{}".to_owned(),
6540 source_ref: Some("src-2".to_owned()),
6541 upsert: true,
6542 chunk_policy: ChunkPolicy::Replace,
6543 content_ref: None,
6544 }],
6545 node_retires: vec![],
6546 edges: vec![],
6547 edge_retires: vec![],
6548 chunks: vec![ChunkInsert {
6549 id: "chunk-v2".to_owned(),
6550 node_logical_id: "node-1".to_owned(),
6551 text_content: "updated".to_owned(),
6552 byte_start: None,
6553 byte_end: None,
6554 content_hash: None,
6555 }],
6556 runs: vec![],
6557 steps: vec![],
6558 actions: vec![],
6559 optional_backfills: vec![],
6560 vec_inserts: vec![],
6561 operational_writes: vec![],
6562 })
6563 .expect("v2 write");
6564
6565 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6566
6567 let old_count: i64 = conn
6569 .query_row(
6570 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6571 [],
6572 |row| row.get(0),
6573 )
6574 .expect("old fts count");
6575 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6576
6577 let new_kind: String = conn
6579 .query_row(
6580 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6581 [],
6582 |row| row.get(0),
6583 )
6584 .expect("new fts row");
6585 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6586 }
6587
6588 #[test]
6591 fn vec_insert_empty_chunk_id_is_rejected() {
6592 let db = NamedTempFile::new().expect("temporary db");
6593 let writer = WriterActor::start(
6594 db.path(),
6595 Arc::new(SchemaManager::new()),
6596 ProvenanceMode::Warn,
6597 Arc::new(TelemetryCounters::default()),
6598 )
6599 .expect("writer");
6600 let result = writer.submit(WriteRequest {
6601 label: "vec-test".to_owned(),
6602 nodes: vec![],
6603 node_retires: vec![],
6604 edges: vec![],
6605 edge_retires: vec![],
6606 chunks: vec![],
6607 runs: vec![],
6608 steps: vec![],
6609 actions: vec![],
6610 optional_backfills: vec![],
6611 vec_inserts: vec![VecInsert {
6612 chunk_id: String::new(),
6613 embedding: vec![0.1, 0.2, 0.3],
6614 }],
6615 operational_writes: vec![],
6616 });
6617 assert!(
6618 matches!(result, Err(EngineError::InvalidWrite(_))),
6619 "empty chunk_id in VecInsert must be rejected"
6620 );
6621 }
6622
6623 #[test]
6624 fn vec_insert_empty_embedding_is_rejected() {
6625 let db = NamedTempFile::new().expect("temporary db");
6626 let writer = WriterActor::start(
6627 db.path(),
6628 Arc::new(SchemaManager::new()),
6629 ProvenanceMode::Warn,
6630 Arc::new(TelemetryCounters::default()),
6631 )
6632 .expect("writer");
6633 let result = writer.submit(WriteRequest {
6634 label: "vec-test".to_owned(),
6635 nodes: vec![],
6636 node_retires: vec![],
6637 edges: vec![],
6638 edge_retires: vec![],
6639 chunks: vec![],
6640 runs: vec![],
6641 steps: vec![],
6642 actions: vec![],
6643 optional_backfills: vec![],
6644 vec_inserts: vec![VecInsert {
6645 chunk_id: "chunk-1".to_owned(),
6646 embedding: vec![],
6647 }],
6648 operational_writes: vec![],
6649 });
6650 assert!(
6651 matches!(result, Err(EngineError::InvalidWrite(_))),
6652 "empty embedding in VecInsert must be rejected"
6653 );
6654 }
6655
6656 #[test]
6657 fn vec_insert_noop_without_feature() {
6658 let db = NamedTempFile::new().expect("temporary db");
6661 let writer = WriterActor::start(
6662 db.path(),
6663 Arc::new(SchemaManager::new()),
6664 ProvenanceMode::Warn,
6665 Arc::new(TelemetryCounters::default()),
6666 )
6667 .expect("writer");
6668 let result = writer.submit(WriteRequest {
6669 label: "vec-noop".to_owned(),
6670 nodes: vec![],
6671 node_retires: vec![],
6672 edges: vec![],
6673 edge_retires: vec![],
6674 chunks: vec![],
6675 runs: vec![],
6676 steps: vec![],
6677 actions: vec![],
6678 optional_backfills: vec![],
6679 vec_inserts: vec![VecInsert {
6680 chunk_id: "chunk-noop".to_owned(),
6681 embedding: vec![1.0, 2.0, 3.0],
6682 }],
6683 operational_writes: vec![],
6684 });
6685 #[cfg(not(feature = "sqlite-vec"))]
6686 result.expect("noop VecInsert without feature must succeed");
6687 #[cfg(feature = "sqlite-vec")]
6689 let _ = result;
6690 }
6691
6692 #[cfg(feature = "sqlite-vec")]
6693 #[test]
6694 fn node_retire_preserves_vec_rows_for_later_restore() {
6695 use crate::sqlite::open_connection_with_vec;
6696
6697 let db = NamedTempFile::new().expect("temporary db");
6698 let schema_manager = Arc::new(SchemaManager::new());
6699
6700 {
6701 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6702 schema_manager.bootstrap(&conn).expect("bootstrap");
6703 }
6704
6705 let writer = WriterActor::start(
6706 db.path(),
6707 Arc::clone(&schema_manager),
6708 ProvenanceMode::Warn,
6709 Arc::new(TelemetryCounters::default()),
6710 )
6711 .expect("writer");
6712
6713 writer
6715 .submit(WriteRequest {
6716 label: "setup".to_owned(),
6717 nodes: vec![NodeInsert {
6718 row_id: "row-retire-vec".to_owned(),
6719 logical_id: "node-retire-vec".to_owned(),
6720 kind: "Doc".to_owned(),
6721 properties: "{}".to_owned(),
6722 source_ref: Some("src".to_owned()),
6723 upsert: false,
6724 chunk_policy: ChunkPolicy::Preserve,
6725 content_ref: None,
6726 }],
6727 node_retires: vec![],
6728 edges: vec![],
6729 edge_retires: vec![],
6730 chunks: vec![ChunkInsert {
6731 id: "chunk-retire-vec".to_owned(),
6732 node_logical_id: "node-retire-vec".to_owned(),
6733 text_content: "text".to_owned(),
6734 byte_start: None,
6735 byte_end: None,
6736 content_hash: None,
6737 }],
6738 runs: vec![],
6739 steps: vec![],
6740 actions: vec![],
6741 optional_backfills: vec![],
6742 vec_inserts: vec![VecInsert {
6743 chunk_id: "chunk-retire-vec".to_owned(),
6744 embedding: vec![0.1, 0.2, 0.3],
6745 }],
6746 operational_writes: vec![],
6747 })
6748 .expect("setup write");
6749
6750 writer
6752 .submit(WriteRequest {
6753 label: "retire".to_owned(),
6754 nodes: vec![],
6755 node_retires: vec![NodeRetire {
6756 logical_id: "node-retire-vec".to_owned(),
6757 source_ref: Some("src".to_owned()),
6758 }],
6759 edges: vec![],
6760 edge_retires: vec![],
6761 chunks: vec![],
6762 runs: vec![],
6763 steps: vec![],
6764 actions: vec![],
6765 optional_backfills: vec![],
6766 vec_inserts: vec![],
6767 operational_writes: vec![],
6768 })
6769 .expect("retire write");
6770
6771 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6772 let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
6773 let count: i64 = conn
6774 .query_row(
6775 &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-retire-vec'"),
6776 [],
6777 |row| row.get(0),
6778 )
6779 .expect("count");
6780 assert_eq!(
6781 count, 1,
6782 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6783 );
6784 }
6785
    // When a node is upserted with ChunkPolicy::Replace, the vec rows that pointed
    // at the replaced chunks must be deleted, while vec rows for the newly
    // submitted chunks must be persisted.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap through a vec-enabled connection so vector tables are available.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: create the node with chunk A plus an embedding for chunk A.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v1".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-A".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version one".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-A".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upsert the same logical node with Replace, swapping chunk A for
        // chunk B and supplying an embedding only for chunk B.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v2".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-B".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version two".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-B".to_owned(),
                    embedding: vec![0.4, 0.5, 0.6],
                }],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Verify directly against the per-kind vec table for "Doc".
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
        let count_a: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-replace-A'"),
                [],
                |row| row.get(0),
            )
            .expect("count A");
        let count_b: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-replace-B'"),
                [],
                |row| row.get(0),
            )
            .expect("count B");
        assert_eq!(
            count_a, 0,
            "old vec row (chunk-A) must be deleted on Replace"
        );
        assert_eq!(
            count_b, 1,
            "new vec row (chunk-B) must be present after Replace"
        );
    }
6907
    // A VecInsert submitted alongside its chunk must land as a row in the
    // per-kind vec table when the sqlite-vec feature is compiled in.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_insert_is_persisted_when_feature_enabled() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap through a vec-enabled connection so vector tables are available.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Single write carrying a node, one chunk, and one embedding for it.
        writer
            .submit(WriteRequest {
                label: "vec-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-vec".to_owned(),
                    logical_id: "node-vec".to_owned(),
                    kind: "Document".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("test".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-vec".to_owned(),
                    node_logical_id: "node-vec".to_owned(),
                    text_content: "test text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("vec insert write");

        // The vec table name is derived from the node kind ("Document").
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-vec'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
    }
6978
6979 #[test]
6982 fn write_request_exceeding_node_limit_is_rejected() {
6983 let nodes: Vec<NodeInsert> = (0..10_001)
6984 .map(|i| NodeInsert {
6985 row_id: format!("row-{i}"),
6986 logical_id: format!("lg-{i}"),
6987 kind: "Note".to_owned(),
6988 properties: "{}".to_owned(),
6989 source_ref: None,
6990 upsert: false,
6991 chunk_policy: ChunkPolicy::Preserve,
6992 content_ref: None,
6993 })
6994 .collect();
6995
6996 let request = WriteRequest {
6997 label: "too-many-nodes".to_owned(),
6998 nodes,
6999 node_retires: vec![],
7000 edges: vec![],
7001 edge_retires: vec![],
7002 chunks: vec![],
7003 runs: vec![],
7004 steps: vec![],
7005 actions: vec![],
7006 optional_backfills: vec![],
7007 vec_inserts: vec![],
7008 operational_writes: vec![],
7009 };
7010
7011 let result = prepare_write(request, ProvenanceMode::Warn)
7012 .map(|_| ())
7013 .map_err(|e| format!("{e}"));
7014 assert!(
7015 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
7016 "exceeding node limit must return InvalidWrite: got {result:?}"
7017 );
7018 }
7019
7020 #[test]
7021 fn write_request_exceeding_total_limit_is_rejected() {
7022 let request = WriteRequest {
7026 label: "too-many-total".to_owned(),
7027 nodes: (0..10_000)
7028 .map(|i| NodeInsert {
7029 row_id: format!("row-{i}"),
7030 logical_id: format!("lg-{i}"),
7031 kind: "Note".to_owned(),
7032 properties: "{}".to_owned(),
7033 source_ref: None,
7034 upsert: false,
7035 chunk_policy: ChunkPolicy::Preserve,
7036 content_ref: None,
7037 })
7038 .collect(),
7039 node_retires: vec![],
7040 edges: (0..10_000)
7041 .map(|i| EdgeInsert {
7042 row_id: format!("edge-row-{i}"),
7043 logical_id: format!("edge-lg-{i}"),
7044 kind: "link".to_owned(),
7045 source_logical_id: format!("lg-{i}"),
7046 target_logical_id: format!("lg-{}", i + 1),
7047 properties: "{}".to_owned(),
7048 source_ref: None,
7049 upsert: false,
7050 })
7051 .collect(),
7052 edge_retires: vec![],
7053 chunks: (0..50_000)
7054 .map(|i| ChunkInsert {
7055 id: format!("chunk-{i}"),
7056 node_logical_id: "lg-0".to_owned(),
7057 text_content: "text".to_owned(),
7058 byte_start: None,
7059 byte_end: None,
7060 content_hash: None,
7061 })
7062 .collect(),
7063 runs: vec![],
7064 steps: vec![],
7065 actions: vec![],
7066 optional_backfills: vec![],
7067 vec_inserts: (0..20_001)
7068 .map(|i| VecInsert {
7069 chunk_id: format!("vec-chunk-{i}"),
7070 embedding: vec![0.1],
7071 })
7072 .collect(),
7073 operational_writes: (0..10_000)
7074 .map(|i| OperationalWrite::Append {
7075 collection: format!("col-{i}"),
7076 record_key: format!("key-{i}"),
7077 payload_json: "{}".to_owned(),
7078 source_ref: None,
7079 })
7080 .collect(),
7081 };
7082
7083 let result = prepare_write(request, ProvenanceMode::Warn)
7084 .map(|_| ())
7085 .map_err(|e| format!("{e}"));
7086 assert!(
7087 matches!(result, Err(ref msg) if msg.contains("too many total items")),
7088 "exceeding total item limit must return InvalidWrite: got {result:?}"
7089 );
7090 }
7091
7092 #[test]
7093 fn write_request_within_limits_succeeds() {
7094 let db = NamedTempFile::new().expect("temporary db");
7095 let writer = WriterActor::start(
7096 db.path(),
7097 Arc::new(SchemaManager::new()),
7098 ProvenanceMode::Warn,
7099 Arc::new(TelemetryCounters::default()),
7100 )
7101 .expect("writer");
7102
7103 let result = writer.submit(WriteRequest {
7104 label: "within-limits".to_owned(),
7105 nodes: vec![NodeInsert {
7106 row_id: "row-1".to_owned(),
7107 logical_id: "lg-1".to_owned(),
7108 kind: "Note".to_owned(),
7109 properties: "{}".to_owned(),
7110 source_ref: None,
7111 upsert: false,
7112 chunk_policy: ChunkPolicy::Preserve,
7113 content_ref: None,
7114 }],
7115 node_retires: vec![],
7116 edges: vec![],
7117 edge_retires: vec![],
7118 chunks: vec![],
7119 runs: vec![],
7120 steps: vec![],
7121 actions: vec![],
7122 optional_backfills: vec![],
7123 vec_inserts: vec![],
7124 operational_writes: vec![],
7125 });
7126
7127 assert!(
7128 result.is_ok(),
7129 "write request within limits must succeed: got {result:?}"
7130 );
7131 }
7132
    // Inserting a node of a kind with a registered FTS property schema must
    // produce a row in that kind's property-FTS table containing the extracted
    // property values joined by the schema's separator.
    #[test]
    fn property_fts_rows_created_on_node_insert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register an FTS schema for kind "Goal" over $.name and $.description.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "goal-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Both property values should appear, space-joined, in the FTS row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist");
        assert_eq!(text, "Ship v2 Launch the redesign");
    }
7194
    // Upserting a node must replace — not duplicate — its property-FTS row,
    // and the row must reflect the newest property values.
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register an FTS schema for "Goal" over $.name only.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First write: name = "Alpha".
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Second write: upsert same logical id with name = "Beta".
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        // The surviving row must carry the post-upsert value.
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
7294
    // Retiring a node must delete its property-FTS row so retired content no
    // longer matches full-text searches.
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register an FTS schema for "Goal" over $.name only.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a node that produces an FTS row.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the same logical node.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
7376
    // Kinds without a registered FTS property schema must not get a per-kind
    // FTS table created as a side effect of inserting nodes of that kind.
    #[test]
    fn no_property_fts_row_for_unregistered_kind() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Bootstrap only — deliberately no fts_property_schemas row for "Note".
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "note-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: r#"{"title":"hello"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Check table existence via sqlite_master rather than querying the
        // (possibly absent) table directly.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Note");
        let exists: bool = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![table],
                |row| row.get::<_, i64>(0),
            )
            .map(|c| c > 0)
            .expect("exists check");
        assert!(
            !exists,
            "no per-kind FTS table should be created for unregistered kind"
        );
    }
7435
    // When the per-kind FTS table already exists, a node insert for that kind
    // must write its extracted property text into that table.
    #[test]
    fn property_fts_write_goes_to_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Register the schema AND pre-create the per-kind fts5 table.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )"
            ))
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(per_kind_count, 1, "per-kind table must have the row");
    }
7507
    // Retiring a node must also delete its row from the per-kind FTS table,
    // not only from any shared FTS structures.
    #[test]
    fn property_fts_retire_removes_from_per_kind_table() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register the schema AND pre-create the per-kind fts5 table (named
        // literally here to match fts_kind_table_name("Goal")).
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a node that produces a per-kind FTS row.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the node.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 0,
            "per-kind table row must be removed on retire"
        );
    }
7599
7600 mod extract_json_path_tests {
7601 use super::super::extract_json_path;
7602 use serde_json::json;
7603
7604 #[test]
7605 fn string_value() {
7606 let v = json!({"name": "alice"});
7607 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7608 }
7609
7610 #[test]
7611 fn number_value() {
7612 let v = json!({"age": 42});
7613 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7614 }
7615
7616 #[test]
7617 fn bool_value() {
7618 let v = json!({"active": true});
7619 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7620 }
7621
7622 #[test]
7623 fn null_value() {
7624 let v = json!({"x": null});
7625 assert!(extract_json_path(&v, "$.x").is_empty());
7626 }
7627
7628 #[test]
7629 fn missing_path() {
7630 let v = json!({"name": "a"});
7631 assert!(extract_json_path(&v, "$.missing").is_empty());
7632 }
7633
7634 #[test]
7635 fn nested_path() {
7636 let v = json!({"address": {"city": "NYC"}});
7637 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7638 }
7639
7640 #[test]
7641 fn array_of_strings() {
7642 let v = json!({"tags": ["a", "b", "c"]});
7643 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7644 }
7645
7646 #[test]
7647 fn array_mixed_scalars() {
7648 let v = json!({"vals": ["x", 1, true]});
7649 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7650 }
7651
7652 #[test]
7653 fn array_only_objects_returns_empty() {
7654 let v = json!({"data": [{"k": "v"}]});
7655 assert!(extract_json_path(&v, "$.data").is_empty());
7656 }
7657
7658 #[test]
7659 fn array_mixed_objects_and_scalars() {
7660 let v = json!({"data": ["keep", {"skip": true}, "also"]});
7661 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7662 }
7663
7664 #[test]
7665 fn object_returns_empty() {
7666 let v = json!({"meta": {"k": "v"}});
7667 assert!(extract_json_path(&v, "$.meta").is_empty());
7668 }
7669
7670 #[test]
7671 fn no_prefix_returns_empty() {
7672 let v = json!({"name": "a"});
7673 assert!(extract_json_path(&v, "name").is_empty());
7674 }
7675 }
7676
7677 mod recursive_extraction_tests {
7678 use super::super::{
7679 LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7680 PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7681 };
7682 use serde_json::json;
7683
7684 fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7685 PropertyFtsSchema {
7686 paths,
7687 separator: " ".to_owned(),
7688 exclude_paths: Vec::new(),
7689 }
7690 }
7691
7692 fn schema_with_excludes(
7693 paths: Vec<PropertyPathEntry>,
7694 excludes: Vec<String>,
7695 ) -> PropertyFtsSchema {
7696 PropertyFtsSchema {
7697 paths,
7698 separator: " ".to_owned(),
7699 exclude_paths: excludes,
7700 }
7701 }
7702
7703 #[test]
7704 fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7705 let props = json!({"payload": {"b": "two", "a": "one"}});
7706 let (blob, positions, _stats) = extract_property_fts(
7707 &props,
7708 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7709 );
7710 let blob = blob.expect("blob emitted");
7711 let idx_one = blob.find("one").expect("contains 'one'");
7712 let idx_two = blob.find("two").expect("contains 'two'");
7713 assert!(
7714 idx_one < idx_two,
7715 "lex order: 'one' (key a) before 'two' (key b)"
7716 );
7717 assert_eq!(positions.len(), 2);
7718 assert_eq!(positions[0].leaf_path, "$.payload.a");
7719 assert_eq!(positions[1].leaf_path, "$.payload.b");
7720 }
7721
7722 #[test]
7723 fn recursive_extraction_walks_arrays_of_scalars() {
7724 let props = json!({"tags": ["red", "blue"]});
7725 let (_blob, positions, _stats) = extract_property_fts(
7726 &props,
7727 &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7728 );
7729 assert_eq!(positions.len(), 2);
7730 assert_eq!(positions[0].leaf_path, "$.tags[0]");
7731 assert_eq!(positions[1].leaf_path, "$.tags[1]");
7732 }
7733
7734 #[test]
7735 fn recursive_extraction_walks_arrays_of_objects() {
7736 let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7737 let (_blob, positions, _stats) = extract_property_fts(
7738 &props,
7739 &schema(vec![PropertyPathEntry::recursive("$.items")]),
7740 );
7741 assert_eq!(positions.len(), 2);
7742 assert_eq!(positions[0].leaf_path, "$.items[0].name");
7743 assert_eq!(positions[1].leaf_path, "$.items[1].name");
7744 }
7745
7746 #[test]
7747 fn recursive_extraction_stringifies_numbers_and_bools() {
7748 let props = json!({"root": {"n": 42, "ok": true}});
7749 let (blob, _positions, _stats) = extract_property_fts(
7750 &props,
7751 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7752 );
7753 let blob = blob.expect("blob emitted");
7754 assert!(blob.contains("42"));
7755 assert!(blob.contains("true"));
7756 }
7757
7758 #[test]
7759 fn recursive_extraction_skips_nulls_and_missing() {
7760 let props = json!({"root": {"x": null, "y": "present"}});
7761 let (blob, positions, _stats) = extract_property_fts(
7762 &props,
7763 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7764 );
7765 let blob = blob.expect("blob emitted");
7766 assert!(!blob.contains("null"));
7767 assert_eq!(positions.len(), 1);
7768 assert_eq!(positions[0].leaf_path, "$.root.y");
7769 }
7770
7771 #[test]
7772 fn recursive_extraction_respects_max_depth_guardrail() {
7773 let mut inner = json!("leaf-value");
7775 for _ in 0..10 {
7776 inner = json!({ "k": inner });
7777 }
7778 let props = json!({ "root": inner });
7779 let (blob, positions, stats) = extract_property_fts(
7780 &props,
7781 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7782 );
7783 assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7784 assert!(
7786 blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7787 "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7788 );
7789 let _ = positions;
7792 let _ = MAX_RECURSIVE_DEPTH;
7793 }
7794
7795 #[test]
7796 fn recursive_extraction_respects_max_bytes_guardrail() {
7797 let leaves: Vec<String> = (0..40)
7799 .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7800 .collect();
7801 let props = json!({ "root": leaves });
7802 let (blob, positions, stats) = extract_property_fts(
7803 &props,
7804 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7805 );
7806 assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7807 let blob = blob.expect("blob must still be emitted");
7808 assert!(
7809 blob.len() <= MAX_EXTRACTED_BYTES,
7810 "blob must not exceed MAX_EXTRACTED_BYTES"
7811 );
7812 for pos in &positions {
7814 assert!(pos.end_offset <= blob.len());
7815 let slice = &blob[pos.start_offset..pos.end_offset];
7816 assert!(!slice.is_empty());
7819 assert!(!slice.contains(LEAF_SEPARATOR));
7820 }
7821 assert!(!blob.ends_with(LEAF_SEPARATOR));
7823 }
7824
7825 #[test]
7826 fn recursive_extraction_respects_exclude_paths() {
7827 let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7828 let (blob, positions, stats) = extract_property_fts(
7829 &props,
7830 &schema_with_excludes(
7831 vec![PropertyPathEntry::recursive("$.payload")],
7832 vec!["$.payload.priv".to_owned()],
7833 ),
7834 );
7835 let blob = blob.expect("blob emitted");
7836 assert!(blob.contains("yes"));
7837 assert!(!blob.contains("no"));
7838 assert_eq!(positions.len(), 1);
7839 assert_eq!(positions[0].leaf_path, "$.payload.pub");
7840 assert!(stats.excluded_subtree > 0);
7841 }
7842
7843 #[test]
7844 fn position_map_entries_match_emitted_leaves_in_order() {
7845 let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7846 let (blob, positions, _stats) = extract_property_fts(
7847 &props,
7848 &schema(vec![PropertyPathEntry::recursive("$.root")]),
7849 );
7850 let blob = blob.expect("blob emitted");
7851 assert_eq!(positions.len(), 3);
7852 assert_eq!(positions[0].leaf_path, "$.root.a");
7854 assert_eq!(positions[1].leaf_path, "$.root.b");
7855 assert_eq!(positions[2].leaf_path, "$.root.c");
7856 let mut prev_end: usize = 0;
7858 for (i, pos) in positions.iter().enumerate() {
7859 assert!(pos.start_offset >= prev_end);
7860 assert!(pos.end_offset > pos.start_offset);
7861 assert!(pos.end_offset <= blob.len());
7862 let slice = &blob[pos.start_offset..pos.end_offset];
7863 match i {
7864 0 => assert_eq!(slice, "alpha"),
7865 1 => assert_eq!(slice, "bravo"),
7866 2 => assert_eq!(slice, "charlie"),
7867 _ => unreachable!(),
7868 }
7869 prev_end = pos.end_offset;
7870 }
7871 }
7872
7873 #[test]
7874 fn scalar_only_schema_produces_empty_position_map() {
7875 let props = json!({"name": "alpha", "title": "beta"});
7876 let (blob, positions, _stats) = extract_property_fts(
7877 &props,
7878 &schema(vec![
7879 PropertyPathEntry::scalar("$.name"),
7880 PropertyPathEntry::scalar("$.title"),
7881 ]),
7882 );
7883 assert_eq!(blob.as_deref(), Some("alpha beta"));
7884 assert!(
7885 positions.is_empty(),
7886 "scalar-only schema must emit no position entries"
7887 );
7888 let _: Vec<PositionEntry> = positions;
7891 }
7892
7893 fn assert_unique_start_offsets(positions: &[PositionEntry]) {
7894 let mut seen = std::collections::HashSet::new();
7895 for pos in positions {
7896 let start = pos.start_offset;
7897 assert!(
7898 seen.insert(start),
7899 "duplicate start_offset {start} in positions {positions:?}"
7900 );
7901 }
7902 }
7903
7904 #[test]
7905 fn recursive_extraction_empty_then_nonempty_in_array() {
7906 let props = json!({"payload": {"xs": ["", "x"]}});
7907 let (combined, positions, _stats) = extract_property_fts(
7908 &props,
7909 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7910 );
7911 assert_eq!(combined.as_deref(), Some("x"));
7912 assert_eq!(positions.len(), 1);
7913 assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
7914 assert_eq!(positions[0].start_offset, 0);
7915 assert_eq!(positions[0].end_offset, 1);
7916 assert_unique_start_offsets(&positions);
7917 }
7918
7919 #[test]
7920 fn recursive_extraction_two_empties_then_nonempty_in_array() {
7921 let props = json!({"payload": {"xs": ["", "", "x"]}});
7922 let (combined, positions, _stats) = extract_property_fts(
7923 &props,
7924 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7925 );
7926 assert_eq!(combined.as_deref(), Some("x"));
7927 assert_eq!(positions.len(), 1);
7928 assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
7929 assert_eq!(positions[0].start_offset, 0);
7930 assert_eq!(positions[0].end_offset, 1);
7931 assert_unique_start_offsets(&positions);
7932 }
7933
7934 #[test]
7935 fn recursive_extraction_empty_then_nonempty_sibling_keys() {
7936 let props = json!({"payload": {"a": "", "b": "x"}});
7937 let (combined, positions, _stats) = extract_property_fts(
7938 &props,
7939 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7940 );
7941 assert_eq!(combined.as_deref(), Some("x"));
7942 assert_eq!(positions.len(), 1);
7943 assert_eq!(positions[0].leaf_path, "$.payload.b");
7944 assert_eq!(positions[0].start_offset, 0);
7945 assert_eq!(positions[0].end_offset, 1);
7946 assert_unique_start_offsets(&positions);
7947 }
7948
7949 #[test]
7950 fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
7951 let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
7952 let (combined, positions, _stats) = extract_property_fts(
7953 &props,
7954 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7955 );
7956 assert_eq!(combined.as_deref(), Some("x"));
7957 assert_eq!(positions.len(), 1);
7958 assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
7959 assert_eq!(positions[0].start_offset, 0);
7960 assert_eq!(positions[0].end_offset, 1);
7961 assert_unique_start_offsets(&positions);
7962 }
7963
7964 #[test]
7965 fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
7966 let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
7967 let (combined, positions, _stats) = extract_property_fts(
7968 &props,
7969 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7970 );
7971 assert_eq!(combined.as_deref(), Some("x"));
7972 assert_eq!(positions.len(), 1);
7973 assert_eq!(positions[0].leaf_path, "$.payload.b.c");
7974 assert_eq!(positions[0].start_offset, 0);
7975 assert_eq!(positions[0].end_offset, 1);
7976 assert_unique_start_offsets(&positions);
7977 }
7978
7979 #[test]
7980 fn recursive_extraction_all_empty_shapes_emit_no_positions() {
7981 let cases = vec![
7982 json!({"payload": {}}),
7983 json!({"payload": {"a": ""}}),
7984 json!({"payload": {"xs": []}}),
7985 json!({"payload": {"xs": [""]}}),
7986 json!({"payload": {"xs": ["", ""]}}),
7987 json!({"payload": {"xs": ["", "", ""]}}),
7988 ];
7989 for case in cases {
7990 let (combined, positions, _stats) = extract_property_fts(
7991 &case,
7992 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7993 );
7994 assert!(
7995 combined.is_none(),
7996 "all-empty payload {case:?} must produce no combined text, got {combined:?}"
7997 );
7998 assert!(
7999 positions.is_empty(),
8000 "all-empty payload {case:?} must produce no positions, got {positions:?}"
8001 );
8002 }
8003 }
8004
8005 #[test]
8006 fn recursive_extraction_nonempty_then_empty_then_nonempty() {
8007 let props = json!({"payload": {"xs": ["x", "", "y"]}});
8008 let (combined, positions, _stats) = extract_property_fts(
8009 &props,
8010 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8011 );
8012 let blob = combined.expect("blob emitted");
8013 assert_eq!(positions.len(), 2);
8014 assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
8015 assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
8016 assert_eq!(positions[0].start_offset, 0);
8017 assert_eq!(positions[0].end_offset, 1);
8018 assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
8022 assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
8023 assert_eq!(
8024 &blob[positions[0].start_offset..positions[0].end_offset],
8025 "x"
8026 );
8027 assert_eq!(
8028 &blob[positions[1].start_offset..positions[1].end_offset],
8029 "y"
8030 );
8031 assert_unique_start_offsets(&positions);
8032 }
8033
8034 #[test]
8035 fn recursive_extraction_null_leaves_unchanged() {
8036 let props = json!({"payload": {"xs": [null, null]}});
8037 let (combined, positions, _stats) = extract_property_fts(
8038 &props,
8039 &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8040 );
8041 assert!(combined.is_none());
8042 assert!(positions.is_empty());
8043 }
8044 }
8045
8046 mod parse_property_schema_json_tests {
8047 use super::super::parse_property_schema_json;
8048
8049 #[test]
8050 fn parse_property_schema_json_preserves_weight() {
8051 let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8052 let schema = parse_property_schema_json(json, " ");
8053 assert_eq!(schema.paths.len(), 2);
8054 let title = &schema.paths[0];
8055 assert_eq!(title.path, "$.title");
8056 assert_eq!(title.weight, Some(2.0_f32));
8057 let body = &schema.paths[1];
8058 assert_eq!(body.path, "$.body");
8059 assert_eq!(body.weight, None);
8060 }
8061 }
8062
8063 mod extract_property_fts_columns_tests {
8066 use serde_json::json;
8067
8068 use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8069
8070 fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8071 PropertyFtsSchema {
8072 paths,
8073 separator: " ".to_owned(),
8074 exclude_paths: Vec::new(),
8075 }
8076 }
8077
8078 #[test]
8079 fn extract_property_fts_columns_returns_per_spec_text() {
8080 let props = json!({"title": "Hello", "body": "World"});
8081 let mut title_entry = PropertyPathEntry::scalar("$.title");
8082 title_entry.weight = Some(2.0);
8083 let mut body_entry = PropertyPathEntry::scalar("$.body");
8084 body_entry.weight = Some(1.0);
8085 let schema = weighted_schema(vec![title_entry, body_entry]);
8086 let cols = extract_property_fts_columns(&props, &schema);
8087 assert_eq!(cols.len(), 2);
8088 assert_eq!(cols[0].0, "title");
8089 assert_eq!(cols[0].1, "Hello");
8090 assert_eq!(cols[1].0, "body");
8091 assert_eq!(cols[1].1, "World");
8092 }
8093
8094 #[test]
8095 fn extract_property_fts_columns_missing_field_returns_empty_string() {
8096 let props = json!({"title": "Hello"});
8097 let mut title_entry = PropertyPathEntry::scalar("$.title");
8098 title_entry.weight = Some(2.0);
8099 let mut body_entry = PropertyPathEntry::scalar("$.body");
8100 body_entry.weight = Some(1.0);
8101 let schema = weighted_schema(vec![title_entry, body_entry]);
8102 let cols = extract_property_fts_columns(&props, &schema);
8103 assert_eq!(cols.len(), 2);
8104 assert_eq!(cols[0].1, "Hello");
8105 assert_eq!(cols[1].1, "", "missing field yields empty string");
8106 }
8107 }
8108
    /// End-to-end check: when a kind has a weighted FTS schema, the writer
    /// must populate one FTS column per weighted path (not a single combined
    /// blob) in the per-kind fts5 table.
    #[test]
    fn writer_inserts_per_column_for_weighted_schema() {
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        let article_table = fathomdb_schema::fts_kind_table_name("Article");
        // Scope this setup connection so it is dropped before WriterActor
        // opens its own connection to the same file.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema_manager.bootstrap(&conn).expect("bootstrap");

            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);

            // Register a weighted two-path schema for the "Article" kind;
            // per-path weights are what trigger the per-column write path.
            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES (?1, ?2, ' ')",
                rusqlite::params!["Article", paths_json],
            )
            .expect("register schema");

            // Pre-create the per-kind fts5 table with one column per path,
            // matching the column names the writer will target.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {article_table} USING fts5(\
                 node_logical_id UNINDEXED, {title_col}, {body_col}, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )"
            ))
            .expect("create weighted per-kind table");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert one Article node whose properties contain both schema paths.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "article-1".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // submit() returning implies the write was committed (presumably
        // synchronous ack — confirm against WriterActor::submit), so a fresh
        // connection can verify the FTS row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);

        // Exactly one FTS row per logical node.
        let count: i64 = conn
            .query_row(
                &format!(
                    "SELECT count(*) FROM {article_table} WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind FTS table must have the row");

        // Each property landed in its own column, not a combined blob.
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {article_table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello World",
            "title column must have correct value"
        );
        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
    }
8213}