Skip to main content

issundb_core/graph/
mod.rs

1use std::{
2    any::{Any, TypeId as StdTypeId},
3    collections::HashMap,
4    path::Path,
5    sync::Arc,
6};
7
8use parking_lot::ReentrantMutex;
9use serde::Serialize;
10use tracing::instrument;
11use zerocopy::{FromBytes, IntoBytes};
12
13use ahash::{AHashMap, AHashSet};
14
15use crate::matrices::MatrixSet;
16use crate::{
17    csr::{CsrCache, CsrSnapshot},
18    error::Error,
19    schema::{
20        AdjEntry, DirectedNeighborEntry, EdgeId, EdgeRecord, LabelId, Language, NeighborEntry,
21        NodeId, NodeRecord, PropKeyId, PropValue, TypeId, WeightedPath,
22    },
23    storage::{
24        fts,
25        ids::{
26            adjust_label_count, adjust_type_count, alloc_edge_id, alloc_node_id, get_label,
27            get_or_create_label, get_or_create_prop_key, get_or_create_type, get_prop_key,
28            get_prop_key_name, get_type,
29        },
30        lmdb::Storage,
31        props,
32    },
33};
34
35pub mod algo;
36pub mod edge;
37pub mod fts_mod;
38pub mod graphblas;
39pub mod index;
40pub mod node;
41pub mod txn;
42pub mod vector;
43
/// The direction of edges to count for degree centrality.
///
/// NOTE(review): derived `Hash` and serde's variant indices in some formats
/// depend on declaration order, so keep the variant order stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum DegreeDirection {
    /// Count incoming edges only.
    In,
    /// Count outgoing edges only.
    Out,
    /// Count both incoming and outgoing edges.
    Both,
}
54
/// Pattern description for [`Graph::count_triangle_cycles`]: the directed
/// cycle `(a)-[t1]->(b)-[t2]->(c)-[t3]->(a)` with an optional relationship
/// type per hop and an optional label per node variable. `None` means
/// unconstrained.
///
/// The derived `Default` sets every entry to `None`, i.e. a fully
/// unconstrained directed triangle.
#[derive(Debug, Clone, Default)]
pub struct TriangleCountSpec<'a> {
    /// Relationship types for the hops `a -> b`, `b -> c`, and `c -> a`
    /// (index 0, 1, 2 respectively).
    pub rel_types: [Option<&'a str>; 3],
    /// Labels required on `a`, `b`, and `c` (index 0, 1, 2 respectively).
    pub labels: [Option<&'a str>; 3],
}
66
67/// Builds a 12-byte composite key `(prefix u32 BE, id u64 BE)` for secondary index lookups.
68pub(super) fn composite_key(prefix: u32, id: u64) -> [u8; 12] {
69    let mut key = [0u8; 12];
70    key[..4].copy_from_slice(&prefix.to_be_bytes());
71    key[4..].copy_from_slice(&id.to_be_bytes());
72    key
73}
74
75/// Type tag for a null value in the sortable property encoding.
76pub(super) const ENCODED_NULL: u8 = 0x00;
77
78/// Sign bit mask used to make IEEE-754 `f64` bit patterns and two's-complement
79/// `i64` values sort in ascending numeric order as big-endian bytes.
80const SORT_SIGN_BIT: u64 = 0x8000_0000_0000_0000;
81
82/// Encodes a JSON property value into a sortable byte representation for the index.
83///
84/// Numbers use a fixed 17-byte encoding: a `0x03` tag, then 8 bytes of the
85/// order-preserving `f64` bit pattern (the primary numeric sort key), then 8
86/// bytes of an integer disambiguator. The disambiguator makes the encoding
87/// lossless for `i64` values: two integers that round to the same `f64` (any
88/// pair beyond 2^53) still produce distinct keys, while an integer and a float
89/// of the same real value (e.g. `30` and `30.0`) produce identical keys so they
90/// continue to compare equal. Keeping every numeric encoding the same length is
91/// required because property lookups match by key prefix; a variable-length
92/// encoding where one value is a prefix of another would yield false matches.
93pub(super) fn encode_property_value(val: &serde_json::Value) -> Option<Vec<u8>> {
94    match val {
95        serde_json::Value::Null => Some(vec![ENCODED_NULL]),
96        serde_json::Value::Bool(false) => Some(vec![0x01]),
97        serde_json::Value::Bool(true) => Some(vec![0x02]),
98        serde_json::Value::Number(num) => {
99            let float_val = num.as_f64()?;
100            let bits = float_val.to_bits();
101            let masked = if (bits & SORT_SIGN_BIT) != 0 {
102                !bits
103            } else {
104                bits ^ SORT_SIGN_BIT
105            };
106            // Integer disambiguator: for any number whose exact real value is an
107            // integer in `i64` range, store that integer in sign-flipped
108            // big-endian order so distinct large integers never collide. All
109            // other numbers (non-integers, out-of-range) get a fixed sentinel;
110            // they already have a unique `f64` bit pattern in the primary key,
111            // so the sentinel value cannot affect ordering or equality.
112            let int_disambig: u64 = if let Some(i) = num.as_i64() {
113                (i as u64) ^ SORT_SIGN_BIT
114            } else if float_val.fract() == 0.0
115                && float_val >= i64::MIN as f64
116                && float_val <= i64::MAX as f64
117            {
118                ((float_val as i64) as u64) ^ SORT_SIGN_BIT
119            } else {
120                0
121            };
122            let mut buf = Vec::with_capacity(17);
123            buf.push(0x03);
124            buf.extend_from_slice(&masked.to_be_bytes());
125            buf.extend_from_slice(&int_disambig.to_be_bytes());
126            Some(buf)
127        }
128        serde_json::Value::String(s) => {
129            let mut buf = Vec::with_capacity(1 + s.len() + 1);
130            buf.push(0x04);
131            buf.extend_from_slice(s.as_bytes());
132            buf.push(0x00);
133            Some(buf)
134        }
135        _ => None, // Skip arrays and objects
136    }
137}
138
139/// Decodes a sortable byte representation back into a JSON property value.
140#[allow(dead_code)]
141pub(super) fn decode_property_value(bytes: &[u8]) -> Option<serde_json::Value> {
142    if bytes.is_empty() {
143        return None;
144    }
145    match bytes[0] {
146        0x00 => Some(serde_json::Value::Null),
147        0x01 => Some(serde_json::Value::Bool(false)),
148        0x02 => Some(serde_json::Value::Bool(true)),
149        0x03 => {
150            // Numbers are `tag + 8-byte f64 sort key + 8-byte int disambiguator`.
151            if bytes.len() < 17 {
152                return None;
153            }
154            // Prefer the lossless integer disambiguator when it round-trips,
155            // so large integers decode exactly rather than through `f64`.
156            let mut int_arr = [0u8; 8];
157            int_arr.copy_from_slice(&bytes[9..17]);
158            let int_val = (u64::from_be_bytes(int_arr) ^ SORT_SIGN_BIT) as i64;
159
160            let mut arr = [0u8; 8];
161            arr.copy_from_slice(&bytes[1..9]);
162            let masked = u64::from_be_bytes(arr);
163            let bits = if (masked & SORT_SIGN_BIT) == 0 {
164                !masked
165            } else {
166                masked ^ SORT_SIGN_BIT
167            };
168            let float_val = f64::from_bits(bits);
169
170            // If the disambiguator's integer equals the float key, the value was
171            // an integer (or integer-valued float): return it losslessly as an
172            // integer. Non-integers store a sentinel whose sign-flipped form is
173            // `i64::MIN`, which never matches a non-integer float key.
174            if (int_val as f64) == float_val {
175                Some(serde_json::Value::Number(int_val.into()))
176            } else {
177                serde_json::Number::from_f64(float_val).map(serde_json::Value::Number)
178            }
179        }
180        0x04 => {
181            let str_bytes = if bytes.ends_with(&[0x00]) {
182                &bytes[1..bytes.len() - 1]
183            } else {
184                &bytes[1..]
185            };
186            String::from_utf8(str_bytes.to_vec())
187                .ok()
188                .map(serde_json::Value::String)
189        }
190        _ => None,
191    }
192}
193
194/// Builds a composite key `(label_id, prop_key_id, encoded_val, node_id)` for node property index.
195pub(super) fn node_prop_index_key(
196    label_id: LabelId,
197    prop_key_id: PropKeyId,
198    encoded_val: &[u8],
199    node_id: NodeId,
200) -> Vec<u8> {
201    let mut key = Vec::with_capacity(4 + 4 + encoded_val.len() + 8);
202    key.extend_from_slice(&label_id.to_be_bytes());
203    key.extend_from_slice(&prop_key_id.to_be_bytes());
204    key.extend_from_slice(encoded_val);
205    key.extend_from_slice(&node_id.to_be_bytes());
206    key
207}
208
209/// Builds a composite key `(type_id, prop_key_id, encoded_val, edge_id)` for edge property index.
210pub(super) fn edge_prop_index_key(
211    type_id: TypeId,
212    prop_key_id: PropKeyId,
213    encoded_val: &[u8],
214    edge_id: EdgeId,
215) -> Vec<u8> {
216    let mut key = Vec::with_capacity(4 + 4 + encoded_val.len() + 8);
217    key.extend_from_slice(&type_id.to_be_bytes());
218    key.extend_from_slice(&prop_key_id.to_be_bytes());
219    key.extend_from_slice(encoded_val);
220    key.extend_from_slice(&edge_id.to_be_bytes());
221    key
222}
223
224/// Builds a composite key `(label_id, prop_key_id, term)` for FTS postings.
225pub(super) fn fts_postings_key(label_id: LabelId, prop_key_id: PropKeyId, term: &str) -> Vec<u8> {
226    let mut key = Vec::with_capacity(8 + term.len());
227    key.extend_from_slice(&label_id.to_be_bytes());
228    key.extend_from_slice(&prop_key_id.to_be_bytes());
229    key.extend_from_slice(term.as_bytes());
230    key
231}
232
233/// Builds a 12-byte FTS posting value `(node_id, frequency)`.
234pub(super) fn fts_posting_val(node_id: NodeId, frequency: u32) -> [u8; 12] {
235    let mut val = [0u8; 12];
236    val[0..8].copy_from_slice(&node_id.to_be_bytes());
237    val[8..12].copy_from_slice(&frequency.to_be_bytes());
238    val
239}
240
241/// Parses a 12-byte FTS posting value into `(node_id, frequency)`.
242pub(super) fn parse_fts_posting_val(bytes: &[u8]) -> Result<(NodeId, u32), Error> {
243    if bytes.len() != 12 {
244        return Err(Error::Corrupt("fts posting value must be 12 bytes"));
245    }
246    let node_id = NodeId::from_be_bytes(
247        bytes[0..8]
248            .try_into()
249            .map_err(|_| Error::Corrupt("fts posting: node_id slice wrong size"))?,
250    );
251    let frequency = u32::from_be_bytes(
252        bytes[8..12]
253            .try_into()
254            .map_err(|_| Error::Corrupt("fts posting: frequency slice wrong size"))?,
255    );
256    Ok((node_id, frequency))
257}
258
259/// Builds a 16-byte FTS doc key `(label_id, prop_key_id, node_id)`.
260pub(super) fn fts_doc_key(label_id: LabelId, prop_key_id: PropKeyId, node_id: NodeId) -> [u8; 16] {
261    let mut key = [0u8; 16];
262    key[0..4].copy_from_slice(&label_id.to_be_bytes());
263    key[4..8].copy_from_slice(&prop_key_id.to_be_bytes());
264    key[8..16].copy_from_slice(&node_id.to_be_bytes());
265    key
266}
267
268/// Parses a 4-byte doc length value.
269pub(super) fn parse_fts_doc_val(bytes: &[u8]) -> Result<u32, Error> {
270    if bytes.len() != 4 {
271        return Err(Error::Corrupt("fts doc val must be 4 bytes"));
272    }
273    Ok(u32::from_be_bytes(bytes.try_into().map_err(|_| {
274        Error::Corrupt("fts doc val: slice wrong size")
275    })?))
276}
277
278pub(super) fn fts_stats_n_key(label_id: LabelId, prop_key_id: PropKeyId) -> String {
279    format!("fts_stats:node:l:{label_id}:p:{prop_key_id}:N")
280}
281
282pub(super) fn fts_stats_sum_dl_key(label_id: LabelId, prop_key_id: PropKeyId) -> String {
283    format!("fts_stats:node:l:{label_id}:p:{prop_key_id}:sum_dl")
284}
285
/// The graph database handle. Cheap to clone: all state is behind `Arc`.
///
/// Clones share the same storage, write lock, caches, thread-count setting,
/// and extension map.
#[derive(Clone)]
pub struct Graph {
    /// LMDB-backed storage.
    pub(super) storage: Arc<Storage>,
    /// Serializes writers (taken by `update` and `with_write_lock`). Reentrant,
    /// so a thread already holding it can acquire it again.
    pub(super) _write_lock: Arc<ReentrantMutex<()>>,
    /// In-memory CSR snapshot cache; fed committed deltas by `update` and
    /// replaced wholesale by `rebuild_csr`.
    pub(super) csr_cache: Arc<CsrCache>,
    /// GraphBLAS matrices materialized from the CSR snapshot (set by `open`
    /// and `rebuild_csr`).
    pub(super) matrices: Arc<parking_lot::RwLock<Option<MatrixSet>>>,
    /// In-memory property columns behind `node_prop_json` and related
    /// bulk readers; refreshed lazily after writes.
    pub(super) prop_columns: Arc<crate::columns::ColumnsCache>,
    /// GraphBLAS thread count recorded by `set_thread_count` (starts at 0) and
    /// passed to matrix materialization in `rebuild_csr`.
    pub(super) n_threads: Arc<std::sync::atomic::AtomicI32>,
    /// Type-erased extension cache. Higher-level crates attach caches (e.g. the
    /// HNSW vector index) to a Graph without creating a circular dependency,
    /// through the `get_extension`, `set_extension`, and
    /// `get_or_init_extension_with` methods. Keys are the `std::any::TypeId` of
    /// the extension type `T`; each value is a `Box<dyn Any + Send + Sync>`
    /// wrapping an `Arc<T>`, which the accessors downcast back to `Arc<T>`.
    pub(crate) extensions: Arc<parking_lot::Mutex<AHashMap<StdTypeId, Box<dyn Any + Send + Sync>>>>,
}
302
/// A read-only transaction on the graph.
///
/// [`Graph::view`] constructs one and lends it to the caller's closure; the
/// lifetime `'a` ties it to both the graph and the LMDB read transaction.
pub struct ReadTxn<'a> {
    /// The graph being read.
    pub(super) graph: &'a Graph,
    /// Underlying LMDB read transaction (heed `WithTls` mode).
    pub(super) rtxn: heed::RoTxn<'a, heed::WithTls>,
}
308
/// A read-write transaction on the graph.
///
/// [`Graph::update`] constructs one under the write lock and lends it to the
/// caller's closure; it commits or aborts based on the closure's result.
pub struct WriteTxn<'a> {
    /// The graph being written.
    pub(super) graph: &'a Graph,
    /// Underlying LMDB write transaction.
    pub(super) wtxn: heed::RwTxn<'a>,
    /// Number of mutations performed in this transaction; after a successful
    /// commit, a nonzero count is passed to `maybe_spawn_rebuild_n`.
    pub(super) mutations_count: usize,
    /// Structural mutations staged during this transaction, flushed to the
    /// `CsrCache` only on commit so an aborted transaction records nothing.
    pub(super) delta: crate::csr::GraphDelta,
}
318
impl Graph {
    /// Open the graph database stored under `path`.
    ///
    /// `map_size_gb` is forwarded unchanged to `Storage::open` (presumably the
    /// LMDB map size in GiB, going by the name — TODO confirm). Once storage is
    /// open, this builds the initial CSR snapshot from it. It then materializes
    /// the GraphBLAS matrix set from that snapshot with a thread count of `0`,
    /// the same value `n_threads` starts at.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening storage, building the CSR snapshot,
    /// and materializing the matrices.
    pub fn open(path: &Path, map_size_gb: usize) -> Result<Self, Error> {
        let storage = Storage::open(path, map_size_gb)?;
        // Older versions persisted the CSR snapshot next to the LMDB files but
        // never read it back; remove the stale artifact if one is present.
        // Best-effort on purpose: the result is discarded, so a missing file
        // (the usual case) or any other removal error does not fail `open`.
        let _ = std::fs::remove_file(path.join("csr_snapshot.bin"));
        let initial = CsrSnapshot::build(&storage)?;
        let storage = Arc::new(storage);
        let csr_cache = Arc::new(CsrCache::new(initial));
        let matrices = {
            let initial_snap = csr_cache.snapshot.load();
            let m = MatrixSet::materialize(&initial_snap, 0)?;
            Arc::new(parking_lot::RwLock::new(Some(m)))
        };
        Ok(Self {
            storage,
            _write_lock: Arc::new(ReentrantMutex::new(())),
            csr_cache,
            matrices,
            prop_columns: Arc::new(crate::columns::ColumnsCache::default()),
            n_threads: Arc::new(std::sync::atomic::AtomicI32::new(0)),
            extensions: Arc::new(parking_lot::Mutex::new(AHashMap::new())),
        })
    }

    /// Set the thread count for GraphBLAS matrix computations, overriding the
    /// `ISSUNDB_NUM_THREADS` environment variable. Set to 0 to restore the default behavior.
    ///
    /// The value is also stored in `n_threads` (shared by every clone of this
    /// handle), which [`Graph::rebuild_csr`] passes to matrix materialization.
    ///
    /// # Errors
    ///
    /// Returns [`Error::GraphBLAS`] if the global GraphBLAS setting fails.
    /// NOTE(review): `n` is stored before the global call, so after an error
    /// `n_threads` still holds the rejected value.
    pub fn set_thread_count(&self, n: i32) -> Result<(), Error> {
        self.n_threads
            .store(n, std::sync::atomic::Ordering::Release);
        issundb_graphblas::set_global_threads(n).map_err(|e| Error::GraphBLAS(e.to_string()))?;
        Ok(())
    }

    /// Read one property of a node through the in-memory property columns,
    /// as the `serde_json::Value` that decoding the stored record would give.
    /// Returns `None` for a nonexistent node and `Some(Value::Null)` for a
    /// missing property. Builds or refreshes the columns on first use after a
    /// write, so the result always reflects committed state.
    pub fn node_prop_json(
        &self,
        id: NodeId,
        prop: &str,
    ) -> Result<Option<serde_json::Value>, Error> {
        self.prop_columns.with_fresh(&self.storage, |cols| {
            // Map the node id to its dense row; absent id => `None`.
            cols.id_to_dense.get(&id).map(|&d| {
                cols.cols
                    .get(prop)
                    .and_then(|c| c.get_json_opt(d as usize))
                    .unwrap_or(serde_json::Value::Null)
            })
        })
    }

    /// Bulk form of [`Graph::node_prop_json`]: gather `props` for each id in
    /// `ids` through the in-memory property columns, row-major (`out[i][j]` is
    /// `props[j]` on `ids[i]`). One columns refresh covers the whole gather,
    /// and each id resolves to its dense index once. A missing property reads
    /// as `Value::Null`; a nonexistent node is [`Error::NodeNotFound`].
    pub fn node_props_json_table(
        &self,
        ids: &[NodeId],
        props: &[&str],
    ) -> Result<Vec<Vec<serde_json::Value>>, Error> {
        // The outer `?` handles refresh errors; the inner result is returned.
        self.prop_columns
            .with_fresh(&self.storage, |cols| cols.props_table(ids, props))?
    }

    /// Single-property column form of [`Graph::node_props_json_table`]:
    /// `out[i]` is the value of `prop` on `ids[i]`, as one flat vector, so a
    /// bulk single-property gather does not pay one row vector allocation per
    /// id. A missing property reads as `Value::Null`; a nonexistent node is
    /// [`Error::NodeNotFound`].
    pub fn node_prop_json_column(
        &self,
        ids: &[NodeId],
        prop: &str,
    ) -> Result<Vec<serde_json::Value>, Error> {
        self.prop_columns
            .with_fresh(&self.storage, |cols| cols.prop_column(ids, prop))?
    }

    /// Group `ids` by the exact value of `prop` through the in-memory
    /// property columns: one dense group code per id, plus one representative
    /// value per code (the first occurrence). Null and missing property
    /// values share one code represented by `Value::Null`; a nonexistent node
    /// is [`Error::NodeNotFound`]. Codes are assigned under value identity,
    /// which for the typed columns needs no per-row value materialization.
    pub fn node_prop_group_codes(
        &self,
        ids: &[NodeId],
        prop: &str,
    ) -> Result<(Vec<u32>, Vec<serde_json::Value>), Error> {
        self.prop_columns
            .with_fresh(&self.storage, |cols| cols.group_codes(ids, prop))?
    }

    /// Store an extension value (as `Arc`) keyed by its concrete type.
    /// Replaces any existing value of the same type.
    pub fn set_extension<T: Any + Send + Sync>(&self, val: Arc<T>) {
        self.extensions
            .lock()
            .insert(StdTypeId::of::<T>(), Box::new(val));
    }

    /// Retrieve an `Arc` to a previously stored extension value, or `None` if absent.
    pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        // The map stores `Box<Arc<T>>` under `TypeId::of::<T>()`, so the
        // downcast target is `Arc<T>`, not `T`.
        self.extensions
            .lock()
            .get(&StdTypeId::of::<T>())
            .and_then(|b| b.downcast_ref::<Arc<T>>())
            .cloned()
    }

    /// Return the extension of type `T`, initializing it with `init` if absent.
    ///
    /// `init` runs without the extensions lock held, so it may call back into
    /// the graph (for example, to read from storage) without risking a lock
    /// ordering problem. If two threads initialize concurrently, both may run
    /// `init`, but only the first stored value is kept and every caller observes
    /// that same `Arc`. `init` is fallible; on error nothing is stored and the
    /// error is propagated.
    pub fn get_or_init_extension_with<T, E, F>(&self, init: F) -> Result<Arc<T>, E>
    where
        T: Any + Send + Sync,
        F: FnOnce() -> Result<Arc<T>, E>,
    {
        if let Some(existing) = self.get_extension::<T>() {
            return Ok(existing);
        }
        let value = init()?;
        let mut ext = self.extensions.lock();
        // Another thread may have initialized while we built ours; prefer the
        // already-stored value so all callers share one instance.
        if let Some(existing) = ext
            .get(&StdTypeId::of::<T>())
            .and_then(|b| b.downcast_ref::<Arc<T>>())
        {
            return Ok(existing.clone());
        }
        ext.insert(StdTypeId::of::<T>(), Box::new(value.clone()));
        Ok(value)
    }

    /// Execute a read-only transaction inside a closure.
    ///
    /// A fresh LMDB read transaction is opened for this call and dropped when
    /// it returns, so nothing borrowed from it can escape `f`.
    ///
    /// # Errors
    ///
    /// Propagates a failure to open the read transaction, or the error from `f`.
    pub fn view<F, T>(&self, f: F) -> Result<T, Error>
    where
        F: FnOnce(&ReadTxn) -> Result<T, Error>,
    {
        let rtxn = self.storage.env.read_txn()?;
        let txn = ReadTxn { graph: self, rtxn };
        f(&txn)
    }

    /// Execute a read-write transaction inside a closure.
    ///
    /// Holds the write lock for the whole call.
    /// - If `f` returns `Ok`, the LMDB transaction is committed. Only after that
    ///   are the staged structural changes published to the property-column and
    ///   CSR caches. `maybe_spawn_rebuild_n` is then invoked when any mutations
    ///   were recorded (presumably a threshold-based background rebuild —
    ///   confirm).
    /// - If `f` returns `Err`, the transaction is aborted and nothing is published.
    ///
    /// # Errors
    ///
    /// Returns the error from `f`, or a storage error from opening or committing
    /// the transaction. If the commit fails, the staged delta is dropped and no
    /// cache is updated.
    pub fn update<F, T>(&self, f: F) -> Result<T, Error>
    where
        F: FnOnce(&mut WriteTxn) -> Result<T, Error>,
    {
        let _guard = self._write_lock.lock();
        let wtxn = self.storage.env.write_txn()?;
        let mut txn = WriteTxn {
            graph: self,
            wtxn,
            mutations_count: 0,
            delta: crate::csr::GraphDelta::default(),
        };
        match f(&mut txn) {
            Ok(val) => {
                let mutations_count = txn.mutations_count;
                let delta = std::mem::take(&mut txn.delta);
                // Commit before touching any cache, so the caches only ever
                // learn about changes that LMDB has accepted.
                txn.wtxn.commit()?;
                // Mark touched nodes so the property columns refresh on next use.
                if delta.force_full {
                    self.prop_columns.record_force_full();
                } else {
                    self.prop_columns.record_touched_many(&delta.added_nodes);
                    self.prop_columns.record_touched_many(&delta.updated_nodes);
                }
                self.csr_cache.record_batch(delta);
                if mutations_count > 0 {
                    self.maybe_spawn_rebuild_n(mutations_count);
                }
                Ok(val)
            }
            Err(err) => {
                // Roll back; the staged delta is discarded along with `txn`.
                txn.wtxn.abort();
                Err(err)
            }
        }
    }

    /// Hold the write lock for the duration of `f`, executing `f` without
    /// starting an LMDB transaction. Use this to make a multi-step read-then-write
    /// sequence (such as MERGE) atomic with respect to other writers.
    ///
    /// The lock is a `ReentrantMutex`, so `f` may call [`Graph::update`] on the
    /// same thread without deadlocking on it.
    pub fn with_write_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let _guard = self._write_lock.lock();
        f()
    }

    /// Synchronously rebuild the CSR snapshot from LMDB. Useful after bulk
    /// loads or when tests need a consistent read view before the threshold
    /// has been crossed.
    ///
    /// Also re-materializes the GraphBLAS matrices from the new snapshot using
    /// the thread count in `n_threads`. This does not take the write lock.
    /// Concurrent commits are reconciled through the generation and delta
    /// handling commented below.
    ///
    /// # Errors
    ///
    /// Propagates snapshot-build and matrix-materialization errors.
    /// NOTE(review): the delta has already been cleared when either of those
    /// fails, and no new snapshot is installed — confirm that committed but
    /// not-yet-rebuilt changes cannot be lost from the cache in that case.
    #[instrument(skip(self))]
    pub fn rebuild_csr(&self) -> Result<(), Error> {
        // Capture the generation before reading LMDB so writes that land during
        // the build leave the snapshot conservatively stale.
        let built_gen = self.csr_cache.current_gen();
        // Clear the delta before reading LMDB: writes that commit during the
        // build land in the emptied delta and are re-applied incrementally later
        // (idempotently) rather than lost.
        self.csr_cache.clear_delta();
        let snap = CsrSnapshot::build(&self.storage)?;
        let m = MatrixSet::materialize(
            &snap,
            self.n_threads.load(std::sync::atomic::Ordering::Acquire),
        )?;
        *self.matrices.write() = Some(m);
        self.csr_cache.install_full(snap, built_gen);
        Ok(())
    }

    /// Create a hot backup of this database to `destination`.
    ///
    /// `destination` is a **file path** for the backup snapshot (e.g.
    /// `/backups/mydb_2026-05-27.mdb`). The file is a complete, portable
    /// LMDB snapshot. Concurrent reads and writes are not blocked.
    ///
    /// To restore: create an empty directory, copy the snapshot file to
    /// `<dir>/data.mdb`, then call `Graph::open(<dir>, map_size_gb)`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if LMDB fails to write the copy.
    pub fn backup(&self, destination: &Path) -> Result<(), Error> {
        self.storage
            .env
            .copy_to_path(destination, heed::CompactionOption::Disabled)
            .map(|_| ())
            .map_err(Error::Storage)
    }

    /// Same as `backup` but compacts the database during the copy.
    ///
    /// The resulting file is smaller than a raw backup but the operation
    /// takes longer because it rewrites every live page.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Storage`] if LMDB fails to write the copy.
    pub fn backup_compact(&self, destination: &Path) -> Result<(), Error> {
        self.storage
            .env
            .copy_to_path(destination, heed::CompactionOption::Enabled)
            .map(|_| ())
            .map_err(Error::Storage)
    }

    /// Restore a backup snapshot created by `backup` or `backup_compact` into
    /// a new database directory.
    ///
    /// Creates `dst_dir` if it does not exist, then copies `snapshot_file` into
    /// `dst_dir/data.mdb`. After this call succeeds the caller can open the
    /// restored database with `Graph::open(dst_dir, map_size_gb)`. An existing
    /// `data.mdb` is overwritten (`std::fs::copy` semantics); any other files
    /// already in `dst_dir` are left untouched.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from creating the directory or copying the file.
    pub fn restore(snapshot_file: &Path, dst_dir: &Path) -> Result<(), Error> {
        std::fs::create_dir_all(dst_dir)?;
        let dst_file = dst_dir.join("data.mdb");
        std::fs::copy(snapshot_file, &dst_file)?;
        Ok(())
    }
}
584
#[cfg(test)]
mod extension_tests {
    use std::convert::Infallible;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::Graph;

    /// Open a throwaway graph. The returned `TempDir` owns the on-disk files,
    /// so callers must keep it alive for as long as they use the `Graph`.
    fn open_tmp() -> (TempDir, Graph) {
        let scratch = TempDir::new().unwrap();
        let graph = Graph::open(scratch.path(), 1).unwrap();
        (scratch, graph)
    }

    /// Each concrete type gets its own extension slot. A stored value reads
    /// back, an unused type reads as `None`, and storing the same type again
    /// overwrites the earlier value.
    #[test]
    fn extension_roundtrip_by_type() {
        let (_scratch, graph) = open_tmp();
        assert!(graph.get_extension::<String>().is_none());

        graph.set_extension(Arc::new("cache".to_owned()));
        let cached = graph.get_extension::<String>().expect("extension must exist");
        assert_eq!(cached.as_str(), "cache");
        assert!(graph.get_extension::<u64>().is_none(), "distinct type slot");

        graph.set_extension(Arc::new("replaced".to_owned()));
        let replaced = graph.get_extension::<String>().unwrap();
        assert_eq!(replaced.as_str(), "replaced");
    }

    /// The initializer only runs for an empty slot; a later call with a
    /// different initializer still sees the first value.
    #[test]
    fn get_or_init_extension_initializes_once() {
        let (_scratch, graph) = open_tmp();

        let first = graph
            .get_or_init_extension_with::<u64, Infallible, _>(|| Ok(Arc::new(7)))
            .unwrap();
        let second = graph
            .get_or_init_extension_with::<u64, Infallible, _>(|| Ok(Arc::new(9)))
            .unwrap();

        assert_eq!(*first, 7);
        assert_eq!(*second, 7, "second init must not replace the stored value");
    }

    /// A failing initializer leaves the slot empty, so a later successful
    /// initializer still runs and its value is stored.
    #[test]
    fn get_or_init_extension_propagates_init_error() {
        let (_scratch, graph) = open_tmp();

        let outcome = graph.get_or_init_extension_with::<u64, &str, _>(|| Err("init failed"));
        assert_eq!(outcome.unwrap_err(), "init failed");
        assert!(graph.get_extension::<u64>().is_none());

        let recovered = graph
            .get_or_init_extension_with::<u64, &str, _>(|| Ok(Arc::new(7)))
            .unwrap();
        assert_eq!(*recovered, 7);
    }
}
650
#[cfg(test)]
mod encode_tests {
    use serde_json::{json, Value};

    use super::{decode_property_value, encode_property_value};

    /// Encode a value the test expects to be indexable.
    fn enc(v: &Value) -> Vec<u8> {
        encode_property_value(v).unwrap()
    }

    /// Distinct integers beyond 2^53 share an `f64` rounding but must still
    /// encode to distinct keys. A pure-`f64` encoding merged them, which caused
    /// index collisions and wrong `nodes_by_property` matches.
    #[test]
    fn large_integers_do_not_collide() {
        let two_pow_53 = enc(&json!(9_007_199_254_740_992_i64));
        let successor = enc(&json!(9_007_199_254_740_993_i64));
        assert_ne!(two_pow_53, successor, "distinct large integers must encode distinctly");
    }

    /// An integer and a float with the same real value must encode identically,
    /// so the index keeps treating them as equal (Cypher has `30 = 30.0`).
    #[test]
    fn integer_and_equal_float_unify() {
        let pairs = [(json!(30), json!(30.0)), (json!(0), json!(0.0))];
        for (as_int, as_float) in &pairs {
            assert_eq!(enc(as_int), enc(as_float));
        }
    }

    /// Numeric keys all have one length. Lookups match by key prefix, so a
    /// shorter encoding that prefixed a longer one would alias it.
    #[test]
    fn numeric_encoding_is_fixed_length() {
        let samples = [
            json!(1),
            json!(-1),
            json!(0),
            json!(i64::MAX),
            json!(i64::MIN),
            json!(3.5),
            json!(-2.5e10),
        ];
        for v in &samples {
            assert_eq!(enc(v).len(), 17, "value {v}");
        }
    }

    /// Sorting the encodings byte-wise must reproduce numeric order. This
    /// includes the tie across 2^53, which only the disambiguator resolves.
    #[test]
    fn numeric_ordering_preserved() {
        let ascending: [i64; 9] = [
            i64::MIN,
            -1_000,
            -1,
            0,
            1,
            1_000,
            1 << 53,
            (1 << 53) + 1,
            i64::MAX,
        ];
        let keys: Vec<Vec<u8>> = ascending.iter().map(|v| enc(&json!(v))).collect();
        let mut resorted = keys.clone();
        resorted.sort();
        assert_eq!(keys, resorted, "encodings must sort in numeric order");
    }

    /// Large integers decode to their exact value, not to a rounded float.
    #[test]
    fn decode_round_trips_large_integer() {
        let cases = [
            json!(0),
            json!(-1),
            json!(9_007_199_254_740_993_i64),
            json!(i64::MAX),
        ];
        for v in cases {
            let bytes = enc(&v);
            assert_eq!(decode_property_value(&bytes), Some(v.clone()), "value {v}");
        }
    }
}