1use std::{
2 any::{Any, TypeId as StdTypeId},
3 collections::HashMap,
4 path::Path,
5 sync::Arc,
6};
7
8use parking_lot::ReentrantMutex;
9use serde::Serialize;
10use tracing::instrument;
11use zerocopy::{FromBytes, IntoBytes};
12
13use ahash::{AHashMap, AHashSet};
14
15use crate::matrices::MatrixSet;
16use crate::{
17 csr::{CsrCache, CsrSnapshot},
18 error::Error,
19 schema::{
20 AdjEntry, DirectedNeighborEntry, EdgeId, EdgeRecord, LabelId, Language, NeighborEntry,
21 NodeId, NodeRecord, PropKeyId, PropValue, TypeId, WeightedPath,
22 },
23 storage::{
24 fts,
25 ids::{
26 adjust_label_count, adjust_type_count, alloc_edge_id, alloc_node_id, get_label,
27 get_or_create_label, get_or_create_prop_key, get_or_create_type, get_prop_key,
28 get_prop_key_name, get_type,
29 },
30 lmdb::Storage,
31 props,
32 },
33};
34
// Submodules of the graph API. Methods called in this file but not defined
// here (e.g. `maybe_spawn_rebuild_n`, called from `Graph::update`) presumably
// live in one of them — confirm before moving code.
pub mod algo;
pub mod edge;
pub mod fts_mod;
pub mod graphblas;
pub mod index;
pub mod node;
pub mod txn;
pub mod vector;
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
46pub enum DegreeDirection {
47 In,
49 Out,
51 Both,
53}
54
/// Optional per-position filters for a triangle count.
///
/// Each array has three slots (presumably one per edge / vertex of the
/// triangle pattern — confirm against the consumer in `algo`). `None`
/// presumably leaves a slot unconstrained; the derived `Default` yields all
/// `None`.
#[derive(Debug, Default, Clone)]
pub struct TriangleCountSpec<'a> {
    /// Relationship-type names, by position.
    pub rel_types: [Option<&'a str>; 3],
    /// Label names, by position.
    pub labels: [Option<&'a str>; 3],
}
66
67pub(super) fn composite_key(prefix: u32, id: u64) -> [u8; 12] {
69 let mut key = [0u8; 12];
70 key[..4].copy_from_slice(&prefix.to_be_bytes());
71 key[4..].copy_from_slice(&id.to_be_bytes());
72 key
73}
74
/// Encoding of a JSON `null` produced by `encode_property_value`: the type tag
/// `0x00` with no payload.
pub(super) const ENCODED_NULL: u8 = 0x00;

/// The sign bit of a 64-bit word (IEEE-754 or two's complement). Flipping it
/// (or, for negative floats, inverting the whole word) turns signed values into
/// unsigned big-endian bytes that sort in numeric order.
const SORT_SIGN_BIT: u64 = 0x8000_0000_0000_0000;
81
/// Encodes a scalar JSON property value into bytes whose lexicographic order
/// follows the value order used by the property indexes.
///
/// The first byte is a type tag, so values of different JSON types never
/// compare equal and sort as null < false < true < number < string:
///
/// * `0x00` — null (1 byte, `ENCODED_NULL`).
/// * `0x01` / `0x02` — `false` / `true` (1 byte).
/// * `0x03` — number (17 bytes): 8 big-endian bytes of the f64 bit pattern,
///   transformed so unsigned byte order equals numeric order, then an 8-byte
///   big-endian integer tiebreaker. The tiebreaker keeps distinct integers that
///   round to the same f64 (|i| > 2^53) distinct and ordered. Integral floats
///   get the same tiebreaker as the equal integer, so `30` and `30.0` encode
///   identically. Fractional or out-of-range floats use 0.
/// * `0x04` — string: the raw UTF-8 bytes followed by a `0x00` terminator.
///
/// Returns `None` for arrays and objects (not indexable), or if `as_f64`
/// yields `None` for a number.
///
/// NOTE(review): these bytes are persisted inside index keys, so changing the
/// layout requires re-indexing existing data. Known quirks that would need such
/// a migration to fix:
/// * `-0.0` and `0` encode differently (their transformed f64 words differ),
///   so an equality lookup for `0` does not match a stored `-0.0`.
/// * `i64::MAX as f64` rounds up to 2^63, which is not a valid `i64`. The range
///   check below therefore admits 2^63, and the saturating `as i64` cast maps
///   the float 2^63 (and any `u64` rounding to it) onto the same bytes as
///   `i64::MAX`. `u64` values rounding above 2^63 get tiebreaker 0 and collide
///   whenever they round to the same f64.
/// * Strings are NUL-terminated without escaping, so a string containing an
///   interior `0x00` is not prefix-free with respect to shorter strings.
pub(super) fn encode_property_value(val: &serde_json::Value) -> Option<Vec<u8>> {
    match val {
        serde_json::Value::Null => Some(vec![ENCODED_NULL]),
        serde_json::Value::Bool(false) => Some(vec![0x01]),
        serde_json::Value::Bool(true) => Some(vec![0x02]),
        serde_json::Value::Number(num) => {
            let float_val = num.as_f64()?;
            let bits = float_val.to_bits();
            // Order-preserving f64 transform. Negatives (sign bit set) get every
            // bit inverted, so larger magnitudes sort lower. Non-negatives only
            // get the sign bit set, so they sort above every negative.
            let masked = if (bits & SORT_SIGN_BIT) != 0 {
                !bits
            } else {
                bits ^ SORT_SIGN_BIT
            };
            // Integer tiebreaker, biased by flipping the sign bit so that
            // two's-complement values compare correctly as unsigned BE bytes.
            let int_disambig: u64 = if let Some(i) = num.as_i64() {
                (i as u64) ^ SORT_SIGN_BIT
            } else if float_val.fract() == 0.0
                && float_val >= i64::MIN as f64
                && float_val <= i64::MAX as f64
            {
                // Integral float: reuse the equal integer's tiebreaker so that
                // numerically equal ints and floats share one encoding.
                ((float_val as i64) as u64) ^ SORT_SIGN_BIT
            } else {
                // Fractional or outside the i64 range: nothing to disambiguate.
                0
            };
            let mut buf = Vec::with_capacity(17);
            buf.push(0x03);
            buf.extend_from_slice(&masked.to_be_bytes());
            buf.extend_from_slice(&int_disambig.to_be_bytes());
            Some(buf)
        }
        serde_json::Value::String(s) => {
            let mut buf = Vec::with_capacity(1 + s.len() + 1);
            buf.push(0x04);
            buf.extend_from_slice(s.as_bytes());
            // Terminator so a shorter string sorts before any string it prefixes.
            buf.push(0x00);
            Some(buf)
        }
        // Arrays and objects are not indexable.
        _ => None,
    }
}
138
139#[allow(dead_code)]
141pub(super) fn decode_property_value(bytes: &[u8]) -> Option<serde_json::Value> {
142 if bytes.is_empty() {
143 return None;
144 }
145 match bytes[0] {
146 0x00 => Some(serde_json::Value::Null),
147 0x01 => Some(serde_json::Value::Bool(false)),
148 0x02 => Some(serde_json::Value::Bool(true)),
149 0x03 => {
150 if bytes.len() < 17 {
152 return None;
153 }
154 let mut int_arr = [0u8; 8];
157 int_arr.copy_from_slice(&bytes[9..17]);
158 let int_val = (u64::from_be_bytes(int_arr) ^ SORT_SIGN_BIT) as i64;
159
160 let mut arr = [0u8; 8];
161 arr.copy_from_slice(&bytes[1..9]);
162 let masked = u64::from_be_bytes(arr);
163 let bits = if (masked & SORT_SIGN_BIT) == 0 {
164 !masked
165 } else {
166 masked ^ SORT_SIGN_BIT
167 };
168 let float_val = f64::from_bits(bits);
169
170 if (int_val as f64) == float_val {
175 Some(serde_json::Value::Number(int_val.into()))
176 } else {
177 serde_json::Number::from_f64(float_val).map(serde_json::Value::Number)
178 }
179 }
180 0x04 => {
181 let str_bytes = if bytes.ends_with(&[0x00]) {
182 &bytes[1..bytes.len() - 1]
183 } else {
184 &bytes[1..]
185 };
186 String::from_utf8(str_bytes.to_vec())
187 .ok()
188 .map(serde_json::Value::String)
189 }
190 _ => None,
191 }
192}
193
194pub(super) fn node_prop_index_key(
196 label_id: LabelId,
197 prop_key_id: PropKeyId,
198 encoded_val: &[u8],
199 node_id: NodeId,
200) -> Vec<u8> {
201 let mut key = Vec::with_capacity(4 + 4 + encoded_val.len() + 8);
202 key.extend_from_slice(&label_id.to_be_bytes());
203 key.extend_from_slice(&prop_key_id.to_be_bytes());
204 key.extend_from_slice(encoded_val);
205 key.extend_from_slice(&node_id.to_be_bytes());
206 key
207}
208
209pub(super) fn edge_prop_index_key(
211 type_id: TypeId,
212 prop_key_id: PropKeyId,
213 encoded_val: &[u8],
214 edge_id: EdgeId,
215) -> Vec<u8> {
216 let mut key = Vec::with_capacity(4 + 4 + encoded_val.len() + 8);
217 key.extend_from_slice(&type_id.to_be_bytes());
218 key.extend_from_slice(&prop_key_id.to_be_bytes());
219 key.extend_from_slice(encoded_val);
220 key.extend_from_slice(&edge_id.to_be_bytes());
221 key
222}
223
224pub(super) fn fts_postings_key(label_id: LabelId, prop_key_id: PropKeyId, term: &str) -> Vec<u8> {
226 let mut key = Vec::with_capacity(8 + term.len());
227 key.extend_from_slice(&label_id.to_be_bytes());
228 key.extend_from_slice(&prop_key_id.to_be_bytes());
229 key.extend_from_slice(term.as_bytes());
230 key
231}
232
233pub(super) fn fts_posting_val(node_id: NodeId, frequency: u32) -> [u8; 12] {
235 let mut val = [0u8; 12];
236 val[0..8].copy_from_slice(&node_id.to_be_bytes());
237 val[8..12].copy_from_slice(&frequency.to_be_bytes());
238 val
239}
240
241pub(super) fn parse_fts_posting_val(bytes: &[u8]) -> Result<(NodeId, u32), Error> {
243 if bytes.len() != 12 {
244 return Err(Error::Corrupt("fts posting value must be 12 bytes"));
245 }
246 let node_id = NodeId::from_be_bytes(
247 bytes[0..8]
248 .try_into()
249 .map_err(|_| Error::Corrupt("fts posting: node_id slice wrong size"))?,
250 );
251 let frequency = u32::from_be_bytes(
252 bytes[8..12]
253 .try_into()
254 .map_err(|_| Error::Corrupt("fts posting: frequency slice wrong size"))?,
255 );
256 Ok((node_id, frequency))
257}
258
259pub(super) fn fts_doc_key(label_id: LabelId, prop_key_id: PropKeyId, node_id: NodeId) -> [u8; 16] {
261 let mut key = [0u8; 16];
262 key[0..4].copy_from_slice(&label_id.to_be_bytes());
263 key[4..8].copy_from_slice(&prop_key_id.to_be_bytes());
264 key[8..16].copy_from_slice(&node_id.to_be_bytes());
265 key
266}
267
268pub(super) fn parse_fts_doc_val(bytes: &[u8]) -> Result<u32, Error> {
270 if bytes.len() != 4 {
271 return Err(Error::Corrupt("fts doc val must be 4 bytes"));
272 }
273 Ok(u32::from_be_bytes(bytes.try_into().map_err(|_| {
274 Error::Corrupt("fts doc val: slice wrong size")
275 })?))
276}
277
278pub(super) fn fts_stats_n_key(label_id: LabelId, prop_key_id: PropKeyId) -> String {
279 format!("fts_stats:node:l:{label_id}:p:{prop_key_id}:N")
280}
281
282pub(super) fn fts_stats_sum_dl_key(label_id: LabelId, prop_key_id: PropKeyId) -> String {
283 format!("fts_stats:node:l:{label_id}:p:{prop_key_id}:sum_dl")
284}
285
/// Shared handle to an open graph database.
///
/// Every field is an `Arc`, so the derived `Clone` is cheap. All clones share
/// the same storage, caches, write lock and extensions.
// Field order also fixes drop order; keep it stable.
#[derive(Clone)]
pub struct Graph {
    /// LMDB-backed storage (`storage::lmdb::Storage`).
    pub(super) storage: Arc<Storage>,
    /// Serializes writers (`update`, `with_write_lock`). It is reentrant, so the
    /// thread holding it may lock it again.
    pub(super) _write_lock: Arc<ReentrantMutex<()>>,
    /// CSR adjacency snapshot plus the deltas recorded by `update`.
    pub(super) csr_cache: Arc<CsrCache>,
    /// GraphBLAS matrices materialized from the CSR snapshot. `open` and
    /// `rebuild_csr` both set this to `Some`.
    pub(super) matrices: Arc<parking_lot::RwLock<Option<MatrixSet>>>,
    /// Per-property column cache backing the `node_prop_*` readers.
    pub(super) prop_columns: Arc<crate::columns::ColumnsCache>,
    /// Thread count recorded by `set_thread_count` (initially 0). It is passed
    /// to `MatrixSet::materialize` in `rebuild_csr`.
    pub(super) n_threads: Arc<std::sync::atomic::AtomicI32>,
    /// Type-keyed slots for caller-defined state. Each value is a boxed `Arc<T>`
    /// stored under `std::any::TypeId::of::<T>()`.
    pub(crate) extensions: Arc<parking_lot::Mutex<AHashMap<StdTypeId, Box<dyn Any + Send + Sync>>>>,
}
302
/// Read-only transaction handed to the closure passed to `Graph::view`.
pub struct ReadTxn<'a> {
    /// Graph this transaction reads from.
    pub(super) graph: &'a Graph,
    /// Underlying heed (LMDB) read transaction.
    pub(super) rtxn: heed::RoTxn<'a, heed::WithTls>,
}
308
/// Read-write transaction handed to the closure passed to `Graph::update`.
///
/// `Graph::update` commits `wtxn` when the closure returns `Ok` and aborts it
/// when the closure returns `Err`.
pub struct WriteTxn<'a> {
    /// Graph this transaction writes to.
    pub(super) graph: &'a Graph,
    /// Underlying heed (LMDB) write transaction.
    pub(super) wtxn: heed::RwTxn<'a>,
    /// Mutation counter. `update` passes a non-zero value to
    /// `maybe_spawn_rebuild_n` after commit. It is presumably incremented by the
    /// mutation methods in the sibling submodules — confirm.
    pub(super) mutations_count: usize,
    /// Changes that `update` applies to the property-column and CSR caches
    /// after a successful commit.
    pub(super) delta: crate::csr::GraphDelta,
}
318
319impl Graph {
320 pub fn open(path: &Path, map_size_gb: usize) -> Result<Self, Error> {
321 let storage = Storage::open(path, map_size_gb)?;
322 let _ = std::fs::remove_file(path.join("csr_snapshot.bin"));
325 let initial = CsrSnapshot::build(&storage)?;
326 let storage = Arc::new(storage);
327 let csr_cache = Arc::new(CsrCache::new(initial));
328 let matrices = {
329 let initial_snap = csr_cache.snapshot.load();
330 let m = MatrixSet::materialize(&initial_snap, 0)?;
331 Arc::new(parking_lot::RwLock::new(Some(m)))
332 };
333 Ok(Self {
334 storage,
335 _write_lock: Arc::new(ReentrantMutex::new(())),
336 csr_cache,
337 matrices,
338 prop_columns: Arc::new(crate::columns::ColumnsCache::default()),
339 n_threads: Arc::new(std::sync::atomic::AtomicI32::new(0)),
340 extensions: Arc::new(parking_lot::Mutex::new(AHashMap::new())),
341 })
342 }
343
344 pub fn set_thread_count(&self, n: i32) -> Result<(), Error> {
347 self.n_threads
348 .store(n, std::sync::atomic::Ordering::Release);
349 issundb_graphblas::set_global_threads(n).map_err(|e| Error::GraphBLAS(e.to_string()))?;
350 Ok(())
351 }
352
353 pub fn node_prop_json(
359 &self,
360 id: NodeId,
361 prop: &str,
362 ) -> Result<Option<serde_json::Value>, Error> {
363 self.prop_columns.with_fresh(&self.storage, |cols| {
364 cols.id_to_dense.get(&id).map(|&d| {
365 cols.cols
366 .get(prop)
367 .and_then(|c| c.get_json_opt(d as usize))
368 .unwrap_or(serde_json::Value::Null)
369 })
370 })
371 }
372
373 pub fn node_props_json_table(
379 &self,
380 ids: &[NodeId],
381 props: &[&str],
382 ) -> Result<Vec<Vec<serde_json::Value>>, Error> {
383 self.prop_columns
384 .with_fresh(&self.storage, |cols| cols.props_table(ids, props))?
385 }
386
387 pub fn node_prop_json_column(
393 &self,
394 ids: &[NodeId],
395 prop: &str,
396 ) -> Result<Vec<serde_json::Value>, Error> {
397 self.prop_columns
398 .with_fresh(&self.storage, |cols| cols.prop_column(ids, prop))?
399 }
400
401 pub fn node_prop_group_codes(
408 &self,
409 ids: &[NodeId],
410 prop: &str,
411 ) -> Result<(Vec<u32>, Vec<serde_json::Value>), Error> {
412 self.prop_columns
413 .with_fresh(&self.storage, |cols| cols.group_codes(ids, prop))?
414 }
415
416 pub fn set_extension<T: Any + Send + Sync>(&self, val: Arc<T>) {
419 self.extensions
420 .lock()
421 .insert(StdTypeId::of::<T>(), Box::new(val));
422 }
423
424 pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
426 self.extensions
427 .lock()
428 .get(&StdTypeId::of::<T>())
429 .and_then(|b| b.downcast_ref::<Arc<T>>())
430 .cloned()
431 }
432
433 pub fn get_or_init_extension_with<T, E, F>(&self, init: F) -> Result<Arc<T>, E>
442 where
443 T: Any + Send + Sync,
444 F: FnOnce() -> Result<Arc<T>, E>,
445 {
446 if let Some(existing) = self.get_extension::<T>() {
447 return Ok(existing);
448 }
449 let value = init()?;
450 let mut ext = self.extensions.lock();
451 if let Some(existing) = ext
454 .get(&StdTypeId::of::<T>())
455 .and_then(|b| b.downcast_ref::<Arc<T>>())
456 {
457 return Ok(existing.clone());
458 }
459 ext.insert(StdTypeId::of::<T>(), Box::new(value.clone()));
460 Ok(value)
461 }
462
463 pub fn view<F, T>(&self, f: F) -> Result<T, Error>
465 where
466 F: FnOnce(&ReadTxn) -> Result<T, Error>,
467 {
468 let rtxn = self.storage.env.read_txn()?;
469 let txn = ReadTxn { graph: self, rtxn };
470 f(&txn)
471 }
472
473 pub fn update<F, T>(&self, f: F) -> Result<T, Error>
475 where
476 F: FnOnce(&mut WriteTxn) -> Result<T, Error>,
477 {
478 let _guard = self._write_lock.lock();
479 let wtxn = self.storage.env.write_txn()?;
480 let mut txn = WriteTxn {
481 graph: self,
482 wtxn,
483 mutations_count: 0,
484 delta: crate::csr::GraphDelta::default(),
485 };
486 match f(&mut txn) {
487 Ok(val) => {
488 let mutations_count = txn.mutations_count;
489 let delta = std::mem::take(&mut txn.delta);
490 txn.wtxn.commit()?;
491 if delta.force_full {
492 self.prop_columns.record_force_full();
493 } else {
494 self.prop_columns.record_touched_many(&delta.added_nodes);
495 self.prop_columns.record_touched_many(&delta.updated_nodes);
496 }
497 self.csr_cache.record_batch(delta);
498 if mutations_count > 0 {
499 self.maybe_spawn_rebuild_n(mutations_count);
500 }
501 Ok(val)
502 }
503 Err(err) => {
504 txn.wtxn.abort();
505 Err(err)
506 }
507 }
508 }
509
510 pub fn with_write_lock<F, R>(&self, f: F) -> R
514 where
515 F: FnOnce() -> R,
516 {
517 let _guard = self._write_lock.lock();
518 f()
519 }
520
521 #[instrument(skip(self))]
525 pub fn rebuild_csr(&self) -> Result<(), Error> {
526 let built_gen = self.csr_cache.current_gen();
529 self.csr_cache.clear_delta();
533 let snap = CsrSnapshot::build(&self.storage)?;
534 let m = MatrixSet::materialize(
535 &snap,
536 self.n_threads.load(std::sync::atomic::Ordering::Acquire),
537 )?;
538 *self.matrices.write() = Some(m);
539 self.csr_cache.install_full(snap, built_gen);
540 Ok(())
541 }
542
543 pub fn backup(&self, destination: &Path) -> Result<(), Error> {
552 self.storage
553 .env
554 .copy_to_path(destination, heed::CompactionOption::Disabled)
555 .map(|_| ())
556 .map_err(Error::Storage)
557 }
558
559 pub fn backup_compact(&self, destination: &Path) -> Result<(), Error> {
564 self.storage
565 .env
566 .copy_to_path(destination, heed::CompactionOption::Enabled)
567 .map(|_| ())
568 .map_err(Error::Storage)
569 }
570
571 pub fn restore(snapshot_file: &Path, dst_dir: &Path) -> Result<(), Error> {
578 std::fs::create_dir_all(dst_dir)?;
579 let dst_file = dst_dir.join("data.mdb");
580 std::fs::copy(snapshot_file, &dst_file)?;
581 Ok(())
582 }
583}
584
#[cfg(test)]
mod extension_tests {
    use std::convert::Infallible;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::Graph;

    /// Opens a graph in a fresh temporary directory.
    ///
    /// The `TempDir` is returned too, so the caller keeps the directory alive:
    /// `TempDir` deletes it on drop.
    fn open_tmp() -> (TempDir, Graph) {
        let dir = TempDir::new().unwrap();
        let graph = Graph::open(dir.path(), 1).unwrap();
        (dir, graph)
    }

    /// Extensions are keyed by type: set/get round-trips, a different type has
    /// its own empty slot, and a second `set` replaces the first value.
    #[test]
    fn extension_roundtrip_by_type() {
        let (_dir, graph) = open_tmp();
        assert!(graph.get_extension::<String>().is_none());

        graph.set_extension(Arc::new("cache".to_string()));
        let stored = graph
            .get_extension::<String>()
            .expect("extension must exist");
        assert_eq!(stored.as_str(), "cache");
        assert!(graph.get_extension::<u64>().is_none(), "distinct type slot");

        graph.set_extension(Arc::new("replaced".to_string()));
        let replaced = graph.get_extension::<String>().unwrap();
        assert_eq!(replaced.as_str(), "replaced");
    }

    /// The first successful initializer wins; later initializers are ignored.
    #[test]
    fn get_or_init_extension_initializes_once() {
        let (_dir, graph) = open_tmp();
        let init_with = |value: u64| {
            graph
                .get_or_init_extension_with::<u64, Infallible, _>(move || Ok(Arc::new(value)))
                .unwrap()
        };

        assert_eq!(*init_with(7), 7);
        assert_eq!(
            *init_with(9),
            7,
            "second init must not replace the stored value"
        );
    }

    /// A failing initializer surfaces its error and leaves the slot empty, so a
    /// later initializer can still succeed.
    #[test]
    fn get_or_init_extension_propagates_init_error() {
        let (_dir, graph) = open_tmp();

        let failure = graph
            .get_or_init_extension_with::<u64, &str, _>(|| Err("init failed"))
            .unwrap_err();
        assert_eq!(failure, "init failed");
        assert!(graph.get_extension::<u64>().is_none());

        let recovered = graph
            .get_or_init_extension_with::<u64, &str, _>(|| Ok(Arc::new(7)))
            .unwrap();
        assert_eq!(*recovered, 7);
    }
}
650
#[cfg(test)]
mod encode_tests {
    use serde_json::{json, Value};

    use super::{decode_property_value, encode_property_value};

    /// Encodes `v`, panicking with the offending value if it is not encodable.
    fn enc(v: &Value) -> Vec<u8> {
        encode_property_value(v).unwrap_or_else(|| panic!("value {v} should be encodable"))
    }

    /// Asserts that the encodings of `values` (listed in ascending order) are
    /// already in byte-wise sorted order.
    fn assert_encodings_ascending(values: &[Value]) {
        let encoded: Vec<Vec<u8>> = values.iter().map(enc).collect();
        let mut sorted = encoded.clone();
        sorted.sort();
        assert_eq!(encoded, sorted, "encodings must sort in value order");
    }

    /// Integers above 2^53 round to the same f64; the integer tiebreaker must
    /// keep them apart.
    #[test]
    fn large_integers_do_not_collide() {
        let a = enc(&json!(9_007_199_254_740_992_i64));
        let b = enc(&json!(9_007_199_254_740_993_i64));
        assert_ne!(a, b, "distinct large integers must encode distinctly");
    }

    /// Numerically equal integers and floats must share one index encoding.
    #[test]
    fn integer_and_equal_float_unify() {
        assert_eq!(enc(&json!(30)), enc(&json!(30.0)));
        assert_eq!(enc(&json!(0)), enc(&json!(0.0)));
    }

    /// Every number encodes to tag + 8-byte float word + 8-byte tiebreaker.
    #[test]
    fn numeric_encoding_is_fixed_length() {
        for v in [
            json!(1),
            json!(-1),
            json!(0),
            json!(i64::MAX),
            json!(i64::MIN),
            json!(3.5),
            json!(-2.5e10),
        ] {
            assert_eq!(enc(&v).len(), 17, "value {v}");
        }
    }

    #[test]
    fn numeric_ordering_preserved() {
        let ascending: Vec<Value> = [
            i64::MIN,
            -1_000,
            -1,
            0,
            1,
            1_000,
            1 << 53,
            (1 << 53) + 1,
            i64::MAX,
        ]
        .iter()
        .map(|v| json!(v))
        .collect();
        assert_encodings_ascending(&ascending);
    }

    /// Integers and fractional floats interleave in numeric order.
    #[test]
    fn mixed_int_and_float_ordering_preserved() {
        assert_encodings_ascending(&[
            json!(-1.5),
            json!(-1),
            json!(-0.5),
            json!(0),
            json!(0.5),
            json!(1),
            json!(1.5),
        ]);
    }

    /// The leading type tag orders null < false < true < number < string.
    #[test]
    fn type_tags_order_values_across_types() {
        assert_encodings_ascending(&[
            json!(null),
            json!(false),
            json!(true),
            json!(-1e300),
            json!(""),
        ]);
    }

    /// The NUL terminator makes a string sort before any string it prefixes.
    #[test]
    fn strings_sort_bytewise() {
        assert_encodings_ascending(&[json!(""), json!("a"), json!("ab"), json!("b")]);
    }

    /// Arrays and objects are not indexable.
    #[test]
    fn arrays_and_objects_are_not_encodable() {
        assert!(encode_property_value(&json!([1, 2])).is_none());
        assert!(encode_property_value(&json!({"a": 1})).is_none());
    }

    #[test]
    fn decode_round_trips_large_integer() {
        for v in [
            json!(0),
            json!(-1),
            json!(9_007_199_254_740_993_i64),
            json!(i64::MAX),
        ] {
            assert_eq!(decode_property_value(&enc(&v)), Some(v.clone()), "value {v}");
        }
    }

    /// Non-integer scalars survive an encode/decode round trip.
    #[test]
    fn decode_round_trips_non_integer_scalars() {
        for v in [
            json!(null),
            json!(false),
            json!(true),
            json!(""),
            json!("héllo"),
            json!(3.5),
            json!(-0.25),
        ] {
            assert_eq!(decode_property_value(&enc(&v)), Some(v.clone()), "value {v}");
        }
    }

    /// Malformed input decodes to `None` instead of panicking.
    #[test]
    fn decode_rejects_malformed_input() {
        assert_eq!(decode_property_value(&[]), None, "empty input");
        assert_eq!(decode_property_value(&[0x03, 0, 0]), None, "truncated number");
        assert_eq!(decode_property_value(&[0x05]), None, "unknown tag");
        assert_eq!(
            decode_property_value(&[0x04, 0xFF, 0x00]),
            None,
            "invalid UTF-8"
        );
    }
}