velesdb_core/collection/graph/edge.rs
1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! # Edge Removal Semantics
8//!
9//! During edge removal, the internal indexes may be temporarily inconsistent
10//! while the operation is in progress. The final state is always consistent.
11//! For concurrent access, use `ConcurrentEdgeStore` instead.
12
13use crate::error::{Error, Result};
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17
18/// A directed edge (relationship) in the knowledge graph.
19///
20/// Edges connect nodes and can have a label (type) and properties.
21///
22/// # Example
23///
24/// ```rust,ignore
25/// use velesdb_core::collection::graph::GraphEdge;
26/// use serde_json::json;
27/// use std::collections::HashMap;
28///
29/// let mut props = HashMap::new();
30/// props.insert("since".to_string(), json!("2020-01-01"));
31///
32/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
33/// .with_properties(props);
34/// ```
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub struct GraphEdge {
37 id: u64,
38 source: u64,
39 target: u64,
40 label: String,
41 properties: HashMap<String, Value>,
42}
43
44impl GraphEdge {
45 /// Creates a new edge with the given ID, endpoints, and label.
46 ///
47 /// # Errors
48 ///
49 /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
50 pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
51 let trimmed = label.trim();
52 if trimmed.is_empty() {
53 return Err(Error::InvalidEdgeLabel(
54 "Edge label cannot be empty or whitespace-only".to_string(),
55 ));
56 }
57 Ok(Self {
58 id,
59 source,
60 target,
61 label: trimmed.to_string(),
62 properties: HashMap::new(),
63 })
64 }
65
66 /// Adds properties to this edge (builder pattern).
67 #[must_use]
68 pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
69 self.properties = properties;
70 self
71 }
72
73 /// Returns the edge ID.
74 #[must_use]
75 pub fn id(&self) -> u64 {
76 self.id
77 }
78
79 /// Returns the source node ID.
80 #[must_use]
81 pub fn source(&self) -> u64 {
82 self.source
83 }
84
85 /// Returns the target node ID.
86 #[must_use]
87 pub fn target(&self) -> u64 {
88 self.target
89 }
90
91 /// Returns the edge label (relationship type).
92 #[must_use]
93 pub fn label(&self) -> &str {
94 &self.label
95 }
96
97 /// Returns all properties of this edge.
98 #[must_use]
99 pub fn properties(&self) -> &HashMap<String, Value> {
100 &self.properties
101 }
102
103 /// Returns a specific property value, if it exists.
104 #[must_use]
105 pub fn property(&self, name: &str) -> Option<&Value> {
106 self.properties.get(name)
107 }
108}
109
110/// Storage for graph edges with bidirectional indexing.
111///
112/// Provides O(1) access to edges by ID and O(degree) access to
113/// outgoing/incoming edges for any node.
114///
115/// # Index Structure (EPIC-019 US-003)
116///
117/// - `by_label`: Secondary index for O(k) label-based queries
118/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
119#[derive(Debug, Default, Serialize, Deserialize)]
120pub struct EdgeStore {
121 /// All edges indexed by ID
122 edges: HashMap<u64, GraphEdge>,
123 /// Outgoing edges: source_id -> Vec<edge_id>
124 outgoing: HashMap<u64, Vec<u64>>,
125 /// Incoming edges: target_id -> Vec<edge_id>
126 incoming: HashMap<u64, Vec<u64>>,
127 /// Secondary index: label -> Vec<edge_id> for fast label queries
128 by_label: HashMap<String, Vec<u64>>,
129 /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
130 outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
131}
132
133impl EdgeStore {
134 /// Creates a new empty edge store.
135 #[must_use]
136 pub fn new() -> Self {
137 Self::default()
138 }
139
140 /// Creates an edge store with pre-allocated capacity for better performance.
141 ///
142 /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
143 /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
144 ///
145 /// # Arguments
146 ///
147 /// * `expected_edges` - Expected number of edges to store
148 /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
149 ///
150 /// # Example
151 ///
152 /// ```rust,ignore
153 /// // For a graph with ~1M edges and ~100K nodes
154 /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
155 /// ```
156 #[must_use]
157 pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
158 // Estimate ~10 unique labels typical for knowledge graphs
159 let expected_labels = 10usize;
160 // Use saturating_mul to prevent overflow for extreme inputs
161 let outgoing_by_label_cap = expected_nodes
162 .saturating_mul(expected_labels)
163 .saturating_div(10);
164 Self {
165 edges: HashMap::with_capacity(expected_edges),
166 outgoing: HashMap::with_capacity(expected_nodes),
167 incoming: HashMap::with_capacity(expected_nodes),
168 by_label: HashMap::with_capacity(expected_labels),
169 outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
170 }
171 }
172
173 /// Adds an edge to the store.
174 ///
175 /// Creates bidirectional index entries for efficient traversal.
176 /// Also maintains label-based secondary indices (EPIC-019 US-003).
177 ///
178 /// # Errors
179 ///
180 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
181 pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
182 let id = edge.id();
183 let source = edge.source();
184 let target = edge.target();
185 let label = edge.label().to_string();
186
187 // Check for duplicate ID
188 if self.edges.contains_key(&id) {
189 return Err(Error::EdgeExists(id));
190 }
191
192 // Add to outgoing index
193 self.outgoing.entry(source).or_default().push(id);
194
195 // Add to incoming index
196 self.incoming.entry(target).or_default().push(id);
197
198 // Add to label index (US-003)
199 self.by_label.entry(label.clone()).or_default().push(id);
200
201 // Add to composite (source, label) index (US-003)
202 self.outgoing_by_label
203 .entry((source, label))
204 .or_default()
205 .push(id);
206
207 // Store the edge
208 self.edges.insert(id, edge);
209 Ok(())
210 }
211
212 /// Adds an edge with only the outgoing index (for cross-shard storage).
213 ///
214 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
215 /// The edge is stored and indexed by source node only.
216 ///
217 /// # Errors
218 ///
219 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
220 pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
221 let id = edge.id();
222 let source = edge.source();
223 let label = edge.label().to_string();
224
225 // Check for duplicate ID
226 if self.edges.contains_key(&id) {
227 return Err(Error::EdgeExists(id));
228 }
229
230 // Add to outgoing index only
231 self.outgoing.entry(source).or_default().push(id);
232
233 // Add to label index (US-003)
234 self.by_label.entry(label.clone()).or_default().push(id);
235
236 // Add to composite (source, label) index (US-003)
237 self.outgoing_by_label
238 .entry((source, label))
239 .or_default()
240 .push(id);
241
242 // Store the edge
243 self.edges.insert(id, edge);
244 Ok(())
245 }
246
247 /// Adds an edge with only the incoming index (for cross-shard storage).
248 ///
249 /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
250 /// The edge is stored and indexed by target node only.
251 /// Note: Label indices are maintained by the source shard in ConcurrentEdgeStore.
252 ///
253 /// # Errors
254 ///
255 /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
256 pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
257 let id = edge.id();
258 let target = edge.target();
259
260 // Check for duplicate ID
261 if self.edges.contains_key(&id) {
262 return Err(Error::EdgeExists(id));
263 }
264
265 // Add to incoming index only
266 self.incoming.entry(target).or_default().push(id);
267
268 // Note: by_label and outgoing_by_label are maintained by source shard
269 // Store the edge
270 self.edges.insert(id, edge);
271 Ok(())
272 }
273
274 /// Returns the total number of edges in the store.
275 #[must_use]
276 pub fn edge_count(&self) -> usize {
277 self.edges.len()
278 }
279
280 /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
281 #[must_use]
282 pub fn outgoing_edge_count(&self) -> usize {
283 self.outgoing.values().map(Vec::len).sum()
284 }
285
286 /// Gets an edge by its ID.
287 #[must_use]
288 pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
289 self.edges.get(&id)
290 }
291
292 /// Gets all outgoing edges from a node.
293 #[must_use]
294 pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
295 self.outgoing
296 .get(&node_id)
297 .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
298 .unwrap_or_default()
299 }
300
301 /// Gets all incoming edges to a node.
302 #[must_use]
303 pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
304 self.incoming
305 .get(&node_id)
306 .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
307 .unwrap_or_default()
308 }
309
310 /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
311 ///
312 /// Uses the `outgoing_by_label` composite index for fast lookup instead of
313 /// iterating through all outgoing edges (EPIC-019 US-003).
314 #[must_use]
315 pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
316 self.outgoing_by_label
317 .get(&(node_id, label.to_string()))
318 .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
319 .unwrap_or_default()
320 }
321
322 /// Gets all edges with a specific label - O(k) where k = result count.
323 ///
324 /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
325 #[must_use]
326 pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
327 self.by_label
328 .get(label)
329 .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
330 .unwrap_or_default()
331 }
332
333 /// Gets incoming edges filtered by label.
334 #[must_use]
335 pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
336 self.get_incoming(node_id)
337 .into_iter()
338 .filter(|e| e.label() == label)
339 .collect()
340 }
341
342 /// Checks if an edge with the given ID exists.
343 #[must_use]
344 pub fn contains_edge(&self, edge_id: u64) -> bool {
345 self.edges.contains_key(&edge_id)
346 }
347
348 /// Returns the number of edges in the store.
349 #[must_use]
350 pub fn len(&self) -> usize {
351 self.edges.len()
352 }
353
354 /// Returns true if the store contains no edges.
355 #[must_use]
356 pub fn is_empty(&self) -> bool {
357 self.edges.is_empty()
358 }
359
360 /// Returns all edges in the store.
361 #[must_use]
362 pub fn all_edges(&self) -> Vec<&GraphEdge> {
363 self.edges.values().collect()
364 }
365
366 /// Removes an edge by ID.
367 ///
368 /// Cleans up all indices: outgoing, incoming, by_label, and outgoing_by_label.
369 pub fn remove_edge(&mut self, edge_id: u64) {
370 if let Some(edge) = self.edges.remove(&edge_id) {
371 let source = edge.source();
372 let label = edge.label().to_string();
373
374 // Remove from outgoing index
375 if let Some(ids) = self.outgoing.get_mut(&source) {
376 ids.retain(|&id| id != edge_id);
377 }
378 // Remove from incoming index
379 if let Some(ids) = self.incoming.get_mut(&edge.target()) {
380 ids.retain(|&id| id != edge_id);
381 }
382 // Remove from label index (US-003)
383 if let Some(ids) = self.by_label.get_mut(&label) {
384 ids.retain(|&id| id != edge_id);
385 }
386 // Remove from composite index (US-003)
387 if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
388 ids.retain(|&id| id != edge_id);
389 }
390 }
391 }
392
393 /// Removes an edge by ID, only cleaning the outgoing index.
394 ///
395 /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
396 /// Also cleans up label indices since they are maintained by source shard.
397 pub fn remove_edge_outgoing_only(&mut self, edge_id: u64) {
398 if let Some(edge) = self.edges.remove(&edge_id) {
399 let source = edge.source();
400 let label = edge.label().to_string();
401
402 if let Some(ids) = self.outgoing.get_mut(&source) {
403 ids.retain(|&id| id != edge_id);
404 }
405 // Clean label indices (US-003)
406 if let Some(ids) = self.by_label.get_mut(&label) {
407 ids.retain(|&id| id != edge_id);
408 }
409 if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
410 ids.retain(|&id| id != edge_id);
411 }
412 }
413 }
414
415 /// Removes an edge by ID, only cleaning the incoming index.
416 ///
417 /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
418 pub fn remove_edge_incoming_only(&mut self, edge_id: u64) {
419 if let Some(edge) = self.edges.remove(&edge_id) {
420 if let Some(ids) = self.incoming.get_mut(&edge.target()) {
421 ids.retain(|&id| id != edge_id);
422 }
423 }
424 }
425
426 // =========================================================================
427 // Persistence
428 // =========================================================================
429
430 /// Serializes the edge store to bytes using `postcard`.
431 ///
432 /// # Errors
433 /// Returns an error if serialization fails.
434 pub fn to_bytes(&self) -> std::result::Result<Vec<u8>, postcard::Error> {
435 postcard::to_allocvec(self)
436 }
437
438 /// Deserializes an edge store from bytes.
439 ///
440 /// # Errors
441 /// Returns an error if deserialization fails (e.g., corrupted data).
442 pub fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, postcard::Error> {
443 postcard::from_bytes(bytes)
444 }
445
446 /// Saves the edge store to a file.
447 ///
448 /// # Errors
449 /// Returns an error if serialization or file I/O fails.
450 pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
451 let bytes = self
452 .to_bytes()
453 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
454 std::fs::write(path, bytes)
455 }
456
457 /// Loads an edge store from a file.
458 ///
459 /// # Errors
460 /// Returns an error if file I/O or deserialization fails.
461 pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
462 let bytes = std::fs::read(path)?;
463 Self::from_bytes(&bytes)
464 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
465 }
466
467 /// Removes all edges connected to a node (cascade delete).
468 ///
469 /// Removes both outgoing and incoming edges, cleaning up all indices
470 /// including label indices (EPIC-019 US-003).
471 pub fn remove_node_edges(&mut self, node_id: u64) {
472 // Collect edge IDs to remove (outgoing)
473 let outgoing_ids: Vec<u64> = self.outgoing.remove(&node_id).unwrap_or_default();
474
475 // Collect edge IDs to remove (incoming)
476 let incoming_ids: Vec<u64> = self.incoming.remove(&node_id).unwrap_or_default();
477
478 // Remove outgoing edges and clean all indices
479 for edge_id in outgoing_ids {
480 if let Some(edge) = self.edges.remove(&edge_id) {
481 let label = edge.label().to_string();
482 // Clean incoming index
483 if let Some(ids) = self.incoming.get_mut(&edge.target()) {
484 ids.retain(|&id| id != edge_id);
485 }
486 // Clean label index (US-003)
487 if let Some(ids) = self.by_label.get_mut(&label) {
488 ids.retain(|&id| id != edge_id);
489 }
490 // Clean composite index (US-003)
491 if let Some(ids) = self.outgoing_by_label.get_mut(&(node_id, label)) {
492 ids.retain(|&id| id != edge_id);
493 }
494 }
495 }
496
497 // Remove incoming edges and clean all indices
498 for edge_id in incoming_ids {
499 if let Some(edge) = self.edges.remove(&edge_id) {
500 let source = edge.source();
501 let label = edge.label().to_string();
502 // Clean outgoing index
503 if let Some(ids) = self.outgoing.get_mut(&source) {
504 ids.retain(|&id| id != edge_id);
505 }
506 // Clean label index (US-003)
507 if let Some(ids) = self.by_label.get_mut(&label) {
508 ids.retain(|&id| id != edge_id);
509 }
510 // Clean composite index (US-003)
511 if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
512 ids.retain(|&id| id != edge_id);
513 }
514 }
515 }
516 }
517}