1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
/// A node of the materialized graph together with the clock bookkeeping used
/// to merge concurrent operations last-writer-wins (LWW).
#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    /// Identifier of the node; also its key in `MaterializedGraph::nodes`.
    pub node_id: String,
    /// Type name recorded when the node is first added; also the key under
    /// which the node is indexed in `MaterializedGraph::by_type`.
    pub node_type: String,
    /// Optional subtype; replaced by a later add whose clock beats `last_clock`.
    pub subtype: Option<String>,
    /// Display label; replaced by a later add whose clock beats `last_clock`.
    pub label: String,
    /// Current property values, merged per key.
    pub properties: BTreeMap<String, Value>,
    /// For each property key, the clock of the write that currently holds it.
    pub property_clocks: HashMap<String, LamportClock>,
    /// Clock of the most recent add/update that advanced it, or of the
    /// removal that tombstoned the node.
    pub last_clock: LamportClock,
    /// Newest add clock seen for this node; a removal only takes effect when
    /// its clock is newer than this.
    pub last_add_clock: LamportClock,
    /// `true` once a removal has won; cleared again by any later add.
    /// Tombstoned nodes stay in the map but are filtered out by the query methods.
    pub tombstoned: bool,
}
29
/// A directed edge of the materialized graph together with the clock
/// bookkeeping used to merge concurrent operations last-writer-wins (LWW).
#[derive(Debug, Clone, PartialEq)]
pub struct Edge {
    /// Identifier of the edge; also its key in `MaterializedGraph::edges`.
    pub edge_id: String,
    /// Type name recorded when the edge is first added.
    pub edge_type: String,
    /// ID of the node the edge starts at (key into `MaterializedGraph::outgoing`).
    pub source_id: String,
    /// ID of the node the edge points to (key into `MaterializedGraph::incoming`).
    pub target_id: String,
    /// Current property values, merged per key.
    pub properties: BTreeMap<String, Value>,
    /// For each property key, the clock of the write that currently holds it.
    pub property_clocks: HashMap<String, LamportClock>,
    /// Clock of the most recent add/update that advanced it, or of the
    /// removal that tombstoned the edge.
    pub last_clock: LamportClock,
    /// Newest add clock seen for this edge; a removal only takes effect when
    /// its clock is newer than this.
    pub last_add_clock: LamportClock,
    /// `true` once a removal has won; cleared again by any later add.
    pub tombstoned: bool,
}
45
/// In-memory view of the graph obtained by applying log entries in order.
///
/// Removed nodes and edges are kept as tombstones inside `nodes` / `edges`;
/// the query methods are responsible for hiding them.
pub struct MaterializedGraph {
    /// Every node ever added, keyed by node ID (tombstoned ones included).
    pub nodes: HashMap<String, Node>,
    /// Every edge ever added, keyed by edge ID (tombstoned ones included).
    pub edges: HashMap<String, Edge>,
    /// Source node ID -> IDs of the edges that start at that node.
    pub outgoing: HashMap<String, HashSet<String>>,
    /// Target node ID -> IDs of the edges that end at that node.
    pub incoming: HashMap<String, HashSet<String>>,
    /// Node type name -> IDs of the nodes first added with that type.
    pub by_type: HashMap<String, HashSet<String>>,
    /// Ontology used to validate adds; grows when extension entries are applied.
    pub ontology: Ontology,
    /// Hashes of entries that failed ontology validation and were skipped.
    pub quarantined: HashSet<Hash>,
}
77
78impl MaterializedGraph {
79 pub fn new(ontology: Ontology) -> Self {
81 Self {
82 nodes: HashMap::new(),
83 edges: HashMap::new(),
84 outgoing: HashMap::new(),
85 incoming: HashMap::new(),
86 by_type: HashMap::new(),
87 ontology,
88 quarantined: HashSet::new(),
89 }
90 }
91
92 pub fn apply(&mut self, entry: &Entry) {
99 match &entry.payload {
100 GraphOp::Checkpoint { ops, op_clocks, .. } => {
101 for (i, op) in ops.iter().enumerate() {
104 let clock = if i < op_clocks.len() {
105 LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
106 } else {
107 entry.clock.clone() };
109 let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
110 self.apply(&synthetic);
111 }
112 }
113 GraphOp::DefineOntology { .. } => {
114 }
116 GraphOp::ExtendOntology { extension } => {
117 if let Err(_e) = self.ontology.merge_extension(extension) {
118 self.quarantined.insert(entry.hash);
119 }
120 }
121 GraphOp::AddNode {
122 node_id,
123 node_type,
124 subtype,
125 label,
126 properties,
127 } => {
128 if let Err(_e) =
130 self.ontology
131 .validate_node(node_type, subtype.as_deref(), properties)
132 {
133 self.quarantined.insert(entry.hash);
134 return;
135 }
136 self.apply_add_node(
137 node_id,
138 node_type,
139 subtype.as_deref(),
140 label,
141 properties,
142 &entry.clock,
143 );
144 }
145 GraphOp::AddEdge {
146 edge_id,
147 edge_type,
148 source_id,
149 target_id,
150 properties,
151 } => {
152 if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
154 self.quarantined.insert(entry.hash);
155 return;
156 }
157 if let (Some(src), Some(tgt)) = (
161 self.nodes.get(source_id.as_str()),
162 self.nodes.get(target_id.as_str()),
163 ) {
164 if self
165 .ontology
166 .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
167 .is_err()
168 {
169 self.quarantined.insert(entry.hash);
170 return;
171 }
172 }
173 self.apply_add_edge(
174 edge_id,
175 edge_type,
176 source_id,
177 target_id,
178 properties,
179 &entry.clock,
180 );
181 }
182 GraphOp::UpdateProperty {
183 entity_id,
184 key,
185 value,
186 } => {
187 self.apply_update_property(entity_id, key, value, &entry.clock);
188 }
189 GraphOp::RemoveNode { node_id } => {
190 self.apply_remove_node(node_id, &entry.clock);
191 }
192 GraphOp::RemoveEdge { edge_id } => {
193 self.apply_remove_edge(edge_id, &entry.clock);
194 }
195 }
196 }
197
198 pub fn apply_all(&mut self, entries: &[&Entry]) {
200 for entry in entries {
201 self.apply(entry);
202 }
203 }
204
205 pub fn rebuild(&mut self, entries: &[&Entry]) {
207 self.nodes.clear();
208 self.edges.clear();
209 self.outgoing.clear();
210 self.incoming.clear();
211 self.by_type.clear();
212 self.quarantined.clear();
213 self.apply_all(entries);
214 }
215
216 pub fn get_node(&self, node_id: &str) -> Option<&Node> {
220 self.nodes.get(node_id).filter(|n| !n.tombstoned)
221 }
222
223 pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
225 self.edges.get(edge_id).filter(|e| !e.tombstoned)
226 }
227
228 pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
231 let mut types = vec![node_type.to_string()];
232 types.extend(
233 self.ontology
234 .descendants(node_type)
235 .into_iter()
236 .map(|s| s.to_string()),
237 );
238 types
239 .iter()
240 .flat_map(|t| self.by_type.get(t.as_str()))
241 .flatten()
242 .filter_map(|id| self.get_node(id))
243 .collect()
244 }
245
246 pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
248 self.nodes
249 .values()
250 .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
251 .collect()
252 }
253
254 pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
256 self.nodes
257 .values()
258 .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
259 .collect()
260 }
261
262 pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
264 match self.outgoing.get(node_id) {
265 Some(edge_ids) => edge_ids
266 .iter()
267 .filter_map(|eid| self.get_edge(eid))
268 .filter(|e| self.is_node_live(&e.target_id))
269 .collect(),
270 None => vec![],
271 }
272 }
273
274 pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
276 match self.incoming.get(node_id) {
277 Some(edge_ids) => edge_ids
278 .iter()
279 .filter_map(|eid| self.get_edge(eid))
280 .filter(|e| self.is_node_live(&e.source_id))
281 .collect(),
282 None => vec![],
283 }
284 }
285
286 pub fn estimated_memory_bytes(&self) -> usize {
291 let mut total = 0;
292 for node in self.nodes.values() {
294 total += node.node_id.len() + node.node_type.len() + node.label.len();
295 total += node.subtype.as_ref().map_or(0, |s| s.len());
296 for (k, v) in &node.properties {
298 total += k.len() + std::mem::size_of_val(v) + 48; }
300 total += 128; }
302 for edge in self.edges.values() {
304 total += edge.edge_id.len() + edge.edge_type.len();
305 total += edge.source_id.len() + edge.target_id.len();
306 for (k, v) in &edge.properties {
307 total += k.len() + std::mem::size_of_val(v) + 48;
308 }
309 total += 128;
310 }
311 for (k, set) in &self.outgoing {
313 total += k.len() + set.len() * 32;
314 }
315 for (k, set) in &self.incoming {
316 total += k.len() + set.len() * 32;
317 }
318 for (k, set) in &self.by_type {
320 total += k.len() + set.len() * 32;
321 }
322 total += self.quarantined.len() * 48;
324 total
325 }
326
327 pub fn all_nodes(&self) -> Vec<&Node> {
329 self.nodes.values().filter(|n| !n.tombstoned).collect()
330 }
331
332 pub fn all_edges(&self) -> Vec<&Edge> {
334 self.edges
335 .values()
336 .filter(|e| {
337 !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
338 })
339 .collect()
340 }
341
342 pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
344 self.outgoing_edges(node_id)
345 .iter()
346 .map(|e| e.target_id.as_str())
347 .collect()
348 }
349
350 pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
352 self.incoming_edges(node_id)
353 .iter()
354 .map(|e| e.source_id.as_str())
355 .collect()
356 }
357
358 fn apply_add_node(
361 &mut self,
362 node_id: &str,
363 node_type: &str,
364 subtype: Option<&str>,
365 label: &str,
366 properties: &BTreeMap<String, Value>,
367 clock: &LamportClock,
368 ) {
369 if let Some(existing) = self.nodes.get_mut(node_id) {
370 existing.tombstoned = false;
372 if clock_wins(clock, &existing.last_add_clock) {
374 existing.last_add_clock = clock.clone();
375 }
376 if clock_wins(clock, &existing.last_clock) {
378 existing.label = label.to_string();
379 existing.subtype = subtype.map(|s| s.to_string());
380 existing.last_clock = clock.clone();
381 }
382 merge_properties_lww(
383 &mut existing.properties,
384 &mut existing.property_clocks,
385 properties,
386 clock,
387 );
388 } else {
389 let property_clocks: HashMap<String, LamportClock> = properties
390 .keys()
391 .map(|k| (k.clone(), clock.clone()))
392 .collect();
393 let node = Node {
394 node_id: node_id.to_string(),
395 node_type: node_type.to_string(),
396 subtype: subtype.map(|s| s.to_string()),
397 label: label.to_string(),
398 properties: properties.clone(),
399 property_clocks,
400 last_clock: clock.clone(),
401 last_add_clock: clock.clone(),
402 tombstoned: false,
403 };
404 self.by_type
405 .entry(node_type.to_string())
406 .or_default()
407 .insert(node_id.to_string());
408 self.nodes.insert(node_id.to_string(), node);
409 }
410 }
411
412 fn apply_add_edge(
413 &mut self,
414 edge_id: &str,
415 edge_type: &str,
416 source_id: &str,
417 target_id: &str,
418 properties: &BTreeMap<String, Value>,
419 clock: &LamportClock,
420 ) {
421 if let Some(existing) = self.edges.get_mut(edge_id) {
422 existing.tombstoned = false;
424 if clock_wins(clock, &existing.last_add_clock) {
425 existing.last_add_clock = clock.clone();
426 }
427 if clock_wins(clock, &existing.last_clock) {
428 existing.last_clock = clock.clone();
429 }
430 merge_properties_lww(
431 &mut existing.properties,
432 &mut existing.property_clocks,
433 properties,
434 clock,
435 );
436 } else {
437 let property_clocks: HashMap<String, LamportClock> = properties
438 .keys()
439 .map(|k| (k.clone(), clock.clone()))
440 .collect();
441 let edge = Edge {
442 edge_id: edge_id.to_string(),
443 edge_type: edge_type.to_string(),
444 source_id: source_id.to_string(),
445 target_id: target_id.to_string(),
446 properties: properties.clone(),
447 property_clocks,
448 last_clock: clock.clone(),
449 last_add_clock: clock.clone(),
450 tombstoned: false,
451 };
452 self.outgoing
453 .entry(source_id.to_string())
454 .or_default()
455 .insert(edge_id.to_string());
456 self.incoming
457 .entry(target_id.to_string())
458 .or_default()
459 .insert(edge_id.to_string());
460 self.edges.insert(edge_id.to_string(), edge);
461 }
462 }
463
464 fn apply_update_property(
465 &mut self,
466 entity_id: &str,
467 key: &str,
468 value: &Value,
469 clock: &LamportClock,
470 ) {
471 if let Some(node) = self.nodes.get_mut(entity_id) {
474 let dominated = node
475 .property_clocks
476 .get(key)
477 .map(|c| clock_wins(clock, c))
478 .unwrap_or(true);
479 if dominated {
480 node.properties.insert(key.to_string(), value.clone());
481 node.property_clocks.insert(key.to_string(), clock.clone());
482 }
483 if clock_wins(clock, &node.last_clock) {
485 node.last_clock = clock.clone();
486 }
487 } else if let Some(edge) = self.edges.get_mut(entity_id) {
488 let dominated = edge
489 .property_clocks
490 .get(key)
491 .map(|c| clock_wins(clock, c))
492 .unwrap_or(true);
493 if dominated {
494 edge.properties.insert(key.to_string(), value.clone());
495 edge.property_clocks.insert(key.to_string(), clock.clone());
496 }
497 if clock_wins(clock, &edge.last_clock) {
498 edge.last_clock = clock.clone();
499 }
500 }
501 }
503
504 fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
505 if let Some(node) = self.nodes.get_mut(node_id) {
506 if clock_wins(clock, &node.last_add_clock) {
510 node.tombstoned = true;
511 node.last_clock = clock.clone();
512 }
513 }
514 }
517
518 fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
519 if let Some(edge) = self.edges.get_mut(edge_id) {
520 if clock_wins(clock, &edge.last_add_clock) {
522 edge.tombstoned = true;
523 edge.last_clock = clock.clone();
524 }
525 }
526 }
527
528 fn is_node_live(&self, node_id: &str) -> bool {
529 self.nodes
530 .get(node_id)
531 .map(|n| !n.tombstoned)
532 .unwrap_or(false)
533 }
534}
535
536fn merge_properties_lww(
539 existing_props: &mut BTreeMap<String, Value>,
540 existing_clocks: &mut HashMap<String, LamportClock>,
541 new_props: &BTreeMap<String, Value>,
542 clock: &LamportClock,
543) {
544 for (k, v) in new_props {
545 let dominated = existing_clocks
546 .get(k)
547 .map(|c| clock_wins(clock, c))
548 .unwrap_or(true);
549 if dominated {
550 existing_props.insert(k.clone(), v.clone());
551 existing_clocks.insert(k.clone(), clock.clone());
552 }
553 }
554}
555
556fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
559 new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entry::Entry;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef};

    /// A node type with no description, properties, subtypes or parent.
    fn plain_node_type() -> NodeTypeDef {
        NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        }
    }

    /// An edge type allowed only from `source` nodes to `target` nodes.
    fn edge_type_between(source: &str, target: &str) -> EdgeTypeDef {
        EdgeTypeDef {
            description: None,
            source_types: vec![source.into()],
            target_types: vec![target.into()],
            properties: BTreeMap::new(),
        }
    }

    /// Ontology shared by all tests: two node types and two edge types.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([
                ("entity".into(), plain_node_type()),
                ("signal".into(), plain_node_type()),
            ]),
            edge_types: BTreeMap::from([
                ("RUNS_ON".into(), edge_type_between("entity", "entity")),
                ("OBSERVES".into(), edge_type_between("signal", "entity")),
            ]),
        }
    }

    /// Wraps `op` in an entry without parents, stamped `(author, clock_time, 0)`.
    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
        let clock = LamportClock::with_values(author, clock_time, 0);
        Entry::new(op, vec![], vec![], clock, author)
    }

    /// Fresh graph over the shared test ontology.
    fn new_graph() -> MaterializedGraph {
        MaterializedGraph::new(test_ontology())
    }

    /// `AddNode` op without a subtype.
    fn add_node(
        id: &str,
        node_type: &str,
        label: &str,
        properties: BTreeMap<String, Value>,
    ) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: node_type.into(),
            label: label.into(),
            properties,
            subtype: None,
        }
    }

    /// `AddNode` op for a property-less `entity` whose label equals its ID.
    fn add_entity(id: &str) -> GraphOp {
        add_node(id, "entity", id, BTreeMap::new())
    }

    /// `AddEdge` op of type `RUNS_ON` without properties.
    fn add_runs_on(id: &str, source: &str, target: &str) -> GraphOp {
        GraphOp::AddEdge {
            edge_id: id.into(),
            edge_type: "RUNS_ON".into(),
            source_id: source.into(),
            target_id: target.into(),
            properties: BTreeMap::new(),
        }
    }

    /// `UpdateProperty` op.
    fn set_property(entity: &str, key: &str, value: Value) -> GraphOp {
        GraphOp::UpdateProperty {
            entity_id: entity.into(),
            key: key.into(),
            value,
        }
    }

    #[test]
    fn add_node_appears_in_query() {
        let mut g = new_graph();
        let props = BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]);
        let entry = make_entry(add_node("server-1", "entity", "Server 1", props), 1, "inst-a");
        g.apply(&entry);

        let node = g.get_node("server-1").unwrap();
        assert_eq!(node.node_type, "entity");
        assert_eq!(node.label, "Server 1");
        assert_eq!(
            node.properties.get("ip"),
            Some(&Value::String("10.0.0.1".into()))
        );
    }

    #[test]
    fn add_edge_creates_adjacency() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("svc"), 1, "inst-a"));
        g.apply(&make_entry(add_entity("srv"), 2, "inst-a"));
        g.apply(&make_entry(add_runs_on("e1", "svc", "srv"), 3, "inst-a"));

        let out = g.outgoing_edges("svc");
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].target_id, "srv");

        let inc = g.incoming_edges("srv");
        assert_eq!(inc.len(), 1);
        assert_eq!(inc[0].source_id, "svc");

        assert_eq!(g.neighbors("svc"), vec!["srv"]);
    }

    #[test]
    fn update_property_reflected() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("s1"), 1, "inst-a"));
        g.apply(&make_entry(
            set_property("s1", "cpu", Value::Float(85.5)),
            2,
            "inst-a",
        ));

        let node = g.get_node("s1").unwrap();
        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
    }

    #[test]
    fn remove_node_cascades_edges() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("a"), 1, "inst-a"));
        g.apply(&make_entry(add_entity("b"), 2, "inst-a"));
        g.apply(&make_entry(add_runs_on("e1", "a", "b"), 3, "inst-a"));
        assert_eq!(g.all_edges().len(), 1);

        // Removing an endpoint must hide the edge from every query.
        g.apply(&make_entry(
            GraphOp::RemoveNode {
                node_id: "b".into(),
            },
            4,
            "inst-a",
        ));
        assert!(g.get_node("b").is_none());
        assert_eq!(g.all_edges().len(), 0);
        assert_eq!(g.outgoing_edges("a").len(), 0);
    }

    #[test]
    fn remove_edge_preserves_nodes() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("a"), 1, "inst-a"));
        g.apply(&make_entry(add_entity("b"), 2, "inst-a"));
        g.apply(&make_entry(add_runs_on("e1", "a", "b"), 3, "inst-a"));
        g.apply(&make_entry(
            GraphOp::RemoveEdge {
                edge_id: "e1".into(),
            },
            4,
            "inst-a",
        ));

        assert!(g.get_node("a").is_some());
        assert!(g.get_node("b").is_some());
        assert!(g.get_edge("e1").is_none());
        assert_eq!(g.all_edges().len(), 0);
    }

    #[test]
    fn query_by_type_filters() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("s1"), 1, "inst-a"));
        g.apply(&make_entry(add_entity("s2"), 2, "inst-a"));
        g.apply(&make_entry(
            add_node("alert", "signal", "alert", BTreeMap::new()),
            3,
            "inst-a",
        ));

        let entities = g.nodes_by_type("entity");
        assert_eq!(entities.len(), 2);
        let signals = g.nodes_by_type("signal");
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].node_id, "alert");
    }

    #[test]
    fn query_by_property_filters() {
        let mut g = new_graph();
        let alive_props = BTreeMap::from([("status".into(), Value::String("alive".into()))]);
        let dead_props = BTreeMap::from([("status".into(), Value::String("dead".into()))]);
        g.apply(&make_entry(
            add_node("s1", "entity", "s1", alive_props),
            1,
            "inst-a",
        ));
        g.apply(&make_entry(
            add_node("s2", "entity", "s2", dead_props),
            2,
            "inst-a",
        ));

        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
        assert_eq!(alive.len(), 1);
        assert_eq!(alive[0].node_id, "s1");
    }

    #[test]
    fn materialization_from_empty() {
        let entries = vec![
            make_entry(
                GraphOp::DefineOntology {
                    ontology: test_ontology(),
                },
                0,
                "inst-a",
            ),
            make_entry(add_entity("a"), 1, "inst-a"),
            make_entry(add_entity("b"), 2, "inst-a"),
            make_entry(add_runs_on("e1", "a", "b"), 3, "inst-a"),
        ];

        // Incremental application ...
        let mut g1 = new_graph();
        for e in &entries {
            g1.apply(e);
        }

        // ... must match a rebuild from the same entries.
        let mut g2 = new_graph();
        let refs: Vec<&Entry> = entries.iter().collect();
        g2.rebuild(&refs);

        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
        for node in g1.all_nodes() {
            let n2 = g2.get_node(&node.node_id).unwrap();
            assert_eq!(node.node_type, n2.node_type);
            assert_eq!(node.properties, n2.properties);
        }
    }

    #[test]
    fn incremental_equals_full() {
        let entries = vec![
            make_entry(
                GraphOp::DefineOntology {
                    ontology: test_ontology(),
                },
                0,
                "inst-a",
            ),
            make_entry(
                add_node(
                    "a",
                    "entity",
                    "a",
                    BTreeMap::from([("x".into(), Value::Int(1))]),
                ),
                1,
                "inst-a",
            ),
            make_entry(set_property("a", "x", Value::Int(2)), 2, "inst-a"),
            make_entry(add_entity("b"), 3, "inst-a"),
            make_entry(add_runs_on("e1", "a", "b"), 4, "inst-a"),
            make_entry(
                GraphOp::RemoveEdge {
                    edge_id: "e1".into(),
                },
                5,
                "inst-a",
            ),
        ];

        let mut g_inc = new_graph();
        for e in &entries {
            g_inc.apply(e);
        }

        let mut g_full = new_graph();
        let refs: Vec<&Entry> = entries.iter().collect();
        g_full.rebuild(&refs);

        assert_eq!(
            g_inc.get_node("a").unwrap().properties.get("x"),
            Some(&Value::Int(2))
        );
        assert_eq!(
            g_full.get_node("a").unwrap().properties.get("x"),
            Some(&Value::Int(2))
        );
        assert_eq!(g_inc.all_edges().len(), 0);
        assert_eq!(g_full.all_edges().len(), 0);
    }

    #[test]
    fn lww_concurrent_property_update() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("s1"), 1, "inst-a"));
        g.apply(&make_entry(
            set_property("s1", "status", Value::String("alive".into())),
            2,
            "inst-a",
        ));
        // The write with the higher clock time is applied last and must win.
        g.apply(&make_entry(
            set_property("s1", "status", Value::String("dead".into())),
            3,
            "inst-b",
        ));
        assert_eq!(
            g.get_node("s1").unwrap().properties.get("status"),
            Some(&Value::String("dead".into()))
        );
    }

    #[test]
    fn lww_tiebreak_by_instance_id() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("s1"), 1, "inst-a"));
        // Same clock time from two instances: the value from inst-a is expected to stay.
        g.apply(&make_entry(
            set_property("s1", "x", Value::Int(1)),
            5,
            "inst-a",
        ));
        g.apply(&make_entry(
            set_property("s1", "x", Value::Int(2)),
            5,
            "inst-b",
        ));
        assert_eq!(
            g.get_node("s1").unwrap().properties.get("x"),
            Some(&Value::Int(1))
        );
    }

    #[test]
    fn lww_per_property_concurrent_different_keys() {
        let mut g = new_graph();
        g.apply(&make_entry(
            add_node(
                "s1",
                "entity",
                "s1",
                BTreeMap::from([("x".into(), Value::Int(0)), ("y".into(), Value::Int(0))]),
            ),
            1,
            "inst-a",
        ));
        g.apply(&make_entry(
            set_property("s1", "x", Value::Int(42)),
            3,
            "inst-a",
        ));
        g.apply(&make_entry(
            set_property("s1", "y", Value::Int(99)),
            3,
            "inst-b",
        ));

        let node = g.get_node("s1").unwrap();
        assert_eq!(
            node.properties.get("x"),
            Some(&Value::Int(42)),
            "update to 'x' must not be rejected by concurrent update to 'y'"
        );
        assert_eq!(
            node.properties.get("y"),
            Some(&Value::Int(99)),
            "update to 'y' must not be rejected by concurrent update to 'x'"
        );
    }

    #[test]
    fn lww_per_property_order_independent() {
        let mut g = new_graph();
        g.apply(&make_entry(
            add_node(
                "s1",
                "entity",
                "s1",
                BTreeMap::from([("x".into(), Value::Int(0)), ("y".into(), Value::Int(0))]),
            ),
            1,
            "inst-a",
        ));
        // Same two updates as the previous test, applied in the opposite order.
        g.apply(&make_entry(
            set_property("s1", "y", Value::Int(99)),
            3,
            "inst-b",
        ));
        g.apply(&make_entry(
            set_property("s1", "x", Value::Int(42)),
            3,
            "inst-a",
        ));

        let node = g.get_node("s1").unwrap();
        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
    }

    #[test]
    fn add_wins_over_remove() {
        let mut g = new_graph();
        g.apply(&make_entry(add_entity("s1"), 1, "inst-a"));
        g.apply(&make_entry(
            GraphOp::RemoveNode {
                node_id: "s1".into(),
            },
            2,
            "inst-a",
        ));
        assert!(g.get_node("s1").is_none());

        // A later add from another instance brings the node back.
        g.apply(&make_entry(
            add_node("s1", "entity", "s1 v2", BTreeMap::new()),
            3,
            "inst-b",
        ));
        let node = g.get_node("s1").unwrap();
        assert_eq!(node.label, "s1 v2");
        assert!(!node.tombstoned);
    }
}