Skip to main content

silk/
graph.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7/// A materialized node in the graph.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10    pub node_id: String,
11    pub node_type: String,
12    pub subtype: Option<String>,
13    pub label: String,
14    pub properties: BTreeMap<String, Value>,
15    /// Per-property clocks for LWW conflict resolution.
16    /// Each property key tracks the clock of its last write, so
17    /// concurrent updates to different properties don't conflict.
18    pub property_clocks: HashMap<String, LamportClock>,
19    /// Clock of the entry that last modified this node.
20    /// Used for add-wins semantics and label LWW.
21    pub last_clock: LamportClock,
22    /// Clock of the most recent AddNode for this node.
23    /// Used for add-wins semantics: remove only wins if its clock
24    /// is strictly greater than last_add_clock.
25    pub last_add_clock: LamportClock,
26    /// Whether this node has been tombstoned (removed).
27    pub tombstoned: bool,
28}
29
30/// A materialized edge in the graph.
31#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33    pub edge_id: String,
34    pub edge_type: String,
35    pub source_id: String,
36    pub target_id: String,
37    pub properties: BTreeMap<String, Value>,
38    /// Per-property clocks for LWW conflict resolution.
39    pub property_clocks: HashMap<String, LamportClock>,
40    pub last_clock: LamportClock,
41    /// Clock of the most recent AddEdge for this edge.
42    pub last_add_clock: LamportClock,
43    pub tombstoned: bool,
44}
45
46/// Materialized graph — derived from the op log.
47///
48/// Provides fast queries without replaying the full log.
49/// Updated incrementally as new entries arrive, or rebuilt
50/// from scratch by replaying the entire op log.
51///
52/// CRDT semantics:
53/// - **Add-wins** for topology (concurrent add + remove → node/edge exists)
54/// - **LWW** (Last-Writer-Wins) per property key (highest Lamport clock wins)
55/// - **Tombstones** for deletes (mark as deleted, don't physically remove)
56pub struct MaterializedGraph {
57    /// node_id → Node
58    pub nodes: HashMap<String, Node>,
59    /// edge_id → Edge
60    pub edges: HashMap<String, Edge>,
61    /// node_id → set of outgoing edge_ids
62    pub outgoing: HashMap<String, HashSet<String>>,
63    /// node_id → set of incoming edge_ids
64    pub incoming: HashMap<String, HashSet<String>>,
65    /// node_type → set of node_ids (type index)
66    pub by_type: HashMap<String, HashSet<String>>,
67    /// The ontology (for validation during materialization)
68    pub ontology: Ontology,
69    /// R-02: entries that failed ontology validation during apply().
70    /// These entries exist in the oplog (for CRDT convergence) but are
71    /// invisible in the materialized graph. Grow-only within a single
72    /// materialization pass. Cleared and rebuilt on `rebuild()` — this
73    /// allows previously-quarantined entries to be re-evaluated when the
74    /// ontology evolves (e.g., after ExtendOntology arrives via sync).
75    pub quarantined: HashSet<Hash>,
76}
77
78impl MaterializedGraph {
79    /// Create an empty materialized graph with the given ontology.
80    pub fn new(ontology: Ontology) -> Self {
81        Self {
82            nodes: HashMap::new(),
83            edges: HashMap::new(),
84            outgoing: HashMap::new(),
85            incoming: HashMap::new(),
86            by_type: HashMap::new(),
87            ontology,
88            quarantined: HashSet::new(),
89        }
90    }
91
92    /// Apply a single entry to the graph (incremental materialization).
93    ///
94    /// R-02: Validates AddNode/AddEdge payloads against the ontology.
95    /// Invalid entries are quarantined (added to `self.quarantined`) and
96    /// skipped for materialization. They remain in the oplog for CRDT
97    /// convergence — quarantine is a graph-layer concern, not an oplog concern.
98    pub fn apply(&mut self, entry: &Entry) {
99        match &entry.payload {
100            GraphOp::Checkpoint { ops, op_clocks, .. } => {
101                // R-08: Replay synthetic ops to restore graph state.
102                // Bug 6 fix: use per-op clocks (preserves LWW metadata).
103                for (i, op) in ops.iter().enumerate() {
104                    let clock = if i < op_clocks.len() {
105                        LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
106                    } else {
107                        entry.clock.clone() // fallback for old checkpoints without op_clocks
108                    };
109                    let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
110                    self.apply(&synthetic);
111                }
112            }
113            GraphOp::DefineOntology { .. } => {
114                // Genesis — nothing to materialize.
115            }
116            GraphOp::ExtendOntology { extension } => {
117                if let Err(_e) = self.ontology.merge_extension(extension) {
118                    self.quarantined.insert(entry.hash);
119                }
120            }
121            GraphOp::AddNode {
122                node_id,
123                node_type,
124                subtype,
125                label,
126                properties,
127            } => {
128                // R-02: validate against ontology, quarantine if invalid
129                if let Err(_e) =
130                    self.ontology
131                        .validate_node(node_type, subtype.as_deref(), properties)
132                {
133                    self.quarantined.insert(entry.hash);
134                    return;
135                }
136                self.apply_add_node(
137                    node_id,
138                    node_type,
139                    subtype.as_deref(),
140                    label,
141                    properties,
142                    &entry.clock,
143                );
144            }
145            GraphOp::AddEdge {
146                edge_id,
147                edge_type,
148                source_id,
149                target_id,
150                properties,
151            } => {
152                // R-02: validate edge type exists.
153                if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
154                    self.quarantined.insert(entry.hash);
155                    return;
156                }
157                // Bug 13 fix: validate source/target type constraints when both nodes
158                // are materialized. If one is missing (out-of-order sync), skip —
159                // validation happens on rebuild.
160                if let (Some(src), Some(tgt)) = (
161                    self.nodes.get(source_id.as_str()),
162                    self.nodes.get(target_id.as_str()),
163                ) {
164                    if self
165                        .ontology
166                        .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
167                        .is_err()
168                    {
169                        self.quarantined.insert(entry.hash);
170                        return;
171                    }
172                }
173                self.apply_add_edge(
174                    edge_id,
175                    edge_type,
176                    source_id,
177                    target_id,
178                    properties,
179                    &entry.clock,
180                );
181            }
182            GraphOp::UpdateProperty {
183                entity_id,
184                key,
185                value,
186            } => {
187                self.apply_update_property(entity_id, key, value, &entry.clock);
188            }
189            GraphOp::RemoveNode { node_id } => {
190                self.apply_remove_node(node_id, &entry.clock);
191            }
192            GraphOp::RemoveEdge { edge_id } => {
193                self.apply_remove_edge(edge_id, &entry.clock);
194            }
195            GraphOp::DefineLens { .. } => {
196                // Reserved. No materialization — lenses are metadata, not graph state.
197            }
198        }
199    }
200
201    /// Apply a sequence of entries (full rematerialization from op log).
202    pub fn apply_all(&mut self, entries: &[&Entry]) {
203        for entry in entries {
204            self.apply(entry);
205        }
206    }
207
208    /// Rebuild from scratch: clear everything and replay all entries.
209    pub fn rebuild(&mut self, entries: &[&Entry]) {
210        self.nodes.clear();
211        self.edges.clear();
212        self.outgoing.clear();
213        self.incoming.clear();
214        self.by_type.clear();
215        self.quarantined.clear();
216        self.apply_all(entries);
217    }
218
219    // -- Queries --
220
221    /// Get a node by ID (returns None if not found or tombstoned).
222    pub fn get_node(&self, node_id: &str) -> Option<&Node> {
223        self.nodes.get(node_id).filter(|n| !n.tombstoned)
224    }
225
226    /// Get an edge by ID (returns None if not found or tombstoned).
227    pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
228        self.edges.get(edge_id).filter(|e| !e.tombstoned)
229    }
230
231    /// Query all live nodes of a given type, including descendants (RDFS rdfs9).
232    /// If "entity" has children "server" and "project", querying "entity" returns all three.
233    pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
234        let mut types = vec![node_type.to_string()];
235        types.extend(
236            self.ontology
237                .descendants(node_type)
238                .into_iter()
239                .map(|s| s.to_string()),
240        );
241        types
242            .iter()
243            .flat_map(|t| self.by_type.get(t.as_str()))
244            .flatten()
245            .filter_map(|id| self.get_node(id))
246            .collect()
247    }
248
249    /// Query all live nodes of a given subtype.
250    pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
251        self.nodes
252            .values()
253            .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
254            .collect()
255    }
256
257    /// Query nodes by a property value.
258    pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
259        self.nodes
260            .values()
261            .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
262            .collect()
263    }
264
265    /// Get outgoing edges for a node (only live edges with live endpoints).
266    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
267        match self.outgoing.get(node_id) {
268            Some(edge_ids) => edge_ids
269                .iter()
270                .filter_map(|eid| self.get_edge(eid))
271                .filter(|e| self.is_node_live(&e.target_id))
272                .collect(),
273            None => vec![],
274        }
275    }
276
277    /// Get incoming edges for a node (only live edges with live endpoints).
278    pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
279        match self.incoming.get(node_id) {
280            Some(edge_ids) => edge_ids
281                .iter()
282                .filter_map(|eid| self.get_edge(eid))
283                .filter(|e| self.is_node_live(&e.source_id))
284                .collect(),
285            None => vec![],
286        }
287    }
288
289    /// Approximate heap memory used by the materialized graph (bytes).
290    /// Uses fixed overhead estimates per node/edge. Does not account for heap
291    /// allocations behind String/Vec in property values or allocator fragmentation.
292    /// Actual memory may be 2-3x higher for string-heavy graphs.
293    pub fn estimated_memory_bytes(&self) -> usize {
294        let mut total = 0;
295        // Nodes: id string + type string + label + properties + clocks + overhead
296        for node in self.nodes.values() {
297            total += node.node_id.len() + node.node_type.len() + node.label.len();
298            total += node.subtype.as_ref().map_or(0, |s| s.len());
299            // Properties: key + estimated value size + clock per property
300            for (k, v) in &node.properties {
301                total += k.len() + std::mem::size_of_val(v) + 48; // key + value + clock overhead
302            }
303            total += 128; // fixed struct overhead (clocks, bools, HashMap shells)
304        }
305        // Edges: similar structure
306        for edge in self.edges.values() {
307            total += edge.edge_id.len() + edge.edge_type.len();
308            total += edge.source_id.len() + edge.target_id.len();
309            for (k, v) in &edge.properties {
310                total += k.len() + std::mem::size_of_val(v) + 48;
311            }
312            total += 128;
313        }
314        // Adjacency indexes: outgoing + incoming (id strings + HashSet overhead)
315        for (k, set) in &self.outgoing {
316            total += k.len() + set.len() * 32;
317        }
318        for (k, set) in &self.incoming {
319            total += k.len() + set.len() * 32;
320        }
321        // Type index
322        for (k, set) in &self.by_type {
323            total += k.len() + set.len() * 32;
324        }
325        // Quarantine set
326        total += self.quarantined.len() * 48;
327        total
328    }
329
330    /// All live nodes.
331    pub fn all_nodes(&self) -> Vec<&Node> {
332        self.nodes.values().filter(|n| !n.tombstoned).collect()
333    }
334
335    /// All live edges (with live endpoints).
336    pub fn all_edges(&self) -> Vec<&Edge> {
337        self.edges
338            .values()
339            .filter(|e| {
340                !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
341            })
342            .collect()
343    }
344
345    /// Neighbors of a node (connected via outgoing edges).
346    pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
347        self.outgoing_edges(node_id)
348            .iter()
349            .map(|e| e.target_id.as_str())
350            .collect()
351    }
352
353    /// Reverse neighbors (connected via incoming edges).
354    pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
355        self.incoming_edges(node_id)
356            .iter()
357            .map(|e| e.source_id.as_str())
358            .collect()
359    }
360
361    // -- CRDT application helpers --
362
363    fn apply_add_node(
364        &mut self,
365        node_id: &str,
366        node_type: &str,
367        subtype: Option<&str>,
368        label: &str,
369        properties: &BTreeMap<String, Value>,
370        clock: &LamportClock,
371    ) {
372        if let Some(existing) = self.nodes.get_mut(node_id) {
373            // Add-wins: always resurrect from tombstone.
374            existing.tombstoned = false;
375            // Track the latest add clock for add-wins semantics.
376            if clock_wins(clock, &existing.last_add_clock) {
377                existing.last_add_clock = clock.clone();
378            }
379            // LWW merge for label, subtype, and properties.
380            if clock_wins(clock, &existing.last_clock) {
381                existing.label = label.to_string();
382                existing.subtype = subtype.map(|s| s.to_string());
383                existing.last_clock = clock.clone();
384            }
385            merge_properties_lww(
386                &mut existing.properties,
387                &mut existing.property_clocks,
388                properties,
389                clock,
390            );
391        } else {
392            let property_clocks: HashMap<String, LamportClock> = properties
393                .keys()
394                .map(|k| (k.clone(), clock.clone()))
395                .collect();
396            let node = Node {
397                node_id: node_id.to_string(),
398                node_type: node_type.to_string(),
399                subtype: subtype.map(|s| s.to_string()),
400                label: label.to_string(),
401                properties: properties.clone(),
402                property_clocks,
403                last_clock: clock.clone(),
404                last_add_clock: clock.clone(),
405                tombstoned: false,
406            };
407            self.by_type
408                .entry(node_type.to_string())
409                .or_default()
410                .insert(node_id.to_string());
411            self.nodes.insert(node_id.to_string(), node);
412        }
413    }
414
415    fn apply_add_edge(
416        &mut self,
417        edge_id: &str,
418        edge_type: &str,
419        source_id: &str,
420        target_id: &str,
421        properties: &BTreeMap<String, Value>,
422        clock: &LamportClock,
423    ) {
424        if let Some(existing) = self.edges.get_mut(edge_id) {
425            // Add-wins: always resurrect if tombstoned.
426            existing.tombstoned = false;
427            if clock_wins(clock, &existing.last_add_clock) {
428                existing.last_add_clock = clock.clone();
429            }
430            if clock_wins(clock, &existing.last_clock) {
431                existing.last_clock = clock.clone();
432            }
433            merge_properties_lww(
434                &mut existing.properties,
435                &mut existing.property_clocks,
436                properties,
437                clock,
438            );
439        } else {
440            let property_clocks: HashMap<String, LamportClock> = properties
441                .keys()
442                .map(|k| (k.clone(), clock.clone()))
443                .collect();
444            let edge = Edge {
445                edge_id: edge_id.to_string(),
446                edge_type: edge_type.to_string(),
447                source_id: source_id.to_string(),
448                target_id: target_id.to_string(),
449                properties: properties.clone(),
450                property_clocks,
451                last_clock: clock.clone(),
452                last_add_clock: clock.clone(),
453                tombstoned: false,
454            };
455            self.outgoing
456                .entry(source_id.to_string())
457                .or_default()
458                .insert(edge_id.to_string());
459            self.incoming
460                .entry(target_id.to_string())
461                .or_default()
462                .insert(edge_id.to_string());
463            self.edges.insert(edge_id.to_string(), edge);
464        }
465    }
466
467    fn apply_update_property(
468        &mut self,
469        entity_id: &str,
470        key: &str,
471        value: &Value,
472        clock: &LamportClock,
473    ) {
474        // Try node first, then edge. Per-property LWW: each key competes
475        // only with other writes to the same key, not the entire entity.
476        if let Some(node) = self.nodes.get_mut(entity_id) {
477            let dominated = node
478                .property_clocks
479                .get(key)
480                .map(|c| clock_wins(clock, c))
481                .unwrap_or(true);
482            if dominated {
483                node.properties.insert(key.to_string(), value.clone());
484                node.property_clocks.insert(key.to_string(), clock.clone());
485            }
486            // Update entity-level clock for add-wins tracking.
487            if clock_wins(clock, &node.last_clock) {
488                node.last_clock = clock.clone();
489            }
490        } else if let Some(edge) = self.edges.get_mut(entity_id) {
491            let dominated = edge
492                .property_clocks
493                .get(key)
494                .map(|c| clock_wins(clock, c))
495                .unwrap_or(true);
496            if dominated {
497                edge.properties.insert(key.to_string(), value.clone());
498                edge.property_clocks.insert(key.to_string(), clock.clone());
499            }
500            if clock_wins(clock, &edge.last_clock) {
501                edge.last_clock = clock.clone();
502            }
503        }
504        // If entity not found, silently ignore (may arrive out of order in sync).
505    }
506
507    fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
508        if let Some(node) = self.nodes.get_mut(node_id) {
509            // Add-wins: only tombstone if the remove clock is strictly greater
510            // than the last add clock. If a concurrent (or later) add exists,
511            // the node stays alive.
512            if clock_wins(clock, &node.last_add_clock) {
513                node.tombstoned = true;
514                node.last_clock = clock.clone();
515            }
516        }
517        // Tombstoning a node doesn't physically remove edges — they just become
518        // invisible via is_node_live() checks in queries.
519    }
520
521    fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
522        if let Some(edge) = self.edges.get_mut(edge_id) {
523            // Add-wins: only tombstone if remove clock > last add clock.
524            if clock_wins(clock, &edge.last_add_clock) {
525                edge.tombstoned = true;
526                edge.last_clock = clock.clone();
527            }
528        }
529    }
530
531    fn is_node_live(&self, node_id: &str) -> bool {
532        self.nodes
533            .get(node_id)
534            .map(|n| !n.tombstoned)
535            .unwrap_or(false)
536    }
537}
538
539/// Per-property LWW merge: each property from `new_props` competes with
540/// existing properties. Higher clock wins per key.
541fn merge_properties_lww(
542    existing_props: &mut BTreeMap<String, Value>,
543    existing_clocks: &mut HashMap<String, LamportClock>,
544    new_props: &BTreeMap<String, Value>,
545    clock: &LamportClock,
546) {
547    for (k, v) in new_props {
548        let dominated = existing_clocks
549            .get(k)
550            .map(|c| clock_wins(clock, c))
551            .unwrap_or(true);
552        if dominated {
553            existing_props.insert(k.clone(), v.clone());
554            existing_clocks.insert(k.clone(), clock.clone());
555        }
556    }
557}
558
559/// LWW comparison: returns true if `new_clock` wins over `existing_clock`.
560/// Uses HybridClock total ordering: (physical_ms, logical, id).
561fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
562    new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::entry::Entry;
569    use crate::ontology::{EdgeTypeDef, NodeTypeDef};
570
571    fn test_ontology() -> Ontology {
572        Ontology {
573            node_types: BTreeMap::from([
574                (
575                    "entity".into(),
576                    NodeTypeDef {
577                        description: None,
578                        properties: BTreeMap::new(),
579                        subtypes: None,
580                        parent_type: None,
581                    },
582                ),
583                (
584                    "signal".into(),
585                    NodeTypeDef {
586                        description: None,
587                        properties: BTreeMap::new(),
588                        subtypes: None,
589                        parent_type: None,
590                    },
591                ),
592            ]),
593            edge_types: BTreeMap::from([
594                (
595                    "RUNS_ON".into(),
596                    EdgeTypeDef {
597                        description: None,
598                        source_types: vec!["entity".into()],
599                        target_types: vec!["entity".into()],
600                        properties: BTreeMap::new(),
601                    },
602                ),
603                (
604                    "OBSERVES".into(),
605                    EdgeTypeDef {
606                        description: None,
607                        source_types: vec!["signal".into()],
608                        target_types: vec!["entity".into()],
609                        properties: BTreeMap::new(),
610                    },
611                ),
612            ]),
613        }
614    }
615
616    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
617        Entry::new(
618            op,
619            vec![],
620            vec![],
621            LamportClock::with_values(author, clock_time, 0),
622            author,
623        )
624    }
625
626    // -- test_graph.rs spec from docs/silk.md --
627
628    #[test]
629    fn add_node_appears_in_query() {
630        let mut g = MaterializedGraph::new(test_ontology());
631        let entry = make_entry(
632            GraphOp::AddNode {
633                node_id: "server-1".into(),
634                node_type: "entity".into(),
635                label: "Server 1".into(),
636                properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
637                subtype: None,
638            },
639            1,
640            "inst-a",
641        );
642        g.apply(&entry);
643
644        let node = g.get_node("server-1").unwrap();
645        assert_eq!(node.node_type, "entity");
646        assert_eq!(node.label, "Server 1");
647        assert_eq!(
648            node.properties.get("ip"),
649            Some(&Value::String("10.0.0.1".into()))
650        );
651    }
652
653    #[test]
654    fn add_edge_creates_adjacency() {
655        let mut g = MaterializedGraph::new(test_ontology());
656        g.apply(&make_entry(
657            GraphOp::AddNode {
658                node_id: "svc".into(),
659                node_type: "entity".into(),
660                label: "svc".into(),
661                properties: BTreeMap::new(),
662                subtype: None,
663            },
664            1,
665            "inst-a",
666        ));
667        g.apply(&make_entry(
668            GraphOp::AddNode {
669                node_id: "srv".into(),
670                node_type: "entity".into(),
671                label: "srv".into(),
672                properties: BTreeMap::new(),
673                subtype: None,
674            },
675            2,
676            "inst-a",
677        ));
678        g.apply(&make_entry(
679            GraphOp::AddEdge {
680                edge_id: "e1".into(),
681                edge_type: "RUNS_ON".into(),
682                source_id: "svc".into(),
683                target_id: "srv".into(),
684                properties: BTreeMap::new(),
685            },
686            3,
687            "inst-a",
688        ));
689
690        // Both endpoints know about the edge.
691        let out = g.outgoing_edges("svc");
692        assert_eq!(out.len(), 1);
693        assert_eq!(out[0].target_id, "srv");
694
695        let inc = g.incoming_edges("srv");
696        assert_eq!(inc.len(), 1);
697        assert_eq!(inc[0].source_id, "svc");
698
699        assert_eq!(g.neighbors("svc"), vec!["srv"]);
700    }
701
702    #[test]
703    fn update_property_reflected() {
704        let mut g = MaterializedGraph::new(test_ontology());
705        g.apply(&make_entry(
706            GraphOp::AddNode {
707                node_id: "s1".into(),
708                node_type: "entity".into(),
709                label: "s1".into(),
710                properties: BTreeMap::new(),
711                subtype: None,
712            },
713            1,
714            "inst-a",
715        ));
716        g.apply(&make_entry(
717            GraphOp::UpdateProperty {
718                entity_id: "s1".into(),
719                key: "cpu".into(),
720                value: Value::Float(85.5),
721            },
722            2,
723            "inst-a",
724        ));
725
726        let node = g.get_node("s1").unwrap();
727        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
728    }
729
730    #[test]
731    fn remove_node_cascades_edges() {
732        let mut g = MaterializedGraph::new(test_ontology());
733        g.apply(&make_entry(
734            GraphOp::AddNode {
735                node_id: "a".into(),
736                node_type: "entity".into(),
737                label: "a".into(),
738                properties: BTreeMap::new(),
739                subtype: None,
740            },
741            1,
742            "inst-a",
743        ));
744        g.apply(&make_entry(
745            GraphOp::AddNode {
746                node_id: "b".into(),
747                node_type: "entity".into(),
748                label: "b".into(),
749                properties: BTreeMap::new(),
750                subtype: None,
751            },
752            2,
753            "inst-a",
754        ));
755        g.apply(&make_entry(
756            GraphOp::AddEdge {
757                edge_id: "e1".into(),
758                edge_type: "RUNS_ON".into(),
759                source_id: "a".into(),
760                target_id: "b".into(),
761                properties: BTreeMap::new(),
762            },
763            3,
764            "inst-a",
765        ));
766        assert_eq!(g.all_edges().len(), 1);
767
768        // Remove node 'b' — edge becomes invisible (dangling target).
769        g.apply(&make_entry(
770            GraphOp::RemoveNode {
771                node_id: "b".into(),
772            },
773            4,
774            "inst-a",
775        ));
776        assert!(g.get_node("b").is_none());
777        // Edge still exists but not returned by all_edges (target tombstoned).
778        assert_eq!(g.all_edges().len(), 0);
779        // Outgoing from 'a' also filters out dangling edges.
780        assert_eq!(g.outgoing_edges("a").len(), 0);
781    }
782
783    #[test]
784    fn remove_edge_preserves_nodes() {
785        let mut g = MaterializedGraph::new(test_ontology());
786        g.apply(&make_entry(
787            GraphOp::AddNode {
788                node_id: "a".into(),
789                node_type: "entity".into(),
790                label: "a".into(),
791                properties: BTreeMap::new(),
792                subtype: None,
793            },
794            1,
795            "inst-a",
796        ));
797        g.apply(&make_entry(
798            GraphOp::AddNode {
799                node_id: "b".into(),
800                node_type: "entity".into(),
801                label: "b".into(),
802                properties: BTreeMap::new(),
803                subtype: None,
804            },
805            2,
806            "inst-a",
807        ));
808        g.apply(&make_entry(
809            GraphOp::AddEdge {
810                edge_id: "e1".into(),
811                edge_type: "RUNS_ON".into(),
812                source_id: "a".into(),
813                target_id: "b".into(),
814                properties: BTreeMap::new(),
815            },
816            3,
817            "inst-a",
818        ));
819        g.apply(&make_entry(
820            GraphOp::RemoveEdge {
821                edge_id: "e1".into(),
822            },
823            4,
824            "inst-a",
825        ));
826
827        // Nodes still exist.
828        assert!(g.get_node("a").is_some());
829        assert!(g.get_node("b").is_some());
830        // Edge is gone.
831        assert!(g.get_edge("e1").is_none());
832        assert_eq!(g.all_edges().len(), 0);
833    }
834
835    #[test]
836    fn query_by_type_filters() {
837        let mut g = MaterializedGraph::new(test_ontology());
838        g.apply(&make_entry(
839            GraphOp::AddNode {
840                node_id: "s1".into(),
841                node_type: "entity".into(),
842                label: "s1".into(),
843                properties: BTreeMap::new(),
844                subtype: None,
845            },
846            1,
847            "inst-a",
848        ));
849        g.apply(&make_entry(
850            GraphOp::AddNode {
851                node_id: "s2".into(),
852                node_type: "entity".into(),
853                label: "s2".into(),
854                properties: BTreeMap::new(),
855                subtype: None,
856            },
857            2,
858            "inst-a",
859        ));
860        g.apply(&make_entry(
861            GraphOp::AddNode {
862                node_id: "alert".into(),
863                node_type: "signal".into(),
864                label: "alert".into(),
865                properties: BTreeMap::new(),
866                subtype: None,
867            },
868            3,
869            "inst-a",
870        ));
871
872        let entities = g.nodes_by_type("entity");
873        assert_eq!(entities.len(), 2);
874        let signals = g.nodes_by_type("signal");
875        assert_eq!(signals.len(), 1);
876        assert_eq!(signals[0].node_id, "alert");
877    }
878
879    #[test]
880    fn query_by_property_filters() {
881        let mut g = MaterializedGraph::new(test_ontology());
882        g.apply(&make_entry(
883            GraphOp::AddNode {
884                node_id: "s1".into(),
885                node_type: "entity".into(),
886                label: "s1".into(),
887                properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
888                subtype: None,
889            },
890            1,
891            "inst-a",
892        ));
893        g.apply(&make_entry(
894            GraphOp::AddNode {
895                node_id: "s2".into(),
896                node_type: "entity".into(),
897                label: "s2".into(),
898                properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
899                subtype: None,
900            },
901            2,
902            "inst-a",
903        ));
904
905        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
906        assert_eq!(alive.len(), 1);
907        assert_eq!(alive[0].node_id, "s1");
908    }
909
910    #[test]
911    fn materialization_from_empty() {
912        // Build graph incrementally.
913        let mut g1 = MaterializedGraph::new(test_ontology());
914        let entries = vec![
915            make_entry(
916                GraphOp::DefineOntology {
917                    ontology: test_ontology(),
918                },
919                0,
920                "inst-a",
921            ),
922            make_entry(
923                GraphOp::AddNode {
924                    node_id: "a".into(),
925                    node_type: "entity".into(),
926                    label: "a".into(),
927                    properties: BTreeMap::new(),
928                    subtype: None,
929                },
930                1,
931                "inst-a",
932            ),
933            make_entry(
934                GraphOp::AddNode {
935                    node_id: "b".into(),
936                    node_type: "entity".into(),
937                    label: "b".into(),
938                    properties: BTreeMap::new(),
939                    subtype: None,
940                },
941                2,
942                "inst-a",
943            ),
944            make_entry(
945                GraphOp::AddEdge {
946                    edge_id: "e1".into(),
947                    edge_type: "RUNS_ON".into(),
948                    source_id: "a".into(),
949                    target_id: "b".into(),
950                    properties: BTreeMap::new(),
951                },
952                3,
953                "inst-a",
954            ),
955        ];
956        for e in &entries {
957            g1.apply(e);
958        }
959
960        // Rebuild from scratch.
961        let mut g2 = MaterializedGraph::new(test_ontology());
962        let refs: Vec<&Entry> = entries.iter().collect();
963        g2.rebuild(&refs);
964
965        // Same result.
966        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
967        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
968        for node in g1.all_nodes() {
969            let n2 = g2.get_node(&node.node_id).unwrap();
970            assert_eq!(node.node_type, n2.node_type);
971            assert_eq!(node.properties, n2.properties);
972        }
973    }
974
975    #[test]
976    fn incremental_equals_full() {
977        let entries = vec![
978            make_entry(
979                GraphOp::DefineOntology {
980                    ontology: test_ontology(),
981                },
982                0,
983                "inst-a",
984            ),
985            make_entry(
986                GraphOp::AddNode {
987                    node_id: "a".into(),
988                    node_type: "entity".into(),
989                    label: "a".into(),
990                    properties: BTreeMap::from([("x".into(), Value::Int(1))]),
991                    subtype: None,
992                },
993                1,
994                "inst-a",
995            ),
996            make_entry(
997                GraphOp::UpdateProperty {
998                    entity_id: "a".into(),
999                    key: "x".into(),
1000                    value: Value::Int(2),
1001                },
1002                2,
1003                "inst-a",
1004            ),
1005            make_entry(
1006                GraphOp::AddNode {
1007                    node_id: "b".into(),
1008                    node_type: "entity".into(),
1009                    label: "b".into(),
1010                    properties: BTreeMap::new(),
1011                    subtype: None,
1012                },
1013                3,
1014                "inst-a",
1015            ),
1016            make_entry(
1017                GraphOp::AddEdge {
1018                    edge_id: "e1".into(),
1019                    edge_type: "RUNS_ON".into(),
1020                    source_id: "a".into(),
1021                    target_id: "b".into(),
1022                    properties: BTreeMap::new(),
1023                },
1024                4,
1025                "inst-a",
1026            ),
1027            make_entry(
1028                GraphOp::RemoveEdge {
1029                    edge_id: "e1".into(),
1030                },
1031                5,
1032                "inst-a",
1033            ),
1034        ];
1035
1036        // Incremental.
1037        let mut g_inc = MaterializedGraph::new(test_ontology());
1038        for e in &entries {
1039            g_inc.apply(e);
1040        }
1041
1042        // Full replay.
1043        let mut g_full = MaterializedGraph::new(test_ontology());
1044        let refs: Vec<&Entry> = entries.iter().collect();
1045        g_full.rebuild(&refs);
1046
1047        // Property should be 2 (updated).
1048        assert_eq!(
1049            g_inc.get_node("a").unwrap().properties.get("x"),
1050            Some(&Value::Int(2))
1051        );
1052        assert_eq!(
1053            g_full.get_node("a").unwrap().properties.get("x"),
1054            Some(&Value::Int(2))
1055        );
1056        // Edge should be removed.
1057        assert_eq!(g_inc.all_edges().len(), 0);
1058        assert_eq!(g_full.all_edges().len(), 0);
1059    }
1060
1061    #[test]
1062    fn lww_concurrent_property_update() {
1063        // Two instances update the same property — higher clock wins.
1064        let mut g = MaterializedGraph::new(test_ontology());
1065        g.apply(&make_entry(
1066            GraphOp::AddNode {
1067                node_id: "s1".into(),
1068                node_type: "entity".into(),
1069                label: "s1".into(),
1070                properties: BTreeMap::new(),
1071                subtype: None,
1072            },
1073            1,
1074            "inst-a",
1075        ));
1076        // inst-a sets status=alive at time 2
1077        g.apply(&make_entry(
1078            GraphOp::UpdateProperty {
1079                entity_id: "s1".into(),
1080                key: "status".into(),
1081                value: Value::String("alive".into()),
1082            },
1083            2,
1084            "inst-a",
1085        ));
1086        // inst-b sets status=dead at time 3 — wins (higher clock)
1087        g.apply(&make_entry(
1088            GraphOp::UpdateProperty {
1089                entity_id: "s1".into(),
1090                key: "status".into(),
1091                value: Value::String("dead".into()),
1092            },
1093            3,
1094            "inst-b",
1095        ));
1096        assert_eq!(
1097            g.get_node("s1").unwrap().properties.get("status"),
1098            Some(&Value::String("dead".into()))
1099        );
1100    }
1101
1102    #[test]
1103    fn lww_tiebreak_by_instance_id() {
1104        // Same clock time — higher instance ID wins.
1105        let mut g = MaterializedGraph::new(test_ontology());
1106        g.apply(&make_entry(
1107            GraphOp::AddNode {
1108                node_id: "s1".into(),
1109                node_type: "entity".into(),
1110                label: "s1".into(),
1111                properties: BTreeMap::new(),
1112                subtype: None,
1113            },
1114            1,
1115            "inst-a",
1116        ));
1117        // Both at physical_ms=5, logical=0. Lower id wins → "inst-a" wins.
1118        g.apply(&make_entry(
1119            GraphOp::UpdateProperty {
1120                entity_id: "s1".into(),
1121                key: "x".into(),
1122                value: Value::Int(1),
1123            },
1124            5,
1125            "inst-a",
1126        ));
1127        g.apply(&make_entry(
1128            GraphOp::UpdateProperty {
1129                entity_id: "s1".into(),
1130                key: "x".into(),
1131                value: Value::Int(2),
1132            },
1133            5,
1134            "inst-b",
1135        ));
1136        // inst-a has lower id → wins the tiebreak → value stays Int(1).
1137        assert_eq!(
1138            g.get_node("s1").unwrap().properties.get("x"),
1139            Some(&Value::Int(1))
1140        );
1141    }
1142
1143    #[test]
1144    fn lww_per_property_concurrent_different_keys() {
1145        // Two instances concurrently update DIFFERENT properties at the same
1146        // clock time. Both updates must be accepted — they don't conflict.
1147        // This requires per-property LWW, not node-level LWW.
1148        let mut g = MaterializedGraph::new(test_ontology());
1149        g.apply(&make_entry(
1150            GraphOp::AddNode {
1151                node_id: "s1".into(),
1152                node_type: "entity".into(),
1153                label: "s1".into(),
1154                properties: BTreeMap::from([
1155                    ("x".into(), Value::Int(0)),
1156                    ("y".into(), Value::Int(0)),
1157                ]),
1158                subtype: None,
1159            },
1160            1,
1161            "inst-a",
1162        ));
1163        // inst-a updates "x" at time 3
1164        g.apply(&make_entry(
1165            GraphOp::UpdateProperty {
1166                entity_id: "s1".into(),
1167                key: "x".into(),
1168                value: Value::Int(42),
1169            },
1170            3,
1171            "inst-a",
1172        ));
1173        // inst-b updates "y" at time 3 (concurrent, different property)
1174        g.apply(&make_entry(
1175            GraphOp::UpdateProperty {
1176                entity_id: "s1".into(),
1177                key: "y".into(),
1178                value: Value::Int(99),
1179            },
1180            3,
1181            "inst-b",
1182        ));
1183
1184        let node = g.get_node("s1").unwrap();
1185        // Both updates must be applied — no conflict.
1186        assert_eq!(
1187            node.properties.get("x"),
1188            Some(&Value::Int(42)),
1189            "update to 'x' must not be rejected by concurrent update to 'y'"
1190        );
1191        assert_eq!(
1192            node.properties.get("y"),
1193            Some(&Value::Int(99)),
1194            "update to 'y' must not be rejected by concurrent update to 'x'"
1195        );
1196    }
1197
1198    #[test]
1199    fn lww_per_property_order_independent() {
1200        // Same scenario but applied in reverse order — result must be identical.
1201        let mut g = MaterializedGraph::new(test_ontology());
1202        g.apply(&make_entry(
1203            GraphOp::AddNode {
1204                node_id: "s1".into(),
1205                node_type: "entity".into(),
1206                label: "s1".into(),
1207                properties: BTreeMap::from([
1208                    ("x".into(), Value::Int(0)),
1209                    ("y".into(), Value::Int(0)),
1210                ]),
1211                subtype: None,
1212            },
1213            1,
1214            "inst-a",
1215        ));
1216        // Apply inst-b first this time
1217        g.apply(&make_entry(
1218            GraphOp::UpdateProperty {
1219                entity_id: "s1".into(),
1220                key: "y".into(),
1221                value: Value::Int(99),
1222            },
1223            3,
1224            "inst-b",
1225        ));
1226        g.apply(&make_entry(
1227            GraphOp::UpdateProperty {
1228                entity_id: "s1".into(),
1229                key: "x".into(),
1230                value: Value::Int(42),
1231            },
1232            3,
1233            "inst-a",
1234        ));
1235
1236        let node = g.get_node("s1").unwrap();
1237        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1238        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1239    }
1240
1241    #[test]
1242    fn add_wins_over_remove() {
1243        // Concurrent add + remove → node should exist (add-wins).
1244        let mut g = MaterializedGraph::new(test_ontology());
1245        g.apply(&make_entry(
1246            GraphOp::AddNode {
1247                node_id: "s1".into(),
1248                node_type: "entity".into(),
1249                label: "s1".into(),
1250                properties: BTreeMap::new(),
1251                subtype: None,
1252            },
1253            1,
1254            "inst-a",
1255        ));
1256        // Remove at time 2.
1257        g.apply(&make_entry(
1258            GraphOp::RemoveNode {
1259                node_id: "s1".into(),
1260            },
1261            2,
1262            "inst-a",
1263        ));
1264        assert!(g.get_node("s1").is_none());
1265
1266        // Re-add at time 3 (add-wins — resurrects).
1267        g.apply(&make_entry(
1268            GraphOp::AddNode {
1269                node_id: "s1".into(),
1270                node_type: "entity".into(),
1271                label: "s1 v2".into(),
1272                properties: BTreeMap::new(),
1273                subtype: None,
1274            },
1275            3,
1276            "inst-b",
1277        ));
1278        let node = g.get_node("s1").unwrap();
1279        assert_eq!(node.label, "s1 v2");
1280        assert!(!node.tombstoned);
1281    }
1282}