Skip to main content

velesdb_core/collection/graph/
edge.rs

1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! # Edge Removal Semantics
8//!
9//! During edge removal, the internal indexes may be temporarily inconsistent
10//! while the operation is in progress. The final state is always consistent.
11//! For concurrent access, use `ConcurrentEdgeStore` instead.
12
13use crate::error::{Error, Result};
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17
18/// A directed edge (relationship) in the knowledge graph.
19///
20/// Edges connect nodes and can have a label (type) and properties.
21///
22/// # Example
23///
24/// ```rust,ignore
25/// use velesdb_core::collection::graph::GraphEdge;
26/// use serde_json::json;
27/// use std::collections::HashMap;
28///
29/// let mut props = HashMap::new();
30/// props.insert("since".to_string(), json!("2020-01-01"));
31///
32/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
33///     .with_properties(props);
34/// ```
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub struct GraphEdge {
37    id: u64,
38    source: u64,
39    target: u64,
40    label: String,
41    properties: HashMap<String, Value>,
42}
43
44impl GraphEdge {
45    /// Creates a new edge with the given ID, endpoints, and label.
46    ///
47    /// # Errors
48    ///
49    /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
50    pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
51        let trimmed = label.trim();
52        if trimmed.is_empty() {
53            return Err(Error::InvalidEdgeLabel(
54                "Edge label cannot be empty or whitespace-only".to_string(),
55            ));
56        }
57        Ok(Self {
58            id,
59            source,
60            target,
61            label: trimmed.to_string(),
62            properties: HashMap::new(),
63        })
64    }
65
66    /// Adds properties to this edge (builder pattern).
67    #[must_use]
68    pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
69        self.properties = properties;
70        self
71    }
72
73    /// Returns the edge ID.
74    #[must_use]
75    pub fn id(&self) -> u64 {
76        self.id
77    }
78
79    /// Returns the source node ID.
80    #[must_use]
81    pub fn source(&self) -> u64 {
82        self.source
83    }
84
85    /// Returns the target node ID.
86    #[must_use]
87    pub fn target(&self) -> u64 {
88        self.target
89    }
90
91    /// Returns the edge label (relationship type).
92    #[must_use]
93    pub fn label(&self) -> &str {
94        &self.label
95    }
96
97    /// Returns all properties of this edge.
98    #[must_use]
99    pub fn properties(&self) -> &HashMap<String, Value> {
100        &self.properties
101    }
102
103    /// Returns a specific property value, if it exists.
104    #[must_use]
105    pub fn property(&self, name: &str) -> Option<&Value> {
106        self.properties.get(name)
107    }
108}
109
/// Storage for graph edges with bidirectional indexing.
///
/// Provides O(1) average-case access to edges by ID. Outgoing and incoming
/// edges for any node are available in O(degree).
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `outgoing` / `incoming`: adjacency lists keyed by source / target node
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// Every index stores edge IDs only; the edge values themselves live in
/// `edges`. The derived serde impls (de)serialize all fields, so the derived
/// indices are persisted alongside the edges. Field order is therefore part of
/// the `postcard` format. NOTE(review): the derived `Deserialize` does not
/// check that the indices agree with `edges`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges, keyed by edge ID.
    edges: HashMap<u64, GraphEdge>,
    /// Outgoing adjacency: source_id -> Vec<edge_id>.
    outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming adjacency: target_id -> Vec<edge_id>.
    incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for label queries.
    by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for filtered traversal.
    outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
}
132
133impl EdgeStore {
134    /// Creates a new empty edge store.
135    #[must_use]
136    pub fn new() -> Self {
137        Self::default()
138    }
139
140    /// Creates an edge store with pre-allocated capacity for better performance.
141    ///
142    /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
143    /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
144    ///
145    /// # Arguments
146    ///
147    /// * `expected_edges` - Expected number of edges to store
148    /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
149    ///
150    /// # Example
151    ///
152    /// ```rust,ignore
153    /// // For a graph with ~1M edges and ~100K nodes
154    /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
155    /// ```
156    #[must_use]
157    pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
158        // Estimate ~10 unique labels typical for knowledge graphs
159        let expected_labels = 10usize;
160        // Use saturating_mul to prevent overflow for extreme inputs
161        let outgoing_by_label_cap = expected_nodes
162            .saturating_mul(expected_labels)
163            .saturating_div(10);
164        Self {
165            edges: HashMap::with_capacity(expected_edges),
166            outgoing: HashMap::with_capacity(expected_nodes),
167            incoming: HashMap::with_capacity(expected_nodes),
168            by_label: HashMap::with_capacity(expected_labels),
169            outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
170        }
171    }
172
173    /// Adds an edge to the store.
174    ///
175    /// Creates bidirectional index entries for efficient traversal.
176    /// Also maintains label-based secondary indices (EPIC-019 US-003).
177    ///
178    /// # Errors
179    ///
180    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
181    pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
182        let id = edge.id();
183        let source = edge.source();
184        let target = edge.target();
185        let label = edge.label().to_string();
186
187        // Check for duplicate ID
188        if self.edges.contains_key(&id) {
189            return Err(Error::EdgeExists(id));
190        }
191
192        // Add to outgoing index
193        self.outgoing.entry(source).or_default().push(id);
194
195        // Add to incoming index
196        self.incoming.entry(target).or_default().push(id);
197
198        // Add to label index (US-003)
199        self.by_label.entry(label.clone()).or_default().push(id);
200
201        // Add to composite (source, label) index (US-003)
202        self.outgoing_by_label
203            .entry((source, label))
204            .or_default()
205            .push(id);
206
207        // Store the edge
208        self.edges.insert(id, edge);
209        Ok(())
210    }
211
212    /// Adds an edge with only the outgoing index (for cross-shard storage).
213    ///
214    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
215    /// The edge is stored and indexed by source node only.
216    ///
217    /// # Errors
218    ///
219    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
220    pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
221        let id = edge.id();
222        let source = edge.source();
223        let label = edge.label().to_string();
224
225        // Check for duplicate ID
226        if self.edges.contains_key(&id) {
227            return Err(Error::EdgeExists(id));
228        }
229
230        // Add to outgoing index only
231        self.outgoing.entry(source).or_default().push(id);
232
233        // Add to label index (US-003)
234        self.by_label.entry(label.clone()).or_default().push(id);
235
236        // Add to composite (source, label) index (US-003)
237        self.outgoing_by_label
238            .entry((source, label))
239            .or_default()
240            .push(id);
241
242        // Store the edge
243        self.edges.insert(id, edge);
244        Ok(())
245    }
246
247    /// Adds an edge with only the incoming index (for cross-shard storage).
248    ///
249    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
250    /// The edge is stored and indexed by target node only.
251    /// Note: Label indices are maintained by the source shard in ConcurrentEdgeStore.
252    ///
253    /// # Errors
254    ///
255    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
256    pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
257        let id = edge.id();
258        let target = edge.target();
259
260        // Check for duplicate ID
261        if self.edges.contains_key(&id) {
262            return Err(Error::EdgeExists(id));
263        }
264
265        // Add to incoming index only
266        self.incoming.entry(target).or_default().push(id);
267
268        // Note: by_label and outgoing_by_label are maintained by source shard
269        // Store the edge
270        self.edges.insert(id, edge);
271        Ok(())
272    }
273
274    /// Returns the total number of edges in the store.
275    #[must_use]
276    pub fn edge_count(&self) -> usize {
277        self.edges.len()
278    }
279
280    /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
281    #[must_use]
282    pub fn outgoing_edge_count(&self) -> usize {
283        self.outgoing.values().map(Vec::len).sum()
284    }
285
286    /// Gets an edge by its ID.
287    #[must_use]
288    pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
289        self.edges.get(&id)
290    }
291
292    /// Gets all outgoing edges from a node.
293    #[must_use]
294    pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
295        self.outgoing
296            .get(&node_id)
297            .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
298            .unwrap_or_default()
299    }
300
301    /// Gets all incoming edges to a node.
302    #[must_use]
303    pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
304        self.incoming
305            .get(&node_id)
306            .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
307            .unwrap_or_default()
308    }
309
310    /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
311    ///
312    /// Uses the `outgoing_by_label` composite index for fast lookup instead of
313    /// iterating through all outgoing edges (EPIC-019 US-003).
314    #[must_use]
315    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
316        self.outgoing_by_label
317            .get(&(node_id, label.to_string()))
318            .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
319            .unwrap_or_default()
320    }
321
322    /// Gets all edges with a specific label - O(k) where k = result count.
323    ///
324    /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
325    #[must_use]
326    pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
327        self.by_label
328            .get(label)
329            .map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
330            .unwrap_or_default()
331    }
332
333    /// Gets incoming edges filtered by label.
334    #[must_use]
335    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
336        self.get_incoming(node_id)
337            .into_iter()
338            .filter(|e| e.label() == label)
339            .collect()
340    }
341
342    /// Checks if an edge with the given ID exists.
343    #[must_use]
344    pub fn contains_edge(&self, edge_id: u64) -> bool {
345        self.edges.contains_key(&edge_id)
346    }
347
348    /// Returns the number of edges in the store.
349    #[must_use]
350    pub fn len(&self) -> usize {
351        self.edges.len()
352    }
353
354    /// Returns true if the store contains no edges.
355    #[must_use]
356    pub fn is_empty(&self) -> bool {
357        self.edges.is_empty()
358    }
359
360    /// Returns all edges in the store.
361    #[must_use]
362    pub fn all_edges(&self) -> Vec<&GraphEdge> {
363        self.edges.values().collect()
364    }
365
366    /// Removes an edge by ID.
367    ///
368    /// Cleans up all indices: outgoing, incoming, by_label, and outgoing_by_label.
369    pub fn remove_edge(&mut self, edge_id: u64) {
370        if let Some(edge) = self.edges.remove(&edge_id) {
371            let source = edge.source();
372            let label = edge.label().to_string();
373
374            // Remove from outgoing index
375            if let Some(ids) = self.outgoing.get_mut(&source) {
376                ids.retain(|&id| id != edge_id);
377            }
378            // Remove from incoming index
379            if let Some(ids) = self.incoming.get_mut(&edge.target()) {
380                ids.retain(|&id| id != edge_id);
381            }
382            // Remove from label index (US-003)
383            if let Some(ids) = self.by_label.get_mut(&label) {
384                ids.retain(|&id| id != edge_id);
385            }
386            // Remove from composite index (US-003)
387            if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
388                ids.retain(|&id| id != edge_id);
389            }
390        }
391    }
392
393    /// Removes an edge by ID, only cleaning the outgoing index.
394    ///
395    /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
396    /// Also cleans up label indices since they are maintained by source shard.
397    pub fn remove_edge_outgoing_only(&mut self, edge_id: u64) {
398        if let Some(edge) = self.edges.remove(&edge_id) {
399            let source = edge.source();
400            let label = edge.label().to_string();
401
402            if let Some(ids) = self.outgoing.get_mut(&source) {
403                ids.retain(|&id| id != edge_id);
404            }
405            // Clean label indices (US-003)
406            if let Some(ids) = self.by_label.get_mut(&label) {
407                ids.retain(|&id| id != edge_id);
408            }
409            if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
410                ids.retain(|&id| id != edge_id);
411            }
412        }
413    }
414
415    /// Removes an edge by ID, only cleaning the incoming index.
416    ///
417    /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
418    pub fn remove_edge_incoming_only(&mut self, edge_id: u64) {
419        if let Some(edge) = self.edges.remove(&edge_id) {
420            if let Some(ids) = self.incoming.get_mut(&edge.target()) {
421                ids.retain(|&id| id != edge_id);
422            }
423        }
424    }
425
426    // =========================================================================
427    // Persistence
428    // =========================================================================
429
430    /// Serializes the edge store to bytes using `postcard`.
431    ///
432    /// # Errors
433    /// Returns an error if serialization fails.
434    pub fn to_bytes(&self) -> std::result::Result<Vec<u8>, postcard::Error> {
435        postcard::to_allocvec(self)
436    }
437
438    /// Deserializes an edge store from bytes.
439    ///
440    /// # Errors
441    /// Returns an error if deserialization fails (e.g., corrupted data).
442    pub fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, postcard::Error> {
443        postcard::from_bytes(bytes)
444    }
445
446    /// Saves the edge store to a file.
447    ///
448    /// # Errors
449    /// Returns an error if serialization or file I/O fails.
450    pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
451        let bytes = self
452            .to_bytes()
453            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
454        std::fs::write(path, bytes)
455    }
456
457    /// Loads an edge store from a file.
458    ///
459    /// # Errors
460    /// Returns an error if file I/O or deserialization fails.
461    pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
462        let bytes = std::fs::read(path)?;
463        Self::from_bytes(&bytes)
464            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
465    }
466
467    /// Removes all edges connected to a node (cascade delete).
468    ///
469    /// Removes both outgoing and incoming edges, cleaning up all indices
470    /// including label indices (EPIC-019 US-003).
471    pub fn remove_node_edges(&mut self, node_id: u64) {
472        // Collect edge IDs to remove (outgoing)
473        let outgoing_ids: Vec<u64> = self.outgoing.remove(&node_id).unwrap_or_default();
474
475        // Collect edge IDs to remove (incoming)
476        let incoming_ids: Vec<u64> = self.incoming.remove(&node_id).unwrap_or_default();
477
478        // Remove outgoing edges and clean all indices
479        for edge_id in outgoing_ids {
480            if let Some(edge) = self.edges.remove(&edge_id) {
481                let label = edge.label().to_string();
482                // Clean incoming index
483                if let Some(ids) = self.incoming.get_mut(&edge.target()) {
484                    ids.retain(|&id| id != edge_id);
485                }
486                // Clean label index (US-003)
487                if let Some(ids) = self.by_label.get_mut(&label) {
488                    ids.retain(|&id| id != edge_id);
489                }
490                // Clean composite index (US-003)
491                if let Some(ids) = self.outgoing_by_label.get_mut(&(node_id, label)) {
492                    ids.retain(|&id| id != edge_id);
493                }
494            }
495        }
496
497        // Remove incoming edges and clean all indices
498        for edge_id in incoming_ids {
499            if let Some(edge) = self.edges.remove(&edge_id) {
500                let source = edge.source();
501                let label = edge.label().to_string();
502                // Clean outgoing index
503                if let Some(ids) = self.outgoing.get_mut(&source) {
504                    ids.retain(|&id| id != edge_id);
505                }
506                // Clean label index (US-003)
507                if let Some(ids) = self.by_label.get_mut(&label) {
508                    ids.retain(|&id| id != edge_id);
509                }
510                // Clean composite index (US-003)
511                if let Some(ids) = self.outgoing_by_label.get_mut(&(source, label)) {
512                    ids.retain(|&id| id != edge_id);
513                }
514            }
515        }
516    }
517}