Skip to main content

silk/
graph.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use crate::clock::LamportClock;
4use crate::entry::{Entry, GraphOp, Hash, Value};
5use crate::ontology::Ontology;
6
7/// A materialized node in the graph.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Node {
10    pub node_id: String,
11    pub node_type: String,
12    pub subtype: Option<String>,
13    pub label: String,
14    pub properties: BTreeMap<String, Value>,
15    /// Per-property clocks for LWW conflict resolution.
16    /// Each property key tracks the clock of its last write, so
17    /// concurrent updates to different properties don't conflict.
18    pub property_clocks: HashMap<String, LamportClock>,
19    /// Clock of the entry that last modified this node.
20    /// Used for add-wins semantics and label LWW.
21    pub last_clock: LamportClock,
22    /// Clock of the most recent AddNode for this node.
23    /// Used for add-wins semantics: remove only wins if its clock
24    /// is strictly greater than last_add_clock.
25    pub last_add_clock: LamportClock,
26    /// Whether this node has been tombstoned (removed).
27    pub tombstoned: bool,
28}
29
30/// A materialized edge in the graph.
31#[derive(Debug, Clone, PartialEq)]
32pub struct Edge {
33    pub edge_id: String,
34    pub edge_type: String,
35    pub source_id: String,
36    pub target_id: String,
37    pub properties: BTreeMap<String, Value>,
38    /// Per-property clocks for LWW conflict resolution.
39    pub property_clocks: HashMap<String, LamportClock>,
40    pub last_clock: LamportClock,
41    /// Clock of the most recent AddEdge for this edge.
42    pub last_add_clock: LamportClock,
43    pub tombstoned: bool,
44}
45
46/// Materialized graph — derived from the op log.
47///
48/// Provides fast queries without replaying the full log.
49/// Updated incrementally as new entries arrive, or rebuilt
50/// from scratch by replaying the entire op log.
51///
52/// CRDT semantics:
53/// - **Add-wins** for topology (concurrent add + remove → node/edge exists)
54/// - **LWW** (Last-Writer-Wins) per property key (highest Lamport clock wins)
55/// - **Tombstones** for deletes (mark as deleted, don't physically remove)
56pub struct MaterializedGraph {
57    /// node_id → Node
58    pub nodes: HashMap<String, Node>,
59    /// edge_id → Edge
60    pub edges: HashMap<String, Edge>,
61    /// node_id → set of outgoing edge_ids
62    pub outgoing: HashMap<String, HashSet<String>>,
63    /// node_id → set of incoming edge_ids
64    pub incoming: HashMap<String, HashSet<String>>,
65    /// node_type → set of node_ids (type index)
66    pub by_type: HashMap<String, HashSet<String>>,
67    /// The ontology (for validation during materialization)
68    pub ontology: Ontology,
69    /// R-02: entries that failed ontology validation during apply().
70    /// These entries exist in the oplog (for CRDT convergence) but are
71    /// invisible in the materialized graph. Grow-only within a single
72    /// materialization pass. Cleared and rebuilt on `rebuild()` — this
73    /// allows previously-quarantined entries to be re-evaluated when the
74    /// ontology evolves (e.g., after ExtendOntology arrives via sync).
75    pub quarantined: HashSet<Hash>,
76}
77
78impl MaterializedGraph {
79    /// Create an empty materialized graph with the given ontology.
80    pub fn new(ontology: Ontology) -> Self {
81        Self {
82            nodes: HashMap::new(),
83            edges: HashMap::new(),
84            outgoing: HashMap::new(),
85            incoming: HashMap::new(),
86            by_type: HashMap::new(),
87            ontology,
88            quarantined: HashSet::new(),
89        }
90    }
91
92    /// Apply a single entry to the graph (incremental materialization).
93    ///
94    /// R-02: Validates AddNode/AddEdge payloads against the ontology.
95    /// Invalid entries are quarantined (added to `self.quarantined`) and
96    /// skipped for materialization. They remain in the oplog for CRDT
97    /// convergence — quarantine is a graph-layer concern, not an oplog concern.
98    pub fn apply(&mut self, entry: &Entry) {
99        match &entry.payload {
100            GraphOp::Checkpoint { ops, op_clocks, .. } => {
101                // R-08: Replay synthetic ops to restore graph state.
102                // Bug 6 fix: use per-op clocks (preserves LWW metadata).
103                for (i, op) in ops.iter().enumerate() {
104                    let clock = if i < op_clocks.len() {
105                        LamportClock::with_values(&entry.author, op_clocks[i].0, op_clocks[i].1)
106                    } else {
107                        entry.clock.clone() // fallback for old checkpoints without op_clocks
108                    };
109                    let synthetic = Entry::new(op.clone(), vec![], vec![], clock, &entry.author);
110                    self.apply(&synthetic);
111                }
112            }
113            GraphOp::DefineOntology { .. } => {
114                // Genesis — nothing to materialize.
115            }
116            GraphOp::ExtendOntology { extension } => {
117                if let Err(_e) = self.ontology.merge_extension(extension) {
118                    self.quarantined.insert(entry.hash);
119                }
120            }
121            GraphOp::AddNode {
122                node_id,
123                node_type,
124                subtype,
125                label,
126                properties,
127            } => {
128                // R-02: validate against ontology, quarantine if invalid
129                if let Err(_e) =
130                    self.ontology
131                        .validate_node(node_type, subtype.as_deref(), properties)
132                {
133                    self.quarantined.insert(entry.hash);
134                    return;
135                }
136                self.apply_add_node(
137                    node_id,
138                    node_type,
139                    subtype.as_deref(),
140                    label,
141                    properties,
142                    &entry.clock,
143                );
144            }
145            GraphOp::AddEdge {
146                edge_id,
147                edge_type,
148                source_id,
149                target_id,
150                properties,
151            } => {
152                // R-02: validate edge type exists.
153                if !self.ontology.edge_types.contains_key(edge_type.as_str()) {
154                    self.quarantined.insert(entry.hash);
155                    return;
156                }
157                // Bug 13 fix: validate source/target type constraints when both nodes
158                // are materialized. If one is missing (out-of-order sync), skip —
159                // validation happens on rebuild.
160                if let (Some(src), Some(tgt)) = (
161                    self.nodes.get(source_id.as_str()),
162                    self.nodes.get(target_id.as_str()),
163                ) {
164                    if self
165                        .ontology
166                        .validate_edge(edge_type, &src.node_type, &tgt.node_type, properties)
167                        .is_err()
168                    {
169                        self.quarantined.insert(entry.hash);
170                        return;
171                    }
172                }
173                self.apply_add_edge(
174                    edge_id,
175                    edge_type,
176                    source_id,
177                    target_id,
178                    properties,
179                    &entry.clock,
180                );
181            }
182            GraphOp::UpdateProperty {
183                entity_id,
184                key,
185                value,
186            } => {
187                self.apply_update_property(entity_id, key, value, &entry.clock);
188            }
189            GraphOp::RemoveNode { node_id } => {
190                self.apply_remove_node(node_id, &entry.clock);
191            }
192            GraphOp::RemoveEdge { edge_id } => {
193                self.apply_remove_edge(edge_id, &entry.clock);
194            }
195        }
196    }
197
198    /// Apply a sequence of entries (full rematerialization from op log).
199    pub fn apply_all(&mut self, entries: &[&Entry]) {
200        for entry in entries {
201            self.apply(entry);
202        }
203    }
204
205    /// Rebuild from scratch: clear everything and replay all entries.
206    pub fn rebuild(&mut self, entries: &[&Entry]) {
207        self.nodes.clear();
208        self.edges.clear();
209        self.outgoing.clear();
210        self.incoming.clear();
211        self.by_type.clear();
212        self.quarantined.clear();
213        self.apply_all(entries);
214    }
215
216    // -- Queries --
217
218    /// Get a node by ID (returns None if not found or tombstoned).
219    pub fn get_node(&self, node_id: &str) -> Option<&Node> {
220        self.nodes.get(node_id).filter(|n| !n.tombstoned)
221    }
222
223    /// Get an edge by ID (returns None if not found or tombstoned).
224    pub fn get_edge(&self, edge_id: &str) -> Option<&Edge> {
225        self.edges.get(edge_id).filter(|e| !e.tombstoned)
226    }
227
228    /// Query all live nodes of a given type, including descendants (RDFS rdfs9).
229    /// If "entity" has children "server" and "project", querying "entity" returns all three.
230    pub fn nodes_by_type(&self, node_type: &str) -> Vec<&Node> {
231        let mut types = vec![node_type.to_string()];
232        types.extend(
233            self.ontology
234                .descendants(node_type)
235                .into_iter()
236                .map(|s| s.to_string()),
237        );
238        types
239            .iter()
240            .flat_map(|t| self.by_type.get(t.as_str()))
241            .flatten()
242            .filter_map(|id| self.get_node(id))
243            .collect()
244    }
245
246    /// Query all live nodes of a given subtype.
247    pub fn nodes_by_subtype(&self, subtype: &str) -> Vec<&Node> {
248        self.nodes
249            .values()
250            .filter(|n| !n.tombstoned && n.subtype.as_deref() == Some(subtype))
251            .collect()
252    }
253
254    /// Query nodes by a property value.
255    pub fn nodes_by_property(&self, key: &str, value: &Value) -> Vec<&Node> {
256        self.nodes
257            .values()
258            .filter(|n| !n.tombstoned && n.properties.get(key) == Some(value))
259            .collect()
260    }
261
262    /// Get outgoing edges for a node (only live edges with live endpoints).
263    pub fn outgoing_edges(&self, node_id: &str) -> Vec<&Edge> {
264        match self.outgoing.get(node_id) {
265            Some(edge_ids) => edge_ids
266                .iter()
267                .filter_map(|eid| self.get_edge(eid))
268                .filter(|e| self.is_node_live(&e.target_id))
269                .collect(),
270            None => vec![],
271        }
272    }
273
274    /// Get incoming edges for a node (only live edges with live endpoints).
275    pub fn incoming_edges(&self, node_id: &str) -> Vec<&Edge> {
276        match self.incoming.get(node_id) {
277            Some(edge_ids) => edge_ids
278                .iter()
279                .filter_map(|eid| self.get_edge(eid))
280                .filter(|e| self.is_node_live(&e.source_id))
281                .collect(),
282            None => vec![],
283        }
284    }
285
286    /// Approximate heap memory used by the materialized graph (bytes).
287    /// Uses fixed overhead estimates per node/edge. Does not account for heap
288    /// allocations behind String/Vec in property values or allocator fragmentation.
289    /// Actual memory may be 2-3x higher for string-heavy graphs.
290    pub fn estimated_memory_bytes(&self) -> usize {
291        let mut total = 0;
292        // Nodes: id string + type string + label + properties + clocks + overhead
293        for node in self.nodes.values() {
294            total += node.node_id.len() + node.node_type.len() + node.label.len();
295            total += node.subtype.as_ref().map_or(0, |s| s.len());
296            // Properties: key + estimated value size + clock per property
297            for (k, v) in &node.properties {
298                total += k.len() + std::mem::size_of_val(v) + 48; // key + value + clock overhead
299            }
300            total += 128; // fixed struct overhead (clocks, bools, HashMap shells)
301        }
302        // Edges: similar structure
303        for edge in self.edges.values() {
304            total += edge.edge_id.len() + edge.edge_type.len();
305            total += edge.source_id.len() + edge.target_id.len();
306            for (k, v) in &edge.properties {
307                total += k.len() + std::mem::size_of_val(v) + 48;
308            }
309            total += 128;
310        }
311        // Adjacency indexes: outgoing + incoming (id strings + HashSet overhead)
312        for (k, set) in &self.outgoing {
313            total += k.len() + set.len() * 32;
314        }
315        for (k, set) in &self.incoming {
316            total += k.len() + set.len() * 32;
317        }
318        // Type index
319        for (k, set) in &self.by_type {
320            total += k.len() + set.len() * 32;
321        }
322        // Quarantine set
323        total += self.quarantined.len() * 48;
324        total
325    }
326
327    /// All live nodes.
328    pub fn all_nodes(&self) -> Vec<&Node> {
329        self.nodes.values().filter(|n| !n.tombstoned).collect()
330    }
331
332    /// All live edges (with live endpoints).
333    pub fn all_edges(&self) -> Vec<&Edge> {
334        self.edges
335            .values()
336            .filter(|e| {
337                !e.tombstoned && self.is_node_live(&e.source_id) && self.is_node_live(&e.target_id)
338            })
339            .collect()
340    }
341
342    /// Neighbors of a node (connected via outgoing edges).
343    pub fn neighbors(&self, node_id: &str) -> Vec<&str> {
344        self.outgoing_edges(node_id)
345            .iter()
346            .map(|e| e.target_id.as_str())
347            .collect()
348    }
349
350    /// Reverse neighbors (connected via incoming edges).
351    pub fn reverse_neighbors(&self, node_id: &str) -> Vec<&str> {
352        self.incoming_edges(node_id)
353            .iter()
354            .map(|e| e.source_id.as_str())
355            .collect()
356    }
357
358    // -- CRDT application helpers --
359
360    fn apply_add_node(
361        &mut self,
362        node_id: &str,
363        node_type: &str,
364        subtype: Option<&str>,
365        label: &str,
366        properties: &BTreeMap<String, Value>,
367        clock: &LamportClock,
368    ) {
369        if let Some(existing) = self.nodes.get_mut(node_id) {
370            // Add-wins: always resurrect from tombstone.
371            existing.tombstoned = false;
372            // Track the latest add clock for add-wins semantics.
373            if clock_wins(clock, &existing.last_add_clock) {
374                existing.last_add_clock = clock.clone();
375            }
376            // LWW merge for label, subtype, and properties.
377            if clock_wins(clock, &existing.last_clock) {
378                existing.label = label.to_string();
379                existing.subtype = subtype.map(|s| s.to_string());
380                existing.last_clock = clock.clone();
381            }
382            merge_properties_lww(
383                &mut existing.properties,
384                &mut existing.property_clocks,
385                properties,
386                clock,
387            );
388        } else {
389            let property_clocks: HashMap<String, LamportClock> = properties
390                .keys()
391                .map(|k| (k.clone(), clock.clone()))
392                .collect();
393            let node = Node {
394                node_id: node_id.to_string(),
395                node_type: node_type.to_string(),
396                subtype: subtype.map(|s| s.to_string()),
397                label: label.to_string(),
398                properties: properties.clone(),
399                property_clocks,
400                last_clock: clock.clone(),
401                last_add_clock: clock.clone(),
402                tombstoned: false,
403            };
404            self.by_type
405                .entry(node_type.to_string())
406                .or_default()
407                .insert(node_id.to_string());
408            self.nodes.insert(node_id.to_string(), node);
409        }
410    }
411
412    fn apply_add_edge(
413        &mut self,
414        edge_id: &str,
415        edge_type: &str,
416        source_id: &str,
417        target_id: &str,
418        properties: &BTreeMap<String, Value>,
419        clock: &LamportClock,
420    ) {
421        if let Some(existing) = self.edges.get_mut(edge_id) {
422            // Add-wins: always resurrect if tombstoned.
423            existing.tombstoned = false;
424            if clock_wins(clock, &existing.last_add_clock) {
425                existing.last_add_clock = clock.clone();
426            }
427            if clock_wins(clock, &existing.last_clock) {
428                existing.last_clock = clock.clone();
429            }
430            merge_properties_lww(
431                &mut existing.properties,
432                &mut existing.property_clocks,
433                properties,
434                clock,
435            );
436        } else {
437            let property_clocks: HashMap<String, LamportClock> = properties
438                .keys()
439                .map(|k| (k.clone(), clock.clone()))
440                .collect();
441            let edge = Edge {
442                edge_id: edge_id.to_string(),
443                edge_type: edge_type.to_string(),
444                source_id: source_id.to_string(),
445                target_id: target_id.to_string(),
446                properties: properties.clone(),
447                property_clocks,
448                last_clock: clock.clone(),
449                last_add_clock: clock.clone(),
450                tombstoned: false,
451            };
452            self.outgoing
453                .entry(source_id.to_string())
454                .or_default()
455                .insert(edge_id.to_string());
456            self.incoming
457                .entry(target_id.to_string())
458                .or_default()
459                .insert(edge_id.to_string());
460            self.edges.insert(edge_id.to_string(), edge);
461        }
462    }
463
464    fn apply_update_property(
465        &mut self,
466        entity_id: &str,
467        key: &str,
468        value: &Value,
469        clock: &LamportClock,
470    ) {
471        // Try node first, then edge. Per-property LWW: each key competes
472        // only with other writes to the same key, not the entire entity.
473        if let Some(node) = self.nodes.get_mut(entity_id) {
474            let dominated = node
475                .property_clocks
476                .get(key)
477                .map(|c| clock_wins(clock, c))
478                .unwrap_or(true);
479            if dominated {
480                node.properties.insert(key.to_string(), value.clone());
481                node.property_clocks.insert(key.to_string(), clock.clone());
482            }
483            // Update entity-level clock for add-wins tracking.
484            if clock_wins(clock, &node.last_clock) {
485                node.last_clock = clock.clone();
486            }
487        } else if let Some(edge) = self.edges.get_mut(entity_id) {
488            let dominated = edge
489                .property_clocks
490                .get(key)
491                .map(|c| clock_wins(clock, c))
492                .unwrap_or(true);
493            if dominated {
494                edge.properties.insert(key.to_string(), value.clone());
495                edge.property_clocks.insert(key.to_string(), clock.clone());
496            }
497            if clock_wins(clock, &edge.last_clock) {
498                edge.last_clock = clock.clone();
499            }
500        }
501        // If entity not found, silently ignore (may arrive out of order in sync).
502    }
503
504    fn apply_remove_node(&mut self, node_id: &str, clock: &LamportClock) {
505        if let Some(node) = self.nodes.get_mut(node_id) {
506            // Add-wins: only tombstone if the remove clock is strictly greater
507            // than the last add clock. If a concurrent (or later) add exists,
508            // the node stays alive.
509            if clock_wins(clock, &node.last_add_clock) {
510                node.tombstoned = true;
511                node.last_clock = clock.clone();
512            }
513        }
514        // Tombstoning a node doesn't physically remove edges — they just become
515        // invisible via is_node_live() checks in queries.
516    }
517
518    fn apply_remove_edge(&mut self, edge_id: &str, clock: &LamportClock) {
519        if let Some(edge) = self.edges.get_mut(edge_id) {
520            // Add-wins: only tombstone if remove clock > last add clock.
521            if clock_wins(clock, &edge.last_add_clock) {
522                edge.tombstoned = true;
523                edge.last_clock = clock.clone();
524            }
525        }
526    }
527
528    fn is_node_live(&self, node_id: &str) -> bool {
529        self.nodes
530            .get(node_id)
531            .map(|n| !n.tombstoned)
532            .unwrap_or(false)
533    }
534}
535
536/// Per-property LWW merge: each property from `new_props` competes with
537/// existing properties. Higher clock wins per key.
538fn merge_properties_lww(
539    existing_props: &mut BTreeMap<String, Value>,
540    existing_clocks: &mut HashMap<String, LamportClock>,
541    new_props: &BTreeMap<String, Value>,
542    clock: &LamportClock,
543) {
544    for (k, v) in new_props {
545        let dominated = existing_clocks
546            .get(k)
547            .map(|c| clock_wins(clock, c))
548            .unwrap_or(true);
549        if dominated {
550            existing_props.insert(k.clone(), v.clone());
551            existing_clocks.insert(k.clone(), clock.clone());
552        }
553    }
554}
555
556/// LWW comparison: returns true if `new_clock` wins over `existing_clock`.
557/// Uses HybridClock total ordering: (physical_ms, logical, id).
558fn clock_wins(new_clock: &LamportClock, existing_clock: &LamportClock) -> bool {
559    new_clock.cmp_order(existing_clock) == std::cmp::Ordering::Greater
560}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565    use crate::entry::Entry;
566    use crate::ontology::{EdgeTypeDef, NodeTypeDef};
567
568    fn test_ontology() -> Ontology {
569        Ontology {
570            node_types: BTreeMap::from([
571                (
572                    "entity".into(),
573                    NodeTypeDef {
574                        description: None,
575                        properties: BTreeMap::new(),
576                        subtypes: None,
577                        parent_type: None,
578                    },
579                ),
580                (
581                    "signal".into(),
582                    NodeTypeDef {
583                        description: None,
584                        properties: BTreeMap::new(),
585                        subtypes: None,
586                        parent_type: None,
587                    },
588                ),
589            ]),
590            edge_types: BTreeMap::from([
591                (
592                    "RUNS_ON".into(),
593                    EdgeTypeDef {
594                        description: None,
595                        source_types: vec!["entity".into()],
596                        target_types: vec!["entity".into()],
597                        properties: BTreeMap::new(),
598                    },
599                ),
600                (
601                    "OBSERVES".into(),
602                    EdgeTypeDef {
603                        description: None,
604                        source_types: vec!["signal".into()],
605                        target_types: vec!["entity".into()],
606                        properties: BTreeMap::new(),
607                    },
608                ),
609            ]),
610        }
611    }
612
613    fn make_entry(op: GraphOp, clock_time: u64, author: &str) -> Entry {
614        Entry::new(
615            op,
616            vec![],
617            vec![],
618            LamportClock::with_values(author, clock_time, 0),
619            author,
620        )
621    }
622
623    // -- test_graph.rs spec from docs/silk.md --
624
625    #[test]
626    fn add_node_appears_in_query() {
627        let mut g = MaterializedGraph::new(test_ontology());
628        let entry = make_entry(
629            GraphOp::AddNode {
630                node_id: "server-1".into(),
631                node_type: "entity".into(),
632                label: "Server 1".into(),
633                properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
634                subtype: None,
635            },
636            1,
637            "inst-a",
638        );
639        g.apply(&entry);
640
641        let node = g.get_node("server-1").unwrap();
642        assert_eq!(node.node_type, "entity");
643        assert_eq!(node.label, "Server 1");
644        assert_eq!(
645            node.properties.get("ip"),
646            Some(&Value::String("10.0.0.1".into()))
647        );
648    }
649
650    #[test]
651    fn add_edge_creates_adjacency() {
652        let mut g = MaterializedGraph::new(test_ontology());
653        g.apply(&make_entry(
654            GraphOp::AddNode {
655                node_id: "svc".into(),
656                node_type: "entity".into(),
657                label: "svc".into(),
658                properties: BTreeMap::new(),
659                subtype: None,
660            },
661            1,
662            "inst-a",
663        ));
664        g.apply(&make_entry(
665            GraphOp::AddNode {
666                node_id: "srv".into(),
667                node_type: "entity".into(),
668                label: "srv".into(),
669                properties: BTreeMap::new(),
670                subtype: None,
671            },
672            2,
673            "inst-a",
674        ));
675        g.apply(&make_entry(
676            GraphOp::AddEdge {
677                edge_id: "e1".into(),
678                edge_type: "RUNS_ON".into(),
679                source_id: "svc".into(),
680                target_id: "srv".into(),
681                properties: BTreeMap::new(),
682            },
683            3,
684            "inst-a",
685        ));
686
687        // Both endpoints know about the edge.
688        let out = g.outgoing_edges("svc");
689        assert_eq!(out.len(), 1);
690        assert_eq!(out[0].target_id, "srv");
691
692        let inc = g.incoming_edges("srv");
693        assert_eq!(inc.len(), 1);
694        assert_eq!(inc[0].source_id, "svc");
695
696        assert_eq!(g.neighbors("svc"), vec!["srv"]);
697    }
698
699    #[test]
700    fn update_property_reflected() {
701        let mut g = MaterializedGraph::new(test_ontology());
702        g.apply(&make_entry(
703            GraphOp::AddNode {
704                node_id: "s1".into(),
705                node_type: "entity".into(),
706                label: "s1".into(),
707                properties: BTreeMap::new(),
708                subtype: None,
709            },
710            1,
711            "inst-a",
712        ));
713        g.apply(&make_entry(
714            GraphOp::UpdateProperty {
715                entity_id: "s1".into(),
716                key: "cpu".into(),
717                value: Value::Float(85.5),
718            },
719            2,
720            "inst-a",
721        ));
722
723        let node = g.get_node("s1").unwrap();
724        assert_eq!(node.properties.get("cpu"), Some(&Value::Float(85.5)));
725    }
726
727    #[test]
728    fn remove_node_cascades_edges() {
729        let mut g = MaterializedGraph::new(test_ontology());
730        g.apply(&make_entry(
731            GraphOp::AddNode {
732                node_id: "a".into(),
733                node_type: "entity".into(),
734                label: "a".into(),
735                properties: BTreeMap::new(),
736                subtype: None,
737            },
738            1,
739            "inst-a",
740        ));
741        g.apply(&make_entry(
742            GraphOp::AddNode {
743                node_id: "b".into(),
744                node_type: "entity".into(),
745                label: "b".into(),
746                properties: BTreeMap::new(),
747                subtype: None,
748            },
749            2,
750            "inst-a",
751        ));
752        g.apply(&make_entry(
753            GraphOp::AddEdge {
754                edge_id: "e1".into(),
755                edge_type: "RUNS_ON".into(),
756                source_id: "a".into(),
757                target_id: "b".into(),
758                properties: BTreeMap::new(),
759            },
760            3,
761            "inst-a",
762        ));
763        assert_eq!(g.all_edges().len(), 1);
764
765        // Remove node 'b' — edge becomes invisible (dangling target).
766        g.apply(&make_entry(
767            GraphOp::RemoveNode {
768                node_id: "b".into(),
769            },
770            4,
771            "inst-a",
772        ));
773        assert!(g.get_node("b").is_none());
774        // Edge still exists but not returned by all_edges (target tombstoned).
775        assert_eq!(g.all_edges().len(), 0);
776        // Outgoing from 'a' also filters out dangling edges.
777        assert_eq!(g.outgoing_edges("a").len(), 0);
778    }
779
780    #[test]
781    fn remove_edge_preserves_nodes() {
782        let mut g = MaterializedGraph::new(test_ontology());
783        g.apply(&make_entry(
784            GraphOp::AddNode {
785                node_id: "a".into(),
786                node_type: "entity".into(),
787                label: "a".into(),
788                properties: BTreeMap::new(),
789                subtype: None,
790            },
791            1,
792            "inst-a",
793        ));
794        g.apply(&make_entry(
795            GraphOp::AddNode {
796                node_id: "b".into(),
797                node_type: "entity".into(),
798                label: "b".into(),
799                properties: BTreeMap::new(),
800                subtype: None,
801            },
802            2,
803            "inst-a",
804        ));
805        g.apply(&make_entry(
806            GraphOp::AddEdge {
807                edge_id: "e1".into(),
808                edge_type: "RUNS_ON".into(),
809                source_id: "a".into(),
810                target_id: "b".into(),
811                properties: BTreeMap::new(),
812            },
813            3,
814            "inst-a",
815        ));
816        g.apply(&make_entry(
817            GraphOp::RemoveEdge {
818                edge_id: "e1".into(),
819            },
820            4,
821            "inst-a",
822        ));
823
824        // Nodes still exist.
825        assert!(g.get_node("a").is_some());
826        assert!(g.get_node("b").is_some());
827        // Edge is gone.
828        assert!(g.get_edge("e1").is_none());
829        assert_eq!(g.all_edges().len(), 0);
830    }
831
832    #[test]
833    fn query_by_type_filters() {
834        let mut g = MaterializedGraph::new(test_ontology());
835        g.apply(&make_entry(
836            GraphOp::AddNode {
837                node_id: "s1".into(),
838                node_type: "entity".into(),
839                label: "s1".into(),
840                properties: BTreeMap::new(),
841                subtype: None,
842            },
843            1,
844            "inst-a",
845        ));
846        g.apply(&make_entry(
847            GraphOp::AddNode {
848                node_id: "s2".into(),
849                node_type: "entity".into(),
850                label: "s2".into(),
851                properties: BTreeMap::new(),
852                subtype: None,
853            },
854            2,
855            "inst-a",
856        ));
857        g.apply(&make_entry(
858            GraphOp::AddNode {
859                node_id: "alert".into(),
860                node_type: "signal".into(),
861                label: "alert".into(),
862                properties: BTreeMap::new(),
863                subtype: None,
864            },
865            3,
866            "inst-a",
867        ));
868
869        let entities = g.nodes_by_type("entity");
870        assert_eq!(entities.len(), 2);
871        let signals = g.nodes_by_type("signal");
872        assert_eq!(signals.len(), 1);
873        assert_eq!(signals[0].node_id, "alert");
874    }
875
876    #[test]
877    fn query_by_property_filters() {
878        let mut g = MaterializedGraph::new(test_ontology());
879        g.apply(&make_entry(
880            GraphOp::AddNode {
881                node_id: "s1".into(),
882                node_type: "entity".into(),
883                label: "s1".into(),
884                properties: BTreeMap::from([("status".into(), Value::String("alive".into()))]),
885                subtype: None,
886            },
887            1,
888            "inst-a",
889        ));
890        g.apply(&make_entry(
891            GraphOp::AddNode {
892                node_id: "s2".into(),
893                node_type: "entity".into(),
894                label: "s2".into(),
895                properties: BTreeMap::from([("status".into(), Value::String("dead".into()))]),
896                subtype: None,
897            },
898            2,
899            "inst-a",
900        ));
901
902        let alive = g.nodes_by_property("status", &Value::String("alive".into()));
903        assert_eq!(alive.len(), 1);
904        assert_eq!(alive[0].node_id, "s1");
905    }
906
907    #[test]
908    fn materialization_from_empty() {
909        // Build graph incrementally.
910        let mut g1 = MaterializedGraph::new(test_ontology());
911        let entries = vec![
912            make_entry(
913                GraphOp::DefineOntology {
914                    ontology: test_ontology(),
915                },
916                0,
917                "inst-a",
918            ),
919            make_entry(
920                GraphOp::AddNode {
921                    node_id: "a".into(),
922                    node_type: "entity".into(),
923                    label: "a".into(),
924                    properties: BTreeMap::new(),
925                    subtype: None,
926                },
927                1,
928                "inst-a",
929            ),
930            make_entry(
931                GraphOp::AddNode {
932                    node_id: "b".into(),
933                    node_type: "entity".into(),
934                    label: "b".into(),
935                    properties: BTreeMap::new(),
936                    subtype: None,
937                },
938                2,
939                "inst-a",
940            ),
941            make_entry(
942                GraphOp::AddEdge {
943                    edge_id: "e1".into(),
944                    edge_type: "RUNS_ON".into(),
945                    source_id: "a".into(),
946                    target_id: "b".into(),
947                    properties: BTreeMap::new(),
948                },
949                3,
950                "inst-a",
951            ),
952        ];
953        for e in &entries {
954            g1.apply(e);
955        }
956
957        // Rebuild from scratch.
958        let mut g2 = MaterializedGraph::new(test_ontology());
959        let refs: Vec<&Entry> = entries.iter().collect();
960        g2.rebuild(&refs);
961
962        // Same result.
963        assert_eq!(g1.all_nodes().len(), g2.all_nodes().len());
964        assert_eq!(g1.all_edges().len(), g2.all_edges().len());
965        for node in g1.all_nodes() {
966            let n2 = g2.get_node(&node.node_id).unwrap();
967            assert_eq!(node.node_type, n2.node_type);
968            assert_eq!(node.properties, n2.properties);
969        }
970    }
971
972    #[test]
973    fn incremental_equals_full() {
974        let entries = vec![
975            make_entry(
976                GraphOp::DefineOntology {
977                    ontology: test_ontology(),
978                },
979                0,
980                "inst-a",
981            ),
982            make_entry(
983                GraphOp::AddNode {
984                    node_id: "a".into(),
985                    node_type: "entity".into(),
986                    label: "a".into(),
987                    properties: BTreeMap::from([("x".into(), Value::Int(1))]),
988                    subtype: None,
989                },
990                1,
991                "inst-a",
992            ),
993            make_entry(
994                GraphOp::UpdateProperty {
995                    entity_id: "a".into(),
996                    key: "x".into(),
997                    value: Value::Int(2),
998                },
999                2,
1000                "inst-a",
1001            ),
1002            make_entry(
1003                GraphOp::AddNode {
1004                    node_id: "b".into(),
1005                    node_type: "entity".into(),
1006                    label: "b".into(),
1007                    properties: BTreeMap::new(),
1008                    subtype: None,
1009                },
1010                3,
1011                "inst-a",
1012            ),
1013            make_entry(
1014                GraphOp::AddEdge {
1015                    edge_id: "e1".into(),
1016                    edge_type: "RUNS_ON".into(),
1017                    source_id: "a".into(),
1018                    target_id: "b".into(),
1019                    properties: BTreeMap::new(),
1020                },
1021                4,
1022                "inst-a",
1023            ),
1024            make_entry(
1025                GraphOp::RemoveEdge {
1026                    edge_id: "e1".into(),
1027                },
1028                5,
1029                "inst-a",
1030            ),
1031        ];
1032
1033        // Incremental.
1034        let mut g_inc = MaterializedGraph::new(test_ontology());
1035        for e in &entries {
1036            g_inc.apply(e);
1037        }
1038
1039        // Full replay.
1040        let mut g_full = MaterializedGraph::new(test_ontology());
1041        let refs: Vec<&Entry> = entries.iter().collect();
1042        g_full.rebuild(&refs);
1043
1044        // Property should be 2 (updated).
1045        assert_eq!(
1046            g_inc.get_node("a").unwrap().properties.get("x"),
1047            Some(&Value::Int(2))
1048        );
1049        assert_eq!(
1050            g_full.get_node("a").unwrap().properties.get("x"),
1051            Some(&Value::Int(2))
1052        );
1053        // Edge should be removed.
1054        assert_eq!(g_inc.all_edges().len(), 0);
1055        assert_eq!(g_full.all_edges().len(), 0);
1056    }
1057
1058    #[test]
1059    fn lww_concurrent_property_update() {
1060        // Two instances update the same property — higher clock wins.
1061        let mut g = MaterializedGraph::new(test_ontology());
1062        g.apply(&make_entry(
1063            GraphOp::AddNode {
1064                node_id: "s1".into(),
1065                node_type: "entity".into(),
1066                label: "s1".into(),
1067                properties: BTreeMap::new(),
1068                subtype: None,
1069            },
1070            1,
1071            "inst-a",
1072        ));
1073        // inst-a sets status=alive at time 2
1074        g.apply(&make_entry(
1075            GraphOp::UpdateProperty {
1076                entity_id: "s1".into(),
1077                key: "status".into(),
1078                value: Value::String("alive".into()),
1079            },
1080            2,
1081            "inst-a",
1082        ));
1083        // inst-b sets status=dead at time 3 — wins (higher clock)
1084        g.apply(&make_entry(
1085            GraphOp::UpdateProperty {
1086                entity_id: "s1".into(),
1087                key: "status".into(),
1088                value: Value::String("dead".into()),
1089            },
1090            3,
1091            "inst-b",
1092        ));
1093        assert_eq!(
1094            g.get_node("s1").unwrap().properties.get("status"),
1095            Some(&Value::String("dead".into()))
1096        );
1097    }
1098
1099    #[test]
1100    fn lww_tiebreak_by_instance_id() {
1101        // Same clock time — higher instance ID wins.
1102        let mut g = MaterializedGraph::new(test_ontology());
1103        g.apply(&make_entry(
1104            GraphOp::AddNode {
1105                node_id: "s1".into(),
1106                node_type: "entity".into(),
1107                label: "s1".into(),
1108                properties: BTreeMap::new(),
1109                subtype: None,
1110            },
1111            1,
1112            "inst-a",
1113        ));
1114        // Both at physical_ms=5, logical=0. Lower id wins → "inst-a" wins.
1115        g.apply(&make_entry(
1116            GraphOp::UpdateProperty {
1117                entity_id: "s1".into(),
1118                key: "x".into(),
1119                value: Value::Int(1),
1120            },
1121            5,
1122            "inst-a",
1123        ));
1124        g.apply(&make_entry(
1125            GraphOp::UpdateProperty {
1126                entity_id: "s1".into(),
1127                key: "x".into(),
1128                value: Value::Int(2),
1129            },
1130            5,
1131            "inst-b",
1132        ));
1133        // inst-a has lower id → wins the tiebreak → value stays Int(1).
1134        assert_eq!(
1135            g.get_node("s1").unwrap().properties.get("x"),
1136            Some(&Value::Int(1))
1137        );
1138    }
1139
1140    #[test]
1141    fn lww_per_property_concurrent_different_keys() {
1142        // Two instances concurrently update DIFFERENT properties at the same
1143        // clock time. Both updates must be accepted — they don't conflict.
1144        // This requires per-property LWW, not node-level LWW.
1145        let mut g = MaterializedGraph::new(test_ontology());
1146        g.apply(&make_entry(
1147            GraphOp::AddNode {
1148                node_id: "s1".into(),
1149                node_type: "entity".into(),
1150                label: "s1".into(),
1151                properties: BTreeMap::from([
1152                    ("x".into(), Value::Int(0)),
1153                    ("y".into(), Value::Int(0)),
1154                ]),
1155                subtype: None,
1156            },
1157            1,
1158            "inst-a",
1159        ));
1160        // inst-a updates "x" at time 3
1161        g.apply(&make_entry(
1162            GraphOp::UpdateProperty {
1163                entity_id: "s1".into(),
1164                key: "x".into(),
1165                value: Value::Int(42),
1166            },
1167            3,
1168            "inst-a",
1169        ));
1170        // inst-b updates "y" at time 3 (concurrent, different property)
1171        g.apply(&make_entry(
1172            GraphOp::UpdateProperty {
1173                entity_id: "s1".into(),
1174                key: "y".into(),
1175                value: Value::Int(99),
1176            },
1177            3,
1178            "inst-b",
1179        ));
1180
1181        let node = g.get_node("s1").unwrap();
1182        // Both updates must be applied — no conflict.
1183        assert_eq!(
1184            node.properties.get("x"),
1185            Some(&Value::Int(42)),
1186            "update to 'x' must not be rejected by concurrent update to 'y'"
1187        );
1188        assert_eq!(
1189            node.properties.get("y"),
1190            Some(&Value::Int(99)),
1191            "update to 'y' must not be rejected by concurrent update to 'x'"
1192        );
1193    }
1194
1195    #[test]
1196    fn lww_per_property_order_independent() {
1197        // Same scenario but applied in reverse order — result must be identical.
1198        let mut g = MaterializedGraph::new(test_ontology());
1199        g.apply(&make_entry(
1200            GraphOp::AddNode {
1201                node_id: "s1".into(),
1202                node_type: "entity".into(),
1203                label: "s1".into(),
1204                properties: BTreeMap::from([
1205                    ("x".into(), Value::Int(0)),
1206                    ("y".into(), Value::Int(0)),
1207                ]),
1208                subtype: None,
1209            },
1210            1,
1211            "inst-a",
1212        ));
1213        // Apply inst-b first this time
1214        g.apply(&make_entry(
1215            GraphOp::UpdateProperty {
1216                entity_id: "s1".into(),
1217                key: "y".into(),
1218                value: Value::Int(99),
1219            },
1220            3,
1221            "inst-b",
1222        ));
1223        g.apply(&make_entry(
1224            GraphOp::UpdateProperty {
1225                entity_id: "s1".into(),
1226                key: "x".into(),
1227                value: Value::Int(42),
1228            },
1229            3,
1230            "inst-a",
1231        ));
1232
1233        let node = g.get_node("s1").unwrap();
1234        assert_eq!(node.properties.get("x"), Some(&Value::Int(42)));
1235        assert_eq!(node.properties.get("y"), Some(&Value::Int(99)));
1236    }
1237
1238    #[test]
1239    fn add_wins_over_remove() {
1240        // Concurrent add + remove → node should exist (add-wins).
1241        let mut g = MaterializedGraph::new(test_ontology());
1242        g.apply(&make_entry(
1243            GraphOp::AddNode {
1244                node_id: "s1".into(),
1245                node_type: "entity".into(),
1246                label: "s1".into(),
1247                properties: BTreeMap::new(),
1248                subtype: None,
1249            },
1250            1,
1251            "inst-a",
1252        ));
1253        // Remove at time 2.
1254        g.apply(&make_entry(
1255            GraphOp::RemoveNode {
1256                node_id: "s1".into(),
1257            },
1258            2,
1259            "inst-a",
1260        ));
1261        assert!(g.get_node("s1").is_none());
1262
1263        // Re-add at time 3 (add-wins — resurrects).
1264        g.apply(&make_entry(
1265            GraphOp::AddNode {
1266                node_id: "s1".into(),
1267                node_type: "entity".into(),
1268                label: "s1 v2".into(),
1269                properties: BTreeMap::new(),
1270                subtype: None,
1271            },
1272            3,
1273            "inst-b",
1274        ));
1275        let node = g.get_node("s1").unwrap();
1276        assert_eq!(node.label, "s1 v2");
1277        assert!(!node.tombstoned);
1278    }
1279}