manifold_graph/
graph.rs

1//! Graph table implementation with bidirectional edge storage.
2
3use crate::edge::{current_timestamp_nanos, Edge};
4use manifold::{
5    ReadOnlyTable, ReadTransaction, ReadableTable, ReadableTableMetadata, StorageError, Table,
6    TableDefinition, TableError, WriteTransaction,
7};
8use uuid::Uuid;
9
/// A table storing graph edges with bidirectional indexes and temporal tracking.
///
/// This table maintains two internal tables (forward and reverse) to enable
/// efficient queries for both outgoing and incoming edges. Both tables are
/// updated atomically within the same write transaction.
///
/// Forward key tuple: (source, edge_type, target)
/// Reverse key tuple: (target, edge_type, source)
/// Value tuple: (is_active, weight, created_at, deleted_at)
///
/// A `deleted_at` of `0` marks an edge that has not been soft-deleted.
pub struct GraphTable<'txn> {
    // Forward index, keyed by (source, edge_type, target).
    forward: Table<'txn, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    // Reverse index, keyed by (target, edge_type, source); carries the same value tuple.
    reverse: Table<'txn, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
}
21
22impl<'txn> GraphTable<'txn> {
23    /// Opens a graph table for writing.
24    ///
25    /// Creates two internal tables: `{name}_forward` and `{name}_reverse`.
26    pub fn open(txn: &'txn WriteTransaction, name: &str) -> Result<Self, TableError> {
27        let forward_name = format!("{name}_forward");
28        let reverse_name = format!("{name}_reverse");
29
30        let forward_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
31            TableDefinition::new(&forward_name);
32        let reverse_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
33            TableDefinition::new(&reverse_name);
34
35        let forward = txn.open_table(forward_def)?;
36        let reverse = txn.open_table(reverse_def)?;
37
38        Ok(Self { forward, reverse })
39    }
40
41    /// Adds an edge to the graph with optional timestamp.
42    ///
43    /// Updates both forward and reverse indexes atomically.
44    ///
45    /// # Arguments
46    ///
47    /// * `source` - Source vertex UUID
48    /// * `edge_type` - Edge type (e.g., "follows", "knows")
49    /// * `target` - Target vertex UUID
50    /// * `is_active` - Whether the edge is active
51    /// * `weight` - Edge weight/score
52    /// * `created_at` - Optional creation timestamp (uses current time if None)
53    pub fn add_edge(
54        &mut self,
55        source: &Uuid,
56        edge_type: &str,
57        target: &Uuid,
58        is_active: bool,
59        weight: f32,
60        created_at: Option<u64>,
61    ) -> Result<(), TableError> {
62        let timestamp = created_at.unwrap_or_else(current_timestamp_nanos);
63        let properties = (is_active, weight, timestamp, 0);
64
65        // Insert into forward table: (source, edge_type, target) -> properties
66        self.forward
67            .insert(&(*source, edge_type, *target), &properties)?;
68
69        // Insert into reverse table: (target, edge_type, source) -> properties
70        self.reverse
71            .insert(&(*target, edge_type, *source), &properties)?;
72
73        Ok(())
74    }
75
76    /// Soft deletes an edge from the graph by setting deleted_at timestamp.
77    ///
78    /// The edge remains in storage for temporal queries but is marked as deleted.
79    /// Updates both forward and reverse indexes atomically.
80    pub fn remove_edge(
81        &mut self,
82        source: &Uuid,
83        edge_type: &str,
84        target: &Uuid,
85    ) -> Result<(), StorageError> {
86        // Get existing edge to preserve created_at
87        let key = (*source, edge_type, *target);
88        let edge_data = if let Some(guard) = self.forward.get(&key)? {
89            let (is_active, weight, created_at, _) = guard.value();
90            Some((is_active, weight, created_at))
91        } else {
92            None
93        };
94
95        if let Some((is_active, weight, created_at)) = edge_data {
96            let deleted_at = current_timestamp_nanos();
97            let properties = (is_active, weight, created_at, deleted_at);
98
99            // Update forward table with deleted_at
100            self.forward.insert(&key, &properties)?;
101
102            // Update reverse table with deleted_at
103            self.reverse
104                .insert(&(*target, edge_type, *source), &properties)?;
105        }
106
107        Ok(())
108    }
109
110    /// Hard deletes an edge from the graph, removing it entirely from storage.
111    ///
112    /// This permanently removes the edge and its history. Use remove_edge() for
113    /// soft delete that preserves temporal history.
114    pub fn hard_delete_edge(
115        &mut self,
116        source: &Uuid,
117        edge_type: &str,
118        target: &Uuid,
119    ) -> Result<(), StorageError> {
120        self.forward.remove(&(*source, edge_type, *target))?;
121        self.reverse.remove(&(*target, edge_type, *source))?;
122        Ok(())
123    }
124
125    /// Updates the properties of an existing edge while preserving timestamps.
126    ///
127    /// Updates both forward and reverse indexes atomically.
128    pub fn update_edge(
129        &mut self,
130        source: &Uuid,
131        edge_type: &str,
132        target: &Uuid,
133        is_active: bool,
134        weight: f32,
135    ) -> Result<(), TableError> {
136        // Get existing edge to preserve created_at
137        let key = (*source, edge_type, *target);
138        let created_at = if let Some(guard) = self.forward.get(&key)? {
139            let (_, _, timestamp, _) = guard.value();
140            timestamp
141        } else {
142            current_timestamp_nanos()
143        };
144
145        // Use add_edge with preserved created_at
146        self.add_edge(source, edge_type, target, is_active, weight, Some(created_at))
147    }
148
149    /// Adds multiple edges to the graph in a single batch operation.
150    ///
151    /// This method leverages Manifold's bulk insertion API for improved throughput,
152    /// especially beneficial when loading large graphs. Both forward and reverse
153    /// indexes are updated atomically within the same transaction.
154    ///
155    /// # Arguments
156    ///
157    /// * `edges` - Vector of edge tuples: (source, `edge_type`, target, `is_active`, `weight`, `created_at`)
158    /// * `sorted` - Whether the input is pre-sorted by (source, `edge_type`, target).
159    ///   Set to `true` if your data is already sorted for best performance.
160    ///
161    /// # Returns
162    ///
163    /// Returns the number of edges inserted.
164    ///
165    /// # Performance
166    ///
167    /// - Sorted data (`sorted = true`): Uses optimized insertion with minimal tree rebalancing
168    /// - Unsorted data (`sorted = false`): Chunks and sorts data internally for good performance
169    /// - Batch operations benefit from WAL group commit for higher throughput
170    ///
171    /// # Example
172    ///
173    /// ```rust,no_run
174    /// # use manifold::column_family::ColumnFamilyDatabase;
175    /// # use manifold_graph::GraphTable;
176    /// # use uuid::Uuid;
177    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
178    /// # let db = ColumnFamilyDatabase::open("test.db")?;
179    /// # let cf = db.column_family_or_create("graph")?;
180    /// # let write_txn = cf.begin_write()?;
181    /// # let mut graph = GraphTable::open(&write_txn, "edges")?;
182    /// let u1 = Uuid::new_v4();
183    /// let u2 = Uuid::new_v4();
184    /// let u3 = Uuid::new_v4();
185    /// let now = 1234567890;
186    ///
187    /// let edges = vec![
188    ///     (u1, "follows", u2, true, 1.0, now),
189    ///     (u1, "follows", u3, true, 0.8, now),
190    ///     (u2, "follows", u3, true, 0.9, now),
191    /// ];
192    ///
193    /// let count = graph.add_edges_batch(&edges, false)?;
194    /// assert_eq!(count, 3);
195    /// # Ok(())
196    /// # }
197    /// ```
198    #[allow(clippy::type_complexity)]
199    pub fn add_edges_batch(
200        &mut self,
201        edges: &[(Uuid, &str, Uuid, bool, f32, u64)],
202        sorted: bool,
203    ) -> Result<usize, StorageError> {
204        // Prepare forward table items: (source, edge_type, target) -> (is_active, weight, created_at, deleted_at)
205        let forward_items: Vec<((Uuid, &str, Uuid), (bool, f32, u64, u64))> = edges
206            .iter()
207            .map(|(source, edge_type, target, is_active, weight, created_at)| {
208                (
209                    (*source, *edge_type, *target),
210                    (*is_active, *weight, *created_at, 0),
211                )
212            })
213            .collect();
214
215        // Prepare reverse table items: (target, edge_type, source) -> (is_active, weight, created_at, deleted_at)
216        let reverse_items: Vec<((Uuid, &str, Uuid), (bool, f32, u64, u64))> = edges
217            .iter()
218            .map(|(source, edge_type, target, is_active, weight, created_at)| {
219                (
220                    (*target, *edge_type, *source),
221                    (*is_active, *weight, *created_at, 0),
222                )
223            })
224            .collect();
225
226        // Note: reverse items are NOT sorted even if forward items are,
227        // so we always use sorted=false for reverse table
228        let count = self.forward.insert_bulk(forward_items, sorted)?;
229
230        self.reverse.insert_bulk(reverse_items, false)?;
231
232        Ok(count)
233    }
234
235    /// Returns the number of edges in the forward table.
236    pub fn len(&self) -> Result<u64, StorageError> {
237        self.forward.len()
238    }
239
240    /// Returns `true` if the table contains no edges.
241    pub fn is_empty(&self) -> Result<bool, StorageError> {
242        Ok(self.len()? == 0)
243    }
244}
245
/// Read-only graph table providing efficient edge traversal with temporal support.
///
/// Holds read-only handles to the same two tables that `GraphTable` writes:
/// the forward index keyed by (source, edge_type, target) and the reverse
/// index keyed by (target, edge_type, source). Values are
/// (is_active, weight, created_at, deleted_at), where `deleted_at == 0`
/// means the edge has not been soft-deleted.
pub struct GraphTableRead {
    // Forward index: serves point lookups, outgoing-edge scans and full scans.
    forward: ReadOnlyTable<(Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    // Reverse index: serves incoming-edge scans.
    reverse: ReadOnlyTable<(Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
}
251
252impl GraphTableRead {
253    /// Opens a graph table for reading.
254    pub fn open(txn: &ReadTransaction, name: &str) -> Result<Self, StorageError> {
255        let forward_name = format!("{name}_forward");
256        let reverse_name = format!("{name}_reverse");
257
258        let forward_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
259            TableDefinition::new(&forward_name);
260        let reverse_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
261            TableDefinition::new(&reverse_name);
262
263        let forward = txn.open_table(forward_def).map_err(|e| match e {
264            TableError::Storage(s) => s,
265            _ => StorageError::Io(std::io::Error::other(e)),
266        })?;
267
268        let reverse = txn.open_table(reverse_def).map_err(|e| match e {
269            TableError::Storage(s) => s,
270            _ => StorageError::Io(std::io::Error::other(e)),
271        })?;
272
273        Ok(Self { forward, reverse })
274    }
275
276    /// Retrieves a specific edge by source, edge_type, and target.
277    ///
278    /// Returns None if the edge doesn't exist or has been soft-deleted.
279    /// Use get_edge_at() for temporal queries.
280    pub fn get_edge(
281        &self,
282        source: &Uuid,
283        edge_type: &str,
284        target: &Uuid,
285    ) -> Result<Option<Edge>, StorageError> {
286        Ok(self
287            .forward
288            .get(&(*source, edge_type, *target))?
289            .and_then(|guard| {
290                let (is_active, weight, created_at, deleted_at) = guard.value();
291                // Only return if not deleted
292                if deleted_at == 0 {
293                    Some(Edge::with_timestamps(
294                        *source,
295                        edge_type,
296                        *target,
297                        is_active,
298                        weight,
299                        created_at,
300                        deleted_at,
301                    ))
302                } else {
303                    None
304                }
305            }))
306    }
307
308    /// Retrieves a specific edge at a given timestamp.
309    ///
310    /// Returns the edge if it existed at the specified timestamp (created_at <= timestamp
311    /// and either not deleted or deleted_at > timestamp).
312    pub fn get_edge_at(
313        &self,
314        source: &Uuid,
315        edge_type: &str,
316        target: &Uuid,
317        timestamp: u64,
318    ) -> Result<Option<Edge>, StorageError> {
319        Ok(self
320            .forward
321            .get(&(*source, edge_type, *target))?
322            .and_then(|guard| {
323                let (is_active, weight, created_at, deleted_at) = guard.value();
324                let edge = Edge::with_timestamps(
325                    *source,
326                    edge_type,
327                    *target,
328                    is_active,
329                    weight,
330                    created_at,
331                    deleted_at,
332                );
333
334                if edge.is_active_at(timestamp) {
335                    Some(edge)
336                } else {
337                    None
338                }
339            }))
340    }
341
342    /// Returns an iterator over all outgoing edges from the given source vertex.
343    ///
344    /// By default, excludes soft-deleted edges. Use outgoing_edges_with_deleted() to include them.
345    pub fn outgoing_edges(&self, source: &Uuid) -> Result<OutgoingEdgeIter<'_>, StorageError> {
346        // Range from (source, "", nil_uuid) to (source, max_str, max_uuid)
347        let start = (*source, "", Uuid::nil());
348        let end = (*source, "\u{FFFF}", Uuid::max());
349
350        Ok(OutgoingEdgeIter {
351            inner: self.forward.range(start..end)?,
352            include_deleted: false,
353        })
354    }
355
356    /// Returns an iterator over all outgoing edges including soft-deleted ones.
357    pub fn outgoing_edges_with_deleted(
358        &self,
359        source: &Uuid,
360    ) -> Result<OutgoingEdgeIter<'_>, StorageError> {
361        let start = (*source, "", Uuid::nil());
362        let end = (*source, "\u{FFFF}", Uuid::max());
363
364        Ok(OutgoingEdgeIter {
365            inner: self.forward.range(start..end)?,
366            include_deleted: true,
367        })
368    }
369
370    /// Returns an iterator over all incoming edges to the given target vertex.
371    ///
372    /// By default, excludes soft-deleted edges. Use incoming_edges_with_deleted() to include them.
373    pub fn incoming_edges(&self, target: &Uuid) -> Result<IncomingEdgeIter<'_>, StorageError> {
374        // Range from (target, "", nil_uuid) to (target, max_str, max_uuid)
375        let start = (*target, "", Uuid::nil());
376        let end = (*target, "\u{FFFF}", Uuid::max());
377
378        Ok(IncomingEdgeIter {
379            inner: self.reverse.range(start..end)?,
380            include_deleted: false,
381        })
382    }
383
384    /// Returns an iterator over all incoming edges including soft-deleted ones.
385    pub fn incoming_edges_with_deleted(
386        &self,
387        target: &Uuid,
388    ) -> Result<IncomingEdgeIter<'_>, StorageError> {
389        let start = (*target, "", Uuid::nil());
390        let end = (*target, "\u{FFFF}", Uuid::max());
391
392        Ok(IncomingEdgeIter {
393            inner: self.reverse.range(start..end)?,
394            include_deleted: true,
395        })
396    }
397
398    /// Returns an iterator over all edges in the graph.
399    ///
400    /// By default, excludes soft-deleted edges. Use all_edges_with_deleted() to include them.
401    pub fn all_edges(&self) -> Result<AllEdgesIter<'_>, StorageError> {
402        Ok(AllEdgesIter {
403            inner: self.forward.iter()?,
404            include_deleted: false,
405        })
406    }
407
408    /// Returns an iterator over all edges including soft-deleted ones.
409    pub fn all_edges_with_deleted(&self) -> Result<AllEdgesIter<'_>, StorageError> {
410        Ok(AllEdgesIter {
411            inner: self.forward.iter()?,
412            include_deleted: true,
413        })
414    }
415
416    /// Returns the number of edges stored in this table.
417    pub fn len(&self) -> Result<u64, StorageError> {
418        self.forward.len()
419    }
420
421    /// Returns `true` if the table contains no edges.
422    pub fn is_empty(&self) -> Result<bool, StorageError> {
423        Ok(self.len()? == 0)
424    }
425}
426
/// Iterator over outgoing edges from a source vertex.
///
/// Created by `GraphTableRead::outgoing_edges()` (soft-deleted edges skipped)
/// or `GraphTableRead::outgoing_edges_with_deleted()` (soft-deleted edges included).
pub struct OutgoingEdgeIter<'a> {
    // Range scan over the forward table; keys are (source, edge_type, target).
    inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    // When false, entries with a non-zero deleted_at are skipped.
    include_deleted: bool,
}
434
435impl Iterator for OutgoingEdgeIter<'_> {
436    type Item = Result<Edge, StorageError>;
437
438    fn next(&mut self) -> Option<Self::Item> {
439        loop {
440            let result = self.inner.next()?;
441
442            match result {
443                Ok((key_guard, value_guard)) => {
444                    let (source, edge_type, target) = key_guard.value();
445                    let (is_active, weight, created_at, deleted_at) = value_guard.value();
446
447                    // Skip deleted edges unless include_deleted is true
448                    if !self.include_deleted && deleted_at != 0 {
449                        continue;
450                    }
451
452                    return Some(Ok(Edge::with_timestamps(
453                        source,
454                        edge_type,
455                        target,
456                        is_active,
457                        weight,
458                        created_at,
459                        deleted_at,
460                    )));
461                }
462                Err(e) => return Some(Err(e)),
463            }
464        }
465    }
466}
467
/// Iterator over all edges in the graph.
///
/// Created by `GraphTableRead::all_edges()` (soft-deleted edges skipped)
/// or `GraphTableRead::all_edges_with_deleted()` (soft-deleted edges included).
pub struct AllEdgesIter<'a> {
    // Full scan over the forward table; keys are (source, edge_type, target).
    inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    // When false, entries with a non-zero deleted_at are skipped.
    include_deleted: bool,
}
475
476impl Iterator for AllEdgesIter<'_> {
477    type Item = Result<Edge, StorageError>;
478
479    fn next(&mut self) -> Option<Self::Item> {
480        loop {
481            let result = self.inner.next()?;
482
483            match result {
484                Ok((key_guard, value_guard)) => {
485                    let (source, edge_type, target) = key_guard.value();
486                    let (is_active, weight, created_at, deleted_at) = value_guard.value();
487
488                    // Skip deleted edges unless include_deleted is true
489                    if !self.include_deleted && deleted_at != 0 {
490                        continue;
491                    }
492
493                    return Some(Ok(Edge::with_timestamps(
494                        source,
495                        edge_type,
496                        target,
497                        is_active,
498                        weight,
499                        created_at,
500                        deleted_at,
501                    )));
502                }
503                Err(e) => return Some(Err(e)),
504            }
505        }
506    }
507}
508
/// Iterator over incoming edges to a target vertex.
///
/// Created by `GraphTableRead::incoming_edges()` (soft-deleted edges skipped)
/// or `GraphTableRead::incoming_edges_with_deleted()` (soft-deleted edges included).
pub struct IncomingEdgeIter<'a> {
    // Range scan over the reverse table; keys are (target, edge_type, source).
    inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    // When false, entries with a non-zero deleted_at are skipped.
    include_deleted: bool,
}
516
517impl Iterator for IncomingEdgeIter<'_> {
518    type Item = Result<Edge, StorageError>;
519
520    fn next(&mut self) -> Option<Self::Item> {
521        loop {
522            let result = self.inner.next()?;
523
524            match result {
525                Ok((key_guard, value_guard)) => {
526                    let (target, edge_type, source) = key_guard.value();
527                    let (is_active, weight, created_at, deleted_at) = value_guard.value();
528
529                    // Skip deleted edges unless include_deleted is true
530                    if !self.include_deleted && deleted_at != 0 {
531                        continue;
532                    }
533
534                    // Note: In reverse table, first UUID is target, third is source
535                    return Some(Ok(Edge::with_timestamps(
536                        source,
537                        edge_type,
538                        target,
539                        is_active,
540                        weight,
541                        created_at,
542                        deleted_at,
543                    )));
544                }
545                Err(e) => return Some(Err(e)),
546            }
547        }
548    }
549}