Skip to main content

velesdb_core/collection/graph/
edge.rs

//! Graph edge types and storage for knowledge graph relationships.
//!
//! This module provides:
//! - `GraphEdge`: A typed relationship between nodes with properties
//! - `EdgeStore`: Bidirectional index for efficient edge traversal
//!
//! CSR snapshot types are in [`super::csr_snapshot`].
//!
//! # Edge Removal Semantics
//!
//! During edge removal, the internal indices may be temporarily inconsistent
//! while the operation is in progress. The final state is always consistent.
//! For concurrent access, use `ConcurrentEdgeStore` instead.
14
15use super::csr_snapshot::CsrSnapshot;
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20
21/// A directed edge (relationship) in the knowledge graph.
22///
23/// Edges connect nodes and can have a label (type) and properties.
24///
25/// # Example
26///
27/// ```rust,ignore
28/// use velesdb_core::collection::graph::GraphEdge;
29/// use serde_json::json;
30/// use std::collections::HashMap;
31///
32/// let mut props = HashMap::new();
33/// props.insert("since".to_string(), json!("2020-01-01"));
34///
35/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
36///     .with_properties(props);
37/// ```
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39pub struct GraphEdge {
40    id: u64,
41    source: u64,
42    target: u64,
43    label: String,
44    properties: HashMap<String, Value>,
45}
46
47impl GraphEdge {
48    /// Creates a new edge with the given ID, endpoints, and label.
49    ///
50    /// # Errors
51    ///
52    /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
53    pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
54        let trimmed = label.trim();
55        if trimmed.is_empty() {
56            return Err(Error::InvalidEdgeLabel(
57                "Edge label cannot be empty or whitespace-only".to_string(),
58            ));
59        }
60        Ok(Self {
61            id,
62            source,
63            target,
64            label: trimmed.to_string(),
65            properties: HashMap::new(),
66        })
67    }
68
69    /// Adds properties to this edge (builder pattern).
70    #[must_use]
71    pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
72        self.properties = properties;
73        self
74    }
75
76    /// Returns the edge ID.
77    #[must_use]
78    pub fn id(&self) -> u64 {
79        self.id
80    }
81
82    /// Returns the source node ID.
83    #[must_use]
84    pub fn source(&self) -> u64 {
85        self.source
86    }
87
88    /// Returns the target node ID.
89    #[must_use]
90    pub fn target(&self) -> u64 {
91        self.target
92    }
93
94    /// Returns the edge label (relationship type).
95    #[must_use]
96    pub fn label(&self) -> &str {
97        &self.label
98    }
99
100    /// Returns all properties of this edge.
101    #[must_use]
102    pub fn properties(&self) -> &HashMap<String, Value> {
103        &self.properties
104    }
105
106    /// Returns a specific property value, if it exists.
107    #[must_use]
108    pub fn property(&self, name: &str) -> Option<&Value> {
109        self.properties.get(name)
110    }
111}
112
113/// Storage for graph edges with bidirectional indexing.
114///
115/// Provides O(1) access to edges by ID and O(degree) access to
116/// outgoing/incoming edges for any node.
117///
118/// # Index Structure (EPIC-019 US-003)
119///
120/// - `by_label`: Secondary index for O(k) label-based queries
121/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
122///
123/// # CSR Snapshot (G1)
124///
125/// After loading from disk or after explicit `build_read_snapshot()`, the
126/// `csr_snapshot` field provides zero-copy `&[u64]` access to neighbor
127/// target IDs and edge IDs. Writes invalidate the snapshot automatically.
128#[derive(Debug, Default, Serialize, Deserialize)]
129pub struct EdgeStore {
130    /// All edges indexed by ID
131    pub(super) edges: HashMap<u64, GraphEdge>,
132    /// Outgoing edges: source_id -> Vec<edge_id>
133    pub(super) outgoing: HashMap<u64, Vec<u64>>,
134    /// Incoming edges: target_id -> Vec<edge_id>
135    pub(super) incoming: HashMap<u64, Vec<u64>>,
136    /// Secondary index: label -> Vec<edge_id> for fast label queries
137    pub(super) by_label: HashMap<String, Vec<u64>>,
138    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
139    pub(super) outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
140    /// Zero-copy CSR snapshot for BFS traversal (G1).
141    /// Built on-demand via `build_read_snapshot()`, invalidated by writes.
142    #[serde(skip)]
143    pub(super) csr_snapshot: Option<CsrSnapshot>,
144}
145
146impl EdgeStore {
147    /// Creates a new empty edge store.
148    #[must_use]
149    pub fn new() -> Self {
150        Self::default()
151    }
152
153    /// Creates an edge store with pre-allocated capacity for better performance.
154    ///
155    /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
156    /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
157    ///
158    /// Note: when accessed through the sharded `ConcurrentEdgeStore`, an edge
159    /// whose endpoints hash to different shards is stored in **both** shards
160    /// (outgoing + incoming halves), which offsets part of that saving — with
161    /// many shards this applies to nearly all edges.
162    ///
163    /// # Arguments
164    ///
165    /// * `expected_edges` - Expected number of edges to store
166    /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
167    ///
168    /// # Example
169    ///
170    /// ```rust,ignore
171    /// // For a graph with ~1M edges and ~100K nodes
172    /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
173    /// ```
174    #[must_use]
175    pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
176        // Estimate ~10 unique labels typical for knowledge graphs
177        let expected_labels = 10usize;
178        // Use saturating_mul to prevent overflow for extreme inputs
179        let outgoing_by_label_cap = expected_nodes
180            .saturating_mul(expected_labels)
181            .saturating_div(10);
182        Self {
183            edges: HashMap::with_capacity(expected_edges),
184            outgoing: HashMap::with_capacity(expected_nodes),
185            incoming: HashMap::with_capacity(expected_nodes),
186            by_label: HashMap::with_capacity(expected_labels),
187            outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
188            csr_snapshot: None,
189        }
190    }
191
192    /// Adds an edge to the store.
193    ///
194    /// Creates bidirectional index entries for efficient traversal.
195    /// Also maintains label-based secondary indices (EPIC-019 US-003).
196    ///
197    /// # Errors
198    ///
199    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
200    pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
201        self.insert_edge(edge, true, true)
202    }
203
204    /// Adds an edge with only the outgoing index (for cross-shard storage).
205    ///
206    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
207    /// The edge is stored and indexed by source node only.
208    ///
209    /// # Errors
210    ///
211    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
212    pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
213        self.insert_edge(edge, true, false)
214    }
215
216    /// Adds an edge with only the incoming index (for cross-shard storage).
217    ///
218    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
219    /// The edge is stored and indexed by target node only.
220    /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
221    ///
222    /// # Errors
223    ///
224    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
225    pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
226        self.insert_edge(edge, false, true)
227    }
228
229    /// Shared implementation for all `add_edge*` variants.
230    ///
231    /// Validates uniqueness, populates the requested directional indices,
232    /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
233    /// are maintained only when `index_outgoing` is `true` (source shard
234    /// owns label indices in the concurrent model).
235    fn insert_edge(
236        &mut self,
237        edge: GraphEdge,
238        index_outgoing: bool,
239        index_incoming: bool,
240    ) -> Result<()> {
241        let id = edge.id();
242        if self.edges.contains_key(&id) {
243            return Err(Error::EdgeExists(id));
244        }
245
246        if index_outgoing {
247            let source = edge.source();
248            let label = edge.label().to_string();
249            self.outgoing.entry(source).or_default().push(id);
250            // Label indices are owned by the source shard (US-003)
251            self.by_label.entry(label.clone()).or_default().push(id);
252            self.outgoing_by_label
253                .entry((source, label))
254                .or_default()
255                .push(id);
256        }
257
258        if index_incoming {
259            self.incoming.entry(edge.target()).or_default().push(id);
260        }
261
262        self.edges.insert(id, edge);
263        // Invalidate CSR snapshot — writes make it stale (G1).
264        self.csr_snapshot = None;
265        Ok(())
266    }
267
268    /// Returns the total number of edges in the store.
269    #[must_use]
270    pub fn edge_count(&self) -> usize {
271        self.edges.len()
272    }
273
274    /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
275    #[must_use]
276    pub fn outgoing_edge_count(&self) -> usize {
277        self.outgoing.values().map(Vec::len).sum()
278    }
279
280    /// Gets an edge by its ID.
281    #[must_use]
282    pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
283        self.edges.get(&id)
284    }
285
286    /// Gets all outgoing edges from a node.
287    #[must_use]
288    pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
289        self.resolve_edge_ids(self.outgoing.get(&node_id))
290    }
291
292    /// Invokes `f` for each outgoing edge from `node_id` without allocating a `Vec`.
293    ///
294    /// Prefer this over [`get_outgoing`](Self::get_outgoing) in hot loops (e.g. BFS
295    /// frontiers) where the caller processes edges inline rather than collecting them.
296    #[inline]
297    pub fn for_each_outgoing<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
298        if let Some(ids) = self.outgoing.get(&node_id) {
299            for id in ids {
300                if let Some(edge) = self.edges.get(id) {
301                    f(edge);
302                }
303            }
304        }
305    }
306
307    /// Returns the number of outgoing edges from `node_id` without materializing them.
308    #[must_use]
309    #[inline]
310    pub fn outgoing_degree(&self, node_id: u64) -> usize {
311        self.outgoing.get(&node_id).map_or(0, Vec::len)
312    }
313
314    /// Returns the number of incoming edges to `node_id` without materializing them.
315    #[must_use]
316    #[inline]
317    pub fn incoming_degree(&self, node_id: u64) -> usize {
318        self.incoming.get(&node_id).map_or(0, Vec::len)
319    }
320
321    /// Gets all incoming edges to a node.
322    #[must_use]
323    pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
324        self.resolve_edge_ids(self.incoming.get(&node_id))
325    }
326
327    /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
328    ///
329    /// Uses the `outgoing_by_label` composite index for fast lookup instead of
330    /// iterating through all outgoing edges (EPIC-019 US-003).
331    #[must_use]
332    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
333        self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
334    }
335
336    /// Gets all edges with a specific label - O(k) where k = result count.
337    ///
338    /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
339    #[must_use]
340    pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
341        self.resolve_edge_ids(self.by_label.get(label))
342    }
343
344    /// Resolves edge IDs from an index entry into edge references.
345    ///
346    /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
347    /// `get_outgoing_by_label`, and `get_edges_by_label`.
348    #[inline]
349    fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
350        ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
351            .unwrap_or_default()
352    }
353
354    /// Gets incoming edges filtered by label.
355    #[must_use]
356    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
357        self.get_incoming(node_id)
358            .into_iter()
359            .filter(|e| e.label() == label)
360            .collect()
361    }
362
363    /// Checks if an edge with the given ID exists.
364    #[must_use]
365    pub fn contains_edge(&self, edge_id: u64) -> bool {
366        self.edges.contains_key(&edge_id)
367    }
368
369    /// Returns the number of edges in the store.
370    #[must_use]
371    pub fn len(&self) -> usize {
372        self.edges.len()
373    }
374
375    /// Returns true if the store contains no edges.
376    #[must_use]
377    pub fn is_empty(&self) -> bool {
378        self.edges.is_empty()
379    }
380
381    /// Returns all edges in the store.
382    #[must_use]
383    pub fn all_edges(&self) -> Vec<&GraphEdge> {
384        self.edges.values().collect()
385    }
386
387    /// Returns all outgoing source node IDs (keys of the outgoing index).
388    ///
389    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
390    /// enumerate source nodes for CSR construction.
391    #[must_use]
392    pub(crate) fn outgoing_keys(&self) -> Vec<u64> {
393        self.outgoing.keys().copied().collect()
394    }
395
396    /// Returns the total number of outgoing edge entries across all nodes.
397    ///
398    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) for
399    /// pre-allocation.
400    #[must_use]
401    pub(crate) fn total_outgoing_edges(&self) -> usize {
402        self.outgoing.values().map(Vec::len).sum()
403    }
404
405    /// Invokes `f` for each outgoing edge from `node_id` (by edge object).
406    ///
407    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
408    /// iterate edges without exposing internal index structure.
409    pub(crate) fn for_each_outgoing_edge<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
410        if let Some(ids) = self.outgoing.get(&node_id) {
411            for id in ids {
412                if let Some(edge) = self.edges.get(id) {
413                    f(edge);
414                }
415            }
416        }
417    }
418}
419
// Edge removal operations are in `edge_removal.rs`.
// CSR snapshot methods and persistence are in `edge_persistence.rs`.