Skip to main content

velesdb_core/collection/graph/
edge.rs

//! Graph edge types and storage for knowledge graph relationships.
//!
//! This module provides:
//! - `GraphEdge`: a typed, directed relationship between two nodes, with properties
//! - `EdgeStore`: a bidirectional edge index for efficient traversal
//!
//! CSR snapshot types are in [`super::csr_snapshot`].
//!
//! # Edge Removal Semantics
//!
//! While an edge removal is in progress, the internal indexes may be
//! temporarily inconsistent with one another; once the operation returns,
//! they are consistent again. For concurrent access, use
//! `ConcurrentEdgeStore` instead.
14
15use super::csr_snapshot::CsrSnapshot;
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20
21/// A directed edge (relationship) in the knowledge graph.
22///
23/// Edges connect nodes and can have a label (type) and properties.
24///
25/// # Example
26///
27/// ```rust,ignore
28/// use velesdb_core::collection::graph::GraphEdge;
29/// use serde_json::json;
30/// use std::collections::HashMap;
31///
32/// let mut props = HashMap::new();
33/// props.insert("since".to_string(), json!("2020-01-01"));
34///
35/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
36///     .with_properties(props);
37/// ```
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39pub struct GraphEdge {
40    id: u64,
41    source: u64,
42    target: u64,
43    label: String,
44    properties: HashMap<String, Value>,
45}
46
47impl GraphEdge {
48    /// Creates a new edge with the given ID, endpoints, and label.
49    ///
50    /// # Errors
51    ///
52    /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
53    pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
54        let trimmed = label.trim();
55        if trimmed.is_empty() {
56            return Err(Error::InvalidEdgeLabel(
57                "Edge label cannot be empty or whitespace-only".to_string(),
58            ));
59        }
60        Ok(Self {
61            id,
62            source,
63            target,
64            label: trimmed.to_string(),
65            properties: HashMap::new(),
66        })
67    }
68
69    /// Adds properties to this edge (builder pattern).
70    #[must_use]
71    pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
72        self.properties = properties;
73        self
74    }
75
76    /// Returns the edge ID.
77    #[must_use]
78    pub fn id(&self) -> u64 {
79        self.id
80    }
81
82    /// Returns the source node ID.
83    #[must_use]
84    pub fn source(&self) -> u64 {
85        self.source
86    }
87
88    /// Returns the target node ID.
89    #[must_use]
90    pub fn target(&self) -> u64 {
91        self.target
92    }
93
94    /// Returns the edge label (relationship type).
95    #[must_use]
96    pub fn label(&self) -> &str {
97        &self.label
98    }
99
100    /// Returns all properties of this edge.
101    #[must_use]
102    pub fn properties(&self) -> &HashMap<String, Value> {
103        &self.properties
104    }
105
106    /// Returns a specific property value, if it exists.
107    #[must_use]
108    pub fn property(&self, name: &str) -> Option<&Value> {
109        self.properties.get(name)
110    }
111}
112
/// Storage for graph edges with bidirectional indexing.
///
/// Edges live in a `HashMap` keyed by edge ID, which gives expected O(1)
/// access by ID. Per-node adjacency lists give O(degree) access to the
/// outgoing/incoming edges of any node.
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `outgoing` / `incoming`: per-node lists of edge IDs
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// In cross-shard use, a store may index only one side of an edge (see
/// `add_edge_outgoing_only` / `add_edge_incoming_only`). The label indices
/// are then kept only on the source side.
///
/// # CSR Snapshot (G1)
///
/// After loading from disk or after explicit `build_read_snapshot()`, the
/// `csr_snapshot` field provides zero-copy `&[u64]` access to neighbor
/// target IDs and edge IDs. Writes invalidate the snapshot automatically.
/// The snapshot is never serialized (`#[serde(skip)]`), so it is `None`
/// right after deserialization.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID. This is primary storage and also holds edges
    /// that are indexed only on their incoming side.
    pub(super) edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: source_id -> Vec<edge_id>
    pub(super) outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: target_id -> Vec<edge_id>
    pub(super) incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for fast label queries
    pub(super) by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
    // NOTE(review): tuple map keys cannot be serialized by formats that require
    // string map keys (e.g. serde_json). Confirm that the persistence format in
    // `edge_persistence.rs` supports them.
    pub(super) outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
    /// Zero-copy CSR snapshot for BFS traversal (G1).
    /// Built on-demand via `build_read_snapshot()`, invalidated by writes.
    #[serde(skip)]
    pub(super) csr_snapshot: Option<CsrSnapshot>,
}
145
146impl EdgeStore {
147    /// Creates a new empty edge store.
148    #[must_use]
149    pub fn new() -> Self {
150        Self::default()
151    }
152
153    /// Creates an edge store with pre-allocated capacity for better performance.
154    ///
155    /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
156    /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
157    ///
158    /// # Arguments
159    ///
160    /// * `expected_edges` - Expected number of edges to store
161    /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
162    ///
163    /// # Example
164    ///
165    /// ```rust,ignore
166    /// // For a graph with ~1M edges and ~100K nodes
167    /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
168    /// ```
169    #[must_use]
170    pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
171        // Estimate ~10 unique labels typical for knowledge graphs
172        let expected_labels = 10usize;
173        // Use saturating_mul to prevent overflow for extreme inputs
174        let outgoing_by_label_cap = expected_nodes
175            .saturating_mul(expected_labels)
176            .saturating_div(10);
177        Self {
178            edges: HashMap::with_capacity(expected_edges),
179            outgoing: HashMap::with_capacity(expected_nodes),
180            incoming: HashMap::with_capacity(expected_nodes),
181            by_label: HashMap::with_capacity(expected_labels),
182            outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
183            csr_snapshot: None,
184        }
185    }
186
187    /// Adds an edge to the store.
188    ///
189    /// Creates bidirectional index entries for efficient traversal.
190    /// Also maintains label-based secondary indices (EPIC-019 US-003).
191    ///
192    /// # Errors
193    ///
194    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
195    pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
196        self.insert_edge(edge, true, true)
197    }
198
199    /// Adds an edge with only the outgoing index (for cross-shard storage).
200    ///
201    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
202    /// The edge is stored and indexed by source node only.
203    ///
204    /// # Errors
205    ///
206    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
207    pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
208        self.insert_edge(edge, true, false)
209    }
210
211    /// Adds an edge with only the incoming index (for cross-shard storage).
212    ///
213    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
214    /// The edge is stored and indexed by target node only.
215    /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
216    ///
217    /// # Errors
218    ///
219    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
220    pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
221        self.insert_edge(edge, false, true)
222    }
223
224    /// Shared implementation for all `add_edge*` variants.
225    ///
226    /// Validates uniqueness, populates the requested directional indices,
227    /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
228    /// are maintained only when `index_outgoing` is `true` (source shard
229    /// owns label indices in the concurrent model).
230    fn insert_edge(
231        &mut self,
232        edge: GraphEdge,
233        index_outgoing: bool,
234        index_incoming: bool,
235    ) -> Result<()> {
236        let id = edge.id();
237        if self.edges.contains_key(&id) {
238            return Err(Error::EdgeExists(id));
239        }
240
241        if index_outgoing {
242            let source = edge.source();
243            let label = edge.label().to_string();
244            self.outgoing.entry(source).or_default().push(id);
245            // Label indices are owned by the source shard (US-003)
246            self.by_label.entry(label.clone()).or_default().push(id);
247            self.outgoing_by_label
248                .entry((source, label))
249                .or_default()
250                .push(id);
251        }
252
253        if index_incoming {
254            self.incoming.entry(edge.target()).or_default().push(id);
255        }
256
257        self.edges.insert(id, edge);
258        // Invalidate CSR snapshot — writes make it stale (G1).
259        self.csr_snapshot = None;
260        Ok(())
261    }
262
263    /// Returns the total number of edges in the store.
264    #[must_use]
265    pub fn edge_count(&self) -> usize {
266        self.edges.len()
267    }
268
269    /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
270    #[must_use]
271    pub fn outgoing_edge_count(&self) -> usize {
272        self.outgoing.values().map(Vec::len).sum()
273    }
274
275    /// Gets an edge by its ID.
276    #[must_use]
277    pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
278        self.edges.get(&id)
279    }
280
281    /// Gets all outgoing edges from a node.
282    #[must_use]
283    pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
284        self.resolve_edge_ids(self.outgoing.get(&node_id))
285    }
286
287    /// Invokes `f` for each outgoing edge from `node_id` without allocating a `Vec`.
288    ///
289    /// Prefer this over [`get_outgoing`](Self::get_outgoing) in hot loops (e.g. BFS
290    /// frontiers) where the caller processes edges inline rather than collecting them.
291    #[inline]
292    pub fn for_each_outgoing<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
293        if let Some(ids) = self.outgoing.get(&node_id) {
294            for id in ids {
295                if let Some(edge) = self.edges.get(id) {
296                    f(edge);
297                }
298            }
299        }
300    }
301
302    /// Returns the number of outgoing edges from `node_id` without materializing them.
303    #[must_use]
304    #[inline]
305    pub fn outgoing_degree(&self, node_id: u64) -> usize {
306        self.outgoing.get(&node_id).map_or(0, Vec::len)
307    }
308
309    /// Returns the number of incoming edges to `node_id` without materializing them.
310    #[must_use]
311    #[inline]
312    pub fn incoming_degree(&self, node_id: u64) -> usize {
313        self.incoming.get(&node_id).map_or(0, Vec::len)
314    }
315
316    /// Gets all incoming edges to a node.
317    #[must_use]
318    pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
319        self.resolve_edge_ids(self.incoming.get(&node_id))
320    }
321
322    /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
323    ///
324    /// Uses the `outgoing_by_label` composite index for fast lookup instead of
325    /// iterating through all outgoing edges (EPIC-019 US-003).
326    #[must_use]
327    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
328        self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
329    }
330
331    /// Gets all edges with a specific label - O(k) where k = result count.
332    ///
333    /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
334    #[must_use]
335    pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
336        self.resolve_edge_ids(self.by_label.get(label))
337    }
338
339    /// Resolves edge IDs from an index entry into edge references.
340    ///
341    /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
342    /// `get_outgoing_by_label`, and `get_edges_by_label`.
343    #[inline]
344    fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
345        ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
346            .unwrap_or_default()
347    }
348
349    /// Gets incoming edges filtered by label.
350    #[must_use]
351    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
352        self.get_incoming(node_id)
353            .into_iter()
354            .filter(|e| e.label() == label)
355            .collect()
356    }
357
358    /// Checks if an edge with the given ID exists.
359    #[must_use]
360    pub fn contains_edge(&self, edge_id: u64) -> bool {
361        self.edges.contains_key(&edge_id)
362    }
363
364    /// Returns the number of edges in the store.
365    #[must_use]
366    pub fn len(&self) -> usize {
367        self.edges.len()
368    }
369
370    /// Returns true if the store contains no edges.
371    #[must_use]
372    pub fn is_empty(&self) -> bool {
373        self.edges.is_empty()
374    }
375
376    /// Returns all edges in the store.
377    #[must_use]
378    pub fn all_edges(&self) -> Vec<&GraphEdge> {
379        self.edges.values().collect()
380    }
381
382    /// Returns all outgoing source node IDs (keys of the outgoing index).
383    ///
384    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
385    /// enumerate source nodes for CSR construction.
386    #[must_use]
387    pub(crate) fn outgoing_keys(&self) -> Vec<u64> {
388        self.outgoing.keys().copied().collect()
389    }
390
391    /// Returns the total number of outgoing edge entries across all nodes.
392    ///
393    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) for
394    /// pre-allocation.
395    #[must_use]
396    pub(crate) fn total_outgoing_edges(&self) -> usize {
397        self.outgoing.values().map(Vec::len).sum()
398    }
399
400    /// Invokes `f` for each outgoing edge from `node_id` (by edge object).
401    ///
402    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
403    /// iterate edges without exposing internal index structure.
404    pub(crate) fn for_each_outgoing_edge<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
405        if let Some(ids) = self.outgoing.get(&node_id) {
406            for id in ids {
407                if let Some(edge) = self.edges.get(id) {
408                    f(edge);
409                }
410            }
411        }
412    }
413}
414
// Edge removal operations live in `edge_removal.rs`.
// CSR snapshot methods and persistence live in `edge_persistence.rs`.