1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10 pub node_id: String,
11 pub node_type: String,
12 pub subtype: Option<String>,
13 pub label: String,
14 pub properties: BTreeMap<String, Value>,
15 pub property_clocks: HashMap<String, LamportClock>,
19 pub last_clock: LamportClock,
22 pub last_add_clock: LamportClock,
26 pub tombstoned: bool,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33 pub edge_id: String,
34 pub edge_type: String,
35 pub source_id: String,
36 pub target_id: String,
37 pub properties: BTreeMap<String, Value>,
38 pub property_clocks: HashMap<String, LamportClock>,
40 pub last_clock: LamportClock,
41 pub last_add_clock: LamportClock,
43 pub tombstoned: bool,
44}
45
46pub struct MaterializedGraph {
57 pub nodes: HashMap<String, Node>,
59 pub edges: HashMap<String, Edge>,
61 pub outgoing: HashMap<String, HashSet<String>>,
63 pub incoming: HashMap<String, HashSet<String>>,
65 pub by_type: HashMap<String, HashSet<String>>,
67 pub ontology: Ontology,
69 pub quarantined: HashSet<Hash>,
76}
77
78impl MaterializedGraph {
79 pub fn new(ontology: Ontology) -> Self {
81 Self {
82 nodes: HashMap::new(),
83 edges: HashMap::new(),
84 outgoing: HashMap::new(),
85 incoming: HashMap::new(),
86 by_type: HashMap::new(),
87 ontology,
88 quarantined: HashSet::new(),
89 }
90 }
91
92 pub fn apply(&mut self, entry: &Entry) {
99 match &entry.payload {
100 GraphOp::Checkpoint { ops, op_clocks, .. } => {
101 for (i, op) in ops.iter().enumerate() {
104 let clock = if i < op_clocks.len() {
105 LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
106 } else {
107 entry.clock.clone() };
109 let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
110 self.apply(&synthetic);
111 }
112 }
113 GraphOp::DefineOntology { .. } => {
114 }
116 GraphOp::ExtendOntology { extension } => {
117 if let Err(_e) = self.ontology.merge_extension(extension) {
118 self.quarantined.insert(entry.hash);
119 }
120 }
121 GraphOp::AddNode {
122 node_id,
123 node_type,
124 subtype,
125 label,
126 properties,
127 } => {
128 if let Err(_e) =
130 self.ontology
131 .validate_node(node_type, subtype.as_deref(), properties)
132 {
133 self.quarantined.insert(entry.hash);
134 return;
135 }
136 self.apply_add_node(
137 node_id,
138 node_type,
139 subtype.as_deref(),
140 label,
141 properties,
142 &entry.clock,
143 );
144 }
145 GraphOp::AddEdge {
146 edge_id,
147 edge_type,
148 source_id,
149 target_id,
150 properties,
151 } => {
152 if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
154 self.quarantined.insert(entry.hash);
155 return;
156 }
157 if let (Some(src), Some(tgt)) = (
161 self.nodes.get(source_id.as_str()),
162 self.nodes.get(target_id.as_str()),
163 ) {
164 if self
165 .ontology
166 .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
167 .is_err()
168 {
169 self.quarantined.insert(entry.hash);
170 return;
171 }
172 }
173 self.apply_add_edge(
174 edge_id,
175 edge_type,
176 source_id,
177 target_id,
178 properties,
179 &entry.clock,
180 );
181 }
182 GraphOp::UpdateProperty {
183 entity_id,
184 key,
185 value,
186 } => {
187 self.apply_update_property(entity_id, key, value, &entry.clock);
188 }
189 GraphOp::RemoveNode { node_id } => {
190 self.apply_remove_node(node_id, &entry.clock);
191 }
192 GraphOp::RemoveEdge { edge_id } => {
193 self.apply_remove_edge(edge_id, &entry.clock);
194 }
195 GraphOp::DefineLens { .. } => {
196 }
198 }
199 }
200
201 pub fn apply_all(&mut self, entries: &[&Entry]) {
203 for entry in entries {
204 self.apply(entry);
205 }
206 }
207
208 pub fn rebuild(&mut self, entries: &[&Entry]) {
210 self.nodes.clear();
211 self.edges.clear();
212 self.outgoing.clear();
213 self.incoming.clear();
214 self.by_type.clear();
215 self.quarantined.clear();
216 self.apply_all(entries);
217 }
218
219 pub fn get_node(&self, node_id: &str) -> Option<&Node> {
223 self.nodes.get(node_id).filter(|n| !n.tombstoned)
224 }
225
226 pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
228 self.edges.get(edge_id).filter(|e| !e.tombstoned)
229 }
230
231 pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
234 let mut types = vec![node_type.to_string()];
235 types.extend(
236 self.ontology
237 .descendants(node_type)
238 .into_iter()
239 .map(|s| s.to_string()),
240 );
241 types
242 .iter()
243 .flat_map(|t| self.by_type.get(t.as_str()))
244 .flatten()
245 .filter_map(|id| self.get_node(id))
246 .collect()
247 }
248
249 pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
251 self.nodes
252 .values()
253 .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
254 .collect()
255 }
256
257 pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
259 self.nodes
260 .values()
261 .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
262 .collect()
263 }
264
265 pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
267 match self.outgoing.get(node_id) {
268 Some(edge_ids) => edge_ids
269 .iter()
270 .filter_map(|eid| self.get_edge(eid))
271 .filter(|e| self.is_node_live(&e.target_id))
272 .collect(),
273 None => vec![],
274 }
275 }
276
277 pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
279 match self.incoming.get(node_id) {
280 Some(edge_ids) => edge_ids
281 .iter()
282 .filter_map(|eid| self.get_edge(eid))
283 .filter(|e| self.is_node_live(&e.source_id))
284 .collect(),
285 None => vec![],
286 }
287 }
288
289 pub fn estimated_memory_bytes(&self) -> usize {
294 let mut total = 0;
295 for node in self.nodes.values() {
297 total += node.node_id.len() + node.node_type.len() + node.label.len();
298 total += node.subtype.as_ref().map_or(0, |s| s.len());
299 for (k, v) in &node.properties {
301 total += k.len() + std::mem::size_of_val(v) + 48; }
303 total += 128; }
305 for edge in self.edges.values() {
307 total += edge.edge_id.len() + edge.edge_type.len();
308 total += edge.source_id.len() + edge.target_id.len();
309 for (k, v) in &edge.properties {
310 total += k.len() + std::mem::size_of_val(v) + 48;
311 }
312 total += 128;
313 }
314 for (k, set) in &self.outgoing {
316 total += k.len() + set.len() * 32;
317 }
318 for (k, set) in &self.incoming {
319 total += k.len() + set.len() * 32;
320 }
321 for (k, set) in &self.by_type {
323 total += k.len() + set.len() * 32;
324 }
325 total += self.quarantined.len() * 48;
327 total
328 }
329
330 pub fn all_nodes(&self) -> Vec<&Node> {
332 self.nodes.values().filter(|n| !n.tombstoned).collect()
333 }
334
335 pub fn all_edges(&self) -> Vec<&Edge> {
337 self.edges
338 .values()
339 .filter(|e| {
340 !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
341 })
342 .collect()
343 }
344
345 pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
347 self.outgoing_edges(node_id)
348 .iter()
349 .map(|e| e.target_id.as_str())
350 .collect()
351 }
352
353 pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
355 self.incoming_edges(node_id)
356 .iter()
357 .map(|e| e.source_id.as_str())
358 .collect()
359 }
360
361 fn apply_add_node(
364 &mut self,
365 node_id: &str,
366 node_type: &str,
367 subtype: Option<&str>,
368 label: &str,
369 properties: &BTreeMap<String, Value>,
370 clock: &LamportClock,
371 ) {
372 if let Some(existing) = self.nodes.get_mut(node_id) {
373 existing.tombstoned = false;
375 if clock_wins(clock, &existing.last_add_clock) {
377 existing.last_add_clock = clock.clone();
378 }
379 if clock_wins(clock, &existing.last_clock) {
381 existing.label = label.to_string();
382 existing.subtype = subtype.map(|s| s.to_string());
383 existing.last_clock = clock.clone();
384 }
385 merge_properties_lww(
386 &mut existing.properties,
387 &mut existing.property_clocks,
388 properties,
389 clock,
390 );
391 } else {
392 let property_clocks: HashMap<String, LamportClock> = properties
393 .keys()
394 .map(|k| (k.clone(), clock.clone()))
395 .collect();
396 let node = Node {
397 node_id: node_id.to_string(),
398 node_type: node_type.to_string(),
399 subtype: subtype.map(|s| s.to_string()),
400 label: label.to_string(),
401 properties: properties.clone(),
402 property_clocks,
403 last_clock: clock.clone(),
404 last_add_clock: clock.clone(),
405 tombstoned: false,
406 };
407 self.by_type
408 .entry(node_type.to_string())
409 .or_default()
410 .insert(node_id.to_string());
411 self.nodes.insert(node_id.to_string(), node);
412 }
413 }
414
415 fn apply_add_edge(
416 &mut self,
417 edge_id: &str,
418 edge_type: &str,
419 source_id: &str,
420 target_id: &str,
421 properties: &BTreeMap<String, Value>,
422 clock: &LamportClock,
423 ) {
424 if let Some(existing) = self.edges.get_mut(edge_id) {
425 existing.tombstoned = false;
427 if clock_wins(clock, &existing.last_add_clock) {
428 existing.last_add_clock = clock.clone();
429 }
430 if clock_wins(clock, &existing.last_clock) {
431 existing.last_clock = clock.clone();
432 }
433 merge_properties_lww(
434 &mut existing.properties,
435 &mut existing.property_clocks,
436 properties,
437 clock,
438 );
439 } else {
440 let property_clocks: HashMap<String, LamportClock> = properties
441 .keys()
442 .map(|k| (k.clone(), clock.clone()))
443 .collect();
444 let edge = Edge {
445 edge_id: edge_id.to_string(),
446 edge_type: edge_type.to_string(),
447 source_id: source_id.to_string(),
448 target_id: target_id.to_string(),
449 properties: properties.clone(),
450 property_clocks,
451 last_clock: clock.clone(),
452 last_add_clock: clock.clone(),
453 tombstoned: false,
454 };
455 self.outgoing
456 .entry(source_id.to_string())
457 .or_default()
458 .insert(edge_id.to_string());
459 self.incoming
460 .entry(target_id.to_string())
461 .or_default()
462 .insert(edge_id.to_string());
463 self.edges.insert(edge_id.to_string(), edge);
464 }
465 }
466
467 fn apply_update_property(
468 &mut self,
469 entity_id: &str,
470 key: &str,
471 value: &Value,
472 clock: &LamportClock,
473 ) {
474 if let Some(node) = self.nodes.get_mut(entity_id) {
477 let dominated = node
478 .property_clocks
479 .get(key)
480 .map(|c| clock_wins(clock, c))
481 .unwrap_or(true);
482 if dominated {
483 node.properties.insert(key.to_string(), value.clone());
484 node.property_clocks.insert(key.to_string(), clock.clone());
485 }
486 if clock_wins(clock, &node.last_clock) {
488 node.last_clock = clock.clone();
489 }
490 } else if let Some(edge) = self.edges.get_mut(entity_id) {
491 let dominated = edge
492 .property_clocks
493 .get(key)
494 .map(|c| clock_wins(clock, c))
495 .unwrap_or(true);
496 if dominated {
497 edge.properties.insert(key.to_string(), value.clone());
498 edge.property_clocks.insert(key.to_string(), clock.clone());
499 }
500 if clock_wins(clock, &edge.last_clock) {
501 edge.last_clock = clock.clone();
502 }
503 }
504 }
506
507 fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
508 if let Some(node) = self.nodes.get_mut(node_id) {
509 if clock_wins(clock, &node.last_add_clock) {
513 node.tombstoned = true;
514 node.last_clock = clock.clone();
515 }
516 }
517 }
520
521 fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
522 if let Some(edge) = self.edges.get_mut(edge_id) {
523 if clock_wins(clock, &edge.last_add_clock) {
525 edge.tombstoned = true;
526 edge.last_clock = clock.clone();
527 }
528 }
529 }
530
531 fn is_node_live(&self, node_id: &str) -> bool {
532 self.nodes
533 .get(node_id)
534 .map(|n| !n.tombstoned)
535 .unwrap_or(false)
536 }
537}
538
539fn merge_properties_lww(
542 existing_props: &mut BTreeMap<String, Value>,
543 existing_clocks: &mut HashMap<String, LamportClock>,
544 new_props: &BTreeMap<String, Value>,
545 clock: &LamportClock,
546) {
547 for (k, v) in new_props {
548 let dominated = existing_clocks
549 .get(k)
550 .map(|c| clock_wins(clock, c))
551 .unwrap_or(true);
552 if dominated {
553 existing_props.insert(k.clone(), v.clone());
554 existing_clocks.insert(k.clone(), clock.clone());
555 }
556 }
557}
558
559fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
562 new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
563}
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568 use crate::entry::Entry;
569 use crate::ontology::{EdgeTypeDef, NodeTypeDef};
570
571 fn test_ontology() -> Ontology {
572 Ontology {
573 node_types: BTreeMap::from([
574 (
575 "entity".into(),
576 NodeTypeDef {
577 description: None,
578 properties: BTreeMap::new(),
579 subtypes: None,
580 parent_type: None,
581 },
582 ),
583 (
584 "signal".into(),
585 NodeTypeDef {
586 description: None,
587 properties: BTreeMap::new(),
588 subtypes: None,
589 parent_type: None,
590 },
591 ),
592 ]),
593 edge_types: BTreeMap::from([
594 (
595 "RUNS_ON".into(),
596 EdgeTypeDef {
597 description: None,
598 source_types: vec!["entity".into()],
599 target_types: vec!["entity".into()],
600 properties: BTreeMap::new(),
601 },
602 ),
603 (
604 "OBSERVES".into(),
605 EdgeTypeDef {
606 description: None,
607 source_types: vec!["signal".into()],
608 target_types: vec!["entity".into()],
609 properties: BTreeMap::new(),
610 },
611 ),
612 ]),
613 }
614 }
615
616 fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
617 Entry::new(
618 op,
619 vec![],
620 vec![],
621 LamportClock::with_values(author, clock_time, 0),
622 author,
623 )
624 }
625
626 #[test]
629 fn add_node_appears_in_query() {
630 let mut g = MaterializedGraph::new(test_ontology());
631 let entry = make_entry(
632 GraphOp::AddNode {
633 node_id: "server-1".into(),
634 node_type: "entity".into(),
635 label: "Server 1".into(),
636 properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
637 subtype: None,
638 },
639 1,
640 "inst-a",
641 );
642 g.apply(&entry);
643
644 let node = g.get_node("server-1").unwrap();
645 assert_eq!(node.node_type, "entity");
646 assert_eq!(node.label, "Server 1");
647 assert_eq!(
648 node.properties.get("ip"),
649 Some(&Value::String("10.0.0.1".into()))
650 );
651 }
652
653 #[test]
654 fn add_edge_creates_adjacency() {
655 let mut g = MaterializedGraph::new(test_ontology());
656 g.apply(&make_entry(
657 GraphOp::AddNode {
658 node_id: "svc".into(),
659 node_type: "entity".into(),
660 label: "svc".into(),
661 properties: BTreeMap::new(),
662 subtype: None,
663 },
664 1,
665 "inst-a",
666 ));
667 g.apply(&make_entry(
668 GraphOp::AddNode {
669 node_id: "srv".into(),
670 node_type: "entity".into(),
671 label: "srv".into(),
672 properties: BTreeMap::new(),
673 subtype: None,
674 },
675 2,
676 "inst-a",
677 ));
678 g.apply(&make_entry(
679 GraphOp::AddEdge {
680 edge_id: "e1".into(),
681 edge_type: "RUNS_ON".into(),
682 source_id: "svc".into(),
683 target_id: "srv".into(),
684 properties: BTreeMap::new(),
685 },
686 3,
687 "inst-a",
688 ));
689
690 let out = g.outgoing_edges("svc");
692 assert_eq!(out.len(), 1);
693 assert_eq!(out[0].target_id, "srv");
694
695 let inc = g.incoming_edges("srv");
696 assert_eq!(inc.len(), 1);
697 assert_eq!(inc[0].source_id, "svc");
698
699 assert_eq!(g.neighbors("svc"), vec!["srv"]);
700 }
701
702 #[test]
703 fn update_property_reflected() {
704 let mut g = MaterializedGraph::new(test_ontology());
705 g.apply(&make_entry(
706 GraphOp::AddNode {
707 node_id: "s1".into(),
708 node_type: "entity".into(),
709 label: "s1".into(),
710 properties: BTreeMap::new(),
711 subtype: None,
712 },
713 1,
714 "inst-a",
715 ));
716 g.apply(&make_entry(
717 GraphOp::UpdateProperty {
718 entity_id: "s1".into(),
719 key: "cpu".into(),
720 value: Value::Float(85.5),
721 },
722 2,
723 "inst-a",
724 ));
725
726 let node = g.get_node("s1").unwrap();
727 assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
728 }
729
730 #[test]
731 fn remove_node_cascades_edges() {
732 let mut g = MaterializedGraph::new(test_ontology());
733 g.apply(&make_entry(
734 GraphOp::AddNode {
735 node_id: "a".into(),
736 node_type: "entity".into(),
737 label: "a".into(),
738 properties: BTreeMap::new(),
739 subtype: None,
740 },
741 1,
742 "inst-a",
743 ));
744 g.apply(&make_entry(
745 GraphOp::AddNode {
746 node_id: "b".into(),
747 node_type: "entity".into(),
748 label: "b".into(),
749 properties: BTreeMap::new(),
750 subtype: None,
751 },
752 2,
753 "inst-a",
754 ));
755 g.apply(&make_entry(
756 GraphOp::AddEdge {
757 edge_id: "e1".into(),
758 edge_type: "RUNS_ON".into(),
759 source_id: "a".into(),
760 target_id: "b".into(),
761 properties: BTreeMap::new(),
762 },
763 3,
764 "inst-a",
765 ));
766 assert_eq!(g.all_edges().len(), 1);
767
768 g.apply(&make_entry(
770 GraphOp::RemoveNode {
771 node_id: "b".into(),
772 },
773 4,
774 "inst-a",
775 ));
776 assert!(g.get_node("b").is_none());
777 assert_eq!(g.all_edges().len(), 0);
779 assert_eq!(g.outgoing_edges("a").len(), 0);
781 }
782
783 #[test]
784 fn remove_edge_preserves_nodes() {
785 let mut g = MaterializedGraph::new(test_ontology());
786 g.apply(&make_entry(
787 GraphOp::AddNode {
788 node_id: "a".into(),
789 node_type: "entity".into(),
790 label: "a".into(),
791 properties: BTreeMap::new(),
792 subtype: None,
793 },
794 1,
795 "inst-a",
796 ));
797 g.apply(&make_entry(
798 GraphOp::AddNode {
799 node_id: "b".into(),
800 node_type: "entity".into(),
801 label: "b".into(),
802 properties: BTreeMap::new(),
803 subtype: None,
804 },
805 2,
806 "inst-a",
807 ));
808 g.apply(&make_entry(
809 GraphOp::AddEdge {
810 edge_id: "e1".into(),
811 edge_type: "RUNS_ON".into(),
812 source_id: "a".into(),
813 target_id: "b".into(),
814 properties: BTreeMap::new(),
815 },
816 3,
817 "inst-a",
818 ));
819 g.apply(&make_entry(
820 GraphOp::RemoveEdge {
821 edge_id: "e1".into(),
822 },
823 4,
824 "inst-a",
825 ));
826
827 assert!(g.get_node("a").is_some());
829 assert!(g.get_node("b").is_some());
830 assert!(g.get_edge("e1").is_none());
832 assert_eq!(g.all_edges().len(), 0);
833 }
834
835 #[test]
836 fn query_by_type_filters() {
837 let mut g = MaterializedGraph::new(test_ontology());
838 g.apply(&make_entry(
839 GraphOp::AddNode {
840 node_id: "s1".into(),
841 node_type: "entity".into(),
842 label: "s1".into(),
843 properties: BTreeMap::new(),
844 subtype: None,
845 },
846 1,
847 "inst-a",
848 ));
849 g.apply(&make_entry(
850 GraphOp::AddNode {
851 node_id: "s2".into(),
852 node_type: "entity".into(),
853 label: "s2".into(),
854 properties: BTreeMap::new(),
855 subtype: None,
856 },
857 2,
858 "inst-a",
859 ));
860 g.apply(&make_entry(
861 GraphOp::AddNode {
862 node_id: "alert".into(),
863 node_type: "signal".into(),
864 label: "alert".into(),
865 properties: BTreeMap::new(),
866 subtype: None,
867 },
868 3,
869 "inst-a",
870 ));
871
872 let entities = g.nodes_by_type("entity");
873 assert_eq!(entities.len(), 2);
874 let signals = g.nodes_by_type("signal");
875 assert_eq!(signals.len(), 1);
876 assert_eq!(signals[0].node_id, "alert");
877 }
878
879 #[test]
880 fn query_by_property_filters() {
881 let mut g = MaterializedGraph::new(test_ontology());
882 g.apply(&make_entry(
883 GraphOp::AddNode {
884 node_id: "s1".into(),
885 node_type: "entity".into(),
886 label: "s1".into(),
887 properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
888 subtype: None,
889 },
890 1,
891 "inst-a",
892 ));
893 g.apply(&make_entry(
894 GraphOp::AddNode {
895 node_id: "s2".into(),
896 node_type: "entity".into(),
897 label: "s2".into(),
898 properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
899 subtype: None,
900 },
901 2,
902 "inst-a",
903 ));
904
905 let alive = g.nodes_by_property("status", &Value::String("alive".into()));
906 assert_eq!(alive.len(), 1);
907 assert_eq!(alive[0].node_id, "s1");
908 }
909
910 #[test]
911 fn materialization_from_empty() {
912 let mut g1 = MaterializedGraph::new(test_ontology());
914 let entries = vec![
915 make_entry(
916 GraphOp::DefineOntology {
917 ontology: test_ontology(),
918 },
919 0,
920 "inst-a",
921 ),
922 make_entry(
923 GraphOp::AddNode {
924 node_id: "a".into(),
925 node_type: "entity".into(),
926 label: "a".into(),
927 properties: BTreeMap::new(),
928 subtype: None,
929 },
930 1,
931 "inst-a",
932 ),
933 make_entry(
934 GraphOp::AddNode {
935 node_id: "b".into(),
936 node_type: "entity".into(),
937 label: "b".into(),
938 properties: BTreeMap::new(),
939 subtype: None,
940 },
941 2,
942 "inst-a",
943 ),
944 make_entry(
945 GraphOp::AddEdge {
946 edge_id: "e1".into(),
947 edge_type: "RUNS_ON".into(),
948 source_id: "a".into(),
949 target_id: "b".into(),
950 properties: BTreeMap::new(),
951 },
952 3,
953 "inst-a",
954 ),
955 ];
956 for e in &entries {
957 g1.apply(e);
958 }
959
960 let mut g2 = MaterializedGraph::new(test_ontology());
962 let refs: Vec<&Entry> = entries.iter().collect();
963 g2.rebuild(&refs);
964
965 assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
967 assert_eq!(g1.all_edges().len(), g2.all_edges().len());
968 for node in g1.all_nodes() {
969 let n2 = g2.get_node(&node.node_id).unwrap();
970 assert_eq!(node.node_type, n2.node_type);
971 assert_eq!(node.properties, n2.properties);
972 }
973 }
974
975 #[test]
976 fn incremental_equals_full() {
977 let entries = vec![
978 make_entry(
979 GraphOp::DefineOntology {
980 ontology: test_ontology(),
981 },
982 0,
983 "inst-a",
984 ),
985 make_entry(
986 GraphOp::AddNode {
987 node_id: "a".into(),
988 node_type: "entity".into(),
989 label: "a".into(),
990 properties: BTreeMap::from([("x".into(), Value::Int(1))]),
991 subtype: None,
992 },
993 1,
994 "inst-a",
995 ),
996 make_entry(
997 GraphOp::UpdateProperty {
998 entity_id: "a".into(),
999 key: "x".into(),
1000 value: Value::Int(2),
1001 },
1002 2,
1003 "inst-a",
1004 ),
1005 make_entry(
1006 GraphOp::AddNode {
1007 node_id: "b".into(),
1008 node_type: "entity".into(),
1009 label: "b".into(),
1010 properties: BTreeMap::new(),
1011 subtype: None,
1012 },
1013 3,
1014 "inst-a",
1015 ),
1016 make_entry(
1017 GraphOp::AddEdge {
1018 edge_id: "e1".into(),
1019 edge_type: "RUNS_ON".into(),
1020 source_id: "a".into(),
1021 target_id: "b".into(),
1022 properties: BTreeMap::new(),
1023 },
1024 4,
1025 "inst-a",
1026 ),
1027 make_entry(
1028 GraphOp::RemoveEdge {
1029 edge_id: "e1".into(),
1030 },
1031 5,
1032 "inst-a",
1033 ),
1034 ];
1035
1036 let mut g_inc = MaterializedGraph::new(test_ontology());
1038 for e in &entries {
1039 g_inc.apply(e);
1040 }
1041
1042 let mut g_full = MaterializedGraph::new(test_ontology());
1044 let refs: Vec<&Entry> = entries.iter().collect();
1045 g_full.rebuild(&refs);
1046
1047 assert_eq!(
1049 g_inc.get_node("a").unwrap().properties.get("x"),
1050 Some(&Value::Int(2))
1051 );
1052 assert_eq!(
1053 g_full.get_node("a").unwrap().properties.get("x"),
1054 Some(&Value::Int(2))
1055 );
1056 assert_eq!(g_inc.all_edges().len(), 0);
1058 assert_eq!(g_full.all_edges().len(), 0);
1059 }
1060
1061 #[test]
1062 fn lww_concurrent_property_update() {
1063 let mut g = MaterializedGraph::new(test_ontology());
1065 g.apply(&make_entry(
1066 GraphOp::AddNode {
1067 node_id: "s1".into(),
1068 node_type: "entity".into(),
1069 label: "s1".into(),
1070 properties: BTreeMap::new(),
1071 subtype: None,
1072 },
1073 1,
1074 "inst-a",
1075 ));
1076 g.apply(&make_entry(
1078 GraphOp::UpdateProperty {
1079 entity_id: "s1".into(),
1080 key: "status".into(),
1081 value: Value::String("alive".into()),
1082 },
1083 2,
1084 "inst-a",
1085 ));
1086 g.apply(&make_entry(
1088 GraphOp::UpdateProperty {
1089 entity_id: "s1".into(),
1090 key: "status".into(),
1091 value: Value::String("dead".into()),
1092 },
1093 3,
1094 "inst-b",
1095 ));
1096 assert_eq!(
1097 g.get_node("s1").unwrap().properties.get("status"),
1098 Some(&Value::String("dead".into()))
1099 );
1100 }
1101
1102 #[test]
1103 fn lww_tiebreak_by_instance_id() {
1104 let mut g = MaterializedGraph::new(test_ontology());
1106 g.apply(&make_entry(
1107 GraphOp::AddNode {
1108 node_id: "s1".into(),
1109 node_type: "entity".into(),
1110 label: "s1".into(),
1111 properties: BTreeMap::new(),
1112 subtype: None,
1113 },
1114 1,
1115 "inst-a",
1116 ));
1117 g.apply(&make_entry(
1119 GraphOp::UpdateProperty {
1120 entity_id: "s1".into(),
1121 key: "x".into(),
1122 value: Value::Int(1),
1123 },
1124 5,
1125 "inst-a",
1126 ));
1127 g.apply(&make_entry(
1128 GraphOp::UpdateProperty {
1129 entity_id: "s1".into(),
1130 key: "x".into(),
1131 value: Value::Int(2),
1132 },
1133 5,
1134 "inst-b",
1135 ));
1136 assert_eq!(
1138 g.get_node("s1").unwrap().properties.get("x"),
1139 Some(&Value::Int(1))
1140 );
1141 }
1142
1143 #[test]
1144 fn lww_per_property_concurrent_different_keys() {
1145 let mut g = MaterializedGraph::new(test_ontology());
1149 g.apply(&make_entry(
1150 GraphOp::AddNode {
1151 node_id: "s1".into(),
1152 node_type: "entity".into(),
1153 label: "s1".into(),
1154 properties: BTreeMap::from([
1155 ("x".into(), Value::Int(0)),
1156 ("y".into(), Value::Int(0)),
1157 ]),
1158 subtype: None,
1159 },
1160 1,
1161 "inst-a",
1162 ));
1163 g.apply(&make_entry(
1165 GraphOp::UpdateProperty {
1166 entity_id: "s1".into(),
1167 key: "x".into(),
1168 value: Value::Int(42),
1169 },
1170 3,
1171 "inst-a",
1172 ));
1173 g.apply(&make_entry(
1175 GraphOp::UpdateProperty {
1176 entity_id: "s1".into(),
1177 key: "y".into(),
1178 value: Value::Int(99),
1179 },
1180 3,
1181 "inst-b",
1182 ));
1183
1184 let node = g.get_node("s1").unwrap();
1185 assert_eq!(
1187 node.properties.get("x"),
1188 Some(&Value::Int(42)),
1189 "update to 'x' must not be rejected by concurrent update to 'y'"
1190 );
1191 assert_eq!(
1192 node.properties.get("y"),
1193 Some(&Value::Int(99)),
1194 "update to 'y' must not be rejected by concurrent update to 'x'"
1195 );
1196 }
1197
1198 #[test]
1199 fn lww_per_property_order_independent() {
1200 let mut g = MaterializedGraph::new(test_ontology());
1202 g.apply(&make_entry(
1203 GraphOp::AddNode {
1204 node_id: "s1".into(),
1205 node_type: "entity".into(),
1206 label: "s1".into(),
1207 properties: BTreeMap::from([
1208 ("x".into(), Value::Int(0)),
1209 ("y".into(), Value::Int(0)),
1210 ]),
1211 subtype: None,
1212 },
1213 1,
1214 "inst-a",
1215 ));
1216 g.apply(&make_entry(
1218 GraphOp::UpdateProperty {
1219 entity_id: "s1".into(),
1220 key: "y".into(),
1221 value: Value::Int(99),
1222 },
1223 3,
1224 "inst-b",
1225 ));
1226 g.apply(&make_entry(
1227 GraphOp::UpdateProperty {
1228 entity_id: "s1".into(),
1229 key: "x".into(),
1230 value: Value::Int(42),
1231 },
1232 3,
1233 "inst-a",
1234 ));
1235
1236 let node = g.get_node("s1").unwrap();
1237 assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1238 assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1239 }
1240
1241 #[test]
1242 fn add_wins_over_remove() {
1243 let mut g = MaterializedGraph::new(test_ontology());
1245 g.apply(&make_entry(
1246 GraphOp::AddNode {
1247 node_id: "s1".into(),
1248 node_type: "entity".into(),
1249 label: "s1".into(),
1250 properties: BTreeMap::new(),
1251 subtype: None,
1252 },
1253 1,
1254 "inst-a",
1255 ));
1256 g.apply(&make_entry(
1258 GraphOp::RemoveNode {
1259 node_id: "s1".into(),
1260 },
1261 2,
1262 "inst-a",
1263 ));
1264 assert!(g.get_node("s1").is_none());
1265
1266 g.apply(&make_entry(
1268 GraphOp::AddNode {
1269 node_id: "s1".into(),
1270 node_type: "entity".into(),
1271 label: "s1 v2".into(),
1272 properties: BTreeMap::new(),
1273 subtype: None,
1274 },
1275 3,
1276 "inst-b",
1277 ));
1278 let node = g.get_node("s1").unwrap();
1279 assert_eq!(node.label, "s1 v2");
1280 assert!(!node.tombstoned);
1281 }
1282}