Skip to main content

silk/
graph.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7/// A materialized node in the graph.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10    pub node_id: String,
11    pub node_type: String,
12    pub subtype: Option<String>,
13    pub label: String,
14    pub properties: BTreeMap<String, Value>,
15    /// Per-property clocks for LWW conflict resolution.
16    /// Each property key tracks the clock of its last write, so
17    /// concurrent updates to different properties don't conflict.
18    pub property_clocks: HashMap<String, LamportClock>,
19    /// Clock of the entry that last modified this node.
20    /// Used for add-wins semantics and label LWW.
21    pub last_clock: LamportClock,
22    /// Clock of the most recent AddNode for this node.
23    /// Used for add-wins semantics: remove only wins if its clock
24    /// is strictly greater than last_add_clock.
25    pub last_add_clock: LamportClock,
26    /// Whether this node has been tombstoned (removed).
27    pub tombstoned: bool,
28}
29
30/// A materialized edge in the graph.
31#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33    pub edge_id: String,
34    pub edge_type: String,
35    pub source_id: String,
36    pub target_id: String,
37    pub properties: BTreeMap<String, Value>,
38    /// Per-property clocks for LWW conflict resolution.
39    pub property_clocks: HashMap<String, LamportClock>,
40    pub last_clock: LamportClock,
41    /// Clock of the most recent AddEdge for this edge.
42    pub last_add_clock: LamportClock,
43    pub tombstoned: bool,
44}
45
46/// Materialized graph — derived from the op log.
47///
48/// Provides fast queries without replaying the full log.
49/// Updated incrementally as new entries arrive, or rebuilt
50/// from scratch by replaying the entire op log.
51///
52/// CRDT semantics:
53/// - **Add-wins** for topology (concurrent add + remove → node/edge exists)
54/// - **LWW** (Last-Writer-Wins) per property key (highest Lamport clock wins)
55/// - **Tombstones** for deletes (mark as deleted, don't physically remove)
56pub struct MaterializedGraph {
57    /// node_id → Node
58    pub nodes: HashMap<String, Node>,
59    /// edge_id → Edge
60    pub edges: HashMap<String, Edge>,
61    /// node_id → set of outgoing edge_ids
62    pub outgoing: HashMap<String, HashSet<String>>,
63    /// node_id → set of incoming edge_ids
64    pub incoming: HashMap<String, HashSet<String>>,
65    /// node_type → set of node_ids (type index)
66    pub by_type: HashMap<String, HashSet<String>>,
67    /// The ontology (for validation during materialization)
68    pub ontology: Ontology,
69    /// R-02: entries that failed ontology validation during apply().
70    /// These entries exist in the oplog (for CRDT convergence) but are
71    /// invisible in the materialized graph. Grow-only set — monotonic, safe.
72    pub quarantined: HashSet<Hash>,
73}
74
75impl MaterializedGraph {
76    /// Create an empty materialized graph with the given ontology.
77    pub fn new(ontology: Ontology) -> Self {
78        Self {
79            nodes: HashMap::new(),
80            edges: HashMap::new(),
81            outgoing: HashMap::new(),
82            incoming: HashMap::new(),
83            by_type: HashMap::new(),
84            ontology,
85            quarantined: HashSet::new(),
86        }
87    }
88
89    /// Apply a single entry to the graph (incremental materialization).
90    ///
91    /// R-02: Validates AddNode/AddEdge payloads against the ontology.
92    /// Invalid entries are quarantined (added to `self.quarantined`) and
93    /// skipped for materialization. They remain in the oplog for CRDT
94    /// convergence — quarantine is a graph-layer concern, not an oplog concern.
95    pub fn apply(&mut self, entry: &Entry) {
96        match &entry.payload {
97            GraphOp::Checkpoint { ops, op_clocks, .. } => {
98                // R-08: Replay synthetic ops to restore graph state.
99                // Bug 6 fix: use per-op clocks (preserves LWW metadata).
100                for (i, op) in ops.iter().enumerate() {
101                    let clock = if i < op_clocks.len() {
102                        LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
103                    } else {
104                        entry.clock.clone() // fallback for old checkpoints without op_clocks
105                    };
106                    let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
107                    self.apply(&synthetic);
108                }
109            }
110            GraphOp::DefineOntology { .. } => {
111                // Genesis — nothing to materialize.
112            }
113            GraphOp::ExtendOntology { extension } => {
114                if let Err(_e) = self.ontology.merge_extension(extension) {
115                    self.quarantined.insert(entry.hash);
116                }
117            }
118            GraphOp::AddNode {
119                node_id,
120                node_type,
121                subtype,
122                label,
123                properties,
124            } => {
125                // R-02: validate against ontology, quarantine if invalid
126                if let Err(_e) =
127                    self.ontology
128                        .validate_node(node_type, subtype.as_deref(), properties)
129                {
130                    self.quarantined.insert(entry.hash);
131                    return;
132                }
133                self.apply_add_node(
134                    node_id,
135                    node_type,
136                    subtype.as_deref(),
137                    label,
138                    properties,
139                    &entry.clock,
140                );
141            }
142            GraphOp::AddEdge {
143                edge_id,
144                edge_type,
145                source_id,
146                target_id,
147                properties,
148            } => {
149                // R-02: validate edge type exists.
150                if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
151                    self.quarantined.insert(entry.hash);
152                    return;
153                }
154                // Bug 13 fix: validate source/target type constraints when both nodes
155                // are materialized. If one is missing (out-of-order sync), skip —
156                // validation happens on rebuild.
157                if let (Some(src), Some(tgt)) = (
158                    self.nodes.get(source_id.as_str()),
159                    self.nodes.get(target_id.as_str()),
160                ) {
161                    if self
162                        .ontology
163                        .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
164                        .is_err()
165                    {
166                        self.quarantined.insert(entry.hash);
167                        return;
168                    }
169                }
170                self.apply_add_edge(
171                    edge_id,
172                    edge_type,
173                    source_id,
174                    target_id,
175                    properties,
176                    &entry.clock,
177                );
178            }
179            GraphOp::UpdateProperty {
180                entity_id,
181                key,
182                value,
183            } => {
184                self.apply_update_property(entity_id, key, value, &entry.clock);
185            }
186            GraphOp::RemoveNode { node_id } => {
187                self.apply_remove_node(node_id, &entry.clock);
188            }
189            GraphOp::RemoveEdge { edge_id } => {
190                self.apply_remove_edge(edge_id, &entry.clock);
191            }
192        }
193    }
194
195    /// Apply a sequence of entries (full rematerialization from op log).
196    pub fn apply_all(&mut self, entries: &[&Entry]) {
197        for entry in entries {
198            self.apply(entry);
199        }
200    }
201
202    /// Rebuild from scratch: clear everything and replay all entries.
203    pub fn rebuild(&mut self, entries: &[&Entry]) {
204        self.nodes.clear();
205        self.edges.clear();
206        self.outgoing.clear();
207        self.incoming.clear();
208        self.by_type.clear();
209        self.quarantined.clear();
210        self.apply_all(entries);
211    }
212
213    // -- Queries --
214
215    /// Get a node by ID (returns None if not found or tombstoned).
216    pub fn get_node(&self, node_id: &str) -> Option<&Node> {
217        self.nodes.get(node_id).filter(|n| !n.tombstoned)
218    }
219
220    /// Get an edge by ID (returns None if not found or tombstoned).
221    pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
222        self.edges.get(edge_id).filter(|e| !e.tombstoned)
223    }
224
225    /// Query all live nodes of a given type.
226    pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
227        match self.by_type.get(node_type) {
228            Some(ids) => ids.iter().filter_map(|id| self.get_node(id)).collect(),
229            None => vec![],
230        }
231    }
232
233    /// Query all live nodes of a given subtype.
234    pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
235        self.nodes
236            .values()
237            .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
238            .collect()
239    }
240
241    /// Query nodes by a property value.
242    pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
243        self.nodes
244            .values()
245            .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
246            .collect()
247    }
248
249    /// Get outgoing edges for a node (only live edges with live endpoints).
250    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
251        match self.outgoing.get(node_id) {
252            Some(edge_ids) => edge_ids
253                .iter()
254                .filter_map(|eid| self.get_edge(eid))
255                .filter(|e| self.is_node_live(&e.target_id))
256                .collect(),
257            None => vec![],
258        }
259    }
260
261    /// Get incoming edges for a node (only live edges with live endpoints).
262    pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
263        match self.incoming.get(node_id) {
264            Some(edge_ids) => edge_ids
265                .iter()
266                .filter_map(|eid| self.get_edge(eid))
267                .filter(|e| self.is_node_live(&e.source_id))
268                .collect(),
269            None => vec![],
270        }
271    }
272
273    /// All live nodes.
274    pub fn all_nodes(&self) -> Vec<&Node> {
275        self.nodes.values().filter(|n| !n.tombstoned).collect()
276    }
277
278    /// All live edges (with live endpoints).
279    pub fn all_edges(&self) -> Vec<&Edge> {
280        self.edges
281            .values()
282            .filter(|e| {
283                !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
284            })
285            .collect()
286    }
287
288    /// Neighbors of a node (connected via outgoing edges).
289    pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
290        self.outgoing_edges(node_id)
291            .iter()
292            .map(|e| e.target_id.as_str())
293            .collect()
294    }
295
296    /// Reverse neighbors (connected via incoming edges).
297    pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
298        self.incoming_edges(node_id)
299            .iter()
300            .map(|e| e.source_id.as_str())
301            .collect()
302    }
303
304    // -- CRDT application helpers --
305
306    fn apply_add_node(
307        &mut self,
308        node_id: &str,
309        node_type: &str,
310        subtype: Option<&str>,
311        label: &str,
312        properties: &BTreeMap<String, Value>,
313        clock: &LamportClock,
314    ) {
315        if let Some(existing) = self.nodes.get_mut(node_id) {
316            // Add-wins: always resurrect from tombstone.
317            existing.tombstoned = false;
318            // Track the latest add clock for add-wins semantics.
319            if clock_wins(clock, &existing.last_add_clock) {
320                existing.last_add_clock = clock.clone();
321            }
322            // LWW merge for label, subtype, and properties.
323            if clock_wins(clock, &existing.last_clock) {
324                existing.label = label.to_string();
325                existing.subtype = subtype.map(|s| s.to_string());
326                existing.last_clock = clock.clone();
327            }
328            // Per-property LWW: each property from add_node competes
329            // only with writes to the same key.
330            for (k, v) in properties {
331                let dominated = existing
332                    .property_clocks
333                    .get(k)
334                    .map(|c| clock_wins(clock, c))
335                    .unwrap_or(true);
336                if dominated {
337                    existing.properties.insert(k.clone(), v.clone());
338                    existing.property_clocks.insert(k.clone(), clock.clone());
339                }
340            }
341        } else {
342            let property_clocks: HashMap<String, LamportClock> = properties
343                .keys()
344                .map(|k| (k.clone(), clock.clone()))
345                .collect();
346            let node = Node {
347                node_id: node_id.to_string(),
348                node_type: node_type.to_string(),
349                subtype: subtype.map(|s| s.to_string()),
350                label: label.to_string(),
351                properties: properties.clone(),
352                property_clocks,
353                last_clock: clock.clone(),
354                last_add_clock: clock.clone(),
355                tombstoned: false,
356            };
357            self.by_type
358                .entry(node_type.to_string())
359                .or_default()
360                .insert(node_id.to_string());
361            self.nodes.insert(node_id.to_string(), node);
362        }
363    }
364
365    fn apply_add_edge(
366        &mut self,
367        edge_id: &str,
368        edge_type: &str,
369        source_id: &str,
370        target_id: &str,
371        properties: &BTreeMap<String, Value>,
372        clock: &LamportClock,
373    ) {
374        if let Some(existing) = self.edges.get_mut(edge_id) {
375            // Add-wins: always resurrect if tombstoned.
376            existing.tombstoned = false;
377            if clock_wins(clock, &existing.last_add_clock) {
378                existing.last_add_clock = clock.clone();
379            }
380            if clock_wins(clock, &existing.last_clock) {
381                existing.last_clock = clock.clone();
382            }
383            // Per-property LWW for edge properties.
384            for (k, v) in properties {
385                let dominated = existing
386                    .property_clocks
387                    .get(k)
388                    .map(|c| clock_wins(clock, c))
389                    .unwrap_or(true);
390                if dominated {
391                    existing.properties.insert(k.clone(), v.clone());
392                    existing.property_clocks.insert(k.clone(), clock.clone());
393                }
394            }
395        } else {
396            let property_clocks: HashMap<String, LamportClock> = properties
397                .keys()
398                .map(|k| (k.clone(), clock.clone()))
399                .collect();
400            let edge = Edge {
401                edge_id: edge_id.to_string(),
402                edge_type: edge_type.to_string(),
403                source_id: source_id.to_string(),
404                target_id: target_id.to_string(),
405                properties: properties.clone(),
406                property_clocks,
407                last_clock: clock.clone(),
408                last_add_clock: clock.clone(),
409                tombstoned: false,
410            };
411            self.outgoing
412                .entry(source_id.to_string())
413                .or_default()
414                .insert(edge_id.to_string());
415            self.incoming
416                .entry(target_id.to_string())
417                .or_default()
418                .insert(edge_id.to_string());
419            self.edges.insert(edge_id.to_string(), edge);
420        }
421    }
422
423    fn apply_update_property(
424        &mut self,
425        entity_id: &str,
426        key: &str,
427        value: &Value,
428        clock: &LamportClock,
429    ) {
430        // Try node first, then edge. Per-property LWW: each key competes
431        // only with other writes to the same key, not the entire entity.
432        if let Some(node) = self.nodes.get_mut(entity_id) {
433            let dominated = node
434                .property_clocks
435                .get(key)
436                .map(|c| clock_wins(clock, c))
437                .unwrap_or(true);
438            if dominated {
439                node.properties.insert(key.to_string(), value.clone());
440                node.property_clocks.insert(key.to_string(), clock.clone());
441            }
442            // Update entity-level clock for add-wins tracking.
443            if clock_wins(clock, &node.last_clock) {
444                node.last_clock = clock.clone();
445            }
446        } else if let Some(edge) = self.edges.get_mut(entity_id) {
447            let dominated = edge
448                .property_clocks
449                .get(key)
450                .map(|c| clock_wins(clock, c))
451                .unwrap_or(true);
452            if dominated {
453                edge.properties.insert(key.to_string(), value.clone());
454                edge.property_clocks.insert(key.to_string(), clock.clone());
455            }
456            if clock_wins(clock, &edge.last_clock) {
457                edge.last_clock = clock.clone();
458            }
459        }
460        // If entity not found, silently ignore (may arrive out of order in sync).
461    }
462
463    fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
464        if let Some(node) = self.nodes.get_mut(node_id) {
465            // Add-wins: only tombstone if the remove clock is strictly greater
466            // than the last add clock. If a concurrent (or later) add exists,
467            // the node stays alive.
468            if clock_wins(clock, &node.last_add_clock) {
469                node.tombstoned = true;
470                node.last_clock = clock.clone();
471            }
472        }
473        // Tombstoning a node doesn't physically remove edges — they just become
474        // invisible via is_node_live() checks in queries.
475    }
476
477    fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
478        if let Some(edge) = self.edges.get_mut(edge_id) {
479            // Add-wins: only tombstone if remove clock > last add clock.
480            if clock_wins(clock, &edge.last_add_clock) {
481                edge.tombstoned = true;
482                edge.last_clock = clock.clone();
483            }
484        }
485    }
486
487    fn is_node_live(&self, node_id: &str) -> bool {
488        self.nodes
489            .get(node_id)
490            .map(|n| !n.tombstoned)
491            .unwrap_or(false)
492    }
493}
494
495/// LWW comparison: returns true if `new_clock` wins over `existing_clock`.
496/// Uses HybridClock total ordering: (physical_ms, logical, id).
497fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
498    new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use crate::entry::Entry;
505    use crate::ontology::{EdgeTypeDef, NodeTypeDef};
506
507    fn test_ontology() -> Ontology {
508        Ontology {
509            node_types: BTreeMap::from([
510                (
511                    "entity".into(),
512                    NodeTypeDef {
513                        description: None,
514                        properties: BTreeMap::new(),
515                        subtypes: None,
516                    },
517                ),
518                (
519                    "signal".into(),
520                    NodeTypeDef {
521                        description: None,
522                        properties: BTreeMap::new(),
523                        subtypes: None,
524                    },
525                ),
526            ]),
527            edge_types: BTreeMap::from([
528                (
529                    "RUNS_ON".into(),
530                    EdgeTypeDef {
531                        description: None,
532                        source_types: vec!["entity".into()],
533                        target_types: vec!["entity".into()],
534                        properties: BTreeMap::new(),
535                    },
536                ),
537                (
538                    "OBSERVES".into(),
539                    EdgeTypeDef {
540                        description: None,
541                        source_types: vec!["signal".into()],
542                        target_types: vec!["entity".into()],
543                        properties: BTreeMap::new(),
544                    },
545                ),
546            ]),
547        }
548    }
549
550    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
551        Entry::new(
552            op,
553            vec![],
554            vec![],
555            LamportClock::with_values(author, clock_time, 0),
556            author,
557        )
558    }
559
560    // -- test_graph.rs spec from docs/silk.md --
561
562    #[test]
563    fn add_node_appears_in_query() {
564        let mut g = MaterializedGraph::new(test_ontology());
565        let entry = make_entry(
566            GraphOp::AddNode {
567                node_id: "server-1".into(),
568                node_type: "entity".into(),
569                label: "Server 1".into(),
570                properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
571                subtype: None,
572            },
573            1,
574            "inst-a",
575        );
576        g.apply(&entry);
577
578        let node = g.get_node("server-1").unwrap();
579        assert_eq!(node.node_type, "entity");
580        assert_eq!(node.label, "Server 1");
581        assert_eq!(
582            node.properties.get("ip"),
583            Some(&Value::String("10.0.0.1".into()))
584        );
585    }
586
587    #[test]
588    fn add_edge_creates_adjacency() {
589        let mut g = MaterializedGraph::new(test_ontology());
590        g.apply(&make_entry(
591            GraphOp::AddNode {
592                node_id: "svc".into(),
593                node_type: "entity".into(),
594                label: "svc".into(),
595                properties: BTreeMap::new(),
596                subtype: None,
597            },
598            1,
599            "inst-a",
600        ));
601        g.apply(&make_entry(
602            GraphOp::AddNode {
603                node_id: "srv".into(),
604                node_type: "entity".into(),
605                label: "srv".into(),
606                properties: BTreeMap::new(),
607                subtype: None,
608            },
609            2,
610            "inst-a",
611        ));
612        g.apply(&make_entry(
613            GraphOp::AddEdge {
614                edge_id: "e1".into(),
615                edge_type: "RUNS_ON".into(),
616                source_id: "svc".into(),
617                target_id: "srv".into(),
618                properties: BTreeMap::new(),
619            },
620            3,
621            "inst-a",
622        ));
623
624        // Both endpoints know about the edge.
625        let out = g.outgoing_edges("svc");
626        assert_eq!(out.len(), 1);
627        assert_eq!(out[0].target_id, "srv");
628
629        let inc = g.incoming_edges("srv");
630        assert_eq!(inc.len(), 1);
631        assert_eq!(inc[0].source_id, "svc");
632
633        assert_eq!(g.neighbors("svc"), vec!["srv"]);
634    }
635
636    #[test]
637    fn update_property_reflected() {
638        let mut g = MaterializedGraph::new(test_ontology());
639        g.apply(&make_entry(
640            GraphOp::AddNode {
641                node_id: "s1".into(),
642                node_type: "entity".into(),
643                label: "s1".into(),
644                properties: BTreeMap::new(),
645                subtype: None,
646            },
647            1,
648            "inst-a",
649        ));
650        g.apply(&make_entry(
651            GraphOp::UpdateProperty {
652                entity_id: "s1".into(),
653                key: "cpu".into(),
654                value: Value::Float(85.5),
655            },
656            2,
657            "inst-a",
658        ));
659
660        let node = g.get_node("s1").unwrap();
661        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
662    }
663
664    #[test]
665    fn remove_node_cascades_edges() {
666        let mut g = MaterializedGraph::new(test_ontology());
667        g.apply(&make_entry(
668            GraphOp::AddNode {
669                node_id: "a".into(),
670                node_type: "entity".into(),
671                label: "a".into(),
672                properties: BTreeMap::new(),
673                subtype: None,
674            },
675            1,
676            "inst-a",
677        ));
678        g.apply(&make_entry(
679            GraphOp::AddNode {
680                node_id: "b".into(),
681                node_type: "entity".into(),
682                label: "b".into(),
683                properties: BTreeMap::new(),
684                subtype: None,
685            },
686            2,
687            "inst-a",
688        ));
689        g.apply(&make_entry(
690            GraphOp::AddEdge {
691                edge_id: "e1".into(),
692                edge_type: "RUNS_ON".into(),
693                source_id: "a".into(),
694                target_id: "b".into(),
695                properties: BTreeMap::new(),
696            },
697            3,
698            "inst-a",
699        ));
700        assert_eq!(g.all_edges().len(), 1);
701
702        // Remove node 'b' — edge becomes invisible (dangling target).
703        g.apply(&make_entry(
704            GraphOp::RemoveNode {
705                node_id: "b".into(),
706            },
707            4,
708            "inst-a",
709        ));
710        assert!(g.get_node("b").is_none());
711        // Edge still exists but not returned by all_edges (target tombstoned).
712        assert_eq!(g.all_edges().len(), 0);
713        // Outgoing from 'a' also filters out dangling edges.
714        assert_eq!(g.outgoing_edges("a").len(), 0);
715    }
716
717    #[test]
718    fn remove_edge_preserves_nodes() {
719        let mut g = MaterializedGraph::new(test_ontology());
720        g.apply(&make_entry(
721            GraphOp::AddNode {
722                node_id: "a".into(),
723                node_type: "entity".into(),
724                label: "a".into(),
725                properties: BTreeMap::new(),
726                subtype: None,
727            },
728            1,
729            "inst-a",
730        ));
731        g.apply(&make_entry(
732            GraphOp::AddNode {
733                node_id: "b".into(),
734                node_type: "entity".into(),
735                label: "b".into(),
736                properties: BTreeMap::new(),
737                subtype: None,
738            },
739            2,
740            "inst-a",
741        ));
742        g.apply(&make_entry(
743            GraphOp::AddEdge {
744                edge_id: "e1".into(),
745                edge_type: "RUNS_ON".into(),
746                source_id: "a".into(),
747                target_id: "b".into(),
748                properties: BTreeMap::new(),
749            },
750            3,
751            "inst-a",
752        ));
753        g.apply(&make_entry(
754            GraphOp::RemoveEdge {
755                edge_id: "e1".into(),
756            },
757            4,
758            "inst-a",
759        ));
760
761        // Nodes still exist.
762        assert!(g.get_node("a").is_some());
763        assert!(g.get_node("b").is_some());
764        // Edge is gone.
765        assert!(g.get_edge("e1").is_none());
766        assert_eq!(g.all_edges().len(), 0);
767    }
768
769    #[test]
770    fn query_by_type_filters() {
771        let mut g = MaterializedGraph::new(test_ontology());
772        g.apply(&make_entry(
773            GraphOp::AddNode {
774                node_id: "s1".into(),
775                node_type: "entity".into(),
776                label: "s1".into(),
777                properties: BTreeMap::new(),
778                subtype: None,
779            },
780            1,
781            "inst-a",
782        ));
783        g.apply(&make_entry(
784            GraphOp::AddNode {
785                node_id: "s2".into(),
786                node_type: "entity".into(),
787                label: "s2".into(),
788                properties: BTreeMap::new(),
789                subtype: None,
790            },
791            2,
792            "inst-a",
793        ));
794        g.apply(&make_entry(
795            GraphOp::AddNode {
796                node_id: "alert".into(),
797                node_type: "signal".into(),
798                label: "alert".into(),
799                properties: BTreeMap::new(),
800                subtype: None,
801            },
802            3,
803            "inst-a",
804        ));
805
806        let entities = g.nodes_by_type("entity");
807        assert_eq!(entities.len(), 2);
808        let signals = g.nodes_by_type("signal");
809        assert_eq!(signals.len(), 1);
810        assert_eq!(signals[0].node_id, "alert");
811    }
812
813    #[test]
814    fn query_by_property_filters() {
815        let mut g = MaterializedGraph::new(test_ontology());
816        g.apply(&make_entry(
817            GraphOp::AddNode {
818                node_id: "s1".into(),
819                node_type: "entity".into(),
820                label: "s1".into(),
821                properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
822                subtype: None,
823            },
824            1,
825            "inst-a",
826        ));
827        g.apply(&make_entry(
828            GraphOp::AddNode {
829                node_id: "s2".into(),
830                node_type: "entity".into(),
831                label: "s2".into(),
832                properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
833                subtype: None,
834            },
835            2,
836            "inst-a",
837        ));
838
839        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
840        assert_eq!(alive.len(), 1);
841        assert_eq!(alive[0].node_id, "s1");
842    }
843
844    #[test]
845    fn materialization_from_empty() {
846        // Build graph incrementally.
847        let mut g1 = MaterializedGraph::new(test_ontology());
848        let entries = vec![
849            make_entry(
850                GraphOp::DefineOntology {
851                    ontology: test_ontology(),
852                },
853                0,
854                "inst-a",
855            ),
856            make_entry(
857                GraphOp::AddNode {
858                    node_id: "a".into(),
859                    node_type: "entity".into(),
860                    label: "a".into(),
861                    properties: BTreeMap::new(),
862                    subtype: None,
863                },
864                1,
865                "inst-a",
866            ),
867            make_entry(
868                GraphOp::AddNode {
869                    node_id: "b".into(),
870                    node_type: "entity".into(),
871                    label: "b".into(),
872                    properties: BTreeMap::new(),
873                    subtype: None,
874                },
875                2,
876                "inst-a",
877            ),
878            make_entry(
879                GraphOp::AddEdge {
880                    edge_id: "e1".into(),
881                    edge_type: "RUNS_ON".into(),
882                    source_id: "a".into(),
883                    target_id: "b".into(),
884                    properties: BTreeMap::new(),
885                },
886                3,
887                "inst-a",
888            ),
889        ];
890        for e in &entries {
891            g1.apply(e);
892        }
893
894        // Rebuild from scratch.
895        let mut g2 = MaterializedGraph::new(test_ontology());
896        let refs: Vec<&Entry> = entries.iter().collect();
897        g2.rebuild(&refs);
898
899        // Same result.
900        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
901        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
902        for node in g1.all_nodes() {
903            let n2 = g2.get_node(&node.node_id).unwrap();
904            assert_eq!(node.node_type, n2.node_type);
905            assert_eq!(node.properties, n2.properties);
906        }
907    }
908
909    #[test]
910    fn incremental_equals_full() {
911        let entries = vec![
912            make_entry(
913                GraphOp::DefineOntology {
914                    ontology: test_ontology(),
915                },
916                0,
917                "inst-a",
918            ),
919            make_entry(
920                GraphOp::AddNode {
921                    node_id: "a".into(),
922                    node_type: "entity".into(),
923                    label: "a".into(),
924                    properties: BTreeMap::from([("x".into(), Value::Int(1))]),
925                    subtype: None,
926                },
927                1,
928                "inst-a",
929            ),
930            make_entry(
931                GraphOp::UpdateProperty {
932                    entity_id: "a".into(),
933                    key: "x".into(),
934                    value: Value::Int(2),
935                },
936                2,
937                "inst-a",
938            ),
939            make_entry(
940                GraphOp::AddNode {
941                    node_id: "b".into(),
942                    node_type: "entity".into(),
943                    label: "b".into(),
944                    properties: BTreeMap::new(),
945                    subtype: None,
946                },
947                3,
948                "inst-a",
949            ),
950            make_entry(
951                GraphOp::AddEdge {
952                    edge_id: "e1".into(),
953                    edge_type: "RUNS_ON".into(),
954                    source_id: "a".into(),
955                    target_id: "b".into(),
956                    properties: BTreeMap::new(),
957                },
958                4,
959                "inst-a",
960            ),
961            make_entry(
962                GraphOp::RemoveEdge {
963                    edge_id: "e1".into(),
964                },
965                5,
966                "inst-a",
967            ),
968        ];
969
970        // Incremental.
971        let mut g_inc = MaterializedGraph::new(test_ontology());
972        for e in &entries {
973            g_inc.apply(e);
974        }
975
976        // Full replay.
977        let mut g_full = MaterializedGraph::new(test_ontology());
978        let refs: Vec<&Entry> = entries.iter().collect();
979        g_full.rebuild(&refs);
980
981        // Property should be 2 (updated).
982        assert_eq!(
983            g_inc.get_node("a").unwrap().properties.get("x"),
984            Some(&Value::Int(2))
985        );
986        assert_eq!(
987            g_full.get_node("a").unwrap().properties.get("x"),
988            Some(&Value::Int(2))
989        );
990        // Edge should be removed.
991        assert_eq!(g_inc.all_edges().len(), 0);
992        assert_eq!(g_full.all_edges().len(), 0);
993    }
994
995    #[test]
996    fn lww_concurrent_property_update() {
997        // Two instances update the same property — higher clock wins.
998        let mut g = MaterializedGraph::new(test_ontology());
999        g.apply(&make_entry(
1000            GraphOp::AddNode {
1001                node_id: "s1".into(),
1002                node_type: "entity".into(),
1003                label: "s1".into(),
1004                properties: BTreeMap::new(),
1005                subtype: None,
1006            },
1007            1,
1008            "inst-a",
1009        ));
1010        // inst-a sets status=alive at time 2
1011        g.apply(&make_entry(
1012            GraphOp::UpdateProperty {
1013                entity_id: "s1".into(),
1014                key: "status".into(),
1015                value: Value::String("alive".into()),
1016            },
1017            2,
1018            "inst-a",
1019        ));
1020        // inst-b sets status=dead at time 3 — wins (higher clock)
1021        g.apply(&make_entry(
1022            GraphOp::UpdateProperty {
1023                entity_id: "s1".into(),
1024                key: "status".into(),
1025                value: Value::String("dead".into()),
1026            },
1027            3,
1028            "inst-b",
1029        ));
1030        assert_eq!(
1031            g.get_node("s1").unwrap().properties.get("status"),
1032            Some(&Value::String("dead".into()))
1033        );
1034    }
1035
1036    #[test]
1037    fn lww_tiebreak_by_instance_id() {
1038        // Same clock time — higher instance ID wins.
1039        let mut g = MaterializedGraph::new(test_ontology());
1040        g.apply(&make_entry(
1041            GraphOp::AddNode {
1042                node_id: "s1".into(),
1043                node_type: "entity".into(),
1044                label: "s1".into(),
1045                properties: BTreeMap::new(),
1046                subtype: None,
1047            },
1048            1,
1049            "inst-a",
1050        ));
1051        // Both at physical_ms=5, logical=0. Lower id wins → "inst-a" wins.
1052        g.apply(&make_entry(
1053            GraphOp::UpdateProperty {
1054                entity_id: "s1".into(),
1055                key: "x".into(),
1056                value: Value::Int(1),
1057            },
1058            5,
1059            "inst-a",
1060        ));
1061        g.apply(&make_entry(
1062            GraphOp::UpdateProperty {
1063                entity_id: "s1".into(),
1064                key: "x".into(),
1065                value: Value::Int(2),
1066            },
1067            5,
1068            "inst-b",
1069        ));
1070        // inst-a has lower id → wins the tiebreak → value stays Int(1).
1071        assert_eq!(
1072            g.get_node("s1").unwrap().properties.get("x"),
1073            Some(&Value::Int(1))
1074        );
1075    }
1076
1077    #[test]
1078    fn lww_per_property_concurrent_different_keys() {
1079        // Two instances concurrently update DIFFERENT properties at the same
1080        // clock time. Both updates must be accepted — they don't conflict.
1081        // This requires per-property LWW, not node-level LWW.
1082        let mut g = MaterializedGraph::new(test_ontology());
1083        g.apply(&make_entry(
1084            GraphOp::AddNode {
1085                node_id: "s1".into(),
1086                node_type: "entity".into(),
1087                label: "s1".into(),
1088                properties: BTreeMap::from([
1089                    ("x".into(), Value::Int(0)),
1090                    ("y".into(), Value::Int(0)),
1091                ]),
1092                subtype: None,
1093            },
1094            1,
1095            "inst-a",
1096        ));
1097        // inst-a updates "x" at time 3
1098        g.apply(&make_entry(
1099            GraphOp::UpdateProperty {
1100                entity_id: "s1".into(),
1101                key: "x".into(),
1102                value: Value::Int(42),
1103            },
1104            3,
1105            "inst-a",
1106        ));
1107        // inst-b updates "y" at time 3 (concurrent, different property)
1108        g.apply(&make_entry(
1109            GraphOp::UpdateProperty {
1110                entity_id: "s1".into(),
1111                key: "y".into(),
1112                value: Value::Int(99),
1113            },
1114            3,
1115            "inst-b",
1116        ));
1117
1118        let node = g.get_node("s1").unwrap();
1119        // Both updates must be applied — no conflict.
1120        assert_eq!(
1121            node.properties.get("x"),
1122            Some(&Value::Int(42)),
1123            "update to 'x' must not be rejected by concurrent update to 'y'"
1124        );
1125        assert_eq!(
1126            node.properties.get("y"),
1127            Some(&Value::Int(99)),
1128            "update to 'y' must not be rejected by concurrent update to 'x'"
1129        );
1130    }
1131
1132    #[test]
1133    fn lww_per_property_order_independent() {
1134        // Same scenario but applied in reverse order — result must be identical.
1135        let mut g = MaterializedGraph::new(test_ontology());
1136        g.apply(&make_entry(
1137            GraphOp::AddNode {
1138                node_id: "s1".into(),
1139                node_type: "entity".into(),
1140                label: "s1".into(),
1141                properties: BTreeMap::from([
1142                    ("x".into(), Value::Int(0)),
1143                    ("y".into(), Value::Int(0)),
1144                ]),
1145                subtype: None,
1146            },
1147            1,
1148            "inst-a",
1149        ));
1150        // Apply inst-b first this time
1151        g.apply(&make_entry(
1152            GraphOp::UpdateProperty {
1153                entity_id: "s1".into(),
1154                key: "y".into(),
1155                value: Value::Int(99),
1156            },
1157            3,
1158            "inst-b",
1159        ));
1160        g.apply(&make_entry(
1161            GraphOp::UpdateProperty {
1162                entity_id: "s1".into(),
1163                key: "x".into(),
1164                value: Value::Int(42),
1165            },
1166            3,
1167            "inst-a",
1168        ));
1169
1170        let node = g.get_node("s1").unwrap();
1171        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1172        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1173    }
1174
1175    #[test]
1176    fn add_wins_over_remove() {
1177        // Concurrent add + remove → node should exist (add-wins).
1178        let mut g = MaterializedGraph::new(test_ontology());
1179        g.apply(&make_entry(
1180            GraphOp::AddNode {
1181                node_id: "s1".into(),
1182                node_type: "entity".into(),
1183                label: "s1".into(),
1184                properties: BTreeMap::new(),
1185                subtype: None,
1186            },
1187            1,
1188            "inst-a",
1189        ));
1190        // Remove at time 2.
1191        g.apply(&make_entry(
1192            GraphOp::RemoveNode {
1193                node_id: "s1".into(),
1194            },
1195            2,
1196            "inst-a",
1197        ));
1198        assert!(g.get_node("s1").is_none());
1199
1200        // Re-add at time 3 (add-wins — resurrects).
1201        g.apply(&make_entry(
1202            GraphOp::AddNode {
1203                node_id: "s1".into(),
1204                node_type: "entity".into(),
1205                label: "s1 v2".into(),
1206                properties: BTreeMap::new(),
1207                subtype: None,
1208            },
1209            3,
1210            "inst-b",
1211        ));
1212        let node = g.get_node("s1").unwrap();
1213        assert_eq!(node.label, "s1 v2");
1214        assert!(!node.tombstoned);
1215    }
1216}