sparrowdb_storage/node_store.rs
1//! Node property storage.
2//!
3//! Nodes are stored as typed property columns. Each `(label_id, col_id)` pair
//! maps to a flat binary file of fixed-width `u64` values. Node IDs pack
//! `(label_id: u32, slot: u32)` into a single `u64`, consistent with
//! [`sparrowdb_common::NodeId`] semantics.
7//!
8//! ## File layout
9//!
10//! ```text
11//! nodes/{label_id}/col_{col_id}.bin
12//! ```
13//!
14//! Each column file is a flat array of `u64` LE values (one per slot).
15//! The high-water mark is tracked in-memory and written to a small header file:
16//!
17//! ```text
18//! nodes/{label_id}/hwm.bin — [hwm: u64 LE]
19//! ```
20//!
21//! ## Node ID packing
22//!
23//! ```text
24//! node_id = (label_id as u64) << 32 | slot as u64
25//! ```
26//!
27//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.
28
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::io::{Read, Seek, SeekFrom};
32use std::path::{Path, PathBuf};
33
34use sparrowdb_common::{Error, NodeId, Result};
35
36// ── Value type ────────────────────────────────────────────────────────────────
37
/// A typed property value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed 64-bit integer, stored as raw `u64` bits (two's-complement).
    Int64(i64),
    /// Raw byte blob. Up to 7 bytes fit inline in the column word; longer
    /// blobs are truncated by the inline encoding and need the overflow
    /// heap (`NodeStore::encode_value`) for full fidelity (SPA-212).
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float. Stored as 8 raw bytes in the overflow
    /// heap so that no bits are masked by the type-tag scheme (SPA-267).
    Float(f64),
}

/// Type tags embedded in the top byte (byte index 7 in LE) of a stored `u64`.
///
/// - `0x00` = `Int64` — lower 7 bytes hold the signed integer (56-bit range).
/// - `0x01` = `Bytes` — lower 7 bytes hold up to 7 bytes of inline string data.
/// - `0x02` = `BytesOverflow` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` (SPA-212).
/// - `0x03` = `Float` — same heap-pointer payload as `0x02`, but the heap
///   bytes are 8 raw IEEE-754 bytes (SPA-267).
///
/// The overflow payload packs a heap pointer into 7 bytes:
///   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
///   bytes[5..7] = byte length (u16 LE, max 65535 bytes)
///
/// This lets `decode_raw_value` reconstruct strings of any length, fixing SPA-212.
const TAG_INT64: u8 = 0x00;
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
/// Using the heap ensures all 64 bits of the float are preserved without masking.
const TAG_FLOAT: u8 = 0x03;
/// Maximum bytes that fit inline in the 7-byte payload (one byte is the tag).
const MAX_INLINE_BYTES: usize = 7;

impl Value {
    /// Encode as a packed `u64` for column storage.
    ///
    /// Byte 7 (the most significant byte in LE order) carries the type tag;
    /// the lower 7 bytes carry the payload, so [`Value::from_u64`] can recover
    /// the variant at read time (SPA-169).
    ///
    /// `Bytes` longer than 7 bytes are truncated to their first 7 bytes here;
    /// callers that need full overflow support must use
    /// [`NodeStore::encode_value`], which spills long strings to the heap
    /// (SPA-212).
    ///
    /// # Int64 range
    /// Only the lower 56 bits are stored, so integers outside
    /// `[-2^55, 2^55)` would be truncated; full 64-bit range is deferred to a
    /// later overflow encoding.
    ///
    /// # Panics
    /// Panics on `Value::Float`, which cannot be inline-encoded — use
    /// [`NodeStore::encode_value`] instead (SPA-267).
    pub fn to_u64(&self) -> u64 {
        match self {
            Value::Int64(i) => {
                // TAG_INT64 is 0x00, so tagging is simply masking to 56 bits —
                // a no-op for any value whose top byte is already zero.
                ((TAG_INT64 as u64) << 56) | ((*i as u64) & 0x00FF_FFFF_FFFF_FFFF)
            }
            Value::Bytes(data) => {
                let mut packed = [0u8; 8];
                packed[7] = TAG_BYTES; // type tag lives in the top byte
                let n = data.len().min(MAX_INLINE_BYTES);
                packed[..n].copy_from_slice(&data[..n]);
                u64::from_le_bytes(packed)
            }
            Value::Float(_) => {
                // Floats require heap storage; inline encoding would mask bits.
                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value")
            }
        }
    }

    /// Reconstruct a `Value` from a stored `u64`, dispatching on the top-byte
    /// type tag (SPA-169).
    ///
    /// Only the inline encodings (`TAG_INT64`, `TAG_BYTES`) are handled here;
    /// heap-backed tags (`TAG_BYTES_OVERFLOW`, `TAG_FLOAT`) require
    /// [`NodeStore::decode_raw_value`], which can reach the string heap
    /// (SPA-212). Unrecognised tags decode as `Int64`.
    pub fn from_u64(v: u64) -> Self {
        let le = v.to_le_bytes(); // le[7] is the tag byte
        if le[7] == TAG_BYTES {
            // Inline string: collect payload bytes up to the first zero (the
            // encoder zero-pads, so trailing zeros are not data).
            let mut data = Vec::with_capacity(MAX_INLINE_BYTES);
            for &b in &le[..MAX_INLINE_BYTES] {
                if b == 0 {
                    break;
                }
                data.push(b);
            }
            Value::Bytes(data)
        } else {
            // Sign-extend the 56-bit payload: shift the tag byte out the top,
            // then arithmetic-shift back down.
            Value::Int64(((v << 8) as i64) >> 8)
        }
    }

    /// Reconstruct an `Int64` value from a stored `u64`.
    ///
    /// Kept for callers that know the column type is always Int64 (pre-SPA-169
    /// paths); new code should prefer [`Value::from_u64`].
    pub fn int64_from_u64(v: u64) -> Self {
        Value::Int64(v as i64)
    }
}
150
151// ── NodeStore ─────────────────────────────────────────────────────────────────
152
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin — flat u64 column array
/// {root}/strings.bin — overflow string heap (SPA-212)
/// ```
///
/// The overflow heap is an append-only byte file. Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` u64 stored in the column file.
///
/// All state besides `root` is a lazily-populated cache; opening the store
/// performs no I/O (see `open`).
pub struct NodeStore {
    // Database root directory; all paths are derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label. Loaded lazily from disk.
    hwm: HashMap<u32, u64>,
    /// Labels whose in-memory HWM has been advanced but not yet persisted to
    /// `hwm.bin`. Flushed atomically by [`flush_hwms`] at transaction commit.
    hwm_dirty: HashSet<u32>,
}
173
174impl NodeStore {
175 /// Open (or create) a node store rooted at `db_root`.
176 pub fn open(db_root: &Path) -> Result<Self> {
177 Ok(NodeStore {
178 root: db_root.to_path_buf(),
179 hwm: HashMap::new(),
180 hwm_dirty: HashSet::new(),
181 })
182 }
183
184 /// Return the root directory path of this node store.
185 pub fn root_path(&self) -> &Path {
186 &self.root
187 }
188
189 // ── Internal helpers ──────────────────────────────────────────────────────
190
191 fn label_dir(&self, label_id: u32) -> PathBuf {
192 self.root.join("nodes").join(label_id.to_string())
193 }
194
195 fn hwm_path(&self, label_id: u32) -> PathBuf {
196 self.label_dir(label_id).join("hwm.bin")
197 }
198
199 fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
200 self.label_dir(label_id).join(format!("col_{col_id}.bin"))
201 }
202
203 /// Path to the null-bitmap sidecar for a column (SPA-207).
204 ///
205 /// Bit N = 1 means slot N has a real value (present/non-null).
206 /// Bit N = 0 means slot N was zero-padded (absent/null).
207 fn null_bitmap_path(&self, label_id: u32, col_id: u32) -> PathBuf {
208 self.label_dir(label_id)
209 .join(format!("col_{col_id}_null.bin"))
210 }
211
212 /// Mark slot `slot` as present (has a real value) in the null bitmap (SPA-207).
213 fn set_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<()> {
214 let path = self.null_bitmap_path(label_id, col_id);
215 if let Some(parent) = path.parent() {
216 fs::create_dir_all(parent).map_err(Error::Io)?;
217 }
218 let byte_idx = (slot / 8) as usize;
219 let bit_idx = slot % 8;
220 let mut bits = if path.exists() {
221 fs::read(&path).map_err(Error::Io)?
222 } else {
223 vec![]
224 };
225 if bits.len() <= byte_idx {
226 bits.resize(byte_idx + 1, 0);
227 }
228 bits[byte_idx] |= 1 << bit_idx;
229 fs::write(&path, &bits).map_err(Error::Io)
230 }
231
232 /// Returns `true` if slot `slot` has a real value (present/non-null) (SPA-207).
233 ///
234 /// Backward-compatible: if no bitmap file exists (old data written before
235 /// the null-bitmap fix), every slot is treated as present.
236 fn get_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<bool> {
237 let path = self.null_bitmap_path(label_id, col_id);
238 if !path.exists() {
239 // No bitmap file → backward-compatible: treat all slots as present.
240 return Ok(true);
241 }
242 let bits = fs::read(&path).map_err(Error::Io)?;
243 let byte_idx = (slot / 8) as usize;
244 if byte_idx >= bits.len() {
245 return Ok(false);
246 }
247 Ok((bits[byte_idx] >> (slot % 8)) & 1 == 1)
248 }
249
250 /// Path to the overflow string heap (shared across all labels).
251 fn strings_bin_path(&self) -> PathBuf {
252 self.root.join("strings.bin")
253 }
254
255 // ── Overflow string heap (SPA-212) ────────────────────────────────────────
256
257 /// Append `bytes` to the overflow string heap and return an
258 /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
259 ///
260 /// Layout of the returned `u64` (little-endian bytes):
261 /// bytes[0..5] = heap byte offset as u40 LE (max ~1 TiB)
262 /// bytes[5..7] = byte length as u16 LE (max 65 535 bytes)
263 /// bytes[7] = `TAG_BYTES_OVERFLOW` (0x02)
264 fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
265 use std::io::{Seek, SeekFrom, Write};
266 let path = self.strings_bin_path();
267 let mut file = fs::OpenOptions::new()
268 .create(true)
269 .truncate(false)
270 .append(false)
271 .read(true)
272 .write(true)
273 .open(&path)
274 .map_err(Error::Io)?;
275
276 // The heap offset is the current end of the file.
277 let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
278 file.write_all(bytes).map_err(Error::Io)?;
279
280 // Encode (offset, len) into a 7-byte payload.
281 let len = bytes.len() as u64;
282 debug_assert!(
283 offset <= 0x00FF_FFFF_FFFF_u64,
284 "string heap too large for 5-byte offset"
285 );
286 debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
287
288 let mut arr = [0u8; 8];
289 // Store offset in bytes[0..5] (40-bit LE).
290 arr[0] = offset as u8;
291 arr[1] = (offset >> 8) as u8;
292 arr[2] = (offset >> 16) as u8;
293 arr[3] = (offset >> 24) as u8;
294 arr[4] = (offset >> 32) as u8;
295 // Store len in bytes[5..7] (16-bit LE).
296 arr[5] = len as u8;
297 arr[6] = (len >> 8) as u8;
298 // Tag byte.
299 arr[7] = TAG_BYTES_OVERFLOW;
300 Ok(u64::from_le_bytes(arr))
301 }
302
303 /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
304 /// tagged `u64` produced by [`append_to_string_heap`].
305 ///
306 /// Uses `seek + read_exact` to load only the `len` bytes at `offset`,
307 /// avoiding a full file load regardless of `strings.bin` size (SPA-208).
308 fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
309 use std::io::{Read, Seek, SeekFrom};
310
311 let arr = tagged.to_le_bytes();
312 debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
313
314 // Decode (offset, len).
315 let offset = arr[0] as u64
316 | ((arr[1] as u64) << 8)
317 | ((arr[2] as u64) << 16)
318 | ((arr[3] as u64) << 24)
319 | ((arr[4] as u64) << 32);
320 let len = arr[5] as usize | ((arr[6] as usize) << 8);
321
322 let path = self.strings_bin_path();
323 let mut file = fs::File::open(&path).map_err(Error::Io)?;
324 file.seek(SeekFrom::Start(offset)).map_err(|e| {
325 Error::Corruption(format!("string heap seek failed (offset={offset}): {e}"))
326 })?;
327 let mut buf = vec![0u8; len];
328 file.read_exact(&mut buf).map_err(|e| {
329 Error::Corruption(format!(
330 "string heap too short: need {} bytes at offset {offset}: {e}",
331 len
332 ))
333 })?;
334 Ok(buf)
335 }
336
337 // ── Value encode / decode with overflow support ───────────────────────────
338
339 /// Encode a `Value` for column storage, writing long `Bytes` strings to
340 /// the overflow heap (SPA-212).
341 ///
342 /// - `Int64` → identical to `Value::to_u64()`.
343 /// - `Bytes` ≤ 7 B → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
344 /// - `Bytes` > 7 B → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
345 /// - `Float` → 8 raw IEEE-754 bytes appended to `strings.bin`;
346 /// returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
347 pub fn encode_value(&self, val: &Value) -> Result<u64> {
348 match val {
349 Value::Int64(_) => Ok(val.to_u64()),
350 Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
351 Value::Bytes(b) => self.append_to_string_heap(b),
352 // SPA-267: store all 8 float bytes in the heap so no bits are masked.
353 // The heap pointer uses the same (offset: u40, len: u16) layout as
354 // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
355 Value::Float(f) => {
356 let bits = f.to_bits().to_le_bytes();
357 let heap_tagged = self.append_to_string_heap(&bits)?;
358 // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
359 let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
360 Ok((TAG_FLOAT as u64) << 56 | payload)
361 }
362 }
363 }
364
365 /// Decode a raw `u64` column value back to a `Value`, reading the
366 /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
367 ///
368 /// Handles all four tags:
369 /// - `TAG_INT64` → `Value::Int64`
370 /// - `TAG_BYTES` → `Value::Bytes` (inline, ≤ 7 bytes)
371 /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
372 /// - `TAG_FLOAT` → `Value::Float` (8 raw IEEE-754 bytes from heap)
373 pub fn decode_raw_value(&self, raw: u64) -> Value {
374 let tag = (raw >> 56) as u8;
375 match tag {
376 TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
377 Ok(bytes) => Value::Bytes(bytes),
378 Err(e) => {
379 // Corruption fallback: return empty bytes and log.
380 eprintln!(
381 "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
382 );
383 Value::Bytes(Vec::new())
384 }
385 },
386 // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
387 // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
388 // so read_from_string_heap can locate the bytes.
389 TAG_FLOAT => {
390 let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
391 let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
392 match self.read_from_string_heap(heap_tagged) {
393 Ok(bytes) if bytes.len() == 8 => {
394 let arr: [u8; 8] = bytes.try_into().unwrap();
395 Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
396 }
397 Ok(bytes) => {
398 eprintln!(
399 "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
400 bytes.len()
401 );
402 Value::Float(f64::NAN)
403 }
404 Err(e) => {
405 eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
406 Value::Float(f64::NAN)
407 }
408 }
409 }
410 _ => Value::from_u64(raw),
411 }
412 }
413
414 /// Check whether a raw stored `u64` encodes a string equal to `s`.
415 ///
416 /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
417 /// encodings (SPA-212). Used by WHERE-clause and prop-filter comparison.
418 pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
419 let tag = (raw >> 56) as u8;
420 match tag {
421 TAG_BYTES => {
422 // Fast inline comparison: encode s the same way and compare u64s.
423 raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
424 }
425 TAG_BYTES_OVERFLOW => {
426 // Overflow: read from heap and compare bytes.
427 match self.read_from_string_heap(raw) {
428 Ok(bytes) => bytes == s.as_bytes(),
429 Err(_) => false,
430 }
431 }
432 _ => false, // INT64 or unknown — not a string
433 }
434 }
435
    /// Read the high-water mark for `label_id` from disk (or return 0).
    ///
    /// Recovery path (SPA-211): if `hwm.bin` is missing or corrupt but
    /// `hwm.bin.tmp` exists (leftover from a previous crashed write), we
    /// promote the tmp file to `hwm.bin` and use its value.
    ///
    /// Fallback order:
    /// 1. `hwm.bin` (canonical) — used when readable and at least 8 bytes.
    /// 2. `hwm.bin.tmp` — promoted to canonical via rename when readable.
    /// 3. Column-file sizes — `infer_hwm_from_cols`; the recovered value is
    ///    persisted via `save_hwm` so the next open is clean.
    /// 4. `0` — fresh label with no usable files at all.
    fn load_hwm(&self, label_id: u32) -> Result<u64> {
        let path = self.hwm_path(label_id);
        let tmp_path = self.hwm_tmp_path(label_id);

        // Try to read the canonical file first.
        // Returns None for both "unreadable" and "too short" so callers can
        // treat every failure mode as "corrupt, try the next fallback".
        let try_read = |p: &std::path::Path| -> Option<u64> {
            let bytes = fs::read(p).ok()?;
            if bytes.len() < 8 {
                return None;
            }
            Some(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
        };

        if path.exists() {
            match try_read(&path) {
                Some(v) => return Ok(v),
                None => {
                    // hwm.bin exists but is corrupt — fall through to tmp recovery.
                }
            }
        }

        // hwm.bin is absent or unreadable. Check for a tmp leftover.
        if tmp_path.exists() {
            if let Some(v) = try_read(&tmp_path) {
                // Promote: atomically rename tmp → canonical so the next open
                // is clean even if we crash again immediately here.
                // Best-effort: the rename failure is ignored because we
                // already hold the recovered value.
                let _ = fs::rename(&tmp_path, &path);
                return Ok(v);
            }
        }

        // Last resort: infer the HWM from the sizes of the column files.
        //
        // Each `col_{n}.bin` is a flat array of 8-byte u64 LE values, one per
        // slot. The number of slots written equals `file_len / 8`. If
        // `hwm.bin` is corrupt we can reconstruct the HWM as the maximum slot
        // count across all column files for this label.
        {
            let inferred = self.infer_hwm_from_cols(label_id);
            if inferred > 0 {
                // Persist the recovered HWM so the next open is clean.
                let _ = self.save_hwm(label_id, inferred);
                return Ok(inferred);
            }
        }

        // No usable file at all — fresh label, HWM is 0.
        Ok(0)
    }
491
492 /// Infer the high-water mark for `label_id` from the sizes of the column
493 /// files on disk. Returns 0 if no column files exist or none are readable.
494 ///
495 /// Each `col_{n}.bin` stores one u64 per slot, so `file_len / 8` gives the
496 /// slot count. We return the maximum over all columns.
497 fn infer_hwm_from_cols(&self, label_id: u32) -> u64 {
498 let dir = self.label_dir(label_id);
499 let read_dir = match fs::read_dir(&dir) {
500 Ok(rd) => rd,
501 Err(_) => return 0,
502 };
503 read_dir
504 .flatten()
505 .filter_map(|entry| {
506 let name = entry.file_name();
507 let name_str = name.to_string_lossy().into_owned();
508 // Only consider col_{n}.bin files.
509 name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
510 let meta = entry.metadata().ok()?;
511 Some(meta.len() / 8)
512 })
513 .max()
514 .unwrap_or(0)
515 }
516
517 /// Write the high-water mark for `label_id` to disk atomically.
518 ///
519 /// Strategy (SPA-211): write to `hwm.bin.tmp`, fsync, then rename to
520 /// `hwm.bin`. On POSIX, `rename(2)` is atomic, so a crash at any point
521 /// leaves either the old `hwm.bin` intact or the fully-written new one.
522 fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
523 use std::io::Write as _;
524
525 let path = self.hwm_path(label_id);
526 let tmp_path = self.hwm_tmp_path(label_id);
527
528 if let Some(parent) = path.parent() {
529 fs::create_dir_all(parent).map_err(Error::Io)?;
530 }
531
532 // Write to the tmp file and fsync before renaming.
533 {
534 let mut file = fs::File::create(&tmp_path).map_err(Error::Io)?;
535 file.write_all(&hwm.to_le_bytes()).map_err(Error::Io)?;
536 file.sync_all().map_err(Error::Io)?;
537 }
538
539 // Atomic rename: on success the old hwm.bin is replaced in one syscall.
540 fs::rename(&tmp_path, &path).map_err(Error::Io)
541 }
542
543 fn hwm_tmp_path(&self, label_id: u32) -> PathBuf {
544 self.label_dir(label_id).join("hwm.bin.tmp")
545 }
546
547 /// Persist all dirty in-memory HWMs to disk atomically.
548 ///
549 /// Called **once per transaction commit** rather than once per node creation,
550 /// so that bulk imports do not incur one fsync per node (SPA-217 regression fix).
551 ///
552 /// Each dirty label's HWM is written via the same tmp+fsync+rename strategy
553 /// used by [`save_hwm`], preserving the SPA-211 crash-safety guarantee.
554 /// After all writes succeed the dirty set is cleared.
555 pub fn flush_hwms(&mut self) -> Result<()> {
556 let dirty: Vec<u32> = self.hwm_dirty.iter().copied().collect();
557 for label_id in dirty {
558 let hwm = match self.hwm.get(&label_id) {
559 Some(&v) => v,
560 None => continue,
561 };
562 self.save_hwm(label_id, hwm)?;
563 }
564 self.hwm_dirty.clear();
565 Ok(())
566 }
567
568 /// Append a `u64` value to a column file.
569 fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
570 use std::io::{Seek, SeekFrom, Write};
571 let path = self.col_path(label_id, col_id);
572 if let Some(parent) = path.parent() {
573 fs::create_dir_all(parent).map_err(Error::Io)?;
574 }
575 // Open without truncation so we can inspect the current length.
576 let mut file = fs::OpenOptions::new()
577 .create(true)
578 .truncate(false)
579 .read(true)
580 .write(true)
581 .open(&path)
582 .map_err(Error::Io)?;
583
584 // Pad with zeros for any slots that were skipped (sparse write pattern).
585 // Without padding, a later slot's value would be written at offset 0,
586 // causing earlier slots to incorrectly read that value.
587 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
588 let expected_offset = slot as u64 * 8;
589 if existing_len < expected_offset {
590 file.seek(SeekFrom::Start(existing_len))
591 .map_err(Error::Io)?;
592 const CHUNK: usize = 65536;
593 let zeros = [0u8; CHUNK];
594 let mut remaining = (expected_offset - existing_len) as usize;
595 while remaining > 0 {
596 let n = remaining.min(CHUNK);
597 file.write_all(&zeros[..n]).map_err(Error::Io)?;
598 remaining -= n;
599 }
600 }
601
602 // Seek to the correct slot position and write the value.
603 file.seek(SeekFrom::Start(expected_offset))
604 .map_err(Error::Io)?;
605 file.write_all(&value.to_le_bytes()).map_err(Error::Io)
606 }
607
608 /// Read the `u64` stored at `slot` in the given column file.
609 ///
610 /// Returns `Ok(0)` when the column file does not exist yet — a missing file
611 /// means no value has ever been written for this `(label_id, col_id)` pair,
612 /// which is represented as the zero bit-pattern (SPA-166).
613 fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
614 let path = self.col_path(label_id, col_id);
615 let bytes = match fs::read(&path) {
616 Ok(b) => b,
617 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
618 Err(e) => return Err(Error::Io(e)),
619 };
620 let offset = slot as usize * 8;
621 if bytes.len() < offset + 8 {
622 return Err(Error::NotFound);
623 }
624 Ok(u64::from_le_bytes(
625 bytes[offset..offset + 8].try_into().unwrap(),
626 ))
627 }
628
629 // ── Public API ────────────────────────────────────────────────────────────
630
631 /// Return the high-water mark (slot count) for a label.
632 ///
633 /// Returns `0` if no nodes have been created for that label yet.
634 pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
635 if let Some(&h) = self.hwm.get(&label_id) {
636 return Ok(h);
637 }
638 self.load_hwm(label_id)
639 }
640
641 /// Discover all column IDs that currently exist on disk for `label_id`.
642 ///
643 /// Scans the label directory for `col_{id}.bin` files and returns the
644 /// parsed `col_id` values. Used by `create_node` to zero-pad columns
645 /// that are not supplied for a new node (SPA-187).
646 ///
647 /// Returns `Err` when the directory exists but cannot be read (e.g.
648 /// permissions failure or I/O error). A missing directory is not an
649 /// error — it simply means no nodes of this label have been created yet.
650 pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
651 self.existing_col_ids(label_id)
652 }
653
654 fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
655 let dir = self.label_dir(label_id);
656 let read_dir = match fs::read_dir(&dir) {
657 Ok(rd) => rd,
658 // Directory does not exist yet → no columns on disk.
659 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
660 Err(e) => return Err(Error::Io(e)),
661 };
662 let mut ids: Vec<u32> = read_dir
663 .flatten()
664 .filter_map(|entry| {
665 let name = entry.file_name();
666 let name_str = name.to_string_lossy().into_owned();
667 // Match "col_{col_id}.bin" filenames.
668 let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
669 id_str.parse::<u32>().ok()
670 })
671 .collect();
672 ids.sort_unstable();
673 Ok(ids)
674 }
675
    /// Return the **on-disk** high-water mark for a label, bypassing any
    /// in-memory advances made by `peek_next_slot`.
    ///
    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
    /// that have actually been persisted.
    ///
    /// Delegates to `load_hwm`, so the SPA-211 tmp-file recovery and the
    /// column-size inference fallback both apply here as well.
    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
        self.load_hwm(label_id)
    }
684
685 /// Reserve the slot index that the *next* `create_node` call will use for
686 /// `label_id`, advancing the in-memory HWM so that the slot is not
687 /// assigned again within the same [`NodeStore`] instance.
688 ///
689 /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
690 /// before the actual disk write, so the ID can be returned to the caller
691 /// while the write is deferred until commit (SPA-181).
692 ///
693 /// The on-disk HWM is **not** updated here; it is updated when the
694 /// buffered `NodeCreate` operation is applied in `commit()`.
695 pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
696 // Load from disk if not cached yet.
697 if !self.hwm.contains_key(&label_id) {
698 let h = self.load_hwm(label_id)?;
699 self.hwm.insert(label_id, h);
700 }
701 let h = *self.hwm.get(&label_id).unwrap();
702 // Advance the in-memory HWM so a subsequent peek returns the next slot.
703 self.hwm.insert(label_id, h + 1);
704 Ok(h as u32)
705 }
706
    /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
    ///
    /// Like [`create_node`] but uses the caller-specified `slot` index instead
    /// of deriving it from the HWM. Used by [`WriteTx::commit`] to flush
    /// buffered node-create operations in the exact order they were issued,
    /// with slots that were already pre-allocated by [`peek_next_slot`].
    ///
    /// Advances the in-memory HWM to `slot + 1` (or higher if already past
    /// that) and marks the label dirty; the on-disk HWM is persisted later by
    /// `flush_hwms`.
    ///
    /// # Errors
    /// Propagates the first encode or I/O error after attempting to truncate
    /// every touched column file back to its pre-call size.
    ///
    /// NOTE(review): the rollback truncates column files only — bits already
    /// set by `set_null_bit` are not cleared, so a failed create can leave a
    /// stray "present" bit for this slot. Confirm readers tolerate this (the
    /// batch path snapshots bitmap file sizes for the same reason).
    pub fn create_node_at_slot(
        &mut self,
        label_id: u32,
        slot: u32,
        props: &[(u32, Value)],
    ) -> Result<NodeId> {
        // Snapshot original column sizes for rollback on partial failure.
        // Missing files snapshot as size 0, so rollback truncates them empty.
        let original_sizes: Vec<(u32, u64)> = props
            .iter()
            .map(|&(col_id, _)| {
                let path = self.col_path(label_id, col_id);
                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                (col_id, size)
            })
            .collect();

        // Run all column writes inside a closure so the first error aborts
        // the remaining writes and falls through to the rollback below.
        let write_result = (|| {
            for &(col_id, ref val) in props {
                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
                // Mark this slot as present in the null bitmap (SPA-207).
                self.set_null_bit(label_id, col_id, slot)?;
            }
            Ok::<(), sparrowdb_common::Error>(())
        })();

        if let Err(e) = write_result {
            // Best-effort rollback: truncate each touched column file back to
            // its snapshot size; a rollback failure is logged, not returned,
            // so the original error `e` is still what the caller sees.
            for (col_id, original_size) in &original_sizes {
                let path = self.col_path(label_id, *col_id);
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(&path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            return Err(e);
        }

        // Advance the in-memory HWM to at least slot + 1 and mark the label
        // dirty so that flush_hwms() will persist it at commit boundary.
        //
        // We do NOT call save_hwm here. The caller (WriteTx::commit) is
        // responsible for calling flush_hwms() once after all PendingOp::NodeCreate
        // entries have been applied. This avoids one fsync per node during bulk
        // imports (SPA-217 regression fix) while preserving crash-safety: the WAL
        // record is already durable at this point, so recovery can reconstruct the
        // HWM if we crash before flush_hwms() completes.
        //
        // NOTE: peek_next_slot() may have already advanced self.hwm to slot+1,
        // so new_hwm > mem_hwm might be false. We mark the label dirty
        // unconditionally so that flush_hwms() always writes through to disk.
        let new_hwm = slot as u64 + 1;
        let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
        if new_hwm > mem_hwm {
            self.hwm.insert(label_id, new_hwm);
        }
        // Always mark dirty — flush_hwms() must write the post-commit HWM even
        // if peek_next_slot already set mem HWM == new_hwm.
        self.hwm_dirty.insert(label_id);

        // Pack (label_id, slot) into the u64 NodeId: high 32 bits are the
        // label, low 32 bits the within-label slot.
        Ok(NodeId((label_id as u64) << 32 | slot as u64))
    }
785
786 /// Batch-write column data for multiple nodes created in a single transaction
787 /// commit (SPA-212 write-amplification fix).
788 ///
789 /// # Why this exists
790 ///
791 /// The naive path calls `create_node_at_slot` per node, which opens and
792 /// closes every column file once per node. For a transaction that creates
793 /// `N` nodes each with `C` columns, that is `O(N × C)` file-open/close
794 /// syscalls.
795 ///
796 /// This method instead:
797 /// 1. Accepts pre-encoded `(label_id, col_id, slot, raw_value, is_present)`
798 /// tuples from the caller (value encoding happens in `commit()` before
799 /// the call).
800 /// 2. Sorts by `(label_id, col_id)` so all writes to the same column file
801 /// are contiguous.
802 /// 3. Opens each `(label_id, col_id)` file exactly **once**, writes all
803 /// slots for that column, then closes it — reducing file opens to
804 /// `O(labels × cols)`.
805 /// 4. Updates each null-bitmap file once per `(label_id, col_id)` group.
806 ///
807 /// HWM advances are applied for every `(label_id, slot)` in `node_slots`,
808 /// exactly as `create_node_at_slot` would do them. `node_slots` must
809 /// include **all** created nodes — including those with zero properties —
810 /// so that the HWM is advanced even for property-less nodes.
811 ///
812 /// # Rollback
813 ///
814 /// On I/O failure the method truncates every file that was opened back to
815 /// its pre-call size, matching the rollback contract of
816 /// `create_node_at_slot`.
817 pub fn batch_write_node_creates(
818 &mut self,
819 // (label_id, col_id, slot, raw_u64, is_present)
820 mut writes: Vec<(u32, u32, u32, u64, bool)>,
821 // All (label_id, slot) pairs for every created node, including those
822 // with zero properties.
823 node_slots: &[(u32, u32)],
824 ) -> Result<()> {
825 use std::io::{Seek, SeekFrom, Write};
826
827 if writes.is_empty() && node_slots.is_empty() {
828 return Ok(());
829 }
830
831 // Sort by (label_id, col_id, slot) so all writes to the same file are
832 // contiguous.
833 writes.sort_unstable_by_key(|&(lid, cid, slot, _, _)| (lid, cid, slot));
834
835 // Snapshot original sizes for rollback. We only need one entry per
836 // (label_id, col_id) pair.
837 let mut original_sizes: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
838 {
839 let mut prev_key: Option<(u32, u32)> = None;
840 for &(lid, cid, _, _, _) in &writes {
841 let key = (lid, cid);
842 if prev_key == Some(key) {
843 continue;
844 }
845 prev_key = Some(key);
846 let path = self.col_path(lid, cid);
847 let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
848 original_sizes.push((lid, cid, path, original));
849 }
850 }
851 // Also snapshot null-bitmap files (one per col group).
852 let mut bitmap_originals: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
853 {
854 let mut prev_key: Option<(u32, u32)> = None;
855 for &(lid, cid, _, _, is_present) in &writes {
856 if !is_present {
857 continue;
858 }
859 let key = (lid, cid);
860 if prev_key == Some(key) {
861 continue;
862 }
863 prev_key = Some(key);
864 let path = self.null_bitmap_path(lid, cid);
865 let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
866 bitmap_originals.push((lid, cid, path, original));
867 }
868 }
869
870 let write_result = (|| -> Result<()> {
871 let mut i = 0;
872 while i < writes.len() {
873 let (lid, cid, _, _, _) = writes[i];
874
875 // Find the end of this (label_id, col_id) group.
876 let group_start = i;
877 while i < writes.len() && writes[i].0 == lid && writes[i].1 == cid {
878 i += 1;
879 }
880 let group = &writes[group_start..i];
881
882 // ── Column file ──────────────────────────────────────────────
883 let path = self.col_path(lid, cid);
884 if let Some(parent) = path.parent() {
885 fs::create_dir_all(parent).map_err(Error::Io)?;
886 }
887 let mut file = fs::OpenOptions::new()
888 .create(true)
889 .truncate(false)
890 .read(true)
891 .write(true)
892 .open(&path)
893 .map_err(Error::Io)?;
894
895 let mut current_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
896
897 for &(_, _, slot, raw_value, _) in group {
898 let expected_offset = slot as u64 * 8;
899 // Pad with zeros for any skipped slots.
900 if current_len < expected_offset {
901 file.seek(SeekFrom::Start(current_len)).map_err(Error::Io)?;
902 const CHUNK: usize = 65536;
903 let zeros = [0u8; CHUNK];
904 let mut remaining = (expected_offset - current_len) as usize;
905 while remaining > 0 {
906 let n = remaining.min(CHUNK);
907 file.write_all(&zeros[..n]).map_err(Error::Io)?;
908 remaining -= n;
909 }
910 current_len = expected_offset;
911 }
912 // Seek to exact slot and write.
913 file.seek(SeekFrom::Start(expected_offset))
914 .map_err(Error::Io)?;
915 file.write_all(&raw_value.to_le_bytes())
916 .map_err(Error::Io)?;
917 // Advance current_len to reflect what we wrote.
918 let after = expected_offset + 8;
919 if after > current_len {
920 current_len = after;
921 }
922 }
923
924 // ── Null bitmap (one read-modify-write per col group) ────────
925 let present_slots: Vec<u32> = group
926 .iter()
927 .filter(|&&(_, _, _, _, is_present)| is_present)
928 .map(|&(_, _, slot, _, _)| slot)
929 .collect();
930
931 if !present_slots.is_empty() {
932 let bmap_path = self.null_bitmap_path(lid, cid);
933 if let Some(parent) = bmap_path.parent() {
934 fs::create_dir_all(parent).map_err(Error::Io)?;
935 }
936 let mut bits = if bmap_path.exists() {
937 fs::read(&bmap_path).map_err(Error::Io)?
938 } else {
939 vec![]
940 };
941 for slot in present_slots {
942 let byte_idx = (slot / 8) as usize;
943 let bit_idx = slot % 8;
944 if bits.len() <= byte_idx {
945 bits.resize(byte_idx + 1, 0);
946 }
947 bits[byte_idx] |= 1 << bit_idx;
948 }
949 fs::write(&bmap_path, &bits).map_err(Error::Io)?;
950 }
951 }
952 Ok(())
953 })();
954
955 if let Err(e) = write_result {
956 // Roll back column files.
957 for (_, _, path, original_size) in &original_sizes {
958 if path.exists() {
959 if let Err(rollback_err) = fs::OpenOptions::new()
960 .write(true)
961 .open(path)
962 .and_then(|f| f.set_len(*original_size))
963 {
964 eprintln!(
965 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
966 path.display(),
967 original_size,
968 rollback_err
969 );
970 }
971 }
972 }
973 // Roll back null bitmap files.
974 for (_, _, path, original_size) in &bitmap_originals {
975 if path.exists() {
976 if let Err(rollback_err) = fs::OpenOptions::new()
977 .write(true)
978 .open(path)
979 .and_then(|f| f.set_len(*original_size))
980 {
981 eprintln!(
982 "CRITICAL: Failed to roll back null bitmap file {} to size {}: {}. Data may be corrupt.",
983 path.display(),
984 original_size,
985 rollback_err
986 );
987 }
988 }
989 }
990 return Err(e);
991 }
992
993 // Advance HWMs using the explicit node_slots list so that nodes with
994 // zero properties also advance the HWM, matching the contract of
995 // create_node_at_slot.
996 for &(lid, slot) in node_slots {
997 let new_hwm = slot as u64 + 1;
998 let mem_hwm = self.hwm.get(&lid).copied().unwrap_or(0);
999 if new_hwm > mem_hwm {
1000 self.hwm.insert(lid, new_hwm);
1001 }
1002 self.hwm_dirty.insert(lid);
1003 }
1004
1005 Ok(())
1006 }
1007
1008 /// Create a new node in `label_id` with the given properties.
1009 ///
1010 /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
1011 ///
1012 /// ## Slot alignment guarantee (SPA-187)
1013 ///
1014 /// Every column file for `label_id` must have exactly `node_count * 8`
1015 /// bytes so that slot N always refers to node N across all columns. When
1016 /// a node is created without a value for an already-known column, that
1017 /// column file is zero-padded to `(slot + 1) * 8` bytes. The zero
1018 /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
1019 /// surfaces as `Value::Null` in query results.
1020 pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
1021 // Load or get cached hwm.
1022 let hwm = if let Some(h) = self.hwm.get(&label_id) {
1023 *h
1024 } else {
1025 let h = self.load_hwm(label_id)?;
1026 self.hwm.insert(label_id, h);
1027 h
1028 };
1029
1030 let slot = hwm as u32;
1031
1032 // Collect the set of col_ids supplied for this node.
1033 let supplied_col_ids: std::collections::HashSet<u32> =
1034 props.iter().map(|&(col_id, _)| col_id).collect();
1035
1036 // Discover all columns already on disk for this label. Any that are
1037 // NOT in `supplied_col_ids` must be zero-padded so their slot count
1038 // stays in sync with the HWM (SPA-187).
1039 let existing_col_ids = self.existing_col_ids(label_id)?;
1040
1041 // Columns that need zero-padding: exist on disk but not in this node's props.
1042 let cols_to_zero_pad: Vec<u32> = existing_col_ids
1043 .iter()
1044 .copied()
1045 .filter(|col_id| !supplied_col_ids.contains(col_id))
1046 .collect();
1047
1048 // Build the full list of columns to touch for rollback tracking:
1049 // supplied columns + columns that need zero-padding.
1050 let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
1051 .iter()
1052 .copied()
1053 .chain(cols_to_zero_pad.iter().copied())
1054 .collect();
1055
1056 // Snapshot the original size of each column file so we can roll back
1057 // on partial failure. A column that does not yet exist has size 0.
1058 let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
1059 .iter()
1060 .map(|&col_id| {
1061 let path = self.col_path(label_id, col_id);
1062 let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1063 (col_id, size)
1064 })
1065 .collect();
1066
1067 // Write each property column. For columns not in `props`, write a
1068 // zero-padded entry (the zero sentinel means "absent" / NULL).
1069 // On failure, roll back all columns that were already written to avoid
1070 // slot misalignment.
1071 let write_result = (|| {
1072 // Write supplied columns with their actual values.
1073 for &(col_id, ref val) in props {
1074 self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
1075 // Mark this slot as present in the null bitmap (SPA-207).
1076 self.set_null_bit(label_id, col_id, slot)?;
1077 }
1078 // Zero-pad existing columns that were NOT supplied for this node.
1079 // Do NOT set null bitmap bits for these — they remain absent/null.
1080 for &col_id in &cols_to_zero_pad {
1081 self.append_col(label_id, col_id, slot, 0u64)?;
1082 }
1083 Ok::<(), sparrowdb_common::Error>(())
1084 })();
1085
1086 if let Err(e) = write_result {
1087 // Truncate each column back to its original size.
1088 for (col_id, original_size) in &original_sizes {
1089 let path = self.col_path(label_id, *col_id);
1090 if path.exists() {
1091 if let Err(rollback_err) = fs::OpenOptions::new()
1092 .write(true)
1093 .open(&path)
1094 .and_then(|f| f.set_len(*original_size))
1095 {
1096 eprintln!(
1097 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
1098 path.display(),
1099 original_size,
1100 rollback_err
1101 );
1102 }
1103 }
1104 }
1105 return Err(e);
1106 }
1107
1108 // Update hwm.
1109 let new_hwm = hwm + 1;
1110 self.save_hwm(label_id, new_hwm)?;
1111 *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
1112
1113 // Pack node ID.
1114 let node_id = ((label_id as u64) << 32) | (slot as u64);
1115 Ok(NodeId(node_id))
1116 }
1117
1118 /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
1119 ///
1120 /// Creates `col_0.bin` (and its parent directory) if it does not exist,
1121 /// zero-padding all preceding slots. This ensures that nodes which were
1122 /// created without any `col_0` property are still properly marked as deleted
1123 /// and become invisible to subsequent scans.
1124 ///
1125 /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
1126 pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
1127 use std::io::{Seek, SeekFrom, Write};
1128 let label_id = (node_id.0 >> 32) as u32;
1129 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1130 let col0 = self.col_path(label_id, 0);
1131
1132 // Ensure the parent directory exists.
1133 if let Some(parent) = col0.parent() {
1134 fs::create_dir_all(parent).map_err(Error::Io)?;
1135 }
1136
1137 let mut f = fs::OpenOptions::new()
1138 .create(true)
1139 .truncate(false)
1140 .read(true)
1141 .write(true)
1142 .open(&col0)
1143 .map_err(Error::Io)?;
1144
1145 // Zero-pad any slots before `slot` that are not yet in the file.
1146 let needed_len = (slot as u64 + 1) * 8;
1147 let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1148 if existing_len < needed_len {
1149 let zeros = vec![0u8; (needed_len - existing_len) as usize];
1150 f.write_all(&zeros).map_err(Error::Io)?;
1151 }
1152
1153 // Seek to the slot and write the tombstone value.
1154 f.seek(SeekFrom::Start(slot as u64 * 8))
1155 .map_err(Error::Io)?;
1156 f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
1157 }
1158
1159 /// Overwrite the value of a single column for an existing node.
1160 ///
1161 /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
1162 /// 8-byte little-endian value in-place. Returns `Err(NotFound)` if the
1163 /// slot does not exist yet.
1164 pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1165 use std::io::{Seek, SeekFrom, Write};
1166 let label_id = (node_id.0 >> 32) as u32;
1167 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1168 let path = self.col_path(label_id, col_id);
1169 let mut file = fs::OpenOptions::new()
1170 .write(true)
1171 .open(&path)
1172 .map_err(|_| Error::NotFound)?;
1173 let offset = slot as u64 * 8;
1174 file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
1175 file.write_all(&self.encode_value(value)?.to_le_bytes())
1176 .map_err(Error::Io)
1177 }
1178
1179 /// Write or create a column value for a node, creating and zero-padding the
1180 /// column file if it does not yet exist.
1181 ///
1182 /// Unlike [`set_node_col`], this method creates the column file and fills all
1183 /// slots from 0 to `slot - 1` with zeros before writing the target value.
1184 /// This supports adding new property columns to existing nodes (Phase 7
1185 /// `set_property` semantics).
1186 pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1187 use std::io::{Seek, SeekFrom, Write};
1188 let label_id = (node_id.0 >> 32) as u32;
1189 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1190 let path = self.col_path(label_id, col_id);
1191
1192 // Ensure parent directory exists.
1193 if let Some(parent) = path.parent() {
1194 fs::create_dir_all(parent).map_err(Error::Io)?;
1195 }
1196
1197 // Open the file (create if absent), then pad with zeros up to and
1198 // including the target slot, and finally seek back to write the value.
1199 // We must NOT truncate: we only update a specific slot, not the whole file.
1200 let mut file = fs::OpenOptions::new()
1201 .create(true)
1202 .truncate(false)
1203 .read(true)
1204 .write(true)
1205 .open(&path)
1206 .map_err(Error::Io)?;
1207
1208 // How many bytes are already in the file?
1209 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1210 let needed_len = (slot as u64 + 1) * 8;
1211 if existing_len < needed_len {
1212 // Extend file with zeros from existing_len to needed_len.
1213 // Write in fixed-size chunks to avoid a single large allocation
1214 // that could OOM when the node slot ID is very high.
1215 file.seek(SeekFrom::Start(existing_len))
1216 .map_err(Error::Io)?;
1217 const CHUNK: usize = 65536; // 64 KiB
1218 let zeros = [0u8; CHUNK];
1219 let mut remaining = (needed_len - existing_len) as usize;
1220 while remaining > 0 {
1221 let n = remaining.min(CHUNK);
1222 file.write_all(&zeros[..n]).map_err(Error::Io)?;
1223 remaining -= n;
1224 }
1225 }
1226
1227 // Seek to target slot and overwrite.
1228 file.seek(SeekFrom::Start(slot as u64 * 8))
1229 .map_err(Error::Io)?;
1230 file.write_all(&self.encode_value(value)?.to_le_bytes())
1231 .map_err(Error::Io)
1232 }
1233
1234 /// Retrieve all stored properties of a node.
1235 ///
1236 /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
1237 /// The caller knows the schema (col IDs) from the catalog.
1238 pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
1239 let label_id = (node_id.0 >> 32) as u32;
1240 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1241
1242 let mut result = Vec::with_capacity(col_ids.len());
1243 for &col_id in col_ids {
1244 let val = self.read_col_slot(label_id, col_id, slot)?;
1245 result.push((col_id, val));
1246 }
1247 Ok(result)
1248 }
1249
1250 /// Like [`get_node_raw`] but treats absent columns as `None` rather than
1251 /// propagating [`Error::NotFound`].
1252 ///
1253 /// A column is considered absent when:
1254 /// - Its column file does not exist (property never written for the label).
1255 /// - Its column file is shorter than `slot + 1` entries (sparse write —
1256 /// an earlier node never wrote this column; a later node that did write it
1257 /// padded the file, but this slot's value was never explicitly stored).
1258 ///
1259 /// This is the correct read path for IS NULL evaluation: absent properties
1260 /// must appear as `Value::Null`, not as an error or as integer 0.
1261 pub fn get_node_raw_nullable(
1262 &self,
1263 node_id: NodeId,
1264 col_ids: &[u32],
1265 ) -> Result<Vec<(u32, Option<u64>)>> {
1266 let label_id = (node_id.0 >> 32) as u32;
1267 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1268
1269 let mut result = Vec::with_capacity(col_ids.len());
1270 for &col_id in col_ids {
1271 let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
1272 result.push((col_id, opt_val));
1273 }
1274 Ok(result)
1275 }
1276
1277 /// Read a single column slot, returning `None` when the column was never
1278 /// written for this node (file absent or slot out of bounds / not set).
1279 ///
1280 /// Unlike [`read_col_slot`], this function distinguishes between:
1281 /// - Column file does not exist → `None` (property never set on any node).
1282 /// - Slot is beyond the file end → `None` (property not set on this node).
1283 /// - Null bitmap says slot is absent → `None` (slot was zero-padded for alignment).
1284 /// - Null bitmap says slot is present → `Some(raw)` (real value, may be 0 for Int64(0)).
1285 ///
1286 /// The null-bitmap sidecar (`col_{id}_null.bin`) is used to distinguish
1287 /// legitimately-stored 0 values (e.g. `Int64(0)`) from absent/zero-padded
1288 /// slots (SPA-207). Backward compat: if no bitmap file exists, all slots
1289 /// within the file are treated as present.
1290 fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
1291 let path = self.col_path(label_id, col_id);
1292 let bytes = match fs::read(&path) {
1293 Ok(b) => b,
1294 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1295 Err(e) => return Err(Error::Io(e)),
1296 };
1297 let offset = slot as usize * 8;
1298 if bytes.len() < offset + 8 {
1299 return Ok(None);
1300 }
1301 // Use the null-bitmap sidecar to determine whether this slot has a real
1302 // value. This replaces the old raw==0 sentinel which incorrectly treated
1303 // Int64(0) as absent (SPA-207).
1304 if !self.get_null_bit(label_id, col_id, slot)? {
1305 return Ok(None);
1306 }
1307 let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1308 Ok(Some(raw))
1309 }
1310
1311 /// Retrieve the typed property values for a node.
1312 ///
1313 /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
1314 /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
1315 pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
1316 let raw = self.get_node_raw(node_id, col_ids)?;
1317 Ok(raw
1318 .into_iter()
1319 .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
1320 .collect())
1321 }
1322
1323 /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
1324 /// flat `Vec<u64>`. Returns an empty vec when the file does not exist yet.
1325 ///
1326 /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
1327 /// all slot values in one `fs::read` call rather than one `read_col_slot`
1328 /// per node, making index construction O(n) rather than O(n * syscall-overhead).
1329 pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
1330 let path = self.col_path(label_id, col_id);
1331 let bytes = match fs::read(&path) {
1332 Ok(b) => b,
1333 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1334 Err(e) => return Err(Error::Io(e)),
1335 };
1336 // Interpret the byte slice as a flat array of little-endian u64s.
1337 let count = bytes.len() / 8;
1338 let mut out = Vec::with_capacity(count);
1339 for i in 0..count {
1340 let offset = i * 8;
1341 let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1342 out.push(v);
1343 }
1344 Ok(out)
1345 }
1346
1347 /// Read the null-bitmap for `(label_id, col_id)` and return a `Vec<bool>`
1348 /// where index `i` is `true` iff slot `i` has a real value (was explicitly
1349 /// written, as opposed to zero-padded for alignment or never written).
1350 ///
1351 /// If no bitmap file exists (old data written before SPA-207), returns
1352 /// `None` to signal backward-compat mode — callers should fall back to the
1353 /// `raw != 0` sentinel in that case.
1354 ///
1355 /// Used by [`PropertyIndex::build_for`] and [`PropertyIndex::build`] to
1356 /// correctly index slots whose value is `Int64(0)` (raw = 0), which is
1357 /// otherwise indistinguishable from the absent sentinel without the bitmap.
1358 pub fn read_null_bitmap_all(&self, label_id: u32, col_id: u32) -> Result<Option<Vec<bool>>> {
1359 let path = self.null_bitmap_path(label_id, col_id);
1360 if !path.exists() {
1361 return Ok(None); // backward compat: no bitmap file
1362 }
1363 let bits = fs::read(&path).map_err(Error::Io)?;
1364 // Each byte encodes 8 slots: bit i of byte j → slot (j*8 + i).
1365 let mut out = Vec::with_capacity(bits.len() * 8);
1366 for &byte in &bits {
1367 for bit_idx in 0..8u32 {
1368 out.push((byte >> bit_idx) & 1 == 1);
1369 }
1370 }
1371 Ok(Some(out))
1372 }
1373
    /// Selective sorted-slot read — O(K) seeks instead of O(N) reads.
    ///
    /// For each column, opens the file once and reads only the `slots` needed,
    /// in slot-ascending order (for sequential-ish I/O). This is the hot path
    /// for hop queries where K neighbor slots ≪ N total nodes.
    ///
    /// `slots` **must** be pre-sorted ascending by the caller.
    /// Returns a `Vec` indexed parallel to `slots`; each inner `Vec` is indexed
    /// parallel to `col_ids`. Out-of-range or missing-file slots return 0.
    ///
    /// SPA-200: replaces full O(N) column loads with O(K) per-column seeks.
    fn read_col_slots_sorted(
        &self,
        label_id: u32,
        slots: &[u32],
        col_ids: &[u32],
    ) -> Result<Vec<Vec<u64>>> {
        if slots.is_empty() || col_ids.is_empty() {
            // Return a row of empty vecs parallel to slots
            // (when col_ids is empty each row is a zero-length Vec).
            return Ok(slots.iter().map(|_| vec![0u64; col_ids.len()]).collect());
        }

        // result[slot_idx][col_idx] — pre-filled with the 0 sentinel so that
        // missing files and out-of-range slots need no explicit handling.
        let mut result: Vec<Vec<u64>> = slots.iter().map(|_| vec![0u64; col_ids.len()]).collect();

        for (col_idx, &col_id) in col_ids.iter().enumerate() {
            let path = self.col_path(label_id, col_id);
            let mut file = match fs::File::open(&path) {
                Ok(f) => f,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    // Column file doesn't exist — all slots stay 0
                    continue;
                }
                Err(e) => return Err(Error::Io(e)),
            };

            // Seek-to-end gives the file length without a metadata syscall.
            let file_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
            // Reset to start for sequential reads
            file.seek(SeekFrom::Start(0)).map_err(Error::Io)?;

            let mut buf = [0u8; 8];
            // Tracks where the file cursor currently sits, so consecutive
            // slots can skip the seek and become one sequential read run.
            let mut current_pos: u64 = 0;

            for (slot_idx, &slot) in slots.iter().enumerate() {
                let target_pos = slot as u64 * 8;
                if target_pos + 8 > file_len {
                    // Slot beyond end of file — leave as 0
                    continue;
                }
                // Only issue a seek when the cursor is not already at the
                // target offset (adjacent slots read back-to-back).
                if target_pos != current_pos {
                    file.seek(SeekFrom::Start(target_pos)).map_err(Error::Io)?;
                    current_pos = target_pos;
                }
                file.read_exact(&mut buf).map_err(Error::Io)?;
                current_pos += 8;
                result[slot_idx][col_idx] = u64::from_le_bytes(buf);
            }
        }

        Ok(result)
    }
1435
    /// Batch-read multiple slots from multiple columns, returning nullable results.
    ///
    /// Like [`batch_read_node_props`] but preserves null semantics using the
    /// null-bitmap sidecar (`col_{id}_null.bin`). Each `Option<u64>` is:
    /// - `Some(raw)` — the slot has a real stored value (may be 0 for `Int64(0)`).
    /// - `None` — the slot was zero-padded, never written, or the null bitmap
    ///   marks it absent.
    ///
    /// Backward compatible: if no bitmap file exists for a column (data written
    /// before SPA-207), every slot whose column file entry is within bounds is
    /// treated as present (`Some`).
    ///
    /// This replaces the raw-`0`-as-absent semantic of [`batch_read_node_props`]
    /// and is the correct API for the chunked pipeline (Phase 2, SPA-299).
    pub fn batch_read_node_props_nullable(
        &self,
        label_id: u32,
        slots: &[u32],
        col_ids: &[u32],
    ) -> Result<Vec<Vec<Option<u64>>>> {
        // Degenerate shapes: keep the output parallel to `slots`.
        if slots.is_empty() {
            return Ok(vec![]);
        }
        if col_ids.is_empty() {
            return Ok(slots.iter().map(|_| vec![]).collect());
        }

        let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
        // Use sorted-slot reads when K < 50% of N, identical heuristic as
        // batch_read_node_props.
        let use_sorted = hwm == 0 || slots.len() * 2 < hwm;

        // Pre-load null bitmaps for each column (None = no bitmap = backward compat).
        // Indexed parallel to `col_ids`.
        let null_bitmaps: Vec<Option<Vec<bool>>> = col_ids
            .iter()
            .map(|&col_id| self.read_null_bitmap_all(label_id, col_id))
            .collect::<Result<_>>()?;

        if use_sorted {
            // Build sorted permutation for sequential I/O.
            // order[sorted_idx] = the caller-order index of the sorted_idx-th
            // slot after sorting ascending.
            let mut order: Vec<usize> = (0..slots.len()).collect();
            order.sort_unstable_by_key(|&i| slots[i]);
            let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();

            // Raw values (0 for missing/out-of-range).
            let raw = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;

            // Build result in original order, applying null bitmaps.
            let mut result = vec![vec![None; col_ids.len()]; slots.len()];
            for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
                let slot = sorted_slots[sorted_idx];
                for (col_idx, bm) in null_bitmaps.iter().enumerate() {
                    let is_present = match bm {
                        None => {
                            // No bitmap — backward compat: treat as present if
                            // the raw value is non-zero. This mirrors the old
                            // `raw != 0` sentinel from before SPA-207.
                            raw[sorted_idx][col_idx] != 0
                        }
                        Some(bits) => bits.get(slot as usize).copied().unwrap_or(false),
                    };
                    if is_present {
                        result[orig_idx][col_idx] = Some(raw[sorted_idx][col_idx]);
                    }
                }
            }
            Ok(result)
        } else {
            // Full-column load path: K is a large fraction of N, so one bulk
            // read per column beats many random seeks.
            let col_data: Vec<Vec<u64>> = col_ids
                .iter()
                .map(|&col_id| self.read_col_all(label_id, col_id))
                .collect::<Result<_>>()?;
            Ok(slots
                .iter()
                .map(|&slot| {
                    col_ids
                        .iter()
                        .enumerate()
                        .map(|(ci, _)| {
                            // Out-of-range slots read as the 0 sentinel and
                            // are then filtered by the presence check below.
                            let raw = col_data[ci].get(slot as usize).copied().unwrap_or(0);
                            let is_present = match &null_bitmaps[ci] {
                                None => raw != 0,
                                Some(bits) => bits.get(slot as usize).copied().unwrap_or(false),
                            };
                            if is_present {
                                Some(raw)
                            } else {
                                None
                            }
                        })
                        .collect()
                })
                .collect())
        }
    }
1532
1533 /// Batch-read multiple slots from multiple columns.
1534 ///
1535 /// Chooses between two strategies based on the K/N ratio:
1536 /// - **Sorted-slot reads** (SPA-200): when K ≪ N, seeks to each slot
1537 /// offset — O(K × C) I/O instead of O(N × C). Slots are sorted
1538 /// ascending before the read so seeks are sequential.
1539 /// - **Full-column load**: when K is close to N (>50% of column), reading
1540 /// the whole file is cheaper than many random seeks.
1541 ///
1542 /// All `slots` must belong to `label_id`.
1543 /// Returns a `Vec` indexed parallel to `slots`; inner `Vec` indexed
1544 /// parallel to `col_ids`. Missing/out-of-range slots return 0.
1545 pub fn batch_read_node_props(
1546 &self,
1547 label_id: u32,
1548 slots: &[u32],
1549 col_ids: &[u32],
1550 ) -> Result<Vec<Vec<u64>>> {
1551 if slots.is_empty() {
1552 return Ok(vec![]);
1553 }
1554
1555 // Determine the column high-water mark (total node count for this label).
1556 let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
1557
1558 // Use sorted-slot reads when K < 50% of N. For K=100, N=4039 that is
1559 // a ~40× reduction in bytes read per column. The HWM=0 guard handles
1560 // labels that have no nodes yet (shouldn't reach here, but be safe).
1561 let use_sorted = hwm == 0 || slots.len() * 2 < hwm;
1562
1563 if use_sorted {
1564 // Sort slots ascending for sequential I/O; keep a permutation index
1565 // so we can return results in the original caller order.
1566 let mut order: Vec<usize> = (0..slots.len()).collect();
1567 order.sort_unstable_by_key(|&i| slots[i]);
1568 let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();
1569
1570 let sorted_result = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;
1571
1572 // Re-order back to the original slot order expected by the caller.
1573 let mut result = vec![vec![0u64; col_ids.len()]; slots.len()];
1574 for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
1575 result[orig_idx] = sorted_result[sorted_idx].clone();
1576 }
1577 Ok(result)
1578 } else {
1579 // Fall back to full column load when most slots are needed anyway.
1580 let col_data: Vec<Vec<u64>> = col_ids
1581 .iter()
1582 .map(|&col_id| self.read_col_all(label_id, col_id))
1583 .collect::<Result<_>>()?;
1584 Ok(slots
1585 .iter()
1586 .map(|&slot| {
1587 col_ids
1588 .iter()
1589 .enumerate()
1590 .map(|(ci, _)| col_data[ci].get(slot as usize).copied().unwrap_or(0))
1591 .collect()
1592 })
1593 .collect())
1594 }
1595 }
1596}
1597
// ── Helpers ───────────────────────────────────────────────────────────────────
1599
1600/// Unpack `(label_id, slot)` from a [`NodeId`].
1601pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
1602 let label_id = (node_id.0 >> 32) as u32;
1603 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1604 (label_id, slot)
1605}
1606
1607// ── Tests ─────────────────────────────────────────────────────────────────────
1608
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_node_create_and_get() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let label = 1u32;
        let node_id = store
            .create_node(label, &[(0u32, Value::Int64(42)), (1u32, Value::Int64(100))])
            .unwrap();

        // The packed ID carries the label in the high half; the first node
        // for a label always lands in slot 0.
        assert_eq!(unpack_node_id(node_id), (label, 0));

        // Round-trip both property values.
        let got = store.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(got, vec![(0, Value::Int64(42)), (1, Value::Int64(100))]);
    }

    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let values = [10i64, 20, 30];
        let ids: Vec<_> = values
            .iter()
            .map(|&v| store.create_node(1, &[(0, Value::Int64(v))]).unwrap())
            .collect();

        // Slots are handed out sequentially within a label.
        for (expected_slot, &id) in ids.iter().enumerate() {
            assert_eq!(unpack_node_id(id).1 as usize, expected_slot);
        }

        // Each node reads back exactly the value it was created with.
        for (&id, &v) in ids.iter().zip(values.iter()) {
            assert_eq!(store.get_node(id, &[0]).unwrap()[0].1, Value::Int64(v));
        }
    }

    #[test]
    fn test_node_persists_across_reopen() {
        let dir = tempdir().unwrap();

        let node_id = {
            let mut store = NodeStore::open(dir.path()).unwrap();
            store
                .create_node(2, &[(0, Value::Int64(999)), (1, Value::Int64(-1))])
                .unwrap()
        };

        // A fresh store instance must see the data the first one wrote.
        let reopened = NodeStore::open(dir.path()).unwrap();
        let vals = reopened.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(vals[0].1, Value::Int64(999));
        assert_eq!(vals[1].1, Value::Int64(-1));
    }

    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: three nodes occupy slots 0..=2.
        {
            let mut store = NodeStore::open(dir.path()).unwrap();
            for v in 1i64..=3 {
                store.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the HWM must have survived, so the next node gets slot 3.
        let mut store = NodeStore::open(dir.path()).unwrap();
        let n4 = store.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        assert_eq!(unpack_node_id(n4).1, 3);
    }

    #[test]
    fn test_node_different_labels_independent() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let a = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let b = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        // Each label keeps its own slot counter: both first nodes get slot 0
        // under their respective labels.
        assert_eq!(unpack_node_id(a), (10, 0));
        assert_eq!(unpack_node_id(b), (20, 0));
    }
}
1709}