1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14 OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15 OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16 OperationalValidationMode, extract_secondary_index_entries_for_current,
17 extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18 parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred, optional projection task queued in `WriteRequest::optional_backfills`
/// and carried through to `PreparedWrite` for execution after the required work.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection this task feeds.
    pub target: ProjectionTarget,
    /// Opaque payload for the projection; format depends on the target.
    pub payload: String,
}
31
/// How a node upsert treats the node's existing chunks.
/// Applied downstream in `apply_write` (not visible in this chunk) — confirm exact semantics there.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks (the default).
    #[default]
    Preserve,
    /// Replace existing chunks.
    Replace,
}
41
/// Engine-wide policy for `source_ref` provenance on writes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Missing provenance is tolerated (the default).
    #[default]
    Warn,
    /// Missing `source_ref` on any item rejects the whole write
    /// (see `check_require_provenance` / `prepare_touch_last_accessed`).
    Require,
}
52
/// One node row to insert (or upsert) within a write.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Physical row id; must be non-empty and unique within the request
    /// (the uniqueness namespace is shared with edge row ids).
    pub row_id: String,
    /// Stable logical identity; must be non-empty.
    pub logical_id: String,
    /// Node kind; keys FTS property schemas during projection resolution.
    pub kind: String,
    /// JSON-encoded properties (parsed with serde_json for property FTS).
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// Upsert flag — presumably supersedes the active row for this
    /// logical_id; applied in apply_write, confirm there.
    pub upsert: bool,
    /// Chunk handling on upsert; defaults to `Preserve`.
    pub chunk_policy: ChunkPolicy,
    /// Optional reference to externally stored content.
    pub content_ref: Option<String>,
}
69
/// One edge row to insert (or upsert) within a write.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Physical row id; non-empty, unique within the request (namespace
    /// shared with node row ids).
    pub row_id: String,
    /// Stable logical identity; must be non-empty.
    pub logical_id: String,
    /// Logical id of the source node.
    pub source_logical_id: String,
    /// Logical id of the target node.
    pub target_logical_id: String,
    /// Edge kind.
    pub kind: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// Upsert flag; semantics applied in apply_write (not visible here).
    pub upsert: bool,
}
84
/// Request to retire (supersede) the active node with this logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
91
/// Request to retire (supersede) the active edge with this logical id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
98
/// One text chunk to insert, attached to a node. The node must either be in
/// the same `WriteRequest` or already active in the database, and must not be
/// retired by the same request (enforced in `resolve_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk id; must be non-empty.
    pub id: String,
    /// Logical id of the owning node.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty (feeds the FTS projection).
    pub text_content: String,
    /// Optional byte range of the chunk within its source.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional content hash for dedup/change detection — TODO confirm use.
    pub content_hash: Option<String>,
}
110
/// One embedding vector keyed by chunk id. Not `Eq` because of `f32`.
/// Only applied when the "sqlite-vec" feature is enabled (see `PreparedWrite`).
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk id; must be non-empty.
    pub chunk_id: String,
    /// Embedding values; must be non-empty.
    pub embedding: Vec<f32>,
}
122
/// A write against a registered operational collection. The collection's
/// declared kind constrains which variants are accepted
/// (`resolve_operational_writes`): `append_only_log` collections take only
/// `Append`; `latest_state` collections take only `Put`/`Delete`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to an append-only log collection.
    Append {
        collection: String,
        record_key: String,
        /// JSON payload; must be non-empty.
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Set the latest state for a key in a latest-state collection.
    Put {
        collection: String,
        record_key: String,
        /// JSON payload; must be non-empty.
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove the latest state for a key in a latest-state collection.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// One run (runtime lineage root) to insert.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run id; must be non-empty.
    pub id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// If true, `supersedes_id` must also be set (validated in prepare).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
156
/// One step to insert, belonging to a run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step id; must be non-empty.
    pub id: String,
    /// Parent run id.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// If true, `supersedes_id` must also be set (validated in prepare).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
169
/// One action to insert, belonging to a step.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action id; must be non-empty.
    pub id: String,
    /// Parent step id.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    /// JSON-encoded properties.
    pub properties: String,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
    /// If true, `supersedes_id` must also be set (validated in prepare).
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
182
// Per-category caps on a single WriteRequest, enforced in
// `validate_request_size` before anything reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
/// Combined cap across node and edge retires.
const MAX_RETIRES: usize = 10_000;
/// Combined cap across runs, steps, and actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
/// Cap on the sum of all item categories in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long callers wait for the writer thread's reply before returning
/// `WriterTimedOut` (the write may still commit after the timeout).
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of writes submitted atomically to the writer thread.
/// Validated and converted into a `PreparedWrite` by `prepare_write`.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Caller-supplied label, echoed in the receipt and in tracing output.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Optional projection work deferred past the required projections.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Result returned to the caller after a write commits.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Echo of `WriteRequest::label`.
    pub label: String,
    /// Number of optional backfill tasks recorded with this write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Provenance warnings (e.g. missing source_ref under `Warn` mode).
    pub provenance_warnings: Vec<String>,
}
227
/// Request to stamp `touched_at` on a set of active nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Logical ids to touch; must be non-empty and contain no blank entries.
    pub logical_ids: Vec<String>,
    /// Timestamp to record (epoch-based; unit not visible here — confirm).
    pub touched_at: i64,
    /// Provenance reference; mandatory under `ProvenanceMode::Require`.
    pub source_ref: Option<String>,
}
235
/// Outcome of a last-access touch: how many distinct ids were stamped, and
/// with which timestamp.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
242
/// A resolved row for the chunk FTS projection: chunk text plus the kind of
/// its owning node (looked up in-request or from the database).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}
250
/// A resolved row for the property-based FTS projection: text extracted from
/// a node's JSON properties per its kind's FTS property schema.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
}
257
/// A `WriteRequest` after caller-side validation (`prepare_write`), extended
/// on the writer thread with resolved projection rows and operational
/// collection metadata (`resolve_and_apply`).
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    // Embeddings are only consumed when the "sqlite-vec" feature is on;
    // the cfg_attr silences dead_code otherwise.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    // Filled by resolve_operational_writes from operational_collections rows.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    // logical_id -> kind for nodes in this same request (avoids DB lookups
    // during FTS resolution).
    node_kinds: HashMap<String, String>,
    // Filled by resolve_fts_rows / resolve_property_fts_rows.
    required_fts_rows: Vec<FtsProjectionRow>,
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
285
/// Messages sent from `WriterActor` handles to the writer thread. Each
/// carries a one-shot reply sender the caller blocks on (with timeout).
enum WriteMessage {
    /// Apply a prepared write; boxed to keep the channel message small.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Stamp last-accessed on a set of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
296
/// Handle to the single dedicated writer thread. All mutations funnel through
/// its bounded channel so SQLite sees exactly one writer.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so Drop can release the sender *before* joining the
    // thread: disconnecting the channel is what ends the writer loop.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // Taken (Option) in Drop to join exactly once.
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Kept alive for the writer thread's lifetime; not read directly here.
    _telemetry: Arc<TelemetryCounters>,
}
309
impl WriterActor {
    /// Spawns the "fathomdb-writer" thread and returns the actor handle.
    ///
    /// The thread itself opens the database connection and bootstraps the
    /// schema (see `writer_loop`); failures there cause all subsequent
    /// submissions to be rejected rather than failing `start`.
    ///
    /// # Errors
    /// Returns `EngineError::Io` if the OS thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        // Bounded channel: submitters block once 256 messages are queued.
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            .map_err(EngineError::Io)?;

        Ok(Self {
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// True while the writer thread exists and has not finished.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Fails fast with `WriterRejected` when the writer thread has exited.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validates the request on the caller thread (`prepare_write`), then
    /// submits it and blocks for the receipt (up to `WRITER_REPLY_TIMEOUT`).
    ///
    /// # Errors
    /// `InvalidWrite` for validation failures, `WriterRejected` if the writer
    /// is gone, `WriterTimedOut` if no reply arrives in time (the write may
    /// still commit).
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validates and submits a last-accessed touch, blocking for the report
    /// with the same timeout/rejection semantics as `submit`.
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
393
/// Fallback shutdown notice when the "tracing" feature is off: the
/// trace_warn! in Drop presumably compiles to nothing then, so print to
/// stderr instead (hence the clippy::print_stderr allow).
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
399
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Drop the sender first so the writer thread's receiver iteration
        // sees a disconnect and exits; joining before disconnecting would
        // deadlock.
        // SAFETY: `sender` is dropped exactly once, here, and never touched
        // again (self is being destroyed).
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // Re-raise the writer's panic on the dropping thread —
                    // unless we are already unwinding, where a second panic
                    // would abort the process; then just log it.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
429
430fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
432 rx.recv_timeout(WRITER_REPLY_TIMEOUT)
433 .map_err(|error| match error {
434 mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
435 "write timed out waiting for writer thread reply — the write may still commit"
436 .to_owned(),
437 ),
438 mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
439 })
440 .and_then(|result| result)
441}
442
443fn prepare_touch_last_accessed(
444 request: &LastAccessTouchRequest,
445 mode: ProvenanceMode,
446) -> Result<(), EngineError> {
447 if request.logical_ids.is_empty() {
448 return Err(EngineError::InvalidWrite(
449 "touch_last_accessed requires at least one logical_id".to_owned(),
450 ));
451 }
452 for logical_id in &request.logical_ids {
453 if logical_id.trim().is_empty() {
454 return Err(EngineError::InvalidWrite(
455 "touch_last_accessed requires non-empty logical_ids".to_owned(),
456 ));
457 }
458 }
459 if mode == ProvenanceMode::Require && request.source_ref.is_none() {
460 return Err(EngineError::InvalidWrite(
461 "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
462 .to_owned(),
463 ));
464 }
465 Ok(())
466}
467
468fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
469 let missing: Vec<String> = request
470 .nodes
471 .iter()
472 .filter(|n| n.source_ref.is_none())
473 .map(|n| format!("node '{}'", n.logical_id))
474 .chain(
475 request
476 .node_retires
477 .iter()
478 .filter(|r| r.source_ref.is_none())
479 .map(|r| format!("node retire '{}'", r.logical_id)),
480 )
481 .chain(
482 request
483 .edges
484 .iter()
485 .filter(|e| e.source_ref.is_none())
486 .map(|e| format!("edge '{}'", e.logical_id)),
487 )
488 .chain(
489 request
490 .edge_retires
491 .iter()
492 .filter(|r| r.source_ref.is_none())
493 .map(|r| format!("edge retire '{}'", r.logical_id)),
494 )
495 .chain(
496 request
497 .runs
498 .iter()
499 .filter(|r| r.source_ref.is_none())
500 .map(|r| format!("run '{}'", r.id)),
501 )
502 .chain(
503 request
504 .steps
505 .iter()
506 .filter(|s| s.source_ref.is_none())
507 .map(|s| format!("step '{}'", s.id)),
508 )
509 .chain(
510 request
511 .actions
512 .iter()
513 .filter(|a| a.source_ref.is_none())
514 .map(|a| format!("action '{}'", a.id)),
515 )
516 .chain(
517 request
518 .operational_writes
519 .iter()
520 .filter(|write| operational_write_source_ref(write).is_none())
521 .map(|write| {
522 format!(
523 "operational {} '{}:{}'",
524 operational_write_kind(write),
525 operational_write_collection(write),
526 operational_write_record_key(write)
527 )
528 }),
529 )
530 .collect();
531
532 if missing.is_empty() {
533 Ok(())
534 } else {
535 Err(EngineError::InvalidWrite(format!(
536 "ProvenanceMode::Require: missing source_ref on: {}",
537 missing.join(", ")
538 )))
539 }
540}
541
542fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
543 if request.nodes.len() > MAX_NODES {
544 return Err(EngineError::InvalidWrite(format!(
545 "too many nodes: {} exceeds limit of {MAX_NODES}",
546 request.nodes.len()
547 )));
548 }
549 if request.edges.len() > MAX_EDGES {
550 return Err(EngineError::InvalidWrite(format!(
551 "too many edges: {} exceeds limit of {MAX_EDGES}",
552 request.edges.len()
553 )));
554 }
555 if request.chunks.len() > MAX_CHUNKS {
556 return Err(EngineError::InvalidWrite(format!(
557 "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
558 request.chunks.len()
559 )));
560 }
561 let retires = request.node_retires.len() + request.edge_retires.len();
562 if retires > MAX_RETIRES {
563 return Err(EngineError::InvalidWrite(format!(
564 "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
565 )));
566 }
567 let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
568 if runtime_items > MAX_RUNTIME_ITEMS {
569 return Err(EngineError::InvalidWrite(format!(
570 "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
571 )));
572 }
573 if request.operational_writes.len() > MAX_OPERATIONAL {
574 return Err(EngineError::InvalidWrite(format!(
575 "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
576 request.operational_writes.len()
577 )));
578 }
579 let total = request.nodes.len()
580 + request.node_retires.len()
581 + request.edges.len()
582 + request.edge_retires.len()
583 + request.chunks.len()
584 + request.runs.len()
585 + request.steps.len()
586 + request.actions.len()
587 + request.vec_inserts.len()
588 + request.operational_writes.len();
589 if total > MAX_TOTAL_ITEMS {
590 return Err(EngineError::InvalidWrite(format!(
591 "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
592 )));
593 }
594 Ok(())
595}
596
597#[allow(clippy::too_many_lines)]
598fn prepare_write(
599 request: WriteRequest,
600 mode: ProvenanceMode,
601) -> Result<PreparedWrite, EngineError> {
602 validate_request_size(&request)?;
603
604 for node in &request.nodes {
606 if node.row_id.is_empty() {
607 return Err(EngineError::InvalidWrite(
608 "NodeInsert has empty row_id".to_owned(),
609 ));
610 }
611 if node.logical_id.is_empty() {
612 return Err(EngineError::InvalidWrite(
613 "NodeInsert has empty logical_id".to_owned(),
614 ));
615 }
616 }
617 for edge in &request.edges {
618 if edge.row_id.is_empty() {
619 return Err(EngineError::InvalidWrite(
620 "EdgeInsert has empty row_id".to_owned(),
621 ));
622 }
623 if edge.logical_id.is_empty() {
624 return Err(EngineError::InvalidWrite(
625 "EdgeInsert has empty logical_id".to_owned(),
626 ));
627 }
628 }
629 for chunk in &request.chunks {
630 if chunk.id.is_empty() {
631 return Err(EngineError::InvalidWrite(
632 "ChunkInsert has empty id".to_owned(),
633 ));
634 }
635 if chunk.text_content.is_empty() {
636 return Err(EngineError::InvalidWrite(format!(
637 "chunk '{}' has empty text_content; empty chunks are not allowed",
638 chunk.id
639 )));
640 }
641 }
642 for run in &request.runs {
643 if run.id.is_empty() {
644 return Err(EngineError::InvalidWrite(
645 "RunInsert has empty id".to_owned(),
646 ));
647 }
648 }
649 for step in &request.steps {
650 if step.id.is_empty() {
651 return Err(EngineError::InvalidWrite(
652 "StepInsert has empty id".to_owned(),
653 ));
654 }
655 }
656 for action in &request.actions {
657 if action.id.is_empty() {
658 return Err(EngineError::InvalidWrite(
659 "ActionInsert has empty id".to_owned(),
660 ));
661 }
662 }
663 for vi in &request.vec_inserts {
664 if vi.chunk_id.is_empty() {
665 return Err(EngineError::InvalidWrite(
666 "VecInsert has empty chunk_id".to_owned(),
667 ));
668 }
669 if vi.embedding.is_empty() {
670 return Err(EngineError::InvalidWrite(format!(
671 "VecInsert for chunk '{}' has empty embedding",
672 vi.chunk_id
673 )));
674 }
675 }
676 for operational in &request.operational_writes {
677 if operational_write_collection(operational).is_empty() {
678 return Err(EngineError::InvalidWrite(
679 "OperationalWrite has empty collection".to_owned(),
680 ));
681 }
682 if operational_write_record_key(operational).is_empty() {
683 return Err(EngineError::InvalidWrite(format!(
684 "OperationalWrite for collection '{}' has empty record_key",
685 operational_write_collection(operational)
686 )));
687 }
688 match operational {
689 OperationalWrite::Append { payload_json, .. }
690 | OperationalWrite::Put { payload_json, .. } => {
691 if payload_json.is_empty() {
692 return Err(EngineError::InvalidWrite(format!(
693 "OperationalWrite {} '{}:{}' has empty payload_json",
694 operational_write_kind(operational),
695 operational_write_collection(operational),
696 operational_write_record_key(operational)
697 )));
698 }
699 }
700 OperationalWrite::Delete { .. } => {}
701 }
702 }
703
704 {
706 let mut seen = std::collections::HashSet::new();
707 for node in &request.nodes {
708 if !seen.insert(node.row_id.as_str()) {
709 return Err(EngineError::InvalidWrite(format!(
710 "duplicate row_id '{}' within the same WriteRequest",
711 node.row_id
712 )));
713 }
714 }
715 for edge in &request.edges {
716 if !seen.insert(edge.row_id.as_str()) {
717 return Err(EngineError::InvalidWrite(format!(
718 "duplicate row_id '{}' within the same WriteRequest",
719 edge.row_id
720 )));
721 }
722 }
723 }
724
725 if mode == ProvenanceMode::Require {
727 check_require_provenance(&request)?;
728 }
729
730 for run in &request.runs {
732 if run.upsert && run.supersedes_id.is_none() {
733 return Err(EngineError::InvalidWrite(format!(
734 "run '{}': upsert=true requires supersedes_id to be set",
735 run.id
736 )));
737 }
738 }
739 for step in &request.steps {
740 if step.upsert && step.supersedes_id.is_none() {
741 return Err(EngineError::InvalidWrite(format!(
742 "step '{}': upsert=true requires supersedes_id to be set",
743 step.id
744 )));
745 }
746 }
747 for action in &request.actions {
748 if action.upsert && action.supersedes_id.is_none() {
749 return Err(EngineError::InvalidWrite(format!(
750 "action '{}': upsert=true requires supersedes_id to be set",
751 action.id
752 )));
753 }
754 }
755
756 let node_kinds = request
757 .nodes
758 .iter()
759 .map(|node| (node.logical_id.clone(), node.kind.clone()))
760 .collect::<HashMap<_, _>>();
761
762 Ok(PreparedWrite {
763 label: request.label,
764 nodes: request.nodes,
765 node_retires: request.node_retires,
766 edges: request.edges,
767 edge_retires: request.edge_retires,
768 chunks: request.chunks,
769 runs: request.runs,
770 steps: request.steps,
771 actions: request.actions,
772 vec_inserts: request.vec_inserts,
773 operational_writes: request.operational_writes,
774 operational_collection_kinds: HashMap::new(),
775 operational_collection_filter_fields: HashMap::new(),
776 operational_validation_warnings: Vec::new(),
777 node_kinds,
778 required_fts_rows: Vec::new(),
779 required_property_fts_rows: Vec::new(),
780 optional_backfills: request.optional_backfills,
781 })
782}
783
/// Body of the dedicated writer thread: owns the single write connection,
/// bootstraps the schema, then serves messages until every sender is dropped.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Can't serve anything: reject all queued and future messages,
            // then let the thread exit.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Iteration ends when the channel disconnects (WriterActor::drop).
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer thread.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        // NOTE(review): `start` exists only under the
                        // "tracing" feature; presumably trace_info! compiles
                        // away otherwise — confirm against the macro.
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Best-effort: clear any transaction the panic left open.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // A touch counts as a write, but with zero projected rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
866
867fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
868 for message in receiver {
869 match message {
870 WriteMessage::Submit { reply, .. } => {
871 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
872 }
873 WriteMessage::TouchLastAccessed { reply, .. } => {
874 let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
875 }
876 }
877 }
878}
879
/// Resolves the required FTS projection rows for this write's chunks.
///
/// Each chunk needs its owning node's kind: taken from nodes in the same
/// request when possible, otherwise looked up among active rows in the
/// database. Rejects chunks whose node is retired in the same request or
/// cannot be found at all.
fn resolve_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // Retire + chunk-insert for the same node in one request is ambiguous;
    // reject it outright.
    let retiring_ids: std::collections::HashSet<&str> = prepared
        .node_retires
        .iter()
        .map(|r| r.logical_id.as_str())
        .collect();
    for chunk in &prepared.chunks {
        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
            return Err(EngineError::InvalidWrite(format!(
                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
                 WriteRequest; retire and chunk insertion for the same node must not be combined",
                chunk.id, chunk.node_logical_id
            )));
        }
    }
    for chunk in &prepared.chunks {
        // Prefer the in-request node kind; fall back to the active DB row.
        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
            k.clone()
        } else {
            match conn.query_row(
                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
                params![chunk.node_logical_id],
                |row| row.get::<_, String>(0),
            ) {
                Ok(kind) => kind,
                Err(rusqlite::Error::QueryReturnedNoRows) => {
                    return Err(EngineError::InvalidWrite(format!(
                        "chunk '{}' references node_logical_id '{}' that is not present in this \
                         write request or the database \
                         (v1 limitation: chunks and their nodes must be submitted together or the \
                         node must already exist)",
                        chunk.id, chunk.node_logical_id
                    )));
                }
                Err(e) => return Err(EngineError::Sqlite(e)),
            }
        };
        prepared.required_fts_rows.push(FtsProjectionRow {
            chunk_id: chunk.id.clone(),
            node_logical_id: chunk.node_logical_id.clone(),
            kind,
            text_content: chunk.text_content.clone(),
        });
    }
    trace_debug!(
        fts_rows = prepared.required_fts_rows.len(),
        chunks_processed = prepared.chunks.len(),
        "fts row resolution completed"
    );
    Ok(())
}
941
/// Resolves property-based FTS rows for nodes whose kind has a registered
/// FTS property schema: extracts the configured JSON paths from each node's
/// properties and joins them with the schema's separator.
fn resolve_property_fts_rows(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    if prepared.nodes.is_empty() {
        return Ok(());
    }

    // kind -> (property paths, separator)
    let schemas: HashMap<String, (Vec<String>, String)> = load_fts_property_schemas(conn)?
        .into_iter()
        .map(|(kind, paths, sep)| (kind, (paths, sep)))
        .collect();

    if schemas.is_empty() {
        return Ok(());
    }

    for node in &prepared.nodes {
        let Some((paths, separator)) = schemas.get(&node.kind) else {
            continue;
        };
        // Unparseable properties degrade to Null; extraction then yields
        // nothing rather than failing the write.
        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
        if let Some(text_content) = compute_property_fts_text(&props, paths, separator) {
            prepared
                .required_property_fts_rows
                .push(FtsPropertyProjectionRow {
                    node_logical_id: node.logical_id.clone(),
                    kind: node.kind.clone(),
                    text_content,
                });
        }
    }
    trace_debug!(
        property_fts_rows = prepared.required_property_fts_rows.len(),
        nodes_processed = prepared.nodes.len(),
        "property fts row resolution completed"
    );
    Ok(())
}
983
984pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
988 let Some(path) = path.strip_prefix("$.") else {
989 return Vec::new();
990 };
991 let mut current = value;
992 for segment in path.split('.') {
993 match current.get(segment) {
994 Some(v) => current = v,
995 None => return Vec::new(),
996 }
997 }
998 match current {
999 serde_json::Value::String(s) => vec![s.clone()],
1000 serde_json::Value::Number(n) => vec![n.to_string()],
1001 serde_json::Value::Bool(b) => vec![b.to_string()],
1002 serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1003 serde_json::Value::Array(arr) => arr
1004 .iter()
1005 .filter_map(|v| match v {
1006 serde_json::Value::String(s) => Some(s.clone()),
1007 serde_json::Value::Number(n) => Some(n.to_string()),
1008 serde_json::Value::Bool(b) => Some(b.to_string()),
1009 _ => None,
1010 })
1011 .collect(),
1012 }
1013}
1014
1015pub(crate) fn compute_property_fts_text(
1020 props: &serde_json::Value,
1021 paths: &[String],
1022 separator: &str,
1023) -> Option<String> {
1024 let mut parts = Vec::new();
1025 for path in paths {
1026 parts.extend(extract_json_path(props, path));
1027 }
1028 if parts.is_empty() {
1029 None
1030 } else {
1031 Some(parts.join(separator))
1032 }
1033}
1034
/// Loads every FTS property schema as `(kind, property paths, separator)`.
/// Malformed `property_paths_json` degrades to an empty path list for that
/// kind rather than failing the whole query.
pub(crate) fn load_fts_property_schemas(
    conn: &rusqlite::Connection,
) -> Result<Vec<(String, Vec<String>, String)>, rusqlite::Error> {
    let mut stmt =
        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
    stmt.query_map([], |row| {
        let kind: String = row.get(0)?;
        let paths_json: String = row.get(1)?;
        let separator: String = row.get(2)?;
        let paths: Vec<String> = serde_json::from_str(&paths_json).unwrap_or_default();
        Ok((kind, paths, separator))
    })?
    .collect::<Result<Vec<_>, _>>()
}
1050
/// Validates each operational write against its collection's registration:
/// the collection must exist and be enabled, the write variant must match
/// the collection kind (append_only_log -> Append; latest_state ->
/// Put/Delete), and any validation contract must pass. Collection metadata
/// is fetched once per collection and cached into `prepared` for use when
/// the write is applied.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First time this collection appears in the batch: load and cache
        // its registration row.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Kind/variant compatibility matrix.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Outer Option: contract cached for this collection; inner Option:
        // the collection actually declares a contract.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1118
1119fn parse_operational_filter_fields(
1120 filter_fields_json: &str,
1121) -> Result<Vec<OperationalFilterField>, EngineError> {
1122 let fields: Vec<OperationalFilterField> =
1123 serde_json::from_str(filter_fields_json).map_err(|error| {
1124 EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1125 })?;
1126 let mut seen = std::collections::HashSet::new();
1127 for field in &fields {
1128 if field.name.trim().is_empty() {
1129 return Err(EngineError::InvalidWrite(
1130 "filter_fields_json field names must not be empty".to_owned(),
1131 ));
1132 }
1133 if !seen.insert(field.name.as_str()) {
1134 return Err(EngineError::InvalidWrite(format!(
1135 "filter_fields_json contains duplicate field '{}'",
1136 field.name
1137 )));
1138 }
1139 if field.modes.is_empty() {
1140 return Err(EngineError::InvalidWrite(format!(
1141 "filter_fields_json field '{}' must declare at least one mode",
1142 field.name
1143 )));
1144 }
1145 if field.modes.contains(&OperationalFilterMode::Prefix)
1146 && field.field_type != OperationalFilterFieldType::String
1147 {
1148 return Err(EngineError::InvalidWrite(format!(
1149 "filter field '{}' only supports prefix for string types",
1150 field.name
1151 )));
1152 }
1153 }
1154 Ok(fields)
1155}
1156
/// One extracted filter value for an operational payload: exactly one of
/// `string_value` / `integer_value` is set, matching the field's type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    field_name: String,
    string_value: Option<String>,
    integer_value: Option<i64>,
}
1163
1164fn extract_operational_filter_values(
1165 filter_fields: &[OperationalFilterField],
1166 payload_json: &str,
1167) -> Vec<OperationalFilterValueRow> {
1168 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1169 return Vec::new();
1170 };
1171 let Some(object) = parsed.as_object() else {
1172 return Vec::new();
1173 };
1174
1175 filter_fields
1176 .iter()
1177 .filter_map(|field| {
1178 let value = object.get(&field.name)?;
1179 match field.field_type {
1180 OperationalFilterFieldType::String => {
1181 value
1182 .as_str()
1183 .map(|string_value| OperationalFilterValueRow {
1184 field_name: field.name.clone(),
1185 string_value: Some(string_value.to_owned()),
1186 integer_value: None,
1187 })
1188 }
1189 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1190 value
1191 .as_i64()
1192 .map(|integer_value| OperationalFilterValueRow {
1193 field_name: field.name.clone(),
1194 string_value: None,
1195 integer_value: Some(integer_value),
1196 })
1197 }
1198 }
1199 })
1200 .collect()
1201}
1202
/// Resolve all derived rows for a prepared write (node FTS, property FTS,
/// operational state), then apply the whole batch in one transaction.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1212
1213fn apply_touch_last_accessed(
1214 conn: &mut rusqlite::Connection,
1215 request: &LastAccessTouchRequest,
1216) -> Result<LastAccessTouchReport, EngineError> {
1217 let mut seen = std::collections::HashSet::new();
1218 let logical_ids = request
1219 .logical_ids
1220 .iter()
1221 .filter(|logical_id| seen.insert(logical_id.as_str()))
1222 .cloned()
1223 .collect::<Vec<_>>();
1224 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1225
1226 for logical_id in &logical_ids {
1227 let exists = tx
1228 .query_row(
1229 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1230 params![logical_id],
1231 |row| row.get::<_, i64>(0),
1232 )
1233 .optional()?
1234 .is_some();
1235 if !exists {
1236 return Err(EngineError::InvalidWrite(format!(
1237 "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1238 )));
1239 }
1240 }
1241
1242 {
1243 let mut upsert_metadata = tx.prepare_cached(
1244 "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1245 VALUES (?1, ?2, ?2) \
1246 ON CONFLICT(logical_id) DO UPDATE SET \
1247 last_accessed_at = excluded.last_accessed_at, \
1248 updated_at = excluded.updated_at",
1249 )?;
1250 let mut insert_provenance = tx.prepare_cached(
1251 "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1252 VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1253 )?;
1254 for logical_id in &logical_ids {
1255 upsert_metadata.execute(params![logical_id, request.touched_at])?;
1256 insert_provenance.execute(params![
1257 new_id(),
1258 logical_id,
1259 request.source_ref.as_deref(),
1260 format!("{{\"touched_at\":{}}}", request.touched_at),
1261 ])?;
1262 }
1263 }
1264
1265 tx.commit()?;
1266 Ok(LastAccessTouchReport {
1267 touched_logical_ids: logical_ids.len(),
1268 touched_at: request.touched_at,
1269 })
1270}
1271
1272fn ensure_operational_collections_writable(
1273 tx: &rusqlite::Transaction<'_>,
1274 prepared: &PreparedWrite,
1275) -> Result<(), EngineError> {
1276 for collection in prepared.operational_collection_kinds.keys() {
1277 let disabled_at: Option<Option<i64>> = tx
1278 .query_row(
1279 "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1280 params![collection],
1281 |row| row.get::<_, Option<i64>>(0),
1282 )
1283 .optional()?;
1284 match disabled_at {
1285 Some(Some(_)) => {
1286 return Err(EngineError::InvalidWrite(format!(
1287 "operational collection '{collection}' is disabled"
1288 )));
1289 }
1290 Some(None) => {}
1291 None => {
1292 return Err(EngineError::InvalidWrite(format!(
1293 "operational collection '{collection}' is not registered"
1294 )));
1295 }
1296 }
1297 }
1298 Ok(())
1299}
1300
1301fn validate_operational_writes_against_live_contracts(
1302 tx: &rusqlite::Transaction<'_>,
1303 prepared: &PreparedWrite,
1304) -> Result<Vec<String>, EngineError> {
1305 let mut collection_validation_contracts =
1306 HashMap::<String, Option<OperationalValidationContract>>::new();
1307 for collection in prepared.operational_collection_kinds.keys() {
1308 let validation_json: String = tx
1309 .query_row(
1310 "SELECT validation_json FROM operational_collections WHERE name = ?1",
1311 params![collection],
1312 |row| row.get(0),
1313 )
1314 .map_err(EngineError::Sqlite)?;
1315 let validation_contract = parse_operational_validation_contract(&validation_json)
1316 .map_err(EngineError::InvalidWrite)?;
1317 collection_validation_contracts.insert(collection.clone(), validation_contract);
1318 }
1319
1320 let mut warnings = Vec::new();
1321 for write in &prepared.operational_writes {
1322 if let Some(Some(contract)) =
1323 collection_validation_contracts.get(operational_write_collection(write))
1324 && let Some(warning) = check_operational_write_against_contract(write, contract)?
1325 {
1326 warnings.push(warning);
1327 }
1328 }
1329
1330 Ok(warnings)
1331}
1332
1333fn load_live_operational_secondary_indexes(
1334 tx: &rusqlite::Transaction<'_>,
1335 prepared: &PreparedWrite,
1336) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1337 let mut collection_indexes = HashMap::new();
1338 for (collection, collection_kind) in &prepared.operational_collection_kinds {
1339 let secondary_indexes_json: String = tx
1340 .query_row(
1341 "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1342 params![collection],
1343 |row| row.get(0),
1344 )
1345 .map_err(EngineError::Sqlite)?;
1346 let indexes =
1347 parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1348 .map_err(EngineError::InvalidWrite)?;
1349 collection_indexes.insert(collection.clone(), indexes);
1350 }
1351 Ok(collection_indexes)
1352}
1353
1354fn check_operational_write_against_contract(
1355 write: &OperationalWrite,
1356 contract: &OperationalValidationContract,
1357) -> Result<Option<String>, EngineError> {
1358 if contract.mode == OperationalValidationMode::Disabled {
1359 return Ok(None);
1360 }
1361
1362 let (payload_json, collection, record_key) = match write {
1363 OperationalWrite::Append {
1364 collection,
1365 record_key,
1366 payload_json,
1367 ..
1368 }
1369 | OperationalWrite::Put {
1370 collection,
1371 record_key,
1372 payload_json,
1373 ..
1374 } => (
1375 payload_json.as_str(),
1376 collection.as_str(),
1377 record_key.as_str(),
1378 ),
1379 OperationalWrite::Delete { .. } => return Ok(None),
1380 };
1381
1382 match validate_operational_payload_against_contract(contract, payload_json) {
1383 Ok(()) => Ok(None),
1384 Err(message) => match contract.mode {
1385 OperationalValidationMode::Disabled => Ok(None),
1386 OperationalValidationMode::ReportOnly => Ok(Some(format!(
1387 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1388 kind = operational_write_kind(write)
1389 ))),
1390 OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1391 "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1392 kind = operational_write_kind(write)
1393 ))),
1394 },
1395 }
1396}
1397
/// Apply a fully-prepared write batch inside a single immediate
/// transaction, then derive provenance warnings for the receipt.
///
/// Statement ordering inside the transaction is deliberate: retires run
/// before inserts so upserts see a clean slate, chunk rows land before the
/// FTS rows that reference them, and operational writes are re-validated
/// against the live collection registry before being recorded.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: drop both FTS shadows, supersede the active node row,
    // and record one provenance event per retire.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: supersede the active edge row, plus a provenance event.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts. For upserts: always clear property FTS; under the
    // Replace chunk policy also clear chunk-level FTS, vectors, and chunks
    // themselves; then supersede the prior active row before inserting.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // The vec table may legitimately be absent; tolerate that here and
        // skip vector deletes instead of failing the whole write.
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                del_prop_fts.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts: upserts supersede the prior active edge first.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts (after node inserts so the owning nodes exist).
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts: upserts supersede an explicitly named prior run only.
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts: same supersede-by-explicit-id pattern as runs.
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts: same supersede-by-explicit-id pattern again.
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational writes: verify collections are registered and enabled,
    // re-validate against live contracts (warnings stored on `prepared`),
    // then record each write as a mutation plus its derived rows (secondary
    // index entries, filter values, and the latest-state current row).
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Mutation order is a single monotonically increasing counter
        // across all collections, continued from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
             payload_json = excluded.payload_json, \
             updated_at = excluded.updated_at, \
             last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
             slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Mutation-scoped secondary index entries, derived from the
            // payload of this mutation.
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Filter values extracted per the collection's declared fields.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // Latest-state collections additionally maintain the current
            // row: old 'current' index entries are dropped up front, then
            // Put upserts the row and re-derives its index entries from
            // the stored row, Delete removes it, Append leaves it alone.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Read back the stored row so index entries use
                            // the database's updated_at and mutation id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // Chunk-level FTS rows resolved earlier for this write.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property-level FTS rows, if any were resolved.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        for row in &prepared.required_property_fts_rows {
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
        }
    }

    // Vector inserts: embeddings serialized as little-endian f32 bytes. An
    // absent vec table is tolerated by silently skipping the inserts.
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // Vec table not present: vector inserts are skipped.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // After commit, collect a provenance warning for every inserted or
    // retired entity that carried no source_ref.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // Receipt warnings = provenance warnings + validation warnings
    // captured during the operational phase above.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
1884
1885fn operational_write_collection(write: &OperationalWrite) -> &str {
1886 match write {
1887 OperationalWrite::Append { collection, .. }
1888 | OperationalWrite::Put { collection, .. }
1889 | OperationalWrite::Delete { collection, .. } => collection,
1890 }
1891}
1892
1893fn operational_write_record_key(write: &OperationalWrite) -> &str {
1894 match write {
1895 OperationalWrite::Append { record_key, .. }
1896 | OperationalWrite::Put { record_key, .. }
1897 | OperationalWrite::Delete { record_key, .. } => record_key,
1898 }
1899}
1900
1901fn operational_write_kind(write: &OperationalWrite) -> &'static str {
1902 match write {
1903 OperationalWrite::Append { .. } => "append",
1904 OperationalWrite::Put { .. } => "put",
1905 OperationalWrite::Delete { .. } => "delete",
1906 }
1907}
1908
1909fn operational_write_payload(write: &OperationalWrite) -> &str {
1910 match write {
1911 OperationalWrite::Append { payload_json, .. }
1912 | OperationalWrite::Put { payload_json, .. } => payload_json,
1913 OperationalWrite::Delete { .. } => "null",
1914 }
1915}
1916
1917fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
1918 match write {
1919 OperationalWrite::Append { source_ref, .. }
1920 | OperationalWrite::Put { source_ref, .. }
1921 | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
1922 }
1923}
1924
1925#[cfg(test)]
1926#[allow(clippy::expect_used)]
1927mod tests {
1928 use std::sync::Arc;
1929
1930 use fathomdb_schema::SchemaManager;
1931 use tempfile::NamedTempFile;
1932
1933 use super::{apply_write, prepare_write, resolve_operational_writes};
1934 use crate::{
1935 ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
1936 NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
1937 StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
1938 projection::ProjectionTarget,
1939 };
1940
    // A write containing only runtime-table rows (run/step/action, no
    // graph entities) is accepted and echoes its label in the receipt.
    #[test]
    fn writer_executes_runtime_table_rows() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // One run, one step under it, one action under the step; every
        // entity carries a source_ref so no provenance warnings fire.
        let receipt = writer
            .submit(WriteRequest {
                label: "runtime".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "emit".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write receipt");

        assert_eq!(receipt.label, "runtime");
    }
1997
    // A Put against a registered latest_state collection records exactly
    // one mutation row and upserts the current row, alongside an ordinary
    // node insert in the same write.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection registry before starting the writer.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Reopen and verify: node inserted, one mutation logged, current
        // row holds the Put payload.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2077
    // With validation_json mode "disabled", a payload that violates the
    // contract's fields is still accepted and stored as-is.
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract requires status in {ok, failed}, but mode is disabled.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Payload violates the contract (missing required "status",
        // unknown "bogus" key) yet must be accepted.
        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The invalid payload must have been stored untouched.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2131
    // With mode "report_only", an invalid payload is stored, the write
    // succeeds, and exactly one validation warning (not a provenance
    // warning) appears on the receipt.
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract restricts status to {ok, failed} in report_only mode.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the status enum, so validation fails —
        // but report_only must not reject the write.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // source_ref was provided, so no provenance warnings; the single
        // warning is the validation report.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // The invalid payload is still stored as the current row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2196
2197 #[test]
2198 fn writer_rejects_operational_write_for_missing_collection() {
2199 let db = NamedTempFile::new().expect("temporary db");
2200 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2201 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2202 drop(conn);
2203 let writer = WriterActor::start(
2204 db.path(),
2205 Arc::new(SchemaManager::new()),
2206 ProvenanceMode::Warn,
2207 Arc::new(TelemetryCounters::default()),
2208 )
2209 .expect("writer");
2210
2211 let result = writer.submit(WriteRequest {
2212 label: "missing-operational-collection".to_owned(),
2213 nodes: vec![],
2214 node_retires: vec![],
2215 edges: vec![],
2216 edge_retires: vec![],
2217 chunks: vec![],
2218 runs: vec![],
2219 steps: vec![],
2220 actions: vec![],
2221 optional_backfills: vec![],
2222 vec_inserts: vec![],
2223 operational_writes: vec![OperationalWrite::Put {
2224 collection: "connector_health".to_owned(),
2225 record_key: "gmail".to_owned(),
2226 payload_json: r#"{"status":"ok"}"#.to_owned(),
2227 source_ref: Some("src-1".to_owned()),
2228 }],
2229 });
2230
2231 assert!(
2232 matches!(result, Err(EngineError::InvalidWrite(_))),
2233 "missing operational collection must return InvalidWrite"
2234 );
2235 }
2236
2237 #[test]
2238 fn writer_append_operational_write_records_history_without_current_row() {
2239 let db = NamedTempFile::new().expect("temporary db");
2240 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2241 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2242 conn.execute(
2243 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2244 VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2245 [],
2246 )
2247 .expect("seed collection");
2248 drop(conn);
2249 let writer = WriterActor::start(
2250 db.path(),
2251 Arc::new(SchemaManager::new()),
2252 ProvenanceMode::Warn,
2253 Arc::new(TelemetryCounters::default()),
2254 )
2255 .expect("writer");
2256
2257 writer
2258 .submit(WriteRequest {
2259 label: "append-operational".to_owned(),
2260 nodes: vec![],
2261 node_retires: vec![],
2262 edges: vec![],
2263 edge_retires: vec![],
2264 chunks: vec![],
2265 runs: vec![],
2266 steps: vec![],
2267 actions: vec![],
2268 optional_backfills: vec![],
2269 vec_inserts: vec![],
2270 operational_writes: vec![OperationalWrite::Append {
2271 collection: "audit_log".to_owned(),
2272 record_key: "evt-1".to_owned(),
2273 payload_json: r#"{"type":"sync"}"#.to_owned(),
2274 source_ref: Some("src-1".to_owned()),
2275 }],
2276 })
2277 .expect("write receipt");
2278
2279 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2280 let mutation: (String, String) = conn
2281 .query_row(
2282 "SELECT op_kind, payload_json FROM operational_mutations \
2283 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2284 [],
2285 |row| Ok((row.get(0)?, row.get(1)?)),
2286 )
2287 .expect("mutation row");
2288 assert_eq!(mutation.0, "append");
2289 assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2290 let current_count: i64 = conn
2291 .query_row(
2292 "SELECT count(*) FROM operational_current \
2293 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2294 [],
2295 |row| row.get(0),
2296 )
2297 .expect("current count");
2298 assert_eq!(current_count, 0);
2299 }
2300
2301 #[test]
2302 fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2303 let db = NamedTempFile::new().expect("temporary db");
2304 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2305 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2306 conn.execute(
2307 "INSERT INTO operational_collections \
2308 (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2309 VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2310 '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2311 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2312 )
2313 .expect("seed collection");
2314 drop(conn);
2315 let writer = WriterActor::start(
2316 db.path(),
2317 Arc::new(SchemaManager::new()),
2318 ProvenanceMode::Warn,
2319 Arc::new(TelemetryCounters::default()),
2320 )
2321 .expect("writer");
2322
2323 let error = writer
2324 .submit(WriteRequest {
2325 label: "invalid-append".to_owned(),
2326 nodes: vec![],
2327 node_retires: vec![],
2328 edges: vec![],
2329 edge_retires: vec![],
2330 chunks: vec![],
2331 runs: vec![],
2332 steps: vec![],
2333 actions: vec![],
2334 optional_backfills: vec![],
2335 vec_inserts: vec![],
2336 operational_writes: vec![OperationalWrite::Append {
2337 collection: "audit_log".to_owned(),
2338 record_key: "evt-1".to_owned(),
2339 payload_json: r#"{"status":"bogus"}"#.to_owned(),
2340 source_ref: Some("src-1".to_owned()),
2341 }],
2342 })
2343 .expect_err("invalid append must reject");
2344 assert!(matches!(error, EngineError::InvalidWrite(_)));
2345 assert!(error.to_string().contains("must be one of"));
2346
2347 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2348 let mutation_count: i64 = conn
2349 .query_row(
2350 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2351 [],
2352 |row| row.get(0),
2353 )
2354 .expect("mutation count");
2355 assert_eq!(mutation_count, 0);
2356 let filter_count: i64 = conn
2357 .query_row(
2358 "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2359 [],
2360 |row| row.get(0),
2361 )
2362 .expect("filter count");
2363 assert_eq!(filter_count, 0);
2364 }
2365
2366 #[test]
2367 fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2368 let db = NamedTempFile::new().expect("temporary db");
2369 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2370 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2371 conn.execute(
2372 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2373 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2374 [],
2375 )
2376 .expect("seed collection");
2377 drop(conn);
2378 let writer = WriterActor::start(
2379 db.path(),
2380 Arc::new(SchemaManager::new()),
2381 ProvenanceMode::Warn,
2382 Arc::new(TelemetryCounters::default()),
2383 )
2384 .expect("writer");
2385
2386 writer
2387 .submit(WriteRequest {
2388 label: "put-operational".to_owned(),
2389 nodes: vec![],
2390 node_retires: vec![],
2391 edges: vec![],
2392 edge_retires: vec![],
2393 chunks: vec![],
2394 runs: vec![],
2395 steps: vec![],
2396 actions: vec![],
2397 optional_backfills: vec![],
2398 vec_inserts: vec![],
2399 operational_writes: vec![OperationalWrite::Put {
2400 collection: "connector_health".to_owned(),
2401 record_key: "gmail".to_owned(),
2402 payload_json: r#"{"status":"ok"}"#.to_owned(),
2403 source_ref: Some("src-1".to_owned()),
2404 }],
2405 })
2406 .expect("put receipt");
2407
2408 writer
2409 .submit(WriteRequest {
2410 label: "delete-operational".to_owned(),
2411 nodes: vec![],
2412 node_retires: vec![],
2413 edges: vec![],
2414 edge_retires: vec![],
2415 chunks: vec![],
2416 runs: vec![],
2417 steps: vec![],
2418 actions: vec![],
2419 optional_backfills: vec![],
2420 vec_inserts: vec![],
2421 operational_writes: vec![OperationalWrite::Delete {
2422 collection: "connector_health".to_owned(),
2423 record_key: "gmail".to_owned(),
2424 source_ref: Some("src-2".to_owned()),
2425 }],
2426 })
2427 .expect("delete receipt");
2428
2429 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2430 let mutation_kinds: Vec<String> = {
2431 let mut stmt = conn
2432 .prepare(
2433 "SELECT op_kind FROM operational_mutations \
2434 WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2435 ORDER BY mutation_order ASC",
2436 )
2437 .expect("stmt");
2438 stmt.query_map([], |row| row.get(0))
2439 .expect("rows")
2440 .collect::<Result<_, _>>()
2441 .expect("collect")
2442 };
2443 assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2444 let current_count: i64 = conn
2445 .query_row(
2446 "SELECT count(*) FROM operational_current \
2447 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2448 [],
2449 |row| row.get(0),
2450 )
2451 .expect("current count");
2452 assert_eq!(current_count, 0);
2453 }
2454
2455 #[test]
2456 fn writer_delete_bypasses_validation_contract() {
2457 let db = NamedTempFile::new().expect("temporary db");
2458 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2459 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2460 conn.execute(
2461 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2462 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2463 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2464 )
2465 .expect("seed collection");
2466 drop(conn);
2467 let writer = WriterActor::start(
2468 db.path(),
2469 Arc::new(SchemaManager::new()),
2470 ProvenanceMode::Warn,
2471 Arc::new(TelemetryCounters::default()),
2472 )
2473 .expect("writer");
2474
2475 writer
2476 .submit(WriteRequest {
2477 label: "valid-put".to_owned(),
2478 nodes: vec![],
2479 node_retires: vec![],
2480 edges: vec![],
2481 edge_retires: vec![],
2482 chunks: vec![],
2483 runs: vec![],
2484 steps: vec![],
2485 actions: vec![],
2486 optional_backfills: vec![],
2487 vec_inserts: vec![],
2488 operational_writes: vec![OperationalWrite::Put {
2489 collection: "connector_health".to_owned(),
2490 record_key: "gmail".to_owned(),
2491 payload_json: r#"{"status":"ok"}"#.to_owned(),
2492 source_ref: Some("src-1".to_owned()),
2493 }],
2494 })
2495 .expect("put receipt");
2496 writer
2497 .submit(WriteRequest {
2498 label: "delete-after-put".to_owned(),
2499 nodes: vec![],
2500 node_retires: vec![],
2501 edges: vec![],
2502 edge_retires: vec![],
2503 chunks: vec![],
2504 runs: vec![],
2505 steps: vec![],
2506 actions: vec![],
2507 optional_backfills: vec![],
2508 vec_inserts: vec![],
2509 operational_writes: vec![OperationalWrite::Delete {
2510 collection: "connector_health".to_owned(),
2511 record_key: "gmail".to_owned(),
2512 source_ref: Some("src-2".to_owned()),
2513 }],
2514 })
2515 .expect("delete receipt");
2516
2517 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2518 let current_count: i64 = conn
2519 .query_row(
2520 "SELECT count(*) FROM operational_current \
2521 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2522 [],
2523 |row| row.get(0),
2524 )
2525 .expect("current count");
2526 assert_eq!(current_count, 0);
2527 }
2528
2529 #[test]
2530 fn writer_latest_state_secondary_indexes_track_put_and_delete() {
2531 let db = NamedTempFile::new().expect("temporary db");
2532 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2533 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2534 conn.execute(
2535 "INSERT INTO operational_collections \
2536 (name, kind, schema_json, retention_json, secondary_indexes_json) \
2537 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2538 [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
2539 )
2540 .expect("seed collection");
2541 drop(conn);
2542 let writer = WriterActor::start(
2543 db.path(),
2544 Arc::new(SchemaManager::new()),
2545 ProvenanceMode::Warn,
2546 Arc::new(TelemetryCounters::default()),
2547 )
2548 .expect("writer");
2549
2550 writer
2551 .submit(WriteRequest {
2552 label: "secondary-index-put".to_owned(),
2553 nodes: vec![],
2554 node_retires: vec![],
2555 edges: vec![],
2556 edge_retires: vec![],
2557 chunks: vec![],
2558 runs: vec![],
2559 steps: vec![],
2560 actions: vec![],
2561 optional_backfills: vec![],
2562 vec_inserts: vec![],
2563 operational_writes: vec![OperationalWrite::Put {
2564 collection: "connector_health".to_owned(),
2565 record_key: "gmail".to_owned(),
2566 payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
2567 .to_owned(),
2568 source_ref: Some("src-1".to_owned()),
2569 }],
2570 })
2571 .expect("put receipt");
2572
2573 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2574 let current_entry_count: i64 = conn
2575 .query_row(
2576 "SELECT count(*) FROM operational_secondary_index_entries \
2577 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2578 [],
2579 |row| row.get(0),
2580 )
2581 .expect("current secondary index count");
2582 assert_eq!(current_entry_count, 2);
2583 drop(conn);
2584
2585 writer
2586 .submit(WriteRequest {
2587 label: "secondary-index-delete".to_owned(),
2588 nodes: vec![],
2589 node_retires: vec![],
2590 edges: vec![],
2591 edge_retires: vec![],
2592 chunks: vec![],
2593 runs: vec![],
2594 steps: vec![],
2595 actions: vec![],
2596 optional_backfills: vec![],
2597 vec_inserts: vec![],
2598 operational_writes: vec![OperationalWrite::Delete {
2599 collection: "connector_health".to_owned(),
2600 record_key: "gmail".to_owned(),
2601 source_ref: Some("src-2".to_owned()),
2602 }],
2603 })
2604 .expect("delete receipt");
2605
2606 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2607 let current_entry_count: i64 = conn
2608 .query_row(
2609 "SELECT count(*) FROM operational_secondary_index_entries \
2610 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2611 [],
2612 |row| row.get(0),
2613 )
2614 .expect("current secondary index count");
2615 assert_eq!(current_entry_count, 0);
2616 }
2617
2618 #[test]
2619 fn writer_latest_state_operational_writes_persist_mutation_order() {
2620 let db = NamedTempFile::new().expect("temporary db");
2621 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2622 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2623 conn.execute(
2624 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2625 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2626 [],
2627 )
2628 .expect("seed collection");
2629 drop(conn);
2630
2631 let writer = WriterActor::start(
2632 db.path(),
2633 Arc::new(SchemaManager::new()),
2634 ProvenanceMode::Warn,
2635 Arc::new(TelemetryCounters::default()),
2636 )
2637 .expect("writer");
2638
2639 writer
2640 .submit(WriteRequest {
2641 label: "ordered-operational-batch".to_owned(),
2642 nodes: vec![],
2643 node_retires: vec![],
2644 edges: vec![],
2645 edge_retires: vec![],
2646 chunks: vec![],
2647 runs: vec![],
2648 steps: vec![],
2649 actions: vec![],
2650 optional_backfills: vec![],
2651 vec_inserts: vec![],
2652 operational_writes: vec![
2653 OperationalWrite::Put {
2654 collection: "connector_health".to_owned(),
2655 record_key: "gmail".to_owned(),
2656 payload_json: r#"{"status":"old"}"#.to_owned(),
2657 source_ref: Some("src-1".to_owned()),
2658 },
2659 OperationalWrite::Delete {
2660 collection: "connector_health".to_owned(),
2661 record_key: "gmail".to_owned(),
2662 source_ref: Some("src-2".to_owned()),
2663 },
2664 OperationalWrite::Put {
2665 collection: "connector_health".to_owned(),
2666 record_key: "gmail".to_owned(),
2667 payload_json: r#"{"status":"new"}"#.to_owned(),
2668 source_ref: Some("src-3".to_owned()),
2669 },
2670 ],
2671 })
2672 .expect("write receipt");
2673
2674 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2675 let rows: Vec<(String, i64)> = {
2676 let mut stmt = conn
2677 .prepare(
2678 "SELECT op_kind, mutation_order FROM operational_mutations \
2679 WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2680 ORDER BY mutation_order ASC",
2681 )
2682 .expect("stmt");
2683 stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
2684 .expect("rows")
2685 .collect::<Result<_, _>>()
2686 .expect("collect")
2687 };
2688 assert_eq!(
2689 rows,
2690 vec![
2691 ("put".to_owned(), 1),
2692 ("delete".to_owned(), 2),
2693 ("put".to_owned(), 3),
2694 ]
2695 );
2696 let payload: String = conn
2697 .query_row(
2698 "SELECT payload_json FROM operational_current \
2699 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2700 [],
2701 |row| row.get(0),
2702 )
2703 .expect("current payload");
2704 assert_eq!(payload, r#"{"status":"new"}"#);
2705 }
2706
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        // Regression test for a check-then-act race: resolve_operational_writes
        // runs as a preflight step before the write transaction, so a
        // collection can be disabled in between. apply_write must re-check the
        // disabled flag inside the transaction and reject with no side effects.
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight succeeds while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Simulate a concurrent disable landing after preflight, before apply.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejected write must leave no trace in the mutation journal...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // ...nor in the current-state table.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2770
2771 #[test]
2772 fn writer_enforce_validation_rejects_invalid_put_atomically() {
2773 let db = NamedTempFile::new().expect("temporary db");
2774 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2775 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2776 conn.execute(
2777 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2778 VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2779 [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2780 )
2781 .expect("seed collection");
2782 drop(conn);
2783 let writer = WriterActor::start(
2784 db.path(),
2785 Arc::new(SchemaManager::new()),
2786 ProvenanceMode::Warn,
2787 Arc::new(TelemetryCounters::default()),
2788 )
2789 .expect("writer");
2790
2791 let error = writer
2792 .submit(WriteRequest {
2793 label: "invalid-put".to_owned(),
2794 nodes: vec![NodeInsert {
2795 row_id: "row-1".to_owned(),
2796 logical_id: "lg-1".to_owned(),
2797 kind: "Meeting".to_owned(),
2798 properties: "{}".to_owned(),
2799 source_ref: Some("src-1".to_owned()),
2800 upsert: false,
2801 chunk_policy: ChunkPolicy::Preserve,
2802 content_ref: None,
2803 }],
2804 node_retires: vec![],
2805 edges: vec![],
2806 edge_retires: vec![],
2807 chunks: vec![],
2808 runs: vec![],
2809 steps: vec![],
2810 actions: vec![],
2811 optional_backfills: vec![],
2812 vec_inserts: vec![],
2813 operational_writes: vec![OperationalWrite::Put {
2814 collection: "connector_health".to_owned(),
2815 record_key: "gmail".to_owned(),
2816 payload_json: r#"{"status":"bogus"}"#.to_owned(),
2817 source_ref: Some("src-1".to_owned()),
2818 }],
2819 })
2820 .expect_err("invalid put must reject");
2821 assert!(matches!(error, EngineError::InvalidWrite(_)));
2822 assert!(error.to_string().contains("must be one of"));
2823
2824 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2825 let node_count: i64 = conn
2826 .query_row(
2827 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2828 [],
2829 |row| row.get(0),
2830 )
2831 .expect("node count");
2832 assert_eq!(node_count, 0);
2833 let mutation_count: i64 = conn
2834 .query_row(
2835 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
2836 [],
2837 |row| row.get(0),
2838 )
2839 .expect("mutation count");
2840 assert_eq!(mutation_count, 0);
2841 let current_count: i64 = conn
2842 .query_row(
2843 "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
2844 [],
2845 |row| row.get(0),
2846 )
2847 .expect("current count");
2848 assert_eq!(current_count, 0);
2849 }
2850
2851 #[test]
2852 fn writer_rejects_append_against_latest_state_collection() {
2853 let db = NamedTempFile::new().expect("temporary db");
2854 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2855 SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2856 conn.execute(
2857 "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2858 VALUES ('connector_health', 'latest_state', '{}', '{}')",
2859 [],
2860 )
2861 .expect("seed collection");
2862 drop(conn);
2863 let writer = WriterActor::start(
2864 db.path(),
2865 Arc::new(SchemaManager::new()),
2866 ProvenanceMode::Warn,
2867 Arc::new(TelemetryCounters::default()),
2868 )
2869 .expect("writer");
2870
2871 let result = writer.submit(WriteRequest {
2872 label: "bad-append".to_owned(),
2873 nodes: vec![],
2874 node_retires: vec![],
2875 edges: vec![],
2876 edge_retires: vec![],
2877 chunks: vec![],
2878 runs: vec![],
2879 steps: vec![],
2880 actions: vec![],
2881 optional_backfills: vec![],
2882 vec_inserts: vec![],
2883 operational_writes: vec![OperationalWrite::Append {
2884 collection: "connector_health".to_owned(),
2885 record_key: "gmail".to_owned(),
2886 payload_json: r#"{"status":"ok"}"#.to_owned(),
2887 source_ref: Some("src-1".to_owned()),
2888 }],
2889 });
2890
2891 assert!(
2892 matches!(result, Err(EngineError::InvalidWrite(_))),
2893 "latest_state collection must reject Append"
2894 );
2895 }
2896
2897 #[test]
2898 fn writer_upsert_supersedes_prior_active_node() {
2899 let db = NamedTempFile::new().expect("temporary db");
2900 let writer = WriterActor::start(
2901 db.path(),
2902 Arc::new(SchemaManager::new()),
2903 ProvenanceMode::Warn,
2904 Arc::new(TelemetryCounters::default()),
2905 )
2906 .expect("writer");
2907
2908 writer
2909 .submit(WriteRequest {
2910 label: "v1".to_owned(),
2911 nodes: vec![NodeInsert {
2912 row_id: "row-1".to_owned(),
2913 logical_id: "lg-1".to_owned(),
2914 kind: "Meeting".to_owned(),
2915 properties: r#"{"version":1}"#.to_owned(),
2916 source_ref: Some("src-1".to_owned()),
2917 upsert: false,
2918 chunk_policy: ChunkPolicy::Preserve,
2919 content_ref: None,
2920 }],
2921 node_retires: vec![],
2922 edges: vec![],
2923 edge_retires: vec![],
2924 chunks: vec![],
2925 runs: vec![],
2926 steps: vec![],
2927 actions: vec![],
2928 optional_backfills: vec![],
2929 vec_inserts: vec![],
2930 operational_writes: vec![],
2931 })
2932 .expect("v1 write");
2933
2934 writer
2935 .submit(WriteRequest {
2936 label: "v2".to_owned(),
2937 nodes: vec![NodeInsert {
2938 row_id: "row-2".to_owned(),
2939 logical_id: "lg-1".to_owned(),
2940 kind: "Meeting".to_owned(),
2941 properties: r#"{"version":2}"#.to_owned(),
2942 source_ref: Some("src-2".to_owned()),
2943 upsert: true,
2944 chunk_policy: ChunkPolicy::Preserve,
2945 content_ref: None,
2946 }],
2947 node_retires: vec![],
2948 edges: vec![],
2949 edge_retires: vec![],
2950 chunks: vec![],
2951 runs: vec![],
2952 steps: vec![],
2953 actions: vec![],
2954 optional_backfills: vec![],
2955 vec_inserts: vec![],
2956 operational_writes: vec![],
2957 })
2958 .expect("v2 upsert write");
2959
2960 let conn = rusqlite::Connection::open(db.path()).expect("conn");
2961 let (active_row_id, props): (String, String) = conn
2962 .query_row(
2963 "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
2964 [],
2965 |row| Ok((row.get(0)?, row.get(1)?)),
2966 )
2967 .expect("active row");
2968 assert_eq!(active_row_id, "row-2");
2969 assert!(props.contains("\"version\":2"));
2970
2971 let superseded: i64 = conn
2972 .query_row(
2973 "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
2974 [],
2975 |row| row.get(0),
2976 )
2977 .expect("superseded count");
2978 assert_eq!(superseded, 1);
2979 }
2980
2981 #[test]
2982 fn writer_inserts_edge_between_two_nodes() {
2983 let db = NamedTempFile::new().expect("temporary db");
2984 let writer = WriterActor::start(
2985 db.path(),
2986 Arc::new(SchemaManager::new()),
2987 ProvenanceMode::Warn,
2988 Arc::new(TelemetryCounters::default()),
2989 )
2990 .expect("writer");
2991
2992 writer
2993 .submit(WriteRequest {
2994 label: "nodes-and-edge".to_owned(),
2995 nodes: vec![
2996 NodeInsert {
2997 row_id: "row-meeting".to_owned(),
2998 logical_id: "meeting-1".to_owned(),
2999 kind: "Meeting".to_owned(),
3000 properties: "{}".to_owned(),
3001 source_ref: Some("src-1".to_owned()),
3002 upsert: false,
3003 chunk_policy: ChunkPolicy::Preserve,
3004 content_ref: None,
3005 },
3006 NodeInsert {
3007 row_id: "row-task".to_owned(),
3008 logical_id: "task-1".to_owned(),
3009 kind: "Task".to_owned(),
3010 properties: "{}".to_owned(),
3011 source_ref: Some("src-1".to_owned()),
3012 upsert: false,
3013 chunk_policy: ChunkPolicy::Preserve,
3014 content_ref: None,
3015 },
3016 ],
3017 node_retires: vec![],
3018 edges: vec![EdgeInsert {
3019 row_id: "edge-1".to_owned(),
3020 logical_id: "edge-lg-1".to_owned(),
3021 source_logical_id: "meeting-1".to_owned(),
3022 target_logical_id: "task-1".to_owned(),
3023 kind: "HAS_TASK".to_owned(),
3024 properties: "{}".to_owned(),
3025 source_ref: Some("src-1".to_owned()),
3026 upsert: false,
3027 }],
3028 edge_retires: vec![],
3029 chunks: vec![],
3030 runs: vec![],
3031 steps: vec![],
3032 actions: vec![],
3033 optional_backfills: vec![],
3034 vec_inserts: vec![],
3035 operational_writes: vec![],
3036 })
3037 .expect("write receipt");
3038
3039 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3040 let (src, tgt, kind): (String, String, String) = conn
3041 .query_row(
3042 "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3043 [],
3044 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3045 )
3046 .expect("edge row");
3047 assert_eq!(src, "meeting-1");
3048 assert_eq!(tgt, "task-1");
3049 assert_eq!(kind, "HAS_TASK");
3050 }
3051
3052 #[test]
3053 #[allow(clippy::too_many_lines)]
3054 fn writer_upsert_supersedes_prior_active_edge() {
3055 let db = NamedTempFile::new().expect("temporary db");
3056 let writer = WriterActor::start(
3057 db.path(),
3058 Arc::new(SchemaManager::new()),
3059 ProvenanceMode::Warn,
3060 Arc::new(TelemetryCounters::default()),
3061 )
3062 .expect("writer");
3063
3064 writer
3066 .submit(WriteRequest {
3067 label: "nodes".to_owned(),
3068 nodes: vec![
3069 NodeInsert {
3070 row_id: "row-a".to_owned(),
3071 logical_id: "node-a".to_owned(),
3072 kind: "Meeting".to_owned(),
3073 properties: "{}".to_owned(),
3074 source_ref: Some("src-1".to_owned()),
3075 upsert: false,
3076 chunk_policy: ChunkPolicy::Preserve,
3077 content_ref: None,
3078 },
3079 NodeInsert {
3080 row_id: "row-b".to_owned(),
3081 logical_id: "node-b".to_owned(),
3082 kind: "Task".to_owned(),
3083 properties: "{}".to_owned(),
3084 source_ref: Some("src-1".to_owned()),
3085 upsert: false,
3086 chunk_policy: ChunkPolicy::Preserve,
3087 content_ref: None,
3088 },
3089 ],
3090 node_retires: vec![],
3091 edges: vec![],
3092 edge_retires: vec![],
3093 chunks: vec![],
3094 runs: vec![],
3095 steps: vec![],
3096 actions: vec![],
3097 optional_backfills: vec![],
3098 vec_inserts: vec![],
3099 operational_writes: vec![],
3100 })
3101 .expect("nodes write");
3102
3103 writer
3105 .submit(WriteRequest {
3106 label: "edge-v1".to_owned(),
3107 nodes: vec![],
3108 node_retires: vec![],
3109 edges: vec![EdgeInsert {
3110 row_id: "edge-row-1".to_owned(),
3111 logical_id: "edge-lg-1".to_owned(),
3112 source_logical_id: "node-a".to_owned(),
3113 target_logical_id: "node-b".to_owned(),
3114 kind: "HAS_TASK".to_owned(),
3115 properties: r#"{"weight":1}"#.to_owned(),
3116 source_ref: Some("src-1".to_owned()),
3117 upsert: false,
3118 }],
3119 edge_retires: vec![],
3120 chunks: vec![],
3121 runs: vec![],
3122 steps: vec![],
3123 actions: vec![],
3124 optional_backfills: vec![],
3125 vec_inserts: vec![],
3126 operational_writes: vec![],
3127 })
3128 .expect("edge v1 write");
3129
3130 writer
3132 .submit(WriteRequest {
3133 label: "edge-v2".to_owned(),
3134 nodes: vec![],
3135 node_retires: vec![],
3136 edges: vec![EdgeInsert {
3137 row_id: "edge-row-2".to_owned(),
3138 logical_id: "edge-lg-1".to_owned(),
3139 source_logical_id: "node-a".to_owned(),
3140 target_logical_id: "node-b".to_owned(),
3141 kind: "HAS_TASK".to_owned(),
3142 properties: r#"{"weight":2}"#.to_owned(),
3143 source_ref: Some("src-2".to_owned()),
3144 upsert: true,
3145 }],
3146 edge_retires: vec![],
3147 chunks: vec![],
3148 runs: vec![],
3149 steps: vec![],
3150 actions: vec![],
3151 optional_backfills: vec![],
3152 vec_inserts: vec![],
3153 operational_writes: vec![],
3154 })
3155 .expect("edge v2 upsert");
3156
3157 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3158 let (active_row_id, props): (String, String) = conn
3159 .query_row(
3160 "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3161 [],
3162 |row| Ok((row.get(0)?, row.get(1)?)),
3163 )
3164 .expect("active edge");
3165 assert_eq!(active_row_id, "edge-row-2");
3166 assert!(props.contains("\"weight\":2"));
3167
3168 let superseded: i64 = conn
3169 .query_row(
3170 "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3171 [],
3172 |row| row.get(0),
3173 )
3174 .expect("superseded count");
3175 assert_eq!(superseded, 1);
3176 }
3177
3178 #[test]
3179 fn writer_fts_rows_are_written_to_database() {
3180 let db = NamedTempFile::new().expect("temporary db");
3181 let writer = WriterActor::start(
3182 db.path(),
3183 Arc::new(SchemaManager::new()),
3184 ProvenanceMode::Warn,
3185 Arc::new(TelemetryCounters::default()),
3186 )
3187 .expect("writer");
3188
3189 writer
3190 .submit(WriteRequest {
3191 label: "seed".to_owned(),
3192 nodes: vec![NodeInsert {
3193 row_id: "row-1".to_owned(),
3194 logical_id: "logical-1".to_owned(),
3195 kind: "Meeting".to_owned(),
3196 properties: "{}".to_owned(),
3197 source_ref: Some("src-1".to_owned()),
3198 upsert: false,
3199 chunk_policy: ChunkPolicy::Preserve,
3200 content_ref: None,
3201 }],
3202 node_retires: vec![],
3203 edges: vec![],
3204 edge_retires: vec![],
3205 chunks: vec![ChunkInsert {
3206 id: "chunk-1".to_owned(),
3207 node_logical_id: "logical-1".to_owned(),
3208 text_content: "budget discussion".to_owned(),
3209 byte_start: None,
3210 byte_end: None,
3211 content_hash: None,
3212 }],
3213 runs: vec![],
3214 steps: vec![],
3215 actions: vec![],
3216 optional_backfills: vec![],
3217 vec_inserts: vec![],
3218 operational_writes: vec![],
3219 })
3220 .expect("write receipt");
3221
3222 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3223 let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3224 conn.query_row(
3225 "SELECT chunk_id, node_logical_id, kind, text_content \
3226 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3227 [],
3228 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3229 )
3230 .expect("fts row");
3231 assert_eq!(chunk_id, "chunk-1");
3232 assert_eq!(node_logical_id, "logical-1");
3233 assert_eq!(kind, "Meeting");
3234 assert_eq!(text_content, "budget discussion");
3235 }
3236
3237 #[test]
3238 fn writer_receipt_warns_on_nodes_without_source_ref() {
3239 let db = NamedTempFile::new().expect("temporary db");
3240 let writer = WriterActor::start(
3241 db.path(),
3242 Arc::new(SchemaManager::new()),
3243 ProvenanceMode::Warn,
3244 Arc::new(TelemetryCounters::default()),
3245 )
3246 .expect("writer");
3247
3248 let receipt = writer
3249 .submit(WriteRequest {
3250 label: "no-source".to_owned(),
3251 nodes: vec![NodeInsert {
3252 row_id: "row-1".to_owned(),
3253 logical_id: "logical-1".to_owned(),
3254 kind: "Meeting".to_owned(),
3255 properties: "{}".to_owned(),
3256 source_ref: None,
3257 upsert: false,
3258 chunk_policy: ChunkPolicy::Preserve,
3259 content_ref: None,
3260 }],
3261 node_retires: vec![],
3262 edges: vec![],
3263 edge_retires: vec![],
3264 chunks: vec![],
3265 runs: vec![],
3266 steps: vec![],
3267 actions: vec![],
3268 optional_backfills: vec![],
3269 vec_inserts: vec![],
3270 operational_writes: vec![],
3271 })
3272 .expect("write receipt");
3273
3274 assert_eq!(receipt.provenance_warnings.len(), 1);
3275 assert!(receipt.provenance_warnings[0].contains("logical-1"));
3276 }
3277
3278 #[test]
3279 fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3280 let db = NamedTempFile::new().expect("temporary db");
3281 let writer = WriterActor::start(
3282 db.path(),
3283 Arc::new(SchemaManager::new()),
3284 ProvenanceMode::Warn,
3285 Arc::new(TelemetryCounters::default()),
3286 )
3287 .expect("writer");
3288
3289 let receipt = writer
3290 .submit(WriteRequest {
3291 label: "with-source".to_owned(),
3292 nodes: vec![NodeInsert {
3293 row_id: "row-1".to_owned(),
3294 logical_id: "logical-1".to_owned(),
3295 kind: "Meeting".to_owned(),
3296 properties: "{}".to_owned(),
3297 source_ref: Some("src-1".to_owned()),
3298 upsert: false,
3299 chunk_policy: ChunkPolicy::Preserve,
3300 content_ref: None,
3301 }],
3302 node_retires: vec![],
3303 edges: vec![],
3304 edge_retires: vec![],
3305 chunks: vec![],
3306 runs: vec![],
3307 steps: vec![],
3308 actions: vec![],
3309 optional_backfills: vec![],
3310 vec_inserts: vec![],
3311 operational_writes: vec![],
3312 })
3313 .expect("write receipt");
3314
3315 assert!(receipt.provenance_warnings.is_empty());
3316 }
3317
3318 #[test]
3319 fn writer_accepts_chunk_for_pre_existing_node() {
3320 let db = NamedTempFile::new().expect("temporary db");
3321 let writer = WriterActor::start(
3322 db.path(),
3323 Arc::new(SchemaManager::new()),
3324 ProvenanceMode::Warn,
3325 Arc::new(TelemetryCounters::default()),
3326 )
3327 .expect("writer");
3328
3329 writer
3331 .submit(WriteRequest {
3332 label: "r1".to_owned(),
3333 nodes: vec![NodeInsert {
3334 row_id: "row-1".to_owned(),
3335 logical_id: "logical-1".to_owned(),
3336 kind: "Meeting".to_owned(),
3337 properties: "{}".to_owned(),
3338 source_ref: Some("src-1".to_owned()),
3339 upsert: false,
3340 chunk_policy: ChunkPolicy::Preserve,
3341 content_ref: None,
3342 }],
3343 node_retires: vec![],
3344 edges: vec![],
3345 edge_retires: vec![],
3346 chunks: vec![],
3347 runs: vec![],
3348 steps: vec![],
3349 actions: vec![],
3350 optional_backfills: vec![],
3351 vec_inserts: vec![],
3352 operational_writes: vec![],
3353 })
3354 .expect("r1 write");
3355
3356 writer
3358 .submit(WriteRequest {
3359 label: "r2".to_owned(),
3360 nodes: vec![],
3361 node_retires: vec![],
3362 edges: vec![],
3363 edge_retires: vec![],
3364 chunks: vec![ChunkInsert {
3365 id: "chunk-1".to_owned(),
3366 node_logical_id: "logical-1".to_owned(),
3367 text_content: "budget discussion".to_owned(),
3368 byte_start: None,
3369 byte_end: None,
3370 content_hash: None,
3371 }],
3372 runs: vec![],
3373 steps: vec![],
3374 actions: vec![],
3375 optional_backfills: vec![],
3376 vec_inserts: vec![],
3377 operational_writes: vec![],
3378 })
3379 .expect("r2 write — chunk for pre-existing node");
3380
3381 let conn = rusqlite::Connection::open(db.path()).expect("conn");
3382 let count: i64 = conn
3383 .query_row(
3384 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3385 [],
3386 |row| row.get(0),
3387 )
3388 .expect("fts count");
3389 assert_eq!(
3390 count, 1,
3391 "FTS row must exist for chunk attached to pre-existing node"
3392 );
3393 }
3394
3395 #[test]
3396 fn writer_rejects_chunk_for_completely_unknown_node() {
3397 let db = NamedTempFile::new().expect("temporary db");
3398 let writer = WriterActor::start(
3399 db.path(),
3400 Arc::new(SchemaManager::new()),
3401 ProvenanceMode::Warn,
3402 Arc::new(TelemetryCounters::default()),
3403 )
3404 .expect("writer");
3405
3406 let result = writer.submit(WriteRequest {
3407 label: "bad".to_owned(),
3408 nodes: vec![],
3409 node_retires: vec![],
3410 edges: vec![],
3411 edge_retires: vec![],
3412 chunks: vec![ChunkInsert {
3413 id: "chunk-1".to_owned(),
3414 node_logical_id: "nonexistent".to_owned(),
3415 text_content: "some text".to_owned(),
3416 byte_start: None,
3417 byte_end: None,
3418 content_hash: None,
3419 }],
3420 runs: vec![],
3421 steps: vec![],
3422 actions: vec![],
3423 optional_backfills: vec![],
3424 vec_inserts: vec![],
3425 operational_writes: vec![],
3426 });
3427
3428 assert!(
3429 matches!(result, Err(EngineError::InvalidWrite(_))),
3430 "completely unknown node must return InvalidWrite"
3431 );
3432 }
3433
3434 #[test]
3435 fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3436 let db = NamedTempFile::new().expect("temporary db");
3437 let writer = WriterActor::start(
3438 db.path(),
3439 Arc::new(SchemaManager::new()),
3440 ProvenanceMode::Warn,
3441 Arc::new(TelemetryCounters::default()),
3442 )
3443 .expect("writer");
3444
3445 let receipt = writer
3446 .submit(WriteRequest {
3447 label: "seed".to_owned(),
3448 nodes: vec![NodeInsert {
3449 row_id: "row-1".to_owned(),
3450 logical_id: "logical-1".to_owned(),
3451 kind: "Meeting".to_owned(),
3452 properties: "{}".to_owned(),
3453 source_ref: None,
3454 upsert: false,
3455 chunk_policy: ChunkPolicy::Preserve,
3456 content_ref: None,
3457 }],
3458 node_retires: vec![],
3459 edges: vec![],
3460 edge_retires: vec![],
3461 chunks: vec![ChunkInsert {
3462 id: "chunk-1".to_owned(),
3463 node_logical_id: "logical-1".to_owned(),
3464 text_content: "budget discussion".to_owned(),
3465 byte_start: None,
3466 byte_end: None,
3467 content_hash: None,
3468 }],
3469 runs: vec![],
3470 steps: vec![],
3471 actions: vec![],
3472 optional_backfills: vec![],
3473 vec_inserts: vec![],
3474 operational_writes: vec![],
3475 })
3476 .expect("write receipt");
3477
3478 assert_eq!(receipt.label, "seed");
3479 }
3480
3481 #[test]
3482 fn writer_node_retire_supersedes_active_node() {
3483 let db = NamedTempFile::new().expect("temporary db");
3484 let writer = WriterActor::start(
3485 db.path(),
3486 Arc::new(SchemaManager::new()),
3487 ProvenanceMode::Warn,
3488 Arc::new(TelemetryCounters::default()),
3489 )
3490 .expect("writer");
3491
3492 writer
3493 .submit(WriteRequest {
3494 label: "seed".to_owned(),
3495 nodes: vec![NodeInsert {
3496 row_id: "row-1".to_owned(),
3497 logical_id: "meeting-1".to_owned(),
3498 kind: "Meeting".to_owned(),
3499 properties: "{}".to_owned(),
3500 source_ref: Some("src-1".to_owned()),
3501 upsert: false,
3502 chunk_policy: ChunkPolicy::Preserve,
3503 content_ref: None,
3504 }],
3505 node_retires: vec![],
3506 edges: vec![],
3507 edge_retires: vec![],
3508 chunks: vec![],
3509 runs: vec![],
3510 steps: vec![],
3511 actions: vec![],
3512 optional_backfills: vec![],
3513 vec_inserts: vec![],
3514 operational_writes: vec![],
3515 })
3516 .expect("seed write");
3517
3518 writer
3519 .submit(WriteRequest {
3520 label: "retire".to_owned(),
3521 nodes: vec![],
3522 node_retires: vec![NodeRetire {
3523 logical_id: "meeting-1".to_owned(),
3524 source_ref: Some("src-2".to_owned()),
3525 }],
3526 edges: vec![],
3527 edge_retires: vec![],
3528 chunks: vec![],
3529 runs: vec![],
3530 steps: vec![],
3531 actions: vec![],
3532 optional_backfills: vec![],
3533 vec_inserts: vec![],
3534 operational_writes: vec![],
3535 })
3536 .expect("retire write");
3537
3538 let conn = rusqlite::Connection::open(db.path()).expect("open");
3539 let active: i64 = conn
3540 .query_row(
3541 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3542 [],
3543 |r| r.get(0),
3544 )
3545 .expect("count active");
3546 let historical: i64 = conn
3547 .query_row(
3548 "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3549 [],
3550 |r| r.get(0),
3551 )
3552 .expect("count historical");
3553
3554 assert_eq!(active, 0, "active count must be 0 after retire");
3555 assert_eq!(historical, 1, "historical count must be 1 after retire");
3556 }
3557
3558 #[test]
3559 fn writer_node_retire_preserves_chunks_and_clears_fts() {
3560 let db = NamedTempFile::new().expect("temporary db");
3561 let writer = WriterActor::start(
3562 db.path(),
3563 Arc::new(SchemaManager::new()),
3564 ProvenanceMode::Warn,
3565 Arc::new(TelemetryCounters::default()),
3566 )
3567 .expect("writer");
3568
3569 writer
3570 .submit(WriteRequest {
3571 label: "seed".to_owned(),
3572 nodes: vec![NodeInsert {
3573 row_id: "row-1".to_owned(),
3574 logical_id: "meeting-1".to_owned(),
3575 kind: "Meeting".to_owned(),
3576 properties: "{}".to_owned(),
3577 source_ref: Some("src-1".to_owned()),
3578 upsert: false,
3579 chunk_policy: ChunkPolicy::Preserve,
3580 content_ref: None,
3581 }],
3582 node_retires: vec![],
3583 edges: vec![],
3584 edge_retires: vec![],
3585 chunks: vec![ChunkInsert {
3586 id: "chunk-1".to_owned(),
3587 node_logical_id: "meeting-1".to_owned(),
3588 text_content: "budget discussion".to_owned(),
3589 byte_start: None,
3590 byte_end: None,
3591 content_hash: None,
3592 }],
3593 runs: vec![],
3594 steps: vec![],
3595 actions: vec![],
3596 optional_backfills: vec![],
3597 vec_inserts: vec![],
3598 operational_writes: vec![],
3599 })
3600 .expect("seed write");
3601
3602 writer
3603 .submit(WriteRequest {
3604 label: "retire".to_owned(),
3605 nodes: vec![],
3606 node_retires: vec![NodeRetire {
3607 logical_id: "meeting-1".to_owned(),
3608 source_ref: Some("src-2".to_owned()),
3609 }],
3610 edges: vec![],
3611 edge_retires: vec![],
3612 chunks: vec![],
3613 runs: vec![],
3614 steps: vec![],
3615 actions: vec![],
3616 optional_backfills: vec![],
3617 vec_inserts: vec![],
3618 operational_writes: vec![],
3619 })
3620 .expect("retire write");
3621
3622 let conn = rusqlite::Connection::open(db.path()).expect("open");
3623 let chunk_count: i64 = conn
3624 .query_row(
3625 "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
3626 [],
3627 |r| r.get(0),
3628 )
3629 .expect("chunk count");
3630 let fts_count: i64 = conn
3631 .query_row(
3632 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
3633 [],
3634 |r| r.get(0),
3635 )
3636 .expect("fts count");
3637
3638 assert_eq!(
3639 chunk_count, 1,
3640 "chunks must remain after node retire so restore can re-establish content"
3641 );
3642 assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
3643 }
3644
3645 #[test]
3646 fn writer_edge_retire_supersedes_active_edge() {
3647 let db = NamedTempFile::new().expect("temporary db");
3648 let writer = WriterActor::start(
3649 db.path(),
3650 Arc::new(SchemaManager::new()),
3651 ProvenanceMode::Warn,
3652 Arc::new(TelemetryCounters::default()),
3653 )
3654 .expect("writer");
3655
3656 writer
3657 .submit(WriteRequest {
3658 label: "seed".to_owned(),
3659 nodes: vec![
3660 NodeInsert {
3661 row_id: "row-a".to_owned(),
3662 logical_id: "node-a".to_owned(),
3663 kind: "Meeting".to_owned(),
3664 properties: "{}".to_owned(),
3665 source_ref: Some("src-1".to_owned()),
3666 upsert: false,
3667 chunk_policy: ChunkPolicy::Preserve,
3668 content_ref: None,
3669 },
3670 NodeInsert {
3671 row_id: "row-b".to_owned(),
3672 logical_id: "node-b".to_owned(),
3673 kind: "Task".to_owned(),
3674 properties: "{}".to_owned(),
3675 source_ref: Some("src-1".to_owned()),
3676 upsert: false,
3677 chunk_policy: ChunkPolicy::Preserve,
3678 content_ref: None,
3679 },
3680 ],
3681 node_retires: vec![],
3682 edges: vec![EdgeInsert {
3683 row_id: "edge-1".to_owned(),
3684 logical_id: "edge-lg-1".to_owned(),
3685 source_logical_id: "node-a".to_owned(),
3686 target_logical_id: "node-b".to_owned(),
3687 kind: "HAS_TASK".to_owned(),
3688 properties: "{}".to_owned(),
3689 source_ref: Some("src-1".to_owned()),
3690 upsert: false,
3691 }],
3692 edge_retires: vec![],
3693 chunks: vec![],
3694 runs: vec![],
3695 steps: vec![],
3696 actions: vec![],
3697 optional_backfills: vec![],
3698 vec_inserts: vec![],
3699 operational_writes: vec![],
3700 })
3701 .expect("seed write");
3702
3703 writer
3704 .submit(WriteRequest {
3705 label: "retire-edge".to_owned(),
3706 nodes: vec![],
3707 node_retires: vec![],
3708 edges: vec![],
3709 edge_retires: vec![EdgeRetire {
3710 logical_id: "edge-lg-1".to_owned(),
3711 source_ref: Some("src-2".to_owned()),
3712 }],
3713 chunks: vec![],
3714 runs: vec![],
3715 steps: vec![],
3716 actions: vec![],
3717 optional_backfills: vec![],
3718 vec_inserts: vec![],
3719 operational_writes: vec![],
3720 })
3721 .expect("retire edge write");
3722
3723 let conn = rusqlite::Connection::open(db.path()).expect("open");
3724 let active: i64 = conn
3725 .query_row(
3726 "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3727 [],
3728 |r| r.get(0),
3729 )
3730 .expect("active edge count");
3731
3732 assert_eq!(active, 0, "active edge count must be 0 after retire");
3733 }
3734
3735 #[test]
3736 fn writer_retire_without_source_ref_emits_provenance_warning() {
3737 let db = NamedTempFile::new().expect("temporary db");
3738 let writer = WriterActor::start(
3739 db.path(),
3740 Arc::new(SchemaManager::new()),
3741 ProvenanceMode::Warn,
3742 Arc::new(TelemetryCounters::default()),
3743 )
3744 .expect("writer");
3745
3746 writer
3747 .submit(WriteRequest {
3748 label: "seed".to_owned(),
3749 nodes: vec![NodeInsert {
3750 row_id: "row-1".to_owned(),
3751 logical_id: "meeting-1".to_owned(),
3752 kind: "Meeting".to_owned(),
3753 properties: "{}".to_owned(),
3754 source_ref: Some("src-1".to_owned()),
3755 upsert: false,
3756 chunk_policy: ChunkPolicy::Preserve,
3757 content_ref: None,
3758 }],
3759 node_retires: vec![],
3760 edges: vec![],
3761 edge_retires: vec![],
3762 chunks: vec![],
3763 runs: vec![],
3764 steps: vec![],
3765 actions: vec![],
3766 optional_backfills: vec![],
3767 vec_inserts: vec![],
3768 operational_writes: vec![],
3769 })
3770 .expect("seed write");
3771
3772 let receipt = writer
3773 .submit(WriteRequest {
3774 label: "retire-no-src".to_owned(),
3775 nodes: vec![],
3776 node_retires: vec![NodeRetire {
3777 logical_id: "meeting-1".to_owned(),
3778 source_ref: None,
3779 }],
3780 edges: vec![],
3781 edge_retires: vec![],
3782 chunks: vec![],
3783 runs: vec![],
3784 steps: vec![],
3785 actions: vec![],
3786 optional_backfills: vec![],
3787 vec_inserts: vec![],
3788 operational_writes: vec![],
3789 })
3790 .expect("retire write");
3791
3792 assert!(
3793 !receipt.provenance_warnings.is_empty(),
3794 "retire without source_ref must emit a provenance warning"
3795 );
3796 }
3797
3798 #[test]
3799 #[allow(clippy::too_many_lines)]
3800 fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
3801 let db = NamedTempFile::new().expect("temporary db");
3802 let writer = WriterActor::start(
3803 db.path(),
3804 Arc::new(SchemaManager::new()),
3805 ProvenanceMode::Warn,
3806 Arc::new(TelemetryCounters::default()),
3807 )
3808 .expect("writer");
3809
3810 writer
3811 .submit(WriteRequest {
3812 label: "v1".to_owned(),
3813 nodes: vec![NodeInsert {
3814 row_id: "row-1".to_owned(),
3815 logical_id: "meeting-1".to_owned(),
3816 kind: "Meeting".to_owned(),
3817 properties: "{}".to_owned(),
3818 source_ref: Some("src-1".to_owned()),
3819 upsert: false,
3820 chunk_policy: ChunkPolicy::Preserve,
3821 content_ref: None,
3822 }],
3823 node_retires: vec![],
3824 edges: vec![],
3825 edge_retires: vec![],
3826 chunks: vec![ChunkInsert {
3827 id: "chunk-old".to_owned(),
3828 node_logical_id: "meeting-1".to_owned(),
3829 text_content: "old text".to_owned(),
3830 byte_start: None,
3831 byte_end: None,
3832 content_hash: None,
3833 }],
3834 runs: vec![],
3835 steps: vec![],
3836 actions: vec![],
3837 optional_backfills: vec![],
3838 vec_inserts: vec![],
3839 operational_writes: vec![],
3840 })
3841 .expect("v1 write");
3842
3843 writer
3844 .submit(WriteRequest {
3845 label: "v2".to_owned(),
3846 nodes: vec![NodeInsert {
3847 row_id: "row-2".to_owned(),
3848 logical_id: "meeting-1".to_owned(),
3849 kind: "Meeting".to_owned(),
3850 properties: "{}".to_owned(),
3851 source_ref: Some("src-2".to_owned()),
3852 upsert: true,
3853 chunk_policy: ChunkPolicy::Replace,
3854 content_ref: None,
3855 }],
3856 node_retires: vec![],
3857 edges: vec![],
3858 edge_retires: vec![],
3859 chunks: vec![ChunkInsert {
3860 id: "chunk-new".to_owned(),
3861 node_logical_id: "meeting-1".to_owned(),
3862 text_content: "new text".to_owned(),
3863 byte_start: None,
3864 byte_end: None,
3865 content_hash: None,
3866 }],
3867 runs: vec![],
3868 steps: vec![],
3869 actions: vec![],
3870 optional_backfills: vec![],
3871 vec_inserts: vec![],
3872 operational_writes: vec![],
3873 })
3874 .expect("v2 write");
3875
3876 let conn = rusqlite::Connection::open(db.path()).expect("open");
3877 let old_chunk: i64 = conn
3878 .query_row(
3879 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3880 [],
3881 |r| r.get(0),
3882 )
3883 .expect("old chunk count");
3884 let new_chunk: i64 = conn
3885 .query_row(
3886 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
3887 [],
3888 |r| r.get(0),
3889 )
3890 .expect("new chunk count");
3891 let fts_old: i64 = conn
3892 .query_row(
3893 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
3894 [],
3895 |r| r.get(0),
3896 )
3897 .expect("old fts count");
3898
3899 assert_eq!(
3900 old_chunk, 0,
3901 "old chunk must be deleted by ChunkPolicy::Replace"
3902 );
3903 assert_eq!(new_chunk, 1, "new chunk must exist after replace");
3904 assert_eq!(
3905 fts_old, 0,
3906 "old FTS row must be deleted by ChunkPolicy::Replace"
3907 );
3908 }
3909
3910 #[test]
3911 fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
3912 let db = NamedTempFile::new().expect("temporary db");
3913 let writer = WriterActor::start(
3914 db.path(),
3915 Arc::new(SchemaManager::new()),
3916 ProvenanceMode::Warn,
3917 Arc::new(TelemetryCounters::default()),
3918 )
3919 .expect("writer");
3920
3921 writer
3922 .submit(WriteRequest {
3923 label: "v1".to_owned(),
3924 nodes: vec![NodeInsert {
3925 row_id: "row-1".to_owned(),
3926 logical_id: "meeting-1".to_owned(),
3927 kind: "Meeting".to_owned(),
3928 properties: "{}".to_owned(),
3929 source_ref: Some("src-1".to_owned()),
3930 upsert: false,
3931 chunk_policy: ChunkPolicy::Preserve,
3932 content_ref: None,
3933 }],
3934 node_retires: vec![],
3935 edges: vec![],
3936 edge_retires: vec![],
3937 chunks: vec![ChunkInsert {
3938 id: "chunk-old".to_owned(),
3939 node_logical_id: "meeting-1".to_owned(),
3940 text_content: "old text".to_owned(),
3941 byte_start: None,
3942 byte_end: None,
3943 content_hash: None,
3944 }],
3945 runs: vec![],
3946 steps: vec![],
3947 actions: vec![],
3948 optional_backfills: vec![],
3949 vec_inserts: vec![],
3950 operational_writes: vec![],
3951 })
3952 .expect("v1 write");
3953
3954 writer
3955 .submit(WriteRequest {
3956 label: "v2-props-only".to_owned(),
3957 nodes: vec![NodeInsert {
3958 row_id: "row-2".to_owned(),
3959 logical_id: "meeting-1".to_owned(),
3960 kind: "Meeting".to_owned(),
3961 properties: r#"{"status":"updated"}"#.to_owned(),
3962 source_ref: Some("src-2".to_owned()),
3963 upsert: true,
3964 chunk_policy: ChunkPolicy::Preserve,
3965 content_ref: None,
3966 }],
3967 node_retires: vec![],
3968 edges: vec![],
3969 edge_retires: vec![],
3970 chunks: vec![],
3971 runs: vec![],
3972 steps: vec![],
3973 actions: vec![],
3974 optional_backfills: vec![],
3975 vec_inserts: vec![],
3976 operational_writes: vec![],
3977 })
3978 .expect("v2 preserve write");
3979
3980 let conn = rusqlite::Connection::open(db.path()).expect("open");
3981 let old_chunk: i64 = conn
3982 .query_row(
3983 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
3984 [],
3985 |r| r.get(0),
3986 )
3987 .expect("old chunk count");
3988
3989 assert_eq!(
3990 old_chunk, 1,
3991 "old chunk must be preserved by ChunkPolicy::Preserve"
3992 );
3993 }
3994
3995 #[test]
3996 fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
3997 let db = NamedTempFile::new().expect("temporary db");
3998 let writer = WriterActor::start(
3999 db.path(),
4000 Arc::new(SchemaManager::new()),
4001 ProvenanceMode::Warn,
4002 Arc::new(TelemetryCounters::default()),
4003 )
4004 .expect("writer");
4005
4006 writer
4007 .submit(WriteRequest {
4008 label: "v1".to_owned(),
4009 nodes: vec![NodeInsert {
4010 row_id: "row-1".to_owned(),
4011 logical_id: "meeting-1".to_owned(),
4012 kind: "Meeting".to_owned(),
4013 properties: "{}".to_owned(),
4014 source_ref: Some("src-1".to_owned()),
4015 upsert: false,
4016 chunk_policy: ChunkPolicy::Preserve,
4017 content_ref: None,
4018 }],
4019 node_retires: vec![],
4020 edges: vec![],
4021 edge_retires: vec![],
4022 chunks: vec![ChunkInsert {
4023 id: "chunk-existing".to_owned(),
4024 node_logical_id: "meeting-1".to_owned(),
4025 text_content: "existing text".to_owned(),
4026 byte_start: None,
4027 byte_end: None,
4028 content_hash: None,
4029 }],
4030 runs: vec![],
4031 steps: vec![],
4032 actions: vec![],
4033 optional_backfills: vec![],
4034 vec_inserts: vec![],
4035 operational_writes: vec![],
4036 })
4037 .expect("v1 write");
4038
4039 writer
4041 .submit(WriteRequest {
4042 label: "insert-no-upsert".to_owned(),
4043 nodes: vec![NodeInsert {
4044 row_id: "row-2".to_owned(),
4045 logical_id: "meeting-2".to_owned(),
4046 kind: "Meeting".to_owned(),
4047 properties: "{}".to_owned(),
4048 source_ref: Some("src-2".to_owned()),
4049 upsert: false,
4050 chunk_policy: ChunkPolicy::Replace,
4051 content_ref: None,
4052 }],
4053 node_retires: vec![],
4054 edges: vec![],
4055 edge_retires: vec![],
4056 chunks: vec![],
4057 runs: vec![],
4058 steps: vec![],
4059 actions: vec![],
4060 optional_backfills: vec![],
4061 vec_inserts: vec![],
4062 operational_writes: vec![],
4063 })
4064 .expect("insert no-upsert write");
4065
4066 let conn = rusqlite::Connection::open(db.path()).expect("open");
4067 let existing_chunk: i64 = conn
4068 .query_row(
4069 "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4070 [],
4071 |r| r.get(0),
4072 )
4073 .expect("chunk count");
4074
4075 assert_eq!(
4076 existing_chunk, 1,
4077 "ChunkPolicy::Replace without upsert must not delete existing chunks"
4078 );
4079 }
4080
4081 #[test]
4082 fn writer_run_upsert_supersedes_prior_active_run() {
4083 let db = NamedTempFile::new().expect("temporary db");
4084 let writer = WriterActor::start(
4085 db.path(),
4086 Arc::new(SchemaManager::new()),
4087 ProvenanceMode::Warn,
4088 Arc::new(TelemetryCounters::default()),
4089 )
4090 .expect("writer");
4091
4092 writer
4093 .submit(WriteRequest {
4094 label: "v1".to_owned(),
4095 nodes: vec![],
4096 node_retires: vec![],
4097 edges: vec![],
4098 edge_retires: vec![],
4099 chunks: vec![],
4100 runs: vec![RunInsert {
4101 id: "run-v1".to_owned(),
4102 kind: "session".to_owned(),
4103 status: "completed".to_owned(),
4104 properties: "{}".to_owned(),
4105 source_ref: Some("src-1".to_owned()),
4106 upsert: false,
4107 supersedes_id: None,
4108 }],
4109 steps: vec![],
4110 actions: vec![],
4111 optional_backfills: vec![],
4112 vec_inserts: vec![],
4113 operational_writes: vec![],
4114 })
4115 .expect("v1 run write");
4116
4117 writer
4118 .submit(WriteRequest {
4119 label: "v2".to_owned(),
4120 nodes: vec![],
4121 node_retires: vec![],
4122 edges: vec![],
4123 edge_retires: vec![],
4124 chunks: vec![],
4125 runs: vec![RunInsert {
4126 id: "run-v2".to_owned(),
4127 kind: "session".to_owned(),
4128 status: "completed".to_owned(),
4129 properties: "{}".to_owned(),
4130 source_ref: Some("src-2".to_owned()),
4131 upsert: true,
4132 supersedes_id: Some("run-v1".to_owned()),
4133 }],
4134 steps: vec![],
4135 actions: vec![],
4136 optional_backfills: vec![],
4137 vec_inserts: vec![],
4138 operational_writes: vec![],
4139 })
4140 .expect("v2 run write");
4141
4142 let conn = rusqlite::Connection::open(db.path()).expect("open");
4143 let v1_historical: i64 = conn
4144 .query_row(
4145 "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4146 [],
4147 |r| r.get(0),
4148 )
4149 .expect("v1 historical count");
4150 let v2_active: i64 = conn
4151 .query_row(
4152 "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4153 [],
4154 |r| r.get(0),
4155 )
4156 .expect("v2 active count");
4157
4158 assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4159 assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4160 }
4161
4162 #[test]
4163 fn writer_step_upsert_supersedes_prior_active_step() {
4164 let db = NamedTempFile::new().expect("temporary db");
4165 let writer = WriterActor::start(
4166 db.path(),
4167 Arc::new(SchemaManager::new()),
4168 ProvenanceMode::Warn,
4169 Arc::new(TelemetryCounters::default()),
4170 )
4171 .expect("writer");
4172
4173 writer
4174 .submit(WriteRequest {
4175 label: "v1".to_owned(),
4176 nodes: vec![],
4177 node_retires: vec![],
4178 edges: vec![],
4179 edge_retires: vec![],
4180 chunks: vec![],
4181 runs: vec![RunInsert {
4182 id: "run-1".to_owned(),
4183 kind: "session".to_owned(),
4184 status: "completed".to_owned(),
4185 properties: "{}".to_owned(),
4186 source_ref: Some("src-1".to_owned()),
4187 upsert: false,
4188 supersedes_id: None,
4189 }],
4190 steps: vec![StepInsert {
4191 id: "step-v1".to_owned(),
4192 run_id: "run-1".to_owned(),
4193 kind: "llm".to_owned(),
4194 status: "completed".to_owned(),
4195 properties: "{}".to_owned(),
4196 source_ref: Some("src-1".to_owned()),
4197 upsert: false,
4198 supersedes_id: None,
4199 }],
4200 actions: vec![],
4201 optional_backfills: vec![],
4202 vec_inserts: vec![],
4203 operational_writes: vec![],
4204 })
4205 .expect("v1 step write");
4206
4207 writer
4208 .submit(WriteRequest {
4209 label: "v2".to_owned(),
4210 nodes: vec![],
4211 node_retires: vec![],
4212 edges: vec![],
4213 edge_retires: vec![],
4214 chunks: vec![],
4215 runs: vec![],
4216 steps: vec![StepInsert {
4217 id: "step-v2".to_owned(),
4218 run_id: "run-1".to_owned(),
4219 kind: "llm".to_owned(),
4220 status: "completed".to_owned(),
4221 properties: "{}".to_owned(),
4222 source_ref: Some("src-2".to_owned()),
4223 upsert: true,
4224 supersedes_id: Some("step-v1".to_owned()),
4225 }],
4226 actions: vec![],
4227 optional_backfills: vec![],
4228 vec_inserts: vec![],
4229 operational_writes: vec![],
4230 })
4231 .expect("v2 step write");
4232
4233 let conn = rusqlite::Connection::open(db.path()).expect("open");
4234 let v1_historical: i64 = conn
4235 .query_row(
4236 "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4237 [],
4238 |r| r.get(0),
4239 )
4240 .expect("v1 historical count");
4241 let v2_active: i64 = conn
4242 .query_row(
4243 "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4244 [],
4245 |r| r.get(0),
4246 )
4247 .expect("v2 active count");
4248
4249 assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4250 assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4251 }
4252
4253 #[test]
4254 fn writer_action_upsert_supersedes_prior_active_action() {
4255 let db = NamedTempFile::new().expect("temporary db");
4256 let writer = WriterActor::start(
4257 db.path(),
4258 Arc::new(SchemaManager::new()),
4259 ProvenanceMode::Warn,
4260 Arc::new(TelemetryCounters::default()),
4261 )
4262 .expect("writer");
4263
4264 writer
4265 .submit(WriteRequest {
4266 label: "v1".to_owned(),
4267 nodes: vec![],
4268 node_retires: vec![],
4269 edges: vec![],
4270 edge_retires: vec![],
4271 chunks: vec![],
4272 runs: vec![RunInsert {
4273 id: "run-1".to_owned(),
4274 kind: "session".to_owned(),
4275 status: "completed".to_owned(),
4276 properties: "{}".to_owned(),
4277 source_ref: Some("src-1".to_owned()),
4278 upsert: false,
4279 supersedes_id: None,
4280 }],
4281 steps: vec![StepInsert {
4282 id: "step-1".to_owned(),
4283 run_id: "run-1".to_owned(),
4284 kind: "llm".to_owned(),
4285 status: "completed".to_owned(),
4286 properties: "{}".to_owned(),
4287 source_ref: Some("src-1".to_owned()),
4288 upsert: false,
4289 supersedes_id: None,
4290 }],
4291 actions: vec![ActionInsert {
4292 id: "action-v1".to_owned(),
4293 step_id: "step-1".to_owned(),
4294 kind: "emit".to_owned(),
4295 status: "completed".to_owned(),
4296 properties: "{}".to_owned(),
4297 source_ref: Some("src-1".to_owned()),
4298 upsert: false,
4299 supersedes_id: None,
4300 }],
4301 optional_backfills: vec![],
4302 vec_inserts: vec![],
4303 operational_writes: vec![],
4304 })
4305 .expect("v1 action write");
4306
4307 writer
4308 .submit(WriteRequest {
4309 label: "v2".to_owned(),
4310 nodes: vec![],
4311 node_retires: vec![],
4312 edges: vec![],
4313 edge_retires: vec![],
4314 chunks: vec![],
4315 runs: vec![],
4316 steps: vec![],
4317 actions: vec![ActionInsert {
4318 id: "action-v2".to_owned(),
4319 step_id: "step-1".to_owned(),
4320 kind: "emit".to_owned(),
4321 status: "completed".to_owned(),
4322 properties: "{}".to_owned(),
4323 source_ref: Some("src-2".to_owned()),
4324 upsert: true,
4325 supersedes_id: Some("action-v1".to_owned()),
4326 }],
4327 optional_backfills: vec![],
4328 vec_inserts: vec![],
4329 operational_writes: vec![],
4330 })
4331 .expect("v2 action write");
4332
4333 let conn = rusqlite::Connection::open(db.path()).expect("open");
4334 let v1_historical: i64 = conn
4335 .query_row(
4336 "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4337 [],
4338 |r| r.get(0),
4339 )
4340 .expect("v1 historical count");
4341 let v2_active: i64 = conn
4342 .query_row(
4343 "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4344 [],
4345 |r| r.get(0),
4346 )
4347 .expect("v2 active count");
4348
4349 assert_eq!(
4350 v1_historical, 1,
4351 "action-v1 must be historical after upsert"
4352 );
4353 assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4354 }
4355
4356 #[test]
4359 fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4360 let db = NamedTempFile::new().expect("temporary db");
4361 let writer = WriterActor::start(
4362 db.path(),
4363 Arc::new(SchemaManager::new()),
4364 ProvenanceMode::Warn,
4365 Arc::new(TelemetryCounters::default()),
4366 )
4367 .expect("writer");
4368
4369 let result = writer.submit(WriteRequest {
4370 label: "bad".to_owned(),
4371 nodes: vec![],
4372 node_retires: vec![],
4373 edges: vec![],
4374 edge_retires: vec![],
4375 chunks: vec![],
4376 runs: vec![RunInsert {
4377 id: "run-1".to_owned(),
4378 kind: "session".to_owned(),
4379 status: "completed".to_owned(),
4380 properties: "{}".to_owned(),
4381 source_ref: None,
4382 upsert: true,
4383 supersedes_id: None,
4384 }],
4385 steps: vec![],
4386 actions: vec![],
4387 optional_backfills: vec![],
4388 vec_inserts: vec![],
4389 operational_writes: vec![],
4390 });
4391
4392 assert!(
4393 matches!(result, Err(EngineError::InvalidWrite(_))),
4394 "run upsert=true without supersedes_id must return InvalidWrite"
4395 );
4396 }
4397
4398 #[test]
4399 fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4400 let db = NamedTempFile::new().expect("temporary db");
4401 let writer = WriterActor::start(
4402 db.path(),
4403 Arc::new(SchemaManager::new()),
4404 ProvenanceMode::Warn,
4405 Arc::new(TelemetryCounters::default()),
4406 )
4407 .expect("writer");
4408
4409 let result = writer.submit(WriteRequest {
4410 label: "bad".to_owned(),
4411 nodes: vec![],
4412 node_retires: vec![],
4413 edges: vec![],
4414 edge_retires: vec![],
4415 chunks: vec![],
4416 runs: vec![],
4417 steps: vec![StepInsert {
4418 id: "step-1".to_owned(),
4419 run_id: "run-1".to_owned(),
4420 kind: "llm".to_owned(),
4421 status: "completed".to_owned(),
4422 properties: "{}".to_owned(),
4423 source_ref: None,
4424 upsert: true,
4425 supersedes_id: None,
4426 }],
4427 actions: vec![],
4428 optional_backfills: vec![],
4429 vec_inserts: vec![],
4430 operational_writes: vec![],
4431 });
4432
4433 assert!(
4434 matches!(result, Err(EngineError::InvalidWrite(_))),
4435 "step upsert=true without supersedes_id must return InvalidWrite"
4436 );
4437 }
4438
4439 #[test]
4440 fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4441 let db = NamedTempFile::new().expect("temporary db");
4442 let writer = WriterActor::start(
4443 db.path(),
4444 Arc::new(SchemaManager::new()),
4445 ProvenanceMode::Warn,
4446 Arc::new(TelemetryCounters::default()),
4447 )
4448 .expect("writer");
4449
4450 let result = writer.submit(WriteRequest {
4451 label: "bad".to_owned(),
4452 nodes: vec![],
4453 node_retires: vec![],
4454 edges: vec![],
4455 edge_retires: vec![],
4456 chunks: vec![],
4457 runs: vec![],
4458 steps: vec![],
4459 actions: vec![ActionInsert {
4460 id: "action-1".to_owned(),
4461 step_id: "step-1".to_owned(),
4462 kind: "emit".to_owned(),
4463 status: "completed".to_owned(),
4464 properties: "{}".to_owned(),
4465 source_ref: None,
4466 upsert: true,
4467 supersedes_id: None,
4468 }],
4469 optional_backfills: vec![],
4470 vec_inserts: vec![],
4471 operational_writes: vec![],
4472 });
4473
4474 assert!(
4475 matches!(result, Err(EngineError::InvalidWrite(_))),
4476 "action upsert=true without supersedes_id must return InvalidWrite"
4477 );
4478 }
4479
4480 #[test]
4483 fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4484 let db = NamedTempFile::new().expect("temporary db");
4485 let writer = WriterActor::start(
4486 db.path(),
4487 Arc::new(SchemaManager::new()),
4488 ProvenanceMode::Warn,
4489 Arc::new(TelemetryCounters::default()),
4490 )
4491 .expect("writer");
4492
4493 let receipt = writer
4494 .submit(WriteRequest {
4495 label: "test".to_owned(),
4496 nodes: vec![
4497 NodeInsert {
4498 row_id: "row-a".to_owned(),
4499 logical_id: "node-a".to_owned(),
4500 kind: "Meeting".to_owned(),
4501 properties: "{}".to_owned(),
4502 source_ref: Some("src-1".to_owned()),
4503 upsert: false,
4504 chunk_policy: ChunkPolicy::Preserve,
4505 content_ref: None,
4506 },
4507 NodeInsert {
4508 row_id: "row-b".to_owned(),
4509 logical_id: "node-b".to_owned(),
4510 kind: "Task".to_owned(),
4511 properties: "{}".to_owned(),
4512 source_ref: Some("src-1".to_owned()),
4513 upsert: false,
4514 chunk_policy: ChunkPolicy::Preserve,
4515 content_ref: None,
4516 },
4517 ],
4518 node_retires: vec![],
4519 edges: vec![EdgeInsert {
4520 row_id: "edge-1".to_owned(),
4521 logical_id: "edge-lg-1".to_owned(),
4522 source_logical_id: "node-a".to_owned(),
4523 target_logical_id: "node-b".to_owned(),
4524 kind: "HAS_TASK".to_owned(),
4525 properties: "{}".to_owned(),
4526 source_ref: None,
4527 upsert: false,
4528 }],
4529 edge_retires: vec![],
4530 chunks: vec![],
4531 runs: vec![],
4532 steps: vec![],
4533 actions: vec![],
4534 optional_backfills: vec![],
4535 vec_inserts: vec![],
4536 operational_writes: vec![],
4537 })
4538 .expect("write");
4539
4540 assert!(
4541 !receipt.provenance_warnings.is_empty(),
4542 "edge insert without source_ref must emit a provenance warning"
4543 );
4544 }
4545
4546 #[test]
4547 fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4548 let db = NamedTempFile::new().expect("temporary db");
4549 let writer = WriterActor::start(
4550 db.path(),
4551 Arc::new(SchemaManager::new()),
4552 ProvenanceMode::Warn,
4553 Arc::new(TelemetryCounters::default()),
4554 )
4555 .expect("writer");
4556
4557 let receipt = writer
4558 .submit(WriteRequest {
4559 label: "test".to_owned(),
4560 nodes: vec![],
4561 node_retires: vec![],
4562 edges: vec![],
4563 edge_retires: vec![],
4564 chunks: vec![],
4565 runs: vec![RunInsert {
4566 id: "run-1".to_owned(),
4567 kind: "session".to_owned(),
4568 status: "completed".to_owned(),
4569 properties: "{}".to_owned(),
4570 source_ref: None,
4571 upsert: false,
4572 supersedes_id: None,
4573 }],
4574 steps: vec![],
4575 actions: vec![],
4576 optional_backfills: vec![],
4577 vec_inserts: vec![],
4578 operational_writes: vec![],
4579 })
4580 .expect("write");
4581
4582 assert!(
4583 !receipt.provenance_warnings.is_empty(),
4584 "run insert without source_ref must emit a provenance warning"
4585 );
4586 }
4587
4588 #[test]
4591 fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4592 let db = NamedTempFile::new().expect("temporary db");
4593 let writer = WriterActor::start(
4594 db.path(),
4595 Arc::new(SchemaManager::new()),
4596 ProvenanceMode::Warn,
4597 Arc::new(TelemetryCounters::default()),
4598 )
4599 .expect("writer");
4600
4601 writer
4603 .submit(WriteRequest {
4604 label: "seed".to_owned(),
4605 nodes: vec![NodeInsert {
4606 row_id: "row-1".to_owned(),
4607 logical_id: "meeting-1".to_owned(),
4608 kind: "Meeting".to_owned(),
4609 properties: "{}".to_owned(),
4610 source_ref: Some("src-1".to_owned()),
4611 upsert: false,
4612 chunk_policy: ChunkPolicy::Preserve,
4613 content_ref: None,
4614 }],
4615 node_retires: vec![],
4616 edges: vec![],
4617 edge_retires: vec![],
4618 chunks: vec![],
4619 runs: vec![],
4620 steps: vec![],
4621 actions: vec![],
4622 optional_backfills: vec![],
4623 vec_inserts: vec![],
4624 operational_writes: vec![],
4625 })
4626 .expect("seed write");
4627
4628 let result = writer.submit(WriteRequest {
4630 label: "bad".to_owned(),
4631 nodes: vec![],
4632 node_retires: vec![NodeRetire {
4633 logical_id: "meeting-1".to_owned(),
4634 source_ref: Some("src-2".to_owned()),
4635 }],
4636 edges: vec![],
4637 edge_retires: vec![],
4638 chunks: vec![ChunkInsert {
4639 id: "chunk-bad".to_owned(),
4640 node_logical_id: "meeting-1".to_owned(),
4641 text_content: "some text".to_owned(),
4642 byte_start: None,
4643 byte_end: None,
4644 content_hash: None,
4645 }],
4646 runs: vec![],
4647 steps: vec![],
4648 actions: vec![],
4649 optional_backfills: vec![],
4650 vec_inserts: vec![],
4651 operational_writes: vec![],
4652 });
4653
4654 assert!(
4655 matches!(result, Err(EngineError::InvalidWrite(_))),
4656 "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
4657 );
4658 }
4659
4660 #[test]
4663 fn writer_batch_insert_multiple_nodes() {
4664 let db = NamedTempFile::new().expect("temporary db");
4665 let writer = WriterActor::start(
4666 db.path(),
4667 Arc::new(SchemaManager::new()),
4668 ProvenanceMode::Warn,
4669 Arc::new(TelemetryCounters::default()),
4670 )
4671 .expect("writer");
4672
4673 let nodes: Vec<NodeInsert> = (0..100)
4674 .map(|i| NodeInsert {
4675 row_id: format!("row-{i}"),
4676 logical_id: format!("lg-{i}"),
4677 kind: "Note".to_owned(),
4678 properties: "{}".to_owned(),
4679 source_ref: Some("batch-src".to_owned()),
4680 upsert: false,
4681 chunk_policy: ChunkPolicy::Preserve,
4682 content_ref: None,
4683 })
4684 .collect();
4685
4686 writer
4687 .submit(WriteRequest {
4688 label: "batch".to_owned(),
4689 nodes,
4690 node_retires: vec![],
4691 edges: vec![],
4692 edge_retires: vec![],
4693 chunks: vec![],
4694 runs: vec![],
4695 steps: vec![],
4696 actions: vec![],
4697 optional_backfills: vec![],
4698 vec_inserts: vec![],
4699 operational_writes: vec![],
4700 })
4701 .expect("batch write");
4702
4703 let conn = rusqlite::Connection::open(db.path()).expect("open");
4704 let count: i64 = conn
4705 .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
4706 .expect("count nodes");
4707 assert_eq!(
4708 count, 100,
4709 "all 100 nodes must be present after batch insert"
4710 );
4711 }
4712
4713 #[test]
4716 fn prepare_write_rejects_empty_node_row_id() {
4717 let db = NamedTempFile::new().expect("temporary db");
4718 let writer = WriterActor::start(
4719 db.path(),
4720 Arc::new(SchemaManager::new()),
4721 ProvenanceMode::Warn,
4722 Arc::new(TelemetryCounters::default()),
4723 )
4724 .expect("writer");
4725
4726 let result = writer.submit(WriteRequest {
4727 label: "test".to_owned(),
4728 nodes: vec![NodeInsert {
4729 row_id: String::new(),
4730 logical_id: "lg-1".to_owned(),
4731 kind: "Note".to_owned(),
4732 properties: "{}".to_owned(),
4733 source_ref: None,
4734 upsert: false,
4735 chunk_policy: ChunkPolicy::Preserve,
4736 content_ref: None,
4737 }],
4738 node_retires: vec![],
4739 edges: vec![],
4740 edge_retires: vec![],
4741 chunks: vec![],
4742 runs: vec![],
4743 steps: vec![],
4744 actions: vec![],
4745 optional_backfills: vec![],
4746 vec_inserts: vec![],
4747 operational_writes: vec![],
4748 });
4749
4750 assert!(
4751 matches!(result, Err(EngineError::InvalidWrite(_))),
4752 "empty row_id must be rejected"
4753 );
4754 }
4755
4756 #[test]
4757 fn prepare_write_rejects_empty_node_logical_id() {
4758 let db = NamedTempFile::new().expect("temporary db");
4759 let writer = WriterActor::start(
4760 db.path(),
4761 Arc::new(SchemaManager::new()),
4762 ProvenanceMode::Warn,
4763 Arc::new(TelemetryCounters::default()),
4764 )
4765 .expect("writer");
4766
4767 let result = writer.submit(WriteRequest {
4768 label: "test".to_owned(),
4769 nodes: vec![NodeInsert {
4770 row_id: "row-1".to_owned(),
4771 logical_id: String::new(),
4772 kind: "Note".to_owned(),
4773 properties: "{}".to_owned(),
4774 source_ref: None,
4775 upsert: false,
4776 chunk_policy: ChunkPolicy::Preserve,
4777 content_ref: None,
4778 }],
4779 node_retires: vec![],
4780 edges: vec![],
4781 edge_retires: vec![],
4782 chunks: vec![],
4783 runs: vec![],
4784 steps: vec![],
4785 actions: vec![],
4786 optional_backfills: vec![],
4787 vec_inserts: vec![],
4788 operational_writes: vec![],
4789 });
4790
4791 assert!(
4792 matches!(result, Err(EngineError::InvalidWrite(_))),
4793 "empty logical_id must be rejected"
4794 );
4795 }
4796
4797 #[test]
4798 fn prepare_write_rejects_duplicate_row_ids_in_request() {
4799 let db = NamedTempFile::new().expect("temporary db");
4800 let writer = WriterActor::start(
4801 db.path(),
4802 Arc::new(SchemaManager::new()),
4803 ProvenanceMode::Warn,
4804 Arc::new(TelemetryCounters::default()),
4805 )
4806 .expect("writer");
4807
4808 let result = writer.submit(WriteRequest {
4809 label: "test".to_owned(),
4810 nodes: vec![
4811 NodeInsert {
4812 row_id: "row-1".to_owned(),
4813 logical_id: "lg-1".to_owned(),
4814 kind: "Note".to_owned(),
4815 properties: "{}".to_owned(),
4816 source_ref: None,
4817 upsert: false,
4818 chunk_policy: ChunkPolicy::Preserve,
4819 content_ref: None,
4820 },
4821 NodeInsert {
4822 row_id: "row-1".to_owned(), logical_id: "lg-2".to_owned(),
4824 kind: "Note".to_owned(),
4825 properties: "{}".to_owned(),
4826 source_ref: None,
4827 upsert: false,
4828 chunk_policy: ChunkPolicy::Preserve,
4829 content_ref: None,
4830 },
4831 ],
4832 node_retires: vec![],
4833 edges: vec![],
4834 edge_retires: vec![],
4835 chunks: vec![],
4836 runs: vec![],
4837 steps: vec![],
4838 actions: vec![],
4839 optional_backfills: vec![],
4840 vec_inserts: vec![],
4841 operational_writes: vec![],
4842 });
4843
4844 assert!(
4845 matches!(result, Err(EngineError::InvalidWrite(_))),
4846 "duplicate row_id within request must be rejected"
4847 );
4848 }
4849
4850 #[test]
4851 fn prepare_write_rejects_empty_chunk_id() {
4852 let db = NamedTempFile::new().expect("temporary db");
4853 let writer = WriterActor::start(
4854 db.path(),
4855 Arc::new(SchemaManager::new()),
4856 ProvenanceMode::Warn,
4857 Arc::new(TelemetryCounters::default()),
4858 )
4859 .expect("writer");
4860
4861 let result = writer.submit(WriteRequest {
4862 label: "test".to_owned(),
4863 nodes: vec![NodeInsert {
4864 row_id: "row-1".to_owned(),
4865 logical_id: "lg-1".to_owned(),
4866 kind: "Note".to_owned(),
4867 properties: "{}".to_owned(),
4868 source_ref: None,
4869 upsert: false,
4870 chunk_policy: ChunkPolicy::Preserve,
4871 content_ref: None,
4872 }],
4873 node_retires: vec![],
4874 edges: vec![],
4875 edge_retires: vec![],
4876 chunks: vec![ChunkInsert {
4877 id: String::new(),
4878 node_logical_id: "lg-1".to_owned(),
4879 text_content: "some text".to_owned(),
4880 byte_start: None,
4881 byte_end: None,
4882 content_hash: None,
4883 }],
4884 runs: vec![],
4885 steps: vec![],
4886 actions: vec![],
4887 optional_backfills: vec![],
4888 vec_inserts: vec![],
4889 operational_writes: vec![],
4890 });
4891
4892 assert!(
4893 matches!(result, Err(EngineError::InvalidWrite(_))),
4894 "empty chunk id must be rejected"
4895 );
4896 }
4897
4898 #[test]
4901 fn writer_receipt_warns_on_step_without_source_ref() {
4902 let db = NamedTempFile::new().expect("temporary db");
4903 let writer = WriterActor::start(
4904 db.path(),
4905 Arc::new(SchemaManager::new()),
4906 ProvenanceMode::Warn,
4907 Arc::new(TelemetryCounters::default()),
4908 )
4909 .expect("writer");
4910
4911 writer
4913 .submit(WriteRequest {
4914 label: "seed-run".to_owned(),
4915 nodes: vec![],
4916 node_retires: vec![],
4917 edges: vec![],
4918 edge_retires: vec![],
4919 chunks: vec![],
4920 runs: vec![RunInsert {
4921 id: "run-1".to_owned(),
4922 kind: "session".to_owned(),
4923 status: "active".to_owned(),
4924 properties: "{}".to_owned(),
4925 source_ref: Some("src-1".to_owned()),
4926 upsert: false,
4927 supersedes_id: None,
4928 }],
4929 steps: vec![],
4930 actions: vec![],
4931 optional_backfills: vec![],
4932 vec_inserts: vec![],
4933 operational_writes: vec![],
4934 })
4935 .expect("seed run");
4936
4937 let receipt = writer
4938 .submit(WriteRequest {
4939 label: "test".to_owned(),
4940 nodes: vec![],
4941 node_retires: vec![],
4942 edges: vec![],
4943 edge_retires: vec![],
4944 chunks: vec![],
4945 runs: vec![],
4946 steps: vec![StepInsert {
4947 id: "step-1".to_owned(),
4948 run_id: "run-1".to_owned(),
4949 kind: "llm_call".to_owned(),
4950 status: "completed".to_owned(),
4951 properties: "{}".to_owned(),
4952 source_ref: None,
4953 upsert: false,
4954 supersedes_id: None,
4955 }],
4956 actions: vec![],
4957 optional_backfills: vec![],
4958 vec_inserts: vec![],
4959 operational_writes: vec![],
4960 })
4961 .expect("write");
4962
4963 assert!(
4964 !receipt.provenance_warnings.is_empty(),
4965 "step insert without source_ref must emit a provenance warning"
4966 );
4967 }
4968
4969 #[test]
4970 fn writer_receipt_warns_on_action_without_source_ref() {
4971 let db = NamedTempFile::new().expect("temporary db");
4972 let writer = WriterActor::start(
4973 db.path(),
4974 Arc::new(SchemaManager::new()),
4975 ProvenanceMode::Warn,
4976 Arc::new(TelemetryCounters::default()),
4977 )
4978 .expect("writer");
4979
4980 writer
4982 .submit(WriteRequest {
4983 label: "seed".to_owned(),
4984 nodes: vec![],
4985 node_retires: vec![],
4986 edges: vec![],
4987 edge_retires: vec![],
4988 chunks: vec![],
4989 runs: vec![RunInsert {
4990 id: "run-1".to_owned(),
4991 kind: "session".to_owned(),
4992 status: "active".to_owned(),
4993 properties: "{}".to_owned(),
4994 source_ref: Some("src-1".to_owned()),
4995 upsert: false,
4996 supersedes_id: None,
4997 }],
4998 steps: vec![StepInsert {
4999 id: "step-1".to_owned(),
5000 run_id: "run-1".to_owned(),
5001 kind: "llm_call".to_owned(),
5002 status: "completed".to_owned(),
5003 properties: "{}".to_owned(),
5004 source_ref: Some("src-1".to_owned()),
5005 upsert: false,
5006 supersedes_id: None,
5007 }],
5008 actions: vec![],
5009 optional_backfills: vec![],
5010 vec_inserts: vec![],
5011 operational_writes: vec![],
5012 })
5013 .expect("seed");
5014
5015 let receipt = writer
5016 .submit(WriteRequest {
5017 label: "test".to_owned(),
5018 nodes: vec![],
5019 node_retires: vec![],
5020 edges: vec![],
5021 edge_retires: vec![],
5022 chunks: vec![],
5023 runs: vec![],
5024 steps: vec![],
5025 actions: vec![ActionInsert {
5026 id: "action-1".to_owned(),
5027 step_id: "step-1".to_owned(),
5028 kind: "tool_call".to_owned(),
5029 status: "completed".to_owned(),
5030 properties: "{}".to_owned(),
5031 source_ref: None,
5032 upsert: false,
5033 supersedes_id: None,
5034 }],
5035 optional_backfills: vec![],
5036 vec_inserts: vec![],
5037 operational_writes: vec![],
5038 })
5039 .expect("write");
5040
5041 assert!(
5042 !receipt.provenance_warnings.is_empty(),
5043 "action insert without source_ref must emit a provenance warning"
5044 );
5045 }
5046
5047 #[test]
5048 fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5049 let db = NamedTempFile::new().expect("temporary db");
5050 let writer = WriterActor::start(
5051 db.path(),
5052 Arc::new(SchemaManager::new()),
5053 ProvenanceMode::Warn,
5054 Arc::new(TelemetryCounters::default()),
5055 )
5056 .expect("writer");
5057
5058 let receipt = writer
5059 .submit(WriteRequest {
5060 label: "test".to_owned(),
5061 nodes: vec![NodeInsert {
5062 row_id: "row-1".to_owned(),
5063 logical_id: "node-1".to_owned(),
5064 kind: "Note".to_owned(),
5065 properties: "{}".to_owned(),
5066 source_ref: Some("src-1".to_owned()),
5067 upsert: false,
5068 chunk_policy: ChunkPolicy::Preserve,
5069 content_ref: None,
5070 }],
5071 node_retires: vec![],
5072 edges: vec![],
5073 edge_retires: vec![],
5074 chunks: vec![],
5075 runs: vec![RunInsert {
5076 id: "run-1".to_owned(),
5077 kind: "session".to_owned(),
5078 status: "active".to_owned(),
5079 properties: "{}".to_owned(),
5080 source_ref: Some("src-1".to_owned()),
5081 upsert: false,
5082 supersedes_id: None,
5083 }],
5084 steps: vec![StepInsert {
5085 id: "step-1".to_owned(),
5086 run_id: "run-1".to_owned(),
5087 kind: "llm_call".to_owned(),
5088 status: "completed".to_owned(),
5089 properties: "{}".to_owned(),
5090 source_ref: Some("src-1".to_owned()),
5091 upsert: false,
5092 supersedes_id: None,
5093 }],
5094 actions: vec![ActionInsert {
5095 id: "action-1".to_owned(),
5096 step_id: "step-1".to_owned(),
5097 kind: "tool_call".to_owned(),
5098 status: "completed".to_owned(),
5099 properties: "{}".to_owned(),
5100 source_ref: Some("src-1".to_owned()),
5101 upsert: false,
5102 supersedes_id: None,
5103 }],
5104 optional_backfills: vec![],
5105 vec_inserts: vec![],
5106 operational_writes: vec![],
5107 })
5108 .expect("write");
5109
5110 assert!(
5111 receipt.provenance_warnings.is_empty(),
5112 "no warnings expected when all types have source_ref; got: {:?}",
5113 receipt.provenance_warnings
5114 );
5115 }
5116
5117 #[test]
5120 fn default_provenance_mode_is_warn() {
5121 let db = NamedTempFile::new().expect("temporary db");
5123 let writer = WriterActor::start(
5124 db.path(),
5125 Arc::new(SchemaManager::new()),
5126 ProvenanceMode::default(),
5127 Arc::new(TelemetryCounters::default()),
5128 )
5129 .expect("writer");
5130
5131 let receipt = writer
5132 .submit(WriteRequest {
5133 label: "test".to_owned(),
5134 nodes: vec![NodeInsert {
5135 row_id: "row-1".to_owned(),
5136 logical_id: "node-1".to_owned(),
5137 kind: "Note".to_owned(),
5138 properties: "{}".to_owned(),
5139 source_ref: None,
5140 upsert: false,
5141 chunk_policy: ChunkPolicy::Preserve,
5142 content_ref: None,
5143 }],
5144 node_retires: vec![],
5145 edges: vec![],
5146 edge_retires: vec![],
5147 chunks: vec![],
5148 runs: vec![],
5149 steps: vec![],
5150 actions: vec![],
5151 optional_backfills: vec![],
5152 vec_inserts: vec![],
5153 operational_writes: vec![],
5154 })
5155 .expect("Warn mode must not reject missing source_ref");
5156
5157 assert!(
5158 !receipt.provenance_warnings.is_empty(),
5159 "Warn mode must emit a warning instead of rejecting"
5160 );
5161 }
5162
5163 #[test]
5164 fn require_mode_rejects_node_without_source_ref() {
5165 let db = NamedTempFile::new().expect("temporary db");
5166 let writer = WriterActor::start(
5167 db.path(),
5168 Arc::new(SchemaManager::new()),
5169 ProvenanceMode::Require,
5170 Arc::new(TelemetryCounters::default()),
5171 )
5172 .expect("writer");
5173
5174 let result = writer.submit(WriteRequest {
5175 label: "test".to_owned(),
5176 nodes: vec![NodeInsert {
5177 row_id: "row-1".to_owned(),
5178 logical_id: "node-1".to_owned(),
5179 kind: "Note".to_owned(),
5180 properties: "{}".to_owned(),
5181 source_ref: None,
5182 upsert: false,
5183 chunk_policy: ChunkPolicy::Preserve,
5184 content_ref: None,
5185 }],
5186 node_retires: vec![],
5187 edges: vec![],
5188 edge_retires: vec![],
5189 chunks: vec![],
5190 runs: vec![],
5191 steps: vec![],
5192 actions: vec![],
5193 optional_backfills: vec![],
5194 vec_inserts: vec![],
5195 operational_writes: vec![],
5196 });
5197
5198 assert!(
5199 matches!(result, Err(EngineError::InvalidWrite(_))),
5200 "Require mode must reject node without source_ref"
5201 );
5202 }
5203
5204 #[test]
5205 fn require_mode_accepts_node_with_source_ref() {
5206 let db = NamedTempFile::new().expect("temporary db");
5207 let writer = WriterActor::start(
5208 db.path(),
5209 Arc::new(SchemaManager::new()),
5210 ProvenanceMode::Require,
5211 Arc::new(TelemetryCounters::default()),
5212 )
5213 .expect("writer");
5214
5215 let result = writer.submit(WriteRequest {
5216 label: "test".to_owned(),
5217 nodes: vec![NodeInsert {
5218 row_id: "row-1".to_owned(),
5219 logical_id: "node-1".to_owned(),
5220 kind: "Note".to_owned(),
5221 properties: "{}".to_owned(),
5222 source_ref: Some("src-1".to_owned()),
5223 upsert: false,
5224 chunk_policy: ChunkPolicy::Preserve,
5225 content_ref: None,
5226 }],
5227 node_retires: vec![],
5228 edges: vec![],
5229 edge_retires: vec![],
5230 chunks: vec![],
5231 runs: vec![],
5232 steps: vec![],
5233 actions: vec![],
5234 optional_backfills: vec![],
5235 vec_inserts: vec![],
5236 operational_writes: vec![],
5237 });
5238
5239 assert!(
5240 result.is_ok(),
5241 "Require mode must accept node with source_ref"
5242 );
5243 }
5244
5245 #[test]
5246 fn require_mode_rejects_edge_without_source_ref() {
5247 let db = NamedTempFile::new().expect("temporary db");
5248 let writer = WriterActor::start(
5249 db.path(),
5250 Arc::new(SchemaManager::new()),
5251 ProvenanceMode::Require,
5252 Arc::new(TelemetryCounters::default()),
5253 )
5254 .expect("writer");
5255
5256 let result = writer.submit(WriteRequest {
5258 label: "test".to_owned(),
5259 nodes: vec![
5260 NodeInsert {
5261 row_id: "row-a".to_owned(),
5262 logical_id: "node-a".to_owned(),
5263 kind: "Note".to_owned(),
5264 properties: "{}".to_owned(),
5265 source_ref: Some("src-1".to_owned()),
5266 upsert: false,
5267 chunk_policy: ChunkPolicy::Preserve,
5268 content_ref: None,
5269 },
5270 NodeInsert {
5271 row_id: "row-b".to_owned(),
5272 logical_id: "node-b".to_owned(),
5273 kind: "Note".to_owned(),
5274 properties: "{}".to_owned(),
5275 source_ref: Some("src-1".to_owned()),
5276 upsert: false,
5277 chunk_policy: ChunkPolicy::Preserve,
5278 content_ref: None,
5279 },
5280 ],
5281 node_retires: vec![],
5282 edges: vec![EdgeInsert {
5283 row_id: "edge-row-1".to_owned(),
5284 logical_id: "edge-1".to_owned(),
5285 source_logical_id: "node-a".to_owned(),
5286 target_logical_id: "node-b".to_owned(),
5287 kind: "LINKS_TO".to_owned(),
5288 properties: "{}".to_owned(),
5289 source_ref: None,
5290 upsert: false,
5291 }],
5292 edge_retires: vec![],
5293 chunks: vec![],
5294 runs: vec![],
5295 steps: vec![],
5296 actions: vec![],
5297 optional_backfills: vec![],
5298 vec_inserts: vec![],
5299 operational_writes: vec![],
5300 });
5301
5302 assert!(
5303 matches!(result, Err(EngineError::InvalidWrite(_))),
5304 "Require mode must reject edge without source_ref"
5305 );
5306 }
5307
5308 #[test]
5311 fn fts_row_has_correct_kind_from_co_submitted_node() {
5312 let db = NamedTempFile::new().expect("temporary db");
5313 let writer = WriterActor::start(
5314 db.path(),
5315 Arc::new(SchemaManager::new()),
5316 ProvenanceMode::Warn,
5317 Arc::new(TelemetryCounters::default()),
5318 )
5319 .expect("writer");
5320
5321 writer
5322 .submit(WriteRequest {
5323 label: "test".to_owned(),
5324 nodes: vec![NodeInsert {
5325 row_id: "row-1".to_owned(),
5326 logical_id: "node-1".to_owned(),
5327 kind: "Meeting".to_owned(),
5328 properties: "{}".to_owned(),
5329 source_ref: Some("src-1".to_owned()),
5330 upsert: false,
5331 chunk_policy: ChunkPolicy::Preserve,
5332 content_ref: None,
5333 }],
5334 node_retires: vec![],
5335 edges: vec![],
5336 edge_retires: vec![],
5337 chunks: vec![ChunkInsert {
5338 id: "chunk-1".to_owned(),
5339 node_logical_id: "node-1".to_owned(),
5340 text_content: "some text".to_owned(),
5341 byte_start: None,
5342 byte_end: None,
5343 content_hash: None,
5344 }],
5345 runs: vec![],
5346 steps: vec![],
5347 actions: vec![],
5348 optional_backfills: vec![],
5349 vec_inserts: vec![],
5350 operational_writes: vec![],
5351 })
5352 .expect("write");
5353
5354 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5355 let kind: String = conn
5356 .query_row(
5357 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5358 [],
5359 |row| row.get(0),
5360 )
5361 .expect("fts row");
5362
5363 assert_eq!(kind, "Meeting");
5364 }
5365
5366 #[test]
5367 fn fts_row_has_correct_text_content() {
5368 let db = NamedTempFile::new().expect("temporary db");
5369 let writer = WriterActor::start(
5370 db.path(),
5371 Arc::new(SchemaManager::new()),
5372 ProvenanceMode::Warn,
5373 Arc::new(TelemetryCounters::default()),
5374 )
5375 .expect("writer");
5376
5377 writer
5378 .submit(WriteRequest {
5379 label: "test".to_owned(),
5380 nodes: vec![NodeInsert {
5381 row_id: "row-1".to_owned(),
5382 logical_id: "node-1".to_owned(),
5383 kind: "Note".to_owned(),
5384 properties: "{}".to_owned(),
5385 source_ref: Some("src-1".to_owned()),
5386 upsert: false,
5387 chunk_policy: ChunkPolicy::Preserve,
5388 content_ref: None,
5389 }],
5390 node_retires: vec![],
5391 edges: vec![],
5392 edge_retires: vec![],
5393 chunks: vec![ChunkInsert {
5394 id: "chunk-1".to_owned(),
5395 node_logical_id: "node-1".to_owned(),
5396 text_content: "exactly this text".to_owned(),
5397 byte_start: None,
5398 byte_end: None,
5399 content_hash: None,
5400 }],
5401 runs: vec![],
5402 steps: vec![],
5403 actions: vec![],
5404 optional_backfills: vec![],
5405 vec_inserts: vec![],
5406 operational_writes: vec![],
5407 })
5408 .expect("write");
5409
5410 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5411 let text: String = conn
5412 .query_row(
5413 "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5414 [],
5415 |row| row.get(0),
5416 )
5417 .expect("fts row");
5418
5419 assert_eq!(text, "exactly this text");
5420 }
5421
5422 #[test]
5423 fn fts_row_has_correct_kind_from_pre_existing_node() {
5424 let db = NamedTempFile::new().expect("temporary db");
5425 let writer = WriterActor::start(
5426 db.path(),
5427 Arc::new(SchemaManager::new()),
5428 ProvenanceMode::Warn,
5429 Arc::new(TelemetryCounters::default()),
5430 )
5431 .expect("writer");
5432
5433 writer
5435 .submit(WriteRequest {
5436 label: "r1".to_owned(),
5437 nodes: vec![NodeInsert {
5438 row_id: "row-1".to_owned(),
5439 logical_id: "node-1".to_owned(),
5440 kind: "Document".to_owned(),
5441 properties: "{}".to_owned(),
5442 source_ref: Some("src-1".to_owned()),
5443 upsert: false,
5444 chunk_policy: ChunkPolicy::Preserve,
5445 content_ref: None,
5446 }],
5447 node_retires: vec![],
5448 edges: vec![],
5449 edge_retires: vec![],
5450 chunks: vec![],
5451 runs: vec![],
5452 steps: vec![],
5453 actions: vec![],
5454 optional_backfills: vec![],
5455 vec_inserts: vec![],
5456 operational_writes: vec![],
5457 })
5458 .expect("r1 write");
5459
5460 writer
5462 .submit(WriteRequest {
5463 label: "r2".to_owned(),
5464 nodes: vec![],
5465 node_retires: vec![],
5466 edges: vec![],
5467 edge_retires: vec![],
5468 chunks: vec![ChunkInsert {
5469 id: "chunk-1".to_owned(),
5470 node_logical_id: "node-1".to_owned(),
5471 text_content: "some text".to_owned(),
5472 byte_start: None,
5473 byte_end: None,
5474 content_hash: None,
5475 }],
5476 runs: vec![],
5477 steps: vec![],
5478 actions: vec![],
5479 optional_backfills: vec![],
5480 vec_inserts: vec![],
5481 operational_writes: vec![],
5482 })
5483 .expect("r2 write");
5484
5485 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5486 let kind: String = conn
5487 .query_row(
5488 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5489 [],
5490 |row| row.get(0),
5491 )
5492 .expect("fts row");
5493
5494 assert_eq!(kind, "Document");
5495 }
5496
5497 #[test]
5498 fn fts_derives_rows_for_multiple_chunks_per_node() {
5499 let db = NamedTempFile::new().expect("temporary db");
5500 let writer = WriterActor::start(
5501 db.path(),
5502 Arc::new(SchemaManager::new()),
5503 ProvenanceMode::Warn,
5504 Arc::new(TelemetryCounters::default()),
5505 )
5506 .expect("writer");
5507
5508 writer
5509 .submit(WriteRequest {
5510 label: "test".to_owned(),
5511 nodes: vec![NodeInsert {
5512 row_id: "row-1".to_owned(),
5513 logical_id: "node-1".to_owned(),
5514 kind: "Meeting".to_owned(),
5515 properties: "{}".to_owned(),
5516 source_ref: Some("src-1".to_owned()),
5517 upsert: false,
5518 chunk_policy: ChunkPolicy::Preserve,
5519 content_ref: None,
5520 }],
5521 node_retires: vec![],
5522 edges: vec![],
5523 edge_retires: vec![],
5524 chunks: vec![
5525 ChunkInsert {
5526 id: "chunk-a".to_owned(),
5527 node_logical_id: "node-1".to_owned(),
5528 text_content: "intro".to_owned(),
5529 byte_start: None,
5530 byte_end: None,
5531 content_hash: None,
5532 },
5533 ChunkInsert {
5534 id: "chunk-b".to_owned(),
5535 node_logical_id: "node-1".to_owned(),
5536 text_content: "body".to_owned(),
5537 byte_start: None,
5538 byte_end: None,
5539 content_hash: None,
5540 },
5541 ChunkInsert {
5542 id: "chunk-c".to_owned(),
5543 node_logical_id: "node-1".to_owned(),
5544 text_content: "conclusion".to_owned(),
5545 byte_start: None,
5546 byte_end: None,
5547 content_hash: None,
5548 },
5549 ],
5550 runs: vec![],
5551 steps: vec![],
5552 actions: vec![],
5553 optional_backfills: vec![],
5554 vec_inserts: vec![],
5555 operational_writes: vec![],
5556 })
5557 .expect("write");
5558
5559 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5560 let count: i64 = conn
5561 .query_row(
5562 "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5563 [],
5564 |row| row.get(0),
5565 )
5566 .expect("fts count");
5567
5568 assert_eq!(count, 3, "three chunks must produce three FTS rows");
5569 }
5570
    // A chunk's FTS row needs the owning node's kind. This test covers both
    // resolution paths in one request: a chunk whose node is inserted in the
    // same request (resolved from the in-memory batch — the "fast" path) and a
    // chunk whose node only exists from an earlier write (resolved by querying
    // the database).
    #[test]
    fn fts_resolves_mixed_fast_and_db_paths() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed write: persist "existing-node" (kind Archive) so the second
        // request must look its kind up from the database.
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-existing".to_owned(),
                    logical_id: "existing-node".to_owned(),
                    kind: "Archive".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed");

        // Mixed write: "chunk-fast" targets a node inserted here, "chunk-db"
        // targets the previously persisted node.
        writer
            .submit(WriteRequest {
                label: "mixed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-new".to_owned(),
                    logical_id: "new-node".to_owned(),
                    kind: "Inbox".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![
                    ChunkInsert {
                        id: "chunk-fast".to_owned(),
                        node_logical_id: "new-node".to_owned(),
                        text_content: "new content".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                    ChunkInsert {
                        id: "chunk-db".to_owned(),
                        node_logical_id: "existing-node".to_owned(),
                        text_content: "archive content".to_owned(),
                        byte_start: None,
                        byte_end: None,
                        content_hash: None,
                    },
                ],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("mixed write");

        // Each chunk's FTS row must carry the correct kind for its path.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let fast_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
                [],
                |row| row.get(0),
            )
            .expect("fast path fts row");
        let db_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
                [],
                |row| row.get(0),
            )
            .expect("db path fts row");

        assert_eq!(fast_kind, "Inbox");
        assert_eq!(db_kind, "Archive");
    }
5672
    // Validation: a ChunkInsert with an empty `text_content` must be rejected
    // with `EngineError::InvalidWrite` before anything is committed.
    #[test]
    fn prepare_write_rejects_empty_chunk_text() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let result = writer.submit(WriteRequest {
            label: "test".to_owned(),
            nodes: vec![NodeInsert {
                row_id: "row-1".to_owned(),
                logical_id: "node-1".to_owned(),
                kind: "Note".to_owned(),
                properties: "{}".to_owned(),
                source_ref: None,
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![ChunkInsert {
                id: "chunk-1".to_owned(),
                node_logical_id: "node-1".to_owned(),
                // The invalid part of this request: no text.
                text_content: String::new(),
                byte_start: None,
                byte_end: None,
                content_hash: None,
            }],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        });

        assert!(
            matches!(result, Err(EngineError::InvalidWrite(_))),
            "empty text_content must be rejected"
        );
    }
5720
    // The write receipt's `optional_backfill_count` must be 0 when the request
    // carries no optional backfill tasks.
    #[test]
    fn receipt_reports_zero_backfills_when_none_submitted() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert_eq!(receipt.optional_backfill_count, 0);
    }
5760
    // The write receipt's `optional_backfill_count` must equal the number of
    // submitted optional backfill tasks (here: one per projection target).
    #[test]
    fn receipt_reports_correct_backfill_count() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                // Three tasks, covering each ProjectionTarget variant.
                optional_backfills: vec![
                    OptionalProjectionTask {
                        target: ProjectionTarget::Fts,
                        payload: "p1".to_owned(),
                    },
                    OptionalProjectionTask {
                        target: ProjectionTarget::Vec,
                        payload: "p2".to_owned(),
                    },
                    OptionalProjectionTask {
                        target: ProjectionTarget::All,
                        payload: "p3".to_owned(),
                    },
                ],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        assert_eq!(receipt.optional_backfill_count, 3);
    }
5813
    // Optional backfill tasks are only recorded during a write, not executed.
    // If the FTS backfill task ran inline it would add an extra fts_nodes row;
    // only the row from the regular chunk insert may exist.
    #[test]
    fn backfill_tasks_are_not_executed_during_write() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "required text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![OptionalProjectionTask {
                    target: ProjectionTarget::Fts,
                    payload: "backfill-payload".to_owned(),
                }],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");

        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
    }
5874
5875 #[test]
5876 fn fts_row_uses_new_kind_after_node_replace() {
5877 let db = NamedTempFile::new().expect("temporary db");
5878 let writer = WriterActor::start(
5879 db.path(),
5880 Arc::new(SchemaManager::new()),
5881 ProvenanceMode::Warn,
5882 Arc::new(TelemetryCounters::default()),
5883 )
5884 .expect("writer");
5885
5886 writer
5888 .submit(WriteRequest {
5889 label: "v1".to_owned(),
5890 nodes: vec![NodeInsert {
5891 row_id: "row-1".to_owned(),
5892 logical_id: "node-1".to_owned(),
5893 kind: "Note".to_owned(),
5894 properties: "{}".to_owned(),
5895 source_ref: Some("src-1".to_owned()),
5896 upsert: false,
5897 chunk_policy: ChunkPolicy::Preserve,
5898 content_ref: None,
5899 }],
5900 node_retires: vec![],
5901 edges: vec![],
5902 edge_retires: vec![],
5903 chunks: vec![ChunkInsert {
5904 id: "chunk-v1".to_owned(),
5905 node_logical_id: "node-1".to_owned(),
5906 text_content: "original".to_owned(),
5907 byte_start: None,
5908 byte_end: None,
5909 content_hash: None,
5910 }],
5911 runs: vec![],
5912 steps: vec![],
5913 actions: vec![],
5914 optional_backfills: vec![],
5915 vec_inserts: vec![],
5916 operational_writes: vec![],
5917 })
5918 .expect("v1 write");
5919
5920 writer
5922 .submit(WriteRequest {
5923 label: "v2".to_owned(),
5924 nodes: vec![NodeInsert {
5925 row_id: "row-2".to_owned(),
5926 logical_id: "node-1".to_owned(),
5927 kind: "Meeting".to_owned(),
5928 properties: "{}".to_owned(),
5929 source_ref: Some("src-2".to_owned()),
5930 upsert: true,
5931 chunk_policy: ChunkPolicy::Replace,
5932 content_ref: None,
5933 }],
5934 node_retires: vec![],
5935 edges: vec![],
5936 edge_retires: vec![],
5937 chunks: vec![ChunkInsert {
5938 id: "chunk-v2".to_owned(),
5939 node_logical_id: "node-1".to_owned(),
5940 text_content: "updated".to_owned(),
5941 byte_start: None,
5942 byte_end: None,
5943 content_hash: None,
5944 }],
5945 runs: vec![],
5946 steps: vec![],
5947 actions: vec![],
5948 optional_backfills: vec![],
5949 vec_inserts: vec![],
5950 operational_writes: vec![],
5951 })
5952 .expect("v2 write");
5953
5954 let conn = rusqlite::Connection::open(db.path()).expect("conn");
5955
5956 let old_count: i64 = conn
5958 .query_row(
5959 "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
5960 [],
5961 |row| row.get(0),
5962 )
5963 .expect("old fts count");
5964 assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
5965
5966 let new_kind: String = conn
5968 .query_row(
5969 "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
5970 [],
5971 |row| row.get(0),
5972 )
5973 .expect("new fts row");
5974 assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
5975 }
5976
5977 #[test]
5980 fn vec_insert_empty_chunk_id_is_rejected() {
5981 let db = NamedTempFile::new().expect("temporary db");
5982 let writer = WriterActor::start(
5983 db.path(),
5984 Arc::new(SchemaManager::new()),
5985 ProvenanceMode::Warn,
5986 Arc::new(TelemetryCounters::default()),
5987 )
5988 .expect("writer");
5989 let result = writer.submit(WriteRequest {
5990 label: "vec-test".to_owned(),
5991 nodes: vec![],
5992 node_retires: vec![],
5993 edges: vec![],
5994 edge_retires: vec![],
5995 chunks: vec![],
5996 runs: vec![],
5997 steps: vec![],
5998 actions: vec![],
5999 optional_backfills: vec![],
6000 vec_inserts: vec![VecInsert {
6001 chunk_id: String::new(),
6002 embedding: vec![0.1, 0.2, 0.3],
6003 }],
6004 operational_writes: vec![],
6005 });
6006 assert!(
6007 matches!(result, Err(EngineError::InvalidWrite(_))),
6008 "empty chunk_id in VecInsert must be rejected"
6009 );
6010 }
6011
6012 #[test]
6013 fn vec_insert_empty_embedding_is_rejected() {
6014 let db = NamedTempFile::new().expect("temporary db");
6015 let writer = WriterActor::start(
6016 db.path(),
6017 Arc::new(SchemaManager::new()),
6018 ProvenanceMode::Warn,
6019 Arc::new(TelemetryCounters::default()),
6020 )
6021 .expect("writer");
6022 let result = writer.submit(WriteRequest {
6023 label: "vec-test".to_owned(),
6024 nodes: vec![],
6025 node_retires: vec![],
6026 edges: vec![],
6027 edge_retires: vec![],
6028 chunks: vec![],
6029 runs: vec![],
6030 steps: vec![],
6031 actions: vec![],
6032 optional_backfills: vec![],
6033 vec_inserts: vec![VecInsert {
6034 chunk_id: "chunk-1".to_owned(),
6035 embedding: vec![],
6036 }],
6037 operational_writes: vec![],
6038 });
6039 assert!(
6040 matches!(result, Err(EngineError::InvalidWrite(_))),
6041 "empty embedding in VecInsert must be rejected"
6042 );
6043 }
6044
6045 #[test]
6046 fn vec_insert_noop_without_feature() {
6047 let db = NamedTempFile::new().expect("temporary db");
6050 let writer = WriterActor::start(
6051 db.path(),
6052 Arc::new(SchemaManager::new()),
6053 ProvenanceMode::Warn,
6054 Arc::new(TelemetryCounters::default()),
6055 )
6056 .expect("writer");
6057 let result = writer.submit(WriteRequest {
6058 label: "vec-noop".to_owned(),
6059 nodes: vec![],
6060 node_retires: vec![],
6061 edges: vec![],
6062 edge_retires: vec![],
6063 chunks: vec![],
6064 runs: vec![],
6065 steps: vec![],
6066 actions: vec![],
6067 optional_backfills: vec![],
6068 vec_inserts: vec![VecInsert {
6069 chunk_id: "chunk-noop".to_owned(),
6070 embedding: vec![1.0, 2.0, 3.0],
6071 }],
6072 operational_writes: vec![],
6073 });
6074 #[cfg(not(feature = "sqlite-vec"))]
6075 result.expect("noop VecInsert without feature must succeed");
6076 #[cfg(feature = "sqlite-vec")]
6078 let _ = result;
6079 }
6080
    // Retiring a node must NOT delete its vector rows: they stay in
    // vec_nodes_active so a later restore can re-establish vector behavior
    // without re-embedding.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn node_retire_preserves_vec_rows_for_later_restore() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema and register a 3-dimension vector profile before
        // the writer starts; the connection is scoped so it closes here.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Setup: a node with one chunk and one vector row.
        writer
            .submit(WriteRequest {
                label: "setup".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-retire-vec".to_owned(),
                    logical_id: "node-retire-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-retire-vec".to_owned(),
                    node_logical_id: "node-retire-vec".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-retire-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("setup write");

        // Retire the node in a second write.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "node-retire-vec".to_owned(),
                    source_ref: Some("src".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
        );
    }
6176
    // Upserting a node with ChunkPolicy::Replace must delete the vector rows
    // of the replaced chunks, while the replacement chunk's vector row is
    // persisted.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema and a 3-dimension vector profile up front.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // v1: node with chunk A and its embedding.
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v1".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-A".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version one".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-A".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // v2: upsert with Replace, swapping in chunk B and its embedding.
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v2".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-B".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version two".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-B".to_owned(),
                    embedding: vec![0.4, 0.5, 0.6],
                }],
                operational_writes: vec![],
            })
            .expect("v2 write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count_a: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
                [],
                |row| row.get(0),
            )
            .expect("count A");
        let count_b: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
                [],
                |row| row.get(0),
            )
            .expect("count B");
        assert_eq!(
            count_a, 0,
            "old vec row (chunk-A) must be deleted on Replace"
        );
        assert_eq!(
            count_b, 1,
            "new vec row (chunk-B) must be present after Replace"
        );
    }
6299
    // With the `sqlite-vec` feature and a registered vector profile, a
    // VecInsert must persist a row in vec_nodes_active.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_insert_is_persisted_when_feature_enabled() {
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap schema and a 3-dimension vector profile up front.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "vec-insert".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("vec insert write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
    }
6355
6356 #[test]
6359 fn write_request_exceeding_node_limit_is_rejected() {
6360 let nodes: Vec<NodeInsert> = (0..10_001)
6361 .map(|i| NodeInsert {
6362 row_id: format!("row-{i}"),
6363 logical_id: format!("lg-{i}"),
6364 kind: "Note".to_owned(),
6365 properties: "{}".to_owned(),
6366 source_ref: None,
6367 upsert: false,
6368 chunk_policy: ChunkPolicy::Preserve,
6369 content_ref: None,
6370 })
6371 .collect();
6372
6373 let request = WriteRequest {
6374 label: "too-many-nodes".to_owned(),
6375 nodes,
6376 node_retires: vec![],
6377 edges: vec![],
6378 edge_retires: vec![],
6379 chunks: vec![],
6380 runs: vec![],
6381 steps: vec![],
6382 actions: vec![],
6383 optional_backfills: vec![],
6384 vec_inserts: vec![],
6385 operational_writes: vec![],
6386 };
6387
6388 let result = prepare_write(request, ProvenanceMode::Warn)
6389 .map(|_| ())
6390 .map_err(|e| format!("{e}"));
6391 assert!(
6392 matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6393 "exceeding node limit must return InvalidWrite: got {result:?}"
6394 );
6395 }
6396
6397 #[test]
6398 fn write_request_exceeding_total_limit_is_rejected() {
6399 let request = WriteRequest {
6403 label: "too-many-total".to_owned(),
6404 nodes: (0..10_000)
6405 .map(|i| NodeInsert {
6406 row_id: format!("row-{i}"),
6407 logical_id: format!("lg-{i}"),
6408 kind: "Note".to_owned(),
6409 properties: "{}".to_owned(),
6410 source_ref: None,
6411 upsert: false,
6412 chunk_policy: ChunkPolicy::Preserve,
6413 content_ref: None,
6414 })
6415 .collect(),
6416 node_retires: vec![],
6417 edges: (0..10_000)
6418 .map(|i| EdgeInsert {
6419 row_id: format!("edge-row-{i}"),
6420 logical_id: format!("edge-lg-{i}"),
6421 kind: "link".to_owned(),
6422 source_logical_id: format!("lg-{i}"),
6423 target_logical_id: format!("lg-{}", i + 1),
6424 properties: "{}".to_owned(),
6425 source_ref: None,
6426 upsert: false,
6427 })
6428 .collect(),
6429 edge_retires: vec![],
6430 chunks: (0..50_000)
6431 .map(|i| ChunkInsert {
6432 id: format!("chunk-{i}"),
6433 node_logical_id: "lg-0".to_owned(),
6434 text_content: "text".to_owned(),
6435 byte_start: None,
6436 byte_end: None,
6437 content_hash: None,
6438 })
6439 .collect(),
6440 runs: vec![],
6441 steps: vec![],
6442 actions: vec![],
6443 optional_backfills: vec![],
6444 vec_inserts: (0..20_001)
6445 .map(|i| VecInsert {
6446 chunk_id: format!("vec-chunk-{i}"),
6447 embedding: vec![0.1],
6448 })
6449 .collect(),
6450 operational_writes: (0..10_000)
6451 .map(|i| OperationalWrite::Append {
6452 collection: format!("col-{i}"),
6453 record_key: format!("key-{i}"),
6454 payload_json: "{}".to_owned(),
6455 source_ref: None,
6456 })
6457 .collect(),
6458 };
6459
6460 let result = prepare_write(request, ProvenanceMode::Warn)
6461 .map(|_| ())
6462 .map_err(|e| format!("{e}"));
6463 assert!(
6464 matches!(result, Err(ref msg) if msg.contains("too many total items")),
6465 "exceeding total item limit must return InvalidWrite: got {result:?}"
6466 );
6467 }
6468
6469 #[test]
6470 fn write_request_within_limits_succeeds() {
6471 let db = NamedTempFile::new().expect("temporary db");
6472 let writer = WriterActor::start(
6473 db.path(),
6474 Arc::new(SchemaManager::new()),
6475 ProvenanceMode::Warn,
6476 Arc::new(TelemetryCounters::default()),
6477 )
6478 .expect("writer");
6479
6480 let result = writer.submit(WriteRequest {
6481 label: "within-limits".to_owned(),
6482 nodes: vec![NodeInsert {
6483 row_id: "row-1".to_owned(),
6484 logical_id: "lg-1".to_owned(),
6485 kind: "Note".to_owned(),
6486 properties: "{}".to_owned(),
6487 source_ref: None,
6488 upsert: false,
6489 chunk_policy: ChunkPolicy::Preserve,
6490 content_ref: None,
6491 }],
6492 node_retires: vec![],
6493 edges: vec![],
6494 edge_retires: vec![],
6495 chunks: vec![],
6496 runs: vec![],
6497 steps: vec![],
6498 actions: vec![],
6499 optional_backfills: vec![],
6500 vec_inserts: vec![],
6501 operational_writes: vec![],
6502 });
6503
6504 assert!(
6505 result.is_ok(),
6506 "write request within limits must succeed: got {result:?}"
6507 );
6508 }
6509
    // When a kind has a row in fts_property_schemas, inserting a node of that
    // kind must produce an fts_node_properties row whose text is the values at
    // the registered JSON paths joined with the configured separator.
    #[test]
    fn property_fts_rows_created_on_node_insert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a property-FTS schema for kind "Goal" before the writer
        // starts; scoped so the connection closes first.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "goal-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let text: String = conn
            .query_row(
                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist");
        assert_eq!(text, "Ship v2 Launch the redesign");
    }
6570
    // Upserting a node must replace (not duplicate) its property-FTS row, and
    // the surviving row must reflect the new properties.
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a property-FTS schema for kind "Goal" up front.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Initial insert: name "Alpha".
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Upsert the same logical id with name "Beta".
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        let text: String = conn
            .query_row(
                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
6669
    // Retiring a node must delete its property-FTS row (unlike vector rows,
    // which are preserved for restore — see the retire/vec test above for
    // contrast only; this test stands alone).
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a property-FTS schema for kind "Goal" up front.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a goal so a property-FTS row exists.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire it in a second write.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
6750
6751 #[test]
6752 fn no_property_fts_row_for_unregistered_kind() {
6753 let db = NamedTempFile::new().expect("temporary db");
6754 let schema = Arc::new(SchemaManager::new());
6755 {
6756 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6757 schema.bootstrap(&conn).expect("bootstrap");
6758 }
6760 let writer = WriterActor::start(
6761 db.path(),
6762 Arc::clone(&schema),
6763 ProvenanceMode::Warn,
6764 Arc::new(TelemetryCounters::default()),
6765 )
6766 .expect("writer");
6767
6768 writer
6769 .submit(WriteRequest {
6770 label: "insert".to_owned(),
6771 nodes: vec![NodeInsert {
6772 row_id: "row-1".to_owned(),
6773 logical_id: "note-1".to_owned(),
6774 kind: "Note".to_owned(),
6775 properties: r#"{"title":"hello"}"#.to_owned(),
6776 source_ref: Some("src-1".to_owned()),
6777 upsert: false,
6778 chunk_policy: ChunkPolicy::Preserve,
6779 content_ref: None,
6780 }],
6781 node_retires: vec![],
6782 edges: vec![],
6783 edge_retires: vec![],
6784 chunks: vec![],
6785 runs: vec![],
6786 steps: vec![],
6787 actions: vec![],
6788 optional_backfills: vec![],
6789 vec_inserts: vec![],
6790 operational_writes: vec![],
6791 })
6792 .expect("insert");
6793
6794 let conn = rusqlite::Connection::open(db.path()).expect("conn");
6795 let count: i64 = conn
6796 .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
6797 row.get(0)
6798 })
6799 .expect("count");
6800 assert_eq!(count, 0, "no property FTS rows for unregistered kind");
6801 }
6802
6803 mod extract_json_path_tests {
6804 use super::super::extract_json_path;
6805 use serde_json::json;
6806
6807 #[test]
6808 fn string_value() {
6809 let v = json!({"name": "alice"});
6810 assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
6811 }
6812
6813 #[test]
6814 fn number_value() {
6815 let v = json!({"age": 42});
6816 assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
6817 }
6818
6819 #[test]
6820 fn bool_value() {
6821 let v = json!({"active": true});
6822 assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
6823 }
6824
6825 #[test]
6826 fn null_value() {
6827 let v = json!({"x": null});
6828 assert!(extract_json_path(&v, "$.x").is_empty());
6829 }
6830
6831 #[test]
6832 fn missing_path() {
6833 let v = json!({"name": "a"});
6834 assert!(extract_json_path(&v, "$.missing").is_empty());
6835 }
6836
6837 #[test]
6838 fn nested_path() {
6839 let v = json!({"address": {"city": "NYC"}});
6840 assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
6841 }
6842
6843 #[test]
6844 fn array_of_strings() {
6845 let v = json!({"tags": ["a", "b", "c"]});
6846 assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
6847 }
6848
6849 #[test]
6850 fn array_mixed_scalars() {
6851 let v = json!({"vals": ["x", 1, true]});
6852 assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
6853 }
6854
6855 #[test]
6856 fn array_only_objects_returns_empty() {
6857 let v = json!({"data": [{"k": "v"}]});
6858 assert!(extract_json_path(&v, "$.data").is_empty());
6859 }
6860
6861 #[test]
6862 fn array_mixed_objects_and_scalars() {
6863 let v = json!({"data": ["keep", {"skip": true}, "also"]});
6864 assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
6865 }
6866
6867 #[test]
6868 fn object_returns_empty() {
6869 let v = json!({"meta": {"k": "v"}});
6870 assert!(extract_json_path(&v, "$.meta").is_empty());
6871 }
6872
6873 #[test]
6874 fn no_prefix_returns_empty() {
6875 let v = json!({"name": "a"});
6876 assert!(extract_json_path(&v, "name").is_empty());
6877 }
6878 }
6879}