velesdb_core/collection/graph/edge.rs
1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! # Edge Removal Semantics
8//!
//! Removing an edge updates several internal indices one after another. Every
//! mutating method takes `&mut self`, so no other code can observe the store
//! while those updates are in progress; the final state after a call returns
//! is consistent. For shared, concurrent access, use `ConcurrentEdgeStore` instead.
12
13use super::helpers::PostcardPersistence;
14use crate::error::{Error, Result};
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::collections::HashMap;
18
19/// A directed edge (relationship) in the knowledge graph.
20///
21/// Edges connect nodes and can have a label (type) and properties.
22///
23/// # Example
24///
25/// ```rust,ignore
26/// use velesdb_core::collection::graph::GraphEdge;
27/// use serde_json::json;
28/// use std::collections::HashMap;
29///
30/// let mut props = HashMap::new();
31/// props.insert("since".to_string(), json!("2020-01-01"));
32///
33/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
34/// .with_properties(props);
35/// ```
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37pub struct GraphEdge {
38 id: u64,
39 source: u64,
40 target: u64,
41 label: String,
42 properties: HashMap<String, Value>,
43}
44
45impl GraphEdge {
46 /// Creates a new edge with the given ID, endpoints, and label.
47 ///
48 /// # Errors
49 ///
50 /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
51 pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
52 let trimmed = label.trim();
53 if trimmed.is_empty() {
54 return Err(Error::InvalidEdgeLabel(
55 "Edge label cannot be empty or whitespace-only".to_string(),
56 ));
57 }
58 Ok(Self {
59 id,
60 source,
61 target,
62 label: trimmed.to_string(),
63 properties: HashMap::new(),
64 })
65 }
66
67 /// Adds properties to this edge (builder pattern).
68 #[must_use]
69 pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
70 self.properties = properties;
71 self
72 }
73
74 /// Returns the edge ID.
75 #[must_use]
76 pub fn id(&self) -> u64 {
77 self.id
78 }
79
80 /// Returns the source node ID.
81 #[must_use]
82 pub fn source(&self) -> u64 {
83 self.source
84 }
85
86 /// Returns the target node ID.
87 #[must_use]
88 pub fn target(&self) -> u64 {
89 self.target
90 }
91
92 /// Returns the edge label (relationship type).
93 #[must_use]
94 pub fn label(&self) -> &str {
95 &self.label
96 }
97
98 /// Returns all properties of this edge.
99 #[must_use]
100 pub fn properties(&self) -> &HashMap<String, Value> {
101 &self.properties
102 }
103
104 /// Returns a specific property value, if it exists.
105 #[must_use]
106 pub fn property(&self, name: &str) -> Option<&Value> {
107 self.properties.get(name)
108 }
109}
110
/// Storage for graph edges with bidirectional indexing.
///
/// Provides O(1) (average, hash-map) access to edges by ID and O(degree)
/// access to outgoing/incoming edges for any node.
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `edges`: primary storage; every other index stores edge IDs that are
///   resolved against this map at lookup time.
/// - `outgoing` / `incoming`: adjacency lists keyed by source / target node.
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// The label indices are only populated for edges inserted together with the
/// outgoing index (`add_edge`, `add_edge_outgoing_only`); edges added via
/// `add_edge_incoming_only` are recorded only in `edges` and `incoming`.
// NOTE: this struct is serialized with serde + postcard, whose encoding follows
// field declaration order — reordering fields would break existing snapshots.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID
    edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: source_id -> Vec<edge_id>
    outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: target_id -> Vec<edge_id>
    incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for fast label queries
    by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
    outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
}
133
134impl EdgeStore {
135 /// Creates a new empty edge store.
136 #[must_use]
137 pub fn new() -> Self {
138 Self::default()
139 }
140
141 /// Creates an edge store with pre-allocated capacity for better performance.
142 ///
143 /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
144 /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
145 ///
146 /// # Arguments
147 ///
148 /// * `expected_edges` - Expected number of edges to store
149 /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
150 ///
151 /// # Example
152 ///
153 /// ```rust,ignore
154 /// // For a graph with ~1M edges and ~100K nodes
155 /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
156 /// ```
157 #[must_use]
158 pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
159 // Estimate ~10 unique labels typical for knowledge graphs
160 let expected_labels = 10usize;
161 // Use saturating_mul to prevent overflow for extreme inputs
162 let outgoing_by_label_cap = expected_nodes
163 .saturating_mul(expected_labels)
164 .saturating_div(10);
165 Self {
166 edges: HashMap::with_capacity(expected_edges),
167 outgoing: HashMap::with_capacity(expected_nodes),
168 incoming: HashMap::with_capacity(expected_nodes),
169 by_label: HashMap::with_capacity(expected_labels),
170 outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
171 }
172 }
173
174 /// Adds an edge to the store.
175 ///
176 /// Creates bidirectional index entries for efficient traversal.
177 /// Also maintains label-based secondary indices (EPIC-019 US-003).
178 ///
179 /// # Errors
180 ///
181 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
182 pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
183 self.insert_edge(edge, true, true)
184 }
185
186 /// Adds an edge with only the outgoing index (for cross-shard storage).
187 ///
188 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
189 /// The edge is stored and indexed by source node only.
190 ///
191 /// # Errors
192 ///
193 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
194 pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
195 self.insert_edge(edge, true, false)
196 }
197
198 /// Adds an edge with only the incoming index (for cross-shard storage).
199 ///
200 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
201 /// The edge is stored and indexed by target node only.
202 /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
203 ///
204 /// # Errors
205 ///
206 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
207 pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
208 self.insert_edge(edge, false, true)
209 }
210
211 /// Shared implementation for all `add_edge*` variants.
212 ///
213 /// Validates uniqueness, populates the requested directional indices,
214 /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
215 /// are maintained only when `index_outgoing` is `true` (source shard
216 /// owns label indices in the concurrent model).
217 fn insert_edge(
218 &mut self,
219 edge: GraphEdge,
220 index_outgoing: bool,
221 index_incoming: bool,
222 ) -> Result<()> {
223 let id = edge.id();
224 if self.edges.contains_key(&id) {
225 return Err(Error::EdgeExists(id));
226 }
227
228 if index_outgoing {
229 let source = edge.source();
230 let label = edge.label().to_string();
231 self.outgoing.entry(source).or_default().push(id);
232 // Label indices are owned by the source shard (US-003)
233 self.by_label.entry(label.clone()).or_default().push(id);
234 self.outgoing_by_label
235 .entry((source, label))
236 .or_default()
237 .push(id);
238 }
239
240 if index_incoming {
241 self.incoming.entry(edge.target()).or_default().push(id);
242 }
243
244 self.edges.insert(id, edge);
245 Ok(())
246 }
247
248 /// Returns the total number of edges in the store.
249 #[must_use]
250 pub fn edge_count(&self) -> usize {
251 self.edges.len()
252 }
253
254 /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
255 #[must_use]
256 pub fn outgoing_edge_count(&self) -> usize {
257 self.outgoing.values().map(Vec::len).sum()
258 }
259
260 /// Gets an edge by its ID.
261 #[must_use]
262 pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
263 self.edges.get(&id)
264 }
265
266 /// Gets all outgoing edges from a node.
267 #[must_use]
268 pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
269 self.resolve_edge_ids(self.outgoing.get(&node_id))
270 }
271
272 /// Gets all incoming edges to a node.
273 #[must_use]
274 pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
275 self.resolve_edge_ids(self.incoming.get(&node_id))
276 }
277
278 /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
279 ///
280 /// Uses the `outgoing_by_label` composite index for fast lookup instead of
281 /// iterating through all outgoing edges (EPIC-019 US-003).
282 #[must_use]
283 pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
284 self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
285 }
286
287 /// Gets all edges with a specific label - O(k) where k = result count.
288 ///
289 /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
290 #[must_use]
291 pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
292 self.resolve_edge_ids(self.by_label.get(label))
293 }
294
295 /// Resolves edge IDs from an index entry into edge references.
296 ///
297 /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
298 /// `get_outgoing_by_label`, and `get_edges_by_label`.
299 #[inline]
300 fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
301 ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
302 .unwrap_or_default()
303 }
304
305 /// Gets incoming edges filtered by label.
306 #[must_use]
307 pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
308 self.get_incoming(node_id)
309 .into_iter()
310 .filter(|e| e.label() == label)
311 .collect()
312 }
313
314 /// Checks if an edge with the given ID exists.
315 #[must_use]
316 pub fn contains_edge(&self, edge_id: u64) -> bool {
317 self.edges.contains_key(&edge_id)
318 }
319
320 /// Returns the number of edges in the store.
321 #[must_use]
322 pub fn len(&self) -> usize {
323 self.edges.len()
324 }
325
326 /// Returns true if the store contains no edges.
327 #[must_use]
328 pub fn is_empty(&self) -> bool {
329 self.edges.is_empty()
330 }
331
332 /// Returns all edges in the store.
333 #[must_use]
334 pub fn all_edges(&self) -> Vec<&GraphEdge> {
335 self.edges.values().collect()
336 }
337
338 /// Removes an edge by ID.
339 ///
340 /// Cleans up all indices: outgoing, incoming, by_label, and outgoing_by_label.
341 pub fn remove_edge(&mut self, edge_id: u64) {
342 if let Some(edge) = self.edges.remove(&edge_id) {
343 let source = edge.source();
344 self.purge_outgoing_index(edge_id, source);
345 self.purge_incoming_index(edge_id, edge.target());
346 self.purge_label_indices(edge_id, source, edge.label());
347 }
348 }
349
350 /// Removes an edge by ID, only cleaning the outgoing index.
351 ///
352 /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
353 /// Also cleans up label indices since they are maintained by source shard.
354 pub fn remove_edge_outgoing_only(&mut self, edge_id: u64) {
355 if let Some(edge) = self.edges.remove(&edge_id) {
356 let source = edge.source();
357 self.purge_outgoing_index(edge_id, source);
358 self.purge_label_indices(edge_id, source, edge.label());
359 }
360 }
361
362 /// Removes an edge by ID, only cleaning the incoming index.
363 ///
364 /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
365 pub fn remove_edge_incoming_only(&mut self, edge_id: u64) {
366 if let Some(edge) = self.edges.remove(&edge_id) {
367 self.purge_incoming_index(edge_id, edge.target());
368 }
369 }
370
371 /// Removes all edges connected to a node (cascade delete).
372 ///
373 /// Removes both outgoing and incoming edges, cleaning up all indices
374 /// including label indices (EPIC-019 US-003).
375 pub fn remove_node_edges(&mut self, node_id: u64) {
376 // Collect edge IDs to remove (outgoing)
377 let outgoing_ids: Vec<u64> = self.outgoing.remove(&node_id).unwrap_or_default();
378
379 // Collect edge IDs to remove (incoming)
380 let incoming_ids: Vec<u64> = self.incoming.remove(&node_id).unwrap_or_default();
381
382 // Remove outgoing edges: clean incoming + label indices for each
383 for edge_id in outgoing_ids {
384 if let Some(edge) = self.edges.remove(&edge_id) {
385 self.purge_incoming_index(edge_id, edge.target());
386 self.purge_label_indices(edge_id, node_id, edge.label());
387 }
388 }
389
390 // Remove incoming edges: clean outgoing + label indices for each
391 for edge_id in incoming_ids {
392 if let Some(edge) = self.edges.remove(&edge_id) {
393 let source = edge.source();
394 self.purge_outgoing_index(edge_id, source);
395 self.purge_label_indices(edge_id, source, edge.label());
396 }
397 }
398 }
399
400 /// Removes `edge_id` from the incoming index of `target_node`.
401 #[inline]
402 fn purge_incoming_index(&mut self, edge_id: u64, target_node: u64) {
403 if let Some(ids) = self.incoming.get_mut(&target_node) {
404 ids.retain(|&id| id != edge_id);
405 }
406 }
407
408 /// Removes `edge_id` from the outgoing index of `source_node`.
409 #[inline]
410 fn purge_outgoing_index(&mut self, edge_id: u64, source_node: u64) {
411 if let Some(ids) = self.outgoing.get_mut(&source_node) {
412 ids.retain(|&id| id != edge_id);
413 }
414 }
415
416 /// Removes `edge_id` from the `by_label` and `outgoing_by_label` indices (US-003).
417 #[inline]
418 fn purge_label_indices(&mut self, edge_id: u64, source_node: u64, label: &str) {
419 if let Some(ids) = self.by_label.get_mut(label) {
420 ids.retain(|&id| id != edge_id);
421 }
422 if let Some(ids) = self
423 .outgoing_by_label
424 .get_mut(&(source_node, label.to_string()))
425 {
426 ids.retain(|&id| id != edge_id);
427 }
428 }
429}
430
431impl PostcardPersistence for EdgeStore {}
432
433// Inherent persistence methods that delegate to `PostcardPersistence`.
434// Required so callers (e.g., `lifecycle.rs`) can use `EdgeStore::load_from_file`
435// without importing the trait.
436impl EdgeStore {
437 /// Serializes the edge store to bytes using `postcard`.
438 ///
439 /// # Errors
440 /// Returns an error if serialization fails.
441 pub fn to_bytes(&self) -> std::result::Result<Vec<u8>, postcard::Error> {
442 <Self as PostcardPersistence>::to_bytes(self)
443 }
444
445 /// Deserializes an edge store from bytes.
446 ///
447 /// # Errors
448 /// Returns an error if deserialization fails (e.g., corrupted data).
449 pub fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, postcard::Error> {
450 <Self as PostcardPersistence>::from_bytes(bytes)
451 }
452
453 /// Saves the edge store to a file.
454 ///
455 /// # Errors
456 /// Returns an error if serialization or file I/O fails.
457 pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
458 <Self as PostcardPersistence>::save_to_file(self, path)
459 }
460
461 /// Loads an edge store from a file.
462 ///
463 /// # Errors
464 /// Returns an error if file I/O or deserialization fails.
465 pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
466 <Self as PostcardPersistence>::load_from_file(path)
467 }
468}