// sparrowdb_storage/node_store.rs
1//! Node property storage.
2//!
3//! Nodes are stored as typed property columns. Each `(label_id, col_id)` pair
4//! maps to a flat binary file of fixed-width values. Valid `u64` values pack
5//! `(label_id: u32, slot: u32)` into a single `u64` node ID consistent with
6//! [`sparrowdb_common::NodeId`] semantics.
7//!
8//! ## File layout
9//!
10//! ```text
11//! nodes/{label_id}/col_{col_id}.bin
12//! ```
13//!
14//! Each column file is a flat array of `u64` LE values (one per slot).
15//! The high-water mark is tracked in-memory and written to a small header file:
16//!
17//! ```text
18//! nodes/{label_id}/hwm.bin — [hwm: u64 LE]
19//! ```
20//!
21//! ## Node ID packing
22//!
23//! ```text
24//! node_id = (label_id as u64) << 32 | slot as u64
25//! ```
26//!
27//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.
28
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::io::{Read, Seek, SeekFrom};
32use std::path::{Path, PathBuf};
33
34use sparrowdb_common::{Error, NodeId, Result};
35
36// ── Value type ────────────────────────────────────────────────────────────────
37
/// A typed property value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed 64-bit integer, stored as raw `u64` bits (two's-complement).
    Int64(i64),
    /// Raw byte blob. Values of ≤ 7 bytes are stored inline in the column
    /// word; longer blobs go to the overflow string heap via
    /// `NodeStore::encode_value` (SPA-212).
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float. Stored as 8 raw bytes in the overflow
    /// heap so that no bits are masked by the type-tag scheme (SPA-267).
    Float(f64),
}

/// Type tag embedded in the top byte (byte index 7 in LE) of a stored `u64`.
///
/// - `0x00` = `Int64` — lower 7 bytes hold the signed integer (56-bit range).
/// - `0x01` = `Bytes` — lower 7 bytes hold up to 7 bytes of inline string data.
/// - `0x02` = `BytesOverflow` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` (SPA-212).
/// - `0x03` = `Float` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` where 8 raw IEEE-754 bytes are stored (SPA-267).
///
/// The overflow encoding packs a heap pointer into 7 bytes:
///   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
///   bytes[5..7] = byte length (u16 LE, max 65535 bytes)
///
/// This lets `decode_raw_value` reconstruct strings of any length, fixing SPA-212.
const TAG_INT64: u8 = 0x00;
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
/// Using the heap ensures all 64 bits of the float are preserved without any masking.
const TAG_FLOAT: u8 = 0x03;
/// Maximum bytes that fit inline in the 7-byte payload (one byte is the tag).
const MAX_INLINE_BYTES: usize = 7;

impl Value {
    /// Encode as a packed `u64` for column storage.
    ///
    /// The top byte (byte 7 in little-endian) is a type tag; the remaining
    /// 7 bytes carry the payload. This allows `from_u64` to reconstruct the
    /// correct variant at read time (SPA-169).
    ///
    /// For `Bytes` values that exceed 7 bytes, this method only encodes the
    /// first 7 bytes inline. Callers that need full overflow support must use
    /// [`NodeStore::encode_value`] instead, which writes long strings to the
    /// heap and returns an overflow-tagged u64 (SPA-212).
    ///
    /// # Int64 range
    /// Only the lower 56 bits of the integer are stored. This covers all
    /// practical node IDs and numeric property values; very large i64 values
    /// (> 2^55 or < -2^55) would be truncated. Full 64-bit range is deferred
    /// to a later overflow encoding.
    ///
    /// # Panics
    /// Panics on `Value::Float`, which has no inline encoding — use
    /// [`NodeStore::encode_value`] (SPA-267).
    pub fn to_u64(&self) -> u64 {
        match self {
            Value::Int64(v) => {
                // Top byte = TAG_INT64 (0x00); lower 7 bytes = lower 56 bits of v.
                // For TAG_INT64 = 0x00 this is just the value masked to 56 bits,
                // which is a no-op for any i64 whose top byte is already 0x00.
                let payload = (*v as u64) & 0x00FF_FFFF_FFFF_FFFF;
                // Tag byte goes into byte 7 (the most significant byte in LE).
                payload | ((TAG_INT64 as u64) << 56)
            }
            Value::Bytes(b) => {
                let mut arr = [0u8; 8];
                arr[7] = TAG_BYTES; // type tag in top byte
                let len = b.len().min(MAX_INLINE_BYTES);
                arr[..len].copy_from_slice(&b[..len]);
                u64::from_le_bytes(arr)
            }
            Value::Float(_) => {
                // Float values require heap storage — callers must use
                // NodeStore::encode_value instead of Value::to_u64.
                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value");
            }
        }
    }

    /// Reconstruct a `Value` from a stored `u64`, using the top byte as a
    /// type tag (SPA-169).
    ///
    /// Only handles inline encodings (`TAG_INT64` and `TAG_BYTES`).
    /// For overflow strings (`TAG_BYTES_OVERFLOW`), use [`NodeStore::decode_raw_value`]
    /// which has access to the string heap (SPA-212).
    pub fn from_u64(v: u64) -> Self {
        let bytes = v.to_le_bytes(); // bytes[7] = top byte = tag
        match bytes[7] {
            TAG_BYTES => {
                // Inline bytes: bytes[0..7] hold the data, zero-padded on the
                // right. Trim only *trailing* zeros so interior NUL bytes
                // survive a round-trip. (Fix: the previous `take_while`
                // stopped at the FIRST zero byte, silently dropping anything
                // after an interior NUL, e.g. [1, 0, 2] decoded as [1].)
                // A payload whose last byte is genuinely 0x00 is still
                // indistinguishable from padding — inherent to this encoding.
                let end = bytes[..7]
                    .iter()
                    .rposition(|&b| b != 0)
                    .map_or(0, |i| i + 1);
                Value::Bytes(bytes[..end].to_vec())
            }
            _ => {
                // TAG_INT64 (0x00) or any unrecognised tag → Int64.
                // Sign-extend from 56 bits: shift left 8 to bring bit 55 into
                // sign position, then arithmetic shift right 8.
                let shifted = (v << 8) as i64;
                Value::Int64(shifted >> 8)
            }
        }
    }

    /// Reconstruct an `Int64` value from a stored `u64`.
    ///
    /// Preserved for callers that know the column type is always Int64 (e.g.
    /// pre-SPA-169 paths). New code should prefer `from_u64`.
    pub fn int64_from_u64(v: u64) -> Self {
        Value::Int64(v as i64)
    }
}
150
151// ── NodeStore ─────────────────────────────────────────────────────────────────
152
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin          — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin — flat u64 column array
/// {root}/strings.bin                       — overflow string heap (SPA-212)
/// ```
///
/// The overflow heap is an append-only byte file. Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` u64 stored in the column file.
///
/// NOTE(review): this type has no internal synchronization — its methods do
/// unsynchronized read-modify-write file I/O, so concurrent access appears
/// to require external serialization by the caller. Confirm with call sites.
pub struct NodeStore {
    /// Database root directory; all on-disk paths are derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label. Loaded lazily from disk.
    hwm: HashMap<u32, u64>,
    /// Labels whose in-memory HWM has been advanced but not yet persisted to
    /// `hwm.bin`. Flushed atomically by [`flush_hwms`] at transaction commit.
    hwm_dirty: HashSet<u32>,
}
173
174impl NodeStore {
175 /// Open (or create) a node store rooted at `db_root`.
176 pub fn open(db_root: &Path) -> Result<Self> {
177 Ok(NodeStore {
178 root: db_root.to_path_buf(),
179 hwm: HashMap::new(),
180 hwm_dirty: HashSet::new(),
181 })
182 }
183
184 // ── Internal helpers ──────────────────────────────────────────────────────
185
186 fn label_dir(&self, label_id: u32) -> PathBuf {
187 self.root.join("nodes").join(label_id.to_string())
188 }
189
190 fn hwm_path(&self, label_id: u32) -> PathBuf {
191 self.label_dir(label_id).join("hwm.bin")
192 }
193
194 fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
195 self.label_dir(label_id).join(format!("col_{col_id}.bin"))
196 }
197
198 /// Path to the null-bitmap sidecar for a column (SPA-207).
199 ///
200 /// Bit N = 1 means slot N has a real value (present/non-null).
201 /// Bit N = 0 means slot N was zero-padded (absent/null).
202 fn null_bitmap_path(&self, label_id: u32, col_id: u32) -> PathBuf {
203 self.label_dir(label_id)
204 .join(format!("col_{col_id}_null.bin"))
205 }
206
207 /// Mark slot `slot` as present (has a real value) in the null bitmap (SPA-207).
208 fn set_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<()> {
209 let path = self.null_bitmap_path(label_id, col_id);
210 if let Some(parent) = path.parent() {
211 fs::create_dir_all(parent).map_err(Error::Io)?;
212 }
213 let byte_idx = (slot / 8) as usize;
214 let bit_idx = slot % 8;
215 let mut bits = if path.exists() {
216 fs::read(&path).map_err(Error::Io)?
217 } else {
218 vec![]
219 };
220 if bits.len() <= byte_idx {
221 bits.resize(byte_idx + 1, 0);
222 }
223 bits[byte_idx] |= 1 << bit_idx;
224 fs::write(&path, &bits).map_err(Error::Io)
225 }
226
227 /// Returns `true` if slot `slot` has a real value (present/non-null) (SPA-207).
228 ///
229 /// Backward-compatible: if no bitmap file exists (old data written before
230 /// the null-bitmap fix), every slot is treated as present.
231 fn get_null_bit(&self, label_id: u32, col_id: u32, slot: u32) -> Result<bool> {
232 let path = self.null_bitmap_path(label_id, col_id);
233 if !path.exists() {
234 // No bitmap file → backward-compatible: treat all slots as present.
235 return Ok(true);
236 }
237 let bits = fs::read(&path).map_err(Error::Io)?;
238 let byte_idx = (slot / 8) as usize;
239 if byte_idx >= bits.len() {
240 return Ok(false);
241 }
242 Ok((bits[byte_idx] >> (slot % 8)) & 1 == 1)
243 }
244
245 /// Path to the overflow string heap (shared across all labels).
246 fn strings_bin_path(&self) -> PathBuf {
247 self.root.join("strings.bin")
248 }
249
250 // ── Overflow string heap (SPA-212) ────────────────────────────────────────
251
252 /// Append `bytes` to the overflow string heap and return an
253 /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
254 ///
255 /// Layout of the returned `u64` (little-endian bytes):
256 /// bytes[0..5] = heap byte offset as u40 LE (max ~1 TiB)
257 /// bytes[5..7] = byte length as u16 LE (max 65 535 bytes)
258 /// bytes[7] = `TAG_BYTES_OVERFLOW` (0x02)
259 fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
260 use std::io::{Seek, SeekFrom, Write};
261 let path = self.strings_bin_path();
262 let mut file = fs::OpenOptions::new()
263 .create(true)
264 .truncate(false)
265 .append(false)
266 .read(true)
267 .write(true)
268 .open(&path)
269 .map_err(Error::Io)?;
270
271 // The heap offset is the current end of the file.
272 let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
273 file.write_all(bytes).map_err(Error::Io)?;
274
275 // Encode (offset, len) into a 7-byte payload.
276 let len = bytes.len() as u64;
277 debug_assert!(
278 offset <= 0x00FF_FFFF_FFFF_u64,
279 "string heap too large for 5-byte offset"
280 );
281 debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
282
283 let mut arr = [0u8; 8];
284 // Store offset in bytes[0..5] (40-bit LE).
285 arr[0] = offset as u8;
286 arr[1] = (offset >> 8) as u8;
287 arr[2] = (offset >> 16) as u8;
288 arr[3] = (offset >> 24) as u8;
289 arr[4] = (offset >> 32) as u8;
290 // Store len in bytes[5..7] (16-bit LE).
291 arr[5] = len as u8;
292 arr[6] = (len >> 8) as u8;
293 // Tag byte.
294 arr[7] = TAG_BYTES_OVERFLOW;
295 Ok(u64::from_le_bytes(arr))
296 }
297
298 /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
299 /// tagged `u64` produced by [`append_to_string_heap`].
300 ///
301 /// Uses `seek + read_exact` to load only the `len` bytes at `offset`,
302 /// avoiding a full file load regardless of `strings.bin` size (SPA-208).
303 fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
304 use std::io::{Read, Seek, SeekFrom};
305
306 let arr = tagged.to_le_bytes();
307 debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
308
309 // Decode (offset, len).
310 let offset = arr[0] as u64
311 | ((arr[1] as u64) << 8)
312 | ((arr[2] as u64) << 16)
313 | ((arr[3] as u64) << 24)
314 | ((arr[4] as u64) << 32);
315 let len = arr[5] as usize | ((arr[6] as usize) << 8);
316
317 let path = self.strings_bin_path();
318 let mut file = fs::File::open(&path).map_err(Error::Io)?;
319 file.seek(SeekFrom::Start(offset)).map_err(|e| {
320 Error::Corruption(format!("string heap seek failed (offset={offset}): {e}"))
321 })?;
322 let mut buf = vec![0u8; len];
323 file.read_exact(&mut buf).map_err(|e| {
324 Error::Corruption(format!(
325 "string heap too short: need {} bytes at offset {offset}: {e}",
326 len
327 ))
328 })?;
329 Ok(buf)
330 }
331
332 // ── Value encode / decode with overflow support ───────────────────────────
333
334 /// Encode a `Value` for column storage, writing long `Bytes` strings to
335 /// the overflow heap (SPA-212).
336 ///
337 /// - `Int64` → identical to `Value::to_u64()`.
338 /// - `Bytes` ≤ 7 B → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
339 /// - `Bytes` > 7 B → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
340 /// - `Float` → 8 raw IEEE-754 bytes appended to `strings.bin`;
341 /// returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
342 pub fn encode_value(&self, val: &Value) -> Result<u64> {
343 match val {
344 Value::Int64(_) => Ok(val.to_u64()),
345 Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
346 Value::Bytes(b) => self.append_to_string_heap(b),
347 // SPA-267: store all 8 float bytes in the heap so no bits are masked.
348 // The heap pointer uses the same (offset: u40, len: u16) layout as
349 // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
350 Value::Float(f) => {
351 let bits = f.to_bits().to_le_bytes();
352 let heap_tagged = self.append_to_string_heap(&bits)?;
353 // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
354 let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
355 Ok((TAG_FLOAT as u64) << 56 | payload)
356 }
357 }
358 }
359
360 /// Decode a raw `u64` column value back to a `Value`, reading the
361 /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
362 ///
363 /// Handles all four tags:
364 /// - `TAG_INT64` → `Value::Int64`
365 /// - `TAG_BYTES` → `Value::Bytes` (inline, ≤ 7 bytes)
366 /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
367 /// - `TAG_FLOAT` → `Value::Float` (8 raw IEEE-754 bytes from heap)
368 pub fn decode_raw_value(&self, raw: u64) -> Value {
369 let tag = (raw >> 56) as u8;
370 match tag {
371 TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
372 Ok(bytes) => Value::Bytes(bytes),
373 Err(e) => {
374 // Corruption fallback: return empty bytes and log.
375 eprintln!(
376 "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
377 );
378 Value::Bytes(Vec::new())
379 }
380 },
381 // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
382 // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
383 // so read_from_string_heap can locate the bytes.
384 TAG_FLOAT => {
385 let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
386 let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
387 match self.read_from_string_heap(heap_tagged) {
388 Ok(bytes) if bytes.len() == 8 => {
389 let arr: [u8; 8] = bytes.try_into().unwrap();
390 Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
391 }
392 Ok(bytes) => {
393 eprintln!(
394 "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
395 bytes.len()
396 );
397 Value::Float(f64::NAN)
398 }
399 Err(e) => {
400 eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
401 Value::Float(f64::NAN)
402 }
403 }
404 }
405 _ => Value::from_u64(raw),
406 }
407 }
408
409 /// Check whether a raw stored `u64` encodes a string equal to `s`.
410 ///
411 /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
412 /// encodings (SPA-212). Used by WHERE-clause and prop-filter comparison.
413 pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
414 let tag = (raw >> 56) as u8;
415 match tag {
416 TAG_BYTES => {
417 // Fast inline comparison: encode s the same way and compare u64s.
418 raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
419 }
420 TAG_BYTES_OVERFLOW => {
421 // Overflow: read from heap and compare bytes.
422 match self.read_from_string_heap(raw) {
423 Ok(bytes) => bytes == s.as_bytes(),
424 Err(_) => false,
425 }
426 }
427 _ => false, // INT64 or unknown — not a string
428 }
429 }
430
    /// Read the high-water mark for `label_id` from disk (or return 0).
    ///
    /// Recovery path (SPA-211): if `hwm.bin` is missing or corrupt but
    /// `hwm.bin.tmp` exists (leftover from a previous crashed write), we
    /// promote the tmp file to `hwm.bin` and use its value.
    ///
    /// Fallback order:
    /// 1. `hwm.bin` (canonical, written by [`save_hwm`]);
    /// 2. `hwm.bin.tmp` (promoted via rename on success);
    /// 3. column-file sizes via [`infer_hwm_from_cols`] (re-persisted on recovery);
    /// 4. `0` — a fresh label with no usable files at all.
    fn load_hwm(&self, label_id: u32) -> Result<u64> {
        let path = self.hwm_path(label_id);
        let tmp_path = self.hwm_tmp_path(label_id);

        // Try to read the canonical file first.
        // Tolerates files longer than 8 bytes (uses the first 8); anything
        // shorter — or unreadable — is treated as corrupt (None).
        let try_read = |p: &std::path::Path| -> Option<u64> {
            let bytes = fs::read(p).ok()?;
            if bytes.len() < 8 {
                return None;
            }
            Some(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
        };

        if path.exists() {
            match try_read(&path) {
                Some(v) => return Ok(v),
                None => {
                    // hwm.bin exists but is corrupt — fall through to tmp recovery.
                }
            }
        }

        // hwm.bin is absent or unreadable. Check for a tmp leftover.
        if tmp_path.exists() {
            if let Some(v) = try_read(&tmp_path) {
                // Promote: atomically rename tmp → canonical so the next open
                // is clean even if we crash again immediately here.
                // The rename result is deliberately ignored: a failed
                // promotion only repeats this recovery on the next open.
                let _ = fs::rename(&tmp_path, &path);
                return Ok(v);
            }
        }

        // Last resort: infer the HWM from the sizes of the column files.
        //
        // Each `col_{n}.bin` is a flat array of 8-byte u64 LE values, one per
        // slot. The number of slots written equals `file_len / 8`. If
        // `hwm.bin` is corrupt we can reconstruct the HWM as the maximum slot
        // count across all column files for this label.
        {
            let inferred = self.infer_hwm_from_cols(label_id);
            if inferred > 0 {
                // Persist the recovered HWM so the next open is clean.
                // Best-effort: an error here only delays re-inference.
                let _ = self.save_hwm(label_id, inferred);
                return Ok(inferred);
            }
        }

        // No usable file at all — fresh label, HWM is 0.
        Ok(0)
    }
486
487 /// Infer the high-water mark for `label_id` from the sizes of the column
488 /// files on disk. Returns 0 if no column files exist or none are readable.
489 ///
490 /// Each `col_{n}.bin` stores one u64 per slot, so `file_len / 8` gives the
491 /// slot count. We return the maximum over all columns.
492 fn infer_hwm_from_cols(&self, label_id: u32) -> u64 {
493 let dir = self.label_dir(label_id);
494 let read_dir = match fs::read_dir(&dir) {
495 Ok(rd) => rd,
496 Err(_) => return 0,
497 };
498 read_dir
499 .flatten()
500 .filter_map(|entry| {
501 let name = entry.file_name();
502 let name_str = name.to_string_lossy().into_owned();
503 // Only consider col_{n}.bin files.
504 name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
505 let meta = entry.metadata().ok()?;
506 Some(meta.len() / 8)
507 })
508 .max()
509 .unwrap_or(0)
510 }
511
512 /// Write the high-water mark for `label_id` to disk atomically.
513 ///
514 /// Strategy (SPA-211): write to `hwm.bin.tmp`, fsync, then rename to
515 /// `hwm.bin`. On POSIX, `rename(2)` is atomic, so a crash at any point
516 /// leaves either the old `hwm.bin` intact or the fully-written new one.
517 fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
518 use std::io::Write as _;
519
520 let path = self.hwm_path(label_id);
521 let tmp_path = self.hwm_tmp_path(label_id);
522
523 if let Some(parent) = path.parent() {
524 fs::create_dir_all(parent).map_err(Error::Io)?;
525 }
526
527 // Write to the tmp file and fsync before renaming.
528 {
529 let mut file = fs::File::create(&tmp_path).map_err(Error::Io)?;
530 file.write_all(&hwm.to_le_bytes()).map_err(Error::Io)?;
531 file.sync_all().map_err(Error::Io)?;
532 }
533
534 // Atomic rename: on success the old hwm.bin is replaced in one syscall.
535 fs::rename(&tmp_path, &path).map_err(Error::Io)
536 }
537
538 fn hwm_tmp_path(&self, label_id: u32) -> PathBuf {
539 self.label_dir(label_id).join("hwm.bin.tmp")
540 }
541
542 /// Persist all dirty in-memory HWMs to disk atomically.
543 ///
544 /// Called **once per transaction commit** rather than once per node creation,
545 /// so that bulk imports do not incur one fsync per node (SPA-217 regression fix).
546 ///
547 /// Each dirty label's HWM is written via the same tmp+fsync+rename strategy
548 /// used by [`save_hwm`], preserving the SPA-211 crash-safety guarantee.
549 /// After all writes succeed the dirty set is cleared.
550 pub fn flush_hwms(&mut self) -> Result<()> {
551 let dirty: Vec<u32> = self.hwm_dirty.iter().copied().collect();
552 for label_id in dirty {
553 let hwm = match self.hwm.get(&label_id) {
554 Some(&v) => v,
555 None => continue,
556 };
557 self.save_hwm(label_id, hwm)?;
558 }
559 self.hwm_dirty.clear();
560 Ok(())
561 }
562
563 /// Append a `u64` value to a column file.
564 fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
565 use std::io::{Seek, SeekFrom, Write};
566 let path = self.col_path(label_id, col_id);
567 if let Some(parent) = path.parent() {
568 fs::create_dir_all(parent).map_err(Error::Io)?;
569 }
570 // Open without truncation so we can inspect the current length.
571 let mut file = fs::OpenOptions::new()
572 .create(true)
573 .truncate(false)
574 .read(true)
575 .write(true)
576 .open(&path)
577 .map_err(Error::Io)?;
578
579 // Pad with zeros for any slots that were skipped (sparse write pattern).
580 // Without padding, a later slot's value would be written at offset 0,
581 // causing earlier slots to incorrectly read that value.
582 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
583 let expected_offset = slot as u64 * 8;
584 if existing_len < expected_offset {
585 file.seek(SeekFrom::Start(existing_len))
586 .map_err(Error::Io)?;
587 const CHUNK: usize = 65536;
588 let zeros = [0u8; CHUNK];
589 let mut remaining = (expected_offset - existing_len) as usize;
590 while remaining > 0 {
591 let n = remaining.min(CHUNK);
592 file.write_all(&zeros[..n]).map_err(Error::Io)?;
593 remaining -= n;
594 }
595 }
596
597 // Seek to the correct slot position and write the value.
598 file.seek(SeekFrom::Start(expected_offset))
599 .map_err(Error::Io)?;
600 file.write_all(&value.to_le_bytes()).map_err(Error::Io)
601 }
602
603 /// Read the `u64` stored at `slot` in the given column file.
604 ///
605 /// Returns `Ok(0)` when the column file does not exist yet — a missing file
606 /// means no value has ever been written for this `(label_id, col_id)` pair,
607 /// which is represented as the zero bit-pattern (SPA-166).
608 fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
609 let path = self.col_path(label_id, col_id);
610 let bytes = match fs::read(&path) {
611 Ok(b) => b,
612 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
613 Err(e) => return Err(Error::Io(e)),
614 };
615 let offset = slot as usize * 8;
616 if bytes.len() < offset + 8 {
617 return Err(Error::NotFound);
618 }
619 Ok(u64::from_le_bytes(
620 bytes[offset..offset + 8].try_into().unwrap(),
621 ))
622 }
623
624 // ── Public API ────────────────────────────────────────────────────────────
625
626 /// Return the high-water mark (slot count) for a label.
627 ///
628 /// Returns `0` if no nodes have been created for that label yet.
629 pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
630 if let Some(&h) = self.hwm.get(&label_id) {
631 return Ok(h);
632 }
633 self.load_hwm(label_id)
634 }
635
    /// Discover all column IDs that currently exist on disk for `label_id`.
    ///
    /// Scans the label directory for `col_{id}.bin` files and returns the
    /// parsed `col_id` values, sorted ascending. Used by `create_node` to
    /// zero-pad columns that are not supplied for a new node (SPA-187).
    ///
    /// Returns `Err` when the directory exists but cannot be read (e.g.
    /// permissions failure or I/O error). A missing directory is not an
    /// error — it simply means no nodes of this label have been created yet.
    pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
        self.existing_col_ids(label_id)
    }
648
649 fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
650 let dir = self.label_dir(label_id);
651 let read_dir = match fs::read_dir(&dir) {
652 Ok(rd) => rd,
653 // Directory does not exist yet → no columns on disk.
654 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
655 Err(e) => return Err(Error::Io(e)),
656 };
657 let mut ids: Vec<u32> = read_dir
658 .flatten()
659 .filter_map(|entry| {
660 let name = entry.file_name();
661 let name_str = name.to_string_lossy().into_owned();
662 // Match "col_{col_id}.bin" filenames.
663 let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
664 id_str.parse::<u32>().ok()
665 })
666 .collect();
667 ids.sort_unstable();
668 Ok(ids)
669 }
670
    /// Return the **on-disk** high-water mark for a label, bypassing any
    /// in-memory advances made by `peek_next_slot`.
    ///
    /// Re-reads `hwm.bin` on every call (and may trigger the SPA-211 tmp-file
    /// recovery inside [`load_hwm`]); the in-memory cache is neither consulted
    /// nor updated.
    ///
    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
    /// that have actually been persisted.
    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
        self.load_hwm(label_id)
    }
679
680 /// Reserve the slot index that the *next* `create_node` call will use for
681 /// `label_id`, advancing the in-memory HWM so that the slot is not
682 /// assigned again within the same [`NodeStore`] instance.
683 ///
684 /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
685 /// before the actual disk write, so the ID can be returned to the caller
686 /// while the write is deferred until commit (SPA-181).
687 ///
688 /// The on-disk HWM is **not** updated here; it is updated when the
689 /// buffered `NodeCreate` operation is applied in `commit()`.
690 pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
691 // Load from disk if not cached yet.
692 if !self.hwm.contains_key(&label_id) {
693 let h = self.load_hwm(label_id)?;
694 self.hwm.insert(label_id, h);
695 }
696 let h = *self.hwm.get(&label_id).unwrap();
697 // Advance the in-memory HWM so a subsequent peek returns the next slot.
698 self.hwm.insert(label_id, h + 1);
699 Ok(h as u32)
700 }
701
    /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
    ///
    /// Like [`create_node`] but uses the caller-specified `slot` index instead
    /// of deriving it from the HWM. Used by [`WriteTx::commit`] to flush
    /// buffered node-create operations in the exact order they were issued,
    /// with slots that were already pre-allocated by [`peek_next_slot`].
    ///
    /// Advances the in-memory HWM to `slot + 1` (or higher if already past
    /// that) and marks the label dirty; the on-disk HWM is persisted later by
    /// [`flush_hwms`].
    ///
    /// # Errors
    /// Propagates encode/write errors. On partial failure every touched
    /// column file is truncated back to its pre-call size (best-effort
    /// rollback; a failed rollback is logged, not raised).
    pub fn create_node_at_slot(
        &mut self,
        label_id: u32,
        slot: u32,
        props: &[(u32, Value)],
    ) -> Result<NodeId> {
        // Snapshot original column sizes for rollback on partial failure.
        // A missing file is recorded as size 0, so rollback truncates a
        // newly-created file back to empty.
        let original_sizes: Vec<(u32, u64)> = props
            .iter()
            .map(|&(col_id, _)| {
                let path = self.col_path(label_id, col_id);
                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                (col_id, size)
            })
            .collect();

        // Run all writes inside a closure so any error can be intercepted
        // below for the rollback pass.
        let write_result = (|| {
            for &(col_id, ref val) in props {
                self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
                // Mark this slot as present in the null bitmap (SPA-207).
                self.set_null_bit(label_id, col_id, slot)?;
            }
            Ok::<(), sparrowdb_common::Error>(())
        })();

        if let Err(e) = write_result {
            // Best-effort rollback: truncate every snapshotted column file so
            // a partially-written slot does not leak into later reads.
            for (col_id, original_size) in &original_sizes {
                let path = self.col_path(label_id, *col_id);
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(&path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            return Err(e);
        }

        // Advance the in-memory HWM to at least slot + 1 and mark the label
        // dirty so that flush_hwms() will persist it at commit boundary.
        //
        // We do NOT call save_hwm here. The caller (WriteTx::commit) is
        // responsible for calling flush_hwms() once after all PendingOp::NodeCreate
        // entries have been applied. This avoids one fsync per node during bulk
        // imports (SPA-217 regression fix) while preserving crash-safety: the WAL
        // record is already durable at this point, so recovery can reconstruct the
        // HWM if we crash before flush_hwms() completes.
        //
        // NOTE: peek_next_slot() may have already advanced self.hwm to slot+1,
        // so new_hwm > mem_hwm might be false. We mark the label dirty
        // unconditionally so that flush_hwms() always writes through to disk.
        let new_hwm = slot as u64 + 1;
        let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
        if new_hwm > mem_hwm {
            self.hwm.insert(label_id, new_hwm);
        }
        // Always mark dirty — flush_hwms() must write the post-commit HWM even
        // if peek_next_slot already set mem HWM == new_hwm.
        self.hwm_dirty.insert(label_id);

        // Pack (label_id, slot) into the u64 NodeId: high 32 bits = label,
        // low 32 bits = slot (see module docs).
        Ok(NodeId((label_id as u64) << 32 | slot as u64))
    }
780
781 /// Batch-write column data for multiple nodes created in a single transaction
782 /// commit (SPA-212 write-amplification fix).
783 ///
784 /// # Why this exists
785 ///
786 /// The naive path calls `create_node_at_slot` per node, which opens and
787 /// closes every column file once per node. For a transaction that creates
788 /// `N` nodes each with `C` columns, that is `O(N × C)` file-open/close
789 /// syscalls.
790 ///
791 /// This method instead:
792 /// 1. Accepts pre-encoded `(label_id, col_id, slot, raw_value, is_present)`
793 /// tuples from the caller (value encoding happens in `commit()` before
794 /// the call).
795 /// 2. Sorts by `(label_id, col_id)` so all writes to the same column file
796 /// are contiguous.
797 /// 3. Opens each `(label_id, col_id)` file exactly **once**, writes all
798 /// slots for that column, then closes it — reducing file opens to
799 /// `O(labels × cols)`.
800 /// 4. Updates each null-bitmap file once per `(label_id, col_id)` group.
801 ///
802 /// HWM advances are applied for every `(label_id, slot)` in `node_slots`,
803 /// exactly as `create_node_at_slot` would do them. `node_slots` must
804 /// include **all** created nodes — including those with zero properties —
805 /// so that the HWM is advanced even for property-less nodes.
806 ///
807 /// # Rollback
808 ///
809 /// On I/O failure the method truncates every file that was opened back to
810 /// its pre-call size, matching the rollback contract of
811 /// `create_node_at_slot`.
    pub fn batch_write_node_creates(
        &mut self,
        // (label_id, col_id, slot, raw_u64, is_present)
        mut writes: Vec<(u32, u32, u32, u64, bool)>,
        // All (label_id, slot) pairs for every created node, including those
        // with zero properties.
        node_slots: &[(u32, u32)],
    ) -> Result<()> {
        use std::io::{Seek, SeekFrom, Write};

        // Nothing to write and no HWM to advance — trivially done.
        if writes.is_empty() && node_slots.is_empty() {
            return Ok(());
        }

        // Sort by (label_id, col_id, slot) so all writes to the same file are
        // contiguous.
        writes.sort_unstable_by_key(|&(lid, cid, slot, _, _)| (lid, cid, slot));

        // Snapshot original sizes for rollback. We only need one entry per
        // (label_id, col_id) pair. After the sort above all rows for a given
        // (label_id, col_id) are adjacent, so deduping via `prev_key` is
        // sufficient. A file that does not exist yet snapshots as size 0.
        let mut original_sizes: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
        {
            let mut prev_key: Option<(u32, u32)> = None;
            for &(lid, cid, _, _, _) in &writes {
                let key = (lid, cid);
                if prev_key == Some(key) {
                    continue;
                }
                prev_key = Some(key);
                let path = self.col_path(lid, cid);
                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                original_sizes.push((lid, cid, path, original));
            }
        }
        // Also snapshot null-bitmap files (one per col group).
        // NOTE(review): rollback below restores only the bitmap *length* via
        // `set_len`; bits flipped within the original length are not cleared
        // on failure — confirm whether a partial failure can leave stale
        // present-bits behind.
        let mut bitmap_originals: Vec<(u32, u32, PathBuf, u64)> = Vec::new();
        {
            let mut prev_key: Option<(u32, u32)> = None;
            for &(lid, cid, _, _, is_present) in &writes {
                if !is_present {
                    continue;
                }
                let key = (lid, cid);
                if prev_key == Some(key) {
                    continue;
                }
                prev_key = Some(key);
                let path = self.null_bitmap_path(lid, cid);
                let original = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
                bitmap_originals.push((lid, cid, path, original));
            }
        }

        // All file I/O happens inside this closure so a single `?`-style
        // failure can be caught below and trigger the rollback path.
        let write_result = (|| -> Result<()> {
            let mut i = 0;
            while i < writes.len() {
                let (lid, cid, _, _, _) = writes[i];

                // Find the end of this (label_id, col_id) group.
                let group_start = i;
                while i < writes.len() && writes[i].0 == lid && writes[i].1 == cid {
                    i += 1;
                }
                let group = &writes[group_start..i];

                // ── Column file ──────────────────────────────────────────────
                // Opened exactly once per (label_id, col_id) group; never
                // truncated because we only append/overwrite specific slots.
                let path = self.col_path(lid, cid);
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent).map_err(Error::Io)?;
                }
                let mut file = fs::OpenOptions::new()
                    .create(true)
                    .truncate(false)
                    .read(true)
                    .write(true)
                    .open(&path)
                    .map_err(Error::Io)?;

                // Seek-to-end doubles as a length query.
                let mut current_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;

                for &(_, _, slot, raw_value, _) in group {
                    let expected_offset = slot as u64 * 8;
                    // Pad with zeros for any skipped slots. Chunked writes
                    // avoid one huge allocation when the gap is large.
                    if current_len < expected_offset {
                        file.seek(SeekFrom::Start(current_len)).map_err(Error::Io)?;
                        const CHUNK: usize = 65536;
                        let zeros = [0u8; CHUNK];
                        let mut remaining = (expected_offset - current_len) as usize;
                        while remaining > 0 {
                            let n = remaining.min(CHUNK);
                            file.write_all(&zeros[..n]).map_err(Error::Io)?;
                            remaining -= n;
                        }
                        current_len = expected_offset;
                    }
                    // Seek to exact slot and write.
                    file.seek(SeekFrom::Start(expected_offset))
                        .map_err(Error::Io)?;
                    file.write_all(&raw_value.to_le_bytes())
                        .map_err(Error::Io)?;
                    // Advance current_len to reflect what we wrote.
                    let after = expected_offset + 8;
                    if after > current_len {
                        current_len = after;
                    }
                }

                // ── Null bitmap (one read-modify-write per col group) ────────
                // Only slots with a real value get a present-bit; zero-padded
                // slots stay absent so they read back as NULL.
                let present_slots: Vec<u32> = group
                    .iter()
                    .filter(|&&(_, _, _, _, is_present)| is_present)
                    .map(|&(_, _, slot, _, _)| slot)
                    .collect();

                if !present_slots.is_empty() {
                    let bmap_path = self.null_bitmap_path(lid, cid);
                    if let Some(parent) = bmap_path.parent() {
                        fs::create_dir_all(parent).map_err(Error::Io)?;
                    }
                    let mut bits = if bmap_path.exists() {
                        fs::read(&bmap_path).map_err(Error::Io)?
                    } else {
                        vec![]
                    };
                    for slot in present_slots {
                        let byte_idx = (slot / 8) as usize;
                        let bit_idx = slot % 8;
                        if bits.len() <= byte_idx {
                            bits.resize(byte_idx + 1, 0);
                        }
                        bits[byte_idx] |= 1 << bit_idx;
                    }
                    fs::write(&bmap_path, &bits).map_err(Error::Io)?;
                }
            }
            Ok(())
        })();

        if let Err(e) = write_result {
            // Roll back column files. Column files only grow during this
            // call, so truncating back to the snapshot size fully undoes them.
            for (_, _, path, original_size) in &original_sizes {
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            // Roll back null bitmap files (length only — see NOTE above).
            for (_, _, path, original_size) in &bitmap_originals {
                if path.exists() {
                    if let Err(rollback_err) = fs::OpenOptions::new()
                        .write(true)
                        .open(path)
                        .and_then(|f| f.set_len(*original_size))
                    {
                        eprintln!(
                            "CRITICAL: Failed to roll back null bitmap file {} to size {}: {}. Data may be corrupt.",
                            path.display(),
                            original_size,
                            rollback_err
                        );
                    }
                }
            }
            return Err(e);
        }

        // Advance HWMs using the explicit node_slots list so that nodes with
        // zero properties also advance the HWM, matching the contract of
        // create_node_at_slot.
        // NOTE(review): unlike `create_node`, this path never calls
        // `load_hwm`; if a label's HWM is not already cached in `self.hwm`,
        // `unwrap_or(0)` could let a smaller value stand in for a larger
        // on-disk HWM — confirm callers always preload HWMs before batching.
        for &(lid, slot) in node_slots {
            let new_hwm = slot as u64 + 1;
            let mem_hwm = self.hwm.get(&lid).copied().unwrap_or(0);
            if new_hwm > mem_hwm {
                self.hwm.insert(lid, new_hwm);
            }
            self.hwm_dirty.insert(lid);
        }

        Ok(())
    }
1002
1003 /// Create a new node in `label_id` with the given properties.
1004 ///
1005 /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
1006 ///
1007 /// ## Slot alignment guarantee (SPA-187)
1008 ///
1009 /// Every column file for `label_id` must have exactly `node_count * 8`
1010 /// bytes so that slot N always refers to node N across all columns. When
1011 /// a node is created without a value for an already-known column, that
1012 /// column file is zero-padded to `(slot + 1) * 8` bytes. The zero
1013 /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
1014 /// surfaces as `Value::Null` in query results.
1015 pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
1016 // Load or get cached hwm.
1017 let hwm = if let Some(h) = self.hwm.get(&label_id) {
1018 *h
1019 } else {
1020 let h = self.load_hwm(label_id)?;
1021 self.hwm.insert(label_id, h);
1022 h
1023 };
1024
1025 let slot = hwm as u32;
1026
1027 // Collect the set of col_ids supplied for this node.
1028 let supplied_col_ids: std::collections::HashSet<u32> =
1029 props.iter().map(|&(col_id, _)| col_id).collect();
1030
1031 // Discover all columns already on disk for this label. Any that are
1032 // NOT in `supplied_col_ids` must be zero-padded so their slot count
1033 // stays in sync with the HWM (SPA-187).
1034 let existing_col_ids = self.existing_col_ids(label_id)?;
1035
1036 // Columns that need zero-padding: exist on disk but not in this node's props.
1037 let cols_to_zero_pad: Vec<u32> = existing_col_ids
1038 .iter()
1039 .copied()
1040 .filter(|col_id| !supplied_col_ids.contains(col_id))
1041 .collect();
1042
1043 // Build the full list of columns to touch for rollback tracking:
1044 // supplied columns + columns that need zero-padding.
1045 let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
1046 .iter()
1047 .copied()
1048 .chain(cols_to_zero_pad.iter().copied())
1049 .collect();
1050
1051 // Snapshot the original size of each column file so we can roll back
1052 // on partial failure. A column that does not yet exist has size 0.
1053 let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
1054 .iter()
1055 .map(|&col_id| {
1056 let path = self.col_path(label_id, col_id);
1057 let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1058 (col_id, size)
1059 })
1060 .collect();
1061
1062 // Write each property column. For columns not in `props`, write a
1063 // zero-padded entry (the zero sentinel means "absent" / NULL).
1064 // On failure, roll back all columns that were already written to avoid
1065 // slot misalignment.
1066 let write_result = (|| {
1067 // Write supplied columns with their actual values.
1068 for &(col_id, ref val) in props {
1069 self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
1070 // Mark this slot as present in the null bitmap (SPA-207).
1071 self.set_null_bit(label_id, col_id, slot)?;
1072 }
1073 // Zero-pad existing columns that were NOT supplied for this node.
1074 // Do NOT set null bitmap bits for these — they remain absent/null.
1075 for &col_id in &cols_to_zero_pad {
1076 self.append_col(label_id, col_id, slot, 0u64)?;
1077 }
1078 Ok::<(), sparrowdb_common::Error>(())
1079 })();
1080
1081 if let Err(e) = write_result {
1082 // Truncate each column back to its original size.
1083 for (col_id, original_size) in &original_sizes {
1084 let path = self.col_path(label_id, *col_id);
1085 if path.exists() {
1086 if let Err(rollback_err) = fs::OpenOptions::new()
1087 .write(true)
1088 .open(&path)
1089 .and_then(|f| f.set_len(*original_size))
1090 {
1091 eprintln!(
1092 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
1093 path.display(),
1094 original_size,
1095 rollback_err
1096 );
1097 }
1098 }
1099 }
1100 return Err(e);
1101 }
1102
1103 // Update hwm.
1104 let new_hwm = hwm + 1;
1105 self.save_hwm(label_id, new_hwm)?;
1106 *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
1107
1108 // Pack node ID.
1109 let node_id = ((label_id as u64) << 32) | (slot as u64);
1110 Ok(NodeId(node_id))
1111 }
1112
1113 /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
1114 ///
1115 /// Creates `col_0.bin` (and its parent directory) if it does not exist,
1116 /// zero-padding all preceding slots. This ensures that nodes which were
1117 /// created without any `col_0` property are still properly marked as deleted
1118 /// and become invisible to subsequent scans.
1119 ///
1120 /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
1121 pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
1122 use std::io::{Seek, SeekFrom, Write};
1123 let label_id = (node_id.0 >> 32) as u32;
1124 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1125 let col0 = self.col_path(label_id, 0);
1126
1127 // Ensure the parent directory exists.
1128 if let Some(parent) = col0.parent() {
1129 fs::create_dir_all(parent).map_err(Error::Io)?;
1130 }
1131
1132 let mut f = fs::OpenOptions::new()
1133 .create(true)
1134 .truncate(false)
1135 .read(true)
1136 .write(true)
1137 .open(&col0)
1138 .map_err(Error::Io)?;
1139
1140 // Zero-pad any slots before `slot` that are not yet in the file.
1141 let needed_len = (slot as u64 + 1) * 8;
1142 let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1143 if existing_len < needed_len {
1144 let zeros = vec![0u8; (needed_len - existing_len) as usize];
1145 f.write_all(&zeros).map_err(Error::Io)?;
1146 }
1147
1148 // Seek to the slot and write the tombstone value.
1149 f.seek(SeekFrom::Start(slot as u64 * 8))
1150 .map_err(Error::Io)?;
1151 f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
1152 }
1153
1154 /// Overwrite the value of a single column for an existing node.
1155 ///
1156 /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
1157 /// 8-byte little-endian value in-place. Returns `Err(NotFound)` if the
1158 /// slot does not exist yet.
1159 pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1160 use std::io::{Seek, SeekFrom, Write};
1161 let label_id = (node_id.0 >> 32) as u32;
1162 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1163 let path = self.col_path(label_id, col_id);
1164 let mut file = fs::OpenOptions::new()
1165 .write(true)
1166 .open(&path)
1167 .map_err(|_| Error::NotFound)?;
1168 let offset = slot as u64 * 8;
1169 file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
1170 file.write_all(&self.encode_value(value)?.to_le_bytes())
1171 .map_err(Error::Io)
1172 }
1173
1174 /// Write or create a column value for a node, creating and zero-padding the
1175 /// column file if it does not yet exist.
1176 ///
1177 /// Unlike [`set_node_col`], this method creates the column file and fills all
1178 /// slots from 0 to `slot - 1` with zeros before writing the target value.
1179 /// This supports adding new property columns to existing nodes (Phase 7
1180 /// `set_property` semantics).
1181 pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
1182 use std::io::{Seek, SeekFrom, Write};
1183 let label_id = (node_id.0 >> 32) as u32;
1184 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1185 let path = self.col_path(label_id, col_id);
1186
1187 // Ensure parent directory exists.
1188 if let Some(parent) = path.parent() {
1189 fs::create_dir_all(parent).map_err(Error::Io)?;
1190 }
1191
1192 // Open the file (create if absent), then pad with zeros up to and
1193 // including the target slot, and finally seek back to write the value.
1194 // We must NOT truncate: we only update a specific slot, not the whole file.
1195 let mut file = fs::OpenOptions::new()
1196 .create(true)
1197 .truncate(false)
1198 .read(true)
1199 .write(true)
1200 .open(&path)
1201 .map_err(Error::Io)?;
1202
1203 // How many bytes are already in the file?
1204 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
1205 let needed_len = (slot as u64 + 1) * 8;
1206 if existing_len < needed_len {
1207 // Extend file with zeros from existing_len to needed_len.
1208 // Write in fixed-size chunks to avoid a single large allocation
1209 // that could OOM when the node slot ID is very high.
1210 file.seek(SeekFrom::Start(existing_len))
1211 .map_err(Error::Io)?;
1212 const CHUNK: usize = 65536; // 64 KiB
1213 let zeros = [0u8; CHUNK];
1214 let mut remaining = (needed_len - existing_len) as usize;
1215 while remaining > 0 {
1216 let n = remaining.min(CHUNK);
1217 file.write_all(&zeros[..n]).map_err(Error::Io)?;
1218 remaining -= n;
1219 }
1220 }
1221
1222 // Seek to target slot and overwrite.
1223 file.seek(SeekFrom::Start(slot as u64 * 8))
1224 .map_err(Error::Io)?;
1225 file.write_all(&self.encode_value(value)?.to_le_bytes())
1226 .map_err(Error::Io)
1227 }
1228
1229 /// Retrieve all stored properties of a node.
1230 ///
1231 /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
1232 /// The caller knows the schema (col IDs) from the catalog.
1233 pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
1234 let label_id = (node_id.0 >> 32) as u32;
1235 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1236
1237 let mut result = Vec::with_capacity(col_ids.len());
1238 for &col_id in col_ids {
1239 let val = self.read_col_slot(label_id, col_id, slot)?;
1240 result.push((col_id, val));
1241 }
1242 Ok(result)
1243 }
1244
1245 /// Like [`get_node_raw`] but treats absent columns as `None` rather than
1246 /// propagating [`Error::NotFound`].
1247 ///
1248 /// A column is considered absent when:
1249 /// - Its column file does not exist (property never written for the label).
1250 /// - Its column file is shorter than `slot + 1` entries (sparse write —
1251 /// an earlier node never wrote this column; a later node that did write it
1252 /// padded the file, but this slot's value was never explicitly stored).
1253 ///
1254 /// This is the correct read path for IS NULL evaluation: absent properties
1255 /// must appear as `Value::Null`, not as an error or as integer 0.
1256 pub fn get_node_raw_nullable(
1257 &self,
1258 node_id: NodeId,
1259 col_ids: &[u32],
1260 ) -> Result<Vec<(u32, Option<u64>)>> {
1261 let label_id = (node_id.0 >> 32) as u32;
1262 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1263
1264 let mut result = Vec::with_capacity(col_ids.len());
1265 for &col_id in col_ids {
1266 let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
1267 result.push((col_id, opt_val));
1268 }
1269 Ok(result)
1270 }
1271
1272 /// Read a single column slot, returning `None` when the column was never
1273 /// written for this node (file absent or slot out of bounds / not set).
1274 ///
1275 /// Unlike [`read_col_slot`], this function distinguishes between:
1276 /// - Column file does not exist → `None` (property never set on any node).
1277 /// - Slot is beyond the file end → `None` (property not set on this node).
1278 /// - Null bitmap says slot is absent → `None` (slot was zero-padded for alignment).
1279 /// - Null bitmap says slot is present → `Some(raw)` (real value, may be 0 for Int64(0)).
1280 ///
1281 /// The null-bitmap sidecar (`col_{id}_null.bin`) is used to distinguish
1282 /// legitimately-stored 0 values (e.g. `Int64(0)`) from absent/zero-padded
1283 /// slots (SPA-207). Backward compat: if no bitmap file exists, all slots
1284 /// within the file are treated as present.
1285 fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
1286 let path = self.col_path(label_id, col_id);
1287 let bytes = match fs::read(&path) {
1288 Ok(b) => b,
1289 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
1290 Err(e) => return Err(Error::Io(e)),
1291 };
1292 let offset = slot as usize * 8;
1293 if bytes.len() < offset + 8 {
1294 return Ok(None);
1295 }
1296 // Use the null-bitmap sidecar to determine whether this slot has a real
1297 // value. This replaces the old raw==0 sentinel which incorrectly treated
1298 // Int64(0) as absent (SPA-207).
1299 if !self.get_null_bit(label_id, col_id, slot)? {
1300 return Ok(None);
1301 }
1302 let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1303 Ok(Some(raw))
1304 }
1305
1306 /// Retrieve the typed property values for a node.
1307 ///
1308 /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
1309 /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
1310 pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
1311 let raw = self.get_node_raw(node_id, col_ids)?;
1312 Ok(raw
1313 .into_iter()
1314 .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
1315 .collect())
1316 }
1317
1318 /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
1319 /// flat `Vec<u64>`. Returns an empty vec when the file does not exist yet.
1320 ///
1321 /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
1322 /// all slot values in one `fs::read` call rather than one `read_col_slot`
1323 /// per node, making index construction O(n) rather than O(n * syscall-overhead).
1324 pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
1325 let path = self.col_path(label_id, col_id);
1326 let bytes = match fs::read(&path) {
1327 Ok(b) => b,
1328 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1329 Err(e) => return Err(Error::Io(e)),
1330 };
1331 // Interpret the byte slice as a flat array of little-endian u64s.
1332 let count = bytes.len() / 8;
1333 let mut out = Vec::with_capacity(count);
1334 for i in 0..count {
1335 let offset = i * 8;
1336 let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
1337 out.push(v);
1338 }
1339 Ok(out)
1340 }
1341
    /// Selective sorted-slot read — O(K) seeks instead of O(N) reads.
    ///
    /// For each column, opens the file once and reads only the `slots` needed,
    /// in slot-ascending order (for sequential-ish I/O). This is the hot path
    /// for hop queries where K neighbor slots ≪ N total nodes.
    ///
    /// `slots` **must** be pre-sorted ascending by the caller.
    /// Returns a `Vec` indexed parallel to `slots`; each inner `Vec` is indexed
    /// parallel to `col_ids`. Out-of-range or missing-file slots return 0.
    ///
    /// SPA-200: replaces full O(N) column loads with O(K) per-column seeks.
    fn read_col_slots_sorted(
        &self,
        label_id: u32,
        slots: &[u32],
        col_ids: &[u32],
    ) -> Result<Vec<Vec<u64>>> {
        if slots.is_empty() || col_ids.is_empty() {
            // Degenerate input: one zero-filled row per slot (empty overall
            // when `slots` is empty), preserving the parallel-index contract.
            return Ok(slots.iter().map(|_| vec![0u64; col_ids.len()]).collect());
        }

        // result[slot_idx][col_idx], pre-filled with the 0 default for
        // missing files / out-of-range slots.
        let mut result: Vec<Vec<u64>> = slots.iter().map(|_| vec![0u64; col_ids.len()]).collect();

        for (col_idx, &col_id) in col_ids.iter().enumerate() {
            let path = self.col_path(label_id, col_id);
            let mut file = match fs::File::open(&path) {
                Ok(f) => f,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    // Column file doesn't exist — all slots stay 0
                    continue;
                }
                Err(e) => return Err(Error::Io(e)),
            };

            // Seek-to-end doubles as a cheap length query.
            let file_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
            // Reset to start for sequential reads
            file.seek(SeekFrom::Start(0)).map_err(Error::Io)?;

            let mut buf = [0u8; 8];
            // Tracked file position: lets us skip the seek entirely when the
            // next requested slot is adjacent to the one just read (common
            // because the caller passes slots in ascending order).
            let mut current_pos: u64 = 0;

            for (slot_idx, &slot) in slots.iter().enumerate() {
                let target_pos = slot as u64 * 8;
                if target_pos + 8 > file_len {
                    // Slot beyond end of file — leave as 0
                    continue;
                }
                if target_pos != current_pos {
                    file.seek(SeekFrom::Start(target_pos)).map_err(Error::Io)?;
                    current_pos = target_pos;
                }
                file.read_exact(&mut buf).map_err(Error::Io)?;
                current_pos += 8;
                result[slot_idx][col_idx] = u64::from_le_bytes(buf);
            }
        }

        Ok(result)
    }
1403
1404 /// Batch-read multiple slots from multiple columns.
1405 ///
1406 /// Chooses between two strategies based on the K/N ratio:
1407 /// - **Sorted-slot reads** (SPA-200): when K ≪ N, seeks to each slot
1408 /// offset — O(K × C) I/O instead of O(N × C). Slots are sorted
1409 /// ascending before the read so seeks are sequential.
1410 /// - **Full-column load**: when K is close to N (>50% of column), reading
1411 /// the whole file is cheaper than many random seeks.
1412 ///
1413 /// All `slots` must belong to `label_id`.
1414 /// Returns a `Vec` indexed parallel to `slots`; inner `Vec` indexed
1415 /// parallel to `col_ids`. Missing/out-of-range slots return 0.
1416 pub fn batch_read_node_props(
1417 &self,
1418 label_id: u32,
1419 slots: &[u32],
1420 col_ids: &[u32],
1421 ) -> Result<Vec<Vec<u64>>> {
1422 if slots.is_empty() {
1423 return Ok(vec![]);
1424 }
1425
1426 // Determine the column high-water mark (total node count for this label).
1427 let hwm = self.hwm_for_label(label_id).unwrap_or(0) as usize;
1428
1429 // Use sorted-slot reads when K < 50% of N. For K=100, N=4039 that is
1430 // a ~40× reduction in bytes read per column. The HWM=0 guard handles
1431 // labels that have no nodes yet (shouldn't reach here, but be safe).
1432 let use_sorted = hwm == 0 || slots.len() * 2 < hwm;
1433
1434 if use_sorted {
1435 // Sort slots ascending for sequential I/O; keep a permutation index
1436 // so we can return results in the original caller order.
1437 let mut order: Vec<usize> = (0..slots.len()).collect();
1438 order.sort_unstable_by_key(|&i| slots[i]);
1439 let sorted_slots: Vec<u32> = order.iter().map(|&i| slots[i]).collect();
1440
1441 let sorted_result = self.read_col_slots_sorted(label_id, &sorted_slots, col_ids)?;
1442
1443 // Re-order back to the original slot order expected by the caller.
1444 let mut result = vec![vec![0u64; col_ids.len()]; slots.len()];
1445 for (sorted_idx, orig_idx) in order.into_iter().enumerate() {
1446 result[orig_idx] = sorted_result[sorted_idx].clone();
1447 }
1448 Ok(result)
1449 } else {
1450 // Fall back to full column load when most slots are needed anyway.
1451 let col_data: Vec<Vec<u64>> = col_ids
1452 .iter()
1453 .map(|&col_id| self.read_col_all(label_id, col_id))
1454 .collect::<Result<_>>()?;
1455 Ok(slots
1456 .iter()
1457 .map(|&slot| {
1458 col_ids
1459 .iter()
1460 .enumerate()
1461 .map(|(ci, _)| col_data[ci].get(slot as usize).copied().unwrap_or(0))
1462 .collect()
1463 })
1464 .collect())
1465 }
1466 }
1467}
1468
// ── Helpers ───────────────────────────────────────────────────────────────────
1470
1471/// Unpack `(label_id, slot)` from a [`NodeId`].
1472pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
1473 let label_id = (node_id.0 >> 32) as u32;
1474 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
1475 (label_id, slot)
1476}
1477
1478// ── Tests ─────────────────────────────────────────────────────────────────────
1479
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_node_create_and_get() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let label = 1u32;
        let node = store
            .create_node(label, &[(0u32, Value::Int64(42)), (1u32, Value::Int64(100))])
            .unwrap();

        // The packed ID carries the label in the high half and slot 0
        // (first node for this label) in the low half.
        assert_eq!(unpack_node_id(node), (label, 0));

        // Both column values round-trip intact.
        let got = store.get_node(node, &[0, 1]).unwrap();
        assert_eq!(got, vec![(0, Value::Int64(42)), (1, Value::Int64(100))]);
    }

    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let ids: Vec<NodeId> = (1i64..=3)
            .map(|v| store.create_node(1, &[(0, Value::Int64(v * 10))]).unwrap())
            .collect();

        // Slots are handed out sequentially: 0, 1, 2.
        for (expected_slot, &id) in ids.iter().enumerate() {
            assert_eq!(unpack_node_id(id).1, expected_slot as u32);
        }

        // Each node reads back its own value (10, 20, 30).
        for (i, &id) in ids.iter().enumerate() {
            assert_eq!(
                store.get_node(id, &[0]).unwrap()[0].1,
                Value::Int64((i as i64 + 1) * 10)
            );
        }
    }

    #[test]
    fn test_node_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: create a node, then drop the store.
        let node_id = {
            let mut store = NodeStore::open(dir.path()).unwrap();
            store
                .create_node(2, &[(0, Value::Int64(999)), (1, Value::Int64(-1))])
                .unwrap()
        };

        // Session 2: a fresh store must see the values written by the first.
        let reopened = NodeStore::open(dir.path()).unwrap();
        let vals = reopened.get_node(node_id, &[0, 1]).unwrap();
        assert_eq!(vals[0].1, Value::Int64(999));
        assert_eq!(vals[1].1, Value::Int64(-1));
    }

    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let dir = tempdir().unwrap();

        // Session 1: three nodes occupy slots 0..=2.
        {
            let mut store = NodeStore::open(dir.path()).unwrap();
            for v in 1i64..=3 {
                store.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the high-water mark must have been persisted, so the
        // next node lands in slot 3.
        let mut store = NodeStore::open(dir.path()).unwrap();
        let n4 = store.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        assert_eq!(unpack_node_id(n4).1, 3);
    }

    #[test]
    fn test_node_different_labels_independent() {
        let dir = tempdir().unwrap();
        let mut store = NodeStore::open(dir.path()).unwrap();

        let a = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let b = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        // Each label keeps its own slot counter, so both nodes get slot 0.
        assert_eq!(unpack_node_id(a), (10, 0));
        assert_eq!(unpack_node_id(b), (20, 0));
    }
}
1580}