1mod fts_extract;
2
3#[allow(unused_imports)]
10pub(crate) use fts_extract::{
11 ExtractStats, LEAF_SEPARATOR, PositionEntry, PropertyFtsSchema, PropertyPathMode,
12 extract_property_fts, extract_property_fts_columns, load_fts_property_schemas,
13 parse_property_schema_json,
14};
15
16#[cfg(test)]
18#[allow(unused_imports)]
19pub(crate) use fts_extract::{
20 MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PropertyPathEntry, extract_json_path,
21};
22
23use std::collections::HashMap;
24use std::mem::ManuallyDrop;
25use std::panic::AssertUnwindSafe;
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::mpsc::{self, Sender, SyncSender};
29use std::thread;
30use std::time::Duration;
31
32use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
33use rusqlite::{OptionalExtension, TransactionBehavior, params};
34
35use crate::operational::{
36 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
37 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
38 OperationalValidationMode, extract_secondary_index_entries_for_current,
39 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
40 parse_operational_validation_contract, validate_operational_payload_against_contract,
41};
42use crate::telemetry::TelemetryCounters;
43use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
44
/// A deferred (optional) projection task carried through a [`WriteRequest`].
///
/// NOTE(review): only the target/payload pair is visible in this chunk; how
/// and when the task is executed is not shown here — confirm against the
/// projection module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Projection the payload should be applied to.
    pub target: ProjectionTarget,
    /// Serialized payload for the projection — presumably JSON; TODO confirm.
    pub payload: String,
}
53
/// Policy for chunks already attached to a node when that node is upserted.
///
/// NOTE(review): enforcement happens in `apply_write`, which is outside this
/// chunk — the variant semantics below are inferred from the names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep any existing chunks (default).
    #[default]
    Preserve,
    /// Replace existing chunks.
    Replace,
}

/// How writes that lack a `source_ref` (provenance) are treated.
///
/// `Require` makes [`prepare_write`] / [`prepare_touch_last_accessed`] reject
/// any item without a `source_ref`; under `Warn` (the default) such items are
/// accepted at prepare time.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Missing provenance is tolerated (default).
    #[default]
    Warn,
    /// Every item must carry a `source_ref`; enforced before submission.
    Require,
}
74
/// Insert (or upsert) of a single graph node row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable logical identity of the node; must be non-empty.
    pub logical_id: String,
    /// Node kind; used to select property-FTS schemas during resolution.
    pub kind: String,
    /// Properties blob; parsed as JSON for property-FTS extraction.
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// Upsert flag — presumably replaces the active row for `logical_id`;
    /// apply_write is outside this chunk, confirm there.
    pub upsert: bool,
    /// How existing chunks of this node are treated on upsert.
    pub chunk_policy: ChunkPolicy,
    /// Optional reference to external content — semantics not visible here.
    pub content_ref: Option<String>,
}
91
/// Insert (or upsert) of a single graph edge row.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row id; must be non-empty and unique within the request
    /// (the duplicate check is shared with node row ids).
    pub row_id: String,
    /// Stable logical identity of the edge; must be non-empty.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    pub kind: String,
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
}
106
/// Request to retire the active node with `logical_id`.
///
/// A retire must not be combined with chunk inserts for the same node in a
/// single request (rejected in [`resolve_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}

/// Request to retire the active edge with `logical_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
120
/// Insert of a text chunk attached to a node.
///
/// [`prepare_write`] rejects empty `id` / `text_content`;
/// [`resolve_fts_rows`] additionally requires the referenced node to be in
/// the same request or already active in the database.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    pub id: String,
    /// Logical id of the owning node.
    pub node_logical_id: String,
    /// Text indexed into the chunk FTS projection; must be non-empty.
    pub text_content: String,
    /// Optional byte range within the source — assumed from the names;
    /// not consumed anywhere in this chunk of the file.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    pub content_hash: Option<String>,
}

/// Embedding vector for a chunk. No `Eq` derive: the payload is `f32`.
///
/// [`prepare_write`] rejects an empty `chunk_id` or empty `embedding`.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    pub chunk_id: String,
    pub embedding: Vec<f32>,
}
144
/// A write against a registered operational collection.
///
/// Kind compatibility is enforced in [`resolve_operational_writes`]:
/// `Append` is only valid for append-only-log collections, while `Put` and
/// `Delete` are only valid for latest-state collections.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to an append-only-log collection.
    Append {
        collection: String,
        record_key: String,
        /// Non-empty JSON payload (checked in [`prepare_write`]).
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Set the latest state for a key in a latest-state collection.
    Put {
        collection: String,
        record_key: String,
        /// Non-empty JSON payload (checked in [`prepare_write`]).
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove a key from a latest-state collection.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
166
/// Insert (or upsert) of a runtime "run" record.
///
/// For all three runtime record types below, [`prepare_write`] requires
/// `supersedes_id` to be set whenever `upsert` is true.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Must be non-empty.
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the record this one supersedes; mandatory when `upsert`.
    pub supersedes_id: Option<String>,
}

/// Insert (or upsert) of a runtime "step" record belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Must be non-empty.
    pub id: String,
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the record this one supersedes; mandatory when `upsert`.
    pub supersedes_id: Option<String>,
}

/// Insert (or upsert) of a runtime "action" record belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Must be non-empty.
    pub id: String,
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Optional provenance; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Id of the record this one supersedes; mandatory when `upsert`.
    pub supersedes_id: Option<String>,
}
204
// Per-request item caps enforced by `validate_request_size`.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Node retires + edge retires combined.
const MAX_RETIRES: usize = 10_000;
// Runs + steps + actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Cap on the sum of all item families in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

// How long a caller blocks waiting for the writer thread's reply before
// returning `WriterTimedOut`. The write may still commit after the timeout
// (see the error message in `recv_with_timeout`).
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
221
/// A complete batch of mutations submitted to [`WriterActor::submit`].
///
/// Validated and converted into a [`PreparedWrite`] on the caller's thread
/// before being handed to the writer thread. No `Eq` derive: `vec_inserts`
/// carries `f32` data.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label echoed in logs and the receipt.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection work; counted in the receipt.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}

/// Result of a committed write, echoed back to the submitter.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    pub label: String,
    pub optional_backfill_count: usize,
    /// Non-fatal issues — population happens in `apply_write`, outside this
    /// chunk.
    pub warnings: Vec<String>,
    pub provenance_warnings: Vec<String>,
}
249
/// Request to bump `last_accessed_at` metadata for a set of active nodes.
///
/// Validated by [`prepare_touch_last_accessed`]: at least one id, no
/// blank ids, and a `source_ref` under [`ProvenanceMode::Require`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    pub logical_ids: Vec<String>,
    /// Timestamp written verbatim — unit (seconds vs millis) not visible in
    /// this chunk; confirm against callers.
    pub touched_at: i64,
    pub source_ref: Option<String>,
}

/// Outcome of a last-access touch.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of distinct logical ids touched.
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
264
/// One row destined for the chunk FTS projection, resolved from a
/// [`ChunkInsert`] plus its node's kind (see [`resolve_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node, from the request or the database.
    kind: String,
    text_content: String,
}

/// One row destined for the property-FTS projection, extracted from a
/// node's JSON properties (see [`resolve_property_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Concatenated extracted text.
    text_content: String,
    /// Path/position metadata produced by `extract_property_fts`.
    positions: Vec<PositionEntry>,
    /// Per-column (name, text) pairs; only populated when the schema
    /// declares weighted paths.
    columns: Vec<(String, String)>,
}
283
/// A [`WriteRequest`] after caller-side validation, plus the slots that the
/// writer thread fills in during resolution.
///
/// `prepare_write` copies the request fields and initialises the remaining
/// fields empty; `resolve_fts_rows`, `resolve_property_fts_rows` and
/// `resolve_operational_writes` populate them on the writer thread.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Only consumed when the sqlite-vec feature is on.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Collection name -> kind, resolved from `operational_collections`.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Collection name -> declared filter fields.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    /// logical_id -> kind for nodes in this request; lets chunk/FTS
    /// resolution avoid a DB lookup for nodes submitted together.
    node_kinds: HashMap<String, String>,
    /// Chunk FTS rows, filled by `resolve_fts_rows`.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Property FTS rows, filled by `resolve_property_fts_rows`.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
311
/// Messages served by the writer thread's loop.
enum WriteMessage {
    /// A fully prepared write. Boxed — presumably to keep the channel
    /// message small relative to the large `PreparedWrite`.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Bump last-access metadata for a set of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
322
/// Handle to the single dedicated writer thread; all mutations go through it.
///
/// `sender` lives in a `ManuallyDrop` so the [`Drop`] impl can close the
/// channel explicitly (disconnecting the writer's receive loop) before
/// joining the thread.
#[derive(Debug)]
pub struct WriterActor {
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// `Some` until [`Drop`] takes and joins it.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance enforcement applied at prepare time, on the caller's thread.
    provenance_mode: ProvenanceMode,
    /// Held to keep the counters alive; only the clone passed to the writer
    /// thread is read in this chunk.
    _telemetry: Arc<TelemetryCounters>,
}
335
impl WriterActor {
    /// Spawns the writer thread and returns the actor handle.
    ///
    /// The thread opens its own SQLite connection at `path` and runs the
    /// schema bootstrap before serving messages (see [`writer_loop`]).
    ///
    /// # Errors
    /// `EngineError::Io` if the OS thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: submitters block once 256 messages are queued.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// True while the writer thread has not finished.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fails fast with `WriterRejected` if the writer thread has exited.
    /// Best-effort: the thread may still exit between this check and the
    /// subsequent send (the send itself also errors in that case).
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validates `request` on the caller's thread, forwards the prepared
    /// write to the writer thread, and blocks for its reply.
    ///
    /// # Errors
    /// `InvalidWrite` from validation, `WriterRejected` if the thread is
    /// gone, or `WriterTimedOut` if no reply arrives in time (the write may
    /// still commit).
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validates a last-access touch on the caller's thread, forwards it to
    /// the writer thread, and blocks for its reply.
    ///
    /// # Errors
    /// Same failure modes as [`Self::submit`].
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
419
/// Shutdown-panic notice for builds without the `tracing` feature —
/// presumably `trace_warn!` compiles to a no-op there, so the Drop impl
/// falls back to printing directly to stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
425
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Drop the sender first so the writer thread's receive loop observes
        // a disconnect and exits; ManuallyDrop lets us do that before the
        // join below.
        // SAFETY: `sender` is never touched again — we hold `&mut self` and
        // the struct is being destroyed.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Re-panicking while already unwinding would abort
                        // the process, so just log the suppressed panic.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Propagate the writer thread's panic to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
455
456fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
458 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
459 .map_err(|error| match error {
460 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
461 "write timed out waiting for writer thread reply — the write may still commit"
462 .to_owned(),
463 ),
464 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
465 })
466 .and_then(|result| result)
467}
468
469fn prepare_touch_last_accessed(
470 request: &LastAccessTouchRequest,
471 mode: ProvenanceMode,
472) -> Result<(), EngineError> {
473 if request.logical_ids.is_empty() {
474 return Err(EngineError::InvalidWrite(
475 "touch_last_accessed requires at least one logical_id".to_owned(),
476 ));
477 }
478 for logical_id in &request.logical_ids {
479 if logical_id.trim().is_empty() {
480 return Err(EngineError::InvalidWrite(
481 "touch_last_accessed requires non-empty logical_ids".to_owned(),
482 ));
483 }
484 }
485 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
486 return Err(EngineError::InvalidWrite(
487 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
488 .to_owned(),
489 ));
490 }
491 Ok(())
492}
493
494fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
495 let missing: Vec<String> = request
496 .nodes
497 .iter()
498 .filter(|n| n.source_ref.is_none())
499 .map(|n| format!("node '{}'", n.logical_id))
500 .chain(
501 request
502 .node_retires
503 .iter()
504 .filter(|r| r.source_ref.is_none())
505 .map(|r| format!("node retire '{}'", r.logical_id)),
506 )
507 .chain(
508 request
509 .edges
510 .iter()
511 .filter(|e| e.source_ref.is_none())
512 .map(|e| format!("edge '{}'", e.logical_id)),
513 )
514 .chain(
515 request
516 .edge_retires
517 .iter()
518 .filter(|r| r.source_ref.is_none())
519 .map(|r| format!("edge retire '{}'", r.logical_id)),
520 )
521 .chain(
522 request
523 .runs
524 .iter()
525 .filter(|r| r.source_ref.is_none())
526 .map(|r| format!("run '{}'", r.id)),
527 )
528 .chain(
529 request
530 .steps
531 .iter()
532 .filter(|s| s.source_ref.is_none())
533 .map(|s| format!("step '{}'", s.id)),
534 )
535 .chain(
536 request
537 .actions
538 .iter()
539 .filter(|a| a.source_ref.is_none())
540 .map(|a| format!("action '{}'", a.id)),
541 )
542 .chain(
543 request
544 .operational_writes
545 .iter()
546 .filter(|write| operational_write_source_ref(write).is_none())
547 .map(|write| {
548 format!(
549 "operational {} '{}:{}'",
550 operational_write_kind(write),
551 operational_write_collection(write),
552 operational_write_record_key(write)
553 )
554 }),
555 )
556 .collect();
557
558 if missing.is_empty() {
559 Ok(())
560 } else {
561 Err(EngineError::InvalidWrite(format!(
562 "ProvenanceMode::Require: missing source_ref on: {}",
563 missing.join(", ")
564 )))
565 }
566}
567
568fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
569 if request.nodes.len() > MAX_NODES {
570 return Err(EngineError::InvalidWrite(format!(
571 "too many nodes: {} exceeds limit of {MAX_NODES}",
572 request.nodes.len()
573 )));
574 }
575 if request.edges.len() > MAX_EDGES {
576 return Err(EngineError::InvalidWrite(format!(
577 "too many edges: {} exceeds limit of {MAX_EDGES}",
578 request.edges.len()
579 )));
580 }
581 if request.chunks.len() > MAX_CHUNKS {
582 return Err(EngineError::InvalidWrite(format!(
583 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
584 request.chunks.len()
585 )));
586 }
587 let retires = request.node_retires.len() + request.edge_retires.len();
588 if retires > MAX_RETIRES {
589 return Err(EngineError::InvalidWrite(format!(
590 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
591 )));
592 }
593 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
594 if runtime_items > MAX_RUNTIME_ITEMS {
595 return Err(EngineError::InvalidWrite(format!(
596 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
597 )));
598 }
599 if request.operational_writes.len() > MAX_OPERATIONAL {
600 return Err(EngineError::InvalidWrite(format!(
601 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
602 request.operational_writes.len()
603 )));
604 }
605 let total = request.nodes.len()
606 + request.node_retires.len()
607 + request.edges.len()
608 + request.edge_retires.len()
609 + request.chunks.len()
610 + request.runs.len()
611 + request.steps.len()
612 + request.actions.len()
613 + request.vec_inserts.len()
614 + request.operational_writes.len();
615 if total > MAX_TOTAL_ITEMS {
616 return Err(EngineError::InvalidWrite(format!(
617 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
618 )));
619 }
620 Ok(())
621}
622
/// Validates a [`WriteRequest`] and converts it into a [`PreparedWrite`].
///
/// Runs on the caller's thread, before anything is sent to the writer.
/// Checks run in a fixed order (the first violation wins): size caps,
/// non-empty ids per item family, intra-request duplicate row ids,
/// provenance (only under `ProvenanceMode::Require`), then the
/// upsert-requires-supersedes rule for runs/steps/actions. DB-dependent
/// fields of the result (FTS rows, operational metadata) are initialised
/// empty and filled later by the `resolve_*` functions.
///
/// # Errors
/// `EngineError::InvalidWrite` describing the first violation found.
#[allow(clippy::too_many_lines)]
fn prepare_write(
    request: WriteRequest,
    mode: ProvenanceMode,
) -> Result<PreparedWrite, EngineError> {
    validate_request_size(&request)?;

    // Non-empty id checks, one family at a time.
    for node in &request.nodes {
        if node.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty row_id".to_owned(),
            ));
        }
        if node.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "NodeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for edge in &request.edges {
        if edge.row_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty row_id".to_owned(),
            ));
        }
        if edge.logical_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "EdgeInsert has empty logical_id".to_owned(),
            ));
        }
    }
    for chunk in &request.chunks {
        if chunk.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ChunkInsert has empty id".to_owned(),
            ));
        }
        // Empty chunks would pollute the FTS projection, so reject them.
        if chunk.text_content.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' has empty text_content; empty chunks are not allowed",
                chunk.id
            )));
        }
    }
    for run in &request.runs {
        if run.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "RunInsert has empty id".to_owned(),
            ));
        }
    }
    for step in &request.steps {
        if step.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "StepInsert has empty id".to_owned(),
            ));
        }
    }
    for action in &request.actions {
        if action.id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "ActionInsert has empty id".to_owned(),
            ));
        }
    }
    for vi in &request.vec_inserts {
        if vi.chunk_id.is_empty() {
            return Err(EngineError::InvalidWrite(
                "VecInsert has empty chunk_id".to_owned(),
            ));
        }
        if vi.embedding.is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "VecInsert for chunk '{}' has empty embedding",
                vi.chunk_id
            )));
        }
    }
    for operational in &request.operational_writes {
        if operational_write_collection(operational).is_empty() {
            return Err(EngineError::InvalidWrite(
                "OperationalWrite has empty collection".to_owned(),
            ));
        }
        if operational_write_record_key(operational).is_empty() {
            return Err(EngineError::InvalidWrite(format!(
                "OperationalWrite for collection '{}' has empty record_key",
                operational_write_collection(operational)
            )));
        }
        // Only Append/Put carry a payload; Delete has none to validate.
        match operational {
            OperationalWrite::Append { payload_json, .. }
            | OperationalWrite::Put { payload_json, .. } => {
                if payload_json.is_empty() {
                    return Err(EngineError::InvalidWrite(format!(
                        "OperationalWrite {} '{}:{}' has empty payload_json",
                        operational_write_kind(operational),
                        operational_write_collection(operational),
                        operational_write_record_key(operational)
                    )));
                }
            }
            OperationalWrite::Delete { .. } => {}
        }
    }

    {
        // Row ids must be unique across nodes AND edges combined — one
        // `seen` set is shared deliberately.
        let mut seen = std::collections::HashSet::new();
        for node in &request.nodes {
            if !seen.insert(node.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    node.row_id
                )));
            }
        }
        for edge in &request.edges {
            if !seen.insert(edge.row_id.as_str()) {
                return Err(EngineError::InvalidWrite(format!(
                    "duplicate row_id '{}' within the same WriteRequest",
                    edge.row_id
                )));
            }
        }
    }

    if mode == ProvenanceMode::Require {
        check_require_provenance(&request)?;
    }

    // Runtime records: upserting without naming what is superseded is
    // ambiguous, so require supersedes_id alongside upsert=true.
    for run in &request.runs {
        if run.upsert && run.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "run '{}': upsert=true requires supersedes_id to be set",
                run.id
            )));
        }
    }
    for step in &request.steps {
        if step.upsert && step.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "step '{}': upsert=true requires supersedes_id to be set",
                step.id
            )));
        }
    }
    for action in &request.actions {
        if action.upsert && action.supersedes_id.is_none() {
            return Err(EngineError::InvalidWrite(format!(
                "action '{}': upsert=true requires supersedes_id to be set",
                action.id
            )));
        }
    }

    // Cache logical_id -> kind so FTS resolution can skip DB lookups for
    // nodes submitted in this same request.
    let node_kinds = request
        .nodes
        .iter()
        .map(|node| (node.logical_id.clone(), node.kind.clone()))
        .collect::<HashMap<_, _>>();

    Ok(PreparedWrite {
        label: request.label,
        nodes: request.nodes,
        node_retires: request.node_retires,
        edges: request.edges,
        edge_retires: request.edge_retires,
        chunks: request.chunks,
        runs: request.runs,
        steps: request.steps,
        actions: request.actions,
        vec_inserts: request.vec_inserts,
        operational_writes: request.operational_writes,
        // Filled on the writer thread by the resolve_* functions.
        operational_collection_kinds: HashMap::new(),
        operational_collection_filter_fields: HashMap::new(),
        operational_validation_warnings: Vec::new(),
        node_kinds,
        required_fts_rows: Vec::new(),
        required_property_fts_rows: Vec::new(),
        optional_backfills: request.optional_backfills,
    })
}
809
/// Body of the dedicated writer thread.
///
/// Opens its own SQLite connection and runs the schema bootstrap; if either
/// fails, every queued and future message is answered with a rejection via
/// [`reject_all`]. Otherwise serves messages until all senders are dropped
/// (channel disconnect — how [`WriterActor`]'s `Drop` shuts it down).
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Loop ends when the channel disconnects (all senders dropped).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // Only compiled under tracing; `start` is only referenced
                // inside the trace_info! below.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot kill the writer
                // thread and strand every later submitter.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Receiver may have timed out and gone away; ignore.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: clear any transaction the unwound write
                    // left open; errors (e.g. no open transaction) ignored.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Counts the write event without inflating row totals.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
892
893fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
894 for message in receiver {
895 match message {
896 WriteMessage::Submit { reply, .. } => {
897 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
898 }
899 WriteMessage::TouchLastAccessed { reply, .. } => {
900 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
901 }
902 }
903 }
904}
905
/// Resolves each [`ChunkInsert`] into a [`FtsProjectionRow`], looking up the
/// owning node's kind from the request itself (`node_kinds`) or, failing
/// that, from the active row in the database.
///
/// Also rejects the combination of retiring a node and inserting chunks for
/// it within the same request. Runs on the writer thread, before
/// `apply_write`.
///
/// # Errors
/// `InvalidWrite` for retire/chunk conflicts or unknown nodes; `Sqlite` for
/// other query failures.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // First pass: a chunk must not target a node retired in this request.
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    // Second pass: resolve each chunk's node kind and build the FTS row.
    for chunk in &prepared.chunks {
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            // Node is part of this same request — no DB lookup needed.
            k.clone()
        } else {
            // Fall back to the currently active node row in the database.
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
967
/// Extracts property-FTS rows for every node in the request whose kind has a
/// registered [`PropertyFtsSchema`] in the database. Runs on the writer
/// thread, before `apply_write`. Nodes without a schema, or whose extraction
/// yields no text, produce no row.
///
/// # Errors
/// Propagates failures from [`load_fts_property_schemas`].
fn resolve_property_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    if prepared.nodes.is_empty() {
        return Ok(());
    }

    let schemas: HashMap<String, PropertyFtsSchema> =
        load_fts_property_schemas(conn)?.into_iter().collect();

    if schemas.is_empty() {
        return Ok(());
    }

    let mut combined_stats = ExtractStats::default();
    for node in &prepared.nodes {
        let Some(schema) = schemas.get(&node.kind) else {
            continue;
        };
        // NOTE(review): a malformed properties string silently degrades to
        // the serde_json default value here rather than failing the write —
        // confirm this leniency is intended.
        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
        let (text_content, positions, stats) = extract_property_fts(&props, schema);
        combined_stats.merge(stats);
        if let Some(text_content) = text_content {
            // Per-column extraction only matters when the schema declares
            // weighted paths.
            let columns = if has_weights {
                extract_property_fts_columns(&props, schema)
            } else {
                Vec::new()
            };
            prepared
                .required_property_fts_rows
                .push(FtsPropertyProjectionRow {
                    node_logical_id: node.logical_id.clone(),
                    kind: node.kind.clone(),
                    text_content,
                    positions,
                    columns,
                });
        }
    }
    // Only log when at least one extraction guardrail actually fired.
    if combined_stats != ExtractStats::default() {
        trace_debug!(
            depth_cap_hit = combined_stats.depth_cap_hit,
            byte_cap_reached = combined_stats.byte_cap_reached,
            excluded_subtree = combined_stats.excluded_subtree,
            "property fts recursive extraction guardrails engaged"
        );
    }
    trace_debug!(
        property_fts_rows = prepared.required_property_fts_rows.len(),
        nodes_processed = prepared.nodes.len(),
        "property fts row resolution completed"
    );
    Ok(())
}
1026
/// Resolves and validates operational writes against their registered
/// collections: the collection must exist and not be disabled, the write
/// variant must match the collection kind (Append for append-only logs,
/// Put/Delete for latest-state), and any declared validation contract is
/// checked. Collection metadata is fetched from the database once per
/// distinct collection and stashed on `prepared` for `apply_write`.
///
/// # Errors
/// `InvalidWrite` for unknown/disabled/mismatched collections or contract
/// violations; `Sqlite` for query failures.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // Fetch and cache collection metadata on first sight of each name.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Write variant must agree with the collection kind.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Contract map stores Option: only collections with a contract get
        // checked. NOTE(review): the Ok value of the check is discarded —
        // confirm it should not feed `operational_validation_warnings`.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1094
1095fn parse_operational_filter_fields(
1096 filter_fields_json: &str,
1097) -> Result<Vec<OperationalFilterField>, EngineError> {
1098 let fields: Vec<OperationalFilterField> =
1099 serde_json::from_str(filter_fields_json).map_err(|error| {
1100 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1101 })?;
1102 let mut seen = std::collections::HashSet::new();
1103 for field in &fields {
1104 if field.name.trim().is_empty() {
1105 return Err(EngineError::InvalidWrite(
1106 "filter_fields_json field names must not be empty".to_owned(),
1107 ));
1108 }
1109 if !seen.insert(field.name.as_str()) {
1110 return Err(EngineError::InvalidWrite(format!(
1111 "filter_fields_json contains duplicate field '{}'",
1112 field.name
1113 )));
1114 }
1115 if field.modes.is_empty() {
1116 return Err(EngineError::InvalidWrite(format!(
1117 "filter_fields_json field '{}' must declare at least one mode",
1118 field.name
1119 )));
1120 }
1121 if field.modes.contains(&OperationalFilterMode::Prefix)
1122 && field.field_type != OperationalFilterFieldType::String
1123 {
1124 return Err(EngineError::InvalidWrite(format!(
1125 "filter field '{}' only supports prefix for string types",
1126 field.name
1127 )));
1128 }
1129 }
1130 Ok(fields)
1131}
1132
/// One extracted filter value for an operational payload: exactly one of
/// `string_value` / `integer_value` is set, matching the field's declared
/// type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    field_name: String,
    string_value: Option<String>,
    integer_value: Option<i64>,
}
1139
1140fn extract_operational_filter_values(
1141 filter_fields: &[OperationalFilterField],
1142 payload_json: &str,
1143) -> Vec<OperationalFilterValueRow> {
1144 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1145 return Vec::new();
1146 };
1147 let Some(object) = parsed.as_object() else {
1148 return Vec::new();
1149 };
1150
1151 filter_fields
1152 .iter()
1153 .filter_map(|field| {
1154 let value = object.get(&field.name)?;
1155 match field.field_type {
1156 OperationalFilterFieldType::String => {
1157 value
1158 .as_str()
1159 .map(|string_value| OperationalFilterValueRow {
1160 field_name: field.name.clone(),
1161 string_value: Some(string_value.to_owned()),
1162 integer_value: None,
1163 })
1164 }
1165 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1166 value
1167 .as_i64()
1168 .map(|integer_value| OperationalFilterValueRow {
1169 field_name: field.name.clone(),
1170 string_value: None,
1171 integer_value: Some(integer_value),
1172 })
1173 }
1174 }
1175 })
1176 .collect()
1177}
1178
/// Runs the read-before-write resolution phases, then applies the write.
///
/// The three `resolve_*` steps consult current database state to finish
/// populating `prepared` (FTS rows, property-FTS rows, operational metadata)
/// before `apply_write` executes everything in a single transaction. Any
/// resolution error aborts before the transactional apply begins.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1188
1189fn apply_touch_last_accessed(
1190 conn: &mut rusqlite::Connection,
1191 request: &LastAccessTouchRequest,
1192) -> Result<LastAccessTouchReport, EngineError> {
1193 let mut seen = std::collections::HashSet::new();
1194 let logical_ids = request
1195 .logical_ids
1196 .iter()
1197 .filter(|logical_id| seen.insert(logical_id.as_str()))
1198 .cloned()
1199 .collect::<Vec<_>>();
1200 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1201
1202 for logical_id in &logical_ids {
1203 let exists = tx
1204 .query_row(
1205 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1206 params![logical_id],
1207 |row| row.get::<_, i64>(0),
1208 )
1209 .optional()?
1210 .is_some();
1211 if !exists {
1212 return Err(EngineError::InvalidWrite(format!(
1213 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1214 )));
1215 }
1216 }
1217
1218 {
1219 let mut upsert_metadata = tx.prepare_cached(
1220 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1221 VALUES (?1, ?2, ?2) \
1222 ON CONFLICT(logical_id) DO UPDATE SET \
1223 last_accessed_at = excluded.last_accessed_at, \
1224 updated_at = excluded.updated_at",
1225 )?;
1226 let mut insert_provenance = tx.prepare_cached(
1227 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1228 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1229 )?;
1230 for logical_id in &logical_ids {
1231 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1232 insert_provenance.execute(params![
1233 new_id(),
1234 logical_id,
1235 request.source_ref.as_deref(),
1236 format!("{{\"touched_at\":{}}}", request.touched_at),
1237 ])?;
1238 }
1239 }
1240
1241 tx.commit()?;
1242 Ok(LastAccessTouchReport {
1243 touched_logical_ids: logical_ids.len(),
1244 touched_at: request.touched_at,
1245 })
1246}
1247
1248fn ensure_operational_collections_writable(
1249 tx: &rusqlite::Transaction<'_>,
1250 prepared: &PreparedWrite,
1251) -> Result<(), EngineError> {
1252 for collection in prepared.operational_collection_kinds.keys() {
1253 let disabled_at: Option<Option<i64>> = tx
1254 .query_row(
1255 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1256 params![collection],
1257 |row| row.get::<_, Option<i64>>(0),
1258 )
1259 .optional()?;
1260 match disabled_at {
1261 Some(Some(_)) => {
1262 return Err(EngineError::InvalidWrite(format!(
1263 "operational collection '{collection}' is disabled"
1264 )));
1265 }
1266 Some(None) => {}
1267 None => {
1268 return Err(EngineError::InvalidWrite(format!(
1269 "operational collection '{collection}' is not registered"
1270 )));
1271 }
1272 }
1273 }
1274 Ok(())
1275}
1276
1277fn validate_operational_writes_against_live_contracts(
1278 tx: &rusqlite::Transaction<'_>,
1279 prepared: &PreparedWrite,
1280) -> Result<Vec<String>, EngineError> {
1281 let mut collection_validation_contracts =
1282 HashMap::<String, Option<OperationalValidationContract>>::new();
1283 for collection in prepared.operational_collection_kinds.keys() {
1284 let validation_json: String = tx
1285 .query_row(
1286 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1287 params![collection],
1288 |row| row.get(0),
1289 )
1290 .map_err(EngineError::Sqlite)?;
1291 let validation_contract = parse_operational_validation_contract(&validation_json)
1292 .map_err(EngineError::InvalidWrite)?;
1293 collection_validation_contracts.insert(collection.clone(), validation_contract);
1294 }
1295
1296 let mut warnings = Vec::new();
1297 for write in &prepared.operational_writes {
1298 if let Some(Some(contract)) =
1299 collection_validation_contracts.get(operational_write_collection(write))
1300 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1301 {
1302 warnings.push(warning);
1303 }
1304 }
1305
1306 Ok(warnings)
1307}
1308
1309fn load_live_operational_secondary_indexes(
1310 tx: &rusqlite::Transaction<'_>,
1311 prepared: &PreparedWrite,
1312) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1313 let mut collection_indexes = HashMap::new();
1314 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1315 let secondary_indexes_json: String = tx
1316 .query_row(
1317 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1318 params![collection],
1319 |row| row.get(0),
1320 )
1321 .map_err(EngineError::Sqlite)?;
1322 let indexes =
1323 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1324 .map_err(EngineError::InvalidWrite)?;
1325 collection_indexes.insert(collection.clone(), indexes);
1326 }
1327 Ok(collection_indexes)
1328}
1329
1330fn check_operational_write_against_contract(
1331 write: &OperationalWrite,
1332 contract: &OperationalValidationContract,
1333) -> Result<Option<String>, EngineError> {
1334 if contract.mode == OperationalValidationMode::Disabled {
1335 return Ok(None);
1336 }
1337
1338 let (payload_json, collection, record_key) = match write {
1339 OperationalWrite::Append {
1340 collection,
1341 record_key,
1342 payload_json,
1343 ..
1344 }
1345 | OperationalWrite::Put {
1346 collection,
1347 record_key,
1348 payload_json,
1349 ..
1350 } => (
1351 payload_json.as_str(),
1352 collection.as_str(),
1353 record_key.as_str(),
1354 ),
1355 OperationalWrite::Delete { .. } => return Ok(None),
1356 };
1357
1358 match validate_operational_payload_against_contract(contract, payload_json) {
1359 Ok(()) => Ok(None),
1360 Err(message) => match contract.mode {
1361 OperationalValidationMode::Disabled => Ok(None),
1362 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1363 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1364 kind = operational_write_kind(write)
1365 ))),
1366 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1367 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1368 kind = operational_write_kind(write)
1369 ))),
1370 },
1371 }
1372}
1373
/// Applies a fully-resolved `PreparedWrite` to the database.
///
/// Everything runs inside one IMMEDIATE transaction: either the whole
/// prepared write commits, or none of it does. The ordering of the scoped
/// sections below is significant — retires run before inserts so an upserted
/// logical_id supersedes its prior row, and FTS/vector derivations run after
/// the base rows exist.
///
/// Returns a `WriteReceipt` whose `warnings` combine provenance warnings
/// (missing source_refs) with report-only operational validation warnings.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // --- Node retires: drop derived FTS data, supersede the live row, and
    // record a 'node_retire' provenance event per retire.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_staging = tx.prepare_cached(
            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
        )?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            // Per-kind FTS virtual tables are created lazily elsewhere, so look
            // up the node's kind and probe sqlite_master before deleting.
            let kind_opt: Option<String> = tx
                .query_row(
                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                    params![retire.logical_id],
                    |r| r.get(0),
                )
                .optional()?;
            if let Some(kind) = kind_opt {
                // NOTE(review): `table` is interpolated into SQL; assumed
                // fts_kind_table_name yields a safe identifier — confirm.
                let table = fathomdb_schema::fts_kind_table_name(&kind);
                let table_exists: bool = tx
                    .query_row(
                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                        rusqlite::params![table],
                        |r| r.get::<_, i64>(0),
                    )
                    .unwrap_or(0)
                    > 0;
                if table_exists {
                    tx.execute(
                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
                        params![retire.logical_id],
                    )?;
                }
            }
            del_prop_positions.execute(params![retire.logical_id])?;
            del_staging.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // --- Edge retires: supersede and log an 'edge_retire' event (edges have
    // no derived FTS data to clean up).
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // --- Node inserts. For upserts, first clear derived data for the logical
    // id (kind-table FTS rows, property positions, and — under ChunkPolicy::
    // Replace — vectors, fts_nodes rows, and chunks), then supersede the old
    // row; finally insert the new version.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        for node in &prepared.nodes {
            if node.upsert {
                let table = fathomdb_schema::fts_kind_table_name(&node.kind);
                let table_exists: bool = tx
                    .query_row(
                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                        rusqlite::params![table],
                        |r| r.get::<_, i64>(0),
                    )
                    .unwrap_or(0)
                    > 0;
                if table_exists {
                    tx.execute(
                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
                        params![node.logical_id],
                    )?;
                }
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    // Vector rows reference chunk ids, so they must go before
                    // the chunks themselves. A missing vec table is tolerated.
                    #[cfg(feature = "sqlite-vec")]
                    {
                        let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
                        let del_sql = format!(
                            "DELETE FROM {vec_table} WHERE chunk_id IN \
                             (SELECT id FROM chunks WHERE node_logical_id = ?1)"
                        );
                        match tx.execute(&del_sql, params![node.logical_id]) {
                            Ok(_) => {}
                            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
                            Err(e) => return Err(e.into()),
                        }
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // --- Edge inserts; upserts supersede the prior row for the logical id.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // --- Chunk inserts (plain inserts; replacement is handled by the
    // ChunkPolicy::Replace branch in the node section above).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // --- Runs: unlike nodes/edges, an upsert only supersedes when an explicit
    // supersedes_id is given.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // --- Steps: same upsert-via-supersedes_id pattern as runs.
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // --- Actions: same upsert-via-supersedes_id pattern as runs/steps.
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // --- Operational writes: append-only mutation log, filter-value rows,
    // secondary-index entries, and (for latest_state collections) the
    // operational_current projection. Registry state is re-checked inside the
    // transaction so writes validate against live configuration.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // mutation_order is a global monotonic counter continued from the
        // current max; safe because the IMMEDIATE transaction serializes writers.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
             payload_json = excluded.payload_json, \
             updated_at = excluded.updated_at, \
             last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            // Every write (append/put/delete) lands in the mutation log.
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Mutation-scoped secondary-index entries.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Declared filter-field values extracted from the payload.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // latest_state collections additionally maintain a current-state
            // projection plus 'current'-scoped secondary-index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row we just upserted so the index
                            // entries use the stored updated_at/mutation id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // --- Global FTS rows resolved earlier by resolve_fts_rows.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // --- Per-kind property FTS rows: replace position rows, then insert into
    // the kind's fts5 table (single-column default shape, or the schema-driven
    // multi-column shape with dynamically built SQL).
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            let table = fathomdb_schema::fts_kind_table_name(&row.kind);
            if row.columns.is_empty() {
                // Default shape: one unindexed id column + one text column,
                // created on demand with the shared tokenizer.
                tx.execute_batch(&format!(
                    "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
                     node_logical_id UNINDEXED, text_content, \
                     tokenize = '{DEFAULT_FTS_TOKENIZER}'\
                     )"
                ))?;
                tx.prepare(&format!(
                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
                ))?
                .execute(params![row.node_logical_id, row.text_content])?;
            } else {
                // Multi-column shape: placeholders start at ?2 (?1 is the id).
                let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
                let placeholders: Vec<String> = (2..=row.columns.len() + 1)
                    .map(|i| format!("?{i}"))
                    .collect();
                let sql = format!(
                    "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
                    cols = col_names.join(", "),
                    placeholders = placeholders.join(", "),
                );
                let mut stmt = tx.prepare(&sql)?;
                stmt.execute(rusqlite::params_from_iter(
                    std::iter::once(row.node_logical_id.as_str())
                        .chain(row.columns.iter().map(|(_, v)| v.as_str())),
                ))?;
            }
            // Offsets are clamped to i64::MAX rather than failing the write.
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // --- Mirror property-FTS content into the rebuild staging table for any
    // kind with an in-flight rebuild ('PENDING'/'BUILDING'/'SWAPPING'), so the
    // rebuild can pick up writes that happened while it ran.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_staging_simple = tx.prepare_cached(
            "INSERT INTO fts_property_rebuild_staging \
             (kind, node_logical_id, text_content) \
             VALUES (?1, ?2, ?3) \
             ON CONFLICT(kind, node_logical_id) DO UPDATE \
             SET text_content = excluded.text_content",
        )?;
        let mut ins_staging_columns = tx.prepare_cached(
            "INSERT INTO fts_property_rebuild_staging \
             (kind, node_logical_id, text_content, columns_json) \
             VALUES (?1, ?2, ?3, ?4) \
             ON CONFLICT(kind, node_logical_id) DO UPDATE \
             SET text_content = excluded.text_content, \
             columns_json = excluded.columns_json",
        )?;
        let mut check_rebuild = tx.prepare_cached(
            "SELECT 1 FROM fts_property_rebuild_state \
             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
        )?;
        for row in &prepared.required_property_fts_rows {
            let in_rebuild: bool = check_rebuild
                .query_row(params![row.kind], |_| Ok(true))
                .optional()?
                .unwrap_or(false);
            if in_rebuild {
                if row.columns.is_empty() {
                    ins_staging_simple.execute(params![
                        row.kind,
                        row.node_logical_id,
                        row.text_content
                    ])?;
                } else {
                    // Columns are serialized to JSON for the staging row; a
                    // serialization failure stores NULL rather than erroring.
                    let json_map: serde_json::Map<String, serde_json::Value> = row
                        .columns
                        .iter()
                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
                        .collect();
                    let columns_json =
                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
                    ins_staging_columns.execute(params![
                        row.kind,
                        row.node_logical_id,
                        row.text_content,
                        columns_json
                    ])?;
                }
            }
        }
    }

    // --- Vector inserts (feature-gated): map each chunk id to its node kind
    // (from this batch, else from the db) to pick the per-kind vec0 table;
    // inserts for unresolvable chunk ids are silently skipped.
    #[cfg(feature = "sqlite-vec")]
    {
        let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
            .chunks
            .iter()
            .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
            .collect();

        for vi in &prepared.vec_inserts {
            let kind: Option<String> = chunk_to_node
                .get(vi.chunk_id.as_str())
                .and_then(|lid| prepared.node_kinds.get(*lid))
                .cloned()
                .or_else(|| {
                    tx.query_row(
                        "SELECT n.kind FROM chunks c \
                         JOIN nodes n ON n.logical_id = c.node_logical_id \
                         WHERE c.id = ?1 LIMIT 1",
                        rusqlite::params![vi.chunk_id],
                        |row| row.get::<_, String>(0),
                    )
                    .optional()
                    .unwrap_or(None)
                });

            let Some(kind) = kind else {
                continue;
            };

            let table_name = fathomdb_schema::vec_kind_table_name(&kind);
            let dimension = vi.embedding.len();
            // Embeddings are stored as little-endian f32 bytes.
            let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
            tx.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                 chunk_id TEXT PRIMARY KEY,\
                 embedding float[{dimension}]\
                 )"
            ))?;
            tx.execute(
                &format!(
                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
                ),
                rusqlite::params![vi.chunk_id, bytes],
            )?;
        }
    }

    tx.commit()?;

    // Provenance warnings are assembled unconditionally here; presumably
    // ProvenanceMode gating (Warn vs Require) happens in the caller — confirm.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // The receipt's combined warnings list starts with the provenance
    // warnings, then appends report-only validation warnings.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2046
2047fn operational_write_collection(write: &OperationalWrite) -> &str {
2048 match write {
2049 OperationalWrite::Append { collection, .. }
2050 | OperationalWrite::Put { collection, .. }
2051 | OperationalWrite::Delete { collection, .. } => collection,
2052 }
2053}
2054
2055fn operational_write_record_key(write: &OperationalWrite) -> &str {
2056 match write {
2057 OperationalWrite::Append { record_key, .. }
2058 | OperationalWrite::Put { record_key, .. }
2059 | OperationalWrite::Delete { record_key, .. } => record_key,
2060 }
2061}
2062
2063fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2064 match write {
2065 OperationalWrite::Append { .. } => "append",
2066 OperationalWrite::Put { .. } => "put",
2067 OperationalWrite::Delete { .. } => "delete",
2068 }
2069}
2070
2071fn operational_write_payload(write: &OperationalWrite) -> &str {
2072 match write {
2073 OperationalWrite::Append { payload_json, .. }
2074 | OperationalWrite::Put { payload_json, .. } => payload_json,
2075 OperationalWrite::Delete { .. } => "null",
2076 }
2077}
2078
2079fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2080 match write {
2081 OperationalWrite::Append { source_ref, .. }
2082 | OperationalWrite::Put { source_ref, .. }
2083 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2084 }
2085}
2086
2087#[cfg(test)]
2088#[allow(clippy::expect_used)]
2089mod tests {
2090 use std::sync::Arc;
2091
2092 use fathomdb_schema::SchemaManager;
2093 use tempfile::NamedTempFile;
2094
2095 use super::{apply_write, prepare_write, resolve_operational_writes};
2096 use crate::{
2097 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2098 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2099 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2100 projection::ProjectionTarget,
2101 };
2102
    #[test]
    fn writer_executes_runtime_table_rows() {
        // End-to-end: a request containing only run/step/action rows should
        // round-trip through the writer actor against a fresh temp database
        // and yield a receipt echoing the request label.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                // One completed run...
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // ...with one step under it...
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                // ...and one action under that step.
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        // Receipt label mirrors the submitted request label.
        assert_eq!(receipt.label, "runtime");
    }
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        // Seed a latest_state operational collection directly in the db, then
        // submit a mixed write (one node + one operational Put) and verify the
        // node row, the mutation-log row, and the current-state projection.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer opens the file.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Re-open the database independently and inspect what was committed.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // The node insert landed exactly once.
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // The Put produced exactly one mutation-log row.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // The latest_state projection carries the submitted payload.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2238
2239 #[test]
2240 fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2241 let db = NamedTempFile::new().expect("temporary db");
2242 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2243 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2244 conn.execute(
2245 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2246 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2247 [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2248 )
2249 .expect("seed collection");
2250 drop(conn);
2251 let writer = WriterActor::start(
2252 db.path(),
2253 Arc::new(SchemaManager::new()),
2254 ProvenanceMode::Warn,
2255 Arc::new(TelemetryCounters::default()),
2256 )
2257 .expect("writer");
2258
2259 writer
2260 .submit(WriteRequest {
2261 label: "disabled-validation".to_owned(),
2262 nodes: vec![],
2263 node_retires: vec![],
2264 edges: vec![],
2265 edge_retires: vec![],
2266 chunks: vec![],
2267 runs: vec![],
2268 steps: vec![],
2269 actions: vec![],
2270 optional_backfills: vec![],
2271 vec_inserts: vec![],
2272 operational_writes: vec![OperationalWrite::Put {
2273 collection: "connector_health".to_owned(),
2274 record_key: "gmail".to_owned(),
2275 payload_json: r#"{"bogus":true}"#.to_owned(),
2276 source_ref: Some("src-1".to_owned()),
2277 }],
2278 })
2279 .expect("write receipt");
2280
2281 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2282 let payload: String = conn
2283 .query_row(
2284 "SELECT payload_json FROM operational_current \
2285 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2286 [],
2287 |row| row.get(0),
2288 )
2289 .expect("current payload");
2290 assert_eq!(payload, r#"{"bogus":true}"#);
2291 }
2292
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        // report_only contracts must not block the write: the invalid payload
        // still lands in operational_current, but the receipt carries exactly
        // one warning that names the collection and the validation failure.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed a collection whose contract requires status in {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum, yet report_only mode must still accept it.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // Validation warnings must flow through receipt.warnings, not the
        // provenance channel.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the invalid payload was persisted as current.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2357
2358 #[test]
2359 fn writer_rejects_operational_write_for_missing_collection() {
2360 let db = NamedTempFile::new().expect("temporary db");
2361 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2362 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2363 drop(conn);
2364 let writer = WriterActor::start(
2365 db.path(),
2366 Arc::new(SchemaManager::new()),
2367 ProvenanceMode::Warn,
2368 Arc::new(TelemetryCounters::default()),
2369 )
2370 .expect("writer");
2371
2372 let result = writer.submit(WriteRequest {
2373 label: "missing-operational-collection".to_owned(),
2374 nodes: vec![],
2375 node_retires: vec![],
2376 edges: vec![],
2377 edge_retires: vec![],
2378 chunks: vec![],
2379 runs: vec![],
2380 steps: vec![],
2381 actions: vec![],
2382 optional_backfills: vec![],
2383 vec_inserts: vec![],
2384 operational_writes: vec![OperationalWrite::Put {
2385 collection: "connector_health".to_owned(),
2386 record_key: "gmail".to_owned(),
2387 payload_json: r#"{"status":"ok"}"#.to_owned(),
2388 source_ref: Some("src-1".to_owned()),
2389 }],
2390 });
2391
2392 assert!(
2393 matches!(result, Err(EngineError::InvalidWrite(_))),
2394 "missing operational collection must return InvalidWrite"
2395 );
2396 }
2397
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        // Append against an append_only_log collection must record a mutation
        // history row ('append') but never materialize an operational_current
        // row for the record key.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an append-only collection (no validation contract).
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History must carry the op kind and the exact payload.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // ...and no current-state row may exist for an append-only record.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2461
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        // An enforce-mode contract must reject an invalid Append with
        // InvalidWrite and leave no trace in either operational_mutations or
        // operational_filter_values.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an append-only collection with both a filter field and an
        // enforce-mode contract requiring status in {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
             '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum, so the whole submit must fail.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Neither the mutation log nor the filter-value index may have rows:
        // the rejection must be side-effect free.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2526
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        // A Delete following a Put must remove the operational_current row
        // while the mutation log retains both operations in order.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish a current row via Put.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same record key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History keeps both operations, ordered by mutation_order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current-state row must be gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2615
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        // Delete carries no payload, so even an enforce-mode validation
        // contract must not apply to it: a valid Put followed by a Delete
        // succeeds and removes the current row.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an enforce-mode contract requiring status in {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Establish a current row with a payload that satisfies the contract.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The payload-less Delete must succeed despite the enforce contract.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2689
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        // Secondary index entries for subject_kind = 'current' must be created
        // on Put (one per index definition) and removed again on Delete.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed two index definitions: a single-field index on 'status' and a
        // composite index over (tenant, category).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The payload supplies values for all three indexed fields.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per index definition: status_current + tenant_category.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Deleting the record must drop both 'current' index entries.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
2778
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        // A single batch with put/delete/put against the same record key must
        // persist mutation_order 1, 2, 3 in batch order, and the final Put
        // must win as the current payload.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Three writes to the same key within one batch, in a fixed order.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        // mutation_order must reflect the in-batch ordering exactly.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The last Put in batch order determines the surviving current state.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
2867
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        // Simulates a preflight/commit race: the collection is disabled AFTER
        // resolve_operational_writes has already resolved it. apply_write must
        // re-check inside the transaction, reject, and leave no rows behind.
        // NOTE: the statement ordering here (prepare -> resolve -> disable ->
        // apply) is the behavior under test.
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight succeeds while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection between preflight and apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejection must be side-effect free: no mutation history...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // ...and no current-state row.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2931
    #[test]
    fn writer_enforce_validation_rejects_invalid_put_atomically() {
        // A batch mixing a valid node insert with an invalid operational Put
        // (enforce-mode contract) must be rejected as a whole: the node must
        // NOT be committed either.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an enforce-mode contract requiring status in {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-put".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid put must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Atomicity: the node from the same batch must not have landed...
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 0);
        // ...nor any operational mutation or current-state row.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3011
3012 #[test]
3013 fn writer_rejects_append_against_latest_state_collection() {
3014 let db = NamedTempFile::new().expect("temporary db");
3015 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3016 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3017 conn.execute(
3018 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3019 VALUES ('connector_health', 'latest_state', '{}', '{}')",
3020 [],
3021 )
3022 .expect("seed collection");
3023 drop(conn);
3024 let writer = WriterActor::start(
3025 db.path(),
3026 Arc::new(SchemaManager::new()),
3027 ProvenanceMode::Warn,
3028 Arc::new(TelemetryCounters::default()),
3029 )
3030 .expect("writer");
3031
3032 let result = writer.submit(WriteRequest {
3033 label: "bad-append".to_owned(),
3034 nodes: vec![],
3035 node_retires: vec![],
3036 edges: vec![],
3037 edge_retires: vec![],
3038 chunks: vec![],
3039 runs: vec![],
3040 steps: vec![],
3041 actions: vec![],
3042 optional_backfills: vec![],
3043 vec_inserts: vec![],
3044 operational_writes: vec![OperationalWrite::Append {
3045 collection: "connector_health".to_owned(),
3046 record_key: "gmail".to_owned(),
3047 payload_json: r#"{"status":"ok"}"#.to_owned(),
3048 source_ref: Some("src-1".to_owned()),
3049 }],
3050 });
3051
3052 assert!(
3053 matches!(result, Err(EngineError::InvalidWrite(_))),
3054 "latest_state collection must reject Append"
3055 );
3056 }
3057
3058 #[test]
3059 fn writer_upsert_supersedes_prior_active_node() {
3060 let db = NamedTempFile::new().expect("temporary db");
3061 let writer = WriterActor::start(
3062 db.path(),
3063 Arc::new(SchemaManager::new()),
3064 ProvenanceMode::Warn,
3065 Arc::new(TelemetryCounters::default()),
3066 )
3067 .expect("writer");
3068
3069 writer
3070 .submit(WriteRequest {
3071 label: "v1".to_owned(),
3072 nodes: vec![NodeInsert {
3073 row_id: "row-1".to_owned(),
3074 logical_id: "lg-1".to_owned(),
3075 kind: "Meeting".to_owned(),
3076 properties: r#"{"version":1}"#.to_owned(),
3077 source_ref: Some("src-1".to_owned()),
3078 upsert: false,
3079 chunk_policy: ChunkPolicy::Preserve,
3080 content_ref: None,
3081 }],
3082 node_retires: vec![],
3083 edges: vec![],
3084 edge_retires: vec![],
3085 chunks: vec![],
3086 runs: vec![],
3087 steps: vec![],
3088 actions: vec![],
3089 optional_backfills: vec![],
3090 vec_inserts: vec![],
3091 operational_writes: vec![],
3092 })
3093 .expect("v1 write");
3094
3095 writer
3096 .submit(WriteRequest {
3097 label: "v2".to_owned(),
3098 nodes: vec![NodeInsert {
3099 row_id: "row-2".to_owned(),
3100 logical_id: "lg-1".to_owned(),
3101 kind: "Meeting".to_owned(),
3102 properties: r#"{"version":2}"#.to_owned(),
3103 source_ref: Some("src-2".to_owned()),
3104 upsert: true,
3105 chunk_policy: ChunkPolicy::Preserve,
3106 content_ref: None,
3107 }],
3108 node_retires: vec![],
3109 edges: vec![],
3110 edge_retires: vec![],
3111 chunks: vec![],
3112 runs: vec![],
3113 steps: vec![],
3114 actions: vec![],
3115 optional_backfills: vec![],
3116 vec_inserts: vec![],
3117 operational_writes: vec![],
3118 })
3119 .expect("v2 upsert write");
3120
3121 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3122 let (active_row_id, props): (String, String) = conn
3123 .query_row(
3124 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3125 [],
3126 |row| Ok((row.get(0)?, row.get(1)?)),
3127 )
3128 .expect("active row");
3129 assert_eq!(active_row_id, "row-2");
3130 assert!(props.contains("\"version\":2"));
3131
3132 let superseded: i64 = conn
3133 .query_row(
3134 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3135 [],
3136 |row| row.get(0),
3137 )
3138 .expect("superseded count");
3139 assert_eq!(superseded, 1);
3140 }
3141
3142 #[test]
3143 fn writer_inserts_edge_between_two_nodes() {
3144 let db = NamedTempFile::new().expect("temporary db");
3145 let writer = WriterActor::start(
3146 db.path(),
3147 Arc::new(SchemaManager::new()),
3148 ProvenanceMode::Warn,
3149 Arc::new(TelemetryCounters::default()),
3150 )
3151 .expect("writer");
3152
3153 writer
3154 .submit(WriteRequest {
3155 label: "nodes-and-edge".to_owned(),
3156 nodes: vec![
3157 NodeInsert {
3158 row_id: "row-meeting".to_owned(),
3159 logical_id: "meeting-1".to_owned(),
3160 kind: "Meeting".to_owned(),
3161 properties: "{}".to_owned(),
3162 source_ref: Some("src-1".to_owned()),
3163 upsert: false,
3164 chunk_policy: ChunkPolicy::Preserve,
3165 content_ref: None,
3166 },
3167 NodeInsert {
3168 row_id: "row-task".to_owned(),
3169 logical_id: "task-1".to_owned(),
3170 kind: "Task".to_owned(),
3171 properties: "{}".to_owned(),
3172 source_ref: Some("src-1".to_owned()),
3173 upsert: false,
3174 chunk_policy: ChunkPolicy::Preserve,
3175 content_ref: None,
3176 },
3177 ],
3178 node_retires: vec![],
3179 edges: vec![EdgeInsert {
3180 row_id: "edge-1".to_owned(),
3181 logical_id: "edge-lg-1".to_owned(),
3182 source_logical_id: "meeting-1".to_owned(),
3183 target_logical_id: "task-1".to_owned(),
3184 kind: "HAS_TASK".to_owned(),
3185 properties: "{}".to_owned(),
3186 source_ref: Some("src-1".to_owned()),
3187 upsert: false,
3188 }],
3189 edge_retires: vec![],
3190 chunks: vec![],
3191 runs: vec![],
3192 steps: vec![],
3193 actions: vec![],
3194 optional_backfills: vec![],
3195 vec_inserts: vec![],
3196 operational_writes: vec![],
3197 })
3198 .expect("write receipt");
3199
3200 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3201 let (src, tgt, kind): (String, String, String) = conn
3202 .query_row(
3203 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3204 [],
3205 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3206 )
3207 .expect("edge row");
3208 assert_eq!(src, "meeting-1");
3209 assert_eq!(tgt, "task-1");
3210 assert_eq!(kind, "HAS_TASK");
3211 }
3212
    // Scenario: upserting an edge that reuses an existing logical_id must
    // supersede the previously active edge row (keeping it as history) and
    // leave only the new row active with the new properties.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn writer_upsert_supersedes_prior_active_edge() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Phase 1: seed the two endpoint nodes the edge will reference.
        writer
            .submit(WriteRequest {
                label: "nodes".to_owned(),
                nodes: vec![
                    NodeInsert {
                        row_id: "row-a".to_owned(),
                        logical_id: "node-a".to_owned(),
                        kind: "Meeting".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                    NodeInsert {
                        row_id: "row-b".to_owned(),
                        logical_id: "node-b".to_owned(),
                        kind: "Task".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                ],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("nodes write");

        // Phase 2: first version of the edge (plain insert, weight 1).
        writer
            .submit(WriteRequest {
                label: "edge-v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![EdgeInsert {
                    row_id: "edge-row-1".to_owned(),
                    logical_id: "edge-lg-1".to_owned(),
                    source_logical_id: "node-a".to_owned(),
                    target_logical_id: "node-b".to_owned(),
                    kind: "HAS_TASK".to_owned(),
                    properties: r#"{"weight":1}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                }],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("edge v1 write");

        // Phase 3: second version with upsert=true and the same logical_id —
        // this is the operation under test.
        writer
            .submit(WriteRequest {
                label: "edge-v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![EdgeInsert {
                    row_id: "edge-row-2".to_owned(),
                    logical_id: "edge-lg-1".to_owned(),
                    source_logical_id: "node-a".to_owned(),
                    target_logical_id: "node-b".to_owned(),
                    kind: "HAS_TASK".to_owned(),
                    properties: r#"{"weight":2}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                }],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("edge v2 upsert");

        // Only edge-row-2 may be active, carrying the v2 properties.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let (active_row_id, props): (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("active edge");
        assert_eq!(active_row_id, "edge-row-2");
        assert!(props.contains("\"weight\":2"));

        // The v1 row must remain, but marked superseded (history preserved).
        let superseded: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
                [],
                |row| row.get(0),
            )
            .expect("superseded count");
        assert_eq!(superseded, 1);
    }
3338
    // Scenario: submitting a node plus a chunk must also populate the FTS
    // side table (fts_nodes) with the chunk text and the node's kind.
    #[test]
    fn writer_fts_rows_are_written_to_database() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One node and one chunk attached to it in the same request.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "logical-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "logical-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        // The FTS row must carry the chunk id, owning node, node kind, and text.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
            conn.query_row(
                "SELECT chunk_id, node_logical_id, kind, text_content \
                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .expect("fts row");
        assert_eq!(chunk_id, "chunk-1");
        assert_eq!(node_logical_id, "logical-1");
        assert_eq!(kind, "Meeting");
        assert_eq!(text_content, "budget discussion");
    }
3397
3398 #[test]
3399 fn writer_receipt_warns_on_nodes_without_source_ref() {
3400 let db = NamedTempFile::new().expect("temporary db");
3401 let writer = WriterActor::start(
3402 db.path(),
3403 Arc::new(SchemaManager::new()),
3404 ProvenanceMode::Warn,
3405 Arc::new(TelemetryCounters::default()),
3406 )
3407 .expect("writer");
3408
3409 let receipt = writer
3410 .submit(WriteRequest {
3411 label: "no-source".to_owned(),
3412 nodes: vec![NodeInsert {
3413 row_id: "row-1".to_owned(),
3414 logical_id: "logical-1".to_owned(),
3415 kind: "Meeting".to_owned(),
3416 properties: "{}".to_owned(),
3417 source_ref: None,
3418 upsert: false,
3419 chunk_policy: ChunkPolicy::Preserve,
3420 content_ref: None,
3421 }],
3422 node_retires: vec![],
3423 edges: vec![],
3424 edge_retires: vec![],
3425 chunks: vec![],
3426 runs: vec![],
3427 steps: vec![],
3428 actions: vec![],
3429 optional_backfills: vec![],
3430 vec_inserts: vec![],
3431 operational_writes: vec![],
3432 })
3433 .expect("write receipt");
3434
3435 assert_eq!(receipt.provenance_warnings.len(), 1);
3436 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3437 }
3438
3439 #[test]
3440 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3441 let db = NamedTempFile::new().expect("temporary db");
3442 let writer = WriterActor::start(
3443 db.path(),
3444 Arc::new(SchemaManager::new()),
3445 ProvenanceMode::Warn,
3446 Arc::new(TelemetryCounters::default()),
3447 )
3448 .expect("writer");
3449
3450 let receipt = writer
3451 .submit(WriteRequest {
3452 label: "with-source".to_owned(),
3453 nodes: vec![NodeInsert {
3454 row_id: "row-1".to_owned(),
3455 logical_id: "logical-1".to_owned(),
3456 kind: "Meeting".to_owned(),
3457 properties: "{}".to_owned(),
3458 source_ref: Some("src-1".to_owned()),
3459 upsert: false,
3460 chunk_policy: ChunkPolicy::Preserve,
3461 content_ref: None,
3462 }],
3463 node_retires: vec![],
3464 edges: vec![],
3465 edge_retires: vec![],
3466 chunks: vec![],
3467 runs: vec![],
3468 steps: vec![],
3469 actions: vec![],
3470 optional_backfills: vec![],
3471 vec_inserts: vec![],
3472 operational_writes: vec![],
3473 })
3474 .expect("write receipt");
3475
3476 assert!(receipt.provenance_warnings.is_empty());
3477 }
3478
    // Scenario: a chunk may reference a node that was persisted by an EARLIER
    // request (not part of the current batch); the writer must accept it and
    // still produce the FTS row.
    #[test]
    fn writer_accepts_chunk_for_pre_existing_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Request 1: persist the node alone.
        writer
            .submit(WriteRequest {
                label: "r1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "logical-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r1 write");

        // Request 2: the chunk alone, referencing the node from request 1.
        writer
            .submit(WriteRequest {
                label: "r2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "logical-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r2 write — chunk for pre-existing node");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(
            count, 1,
            "FTS row must exist for chunk attached to pre-existing node"
        );
    }
3555
3556 #[test]
3557 fn writer_rejects_chunk_for_completely_unknown_node() {
3558 let db = NamedTempFile::new().expect("temporary db");
3559 let writer = WriterActor::start(
3560 db.path(),
3561 Arc::new(SchemaManager::new()),
3562 ProvenanceMode::Warn,
3563 Arc::new(TelemetryCounters::default()),
3564 )
3565 .expect("writer");
3566
3567 let result = writer.submit(WriteRequest {
3568 label: "bad".to_owned(),
3569 nodes: vec![],
3570 node_retires: vec![],
3571 edges: vec![],
3572 edge_retires: vec![],
3573 chunks: vec![ChunkInsert {
3574 id: "chunk-1".to_owned(),
3575 node_logical_id: "nonexistent".to_owned(),
3576 text_content: "some text".to_owned(),
3577 byte_start: None,
3578 byte_end: None,
3579 content_hash: None,
3580 }],
3581 runs: vec![],
3582 steps: vec![],
3583 actions: vec![],
3584 optional_backfills: vec![],
3585 vec_inserts: vec![],
3586 operational_writes: vec![],
3587 });
3588
3589 assert!(
3590 matches!(result, Err(EngineError::InvalidWrite(_))),
3591 "completely unknown node must return InvalidWrite"
3592 );
3593 }
3594
3595 #[test]
3596 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3597 let db = NamedTempFile::new().expect("temporary db");
3598 let writer = WriterActor::start(
3599 db.path(),
3600 Arc::new(SchemaManager::new()),
3601 ProvenanceMode::Warn,
3602 Arc::new(TelemetryCounters::default()),
3603 )
3604 .expect("writer");
3605
3606 let receipt = writer
3607 .submit(WriteRequest {
3608 label: "seed".to_owned(),
3609 nodes: vec![NodeInsert {
3610 row_id: "row-1".to_owned(),
3611 logical_id: "logical-1".to_owned(),
3612 kind: "Meeting".to_owned(),
3613 properties: "{}".to_owned(),
3614 source_ref: None,
3615 upsert: false,
3616 chunk_policy: ChunkPolicy::Preserve,
3617 content_ref: None,
3618 }],
3619 node_retires: vec![],
3620 edges: vec![],
3621 edge_retires: vec![],
3622 chunks: vec![ChunkInsert {
3623 id: "chunk-1".to_owned(),
3624 node_logical_id: "logical-1".to_owned(),
3625 text_content: "budget discussion".to_owned(),
3626 byte_start: None,
3627 byte_end: None,
3628 content_hash: None,
3629 }],
3630 runs: vec![],
3631 steps: vec![],
3632 actions: vec![],
3633 optional_backfills: vec![],
3634 vec_inserts: vec![],
3635 operational_writes: vec![],
3636 })
3637 .expect("write receipt");
3638
3639 assert_eq!(receipt.label, "seed");
3640 }
3641
    // Scenario: retiring a node must flip its single active row to historical
    // (superseded_at set) rather than deleting it.
    #[test]
    fn writer_node_retire_supersedes_active_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed one active node.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire it by logical id in a second request.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // 0 active rows, 1 historical row: retire is soft, not a delete.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("count active");
        let historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("count historical");

        assert_eq!(active, 0, "active count must be 0 after retire");
        assert_eq!(historical, 1, "historical count must be 1 after retire");
    }
3718
    // Scenario: retiring a node must keep its chunk rows (so a later restore
    // can re-establish content) while removing the node's rows from the FTS
    // index so retired content stops matching searches.
    #[test]
    fn writer_node_retire_preserves_chunks_and_clears_fts() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed a node together with one chunk (which also creates an FTS row).
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "budget discussion".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire the node.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // Chunk survives; FTS row is gone.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let chunk_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
                [],
                |r| r.get(0),
            )
            .expect("chunk count");
        let fts_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
                [],
                |r| r.get(0),
            )
            .expect("fts count");

        assert_eq!(
            chunk_count, 1,
            "chunks must remain after node retire so restore can re-establish content"
        );
        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
    }
3805
    // Scenario: retiring an edge by logical id must leave no active edge row
    // (superseded rather than deleted).
    #[test]
    fn writer_edge_retire_supersedes_active_edge() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed two endpoint nodes plus the edge to be retired.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![
                    NodeInsert {
                        row_id: "row-a".to_owned(),
                        logical_id: "node-a".to_owned(),
                        kind: "Meeting".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                    NodeInsert {
                        row_id: "row-b".to_owned(),
                        logical_id: "node-b".to_owned(),
                        kind: "Task".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                ],
                node_retires: vec![],
                edges: vec![EdgeInsert {
                    row_id: "edge-1".to_owned(),
                    logical_id: "edge-lg-1".to_owned(),
                    source_logical_id: "node-a".to_owned(),
                    target_logical_id: "node-b".to_owned(),
                    kind: "HAS_TASK".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                }],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire the edge in a second request.
        writer
            .submit(WriteRequest {
                label: "retire-edge".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![EdgeRetire {
                    logical_id: "edge-lg-1".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire edge write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("active edge count");

        assert_eq!(active, 0, "active edge count must be 0 after retire");
    }
3895
    // Scenario: a retire carrying no source_ref must produce a provenance
    // warning on the receipt (mode is Warn, so the write still succeeds).
    #[test]
    fn writer_retire_without_source_ref_emits_provenance_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed a properly-sourced node so there is something to retire.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Retire with source_ref: None — the case under test.
        let receipt = writer
            .submit(WriteRequest {
                label: "retire-no-src".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "meeting-1".to_owned(),
                    source_ref: None,
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "retire without source_ref must emit a provenance warning"
        );
    }
3958
    // Scenario: a node upsert with ChunkPolicy::Replace must delete the
    // node's previous chunks and their FTS rows, keeping only the chunks in
    // the new request.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node + "old text" chunk.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-old".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "old text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upsert of the same logical node with Replace policy and a new chunk.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-new".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "new text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Old chunk and its FTS row gone; new chunk present.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let old_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
                [],
                |r| r.get(0),
            )
            .expect("old chunk count");
        let new_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
                [],
                |r| r.get(0),
            )
            .expect("new chunk count");
        let fts_old: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
                [],
                |r| r.get(0),
            )
            .expect("old fts count");

        assert_eq!(
            old_chunk, 0,
            "old chunk must be deleted by ChunkPolicy::Replace"
        );
        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
        assert_eq!(
            fts_old, 0,
            "old FTS row must be deleted by ChunkPolicy::Replace"
        );
    }
4070
    // Scenario: a properties-only upsert with ChunkPolicy::Preserve must
    // leave the node's existing chunks untouched.
    #[test]
    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node + chunk.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-old".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "old text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upsert that only changes properties, no chunks supplied.
        writer
            .submit(WriteRequest {
                label: "v2-props-only".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: r#"{"status":"updated"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 preserve write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let old_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
                [],
                |r| r.get(0),
            )
            .expect("old chunk count");

        assert_eq!(
            old_chunk, 1,
            "old chunk must be preserved by ChunkPolicy::Preserve"
        );
    }
4155
    // Scenario: ChunkPolicy::Replace only has an effect on an upsert. A plain
    // insert of a DIFFERENT node carrying Replace must not touch chunks that
    // belong to other nodes.
    #[test]
    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed meeting-1 with an existing chunk.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-existing".to_owned(),
                    node_logical_id: "meeting-1".to_owned(),
                    text_content: "existing text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Plain insert (upsert=false) of meeting-2 with Replace policy.
        writer
            .submit(WriteRequest {
                label: "insert-no-upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "meeting-2".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert no-upsert write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let existing_chunk: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
                [],
                |r| r.get(0),
            )
            .expect("chunk count");

        assert_eq!(
            existing_chunk, 1,
            "ChunkPolicy::Replace without upsert must not delete existing chunks"
        );
    }
4241
    // Scenario: an upserted run whose supersedes_id points at an earlier run
    // must mark that run historical and leave only the new run active.
    #[test]
    fn writer_run_upsert_supersedes_prior_active_run() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: plain run insert.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 run write");

        // v2: upsert that explicitly supersedes run-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-v2".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("run-v1".to_owned()),
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 run write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
    }
4322
    // Scenario: same supersede semantics as runs, one level down — an
    // upserted step with supersedes_id must retire the prior step while the
    // new step stays active under the same run.
    #[test]
    fn writer_step_upsert_supersedes_prior_active_step() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: a run plus its first step.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-v1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 step write");

        // v2: step upsert that supersedes step-v1 within the same run.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![StepInsert {
                    id: "step-v2".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("step-v1".to_owned()),
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 step write");

        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
    }
4413
    // Same supersede contract as runs/steps, applied at the action level: an
    // upserting action naming `supersedes_id` retires the prior action and
    // leaves the replacement active.
    #[test]
    fn writer_action_upsert_supersedes_prior_active_action() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Phase 1: seed the full run -> step -> action chain so the "v1"
        // action has valid parents.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-v1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 action write");

        // Phase 2: upsert the replacement "v2" action superseding action-v1.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![ActionInsert {
                    id: "action-v2".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    supersedes_id: Some("action-v1".to_owned()),
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 action write");

        // Verify directly against the SQLite file: v1 historical, v2 active.
        let conn = rusqlite::Connection::open(db.path()).expect("open");
        let v1_historical: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
                [],
                |r| r.get(0),
            )
            .expect("v1 historical count");
        let v2_active: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
                [],
                |r| r.get(0),
            )
            .expect("v2 active count");

        assert_eq!(
            v1_historical, 1,
            "action-v1 must be historical after upsert"
        );
        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
    }
4516
4517 #[test]
4520 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4521 let db = NamedTempFile::new().expect("temporary db");
4522 let writer = WriterActor::start(
4523 db.path(),
4524 Arc::new(SchemaManager::new()),
4525 ProvenanceMode::Warn,
4526 Arc::new(TelemetryCounters::default()),
4527 )
4528 .expect("writer");
4529
4530 let result = writer.submit(WriteRequest {
4531 label: "bad".to_owned(),
4532 nodes: vec![],
4533 node_retires: vec![],
4534 edges: vec![],
4535 edge_retires: vec![],
4536 chunks: vec![],
4537 runs: vec![RunInsert {
4538 id: "run-1".to_owned(),
4539 kind: "session".to_owned(),
4540 status: "completed".to_owned(),
4541 properties: "{}".to_owned(),
4542 source_ref: None,
4543 upsert: true,
4544 supersedes_id: None,
4545 }],
4546 steps: vec![],
4547 actions: vec![],
4548 optional_backfills: vec![],
4549 vec_inserts: vec![],
4550 operational_writes: vec![],
4551 });
4552
4553 assert!(
4554 matches!(result, Err(EngineError::InvalidWrite(_))),
4555 "run upsert=true without supersedes_id must return InvalidWrite"
4556 );
4557 }
4558
4559 #[test]
4560 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4561 let db = NamedTempFile::new().expect("temporary db");
4562 let writer = WriterActor::start(
4563 db.path(),
4564 Arc::new(SchemaManager::new()),
4565 ProvenanceMode::Warn,
4566 Arc::new(TelemetryCounters::default()),
4567 )
4568 .expect("writer");
4569
4570 let result = writer.submit(WriteRequest {
4571 label: "bad".to_owned(),
4572 nodes: vec![],
4573 node_retires: vec![],
4574 edges: vec![],
4575 edge_retires: vec![],
4576 chunks: vec![],
4577 runs: vec![],
4578 steps: vec![StepInsert {
4579 id: "step-1".to_owned(),
4580 run_id: "run-1".to_owned(),
4581 kind: "llm".to_owned(),
4582 status: "completed".to_owned(),
4583 properties: "{}".to_owned(),
4584 source_ref: None,
4585 upsert: true,
4586 supersedes_id: None,
4587 }],
4588 actions: vec![],
4589 optional_backfills: vec![],
4590 vec_inserts: vec![],
4591 operational_writes: vec![],
4592 });
4593
4594 assert!(
4595 matches!(result, Err(EngineError::InvalidWrite(_))),
4596 "step upsert=true without supersedes_id must return InvalidWrite"
4597 );
4598 }
4599
4600 #[test]
4601 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4602 let db = NamedTempFile::new().expect("temporary db");
4603 let writer = WriterActor::start(
4604 db.path(),
4605 Arc::new(SchemaManager::new()),
4606 ProvenanceMode::Warn,
4607 Arc::new(TelemetryCounters::default()),
4608 )
4609 .expect("writer");
4610
4611 let result = writer.submit(WriteRequest {
4612 label: "bad".to_owned(),
4613 nodes: vec![],
4614 node_retires: vec![],
4615 edges: vec![],
4616 edge_retires: vec![],
4617 chunks: vec![],
4618 runs: vec![],
4619 steps: vec![],
4620 actions: vec![ActionInsert {
4621 id: "action-1".to_owned(),
4622 step_id: "step-1".to_owned(),
4623 kind: "emit".to_owned(),
4624 status: "completed".to_owned(),
4625 properties: "{}".to_owned(),
4626 source_ref: None,
4627 upsert: true,
4628 supersedes_id: None,
4629 }],
4630 optional_backfills: vec![],
4631 vec_inserts: vec![],
4632 operational_writes: vec![],
4633 });
4634
4635 assert!(
4636 matches!(result, Err(EngineError::InvalidWrite(_))),
4637 "action upsert=true without supersedes_id must return InvalidWrite"
4638 );
4639 }
4640
    // In Warn mode an edge inserted without a `source_ref` is accepted, but
    // the write receipt must carry at least one provenance warning. Both
    // endpoint nodes carry provenance so any warning comes from the edge.
    #[test]
    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                // Endpoints with provenance so only the edge can warn.
                nodes: vec![
                    NodeInsert {
                        row_id: "row-a".to_owned(),
                        logical_id: "node-a".to_owned(),
                        kind: "Meeting".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                    NodeInsert {
                        row_id: "row-b".to_owned(),
                        logical_id: "node-b".to_owned(),
                        kind: "Task".to_owned(),
                        properties: "{}".to_owned(),
                        source_ref: Some("src-1".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    },
                ],
                node_retires: vec![],
                // The edge deliberately omits its source_ref.
                edges: vec![EdgeInsert {
                    row_id: "edge-1".to_owned(),
                    logical_id: "edge-lg-1".to_owned(),
                    source_logical_id: "node-a".to_owned(),
                    target_logical_id: "node-b".to_owned(),
                    kind: "HAS_TASK".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                }],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "edge insert without source_ref must emit a provenance warning"
        );
    }
4706
4707 #[test]
4708 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4709 let db = NamedTempFile::new().expect("temporary db");
4710 let writer = WriterActor::start(
4711 db.path(),
4712 Arc::new(SchemaManager::new()),
4713 ProvenanceMode::Warn,
4714 Arc::new(TelemetryCounters::default()),
4715 )
4716 .expect("writer");
4717
4718 let receipt = writer
4719 .submit(WriteRequest {
4720 label: "test".to_owned(),
4721 nodes: vec![],
4722 node_retires: vec![],
4723 edges: vec![],
4724 edge_retires: vec![],
4725 chunks: vec![],
4726 runs: vec![RunInsert {
4727 id: "run-1".to_owned(),
4728 kind: "session".to_owned(),
4729 status: "completed".to_owned(),
4730 properties: "{}".to_owned(),
4731 source_ref: None,
4732 upsert: false,
4733 supersedes_id: None,
4734 }],
4735 steps: vec![],
4736 actions: vec![],
4737 optional_backfills: vec![],
4738 vec_inserts: vec![],
4739 operational_writes: vec![],
4740 })
4741 .expect("write");
4742
4743 assert!(
4744 !receipt.provenance_warnings.is_empty(),
4745 "run insert without source_ref must emit a provenance warning"
4746 );
4747 }
4748
    // A single request must not both retire a node and attach new chunks to
    // that same node; the writer rejects the contradictory combination.
    #[test]
    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed the node that the second request will try to retire.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "meeting-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        // Contradictory request: retire meeting-1 AND add a chunk for it.
        let result = writer.submit(WriteRequest {
            label: "bad".to_owned(),
            nodes: vec![],
            node_retires: vec![NodeRetire {
                logical_id: "meeting-1".to_owned(),
                source_ref: Some("src-2".to_owned()),
            }],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![ChunkInsert {
                id: "chunk-bad".to_owned(),
                node_logical_id: "meeting-1".to_owned(),
                text_content: "some text".to_owned(),
                byte_start: None,
                byte_end: None,
                content_hash: None,
            }],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
        );
    }
4820
4821 #[test]
4824 fn writer_batch_insert_multiple_nodes() {
4825 let db = NamedTempFile::new().expect("temporary db");
4826 let writer = WriterActor::start(
4827 db.path(),
4828 Arc::new(SchemaManager::new()),
4829 ProvenanceMode::Warn,
4830 Arc::new(TelemetryCounters::default()),
4831 )
4832 .expect("writer");
4833
4834 let nodes: Vec<NodeInsert> = (0..100)
4835 .map(|i| NodeInsert {
4836 row_id: format!("row-{i}"),
4837 logical_id: format!("lg-{i}"),
4838 kind: "Note".to_owned(),
4839 properties: "{}".to_owned(),
4840 source_ref: Some("batch-src".to_owned()),
4841 upsert: false,
4842 chunk_policy: ChunkPolicy::Preserve,
4843 content_ref: None,
4844 })
4845 .collect();
4846
4847 writer
4848 .submit(WriteRequest {
4849 label: "batch".to_owned(),
4850 nodes,
4851 node_retires: vec![],
4852 edges: vec![],
4853 edge_retires: vec![],
4854 chunks: vec![],
4855 runs: vec![],
4856 steps: vec![],
4857 actions: vec![],
4858 optional_backfills: vec![],
4859 vec_inserts: vec![],
4860 operational_writes: vec![],
4861 })
4862 .expect("batch write");
4863
4864 let conn = rusqlite::Connection::open(db.path()).expect("open");
4865 let count: i64 = conn
4866 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4867 .expect("count nodes");
4868 assert_eq!(
4869 count, 100,
4870 "all 100 nodes must be present after batch insert"
4871 );
4872 }
4873
4874 #[test]
4877 fn prepare_write_rejects_empty_node_row_id() {
4878 let db = NamedTempFile::new().expect("temporary db");
4879 let writer = WriterActor::start(
4880 db.path(),
4881 Arc::new(SchemaManager::new()),
4882 ProvenanceMode::Warn,
4883 Arc::new(TelemetryCounters::default()),
4884 )
4885 .expect("writer");
4886
4887 let result = writer.submit(WriteRequest {
4888 label: "test".to_owned(),
4889 nodes: vec![NodeInsert {
4890 row_id: String::new(),
4891 logical_id: "lg-1".to_owned(),
4892 kind: "Note".to_owned(),
4893 properties: "{}".to_owned(),
4894 source_ref: None,
4895 upsert: false,
4896 chunk_policy: ChunkPolicy::Preserve,
4897 content_ref: None,
4898 }],
4899 node_retires: vec![],
4900 edges: vec![],
4901 edge_retires: vec![],
4902 chunks: vec![],
4903 runs: vec![],
4904 steps: vec![],
4905 actions: vec![],
4906 optional_backfills: vec![],
4907 vec_inserts: vec![],
4908 operational_writes: vec![],
4909 });
4910
4911 assert!(
4912 matches!(result, Err(EngineError::InvalidWrite(_))),
4913 "empty row_id must be rejected"
4914 );
4915 }
4916
4917 #[test]
4918 fn prepare_write_rejects_empty_node_logical_id() {
4919 let db = NamedTempFile::new().expect("temporary db");
4920 let writer = WriterActor::start(
4921 db.path(),
4922 Arc::new(SchemaManager::new()),
4923 ProvenanceMode::Warn,
4924 Arc::new(TelemetryCounters::default()),
4925 )
4926 .expect("writer");
4927
4928 let result = writer.submit(WriteRequest {
4929 label: "test".to_owned(),
4930 nodes: vec![NodeInsert {
4931 row_id: "row-1".to_owned(),
4932 logical_id: String::new(),
4933 kind: "Note".to_owned(),
4934 properties: "{}".to_owned(),
4935 source_ref: None,
4936 upsert: false,
4937 chunk_policy: ChunkPolicy::Preserve,
4938 content_ref: None,
4939 }],
4940 node_retires: vec![],
4941 edges: vec![],
4942 edge_retires: vec![],
4943 chunks: vec![],
4944 runs: vec![],
4945 steps: vec![],
4946 actions: vec![],
4947 optional_backfills: vec![],
4948 vec_inserts: vec![],
4949 operational_writes: vec![],
4950 });
4951
4952 assert!(
4953 matches!(result, Err(EngineError::InvalidWrite(_))),
4954 "empty logical_id must be rejected"
4955 );
4956 }
4957
4958 #[test]
4959 fn prepare_write_rejects_duplicate_row_ids_in_request() {
4960 let db = NamedTempFile::new().expect("temporary db");
4961 let writer = WriterActor::start(
4962 db.path(),
4963 Arc::new(SchemaManager::new()),
4964 ProvenanceMode::Warn,
4965 Arc::new(TelemetryCounters::default()),
4966 )
4967 .expect("writer");
4968
4969 let result = writer.submit(WriteRequest {
4970 label: "test".to_owned(),
4971 nodes: vec![
4972 NodeInsert {
4973 row_id: "row-1".to_owned(),
4974 logical_id: "lg-1".to_owned(),
4975 kind: "Note".to_owned(),
4976 properties: "{}".to_owned(),
4977 source_ref: None,
4978 upsert: false,
4979 chunk_policy: ChunkPolicy::Preserve,
4980 content_ref: None,
4981 },
4982 NodeInsert {
4983 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
4985 kind: "Note".to_owned(),
4986 properties: "{}".to_owned(),
4987 source_ref: None,
4988 upsert: false,
4989 chunk_policy: ChunkPolicy::Preserve,
4990 content_ref: None,
4991 },
4992 ],
4993 node_retires: vec![],
4994 edges: vec![],
4995 edge_retires: vec![],
4996 chunks: vec![],
4997 runs: vec![],
4998 steps: vec![],
4999 actions: vec![],
5000 optional_backfills: vec![],
5001 vec_inserts: vec![],
5002 operational_writes: vec![],
5003 });
5004
5005 assert!(
5006 matches!(result, Err(EngineError::InvalidWrite(_))),
5007 "duplicate row_id within request must be rejected"
5008 );
5009 }
5010
5011 #[test]
5012 fn prepare_write_rejects_empty_chunk_id() {
5013 let db = NamedTempFile::new().expect("temporary db");
5014 let writer = WriterActor::start(
5015 db.path(),
5016 Arc::new(SchemaManager::new()),
5017 ProvenanceMode::Warn,
5018 Arc::new(TelemetryCounters::default()),
5019 )
5020 .expect("writer");
5021
5022 let result = writer.submit(WriteRequest {
5023 label: "test".to_owned(),
5024 nodes: vec![NodeInsert {
5025 row_id: "row-1".to_owned(),
5026 logical_id: "lg-1".to_owned(),
5027 kind: "Note".to_owned(),
5028 properties: "{}".to_owned(),
5029 source_ref: None,
5030 upsert: false,
5031 chunk_policy: ChunkPolicy::Preserve,
5032 content_ref: None,
5033 }],
5034 node_retires: vec![],
5035 edges: vec![],
5036 edge_retires: vec![],
5037 chunks: vec![ChunkInsert {
5038 id: String::new(),
5039 node_logical_id: "lg-1".to_owned(),
5040 text_content: "some text".to_owned(),
5041 byte_start: None,
5042 byte_end: None,
5043 content_hash: None,
5044 }],
5045 runs: vec![],
5046 steps: vec![],
5047 actions: vec![],
5048 optional_backfills: vec![],
5049 vec_inserts: vec![],
5050 operational_writes: vec![],
5051 });
5052
5053 assert!(
5054 matches!(result, Err(EngineError::InvalidWrite(_))),
5055 "empty chunk id must be rejected"
5056 );
5057 }
5058
    // A step inserted without a `source_ref` (after seeding its parent run
    // with full provenance) must produce a provenance warning on the receipt.
    #[test]
    fn writer_receipt_warns_on_step_without_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed the parent run with provenance so only the step can warn.
        writer
            .submit(WriteRequest {
                label: "seed-run".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed run");

        // The step under test deliberately omits its source_ref.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "step insert without source_ref must emit a provenance warning"
        );
    }
5129
    // An action inserted without a `source_ref` (after seeding its parent
    // run and step with full provenance) must produce a provenance warning.
    #[test]
    fn writer_receipt_warns_on_action_without_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed run and step with provenance so only the action can warn.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed");

        // The action under test deliberately omits its source_ref.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "tool_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            !receipt.provenance_warnings.is_empty(),
            "action insert without source_ref must emit a provenance warning"
        );
    }
5207
    // Positive control for the provenance-warning tests: when every entity
    // type (node, run, step, action) carries a source_ref, the receipt must
    // have no warnings at all.
    #[test]
    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One entity of each warnable type, all with source_ref set.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "tool_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert!(
            receipt.provenance_warnings.is_empty(),
            "no warnings expected when all types have source_ref; got: {:?}",
            receipt.provenance_warnings
        );
    }
5277
5278 #[test]
5281 fn default_provenance_mode_is_warn() {
5282 let db = NamedTempFile::new().expect("temporary db");
5284 let writer = WriterActor::start(
5285 db.path(),
5286 Arc::new(SchemaManager::new()),
5287 ProvenanceMode::default(),
5288 Arc::new(TelemetryCounters::default()),
5289 )
5290 .expect("writer");
5291
5292 let receipt = writer
5293 .submit(WriteRequest {
5294 label: "test".to_owned(),
5295 nodes: vec![NodeInsert {
5296 row_id: "row-1".to_owned(),
5297 logical_id: "node-1".to_owned(),
5298 kind: "Note".to_owned(),
5299 properties: "{}".to_owned(),
5300 source_ref: None,
5301 upsert: false,
5302 chunk_policy: ChunkPolicy::Preserve,
5303 content_ref: None,
5304 }],
5305 node_retires: vec![],
5306 edges: vec![],
5307 edge_retires: vec![],
5308 chunks: vec![],
5309 runs: vec![],
5310 steps: vec![],
5311 actions: vec![],
5312 optional_backfills: vec![],
5313 vec_inserts: vec![],
5314 operational_writes: vec![],
5315 })
5316 .expect("Warn mode must not reject missing source_ref");
5317
5318 assert!(
5319 !receipt.provenance_warnings.is_empty(),
5320 "Warn mode must emit a warning instead of rejecting"
5321 );
5322 }
5323
5324 #[test]
5325 fn require_mode_rejects_node_without_source_ref() {
5326 let db = NamedTempFile::new().expect("temporary db");
5327 let writer = WriterActor::start(
5328 db.path(),
5329 Arc::new(SchemaManager::new()),
5330 ProvenanceMode::Require,
5331 Arc::new(TelemetryCounters::default()),
5332 )
5333 .expect("writer");
5334
5335 let result = writer.submit(WriteRequest {
5336 label: "test".to_owned(),
5337 nodes: vec![NodeInsert {
5338 row_id: "row-1".to_owned(),
5339 logical_id: "node-1".to_owned(),
5340 kind: "Note".to_owned(),
5341 properties: "{}".to_owned(),
5342 source_ref: None,
5343 upsert: false,
5344 chunk_policy: ChunkPolicy::Preserve,
5345 content_ref: None,
5346 }],
5347 node_retires: vec![],
5348 edges: vec![],
5349 edge_retires: vec![],
5350 chunks: vec![],
5351 runs: vec![],
5352 steps: vec![],
5353 actions: vec![],
5354 optional_backfills: vec![],
5355 vec_inserts: vec![],
5356 operational_writes: vec![],
5357 });
5358
5359 assert!(
5360 matches!(result, Err(EngineError::InvalidWrite(_))),
5361 "Require mode must reject node without source_ref"
5362 );
5363 }
5364
5365 #[test]
5366 fn require_mode_accepts_node_with_source_ref() {
5367 let db = NamedTempFile::new().expect("temporary db");
5368 let writer = WriterActor::start(
5369 db.path(),
5370 Arc::new(SchemaManager::new()),
5371 ProvenanceMode::Require,
5372 Arc::new(TelemetryCounters::default()),
5373 )
5374 .expect("writer");
5375
5376 let result = writer.submit(WriteRequest {
5377 label: "test".to_owned(),
5378 nodes: vec![NodeInsert {
5379 row_id: "row-1".to_owned(),
5380 logical_id: "node-1".to_owned(),
5381 kind: "Note".to_owned(),
5382 properties: "{}".to_owned(),
5383 source_ref: Some("src-1".to_owned()),
5384 upsert: false,
5385 chunk_policy: ChunkPolicy::Preserve,
5386 content_ref: None,
5387 }],
5388 node_retires: vec![],
5389 edges: vec![],
5390 edge_retires: vec![],
5391 chunks: vec![],
5392 runs: vec![],
5393 steps: vec![],
5394 actions: vec![],
5395 optional_backfills: vec![],
5396 vec_inserts: vec![],
5397 operational_writes: vec![],
5398 });
5399
5400 assert!(
5401 result.is_ok(),
5402 "Require mode must accept node with source_ref"
5403 );
5404 }
5405
    // Require mode applies to edges too: even when both endpoint nodes carry
    // provenance, an edge without a source_ref fails the whole write.
    #[test]
    fn require_mode_rejects_edge_without_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Require,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "test".to_owned(),
            // Endpoints satisfy Require mode; only the edge is missing provenance.
            nodes: vec![
                NodeInsert {
                    row_id: "row-a".to_owned(),
                    logical_id: "node-a".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                },
                NodeInsert {
                    row_id: "row-b".to_owned(),
                    logical_id: "node-b".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                },
            ],
            node_retires: vec![],
            edges: vec![EdgeInsert {
                row_id: "edge-row-1".to_owned(),
                logical_id: "edge-1".to_owned(),
                source_logical_id: "node-a".to_owned(),
                target_logical_id: "node-b".to_owned(),
                kind: "LINKS_TO".to_owned(),
                properties: "{}".to_owned(),
                source_ref: None,
                upsert: false,
            }],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "Require mode must reject edge without source_ref"
        );
    }
5468
    // When a chunk and its parent node arrive in the same request, the
    // derived fts_nodes row must pick up the node's kind.
    #[test]
    fn fts_row_has_correct_kind_from_co_submitted_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Node and chunk submitted together in one request.
        writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "some text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Read the derived FTS row straight from the database.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts row");

        assert_eq!(kind, "Meeting");
    }
5526
    // The derived fts_nodes row must carry the chunk's text_content verbatim.
    #[test]
    fn fts_row_has_correct_text_content() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "exactly this text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Read the derived FTS row straight from the database.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let text: String = conn
            .query_row(
                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts row");

        assert_eq!(text, "exactly this text");
    }
5582
    // When the chunk arrives in a LATER request than its parent node, the
    // derived fts_nodes row must still resolve the node's kind by looking
    // up the already-persisted node.
    #[test]
    fn fts_row_has_correct_kind_from_pre_existing_node() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Request 1: persist the node only.
        writer
            .submit(WriteRequest {
                label: "r1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Document".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r1 write");

        // Request 2: chunk only, referencing the pre-existing node.
        writer
            .submit(WriteRequest {
                label: "r2".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "some text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("r2 write");

        // The derived FTS row must carry the kind persisted in request 1.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts row");

        assert_eq!(kind, "Document");
    }
5657
5658 #[test]
5659 fn fts_derives_rows_for_multiple_chunks_per_node() {
5660 let db = NamedTempFile::new().expect("temporary db");
5661 let writer = WriterActor::start(
5662 db.path(),
5663 Arc::new(SchemaManager::new()),
5664 ProvenanceMode::Warn,
5665 Arc::new(TelemetryCounters::default()),
5666 )
5667 .expect("writer");
5668
5669 writer
5670 .submit(WriteRequest {
5671 label: "test".to_owned(),
5672 nodes: vec![NodeInsert {
5673 row_id: "row-1".to_owned(),
5674 logical_id: "node-1".to_owned(),
5675 kind: "Meeting".to_owned(),
5676 properties: "{}".to_owned(),
5677 source_ref: Some("src-1".to_owned()),
5678 upsert: false,
5679 chunk_policy: ChunkPolicy::Preserve,
5680 content_ref: None,
5681 }],
5682 node_retires: vec![],
5683 edges: vec![],
5684 edge_retires: vec![],
5685 chunks: vec![
5686 ChunkInsert {
5687 id: "chunk-a".to_owned(),
5688 node_logical_id: "node-1".to_owned(),
5689 text_content: "intro".to_owned(),
5690 byte_start: None,
5691 byte_end: None,
5692 content_hash: None,
5693 },
5694 ChunkInsert {
5695 id: "chunk-b".to_owned(),
5696 node_logical_id: "node-1".to_owned(),
5697 text_content: "body".to_owned(),
5698 byte_start: None,
5699 byte_end: None,
5700 content_hash: None,
5701 },
5702 ChunkInsert {
5703 id: "chunk-c".to_owned(),
5704 node_logical_id: "node-1".to_owned(),
5705 text_content: "conclusion".to_owned(),
5706 byte_start: None,
5707 byte_end: None,
5708 content_hash: None,
5709 },
5710 ],
5711 runs: vec![],
5712 steps: vec![],
5713 actions: vec![],
5714 optional_backfills: vec![],
5715 vec_inserts: vec![],
5716 operational_writes: vec![],
5717 })
5718 .expect("write");
5719
5720 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5721 let count: i64 = conn
5722 .query_row(
5723 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5724 [],
5725 |row| row.get(0),
5726 )
5727 .expect("fts count");
5728
5729 assert_eq!(count, 3, "three chunks must produce three FTS rows");
5730 }
5731
5732 #[test]
5733 fn fts_resolves_mixed_fast_and_db_paths() {
5734 let db = NamedTempFile::new().expect("temporary db");
5735 let writer = WriterActor::start(
5736 db.path(),
5737 Arc::new(SchemaManager::new()),
5738 ProvenanceMode::Warn,
5739 Arc::new(TelemetryCounters::default()),
5740 )
5741 .expect("writer");
5742
5743 writer
5745 .submit(WriteRequest {
5746 label: "seed".to_owned(),
5747 nodes: vec![NodeInsert {
5748 row_id: "row-existing".to_owned(),
5749 logical_id: "existing-node".to_owned(),
5750 kind: "Archive".to_owned(),
5751 properties: "{}".to_owned(),
5752 source_ref: Some("src-1".to_owned()),
5753 upsert: false,
5754 chunk_policy: ChunkPolicy::Preserve,
5755 content_ref: None,
5756 }],
5757 node_retires: vec![],
5758 edges: vec![],
5759 edge_retires: vec![],
5760 chunks: vec![],
5761 runs: vec![],
5762 steps: vec![],
5763 actions: vec![],
5764 optional_backfills: vec![],
5765 vec_inserts: vec![],
5766 operational_writes: vec![],
5767 })
5768 .expect("seed");
5769
5770 writer
5772 .submit(WriteRequest {
5773 label: "mixed".to_owned(),
5774 nodes: vec![NodeInsert {
5775 row_id: "row-new".to_owned(),
5776 logical_id: "new-node".to_owned(),
5777 kind: "Inbox".to_owned(),
5778 properties: "{}".to_owned(),
5779 source_ref: Some("src-2".to_owned()),
5780 upsert: false,
5781 chunk_policy: ChunkPolicy::Preserve,
5782 content_ref: None,
5783 }],
5784 node_retires: vec![],
5785 edges: vec![],
5786 edge_retires: vec![],
5787 chunks: vec![
5788 ChunkInsert {
5789 id: "chunk-fast".to_owned(),
5790 node_logical_id: "new-node".to_owned(),
5791 text_content: "new content".to_owned(),
5792 byte_start: None,
5793 byte_end: None,
5794 content_hash: None,
5795 },
5796 ChunkInsert {
5797 id: "chunk-db".to_owned(),
5798 node_logical_id: "existing-node".to_owned(),
5799 text_content: "archive content".to_owned(),
5800 byte_start: None,
5801 byte_end: None,
5802 content_hash: None,
5803 },
5804 ],
5805 runs: vec![],
5806 steps: vec![],
5807 actions: vec![],
5808 optional_backfills: vec![],
5809 vec_inserts: vec![],
5810 operational_writes: vec![],
5811 })
5812 .expect("mixed write");
5813
5814 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5815 let fast_kind: String = conn
5816 .query_row(
5817 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
5818 [],
5819 |row| row.get(0),
5820 )
5821 .expect("fast path fts row");
5822 let db_kind: String = conn
5823 .query_row(
5824 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
5825 [],
5826 |row| row.get(0),
5827 )
5828 .expect("db path fts row");
5829
5830 assert_eq!(fast_kind, "Inbox");
5831 assert_eq!(db_kind, "Archive");
5832 }
5833
5834 #[test]
5835 fn prepare_write_rejects_empty_chunk_text() {
5836 let db = NamedTempFile::new().expect("temporary db");
5837 let writer = WriterActor::start(
5838 db.path(),
5839 Arc::new(SchemaManager::new()),
5840 ProvenanceMode::Warn,
5841 Arc::new(TelemetryCounters::default()),
5842 )
5843 .expect("writer");
5844
5845 let result = writer.submit(WriteRequest {
5846 label: "test".to_owned(),
5847 nodes: vec![NodeInsert {
5848 row_id: "row-1".to_owned(),
5849 logical_id: "node-1".to_owned(),
5850 kind: "Note".to_owned(),
5851 properties: "{}".to_owned(),
5852 source_ref: None,
5853 upsert: false,
5854 chunk_policy: ChunkPolicy::Preserve,
5855 content_ref: None,
5856 }],
5857 node_retires: vec![],
5858 edges: vec![],
5859 edge_retires: vec![],
5860 chunks: vec![ChunkInsert {
5861 id: "chunk-1".to_owned(),
5862 node_logical_id: "node-1".to_owned(),
5863 text_content: String::new(),
5864 byte_start: None,
5865 byte_end: None,
5866 content_hash: None,
5867 }],
5868 runs: vec![],
5869 steps: vec![],
5870 actions: vec![],
5871 optional_backfills: vec![],
5872 vec_inserts: vec![],
5873 operational_writes: vec![],
5874 });
5875
5876 assert!(
5877 matches!(result, Err(EngineError::InvalidWrite(_))),
5878 "empty text_content must be rejected"
5879 );
5880 }
5881
5882 #[test]
5883 fn receipt_reports_zero_backfills_when_none_submitted() {
5884 let db = NamedTempFile::new().expect("temporary db");
5885 let writer = WriterActor::start(
5886 db.path(),
5887 Arc::new(SchemaManager::new()),
5888 ProvenanceMode::Warn,
5889 Arc::new(TelemetryCounters::default()),
5890 )
5891 .expect("writer");
5892
5893 let receipt = writer
5894 .submit(WriteRequest {
5895 label: "test".to_owned(),
5896 nodes: vec![NodeInsert {
5897 row_id: "row-1".to_owned(),
5898 logical_id: "node-1".to_owned(),
5899 kind: "Note".to_owned(),
5900 properties: "{}".to_owned(),
5901 source_ref: Some("src-1".to_owned()),
5902 upsert: false,
5903 chunk_policy: ChunkPolicy::Preserve,
5904 content_ref: None,
5905 }],
5906 node_retires: vec![],
5907 edges: vec![],
5908 edge_retires: vec![],
5909 chunks: vec![],
5910 runs: vec![],
5911 steps: vec![],
5912 actions: vec![],
5913 optional_backfills: vec![],
5914 vec_inserts: vec![],
5915 operational_writes: vec![],
5916 })
5917 .expect("write");
5918
5919 assert_eq!(receipt.optional_backfill_count, 0);
5920 }
5921
5922 #[test]
5923 fn receipt_reports_correct_backfill_count() {
5924 let db = NamedTempFile::new().expect("temporary db");
5925 let writer = WriterActor::start(
5926 db.path(),
5927 Arc::new(SchemaManager::new()),
5928 ProvenanceMode::Warn,
5929 Arc::new(TelemetryCounters::default()),
5930 )
5931 .expect("writer");
5932
5933 let receipt = writer
5934 .submit(WriteRequest {
5935 label: "test".to_owned(),
5936 nodes: vec![NodeInsert {
5937 row_id: "row-1".to_owned(),
5938 logical_id: "node-1".to_owned(),
5939 kind: "Note".to_owned(),
5940 properties: "{}".to_owned(),
5941 source_ref: Some("src-1".to_owned()),
5942 upsert: false,
5943 chunk_policy: ChunkPolicy::Preserve,
5944 content_ref: None,
5945 }],
5946 node_retires: vec![],
5947 edges: vec![],
5948 edge_retires: vec![],
5949 chunks: vec![],
5950 runs: vec![],
5951 steps: vec![],
5952 actions: vec![],
5953 optional_backfills: vec![
5954 OptionalProjectionTask {
5955 target: ProjectionTarget::Fts,
5956 payload: "p1".to_owned(),
5957 },
5958 OptionalProjectionTask {
5959 target: ProjectionTarget::Vec,
5960 payload: "p2".to_owned(),
5961 },
5962 OptionalProjectionTask {
5963 target: ProjectionTarget::All,
5964 payload: "p3".to_owned(),
5965 },
5966 ],
5967 vec_inserts: vec![],
5968 operational_writes: vec![],
5969 })
5970 .expect("write");
5971
5972 assert_eq!(receipt.optional_backfill_count, 3);
5973 }
5974
5975 #[test]
5976 fn backfill_tasks_are_not_executed_during_write() {
5977 let db = NamedTempFile::new().expect("temporary db");
5978 let writer = WriterActor::start(
5979 db.path(),
5980 Arc::new(SchemaManager::new()),
5981 ProvenanceMode::Warn,
5982 Arc::new(TelemetryCounters::default()),
5983 )
5984 .expect("writer");
5985
5986 writer
5989 .submit(WriteRequest {
5990 label: "test".to_owned(),
5991 nodes: vec![NodeInsert {
5992 row_id: "row-1".to_owned(),
5993 logical_id: "node-1".to_owned(),
5994 kind: "Note".to_owned(),
5995 properties: "{}".to_owned(),
5996 source_ref: Some("src-1".to_owned()),
5997 upsert: false,
5998 chunk_policy: ChunkPolicy::Preserve,
5999 content_ref: None,
6000 }],
6001 node_retires: vec![],
6002 edges: vec![],
6003 edge_retires: vec![],
6004 chunks: vec![ChunkInsert {
6005 id: "chunk-1".to_owned(),
6006 node_logical_id: "node-1".to_owned(),
6007 text_content: "required text".to_owned(),
6008 byte_start: None,
6009 byte_end: None,
6010 content_hash: None,
6011 }],
6012 runs: vec![],
6013 steps: vec![],
6014 actions: vec![],
6015 optional_backfills: vec![OptionalProjectionTask {
6016 target: ProjectionTarget::Fts,
6017 payload: "backfill-payload".to_owned(),
6018 }],
6019 vec_inserts: vec![],
6020 operational_writes: vec![],
6021 })
6022 .expect("write");
6023
6024 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6025 let count: i64 = conn
6026 .query_row(
6027 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6028 [],
6029 |row| row.get(0),
6030 )
6031 .expect("fts count");
6032
6033 assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6034 }
6035
6036 #[test]
6037 fn fts_row_uses_new_kind_after_node_replace() {
6038 let db = NamedTempFile::new().expect("temporary db");
6039 let writer = WriterActor::start(
6040 db.path(),
6041 Arc::new(SchemaManager::new()),
6042 ProvenanceMode::Warn,
6043 Arc::new(TelemetryCounters::default()),
6044 )
6045 .expect("writer");
6046
6047 writer
6049 .submit(WriteRequest {
6050 label: "v1".to_owned(),
6051 nodes: vec![NodeInsert {
6052 row_id: "row-1".to_owned(),
6053 logical_id: "node-1".to_owned(),
6054 kind: "Note".to_owned(),
6055 properties: "{}".to_owned(),
6056 source_ref: Some("src-1".to_owned()),
6057 upsert: false,
6058 chunk_policy: ChunkPolicy::Preserve,
6059 content_ref: None,
6060 }],
6061 node_retires: vec![],
6062 edges: vec![],
6063 edge_retires: vec![],
6064 chunks: vec![ChunkInsert {
6065 id: "chunk-v1".to_owned(),
6066 node_logical_id: "node-1".to_owned(),
6067 text_content: "original".to_owned(),
6068 byte_start: None,
6069 byte_end: None,
6070 content_hash: None,
6071 }],
6072 runs: vec![],
6073 steps: vec![],
6074 actions: vec![],
6075 optional_backfills: vec![],
6076 vec_inserts: vec![],
6077 operational_writes: vec![],
6078 })
6079 .expect("v1 write");
6080
6081 writer
6083 .submit(WriteRequest {
6084 label: "v2".to_owned(),
6085 nodes: vec![NodeInsert {
6086 row_id: "row-2".to_owned(),
6087 logical_id: "node-1".to_owned(),
6088 kind: "Meeting".to_owned(),
6089 properties: "{}".to_owned(),
6090 source_ref: Some("src-2".to_owned()),
6091 upsert: true,
6092 chunk_policy: ChunkPolicy::Replace,
6093 content_ref: None,
6094 }],
6095 node_retires: vec![],
6096 edges: vec![],
6097 edge_retires: vec![],
6098 chunks: vec![ChunkInsert {
6099 id: "chunk-v2".to_owned(),
6100 node_logical_id: "node-1".to_owned(),
6101 text_content: "updated".to_owned(),
6102 byte_start: None,
6103 byte_end: None,
6104 content_hash: None,
6105 }],
6106 runs: vec![],
6107 steps: vec![],
6108 actions: vec![],
6109 optional_backfills: vec![],
6110 vec_inserts: vec![],
6111 operational_writes: vec![],
6112 })
6113 .expect("v2 write");
6114
6115 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6116
6117 let old_count: i64 = conn
6119 .query_row(
6120 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6121 [],
6122 |row| row.get(0),
6123 )
6124 .expect("old fts count");
6125 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6126
6127 let new_kind: String = conn
6129 .query_row(
6130 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6131 [],
6132 |row| row.get(0),
6133 )
6134 .expect("new fts row");
6135 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6136 }
6137
6138 #[test]
6141 fn vec_insert_empty_chunk_id_is_rejected() {
6142 let db = NamedTempFile::new().expect("temporary db");
6143 let writer = WriterActor::start(
6144 db.path(),
6145 Arc::new(SchemaManager::new()),
6146 ProvenanceMode::Warn,
6147 Arc::new(TelemetryCounters::default()),
6148 )
6149 .expect("writer");
6150 let result = writer.submit(WriteRequest {
6151 label: "vec-test".to_owned(),
6152 nodes: vec![],
6153 node_retires: vec![],
6154 edges: vec![],
6155 edge_retires: vec![],
6156 chunks: vec![],
6157 runs: vec![],
6158 steps: vec![],
6159 actions: vec![],
6160 optional_backfills: vec![],
6161 vec_inserts: vec![VecInsert {
6162 chunk_id: String::new(),
6163 embedding: vec![0.1, 0.2, 0.3],
6164 }],
6165 operational_writes: vec![],
6166 });
6167 assert!(
6168 matches!(result, Err(EngineError::InvalidWrite(_))),
6169 "empty chunk_id in VecInsert must be rejected"
6170 );
6171 }
6172
6173 #[test]
6174 fn vec_insert_empty_embedding_is_rejected() {
6175 let db = NamedTempFile::new().expect("temporary db");
6176 let writer = WriterActor::start(
6177 db.path(),
6178 Arc::new(SchemaManager::new()),
6179 ProvenanceMode::Warn,
6180 Arc::new(TelemetryCounters::default()),
6181 )
6182 .expect("writer");
6183 let result = writer.submit(WriteRequest {
6184 label: "vec-test".to_owned(),
6185 nodes: vec![],
6186 node_retires: vec![],
6187 edges: vec![],
6188 edge_retires: vec![],
6189 chunks: vec![],
6190 runs: vec![],
6191 steps: vec![],
6192 actions: vec![],
6193 optional_backfills: vec![],
6194 vec_inserts: vec![VecInsert {
6195 chunk_id: "chunk-1".to_owned(),
6196 embedding: vec![],
6197 }],
6198 operational_writes: vec![],
6199 });
6200 assert!(
6201 matches!(result, Err(EngineError::InvalidWrite(_))),
6202 "empty embedding in VecInsert must be rejected"
6203 );
6204 }
6205
6206 #[test]
6207 fn vec_insert_noop_without_feature() {
6208 let db = NamedTempFile::new().expect("temporary db");
6211 let writer = WriterActor::start(
6212 db.path(),
6213 Arc::new(SchemaManager::new()),
6214 ProvenanceMode::Warn,
6215 Arc::new(TelemetryCounters::default()),
6216 )
6217 .expect("writer");
6218 let result = writer.submit(WriteRequest {
6219 label: "vec-noop".to_owned(),
6220 nodes: vec![],
6221 node_retires: vec![],
6222 edges: vec![],
6223 edge_retires: vec![],
6224 chunks: vec![],
6225 runs: vec![],
6226 steps: vec![],
6227 actions: vec![],
6228 optional_backfills: vec![],
6229 vec_inserts: vec![VecInsert {
6230 chunk_id: "chunk-noop".to_owned(),
6231 embedding: vec![1.0, 2.0, 3.0],
6232 }],
6233 operational_writes: vec![],
6234 });
6235 #[cfg(not(feature = "sqlite-vec"))]
6236 result.expect("noop VecInsert without feature must succeed");
6237 #[cfg(feature = "sqlite-vec")]
6239 let _ = result;
6240 }
6241
6242 #[cfg(feature = "sqlite-vec")]
6243 #[test]
6244 fn node_retire_preserves_vec_rows_for_later_restore() {
6245 use crate::sqlite::open_connection_with_vec;
6246
6247 let db = NamedTempFile::new().expect("temporary db");
6248 let schema_manager = Arc::new(SchemaManager::new());
6249
6250 {
6251 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6252 schema_manager.bootstrap(&conn).expect("bootstrap");
6253 }
6254
6255 let writer = WriterActor::start(
6256 db.path(),
6257 Arc::clone(&schema_manager),
6258 ProvenanceMode::Warn,
6259 Arc::new(TelemetryCounters::default()),
6260 )
6261 .expect("writer");
6262
6263 writer
6265 .submit(WriteRequest {
6266 label: "setup".to_owned(),
6267 nodes: vec![NodeInsert {
6268 row_id: "row-retire-vec".to_owned(),
6269 logical_id: "node-retire-vec".to_owned(),
6270 kind: "Doc".to_owned(),
6271 properties: "{}".to_owned(),
6272 source_ref: Some("src".to_owned()),
6273 upsert: false,
6274 chunk_policy: ChunkPolicy::Preserve,
6275 content_ref: None,
6276 }],
6277 node_retires: vec![],
6278 edges: vec![],
6279 edge_retires: vec![],
6280 chunks: vec![ChunkInsert {
6281 id: "chunk-retire-vec".to_owned(),
6282 node_logical_id: "node-retire-vec".to_owned(),
6283 text_content: "text".to_owned(),
6284 byte_start: None,
6285 byte_end: None,
6286 content_hash: None,
6287 }],
6288 runs: vec![],
6289 steps: vec![],
6290 actions: vec![],
6291 optional_backfills: vec![],
6292 vec_inserts: vec![VecInsert {
6293 chunk_id: "chunk-retire-vec".to_owned(),
6294 embedding: vec![0.1, 0.2, 0.3],
6295 }],
6296 operational_writes: vec![],
6297 })
6298 .expect("setup write");
6299
6300 writer
6302 .submit(WriteRequest {
6303 label: "retire".to_owned(),
6304 nodes: vec![],
6305 node_retires: vec![NodeRetire {
6306 logical_id: "node-retire-vec".to_owned(),
6307 source_ref: Some("src".to_owned()),
6308 }],
6309 edges: vec![],
6310 edge_retires: vec![],
6311 chunks: vec![],
6312 runs: vec![],
6313 steps: vec![],
6314 actions: vec![],
6315 optional_backfills: vec![],
6316 vec_inserts: vec![],
6317 operational_writes: vec![],
6318 })
6319 .expect("retire write");
6320
6321 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6322 let count: i64 = conn
6323 .query_row(
6324 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-retire-vec'",
6325 [],
6326 |row| row.get(0),
6327 )
6328 .expect("count");
6329 assert_eq!(
6330 count, 1,
6331 "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6332 );
6333 }
6334
6335 #[cfg(feature = "sqlite-vec")]
6336 #[test]
6337 #[allow(clippy::too_many_lines)]
6338 fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6339 use crate::sqlite::open_connection_with_vec;
6340
6341 let db = NamedTempFile::new().expect("temporary db");
6342 let schema_manager = Arc::new(SchemaManager::new());
6343
6344 {
6345 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6346 schema_manager.bootstrap(&conn).expect("bootstrap");
6347 }
6348
6349 let writer = WriterActor::start(
6350 db.path(),
6351 Arc::clone(&schema_manager),
6352 ProvenanceMode::Warn,
6353 Arc::new(TelemetryCounters::default()),
6354 )
6355 .expect("writer");
6356
6357 writer
6359 .submit(WriteRequest {
6360 label: "v1".to_owned(),
6361 nodes: vec![NodeInsert {
6362 row_id: "row-replace-v1".to_owned(),
6363 logical_id: "node-replace-vec".to_owned(),
6364 kind: "Doc".to_owned(),
6365 properties: "{}".to_owned(),
6366 source_ref: Some("src".to_owned()),
6367 upsert: false,
6368 chunk_policy: ChunkPolicy::Preserve,
6369 content_ref: None,
6370 }],
6371 node_retires: vec![],
6372 edges: vec![],
6373 edge_retires: vec![],
6374 chunks: vec![ChunkInsert {
6375 id: "chunk-replace-A".to_owned(),
6376 node_logical_id: "node-replace-vec".to_owned(),
6377 text_content: "version one".to_owned(),
6378 byte_start: None,
6379 byte_end: None,
6380 content_hash: None,
6381 }],
6382 runs: vec![],
6383 steps: vec![],
6384 actions: vec![],
6385 optional_backfills: vec![],
6386 vec_inserts: vec![VecInsert {
6387 chunk_id: "chunk-replace-A".to_owned(),
6388 embedding: vec![0.1, 0.2, 0.3],
6389 }],
6390 operational_writes: vec![],
6391 })
6392 .expect("v1 write");
6393
6394 writer
6396 .submit(WriteRequest {
6397 label: "v2".to_owned(),
6398 nodes: vec![NodeInsert {
6399 row_id: "row-replace-v2".to_owned(),
6400 logical_id: "node-replace-vec".to_owned(),
6401 kind: "Doc".to_owned(),
6402 properties: "{}".to_owned(),
6403 source_ref: Some("src".to_owned()),
6404 upsert: true,
6405 chunk_policy: ChunkPolicy::Replace,
6406 content_ref: None,
6407 }],
6408 node_retires: vec![],
6409 edges: vec![],
6410 edge_retires: vec![],
6411 chunks: vec![ChunkInsert {
6412 id: "chunk-replace-B".to_owned(),
6413 node_logical_id: "node-replace-vec".to_owned(),
6414 text_content: "version two".to_owned(),
6415 byte_start: None,
6416 byte_end: None,
6417 content_hash: None,
6418 }],
6419 runs: vec![],
6420 steps: vec![],
6421 actions: vec![],
6422 optional_backfills: vec![],
6423 vec_inserts: vec![VecInsert {
6424 chunk_id: "chunk-replace-B".to_owned(),
6425 embedding: vec![0.4, 0.5, 0.6],
6426 }],
6427 operational_writes: vec![],
6428 })
6429 .expect("v2 write");
6430
6431 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6432 let count_a: i64 = conn
6433 .query_row(
6434 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-A'",
6435 [],
6436 |row| row.get(0),
6437 )
6438 .expect("count A");
6439 let count_b: i64 = conn
6440 .query_row(
6441 "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-B'",
6442 [],
6443 |row| row.get(0),
6444 )
6445 .expect("count B");
6446 assert_eq!(
6447 count_a, 0,
6448 "old vec row (chunk-A) must be deleted on Replace"
6449 );
6450 assert_eq!(
6451 count_b, 1,
6452 "new vec row (chunk-B) must be present after Replace"
6453 );
6454 }
6455
6456 #[cfg(feature = "sqlite-vec")]
6457 #[test]
6458 fn vec_insert_is_persisted_when_feature_enabled() {
6459 use crate::sqlite::open_connection_with_vec;
6460
6461 let db = NamedTempFile::new().expect("temporary db");
6462 let schema_manager = Arc::new(SchemaManager::new());
6463
6464 {
6466 let conn = open_connection_with_vec(db.path()).expect("vec connection");
6467 schema_manager.bootstrap(&conn).expect("bootstrap");
6468 }
6469
6470 let writer = WriterActor::start(
6471 db.path(),
6472 Arc::clone(&schema_manager),
6473 ProvenanceMode::Warn,
6474 Arc::new(TelemetryCounters::default()),
6475 )
6476 .expect("writer");
6477
6478 writer
6480 .submit(WriteRequest {
6481 label: "vec-insert".to_owned(),
6482 nodes: vec![NodeInsert {
6483 row_id: "row-vec".to_owned(),
6484 logical_id: "node-vec".to_owned(),
6485 kind: "Document".to_owned(),
6486 properties: "{}".to_owned(),
6487 source_ref: Some("test".to_owned()),
6488 upsert: false,
6489 chunk_policy: ChunkPolicy::Preserve,
6490 content_ref: None,
6491 }],
6492 node_retires: vec![],
6493 edges: vec![],
6494 edge_retires: vec![],
6495 chunks: vec![ChunkInsert {
6496 id: "chunk-vec".to_owned(),
6497 node_logical_id: "node-vec".to_owned(),
6498 text_content: "test text".to_owned(),
6499 byte_start: None,
6500 byte_end: None,
6501 content_hash: None,
6502 }],
6503 runs: vec![],
6504 steps: vec![],
6505 actions: vec![],
6506 optional_backfills: vec![],
6507 vec_inserts: vec![VecInsert {
6508 chunk_id: "chunk-vec".to_owned(),
6509 embedding: vec![0.1, 0.2, 0.3],
6510 }],
6511 operational_writes: vec![],
6512 })
6513 .expect("vec insert write");
6514
6515 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6516 let count: i64 = conn
6517 .query_row(
6518 "SELECT count(*) FROM vec_document WHERE chunk_id = 'chunk-vec'",
6519 [],
6520 |row| row.get(0),
6521 )
6522 .expect("count");
6523 assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
6524 }
6525
6526 #[test]
6529 fn write_request_exceeding_node_limit_is_rejected() {
6530 let nodes: Vec<NodeInsert> = (0..10_001)
6531 .map(|i| NodeInsert {
6532 row_id: format!("row-{i}"),
6533 logical_id: format!("lg-{i}"),
6534 kind: "Note".to_owned(),
6535 properties: "{}".to_owned(),
6536 source_ref: None,
6537 upsert: false,
6538 chunk_policy: ChunkPolicy::Preserve,
6539 content_ref: None,
6540 })
6541 .collect();
6542
6543 let request = WriteRequest {
6544 label: "too-many-nodes".to_owned(),
6545 nodes,
6546 node_retires: vec![],
6547 edges: vec![],
6548 edge_retires: vec![],
6549 chunks: vec![],
6550 runs: vec![],
6551 steps: vec![],
6552 actions: vec![],
6553 optional_backfills: vec![],
6554 vec_inserts: vec![],
6555 operational_writes: vec![],
6556 };
6557
6558 let result = prepare_write(request, ProvenanceMode::Warn)
6559 .map(|_| ())
6560 .map_err(|e| format!("{e}"));
6561 assert!(
6562 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6563 "exceeding node limit must return InvalidWrite: got {result:?}"
6564 );
6565 }
6566
6567 #[test]
6568 fn write_request_exceeding_total_limit_is_rejected() {
6569 let request = WriteRequest {
6573 label: "too-many-total".to_owned(),
6574 nodes: (0..10_000)
6575 .map(|i| NodeInsert {
6576 row_id: format!("row-{i}"),
6577 logical_id: format!("lg-{i}"),
6578 kind: "Note".to_owned(),
6579 properties: "{}".to_owned(),
6580 source_ref: None,
6581 upsert: false,
6582 chunk_policy: ChunkPolicy::Preserve,
6583 content_ref: None,
6584 })
6585 .collect(),
6586 node_retires: vec![],
6587 edges: (0..10_000)
6588 .map(|i| EdgeInsert {
6589 row_id: format!("edge-row-{i}"),
6590 logical_id: format!("edge-lg-{i}"),
6591 kind: "link".to_owned(),
6592 source_logical_id: format!("lg-{i}"),
6593 target_logical_id: format!("lg-{}", i + 1),
6594 properties: "{}".to_owned(),
6595 source_ref: None,
6596 upsert: false,
6597 })
6598 .collect(),
6599 edge_retires: vec![],
6600 chunks: (0..50_000)
6601 .map(|i| ChunkInsert {
6602 id: format!("chunk-{i}"),
6603 node_logical_id: "lg-0".to_owned(),
6604 text_content: "text".to_owned(),
6605 byte_start: None,
6606 byte_end: None,
6607 content_hash: None,
6608 })
6609 .collect(),
6610 runs: vec![],
6611 steps: vec![],
6612 actions: vec![],
6613 optional_backfills: vec![],
6614 vec_inserts: (0..20_001)
6615 .map(|i| VecInsert {
6616 chunk_id: format!("vec-chunk-{i}"),
6617 embedding: vec![0.1],
6618 })
6619 .collect(),
6620 operational_writes: (0..10_000)
6621 .map(|i| OperationalWrite::Append {
6622 collection: format!("col-{i}"),
6623 record_key: format!("key-{i}"),
6624 payload_json: "{}".to_owned(),
6625 source_ref: None,
6626 })
6627 .collect(),
6628 };
6629
6630 let result = prepare_write(request, ProvenanceMode::Warn)
6631 .map(|_| ())
6632 .map_err(|e| format!("{e}"));
6633 assert!(
6634 matches!(result, Err(ref msg) if msg.contains("too many total items")),
6635 "exceeding total item limit must return InvalidWrite: got {result:?}"
6636 );
6637 }
6638
6639 #[test]
6640 fn write_request_within_limits_succeeds() {
6641 let db = NamedTempFile::new().expect("temporary db");
6642 let writer = WriterActor::start(
6643 db.path(),
6644 Arc::new(SchemaManager::new()),
6645 ProvenanceMode::Warn,
6646 Arc::new(TelemetryCounters::default()),
6647 )
6648 .expect("writer");
6649
6650 let result = writer.submit(WriteRequest {
6651 label: "within-limits".to_owned(),
6652 nodes: vec![NodeInsert {
6653 row_id: "row-1".to_owned(),
6654 logical_id: "lg-1".to_owned(),
6655 kind: "Note".to_owned(),
6656 properties: "{}".to_owned(),
6657 source_ref: None,
6658 upsert: false,
6659 chunk_policy: ChunkPolicy::Preserve,
6660 content_ref: None,
6661 }],
6662 node_retires: vec![],
6663 edges: vec![],
6664 edge_retires: vec![],
6665 chunks: vec![],
6666 runs: vec![],
6667 steps: vec![],
6668 actions: vec![],
6669 optional_backfills: vec![],
6670 vec_inserts: vec![],
6671 operational_writes: vec![],
6672 });
6673
6674 assert!(
6675 result.is_ok(),
6676 "write request within limits must succeed: got {result:?}"
6677 );
6678 }
6679
6680 #[test]
6681 fn property_fts_rows_created_on_node_insert() {
6682 let db = NamedTempFile::new().expect("temporary db");
6683 let schema = Arc::new(SchemaManager::new());
6685 {
6686 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6687 schema.bootstrap(&conn).expect("bootstrap");
6688 conn.execute(
6689 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6690 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6691 [],
6692 )
6693 .expect("register schema");
6694 }
6695 let writer = WriterActor::start(
6696 db.path(),
6697 Arc::clone(&schema),
6698 ProvenanceMode::Warn,
6699 Arc::new(TelemetryCounters::default()),
6700 )
6701 .expect("writer");
6702
6703 writer
6704 .submit(WriteRequest {
6705 label: "goal-insert".to_owned(),
6706 nodes: vec![NodeInsert {
6707 row_id: "row-1".to_owned(),
6708 logical_id: "goal-1".to_owned(),
6709 kind: "Goal".to_owned(),
6710 properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6711 .to_owned(),
6712 source_ref: Some("src-1".to_owned()),
6713 upsert: false,
6714 chunk_policy: ChunkPolicy::Preserve,
6715 content_ref: None,
6716 }],
6717 node_retires: vec![],
6718 edges: vec![],
6719 edge_retires: vec![],
6720 chunks: vec![],
6721 runs: vec![],
6722 steps: vec![],
6723 actions: vec![],
6724 optional_backfills: vec![],
6725 vec_inserts: vec![],
6726 operational_writes: vec![],
6727 })
6728 .expect("write");
6729
6730 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6731 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6732 let text: String = conn
6733 .query_row(
6734 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6735 [],
6736 |row| row.get(0),
6737 )
6738 .expect("property FTS row must exist");
6739 assert_eq!(text, "Ship v2 Launch the redesign");
6740 }
6741
6742 #[test]
6743 fn property_fts_rows_replaced_on_upsert() {
6744 let db = NamedTempFile::new().expect("temporary db");
6745 let schema = Arc::new(SchemaManager::new());
6746 {
6747 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6748 schema.bootstrap(&conn).expect("bootstrap");
6749 conn.execute(
6750 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6751 VALUES ('Goal', '[\"$.name\"]', ' ')",
6752 [],
6753 )
6754 .expect("register schema");
6755 }
6756 let writer = WriterActor::start(
6757 db.path(),
6758 Arc::clone(&schema),
6759 ProvenanceMode::Warn,
6760 Arc::new(TelemetryCounters::default()),
6761 )
6762 .expect("writer");
6763
6764 writer
6766 .submit(WriteRequest {
6767 label: "insert".to_owned(),
6768 nodes: vec![NodeInsert {
6769 row_id: "row-1".to_owned(),
6770 logical_id: "goal-1".to_owned(),
6771 kind: "Goal".to_owned(),
6772 properties: r#"{"name":"Alpha"}"#.to_owned(),
6773 source_ref: Some("src-1".to_owned()),
6774 upsert: false,
6775 chunk_policy: ChunkPolicy::Preserve,
6776 content_ref: None,
6777 }],
6778 node_retires: vec![],
6779 edges: vec![],
6780 edge_retires: vec![],
6781 chunks: vec![],
6782 runs: vec![],
6783 steps: vec![],
6784 actions: vec![],
6785 optional_backfills: vec![],
6786 vec_inserts: vec![],
6787 operational_writes: vec![],
6788 })
6789 .expect("insert");
6790
6791 writer
6793 .submit(WriteRequest {
6794 label: "upsert".to_owned(),
6795 nodes: vec![NodeInsert {
6796 row_id: "row-2".to_owned(),
6797 logical_id: "goal-1".to_owned(),
6798 kind: "Goal".to_owned(),
6799 properties: r#"{"name":"Beta"}"#.to_owned(),
6800 source_ref: Some("src-2".to_owned()),
6801 upsert: true,
6802 chunk_policy: ChunkPolicy::Preserve,
6803 content_ref: None,
6804 }],
6805 node_retires: vec![],
6806 edges: vec![],
6807 edge_retires: vec![],
6808 chunks: vec![],
6809 runs: vec![],
6810 steps: vec![],
6811 actions: vec![],
6812 optional_backfills: vec![],
6813 vec_inserts: vec![],
6814 operational_writes: vec![],
6815 })
6816 .expect("upsert");
6817
6818 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6819 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6820 let count: i64 = conn
6821 .query_row(
6822 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6823 [],
6824 |row| row.get(0),
6825 )
6826 .expect("count");
6827 assert_eq!(
6828 count, 1,
6829 "must have exactly one property FTS row after upsert"
6830 );
6831
6832 let text: String = conn
6833 .query_row(
6834 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6835 [],
6836 |row| row.get(0),
6837 )
6838 .expect("text");
6839 assert_eq!(text, "Beta", "property FTS must reflect updated properties");
6840 }
6841
6842 #[test]
6843 fn property_fts_rows_deleted_on_retire() {
6844 let db = NamedTempFile::new().expect("temporary db");
6845 let schema = Arc::new(SchemaManager::new());
6846 {
6847 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6848 schema.bootstrap(&conn).expect("bootstrap");
6849 conn.execute(
6850 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6851 VALUES ('Goal', '[\"$.name\"]', ' ')",
6852 [],
6853 )
6854 .expect("register schema");
6855 }
6856 let writer = WriterActor::start(
6857 db.path(),
6858 Arc::clone(&schema),
6859 ProvenanceMode::Warn,
6860 Arc::new(TelemetryCounters::default()),
6861 )
6862 .expect("writer");
6863
6864 writer
6866 .submit(WriteRequest {
6867 label: "insert".to_owned(),
6868 nodes: vec![NodeInsert {
6869 row_id: "row-1".to_owned(),
6870 logical_id: "goal-1".to_owned(),
6871 kind: "Goal".to_owned(),
6872 properties: r#"{"name":"Alpha"}"#.to_owned(),
6873 source_ref: Some("src-1".to_owned()),
6874 upsert: false,
6875 chunk_policy: ChunkPolicy::Preserve,
6876 content_ref: None,
6877 }],
6878 node_retires: vec![],
6879 edges: vec![],
6880 edge_retires: vec![],
6881 chunks: vec![],
6882 runs: vec![],
6883 steps: vec![],
6884 actions: vec![],
6885 optional_backfills: vec![],
6886 vec_inserts: vec![],
6887 operational_writes: vec![],
6888 })
6889 .expect("insert");
6890
6891 writer
6893 .submit(WriteRequest {
6894 label: "retire".to_owned(),
6895 nodes: vec![],
6896 node_retires: vec![NodeRetire {
6897 logical_id: "goal-1".to_owned(),
6898 source_ref: Some("forget-1".to_owned()),
6899 }],
6900 edges: vec![],
6901 edge_retires: vec![],
6902 chunks: vec![],
6903 runs: vec![],
6904 steps: vec![],
6905 actions: vec![],
6906 optional_backfills: vec![],
6907 vec_inserts: vec![],
6908 operational_writes: vec![],
6909 })
6910 .expect("retire");
6911
6912 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6913 let table = fathomdb_schema::fts_kind_table_name("Goal");
6914 let count: i64 = conn
6915 .query_row(
6916 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6917 [],
6918 |row| row.get(0),
6919 )
6920 .expect("count");
6921 assert_eq!(count, 0, "property FTS row must be deleted on retire");
6922 }
6923
6924 #[test]
6925 fn no_property_fts_row_for_unregistered_kind() {
6926 let db = NamedTempFile::new().expect("temporary db");
6927 let schema = Arc::new(SchemaManager::new());
6928 {
6929 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6930 schema.bootstrap(&conn).expect("bootstrap");
6931 }
6933 let writer = WriterActor::start(
6934 db.path(),
6935 Arc::clone(&schema),
6936 ProvenanceMode::Warn,
6937 Arc::new(TelemetryCounters::default()),
6938 )
6939 .expect("writer");
6940
6941 writer
6942 .submit(WriteRequest {
6943 label: "insert".to_owned(),
6944 nodes: vec![NodeInsert {
6945 row_id: "row-1".to_owned(),
6946 logical_id: "note-1".to_owned(),
6947 kind: "Note".to_owned(),
6948 properties: r#"{"title":"hello"}"#.to_owned(),
6949 source_ref: Some("src-1".to_owned()),
6950 upsert: false,
6951 chunk_policy: ChunkPolicy::Preserve,
6952 content_ref: None,
6953 }],
6954 node_retires: vec![],
6955 edges: vec![],
6956 edge_retires: vec![],
6957 chunks: vec![],
6958 runs: vec![],
6959 steps: vec![],
6960 actions: vec![],
6961 optional_backfills: vec![],
6962 vec_inserts: vec![],
6963 operational_writes: vec![],
6964 })
6965 .expect("insert");
6966
6967 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6968 let table = fathomdb_schema::fts_kind_table_name("Note");
6969 let exists: bool = conn
6970 .query_row(
6971 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
6972 rusqlite::params![table],
6973 |row| row.get::<_, i64>(0),
6974 )
6975 .map(|c| c > 0)
6976 .expect("exists check");
6977 assert!(
6978 !exists,
6979 "no per-kind FTS table should be created for unregistered kind"
6980 );
6981 }
6982
6983 #[test]
6986 fn property_fts_write_goes_to_per_kind_table() {
6987 let db = NamedTempFile::new().expect("temporary db");
6989 let schema = Arc::new(SchemaManager::new());
6990 {
6991 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6992 schema.bootstrap(&conn).expect("bootstrap");
6993 conn.execute(
6994 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6995 VALUES ('Goal', '[\"$.name\"]', ' ')",
6996 [],
6997 )
6998 .expect("register schema");
6999 conn.execute_batch(
7001 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7002 node_logical_id UNINDEXED, text_content, \
7003 tokenize = 'porter unicode61 remove_diacritics 2'\
7004 )",
7005 )
7006 .expect("create per-kind table");
7007 }
7008 let writer = WriterActor::start(
7009 db.path(),
7010 Arc::clone(&schema),
7011 ProvenanceMode::Warn,
7012 Arc::new(TelemetryCounters::default()),
7013 )
7014 .expect("writer");
7015
7016 writer
7017 .submit(WriteRequest {
7018 label: "insert".to_owned(),
7019 nodes: vec![NodeInsert {
7020 row_id: "row-1".to_owned(),
7021 logical_id: "goal-1".to_owned(),
7022 kind: "Goal".to_owned(),
7023 properties: r#"{"name":"Ship v2"}"#.to_owned(),
7024 source_ref: Some("src-1".to_owned()),
7025 upsert: false,
7026 chunk_policy: ChunkPolicy::Preserve,
7027 content_ref: None,
7028 }],
7029 node_retires: vec![],
7030 edges: vec![],
7031 edge_retires: vec![],
7032 chunks: vec![],
7033 runs: vec![],
7034 steps: vec![],
7035 actions: vec![],
7036 optional_backfills: vec![],
7037 vec_inserts: vec![],
7038 operational_writes: vec![],
7039 })
7040 .expect("write");
7041
7042 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7043 let per_kind_count: i64 = conn
7045 .query_row(
7046 "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7047 [],
7048 |row| row.get(0),
7049 )
7050 .expect("per-kind count");
7051 assert_eq!(
7052 per_kind_count, 1,
7053 "per-kind table fts_props_goal must have the row"
7054 );
7055 }
7056
7057 #[test]
7058 fn property_fts_retire_removes_from_per_kind_table() {
7059 let db = NamedTempFile::new().expect("temporary db");
7061 let schema = Arc::new(SchemaManager::new());
7062 {
7063 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7064 schema.bootstrap(&conn).expect("bootstrap");
7065 conn.execute(
7066 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7067 VALUES ('Goal', '[\"$.name\"]', ' ')",
7068 [],
7069 )
7070 .expect("register schema");
7071 conn.execute_batch(
7072 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7073 node_logical_id UNINDEXED, text_content, \
7074 tokenize = 'porter unicode61 remove_diacritics 2'\
7075 )",
7076 )
7077 .expect("create per-kind table");
7078 }
7079 let writer = WriterActor::start(
7080 db.path(),
7081 Arc::clone(&schema),
7082 ProvenanceMode::Warn,
7083 Arc::new(TelemetryCounters::default()),
7084 )
7085 .expect("writer");
7086
7087 writer
7089 .submit(WriteRequest {
7090 label: "insert".to_owned(),
7091 nodes: vec![NodeInsert {
7092 row_id: "row-1".to_owned(),
7093 logical_id: "goal-1".to_owned(),
7094 kind: "Goal".to_owned(),
7095 properties: r#"{"name":"Alpha"}"#.to_owned(),
7096 source_ref: Some("src-1".to_owned()),
7097 upsert: false,
7098 chunk_policy: ChunkPolicy::Preserve,
7099 content_ref: None,
7100 }],
7101 node_retires: vec![],
7102 edges: vec![],
7103 edge_retires: vec![],
7104 chunks: vec![],
7105 runs: vec![],
7106 steps: vec![],
7107 actions: vec![],
7108 optional_backfills: vec![],
7109 vec_inserts: vec![],
7110 operational_writes: vec![],
7111 })
7112 .expect("insert");
7113
7114 writer
7116 .submit(WriteRequest {
7117 label: "retire".to_owned(),
7118 nodes: vec![],
7119 node_retires: vec![NodeRetire {
7120 logical_id: "goal-1".to_owned(),
7121 source_ref: Some("forget-1".to_owned()),
7122 }],
7123 edges: vec![],
7124 edge_retires: vec![],
7125 chunks: vec![],
7126 runs: vec![],
7127 steps: vec![],
7128 actions: vec![],
7129 optional_backfills: vec![],
7130 vec_inserts: vec![],
7131 operational_writes: vec![],
7132 })
7133 .expect("retire");
7134
7135 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7136 let per_kind_count: i64 = conn
7137 .query_row(
7138 "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7139 [],
7140 |row| row.get(0),
7141 )
7142 .expect("per-kind count");
7143 assert_eq!(
7144 per_kind_count, 0,
7145 "per-kind table row must be removed on retire"
7146 );
7147 }
7148
7149 mod extract_json_path_tests {
7150 use super::super::extract_json_path;
7151 use serde_json::json;
7152
7153 #[test]
7154 fn string_value() {
7155 let v = json!({"name": "alice"});
7156 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7157 }
7158
7159 #[test]
7160 fn number_value() {
7161 let v = json!({"age": 42});
7162 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7163 }
7164
7165 #[test]
7166 fn bool_value() {
7167 let v = json!({"active": true});
7168 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7169 }
7170
7171 #[test]
7172 fn null_value() {
7173 let v = json!({"x": null});
7174 assert!(extract_json_path(&v, "$.x").is_empty());
7175 }
7176
7177 #[test]
7178 fn missing_path() {
7179 let v = json!({"name": "a"});
7180 assert!(extract_json_path(&v, "$.missing").is_empty());
7181 }
7182
7183 #[test]
7184 fn nested_path() {
7185 let v = json!({"address": {"city": "NYC"}});
7186 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7187 }
7188
7189 #[test]
7190 fn array_of_strings() {
7191 let v = json!({"tags": ["a", "b", "c"]});
7192 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7193 }
7194
7195 #[test]
7196 fn array_mixed_scalars() {
7197 let v = json!({"vals": ["x", 1, true]});
7198 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7199 }
7200
7201 #[test]
7202 fn array_only_objects_returns_empty() {
7203 let v = json!({"data": [{"k": "v"}]});
7204 assert!(extract_json_path(&v, "$.data").is_empty());
7205 }
7206
7207 #[test]
7208 fn array_mixed_objects_and_scalars() {
7209 let v = json!({"data": ["keep", {"skip": true}, "also"]});
7210 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7211 }
7212
7213 #[test]
7214 fn object_returns_empty() {
7215 let v = json!({"meta": {"k": "v"}});
7216 assert!(extract_json_path(&v, "$.meta").is_empty());
7217 }
7218
7219 #[test]
7220 fn no_prefix_returns_empty() {
7221 let v = json!({"name": "a"});
7222 assert!(extract_json_path(&v, "name").is_empty());
7223 }
7224 }
7225
    mod recursive_extraction_tests {
        //! Tests for recursive property extraction: stable walk order,
        //! depth/byte guardrails, exclude paths, and the leaf position map
        //! that ties `PositionEntry` offsets back into the combined blob.
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        // Schema over `paths` with a single-space separator and no excludes.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        // Same as `schema`, but with subtree-exclusion paths attached.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        // Object keys are visited in lexicographic order regardless of their
        // order in the source JSON.
        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        // Array elements get indexed leaf paths like `$.tags[0]`.
        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        // The walk descends into objects nested inside arrays.
        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        // Non-string scalars are rendered as text in the blob.
        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        // Null leaves contribute neither text nor position entries.
        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        // Nesting ten levels of objects around one leaf must trip the depth
        // cap and keep the too-deep leaf out of the blob.
        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Silence unused-variable / unused-import warnings; only the cap
            // behavior is under test here.
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        // ~160 KiB of leaves must hit the byte cap; the truncated blob stays
        // within bounds and every position entry still slices it cleanly.
        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                assert!(!slice.is_empty());
                // A leaf slice never spans a separator boundary.
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            // Truncation must not leave a dangling separator at the end.
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        // Excluded subtrees are skipped and counted in the stats.
        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        // Position entries are ordered, non-overlapping, and each slices the
        // blob to exactly its leaf's text.
        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        // Position maps are a recursive-mode feature; scalar paths emit none.
        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            // Type assertion: pins the position element type at compile time.
            let _: Vec<PositionEntry> = positions;
        }

        // Shared check: no two position entries may start at the same offset.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        // Empty-string leaves must not shift offsets or leave stray separators.
        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        // An empty sibling must not stop descent into a later nested subtree.
        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        // Any all-empty shape yields `None` text and an empty position map.
        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        // An empty leaf between two non-empty ones collapses: exactly one
        // separator sits between "x" and "y".
        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        // Arrays of only nulls behave like empty arrays.
        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7594
7595 mod parse_property_schema_json_tests {
7596 use super::super::parse_property_schema_json;
7597
7598 #[test]
7599 fn parse_property_schema_json_preserves_weight() {
7600 let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
7601 let schema = parse_property_schema_json(json, " ");
7602 assert_eq!(schema.paths.len(), 2);
7603 let title = &schema.paths[0];
7604 assert_eq!(title.path, "$.title");
7605 assert_eq!(title.weight, Some(2.0_f32));
7606 let body = &schema.paths[1];
7607 assert_eq!(body.path, "$.body");
7608 assert_eq!(body.weight, None);
7609 }
7610 }
7611
7612 mod extract_property_fts_columns_tests {
7615 use serde_json::json;
7616
7617 use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
7618
7619 fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7620 PropertyFtsSchema {
7621 paths,
7622 separator: " ".to_owned(),
7623 exclude_paths: Vec::new(),
7624 }
7625 }
7626
7627 #[test]
7628 fn extract_property_fts_columns_returns_per_spec_text() {
7629 let props = json!({"title": "Hello", "body": "World"});
7630 let mut title_entry = PropertyPathEntry::scalar("$.title");
7631 title_entry.weight = Some(2.0);
7632 let mut body_entry = PropertyPathEntry::scalar("$.body");
7633 body_entry.weight = Some(1.0);
7634 let schema = weighted_schema(vec![title_entry, body_entry]);
7635 let cols = extract_property_fts_columns(&props, &schema);
7636 assert_eq!(cols.len(), 2);
7637 assert_eq!(cols[0].0, "title");
7638 assert_eq!(cols[0].1, "Hello");
7639 assert_eq!(cols[1].0, "body");
7640 assert_eq!(cols[1].1, "World");
7641 }
7642
7643 #[test]
7644 fn extract_property_fts_columns_missing_field_returns_empty_string() {
7645 let props = json!({"title": "Hello"});
7646 let mut title_entry = PropertyPathEntry::scalar("$.title");
7647 title_entry.weight = Some(2.0);
7648 let mut body_entry = PropertyPathEntry::scalar("$.body");
7649 body_entry.weight = Some(1.0);
7650 let schema = weighted_schema(vec![title_entry, body_entry]);
7651 let cols = extract_property_fts_columns(&props, &schema);
7652 assert_eq!(cols.len(), 2);
7653 assert_eq!(cols[0].1, "Hello");
7654 assert_eq!(cols[1].1, "", "missing field yields empty string");
7655 }
7656 }
7657
7658 #[test]
7661 fn writer_inserts_per_column_for_weighted_schema() {
7662 use fathomdb_schema::fts_column_name;
7663
7664 let db = NamedTempFile::new().expect("temporary db");
7665 let schema_manager = Arc::new(SchemaManager::new());
7666
7667 {
7669 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7670 schema_manager.bootstrap(&conn).expect("bootstrap");
7671
7672 let title_col = fts_column_name("$.title", false);
7673 let body_col = fts_column_name("$.body", false);
7674
7675 let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
7677 conn.execute(
7678 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7679 VALUES (?1, ?2, ' ')",
7680 rusqlite::params!["Article", paths_json],
7681 )
7682 .expect("register schema");
7683
7684 conn.execute_batch(&format!(
7686 "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
7687 node_logical_id UNINDEXED, {title_col}, {body_col}, \
7688 tokenize = 'porter unicode61 remove_diacritics 2'\
7689 )"
7690 ))
7691 .expect("create weighted per-kind table");
7692 }
7693
7694 let writer = WriterActor::start(
7695 db.path(),
7696 Arc::clone(&schema_manager),
7697 ProvenanceMode::Warn,
7698 Arc::new(TelemetryCounters::default()),
7699 )
7700 .expect("writer");
7701
7702 writer
7703 .submit(WriteRequest {
7704 label: "insert".to_owned(),
7705 nodes: vec![NodeInsert {
7706 row_id: "row-1".to_owned(),
7707 logical_id: "article-1".to_owned(),
7708 kind: "Article".to_owned(),
7709 properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
7710 source_ref: Some("src-1".to_owned()),
7711 upsert: false,
7712 chunk_policy: ChunkPolicy::Preserve,
7713 content_ref: None,
7714 }],
7715 node_retires: vec![],
7716 edges: vec![],
7717 edge_retires: vec![],
7718 chunks: vec![],
7719 runs: vec![],
7720 steps: vec![],
7721 actions: vec![],
7722 optional_backfills: vec![],
7723 vec_inserts: vec![],
7724 operational_writes: vec![],
7725 })
7726 .expect("write");
7727
7728 let conn = rusqlite::Connection::open(db.path()).expect("conn");
7729 let title_col = fts_column_name("$.title", false);
7730 let body_col = fts_column_name("$.body", false);
7731
7732 let count: i64 = conn
7734 .query_row(
7735 "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
7736 [],
7737 |r| r.get(0),
7738 )
7739 .expect("count");
7740 assert_eq!(count, 1, "per-kind FTS table must have the row");
7741
7742 let (title_val, body_val): (String, String) = conn
7744 .query_row(
7745 &format!(
7746 "SELECT {title_col}, {body_col} FROM fts_props_article \
7747 WHERE node_logical_id = 'article-1'"
7748 ),
7749 [],
7750 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
7751 )
7752 .expect("select per-column");
7753 assert_eq!(
7754 title_val, "Hello World",
7755 "title column must have correct value"
7756 );
7757 assert_eq!(body_val, "Foo Bar", "body column must have correct value");
7758 }
7759}