velesdb_core/collection/graph/edge.rs
1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! CSR snapshot types are in [`super::csr_snapshot`].
8//!
9//! # Edge Removal Semantics
10//!
11//! During edge removal, the internal indexes may be temporarily inconsistent
12//! while the operation is in progress. The final state is always consistent.
13//! For concurrent access, use `ConcurrentEdgeStore` instead.
14
15use super::csr_snapshot::CsrSnapshot;
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20
/// A directed edge (relationship) in the knowledge graph.
///
/// An edge connects a `source` node to a `target` node, carries a label
/// (relationship type), and may hold arbitrary JSON properties. All fields
/// are private; read them through the accessor methods on this type.
///
/// NOTE(review): the derived `Deserialize` builds the struct field-by-field
/// and therefore does not run the label validation performed by
/// `GraphEdge::new` — confirm that persisted data is trusted.
///
/// # Example
///
/// ```rust,ignore
/// use velesdb_core::collection::graph::GraphEdge;
/// use serde_json::json;
/// use std::collections::HashMap;
///
/// let mut props = HashMap::new();
/// props.insert("since".to_string(), json!("2020-01-01"));
///
/// // `new` is fallible (blank labels are rejected), so the `Result` must be
/// // unwrapped or propagated before chaining builder methods.
/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
///     .expect("label is non-empty")
///     .with_properties(props);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Edge identifier; `EdgeStore` rejects a second edge with the same ID.
    id: u64,
    /// ID of the node this edge starts from.
    source: u64,
    /// ID of the node this edge points to.
    target: u64,
    /// Relationship type; stored trimmed when built through `GraphEdge::new`.
    label: String,
    /// Arbitrary JSON properties keyed by name (empty until
    /// `with_properties` is called).
    properties: HashMap<String, Value>,
}
46
47impl GraphEdge {
48 /// Creates a new edge with the given ID, endpoints, and label.
49 ///
50 /// # Errors
51 ///
52 /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
53 pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
54 let trimmed = label.trim();
55 if trimmed.is_empty() {
56 return Err(Error::InvalidEdgeLabel(
57 "Edge label cannot be empty or whitespace-only".to_string(),
58 ));
59 }
60 Ok(Self {
61 id,
62 source,
63 target,
64 label: trimmed.to_string(),
65 properties: HashMap::new(),
66 })
67 }
68
69 /// Adds properties to this edge (builder pattern).
70 #[must_use]
71 pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
72 self.properties = properties;
73 self
74 }
75
76 /// Returns the edge ID.
77 #[must_use]
78 pub fn id(&self) -> u64 {
79 self.id
80 }
81
82 /// Returns the source node ID.
83 #[must_use]
84 pub fn source(&self) -> u64 {
85 self.source
86 }
87
88 /// Returns the target node ID.
89 #[must_use]
90 pub fn target(&self) -> u64 {
91 self.target
92 }
93
94 /// Returns the edge label (relationship type).
95 #[must_use]
96 pub fn label(&self) -> &str {
97 &self.label
98 }
99
100 /// Returns all properties of this edge.
101 #[must_use]
102 pub fn properties(&self) -> &HashMap<String, Value> {
103 &self.properties
104 }
105
106 /// Returns a specific property value, if it exists.
107 #[must_use]
108 pub fn property(&self, name: &str) -> Option<&Value> {
109 self.properties.get(name)
110 }
111}
112
/// Storage for graph edges with bidirectional indexing.
///
/// Edges live in a hash map keyed by edge ID, and separate per-node indexes
/// list the IDs of outgoing and incoming edges, so a node's edges can be
/// reached without scanning the whole store (hash lookup by ID, then work
/// proportional to the node's degree).
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// Both label indexes are populated only for edges that are also indexed as
/// outgoing (see `insert_edge`); edges added through
/// `add_edge_incoming_only` do not appear in them.
///
/// # CSR Snapshot (G1)
///
/// After loading from disk or after explicit `build_read_snapshot()`, the
/// `csr_snapshot` field provides zero-copy `&[u64]` access to neighbor
/// target IDs and edge IDs. Writes invalidate the snapshot automatically.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID
    pub(super) edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: source_id -> Vec<edge_id> (insertion order)
    pub(super) outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: target_id -> Vec<edge_id> (insertion order)
    pub(super) incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for fast label queries
    pub(super) by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
    pub(super) outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
    /// Zero-copy CSR snapshot for BFS traversal (G1).
    /// Built on-demand via `build_read_snapshot()`, invalidated by writes.
    /// Excluded from (de)serialization via `#[serde(skip)]`, so the field is
    /// never written out and is filled with its `Default` (`None`) on read.
    #[serde(skip)]
    pub(super) csr_snapshot: Option<CsrSnapshot>,
}
145
146impl EdgeStore {
147 /// Creates a new empty edge store.
148 #[must_use]
149 pub fn new() -> Self {
150 Self::default()
151 }
152
153 /// Creates an edge store with pre-allocated capacity for better performance.
154 ///
155 /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
156 /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
157 ///
158 /// # Arguments
159 ///
160 /// * `expected_edges` - Expected number of edges to store
161 /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
162 ///
163 /// # Example
164 ///
165 /// ```rust,ignore
166 /// // For a graph with ~1M edges and ~100K nodes
167 /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
168 /// ```
169 #[must_use]
170 pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
171 // Estimate ~10 unique labels typical for knowledge graphs
172 let expected_labels = 10usize;
173 // Use saturating_mul to prevent overflow for extreme inputs
174 let outgoing_by_label_cap = expected_nodes
175 .saturating_mul(expected_labels)
176 .saturating_div(10);
177 Self {
178 edges: HashMap::with_capacity(expected_edges),
179 outgoing: HashMap::with_capacity(expected_nodes),
180 incoming: HashMap::with_capacity(expected_nodes),
181 by_label: HashMap::with_capacity(expected_labels),
182 outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
183 csr_snapshot: None,
184 }
185 }
186
187 /// Adds an edge to the store.
188 ///
189 /// Creates bidirectional index entries for efficient traversal.
190 /// Also maintains label-based secondary indices (EPIC-019 US-003).
191 ///
192 /// # Errors
193 ///
194 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
195 pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
196 self.insert_edge(edge, true, true)
197 }
198
199 /// Adds an edge with only the outgoing index (for cross-shard storage).
200 ///
201 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
202 /// The edge is stored and indexed by source node only.
203 ///
204 /// # Errors
205 ///
206 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
207 pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
208 self.insert_edge(edge, true, false)
209 }
210
211 /// Adds an edge with only the incoming index (for cross-shard storage).
212 ///
213 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
214 /// The edge is stored and indexed by target node only.
215 /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
216 ///
217 /// # Errors
218 ///
219 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
220 pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
221 self.insert_edge(edge, false, true)
222 }
223
224 /// Shared implementation for all `add_edge*` variants.
225 ///
226 /// Validates uniqueness, populates the requested directional indices,
227 /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
228 /// are maintained only when `index_outgoing` is `true` (source shard
229 /// owns label indices in the concurrent model).
230 fn insert_edge(
231 &mut self,
232 edge: GraphEdge,
233 index_outgoing: bool,
234 index_incoming: bool,
235 ) -> Result<()> {
236 let id = edge.id();
237 if self.edges.contains_key(&id) {
238 return Err(Error::EdgeExists(id));
239 }
240
241 if index_outgoing {
242 let source = edge.source();
243 let label = edge.label().to_string();
244 self.outgoing.entry(source).or_default().push(id);
245 // Label indices are owned by the source shard (US-003)
246 self.by_label.entry(label.clone()).or_default().push(id);
247 self.outgoing_by_label
248 .entry((source, label))
249 .or_default()
250 .push(id);
251 }
252
253 if index_incoming {
254 self.incoming.entry(edge.target()).or_default().push(id);
255 }
256
257 self.edges.insert(id, edge);
258 // Invalidate CSR snapshot — writes make it stale (G1).
259 self.csr_snapshot = None;
260 Ok(())
261 }
262
263 /// Returns the total number of edges in the store.
264 #[must_use]
265 pub fn edge_count(&self) -> usize {
266 self.edges.len()
267 }
268
269 /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
270 #[must_use]
271 pub fn outgoing_edge_count(&self) -> usize {
272 self.outgoing.values().map(Vec::len).sum()
273 }
274
275 /// Gets an edge by its ID.
276 #[must_use]
277 pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
278 self.edges.get(&id)
279 }
280
281 /// Gets all outgoing edges from a node.
282 #[must_use]
283 pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
284 self.resolve_edge_ids(self.outgoing.get(&node_id))
285 }
286
287 /// Invokes `f` for each outgoing edge from `node_id` without allocating a `Vec`.
288 ///
289 /// Prefer this over [`get_outgoing`](Self::get_outgoing) in hot loops (e.g. BFS
290 /// frontiers) where the caller processes edges inline rather than collecting them.
291 #[inline]
292 pub fn for_each_outgoing<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
293 if let Some(ids) = self.outgoing.get(&node_id) {
294 for id in ids {
295 if let Some(edge) = self.edges.get(id) {
296 f(edge);
297 }
298 }
299 }
300 }
301
302 /// Returns the number of outgoing edges from `node_id` without materializing them.
303 #[must_use]
304 #[inline]
305 pub fn outgoing_degree(&self, node_id: u64) -> usize {
306 self.outgoing.get(&node_id).map_or(0, Vec::len)
307 }
308
309 /// Returns the number of incoming edges to `node_id` without materializing them.
310 #[must_use]
311 #[inline]
312 pub fn incoming_degree(&self, node_id: u64) -> usize {
313 self.incoming.get(&node_id).map_or(0, Vec::len)
314 }
315
316 /// Gets all incoming edges to a node.
317 #[must_use]
318 pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
319 self.resolve_edge_ids(self.incoming.get(&node_id))
320 }
321
322 /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
323 ///
324 /// Uses the `outgoing_by_label` composite index for fast lookup instead of
325 /// iterating through all outgoing edges (EPIC-019 US-003).
326 #[must_use]
327 pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
328 self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
329 }
330
331 /// Gets all edges with a specific label - O(k) where k = result count.
332 ///
333 /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
334 #[must_use]
335 pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
336 self.resolve_edge_ids(self.by_label.get(label))
337 }
338
339 /// Resolves edge IDs from an index entry into edge references.
340 ///
341 /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
342 /// `get_outgoing_by_label`, and `get_edges_by_label`.
343 #[inline]
344 fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
345 ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
346 .unwrap_or_default()
347 }
348
349 /// Gets incoming edges filtered by label.
350 #[must_use]
351 pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
352 self.get_incoming(node_id)
353 .into_iter()
354 .filter(|e| e.label() == label)
355 .collect()
356 }
357
358 /// Checks if an edge with the given ID exists.
359 #[must_use]
360 pub fn contains_edge(&self, edge_id: u64) -> bool {
361 self.edges.contains_key(&edge_id)
362 }
363
364 /// Returns the number of edges in the store.
365 #[must_use]
366 pub fn len(&self) -> usize {
367 self.edges.len()
368 }
369
370 /// Returns true if the store contains no edges.
371 #[must_use]
372 pub fn is_empty(&self) -> bool {
373 self.edges.is_empty()
374 }
375
376 /// Returns all edges in the store.
377 #[must_use]
378 pub fn all_edges(&self) -> Vec<&GraphEdge> {
379 self.edges.values().collect()
380 }
381
382 /// Returns all outgoing source node IDs (keys of the outgoing index).
383 ///
384 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
385 /// enumerate source nodes for CSR construction.
386 #[must_use]
387 pub(crate) fn outgoing_keys(&self) -> Vec<u64> {
388 self.outgoing.keys().copied().collect()
389 }
390
391 /// Returns the total number of outgoing edge entries across all nodes.
392 ///
393 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) for
394 /// pre-allocation.
395 #[must_use]
396 pub(crate) fn total_outgoing_edges(&self) -> usize {
397 self.outgoing.values().map(Vec::len).sum()
398 }
399
400 /// Invokes `f` for each outgoing edge from `node_id` (by edge object).
401 ///
402 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
403 /// iterate edges without exposing internal index structure.
404 pub(crate) fn for_each_outgoing_edge<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
405 if let Some(ids) = self.outgoing.get(&node_id) {
406 for id in ids {
407 if let Some(edge) = self.edges.get(id) {
408 f(edge);
409 }
410 }
411 }
412 }
413}
414
415// Edge removal operations are in `edge_removal.rs`.
416// CSR snapshot methods and persistence are in `edge_persistence.rs`.