Skip to main content

silk/
graph.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7/// A materialized node in the graph.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10    pub node_id: String,
11    pub node_type: String,
12    pub subtype: Option<String>,
13    pub label: String,
14    pub properties: BTreeMap<String, Value>,
15    /// Per-property clocks for LWW conflict resolution.
16    /// Each property key tracks the clock of its last write, so
17    /// concurrent updates to different properties don't conflict.
18    pub property_clocks: HashMap<String, LamportClock>,
19    /// Clock of the entry that last modified this node.
20    /// Used for add-wins semantics and label LWW.
21    pub last_clock: LamportClock,
22    /// Clock of the most recent AddNode for this node.
23    /// Used for add-wins semantics: remove only wins if its clock
24    /// is strictly greater than last_add_clock.
25    pub last_add_clock: LamportClock,
26    /// Whether this node has been tombstoned (removed).
27    pub tombstoned: bool,
28}
29
30/// A materialized edge in the graph.
31#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33    pub edge_id: String,
34    pub edge_type: String,
35    pub source_id: String,
36    pub target_id: String,
37    pub properties: BTreeMap<String, Value>,
38    /// Per-property clocks for LWW conflict resolution.
39    pub property_clocks: HashMap<String, LamportClock>,
40    pub last_clock: LamportClock,
41    /// Clock of the most recent AddEdge for this edge.
42    pub last_add_clock: LamportClock,
43    pub tombstoned: bool,
44}
45
46/// Materialized graph — derived from the op log.
47///
48/// Provides fast queries without replaying the full log.
49/// Updated incrementally as new entries arrive, or rebuilt
50/// from scratch by replaying the entire op log.
51///
52/// CRDT semantics:
53/// - **Add-wins** for topology (concurrent add + remove → node/edge exists)
54/// - **LWW** (Last-Writer-Wins) per property key (highest Lamport clock wins)
55/// - **Tombstones** for deletes (mark as deleted, don't physically remove)
56pub struct MaterializedGraph {
57    /// node_id → Node
58    pub nodes: HashMap<String, Node>,
59    /// edge_id → Edge
60    pub edges: HashMap<String, Edge>,
61    /// node_id → set of outgoing edge_ids
62    pub outgoing: HashMap<String, HashSet<String>>,
63    /// node_id → set of incoming edge_ids
64    pub incoming: HashMap<String, HashSet<String>>,
65    /// node_type → set of node_ids (type index)
66    pub by_type: HashMap<String, HashSet<String>>,
67    /// The ontology (for validation during materialization)
68    pub ontology: Ontology,
69    /// R-02: entries that failed ontology validation during apply().
70    /// These entries exist in the oplog (for CRDT convergence) but are
71    /// invisible in the materialized graph. Grow-only set — monotonic, safe.
72    pub quarantined: HashSet<Hash>,
73}
74
75impl MaterializedGraph {
76    /// Create an empty materialized graph with the given ontology.
77    pub fn new(ontology: Ontology) -> Self {
78        Self {
79            nodes: HashMap::new(),
80            edges: HashMap::new(),
81            outgoing: HashMap::new(),
82            incoming: HashMap::new(),
83            by_type: HashMap::new(),
84            ontology,
85            quarantined: HashSet::new(),
86        }
87    }
88
89    /// Apply a single entry to the graph (incremental materialization).
90    ///
91    /// R-02: Validates AddNode/AddEdge payloads against the ontology.
92    /// Invalid entries are quarantined (added to `self.quarantined`) and
93    /// skipped for materialization. They remain in the oplog for CRDT
94    /// convergence — quarantine is a graph-layer concern, not an oplog concern.
95    pub fn apply(&mut self, entry: &Entry) {
96        match &entry.payload {
97            GraphOp::DefineOntology { .. } => {
98                // Genesis — nothing to materialize.
99            }
100            GraphOp::ExtendOntology { extension } => {
101                if let Err(_e) = self.ontology.merge_extension(extension) {
102                    self.quarantined.insert(entry.hash);
103                }
104            }
105            GraphOp::AddNode {
106                node_id,
107                node_type,
108                subtype,
109                label,
110                properties,
111            } => {
112                // R-02: validate against ontology, quarantine if invalid
113                if let Err(_e) =
114                    self.ontology
115                        .validate_node(node_type, subtype.as_deref(), properties)
116                {
117                    self.quarantined.insert(entry.hash);
118                    return;
119                }
120                self.apply_add_node(
121                    node_id,
122                    node_type,
123                    subtype.as_deref(),
124                    label,
125                    properties,
126                    &entry.clock,
127                );
128            }
129            GraphOp::AddEdge {
130                edge_id,
131                edge_type,
132                source_id,
133                target_id,
134                properties,
135            } => {
136                // R-02: validate edge type exists. Full source/target type
137                // validation is skipped — during sync, source/target nodes
138                // may not be materialized yet (out-of-order batch).
139                if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
140                    self.quarantined.insert(entry.hash);
141                    return;
142                }
143                self.apply_add_edge(
144                    edge_id,
145                    edge_type,
146                    source_id,
147                    target_id,
148                    properties,
149                    &entry.clock,
150                );
151            }
152            GraphOp::UpdateProperty {
153                entity_id,
154                key,
155                value,
156            } => {
157                self.apply_update_property(entity_id, key, value, &entry.clock);
158            }
159            GraphOp::RemoveNode { node_id } => {
160                self.apply_remove_node(node_id, &entry.clock);
161            }
162            GraphOp::RemoveEdge { edge_id } => {
163                self.apply_remove_edge(edge_id, &entry.clock);
164            }
165        }
166    }
167
168    /// Apply a sequence of entries (full rematerialization from op log).
169    pub fn apply_all(&mut self, entries: &[&Entry]) {
170        for entry in entries {
171            self.apply(entry);
172        }
173    }
174
175    /// Rebuild from scratch: clear everything and replay all entries.
176    pub fn rebuild(&mut self, entries: &[&Entry]) {
177        self.nodes.clear();
178        self.edges.clear();
179        self.outgoing.clear();
180        self.incoming.clear();
181        self.by_type.clear();
182        self.quarantined.clear();
183        self.apply_all(entries);
184    }
185
186    // -- Queries --
187
188    /// Get a node by ID (returns None if not found or tombstoned).
189    pub fn get_node(&self, node_id: &str) -> Option<&Node> {
190        self.nodes.get(node_id).filter(|n| !n.tombstoned)
191    }
192
193    /// Get an edge by ID (returns None if not found or tombstoned).
194    pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
195        self.edges.get(edge_id).filter(|e| !e.tombstoned)
196    }
197
198    /// Query all live nodes of a given type.
199    pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
200        match self.by_type.get(node_type) {
201            Some(ids) => ids.iter().filter_map(|id| self.get_node(id)).collect(),
202            None => vec![],
203        }
204    }
205
206    /// Query all live nodes of a given subtype.
207    pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
208        self.nodes
209            .values()
210            .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
211            .collect()
212    }
213
214    /// Query nodes by a property value.
215    pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
216        self.nodes
217            .values()
218            .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
219            .collect()
220    }
221
222    /// Get outgoing edges for a node (only live edges with live endpoints).
223    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
224        match self.outgoing.get(node_id) {
225            Some(edge_ids) => edge_ids
226                .iter()
227                .filter_map(|eid| self.get_edge(eid))
228                .filter(|e| self.is_node_live(&e.target_id))
229                .collect(),
230            None => vec![],
231        }
232    }
233
234    /// Get incoming edges for a node (only live edges with live endpoints).
235    pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
236        match self.incoming.get(node_id) {
237            Some(edge_ids) => edge_ids
238                .iter()
239                .filter_map(|eid| self.get_edge(eid))
240                .filter(|e| self.is_node_live(&e.source_id))
241                .collect(),
242            None => vec![],
243        }
244    }
245
246    /// All live nodes.
247    pub fn all_nodes(&self) -> Vec<&Node> {
248        self.nodes.values().filter(|n| !n.tombstoned).collect()
249    }
250
251    /// All live edges (with live endpoints).
252    pub fn all_edges(&self) -> Vec<&Edge> {
253        self.edges
254            .values()
255            .filter(|e| {
256                !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
257            })
258            .collect()
259    }
260
261    /// Neighbors of a node (connected via outgoing edges).
262    pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
263        self.outgoing_edges(node_id)
264            .iter()
265            .map(|e| e.target_id.as_str())
266            .collect()
267    }
268
269    /// Reverse neighbors (connected via incoming edges).
270    pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
271        self.incoming_edges(node_id)
272            .iter()
273            .map(|e| e.source_id.as_str())
274            .collect()
275    }
276
277    // -- CRDT application helpers --
278
279    fn apply_add_node(
280        &mut self,
281        node_id: &str,
282        node_type: &str,
283        subtype: Option<&str>,
284        label: &str,
285        properties: &BTreeMap<String, Value>,
286        clock: &LamportClock,
287    ) {
288        if let Some(existing) = self.nodes.get_mut(node_id) {
289            // Add-wins: always resurrect from tombstone.
290            existing.tombstoned = false;
291            // Track the latest add clock for add-wins semantics.
292            if clock_wins(clock, &existing.last_add_clock) {
293                existing.last_add_clock = clock.clone();
294            }
295            // LWW merge for label, subtype, and properties.
296            if clock_wins(clock, &existing.last_clock) {
297                existing.label = label.to_string();
298                existing.subtype = subtype.map(|s| s.to_string());
299                existing.last_clock = clock.clone();
300            }
301            // Per-property LWW: each property from add_node competes
302            // only with writes to the same key.
303            for (k, v) in properties {
304                let dominated = existing
305                    .property_clocks
306                    .get(k)
307                    .map(|c| clock_wins(clock, c))
308                    .unwrap_or(true);
309                if dominated {
310                    existing.properties.insert(k.clone(), v.clone());
311                    existing.property_clocks.insert(k.clone(), clock.clone());
312                }
313            }
314        } else {
315            let property_clocks: HashMap<String, LamportClock> = properties
316                .keys()
317                .map(|k| (k.clone(), clock.clone()))
318                .collect();
319            let node = Node {
320                node_id: node_id.to_string(),
321                node_type: node_type.to_string(),
322                subtype: subtype.map(|s| s.to_string()),
323                label: label.to_string(),
324                properties: properties.clone(),
325                property_clocks,
326                last_clock: clock.clone(),
327                last_add_clock: clock.clone(),
328                tombstoned: false,
329            };
330            self.by_type
331                .entry(node_type.to_string())
332                .or_default()
333                .insert(node_id.to_string());
334            self.nodes.insert(node_id.to_string(), node);
335        }
336    }
337
338    fn apply_add_edge(
339        &mut self,
340        edge_id: &str,
341        edge_type: &str,
342        source_id: &str,
343        target_id: &str,
344        properties: &BTreeMap<String, Value>,
345        clock: &LamportClock,
346    ) {
347        if let Some(existing) = self.edges.get_mut(edge_id) {
348            // Add-wins: always resurrect if tombstoned.
349            existing.tombstoned = false;
350            if clock_wins(clock, &existing.last_add_clock) {
351                existing.last_add_clock = clock.clone();
352            }
353            if clock_wins(clock, &existing.last_clock) {
354                existing.last_clock = clock.clone();
355            }
356            // Per-property LWW for edge properties.
357            for (k, v) in properties {
358                let dominated = existing
359                    .property_clocks
360                    .get(k)
361                    .map(|c| clock_wins(clock, c))
362                    .unwrap_or(true);
363                if dominated {
364                    existing.properties.insert(k.clone(), v.clone());
365                    existing.property_clocks.insert(k.clone(), clock.clone());
366                }
367            }
368        } else {
369            let property_clocks: HashMap<String, LamportClock> = properties
370                .keys()
371                .map(|k| (k.clone(), clock.clone()))
372                .collect();
373            let edge = Edge {
374                edge_id: edge_id.to_string(),
375                edge_type: edge_type.to_string(),
376                source_id: source_id.to_string(),
377                target_id: target_id.to_string(),
378                properties: properties.clone(),
379                property_clocks,
380                last_clock: clock.clone(),
381                last_add_clock: clock.clone(),
382                tombstoned: false,
383            };
384            self.outgoing
385                .entry(source_id.to_string())
386                .or_default()
387                .insert(edge_id.to_string());
388            self.incoming
389                .entry(target_id.to_string())
390                .or_default()
391                .insert(edge_id.to_string());
392            self.edges.insert(edge_id.to_string(), edge);
393        }
394    }
395
396    fn apply_update_property(
397        &mut self,
398        entity_id: &str,
399        key: &str,
400        value: &Value,
401        clock: &LamportClock,
402    ) {
403        // Try node first, then edge. Per-property LWW: each key competes
404        // only with other writes to the same key, not the entire entity.
405        if let Some(node) = self.nodes.get_mut(entity_id) {
406            let dominated = node
407                .property_clocks
408                .get(key)
409                .map(|c| clock_wins(clock, c))
410                .unwrap_or(true);
411            if dominated {
412                node.properties.insert(key.to_string(), value.clone());
413                node.property_clocks.insert(key.to_string(), clock.clone());
414            }
415            // Update entity-level clock for add-wins tracking.
416            if clock_wins(clock, &node.last_clock) {
417                node.last_clock = clock.clone();
418            }
419        } else if let Some(edge) = self.edges.get_mut(entity_id) {
420            let dominated = edge
421                .property_clocks
422                .get(key)
423                .map(|c| clock_wins(clock, c))
424                .unwrap_or(true);
425            if dominated {
426                edge.properties.insert(key.to_string(), value.clone());
427                edge.property_clocks.insert(key.to_string(), clock.clone());
428            }
429            if clock_wins(clock, &edge.last_clock) {
430                edge.last_clock = clock.clone();
431            }
432        }
433        // If entity not found, silently ignore (may arrive out of order in sync).
434    }
435
436    fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
437        if let Some(node) = self.nodes.get_mut(node_id) {
438            // Add-wins: only tombstone if the remove clock is strictly greater
439            // than the last add clock. If a concurrent (or later) add exists,
440            // the node stays alive.
441            if clock_wins(clock, &node.last_add_clock) {
442                node.tombstoned = true;
443                node.last_clock = clock.clone();
444            }
445        }
446        // Tombstoning a node doesn't physically remove edges — they just become
447        // invisible via is_node_live() checks in queries.
448    }
449
450    fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
451        if let Some(edge) = self.edges.get_mut(edge_id) {
452            // Add-wins: only tombstone if remove clock > last add clock.
453            if clock_wins(clock, &edge.last_add_clock) {
454                edge.tombstoned = true;
455                edge.last_clock = clock.clone();
456            }
457        }
458    }
459
460    fn is_node_live(&self, node_id: &str) -> bool {
461        self.nodes
462            .get(node_id)
463            .map(|n| !n.tombstoned)
464            .unwrap_or(false)
465    }
466}
467
468/// LWW comparison: returns true if `new_clock` wins over `existing_clock`.
469/// Uses HybridClock total ordering: (physical_ms, logical, id).
470fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
471    new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
472}
473
474#[cfg(test)]
475mod tests {
476    use super::*;
477    use crate::entry::Entry;
478    use crate::ontology::{EdgeTypeDef, NodeTypeDef};
479
480    fn test_ontology() -> Ontology {
481        Ontology {
482            node_types: BTreeMap::from([
483                (
484                    "entity".into(),
485                    NodeTypeDef {
486                        description: None,
487                        properties: BTreeMap::new(),
488                        subtypes: None,
489                    },
490                ),
491                (
492                    "signal".into(),
493                    NodeTypeDef {
494                        description: None,
495                        properties: BTreeMap::new(),
496                        subtypes: None,
497                    },
498                ),
499            ]),
500            edge_types: BTreeMap::from([
501                (
502                    "RUNS_ON".into(),
503                    EdgeTypeDef {
504                        description: None,
505                        source_types: vec!["entity".into()],
506                        target_types: vec!["entity".into()],
507                        properties: BTreeMap::new(),
508                    },
509                ),
510                (
511                    "OBSERVES".into(),
512                    EdgeTypeDef {
513                        description: None,
514                        source_types: vec!["signal".into()],
515                        target_types: vec!["entity".into()],
516                        properties: BTreeMap::new(),
517                    },
518                ),
519            ]),
520        }
521    }
522
523    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
524        Entry::new(
525            op,
526            vec![],
527            vec![],
528            LamportClock::with_values(author, clock_time, 0),
529            author,
530        )
531    }
532
533    // -- test_graph.rs spec from docs/silk.md --
534
535    #[test]
536    fn add_node_appears_in_query() {
537        let mut g = MaterializedGraph::new(test_ontology());
538        let entry = make_entry(
539            GraphOp::AddNode {
540                node_id: "server-1".into(),
541                node_type: "entity".into(),
542                label: "Server 1".into(),
543                properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
544                subtype: None,
545            },
546            1,
547            "inst-a",
548        );
549        g.apply(&entry);
550
551        let node = g.get_node("server-1").unwrap();
552        assert_eq!(node.node_type, "entity");
553        assert_eq!(node.label, "Server 1");
554        assert_eq!(
555            node.properties.get("ip"),
556            Some(&Value::String("10.0.0.1".into()))
557        );
558    }
559
560    #[test]
561    fn add_edge_creates_adjacency() {
562        let mut g = MaterializedGraph::new(test_ontology());
563        g.apply(&make_entry(
564            GraphOp::AddNode {
565                node_id: "svc".into(),
566                node_type: "entity".into(),
567                label: "svc".into(),
568                properties: BTreeMap::new(),
569                subtype: None,
570            },
571            1,
572            "inst-a",
573        ));
574        g.apply(&make_entry(
575            GraphOp::AddNode {
576                node_id: "srv".into(),
577                node_type: "entity".into(),
578                label: "srv".into(),
579                properties: BTreeMap::new(),
580                subtype: None,
581            },
582            2,
583            "inst-a",
584        ));
585        g.apply(&make_entry(
586            GraphOp::AddEdge {
587                edge_id: "e1".into(),
588                edge_type: "RUNS_ON".into(),
589                source_id: "svc".into(),
590                target_id: "srv".into(),
591                properties: BTreeMap::new(),
592            },
593            3,
594            "inst-a",
595        ));
596
597        // Both endpoints know about the edge.
598        let out = g.outgoing_edges("svc");
599        assert_eq!(out.len(), 1);
600        assert_eq!(out[0].target_id, "srv");
601
602        let inc = g.incoming_edges("srv");
603        assert_eq!(inc.len(), 1);
604        assert_eq!(inc[0].source_id, "svc");
605
606        assert_eq!(g.neighbors("svc"), vec!["srv"]);
607    }
608
609    #[test]
610    fn update_property_reflected() {
611        let mut g = MaterializedGraph::new(test_ontology());
612        g.apply(&make_entry(
613            GraphOp::AddNode {
614                node_id: "s1".into(),
615                node_type: "entity".into(),
616                label: "s1".into(),
617                properties: BTreeMap::new(),
618                subtype: None,
619            },
620            1,
621            "inst-a",
622        ));
623        g.apply(&make_entry(
624            GraphOp::UpdateProperty {
625                entity_id: "s1".into(),
626                key: "cpu".into(),
627                value: Value::Float(85.5),
628            },
629            2,
630            "inst-a",
631        ));
632
633        let node = g.get_node("s1").unwrap();
634        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
635    }
636
637    #[test]
638    fn remove_node_cascades_edges() {
639        let mut g = MaterializedGraph::new(test_ontology());
640        g.apply(&make_entry(
641            GraphOp::AddNode {
642                node_id: "a".into(),
643                node_type: "entity".into(),
644                label: "a".into(),
645                properties: BTreeMap::new(),
646                subtype: None,
647            },
648            1,
649            "inst-a",
650        ));
651        g.apply(&make_entry(
652            GraphOp::AddNode {
653                node_id: "b".into(),
654                node_type: "entity".into(),
655                label: "b".into(),
656                properties: BTreeMap::new(),
657                subtype: None,
658            },
659            2,
660            "inst-a",
661        ));
662        g.apply(&make_entry(
663            GraphOp::AddEdge {
664                edge_id: "e1".into(),
665                edge_type: "RUNS_ON".into(),
666                source_id: "a".into(),
667                target_id: "b".into(),
668                properties: BTreeMap::new(),
669            },
670            3,
671            "inst-a",
672        ));
673        assert_eq!(g.all_edges().len(), 1);
674
675        // Remove node 'b' — edge becomes invisible (dangling target).
676        g.apply(&make_entry(
677            GraphOp::RemoveNode {
678                node_id: "b".into(),
679            },
680            4,
681            "inst-a",
682        ));
683        assert!(g.get_node("b").is_none());
684        // Edge still exists but not returned by all_edges (target tombstoned).
685        assert_eq!(g.all_edges().len(), 0);
686        // Outgoing from 'a' also filters out dangling edges.
687        assert_eq!(g.outgoing_edges("a").len(), 0);
688    }
689
690    #[test]
691    fn remove_edge_preserves_nodes() {
692        let mut g = MaterializedGraph::new(test_ontology());
693        g.apply(&make_entry(
694            GraphOp::AddNode {
695                node_id: "a".into(),
696                node_type: "entity".into(),
697                label: "a".into(),
698                properties: BTreeMap::new(),
699                subtype: None,
700            },
701            1,
702            "inst-a",
703        ));
704        g.apply(&make_entry(
705            GraphOp::AddNode {
706                node_id: "b".into(),
707                node_type: "entity".into(),
708                label: "b".into(),
709                properties: BTreeMap::new(),
710                subtype: None,
711            },
712            2,
713            "inst-a",
714        ));
715        g.apply(&make_entry(
716            GraphOp::AddEdge {
717                edge_id: "e1".into(),
718                edge_type: "RUNS_ON".into(),
719                source_id: "a".into(),
720                target_id: "b".into(),
721                properties: BTreeMap::new(),
722            },
723            3,
724            "inst-a",
725        ));
726        g.apply(&make_entry(
727            GraphOp::RemoveEdge {
728                edge_id: "e1".into(),
729            },
730            4,
731            "inst-a",
732        ));
733
734        // Nodes still exist.
735        assert!(g.get_node("a").is_some());
736        assert!(g.get_node("b").is_some());
737        // Edge is gone.
738        assert!(g.get_edge("e1").is_none());
739        assert_eq!(g.all_edges().len(), 0);
740    }
741
742    #[test]
743    fn query_by_type_filters() {
744        let mut g = MaterializedGraph::new(test_ontology());
745        g.apply(&make_entry(
746            GraphOp::AddNode {
747                node_id: "s1".into(),
748                node_type: "entity".into(),
749                label: "s1".into(),
750                properties: BTreeMap::new(),
751                subtype: None,
752            },
753            1,
754            "inst-a",
755        ));
756        g.apply(&make_entry(
757            GraphOp::AddNode {
758                node_id: "s2".into(),
759                node_type: "entity".into(),
760                label: "s2".into(),
761                properties: BTreeMap::new(),
762                subtype: None,
763            },
764            2,
765            "inst-a",
766        ));
767        g.apply(&make_entry(
768            GraphOp::AddNode {
769                node_id: "alert".into(),
770                node_type: "signal".into(),
771                label: "alert".into(),
772                properties: BTreeMap::new(),
773                subtype: None,
774            },
775            3,
776            "inst-a",
777        ));
778
779        let entities = g.nodes_by_type("entity");
780        assert_eq!(entities.len(), 2);
781        let signals = g.nodes_by_type("signal");
782        assert_eq!(signals.len(), 1);
783        assert_eq!(signals[0].node_id, "alert");
784    }
785
786    #[test]
787    fn query_by_property_filters() {
788        let mut g = MaterializedGraph::new(test_ontology());
789        g.apply(&make_entry(
790            GraphOp::AddNode {
791                node_id: "s1".into(),
792                node_type: "entity".into(),
793                label: "s1".into(),
794                properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
795                subtype: None,
796            },
797            1,
798            "inst-a",
799        ));
800        g.apply(&make_entry(
801            GraphOp::AddNode {
802                node_id: "s2".into(),
803                node_type: "entity".into(),
804                label: "s2".into(),
805                properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
806                subtype: None,
807            },
808            2,
809            "inst-a",
810        ));
811
812        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
813        assert_eq!(alive.len(), 1);
814        assert_eq!(alive[0].node_id, "s1");
815    }
816
817    #[test]
818    fn materialization_from_empty() {
819        // Build graph incrementally.
820        let mut g1 = MaterializedGraph::new(test_ontology());
821        let entries = vec![
822            make_entry(
823                GraphOp::DefineOntology {
824                    ontology: test_ontology(),
825                },
826                0,
827                "inst-a",
828            ),
829            make_entry(
830                GraphOp::AddNode {
831                    node_id: "a".into(),
832                    node_type: "entity".into(),
833                    label: "a".into(),
834                    properties: BTreeMap::new(),
835                    subtype: None,
836                },
837                1,
838                "inst-a",
839            ),
840            make_entry(
841                GraphOp::AddNode {
842                    node_id: "b".into(),
843                    node_type: "entity".into(),
844                    label: "b".into(),
845                    properties: BTreeMap::new(),
846                    subtype: None,
847                },
848                2,
849                "inst-a",
850            ),
851            make_entry(
852                GraphOp::AddEdge {
853                    edge_id: "e1".into(),
854                    edge_type: "RUNS_ON".into(),
855                    source_id: "a".into(),
856                    target_id: "b".into(),
857                    properties: BTreeMap::new(),
858                },
859                3,
860                "inst-a",
861            ),
862        ];
863        for e in &entries {
864            g1.apply(e);
865        }
866
867        // Rebuild from scratch.
868        let mut g2 = MaterializedGraph::new(test_ontology());
869        let refs: Vec<&Entry> = entries.iter().collect();
870        g2.rebuild(&refs);
871
872        // Same result.
873        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
874        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
875        for node in g1.all_nodes() {
876            let n2 = g2.get_node(&node.node_id).unwrap();
877            assert_eq!(node.node_type, n2.node_type);
878            assert_eq!(node.properties, n2.properties);
879        }
880    }
881
882    #[test]
883    fn incremental_equals_full() {
884        let entries = vec![
885            make_entry(
886                GraphOp::DefineOntology {
887                    ontology: test_ontology(),
888                },
889                0,
890                "inst-a",
891            ),
892            make_entry(
893                GraphOp::AddNode {
894                    node_id: "a".into(),
895                    node_type: "entity".into(),
896                    label: "a".into(),
897                    properties: BTreeMap::from([("x".into(), Value::Int(1))]),
898                    subtype: None,
899                },
900                1,
901                "inst-a",
902            ),
903            make_entry(
904                GraphOp::UpdateProperty {
905                    entity_id: "a".into(),
906                    key: "x".into(),
907                    value: Value::Int(2),
908                },
909                2,
910                "inst-a",
911            ),
912            make_entry(
913                GraphOp::AddNode {
914                    node_id: "b".into(),
915                    node_type: "entity".into(),
916                    label: "b".into(),
917                    properties: BTreeMap::new(),
918                    subtype: None,
919                },
920                3,
921                "inst-a",
922            ),
923            make_entry(
924                GraphOp::AddEdge {
925                    edge_id: "e1".into(),
926                    edge_type: "RUNS_ON".into(),
927                    source_id: "a".into(),
928                    target_id: "b".into(),
929                    properties: BTreeMap::new(),
930                },
931                4,
932                "inst-a",
933            ),
934            make_entry(
935                GraphOp::RemoveEdge {
936                    edge_id: "e1".into(),
937                },
938                5,
939                "inst-a",
940            ),
941        ];
942
943        // Incremental.
944        let mut g_inc = MaterializedGraph::new(test_ontology());
945        for e in &entries {
946            g_inc.apply(e);
947        }
948
949        // Full replay.
950        let mut g_full = MaterializedGraph::new(test_ontology());
951        let refs: Vec<&Entry> = entries.iter().collect();
952        g_full.rebuild(&refs);
953
954        // Property should be 2 (updated).
955        assert_eq!(
956            g_inc.get_node("a").unwrap().properties.get("x"),
957            Some(&Value::Int(2))
958        );
959        assert_eq!(
960            g_full.get_node("a").unwrap().properties.get("x"),
961            Some(&Value::Int(2))
962        );
963        // Edge should be removed.
964        assert_eq!(g_inc.all_edges().len(), 0);
965        assert_eq!(g_full.all_edges().len(), 0);
966    }
967
968    #[test]
969    fn lww_concurrent_property_update() {
970        // Two instances update the same property — higher clock wins.
971        let mut g = MaterializedGraph::new(test_ontology());
972        g.apply(&make_entry(
973            GraphOp::AddNode {
974                node_id: "s1".into(),
975                node_type: "entity".into(),
976                label: "s1".into(),
977                properties: BTreeMap::new(),
978                subtype: None,
979            },
980            1,
981            "inst-a",
982        ));
983        // inst-a sets status=alive at time 2
984        g.apply(&make_entry(
985            GraphOp::UpdateProperty {
986                entity_id: "s1".into(),
987                key: "status".into(),
988                value: Value::String("alive".into()),
989            },
990            2,
991            "inst-a",
992        ));
993        // inst-b sets status=dead at time 3 — wins (higher clock)
994        g.apply(&make_entry(
995            GraphOp::UpdateProperty {
996                entity_id: "s1".into(),
997                key: "status".into(),
998                value: Value::String("dead".into()),
999            },
1000            3,
1001            "inst-b",
1002        ));
1003        assert_eq!(
1004            g.get_node("s1").unwrap().properties.get("status"),
1005            Some(&Value::String("dead".into()))
1006        );
1007    }
1008
1009    #[test]
1010    fn lww_tiebreak_by_instance_id() {
1011        // Same clock time — higher instance ID wins.
1012        let mut g = MaterializedGraph::new(test_ontology());
1013        g.apply(&make_entry(
1014            GraphOp::AddNode {
1015                node_id: "s1".into(),
1016                node_type: "entity".into(),
1017                label: "s1".into(),
1018                properties: BTreeMap::new(),
1019                subtype: None,
1020            },
1021            1,
1022            "inst-a",
1023        ));
1024        // Both at physical_ms=5, logical=0. Lower id wins → "inst-a" wins.
1025        g.apply(&make_entry(
1026            GraphOp::UpdateProperty {
1027                entity_id: "s1".into(),
1028                key: "x".into(),
1029                value: Value::Int(1),
1030            },
1031            5,
1032            "inst-a",
1033        ));
1034        g.apply(&make_entry(
1035            GraphOp::UpdateProperty {
1036                entity_id: "s1".into(),
1037                key: "x".into(),
1038                value: Value::Int(2),
1039            },
1040            5,
1041            "inst-b",
1042        ));
1043        // inst-a has lower id → wins the tiebreak → value stays Int(1).
1044        assert_eq!(
1045            g.get_node("s1").unwrap().properties.get("x"),
1046            Some(&Value::Int(1))
1047        );
1048    }
1049
1050    #[test]
1051    fn lww_per_property_concurrent_different_keys() {
1052        // Two instances concurrently update DIFFERENT properties at the same
1053        // clock time. Both updates must be accepted — they don't conflict.
1054        // This requires per-property LWW, not node-level LWW.
1055        let mut g = MaterializedGraph::new(test_ontology());
1056        g.apply(&make_entry(
1057            GraphOp::AddNode {
1058                node_id: "s1".into(),
1059                node_type: "entity".into(),
1060                label: "s1".into(),
1061                properties: BTreeMap::from([
1062                    ("x".into(), Value::Int(0)),
1063                    ("y".into(), Value::Int(0)),
1064                ]),
1065                subtype: None,
1066            },
1067            1,
1068            "inst-a",
1069        ));
1070        // inst-a updates "x" at time 3
1071        g.apply(&make_entry(
1072            GraphOp::UpdateProperty {
1073                entity_id: "s1".into(),
1074                key: "x".into(),
1075                value: Value::Int(42),
1076            },
1077            3,
1078            "inst-a",
1079        ));
1080        // inst-b updates "y" at time 3 (concurrent, different property)
1081        g.apply(&make_entry(
1082            GraphOp::UpdateProperty {
1083                entity_id: "s1".into(),
1084                key: "y".into(),
1085                value: Value::Int(99),
1086            },
1087            3,
1088            "inst-b",
1089        ));
1090
1091        let node = g.get_node("s1").unwrap();
1092        // Both updates must be applied — no conflict.
1093        assert_eq!(
1094            node.properties.get("x"),
1095            Some(&Value::Int(42)),
1096            "update to 'x' must not be rejected by concurrent update to 'y'"
1097        );
1098        assert_eq!(
1099            node.properties.get("y"),
1100            Some(&Value::Int(99)),
1101            "update to 'y' must not be rejected by concurrent update to 'x'"
1102        );
1103    }
1104
1105    #[test]
1106    fn lww_per_property_order_independent() {
1107        // Same scenario but applied in reverse order — result must be identical.
1108        let mut g = MaterializedGraph::new(test_ontology());
1109        g.apply(&make_entry(
1110            GraphOp::AddNode {
1111                node_id: "s1".into(),
1112                node_type: "entity".into(),
1113                label: "s1".into(),
1114                properties: BTreeMap::from([
1115                    ("x".into(), Value::Int(0)),
1116                    ("y".into(), Value::Int(0)),
1117                ]),
1118                subtype: None,
1119            },
1120            1,
1121            "inst-a",
1122        ));
1123        // Apply inst-b first this time
1124        g.apply(&make_entry(
1125            GraphOp::UpdateProperty {
1126                entity_id: "s1".into(),
1127                key: "y".into(),
1128                value: Value::Int(99),
1129            },
1130            3,
1131            "inst-b",
1132        ));
1133        g.apply(&make_entry(
1134            GraphOp::UpdateProperty {
1135                entity_id: "s1".into(),
1136                key: "x".into(),
1137                value: Value::Int(42),
1138            },
1139            3,
1140            "inst-a",
1141        ));
1142
1143        let node = g.get_node("s1").unwrap();
1144        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1145        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1146    }
1147
1148    #[test]
1149    fn add_wins_over_remove() {
1150        // Concurrent add + remove → node should exist (add-wins).
1151        let mut g = MaterializedGraph::new(test_ontology());
1152        g.apply(&make_entry(
1153            GraphOp::AddNode {
1154                node_id: "s1".into(),
1155                node_type: "entity".into(),
1156                label: "s1".into(),
1157                properties: BTreeMap::new(),
1158                subtype: None,
1159            },
1160            1,
1161            "inst-a",
1162        ));
1163        // Remove at time 2.
1164        g.apply(&make_entry(
1165            GraphOp::RemoveNode {
1166                node_id: "s1".into(),
1167            },
1168            2,
1169            "inst-a",
1170        ));
1171        assert!(g.get_node("s1").is_none());
1172
1173        // Re-add at time 3 (add-wins — resurrects).
1174        g.apply(&make_entry(
1175            GraphOp::AddNode {
1176                node_id: "s1".into(),
1177                node_type: "entity".into(),
1178                label: "s1 v2".into(),
1179                properties: BTreeMap::new(),
1180                subtype: None,
1181            },
1182            3,
1183            "inst-b",
1184        ));
1185        let node = g.get_node("s1").unwrap();
1186        assert_eq!(node.label, "s1 v2");
1187        assert!(!node.tombstoned);
1188    }
1189}