Skip to main content

sparrowdb_storage/
node_store.rs

1//! Node property storage.
2//!
3//! Nodes are stored as typed property columns.  Each `(label_id, col_id)` pair
4//! maps to a flat binary file of fixed-width values.  Valid `u64` values pack
5//! `(label_id: u32, slot: u32)` into a single `u64` node ID consistent with
6//! [`sparrowdb_common::NodeId`] semantics.
7//!
8//! ## File layout
9//!
10//! ```text
11//! nodes/{label_id}/col_{col_id}.bin
12//! ```
13//!
14//! Each column file is a flat array of `u64` LE values (one per slot).
15//! The high-water mark is tracked in-memory and written to a small header file:
16//!
17//! ```text
18//! nodes/{label_id}/hwm.bin   — [hwm: u64 LE]
19//! ```
20//!
21//! ## Node ID packing
22//!
23//! ```text
24//! node_id = (label_id as u64) << 32 | slot as u64
25//! ```
26//!
27//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.
28
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::io::{Read, Seek, SeekFrom};
32use std::path::{Path, PathBuf};
33
34use sparrowdb_common::{Error, NodeId, Result};
35
36// ── Value type ────────────────────────────────────────────────────────────────
37
/// A typed property value as seen by callers of the node store.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed integer.  Packed into the low 56 bits of the stored `u64`
    /// (two's-complement), so only that range survives a round trip.
    Int64(i64),
    /// Raw byte blob.  [`Value::to_u64`] keeps at most the first 7 bytes
    /// inline; [`NodeStore::encode_value`] places longer blobs in the
    /// overflow heap instead (SPA-212).
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float.  Never encoded inline: the 8 raw bytes
    /// go to the overflow heap so that the type tag cannot mask any bits
    /// (SPA-267).
    Float(f64),
}

// Type tags live in the top byte (byte index 7 in LE) of every stored `u64`.
//
// - `0x00` = `Int64`         — low 7 bytes hold the integer (56-bit range).
// - `0x01` = `Bytes`         — low 7 bytes hold up to 7 bytes of inline data.
// - `0x02` = `BytesOverflow` — low 7 bytes hold `(offset: u40 LE, len: u16 LE)`
//   pointing into `strings.bin` (SPA-212).
// - `0x03` = `Float`         — same pointer layout as `BytesOverflow`; the heap
//   entry is the 8 raw IEEE-754 bytes (SPA-267).
//
// Pointer layout shared by the two heap-backed tags:
//   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
//   bytes[5..7] = byte length      (u16 LE, max 65535 bytes)

/// Tag for an inline 56-bit integer.
const TAG_INT64: u8 = 0x00;
/// Tag for a byte string of at most 7 bytes stored inline.
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
const TAG_FLOAT: u8 = 0x03;
/// Number of payload bytes available next to the one-byte tag.
const MAX_INLINE_BYTES: usize = 7;

impl Value {
    /// Pack this value into the tagged `u64` used by column files (SPA-169).
    ///
    /// Byte 7 (little-endian) carries the type tag and bytes 0..7 the payload.
    ///
    /// * `Int64` — the low 56 bits of the integer are kept; anything outside
    ///   that range is truncated.
    /// * `Bytes` — the first 7 bytes are copied inline, shorter inputs are
    ///   zero-padded, longer inputs are cut off.  Use
    ///   [`NodeStore::encode_value`] when the full blob must survive.
    ///
    /// # Panics
    /// Panics for `Float`, which has no inline representation; use
    /// [`NodeStore::encode_value`] for floats.
    pub fn to_u64(&self) -> u64 {
        match self {
            Value::Int64(v) => {
                // Keep the low 56 bits and place the tag above them.
                const PAYLOAD_MASK: u64 = 0x00FF_FFFF_FFFF_FFFF;
                (u64::from(TAG_INT64) << 56) | ((*v as u64) & PAYLOAD_MASK)
            }
            Value::Bytes(b) => {
                let inline = &b[..b.len().min(MAX_INLINE_BYTES)];
                let mut packed = [0u8; 8];
                packed[..inline.len()].copy_from_slice(inline);
                packed[7] = TAG_BYTES;
                u64::from_le_bytes(packed)
            }
            Value::Float(_) => {
                // No inline form exists for floats; see `# Panics` above.
                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value");
            }
        }
    }

    /// Rebuild a `Value` from a tagged `u64`, looking only at inline encodings
    /// (SPA-169).
    ///
    /// * `TAG_BYTES` yields the payload bytes up to (not including) the first
    ///   zero byte.
    /// * Every other tag, including the heap-backed ones, is decoded as a
    ///   sign-extended 56-bit integer.  Use [`NodeStore::decode_raw_value`]
    ///   for values that may live in the heap (SPA-212).
    pub fn from_u64(v: u64) -> Self {
        let raw = v.to_le_bytes();
        let (payload, tag) = (&raw[..7], raw[7]);
        if tag == TAG_BYTES {
            // Zero bytes are padding, so the string ends at the first one.
            let end = payload
                .iter()
                .position(|&byte| byte == 0)
                .unwrap_or(payload.len());
            return Value::Bytes(payload[..end].to_vec());
        }
        // Drop the tag byte with a left shift, then shift back arithmetically
        // so bit 55 is propagated as the sign.
        Value::Int64(((v << 8) as i64) >> 8)
    }

    /// Reinterpret all 64 bits of `v` as an `Int64`, ignoring any type tag.
    ///
    /// Kept for callers that know the column only ever held raw integers
    /// (e.g. pre-SPA-169 paths); new code should prefer [`Value::from_u64`].
    pub fn int64_from_u64(v: u64) -> Self {
        let signed = v as i64;
        Value::Int64(signed)
    }
}
150
151// ── NodeStore ─────────────────────────────────────────────────────────────────
152
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin                 — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin        — flat u64 column array
/// {root}/nodes/{label_id}/col_{col_id}_null.bin   — presence bitmap, one bit per slot (SPA-207)
/// {root}/strings.bin                              — overflow heap (SPA-212, SPA-267)
/// ```
///
/// The overflow heap is an append-only byte file.  Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` u64 stored in the column file.
pub struct NodeStore {
    /// Database root directory; every path used by the store is derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label.  Loaded lazily from disk.
    hwm: HashMap<u32, u64>,
    /// Labels whose in-memory HWM has been advanced but not yet persisted to
    /// `hwm.bin`.  Written out by [`NodeStore::flush_hwms`], which is intended
    /// to run once per transaction commit.
    hwm_dirty: HashSet<u32>,
}
173
174impl NodeStore {
175    /// Open (or create) a node store rooted at `db_root`.
176    pub fn open(db_root: &Path) -> Result<Self> {
177        Ok(NodeStore {
178            root: db_root.to_path_buf(),
179            hwm: HashMap::new(),
180            hwm_dirty: HashSet::new(),
181        })
182    }
183
184    // ── Internal helpers ──────────────────────────────────────────────────────
185
186    fn label_dir(&self, label_id: u32) -> PathBuf {
187        self.root.join("nodes").join(label_id.to_string())
188    }
189
190    fn hwm_path(&self, label_id: u32) -> PathBuf {
191        self.label_dir(label_id).join("hwm.bin")
192    }
193
194    fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
195        self.label_dir(label_id).join(format!("col_{col_id}.bin"))
196    }
197
198    /// Path to the null-bitmap sidecar for a column (SPA-207).
199    ///
200    /// Bit N = 1 means slot N has a real value (present/non-null).
201    /// Bit N = 0 means slot N was zero-padded (absent/null).
202    fn null_bitmap_path(&self, label_id: u32, col_id: u32) -> PathBuf {
203        self.label_dir(label_id)
204            .join(format!("col_{col_id}_null.bin"))
205    }
206
207    /// Mark slot `slot` as present (has a real value) in the null bitmap (SPA-207).
208    fn set_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<()> {
209        let path = self.null_bitmap_path(label_id, col_id);
210        if let Some(parent) = path.parent() {
211            fs::create_dir_all(parent).map_err(Error::Io)?;
212        }
213        let byte_idx = (slot / 8) as usize;
214        let bit_idx = slot % 8;
215        let mut bits = if path.exists() {
216            fs::read(&path).map_err(Error::Io)?
217        } else {
218            vec![]
219        };
220        if bits.len() <= byte_idx {
221            bits.resize(byte_idx + 1, 0);
222        }
223        bits[byte_idx] |= 1 << bit_idx;
224        fs::write(&path, &bits).map_err(Error::Io)
225    }
226
227    /// Returns `true` if slot `slot` has a real value (present/non-null) (SPA-207).
228    ///
229    /// Backward-compatible: if no bitmap file exists (old data written before
230    /// the null-bitmap fix), every slot is treated as present.
231    fn get_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<bool> {
232        let path = self.null_bitmap_path(label_id, col_id);
233        if !path.exists() {
234            // No bitmap file → backward-compatible: treat all slots as present.
235            return Ok(true);
236        }
237        let bits = fs::read(&path).map_err(Error::Io)?;
238        let byte_idx = (slot / 8) as usize;
239        if byte_idx >= bits.len() {
240            return Ok(false);
241        }
242        Ok((bits[byte_idx] >> (slot % 8)) & 1 == 1)
243    }
244
245    /// Path to the overflow string heap (shared across all labels).
246    fn strings_bin_path(&self) -> PathBuf {
247        self.root.join("strings.bin")
248    }
249
250    // ── Overflow string heap (SPA-212) ────────────────────────────────────────
251
252    /// Append `bytes` to the overflow string heap and return an
253    /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
254    ///
255    /// Layout of the returned `u64` (little-endian bytes):
256    ///   bytes[0..5] = heap byte offset as u40 LE  (max ~1 TiB)
257    ///   bytes[5..7] = byte length as u16 LE       (max 65 535 bytes)
258    ///   bytes[7]    = `TAG_BYTES_OVERFLOW` (0x02)
259    fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
260        use std::io::{Seek, SeekFrom, Write};
261        let path = self.strings_bin_path();
262        let mut file = fs::OpenOptions::new()
263            .create(true)
264            .truncate(false)
265            .append(false)
266            .read(true)
267            .write(true)
268            .open(&path)
269            .map_err(Error::Io)?;
270
271        // The heap offset is the current end of the file.
272        let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
273        file.write_all(bytes).map_err(Error::Io)?;
274
275        // Encode (offset, len) into a 7-byte payload.
276        let len = bytes.len() as u64;
277        debug_assert!(
278            offset <= 0x00FF_FFFF_FFFF_u64,
279            "string heap too large for 5-byte offset"
280        );
281        debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
282
283        let mut arr = [0u8; 8];
284        // Store offset in bytes[0..5] (40-bit LE).
285        arr[0] = offset as u8;
286        arr[1] = (offset >> 8) as u8;
287        arr[2] = (offset >> 16) as u8;
288        arr[3] = (offset >> 24) as u8;
289        arr[4] = (offset >> 32) as u8;
290        // Store len in bytes[5..7] (16-bit LE).
291        arr[5] = len as u8;
292        arr[6] = (len >> 8) as u8;
293        // Tag byte.
294        arr[7] = TAG_BYTES_OVERFLOW;
295        Ok(u64::from_le_bytes(arr))
296    }
297
298    /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
299    /// tagged `u64` produced by [`append_to_string_heap`].
300    ///
301    /// Uses `seek + read_exact` to load only the `len` bytes at `offset`,
302    /// avoiding a full file load regardless of `strings.bin` size (SPA-208).
303    fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
304        use std::io::{Read, Seek, SeekFrom};
305
306        let arr = tagged.to_le_bytes();
307        debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
308
309        // Decode (offset, len).
310        let offset = arr[0] as u64
311            | ((arr[1] as u64) << 8)
312            | ((arr[2] as u64) << 16)
313            | ((arr[3] as u64) << 24)
314            | ((arr[4] as u64) << 32);
315        let len = arr[5] as usize | ((arr[6] as usize) << 8);
316
317        let path = self.strings_bin_path();
318        let mut file = fs::File::open(&path).map_err(Error::Io)?;
319        file.seek(SeekFrom::Start(offset)).map_err(|e| {
320            Error::Corruption(format!("string heap seek failed (offset={offset}): {e}"))
321        })?;
322        let mut buf = vec![0u8; len];
323        file.read_exact(&mut buf).map_err(|e| {
324            Error::Corruption(format!(
325                "string heap too short: need {} bytes at offset {offset}: {e}",
326                len
327            ))
328        })?;
329        Ok(buf)
330    }
331
332    // ── Value encode / decode with overflow support ───────────────────────────
333
334    /// Encode a `Value` for column storage, writing long `Bytes` strings to
335    /// the overflow heap (SPA-212).
336    ///
337    /// - `Int64`          → identical to `Value::to_u64()`.
338    /// - `Bytes` ≤ 7 B    → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
339    /// - `Bytes` > 7 B    → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
340    /// - `Float`          → 8 raw IEEE-754 bytes appended to `strings.bin`;
341    ///   returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
342    pub fn encode_value(&self, val: &Value) -> Result<u64> {
343        match val {
344            Value::Int64(_) => Ok(val.to_u64()),
345            Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
346            Value::Bytes(b) => self.append_to_string_heap(b),
347            // SPA-267: store all 8 float bytes in the heap so no bits are masked.
348            // The heap pointer uses the same (offset: u40, len: u16) layout as
349            // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
350            Value::Float(f) => {
351                let bits = f.to_bits().to_le_bytes();
352                let heap_tagged = self.append_to_string_heap(&bits)?;
353                // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
354                let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
355                Ok((TAG_FLOAT as u64) << 56 | payload)
356            }
357        }
358    }
359
360    /// Decode a raw `u64` column value back to a `Value`, reading the
361    /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
362    ///
363    /// Handles all four tags:
364    /// - `TAG_INT64`          → `Value::Int64`
365    /// - `TAG_BYTES`          → `Value::Bytes` (inline, ≤ 7 bytes)
366    /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
367    /// - `TAG_FLOAT`          → `Value::Float` (8 raw IEEE-754 bytes from heap)
368    pub fn decode_raw_value(&self, raw: u64) -> Value {
369        let tag = (raw >> 56) as u8;
370        match tag {
371            TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
372                Ok(bytes) => Value::Bytes(bytes),
373                Err(e) => {
374                    // Corruption fallback: return empty bytes and log.
375                    eprintln!(
376                        "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
377                    );
378                    Value::Bytes(Vec::new())
379                }
380            },
381            // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
382            // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
383            // so read_from_string_heap can locate the bytes.
384            TAG_FLOAT => {
385                let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
386                let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
387                match self.read_from_string_heap(heap_tagged) {
388                    Ok(bytes) if bytes.len() == 8 => {
389                        let arr: [u8; 8] = bytes.try_into().unwrap();
390                        Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
391                    }
392                    Ok(bytes) => {
393                        eprintln!(
394                            "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
395                            bytes.len()
396                        );
397                        Value::Float(f64::NAN)
398                    }
399                    Err(e) => {
400                        eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
401                        Value::Float(f64::NAN)
402                    }
403                }
404            }
405            _ => Value::from_u64(raw),
406        }
407    }
408
409    /// Check whether a raw stored `u64` encodes a string equal to `s`.
410    ///
411    /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
412    /// encodings (SPA-212).  Used by WHERE-clause and prop-filter comparison.
413    pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
414        let tag = (raw >> 56) as u8;
415        match tag {
416            TAG_BYTES => {
417                // Fast inline comparison: encode s the same way and compare u64s.
418                raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
419            }
420            TAG_BYTES_OVERFLOW => {
421                // Overflow: read from heap and compare bytes.
422                match self.read_from_string_heap(raw) {
423                    Ok(bytes) => bytes == s.as_bytes(),
424                    Err(_) => false,
425                }
426            }
427            _ => false, // INT64 or unknown — not a string
428        }
429    }
430
431    /// Read the high-water mark for `label_id` from disk (or return 0).
432    ///
433    /// Recovery path (SPA-211): if `hwm.bin` is missing or corrupt but
434    /// `hwm.bin.tmp` exists (leftover from a previous crashed write), we
435    /// promote the tmp file to `hwm.bin` and use its value.
436    fn load_hwm(&self, label_id: u32) -> Result<u64> {
437        let path = self.hwm_path(label_id);
438        let tmp_path = self.hwm_tmp_path(label_id);
439
440        // Try to read the canonical file first.
441        let try_read = |p: &std::path::Path| -> Option<u64> {
442            let bytes = fs::read(p).ok()?;
443            if bytes.len() < 8 {
444                return None;
445            }
446            Some(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
447        };
448
449        if path.exists() {
450            match try_read(&path) {
451                Some(v) => return Ok(v),
452                None => {
453                    // hwm.bin exists but is corrupt — fall through to tmp recovery.
454                }
455            }
456        }
457
458        // hwm.bin is absent or unreadable.  Check for a tmp leftover.
459        if tmp_path.exists() {
460            if let Some(v) = try_read(&tmp_path) {
461                // Promote: atomically rename tmp → canonical so the next open
462                // is clean even if we crash again immediately here.
463                let _ = fs::rename(&tmp_path, &path);
464                return Ok(v);
465            }
466        }
467
468        // Last resort: infer the HWM from the sizes of the column files.
469        //
470        // Each `col_{n}.bin` is a flat array of 8-byte u64 LE values, one per
471        // slot.  The number of slots written equals `file_len / 8`.  If
472        // `hwm.bin` is corrupt we can reconstruct the HWM as the maximum slot
473        // count across all column files for this label.
474        {
475            let inferred = self.infer_hwm_from_cols(label_id);
476            if inferred > 0 {
477                // Persist the recovered HWM so the next open is clean.
478                let _ = self.save_hwm(label_id, inferred);
479                return Ok(inferred);
480            }
481        }
482
483        // No usable file at all — fresh label, HWM is 0.
484        Ok(0)
485    }
486
487    /// Infer the high-water mark for `label_id` from the sizes of the column
488    /// files on disk.  Returns 0 if no column files exist or none are readable.
489    ///
490    /// Each `col_{n}.bin` stores one u64 per slot, so `file_len / 8` gives the
491    /// slot count.  We return the maximum over all columns.
492    fn infer_hwm_from_cols(&self, label_id: u32) -> u64 {
493        let dir = self.label_dir(label_id);
494        let read_dir = match fs::read_dir(&dir) {
495            Ok(rd) => rd,
496            Err(_) => return 0,
497        };
498        read_dir
499            .flatten()
500            .filter_map(|entry| {
501                let name = entry.file_name();
502                let name_str = name.to_string_lossy().into_owned();
503                // Only consider col_{n}.bin files.
504                name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
505                let meta = entry.metadata().ok()?;
506                Some(meta.len() / 8)
507            })
508            .max()
509            .unwrap_or(0)
510    }
511
512    /// Write the high-water mark for `label_id` to disk atomically.
513    ///
514    /// Strategy (SPA-211): write to `hwm.bin.tmp`, fsync, then rename to
515    /// `hwm.bin`.  On POSIX, `rename(2)` is atomic, so a crash at any point
516    /// leaves either the old `hwm.bin` intact or the fully-written new one.
517    fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
518        use std::io::Write as _;
519
520        let path = self.hwm_path(label_id);
521        let tmp_path = self.hwm_tmp_path(label_id);
522
523        if let Some(parent) = path.parent() {
524            fs::create_dir_all(parent).map_err(Error::Io)?;
525        }
526
527        // Write to the tmp file and fsync before renaming.
528        {
529            let mut file = fs::File::create(&tmp_path).map_err(Error::Io)?;
530            file.write_all(&hwm.to_le_bytes()).map_err(Error::Io)?;
531            file.sync_all().map_err(Error::Io)?;
532        }
533
534        // Atomic rename: on success the old hwm.bin is replaced in one syscall.
535        fs::rename(&tmp_path, &path).map_err(Error::Io)
536    }
537
538    fn hwm_tmp_path(&self, label_id: u32) -> PathBuf {
539        self.label_dir(label_id).join("hwm.bin.tmp")
540    }
541
542    /// Persist all dirty in-memory HWMs to disk atomically.
543    ///
544    /// Called **once per transaction commit** rather than once per node creation,
545    /// so that bulk imports do not incur one fsync per node (SPA-217 regression fix).
546    ///
547    /// Each dirty label's HWM is written via the same tmp+fsync+rename strategy
548    /// used by [`save_hwm`], preserving the SPA-211 crash-safety guarantee.
549    /// After all writes succeed the dirty set is cleared.
550    pub fn flush_hwms(&mut self) -> Result<()> {
551        let dirty: Vec<u32> = self.hwm_dirty.iter().copied().collect();
552        for label_id in dirty {
553            let hwm = match self.hwm.get(&label_id) {
554                Some(&v) => v,
555                None => continue,
556            };
557            self.save_hwm(label_id, hwm)?;
558        }
559        self.hwm_dirty.clear();
560        Ok(())
561    }
562
563    /// Append a `u64` value to a column file.
564    fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
565        use std::io::{Seek, SeekFrom, Write};
566        let path = self.col_path(label_id, col_id);
567        if let Some(parent) = path.parent() {
568            fs::create_dir_all(parent).map_err(Error::Io)?;
569        }
570        // Open without truncation so we can inspect the current length.
571        let mut file = fs::OpenOptions::new()
572            .create(true)
573            .truncate(false)
574            .read(true)
575            .write(true)
576            .open(&path)
577            .map_err(Error::Io)?;
578
579        // Pad with zeros for any slots that were skipped (sparse write pattern).
580        // Without padding, a later slot's value would be written at offset 0,
581        // causing earlier slots to incorrectly read that value.
582        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
583        let expected_offset = slot as u64 * 8;
584        if existing_len < expected_offset {
585            file.seek(SeekFrom::Start(existing_len))
586                .map_err(Error::Io)?;
587            const CHUNK: usize = 65536;
588            let zeros = [0u8; CHUNK];
589            let mut remaining = (expected_offset - existing_len) as usize;
590            while remaining > 0 {
591                let n = remaining.min(CHUNK);
592                file.write_all(&zeros[..n]).map_err(Error::Io)?;
593                remaining -= n;
594            }
595        }
596
597        // Seek to the correct slot position and write the value.
598        file.seek(SeekFrom::Start(expected_offset))
599            .map_err(Error::Io)?;
600        file.write_all(&value.to_le_bytes()).map_err(Error::Io)
601    }
602
603    /// Read the `u64` stored at `slot` in the given column file.
604    ///
605    /// Returns `Ok(0)` when the column file does not exist yet — a missing file
606    /// means no value has ever been written for this `(label_id, col_id)` pair,
607    /// which is represented as the zero bit-pattern (SPA-166).
608    fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
609        let path = self.col_path(label_id, col_id);
610        let bytes = match fs::read(&path) {
611            Ok(b) => b,
612            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
613            Err(e) => return Err(Error::Io(e)),
614        };
615        let offset = slot as usize * 8;
616        if bytes.len() < offset + 8 {
617            return Err(Error::NotFound);
618        }
619        Ok(u64::from_le_bytes(
620            bytes[offset..offset + 8].try_into().unwrap(),
621        ))
622    }
623
624    // ── Public API ────────────────────────────────────────────────────────────
625
626    /// Return the high-water mark (slot count) for a label.
627    ///
628    /// Returns `0` if no nodes have been created for that label yet.
629    pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
630        if let Some(&h) = self.hwm.get(&label_id) {
631            return Ok(h);
632        }
633        self.load_hwm(label_id)
634    }
635
636    /// Discover all column IDs that currently exist on disk for `label_id`.
637    ///
638    /// Scans the label directory for `col_{id}.bin` files and returns the
639    /// parsed `col_id` values.  Used by `create_node` to zero-pad columns
640    /// that are not supplied for a new node (SPA-187).
641    ///
642    /// Returns `Err` when the directory exists but cannot be read (e.g.
643    /// permissions failure or I/O error).  A missing directory is not an
644    /// error — it simply means no nodes of this label have been created yet.
645    pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
646        self.existing_col_ids(label_id)
647    }
648
649    fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
650        let dir = self.label_dir(label_id);
651        let read_dir = match fs::read_dir(&dir) {
652            Ok(rd) => rd,
653            // Directory does not exist yet → no columns on disk.
654            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
655            Err(e) => return Err(Error::Io(e)),
656        };
657        let mut ids: Vec<u32> = read_dir
658            .flatten()
659            .filter_map(|entry| {
660                let name = entry.file_name();
661                let name_str = name.to_string_lossy().into_owned();
662                // Match "col_{col_id}.bin" filenames.
663                let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
664                id_str.parse::<u32>().ok()
665            })
666            .collect();
667        ids.sort_unstable();
668        Ok(ids)
669    }
670
671    /// Return the **on-disk** high-water mark for a label, bypassing any
672    /// in-memory advances made by `peek_next_slot`.
673    ///
674    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
675    /// that have actually been persisted.
676    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
677        self.load_hwm(label_id)
678    }
679
680    /// Reserve the slot index that the *next* `create_node` call will use for
681    /// `label_id`, advancing the in-memory HWM so that the slot is not
682    /// assigned again within the same [`NodeStore`] instance.
683    ///
684    /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
685    /// before the actual disk write, so the ID can be returned to the caller
686    /// while the write is deferred until commit (SPA-181).
687    ///
688    /// The on-disk HWM is **not** updated here; it is updated when the
689    /// buffered `NodeCreate` operation is applied in `commit()`.
690    pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
691        // Load from disk if not cached yet.
692        if !self.hwm.contains_key(&label_id) {
693            let h = self.load_hwm(label_id)?;
694            self.hwm.insert(label_id, h);
695        }
696        let h = *self.hwm.get(&label_id).unwrap();
697        // Advance the in-memory HWM so a subsequent peek returns the next slot.
698        self.hwm.insert(label_id, h + 1);
699        Ok(h as u32)
700    }
701
702    /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
703    ///
704    /// Like [`create_node`] but uses the caller-specified `slot` index instead
705    /// of deriving it from the HWM.  Used by [`WriteTx::commit`] to flush
706    /// buffered node-create operations in the exact order they were issued,
707    /// with slots that were already pre-allocated by [`peek_next_slot`].
708    ///
709    /// Advances the on-disk HWM to `slot + 1` (or higher if already past that).
710    pub fn create_node_at_slot(
711        &mut self,
712        label_id: u32,
713        slot: u32,
714        props: &[(u32, Value)],
715    ) -> Result<NodeId> {
716        // Snapshot original column sizes for rollback on partial failure.
717        let original_sizes: Vec<(u32, u64)> = props
718            .iter()
719            .map(|&(col_id, _)| {
720                let path = self.col_path(label_id, col_id);
721                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
722                (col_id, size)
723            })
724            .collect();
725
726        let write_result = (|| {
727            for &(col_id, ref val) in props {
728                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
729                // Mark this slot as present in the null bitmap (SPA-207).
730                self.set_null_bit(label_id, col_id, slot)?;
731            }
732            Ok::<(), sparrowdb_common::Error>(())
733        })();
734
735        if let Err(e) = write_result {
736            for (col_id, original_size) in &original_sizes {
737                let path = self.col_path(label_id, *col_id);
738                if path.exists() {
739                    if let Err(rollback_err) = fs::OpenOptions::new()
740                        .write(true)
741                        .open(&path)
742                        .and_then(|f| f.set_len(*original_size))
743                    {
744                        eprintln!(
745                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
746                            path.display(),
747                            original_size,
748                            rollback_err
749                        );
750                    }
751                }
752            }
753            return Err(e);
754        }
755
756        // Advance the in-memory HWM to at least slot + 1 and mark the label
757        // dirty so that flush_hwms() will persist it at commit boundary.
758        //
759        // We do NOT call save_hwm here.  The caller (WriteTx::commit) is
760        // responsible for calling flush_hwms() once after all PendingOp::NodeCreate
761        // entries have been applied.  This avoids one fsync per node during bulk
762        // imports (SPA-217 regression fix) while preserving crash-safety: the WAL
763        // record is already durable at this point, so recovery can reconstruct the
764        // HWM if we crash before flush_hwms() completes.
765        //
766        // NOTE: peek_next_slot() may have already advanced self.hwm to slot+1,
767        // so new_hwm > mem_hwm might be false.  We mark the label dirty
768        // unconditionally so that flush_hwms() always writes through to disk.
769        let new_hwm = slot as u64 + 1;
770        let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
771        if new_hwm > mem_hwm {
772            self.hwm.insert(label_id, new_hwm);
773        }
774        // Always mark dirty — flush_hwms() must write the post-commit HWM even
775        // if peek_next_slot already set mem HWM == new_hwm.
776        self.hwm_dirty.insert(label_id);
777
778        Ok(NodeId((label_id as u64) << 32 | slot as u64))
779    }
780
781    /// Batch-write column data for multiple nodes created in a single transaction
782    /// commit (SPA-212 write-amplification fix).
783    ///
784    /// # Why this exists
785    ///
786    /// The naive path calls `create_node_at_slot` per node, which opens and
787    /// closes every column file once per node.  For a transaction that creates
788    /// `N` nodes each with `C` columns, that is `O(N × C)` file-open/close
789    /// syscalls.
790    ///
791    /// This method instead:
792    /// 1. Accepts pre-encoded `(label_id, col_id, slot, raw_value, is_present)`
793    ///    tuples from the caller (value encoding happens in `commit()` before
794    ///    the call).
795    /// 2. Sorts by `(label_id, col_id)` so all writes to the same column file
796    ///    are contiguous.
797    /// 3. Opens each `(label_id, col_id)` file exactly **once**, writes all
798    ///    slots for that column, then closes it — reducing file opens to
799    ///    `O(labels × cols)`.
800    /// 4. Updates each null-bitmap file once per `(label_id, col_id)` group.
801    ///
802    /// HWM advances are applied for every `(label_id, slot)` in `node_slots`,
803    /// exactly as `create_node_at_slot` would do them.  `node_slots` must
804    /// include **all** created nodes — including those with zero properties —
805    /// so that the HWM is advanced even for property-less nodes.
806    ///
807    /// # Rollback
808    ///
809    /// On I/O failure the method truncates every file that was opened back to
810    /// its pre-call size, matching the rollback contract of
811    /// `create_node_at_slot`.
812    pub fn batch_write_node_creates(
813        &mut self,
814        // (label_id, col_id, slot, raw_u64, is_present)
815        mut writes: Vec<(u32, u32, u32, u64, bool)>,
816        // All (label_id, slot) pairs for every created node, including those
817        // with zero properties.
818        node_slots: &[(u32, u32)],
819    ) -> Result<()> {
820        use std::io::{Seek, SeekFrom, Write};
821
822        if writes.is_empty() && node_slots.is_empty() {
823            return Ok(());
824        }
825
826        // Sort by (label_id, col_id, slot) so all writes to the same file are
827        // contiguous.
828        writes.sort_unstable_by_key(|&(lid, cid, slot, _, _)| (lid, cid, slot));
829
830        // Snapshot original sizes for rollback.  We only need one entry per
831        // (label_id, col_id) pair.
832        let mut original_sizes: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
833        {
834            let mut prev_key: Option<(u32, u32)> = None;
835            for &(lid, cid, _, _, _) in &writes {
836                let key = (lid, cid);
837                if prev_key == Some(key) {
838                    continue;
839                }
840                prev_key = Some(key);
841                let path = self.col_path(lid, cid);
842                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
843                original_sizes.push((lid, cid, path, original));
844            }
845        }
846        // Also snapshot null-bitmap files (one per col group).
847        let mut bitmap_originals: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
848        {
849            let mut prev_key: Option<(u32, u32)> = None;
850            for &(lid, cid, _, _, is_present) in &writes {
851                if !is_present {
852                    continue;
853                }
854                let key = (lid, cid);
855                if prev_key == Some(key) {
856                    continue;
857                }
858                prev_key = Some(key);
859                let path = self.null_bitmap_path(lid, cid);
860                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
861                bitmap_originals.push((lid, cid, path, original));
862            }
863        }
864
865        let write_result = (|| -> Result<()> {
866            let mut i = 0;
867            while i < writes.len() {
868                let (lid, cid, _, _, _) = writes[i];
869
870                // Find the end of this (label_id, col_id) group.
871                let group_start = i;
872                while i < writes.len() && writes[i].0 == lid && writes[i].1 == cid {
873                    i += 1;
874                }
875                let group = &writes[group_start..i];
876
877                // ── Column file ──────────────────────────────────────────────
878                let path = self.col_path(lid, cid);
879                if let Some(parent) = path.parent() {
880                    fs::create_dir_all(parent).map_err(Error::Io)?;
881                }
882                let mut file = fs::OpenOptions::new()
883                    .create(true)
884                    .truncate(false)
885                    .read(true)
886                    .write(true)
887                    .open(&path)
888                    .map_err(Error::Io)?;
889
890                let mut current_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
891
892                for &(_, _, slot, raw_value, _) in group {
893                    let expected_offset = slot as u64 * 8;
894                    // Pad with zeros for any skipped slots.
895                    if current_len < expected_offset {
896                        file.seek(SeekFrom::Start(current_len)).map_err(Error::Io)?;
897                        const CHUNK: usize = 65536;
898                        let zeros = [0u8; CHUNK];
899                        let mut remaining = (expected_offset - current_len) as usize;
900                        while remaining > 0 {
901                            let n = remaining.min(CHUNK);
902                            file.write_all(&zeros[..n]).map_err(Error::Io)?;
903                            remaining -= n;
904                        }
905                        current_len = expected_offset;
906                    }
907                    // Seek to exact slot and write.
908                    file.seek(SeekFrom::Start(expected_offset))
909                        .map_err(Error::Io)?;
910                    file.write_all(&raw_value.to_le_bytes())
911                        .map_err(Error::Io)?;
912                    // Advance current_len to reflect what we wrote.
913                    let after = expected_offset + 8;
914                    if after > current_len {
915                        current_len = after;
916                    }
917                }
918
919                // ── Null bitmap (one read-modify-write per col group) ────────
920                let present_slots: Vec<u32> = group
921                    .iter()
922                    .filter(|&&(_, _, _, _, is_present)| is_present)
923                    .map(|&(_, _, slot, _, _)| slot)
924                    .collect();
925
926                if !present_slots.is_empty() {
927                    let bmap_path = self.null_bitmap_path(lid, cid);
928                    if let Some(parent) = bmap_path.parent() {
929                        fs::create_dir_all(parent).map_err(Error::Io)?;
930                    }
931                    let mut bits = if bmap_path.exists() {
932                        fs::read(&bmap_path).map_err(Error::Io)?
933                    } else {
934                        vec![]
935                    };
936                    for slot in present_slots {
937                        let byte_idx = (slot / 8) as usize;
938                        let bit_idx = slot % 8;
939                        if bits.len() <= byte_idx {
940                            bits.resize(byte_idx + 1, 0);
941                        }
942                        bits[byte_idx] |= 1 << bit_idx;
943                    }
944                    fs::write(&bmap_path, &bits).map_err(Error::Io)?;
945                }
946            }
947            Ok(())
948        })();
949
950        if let Err(e) = write_result {
951            // Roll back column files.
952            for (_, _, path, original_size) in &original_sizes {
953                if path.exists() {
954                    if let Err(rollback_err) = fs::OpenOptions::new()
955                        .write(true)
956                        .open(path)
957                        .and_then(|f| f.set_len(*original_size))
958                    {
959                        eprintln!(
960                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
961                            path.display(),
962                            original_size,
963                            rollback_err
964                        );
965                    }
966                }
967            }
968            // Roll back null bitmap files.
969            for (_, _, path, original_size) in &bitmap_originals {
970                if path.exists() {
971                    if let Err(rollback_err) = fs::OpenOptions::new()
972                        .write(true)
973                        .open(path)
974                        .and_then(|f| f.set_len(*original_size))
975                    {
976                        eprintln!(
977                            "CRITICAL: Failed to roll back null bitmap file {} to size {}: {}. Data may be corrupt.",
978                            path.display(),
979                            original_size,
980                            rollback_err
981                        );
982                    }
983                }
984            }
985            return Err(e);
986        }
987
988        // Advance HWMs using the explicit node_slots list so that nodes with
989        // zero properties also advance the HWM, matching the contract of
990        // create_node_at_slot.
991        for &(lid, slot) in node_slots {
992            let new_hwm = slot as u64 + 1;
993            let mem_hwm = self.hwm.get(&lid).copied().unwrap_or(0);
994            if new_hwm > mem_hwm {
995                self.hwm.insert(lid, new_hwm);
996            }
997            self.hwm_dirty.insert(lid);
998        }
999
1000        Ok(())
1001    }
1002
1003    /// Create a new node in `label_id` with the given properties.
1004    ///
1005    /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
1006    ///
1007    /// ## Slot alignment guarantee (SPA-187)
1008    ///
1009    /// Every column file for `label_id` must have exactly `node_count * 8`
1010    /// bytes so that slot N always refers to node N across all columns.  When
1011    /// a node is created without a value for an already-known column, that
1012    /// column file is zero-padded to `(slot + 1) * 8` bytes.  The zero
1013    /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
1014    /// surfaces as `Value::Null` in query results.
1015    pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
1016        // Load or get cached hwm.
1017        let hwm = if let Some(h) = self.hwm.get(&label_id) {
1018            *h
1019        } else {
1020            let h = self.load_hwm(label_id)?;
1021            self.hwm.insert(label_id, h);
1022            h
1023        };
1024
1025        let slot = hwm as u32;
1026
1027        // Collect the set of col_ids supplied for this node.
1028        let supplied_col_ids: std::collections::HashSet<u32> =
1029            props.iter().map(|&(col_id, _)| col_id).collect();
1030
1031        // Discover all columns already on disk for this label.  Any that are
1032        // NOT in `supplied_col_ids` must be zero-padded so their slot count
1033        // stays in sync with the HWM (SPA-187).
1034        let existing_col_ids = self.existing_col_ids(label_id)?;
1035
1036        // Columns that need zero-padding: exist on disk but not in this node's props.
1037        let cols_to_zero_pad: Vec<u32> = existing_col_ids
1038            .iter()
1039            .copied()
1040            .filter(|col_id| !supplied_col_ids.contains(col_id))
1041            .collect();
1042
1043        // Build the full list of columns to touch for rollback tracking:
1044        // supplied columns + columns that need zero-padding.
1045        let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
1046            .iter()
1047            .copied()
1048            .chain(cols_to_zero_pad.iter().copied())
1049            .collect();
1050
1051        // Snapshot the original size of each column file so we can roll back
1052        // on partial failure.  A column that does not yet exist has size 0.
1053        let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
1054            .iter()
1055            .map(|&col_id| {
1056                let path = self.col_path(label_id, col_id);
1057                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1058                (col_id, size)
1059            })
1060            .collect();
1061
1062        // Write each property column.  For columns not in `props`, write a
1063        // zero-padded entry (the zero sentinel means "absent" / NULL).
1064        // On failure, roll back all columns that were already written to avoid
1065        // slot misalignment.
1066        let write_result = (|| {
1067            // Write supplied columns with their actual values.
1068            for &(col_id, ref val) in props {
1069                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
1070                // Mark this slot as present in the null bitmap (SPA-207).
1071                self.set_null_bit(label_id, col_id, slot)?;
1072            }
1073            // Zero-pad existing columns that were NOT supplied for this node.
1074            // Do NOT set null bitmap bits for these — they remain absent/null.
1075            for &col_id in &cols_to_zero_pad {
1076                self.append_col(label_id, col_id, slot, 0u64)?;
1077            }
1078            Ok::<(), sparrowdb_common::Error>(())
1079        })();
1080
1081        if let Err(e) = write_result {
1082            // Truncate each column back to its original size.
1083            for (col_id, original_size) in &original_sizes {
1084                let path = self.col_path(label_id, *col_id);
1085                if path.exists() {
1086                    if let Err(rollback_err) = fs::OpenOptions::new()
1087                        .write(true)
1088                        .open(&path)
1089                        .and_then(|f| f.set_len(*original_size))
1090                    {
1091                        eprintln!(
1092                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
1093                            path.display(),
1094                            original_size,
1095                            rollback_err
1096                        );
1097                    }
1098                }
1099            }
1100            return Err(e);
1101        }
1102
1103        // Update hwm.
1104        let new_hwm = hwm + 1;
1105        self.save_hwm(label_id, new_hwm)?;
1106        *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
1107
1108        // Pack node ID.
1109        let node_id = ((label_id as u64) << 32) | (slot as u64);
1110        Ok(NodeId(node_id))
1111    }
1112
1113    /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
1114    ///
1115    /// Creates `col_0.bin` (and its parent directory) if it does not exist,
1116    /// zero-padding all preceding slots.  This ensures that nodes which were
1117    /// created without any `col_0` property are still properly marked as deleted
1118    /// and become invisible to subsequent scans.
1119    ///
1120    /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
1121    pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
1122        use std::io::{Seek, SeekFrom, Write};
1123        let label_id = (node_id.0 >> 32) as u32;
1124        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1125        let col0 = self.col_path(label_id, 0);
1126
1127        // Ensure the parent directory exists.
1128        if let Some(parent) = col0.parent() {
1129            fs::create_dir_all(parent).map_err(Error::Io)?;
1130        }
1131
1132        let mut f = fs::OpenOptions::new()
1133            .create(true)
1134            .truncate(false)
1135            .read(true)
1136            .write(true)
1137            .open(&col0)
1138            .map_err(Error::Io)?;
1139
1140        // Zero-pad any slots before `slot` that are not yet in the file.
1141        let needed_len = (slot as u64 + 1) * 8;
1142        let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1143        if existing_len < needed_len {
1144            let zeros = vec![0u8; (needed_len - existing_len) as usize];
1145            f.write_all(&zeros).map_err(Error::Io)?;
1146        }
1147
1148        // Seek to the slot and write the tombstone value.
1149        f.seek(SeekFrom::Start(slot as u64 * 8))
1150            .map_err(Error::Io)?;
1151        f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
1152    }
1153
1154    /// Overwrite the value of a single column for an existing node.
1155    ///
1156    /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
1157    /// 8-byte little-endian value in-place.  Returns `Err(NotFound)` if the
1158    /// slot does not exist yet.
1159    pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1160        use std::io::{Seek, SeekFrom, Write};
1161        let label_id = (node_id.0 >> 32) as u32;
1162        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1163        let path = self.col_path(label_id, col_id);
1164        let mut file = fs::OpenOptions::new()
1165            .write(true)
1166            .open(&path)
1167            .map_err(|_| Error::NotFound)?;
1168        let offset = slot as u64 * 8;
1169        file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
1170        file.write_all(&self.encode_value(value)?.to_le_bytes())
1171            .map_err(Error::Io)
1172    }
1173
1174    /// Write or create a column value for a node, creating and zero-padding the
1175    /// column file if it does not yet exist.
1176    ///
1177    /// Unlike [`set_node_col`], this method creates the column file and fills all
1178    /// slots from 0 to `slot - 1` with zeros before writing the target value.
1179    /// This supports adding new property columns to existing nodes (Phase 7
1180    /// `set_property` semantics).
1181    pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1182        use std::io::{Seek, SeekFrom, Write};
1183        let label_id = (node_id.0 >> 32) as u32;
1184        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1185        let path = self.col_path(label_id, col_id);
1186
1187        // Ensure parent directory exists.
1188        if let Some(parent) = path.parent() {
1189            fs::create_dir_all(parent).map_err(Error::Io)?;
1190        }
1191
1192        // Open the file (create if absent), then pad with zeros up to and
1193        // including the target slot, and finally seek back to write the value.
1194        // We must NOT truncate: we only update a specific slot, not the whole file.
1195        let mut file = fs::OpenOptions::new()
1196            .create(true)
1197            .truncate(false)
1198            .read(true)
1199            .write(true)
1200            .open(&path)
1201            .map_err(Error::Io)?;
1202
1203        // How many bytes are already in the file?
1204        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1205        let needed_len = (slot as u64 + 1) * 8;
1206        if existing_len < needed_len {
1207            // Extend file with zeros from existing_len to needed_len.
1208            // Write in fixed-size chunks to avoid a single large allocation
1209            // that could OOM when the node slot ID is very high.
1210            file.seek(SeekFrom::Start(existing_len))
1211                .map_err(Error::Io)?;
1212            const CHUNK: usize = 65536; // 64 KiB
1213            let zeros = [0u8; CHUNK];
1214            let mut remaining = (needed_len - existing_len) as usize;
1215            while remaining > 0 {
1216                let n = remaining.min(CHUNK);
1217                file.write_all(&zeros[..n]).map_err(Error::Io)?;
1218                remaining -= n;
1219            }
1220        }
1221
1222        // Seek to target slot and overwrite.
1223        file.seek(SeekFrom::Start(slot as u64 * 8))
1224            .map_err(Error::Io)?;
1225        file.write_all(&self.encode_value(value)?.to_le_bytes())
1226            .map_err(Error::Io)
1227    }
1228
1229    /// Retrieve all stored properties of a node.
1230    ///
1231    /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
1232    /// The caller knows the schema (col IDs) from the catalog.
1233    pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
1234        let label_id = (node_id.0 >> 32) as u32;
1235        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1236
1237        let mut result = Vec::with_capacity(col_ids.len());
1238        for &col_id in col_ids {
1239            let val = self.read_col_slot(label_id, col_id, slot)?;
1240            result.push((col_id, val));
1241        }
1242        Ok(result)
1243    }
1244
1245    /// Like [`get_node_raw`] but treats absent columns as `None` rather than
1246    /// propagating [`Error::NotFound`].
1247    ///
1248    /// A column is considered absent when:
1249    /// - Its column file does not exist (property never written for the label).
1250    /// - Its column file is shorter than `slot + 1` entries (sparse write —
1251    ///   an earlier node never wrote this column; a later node that did write it
1252    ///   padded the file, but this slot's value was never explicitly stored).
1253    ///
1254    /// This is the correct read path for IS NULL evaluation: absent properties
1255    /// must appear as `Value::Null`, not as an error or as integer 0.
1256    pub fn get_node_raw_nullable(
1257        &self,
1258        node_id: NodeId,
1259        col_ids: &[u32],
1260    ) -> Result<Vec<(u32, Option<u64>)>> {
1261        let label_id = (node_id.0 >> 32) as u32;
1262        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1263
1264        let mut result = Vec::with_capacity(col_ids.len());
1265        for &col_id in col_ids {
1266            let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
1267            result.push((col_id, opt_val));
1268        }
1269        Ok(result)
1270    }
1271
1272    /// Read a single column slot, returning `None` when the column was never
1273    /// written for this node (file absent or slot out of bounds / not set).
1274    ///
1275    /// Unlike [`read_col_slot`], this function distinguishes between:
1276    /// - Column file does not exist → `None` (property never set on any node).
1277    /// - Slot is beyond the file end → `None` (property not set on this node).
1278    /// - Null bitmap says slot is absent → `None` (slot was zero-padded for alignment).
1279    /// - Null bitmap says slot is present → `Some(raw)` (real value, may be 0 for Int64(0)).
1280    ///
1281    /// The null-bitmap sidecar (`col_{id}_null.bin`) is used to distinguish
1282    /// legitimately-stored 0 values (e.g. `Int64(0)`) from absent/zero-padded
1283    /// slots (SPA-207).  Backward compat: if no bitmap file exists, all slots
1284    /// within the file are treated as present.
1285    fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
1286        let path = self.col_path(label_id, col_id);
1287        let bytes = match fs::read(&path) {
1288            Ok(b) => b,
1289            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1290            Err(e) => return Err(Error::Io(e)),
1291        };
1292        let offset = slot as usize * 8;
1293        if bytes.len() < offset + 8 {
1294            return Ok(None);
1295        }
1296        // Use the null-bitmap sidecar to determine whether this slot has a real
1297        // value.  This replaces the old raw==0 sentinel which incorrectly treated
1298        // Int64(0) as absent (SPA-207).
1299        if !self.get_null_bit(label_id, col_id, slot)? {
1300            return Ok(None);
1301        }
1302        let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1303        Ok(Some(raw))
1304    }
1305
1306    /// Retrieve the typed property values for a node.
1307    ///
1308    /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
1309    /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
1310    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
1311        let raw = self.get_node_raw(node_id, col_ids)?;
1312        Ok(raw
1313            .into_iter()
1314            .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
1315            .collect())
1316    }
1317
1318    /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
1319    /// flat `Vec<u64>`.  Returns an empty vec when the file does not exist yet.
1320    ///
1321    /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
1322    /// all slot values in one `fs::read` call rather than one `read_col_slot`
1323    /// per node, making index construction O(n) rather than O(n * syscall-overhead).
1324    pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
1325        let path = self.col_path(label_id, col_id);
1326        let bytes = match fs::read(&path) {
1327            Ok(b) => b,
1328            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1329            Err(e) => return Err(Error::Io(e)),
1330        };
1331        // Interpret the byte slice as a flat array of little-endian u64s.
1332        let count = bytes.len() / 8;
1333        let mut out = Vec::with_capacity(count);
1334        for i in 0..count {
1335            let offset = i * 8;
1336            let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1337            out.push(v);
1338        }
1339        Ok(out)
1340    }
1341
1342    /// Selective sorted-slot read — O(K) seeks instead of O(N) reads.
1343    ///
1344    /// For each column, opens the file once and reads only the `slots` needed,
1345    /// in slot-ascending order (for sequential-ish I/O).  This is the hot path
1346    /// for hop queries where K neighbor slots ≪ N total nodes.
1347    ///
1348    /// `slots` **must** be pre-sorted ascending by the caller.
1349    /// Returns a `Vec` indexed parallel to `slots`; each inner `Vec` is indexed
1350    /// parallel to `col_ids`.  Out-of-range or missing-file slots return 0.
1351    ///
1352    /// SPA-200: replaces full O(N) column loads with O(K) per-column seeks.
1353    fn read_col_slots_sorted(
1354        &self,
1355        label_id: u32,
1356        slots: &[u32],
1357        col_ids: &[u32],
1358    ) -> Result<Vec<Vec<u64>>> {
1359        if slots.is_empty() || col_ids.is_empty() {
1360            // Return a row of empty vecs parallel to slots
1361            return Ok(slots.iter().map(|_| vec![0u64; col_ids.len()]).collect());
1362        }
1363
1364        // result[slot_idx][col_idx]
1365        let mut result: Vec<Vec<u64>> = slots.iter().map(|_| vec![0u64; col_ids.len()]).collect();
1366
1367        for (col_idx, &col_id) in col_ids.iter().enumerate() {
1368            let path = self.col_path(label_id, col_id);
1369            let mut file = match fs::File::open(&path) {
1370                Ok(f) => f,
1371                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1372                    // Column file doesn't exist — all slots stay 0
1373                    continue;
1374                }
1375                Err(e) => return Err(Error::Io(e)),
1376            };
1377
1378            let file_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1379            // Reset to start for sequential reads
1380            file.seek(SeekFrom::Start(0)).map_err(Error::Io)?;
1381
1382            let mut buf = [0u8; 8];
1383            let mut current_pos: u64 = 0;
1384
1385            for (slot_idx, &slot) in slots.iter().enumerate() {
1386                let target_pos = slot as u64 * 8;
1387                if target_pos + 8 > file_len {
1388                    // Slot beyond end of file — leave as 0
1389                    continue;
1390                }
1391                if target_pos != current_pos {
1392                    file.seek(SeekFrom::Start(target_pos)).map_err(Error::Io)?;
1393                    current_pos = target_pos;
1394                }
1395                file.read_exact(&mut buf).map_err(Error::Io)?;
1396                current_pos += 8;
1397                result[slot_idx][col_idx] = u64::from_le_bytes(buf);
1398            }
1399        }
1400
1401        Ok(result)
1402    }
1403
1404    /// Batch-read multiple slots from multiple columns.
1405    ///
1406    /// Chooses between two strategies based on the K/N ratio:
1407    /// - **Sorted-slot reads** (SPA-200): when K ≪ N, seeks to each slot
1408    ///   offset — O(K × C) I/O instead of O(N × C).  Slots are sorted
1409    ///   ascending before the read so seeks are sequential.
1410    /// - **Full-column load**: when K is close to N (>50% of column), reading
1411    ///   the whole file is cheaper than many random seeks.
1412    ///
1413    /// All `slots` must belong to `label_id`.
1414    /// Returns a `Vec` indexed parallel to `slots`; inner `Vec` indexed
1415    /// parallel to `col_ids`.  Missing/out-of-range slots return 0.
1416    pub fn batch_read_node_props(
1417        &self,
1418        label_id: u32,
1419        slots: &[u32],
1420        col_ids: &[u32],
1421    ) -> Result<Vec<Vec<u64>>> {
1422        if slots.is_empty() {
1423            return Ok(vec![]);
1424        }
1425
1426        // Determine the column high-water mark (total node count for this label).
1427        let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
1428
1429        // Use sorted-slot reads when K < 50% of N.  For K=100, N=4039 that is
1430        // a ~40× reduction in bytes read per column.  The HWM=0 guard handles
1431        // labels that have no nodes yet (shouldn't reach here, but be safe).
1432        let use_sorted = hwm == 0 || slots.len() * 2 < hwm;
1433
1434        if use_sorted {
1435            // Sort slots ascending for sequential I/O; keep a permutation index
1436            // so we can return results in the original caller order.
1437            let mut order: Vec<usize> = (0..slots.len()).collect();
1438            order.sort_unstable_by_key(|&i| slots[i]);
1439            let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();
1440
1441            let sorted_result = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;
1442
1443            // Re-order back to the original slot order expected by the caller.
1444            let mut result = vec![vec![0u64; col_ids.len()]; slots.len()];
1445            for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
1446                result[orig_idx] = sorted_result[sorted_idx].clone();
1447            }
1448            Ok(result)
1449        } else {
1450            // Fall back to full column load when most slots are needed anyway.
1451            let col_data: Vec<Vec<u64>> = col_ids
1452                .iter()
1453                .map(|&col_id| self.read_col_all(label_id, col_id))
1454                .collect::<Result<_>>()?;
1455            Ok(slots
1456                .iter()
1457                .map(|&slot| {
1458                    col_ids
1459                        .iter()
1460                        .enumerate()
1461                        .map(|(ci, _)| col_data[ci].get(slot as usize).copied().unwrap_or(0))
1462                        .collect()
1463                })
1464                .collect())
1465        }
1466    }
1467}
1468
// ── Helpers ───────────────────────────────────────────────────────────────────
1470
1471/// Unpack `(label_id, slot)` from a [`NodeId`].
1472pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
1473    let label_id = (node_id.0 >> 32) as u32;
1474    let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1475    (label_id, slot)
1476}
1477
1478// ── Tests ─────────────────────────────────────────────────────────────────────
1479
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// The first node created under a label gets slot 0, and its properties
    /// read back with the values that were written.
    #[test]
    fn test_node_create_and_get() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        let label_id: u32 = 1;
        let props = [(0u32, Value::Int64(42)), (1u32, Value::Int64(100))];
        let node_id = store.create_node(label_id, &props).unwrap();

        // The packed ID must carry the label and the first slot.
        let (unpacked_label, unpacked_slot) = unpack_node_id(node_id);
        assert_eq!(unpacked_label, label_id);
        assert_eq!(unpacked_slot, 0);

        // Both columns come back in the requested order.
        let fetched = store.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0], (0, Value::Int64(42)));
        assert_eq!(fetched[1], (1, Value::Int64(100)));
    }

    /// Nodes created back to back under one label receive slots 0, 1, 2 and
    /// each keeps its own value.
    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        let values = [10i64, 20, 30];
        let ids: Vec<NodeId> = values
            .iter()
            .map(|&v| store.create_node(1, &[(0, Value::Int64(v))]).unwrap())
            .collect();

        for (expected_slot, &id) in ids.iter().enumerate() {
            let (_, slot) = unpack_node_id(id);
            assert_eq!(slot, expected_slot as u32);
        }

        for (&id, &v) in ids.iter().zip(values.iter()) {
            assert_eq!(store.get_node(id, &[0]).unwrap()[0].1, Value::Int64(v));
        }
    }

    /// Values written in one session are readable after the store is dropped
    /// and opened again from the same directory.
    #[test]
    fn test_node_persists_across_reopen() {
        let tmp = tempdir().unwrap();

        // First session: write a node, then let the store go out of scope.
        let node_id = {
            let mut writer = NodeStore::open(tmp.path()).unwrap();
            let props = [(0, Value::Int64(999)), (1, Value::Int64(-1))];
            writer.create_node(2, &props).unwrap()
        };

        // Second session: read it back from disk.
        let reader = NodeStore::open(tmp.path()).unwrap();
        let fetched = reader.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(fetched[0].1, Value::Int64(999));
        assert_eq!(fetched[1].1, Value::Int64(-1));
    }

    /// Slot allocation continues where it left off after a reopen: three
    /// nodes in the first session means the next one gets slot 3.
    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let tmp = tempdir().unwrap();

        // Session 1: create three nodes under label 0.
        {
            let mut first = NodeStore::open(tmp.path()).unwrap();
            for v in 1..=3i64 {
                first.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the fourth node must land in slot 3.
        let mut second = NodeStore::open(tmp.path()).unwrap();
        let fourth = second.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        let (_, slot) = unpack_node_id(fourth);
        assert_eq!(slot, 3);
    }

    /// Each label has its own slot counter, so the first node of every label
    /// starts at slot 0.
    #[test]
    fn test_node_different_labels_independent() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        let first = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let second = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        let (first_label, first_slot) = unpack_node_id(first);
        let (second_label, second_slot) = unpack_node_id(second);
        assert_eq!(first_label, 10);
        assert_eq!(first_slot, 0);
        assert_eq!(second_label, 20);
        assert_eq!(second_slot, 0);
    }
}