Skip to main content

sparrowdb_storage/
node_store.rs

1//! Node property storage.
2//!
//! Nodes are stored as typed property columns.  Each `(label_id, col_id)` pair
//! maps to a flat binary file of fixed-width values.  A node's identifier
//! packs `(label_id: u32, slot: u32)` into a single `u64`, consistent with
//! [`sparrowdb_common::NodeId`] semantics.
7//!
8//! ## File layout
9//!
10//! ```text
11//! nodes/{label_id}/col_{col_id}.bin
12//! ```
13//!
14//! Each column file is a flat array of `u64` LE values (one per slot).
15//! The high-water mark is tracked in-memory and written to a small header file:
16//!
17//! ```text
18//! nodes/{label_id}/hwm.bin   — [hwm: u64 LE]
19//! ```
20//!
21//! ## Node ID packing
22//!
23//! ```text
24//! node_id = (label_id as u64) << 32 | slot as u64
25//! ```
26//!
27//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.
28
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::io::{Read, Seek, SeekFrom};
32use std::path::{Path, PathBuf};
33
34use sparrowdb_common::{Error, NodeId, Result};
35
36// ── Value type ────────────────────────────────────────────────────────────────
37
/// A typed property value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed 64-bit integer, stored as raw `u64` bits (two's-complement).
    Int64(i64),
    /// Raw byte blob.  Up to 7 bytes are stored inline in the column word;
    /// longer blobs are written to the overflow string heap by
    /// [`NodeStore::encode_value`] (SPA-212).  `Value::to_u64` on its own
    /// only produces the inline form.
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float.  Stored as 8 raw bytes in the overflow
    /// heap so that no bits are masked by the type-tag scheme (SPA-267).
    Float(f64),
}

/// Type tag embedded in the top byte (byte index 7 in LE) of a stored `u64`.
///
/// - `0x00` = `Int64`         — lower 7 bytes hold the signed integer (56-bit range).
/// - `0x01` = `Bytes`         — lower 7 bytes hold up to 7 bytes of inline string data.
/// - `0x02` = `BytesOverflow` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` (SPA-212).
/// - `0x03` = `Float`         — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` where 8 raw IEEE-754 bytes are stored (SPA-267).
///
/// The overflow encoding packs a heap pointer into 7 bytes:
///   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
///   bytes[5..7] = byte length (u16 LE, max 65535 bytes)
///
/// This lets `decode_raw_value` reconstruct strings of any length (SPA-212).
const TAG_INT64: u8 = 0x00;
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
/// Using the heap ensures all 64 bits of the float are preserved without any masking.
const TAG_FLOAT: u8 = 0x03;
/// Maximum bytes that fit inline in the 7-byte payload (one byte is the tag).
const MAX_INLINE_BYTES: usize = 7;

impl Value {
    /// Encode as a packed `u64` for column storage.
    ///
    /// Byte 7 (the most significant byte in LE order) carries the type tag;
    /// bytes 0..7 carry the payload, so [`Value::from_u64`] can reconstruct
    /// the correct variant at read time (SPA-169).
    ///
    /// For `Bytes` longer than 7 bytes only the first 7 bytes are encoded
    /// inline; callers that need full overflow support must use
    /// [`NodeStore::encode_value`], which writes long strings to the heap and
    /// returns an overflow-tagged u64 (SPA-212).
    ///
    /// # Int64 range
    /// Only the lower 56 bits of the integer are stored.  This covers all
    /// practical node IDs and numeric property values; integers outside
    /// ±2^55 would be truncated (full 64-bit overflow encoding deferred).
    ///
    /// # Panics
    /// Panics on `Value::Float`: floats always require heap storage, so they
    /// can only be encoded via [`NodeStore::encode_value`] (SPA-267).
    pub fn to_u64(&self) -> u64 {
        let mut packed = [0u8; 8];
        match self {
            Value::Int64(v) => {
                // Lower 7 bytes = lower 56 bits of v; byte 7 = TAG_INT64 (0x00).
                // Overwriting byte 7 with the tag is exactly the 56-bit mask.
                packed.copy_from_slice(&(*v as u64).to_le_bytes());
                packed[7] = TAG_INT64;
            }
            Value::Bytes(data) => {
                // Copy at most 7 payload bytes inline; the tag occupies byte 7.
                let n = data.len().min(MAX_INLINE_BYTES);
                packed[..n].copy_from_slice(&data[..n]);
                packed[7] = TAG_BYTES;
            }
            Value::Float(_) => {
                // Float values require heap storage — callers must use
                // NodeStore::encode_value instead of Value::to_u64.
                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value");
            }
        }
        u64::from_le_bytes(packed)
    }

    /// Reconstruct a `Value` from a stored `u64`, using the top byte as a
    /// type tag (SPA-169).
    ///
    /// Only inline encodings (`TAG_INT64`, `TAG_BYTES`) are handled here.
    /// For overflow strings (`TAG_BYTES_OVERFLOW`), use
    /// [`NodeStore::decode_raw_value`], which has access to the string heap
    /// (SPA-212).
    pub fn from_u64(v: u64) -> Self {
        let raw = v.to_le_bytes(); // raw[7] is the tag byte
        if raw[7] == TAG_BYTES {
            // Inline string: payload bytes up to the first 0x00 terminator.
            let mut data = Vec::with_capacity(MAX_INLINE_BYTES);
            for &b in &raw[..MAX_INLINE_BYTES] {
                if b == 0 {
                    break;
                }
                data.push(b);
            }
            Value::Bytes(data)
        } else {
            // TAG_INT64 (0x00) or any unrecognised tag → Int64.  Sign-extend
            // from 56 bits: shift left 8 to move bit 55 into the sign
            // position, then arithmetic shift right 8.
            Value::Int64(((v << 8) as i64) >> 8)
        }
    }

    /// Reconstruct an `Int64` value from a stored `u64`.
    ///
    /// Preserved for callers that know the column type is always Int64 (e.g.
    /// pre-SPA-169 paths).  New code should prefer `from_u64`.
    pub fn int64_from_u64(v: u64) -> Self {
        Value::Int64(v as i64)
    }
}
150
151// ── NodeStore ─────────────────────────────────────────────────────────────────
152
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin            — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin   — flat u64 column array
/// {root}/strings.bin                         — overflow string heap (SPA-212)
/// ```
///
/// The overflow heap is an append-only byte file.  Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` u64 stored in the column file.
pub struct NodeStore {
    /// Database root directory; every on-disk path is derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label.  Loaded lazily from disk.
    hwm: HashMap<u32, u64>,
    /// Labels whose in-memory HWM has been advanced but not yet persisted to
    /// `hwm.bin`.  Flushed atomically by [`flush_hwms`] at transaction commit.
    hwm_dirty: HashSet<u32>,
}
173
174impl NodeStore {
175    /// Open (or create) a node store rooted at `db_root`.
176    pub fn open(db_root: &Path) -> Result<Self> {
177        Ok(NodeStore {
178            root: db_root.to_path_buf(),
179            hwm: HashMap::new(),
180            hwm_dirty: HashSet::new(),
181        })
182    }
183
184    /// Return the root directory path of this node store.
185    pub fn root_path(&self) -> &Path {
186        &self.root
187    }
188
189    // ── Internal helpers ──────────────────────────────────────────────────────
190
191    fn label_dir(&self, label_id: u32) -> PathBuf {
192        self.root.join("nodes").join(label_id.to_string())
193    }
194
195    fn hwm_path(&self, label_id: u32) -> PathBuf {
196        self.label_dir(label_id).join("hwm.bin")
197    }
198
199    fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
200        self.label_dir(label_id).join(format!("col_{col_id}.bin"))
201    }
202
203    /// Path to the null-bitmap sidecar for a column (SPA-207).
204    ///
205    /// Bit N = 1 means slot N has a real value (present/non-null).
206    /// Bit N = 0 means slot N was zero-padded (absent/null).
207    fn null_bitmap_path(&self, label_id: u32, col_id: u32) -> PathBuf {
208        self.label_dir(label_id)
209            .join(format!("col_{col_id}_null.bin"))
210    }
211
212    /// Mark slot `slot` as present (has a real value) in the null bitmap (SPA-207).
213    fn set_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<()> {
214        let path = self.null_bitmap_path(label_id, col_id);
215        if let Some(parent) = path.parent() {
216            fs::create_dir_all(parent).map_err(Error::Io)?;
217        }
218        let byte_idx = (slot / 8) as usize;
219        let bit_idx = slot % 8;
220        let mut bits = if path.exists() {
221            fs::read(&path).map_err(Error::Io)?
222        } else {
223            vec![]
224        };
225        if bits.len() <= byte_idx {
226            bits.resize(byte_idx + 1, 0);
227        }
228        bits[byte_idx] |= 1 << bit_idx;
229        fs::write(&path, &bits).map_err(Error::Io)
230    }
231
232    /// Returns `true` if slot `slot` has a real value (present/non-null) (SPA-207).
233    ///
234    /// Backward-compatible: if no bitmap file exists (old data written before
235    /// the null-bitmap fix), every slot is treated as present.
236    fn get_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<bool> {
237        let path = self.null_bitmap_path(label_id, col_id);
238        if !path.exists() {
239            // No bitmap file → backward-compatible: treat all slots as present.
240            return Ok(true);
241        }
242        let bits = fs::read(&path).map_err(Error::Io)?;
243        let byte_idx = (slot / 8) as usize;
244        if byte_idx >= bits.len() {
245            return Ok(false);
246        }
247        Ok((bits[byte_idx] >> (slot % 8)) & 1 == 1)
248    }
249
250    /// Path to the overflow string heap (shared across all labels).
251    fn strings_bin_path(&self) -> PathBuf {
252        self.root.join("strings.bin")
253    }
254
255    // ── Overflow string heap (SPA-212) ────────────────────────────────────────
256
257    /// Append `bytes` to the overflow string heap and return an
258    /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
259    ///
260    /// Layout of the returned `u64` (little-endian bytes):
261    ///   bytes[0..5] = heap byte offset as u40 LE  (max ~1 TiB)
262    ///   bytes[5..7] = byte length as u16 LE       (max 65 535 bytes)
263    ///   bytes[7]    = `TAG_BYTES_OVERFLOW` (0x02)
264    fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
265        use std::io::{Seek, SeekFrom, Write};
266        let path = self.strings_bin_path();
267        let mut file = fs::OpenOptions::new()
268            .create(true)
269            .truncate(false)
270            .append(false)
271            .read(true)
272            .write(true)
273            .open(&path)
274            .map_err(Error::Io)?;
275
276        // The heap offset is the current end of the file.
277        let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
278        file.write_all(bytes).map_err(Error::Io)?;
279
280        // Encode (offset, len) into a 7-byte payload.
281        let len = bytes.len() as u64;
282        debug_assert!(
283            offset <= 0x00FF_FFFF_FFFF_u64,
284            "string heap too large for 5-byte offset"
285        );
286        debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
287
288        let mut arr = [0u8; 8];
289        // Store offset in bytes[0..5] (40-bit LE).
290        arr[0] = offset as u8;
291        arr[1] = (offset >> 8) as u8;
292        arr[2] = (offset >> 16) as u8;
293        arr[3] = (offset >> 24) as u8;
294        arr[4] = (offset >> 32) as u8;
295        // Store len in bytes[5..7] (16-bit LE).
296        arr[5] = len as u8;
297        arr[6] = (len >> 8) as u8;
298        // Tag byte.
299        arr[7] = TAG_BYTES_OVERFLOW;
300        Ok(u64::from_le_bytes(arr))
301    }
302
303    /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
304    /// tagged `u64` produced by [`append_to_string_heap`].
305    ///
306    /// Uses `seek + read_exact` to load only the `len` bytes at `offset`,
307    /// avoiding a full file load regardless of `strings.bin` size (SPA-208).
308    fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
309        use std::io::{Read, Seek, SeekFrom};
310
311        let arr = tagged.to_le_bytes();
312        debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
313
314        // Decode (offset, len).
315        let offset = arr[0] as u64
316            | ((arr[1] as u64) << 8)
317            | ((arr[2] as u64) << 16)
318            | ((arr[3] as u64) << 24)
319            | ((arr[4] as u64) << 32);
320        let len = arr[5] as usize | ((arr[6] as usize) << 8);
321
322        let path = self.strings_bin_path();
323        let mut file = fs::File::open(&path).map_err(Error::Io)?;
324        file.seek(SeekFrom::Start(offset)).map_err(|e| {
325            Error::Corruption(format!("string heap seek failed (offset={offset}): {e}"))
326        })?;
327        let mut buf = vec![0u8; len];
328        file.read_exact(&mut buf).map_err(|e| {
329            Error::Corruption(format!(
330                "string heap too short: need {} bytes at offset {offset}: {e}",
331                len
332            ))
333        })?;
334        Ok(buf)
335    }
336
337    // ── Value encode / decode with overflow support ───────────────────────────
338
339    /// Encode a `Value` for column storage, writing long `Bytes` strings to
340    /// the overflow heap (SPA-212).
341    ///
342    /// - `Int64`          → identical to `Value::to_u64()`.
343    /// - `Bytes` ≤ 7 B    → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
344    /// - `Bytes` > 7 B    → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
345    /// - `Float`          → 8 raw IEEE-754 bytes appended to `strings.bin`;
346    ///   returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
347    pub fn encode_value(&self, val: &Value) -> Result<u64> {
348        match val {
349            Value::Int64(_) => Ok(val.to_u64()),
350            Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
351            Value::Bytes(b) => self.append_to_string_heap(b),
352            // SPA-267: store all 8 float bytes in the heap so no bits are masked.
353            // The heap pointer uses the same (offset: u40, len: u16) layout as
354            // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
355            Value::Float(f) => {
356                let bits = f.to_bits().to_le_bytes();
357                let heap_tagged = self.append_to_string_heap(&bits)?;
358                // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
359                let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
360                Ok((TAG_FLOAT as u64) << 56 | payload)
361            }
362        }
363    }
364
365    /// Decode a raw `u64` column value back to a `Value`, reading the
366    /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
367    ///
368    /// Handles all four tags:
369    /// - `TAG_INT64`          → `Value::Int64`
370    /// - `TAG_BYTES`          → `Value::Bytes` (inline, ≤ 7 bytes)
371    /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
372    /// - `TAG_FLOAT`          → `Value::Float` (8 raw IEEE-754 bytes from heap)
373    pub fn decode_raw_value(&self, raw: u64) -> Value {
374        let tag = (raw >> 56) as u8;
375        match tag {
376            TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
377                Ok(bytes) => Value::Bytes(bytes),
378                Err(e) => {
379                    // Corruption fallback: return empty bytes and log.
380                    eprintln!(
381                        "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
382                    );
383                    Value::Bytes(Vec::new())
384                }
385            },
386            // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
387            // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
388            // so read_from_string_heap can locate the bytes.
389            TAG_FLOAT => {
390                let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
391                let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
392                match self.read_from_string_heap(heap_tagged) {
393                    Ok(bytes) if bytes.len() == 8 => {
394                        let arr: [u8; 8] = bytes.try_into().unwrap();
395                        Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
396                    }
397                    Ok(bytes) => {
398                        eprintln!(
399                            "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
400                            bytes.len()
401                        );
402                        Value::Float(f64::NAN)
403                    }
404                    Err(e) => {
405                        eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
406                        Value::Float(f64::NAN)
407                    }
408                }
409            }
410            _ => Value::from_u64(raw),
411        }
412    }
413
414    /// Check whether a raw stored `u64` encodes a string equal to `s`.
415    ///
416    /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
417    /// encodings (SPA-212).  Used by WHERE-clause and prop-filter comparison.
418    pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
419        let tag = (raw >> 56) as u8;
420        match tag {
421            TAG_BYTES => {
422                // Fast inline comparison: encode s the same way and compare u64s.
423                raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
424            }
425            TAG_BYTES_OVERFLOW => {
426                // Overflow: read from heap and compare bytes.
427                match self.read_from_string_heap(raw) {
428                    Ok(bytes) => bytes == s.as_bytes(),
429                    Err(_) => false,
430                }
431            }
432            _ => false, // INT64 or unknown — not a string
433        }
434    }
435
436    /// Read the high-water mark for `label_id` from disk (or return 0).
437    ///
438    /// Recovery path (SPA-211): if `hwm.bin` is missing or corrupt but
439    /// `hwm.bin.tmp` exists (leftover from a previous crashed write), we
440    /// promote the tmp file to `hwm.bin` and use its value.
441    fn load_hwm(&self, label_id: u32) -> Result<u64> {
442        let path = self.hwm_path(label_id);
443        let tmp_path = self.hwm_tmp_path(label_id);
444
445        // Try to read the canonical file first.
446        let try_read = |p: &std::path::Path| -> Option<u64> {
447            let bytes = fs::read(p).ok()?;
448            if bytes.len() < 8 {
449                return None;
450            }
451            Some(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
452        };
453
454        if path.exists() {
455            match try_read(&path) {
456                Some(v) => return Ok(v),
457                None => {
458                    // hwm.bin exists but is corrupt — fall through to tmp recovery.
459                }
460            }
461        }
462
463        // hwm.bin is absent or unreadable.  Check for a tmp leftover.
464        if tmp_path.exists() {
465            if let Some(v) = try_read(&tmp_path) {
466                // Promote: atomically rename tmp → canonical so the next open
467                // is clean even if we crash again immediately here.
468                let _ = fs::rename(&tmp_path, &path);
469                return Ok(v);
470            }
471        }
472
473        // Last resort: infer the HWM from the sizes of the column files.
474        //
475        // Each `col_{n}.bin` is a flat array of 8-byte u64 LE values, one per
476        // slot.  The number of slots written equals `file_len / 8`.  If
477        // `hwm.bin` is corrupt we can reconstruct the HWM as the maximum slot
478        // count across all column files for this label.
479        {
480            let inferred = self.infer_hwm_from_cols(label_id);
481            if inferred > 0 {
482                // Persist the recovered HWM so the next open is clean.
483                let _ = self.save_hwm(label_id, inferred);
484                return Ok(inferred);
485            }
486        }
487
488        // No usable file at all — fresh label, HWM is 0.
489        Ok(0)
490    }
491
492    /// Infer the high-water mark for `label_id` from the sizes of the column
493    /// files on disk.  Returns 0 if no column files exist or none are readable.
494    ///
495    /// Each `col_{n}.bin` stores one u64 per slot, so `file_len / 8` gives the
496    /// slot count.  We return the maximum over all columns.
497    fn infer_hwm_from_cols(&self, label_id: u32) -> u64 {
498        let dir = self.label_dir(label_id);
499        let read_dir = match fs::read_dir(&dir) {
500            Ok(rd) => rd,
501            Err(_) => return 0,
502        };
503        read_dir
504            .flatten()
505            .filter_map(|entry| {
506                let name = entry.file_name();
507                let name_str = name.to_string_lossy().into_owned();
508                // Only consider col_{n}.bin files.
509                name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
510                let meta = entry.metadata().ok()?;
511                Some(meta.len() / 8)
512            })
513            .max()
514            .unwrap_or(0)
515    }
516
517    /// Write the high-water mark for `label_id` to disk atomically.
518    ///
519    /// Strategy (SPA-211): write to `hwm.bin.tmp`, fsync, then rename to
520    /// `hwm.bin`.  On POSIX, `rename(2)` is atomic, so a crash at any point
521    /// leaves either the old `hwm.bin` intact or the fully-written new one.
522    fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
523        use std::io::Write as _;
524
525        let path = self.hwm_path(label_id);
526        let tmp_path = self.hwm_tmp_path(label_id);
527
528        if let Some(parent) = path.parent() {
529            fs::create_dir_all(parent).map_err(Error::Io)?;
530        }
531
532        // Write to the tmp file and fsync before renaming.
533        {
534            let mut file = fs::File::create(&tmp_path).map_err(Error::Io)?;
535            file.write_all(&hwm.to_le_bytes()).map_err(Error::Io)?;
536            file.sync_all().map_err(Error::Io)?;
537        }
538
539        // Atomic rename: on success the old hwm.bin is replaced in one syscall.
540        fs::rename(&tmp_path, &path).map_err(Error::Io)
541    }
542
543    fn hwm_tmp_path(&self, label_id: u32) -> PathBuf {
544        self.label_dir(label_id).join("hwm.bin.tmp")
545    }
546
547    /// Persist all dirty in-memory HWMs to disk atomically.
548    ///
549    /// Called **once per transaction commit** rather than once per node creation,
550    /// so that bulk imports do not incur one fsync per node (SPA-217 regression fix).
551    ///
552    /// Each dirty label's HWM is written via the same tmp+fsync+rename strategy
553    /// used by [`save_hwm`], preserving the SPA-211 crash-safety guarantee.
554    /// After all writes succeed the dirty set is cleared.
555    pub fn flush_hwms(&mut self) -> Result<()> {
556        let dirty: Vec<u32> = self.hwm_dirty.iter().copied().collect();
557        for label_id in dirty {
558            let hwm = match self.hwm.get(&label_id) {
559                Some(&v) => v,
560                None => continue,
561            };
562            self.save_hwm(label_id, hwm)?;
563        }
564        self.hwm_dirty.clear();
565        Ok(())
566    }
567
568    /// Append a `u64` value to a column file.
569    fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
570        use std::io::{Seek, SeekFrom, Write};
571        let path = self.col_path(label_id, col_id);
572        if let Some(parent) = path.parent() {
573            fs::create_dir_all(parent).map_err(Error::Io)?;
574        }
575        // Open without truncation so we can inspect the current length.
576        let mut file = fs::OpenOptions::new()
577            .create(true)
578            .truncate(false)
579            .read(true)
580            .write(true)
581            .open(&path)
582            .map_err(Error::Io)?;
583
584        // Pad with zeros for any slots that were skipped (sparse write pattern).
585        // Without padding, a later slot's value would be written at offset 0,
586        // causing earlier slots to incorrectly read that value.
587        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
588        let expected_offset = slot as u64 * 8;
589        if existing_len < expected_offset {
590            file.seek(SeekFrom::Start(existing_len))
591                .map_err(Error::Io)?;
592            const CHUNK: usize = 65536;
593            let zeros = [0u8; CHUNK];
594            let mut remaining = (expected_offset - existing_len) as usize;
595            while remaining > 0 {
596                let n = remaining.min(CHUNK);
597                file.write_all(&zeros[..n]).map_err(Error::Io)?;
598                remaining -= n;
599            }
600        }
601
602        // Seek to the correct slot position and write the value.
603        file.seek(SeekFrom::Start(expected_offset))
604            .map_err(Error::Io)?;
605        file.write_all(&value.to_le_bytes()).map_err(Error::Io)
606    }
607
608    /// Read the `u64` stored at `slot` in the given column file.
609    ///
610    /// Returns `Ok(0)` when the column file does not exist yet — a missing file
611    /// means no value has ever been written for this `(label_id, col_id)` pair,
612    /// which is represented as the zero bit-pattern (SPA-166).
613    fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
614        let path = self.col_path(label_id, col_id);
615        let bytes = match fs::read(&path) {
616            Ok(b) => b,
617            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
618            Err(e) => return Err(Error::Io(e)),
619        };
620        let offset = slot as usize * 8;
621        if bytes.len() < offset + 8 {
622            return Err(Error::NotFound);
623        }
624        Ok(u64::from_le_bytes(
625            bytes[offset..offset + 8].try_into().unwrap(),
626        ))
627    }
628
629    // ── Public API ────────────────────────────────────────────────────────────
630
631    /// Return the high-water mark (slot count) for a label.
632    ///
633    /// Returns `0` if no nodes have been created for that label yet.
634    pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
635        if let Some(&h) = self.hwm.get(&label_id) {
636            return Ok(h);
637        }
638        self.load_hwm(label_id)
639    }
640
641    /// Discover all column IDs that currently exist on disk for `label_id`.
642    ///
643    /// Scans the label directory for `col_{id}.bin` files and returns the
644    /// parsed `col_id` values.  Used by `create_node` to zero-pad columns
645    /// that are not supplied for a new node (SPA-187).
646    ///
647    /// Returns `Err` when the directory exists but cannot be read (e.g.
648    /// permissions failure or I/O error).  A missing directory is not an
649    /// error — it simply means no nodes of this label have been created yet.
650    pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
651        self.existing_col_ids(label_id)
652    }
653
654    fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
655        let dir = self.label_dir(label_id);
656        let read_dir = match fs::read_dir(&dir) {
657            Ok(rd) => rd,
658            // Directory does not exist yet → no columns on disk.
659            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
660            Err(e) => return Err(Error::Io(e)),
661        };
662        let mut ids: Vec<u32> = read_dir
663            .flatten()
664            .filter_map(|entry| {
665                let name = entry.file_name();
666                let name_str = name.to_string_lossy().into_owned();
667                // Match "col_{col_id}.bin" filenames.
668                let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
669                id_str.parse::<u32>().ok()
670            })
671            .collect();
672        ids.sort_unstable();
673        Ok(ids)
674    }
675
676    /// Return the **on-disk** high-water mark for a label, bypassing any
677    /// in-memory advances made by `peek_next_slot`.
678    ///
679    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
680    /// that have actually been persisted.
681    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
682        self.load_hwm(label_id)
683    }
684
685    /// Reserve the slot index that the *next* `create_node` call will use for
686    /// `label_id`, advancing the in-memory HWM so that the slot is not
687    /// assigned again within the same [`NodeStore`] instance.
688    ///
689    /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
690    /// before the actual disk write, so the ID can be returned to the caller
691    /// while the write is deferred until commit (SPA-181).
692    ///
693    /// The on-disk HWM is **not** updated here; it is updated when the
694    /// buffered `NodeCreate` operation is applied in `commit()`.
695    pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
696        // Load from disk if not cached yet.
697        if !self.hwm.contains_key(&label_id) {
698            let h = self.load_hwm(label_id)?;
699            self.hwm.insert(label_id, h);
700        }
701        let h = *self.hwm.get(&label_id).unwrap();
702        // Advance the in-memory HWM so a subsequent peek returns the next slot.
703        self.hwm.insert(label_id, h + 1);
704        Ok(h as u32)
705    }
706
707    /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
708    ///
709    /// Like [`create_node`] but uses the caller-specified `slot` index instead
710    /// of deriving it from the HWM.  Used by [`WriteTx::commit`] to flush
711    /// buffered node-create operations in the exact order they were issued,
712    /// with slots that were already pre-allocated by [`peek_next_slot`].
713    ///
714    /// Advances the on-disk HWM to `slot + 1` (or higher if already past that).
715    pub fn create_node_at_slot(
716        &mut self,
717        label_id: u32,
718        slot: u32,
719        props: &[(u32, Value)],
720    ) -> Result<NodeId> {
721        // Snapshot original column sizes for rollback on partial failure.
722        let original_sizes: Vec<(u32, u64)> = props
723            .iter()
724            .map(|&(col_id, _)| {
725                let path = self.col_path(label_id, col_id);
726                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
727                (col_id, size)
728            })
729            .collect();
730
731        let write_result = (|| {
732            for &(col_id, ref val) in props {
733                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
734                // Mark this slot as present in the null bitmap (SPA-207).
735                self.set_null_bit(label_id, col_id, slot)?;
736            }
737            Ok::<(), sparrowdb_common::Error>(())
738        })();
739
740        if let Err(e) = write_result {
741            for (col_id, original_size) in &original_sizes {
742                let path = self.col_path(label_id, *col_id);
743                if path.exists() {
744                    if let Err(rollback_err) = fs::OpenOptions::new()
745                        .write(true)
746                        .open(&path)
747                        .and_then(|f| f.set_len(*original_size))
748                    {
749                        eprintln!(
750                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
751                            path.display(),
752                            original_size,
753                            rollback_err
754                        );
755                    }
756                }
757            }
758            return Err(e);
759        }
760
761        // Advance the in-memory HWM to at least slot + 1 and mark the label
762        // dirty so that flush_hwms() will persist it at commit boundary.
763        //
764        // We do NOT call save_hwm here.  The caller (WriteTx::commit) is
765        // responsible for calling flush_hwms() once after all PendingOp::NodeCreate
766        // entries have been applied.  This avoids one fsync per node during bulk
767        // imports (SPA-217 regression fix) while preserving crash-safety: the WAL
768        // record is already durable at this point, so recovery can reconstruct the
769        // HWM if we crash before flush_hwms() completes.
770        //
771        // NOTE: peek_next_slot() may have already advanced self.hwm to slot+1,
772        // so new_hwm > mem_hwm might be false.  We mark the label dirty
773        // unconditionally so that flush_hwms() always writes through to disk.
774        let new_hwm = slot as u64 + 1;
775        let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
776        if new_hwm > mem_hwm {
777            self.hwm.insert(label_id, new_hwm);
778        }
779        // Always mark dirty — flush_hwms() must write the post-commit HWM even
780        // if peek_next_slot already set mem HWM == new_hwm.
781        self.hwm_dirty.insert(label_id);
782
783        Ok(NodeId((label_id as u64) << 32 | slot as u64))
784    }
785
    /// Batch-write column data for multiple nodes created in a single transaction
    /// commit (SPA-212 write-amplification fix).
    ///
    /// # Why this exists
    ///
    /// The naive path calls `create_node_at_slot` per node, which opens and
    /// closes every column file once per node.  For a transaction that creates
    /// `N` nodes each with `C` columns, that is `O(N × C)` file-open/close
    /// syscalls.
    ///
    /// This method instead:
    /// 1. Accepts pre-encoded `(label_id, col_id, slot, raw_value, is_present)`
    ///    tuples from the caller (value encoding happens in `commit()` before
    ///    the call).
    /// 2. Sorts by `(label_id, col_id)` so all writes to the same column file
    ///    are contiguous.
    /// 3. Opens each `(label_id, col_id)` file exactly **once**, writes all
    ///    slots for that column, then closes it — reducing file opens to
    ///    `O(labels × cols)`.
    /// 4. Updates each null-bitmap file once per `(label_id, col_id)` group.
    ///
    /// HWM advances are applied for every `(label_id, slot)` in `node_slots`,
    /// exactly as `create_node_at_slot` would do them.  `node_slots` must
    /// include **all** created nodes — including those with zero properties —
    /// so that the HWM is advanced even for property-less nodes.
    ///
    /// # Rollback
    ///
    /// On I/O failure the method truncates every file that was opened back to
    /// its pre-call size, matching the rollback contract of
    /// `create_node_at_slot`.
    ///
    /// NOTE(review): truncation restores file *sizes* only.  Bitmap bits OR-ed
    /// into bytes that already existed before the call are not cleared on
    /// rollback; the corresponding column slots are truncated away, so readers
    /// should not observe them — confirm this invariant holds for all read paths.
    pub fn batch_write_node_creates(
        &mut self,
        // (label_id, col_id, slot, raw_u64, is_present)
        mut writes: Vec<(u32, u32, u32, u64, bool)>,
        // All (label_id, slot) pairs for every created node, including those
        // with zero properties.
        node_slots: &[(u32, u32)],
    ) -> Result<()> {
        // `Write` is not in the file-level imports; `Seek`/`SeekFrom` shadow
        // the top-of-file ones harmlessly.
        use std::io::{Seek, SeekFrom, Write};

        if writes.is_empty() && node_slots.is_empty() {
            return Ok(());
        }

        // Sort by (label_id, col_id, slot) so all writes to the same file are
        // contiguous.
        writes.sort_unstable_by_key(|&(lid, cid, slot, _, _)| (lid, cid, slot));

        // Snapshot original sizes for rollback.  We only need one entry per
        // (label_id, col_id) pair.
        let mut original_sizes: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
        {
            // The sort above guarantees equal (lid, cid) keys are adjacent,
            // so a single `prev_key` suffices to dedupe.
            let mut prev_key: Option<(u32, u32)> = None;
            for &(lid, cid, _, _, _) in &writes {
                let key = (lid, cid);
                if prev_key == Some(key) {
                    continue;
                }
                prev_key = Some(key);
                let path = self.col_path(lid, cid);
                // A not-yet-existing file snapshots as size 0.
                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                original_sizes.push((lid, cid, path, original));
            }
        }
        // Also snapshot null-bitmap files (one per col group).
        let mut bitmap_originals: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
        {
            // Entries with is_present == false never touch the bitmap, so
            // skip them before the dedupe check.
            let mut prev_key: Option<(u32, u32)> = None;
            for &(lid, cid, _, _, is_present) in &writes {
                if !is_present {
                    continue;
                }
                let key = (lid, cid);
                if prev_key == Some(key) {
                    continue;
                }
                prev_key = Some(key);
                let path = self.null_bitmap_path(lid, cid);
                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                bitmap_originals.push((lid, cid, path, original));
            }
        }

        let write_result = (|| -> Result<()> {
            let mut i = 0;
            while i < writes.len() {
                let (lid, cid, _, _, _) = writes[i];

                // Find the end of this (label_id, col_id) group.
                let group_start = i;
                while i < writes.len() && writes[i].0 == lid && writes[i].1 == cid {
                    i += 1;
                }
                let group = &writes[group_start..i];

                // ── Column file ──────────────────────────────────────────────
                let path = self.col_path(lid, cid);
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent).map_err(Error::Io)?;
                }
                let mut file = fs::OpenOptions::new()
                    .create(true)
                    .truncate(false)
                    .read(true)
                    .write(true)
                    .open(&path)
                    .map_err(Error::Io)?;

                // Current end of the file; kept up to date manually as padding
                // and slot writes extend it.
                let mut current_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;

                for &(_, _, slot, raw_value, _) in group {
                    let expected_offset = slot as u64 * 8;
                    // Pad with zeros for any skipped slots.
                    if current_len < expected_offset {
                        file.seek(SeekFrom::Start(current_len)).map_err(Error::Io)?;
                        const CHUNK: usize = 65536;
                        let zeros = [0u8; CHUNK];
                        let mut remaining = (expected_offset - current_len) as usize;
                        while remaining > 0 {
                            let n = remaining.min(CHUNK);
                            file.write_all(&zeros[..n]).map_err(Error::Io)?;
                            remaining -= n;
                        }
                        current_len = expected_offset;
                    }
                    // Seek to exact slot and write.
                    file.seek(SeekFrom::Start(expected_offset))
                        .map_err(Error::Io)?;
                    file.write_all(&raw_value.to_le_bytes())
                        .map_err(Error::Io)?;
                    // Advance current_len to reflect what we wrote.
                    let after = expected_offset + 8;
                    if after > current_len {
                        current_len = after;
                    }
                }

                // ── Null bitmap (one read-modify-write per col group) ────────
                let present_slots: Vec<u32> = group
                    .iter()
                    .filter(|&&(_, _, _, _, is_present)| is_present)
                    .map(|&(_, _, slot, _, _)| slot)
                    .collect();

                if !present_slots.is_empty() {
                    let bmap_path = self.null_bitmap_path(lid, cid);
                    if let Some(parent) = bmap_path.parent() {
                        fs::create_dir_all(parent).map_err(Error::Io)?;
                    }
                    let mut bits = if bmap_path.exists() {
                        fs::read(&bmap_path).map_err(Error::Io)?
                    } else {
                        vec![]
                    };
                    for slot in present_slots {
                        let byte_idx = (slot / 8) as usize;
                        let bit_idx = slot % 8;
                        if bits.len() <= byte_idx {
                            bits.resize(byte_idx + 1, 0);
                        }
                        bits[byte_idx] |= 1 << bit_idx;
                    }
                    // Whole-file rewrite: `bits` started from the on-disk
                    // content and was only extended / OR-ed, so no data is lost.
                    fs::write(&bmap_path, &bits).map_err(Error::Io)?;
                }
            }
            Ok(())
        })();

        if let Err(e) = write_result {
            // Roll back column files.
            for (_, _, path, original_size) in &original_sizes {
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            // Roll back null bitmap files.
            for (_, _, path, original_size) in &bitmap_originals {
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back null bitmap file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            return Err(e);
        }

        // Advance HWMs using the explicit node_slots list so that nodes with
        // zero properties also advance the HWM, matching the contract of
        // create_node_at_slot.
        for &(lid, slot) in node_slots {
            let new_hwm = slot as u64 + 1;
            let mem_hwm = self.hwm.get(&lid).copied().unwrap_or(0);
            if new_hwm > mem_hwm {
                self.hwm.insert(lid, new_hwm);
            }
            self.hwm_dirty.insert(lid);
        }

        Ok(())
    }
1007
1008    /// Create a new node in `label_id` with the given properties.
1009    ///
1010    /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
1011    ///
1012    /// ## Slot alignment guarantee (SPA-187)
1013    ///
1014    /// Every column file for `label_id` must have exactly `node_count * 8`
1015    /// bytes so that slot N always refers to node N across all columns.  When
1016    /// a node is created without a value for an already-known column, that
1017    /// column file is zero-padded to `(slot + 1) * 8` bytes.  The zero
1018    /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
1019    /// surfaces as `Value::Null` in query results.
1020    pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
1021        // Load or get cached hwm.
1022        let hwm = if let Some(h) = self.hwm.get(&label_id) {
1023            *h
1024        } else {
1025            let h = self.load_hwm(label_id)?;
1026            self.hwm.insert(label_id, h);
1027            h
1028        };
1029
1030        let slot = hwm as u32;
1031
1032        // Collect the set of col_ids supplied for this node.
1033        let supplied_col_ids: std::collections::HashSet<u32> =
1034            props.iter().map(|&(col_id, _)| col_id).collect();
1035
1036        // Discover all columns already on disk for this label.  Any that are
1037        // NOT in `supplied_col_ids` must be zero-padded so their slot count
1038        // stays in sync with the HWM (SPA-187).
1039        let existing_col_ids = self.existing_col_ids(label_id)?;
1040
1041        // Columns that need zero-padding: exist on disk but not in this node's props.
1042        let cols_to_zero_pad: Vec<u32> = existing_col_ids
1043            .iter()
1044            .copied()
1045            .filter(|col_id| !supplied_col_ids.contains(col_id))
1046            .collect();
1047
1048        // Build the full list of columns to touch for rollback tracking:
1049        // supplied columns + columns that need zero-padding.
1050        let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
1051            .iter()
1052            .copied()
1053            .chain(cols_to_zero_pad.iter().copied())
1054            .collect();
1055
1056        // Snapshot the original size of each column file so we can roll back
1057        // on partial failure.  A column that does not yet exist has size 0.
1058        let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
1059            .iter()
1060            .map(|&col_id| {
1061                let path = self.col_path(label_id, col_id);
1062                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1063                (col_id, size)
1064            })
1065            .collect();
1066
1067        // Write each property column.  For columns not in `props`, write a
1068        // zero-padded entry (the zero sentinel means "absent" / NULL).
1069        // On failure, roll back all columns that were already written to avoid
1070        // slot misalignment.
1071        let write_result = (|| {
1072            // Write supplied columns with their actual values.
1073            for &(col_id, ref val) in props {
1074                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
1075                // Mark this slot as present in the null bitmap (SPA-207).
1076                self.set_null_bit(label_id, col_id, slot)?;
1077            }
1078            // Zero-pad existing columns that were NOT supplied for this node.
1079            // Do NOT set null bitmap bits for these — they remain absent/null.
1080            for &col_id in &cols_to_zero_pad {
1081                self.append_col(label_id, col_id, slot, 0u64)?;
1082            }
1083            Ok::<(), sparrowdb_common::Error>(())
1084        })();
1085
1086        if let Err(e) = write_result {
1087            // Truncate each column back to its original size.
1088            for (col_id, original_size) in &original_sizes {
1089                let path = self.col_path(label_id, *col_id);
1090                if path.exists() {
1091                    if let Err(rollback_err) = fs::OpenOptions::new()
1092                        .write(true)
1093                        .open(&path)
1094                        .and_then(|f| f.set_len(*original_size))
1095                    {
1096                        eprintln!(
1097                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
1098                            path.display(),
1099                            original_size,
1100                            rollback_err
1101                        );
1102                    }
1103                }
1104            }
1105            return Err(e);
1106        }
1107
1108        // Update hwm.
1109        let new_hwm = hwm + 1;
1110        self.save_hwm(label_id, new_hwm)?;
1111        *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
1112
1113        // Pack node ID.
1114        let node_id = ((label_id as u64) << 32) | (slot as u64);
1115        Ok(NodeId(node_id))
1116    }
1117
1118    /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
1119    ///
1120    /// Creates `col_0.bin` (and its parent directory) if it does not exist,
1121    /// zero-padding all preceding slots.  This ensures that nodes which were
1122    /// created without any `col_0` property are still properly marked as deleted
1123    /// and become invisible to subsequent scans.
1124    ///
1125    /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
1126    pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
1127        use std::io::{Seek, SeekFrom, Write};
1128        let label_id = (node_id.0 >> 32) as u32;
1129        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1130        let col0 = self.col_path(label_id, 0);
1131
1132        // Ensure the parent directory exists.
1133        if let Some(parent) = col0.parent() {
1134            fs::create_dir_all(parent).map_err(Error::Io)?;
1135        }
1136
1137        let mut f = fs::OpenOptions::new()
1138            .create(true)
1139            .truncate(false)
1140            .read(true)
1141            .write(true)
1142            .open(&col0)
1143            .map_err(Error::Io)?;
1144
1145        // Zero-pad any slots before `slot` that are not yet in the file.
1146        let needed_len = (slot as u64 + 1) * 8;
1147        let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1148        if existing_len < needed_len {
1149            let zeros = vec![0u8; (needed_len - existing_len) as usize];
1150            f.write_all(&zeros).map_err(Error::Io)?;
1151        }
1152
1153        // Seek to the slot and write the tombstone value.
1154        f.seek(SeekFrom::Start(slot as u64 * 8))
1155            .map_err(Error::Io)?;
1156        f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
1157    }
1158
1159    /// Overwrite the value of a single column for an existing node.
1160    ///
1161    /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
1162    /// 8-byte little-endian value in-place.  Returns `Err(NotFound)` if the
1163    /// slot does not exist yet.
1164    pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1165        use std::io::{Seek, SeekFrom, Write};
1166        let label_id = (node_id.0 >> 32) as u32;
1167        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1168        let path = self.col_path(label_id, col_id);
1169        let mut file = fs::OpenOptions::new()
1170            .write(true)
1171            .open(&path)
1172            .map_err(|_| Error::NotFound)?;
1173        let offset = slot as u64 * 8;
1174        file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
1175        file.write_all(&self.encode_value(value)?.to_le_bytes())
1176            .map_err(Error::Io)
1177    }
1178
1179    /// Write or create a column value for a node, creating and zero-padding the
1180    /// column file if it does not yet exist.
1181    ///
1182    /// Unlike [`set_node_col`], this method creates the column file and fills all
1183    /// slots from 0 to `slot - 1` with zeros before writing the target value.
1184    /// This supports adding new property columns to existing nodes (Phase 7
1185    /// `set_property` semantics).
1186    pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1187        use std::io::{Seek, SeekFrom, Write};
1188        let label_id = (node_id.0 >> 32) as u32;
1189        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1190        let path = self.col_path(label_id, col_id);
1191
1192        // Ensure parent directory exists.
1193        if let Some(parent) = path.parent() {
1194            fs::create_dir_all(parent).map_err(Error::Io)?;
1195        }
1196
1197        // Open the file (create if absent), then pad with zeros up to and
1198        // including the target slot, and finally seek back to write the value.
1199        // We must NOT truncate: we only update a specific slot, not the whole file.
1200        let mut file = fs::OpenOptions::new()
1201            .create(true)
1202            .truncate(false)
1203            .read(true)
1204            .write(true)
1205            .open(&path)
1206            .map_err(Error::Io)?;
1207
1208        // How many bytes are already in the file?
1209        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1210        let needed_len = (slot as u64 + 1) * 8;
1211        if existing_len < needed_len {
1212            // Extend file with zeros from existing_len to needed_len.
1213            // Write in fixed-size chunks to avoid a single large allocation
1214            // that could OOM when the node slot ID is very high.
1215            file.seek(SeekFrom::Start(existing_len))
1216                .map_err(Error::Io)?;
1217            const CHUNK: usize = 65536; // 64 KiB
1218            let zeros = [0u8; CHUNK];
1219            let mut remaining = (needed_len - existing_len) as usize;
1220            while remaining > 0 {
1221                let n = remaining.min(CHUNK);
1222                file.write_all(&zeros[..n]).map_err(Error::Io)?;
1223                remaining -= n;
1224            }
1225        }
1226
1227        // Seek to target slot and overwrite.
1228        file.seek(SeekFrom::Start(slot as u64 * 8))
1229            .map_err(Error::Io)?;
1230        file.write_all(&self.encode_value(value)?.to_le_bytes())
1231            .map_err(Error::Io)
1232    }
1233
1234    /// Retrieve all stored properties of a node.
1235    ///
1236    /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
1237    /// The caller knows the schema (col IDs) from the catalog.
1238    pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
1239        let label_id = (node_id.0 >> 32) as u32;
1240        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1241
1242        let mut result = Vec::with_capacity(col_ids.len());
1243        for &col_id in col_ids {
1244            let val = self.read_col_slot(label_id, col_id, slot)?;
1245            result.push((col_id, val));
1246        }
1247        Ok(result)
1248    }
1249
1250    /// Like [`get_node_raw`] but treats absent columns as `None` rather than
1251    /// propagating [`Error::NotFound`].
1252    ///
1253    /// A column is considered absent when:
1254    /// - Its column file does not exist (property never written for the label).
1255    /// - Its column file is shorter than `slot + 1` entries (sparse write —
1256    ///   an earlier node never wrote this column; a later node that did write it
1257    ///   padded the file, but this slot's value was never explicitly stored).
1258    ///
1259    /// This is the correct read path for IS NULL evaluation: absent properties
1260    /// must appear as `Value::Null`, not as an error or as integer 0.
1261    pub fn get_node_raw_nullable(
1262        &self,
1263        node_id: NodeId,
1264        col_ids: &[u32],
1265    ) -> Result<Vec<(u32, Option<u64>)>> {
1266        let label_id = (node_id.0 >> 32) as u32;
1267        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1268
1269        let mut result = Vec::with_capacity(col_ids.len());
1270        for &col_id in col_ids {
1271            let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
1272            result.push((col_id, opt_val));
1273        }
1274        Ok(result)
1275    }
1276
1277    /// Read a single column slot, returning `None` when the column was never
1278    /// written for this node (file absent or slot out of bounds / not set).
1279    ///
1280    /// Unlike [`read_col_slot`], this function distinguishes between:
1281    /// - Column file does not exist → `None` (property never set on any node).
1282    /// - Slot is beyond the file end → `None` (property not set on this node).
1283    /// - Null bitmap says slot is absent → `None` (slot was zero-padded for alignment).
1284    /// - Null bitmap says slot is present → `Some(raw)` (real value, may be 0 for Int64(0)).
1285    ///
1286    /// The null-bitmap sidecar (`col_{id}_null.bin`) is used to distinguish
1287    /// legitimately-stored 0 values (e.g. `Int64(0)`) from absent/zero-padded
1288    /// slots (SPA-207).  Backward compat: if no bitmap file exists, all slots
1289    /// within the file are treated as present.
1290    fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
1291        let path = self.col_path(label_id, col_id);
1292        let bytes = match fs::read(&path) {
1293            Ok(b) => b,
1294            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1295            Err(e) => return Err(Error::Io(e)),
1296        };
1297        let offset = slot as usize * 8;
1298        if bytes.len() < offset + 8 {
1299            return Ok(None);
1300        }
1301        // Use the null-bitmap sidecar to determine whether this slot has a real
1302        // value.  This replaces the old raw==0 sentinel which incorrectly treated
1303        // Int64(0) as absent (SPA-207).
1304        if !self.get_null_bit(label_id, col_id, slot)? {
1305            return Ok(None);
1306        }
1307        let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1308        Ok(Some(raw))
1309    }
1310
1311    /// Retrieve the typed property values for a node.
1312    ///
1313    /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
1314    /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
1315    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
1316        let raw = self.get_node_raw(node_id, col_ids)?;
1317        Ok(raw
1318            .into_iter()
1319            .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
1320            .collect())
1321    }
1322
1323    /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
1324    /// flat `Vec<u64>`.  Returns an empty vec when the file does not exist yet.
1325    ///
1326    /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
1327    /// all slot values in one `fs::read` call rather than one `read_col_slot`
1328    /// per node, making index construction O(n) rather than O(n * syscall-overhead).
1329    pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
1330        let path = self.col_path(label_id, col_id);
1331        let bytes = match fs::read(&path) {
1332            Ok(b) => b,
1333            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1334            Err(e) => return Err(Error::Io(e)),
1335        };
1336        // Interpret the byte slice as a flat array of little-endian u64s.
1337        let count = bytes.len() / 8;
1338        let mut out = Vec::with_capacity(count);
1339        for i in 0..count {
1340            let offset = i * 8;
1341            let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1342            out.push(v);
1343        }
1344        Ok(out)
1345    }
1346
1347    /// Read the null-bitmap for `(label_id, col_id)` and return a `Vec<bool>`
1348    /// where index `i` is `true` iff slot `i` has a real value (was explicitly
1349    /// written, as opposed to zero-padded for alignment or never written).
1350    ///
1351    /// If no bitmap file exists (old data written before SPA-207), returns
1352    /// `None` to signal backward-compat mode — callers should fall back to the
1353    /// `raw != 0` sentinel in that case.
1354    ///
1355    /// Used by [`PropertyIndex::build_for`] and [`PropertyIndex::build`] to
1356    /// correctly index slots whose value is `Int64(0)` (raw = 0), which is
1357    /// otherwise indistinguishable from the absent sentinel without the bitmap.
1358    pub fn read_null_bitmap_all(&self, label_id: u32, col_id: u32) -> Result<Option<Vec<bool>>> {
1359        let path = self.null_bitmap_path(label_id, col_id);
1360        if !path.exists() {
1361            return Ok(None); // backward compat: no bitmap file
1362        }
1363        let bits = fs::read(&path).map_err(Error::Io)?;
1364        // Each byte encodes 8 slots: bit i of byte j → slot (j*8 + i).
1365        let mut out = Vec::with_capacity(bits.len() * 8);
1366        for &byte in &bits {
1367            for bit_idx in 0..8u32 {
1368                out.push((byte >> bit_idx) & 1 == 1);
1369            }
1370        }
1371        Ok(Some(out))
1372    }
1373
1374    /// Selective sorted-slot read — O(K) seeks instead of O(N) reads.
1375    ///
1376    /// For each column, opens the file once and reads only the `slots` needed,
1377    /// in slot-ascending order (for sequential-ish I/O).  This is the hot path
1378    /// for hop queries where K neighbor slots ≪ N total nodes.
1379    ///
1380    /// `slots` **must** be pre-sorted ascending by the caller.
1381    /// Returns a `Vec` indexed parallel to `slots`; each inner `Vec` is indexed
1382    /// parallel to `col_ids`.  Out-of-range or missing-file slots return 0.
1383    ///
1384    /// SPA-200: replaces full O(N) column loads with O(K) per-column seeks.
1385    fn read_col_slots_sorted(
1386        &self,
1387        label_id: u32,
1388        slots: &[u32],
1389        col_ids: &[u32],
1390    ) -> Result<Vec<Vec<u64>>> {
1391        if slots.is_empty() || col_ids.is_empty() {
1392            // Return a row of empty vecs parallel to slots
1393            return Ok(slots.iter().map(|_| vec![0u64; col_ids.len()]).collect());
1394        }
1395
1396        // result[slot_idx][col_idx]
1397        let mut result: Vec<Vec<u64>> = slots.iter().map(|_| vec![0u64; col_ids.len()]).collect();
1398
1399        for (col_idx, &col_id) in col_ids.iter().enumerate() {
1400            let path = self.col_path(label_id, col_id);
1401            let mut file = match fs::File::open(&path) {
1402                Ok(f) => f,
1403                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1404                    // Column file doesn't exist — all slots stay 0
1405                    continue;
1406                }
1407                Err(e) => return Err(Error::Io(e)),
1408            };
1409
1410            let file_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1411            // Reset to start for sequential reads
1412            file.seek(SeekFrom::Start(0)).map_err(Error::Io)?;
1413
1414            let mut buf = [0u8; 8];
1415            let mut current_pos: u64 = 0;
1416
1417            for (slot_idx, &slot) in slots.iter().enumerate() {
1418                let target_pos = slot as u64 * 8;
1419                if target_pos + 8 > file_len {
1420                    // Slot beyond end of file — leave as 0
1421                    continue;
1422                }
1423                if target_pos != current_pos {
1424                    file.seek(SeekFrom::Start(target_pos)).map_err(Error::Io)?;
1425                    current_pos = target_pos;
1426                }
1427                file.read_exact(&mut buf).map_err(Error::Io)?;
1428                current_pos += 8;
1429                result[slot_idx][col_idx] = u64::from_le_bytes(buf);
1430            }
1431        }
1432
1433        Ok(result)
1434    }
1435
    /// Batch-read multiple slots from multiple columns, returning nullable results.
    ///
    /// Like [`batch_read_node_props`] but preserves null semantics using the
    /// null-bitmap sidecar (`col_{id}_null.bin`).  Each `Option<u64>` is:
    /// - `Some(raw)` — the slot has a real stored value (may be 0 for `Int64(0)`).
    /// - `None` — the slot was zero-padded, never written, or the null bitmap
    ///   marks it absent.
    ///
    /// Backward compatible: if no bitmap file exists for a column (data written
    /// before SPA-207), a slot is treated as present (`Some`) only when its raw
    /// column value is non-zero — the pre-SPA-207 `0`-as-absent sentinel.  A
    /// genuine stored `Int64(0)` therefore reads as `None` on such legacy
    /// columns; only bitmap-backed columns can distinguish the two.
    ///
    /// This replaces the raw-`0`-as-absent semantic of [`batch_read_node_props`]
    /// and is the correct API for the chunked pipeline (Phase 2, SPA-299).
    pub fn batch_read_node_props_nullable(
        &self,
        label_id: u32,
        slots: &[u32],
        col_ids: &[u32],
    ) -> Result<Vec<Vec<Option<u64>>>> {
        if slots.is_empty() {
            return Ok(vec![]);
        }
        if col_ids.is_empty() {
            // One empty row per slot so the outer Vec stays parallel to `slots`.
            return Ok(slots.iter().map(|_| vec![]).collect());
        }

        let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
        // Use sorted-slot reads when K < 50% of N, identical heuristic as
        // batch_read_node_props.
        let use_sorted = hwm == 0 || slots.len() * 2 < hwm;

        // Pre-load null bitmaps for each column (None = no bitmap = backward compat).
        let null_bitmaps: Vec<Option<Vec<bool>>> = col_ids
            .iter()
            .map(|&col_id| self.read_null_bitmap_all(label_id, col_id))
            .collect::<Result<_>>()?;

        if use_sorted {
            // Build sorted permutation for sequential I/O.  `order[i]` is the
            // original index of the i-th smallest slot.
            let mut order: Vec<usize> = (0..slots.len()).collect();
            order.sort_unstable_by_key(|&i| slots[i]);
            let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();

            // Raw values (0 for missing/out-of-range).
            let raw = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;

            // Build result in original order, applying null bitmaps.
            let mut result = vec![vec![None; col_ids.len()]; slots.len()];
            for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
                let slot = sorted_slots[sorted_idx];
                for (col_idx, bm) in null_bitmaps.iter().enumerate() {
                    let is_present = match bm {
                        None => {
                            // No bitmap — backward compat: treat as present if
                            // the raw value is non-zero.  This mirrors the old
                            // `raw != 0` sentinel from before SPA-207.
                            raw[sorted_idx][col_idx] != 0
                        }
                        // Out-of-range bitmap index → absent, same as a 0-padded slot.
                        Some(bits) => bits.get(slot as usize).copied().unwrap_or(false),
                    };
                    if is_present {
                        result[orig_idx][col_idx] = Some(raw[sorted_idx][col_idx]);
                    }
                }
            }
            Ok(result)
        } else {
            // Full-column load path.
            let col_data: Vec<Vec<u64>> = col_ids
                .iter()
                .map(|&col_id| self.read_col_all(label_id, col_id))
                .collect::<Result<_>>()?;
            Ok(slots
                .iter()
                .map(|&slot| {
                    col_ids
                        .iter()
                        .enumerate()
                        .map(|(ci, _)| {
                            let raw = col_data[ci].get(slot as usize).copied().unwrap_or(0);
                            let is_present = match &null_bitmaps[ci] {
                                None => raw != 0,
                                Some(bits) => bits.get(slot as usize).copied().unwrap_or(false),
                            };
                            if is_present {
                                Some(raw)
                            } else {
                                None
                            }
                        })
                        .collect()
                })
                .collect())
        }
    }
1532
1533    /// Batch-read multiple slots from multiple columns.
1534    ///
1535    /// Chooses between two strategies based on the K/N ratio:
1536    /// - **Sorted-slot reads** (SPA-200): when K ≪ N, seeks to each slot
1537    ///   offset — O(K × C) I/O instead of O(N × C).  Slots are sorted
1538    ///   ascending before the read so seeks are sequential.
1539    /// - **Full-column load**: when K is close to N (>50% of column), reading
1540    ///   the whole file is cheaper than many random seeks.
1541    ///
1542    /// All `slots` must belong to `label_id`.
1543    /// Returns a `Vec` indexed parallel to `slots`; inner `Vec` indexed
1544    /// parallel to `col_ids`.  Missing/out-of-range slots return 0.
1545    pub fn batch_read_node_props(
1546        &self,
1547        label_id: u32,
1548        slots: &[u32],
1549        col_ids: &[u32],
1550    ) -> Result<Vec<Vec<u64>>> {
1551        if slots.is_empty() {
1552            return Ok(vec![]);
1553        }
1554
1555        // Determine the column high-water mark (total node count for this label).
1556        let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
1557
1558        // Use sorted-slot reads when K < 50% of N.  For K=100, N=4039 that is
1559        // a ~40× reduction in bytes read per column.  The HWM=0 guard handles
1560        // labels that have no nodes yet (shouldn't reach here, but be safe).
1561        let use_sorted = hwm == 0 || slots.len() * 2 < hwm;
1562
1563        if use_sorted {
1564            // Sort slots ascending for sequential I/O; keep a permutation index
1565            // so we can return results in the original caller order.
1566            let mut order: Vec<usize> = (0..slots.len()).collect();
1567            order.sort_unstable_by_key(|&i| slots[i]);
1568            let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();
1569
1570            let sorted_result = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;
1571
1572            // Re-order back to the original slot order expected by the caller.
1573            let mut result = vec![vec![0u64; col_ids.len()]; slots.len()];
1574            for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
1575                result[orig_idx] = sorted_result[sorted_idx].clone();
1576            }
1577            Ok(result)
1578        } else {
1579            // Fall back to full column load when most slots are needed anyway.
1580            let col_data: Vec<Vec<u64>> = col_ids
1581                .iter()
1582                .map(|&col_id| self.read_col_all(label_id, col_id))
1583                .collect::<Result<_>>()?;
1584            Ok(slots
1585                .iter()
1586                .map(|&slot| {
1587                    col_ids
1588                        .iter()
1589                        .enumerate()
1590                        .map(|(ci, _)| col_data[ci].get(slot as usize).copied().unwrap_or(0))
1591                        .collect()
1592                })
1593                .collect())
1594        }
1595    }
1596}
1597
// ── Helpers ───────────────────────────────────────────────────────────────────
1599
1600/// Unpack `(label_id, slot)` from a [`NodeId`].
1601pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
1602    let label_id = (node_id.0 >> 32) as u32;
1603    let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1604    (label_id, slot)
1605}
1606
1607// ── Tests ─────────────────────────────────────────────────────────────────────
1608
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_node_create_and_get() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let node_id = store
            .create_node(1, &[(0, Value::Int64(42)), (1, Value::Int64(100))])
            .unwrap();

        // The packed ID carries the label in the high half and the slot in
        // the low half; the first node of a label lands in slot 0.
        assert_eq!(unpack_node_id(node_id), (1, 0));

        // Round-trip the property values.
        let got = store.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(got, vec![(0, Value::Int64(42)), (1, Value::Int64(100))]);
    }

    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let ids = [
            store.create_node(1, &[(0, Value::Int64(10))]).unwrap(),
            store.create_node(1, &[(0, Value::Int64(20))]).unwrap(),
            store.create_node(1, &[(0, Value::Int64(30))]).unwrap(),
        ];

        // Slots are handed out sequentially within a label.
        for (expected_slot, id) in ids.iter().enumerate() {
            assert_eq!(unpack_node_id(*id).1, expected_slot as u32);
        }

        // Each node reads back its own value.
        for (i, id) in ids.iter().enumerate() {
            let expected = Value::Int64((i as i64 + 1) * 10);
            assert_eq!(store.get_node(*id, &[0]).unwrap()[0].1, expected);
        }
    }

    #[test]
    fn test_node_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: write a node, then drop the store.
        let mut writer = NodeStore::open(dir.path()).unwrap();
        let node_id = writer
            .create_node(2, &[(0, Value::Int64(999)), (1, Value::Int64(-1))])
            .unwrap();
        drop(writer);

        // Session 2: a fresh store must see the persisted values.
        let reader = NodeStore::open(dir.path()).unwrap();
        let vals = reader.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(vals[0].1, Value::Int64(999));
        assert_eq!(vals[1].1, Value::Int64(-1));
    }

    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: three nodes occupy slots 0..=2.
        {
            let mut store = NodeStore::open(dir.path()).unwrap();
            for v in 1..=3i64 {
                store.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the high-water mark survived, so slot 3 comes next.
        let mut store = NodeStore::open(dir.path()).unwrap();
        let fourth = store.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        assert_eq!(unpack_node_id(fourth).1, 3);
    }

    #[test]
    fn test_node_different_labels_independent() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let a = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let b = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        // Each label maintains its own slot counter, starting at 0.
        assert_eq!(unpack_node_id(a), (10, 0));
        assert_eq!(unpack_node_id(b), (20, 0));
    }
}
1709}