1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10 pub node_id: String,
11 pub node_type: String,
12 pub subtype: Option<String>,
13 pub label: String,
14 pub properties: BTreeMap<String, Value>,
15 pub property_clocks: HashMap<String, LamportClock>,
19 pub last_clock: LamportClock,
22 pub last_add_clock: LamportClock,
26 pub tombstoned: bool,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33 pub edge_id: String,
34 pub edge_type: String,
35 pub source_id: String,
36 pub target_id: String,
37 pub properties: BTreeMap<String, Value>,
38 pub property_clocks: HashMap<String, LamportClock>,
40 pub last_clock: LamportClock,
41 pub last_add_clock: LamportClock,
43 pub tombstoned: bool,
44}
45
46pub struct MaterializedGraph {
57 pub nodes: HashMap<String, Node>,
59 pub edges: HashMap<String, Edge>,
61 pub outgoing: HashMap<String, HashSet<String>>,
63 pub incoming: HashMap<String, HashSet<String>>,
65 pub by_type: HashMap<String, HashSet<String>>,
67 pub ontology: Ontology,
69 pub quarantined: HashSet<Hash>,
73}
74
75impl MaterializedGraph {
76 pub fn new(ontology: Ontology) -> Self {
78 Self {
79 nodes: HashMap::new(),
80 edges: HashMap::new(),
81 outgoing: HashMap::new(),
82 incoming: HashMap::new(),
83 by_type: HashMap::new(),
84 ontology,
85 quarantined: HashSet::new(),
86 }
87 }
88
89 pub fn apply(&mut self, entry: &Entry) {
96 match &entry.payload {
97 GraphOp::Checkpoint { ops, op_clocks, .. } => {
98 for (i, op) in ops.iter().enumerate() {
101 let clock = if i < op_clocks.len() {
102 LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
103 } else {
104 entry.clock.clone() };
106 let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
107 self.apply(&synthetic);
108 }
109 }
110 GraphOp::DefineOntology { .. } => {
111 }
113 GraphOp::ExtendOntology { extension } => {
114 if let Err(_e) = self.ontology.merge_extension(extension) {
115 self.quarantined.insert(entry.hash);
116 }
117 }
118 GraphOp::AddNode {
119 node_id,
120 node_type,
121 subtype,
122 label,
123 properties,
124 } => {
125 if let Err(_e) =
127 self.ontology
128 .validate_node(node_type, subtype.as_deref(), properties)
129 {
130 self.quarantined.insert(entry.hash);
131 return;
132 }
133 self.apply_add_node(
134 node_id,
135 node_type,
136 subtype.as_deref(),
137 label,
138 properties,
139 &entry.clock,
140 );
141 }
142 GraphOp::AddEdge {
143 edge_id,
144 edge_type,
145 source_id,
146 target_id,
147 properties,
148 } => {
149 if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
151 self.quarantined.insert(entry.hash);
152 return;
153 }
154 if let (Some(src), Some(tgt)) = (
158 self.nodes.get(source_id.as_str()),
159 self.nodes.get(target_id.as_str()),
160 ) {
161 if self
162 .ontology
163 .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
164 .is_err()
165 {
166 self.quarantined.insert(entry.hash);
167 return;
168 }
169 }
170 self.apply_add_edge(
171 edge_id,
172 edge_type,
173 source_id,
174 target_id,
175 properties,
176 &entry.clock,
177 );
178 }
179 GraphOp::UpdateProperty {
180 entity_id,
181 key,
182 value,
183 } => {
184 self.apply_update_property(entity_id, key, value, &entry.clock);
185 }
186 GraphOp::RemoveNode { node_id } => {
187 self.apply_remove_node(node_id, &entry.clock);
188 }
189 GraphOp::RemoveEdge { edge_id } => {
190 self.apply_remove_edge(edge_id, &entry.clock);
191 }
192 }
193 }
194
195 pub fn apply_all(&mut self, entries: &[&Entry]) {
197 for entry in entries {
198 self.apply(entry);
199 }
200 }
201
202 pub fn rebuild(&mut self, entries: &[&Entry]) {
204 self.nodes.clear();
205 self.edges.clear();
206 self.outgoing.clear();
207 self.incoming.clear();
208 self.by_type.clear();
209 self.quarantined.clear();
210 self.apply_all(entries);
211 }
212
213 pub fn get_node(&self, node_id: &str) -> Option<&Node> {
217 self.nodes.get(node_id).filter(|n| !n.tombstoned)
218 }
219
220 pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
222 self.edges.get(edge_id).filter(|e| !e.tombstoned)
223 }
224
225 pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
227 match self.by_type.get(node_type) {
228 Some(ids) => ids.iter().filter_map(|id| self.get_node(id)).collect(),
229 None => vec![],
230 }
231 }
232
233 pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
235 self.nodes
236 .values()
237 .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
238 .collect()
239 }
240
241 pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
243 self.nodes
244 .values()
245 .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
246 .collect()
247 }
248
249 pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
251 match self.outgoing.get(node_id) {
252 Some(edge_ids) => edge_ids
253 .iter()
254 .filter_map(|eid| self.get_edge(eid))
255 .filter(|e| self.is_node_live(&e.target_id))
256 .collect(),
257 None => vec![],
258 }
259 }
260
261 pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
263 match self.incoming.get(node_id) {
264 Some(edge_ids) => edge_ids
265 .iter()
266 .filter_map(|eid| self.get_edge(eid))
267 .filter(|e| self.is_node_live(&e.source_id))
268 .collect(),
269 None => vec![],
270 }
271 }
272
273 pub fn all_nodes(&self) -> Vec<&Node> {
275 self.nodes.values().filter(|n| !n.tombstoned).collect()
276 }
277
278 pub fn all_edges(&self) -> Vec<&Edge> {
280 self.edges
281 .values()
282 .filter(|e| {
283 !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
284 })
285 .collect()
286 }
287
288 pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
290 self.outgoing_edges(node_id)
291 .iter()
292 .map(|e| e.target_id.as_str())
293 .collect()
294 }
295
296 pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
298 self.incoming_edges(node_id)
299 .iter()
300 .map(|e| e.source_id.as_str())
301 .collect()
302 }
303
304 fn apply_add_node(
307 &mut self,
308 node_id: &str,
309 node_type: &str,
310 subtype: Option<&str>,
311 label: &str,
312 properties: &BTreeMap<String, Value>,
313 clock: &LamportClock,
314 ) {
315 if let Some(existing) = self.nodes.get_mut(node_id) {
316 existing.tombstoned = false;
318 if clock_wins(clock, &existing.last_add_clock) {
320 existing.last_add_clock = clock.clone();
321 }
322 if clock_wins(clock, &existing.last_clock) {
324 existing.label = label.to_string();
325 existing.subtype = subtype.map(|s| s.to_string());
326 existing.last_clock = clock.clone();
327 }
328 for (k, v) in properties {
331 let dominated = existing
332 .property_clocks
333 .get(k)
334 .map(|c| clock_wins(clock, c))
335 .unwrap_or(true);
336 if dominated {
337 existing.properties.insert(k.clone(), v.clone());
338 existing.property_clocks.insert(k.clone(), clock.clone());
339 }
340 }
341 } else {
342 let property_clocks: HashMap<String, LamportClock> = properties
343 .keys()
344 .map(|k| (k.clone(), clock.clone()))
345 .collect();
346 let node = Node {
347 node_id: node_id.to_string(),
348 node_type: node_type.to_string(),
349 subtype: subtype.map(|s| s.to_string()),
350 label: label.to_string(),
351 properties: properties.clone(),
352 property_clocks,
353 last_clock: clock.clone(),
354 last_add_clock: clock.clone(),
355 tombstoned: false,
356 };
357 self.by_type
358 .entry(node_type.to_string())
359 .or_default()
360 .insert(node_id.to_string());
361 self.nodes.insert(node_id.to_string(), node);
362 }
363 }
364
365 fn apply_add_edge(
366 &mut self,
367 edge_id: &str,
368 edge_type: &str,
369 source_id: &str,
370 target_id: &str,
371 properties: &BTreeMap<String, Value>,
372 clock: &LamportClock,
373 ) {
374 if let Some(existing) = self.edges.get_mut(edge_id) {
375 existing.tombstoned = false;
377 if clock_wins(clock, &existing.last_add_clock) {
378 existing.last_add_clock = clock.clone();
379 }
380 if clock_wins(clock, &existing.last_clock) {
381 existing.last_clock = clock.clone();
382 }
383 for (k, v) in properties {
385 let dominated = existing
386 .property_clocks
387 .get(k)
388 .map(|c| clock_wins(clock, c))
389 .unwrap_or(true);
390 if dominated {
391 existing.properties.insert(k.clone(), v.clone());
392 existing.property_clocks.insert(k.clone(), clock.clone());
393 }
394 }
395 } else {
396 let property_clocks: HashMap<String, LamportClock> = properties
397 .keys()
398 .map(|k| (k.clone(), clock.clone()))
399 .collect();
400 let edge = Edge {
401 edge_id: edge_id.to_string(),
402 edge_type: edge_type.to_string(),
403 source_id: source_id.to_string(),
404 target_id: target_id.to_string(),
405 properties: properties.clone(),
406 property_clocks,
407 last_clock: clock.clone(),
408 last_add_clock: clock.clone(),
409 tombstoned: false,
410 };
411 self.outgoing
412 .entry(source_id.to_string())
413 .or_default()
414 .insert(edge_id.to_string());
415 self.incoming
416 .entry(target_id.to_string())
417 .or_default()
418 .insert(edge_id.to_string());
419 self.edges.insert(edge_id.to_string(), edge);
420 }
421 }
422
423 fn apply_update_property(
424 &mut self,
425 entity_id: &str,
426 key: &str,
427 value: &Value,
428 clock: &LamportClock,
429 ) {
430 if let Some(node) = self.nodes.get_mut(entity_id) {
433 let dominated = node
434 .property_clocks
435 .get(key)
436 .map(|c| clock_wins(clock, c))
437 .unwrap_or(true);
438 if dominated {
439 node.properties.insert(key.to_string(), value.clone());
440 node.property_clocks.insert(key.to_string(), clock.clone());
441 }
442 if clock_wins(clock, &node.last_clock) {
444 node.last_clock = clock.clone();
445 }
446 } else if let Some(edge) = self.edges.get_mut(entity_id) {
447 let dominated = edge
448 .property_clocks
449 .get(key)
450 .map(|c| clock_wins(clock, c))
451 .unwrap_or(true);
452 if dominated {
453 edge.properties.insert(key.to_string(), value.clone());
454 edge.property_clocks.insert(key.to_string(), clock.clone());
455 }
456 if clock_wins(clock, &edge.last_clock) {
457 edge.last_clock = clock.clone();
458 }
459 }
460 }
462
463 fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
464 if let Some(node) = self.nodes.get_mut(node_id) {
465 if clock_wins(clock, &node.last_add_clock) {
469 node.tombstoned = true;
470 node.last_clock = clock.clone();
471 }
472 }
473 }
476
477 fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
478 if let Some(edge) = self.edges.get_mut(edge_id) {
479 if clock_wins(clock, &edge.last_add_clock) {
481 edge.tombstoned = true;
482 edge.last_clock = clock.clone();
483 }
484 }
485 }
486
487 fn is_node_live(&self, node_id: &str) -> bool {
488 self.nodes
489 .get(node_id)
490 .map(|n| !n.tombstoned)
491 .unwrap_or(false)
492 }
493}
494
495fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
498 new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
499}
500
501#[cfg(test)]
502mod tests {
503 use super::*;
504 use crate::entry::Entry;
505 use crate::ontology::{EdgeTypeDef, NodeTypeDef};
506
507 fn test_ontology() -> Ontology {
508 Ontology {
509 node_types: BTreeMap::from([
510 (
511 "entity".into(),
512 NodeTypeDef {
513 description: None,
514 properties: BTreeMap::new(),
515 subtypes: None,
516 },
517 ),
518 (
519 "signal".into(),
520 NodeTypeDef {
521 description: None,
522 properties: BTreeMap::new(),
523 subtypes: None,
524 },
525 ),
526 ]),
527 edge_types: BTreeMap::from([
528 (
529 "RUNS_ON".into(),
530 EdgeTypeDef {
531 description: None,
532 source_types: vec!["entity".into()],
533 target_types: vec!["entity".into()],
534 properties: BTreeMap::new(),
535 },
536 ),
537 (
538 "OBSERVES".into(),
539 EdgeTypeDef {
540 description: None,
541 source_types: vec!["signal".into()],
542 target_types: vec!["entity".into()],
543 properties: BTreeMap::new(),
544 },
545 ),
546 ]),
547 }
548 }
549
550 fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
551 Entry::new(
552 op,
553 vec![],
554 vec![],
555 LamportClock::with_values(author, clock_time, 0),
556 author,
557 )
558 }
559
560 #[test]
563 fn add_node_appears_in_query() {
564 let mut g = MaterializedGraph::new(test_ontology());
565 let entry = make_entry(
566 GraphOp::AddNode {
567 node_id: "server-1".into(),
568 node_type: "entity".into(),
569 label: "Server 1".into(),
570 properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
571 subtype: None,
572 },
573 1,
574 "inst-a",
575 );
576 g.apply(&entry);
577
578 let node = g.get_node("server-1").unwrap();
579 assert_eq!(node.node_type, "entity");
580 assert_eq!(node.label, "Server 1");
581 assert_eq!(
582 node.properties.get("ip"),
583 Some(&Value::String("10.0.0.1".into()))
584 );
585 }
586
587 #[test]
588 fn add_edge_creates_adjacency() {
589 let mut g = MaterializedGraph::new(test_ontology());
590 g.apply(&make_entry(
591 GraphOp::AddNode {
592 node_id: "svc".into(),
593 node_type: "entity".into(),
594 label: "svc".into(),
595 properties: BTreeMap::new(),
596 subtype: None,
597 },
598 1,
599 "inst-a",
600 ));
601 g.apply(&make_entry(
602 GraphOp::AddNode {
603 node_id: "srv".into(),
604 node_type: "entity".into(),
605 label: "srv".into(),
606 properties: BTreeMap::new(),
607 subtype: None,
608 },
609 2,
610 "inst-a",
611 ));
612 g.apply(&make_entry(
613 GraphOp::AddEdge {
614 edge_id: "e1".into(),
615 edge_type: "RUNS_ON".into(),
616 source_id: "svc".into(),
617 target_id: "srv".into(),
618 properties: BTreeMap::new(),
619 },
620 3,
621 "inst-a",
622 ));
623
624 let out = g.outgoing_edges("svc");
626 assert_eq!(out.len(), 1);
627 assert_eq!(out[0].target_id, "srv");
628
629 let inc = g.incoming_edges("srv");
630 assert_eq!(inc.len(), 1);
631 assert_eq!(inc[0].source_id, "svc");
632
633 assert_eq!(g.neighbors("svc"), vec!["srv"]);
634 }
635
636 #[test]
637 fn update_property_reflected() {
638 let mut g = MaterializedGraph::new(test_ontology());
639 g.apply(&make_entry(
640 GraphOp::AddNode {
641 node_id: "s1".into(),
642 node_type: "entity".into(),
643 label: "s1".into(),
644 properties: BTreeMap::new(),
645 subtype: None,
646 },
647 1,
648 "inst-a",
649 ));
650 g.apply(&make_entry(
651 GraphOp::UpdateProperty {
652 entity_id: "s1".into(),
653 key: "cpu".into(),
654 value: Value::Float(85.5),
655 },
656 2,
657 "inst-a",
658 ));
659
660 let node = g.get_node("s1").unwrap();
661 assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
662 }
663
664 #[test]
665 fn remove_node_cascades_edges() {
666 let mut g = MaterializedGraph::new(test_ontology());
667 g.apply(&make_entry(
668 GraphOp::AddNode {
669 node_id: "a".into(),
670 node_type: "entity".into(),
671 label: "a".into(),
672 properties: BTreeMap::new(),
673 subtype: None,
674 },
675 1,
676 "inst-a",
677 ));
678 g.apply(&make_entry(
679 GraphOp::AddNode {
680 node_id: "b".into(),
681 node_type: "entity".into(),
682 label: "b".into(),
683 properties: BTreeMap::new(),
684 subtype: None,
685 },
686 2,
687 "inst-a",
688 ));
689 g.apply(&make_entry(
690 GraphOp::AddEdge {
691 edge_id: "e1".into(),
692 edge_type: "RUNS_ON".into(),
693 source_id: "a".into(),
694 target_id: "b".into(),
695 properties: BTreeMap::new(),
696 },
697 3,
698 "inst-a",
699 ));
700 assert_eq!(g.all_edges().len(), 1);
701
702 g.apply(&make_entry(
704 GraphOp::RemoveNode {
705 node_id: "b".into(),
706 },
707 4,
708 "inst-a",
709 ));
710 assert!(g.get_node("b").is_none());
711 assert_eq!(g.all_edges().len(), 0);
713 assert_eq!(g.outgoing_edges("a").len(), 0);
715 }
716
717 #[test]
718 fn remove_edge_preserves_nodes() {
719 let mut g = MaterializedGraph::new(test_ontology());
720 g.apply(&make_entry(
721 GraphOp::AddNode {
722 node_id: "a".into(),
723 node_type: "entity".into(),
724 label: "a".into(),
725 properties: BTreeMap::new(),
726 subtype: None,
727 },
728 1,
729 "inst-a",
730 ));
731 g.apply(&make_entry(
732 GraphOp::AddNode {
733 node_id: "b".into(),
734 node_type: "entity".into(),
735 label: "b".into(),
736 properties: BTreeMap::new(),
737 subtype: None,
738 },
739 2,
740 "inst-a",
741 ));
742 g.apply(&make_entry(
743 GraphOp::AddEdge {
744 edge_id: "e1".into(),
745 edge_type: "RUNS_ON".into(),
746 source_id: "a".into(),
747 target_id: "b".into(),
748 properties: BTreeMap::new(),
749 },
750 3,
751 "inst-a",
752 ));
753 g.apply(&make_entry(
754 GraphOp::RemoveEdge {
755 edge_id: "e1".into(),
756 },
757 4,
758 "inst-a",
759 ));
760
761 assert!(g.get_node("a").is_some());
763 assert!(g.get_node("b").is_some());
764 assert!(g.get_edge("e1").is_none());
766 assert_eq!(g.all_edges().len(), 0);
767 }
768
769 #[test]
770 fn query_by_type_filters() {
771 let mut g = MaterializedGraph::new(test_ontology());
772 g.apply(&make_entry(
773 GraphOp::AddNode {
774 node_id: "s1".into(),
775 node_type: "entity".into(),
776 label: "s1".into(),
777 properties: BTreeMap::new(),
778 subtype: None,
779 },
780 1,
781 "inst-a",
782 ));
783 g.apply(&make_entry(
784 GraphOp::AddNode {
785 node_id: "s2".into(),
786 node_type: "entity".into(),
787 label: "s2".into(),
788 properties: BTreeMap::new(),
789 subtype: None,
790 },
791 2,
792 "inst-a",
793 ));
794 g.apply(&make_entry(
795 GraphOp::AddNode {
796 node_id: "alert".into(),
797 node_type: "signal".into(),
798 label: "alert".into(),
799 properties: BTreeMap::new(),
800 subtype: None,
801 },
802 3,
803 "inst-a",
804 ));
805
806 let entities = g.nodes_by_type("entity");
807 assert_eq!(entities.len(), 2);
808 let signals = g.nodes_by_type("signal");
809 assert_eq!(signals.len(), 1);
810 assert_eq!(signals[0].node_id, "alert");
811 }
812
813 #[test]
814 fn query_by_property_filters() {
815 let mut g = MaterializedGraph::new(test_ontology());
816 g.apply(&make_entry(
817 GraphOp::AddNode {
818 node_id: "s1".into(),
819 node_type: "entity".into(),
820 label: "s1".into(),
821 properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
822 subtype: None,
823 },
824 1,
825 "inst-a",
826 ));
827 g.apply(&make_entry(
828 GraphOp::AddNode {
829 node_id: "s2".into(),
830 node_type: "entity".into(),
831 label: "s2".into(),
832 properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
833 subtype: None,
834 },
835 2,
836 "inst-a",
837 ));
838
839 let alive = g.nodes_by_property("status", &Value::String("alive".into()));
840 assert_eq!(alive.len(), 1);
841 assert_eq!(alive[0].node_id, "s1");
842 }
843
844 #[test]
845 fn materialization_from_empty() {
846 let mut g1 = MaterializedGraph::new(test_ontology());
848 let entries = vec![
849 make_entry(
850 GraphOp::DefineOntology {
851 ontology: test_ontology(),
852 },
853 0,
854 "inst-a",
855 ),
856 make_entry(
857 GraphOp::AddNode {
858 node_id: "a".into(),
859 node_type: "entity".into(),
860 label: "a".into(),
861 properties: BTreeMap::new(),
862 subtype: None,
863 },
864 1,
865 "inst-a",
866 ),
867 make_entry(
868 GraphOp::AddNode {
869 node_id: "b".into(),
870 node_type: "entity".into(),
871 label: "b".into(),
872 properties: BTreeMap::new(),
873 subtype: None,
874 },
875 2,
876 "inst-a",
877 ),
878 make_entry(
879 GraphOp::AddEdge {
880 edge_id: "e1".into(),
881 edge_type: "RUNS_ON".into(),
882 source_id: "a".into(),
883 target_id: "b".into(),
884 properties: BTreeMap::new(),
885 },
886 3,
887 "inst-a",
888 ),
889 ];
890 for e in &entries {
891 g1.apply(e);
892 }
893
894 let mut g2 = MaterializedGraph::new(test_ontology());
896 let refs: Vec<&Entry> = entries.iter().collect();
897 g2.rebuild(&refs);
898
899 assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
901 assert_eq!(g1.all_edges().len(), g2.all_edges().len());
902 for node in g1.all_nodes() {
903 let n2 = g2.get_node(&node.node_id).unwrap();
904 assert_eq!(node.node_type, n2.node_type);
905 assert_eq!(node.properties, n2.properties);
906 }
907 }
908
909 #[test]
910 fn incremental_equals_full() {
911 let entries = vec![
912 make_entry(
913 GraphOp::DefineOntology {
914 ontology: test_ontology(),
915 },
916 0,
917 "inst-a",
918 ),
919 make_entry(
920 GraphOp::AddNode {
921 node_id: "a".into(),
922 node_type: "entity".into(),
923 label: "a".into(),
924 properties: BTreeMap::from([("x".into(), Value::Int(1))]),
925 subtype: None,
926 },
927 1,
928 "inst-a",
929 ),
930 make_entry(
931 GraphOp::UpdateProperty {
932 entity_id: "a".into(),
933 key: "x".into(),
934 value: Value::Int(2),
935 },
936 2,
937 "inst-a",
938 ),
939 make_entry(
940 GraphOp::AddNode {
941 node_id: "b".into(),
942 node_type: "entity".into(),
943 label: "b".into(),
944 properties: BTreeMap::new(),
945 subtype: None,
946 },
947 3,
948 "inst-a",
949 ),
950 make_entry(
951 GraphOp::AddEdge {
952 edge_id: "e1".into(),
953 edge_type: "RUNS_ON".into(),
954 source_id: "a".into(),
955 target_id: "b".into(),
956 properties: BTreeMap::new(),
957 },
958 4,
959 "inst-a",
960 ),
961 make_entry(
962 GraphOp::RemoveEdge {
963 edge_id: "e1".into(),
964 },
965 5,
966 "inst-a",
967 ),
968 ];
969
970 let mut g_inc = MaterializedGraph::new(test_ontology());
972 for e in &entries {
973 g_inc.apply(e);
974 }
975
976 let mut g_full = MaterializedGraph::new(test_ontology());
978 let refs: Vec<&Entry> = entries.iter().collect();
979 g_full.rebuild(&refs);
980
981 assert_eq!(
983 g_inc.get_node("a").unwrap().properties.get("x"),
984 Some(&Value::Int(2))
985 );
986 assert_eq!(
987 g_full.get_node("a").unwrap().properties.get("x"),
988 Some(&Value::Int(2))
989 );
990 assert_eq!(g_inc.all_edges().len(), 0);
992 assert_eq!(g_full.all_edges().len(), 0);
993 }
994
995 #[test]
996 fn lww_concurrent_property_update() {
997 let mut g = MaterializedGraph::new(test_ontology());
999 g.apply(&make_entry(
1000 GraphOp::AddNode {
1001 node_id: "s1".into(),
1002 node_type: "entity".into(),
1003 label: "s1".into(),
1004 properties: BTreeMap::new(),
1005 subtype: None,
1006 },
1007 1,
1008 "inst-a",
1009 ));
1010 g.apply(&make_entry(
1012 GraphOp::UpdateProperty {
1013 entity_id: "s1".into(),
1014 key: "status".into(),
1015 value: Value::String("alive".into()),
1016 },
1017 2,
1018 "inst-a",
1019 ));
1020 g.apply(&make_entry(
1022 GraphOp::UpdateProperty {
1023 entity_id: "s1".into(),
1024 key: "status".into(),
1025 value: Value::String("dead".into()),
1026 },
1027 3,
1028 "inst-b",
1029 ));
1030 assert_eq!(
1031 g.get_node("s1").unwrap().properties.get("status"),
1032 Some(&Value::String("dead".into()))
1033 );
1034 }
1035
1036 #[test]
1037 fn lww_tiebreak_by_instance_id() {
1038 let mut g = MaterializedGraph::new(test_ontology());
1040 g.apply(&make_entry(
1041 GraphOp::AddNode {
1042 node_id: "s1".into(),
1043 node_type: "entity".into(),
1044 label: "s1".into(),
1045 properties: BTreeMap::new(),
1046 subtype: None,
1047 },
1048 1,
1049 "inst-a",
1050 ));
1051 g.apply(&make_entry(
1053 GraphOp::UpdateProperty {
1054 entity_id: "s1".into(),
1055 key: "x".into(),
1056 value: Value::Int(1),
1057 },
1058 5,
1059 "inst-a",
1060 ));
1061 g.apply(&make_entry(
1062 GraphOp::UpdateProperty {
1063 entity_id: "s1".into(),
1064 key: "x".into(),
1065 value: Value::Int(2),
1066 },
1067 5,
1068 "inst-b",
1069 ));
1070 assert_eq!(
1072 g.get_node("s1").unwrap().properties.get("x"),
1073 Some(&Value::Int(1))
1074 );
1075 }
1076
1077 #[test]
1078 fn lww_per_property_concurrent_different_keys() {
1079 let mut g = MaterializedGraph::new(test_ontology());
1083 g.apply(&make_entry(
1084 GraphOp::AddNode {
1085 node_id: "s1".into(),
1086 node_type: "entity".into(),
1087 label: "s1".into(),
1088 properties: BTreeMap::from([
1089 ("x".into(), Value::Int(0)),
1090 ("y".into(), Value::Int(0)),
1091 ]),
1092 subtype: None,
1093 },
1094 1,
1095 "inst-a",
1096 ));
1097 g.apply(&make_entry(
1099 GraphOp::UpdateProperty {
1100 entity_id: "s1".into(),
1101 key: "x".into(),
1102 value: Value::Int(42),
1103 },
1104 3,
1105 "inst-a",
1106 ));
1107 g.apply(&make_entry(
1109 GraphOp::UpdateProperty {
1110 entity_id: "s1".into(),
1111 key: "y".into(),
1112 value: Value::Int(99),
1113 },
1114 3,
1115 "inst-b",
1116 ));
1117
1118 let node = g.get_node("s1").unwrap();
1119 assert_eq!(
1121 node.properties.get("x"),
1122 Some(&Value::Int(42)),
1123 "update to 'x' must not be rejected by concurrent update to 'y'"
1124 );
1125 assert_eq!(
1126 node.properties.get("y"),
1127 Some(&Value::Int(99)),
1128 "update to 'y' must not be rejected by concurrent update to 'x'"
1129 );
1130 }
1131
1132 #[test]
1133 fn lww_per_property_order_independent() {
1134 let mut g = MaterializedGraph::new(test_ontology());
1136 g.apply(&make_entry(
1137 GraphOp::AddNode {
1138 node_id: "s1".into(),
1139 node_type: "entity".into(),
1140 label: "s1".into(),
1141 properties: BTreeMap::from([
1142 ("x".into(), Value::Int(0)),
1143 ("y".into(), Value::Int(0)),
1144 ]),
1145 subtype: None,
1146 },
1147 1,
1148 "inst-a",
1149 ));
1150 g.apply(&make_entry(
1152 GraphOp::UpdateProperty {
1153 entity_id: "s1".into(),
1154 key: "y".into(),
1155 value: Value::Int(99),
1156 },
1157 3,
1158 "inst-b",
1159 ));
1160 g.apply(&make_entry(
1161 GraphOp::UpdateProperty {
1162 entity_id: "s1".into(),
1163 key: "x".into(),
1164 value: Value::Int(42),
1165 },
1166 3,
1167 "inst-a",
1168 ));
1169
1170 let node = g.get_node("s1").unwrap();
1171 assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1172 assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1173 }
1174
1175 #[test]
1176 fn add_wins_over_remove() {
1177 let mut g = MaterializedGraph::new(test_ontology());
1179 g.apply(&make_entry(
1180 GraphOp::AddNode {
1181 node_id: "s1".into(),
1182 node_type: "entity".into(),
1183 label: "s1".into(),
1184 properties: BTreeMap::new(),
1185 subtype: None,
1186 },
1187 1,
1188 "inst-a",
1189 ));
1190 g.apply(&make_entry(
1192 GraphOp::RemoveNode {
1193 node_id: "s1".into(),
1194 },
1195 2,
1196 "inst-a",
1197 ));
1198 assert!(g.get_node("s1").is_none());
1199
1200 g.apply(&make_entry(
1202 GraphOp::AddNode {
1203 node_id: "s1".into(),
1204 node_type: "entity".into(),
1205 label: "s1 v2".into(),
1206 properties: BTreeMap::new(),
1207 subtype: None,
1208 },
1209 3,
1210 "inst-b",
1211 ));
1212 let node = g.get_node("s1").unwrap();
1213 assert_eq!(node.label, "s1 v2");
1214 assert!(!node.tombstoned);
1215 }
1216}