1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10 pub node_id: String,
11 pub node_type: String,
12 pub subtype: Option<String>,
13 pub label: String,
14 pub properties: BTreeMap<String, Value>,
15 pub property_clocks: HashMap<String, LamportClock>,
19 pub last_clock: LamportClock,
22 pub last_add_clock: LamportClock,
26 pub tombstoned: bool,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33 pub edge_id: String,
34 pub edge_type: String,
35 pub source_id: String,
36 pub target_id: String,
37 pub properties: BTreeMap<String, Value>,
38 pub property_clocks: HashMap<String, LamportClock>,
40 pub last_clock: LamportClock,
41 pub last_add_clock: LamportClock,
43 pub tombstoned: bool,
44}
45
46pub struct MaterializedGraph {
57 pub nodes: HashMap<String, Node>,
59 pub edges: HashMap<String, Edge>,
61 pub outgoing: HashMap<String, HashSet<String>>,
63 pub incoming: HashMap<String, HashSet<String>>,
65 pub by_type: HashMap<String, HashSet<String>>,
67 pub ontology: Ontology,
69 pub quarantined: HashSet<Hash>,
73}
74
75impl MaterializedGraph {
76 pub fn new(ontology: Ontology) -> Self {
78 Self {
79 nodes: HashMap::new(),
80 edges: HashMap::new(),
81 outgoing: HashMap::new(),
82 incoming: HashMap::new(),
83 by_type: HashMap::new(),
84 ontology,
85 quarantined: HashSet::new(),
86 }
87 }
88
89 pub fn apply(&mut self, entry: &Entry) {
96 match &entry.payload {
97 GraphOp::Checkpoint { ops, op_clocks, .. } => {
98 for (i, op) in ops.iter().enumerate() {
101 let clock = if i < op_clocks.len() {
102 LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
103 } else {
104 entry.clock.clone() };
106 let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
107 self.apply(&synthetic);
108 }
109 }
110 GraphOp::DefineOntology { .. } => {
111 }
113 GraphOp::ExtendOntology { extension } => {
114 if let Err(_e) = self.ontology.merge_extension(extension) {
115 self.quarantined.insert(entry.hash);
116 }
117 }
118 GraphOp::AddNode {
119 node_id,
120 node_type,
121 subtype,
122 label,
123 properties,
124 } => {
125 if let Err(_e) =
127 self.ontology
128 .validate_node(node_type, subtype.as_deref(), properties)
129 {
130 self.quarantined.insert(entry.hash);
131 return;
132 }
133 self.apply_add_node(
134 node_id,
135 node_type,
136 subtype.as_deref(),
137 label,
138 properties,
139 &entry.clock,
140 );
141 }
142 GraphOp::AddEdge {
143 edge_id,
144 edge_type,
145 source_id,
146 target_id,
147 properties,
148 } => {
149 if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
151 self.quarantined.insert(entry.hash);
152 return;
153 }
154 if let (Some(src), Some(tgt)) = (
158 self.nodes.get(source_id.as_str()),
159 self.nodes.get(target_id.as_str()),
160 ) {
161 if let Err(_) = self.ontology.validate_edge(
162 edge_type,
163 &src.node_type,
164 &tgt.node_type,
165 properties,
166 ) {
167 self.quarantined.insert(entry.hash);
168 return;
169 }
170 }
171 self.apply_add_edge(
172 edge_id,
173 edge_type,
174 source_id,
175 target_id,
176 properties,
177 &entry.clock,
178 );
179 }
180 GraphOp::UpdateProperty {
181 entity_id,
182 key,
183 value,
184 } => {
185 self.apply_update_property(entity_id, key, value, &entry.clock);
186 }
187 GraphOp::RemoveNode { node_id } => {
188 self.apply_remove_node(node_id, &entry.clock);
189 }
190 GraphOp::RemoveEdge { edge_id } => {
191 self.apply_remove_edge(edge_id, &entry.clock);
192 }
193 }
194 }
195
196 pub fn apply_all(&mut self, entries: &[&Entry]) {
198 for entry in entries {
199 self.apply(entry);
200 }
201 }
202
203 pub fn rebuild(&mut self, entries: &[&Entry]) {
205 self.nodes.clear();
206 self.edges.clear();
207 self.outgoing.clear();
208 self.incoming.clear();
209 self.by_type.clear();
210 self.quarantined.clear();
211 self.apply_all(entries);
212 }
213
214 pub fn get_node(&self, node_id: &str) -> Option<&Node> {
218 self.nodes.get(node_id).filter(|n| !n.tombstoned)
219 }
220
221 pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
223 self.edges.get(edge_id).filter(|e| !e.tombstoned)
224 }
225
226 pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
228 match self.by_type.get(node_type) {
229 Some(ids) => ids.iter().filter_map(|id| self.get_node(id)).collect(),
230 None => vec![],
231 }
232 }
233
234 pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
236 self.nodes
237 .values()
238 .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
239 .collect()
240 }
241
242 pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
244 self.nodes
245 .values()
246 .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
247 .collect()
248 }
249
250 pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
252 match self.outgoing.get(node_id) {
253 Some(edge_ids) => edge_ids
254 .iter()
255 .filter_map(|eid| self.get_edge(eid))
256 .filter(|e| self.is_node_live(&e.target_id))
257 .collect(),
258 None => vec![],
259 }
260 }
261
262 pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
264 match self.incoming.get(node_id) {
265 Some(edge_ids) => edge_ids
266 .iter()
267 .filter_map(|eid| self.get_edge(eid))
268 .filter(|e| self.is_node_live(&e.source_id))
269 .collect(),
270 None => vec![],
271 }
272 }
273
274 pub fn all_nodes(&self) -> Vec<&Node> {
276 self.nodes.values().filter(|n| !n.tombstoned).collect()
277 }
278
279 pub fn all_edges(&self) -> Vec<&Edge> {
281 self.edges
282 .values()
283 .filter(|e| {
284 !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
285 })
286 .collect()
287 }
288
289 pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
291 self.outgoing_edges(node_id)
292 .iter()
293 .map(|e| e.target_id.as_str())
294 .collect()
295 }
296
297 pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
299 self.incoming_edges(node_id)
300 .iter()
301 .map(|e| e.source_id.as_str())
302 .collect()
303 }
304
305 fn apply_add_node(
308 &mut self,
309 node_id: &str,
310 node_type: &str,
311 subtype: Option<&str>,
312 label: &str,
313 properties: &BTreeMap<String, Value>,
314 clock: &LamportClock,
315 ) {
316 if let Some(existing) = self.nodes.get_mut(node_id) {
317 existing.tombstoned = false;
319 if clock_wins(clock, &existing.last_add_clock) {
321 existing.last_add_clock = clock.clone();
322 }
323 if clock_wins(clock, &existing.last_clock) {
325 existing.label = label.to_string();
326 existing.subtype = subtype.map(|s| s.to_string());
327 existing.last_clock = clock.clone();
328 }
329 for (k, v) in properties {
332 let dominated = existing
333 .property_clocks
334 .get(k)
335 .map(|c| clock_wins(clock, c))
336 .unwrap_or(true);
337 if dominated {
338 existing.properties.insert(k.clone(), v.clone());
339 existing.property_clocks.insert(k.clone(), clock.clone());
340 }
341 }
342 } else {
343 let property_clocks: HashMap<String, LamportClock> = properties
344 .keys()
345 .map(|k| (k.clone(), clock.clone()))
346 .collect();
347 let node = Node {
348 node_id: node_id.to_string(),
349 node_type: node_type.to_string(),
350 subtype: subtype.map(|s| s.to_string()),
351 label: label.to_string(),
352 properties: properties.clone(),
353 property_clocks,
354 last_clock: clock.clone(),
355 last_add_clock: clock.clone(),
356 tombstoned: false,
357 };
358 self.by_type
359 .entry(node_type.to_string())
360 .or_default()
361 .insert(node_id.to_string());
362 self.nodes.insert(node_id.to_string(), node);
363 }
364 }
365
366 fn apply_add_edge(
367 &mut self,
368 edge_id: &str,
369 edge_type: &str,
370 source_id: &str,
371 target_id: &str,
372 properties: &BTreeMap<String, Value>,
373 clock: &LamportClock,
374 ) {
375 if let Some(existing) = self.edges.get_mut(edge_id) {
376 existing.tombstoned = false;
378 if clock_wins(clock, &existing.last_add_clock) {
379 existing.last_add_clock = clock.clone();
380 }
381 if clock_wins(clock, &existing.last_clock) {
382 existing.last_clock = clock.clone();
383 }
384 for (k, v) in properties {
386 let dominated = existing
387 .property_clocks
388 .get(k)
389 .map(|c| clock_wins(clock, c))
390 .unwrap_or(true);
391 if dominated {
392 existing.properties.insert(k.clone(), v.clone());
393 existing.property_clocks.insert(k.clone(), clock.clone());
394 }
395 }
396 } else {
397 let property_clocks: HashMap<String, LamportClock> = properties
398 .keys()
399 .map(|k| (k.clone(), clock.clone()))
400 .collect();
401 let edge = Edge {
402 edge_id: edge_id.to_string(),
403 edge_type: edge_type.to_string(),
404 source_id: source_id.to_string(),
405 target_id: target_id.to_string(),
406 properties: properties.clone(),
407 property_clocks,
408 last_clock: clock.clone(),
409 last_add_clock: clock.clone(),
410 tombstoned: false,
411 };
412 self.outgoing
413 .entry(source_id.to_string())
414 .or_default()
415 .insert(edge_id.to_string());
416 self.incoming
417 .entry(target_id.to_string())
418 .or_default()
419 .insert(edge_id.to_string());
420 self.edges.insert(edge_id.to_string(), edge);
421 }
422 }
423
424 fn apply_update_property(
425 &mut self,
426 entity_id: &str,
427 key: &str,
428 value: &Value,
429 clock: &LamportClock,
430 ) {
431 if let Some(node) = self.nodes.get_mut(entity_id) {
434 let dominated = node
435 .property_clocks
436 .get(key)
437 .map(|c| clock_wins(clock, c))
438 .unwrap_or(true);
439 if dominated {
440 node.properties.insert(key.to_string(), value.clone());
441 node.property_clocks.insert(key.to_string(), clock.clone());
442 }
443 if clock_wins(clock, &node.last_clock) {
445 node.last_clock = clock.clone();
446 }
447 } else if let Some(edge) = self.edges.get_mut(entity_id) {
448 let dominated = edge
449 .property_clocks
450 .get(key)
451 .map(|c| clock_wins(clock, c))
452 .unwrap_or(true);
453 if dominated {
454 edge.properties.insert(key.to_string(), value.clone());
455 edge.property_clocks.insert(key.to_string(), clock.clone());
456 }
457 if clock_wins(clock, &edge.last_clock) {
458 edge.last_clock = clock.clone();
459 }
460 }
461 }
463
464 fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
465 if let Some(node) = self.nodes.get_mut(node_id) {
466 if clock_wins(clock, &node.last_add_clock) {
470 node.tombstoned = true;
471 node.last_clock = clock.clone();
472 }
473 }
474 }
477
478 fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
479 if let Some(edge) = self.edges.get_mut(edge_id) {
480 if clock_wins(clock, &edge.last_add_clock) {
482 edge.tombstoned = true;
483 edge.last_clock = clock.clone();
484 }
485 }
486 }
487
488 fn is_node_live(&self, node_id: &str) -> bool {
489 self.nodes
490 .get(node_id)
491 .map(|n| !n.tombstoned)
492 .unwrap_or(false)
493 }
494}
495
496fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
499 new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use crate::entry::Entry;
506 use crate::ontology::{EdgeTypeDef, NodeTypeDef};
507
508 fn test_ontology() -> Ontology {
509 Ontology {
510 node_types: BTreeMap::from([
511 (
512 "entity".into(),
513 NodeTypeDef {
514 description: None,
515 properties: BTreeMap::new(),
516 subtypes: None,
517 },
518 ),
519 (
520 "signal".into(),
521 NodeTypeDef {
522 description: None,
523 properties: BTreeMap::new(),
524 subtypes: None,
525 },
526 ),
527 ]),
528 edge_types: BTreeMap::from([
529 (
530 "RUNS_ON".into(),
531 EdgeTypeDef {
532 description: None,
533 source_types: vec!["entity".into()],
534 target_types: vec!["entity".into()],
535 properties: BTreeMap::new(),
536 },
537 ),
538 (
539 "OBSERVES".into(),
540 EdgeTypeDef {
541 description: None,
542 source_types: vec!["signal".into()],
543 target_types: vec!["entity".into()],
544 properties: BTreeMap::new(),
545 },
546 ),
547 ]),
548 }
549 }
550
551 fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
552 Entry::new(
553 op,
554 vec![],
555 vec![],
556 LamportClock::with_values(author, clock_time, 0),
557 author,
558 )
559 }
560
/// A node added through `apply` is returned by `get_node` with its type,
/// label and properties intact.
#[test]
fn add_node_appears_in_query() {
    let mut graph = MaterializedGraph::new(test_ontology());

    let mut properties = BTreeMap::new();
    properties.insert("ip".into(), Value::String("10.0.0.1".into()));
    let add = GraphOp::AddNode {
        node_id: "server-1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "Server 1".into(),
        properties,
    };
    graph.apply(&make_entry(add, 1, "inst-a"));

    let stored = graph.get_node("server-1").unwrap();
    assert_eq!(stored.node_type, "entity");
    assert_eq!(stored.label, "Server 1");

    let expected_ip = Value::String("10.0.0.1".into());
    assert_eq!(stored.properties.get("ip"), Some(&expected_ip));
}
587
/// An edge between two live nodes shows up in both adjacency queries and
/// in `neighbors`.
#[test]
fn add_edge_creates_adjacency() {
    let entity = |id: &str| GraphOp::AddNode {
        node_id: id.into(),
        node_type: "entity".into(),
        subtype: None,
        label: id.into(),
        properties: BTreeMap::new(),
    };
    let ops = vec![
        entity("svc"),
        entity("srv"),
        GraphOp::AddEdge {
            edge_id: "e1".into(),
            edge_type: "RUNS_ON".into(),
            source_id: "svc".into(),
            target_id: "srv".into(),
            properties: BTreeMap::new(),
        },
    ];

    let mut graph = MaterializedGraph::new(test_ontology());
    // Clock times 1, 2, 3 in op order.
    for (tick, op) in (1u64..).zip(ops) {
        graph.apply(&make_entry(op, tick, "inst-a"));
    }

    let outbound = graph.outgoing_edges("svc");
    assert_eq!(outbound.len(), 1);
    assert_eq!(outbound[0].target_id, "srv");

    let inbound = graph.incoming_edges("srv");
    assert_eq!(inbound.len(), 1);
    assert_eq!(inbound[0].source_id, "svc");

    assert_eq!(graph.neighbors("svc"), vec!["srv"]);
}
636
/// A property update with a later clock is visible on the node.
#[test]
fn update_property_reflected() {
    let create = GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "s1".into(),
        properties: BTreeMap::new(),
    };
    let set_cpu = GraphOp::UpdateProperty {
        entity_id: "s1".into(),
        key: "cpu".into(),
        value: Value::Float(85.5),
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(create, 1, "inst-a"));
    graph.apply(&make_entry(set_cpu, 2, "inst-a"));

    let stored = graph.get_node("s1").unwrap();
    assert_eq!(stored.properties.get("cpu"), Some(&Value::Float(85.5)));
}
664
/// Removing a node hides it and every edge that touches it from queries.
#[test]
fn remove_node_cascades_edges() {
    let entity = |id: &str| GraphOp::AddNode {
        node_id: id.into(),
        node_type: "entity".into(),
        subtype: None,
        label: id.into(),
        properties: BTreeMap::new(),
    };
    let setup = vec![
        entity("a"),
        entity("b"),
        GraphOp::AddEdge {
            edge_id: "e1".into(),
            edge_type: "RUNS_ON".into(),
            source_id: "a".into(),
            target_id: "b".into(),
            properties: BTreeMap::new(),
        },
    ];

    let mut graph = MaterializedGraph::new(test_ontology());
    // Clock times 1, 2, 3 in op order.
    for (tick, op) in (1u64..).zip(setup) {
        graph.apply(&make_entry(op, tick, "inst-a"));
    }
    assert_eq!(graph.all_edges().len(), 1);

    let remove_b = GraphOp::RemoveNode {
        node_id: "b".into(),
    };
    graph.apply(&make_entry(remove_b, 4, "inst-a"));

    assert!(graph.get_node("b").is_none());
    assert_eq!(graph.all_edges().len(), 0);
    assert_eq!(graph.outgoing_edges("a").len(), 0);
}
717
/// Removing an edge hides the edge but leaves both endpoints queryable.
#[test]
fn remove_edge_preserves_nodes() {
    let entity = |id: &str| GraphOp::AddNode {
        node_id: id.into(),
        node_type: "entity".into(),
        subtype: None,
        label: id.into(),
        properties: BTreeMap::new(),
    };
    let history = vec![
        entity("a"),
        entity("b"),
        GraphOp::AddEdge {
            edge_id: "e1".into(),
            edge_type: "RUNS_ON".into(),
            source_id: "a".into(),
            target_id: "b".into(),
            properties: BTreeMap::new(),
        },
        GraphOp::RemoveEdge {
            edge_id: "e1".into(),
        },
    ];

    let mut graph = MaterializedGraph::new(test_ontology());
    // Clock times 1 through 4 in op order.
    for (tick, op) in (1u64..).zip(history) {
        graph.apply(&make_entry(op, tick, "inst-a"));
    }

    assert!(graph.get_node("a").is_some());
    assert!(graph.get_node("b").is_some());
    assert!(graph.get_edge("e1").is_none());
    assert_eq!(graph.all_edges().len(), 0);
}
769
/// `nodes_by_type` returns only the nodes created with the requested type.
#[test]
fn query_by_type_filters() {
    let typed = |id: &str, node_type: &str| GraphOp::AddNode {
        node_id: id.into(),
        node_type: node_type.into(),
        subtype: None,
        label: id.into(),
        properties: BTreeMap::new(),
    };
    let ops = vec![
        typed("s1", "entity"),
        typed("s2", "entity"),
        typed("alert", "signal"),
    ];

    let mut graph = MaterializedGraph::new(test_ontology());
    // Clock times 1, 2, 3 in op order.
    for (tick, op) in (1u64..).zip(ops) {
        graph.apply(&make_entry(op, tick, "inst-a"));
    }

    assert_eq!(graph.nodes_by_type("entity").len(), 2);

    let signals = graph.nodes_by_type("signal");
    assert_eq!(signals.len(), 1);
    assert_eq!(signals[0].node_id, "alert");
}
813
/// `nodes_by_property` returns only the nodes whose property matches.
#[test]
fn query_by_property_filters() {
    let with_status = |id: &str, status: &str| {
        let mut properties = BTreeMap::new();
        properties.insert("status".into(), Value::String(status.into()));
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            subtype: None,
            label: id.into(),
            properties,
        }
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(with_status("s1", "alive"), 1, "inst-a"));
    graph.apply(&make_entry(with_status("s2", "dead"), 2, "inst-a"));

    let wanted = Value::String("alive".into());
    let alive = graph.nodes_by_property("status", &wanted);
    assert_eq!(alive.len(), 1);
    assert_eq!(alive[0].node_id, "s1");
}
844
/// Applying entries one by one and rebuilding from the same entries yield
/// the same visible nodes and edges.
#[test]
fn materialization_from_empty() {
    let mut incremental = MaterializedGraph::new(test_ontology());

    let entity = |id: &str| GraphOp::AddNode {
        node_id: id.into(),
        node_type: "entity".into(),
        subtype: None,
        label: id.into(),
        properties: BTreeMap::new(),
    };
    let ops = vec![
        GraphOp::DefineOntology {
            ontology: test_ontology(),
        },
        entity("a"),
        entity("b"),
        GraphOp::AddEdge {
            edge_id: "e1".into(),
            edge_type: "RUNS_ON".into(),
            source_id: "a".into(),
            target_id: "b".into(),
            properties: BTreeMap::new(),
        },
    ];
    // Clock times 0 through 3 in op order.
    let log: Vec<Entry> = (0u64..)
        .zip(ops)
        .map(|(tick, op)| make_entry(op, tick, "inst-a"))
        .collect();

    for entry in &log {
        incremental.apply(entry);
    }

    let mut rebuilt = MaterializedGraph::new(test_ontology());
    let borrowed: Vec<&Entry> = log.iter().collect();
    rebuilt.rebuild(&borrowed);

    assert_eq!(incremental.all_nodes().len(), rebuilt.all_nodes().len());
    assert_eq!(incremental.all_edges().len(), rebuilt.all_edges().len());
    for expected in incremental.all_nodes() {
        let actual = rebuilt.get_node(&expected.node_id).unwrap();
        assert_eq!(expected.node_type, actual.node_type);
        assert_eq!(expected.properties, actual.properties);
    }
}
909
/// Incremental application and a full rebuild agree on the final property
/// value and on the removed edge.
#[test]
fn incremental_equals_full() {
    let mut a_props = BTreeMap::new();
    a_props.insert("x".into(), Value::Int(1));

    let ops = vec![
        GraphOp::DefineOntology {
            ontology: test_ontology(),
        },
        GraphOp::AddNode {
            node_id: "a".into(),
            node_type: "entity".into(),
            subtype: None,
            label: "a".into(),
            properties: a_props,
        },
        GraphOp::UpdateProperty {
            entity_id: "a".into(),
            key: "x".into(),
            value: Value::Int(2),
        },
        GraphOp::AddNode {
            node_id: "b".into(),
            node_type: "entity".into(),
            subtype: None,
            label: "b".into(),
            properties: BTreeMap::new(),
        },
        GraphOp::AddEdge {
            edge_id: "e1".into(),
            edge_type: "RUNS_ON".into(),
            source_id: "a".into(),
            target_id: "b".into(),
            properties: BTreeMap::new(),
        },
        GraphOp::RemoveEdge {
            edge_id: "e1".into(),
        },
    ];
    // Clock times 0 through 5 in op order.
    let log: Vec<Entry> = (0u64..)
        .zip(ops)
        .map(|(tick, op)| make_entry(op, tick, "inst-a"))
        .collect();

    let mut incremental = MaterializedGraph::new(test_ontology());
    for entry in &log {
        incremental.apply(entry);
    }

    let mut full = MaterializedGraph::new(test_ontology());
    let borrowed: Vec<&Entry> = log.iter().collect();
    full.rebuild(&borrowed);

    let expected_x = Value::Int(2);
    assert_eq!(
        incremental.get_node("a").unwrap().properties.get("x"),
        Some(&expected_x)
    );
    assert_eq!(
        full.get_node("a").unwrap().properties.get("x"),
        Some(&expected_x)
    );
    assert_eq!(incremental.all_edges().len(), 0);
    assert_eq!(full.all_edges().len(), 0);
}
995
/// Two writers update the same key; the write with the later clock is kept.
#[test]
fn lww_concurrent_property_update() {
    let set_status = |status: &str| GraphOp::UpdateProperty {
        entity_id: "s1".into(),
        key: "status".into(),
        value: Value::String(status.into()),
    };
    let create = GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "s1".into(),
        properties: BTreeMap::new(),
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(create, 1, "inst-a"));
    graph.apply(&make_entry(set_status("alive"), 2, "inst-a"));
    graph.apply(&make_entry(set_status("dead"), 3, "inst-b"));

    let stored = graph.get_node("s1").unwrap();
    assert_eq!(
        stored.properties.get("status"),
        Some(&Value::String("dead".into()))
    );
}
1036
/// With equal clock times from two authors, the value written by `inst-a`
/// survives a later-applied write from `inst-b`.
#[test]
fn lww_tiebreak_by_instance_id() {
    let set_x = |value: Value| GraphOp::UpdateProperty {
        entity_id: "s1".into(),
        key: "x".into(),
        value,
    };
    let create = GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "s1".into(),
        properties: BTreeMap::new(),
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(create, 1, "inst-a"));
    // Same clock time (5), different authors.
    graph.apply(&make_entry(set_x(Value::Int(1)), 5, "inst-a"));
    graph.apply(&make_entry(set_x(Value::Int(2)), 5, "inst-b"));

    let stored = graph.get_node("s1").unwrap();
    assert_eq!(stored.properties.get("x"), Some(&Value::Int(1)));
}
1077
/// Writes to different keys at the same clock time are tracked per key, so
/// neither write blocks the other.
#[test]
fn lww_per_property_concurrent_different_keys() {
    let set = |key: &str, value: Value| GraphOp::UpdateProperty {
        entity_id: "s1".into(),
        key: key.into(),
        value,
    };

    let mut initial = BTreeMap::new();
    initial.insert("x".into(), Value::Int(0));
    initial.insert("y".into(), Value::Int(0));
    let create = GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "s1".into(),
        properties: initial,
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(create, 1, "inst-a"));
    graph.apply(&make_entry(set("x", Value::Int(42)), 3, "inst-a"));
    graph.apply(&make_entry(set("y", Value::Int(99)), 3, "inst-b"));

    let stored = graph.get_node("s1").unwrap();
    assert_eq!(
        stored.properties.get("x"),
        Some(&Value::Int(42)),
        "update to 'x' must not be rejected by concurrent update to 'y'"
    );
    assert_eq!(
        stored.properties.get("y"),
        Some(&Value::Int(99)),
        "update to 'y' must not be rejected by concurrent update to 'x'"
    );
}
1132
/// Same scenario as the concurrent-different-keys test, but with the two
/// updates applied in the opposite order; both must still land.
#[test]
fn lww_per_property_order_independent() {
    let set = |key: &str, value: Value| GraphOp::UpdateProperty {
        entity_id: "s1".into(),
        key: key.into(),
        value,
    };

    let mut initial = BTreeMap::new();
    initial.insert("x".into(), Value::Int(0));
    initial.insert("y".into(), Value::Int(0));
    let create = GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: "s1".into(),
        properties: initial,
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(create, 1, "inst-a"));
    // The write to `y` from inst-b arrives before the write to `x` from inst-a.
    graph.apply(&make_entry(set("y", Value::Int(99)), 3, "inst-b"));
    graph.apply(&make_entry(set("x", Value::Int(42)), 3, "inst-a"));

    let stored = graph.get_node("s1").unwrap();
    assert_eq!(stored.properties.get("x"), Some(&Value::Int(42)));
    assert_eq!(stored.properties.get("y"), Some(&Value::Int(99)));
}
1175
/// A node that was removed becomes visible again, with the new label, when
/// it is re-added with a later clock.
#[test]
fn add_wins_over_remove() {
    let add_labeled = |label: &str| GraphOp::AddNode {
        node_id: "s1".into(),
        node_type: "entity".into(),
        subtype: None,
        label: label.into(),
        properties: BTreeMap::new(),
    };
    let remove = GraphOp::RemoveNode {
        node_id: "s1".into(),
    };

    let mut graph = MaterializedGraph::new(test_ontology());
    graph.apply(&make_entry(add_labeled("s1"), 1, "inst-a"));
    graph.apply(&make_entry(remove, 2, "inst-a"));
    assert!(graph.get_node("s1").is_none());

    graph.apply(&make_entry(add_labeled("s1 v2"), 3, "inst-b"));
    let revived = graph.get_node("s1").unwrap();
    assert_eq!(revived.label, "s1 v2");
    assert!(!revived.tombstoned);
}
1217}