velesdb_core/collection/graph/edge.rs
1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! CSR snapshot types are in [`super::csr_snapshot`].
8//!
9//! # Edge Removal Semantics
10//!
11//! During edge removal, the internal indexes may be temporarily inconsistent
12//! while the operation is in progress. The final state is always consistent.
13//! For concurrent access, use `ConcurrentEdgeStore` instead.
14
15use super::csr_snapshot::CsrSnapshot;
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20
21/// A directed edge (relationship) in the knowledge graph.
22///
23/// Edges connect nodes and can have a label (type) and properties.
24///
25/// # Example
26///
27/// ```rust,ignore
28/// use velesdb_core::collection::graph::GraphEdge;
29/// use serde_json::json;
30/// use std::collections::HashMap;
31///
32/// let mut props = HashMap::new();
33/// props.insert("since".to_string(), json!("2020-01-01"));
34///
35/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
36/// .with_properties(props);
37/// ```
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39pub struct GraphEdge {
40 id: u64,
41 source: u64,
42 target: u64,
43 label: String,
44 properties: HashMap<String, Value>,
45}
46
47impl GraphEdge {
48 /// Creates a new edge with the given ID, endpoints, and label.
49 ///
50 /// # Errors
51 ///
52 /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
53 pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
54 let trimmed = label.trim();
55 if trimmed.is_empty() {
56 return Err(Error::InvalidEdgeLabel(
57 "Edge label cannot be empty or whitespace-only".to_string(),
58 ));
59 }
60 Ok(Self {
61 id,
62 source,
63 target,
64 label: trimmed.to_string(),
65 properties: HashMap::new(),
66 })
67 }
68
69 /// Adds properties to this edge (builder pattern).
70 #[must_use]
71 pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
72 self.properties = properties;
73 self
74 }
75
76 /// Returns the edge ID.
77 #[must_use]
78 pub fn id(&self) -> u64 {
79 self.id
80 }
81
82 /// Returns the source node ID.
83 #[must_use]
84 pub fn source(&self) -> u64 {
85 self.source
86 }
87
88 /// Returns the target node ID.
89 #[must_use]
90 pub fn target(&self) -> u64 {
91 self.target
92 }
93
94 /// Returns the edge label (relationship type).
95 #[must_use]
96 pub fn label(&self) -> &str {
97 &self.label
98 }
99
100 /// Returns all properties of this edge.
101 #[must_use]
102 pub fn properties(&self) -> &HashMap<String, Value> {
103 &self.properties
104 }
105
106 /// Returns a specific property value, if it exists.
107 #[must_use]
108 pub fn property(&self, name: &str) -> Option<&Value> {
109 self.properties.get(name)
110 }
111}
112
/// Storage for graph edges with bidirectional indexing.
///
/// Provides O(1) (average, hash-map) access to edges by ID and O(degree)
/// access to outgoing/incoming edges for any node.
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// Both label indices are populated only for edges whose outgoing side is
/// indexed in this store (`add_edge` / `add_edge_outgoing_only`). Edges added
/// with `add_edge_incoming_only` appear only in `edges` and `incoming`.
///
/// # CSR Snapshot (G1)
///
/// After loading from disk or after explicit `build_read_snapshot()`, the
/// `csr_snapshot` field provides zero-copy `&[u64]` access to neighbor
/// target IDs and edge IDs. Writes invalidate the snapshot automatically.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID, including edges stored here only for their
    /// incoming side (cross-shard storage in `ConcurrentEdgeStore`).
    pub(super) edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: source_id -> Vec<edge_id>
    pub(super) outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: target_id -> Vec<edge_id>
    pub(super) incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for fast label queries
    pub(super) by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
    pub(super) outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
    /// Zero-copy CSR snapshot for BFS traversal (G1).
    /// Built on-demand via `build_read_snapshot()`, invalidated by writes.
    /// Skipped by serde, so a freshly deserialized store holds `None` here
    /// until the snapshot is rebuilt.
    #[serde(skip)]
    pub(super) csr_snapshot: Option<CsrSnapshot>,
}
145
146impl EdgeStore {
147 /// Creates a new empty edge store.
148 #[must_use]
149 pub fn new() -> Self {
150 Self::default()
151 }
152
153 /// Creates an edge store with pre-allocated capacity for better performance.
154 ///
155 /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
156 /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
157 ///
158 /// Note: when accessed through the sharded `ConcurrentEdgeStore`, an edge
159 /// whose endpoints hash to different shards is stored in **both** shards
160 /// (outgoing + incoming halves), which offsets part of that saving — with
161 /// many shards this applies to nearly all edges.
162 ///
163 /// # Arguments
164 ///
165 /// * `expected_edges` - Expected number of edges to store
166 /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
167 ///
168 /// # Example
169 ///
170 /// ```rust,ignore
171 /// // For a graph with ~1M edges and ~100K nodes
172 /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
173 /// ```
174 #[must_use]
175 pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
176 // Estimate ~10 unique labels typical for knowledge graphs
177 let expected_labels = 10usize;
178 // Use saturating_mul to prevent overflow for extreme inputs
179 let outgoing_by_label_cap = expected_nodes
180 .saturating_mul(expected_labels)
181 .saturating_div(10);
182 Self {
183 edges: HashMap::with_capacity(expected_edges),
184 outgoing: HashMap::with_capacity(expected_nodes),
185 incoming: HashMap::with_capacity(expected_nodes),
186 by_label: HashMap::with_capacity(expected_labels),
187 outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
188 csr_snapshot: None,
189 }
190 }
191
192 /// Adds an edge to the store.
193 ///
194 /// Creates bidirectional index entries for efficient traversal.
195 /// Also maintains label-based secondary indices (EPIC-019 US-003).
196 ///
197 /// # Errors
198 ///
199 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
200 pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
201 self.insert_edge(edge, true, true)
202 }
203
204 /// Adds an edge with only the outgoing index (for cross-shard storage).
205 ///
206 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
207 /// The edge is stored and indexed by source node only.
208 ///
209 /// # Errors
210 ///
211 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
212 pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
213 self.insert_edge(edge, true, false)
214 }
215
216 /// Adds an edge with only the incoming index (for cross-shard storage).
217 ///
218 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
219 /// The edge is stored and indexed by target node only.
220 /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
221 ///
222 /// # Errors
223 ///
224 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
225 pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
226 self.insert_edge(edge, false, true)
227 }
228
229 /// Shared implementation for all `add_edge*` variants.
230 ///
231 /// Validates uniqueness, populates the requested directional indices,
232 /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
233 /// are maintained only when `index_outgoing` is `true` (source shard
234 /// owns label indices in the concurrent model).
235 fn insert_edge(
236 &mut self,
237 edge: GraphEdge,
238 index_outgoing: bool,
239 index_incoming: bool,
240 ) -> Result<()> {
241 let id = edge.id();
242 if self.edges.contains_key(&id) {
243 return Err(Error::EdgeExists(id));
244 }
245
246 if index_outgoing {
247 let source = edge.source();
248 let label = edge.label().to_string();
249 self.outgoing.entry(source).or_default().push(id);
250 // Label indices are owned by the source shard (US-003)
251 self.by_label.entry(label.clone()).or_default().push(id);
252 self.outgoing_by_label
253 .entry((source, label))
254 .or_default()
255 .push(id);
256 }
257
258 if index_incoming {
259 self.incoming.entry(edge.target()).or_default().push(id);
260 }
261
262 self.edges.insert(id, edge);
263 // Invalidate CSR snapshot — writes make it stale (G1).
264 self.csr_snapshot = None;
265 Ok(())
266 }
267
268 /// Returns the total number of edges in the store.
269 #[must_use]
270 pub fn edge_count(&self) -> usize {
271 self.edges.len()
272 }
273
274 /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
275 #[must_use]
276 pub fn outgoing_edge_count(&self) -> usize {
277 self.outgoing.values().map(Vec::len).sum()
278 }
279
280 /// Gets an edge by its ID.
281 #[must_use]
282 pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
283 self.edges.get(&id)
284 }
285
286 /// Gets all outgoing edges from a node.
287 #[must_use]
288 pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
289 self.resolve_edge_ids(self.outgoing.get(&node_id))
290 }
291
292 /// Invokes `f` for each outgoing edge from `node_id` without allocating a `Vec`.
293 ///
294 /// Prefer this over [`get_outgoing`](Self::get_outgoing) in hot loops (e.g. BFS
295 /// frontiers) where the caller processes edges inline rather than collecting them.
296 #[inline]
297 pub fn for_each_outgoing<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
298 if let Some(ids) = self.outgoing.get(&node_id) {
299 for id in ids {
300 if let Some(edge) = self.edges.get(id) {
301 f(edge);
302 }
303 }
304 }
305 }
306
307 /// Returns the number of outgoing edges from `node_id` without materializing them.
308 #[must_use]
309 #[inline]
310 pub fn outgoing_degree(&self, node_id: u64) -> usize {
311 self.outgoing.get(&node_id).map_or(0, Vec::len)
312 }
313
314 /// Returns the number of incoming edges to `node_id` without materializing them.
315 #[must_use]
316 #[inline]
317 pub fn incoming_degree(&self, node_id: u64) -> usize {
318 self.incoming.get(&node_id).map_or(0, Vec::len)
319 }
320
321 /// Gets all incoming edges to a node.
322 #[must_use]
323 pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
324 self.resolve_edge_ids(self.incoming.get(&node_id))
325 }
326
327 /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
328 ///
329 /// Uses the `outgoing_by_label` composite index for fast lookup instead of
330 /// iterating through all outgoing edges (EPIC-019 US-003).
331 #[must_use]
332 pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
333 self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
334 }
335
336 /// Gets all edges with a specific label - O(k) where k = result count.
337 ///
338 /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
339 #[must_use]
340 pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
341 self.resolve_edge_ids(self.by_label.get(label))
342 }
343
344 /// Resolves edge IDs from an index entry into edge references.
345 ///
346 /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
347 /// `get_outgoing_by_label`, and `get_edges_by_label`.
348 #[inline]
349 fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
350 ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
351 .unwrap_or_default()
352 }
353
354 /// Gets incoming edges filtered by label.
355 #[must_use]
356 pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
357 self.get_incoming(node_id)
358 .into_iter()
359 .filter(|e| e.label() == label)
360 .collect()
361 }
362
363 /// Checks if an edge with the given ID exists.
364 #[must_use]
365 pub fn contains_edge(&self, edge_id: u64) -> bool {
366 self.edges.contains_key(&edge_id)
367 }
368
369 /// Returns the number of edges in the store.
370 #[must_use]
371 pub fn len(&self) -> usize {
372 self.edges.len()
373 }
374
375 /// Returns true if the store contains no edges.
376 #[must_use]
377 pub fn is_empty(&self) -> bool {
378 self.edges.is_empty()
379 }
380
381 /// Returns all edges in the store.
382 #[must_use]
383 pub fn all_edges(&self) -> Vec<&GraphEdge> {
384 self.edges.values().collect()
385 }
386
387 /// Returns all outgoing source node IDs (keys of the outgoing index).
388 ///
389 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
390 /// enumerate source nodes for CSR construction.
391 #[must_use]
392 pub(crate) fn outgoing_keys(&self) -> Vec<u64> {
393 self.outgoing.keys().copied().collect()
394 }
395
396 /// Returns the total number of outgoing edge entries across all nodes.
397 ///
398 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) for
399 /// pre-allocation.
400 #[must_use]
401 pub(crate) fn total_outgoing_edges(&self) -> usize {
402 self.outgoing.values().map(Vec::len).sum()
403 }
404
405 /// Invokes `f` for each outgoing edge from `node_id` (by edge object).
406 ///
407 /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
408 /// iterate edges without exposing internal index structure.
409 pub(crate) fn for_each_outgoing_edge<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
410 if let Some(ids) = self.outgoing.get(&node_id) {
411 for id in ids {
412 if let Some(edge) = self.edges.get(id) {
413 f(edge);
414 }
415 }
416 }
417 }
418}
419
420// Edge removal operations are in `edge_removal.rs`.
421// CSR snapshot methods and persistence are in `edge_persistence.rs`.