Skip to main content

silk/
graph.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7/// A materialized node in the graph.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10    pub node_id: String,
11    pub node_type: String,
12    pub subtype: Option<String>,
13    pub label: String,
14    pub properties: BTreeMap<String, Value>,
15    /// Per-property clocks for LWW conflict resolution.
16    /// Each property key tracks the clock of its last write, so
17    /// concurrent updates to different properties don't conflict.
18    pub property_clocks: HashMap<String, LamportClock>,
19    /// Clock of the entry that last modified this node.
20    /// Used for add-wins semantics and label LWW.
21    pub last_clock: LamportClock,
22    /// Clock of the most recent AddNode for this node.
23    /// Used for add-wins semantics: remove only wins if its clock
24    /// is strictly greater than last_add_clock.
25    pub last_add_clock: LamportClock,
26    /// Whether this node has been tombstoned (removed).
27    pub tombstoned: bool,
28}
29
30/// A materialized edge in the graph.
31#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33    pub edge_id: String,
34    pub edge_type: String,
35    pub source_id: String,
36    pub target_id: String,
37    pub properties: BTreeMap<String, Value>,
38    /// Per-property clocks for LWW conflict resolution.
39    pub property_clocks: HashMap<String, LamportClock>,
40    pub last_clock: LamportClock,
41    /// Clock of the most recent AddEdge for this edge.
42    pub last_add_clock: LamportClock,
43    pub tombstoned: bool,
44}
45
46/// Materialized graph — derived from the op log.
47///
48/// Provides fast queries without replaying the full log.
49/// Updated incrementally as new entries arrive, or rebuilt
50/// from scratch by replaying the entire op log.
51///
52/// CRDT semantics:
53/// - **Add-wins** for topology (concurrent add + remove → node/edge exists)
54/// - **LWW** (Last-Writer-Wins) per property key (highest Lamport clock wins)
55/// - **Tombstones** for deletes (mark as deleted, don't physically remove)
56pub struct MaterializedGraph {
57    /// node_id → Node
58    pub nodes: HashMap<String, Node>,
59    /// edge_id → Edge
60    pub edges: HashMap<String, Edge>,
61    /// node_id → set of outgoing edge_ids
62    pub outgoing: HashMap<String, HashSet<String>>,
63    /// node_id → set of incoming edge_ids
64    pub incoming: HashMap<String, HashSet<String>>,
65    /// node_type → set of node_ids (type index)
66    pub by_type: HashMap<String, HashSet<String>>,
67    /// The ontology (for validation during materialization)
68    pub ontology: Ontology,
69    /// R-02: entries that failed ontology validation during apply().
70    /// These entries exist in the oplog (for CRDT convergence) but are
71    /// invisible in the materialized graph. Grow-only set — monotonic, safe.
72    pub quarantined: HashSet<Hash>,
73}
74
75impl MaterializedGraph {
76    /// Create an empty materialized graph with the given ontology.
77    pub fn new(ontology: Ontology) -> Self {
78        Self {
79            nodes: HashMap::new(),
80            edges: HashMap::new(),
81            outgoing: HashMap::new(),
82            incoming: HashMap::new(),
83            by_type: HashMap::new(),
84            ontology,
85            quarantined: HashSet::new(),
86        }
87    }
88
89    /// Apply a single entry to the graph (incremental materialization).
90    ///
91    /// R-02: Validates AddNode/AddEdge payloads against the ontology.
92    /// Invalid entries are quarantined (added to `self.quarantined`) and
93    /// skipped for materialization. They remain in the oplog for CRDT
94    /// convergence — quarantine is a graph-layer concern, not an oplog concern.
95    pub fn apply(&mut self, entry: &Entry) {
96        match &entry.payload {
97            GraphOp::Checkpoint { ops, op_clocks, .. } => {
98                // R-08: Replay synthetic ops to restore graph state.
99                // Bug 6 fix: use per-op clocks (preserves LWW metadata).
100                for (i, op) in ops.iter().enumerate() {
101                    let clock = if i < op_clocks.len() {
102                        LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
103                    } else {
104                        entry.clock.clone() // fallback for old checkpoints without op_clocks
105                    };
106                    let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
107                    self.apply(&synthetic);
108                }
109            }
110            GraphOp::DefineOntology { .. } => {
111                // Genesis — nothing to materialize.
112            }
113            GraphOp::ExtendOntology { extension } => {
114                if let Err(_e) = self.ontology.merge_extension(extension) {
115                    self.quarantined.insert(entry.hash);
116                }
117            }
118            GraphOp::AddNode {
119                node_id,
120                node_type,
121                subtype,
122                label,
123                properties,
124            } => {
125                // R-02: validate against ontology, quarantine if invalid
126                if let Err(_e) =
127                    self.ontology
128                        .validate_node(node_type, subtype.as_deref(), properties)
129                {
130                    self.quarantined.insert(entry.hash);
131                    return;
132                }
133                self.apply_add_node(
134                    node_id,
135                    node_type,
136                    subtype.as_deref(),
137                    label,
138                    properties,
139                    &entry.clock,
140                );
141            }
142            GraphOp::AddEdge {
143                edge_id,
144                edge_type,
145                source_id,
146                target_id,
147                properties,
148            } => {
149                // R-02: validate edge type exists.
150                if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
151                    self.quarantined.insert(entry.hash);
152                    return;
153                }
154                // Bug 13 fix: validate source/target type constraints when both nodes
155                // are materialized. If one is missing (out-of-order sync), skip —
156                // validation happens on rebuild.
157                if let (Some(src), Some(tgt)) = (
158                    self.nodes.get(source_id.as_str()),
159                    self.nodes.get(target_id.as_str()),
160                ) {
161                    if let Err(_) = self.ontology.validate_edge(
162                        edge_type,
163                        &src.node_type,
164                        &tgt.node_type,
165                        properties,
166                    ) {
167                        self.quarantined.insert(entry.hash);
168                        return;
169                    }
170                }
171                self.apply_add_edge(
172                    edge_id,
173                    edge_type,
174                    source_id,
175                    target_id,
176                    properties,
177                    &entry.clock,
178                );
179            }
180            GraphOp::UpdateProperty {
181                entity_id,
182                key,
183                value,
184            } => {
185                self.apply_update_property(entity_id, key, value, &entry.clock);
186            }
187            GraphOp::RemoveNode { node_id } => {
188                self.apply_remove_node(node_id, &entry.clock);
189            }
190            GraphOp::RemoveEdge { edge_id } => {
191                self.apply_remove_edge(edge_id, &entry.clock);
192            }
193        }
194    }
195
196    /// Apply a sequence of entries (full rematerialization from op log).
197    pub fn apply_all(&mut self, entries: &[&Entry]) {
198        for entry in entries {
199            self.apply(entry);
200        }
201    }
202
203    /// Rebuild from scratch: clear everything and replay all entries.
204    pub fn rebuild(&mut self, entries: &[&Entry]) {
205        self.nodes.clear();
206        self.edges.clear();
207        self.outgoing.clear();
208        self.incoming.clear();
209        self.by_type.clear();
210        self.quarantined.clear();
211        self.apply_all(entries);
212    }
213
214    // -- Queries --
215
216    /// Get a node by ID (returns None if not found or tombstoned).
217    pub fn get_node(&self, node_id: &str) -> Option<&Node> {
218        self.nodes.get(node_id).filter(|n| !n.tombstoned)
219    }
220
221    /// Get an edge by ID (returns None if not found or tombstoned).
222    pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
223        self.edges.get(edge_id).filter(|e| !e.tombstoned)
224    }
225
226    /// Query all live nodes of a given type.
227    pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
228        match self.by_type.get(node_type) {
229            Some(ids) => ids.iter().filter_map(|id| self.get_node(id)).collect(),
230            None => vec![],
231        }
232    }
233
234    /// Query all live nodes of a given subtype.
235    pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
236        self.nodes
237            .values()
238            .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
239            .collect()
240    }
241
242    /// Query nodes by a property value.
243    pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
244        self.nodes
245            .values()
246            .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
247            .collect()
248    }
249
250    /// Get outgoing edges for a node (only live edges with live endpoints).
251    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
252        match self.outgoing.get(node_id) {
253            Some(edge_ids) => edge_ids
254                .iter()
255                .filter_map(|eid| self.get_edge(eid))
256                .filter(|e| self.is_node_live(&e.target_id))
257                .collect(),
258            None => vec![],
259        }
260    }
261
262    /// Get incoming edges for a node (only live edges with live endpoints).
263    pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
264        match self.incoming.get(node_id) {
265            Some(edge_ids) => edge_ids
266                .iter()
267                .filter_map(|eid| self.get_edge(eid))
268                .filter(|e| self.is_node_live(&e.source_id))
269                .collect(),
270            None => vec![],
271        }
272    }
273
274    /// All live nodes.
275    pub fn all_nodes(&self) -> Vec<&Node> {
276        self.nodes.values().filter(|n| !n.tombstoned).collect()
277    }
278
279    /// All live edges (with live endpoints).
280    pub fn all_edges(&self) -> Vec<&Edge> {
281        self.edges
282            .values()
283            .filter(|e| {
284                !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
285            })
286            .collect()
287    }
288
289    /// Neighbors of a node (connected via outgoing edges).
290    pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
291        self.outgoing_edges(node_id)
292            .iter()
293            .map(|e| e.target_id.as_str())
294            .collect()
295    }
296
297    /// Reverse neighbors (connected via incoming edges).
298    pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
299        self.incoming_edges(node_id)
300            .iter()
301            .map(|e| e.source_id.as_str())
302            .collect()
303    }
304
305    // -- CRDT application helpers --
306
307    fn apply_add_node(
308        &mut self,
309        node_id: &str,
310        node_type: &str,
311        subtype: Option<&str>,
312        label: &str,
313        properties: &BTreeMap<String, Value>,
314        clock: &LamportClock,
315    ) {
316        if let Some(existing) = self.nodes.get_mut(node_id) {
317            // Add-wins: always resurrect from tombstone.
318            existing.tombstoned = false;
319            // Track the latest add clock for add-wins semantics.
320            if clock_wins(clock, &existing.last_add_clock) {
321                existing.last_add_clock = clock.clone();
322            }
323            // LWW merge for label, subtype, and properties.
324            if clock_wins(clock, &existing.last_clock) {
325                existing.label = label.to_string();
326                existing.subtype = subtype.map(|s| s.to_string());
327                existing.last_clock = clock.clone();
328            }
329            // Per-property LWW: each property from add_node competes
330            // only with writes to the same key.
331            for (k, v) in properties {
332                let dominated = existing
333                    .property_clocks
334                    .get(k)
335                    .map(|c| clock_wins(clock, c))
336                    .unwrap_or(true);
337                if dominated {
338                    existing.properties.insert(k.clone(), v.clone());
339                    existing.property_clocks.insert(k.clone(), clock.clone());
340                }
341            }
342        } else {
343            let property_clocks: HashMap<String, LamportClock> = properties
344                .keys()
345                .map(|k| (k.clone(), clock.clone()))
346                .collect();
347            let node = Node {
348                node_id: node_id.to_string(),
349                node_type: node_type.to_string(),
350                subtype: subtype.map(|s| s.to_string()),
351                label: label.to_string(),
352                properties: properties.clone(),
353                property_clocks,
354                last_clock: clock.clone(),
355                last_add_clock: clock.clone(),
356                tombstoned: false,
357            };
358            self.by_type
359                .entry(node_type.to_string())
360                .or_default()
361                .insert(node_id.to_string());
362            self.nodes.insert(node_id.to_string(), node);
363        }
364    }
365
366    fn apply_add_edge(
367        &mut self,
368        edge_id: &str,
369        edge_type: &str,
370        source_id: &str,
371        target_id: &str,
372        properties: &BTreeMap<String, Value>,
373        clock: &LamportClock,
374    ) {
375        if let Some(existing) = self.edges.get_mut(edge_id) {
376            // Add-wins: always resurrect if tombstoned.
377            existing.tombstoned = false;
378            if clock_wins(clock, &existing.last_add_clock) {
379                existing.last_add_clock = clock.clone();
380            }
381            if clock_wins(clock, &existing.last_clock) {
382                existing.last_clock = clock.clone();
383            }
384            // Per-property LWW for edge properties.
385            for (k, v) in properties {
386                let dominated = existing
387                    .property_clocks
388                    .get(k)
389                    .map(|c| clock_wins(clock, c))
390                    .unwrap_or(true);
391                if dominated {
392                    existing.properties.insert(k.clone(), v.clone());
393                    existing.property_clocks.insert(k.clone(), clock.clone());
394                }
395            }
396        } else {
397            let property_clocks: HashMap<String, LamportClock> = properties
398                .keys()
399                .map(|k| (k.clone(), clock.clone()))
400                .collect();
401            let edge = Edge {
402                edge_id: edge_id.to_string(),
403                edge_type: edge_type.to_string(),
404                source_id: source_id.to_string(),
405                target_id: target_id.to_string(),
406                properties: properties.clone(),
407                property_clocks,
408                last_clock: clock.clone(),
409                last_add_clock: clock.clone(),
410                tombstoned: false,
411            };
412            self.outgoing
413                .entry(source_id.to_string())
414                .or_default()
415                .insert(edge_id.to_string());
416            self.incoming
417                .entry(target_id.to_string())
418                .or_default()
419                .insert(edge_id.to_string());
420            self.edges.insert(edge_id.to_string(), edge);
421        }
422    }
423
424    fn apply_update_property(
425        &mut self,
426        entity_id: &str,
427        key: &str,
428        value: &Value,
429        clock: &LamportClock,
430    ) {
431        // Try node first, then edge. Per-property LWW: each key competes
432        // only with other writes to the same key, not the entire entity.
433        if let Some(node) = self.nodes.get_mut(entity_id) {
434            let dominated = node
435                .property_clocks
436                .get(key)
437                .map(|c| clock_wins(clock, c))
438                .unwrap_or(true);
439            if dominated {
440                node.properties.insert(key.to_string(), value.clone());
441                node.property_clocks.insert(key.to_string(), clock.clone());
442            }
443            // Update entity-level clock for add-wins tracking.
444            if clock_wins(clock, &node.last_clock) {
445                node.last_clock = clock.clone();
446            }
447        } else if let Some(edge) = self.edges.get_mut(entity_id) {
448            let dominated = edge
449                .property_clocks
450                .get(key)
451                .map(|c| clock_wins(clock, c))
452                .unwrap_or(true);
453            if dominated {
454                edge.properties.insert(key.to_string(), value.clone());
455                edge.property_clocks.insert(key.to_string(), clock.clone());
456            }
457            if clock_wins(clock, &edge.last_clock) {
458                edge.last_clock = clock.clone();
459            }
460        }
461        // If entity not found, silently ignore (may arrive out of order in sync).
462    }
463
464    fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
465        if let Some(node) = self.nodes.get_mut(node_id) {
466            // Add-wins: only tombstone if the remove clock is strictly greater
467            // than the last add clock. If a concurrent (or later) add exists,
468            // the node stays alive.
469            if clock_wins(clock, &node.last_add_clock) {
470                node.tombstoned = true;
471                node.last_clock = clock.clone();
472            }
473        }
474        // Tombstoning a node doesn't physically remove edges — they just become
475        // invisible via is_node_live() checks in queries.
476    }
477
478    fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
479        if let Some(edge) = self.edges.get_mut(edge_id) {
480            // Add-wins: only tombstone if remove clock > last add clock.
481            if clock_wins(clock, &edge.last_add_clock) {
482                edge.tombstoned = true;
483                edge.last_clock = clock.clone();
484            }
485        }
486    }
487
488    fn is_node_live(&self, node_id: &str) -> bool {
489        self.nodes
490            .get(node_id)
491            .map(|n| !n.tombstoned)
492            .unwrap_or(false)
493    }
494}
495
496/// LWW comparison: returns true if `new_clock` wins over `existing_clock`.
497/// Uses HybridClock total ordering: (physical_ms, logical, id).
498fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
499    new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505    use crate::entry::Entry;
506    use crate::ontology::{EdgeTypeDef, NodeTypeDef};
507
508    fn test_ontology() -> Ontology {
509        Ontology {
510            node_types: BTreeMap::from([
511                (
512                    "entity".into(),
513                    NodeTypeDef {
514                        description: None,
515                        properties: BTreeMap::new(),
516                        subtypes: None,
517                    },
518                ),
519                (
520                    "signal".into(),
521                    NodeTypeDef {
522                        description: None,
523                        properties: BTreeMap::new(),
524                        subtypes: None,
525                    },
526                ),
527            ]),
528            edge_types: BTreeMap::from([
529                (
530                    "RUNS_ON".into(),
531                    EdgeTypeDef {
532                        description: None,
533                        source_types: vec!["entity".into()],
534                        target_types: vec!["entity".into()],
535                        properties: BTreeMap::new(),
536                    },
537                ),
538                (
539                    "OBSERVES".into(),
540                    EdgeTypeDef {
541                        description: None,
542                        source_types: vec!["signal".into()],
543                        target_types: vec!["entity".into()],
544                        properties: BTreeMap::new(),
545                    },
546                ),
547            ]),
548        }
549    }
550
551    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
552        Entry::new(
553            op,
554            vec![],
555            vec![],
556            LamportClock::with_values(author, clock_time, 0),
557            author,
558        )
559    }
560
561    // -- test_graph.rs spec from docs/silk.md --
562
563    #[test]
564    fn add_node_appears_in_query() {
565        let mut g = MaterializedGraph::new(test_ontology());
566        let entry = make_entry(
567            GraphOp::AddNode {
568                node_id: "server-1".into(),
569                node_type: "entity".into(),
570                label: "Server 1".into(),
571                properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
572                subtype: None,
573            },
574            1,
575            "inst-a",
576        );
577        g.apply(&entry);
578
579        let node = g.get_node("server-1").unwrap();
580        assert_eq!(node.node_type, "entity");
581        assert_eq!(node.label, "Server 1");
582        assert_eq!(
583            node.properties.get("ip"),
584            Some(&Value::String("10.0.0.1".into()))
585        );
586    }
587
588    #[test]
589    fn add_edge_creates_adjacency() {
590        let mut g = MaterializedGraph::new(test_ontology());
591        g.apply(&make_entry(
592            GraphOp::AddNode {
593                node_id: "svc".into(),
594                node_type: "entity".into(),
595                label: "svc".into(),
596                properties: BTreeMap::new(),
597                subtype: None,
598            },
599            1,
600            "inst-a",
601        ));
602        g.apply(&make_entry(
603            GraphOp::AddNode {
604                node_id: "srv".into(),
605                node_type: "entity".into(),
606                label: "srv".into(),
607                properties: BTreeMap::new(),
608                subtype: None,
609            },
610            2,
611            "inst-a",
612        ));
613        g.apply(&make_entry(
614            GraphOp::AddEdge {
615                edge_id: "e1".into(),
616                edge_type: "RUNS_ON".into(),
617                source_id: "svc".into(),
618                target_id: "srv".into(),
619                properties: BTreeMap::new(),
620            },
621            3,
622            "inst-a",
623        ));
624
625        // Both endpoints know about the edge.
626        let out = g.outgoing_edges("svc");
627        assert_eq!(out.len(), 1);
628        assert_eq!(out[0].target_id, "srv");
629
630        let inc = g.incoming_edges("srv");
631        assert_eq!(inc.len(), 1);
632        assert_eq!(inc[0].source_id, "svc");
633
634        assert_eq!(g.neighbors("svc"), vec!["srv"]);
635    }
636
637    #[test]
638    fn update_property_reflected() {
639        let mut g = MaterializedGraph::new(test_ontology());
640        g.apply(&make_entry(
641            GraphOp::AddNode {
642                node_id: "s1".into(),
643                node_type: "entity".into(),
644                label: "s1".into(),
645                properties: BTreeMap::new(),
646                subtype: None,
647            },
648            1,
649            "inst-a",
650        ));
651        g.apply(&make_entry(
652            GraphOp::UpdateProperty {
653                entity_id: "s1".into(),
654                key: "cpu".into(),
655                value: Value::Float(85.5),
656            },
657            2,
658            "inst-a",
659        ));
660
661        let node = g.get_node("s1").unwrap();
662        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
663    }
664
665    #[test]
666    fn remove_node_cascades_edges() {
667        let mut g = MaterializedGraph::new(test_ontology());
668        g.apply(&make_entry(
669            GraphOp::AddNode {
670                node_id: "a".into(),
671                node_type: "entity".into(),
672                label: "a".into(),
673                properties: BTreeMap::new(),
674                subtype: None,
675            },
676            1,
677            "inst-a",
678        ));
679        g.apply(&make_entry(
680            GraphOp::AddNode {
681                node_id: "b".into(),
682                node_type: "entity".into(),
683                label: "b".into(),
684                properties: BTreeMap::new(),
685                subtype: None,
686            },
687            2,
688            "inst-a",
689        ));
690        g.apply(&make_entry(
691            GraphOp::AddEdge {
692                edge_id: "e1".into(),
693                edge_type: "RUNS_ON".into(),
694                source_id: "a".into(),
695                target_id: "b".into(),
696                properties: BTreeMap::new(),
697            },
698            3,
699            "inst-a",
700        ));
701        assert_eq!(g.all_edges().len(), 1);
702
703        // Remove node 'b' — edge becomes invisible (dangling target).
704        g.apply(&make_entry(
705            GraphOp::RemoveNode {
706                node_id: "b".into(),
707            },
708            4,
709            "inst-a",
710        ));
711        assert!(g.get_node("b").is_none());
712        // Edge still exists but not returned by all_edges (target tombstoned).
713        assert_eq!(g.all_edges().len(), 0);
714        // Outgoing from 'a' also filters out dangling edges.
715        assert_eq!(g.outgoing_edges("a").len(), 0);
716    }
717
718    #[test]
719    fn remove_edge_preserves_nodes() {
720        let mut g = MaterializedGraph::new(test_ontology());
721        g.apply(&make_entry(
722            GraphOp::AddNode {
723                node_id: "a".into(),
724                node_type: "entity".into(),
725                label: "a".into(),
726                properties: BTreeMap::new(),
727                subtype: None,
728            },
729            1,
730            "inst-a",
731        ));
732        g.apply(&make_entry(
733            GraphOp::AddNode {
734                node_id: "b".into(),
735                node_type: "entity".into(),
736                label: "b".into(),
737                properties: BTreeMap::new(),
738                subtype: None,
739            },
740            2,
741            "inst-a",
742        ));
743        g.apply(&make_entry(
744            GraphOp::AddEdge {
745                edge_id: "e1".into(),
746                edge_type: "RUNS_ON".into(),
747                source_id: "a".into(),
748                target_id: "b".into(),
749                properties: BTreeMap::new(),
750            },
751            3,
752            "inst-a",
753        ));
754        g.apply(&make_entry(
755            GraphOp::RemoveEdge {
756                edge_id: "e1".into(),
757            },
758            4,
759            "inst-a",
760        ));
761
762        // Nodes still exist.
763        assert!(g.get_node("a").is_some());
764        assert!(g.get_node("b").is_some());
765        // Edge is gone.
766        assert!(g.get_edge("e1").is_none());
767        assert_eq!(g.all_edges().len(), 0);
768    }
769
770    #[test]
771    fn query_by_type_filters() {
772        let mut g = MaterializedGraph::new(test_ontology());
773        g.apply(&make_entry(
774            GraphOp::AddNode {
775                node_id: "s1".into(),
776                node_type: "entity".into(),
777                label: "s1".into(),
778                properties: BTreeMap::new(),
779                subtype: None,
780            },
781            1,
782            "inst-a",
783        ));
784        g.apply(&make_entry(
785            GraphOp::AddNode {
786                node_id: "s2".into(),
787                node_type: "entity".into(),
788                label: "s2".into(),
789                properties: BTreeMap::new(),
790                subtype: None,
791            },
792            2,
793            "inst-a",
794        ));
795        g.apply(&make_entry(
796            GraphOp::AddNode {
797                node_id: "alert".into(),
798                node_type: "signal".into(),
799                label: "alert".into(),
800                properties: BTreeMap::new(),
801                subtype: None,
802            },
803            3,
804            "inst-a",
805        ));
806
807        let entities = g.nodes_by_type("entity");
808        assert_eq!(entities.len(), 2);
809        let signals = g.nodes_by_type("signal");
810        assert_eq!(signals.len(), 1);
811        assert_eq!(signals[0].node_id, "alert");
812    }
813
814    #[test]
815    fn query_by_property_filters() {
816        let mut g = MaterializedGraph::new(test_ontology());
817        g.apply(&make_entry(
818            GraphOp::AddNode {
819                node_id: "s1".into(),
820                node_type: "entity".into(),
821                label: "s1".into(),
822                properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
823                subtype: None,
824            },
825            1,
826            "inst-a",
827        ));
828        g.apply(&make_entry(
829            GraphOp::AddNode {
830                node_id: "s2".into(),
831                node_type: "entity".into(),
832                label: "s2".into(),
833                properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
834                subtype: None,
835            },
836            2,
837            "inst-a",
838        ));
839
840        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
841        assert_eq!(alive.len(), 1);
842        assert_eq!(alive[0].node_id, "s1");
843    }
844
845    #[test]
846    fn materialization_from_empty() {
847        // Build graph incrementally.
848        let mut g1 = MaterializedGraph::new(test_ontology());
849        let entries = vec![
850            make_entry(
851                GraphOp::DefineOntology {
852                    ontology: test_ontology(),
853                },
854                0,
855                "inst-a",
856            ),
857            make_entry(
858                GraphOp::AddNode {
859                    node_id: "a".into(),
860                    node_type: "entity".into(),
861                    label: "a".into(),
862                    properties: BTreeMap::new(),
863                    subtype: None,
864                },
865                1,
866                "inst-a",
867            ),
868            make_entry(
869                GraphOp::AddNode {
870                    node_id: "b".into(),
871                    node_type: "entity".into(),
872                    label: "b".into(),
873                    properties: BTreeMap::new(),
874                    subtype: None,
875                },
876                2,
877                "inst-a",
878            ),
879            make_entry(
880                GraphOp::AddEdge {
881                    edge_id: "e1".into(),
882                    edge_type: "RUNS_ON".into(),
883                    source_id: "a".into(),
884                    target_id: "b".into(),
885                    properties: BTreeMap::new(),
886                },
887                3,
888                "inst-a",
889            ),
890        ];
891        for e in &entries {
892            g1.apply(e);
893        }
894
895        // Rebuild from scratch.
896        let mut g2 = MaterializedGraph::new(test_ontology());
897        let refs: Vec<&Entry> = entries.iter().collect();
898        g2.rebuild(&refs);
899
900        // Same result.
901        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
902        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
903        for node in g1.all_nodes() {
904            let n2 = g2.get_node(&node.node_id).unwrap();
905            assert_eq!(node.node_type, n2.node_type);
906            assert_eq!(node.properties, n2.properties);
907        }
908    }
909
910    #[test]
911    fn incremental_equals_full() {
912        let entries = vec![
913            make_entry(
914                GraphOp::DefineOntology {
915                    ontology: test_ontology(),
916                },
917                0,
918                "inst-a",
919            ),
920            make_entry(
921                GraphOp::AddNode {
922                    node_id: "a".into(),
923                    node_type: "entity".into(),
924                    label: "a".into(),
925                    properties: BTreeMap::from([("x".into(), Value::Int(1))]),
926                    subtype: None,
927                },
928                1,
929                "inst-a",
930            ),
931            make_entry(
932                GraphOp::UpdateProperty {
933                    entity_id: "a".into(),
934                    key: "x".into(),
935                    value: Value::Int(2),
936                },
937                2,
938                "inst-a",
939            ),
940            make_entry(
941                GraphOp::AddNode {
942                    node_id: "b".into(),
943                    node_type: "entity".into(),
944                    label: "b".into(),
945                    properties: BTreeMap::new(),
946                    subtype: None,
947                },
948                3,
949                "inst-a",
950            ),
951            make_entry(
952                GraphOp::AddEdge {
953                    edge_id: "e1".into(),
954                    edge_type: "RUNS_ON".into(),
955                    source_id: "a".into(),
956                    target_id: "b".into(),
957                    properties: BTreeMap::new(),
958                },
959                4,
960                "inst-a",
961            ),
962            make_entry(
963                GraphOp::RemoveEdge {
964                    edge_id: "e1".into(),
965                },
966                5,
967                "inst-a",
968            ),
969        ];
970
971        // Incremental.
972        let mut g_inc = MaterializedGraph::new(test_ontology());
973        for e in &entries {
974            g_inc.apply(e);
975        }
976
977        // Full replay.
978        let mut g_full = MaterializedGraph::new(test_ontology());
979        let refs: Vec<&Entry> = entries.iter().collect();
980        g_full.rebuild(&refs);
981
982        // Property should be 2 (updated).
983        assert_eq!(
984            g_inc.get_node("a").unwrap().properties.get("x"),
985            Some(&Value::Int(2))
986        );
987        assert_eq!(
988            g_full.get_node("a").unwrap().properties.get("x"),
989            Some(&Value::Int(2))
990        );
991        // Edge should be removed.
992        assert_eq!(g_inc.all_edges().len(), 0);
993        assert_eq!(g_full.all_edges().len(), 0);
994    }
995
996    #[test]
997    fn lww_concurrent_property_update() {
998        // Two instances update the same property — higher clock wins.
999        let mut g = MaterializedGraph::new(test_ontology());
1000        g.apply(&make_entry(
1001            GraphOp::AddNode {
1002                node_id: "s1".into(),
1003                node_type: "entity".into(),
1004                label: "s1".into(),
1005                properties: BTreeMap::new(),
1006                subtype: None,
1007            },
1008            1,
1009            "inst-a",
1010        ));
1011        // inst-a sets status=alive at time 2
1012        g.apply(&make_entry(
1013            GraphOp::UpdateProperty {
1014                entity_id: "s1".into(),
1015                key: "status".into(),
1016                value: Value::String("alive".into()),
1017            },
1018            2,
1019            "inst-a",
1020        ));
1021        // inst-b sets status=dead at time 3 — wins (higher clock)
1022        g.apply(&make_entry(
1023            GraphOp::UpdateProperty {
1024                entity_id: "s1".into(),
1025                key: "status".into(),
1026                value: Value::String("dead".into()),
1027            },
1028            3,
1029            "inst-b",
1030        ));
1031        assert_eq!(
1032            g.get_node("s1").unwrap().properties.get("status"),
1033            Some(&Value::String("dead".into()))
1034        );
1035    }
1036
1037    #[test]
1038    fn lww_tiebreak_by_instance_id() {
1039        // Same clock time — higher instance ID wins.
1040        let mut g = MaterializedGraph::new(test_ontology());
1041        g.apply(&make_entry(
1042            GraphOp::AddNode {
1043                node_id: "s1".into(),
1044                node_type: "entity".into(),
1045                label: "s1".into(),
1046                properties: BTreeMap::new(),
1047                subtype: None,
1048            },
1049            1,
1050            "inst-a",
1051        ));
1052        // Both at physical_ms=5, logical=0. Lower id wins → "inst-a" wins.
1053        g.apply(&make_entry(
1054            GraphOp::UpdateProperty {
1055                entity_id: "s1".into(),
1056                key: "x".into(),
1057                value: Value::Int(1),
1058            },
1059            5,
1060            "inst-a",
1061        ));
1062        g.apply(&make_entry(
1063            GraphOp::UpdateProperty {
1064                entity_id: "s1".into(),
1065                key: "x".into(),
1066                value: Value::Int(2),
1067            },
1068            5,
1069            "inst-b",
1070        ));
1071        // inst-a has lower id → wins the tiebreak → value stays Int(1).
1072        assert_eq!(
1073            g.get_node("s1").unwrap().properties.get("x"),
1074            Some(&Value::Int(1))
1075        );
1076    }
1077
1078    #[test]
1079    fn lww_per_property_concurrent_different_keys() {
1080        // Two instances concurrently update DIFFERENT properties at the same
1081        // clock time. Both updates must be accepted — they don't conflict.
1082        // This requires per-property LWW, not node-level LWW.
1083        let mut g = MaterializedGraph::new(test_ontology());
1084        g.apply(&make_entry(
1085            GraphOp::AddNode {
1086                node_id: "s1".into(),
1087                node_type: "entity".into(),
1088                label: "s1".into(),
1089                properties: BTreeMap::from([
1090                    ("x".into(), Value::Int(0)),
1091                    ("y".into(), Value::Int(0)),
1092                ]),
1093                subtype: None,
1094            },
1095            1,
1096            "inst-a",
1097        ));
1098        // inst-a updates "x" at time 3
1099        g.apply(&make_entry(
1100            GraphOp::UpdateProperty {
1101                entity_id: "s1".into(),
1102                key: "x".into(),
1103                value: Value::Int(42),
1104            },
1105            3,
1106            "inst-a",
1107        ));
1108        // inst-b updates "y" at time 3 (concurrent, different property)
1109        g.apply(&make_entry(
1110            GraphOp::UpdateProperty {
1111                entity_id: "s1".into(),
1112                key: "y".into(),
1113                value: Value::Int(99),
1114            },
1115            3,
1116            "inst-b",
1117        ));
1118
1119        let node = g.get_node("s1").unwrap();
1120        // Both updates must be applied — no conflict.
1121        assert_eq!(
1122            node.properties.get("x"),
1123            Some(&Value::Int(42)),
1124            "update to 'x' must not be rejected by concurrent update to 'y'"
1125        );
1126        assert_eq!(
1127            node.properties.get("y"),
1128            Some(&Value::Int(99)),
1129            "update to 'y' must not be rejected by concurrent update to 'x'"
1130        );
1131    }
1132
1133    #[test]
1134    fn lww_per_property_order_independent() {
1135        // Same scenario but applied in reverse order — result must be identical.
1136        let mut g = MaterializedGraph::new(test_ontology());
1137        g.apply(&make_entry(
1138            GraphOp::AddNode {
1139                node_id: "s1".into(),
1140                node_type: "entity".into(),
1141                label: "s1".into(),
1142                properties: BTreeMap::from([
1143                    ("x".into(), Value::Int(0)),
1144                    ("y".into(), Value::Int(0)),
1145                ]),
1146                subtype: None,
1147            },
1148            1,
1149            "inst-a",
1150        ));
1151        // Apply inst-b first this time
1152        g.apply(&make_entry(
1153            GraphOp::UpdateProperty {
1154                entity_id: "s1".into(),
1155                key: "y".into(),
1156                value: Value::Int(99),
1157            },
1158            3,
1159            "inst-b",
1160        ));
1161        g.apply(&make_entry(
1162            GraphOp::UpdateProperty {
1163                entity_id: "s1".into(),
1164                key: "x".into(),
1165                value: Value::Int(42),
1166            },
1167            3,
1168            "inst-a",
1169        ));
1170
1171        let node = g.get_node("s1").unwrap();
1172        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1173        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1174    }
1175
1176    #[test]
1177    fn add_wins_over_remove() {
1178        // Concurrent add + remove → node should exist (add-wins).
1179        let mut g = MaterializedGraph::new(test_ontology());
1180        g.apply(&make_entry(
1181            GraphOp::AddNode {
1182                node_id: "s1".into(),
1183                node_type: "entity".into(),
1184                label: "s1".into(),
1185                properties: BTreeMap::new(),
1186                subtype: None,
1187            },
1188            1,
1189            "inst-a",
1190        ));
1191        // Remove at time 2.
1192        g.apply(&make_entry(
1193            GraphOp::RemoveNode {
1194                node_id: "s1".into(),
1195            },
1196            2,
1197            "inst-a",
1198        ));
1199        assert!(g.get_node("s1").is_none());
1200
1201        // Re-add at time 3 (add-wins — resurrects).
1202        g.apply(&make_entry(
1203            GraphOp::AddNode {
1204                node_id: "s1".into(),
1205                node_type: "entity".into(),
1206                label: "s1 v2".into(),
1207                properties: BTreeMap::new(),
1208                subtype: None,
1209            },
1210            3,
1211            "inst-b",
1212        ));
1213        let node = g.get_node("s1").unwrap();
1214        assert_eq!(node.label, "s1 v2");
1215        assert!(!node.tombstoned);
1216    }
1217}