1use super::*;
2
3impl Graph {
4 #[instrument(skip(self, props), fields(label = %label))]
10 pub fn add_node(&self, label: &str, props: &impl Serialize) -> Result<NodeId, Error> {
11 let _guard = self._write_lock.lock();
12 let mut wtxn = self.storage.env.write_txn()?;
13 let id = self.add_node_impl(&mut wtxn, &[label], props)?;
14 wtxn.commit()?;
15 self.csr_cache.record_added_node(id);
16 self.prop_columns.record_touched(id);
17 self.maybe_spawn_rebuild();
18 Ok(id)
19 }
20
21 pub fn add_node_multi(&self, labels: &[&str], props: &impl Serialize) -> Result<NodeId, Error> {
24 let _guard = self._write_lock.lock();
25 let mut wtxn = self.storage.env.write_txn()?;
26 let id = self.add_node_impl(&mut wtxn, labels, props)?;
27 wtxn.commit()?;
28 self.csr_cache.record_added_node(id);
29 self.prop_columns.record_touched(id);
30 self.maybe_spawn_rebuild();
31 Ok(id)
32 }
33
34 pub(super) fn add_node_impl(
35 &self,
36 wtxn: &mut heed::RwTxn,
37 labels: &[&str],
38 props: &impl Serialize,
39 ) -> Result<NodeId, Error> {
40 let encoded_props = props::encode(props)?;
41 let props_json: serde_json::Value = props::decode(&encoded_props)?;
42
43 let mut resolved: Vec<(LabelId, String)> = Vec::with_capacity(labels.len());
45 for &name in labels {
46 let id = get_or_create_label(&self.storage, wtxn, name)?;
47 if !resolved.iter().any(|(lid, _)| *lid == id) {
48 resolved.push((id, name.to_string()));
49 }
50 }
51
52 let node_id = alloc_node_id(&self.storage, wtxn)?;
53 let record = NodeRecord {
54 labels: resolved.iter().map(|(id, _)| *id).collect(),
55 props: encoded_props,
56 };
57 self.storage
58 .nodes
59 .put(wtxn, &node_id, &props::encode(&record)?)?;
60
61 for (label_id, label_name) in &resolved {
64 self.storage
65 .label_idx
66 .put(wtxn, &composite_key(*label_id, node_id), &())?;
67 adjust_label_count(&self.storage, wtxn, *label_id, 1)?;
68 self.index_node_for_label(wtxn, *label_id, label_name, node_id, &props_json)?;
69 }
70
71 Ok(node_id)
72 }
73
74 fn index_node_for_label(
78 &self,
79 wtxn: &mut heed::RwTxn,
80 label_id: LabelId,
81 label_name: &str,
82 node_id: NodeId,
83 props_json: &serde_json::Value,
84 ) -> Result<(), Error> {
85 let active_indexes = self.get_active_node_indexes(wtxn, label_id)?;
87 for (prop_key_id, flags) in active_indexes {
88 if let Some(prop_name) = self.prop_key_name_impl(wtxn, prop_key_id)? {
89 let prop_val = props_json.get(&prop_name);
90
91 if flags == 0x02
93 && (prop_val.is_none() || prop_val == Some(&serde_json::Value::Null))
94 {
95 return Err(Error::RequiredConstraintViolation(
96 label_name.to_string(),
97 prop_name.to_string(),
98 ));
99 }
100
101 if let Some(val) = prop_val {
102 if val != &serde_json::Value::Null {
103 if let Some(encoded) = encode_property_value(val) {
104 if flags == 0x01 {
107 let mut prefix = Vec::with_capacity(4 + 4 + encoded.len());
108 prefix.extend_from_slice(&label_id.to_be_bytes());
109 prefix.extend_from_slice(&prop_key_id.to_be_bytes());
110 prefix.extend_from_slice(&encoded);
111
112 for entry in
113 self.storage.node_prop_idx.prefix_iter(wtxn, &prefix)?
114 {
115 let (key, _) = entry?;
116 if key.len() >= 8 {
117 let mut node_id_bytes = [0u8; 8];
118 node_id_bytes.copy_from_slice(&key[key.len() - 8..]);
119 let found_node_id = u64::from_be_bytes(node_id_bytes);
120 if found_node_id != node_id {
121 return Err(Error::UniqueConstraintViolation(
122 label_name.to_string(),
123 prop_name.to_string(),
124 val.to_string(),
125 ));
126 }
127 }
128 }
129 }
130
131 let idx_key =
132 node_prop_index_key(label_id, prop_key_id, &encoded, node_id);
133 self.storage.node_prop_idx.put(wtxn, &idx_key, &())?;
134 }
135 }
136 }
137 }
138 }
139
140 if let Some(obj) = props_json.as_object() {
143 for (prop_name, val) in obj {
144 if val.is_null() {
145 continue;
146 }
147 if let Some(encoded) = encode_property_value(val) {
148 let prop_key_id = get_or_create_prop_key(&self.storage, wtxn, prop_name)?;
149 let idx_key = node_prop_index_key(label_id, prop_key_id, &encoded, node_id);
150 self.storage.node_prop_idx.put(wtxn, &idx_key, &())?;
151 }
152 }
153 }
154
155 self.index_node_fts(wtxn, node_id, label_id, props_json)?;
157
158 Ok(())
159 }
160
161 fn unindex_node_for_label(
164 &self,
165 wtxn: &mut heed::RwTxn,
166 label_id: LabelId,
167 node_id: NodeId,
168 props_json: &serde_json::Value,
169 ) -> Result<(), Error> {
170 let active = self.get_active_node_indexes(wtxn, label_id)?;
171 for (prop_key_id, _) in active {
172 if let Some(prop_name) = self.prop_key_name_impl(wtxn, prop_key_id)? {
173 if let Some(val) = props_json.get(&prop_name) {
174 if let Some(encoded) = encode_property_value(val) {
175 let idx_key = node_prop_index_key(label_id, prop_key_id, &encoded, node_id);
176 self.storage.node_prop_idx.delete(wtxn, &idx_key)?;
177 }
178 }
179 }
180 }
181
182 if let Some(obj) = props_json.as_object() {
184 for (prop_name, val) in obj {
185 if val.is_null() {
186 continue;
187 }
188 if let Some(encoded) = encode_property_value(val) {
189 if let Some(pkid) = get_prop_key(&self.storage, &*wtxn, prop_name)? {
190 let idx_key = node_prop_index_key(label_id, pkid, &encoded, node_id);
191 self.storage.node_prop_idx.delete(wtxn, &idx_key)?;
192 }
193 }
194 }
195 }
196
197 self.delete_node_fts(wtxn, node_id, label_id, props_json)?;
199
200 Ok(())
201 }
202
203 pub fn get_node(&self, id: NodeId) -> Result<Option<NodeRecord>, Error> {
205 let rtxn = self.storage.env.read_txn()?;
206 self.get_node_impl(&rtxn, id)
207 }
208
209 pub(super) fn get_node_impl(
210 &self,
211 txn: &heed::RoTxn,
212 id: NodeId,
213 ) -> Result<Option<NodeRecord>, Error> {
214 match self.storage.nodes.get(txn, &id)? {
215 Some(bytes) => Ok(Some(props::decode(bytes)?)),
216 None => Ok(None),
217 }
218 }
219
220 pub fn update_node(&self, id: NodeId, props: &impl Serialize) -> Result<(), Error> {
227 let _guard = self._write_lock.lock();
228 let mut wtxn = self.storage.env.write_txn()?;
229 self.update_node_impl(&mut wtxn, id, props)?;
230 wtxn.commit()?;
231 self.prop_columns.record_touched(id);
232 self.maybe_spawn_rebuild();
233 Ok(())
234 }
235
236 pub(super) fn update_node_impl(
237 &self,
238 wtxn: &mut heed::RwTxn,
239 id: NodeId,
240 props: &impl Serialize,
241 ) -> Result<(), Error> {
242 let old_rec: NodeRecord = match self.storage.nodes.get(wtxn, &id)? {
243 Some(bytes) => props::decode(bytes)?,
244 None => return Err(Error::NodeNotFound(id)),
245 };
246
247 let labels = old_rec.labels.clone();
248 let encoded_props = props::encode(props)?;
249 let props_json: serde_json::Value = props::decode(&encoded_props)?;
250 let old_props_json: serde_json::Value = props::decode(&old_rec.props)?;
251
252 for &label_id in &labels {
257 let label_name = self
258 .label_name_impl(wtxn, label_id)?
259 .unwrap_or_else(|| label_id.to_string());
260 self.unindex_node_for_label(wtxn, label_id, id, &old_props_json)?;
261 self.index_node_for_label(wtxn, label_id, &label_name, id, &props_json)?;
262 }
263
264 let record = NodeRecord {
265 labels,
266 props: encoded_props,
267 };
268 self.storage
269 .nodes
270 .put(wtxn, &id, &props::encode(&record)?)?;
271 Ok(())
272 }
273
274 pub fn add_label(&self, id: NodeId, label: &str) -> Result<(), Error> {
276 let _guard = self._write_lock.lock();
277 let mut wtxn = self.storage.env.write_txn()?;
278 self.add_label_impl(&mut wtxn, id, label)?;
279 wtxn.commit()?;
280 self.maybe_spawn_rebuild();
281 Ok(())
282 }
283
284 pub(super) fn add_label_impl(
285 &self,
286 wtxn: &mut heed::RwTxn,
287 id: NodeId,
288 label: &str,
289 ) -> Result<(), Error> {
290 let mut record: NodeRecord = match self.storage.nodes.get(wtxn, &id)? {
291 Some(bytes) => props::decode(bytes)?,
292 None => return Err(Error::NodeNotFound(id)),
293 };
294 let label_id = get_or_create_label(&self.storage, wtxn, label)?;
295 if record.labels.contains(&label_id) {
296 return Ok(());
297 }
298 let props_json: serde_json::Value = props::decode(&record.props)?;
299 record.labels.push(label_id);
300 self.storage
301 .nodes
302 .put(wtxn, &id, &props::encode(&record)?)?;
303 self.storage
304 .label_idx
305 .put(wtxn, &composite_key(label_id, id), &())?;
306 adjust_label_count(&self.storage, wtxn, label_id, 1)?;
307 self.index_node_for_label(wtxn, label_id, label, id, &props_json)?;
308 Ok(())
309 }
310
311 pub fn remove_label(&self, id: NodeId, label: &str) -> Result<(), Error> {
314 let _guard = self._write_lock.lock();
315 let mut wtxn = self.storage.env.write_txn()?;
316 self.remove_label_impl(&mut wtxn, id, label)?;
317 wtxn.commit()?;
318 self.maybe_spawn_rebuild();
319 Ok(())
320 }
321
322 pub(super) fn remove_label_impl(
323 &self,
324 wtxn: &mut heed::RwTxn,
325 id: NodeId,
326 label: &str,
327 ) -> Result<(), Error> {
328 let mut record: NodeRecord = match self.storage.nodes.get(wtxn, &id)? {
329 Some(bytes) => props::decode(bytes)?,
330 None => return Ok(()),
331 };
332 let label_id = match get_label(&self.storage, &*wtxn, label)? {
333 Some(lid) => lid,
334 None => return Ok(()),
335 };
336 if let Some(pos) = record.labels.iter().position(|&l| l == label_id) {
337 let props_json: serde_json::Value = props::decode(&record.props)?;
338 record.labels.remove(pos);
339 self.storage
340 .nodes
341 .put(wtxn, &id, &props::encode(&record)?)?;
342 self.storage
343 .label_idx
344 .delete(wtxn, &composite_key(label_id, id))?;
345 adjust_label_count(&self.storage, wtxn, label_id, -1)?;
346 self.unindex_node_for_label(wtxn, label_id, id, &props_json)?;
347 }
348 Ok(())
349 }
350
351 pub fn node_labels(&self, id: NodeId) -> Result<Vec<String>, Error> {
354 let rtxn = self.storage.env.read_txn()?;
355 self.node_labels_impl(&rtxn, id)
356 }
357
358 pub(super) fn node_labels_impl(
359 &self,
360 rtxn: &heed::RoTxn,
361 id: NodeId,
362 ) -> Result<Vec<String>, Error> {
363 match self.get_node_impl(rtxn, id)? {
364 Some(rec) => {
365 let mut names = Vec::with_capacity(rec.labels.len());
366 for lid in rec.labels {
367 if let Some(name) = self.label_name_impl(rtxn, lid)? {
368 names.push(name);
369 }
370 }
371 Ok(names)
372 }
373 None => Ok(vec![]),
374 }
375 }
376
377 #[instrument(skip(self))]
379 pub fn delete_node(&self, id: NodeId) -> Result<(), Error> {
380 let _guard = self._write_lock.lock();
381 let mut wtxn = self.storage.env.write_txn()?;
382 self.delete_node_impl(&mut wtxn, id)?;
383 wtxn.commit()?;
384 self.csr_cache.mark_force_full();
387 self.prop_columns.record_force_full();
388 self.maybe_spawn_rebuild();
389 Ok(())
390 }
391
392 pub(super) fn delete_node_impl(&self, wtxn: &mut heed::RwTxn, id: NodeId) -> Result<(), Error> {
393 let record: NodeRecord = match self.storage.nodes.get(wtxn, &id)? {
394 Some(bytes) => props::decode(bytes)?,
395 None => return Ok(()),
396 };
397
398 let props_json: serde_json::Value = props::decode(&record.props)?;
399
400 for &label_id in &record.labels {
403 self.unindex_node_for_label(wtxn, label_id, id, &props_json)?;
404 self.storage
405 .label_idx
406 .delete(wtxn, &composite_key(label_id, id))?;
407 adjust_label_count(&self.storage, wtxn, label_id, -1)?;
408 }
409
410 let mut out_edges = Vec::new();
412 if let Some(iter) = self.storage.out_adj.get_duplicates(wtxn, &id)? {
413 for result in iter {
414 let (_, bytes) = result?;
415 let entry = AdjEntry::read_from_bytes(bytes)
416 .ok()
417 .ok_or(Error::Corrupt("AdjEntry value is not exactly 20 bytes"))?;
418 out_edges.push(entry);
419 }
420 }
421
422 for entry in out_edges {
423 let edge_id = entry.edge_id;
424 let other = entry.other;
425 if let Some(edge_rec) = self.get_edge_impl(wtxn, edge_id)? {
426 self.delete_edge_index_entries(wtxn, edge_id, &edge_rec)?;
427 }
428 self.storage.edges.delete(wtxn, &edge_id)?;
430 self.storage
431 .type_idx
432 .delete(wtxn, &composite_key(entry.edge_type, edge_id))?;
433
434 adjust_type_count(&self.storage, wtxn, entry.edge_type, -1)?;
435
436 let in_entry = AdjEntry {
438 edge_type: entry.edge_type,
439 other: id,
440 edge_id,
441 };
442 self.storage
443 .in_adj
444 .delete_one_duplicate(wtxn, &other, in_entry.as_bytes())?;
445 }
446
447 let mut in_edges = Vec::new();
449 if let Some(iter) = self.storage.in_adj.get_duplicates(wtxn, &id)? {
450 for result in iter {
451 let (_, bytes) = result?;
452 let entry = AdjEntry::read_from_bytes(bytes)
453 .ok()
454 .ok_or(Error::Corrupt("AdjEntry value is not exactly 20 bytes"))?;
455 in_edges.push(entry);
456 }
457 }
458
459 for entry in in_edges {
460 let edge_id = entry.edge_id;
461 let other = entry.other;
462 if let Some(edge_rec) = self.get_edge_impl(wtxn, edge_id)? {
463 self.delete_edge_index_entries(wtxn, edge_id, &edge_rec)?;
464 }
465 self.storage.edges.delete(wtxn, &edge_id)?;
467 self.storage
468 .type_idx
469 .delete(wtxn, &composite_key(entry.edge_type, edge_id))?;
470
471 adjust_type_count(&self.storage, wtxn, entry.edge_type, -1)?;
472
473 let out_entry = AdjEntry {
475 edge_type: entry.edge_type,
476 other: id,
477 edge_id,
478 };
479 self.storage
480 .out_adj
481 .delete_one_duplicate(wtxn, &other, out_entry.as_bytes())?;
482 }
483
484 self.storage.out_adj.delete(wtxn, &id)?;
486 self.storage.in_adj.delete(wtxn, &id)?;
487
488 self.storage.vectors.delete(wtxn, &id)?;
490
491 self.storage.nodes.delete(wtxn, &id)?;
493
494 Ok(())
495 }
496}
497
498#[cfg(test)]
499mod tests {
500 use serde_json::json;
501 use tempfile::TempDir;
502
503 use super::*;
504
505 fn open_tmp() -> (TempDir, Graph) {
506 let dir = TempDir::new().unwrap();
507 let g = Graph::open(dir.path(), 1).unwrap();
508 (dir, g)
509 }
510
511 #[test]
514 fn multi_label_add_and_query() {
515 let (_dir, g) = open_tmp();
516 let id = g
517 .add_node_multi(&["A", "B", "C"], &json!({"x": 1}))
518 .unwrap();
519
520 let mut labels = g.node_labels(id).unwrap();
521 labels.sort();
522 assert_eq!(labels, vec!["A", "B", "C"]);
523
524 assert_eq!(g.nodes_by_label("A").unwrap(), vec![id]);
525 assert_eq!(g.nodes_by_label("B").unwrap(), vec![id]);
526 assert_eq!(g.nodes_by_label("C").unwrap(), vec![id]);
527 assert_eq!(g.node_count_by_label("B").unwrap(), 1);
528 }
529
530 #[test]
532 fn multi_label_empty_creates_unlabeled_node() {
533 let (_dir, g) = open_tmp();
534 let id = g.add_node_multi(&[], &json!({"x": 1})).unwrap();
535 assert!(g.node_labels(id).unwrap().is_empty());
536 assert!(g.get_node(id).unwrap().unwrap().labels.is_empty());
537 }
538
539 #[test]
541 fn multi_label_dedups() {
542 let (_dir, g) = open_tmp();
543 let id = g.add_node_multi(&["A", "A", "B"], &json!({})).unwrap();
544 assert_eq!(g.get_node(id).unwrap().unwrap().labels.len(), 2);
545 }
546
547 #[test]
549 fn add_label_is_idempotent_and_additive() {
550 let (_dir, g) = open_tmp();
551 let id = g.add_node("A", &json!({"x": 1})).unwrap();
552
553 g.add_label(id, "B").unwrap();
554 g.add_label(id, "B").unwrap(); let mut labels = g.node_labels(id).unwrap();
557 labels.sort();
558 assert_eq!(labels, vec!["A", "B"]);
559 assert_eq!(g.nodes_by_label("B").unwrap(), vec![id]);
560 assert_eq!(g.node_count_by_label("B").unwrap(), 1);
561 }
562
563 #[test]
565 fn remove_label_drops_one_keeps_rest() {
566 let (_dir, g) = open_tmp();
567 let id = g.add_node_multi(&["A", "B"], &json!({})).unwrap();
568
569 g.remove_label(id, "A").unwrap();
570
571 assert_eq!(g.node_labels(id).unwrap(), vec!["B"]);
572 assert!(g.nodes_by_label("A").unwrap().is_empty());
573 assert_eq!(g.nodes_by_label("B").unwrap(), vec![id]);
574 assert_eq!(g.node_count_by_label("A").unwrap(), 0);
575 }
576
577 #[test]
579 fn remove_label_missing_is_noop() {
580 let (_dir, g) = open_tmp();
581 let id = g.add_node("A", &json!({})).unwrap();
582 g.remove_label(id, "Nonexistent").unwrap();
583 g.remove_label(id, "B").unwrap();
584 assert_eq!(g.node_labels(id).unwrap(), vec!["A"]);
585 }
586
587 #[test]
590 fn label_mutation_updates_property_index() {
591 let (_dir, g) = open_tmp();
592 let id = g.add_node("A", &json!({"age": 30})).unwrap();
593
594 g.add_label(id, "B").unwrap();
595 assert_eq!(
596 g.nodes_by_property("B", "age", PropValue::Int(30)).unwrap(),
597 vec![id]
598 );
599
600 g.remove_label(id, "B").unwrap();
601 assert!(
602 g.nodes_by_property("B", "age", PropValue::Int(30))
603 .unwrap()
604 .is_empty()
605 );
606 assert_eq!(
608 g.nodes_by_property("A", "age", PropValue::Int(30)).unwrap(),
609 vec![id]
610 );
611 }
612
613 #[test]
615 fn delete_multi_label_node_clears_all_indexes() {
616 let (_dir, g) = open_tmp();
617 let id = g.add_node_multi(&["A", "B"], &json!({})).unwrap();
618 g.delete_node(id).unwrap();
619 assert!(g.nodes_by_label("A").unwrap().is_empty());
620 assert!(g.nodes_by_label("B").unwrap().is_empty());
621 assert_eq!(g.node_count_by_label("A").unwrap(), 0);
622 assert_eq!(g.node_count_by_label("B").unwrap(), 0);
623 }
624
625 #[test]
628 fn auto_index_on_insert() {
629 let (_dir, g) = open_tmp();
630
631 let node_id = g
632 .add_node("Person", &json!({"name": "Alice", "age": 30}))
633 .unwrap();
634
635 let hits = g
637 .nodes_by_property("Person", "age", PropValue::Int(30))
638 .unwrap();
639 assert_eq!(hits, vec![node_id]);
640
641 assert!(g.has_node_property_index("Person", "age").unwrap());
643 assert!(g.has_node_property_index("Person", "name").unwrap());
644 }
645
646 #[test]
648 fn auto_index_on_update() {
649 let (_dir, g) = open_tmp();
650
651 let node_id = g
652 .add_node("Person", &json!({"name": "Bob", "age": 25}))
653 .unwrap();
654
655 g.update_node(node_id, &json!({"name": "Bob", "age": 26}))
657 .unwrap();
658
659 let old_hits = g
661 .nodes_by_property("Person", "age", PropValue::Int(25))
662 .unwrap();
663 assert!(old_hits.is_empty());
664
665 let new_hits = g
667 .nodes_by_property("Person", "age", PropValue::Int(26))
668 .unwrap();
669 assert_eq!(new_hits, vec![node_id]);
670 }
671
672 #[test]
674 fn auto_index_on_delete() {
675 let (_dir, g) = open_tmp();
676
677 let node_id = g
678 .add_node("Person", &json!({"name": "Carol", "age": 40}))
679 .unwrap();
680
681 g.delete_node(node_id).unwrap();
682
683 let hits = g
684 .nodes_by_property("Person", "age", PropValue::Int(40))
685 .unwrap();
686 assert!(hits.is_empty());
687 }
688}