Skip to main content

velesdb_core/collection/graph/
edge.rs

1//! Graph edge types and storage for knowledge graph relationships.
2//!
3//! This module provides:
4//! - `GraphEdge`: A typed relationship between nodes with properties
5//! - `EdgeStore`: Bidirectional index for efficient edge traversal
6//!
7//! # Edge Removal Semantics
8//!
9//! During edge removal, the internal indexes may be temporarily inconsistent
10//! while the operation is in progress. The final state is always consistent.
11//! For concurrent access, use `ConcurrentEdgeStore` instead.
12
13use super::helpers::PostcardPersistence;
14use crate::error::{Error, Result};
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::collections::HashMap;
18
19/// A directed edge (relationship) in the knowledge graph.
20///
21/// Edges connect nodes and can have a label (type) and properties.
22///
23/// # Example
24///
25/// ```rust,ignore
26/// use velesdb_core::collection::graph::GraphEdge;
27/// use serde_json::json;
28/// use std::collections::HashMap;
29///
30/// let mut props = HashMap::new();
31/// props.insert("since".to_string(), json!("2020-01-01"));
32///
33/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
34///     .with_properties(props);
35/// ```
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37pub struct GraphEdge {
38    id: u64,
39    source: u64,
40    target: u64,
41    label: String,
42    properties: HashMap<String, Value>,
43}
44
45impl GraphEdge {
46    /// Creates a new edge with the given ID, endpoints, and label.
47    ///
48    /// # Errors
49    ///
50    /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
51    pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
52        let trimmed = label.trim();
53        if trimmed.is_empty() {
54            return Err(Error::InvalidEdgeLabel(
55                "Edge label cannot be empty or whitespace-only".to_string(),
56            ));
57        }
58        Ok(Self {
59            id,
60            source,
61            target,
62            label: trimmed.to_string(),
63            properties: HashMap::new(),
64        })
65    }
66
67    /// Adds properties to this edge (builder pattern).
68    #[must_use]
69    pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
70        self.properties = properties;
71        self
72    }
73
74    /// Returns the edge ID.
75    #[must_use]
76    pub fn id(&self) -> u64 {
77        self.id
78    }
79
80    /// Returns the source node ID.
81    #[must_use]
82    pub fn source(&self) -> u64 {
83        self.source
84    }
85
86    /// Returns the target node ID.
87    #[must_use]
88    pub fn target(&self) -> u64 {
89        self.target
90    }
91
92    /// Returns the edge label (relationship type).
93    #[must_use]
94    pub fn label(&self) -> &str {
95        &self.label
96    }
97
98    /// Returns all properties of this edge.
99    #[must_use]
100    pub fn properties(&self) -> &HashMap<String, Value> {
101        &self.properties
102    }
103
104    /// Returns a specific property value, if it exists.
105    #[must_use]
106    pub fn property(&self, name: &str) -> Option<&Value> {
107        self.properties.get(name)
108    }
109}
110
/// Storage for graph edges with bidirectional indexing.
///
/// Provides O(1) (average, hash-map) access to edges by ID and O(degree) access to
/// outgoing/incoming edges for any node.
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `outgoing` / `incoming`: adjacency lists keyed by source / target node ID
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// Every index stores edge IDs that key into `edges`; lookups skip IDs that have
/// no entry in `edges`. Edges inserted through the incoming-only path are not
/// recorded in `outgoing`, `by_label`, or `outgoing_by_label` of this store.
///
/// NOTE(review): this type is persisted through `PostcardPersistence`. Postcard
/// encodes struct fields positionally, so reordering or inserting fields likely
/// breaks previously saved files. Confirm before changing the layout.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID
    edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: source_id -> Vec<edge_id>
    outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: target_id -> Vec<edge_id>
    incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: label -> Vec<edge_id> for fast label queries
    by_label: HashMap<String, Vec<u64>>,
    /// Composite index: (source_id, label) -> Vec<edge_id> for fast filtered traversal
    outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
}
133
134impl EdgeStore {
135    /// Creates a new empty edge store.
136    #[must_use]
137    pub fn new() -> Self {
138        Self::default()
139    }
140
141    /// Creates an edge store with pre-allocated capacity for better performance.
142    ///
143    /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
144    /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
145    ///
146    /// # Arguments
147    ///
148    /// * `expected_edges` - Expected number of edges to store
149    /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
150    ///
151    /// # Example
152    ///
153    /// ```rust,ignore
154    /// // For a graph with ~1M edges and ~100K nodes
155    /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
156    /// ```
157    #[must_use]
158    pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
159        // Estimate ~10 unique labels typical for knowledge graphs
160        let expected_labels = 10usize;
161        // Use saturating_mul to prevent overflow for extreme inputs
162        let outgoing_by_label_cap = expected_nodes
163            .saturating_mul(expected_labels)
164            .saturating_div(10);
165        Self {
166            edges: HashMap::with_capacity(expected_edges),
167            outgoing: HashMap::with_capacity(expected_nodes),
168            incoming: HashMap::with_capacity(expected_nodes),
169            by_label: HashMap::with_capacity(expected_labels),
170            outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
171        }
172    }
173
174    /// Adds an edge to the store.
175    ///
176    /// Creates bidirectional index entries for efficient traversal.
177    /// Also maintains label-based secondary indices (EPIC-019 US-003).
178    ///
179    /// # Errors
180    ///
181    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
182    pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
183        self.insert_edge(edge, true, true)
184    }
185
186    /// Adds an edge with only the outgoing index (for cross-shard storage).
187    ///
188    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
189    /// The edge is stored and indexed by source node only.
190    ///
191    /// # Errors
192    ///
193    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
194    pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
195        self.insert_edge(edge, true, false)
196    }
197
198    /// Adds an edge with only the incoming index (for cross-shard storage).
199    ///
200    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
201    /// The edge is stored and indexed by target node only.
202    /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`.
203    ///
204    /// # Errors
205    ///
206    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
207    pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
208        self.insert_edge(edge, false, true)
209    }
210
211    /// Shared implementation for all `add_edge*` variants.
212    ///
213    /// Validates uniqueness, populates the requested directional indices,
214    /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
215    /// are maintained only when `index_outgoing` is `true` (source shard
216    /// owns label indices in the concurrent model).
217    fn insert_edge(
218        &mut self,
219        edge: GraphEdge,
220        index_outgoing: bool,
221        index_incoming: bool,
222    ) -> Result<()> {
223        let id = edge.id();
224        if self.edges.contains_key(&id) {
225            return Err(Error::EdgeExists(id));
226        }
227
228        if index_outgoing {
229            let source = edge.source();
230            let label = edge.label().to_string();
231            self.outgoing.entry(source).or_default().push(id);
232            // Label indices are owned by the source shard (US-003)
233            self.by_label.entry(label.clone()).or_default().push(id);
234            self.outgoing_by_label
235                .entry((source, label))
236                .or_default()
237                .push(id);
238        }
239
240        if index_incoming {
241            self.incoming.entry(edge.target()).or_default().push(id);
242        }
243
244        self.edges.insert(id, edge);
245        Ok(())
246    }
247
248    /// Returns the total number of edges in the store.
249    #[must_use]
250    pub fn edge_count(&self) -> usize {
251        self.edges.len()
252    }
253
254    /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
255    #[must_use]
256    pub fn outgoing_edge_count(&self) -> usize {
257        self.outgoing.values().map(Vec::len).sum()
258    }
259
260    /// Gets an edge by its ID.
261    #[must_use]
262    pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
263        self.edges.get(&id)
264    }
265
266    /// Gets all outgoing edges from a node.
267    #[must_use]
268    pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
269        self.resolve_edge_ids(self.outgoing.get(&node_id))
270    }
271
272    /// Gets all incoming edges to a node.
273    #[must_use]
274    pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
275        self.resolve_edge_ids(self.incoming.get(&node_id))
276    }
277
278    /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
279    ///
280    /// Uses the `outgoing_by_label` composite index for fast lookup instead of
281    /// iterating through all outgoing edges (EPIC-019 US-003).
282    #[must_use]
283    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
284        self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
285    }
286
287    /// Gets all edges with a specific label - O(k) where k = result count.
288    ///
289    /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
290    #[must_use]
291    pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
292        self.resolve_edge_ids(self.by_label.get(label))
293    }
294
295    /// Resolves edge IDs from an index entry into edge references.
296    ///
297    /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
298    /// `get_outgoing_by_label`, and `get_edges_by_label`.
299    #[inline]
300    fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
301        ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
302            .unwrap_or_default()
303    }
304
305    /// Gets incoming edges filtered by label.
306    #[must_use]
307    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
308        self.get_incoming(node_id)
309            .into_iter()
310            .filter(|e| e.label() == label)
311            .collect()
312    }
313
314    /// Checks if an edge with the given ID exists.
315    #[must_use]
316    pub fn contains_edge(&self, edge_id: u64) -> bool {
317        self.edges.contains_key(&edge_id)
318    }
319
320    /// Returns the number of edges in the store.
321    #[must_use]
322    pub fn len(&self) -> usize {
323        self.edges.len()
324    }
325
326    /// Returns true if the store contains no edges.
327    #[must_use]
328    pub fn is_empty(&self) -> bool {
329        self.edges.is_empty()
330    }
331
332    /// Returns all edges in the store.
333    #[must_use]
334    pub fn all_edges(&self) -> Vec<&GraphEdge> {
335        self.edges.values().collect()
336    }
337
338    /// Removes an edge by ID.
339    ///
340    /// Cleans up all indices: outgoing, incoming, by_label, and outgoing_by_label.
341    pub fn remove_edge(&mut self, edge_id: u64) {
342        if let Some(edge) = self.edges.remove(&edge_id) {
343            let source = edge.source();
344            self.purge_outgoing_index(edge_id, source);
345            self.purge_incoming_index(edge_id, edge.target());
346            self.purge_label_indices(edge_id, source, edge.label());
347        }
348    }
349
350    /// Removes an edge by ID, only cleaning the outgoing index.
351    ///
352    /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
353    /// Also cleans up label indices since they are maintained by source shard.
354    pub fn remove_edge_outgoing_only(&mut self, edge_id: u64) {
355        if let Some(edge) = self.edges.remove(&edge_id) {
356            let source = edge.source();
357            self.purge_outgoing_index(edge_id, source);
358            self.purge_label_indices(edge_id, source, edge.label());
359        }
360    }
361
362    /// Removes an edge by ID, only cleaning the incoming index.
363    ///
364    /// Used by `ConcurrentEdgeStore` for cross-shard cleanup.
365    pub fn remove_edge_incoming_only(&mut self, edge_id: u64) {
366        if let Some(edge) = self.edges.remove(&edge_id) {
367            self.purge_incoming_index(edge_id, edge.target());
368        }
369    }
370
371    /// Removes all edges connected to a node (cascade delete).
372    ///
373    /// Removes both outgoing and incoming edges, cleaning up all indices
374    /// including label indices (EPIC-019 US-003).
375    pub fn remove_node_edges(&mut self, node_id: u64) {
376        // Collect edge IDs to remove (outgoing)
377        let outgoing_ids: Vec<u64> = self.outgoing.remove(&node_id).unwrap_or_default();
378
379        // Collect edge IDs to remove (incoming)
380        let incoming_ids: Vec<u64> = self.incoming.remove(&node_id).unwrap_or_default();
381
382        // Remove outgoing edges: clean incoming + label indices for each
383        for edge_id in outgoing_ids {
384            if let Some(edge) = self.edges.remove(&edge_id) {
385                self.purge_incoming_index(edge_id, edge.target());
386                self.purge_label_indices(edge_id, node_id, edge.label());
387            }
388        }
389
390        // Remove incoming edges: clean outgoing + label indices for each
391        for edge_id in incoming_ids {
392            if let Some(edge) = self.edges.remove(&edge_id) {
393                let source = edge.source();
394                self.purge_outgoing_index(edge_id, source);
395                self.purge_label_indices(edge_id, source, edge.label());
396            }
397        }
398    }
399
400    /// Removes `edge_id` from the incoming index of `target_node`.
401    #[inline]
402    fn purge_incoming_index(&mut self, edge_id: u64, target_node: u64) {
403        if let Some(ids) = self.incoming.get_mut(&target_node) {
404            ids.retain(|&id| id != edge_id);
405        }
406    }
407
408    /// Removes `edge_id` from the outgoing index of `source_node`.
409    #[inline]
410    fn purge_outgoing_index(&mut self, edge_id: u64, source_node: u64) {
411        if let Some(ids) = self.outgoing.get_mut(&source_node) {
412            ids.retain(|&id| id != edge_id);
413        }
414    }
415
416    /// Removes `edge_id` from the `by_label` and `outgoing_by_label` indices (US-003).
417    #[inline]
418    fn purge_label_indices(&mut self, edge_id: u64, source_node: u64, label: &str) {
419        if let Some(ids) = self.by_label.get_mut(label) {
420            ids.retain(|&id| id != edge_id);
421        }
422        if let Some(ids) = self
423            .outgoing_by_label
424            .get_mut(&(source_node, label.to_string()))
425        {
426            ids.retain(|&id| id != edge_id);
427        }
428    }
429}
430
// Opts `EdgeStore` into the shared postcard persistence helpers; the empty
// body means the trait's provided method implementations are used as-is.
impl PostcardPersistence for EdgeStore {}
432
433// Inherent persistence methods that delegate to `PostcardPersistence`.
434// Required so callers (e.g., `lifecycle.rs`) can use `EdgeStore::load_from_file`
435// without importing the trait.
436impl EdgeStore {
437    /// Serializes the edge store to bytes using `postcard`.
438    ///
439    /// # Errors
440    /// Returns an error if serialization fails.
441    pub fn to_bytes(&self) -> std::result::Result<Vec<u8>, postcard::Error> {
442        <Self as PostcardPersistence>::to_bytes(self)
443    }
444
445    /// Deserializes an edge store from bytes.
446    ///
447    /// # Errors
448    /// Returns an error if deserialization fails (e.g., corrupted data).
449    pub fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, postcard::Error> {
450        <Self as PostcardPersistence>::from_bytes(bytes)
451    }
452
453    /// Saves the edge store to a file.
454    ///
455    /// # Errors
456    /// Returns an error if serialization or file I/O fails.
457    pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
458        <Self as PostcardPersistence>::save_to_file(self, path)
459    }
460
461    /// Loads an edge store from a file.
462    ///
463    /// # Errors
464    /// Returns an error if file I/O or deserialization fails.
465    pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
466        <Self as PostcardPersistence>::load_from_file(path)
467    }
468}