manifoldb_graph/store/edge.rs
1//! Edge (relationship) storage operations.
2//!
3//! This module provides CRUD operations for edges in the graph.
4
5use std::ops::Bound;
6
7use manifoldb_core::encoding::keys::{
8 decode_edge_key, encode_edge_key, encode_edge_type_index_prefix, PREFIX_EDGE,
9};
10use manifoldb_core::encoding::{Decoder, Encoder};
11use manifoldb_core::{Edge, EdgeId, EdgeType, EntityId};
12use manifoldb_storage::{Cursor, Transaction};
13
14use super::error::{GraphError, GraphResult};
15use super::node::NodeStore;
16use super::IdGenerator;
17use crate::index::{AdjacencyIndex, IndexMaintenance};
18
/// Name of the primary table that holds encoded edge records.
pub const TABLE_EDGES: &str = "edges";

/// Name of the table for edges indexed by their source entity.
pub const TABLE_EDGES_BY_SOURCE: &str = "edges_by_source";

/// Name of the table for edges indexed by their target entity.
pub const TABLE_EDGES_BY_TARGET: &str = "edges_by_target";

/// Name of the table for the edge type index.
pub const TABLE_EDGE_TYPES: &str = "edge_types";
30
/// Stateless entry point for edge storage.
///
/// `EdgeStore` groups the CRUD and query operations for graph edges as
/// associated functions. Every operation receives the transaction it runs in
/// as an explicit argument, so all reads and writes happen inside that
/// transaction.
///
/// # Indexes
///
/// Alongside the edge record itself, edges are indexed by:
/// - Source entity (for outgoing edge queries)
/// - Target entity (for incoming edge queries)
/// - Edge type (for type-based filtering)
///
/// # Example
///
/// ```ignore
/// use manifoldb_graph::store::{EdgeStore, NodeStore, IdGenerator};
///
/// // Link two existing nodes with a "FOLLOWS" edge.
/// let id_gen = IdGenerator::new();
/// let edge = EdgeStore::create(&mut tx, &id_gen, source_id, target_id, "FOLLOWS", |id| {
///     Edge::new(id, source_id, target_id, "FOLLOWS")
///         .with_property("since", "2024-01-01")
/// })?;
///
/// // List every edge that leaves `source_id`.
/// let outgoing = EdgeStore::get_outgoing(&tx, source_id)?;
/// ```
pub struct EdgeStore;
59
60impl EdgeStore {
61 /// Create a new edge in the store.
62 ///
63 /// # Arguments
64 ///
65 /// * `tx` - The transaction to use
66 /// * `id_gen` - The ID generator
67 /// * `source` - The source entity ID
68 /// * `target` - The target entity ID
69 /// * `edge_type` - The edge type
70 /// * `builder` - A function that builds the edge given an ID
71 ///
72 /// # Returns
73 ///
74 /// The created edge with its assigned ID.
75 ///
76 /// # Errors
77 ///
78 /// Returns [`GraphError::InvalidEntityReference`] if source or target doesn't exist.
79 pub fn create<T: Transaction, F>(
80 tx: &mut T,
81 id_gen: &IdGenerator,
82 source: EntityId,
83 target: EntityId,
84 edge_type: impl Into<EdgeType>,
85 builder: F,
86 ) -> GraphResult<Edge>
87 where
88 F: FnOnce(EdgeId) -> Edge,
89 {
90 // Verify source and target exist
91 if !NodeStore::exists(tx, source)? {
92 return Err(GraphError::InvalidEntityReference(source));
93 }
94 if !NodeStore::exists(tx, target)? {
95 return Err(GraphError::InvalidEntityReference(target));
96 }
97
98 let id = id_gen.next_edge_id();
99 let _edge_type = edge_type.into();
100 let edge = builder(id);
101
102 Self::store_edge(tx, &edge)?;
103 Ok(edge)
104 }
105
106 /// Create an edge with a specific ID.
107 ///
108 /// This is useful when importing data or when you need to control IDs.
109 ///
110 /// # Arguments
111 ///
112 /// * `tx` - The transaction to use
113 /// * `edge` - The edge to store (must have a valid ID)
114 /// * `validate_refs` - Whether to validate that source/target entities exist
115 ///
116 /// # Errors
117 ///
118 /// Returns [`GraphError::EdgeAlreadyExists`] if an edge with this ID exists.
119 /// Returns [`GraphError::InvalidEntityReference`] if validation is enabled and entities don't exist.
120 pub fn create_with_id<T: Transaction>(
121 tx: &mut T,
122 edge: &Edge,
123 validate_refs: bool,
124 ) -> GraphResult<()> {
125 let key = encode_edge_key(edge.id);
126
127 // Check if edge already exists
128 if tx.get(TABLE_EDGES, &key)?.is_some() {
129 return Err(GraphError::EdgeAlreadyExists(edge.id));
130 }
131
132 if validate_refs {
133 if !NodeStore::exists(tx, edge.source)? {
134 return Err(GraphError::InvalidEntityReference(edge.source));
135 }
136 if !NodeStore::exists(tx, edge.target)? {
137 return Err(GraphError::InvalidEntityReference(edge.target));
138 }
139 }
140
141 Self::store_edge(tx, edge)?;
142 Ok(())
143 }
144
145 /// Internal helper to store an edge and its indexes.
146 fn store_edge<T: Transaction>(tx: &mut T, edge: &Edge) -> GraphResult<()> {
147 // Store the edge data
148 let key = encode_edge_key(edge.id);
149 let value = edge.encode()?;
150 tx.put(TABLE_EDGES, &key, &value)?;
151
152 // Add all indexes using IndexMaintenance
153 IndexMaintenance::add_edge_indexes(tx, edge)?;
154
155 Ok(())
156 }
157
158 /// Get an edge by ID.
159 ///
160 /// # Arguments
161 ///
162 /// * `tx` - The transaction to use
163 /// * `id` - The edge ID to look up
164 ///
165 /// # Returns
166 ///
167 /// The edge if found, or `None` if it doesn't exist.
168 pub fn get<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<Option<Edge>> {
169 let key = encode_edge_key(id);
170 match tx.get(TABLE_EDGES, &key)? {
171 Some(value) => {
172 let edge = Edge::decode(&value)?;
173 Ok(Some(edge))
174 }
175 None => Ok(None),
176 }
177 }
178
179 /// Get an edge by ID, returning an error if not found.
180 ///
181 /// # Arguments
182 ///
183 /// * `tx` - The transaction to use
184 /// * `id` - The edge ID to look up
185 ///
186 /// # Errors
187 ///
188 /// Returns [`GraphError::EdgeNotFound`] if the edge doesn't exist.
189 pub fn get_or_error<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<Edge> {
190 Self::get(tx, id)?.ok_or(GraphError::EdgeNotFound(id))
191 }
192
193 /// Check if an edge exists.
194 ///
195 /// # Arguments
196 ///
197 /// * `tx` - The transaction to use
198 /// * `id` - The edge ID to check
199 pub fn exists<T: Transaction>(tx: &T, id: EdgeId) -> GraphResult<bool> {
200 let key = encode_edge_key(id);
201 Ok(tx.get(TABLE_EDGES, &key)?.is_some())
202 }
203
204 /// Update an existing edge.
205 ///
206 /// Note: The source, target, and edge type cannot be changed.
207 /// To change these, delete the edge and create a new one.
208 ///
209 /// # Arguments
210 ///
211 /// * `tx` - The transaction to use
212 /// * `edge` - The edge with updated data
213 ///
214 /// # Errors
215 ///
216 /// Returns [`GraphError::EdgeNotFound`] if the edge doesn't exist.
217 pub fn update<T: Transaction>(tx: &mut T, edge: &Edge) -> GraphResult<()> {
218 let key = encode_edge_key(edge.id);
219
220 // Get the old edge to verify it exists and check if structure changed
221 let old_value = tx.get(TABLE_EDGES, &key)?.ok_or(GraphError::EdgeNotFound(edge.id))?;
222 let old_edge = Edge::decode(&old_value)?;
223
224 // Update indexes if edge structure changed
225 IndexMaintenance::update_edge_indexes(tx, &old_edge, edge)?;
226
227 // Store updated edge
228 let value = edge.encode()?;
229 tx.put(TABLE_EDGES, &key, &value)?;
230
231 Ok(())
232 }
233
234 /// Delete an edge by ID.
235 ///
236 /// # Arguments
237 ///
238 /// * `tx` - The transaction to use
239 /// * `id` - The edge ID to delete
240 ///
241 /// # Returns
242 ///
243 /// `true` if the edge was deleted, `false` if it didn't exist.
244 pub fn delete<T: Transaction>(tx: &mut T, id: EdgeId) -> GraphResult<bool> {
245 let key = encode_edge_key(id);
246
247 // Get the edge to clean up indexes
248 let Some(value) = tx.get(TABLE_EDGES, &key)? else {
249 return Ok(false);
250 };
251 let edge = Edge::decode(&value)?;
252
253 // Remove all indexes using IndexMaintenance
254 IndexMaintenance::remove_edge_indexes(tx, &edge)?;
255
256 // Delete the edge
257 tx.delete(TABLE_EDGES, &key)?;
258 Ok(true)
259 }
260
261 /// Get all outgoing edges from an entity.
262 ///
263 /// # Arguments
264 ///
265 /// * `tx` - The transaction to use
266 /// * `source` - The source entity ID
267 pub fn get_outgoing<T: Transaction>(tx: &T, source: EntityId) -> GraphResult<Vec<Edge>> {
268 let edge_ids = AdjacencyIndex::get_outgoing_edge_ids(tx, source)?;
269 Self::get_edges_by_ids(tx, &edge_ids)
270 }
271
272 /// Get IDs of all outgoing edges from an entity.
273 ///
274 /// # Arguments
275 ///
276 /// * `tx` - The transaction to use
277 /// * `source` - The source entity ID
278 pub fn get_outgoing_ids<T: Transaction>(tx: &T, source: EntityId) -> GraphResult<Vec<EdgeId>> {
279 AdjacencyIndex::get_outgoing_edge_ids(tx, source)
280 }
281
282 /// Get outgoing edges of a specific type from an entity.
283 ///
284 /// # Arguments
285 ///
286 /// * `tx` - The transaction to use
287 /// * `source` - The source entity ID
288 /// * `edge_type` - The edge type to filter by
289 pub fn get_outgoing_by_type<T: Transaction>(
290 tx: &T,
291 source: EntityId,
292 edge_type: &EdgeType,
293 ) -> GraphResult<Vec<Edge>> {
294 let edge_ids = AdjacencyIndex::get_outgoing_by_type(tx, source, edge_type)?;
295 Self::get_edges_by_ids(tx, &edge_ids)
296 }
297
298 /// Get all incoming edges to an entity.
299 ///
300 /// # Arguments
301 ///
302 /// * `tx` - The transaction to use
303 /// * `target` - The target entity ID
304 pub fn get_incoming<T: Transaction>(tx: &T, target: EntityId) -> GraphResult<Vec<Edge>> {
305 let edge_ids = AdjacencyIndex::get_incoming_edge_ids(tx, target)?;
306 Self::get_edges_by_ids(tx, &edge_ids)
307 }
308
309 /// Get IDs of all incoming edges to an entity.
310 ///
311 /// # Arguments
312 ///
313 /// * `tx` - The transaction to use
314 /// * `target` - The target entity ID
315 pub fn get_incoming_ids<T: Transaction>(tx: &T, target: EntityId) -> GraphResult<Vec<EdgeId>> {
316 AdjacencyIndex::get_incoming_edge_ids(tx, target)
317 }
318
319 /// Get incoming edges of a specific type to an entity.
320 ///
321 /// # Arguments
322 ///
323 /// * `tx` - The transaction to use
324 /// * `target` - The target entity ID
325 /// * `edge_type` - The edge type to filter by
326 pub fn get_incoming_by_type<T: Transaction>(
327 tx: &T,
328 target: EntityId,
329 edge_type: &EdgeType,
330 ) -> GraphResult<Vec<Edge>> {
331 let edge_ids = AdjacencyIndex::get_incoming_by_type(tx, target, edge_type)?;
332 Self::get_edges_by_ids(tx, &edge_ids)
333 }
334
335 /// Find all edges of a specific type.
336 ///
337 /// # Arguments
338 ///
339 /// * `tx` - The transaction to use
340 /// * `edge_type` - The edge type to search for
341 pub fn find_by_type<T: Transaction>(tx: &T, edge_type: &EdgeType) -> GraphResult<Vec<EdgeId>> {
342 let prefix = encode_edge_type_index_prefix(edge_type);
343
344 // Create end bound
345 let mut end_prefix = prefix.clone();
346 if let Some(last) = end_prefix.last_mut() {
347 *last = last.saturating_add(1);
348 }
349
350 let mut cursor = tx.range(
351 TABLE_EDGE_TYPES,
352 Bound::Included(prefix.as_slice()),
353 Bound::Excluded(end_prefix.as_slice()),
354 )?;
355
356 let mut ids = Vec::new();
357 while let Some((key, _)) = cursor.next()? {
358 // Extract edge ID from the key (last 8 bytes after prefix + type hash)
359 if key.len() >= 17 {
360 let id_bytes: [u8; 8] = key[9..17].try_into().map_err(|_| {
361 GraphError::DataCorruption(format!(
362 "malformed edge type index key: expected 8 bytes for edge ID at offset 9, got {} total bytes",
363 key.len()
364 ))
365 })?;
366 ids.push(EdgeId::new(u64::from_be_bytes(id_bytes)));
367 } else {
368 return Err(GraphError::DataCorruption(format!(
369 "malformed edge type index key: expected at least 17 bytes, got {}",
370 key.len()
371 )));
372 }
373 }
374
375 Ok(ids)
376 }
377
378 /// Delete all edges connected to an entity.
379 ///
380 /// This removes all incoming and outgoing edges. Call this before
381 /// deleting an entity to maintain graph consistency.
382 ///
383 /// # Arguments
384 ///
385 /// * `tx` - The transaction to use
386 /// * `entity_id` - The entity ID
387 ///
388 /// # Returns
389 ///
390 /// The number of edges deleted.
391 pub fn delete_edges_for_entity<T: Transaction>(
392 tx: &mut T,
393 entity_id: EntityId,
394 ) -> GraphResult<usize> {
395 let mut deleted = 0;
396
397 // Delete outgoing edges
398 let outgoing_ids = Self::get_outgoing_ids(tx, entity_id)?;
399 for edge_id in outgoing_ids {
400 if Self::delete(tx, edge_id)? {
401 deleted += 1;
402 }
403 }
404
405 // Delete incoming edges
406 let incoming_ids = Self::get_incoming_ids(tx, entity_id)?;
407 for edge_id in incoming_ids {
408 if Self::delete(tx, edge_id)? {
409 deleted += 1;
410 }
411 }
412
413 Ok(deleted)
414 }
415
416 /// Helper to get full edges from a list of IDs.
417 fn get_edges_by_ids<T: Transaction>(tx: &T, ids: &[EdgeId]) -> GraphResult<Vec<Edge>> {
418 let mut edges = Vec::with_capacity(ids.len());
419 for &id in ids {
420 if let Some(edge) = Self::get(tx, id)? {
421 edges.push(edge);
422 }
423 }
424 Ok(edges)
425 }
426
427 /// Count all edges in the store.
428 ///
429 /// # Arguments
430 ///
431 /// * `tx` - The transaction to use
432 pub fn count<T: Transaction>(tx: &T) -> GraphResult<usize> {
433 let start = [PREFIX_EDGE];
434 let end = [PREFIX_EDGE + 1];
435
436 let mut cursor = tx.range(
437 TABLE_EDGES,
438 Bound::Included(start.as_slice()),
439 Bound::Excluded(end.as_slice()),
440 )?;
441
442 let mut count = 0;
443 while cursor.next()?.is_some() {
444 count += 1;
445 }
446
447 Ok(count)
448 }
449
450 /// Iterate over all edges.
451 ///
452 /// # Arguments
453 ///
454 /// * `tx` - The transaction to use
455 /// * `f` - A function to call for each edge. Return `false` to stop iteration.
456 pub fn for_each<T: Transaction, F>(tx: &T, mut f: F) -> GraphResult<()>
457 where
458 F: FnMut(&Edge) -> bool,
459 {
460 let start = [PREFIX_EDGE];
461 let end = [PREFIX_EDGE + 1];
462
463 let mut cursor = tx.range(
464 TABLE_EDGES,
465 Bound::Included(start.as_slice()),
466 Bound::Excluded(end.as_slice()),
467 )?;
468
469 while let Some((_, value)) = cursor.next()? {
470 let edge = Edge::decode(&value)?;
471 if !f(&edge) {
472 break;
473 }
474 }
475
476 Ok(())
477 }
478
479 /// Get all edges as a vector.
480 ///
481 /// Use with caution on large datasets - prefer [`Self::for_each`] for
482 /// processing edges without loading all into memory.
483 ///
484 /// # Arguments
485 ///
486 /// * `tx` - The transaction to use
487 pub fn all<T: Transaction>(tx: &T) -> GraphResult<Vec<Edge>> {
488 let mut edges = Vec::new();
489 Self::for_each(tx, |edge| {
490 edges.push(edge.clone());
491 true
492 })?;
493 Ok(edges)
494 }
495
496 /// Find the highest edge ID in the store.
497 ///
498 /// This is useful for initializing the ID generator after loading data.
499 ///
500 /// # Arguments
501 ///
502 /// * `tx` - The transaction to use
503 ///
504 /// # Returns
505 ///
506 /// The highest edge ID, or `None` if there are no edges.
507 pub fn max_id<T: Transaction>(tx: &T) -> GraphResult<Option<EdgeId>> {
508 let start = [PREFIX_EDGE];
509 let end = [PREFIX_EDGE + 1];
510
511 let mut cursor = tx.range(
512 TABLE_EDGES,
513 Bound::Included(start.as_slice()),
514 Bound::Excluded(end.as_slice()),
515 )?;
516
517 // Seek to the last key in the range
518 if cursor.seek_last()?.is_some() {
519 if let Some((key, _)) = cursor.current().map(|(k, v)| (k.to_vec(), v.to_vec())) {
520 return Ok(decode_edge_key(&key));
521 }
522 }
523
524 Ok(None)
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    // Note: Integration tests with actual storage backend are in the tests/ directory

    /// Every table constant must be a non-empty name.
    #[test]
    fn table_names_are_valid() {
        for name in [TABLE_EDGES, TABLE_EDGES_BY_SOURCE, TABLE_EDGES_BY_TARGET, TABLE_EDGE_TYPES] {
            assert!(!name.is_empty());
        }
    }

    /// No two table constants may share a name.
    #[test]
    fn table_names_are_unique() {
        let tables = [TABLE_EDGES, TABLE_EDGES_BY_SOURCE, TABLE_EDGES_BY_TARGET, TABLE_EDGE_TYPES];
        // Compare each name only with the ones after it.
        for (index, name) in tables.iter().enumerate() {
            for other in &tables[index + 1..] {
                assert_ne!(name, other);
            }
        }
    }
}