manifold_graph/graph.rs
1//! Graph table implementation with bidirectional edge storage.
2
3use crate::edge::{current_timestamp_nanos, Edge};
4use manifold::{
5 ReadOnlyTable, ReadTransaction, ReadableTable, ReadableTableMetadata, StorageError, Table,
6 TableDefinition, TableError, WriteTransaction,
7};
8use uuid::Uuid;
9
/// A table storing graph edges with bidirectional indexes and temporal tracking.
///
/// This table maintains two internal tables (forward and reverse) to enable
/// efficient queries for both outgoing and incoming edges. Both tables are
/// updated atomically within the same write transaction.
///
/// Key layout:
/// - forward table: `(source, edge_type, target)`
/// - reverse table: `(target, edge_type, source)`
///
/// Value tuple: `(is_active, weight, created_at, deleted_at)`.
/// A `deleted_at` of `0` marks an edge that has not been soft-deleted.
pub struct GraphTable<'txn> {
    /// Forward index, keyed by `(source, edge_type, target)`.
    forward: Table<'txn, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    /// Reverse index, keyed by `(target, edge_type, source)`.
    reverse: Table<'txn, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
}
21
22impl<'txn> GraphTable<'txn> {
23 /// Opens a graph table for writing.
24 ///
25 /// Creates two internal tables: `{name}_forward` and `{name}_reverse`.
26 pub fn open(txn: &'txn WriteTransaction, name: &str) -> Result<Self, TableError> {
27 let forward_name = format!("{name}_forward");
28 let reverse_name = format!("{name}_reverse");
29
30 let forward_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
31 TableDefinition::new(&forward_name);
32 let reverse_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
33 TableDefinition::new(&reverse_name);
34
35 let forward = txn.open_table(forward_def)?;
36 let reverse = txn.open_table(reverse_def)?;
37
38 Ok(Self { forward, reverse })
39 }
40
41 /// Adds an edge to the graph with optional timestamp.
42 ///
43 /// Updates both forward and reverse indexes atomically.
44 ///
45 /// # Arguments
46 ///
47 /// * `source` - Source vertex UUID
48 /// * `edge_type` - Edge type (e.g., "follows", "knows")
49 /// * `target` - Target vertex UUID
50 /// * `is_active` - Whether the edge is active
51 /// * `weight` - Edge weight/score
52 /// * `created_at` - Optional creation timestamp (uses current time if None)
53 pub fn add_edge(
54 &mut self,
55 source: &Uuid,
56 edge_type: &str,
57 target: &Uuid,
58 is_active: bool,
59 weight: f32,
60 created_at: Option<u64>,
61 ) -> Result<(), TableError> {
62 let timestamp = created_at.unwrap_or_else(current_timestamp_nanos);
63 let properties = (is_active, weight, timestamp, 0);
64
65 // Insert into forward table: (source, edge_type, target) -> properties
66 self.forward
67 .insert(&(*source, edge_type, *target), &properties)?;
68
69 // Insert into reverse table: (target, edge_type, source) -> properties
70 self.reverse
71 .insert(&(*target, edge_type, *source), &properties)?;
72
73 Ok(())
74 }
75
76 /// Soft deletes an edge from the graph by setting deleted_at timestamp.
77 ///
78 /// The edge remains in storage for temporal queries but is marked as deleted.
79 /// Updates both forward and reverse indexes atomically.
80 pub fn remove_edge(
81 &mut self,
82 source: &Uuid,
83 edge_type: &str,
84 target: &Uuid,
85 ) -> Result<(), StorageError> {
86 // Get existing edge to preserve created_at
87 let key = (*source, edge_type, *target);
88 let edge_data = if let Some(guard) = self.forward.get(&key)? {
89 let (is_active, weight, created_at, _) = guard.value();
90 Some((is_active, weight, created_at))
91 } else {
92 None
93 };
94
95 if let Some((is_active, weight, created_at)) = edge_data {
96 let deleted_at = current_timestamp_nanos();
97 let properties = (is_active, weight, created_at, deleted_at);
98
99 // Update forward table with deleted_at
100 self.forward.insert(&key, &properties)?;
101
102 // Update reverse table with deleted_at
103 self.reverse
104 .insert(&(*target, edge_type, *source), &properties)?;
105 }
106
107 Ok(())
108 }
109
110 /// Hard deletes an edge from the graph, removing it entirely from storage.
111 ///
112 /// This permanently removes the edge and its history. Use remove_edge() for
113 /// soft delete that preserves temporal history.
114 pub fn hard_delete_edge(
115 &mut self,
116 source: &Uuid,
117 edge_type: &str,
118 target: &Uuid,
119 ) -> Result<(), StorageError> {
120 self.forward.remove(&(*source, edge_type, *target))?;
121 self.reverse.remove(&(*target, edge_type, *source))?;
122 Ok(())
123 }
124
125 /// Updates the properties of an existing edge while preserving timestamps.
126 ///
127 /// Updates both forward and reverse indexes atomically.
128 pub fn update_edge(
129 &mut self,
130 source: &Uuid,
131 edge_type: &str,
132 target: &Uuid,
133 is_active: bool,
134 weight: f32,
135 ) -> Result<(), TableError> {
136 // Get existing edge to preserve created_at
137 let key = (*source, edge_type, *target);
138 let created_at = if let Some(guard) = self.forward.get(&key)? {
139 let (_, _, timestamp, _) = guard.value();
140 timestamp
141 } else {
142 current_timestamp_nanos()
143 };
144
145 // Use add_edge with preserved created_at
146 self.add_edge(source, edge_type, target, is_active, weight, Some(created_at))
147 }
148
149 /// Adds multiple edges to the graph in a single batch operation.
150 ///
151 /// This method leverages Manifold's bulk insertion API for improved throughput,
152 /// especially beneficial when loading large graphs. Both forward and reverse
153 /// indexes are updated atomically within the same transaction.
154 ///
155 /// # Arguments
156 ///
157 /// * `edges` - Vector of edge tuples: (source, `edge_type`, target, `is_active`, `weight`, `created_at`)
158 /// * `sorted` - Whether the input is pre-sorted by (source, `edge_type`, target).
159 /// Set to `true` if your data is already sorted for best performance.
160 ///
161 /// # Returns
162 ///
163 /// Returns the number of edges inserted.
164 ///
165 /// # Performance
166 ///
167 /// - Sorted data (`sorted = true`): Uses optimized insertion with minimal tree rebalancing
168 /// - Unsorted data (`sorted = false`): Chunks and sorts data internally for good performance
169 /// - Batch operations benefit from WAL group commit for higher throughput
170 ///
171 /// # Example
172 ///
173 /// ```rust,no_run
174 /// # use manifold::column_family::ColumnFamilyDatabase;
175 /// # use manifold_graph::GraphTable;
176 /// # use uuid::Uuid;
177 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
178 /// # let db = ColumnFamilyDatabase::open("test.db")?;
179 /// # let cf = db.column_family_or_create("graph")?;
180 /// # let write_txn = cf.begin_write()?;
181 /// # let mut graph = GraphTable::open(&write_txn, "edges")?;
182 /// let u1 = Uuid::new_v4();
183 /// let u2 = Uuid::new_v4();
184 /// let u3 = Uuid::new_v4();
185 /// let now = 1234567890;
186 ///
187 /// let edges = vec![
188 /// (u1, "follows", u2, true, 1.0, now),
189 /// (u1, "follows", u3, true, 0.8, now),
190 /// (u2, "follows", u3, true, 0.9, now),
191 /// ];
192 ///
193 /// let count = graph.add_edges_batch(&edges, false)?;
194 /// assert_eq!(count, 3);
195 /// # Ok(())
196 /// # }
197 /// ```
198 #[allow(clippy::type_complexity)]
199 pub fn add_edges_batch(
200 &mut self,
201 edges: &[(Uuid, &str, Uuid, bool, f32, u64)],
202 sorted: bool,
203 ) -> Result<usize, StorageError> {
204 // Prepare forward table items: (source, edge_type, target) -> (is_active, weight, created_at, deleted_at)
205 let forward_items: Vec<((Uuid, &str, Uuid), (bool, f32, u64, u64))> = edges
206 .iter()
207 .map(|(source, edge_type, target, is_active, weight, created_at)| {
208 (
209 (*source, *edge_type, *target),
210 (*is_active, *weight, *created_at, 0),
211 )
212 })
213 .collect();
214
215 // Prepare reverse table items: (target, edge_type, source) -> (is_active, weight, created_at, deleted_at)
216 let reverse_items: Vec<((Uuid, &str, Uuid), (bool, f32, u64, u64))> = edges
217 .iter()
218 .map(|(source, edge_type, target, is_active, weight, created_at)| {
219 (
220 (*target, *edge_type, *source),
221 (*is_active, *weight, *created_at, 0),
222 )
223 })
224 .collect();
225
226 // Note: reverse items are NOT sorted even if forward items are,
227 // so we always use sorted=false for reverse table
228 let count = self.forward.insert_bulk(forward_items, sorted)?;
229
230 self.reverse.insert_bulk(reverse_items, false)?;
231
232 Ok(count)
233 }
234
235 /// Returns the number of edges in the forward table.
236 pub fn len(&self) -> Result<u64, StorageError> {
237 self.forward.len()
238 }
239
240 /// Returns `true` if the table contains no edges.
241 pub fn is_empty(&self) -> Result<bool, StorageError> {
242 Ok(self.len()? == 0)
243 }
244}
245
/// Read-only graph table providing efficient edge traversal with temporal support.
///
/// Holds the read-only forward and reverse edge tables. Values are
/// `(is_active, weight, created_at, deleted_at)` tuples; a `deleted_at` of `0`
/// is treated as "not deleted" by the lookups and iterators of this type.
pub struct GraphTableRead {
    /// Forward index, keyed by `(source, edge_type, target)`.
    forward: ReadOnlyTable<(Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
    /// Reverse index, keyed by `(target, edge_type, source)`.
    reverse: ReadOnlyTable<(Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
}
251
252impl GraphTableRead {
253 /// Opens a graph table for reading.
254 pub fn open(txn: &ReadTransaction, name: &str) -> Result<Self, StorageError> {
255 let forward_name = format!("{name}_forward");
256 let reverse_name = format!("{name}_reverse");
257
258 let forward_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
259 TableDefinition::new(&forward_name);
260 let reverse_def: TableDefinition<(Uuid, &str, Uuid), (bool, f32, u64, u64)> =
261 TableDefinition::new(&reverse_name);
262
263 let forward = txn.open_table(forward_def).map_err(|e| match e {
264 TableError::Storage(s) => s,
265 _ => StorageError::Io(std::io::Error::other(e)),
266 })?;
267
268 let reverse = txn.open_table(reverse_def).map_err(|e| match e {
269 TableError::Storage(s) => s,
270 _ => StorageError::Io(std::io::Error::other(e)),
271 })?;
272
273 Ok(Self { forward, reverse })
274 }
275
276 /// Retrieves a specific edge by source, edge_type, and target.
277 ///
278 /// Returns None if the edge doesn't exist or has been soft-deleted.
279 /// Use get_edge_at() for temporal queries.
280 pub fn get_edge(
281 &self,
282 source: &Uuid,
283 edge_type: &str,
284 target: &Uuid,
285 ) -> Result<Option<Edge>, StorageError> {
286 Ok(self
287 .forward
288 .get(&(*source, edge_type, *target))?
289 .and_then(|guard| {
290 let (is_active, weight, created_at, deleted_at) = guard.value();
291 // Only return if not deleted
292 if deleted_at == 0 {
293 Some(Edge::with_timestamps(
294 *source,
295 edge_type,
296 *target,
297 is_active,
298 weight,
299 created_at,
300 deleted_at,
301 ))
302 } else {
303 None
304 }
305 }))
306 }
307
308 /// Retrieves a specific edge at a given timestamp.
309 ///
310 /// Returns the edge if it existed at the specified timestamp (created_at <= timestamp
311 /// and either not deleted or deleted_at > timestamp).
312 pub fn get_edge_at(
313 &self,
314 source: &Uuid,
315 edge_type: &str,
316 target: &Uuid,
317 timestamp: u64,
318 ) -> Result<Option<Edge>, StorageError> {
319 Ok(self
320 .forward
321 .get(&(*source, edge_type, *target))?
322 .and_then(|guard| {
323 let (is_active, weight, created_at, deleted_at) = guard.value();
324 let edge = Edge::with_timestamps(
325 *source,
326 edge_type,
327 *target,
328 is_active,
329 weight,
330 created_at,
331 deleted_at,
332 );
333
334 if edge.is_active_at(timestamp) {
335 Some(edge)
336 } else {
337 None
338 }
339 }))
340 }
341
342 /// Returns an iterator over all outgoing edges from the given source vertex.
343 ///
344 /// By default, excludes soft-deleted edges. Use outgoing_edges_with_deleted() to include them.
345 pub fn outgoing_edges(&self, source: &Uuid) -> Result<OutgoingEdgeIter<'_>, StorageError> {
346 // Range from (source, "", nil_uuid) to (source, max_str, max_uuid)
347 let start = (*source, "", Uuid::nil());
348 let end = (*source, "\u{FFFF}", Uuid::max());
349
350 Ok(OutgoingEdgeIter {
351 inner: self.forward.range(start..end)?,
352 include_deleted: false,
353 })
354 }
355
356 /// Returns an iterator over all outgoing edges including soft-deleted ones.
357 pub fn outgoing_edges_with_deleted(
358 &self,
359 source: &Uuid,
360 ) -> Result<OutgoingEdgeIter<'_>, StorageError> {
361 let start = (*source, "", Uuid::nil());
362 let end = (*source, "\u{FFFF}", Uuid::max());
363
364 Ok(OutgoingEdgeIter {
365 inner: self.forward.range(start..end)?,
366 include_deleted: true,
367 })
368 }
369
370 /// Returns an iterator over all incoming edges to the given target vertex.
371 ///
372 /// By default, excludes soft-deleted edges. Use incoming_edges_with_deleted() to include them.
373 pub fn incoming_edges(&self, target: &Uuid) -> Result<IncomingEdgeIter<'_>, StorageError> {
374 // Range from (target, "", nil_uuid) to (target, max_str, max_uuid)
375 let start = (*target, "", Uuid::nil());
376 let end = (*target, "\u{FFFF}", Uuid::max());
377
378 Ok(IncomingEdgeIter {
379 inner: self.reverse.range(start..end)?,
380 include_deleted: false,
381 })
382 }
383
384 /// Returns an iterator over all incoming edges including soft-deleted ones.
385 pub fn incoming_edges_with_deleted(
386 &self,
387 target: &Uuid,
388 ) -> Result<IncomingEdgeIter<'_>, StorageError> {
389 let start = (*target, "", Uuid::nil());
390 let end = (*target, "\u{FFFF}", Uuid::max());
391
392 Ok(IncomingEdgeIter {
393 inner: self.reverse.range(start..end)?,
394 include_deleted: true,
395 })
396 }
397
398 /// Returns an iterator over all edges in the graph.
399 ///
400 /// By default, excludes soft-deleted edges. Use all_edges_with_deleted() to include them.
401 pub fn all_edges(&self) -> Result<AllEdgesIter<'_>, StorageError> {
402 Ok(AllEdgesIter {
403 inner: self.forward.iter()?,
404 include_deleted: false,
405 })
406 }
407
408 /// Returns an iterator over all edges including soft-deleted ones.
409 pub fn all_edges_with_deleted(&self) -> Result<AllEdgesIter<'_>, StorageError> {
410 Ok(AllEdgesIter {
411 inner: self.forward.iter()?,
412 include_deleted: true,
413 })
414 }
415
416 /// Returns the number of edges stored in this table.
417 pub fn len(&self) -> Result<u64, StorageError> {
418 self.forward.len()
419 }
420
421 /// Returns `true` if the table contains no edges.
422 pub fn is_empty(&self) -> Result<bool, StorageError> {
423 Ok(self.len()? == 0)
424 }
425}
426
427/// Iterator over outgoing edges from a source vertex.
428///
429/// By default, only returns non-deleted edges. Use all_edges_with_deleted() to include soft-deleted edges.
430pub struct OutgoingEdgeIter<'a> {
431 inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
432 include_deleted: bool,
433}
434
435impl Iterator for OutgoingEdgeIter<'_> {
436 type Item = Result<Edge, StorageError>;
437
438 fn next(&mut self) -> Option<Self::Item> {
439 loop {
440 let result = self.inner.next()?;
441
442 match result {
443 Ok((key_guard, value_guard)) => {
444 let (source, edge_type, target) = key_guard.value();
445 let (is_active, weight, created_at, deleted_at) = value_guard.value();
446
447 // Skip deleted edges unless include_deleted is true
448 if !self.include_deleted && deleted_at != 0 {
449 continue;
450 }
451
452 return Some(Ok(Edge::with_timestamps(
453 source,
454 edge_type,
455 target,
456 is_active,
457 weight,
458 created_at,
459 deleted_at,
460 )));
461 }
462 Err(e) => return Some(Err(e)),
463 }
464 }
465 }
466}
467
468/// Iterator over all edges in the graph.
469///
470/// By default, only returns non-deleted edges.
471pub struct AllEdgesIter<'a> {
472 inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
473 include_deleted: bool,
474}
475
476impl Iterator for AllEdgesIter<'_> {
477 type Item = Result<Edge, StorageError>;
478
479 fn next(&mut self) -> Option<Self::Item> {
480 loop {
481 let result = self.inner.next()?;
482
483 match result {
484 Ok((key_guard, value_guard)) => {
485 let (source, edge_type, target) = key_guard.value();
486 let (is_active, weight, created_at, deleted_at) = value_guard.value();
487
488 // Skip deleted edges unless include_deleted is true
489 if !self.include_deleted && deleted_at != 0 {
490 continue;
491 }
492
493 return Some(Ok(Edge::with_timestamps(
494 source,
495 edge_type,
496 target,
497 is_active,
498 weight,
499 created_at,
500 deleted_at,
501 )));
502 }
503 Err(e) => return Some(Err(e)),
504 }
505 }
506 }
507}
508
509/// Iterator over incoming edges to a target vertex.
510///
511/// By default, only returns non-deleted edges.
512pub struct IncomingEdgeIter<'a> {
513 inner: manifold::Range<'a, (Uuid, &'static str, Uuid), (bool, f32, u64, u64)>,
514 include_deleted: bool,
515}
516
517impl Iterator for IncomingEdgeIter<'_> {
518 type Item = Result<Edge, StorageError>;
519
520 fn next(&mut self) -> Option<Self::Item> {
521 loop {
522 let result = self.inner.next()?;
523
524 match result {
525 Ok((key_guard, value_guard)) => {
526 let (target, edge_type, source) = key_guard.value();
527 let (is_active, weight, created_at, deleted_at) = value_guard.value();
528
529 // Skip deleted edges unless include_deleted is true
530 if !self.include_deleted && deleted_at != 0 {
531 continue;
532 }
533
534 // Note: In reverse table, first UUID is target, third is source
535 return Some(Ok(Edge::with_timestamps(
536 source,
537 edge_type,
538 target,
539 is_active,
540 weight,
541 created_at,
542 deleted_at,
543 )));
544 }
545 Err(e) => return Some(Err(e)),
546 }
547 }
548 }
549}