Skip to main content

manifoldb_graph/store/
edge.rs

1//! Edge (relationship) storage operations.
2//!
3//! This module provides CRUD operations for edges in the graph.
4
5use std::ops::Bound;
6
7use manifoldb_core::encoding::keys::{
8    decode_edge_key, encode_edge_key, encode_edge_type_index_prefix, PREFIX_EDGE,
9};
10use manifoldb_core::encoding::{Decoder, Encoder};
11use manifoldb_core::{Edge, EdgeId, EdgeType, EntityId};
12use manifoldb_storage::{Cursor, Transaction};
13
14use super::error::{GraphError, GraphResult};
15use super::node::NodeStore;
16use super::IdGenerator;
17use crate::index::{AdjacencyIndex, IndexMaintenance};
18
19/// Table name for edge data.
20pub const TABLE_EDGES: &str = "edges";
21
22/// Table name for edges indexed by source entity.
23pub const TABLE_EDGES_BY_SOURCE: &str = "edges_by_source";
24
25/// Table name for edges indexed by target entity.
26pub const TABLE_EDGES_BY_TARGET: &str = "edges_by_target";
27
28/// Table name for edge type index.
29pub const TABLE_EDGE_TYPES: &str = "edge_types";
30
31/// Edge storage operations.
32///
33/// `EdgeStore` provides transactional CRUD operations for graph edges.
34/// All operations work within a transaction context for ACID guarantees.
35///
36/// # Indexes
37///
38/// Edges are indexed by:
39/// - Source entity (for outgoing edge queries)
40/// - Target entity (for incoming edge queries)
41/// - Edge type (for type-based filtering)
42///
43/// # Example
44///
45/// ```ignore
46/// use manifoldb_graph::store::{EdgeStore, NodeStore, IdGenerator};
47///
48/// // Create an edge between two nodes
49/// let gen = IdGenerator::new();
50/// let edge = EdgeStore::create(&mut tx, &gen, source_id, target_id, "FOLLOWS", |id| {
51///     Edge::new(id, source_id, target_id, "FOLLOWS")
52///         .with_property("since", "2024-01-01")
53/// })?;
54///
55/// // Find all outgoing edges from a node
56/// let outgoing = EdgeStore::get_outgoing(&tx, source_id)?;
57/// ```
58pub struct EdgeStore;
59
60impl EdgeStore {
61    /// Create a new edge in the store.
62    ///
63    /// # Arguments
64    ///
65    /// * `tx` - The transaction to use
66    /// * `id_gen` - The ID generator
67    /// * `source` - The source entity ID
68    /// * `target` - The target entity ID
69    /// * `edge_type` - The edge type
70    /// * `builder` - A function that builds the edge given an ID
71    ///
72    /// # Returns
73    ///
74    /// The created edge with its assigned ID.
75    ///
76    /// # Errors
77    ///
78    /// Returns [`GraphError::InvalidEntityReference`] if source or target doesn't exist.
79    pub fn create<T: Transaction, F>(
80        tx: &mut T,
81        id_gen: &IdGenerator,
82        source: EntityId,
83        target: EntityId,
84        edge_type: impl Into<EdgeType>,
85        builder: F,
86    ) -> GraphResult<Edge>
87    where
88        F: FnOnce(EdgeId) -> Edge,
89    {
90        // Verify source and target exist
91        if !NodeStore::exists(tx, source)? {
92            return Err(GraphError::InvalidEntityReference(source));
93        }
94        if !NodeStore::exists(tx, target)? {
95            return Err(GraphError::InvalidEntityReference(target));
96        }
97
98        let id = id_gen.next_edge_id();
99        let _edge_type = edge_type.into();
100        let edge = builder(id);
101
102        Self::store_edge(tx, &edge)?;
103        Ok(edge)
104    }
105
106    /// Create an edge with a specific ID.
107    ///
108    /// This is useful when importing data or when you need to control IDs.
109    ///
110    /// # Arguments
111    ///
112    /// * `tx` - The transaction to use
113    /// * `edge` - The edge to store (must have a valid ID)
114    /// * `validate_refs` - Whether to validate that source/target entities exist
115    ///
116    /// # Errors
117    ///
118    /// Returns [`GraphError::EdgeAlreadyExists`] if an edge with this ID exists.
119    /// Returns [`GraphError::InvalidEntityReference`] if validation is enabled and entities don't exist.
120    pub fn create_with_id<T: Transaction>(
121        tx: &mut T,
122        edge: &Edge,
123        validate_refs: bool,
124    ) -> GraphResult<()> {
125        let key = encode_edge_key(edge.id);
126
127        // Check if edge already exists
128        if tx.get(TABLE_EDGES, &key)?.is_some() {
129            return Err(GraphError::EdgeAlreadyExists(edge.id));
130        }
131
132        if validate_refs {
133            if !NodeStore::exists(tx, edge.source)? {
134                return Err(GraphError::InvalidEntityReference(edge.source));
135            }
136            if !NodeStore::exists(tx, edge.target)? {
137                return Err(GraphError::InvalidEntityReference(edge.target));
138            }
139        }
140
141        Self::store_edge(tx, edge)?;
142        Ok(())
143    }
144
145    /// Internal helper to store an edge and its indexes.
146    fn store_edge<T: Transaction>(tx: &mut T, edge: &Edge) -> GraphResult<()> {
147        // Store the edge data
148        let key = encode_edge_key(edge.id);
149        let value = edge.encode()?;
150        tx.put(TABLE_EDGES, &key, &value)?;
151
152        // Add all indexes using IndexMaintenance
153        IndexMaintenance::add_edge_indexes(tx, edge)?;
154
155        Ok(())
156    }
157
158    /// Get an edge by ID.
159    ///
160    /// # Arguments
161    ///
162    /// * `tx` - The transaction to use
163    /// * `id` - The edge ID to look up
164    ///
165    /// # Returns
166    ///
167    /// The edge if found, or `None` if it doesn't exist.
168    pub fn get<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<Option<Edge>> {
169        let key = encode_edge_key(id);
170        match tx.get(TABLE_EDGES, &key)? {
171            Some(value) => {
172                let edge = Edge::decode(&value)?;
173                Ok(Some(edge))
174            }
175            None => Ok(None),
176        }
177    }
178
179    /// Get an edge by ID, returning an error if not found.
180    ///
181    /// # Arguments
182    ///
183    /// * `tx` - The transaction to use
184    /// * `id` - The edge ID to look up
185    ///
186    /// # Errors
187    ///
188    /// Returns [`GraphError::EdgeNotFound`] if the edge doesn't exist.
189    pub fn get_or_error<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<Edge> {
190        Self::get(tx, id)?.ok_or(GraphError::EdgeNotFound(id))
191    }
192
193    /// Check if an edge exists.
194    ///
195    /// # Arguments
196    ///
197    /// * `tx` - The transaction to use
198    /// * `id` - The edge ID to check
199    pub fn exists<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<bool> {
200        let key = encode_edge_key(id);
201        Ok(tx.get(TABLE_EDGES, &key)?.is_some())
202    }
203
204    /// Update an existing edge.
205    ///
206    /// Note: The source, target, and edge type cannot be changed.
207    /// To change these, delete the edge and create a new one.
208    ///
209    /// # Arguments
210    ///
211    /// * `tx` - The transaction to use
212    /// * `edge` - The edge with updated data
213    ///
214    /// # Errors
215    ///
216    /// Returns [`GraphError::EdgeNotFound`] if the edge doesn't exist.
217    pub fn update<T: Transaction>(tx: &mut T, edge: &Edge) -> GraphResult<()> {
218        let key = encode_edge_key(edge.id);
219
220        // Get the old edge to verify it exists and check if structure changed
221        let old_value = tx.get(TABLE_EDGES, &key)?.ok_or(GraphError::EdgeNotFound(edge.id))?;
222        let old_edge = Edge::decode(&old_value)?;
223
224        // Update indexes if edge structure changed
225        IndexMaintenance::update_edge_indexes(tx, &old_edge, edge)?;
226
227        // Store updated edge
228        let value = edge.encode()?;
229        tx.put(TABLE_EDGES, &key, &value)?;
230
231        Ok(())
232    }
233
234    /// Delete an edge by ID.
235    ///
236    /// # Arguments
237    ///
238    /// * `tx` - The transaction to use
239    /// * `id` - The edge ID to delete
240    ///
241    /// # Returns
242    ///
243    /// `true` if the edge was deleted, `false` if it didn't exist.
244    pub fn delete<T: Transaction>(tx: &mut T, id: EdgeId) -> GraphResult<bool> {
245        let key = encode_edge_key(id);
246
247        // Get the edge to clean up indexes
248        let Some(value) = tx.get(TABLE_EDGES, &key)? else {
249            return Ok(false);
250        };
251        let edge = Edge::decode(&value)?;
252
253        // Remove all indexes using IndexMaintenance
254        IndexMaintenance::remove_edge_indexes(tx, &edge)?;
255
256        // Delete the edge
257        tx.delete(TABLE_EDGES, &key)?;
258        Ok(true)
259    }
260
261    /// Get all outgoing edges from an entity.
262    ///
263    /// # Arguments
264    ///
265    /// * `tx` - The transaction to use
266    /// * `source` - The source entity ID
267    pub fn get_outgoing<T: Transaction>(tx: &T, source: EntityId) -> GraphResult<Vec<Edge>> {
268        let edge_ids = AdjacencyIndex::get_outgoing_edge_ids(tx, source)?;
269        Self::get_edges_by_ids(tx, &edge_ids)
270    }
271
272    /// Get IDs of all outgoing edges from an entity.
273    ///
274    /// # Arguments
275    ///
276    /// * `tx` - The transaction to use
277    /// * `source` - The source entity ID
278    pub fn get_outgoing_ids<T: Transaction>(tx: &T, source: EntityId) -> GraphResult<Vec<EdgeId>> {
279        AdjacencyIndex::get_outgoing_edge_ids(tx, source)
280    }
281
282    /// Get outgoing edges of a specific type from an entity.
283    ///
284    /// # Arguments
285    ///
286    /// * `tx` - The transaction to use
287    /// * `source` - The source entity ID
288    /// * `edge_type` - The edge type to filter by
289    pub fn get_outgoing_by_type<T: Transaction>(
290        tx: &T,
291        source: EntityId,
292        edge_type: &EdgeType,
293    ) -> GraphResult<Vec<Edge>> {
294        let edge_ids = AdjacencyIndex::get_outgoing_by_type(tx, source, edge_type)?;
295        Self::get_edges_by_ids(tx, &edge_ids)
296    }
297
298    /// Get all incoming edges to an entity.
299    ///
300    /// # Arguments
301    ///
302    /// * `tx` - The transaction to use
303    /// * `target` - The target entity ID
304    pub fn get_incoming<T: Transaction>(tx: &T, target: EntityId) -> GraphResult<Vec<Edge>> {
305        let edge_ids = AdjacencyIndex::get_incoming_edge_ids(tx, target)?;
306        Self::get_edges_by_ids(tx, &edge_ids)
307    }
308
309    /// Get IDs of all incoming edges to an entity.
310    ///
311    /// # Arguments
312    ///
313    /// * `tx` - The transaction to use
314    /// * `target` - The target entity ID
315    pub fn get_incoming_ids<T: Transaction>(tx: &T, target: EntityId) -> GraphResult<Vec<EdgeId>> {
316        AdjacencyIndex::get_incoming_edge_ids(tx, target)
317    }
318
319    /// Get incoming edges of a specific type to an entity.
320    ///
321    /// # Arguments
322    ///
323    /// * `tx` - The transaction to use
324    /// * `target` - The target entity ID
325    /// * `edge_type` - The edge type to filter by
326    pub fn get_incoming_by_type<T: Transaction>(
327        tx: &T,
328        target: EntityId,
329        edge_type: &EdgeType,
330    ) -> GraphResult<Vec<Edge>> {
331        let edge_ids = AdjacencyIndex::get_incoming_by_type(tx, target, edge_type)?;
332        Self::get_edges_by_ids(tx, &edge_ids)
333    }
334
335    /// Find all edges of a specific type.
336    ///
337    /// # Arguments
338    ///
339    /// * `tx` - The transaction to use
340    /// * `edge_type` - The edge type to search for
341    pub fn find_by_type<T: Transaction>(tx: &T, edge_type: &EdgeType) -> GraphResult<Vec<EdgeId>> {
342        let prefix = encode_edge_type_index_prefix(edge_type);
343
344        // Create end bound
345        let mut end_prefix = prefix.clone();
346        if let Some(last) = end_prefix.last_mut() {
347            *last = last.saturating_add(1);
348        }
349
350        let mut cursor = tx.range(
351            TABLE_EDGE_TYPES,
352            Bound::Included(prefix.as_slice()),
353            Bound::Excluded(end_prefix.as_slice()),
354        )?;
355
356        let mut ids = Vec::new();
357        while let Some((key, _)) = cursor.next()? {
358            // Extract edge ID from the key (last 8 bytes after prefix + type hash)
359            if key.len() >= 17 {
360                let id_bytes: [u8; 8] = key[9..17].try_into().map_err(|_| {
361                    GraphError::DataCorruption(format!(
362                        "malformed edge type index key: expected 8 bytes for edge ID at offset 9, got {} total bytes",
363                        key.len()
364                    ))
365                })?;
366                ids.push(EdgeId::new(u64::from_be_bytes(id_bytes)));
367            } else {
368                return Err(GraphError::DataCorruption(format!(
369                    "malformed edge type index key: expected at least 17 bytes, got {}",
370                    key.len()
371                )));
372            }
373        }
374
375        Ok(ids)
376    }
377
378    /// Delete all edges connected to an entity.
379    ///
380    /// This removes all incoming and outgoing edges. Call this before
381    /// deleting an entity to maintain graph consistency.
382    ///
383    /// # Arguments
384    ///
385    /// * `tx` - The transaction to use
386    /// * `entity_id` - The entity ID
387    ///
388    /// # Returns
389    ///
390    /// The number of edges deleted.
391    pub fn delete_edges_for_entity<T: Transaction>(
392        tx: &mut T,
393        entity_id: EntityId,
394    ) -> GraphResult<usize> {
395        let mut deleted = 0;
396
397        // Delete outgoing edges
398        let outgoing_ids = Self::get_outgoing_ids(tx, entity_id)?;
399        for edge_id in outgoing_ids {
400            if Self::delete(tx, edge_id)? {
401                deleted += 1;
402            }
403        }
404
405        // Delete incoming edges
406        let incoming_ids = Self::get_incoming_ids(tx, entity_id)?;
407        for edge_id in incoming_ids {
408            if Self::delete(tx, edge_id)? {
409                deleted += 1;
410            }
411        }
412
413        Ok(deleted)
414    }
415
416    /// Helper to get full edges from a list of IDs.
417    fn get_edges_by_ids<T: Transaction>(tx: &T, ids: &[EdgeId]) -> GraphResult<Vec<Edge>> {
418        let mut edges = Vec::with_capacity(ids.len());
419        for &id in ids {
420            if let Some(edge) = Self::get(tx, id)? {
421                edges.push(edge);
422            }
423        }
424        Ok(edges)
425    }
426
427    /// Count all edges in the store.
428    ///
429    /// # Arguments
430    ///
431    /// * `tx` - The transaction to use
432    pub fn count<T: Transaction>(tx: &T) -> GraphResult<usize> {
433        let start = [PREFIX_EDGE];
434        let end = [PREFIX_EDGE + 1];
435
436        let mut cursor = tx.range(
437            TABLE_EDGES,
438            Bound::Included(start.as_slice()),
439            Bound::Excluded(end.as_slice()),
440        )?;
441
442        let mut count = 0;
443        while cursor.next()?.is_some() {
444            count += 1;
445        }
446
447        Ok(count)
448    }
449
450    /// Iterate over all edges.
451    ///
452    /// # Arguments
453    ///
454    /// * `tx` - The transaction to use
455    /// * `f` - A function to call for each edge. Return `false` to stop iteration.
456    pub fn for_each<T: Transaction, F>(tx: &T, mut f: F) -> GraphResult<()>
457    where
458        F: FnMut(&Edge) -> bool,
459    {
460        let start = [PREFIX_EDGE];
461        let end = [PREFIX_EDGE + 1];
462
463        let mut cursor = tx.range(
464            TABLE_EDGES,
465            Bound::Included(start.as_slice()),
466            Bound::Excluded(end.as_slice()),
467        )?;
468
469        while let Some((_, value)) = cursor.next()? {
470            let edge = Edge::decode(&value)?;
471            if !f(&edge) {
472                break;
473            }
474        }
475
476        Ok(())
477    }
478
479    /// Get all edges as a vector.
480    ///
481    /// Use with caution on large datasets - prefer [`Self::for_each`] for
482    /// processing edges without loading all into memory.
483    ///
484    /// # Arguments
485    ///
486    /// * `tx` - The transaction to use
487    pub fn all<T: Transaction>(tx: &T) -> GraphResult<Vec<Edge>> {
488        let mut edges = Vec::new();
489        Self::for_each(tx, |edge| {
490            edges.push(edge.clone());
491            true
492        })?;
493        Ok(edges)
494    }
495
496    /// Find the highest edge ID in the store.
497    ///
498    /// This is useful for initializing the ID generator after loading data.
499    ///
500    /// # Arguments
501    ///
502    /// * `tx` - The transaction to use
503    ///
504    /// # Returns
505    ///
506    /// The highest edge ID, or `None` if there are no edges.
507    pub fn max_id<T: Transaction>(tx: &T) -> GraphResult<Option<EdgeId>> {
508        let start = [PREFIX_EDGE];
509        let end = [PREFIX_EDGE + 1];
510
511        let mut cursor = tx.range(
512            TABLE_EDGES,
513            Bound::Included(start.as_slice()),
514            Bound::Excluded(end.as_slice()),
515        )?;
516
517        // Seek to the last key in the range
518        if cursor.seek_last()?.is_some() {
519            if let Some((key, _)) = cursor.current().map(|(k, v)| (k.to_vec(), v.to_vec())) {
520                return Ok(decode_edge_key(&key));
521            }
522        }
523
524        Ok(None)
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531
532    // Note: Integration tests with actual storage backend are in the tests/ directory
533
534    #[test]
535    fn table_names_are_valid() {
536        assert!(!TABLE_EDGES.is_empty());
537        assert!(!TABLE_EDGES_BY_SOURCE.is_empty());
538        assert!(!TABLE_EDGES_BY_TARGET.is_empty());
539        assert!(!TABLE_EDGE_TYPES.is_empty());
540    }
541
542    #[test]
543    fn table_names_are_unique() {
544        let tables = [TABLE_EDGES, TABLE_EDGES_BY_SOURCE, TABLE_EDGES_BY_TARGET, TABLE_EDGE_TYPES];
545        for (i, a) in tables.iter().enumerate() {
546            for (j, b) in tables.iter().enumerate() {
547                if i != j {
548                    assert_ne!(a, b);
549                }
550            }
551        }
552    }
553}