1use super::*;
2
3impl Graph {
4 #[instrument(skip(self, props), fields(src = %src, dst = %dst, etype = %etype))]
10 pub fn add_edge(
11 &self,
12 src: NodeId,
13 dst: NodeId,
14 etype: &str,
15 props: &impl Serialize,
16 ) -> Result<EdgeId, Error> {
17 let _guard = self._write_lock.lock();
18 let mut wtxn = self.storage.env.write_txn()?;
19 let edge_id = self.add_edge_impl(&mut wtxn, src, dst, etype, props)?;
20 wtxn.commit()?;
21 self.csr_cache.record_added_edge(src, dst);
22 self.maybe_spawn_rebuild();
23 Ok(edge_id)
24 }
25
26 pub(super) fn add_edge_impl(
27 &self,
28 wtxn: &mut heed::RwTxn,
29 src: NodeId,
30 dst: NodeId,
31 etype: &str,
32 props: &impl Serialize,
33 ) -> Result<EdgeId, Error> {
34 let type_id = get_or_create_type(&self.storage, wtxn, etype)?;
35 let edge_id = alloc_edge_id(&self.storage, wtxn)?;
36 let encoded_props = props::encode(props)?;
37
38 self.write_edge_index_entries(wtxn, edge_id, type_id, etype, &encoded_props)?;
40
41 let record = EdgeRecord {
42 src,
43 dst,
44 edge_type: type_id,
45 props: encoded_props,
46 };
47 self.storage
48 .edges
49 .put(wtxn, &edge_id, &props::encode(&record)?)?;
50 self.storage
51 .type_idx
52 .put(wtxn, &composite_key(type_id, edge_id), &())?;
53
54 self.append_adj(wtxn, src, dst, type_id, edge_id, true)?;
55 self.append_adj(wtxn, dst, src, type_id, edge_id, false)?;
56
57 adjust_type_count(&self.storage, wtxn, type_id, 1)?;
58
59 Ok(edge_id)
60 }
61
62 pub fn update_edge(&self, id: EdgeId, props: &impl serde::Serialize) -> Result<(), Error> {
64 let _guard = self._write_lock.lock();
65 let mut wtxn = self.storage.env.write_txn()?;
66 let existing = self
67 .storage
68 .edges
69 .get(&wtxn, &id)?
70 .ok_or(Error::EdgeNotFound(id))?;
71 let record: EdgeRecord = crate::storage::props::decode(existing)?;
72 let etype = self
73 .type_name_impl(&wtxn, record.edge_type)?
74 .ok_or(Error::Corrupt("edge type name missing"))?;
75
76 self.delete_edge_index_entries(&mut wtxn, id, &record)?;
81 let encoded_props = crate::storage::props::encode(props)?;
82 self.write_edge_index_entries(&mut wtxn, id, record.edge_type, &etype, &encoded_props)?;
83
84 let new_record = EdgeRecord {
85 src: record.src,
86 dst: record.dst,
87 edge_type: record.edge_type,
88 props: encoded_props,
89 };
90 self.storage
91 .edges
92 .put(&mut wtxn, &id, &crate::storage::props::encode(&new_record)?)?;
93 wtxn.commit()?;
94 Ok(())
95 }
96
97 pub fn get_edge(&self, id: EdgeId) -> Result<Option<EdgeRecord>, Error> {
99 let rtxn = self.storage.env.read_txn()?;
100 self.get_edge_impl(&rtxn, id)
101 }
102
103 pub(super) fn get_edge_impl(
104 &self,
105 txn: &heed::RoTxn,
106 id: EdgeId,
107 ) -> Result<Option<EdgeRecord>, Error> {
108 match self.storage.edges.get(txn, &id)? {
109 Some(bytes) => Ok(Some(props::decode(bytes)?)),
110 None => Ok(None),
111 }
112 }
113
114 #[instrument(skip(self))]
116 pub fn delete_edge(&self, id: EdgeId) -> Result<(), Error> {
117 let _guard = self._write_lock.lock();
118 let mut wtxn = self.storage.env.write_txn()?;
119 let endpoints = self.delete_edge_impl(&mut wtxn, id)?;
120 wtxn.commit()?;
121 if let Some((src, dst)) = endpoints {
122 self.csr_cache.record_removed_edge(src, dst);
123 }
124 self.maybe_spawn_rebuild();
125 Ok(())
126 }
127
128 pub(crate) fn delete_edge_impl(
132 &self,
133 wtxn: &mut heed::RwTxn,
134 id: EdgeId,
135 ) -> Result<Option<(NodeId, NodeId)>, Error> {
136 let record: EdgeRecord = match self.get_edge_impl(wtxn, id)? {
137 Some(rec) => rec,
138 None => return Ok(None),
139 };
140
141 self.delete_edge_index_entries(wtxn, id, &record)?;
143
144 self.storage.edges.delete(wtxn, &id)?;
146
147 self.storage
149 .type_idx
150 .delete(wtxn, &composite_key(record.edge_type, id))?;
151
152 adjust_type_count(&self.storage, wtxn, record.edge_type, -1)?;
154
155 let out_entry = AdjEntry {
157 edge_type: record.edge_type,
158 other: record.dst,
159 edge_id: id,
160 };
161 self.storage
162 .out_adj
163 .delete_one_duplicate(wtxn, &record.src, out_entry.as_bytes())?;
164
165 let in_entry = AdjEntry {
167 edge_type: record.edge_type,
168 other: record.src,
169 edge_id: id,
170 };
171 self.storage
172 .in_adj
173 .delete_one_duplicate(wtxn, &record.dst, in_entry.as_bytes())?;
174
175 Ok(Some((record.src, record.dst)))
176 }
177
178 pub fn out_neighbors(&self, node: NodeId) -> Result<Vec<NeighborEntry>, Error> {
192 let rtxn = self.storage.env.read_txn()?;
193 self.out_neighbors_impl(&rtxn, node)
194 }
195
196 pub(super) fn out_neighbors_impl(
197 &self,
198 rtxn: &heed::RoTxn,
199 node: NodeId,
200 ) -> Result<Vec<NeighborEntry>, Error> {
201 self.adj_entries_impl(rtxn, node, true)
202 }
203
204 pub fn in_neighbors(&self, node: NodeId) -> Result<Vec<NeighborEntry>, Error> {
206 let rtxn = self.storage.env.read_txn()?;
207 self.in_neighbors_impl(&rtxn, node)
208 }
209
210 pub(super) fn in_neighbors_impl(
211 &self,
212 rtxn: &heed::RoTxn,
213 node: NodeId,
214 ) -> Result<Vec<NeighborEntry>, Error> {
215 self.adj_entries_impl(rtxn, node, false)
216 }
217
218 pub fn node_has_relationships(&self, node: NodeId) -> Result<bool, Error> {
224 let rtxn = self.storage.env.read_txn()?;
225 if !self.adj_entries_impl(&rtxn, node, true)?.is_empty() {
226 return Ok(true);
227 }
228 Ok(!self.adj_entries_impl(&rtxn, node, false)?.is_empty())
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use tempfile::TempDir;
235
236 use super::*;
237
238 fn open_tmp() -> (TempDir, Graph) {
239 let dir = TempDir::new().unwrap();
240 let g = Graph::open(dir.path(), 1).unwrap();
241 (dir, g)
242 }
243
244 #[test]
248 fn out_neighbors_reflects_edge_added_after_snapshot() {
249 let (_dir, g) = open_tmp();
250 let a = g.add_node("N", &()).unwrap();
251 let b = g.add_node("N", &()).unwrap();
252
253 g.rebuild_csr().unwrap();
255 assert!(g.out_neighbors(a).unwrap().is_empty());
256
257 let eid = g.add_edge(a, b, "E", &()).unwrap();
258
259 let out = g.out_neighbors(a).unwrap();
260 assert_eq!(out.len(), 1, "new edge must be visible despite stale CSR");
261 assert_eq!(out[0].edge, eid);
262 assert_eq!(out[0].node, b);
263 }
264
265 #[test]
269 fn out_neighbors_reflects_edge_deleted_after_snapshot() {
270 let (_dir, g) = open_tmp();
271 let a = g.add_node("N", &()).unwrap();
272 let b = g.add_node("N", &()).unwrap();
273 let eid = g.add_edge(a, b, "E", &()).unwrap();
274
275 g.rebuild_csr().unwrap();
276 assert_eq!(g.out_neighbors(a).unwrap().len(), 1);
277
278 g.delete_edge(eid).unwrap();
279
280 assert!(
281 g.out_neighbors(a).unwrap().is_empty(),
282 "deleted edge must not appear, even though CSR still holds it"
283 );
284 }
285
286 #[test]
291 fn out_and_in_neighbors_agree_after_snapshot() {
292 let (_dir, g) = open_tmp();
293 let a = g.add_node("N", &()).unwrap();
294 let b = g.add_node("N", &()).unwrap();
295 g.rebuild_csr().unwrap();
296
297 let eid = g.add_edge(a, b, "E", &()).unwrap();
298
299 let out = g.out_neighbors(a).unwrap();
300 let inc = g.in_neighbors(b).unwrap();
301 assert_eq!(out.len(), 1);
302 assert_eq!(inc.len(), 1);
303 assert_eq!(out[0].edge, eid);
304 assert_eq!(inc[0].edge, eid);
305 }
306
307 #[test]
310 fn write_txn_out_neighbors_sees_uncommitted_edge() {
311 let (_dir, g) = open_tmp();
312 let a = g.add_node("N", &()).unwrap();
313 let b = g.add_node("N", &()).unwrap();
314 g.rebuild_csr().unwrap();
316
317 g.update(|txn| {
318 let eid = txn.add_edge(a, b, "E", &())?;
319 let out = txn.out_neighbors(a)?;
320 assert_eq!(out.len(), 1, "uncommitted edge must be visible in-txn");
321 assert_eq!(out[0].edge, eid);
322 Ok(())
323 })
324 .unwrap();
325 }
326
327 #[test]
330 fn update_edge_replaces_props() {
331 let (_dir, g) = open_tmp();
332 let a = g.add_node("N", &()).unwrap();
333 let b = g.add_node("N", &()).unwrap();
334 let eid = g.add_edge(a, b, "E", &serde_json::json!({"w": 1})).unwrap();
335
336 g.update_edge(eid, &serde_json::json!({"w": 2})).unwrap();
337
338 let rec = g.get_edge(eid).unwrap().expect("edge must still exist");
339 assert_eq!(rec.src, a);
340 assert_eq!(rec.dst, b);
341 let props: serde_json::Value = rmp_serde::from_slice(&rec.props).unwrap();
342 assert_eq!(props["w"], serde_json::json!(2));
343 }
344
345 #[test]
346 fn update_edge_missing_edge_errors() {
347 let (_dir, g) = open_tmp();
348 let err = g
349 .update_edge(999, &serde_json::json!({"w": 1}))
350 .unwrap_err();
351 assert!(matches!(err, Error::EdgeNotFound(999)));
352 }
353
354 #[test]
357 fn node_has_relationships_reflects_adjacency() {
358 let (_dir, g) = open_tmp();
359 let a = g.add_node("N", &()).unwrap();
360 let b = g.add_node("N", &()).unwrap();
361 assert!(!g.node_has_relationships(a).unwrap());
362 assert!(!g.node_has_relationships(b).unwrap());
363
364 let eid = g.add_edge(a, b, "E", &()).unwrap();
365 assert!(g.node_has_relationships(a).unwrap(), "out edge counts");
366 assert!(g.node_has_relationships(b).unwrap(), "in edge counts");
367
368 g.delete_edge(eid).unwrap();
369 assert!(!g.node_has_relationships(a).unwrap());
370 assert!(!g.node_has_relationships(b).unwrap());
371 }
372}