// sparrowdb_storage — node_store.rs

//! Node property storage.
//!
//! Nodes are stored as typed property columns.  Each `(label_id, col_id)` pair
//! maps to a flat binary file of fixed-width values.  Node IDs pack
//! `(label_id: u32, slot: u32)` into a single `u64`, consistent with
//! [`sparrowdb_common::NodeId`] semantics.
//!
//! ## File layout
//!
//! ```text
//! nodes/{label_id}/col_{col_id}.bin
//! ```
//!
//! Each column file is a flat array of `u64` LE values (one per slot).
//! The high-water mark is tracked in-memory and written to a small header file:
//!
//! ```text
//! nodes/{label_id}/hwm.bin   — [hwm: u64 LE]
//! ```
//!
//! ## Node ID packing
//!
//! ```text
//! node_id = (label_id as u64) << 32 | slot as u64
//! ```
//!
//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.

29use std::collections::HashMap;
30use std::fs;
31use std::path::{Path, PathBuf};
32
33use sparrowdb_common::{Error, NodeId, Result};
34
35// ── Value type ────────────────────────────────────────────────────────────────
36
/// A typed property value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed 64-bit integer.  `Value::to_u64` stores only the low 56 bits
    /// (the top byte is the type tag) and `from_u64` sign-extends on read,
    /// so the round-trippable range is `[-2^55, 2^55)`.
    Int64(i64),
    /// Raw byte blob.  Blobs of ≤ 7 bytes are stored inline in the column
    /// `u64`; longer blobs are written to the overflow string heap
    /// (`strings.bin`) by `NodeStore::encode_value`, which stores a tagged
    /// `(offset, len)` pointer instead (SPA-212).
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float.  Stored as 8 raw bytes in the overflow
    /// heap so that no bits are masked by the type-tag scheme (SPA-267).
    Float(f64),
}
50
/// Type tag embedded in the top byte (byte index 7 in LE) of a stored `u64`.
///
/// - `0x00` = `Int64`         — lower 7 bytes hold the signed integer (56-bit range).
/// - `0x01` = `Bytes`         — lower 7 bytes hold up to 7 bytes of inline string data.
/// - `0x02` = `BytesOverflow` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` (SPA-212).
/// - `0x03` = `Float`         — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` where 8 raw IEEE-754 bytes are stored (SPA-267).
///
/// The overflow encoding packs a heap pointer into 7 bytes:
///   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
///   bytes[5..7] = byte length (u16 LE, max 65535 bytes)
///
/// This lets `decode_raw_value` reconstruct strings of any length, fixing SPA-212.
const TAG_INT64: u8 = 0x00;
/// Tag for strings ≤ 7 bytes stored inline in the column `u64` payload.
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
/// Using the heap ensures all 64 bits of the float are preserved without any masking.
const TAG_FLOAT: u8 = 0x03;
/// Maximum bytes that fit inline in the 7-byte payload (one byte is the tag).
const MAX_INLINE_BYTES: usize = 7;
74
75impl Value {
76    /// Encode as a packed `u64` for column storage.
77    ///
78    /// The top byte (byte 7 in little-endian) is a type tag; the remaining
79    /// 7 bytes carry the payload.  This allows `from_u64` to reconstruct the
80    /// correct variant at read time (SPA-169).
81    ///
82    /// For `Bytes` values that exceed 7 bytes, this method only encodes the
83    /// first 7 bytes inline.  Callers that need full overflow support must use
84    /// [`NodeStore::encode_value`] instead, which writes long strings to the
85    /// heap and returns an overflow-tagged u64 (SPA-212).
86    ///
87    /// # Int64 range
88    /// Only the lower 56 bits of the integer are stored.  This covers all
89    /// practical node IDs and numeric property values; very large i64 values
90    /// (> 2^55 or < -2^55) would be truncated.  Full 64-bit range is deferred
91    /// to a later overflow encoding.
92    pub fn to_u64(&self) -> u64 {
93        match self {
94            Value::Int64(v) => {
95                // Top byte = TAG_INT64 (0x00); lower 7 bytes = lower 56 bits of v.
96                // For TAG_INT64 = 0x00 this is just the value masked to 56 bits,
97                // which is a no-op for any i64 whose top byte is already 0x00.
98                let payload = (*v as u64) & 0x00FF_FFFF_FFFF_FFFF;
99                // Tag byte goes into byte 7 (the most significant byte in LE).
100                payload | ((TAG_INT64 as u64) << 56)
101            }
102            Value::Bytes(b) => {
103                let mut arr = [0u8; 8];
104                arr[7] = TAG_BYTES; // type tag in top byte
105                let len = b.len().min(MAX_INLINE_BYTES);
106                arr[..len].copy_from_slice(&b[..len]);
107                u64::from_le_bytes(arr)
108            }
109            Value::Float(_) => {
110                // Float values require heap storage — callers must use
111                // NodeStore::encode_value instead of Value::to_u64.
112                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value");
113            }
114        }
115    }
116
117    /// Reconstruct a `Value` from a stored `u64`, using the top byte as a
118    /// type tag (SPA-169).
119    ///
120    /// Only handles inline encodings (`TAG_INT64` and `TAG_BYTES`).
121    /// For overflow strings (`TAG_BYTES_OVERFLOW`), use [`NodeStore::decode_raw_value`]
122    /// which has access to the string heap (SPA-212).
123    pub fn from_u64(v: u64) -> Self {
124        let bytes = v.to_le_bytes(); // bytes[7] = top byte = tag
125        match bytes[7] {
126            TAG_BYTES => {
127                // Inline string: bytes[0..7] hold the data; strip trailing zeros.
128                let data: Vec<u8> = bytes[..7].iter().copied().take_while(|&b| b != 0).collect();
129                Value::Bytes(data)
130            }
131            _ => {
132                // TAG_INT64 (0x00) or any unrecognised tag → Int64.
133                // Sign-extend from 56 bits: shift left 8 to bring bit 55 into
134                // sign position, then arithmetic shift right 8.
135                let shifted = (v << 8) as i64;
136                Value::Int64(shifted >> 8)
137            }
138        }
139    }
140
141    /// Reconstruct an `Int64` value from a stored `u64`.
142    ///
143    /// Preserved for callers that know the column type is always Int64 (e.g.
144    /// pre-SPA-169 paths).  New code should prefer `from_u64`.
145    pub fn int64_from_u64(v: u64) -> Self {
146        Value::Int64(v as i64)
147    }
148}
149
150// ── NodeStore ─────────────────────────────────────────────────────────────────
151
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin            — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin   — flat u64 column array
/// {root}/strings.bin                         — overflow string heap (SPA-212)
/// ```
///
/// The overflow heap is an append-only byte file.  Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` u64 stored in the column file.
pub struct NodeStore {
    // Database root directory; all on-disk paths are derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label.  Loaded lazily from disk.
    hwm: HashMap<u32, u64>,
}
169
170impl NodeStore {
171    /// Open (or create) a node store rooted at `db_root`.
172    pub fn open(db_root: &Path) -> Result<Self> {
173        Ok(NodeStore {
174            root: db_root.to_path_buf(),
175            hwm: HashMap::new(),
176        })
177    }
178
179    // ── Internal helpers ──────────────────────────────────────────────────────
180
181    fn label_dir(&self, label_id: u32) -> PathBuf {
182        self.root.join("nodes").join(label_id.to_string())
183    }
184
185    fn hwm_path(&self, label_id: u32) -> PathBuf {
186        self.label_dir(label_id).join("hwm.bin")
187    }
188
189    fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
190        self.label_dir(label_id).join(format!("col_{col_id}.bin"))
191    }
192
193    /// Path to the overflow string heap (shared across all labels).
194    fn strings_bin_path(&self) -> PathBuf {
195        self.root.join("strings.bin")
196    }
197
198    // ── Overflow string heap (SPA-212) ────────────────────────────────────────
199
200    /// Append `bytes` to the overflow string heap and return an
201    /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
202    ///
203    /// Layout of the returned `u64` (little-endian bytes):
204    ///   bytes[0..5] = heap byte offset as u40 LE  (max ~1 TiB)
205    ///   bytes[5..7] = byte length as u16 LE       (max 65 535 bytes)
206    ///   bytes[7]    = `TAG_BYTES_OVERFLOW` (0x02)
207    fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
208        use std::io::{Seek, SeekFrom, Write};
209        let path = self.strings_bin_path();
210        let mut file = fs::OpenOptions::new()
211            .create(true)
212            .truncate(false)
213            .append(false)
214            .read(true)
215            .write(true)
216            .open(&path)
217            .map_err(Error::Io)?;
218
219        // The heap offset is the current end of the file.
220        let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
221        file.write_all(bytes).map_err(Error::Io)?;
222
223        // Encode (offset, len) into a 7-byte payload.
224        let len = bytes.len() as u64;
225        debug_assert!(
226            offset <= 0x00FF_FFFF_FFFF_u64,
227            "string heap too large for 5-byte offset"
228        );
229        debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
230
231        let mut arr = [0u8; 8];
232        // Store offset in bytes[0..5] (40-bit LE).
233        arr[0] = offset as u8;
234        arr[1] = (offset >> 8) as u8;
235        arr[2] = (offset >> 16) as u8;
236        arr[3] = (offset >> 24) as u8;
237        arr[4] = (offset >> 32) as u8;
238        // Store len in bytes[5..7] (16-bit LE).
239        arr[5] = len as u8;
240        arr[6] = (len >> 8) as u8;
241        // Tag byte.
242        arr[7] = TAG_BYTES_OVERFLOW;
243        Ok(u64::from_le_bytes(arr))
244    }
245
246    /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
247    /// tagged `u64` produced by [`append_to_string_heap`].
248    fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
249        let arr = tagged.to_le_bytes();
250        debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
251
252        // Decode (offset, len).
253        let offset = arr[0] as u64
254            | ((arr[1] as u64) << 8)
255            | ((arr[2] as u64) << 16)
256            | ((arr[3] as u64) << 24)
257            | ((arr[4] as u64) << 32);
258        let len = arr[5] as usize | ((arr[6] as usize) << 8);
259
260        let path = self.strings_bin_path();
261        let file_bytes = fs::read(&path).map_err(Error::Io)?;
262        let end = offset as usize + len;
263        if file_bytes.len() < end {
264            return Err(Error::Corruption(format!(
265                "string heap too short: need {} bytes, have {}",
266                end,
267                file_bytes.len()
268            )));
269        }
270        Ok(file_bytes[offset as usize..end].to_vec())
271    }
272
273    // ── Value encode / decode with overflow support ───────────────────────────
274
275    /// Encode a `Value` for column storage, writing long `Bytes` strings to
276    /// the overflow heap (SPA-212).
277    ///
278    /// - `Int64`          → identical to `Value::to_u64()`.
279    /// - `Bytes` ≤ 7 B    → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
280    /// - `Bytes` > 7 B    → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
281    /// - `Float`          → 8 raw IEEE-754 bytes appended to `strings.bin`;
282    ///   returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
283    pub fn encode_value(&self, val: &Value) -> Result<u64> {
284        match val {
285            Value::Int64(_) => Ok(val.to_u64()),
286            Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
287            Value::Bytes(b) => self.append_to_string_heap(b),
288            // SPA-267: store all 8 float bytes in the heap so no bits are masked.
289            // The heap pointer uses the same (offset: u40, len: u16) layout as
290            // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
291            Value::Float(f) => {
292                let bits = f.to_bits().to_le_bytes();
293                let heap_tagged = self.append_to_string_heap(&bits)?;
294                // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
295                let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
296                Ok((TAG_FLOAT as u64) << 56 | payload)
297            }
298        }
299    }
300
301    /// Decode a raw `u64` column value back to a `Value`, reading the
302    /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
303    ///
304    /// Handles all four tags:
305    /// - `TAG_INT64`          → `Value::Int64`
306    /// - `TAG_BYTES`          → `Value::Bytes` (inline, ≤ 7 bytes)
307    /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
308    /// - `TAG_FLOAT`          → `Value::Float` (8 raw IEEE-754 bytes from heap)
309    pub fn decode_raw_value(&self, raw: u64) -> Value {
310        let tag = (raw >> 56) as u8;
311        match tag {
312            TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
313                Ok(bytes) => Value::Bytes(bytes),
314                Err(e) => {
315                    // Corruption fallback: return empty bytes and log.
316                    eprintln!(
317                        "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
318                    );
319                    Value::Bytes(Vec::new())
320                }
321            },
322            // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
323            // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
324            // so read_from_string_heap can locate the bytes.
325            TAG_FLOAT => {
326                let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
327                let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
328                match self.read_from_string_heap(heap_tagged) {
329                    Ok(bytes) if bytes.len() == 8 => {
330                        let arr: [u8; 8] = bytes.try_into().unwrap();
331                        Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
332                    }
333                    Ok(bytes) => {
334                        eprintln!(
335                            "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
336                            bytes.len()
337                        );
338                        Value::Float(f64::NAN)
339                    }
340                    Err(e) => {
341                        eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
342                        Value::Float(f64::NAN)
343                    }
344                }
345            }
346            _ => Value::from_u64(raw),
347        }
348    }
349
350    /// Check whether a raw stored `u64` encodes a string equal to `s`.
351    ///
352    /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
353    /// encodings (SPA-212).  Used by WHERE-clause and prop-filter comparison.
354    pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
355        let tag = (raw >> 56) as u8;
356        match tag {
357            TAG_BYTES => {
358                // Fast inline comparison: encode s the same way and compare u64s.
359                raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
360            }
361            TAG_BYTES_OVERFLOW => {
362                // Overflow: read from heap and compare bytes.
363                match self.read_from_string_heap(raw) {
364                    Ok(bytes) => bytes == s.as_bytes(),
365                    Err(_) => false,
366                }
367            }
368            _ => false, // INT64 or unknown — not a string
369        }
370    }
371
372    /// Read the high-water mark for `label_id` from disk (or return 0).
373    fn load_hwm(&self, label_id: u32) -> Result<u64> {
374        let path = self.hwm_path(label_id);
375        if !path.exists() {
376            return Ok(0);
377        }
378        let bytes = fs::read(&path).map_err(Error::Io)?;
379        if bytes.len() < 8 {
380            return Err(Error::Corruption(format!(
381                "hwm.bin for label {label_id} is truncated"
382            )));
383        }
384        Ok(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
385    }
386
387    /// Write the high-water mark for `label_id` to disk.
388    fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
389        let path = self.hwm_path(label_id);
390        if let Some(parent) = path.parent() {
391            fs::create_dir_all(parent).map_err(Error::Io)?;
392        }
393        fs::write(&path, hwm.to_le_bytes()).map_err(Error::Io)
394    }
395
396    /// Append a `u64` value to a column file.
397    fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
398        use std::io::{Seek, SeekFrom, Write};
399        let path = self.col_path(label_id, col_id);
400        if let Some(parent) = path.parent() {
401            fs::create_dir_all(parent).map_err(Error::Io)?;
402        }
403        // Open without truncation so we can inspect the current length.
404        let mut file = fs::OpenOptions::new()
405            .create(true)
406            .truncate(false)
407            .read(true)
408            .write(true)
409            .open(&path)
410            .map_err(Error::Io)?;
411
412        // Pad with zeros for any slots that were skipped (sparse write pattern).
413        // Without padding, a later slot's value would be written at offset 0,
414        // causing earlier slots to incorrectly read that value.
415        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
416        let expected_offset = slot as u64 * 8;
417        if existing_len < expected_offset {
418            file.seek(SeekFrom::Start(existing_len))
419                .map_err(Error::Io)?;
420            const CHUNK: usize = 65536;
421            let zeros = [0u8; CHUNK];
422            let mut remaining = (expected_offset - existing_len) as usize;
423            while remaining > 0 {
424                let n = remaining.min(CHUNK);
425                file.write_all(&zeros[..n]).map_err(Error::Io)?;
426                remaining -= n;
427            }
428        }
429
430        // Seek to the correct slot position and write the value.
431        file.seek(SeekFrom::Start(expected_offset))
432            .map_err(Error::Io)?;
433        file.write_all(&value.to_le_bytes()).map_err(Error::Io)
434    }
435
436    /// Read the `u64` stored at `slot` in the given column file.
437    ///
438    /// Returns `Ok(0)` when the column file does not exist yet — a missing file
439    /// means no value has ever been written for this `(label_id, col_id)` pair,
440    /// which is represented as the zero bit-pattern (SPA-166).
441    fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
442        let path = self.col_path(label_id, col_id);
443        let bytes = match fs::read(&path) {
444            Ok(b) => b,
445            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
446            Err(e) => return Err(Error::Io(e)),
447        };
448        let offset = slot as usize * 8;
449        if bytes.len() < offset + 8 {
450            return Err(Error::NotFound);
451        }
452        Ok(u64::from_le_bytes(
453            bytes[offset..offset + 8].try_into().unwrap(),
454        ))
455    }
456
457    // ── Public API ────────────────────────────────────────────────────────────
458
459    /// Return the high-water mark (slot count) for a label.
460    ///
461    /// Returns `0` if no nodes have been created for that label yet.
462    pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
463        if let Some(&h) = self.hwm.get(&label_id) {
464            return Ok(h);
465        }
466        self.load_hwm(label_id)
467    }
468
    /// Discover all column IDs that currently exist on disk for `label_id`.
    ///
    /// Scans the label directory for `col_{id}.bin` files and returns the
    /// parsed `col_id` values.  Used by `create_node` to zero-pad columns
    /// that are not supplied for a new node (SPA-187).
    ///
    /// Returns `Err` when the directory exists but cannot be read (e.g.
    /// permissions failure or I/O error).  A missing directory is not an
    /// error — it simply means no nodes of this label have been created yet.
    pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
        // Thin public wrapper over the private directory scan.
        self.existing_col_ids(label_id)
    }
481
482    fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
483        let dir = self.label_dir(label_id);
484        let read_dir = match fs::read_dir(&dir) {
485            Ok(rd) => rd,
486            // Directory does not exist yet → no columns on disk.
487            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
488            Err(e) => return Err(Error::Io(e)),
489        };
490        let mut ids: Vec<u32> = read_dir
491            .flatten()
492            .filter_map(|entry| {
493                let name = entry.file_name();
494                let name_str = name.to_string_lossy().into_owned();
495                // Match "col_{col_id}.bin" filenames.
496                let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
497                id_str.parse::<u32>().ok()
498            })
499            .collect();
500        ids.sort_unstable();
501        Ok(ids)
502    }
503
    /// Return the **on-disk** high-water mark for a label, bypassing any
    /// in-memory advances made by `peek_next_slot`.
    ///
    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
    /// that have actually been persisted.
    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
        // Deliberately skips the `self.hwm` cache: reads straight from hwm.bin.
        self.load_hwm(label_id)
    }
512
513    /// Reserve the slot index that the *next* `create_node` call will use for
514    /// `label_id`, advancing the in-memory HWM so that the slot is not
515    /// assigned again within the same [`NodeStore`] instance.
516    ///
517    /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
518    /// before the actual disk write, so the ID can be returned to the caller
519    /// while the write is deferred until commit (SPA-181).
520    ///
521    /// The on-disk HWM is **not** updated here; it is updated when the
522    /// buffered `NodeCreate` operation is applied in `commit()`.
523    pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
524        // Load from disk if not cached yet.
525        if !self.hwm.contains_key(&label_id) {
526            let h = self.load_hwm(label_id)?;
527            self.hwm.insert(label_id, h);
528        }
529        let h = *self.hwm.get(&label_id).unwrap();
530        // Advance the in-memory HWM so a subsequent peek returns the next slot.
531        self.hwm.insert(label_id, h + 1);
532        Ok(h as u32)
533    }
534
535    /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
536    ///
537    /// Like [`create_node`] but uses the caller-specified `slot` index instead
538    /// of deriving it from the HWM.  Used by [`WriteTx::commit`] to flush
539    /// buffered node-create operations in the exact order they were issued,
540    /// with slots that were already pre-allocated by [`peek_next_slot`].
541    ///
542    /// Advances the on-disk HWM to `slot + 1` (or higher if already past that).
543    pub fn create_node_at_slot(
544        &mut self,
545        label_id: u32,
546        slot: u32,
547        props: &[(u32, Value)],
548    ) -> Result<NodeId> {
549        // Snapshot original column sizes for rollback on partial failure.
550        let original_sizes: Vec<(u32, u64)> = props
551            .iter()
552            .map(|&(col_id, _)| {
553                let path = self.col_path(label_id, col_id);
554                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
555                (col_id, size)
556            })
557            .collect();
558
559        let write_result = (|| {
560            for &(col_id, ref val) in props {
561                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
562            }
563            Ok::<(), sparrowdb_common::Error>(())
564        })();
565
566        if let Err(e) = write_result {
567            for (col_id, original_size) in &original_sizes {
568                let path = self.col_path(label_id, *col_id);
569                if path.exists() {
570                    if let Err(rollback_err) = fs::OpenOptions::new()
571                        .write(true)
572                        .open(&path)
573                        .and_then(|f| f.set_len(*original_size))
574                    {
575                        eprintln!(
576                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
577                            path.display(),
578                            original_size,
579                            rollback_err
580                        );
581                    }
582                }
583            }
584            return Err(e);
585        }
586
587        // Advance the on-disk HWM to at least slot + 1.
588        // Always write to disk; in-memory HWM may have been speculatively
589        // advanced by peek_next_slot but the disk HWM may be lower.
590        let new_hwm = slot as u64 + 1;
591        let disk_hwm = self.load_hwm(label_id)?;
592        if new_hwm > disk_hwm {
593            if let Err(e) = self.save_hwm(label_id, new_hwm) {
594                // Column bytes were already written; roll them back to preserve
595                // the atomicity guarantee (SPA-181).
596                for (col_id, original_size) in &original_sizes {
597                    let path = self.col_path(label_id, *col_id);
598                    if path.exists() {
599                        if let Err(rollback_err) = fs::OpenOptions::new()
600                            .write(true)
601                            .open(&path)
602                            .and_then(|f| f.set_len(*original_size))
603                        {
604                            eprintln!(
605                                "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
606                                path.display(),
607                                original_size,
608                                rollback_err
609                            );
610                        }
611                    }
612                }
613                return Err(e);
614            }
615        }
616        // Keep in-memory HWM at least as high as new_hwm.
617        let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
618        if new_hwm > mem_hwm {
619            self.hwm.insert(label_id, new_hwm);
620        }
621
622        Ok(NodeId((label_id as u64) << 32 | slot as u64))
623    }
624
625    /// Create a new node in `label_id` with the given properties.
626    ///
627    /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
628    ///
629    /// ## Slot alignment guarantee (SPA-187)
630    ///
631    /// Every column file for `label_id` must have exactly `node_count * 8`
632    /// bytes so that slot N always refers to node N across all columns.  When
633    /// a node is created without a value for an already-known column, that
634    /// column file is zero-padded to `(slot + 1) * 8` bytes.  The zero
635    /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
636    /// surfaces as `Value::Null` in query results.
637    pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
638        // Load or get cached hwm.
639        let hwm = if let Some(h) = self.hwm.get(&label_id) {
640            *h
641        } else {
642            let h = self.load_hwm(label_id)?;
643            self.hwm.insert(label_id, h);
644            h
645        };
646
647        let slot = hwm as u32;
648
649        // Collect the set of col_ids supplied for this node.
650        let supplied_col_ids: std::collections::HashSet<u32> =
651            props.iter().map(|&(col_id, _)| col_id).collect();
652
653        // Discover all columns already on disk for this label.  Any that are
654        // NOT in `supplied_col_ids` must be zero-padded so their slot count
655        // stays in sync with the HWM (SPA-187).
656        let existing_col_ids = self.existing_col_ids(label_id)?;
657
658        // Columns that need zero-padding: exist on disk but not in this node's props.
659        let cols_to_zero_pad: Vec<u32> = existing_col_ids
660            .iter()
661            .copied()
662            .filter(|col_id| !supplied_col_ids.contains(col_id))
663            .collect();
664
665        // Build the full list of columns to touch for rollback tracking:
666        // supplied columns + columns that need zero-padding.
667        let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
668            .iter()
669            .copied()
670            .chain(cols_to_zero_pad.iter().copied())
671            .collect();
672
673        // Snapshot the original size of each column file so we can roll back
674        // on partial failure.  A column that does not yet exist has size 0.
675        let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
676            .iter()
677            .map(|&col_id| {
678                let path = self.col_path(label_id, col_id);
679                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
680                (col_id, size)
681            })
682            .collect();
683
684        // Write each property column.  For columns not in `props`, write a
685        // zero-padded entry (the zero sentinel means "absent" / NULL).
686        // On failure, roll back all columns that were already written to avoid
687        // slot misalignment.
688        let write_result = (|| {
689            // Write supplied columns with their actual values.
690            for &(col_id, ref val) in props {
691                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
692            }
693            // Zero-pad existing columns that were NOT supplied for this node.
694            for &col_id in &cols_to_zero_pad {
695                self.append_col(label_id, col_id, slot, 0u64)?;
696            }
697            Ok::<(), sparrowdb_common::Error>(())
698        })();
699
700        if let Err(e) = write_result {
701            // Truncate each column back to its original size.
702            for (col_id, original_size) in &original_sizes {
703                let path = self.col_path(label_id, *col_id);
704                if path.exists() {
705                    if let Err(rollback_err) = fs::OpenOptions::new()
706                        .write(true)
707                        .open(&path)
708                        .and_then(|f| f.set_len(*original_size))
709                    {
710                        eprintln!(
711                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
712                            path.display(),
713                            original_size,
714                            rollback_err
715                        );
716                    }
717                }
718            }
719            return Err(e);
720        }
721
722        // Update hwm.
723        let new_hwm = hwm + 1;
724        self.save_hwm(label_id, new_hwm)?;
725        *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
726
727        // Pack node ID.
728        let node_id = ((label_id as u64) << 32) | (slot as u64);
729        Ok(NodeId(node_id))
730    }
731
732    /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
733    ///
734    /// Creates `col_0.bin` (and its parent directory) if it does not exist,
735    /// zero-padding all preceding slots.  This ensures that nodes which were
736    /// created without any `col_0` property are still properly marked as deleted
737    /// and become invisible to subsequent scans.
738    ///
739    /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
740    pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
741        use std::io::{Seek, SeekFrom, Write};
742        let label_id = (node_id.0 >> 32) as u32;
743        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
744        let col0 = self.col_path(label_id, 0);
745
746        // Ensure the parent directory exists.
747        if let Some(parent) = col0.parent() {
748            fs::create_dir_all(parent).map_err(Error::Io)?;
749        }
750
751        let mut f = fs::OpenOptions::new()
752            .create(true)
753            .truncate(false)
754            .read(true)
755            .write(true)
756            .open(&col0)
757            .map_err(Error::Io)?;
758
759        // Zero-pad any slots before `slot` that are not yet in the file.
760        let needed_len = (slot as u64 + 1) * 8;
761        let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
762        if existing_len < needed_len {
763            let zeros = vec![0u8; (needed_len - existing_len) as usize];
764            f.write_all(&zeros).map_err(Error::Io)?;
765        }
766
767        // Seek to the slot and write the tombstone value.
768        f.seek(SeekFrom::Start(slot as u64 * 8))
769            .map_err(Error::Io)?;
770        f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
771    }
772
773    /// Overwrite the value of a single column for an existing node.
774    ///
775    /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
776    /// 8-byte little-endian value in-place.  Returns `Err(NotFound)` if the
777    /// slot does not exist yet.
778    pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
779        use std::io::{Seek, SeekFrom, Write};
780        let label_id = (node_id.0 >> 32) as u32;
781        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
782        let path = self.col_path(label_id, col_id);
783        let mut file = fs::OpenOptions::new()
784            .write(true)
785            .open(&path)
786            .map_err(|_| Error::NotFound)?;
787        let offset = slot as u64 * 8;
788        file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
789        file.write_all(&self.encode_value(value)?.to_le_bytes())
790            .map_err(Error::Io)
791    }
792
793    /// Write or create a column value for a node, creating and zero-padding the
794    /// column file if it does not yet exist.
795    ///
796    /// Unlike [`set_node_col`], this method creates the column file and fills all
797    /// slots from 0 to `slot - 1` with zeros before writing the target value.
798    /// This supports adding new property columns to existing nodes (Phase 7
799    /// `set_property` semantics).
800    pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
801        use std::io::{Seek, SeekFrom, Write};
802        let label_id = (node_id.0 >> 32) as u32;
803        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
804        let path = self.col_path(label_id, col_id);
805
806        // Ensure parent directory exists.
807        if let Some(parent) = path.parent() {
808            fs::create_dir_all(parent).map_err(Error::Io)?;
809        }
810
811        // Open the file (create if absent), then pad with zeros up to and
812        // including the target slot, and finally seek back to write the value.
813        // We must NOT truncate: we only update a specific slot, not the whole file.
814        let mut file = fs::OpenOptions::new()
815            .create(true)
816            .truncate(false)
817            .read(true)
818            .write(true)
819            .open(&path)
820            .map_err(Error::Io)?;
821
822        // How many bytes are already in the file?
823        let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
824        let needed_len = (slot as u64 + 1) * 8;
825        if existing_len < needed_len {
826            // Extend file with zeros from existing_len to needed_len.
827            // Write in fixed-size chunks to avoid a single large allocation
828            // that could OOM when the node slot ID is very high.
829            file.seek(SeekFrom::Start(existing_len))
830                .map_err(Error::Io)?;
831            const CHUNK: usize = 65536; // 64 KiB
832            let zeros = [0u8; CHUNK];
833            let mut remaining = (needed_len - existing_len) as usize;
834            while remaining > 0 {
835                let n = remaining.min(CHUNK);
836                file.write_all(&zeros[..n]).map_err(Error::Io)?;
837                remaining -= n;
838            }
839        }
840
841        // Seek to target slot and overwrite.
842        file.seek(SeekFrom::Start(slot as u64 * 8))
843            .map_err(Error::Io)?;
844        file.write_all(&self.encode_value(value)?.to_le_bytes())
845            .map_err(Error::Io)
846    }
847
848    /// Retrieve all stored properties of a node.
849    ///
850    /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
851    /// The caller knows the schema (col IDs) from the catalog.
852    pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
853        let label_id = (node_id.0 >> 32) as u32;
854        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
855
856        let mut result = Vec::with_capacity(col_ids.len());
857        for &col_id in col_ids {
858            let val = self.read_col_slot(label_id, col_id, slot)?;
859            result.push((col_id, val));
860        }
861        Ok(result)
862    }
863
864    /// Like [`get_node_raw`] but treats absent columns as `None` rather than
865    /// propagating [`Error::NotFound`].
866    ///
867    /// A column is considered absent when:
868    /// - Its column file does not exist (property never written for the label).
869    /// - Its column file is shorter than `slot + 1` entries (sparse write —
870    ///   an earlier node never wrote this column; a later node that did write it
871    ///   padded the file, but this slot's value was never explicitly stored).
872    ///
873    /// This is the correct read path for IS NULL evaluation: absent properties
874    /// must appear as `Value::Null`, not as an error or as integer 0.
875    pub fn get_node_raw_nullable(
876        &self,
877        node_id: NodeId,
878        col_ids: &[u32],
879    ) -> Result<Vec<(u32, Option<u64>)>> {
880        let label_id = (node_id.0 >> 32) as u32;
881        let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
882
883        let mut result = Vec::with_capacity(col_ids.len());
884        for &col_id in col_ids {
885            let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
886            result.push((col_id, opt_val));
887        }
888        Ok(result)
889    }
890
891    /// Read a single column slot, returning `None` when the column was never
892    /// written for this node (file absent or slot out of bounds / zero-padded).
893    ///
894    /// Unlike [`read_col_slot`], this function distinguishes between:
895    /// - Column file does not exist → `None` (property never set on any node).
896    /// - Slot falls within the file but reads as 0 → the slot was zero-padded
897    ///   by `append_col` for a later node's write; treat as `None`.
898    /// - Slot has a non-zero value → `Some(value)`.
899    /// - Slot is beyond the file end → `None` (property not set on this node).
900    ///
901    /// Value 0 is used as the "absent" sentinel: the storage encoding ensures
902    /// that any legitimately stored property (Int64, Bytes, Bool, Float) encodes
903    /// to a non-zero u64.  Specifically, `StoreValue::Int64(0)` stores as 0 and
904    /// would be misidentified as absent — callers that need to store integer 0
905    /// should be aware of this limitation.
906    fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
907        let path = self.col_path(label_id, col_id);
908        let bytes = match fs::read(&path) {
909            Ok(b) => b,
910            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
911            Err(e) => return Err(Error::Io(e)),
912        };
913        let offset = slot as usize * 8;
914        if bytes.len() < offset + 8 {
915            return Ok(None);
916        }
917        let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
918        // Zero means "never written" (absent). Non-zero means a real value.
919        if raw == 0 {
920            Ok(None)
921        } else {
922            Ok(Some(raw))
923        }
924    }
925
926    /// Retrieve the typed property values for a node.
927    ///
928    /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
929    /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
930    pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
931        let raw = self.get_node_raw(node_id, col_ids)?;
932        Ok(raw
933            .into_iter()
934            .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
935            .collect())
936    }
937
938    /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
939    /// flat `Vec<u64>`.  Returns an empty vec when the file does not exist yet.
940    ///
941    /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
942    /// all slot values in one `fs::read` call rather than one `read_col_slot`
943    /// per node, making index construction O(n) rather than O(n * syscall-overhead).
944    pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
945        let path = self.col_path(label_id, col_id);
946        let bytes = match fs::read(&path) {
947            Ok(b) => b,
948            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
949            Err(e) => return Err(Error::Io(e)),
950        };
951        // Interpret the byte slice as a flat array of little-endian u64s.
952        let count = bytes.len() / 8;
953        let mut out = Vec::with_capacity(count);
954        for i in 0..count {
955            let offset = i * 8;
956            let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
957            out.push(v);
958        }
959        Ok(out)
960    }
961}
962
// ── Helpers ───────────────────────────────────────────────────────────────────
964
965/// Unpack `(label_id, slot)` from a [`NodeId`].
966pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
967    let label_id = (node_id.0 >> 32) as u32;
968    let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
969    (label_id, slot)
970}
971
972// ── Tests ─────────────────────────────────────────────────────────────────────
973
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A freshly created node must round-trip its properties and pack
    /// `(label_id, slot 0)` into the returned ID.
    #[test]
    fn test_node_create_and_get() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let label = 1u32;
        let node_id = store
            .create_node(label, &[(0u32, Value::Int64(42)), (1u32, Value::Int64(100))])
            .unwrap();

        // The packed ID must carry the label and the first slot.
        let (lid, slot) = unpack_node_id(node_id);
        assert_eq!(lid, label);
        assert_eq!(slot, 0); // first node → slot 0

        // Both columns must read back with their original values.
        let got = store.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0], (0, Value::Int64(42)));
        assert_eq!(got[1], (1, Value::Int64(100)));
    }

    /// Slots within one label must be handed out sequentially.
    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let first = store.create_node(1, &[(0, Value::Int64(10))]).unwrap();
        let second = store.create_node(1, &[(0, Value::Int64(20))]).unwrap();
        let third = store.create_node(1, &[(0, Value::Int64(30))]).unwrap();

        assert_eq!(unpack_node_id(first).1, 0);
        assert_eq!(unpack_node_id(second).1, 1);
        assert_eq!(unpack_node_id(third).1, 2);

        assert_eq!(store.get_node(first, &[0]).unwrap()[0].1, Value::Int64(10));
        assert_eq!(store.get_node(second, &[0]).unwrap()[0].1, Value::Int64(20));
        assert_eq!(store.get_node(third, &[0]).unwrap()[0].1, Value::Int64(30));
    }

    /// Values written in one session must be readable after reopening.
    #[test]
    fn test_node_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: create a node, then drop the store.
        let node_id = {
            let mut store = NodeStore::open(dir.path()).unwrap();
            store
                .create_node(2, &[(0, Value::Int64(999)), (1, Value::Int64(-1))])
                .unwrap()
        };

        // Session 2: reopen from disk and verify.
        let reopened = NodeStore::open(dir.path()).unwrap();
        let vals = reopened.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(vals[0].1, Value::Int64(999));
        assert_eq!(vals[1].1, Value::Int64(-1));
    }

    /// The high-water mark must survive reopen so slots are never reused.
    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: create three nodes (slots 0..=2).
        {
            let mut store = NodeStore::open(dir.path()).unwrap();
            for v in 1..=3 {
                store.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the next node must land in slot 3.
        let mut reopened = NodeStore::open(dir.path()).unwrap();
        let fourth = reopened.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        assert_eq!(unpack_node_id(fourth).1, 3);
    }

    /// Slot counters are per-label: two labels both start at slot 0.
    #[test]
    fn test_node_different_labels_independent() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let a = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let b = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        assert_eq!(unpack_node_id(a), (10, 0));
        assert_eq!(unpack_node_id(b), (20, 0));
    }
}