Skip to main content

issundb_core/graph/
edge.rs

1use super::*;
2
3impl Graph {
4    // ------------------------------------------------------------------
5    // Edges
6    // ------------------------------------------------------------------
7
8    /// Insert a directed edge `src → dst` with a string type and properties.
9    #[instrument(skip(self, props), fields(src = %src, dst = %dst, etype = %etype))]
10    pub fn add_edge(
11        &self,
12        src: NodeId,
13        dst: NodeId,
14        etype: &str,
15        props: &impl Serialize,
16    ) -> Result<EdgeId, Error> {
17        let _guard = self._write_lock.lock();
18        let mut wtxn = self.storage.env.write_txn()?;
19        let edge_id = self.add_edge_impl(&mut wtxn, src, dst, etype, props)?;
20        wtxn.commit()?;
21        self.csr_cache.record_added_edge(src, dst);
22        self.maybe_spawn_rebuild();
23        Ok(edge_id)
24    }
25
26    pub(super) fn add_edge_impl(
27        &self,
28        wtxn: &mut heed::RwTxn,
29        src: NodeId,
30        dst: NodeId,
31        etype: &str,
32        props: &impl Serialize,
33    ) -> Result<EdgeId, Error> {
34        let type_id = get_or_create_type(&self.storage, wtxn, etype)?;
35        let edge_id = alloc_edge_id(&self.storage, wtxn)?;
36        let encoded_props = props::encode(props)?;
37
38        // Validate constraints and populate indexes
39        self.write_edge_index_entries(wtxn, edge_id, type_id, etype, &encoded_props)?;
40
41        let record = EdgeRecord {
42            src,
43            dst,
44            edge_type: type_id,
45            props: encoded_props,
46        };
47        self.storage
48            .edges
49            .put(wtxn, &edge_id, &props::encode(&record)?)?;
50        self.storage
51            .type_idx
52            .put(wtxn, &composite_key(type_id, edge_id), &())?;
53
54        self.append_adj(wtxn, src, dst, type_id, edge_id, true)?;
55        self.append_adj(wtxn, dst, src, type_id, edge_id, false)?;
56
57        adjust_type_count(&self.storage, wtxn, type_id, 1)?;
58
59        Ok(edge_id)
60    }
61
62    /// Update the properties of an existing edge, preserving src, dst, and type.
63    pub fn update_edge(&self, id: EdgeId, props: &impl serde::Serialize) -> Result<(), Error> {
64        let _guard = self._write_lock.lock();
65        let mut wtxn = self.storage.env.write_txn()?;
66        let existing = self
67            .storage
68            .edges
69            .get(&wtxn, &id)?
70            .ok_or(Error::EdgeNotFound(id))?;
71        let record: EdgeRecord = crate::storage::props::decode(existing)?;
72        let etype = self
73            .type_name_impl(&wtxn, record.edge_type)?
74            .ok_or(Error::Corrupt("edge type name missing"))?;
75
76        // Re-index under the new properties: drop the old entries first so the
77        // unique check never conflicts with the edge against itself. A
78        // constraint violation aborts the uncommitted transaction, so the old
79        // entries survive.
80        self.delete_edge_index_entries(&mut wtxn, id, &record)?;
81        let encoded_props = crate::storage::props::encode(props)?;
82        self.write_edge_index_entries(&mut wtxn, id, record.edge_type, &etype, &encoded_props)?;
83
84        let new_record = EdgeRecord {
85            src: record.src,
86            dst: record.dst,
87            edge_type: record.edge_type,
88            props: encoded_props,
89        };
90        self.storage
91            .edges
92            .put(&mut wtxn, &id, &crate::storage::props::encode(&new_record)?)?;
93        wtxn.commit()?;
94        Ok(())
95    }
96
97    /// Fetch an edge record by id.
98    pub fn get_edge(&self, id: EdgeId) -> Result<Option<EdgeRecord>, Error> {
99        let rtxn = self.storage.env.read_txn()?;
100        self.get_edge_impl(&rtxn, id)
101    }
102
103    pub(super) fn get_edge_impl(
104        &self,
105        txn: &heed::RoTxn,
106        id: EdgeId,
107    ) -> Result<Option<EdgeRecord>, Error> {
108        match self.storage.edges.get(txn, &id)? {
109            Some(bytes) => Ok(Some(props::decode(bytes)?)),
110            None => Ok(None),
111        }
112    }
113
114    /// Delete an edge.
115    #[instrument(skip(self))]
116    pub fn delete_edge(&self, id: EdgeId) -> Result<(), Error> {
117        let _guard = self._write_lock.lock();
118        let mut wtxn = self.storage.env.write_txn()?;
119        let endpoints = self.delete_edge_impl(&mut wtxn, id)?;
120        wtxn.commit()?;
121        if let Some((src, dst)) = endpoints {
122            self.csr_cache.record_removed_edge(src, dst);
123        }
124        self.maybe_spawn_rebuild();
125        Ok(())
126    }
127
128    /// Delete an edge inside an open write transaction. Returns the deleted
129    /// edge's `(src, dst)` endpoints so the caller can record the adjacency
130    /// removal, or `None` if no such edge existed.
131    pub(crate) fn delete_edge_impl(
132        &self,
133        wtxn: &mut heed::RwTxn,
134        id: EdgeId,
135    ) -> Result<Option<(NodeId, NodeId)>, Error> {
136        let record: EdgeRecord = match self.get_edge_impl(wtxn, id)? {
137            Some(rec) => rec,
138            None => return Ok(None),
139        };
140
141        // 1. Delete from edge property index
142        self.delete_edge_index_entries(wtxn, id, &record)?;
143
144        // 2. Delete the edge record itself
145        self.storage.edges.delete(wtxn, &id)?;
146
147        // 3. Delete from the type index
148        self.storage
149            .type_idx
150            .delete(wtxn, &composite_key(record.edge_type, id))?;
151
152        // 4. Adjust the type count
153        adjust_type_count(&self.storage, wtxn, record.edge_type, -1)?;
154
155        // 5. Delete from out_adj (key is src, other is dst)
156        let out_entry = AdjEntry {
157            edge_type: record.edge_type,
158            other: record.dst,
159            edge_id: id,
160        };
161        self.storage
162            .out_adj
163            .delete_one_duplicate(wtxn, &record.src, out_entry.as_bytes())?;
164
165        // 6. Delete from in_adj (key is dst, other is src)
166        let in_entry = AdjEntry {
167            edge_type: record.edge_type,
168            other: record.src,
169            edge_id: id,
170        };
171        self.storage
172            .in_adj
173            .delete_one_duplicate(wtxn, &record.dst, in_entry.as_bytes())?;
174
175        Ok(Some((record.src, record.dst)))
176    }
177
178    // ------------------------------------------------------------------
179    // Traversal
180    // ------------------------------------------------------------------
181
182    /// Returns neighbor entries for all outgoing edges of `node`.
183    ///
184    /// Reads the `out_adj` store directly through the supplied transaction so
185    /// the result always reflects committed (and, inside a [`WriteTxn`],
186    /// uncommitted) writes. The CSR snapshot is deliberately not consulted here:
187    /// it lags writes until the background rebuild runs, so serving point
188    /// lookups from it would return deleted edges, hide newly added ones, and
189    /// disagree with [`Self::in_neighbors`]. The snapshot remains the basis for
190    /// the GraphBLAS matrix algorithms, which have explicit snapshot semantics.
191    pub fn out_neighbors(&self, node: NodeId) -> Result<Vec<NeighborEntry>, Error> {
192        let rtxn = self.storage.env.read_txn()?;
193        self.out_neighbors_impl(&rtxn, node)
194    }
195
196    pub(super) fn out_neighbors_impl(
197        &self,
198        rtxn: &heed::RoTxn,
199        node: NodeId,
200    ) -> Result<Vec<NeighborEntry>, Error> {
201        self.adj_entries_impl(rtxn, node, true)
202    }
203
204    /// Returns neighbor entries for all incoming edges of `node`.
205    pub fn in_neighbors(&self, node: NodeId) -> Result<Vec<NeighborEntry>, Error> {
206        let rtxn = self.storage.env.read_txn()?;
207        self.in_neighbors_impl(&rtxn, node)
208    }
209
210    pub(super) fn in_neighbors_impl(
211        &self,
212        rtxn: &heed::RoTxn,
213        node: NodeId,
214    ) -> Result<Vec<NeighborEntry>, Error> {
215        self.adj_entries_impl(rtxn, node, false)
216    }
217
218    /// Returns whether the node has any incident relationship, reading the
219    /// adjacency stores directly. Unlike [`Self::out_neighbors`], this never
220    /// consults the CSR snapshot, which lags writes until the background rebuild
221    /// completes. Write-time consistency checks (such as the DELETE connected-node
222    /// guard) must see just-applied edge deletions, so they rely on this method.
223    pub fn node_has_relationships(&self, node: NodeId) -> Result<bool, Error> {
224        let rtxn = self.storage.env.read_txn()?;
225        if !self.adj_entries_impl(&rtxn, node, true)?.is_empty() {
226            return Ok(true);
227        }
228        Ok(!self.adj_entries_impl(&rtxn, node, false)?.is_empty())
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use tempfile::TempDir;
235
236    use super::*;
237
238    fn open_tmp() -> (TempDir, Graph) {
239        let dir = TempDir::new().unwrap();
240        let g = Graph::open(dir.path(), 1).unwrap();
241        (dir, g)
242    }
243
244    /// After a CSR rebuild captures a node into the snapshot, adding an edge to
245    /// that node must be visible through `out_neighbors`. The snapshot lags
246    /// writes, so consulting it for point lookups would hide the new edge.
247    #[test]
248    fn out_neighbors_reflects_edge_added_after_snapshot() {
249        let (_dir, g) = open_tmp();
250        let a = g.add_node("N", &()).unwrap();
251        let b = g.add_node("N", &()).unwrap();
252
253        // Force a snapshot that includes `a` with zero outgoing edges.
254        g.rebuild_csr().unwrap();
255        assert!(g.out_neighbors(a).unwrap().is_empty());
256
257        let eid = g.add_edge(a, b, "E", &()).unwrap();
258
259        let out = g.out_neighbors(a).unwrap();
260        assert_eq!(out.len(), 1, "new edge must be visible despite stale CSR");
261        assert_eq!(out[0].edge, eid);
262        assert_eq!(out[0].node, b);
263    }
264
265    /// After a CSR rebuild captures an edge into the snapshot, deleting that
266    /// edge must remove it from `out_neighbors`. Serving from the stale snapshot
267    /// would return the deleted edge.
268    #[test]
269    fn out_neighbors_reflects_edge_deleted_after_snapshot() {
270        let (_dir, g) = open_tmp();
271        let a = g.add_node("N", &()).unwrap();
272        let b = g.add_node("N", &()).unwrap();
273        let eid = g.add_edge(a, b, "E", &()).unwrap();
274
275        g.rebuild_csr().unwrap();
276        assert_eq!(g.out_neighbors(a).unwrap().len(), 1);
277
278        g.delete_edge(eid).unwrap();
279
280        assert!(
281            g.out_neighbors(a).unwrap().is_empty(),
282            "deleted edge must not appear, even though CSR still holds it"
283        );
284    }
285
286    /// `out_neighbors` and `in_neighbors` must agree on the same edge after a
287    /// mutation that postdates the snapshot. This is the asymmetry the snapshot
288    /// fast path introduced: `in_neighbors` always read LMDB while
289    /// `out_neighbors` trusted the snapshot.
290    #[test]
291    fn out_and_in_neighbors_agree_after_snapshot() {
292        let (_dir, g) = open_tmp();
293        let a = g.add_node("N", &()).unwrap();
294        let b = g.add_node("N", &()).unwrap();
295        g.rebuild_csr().unwrap();
296
297        let eid = g.add_edge(a, b, "E", &()).unwrap();
298
299        let out = g.out_neighbors(a).unwrap();
300        let inc = g.in_neighbors(b).unwrap();
301        assert_eq!(out.len(), 1);
302        assert_eq!(inc.len(), 1);
303        assert_eq!(out[0].edge, eid);
304        assert_eq!(inc[0].edge, eid);
305    }
306
307    /// Inside a write transaction, `out_neighbors` must observe the edge created
308    /// earlier in the same uncommitted transaction (read-your-writes).
309    #[test]
310    fn write_txn_out_neighbors_sees_uncommitted_edge() {
311        let (_dir, g) = open_tmp();
312        let a = g.add_node("N", &()).unwrap();
313        let b = g.add_node("N", &()).unwrap();
314        // Snapshot `a` with no outgoing edges so the stale path would return [].
315        g.rebuild_csr().unwrap();
316
317        g.update(|txn| {
318            let eid = txn.add_edge(a, b, "E", &())?;
319            let out = txn.out_neighbors(a)?;
320            assert_eq!(out.len(), 1, "uncommitted edge must be visible in-txn");
321            assert_eq!(out[0].edge, eid);
322            Ok(())
323        })
324        .unwrap();
325    }
326
327    /// `update_edge` must replace the stored properties and leave the
328    /// endpoints and type untouched.
329    #[test]
330    fn update_edge_replaces_props() {
331        let (_dir, g) = open_tmp();
332        let a = g.add_node("N", &()).unwrap();
333        let b = g.add_node("N", &()).unwrap();
334        let eid = g.add_edge(a, b, "E", &serde_json::json!({"w": 1})).unwrap();
335
336        g.update_edge(eid, &serde_json::json!({"w": 2})).unwrap();
337
338        let rec = g.get_edge(eid).unwrap().expect("edge must still exist");
339        assert_eq!(rec.src, a);
340        assert_eq!(rec.dst, b);
341        let props: serde_json::Value = rmp_serde::from_slice(&rec.props).unwrap();
342        assert_eq!(props["w"], serde_json::json!(2));
343    }
344
345    #[test]
346    fn update_edge_missing_edge_errors() {
347        let (_dir, g) = open_tmp();
348        let err = g
349            .update_edge(999, &serde_json::json!({"w": 1}))
350            .unwrap_err();
351        assert!(matches!(err, Error::EdgeNotFound(999)));
352    }
353
354    /// `node_has_relationships` must reflect both adjacency directions and
355    /// must go back to `false` once the last edge is deleted.
356    #[test]
357    fn node_has_relationships_reflects_adjacency() {
358        let (_dir, g) = open_tmp();
359        let a = g.add_node("N", &()).unwrap();
360        let b = g.add_node("N", &()).unwrap();
361        assert!(!g.node_has_relationships(a).unwrap());
362        assert!(!g.node_has_relationships(b).unwrap());
363
364        let eid = g.add_edge(a, b, "E", &()).unwrap();
365        assert!(g.node_has_relationships(a).unwrap(), "out edge counts");
366        assert!(g.node_has_relationships(b).unwrap(), "in edge counts");
367
368        g.delete_edge(eid).unwrap();
369        assert!(!g.node_has_relationships(a).unwrap());
370        assert!(!g.node_has_relationships(b).unwrap());
371    }
372}