sparrowdb_storage/node_store.rs
1//! Node property storage.
2//!
3//! Nodes are stored as typed property columns. Each `(label_id, col_id)` pair
4//! maps to a flat binary file of fixed-width values. Valid `u64` values pack
5//! `(label_id: u32, slot: u32)` into a single `u64` node ID consistent with
6//! [`sparrowdb_common::NodeId`] semantics.
7//!
8//! ## File layout
9//!
10//! ```text
//! nodes/{label_id}/col_{col_id}.bin
//! strings.bin — overflow heap for long strings and floats (SPA-212, SPA-267)
12//! ```
13//!
14//! Each column file is a flat array of `u64` LE values (one per slot).
15//! The high-water mark is tracked in-memory and written to a small header file:
16//!
17//! ```text
18//! nodes/{label_id}/hwm.bin — [hwm: u64 LE]
19//! ```
20//!
21//! ## Node ID packing
22//!
23//! ```text
24//! node_id = (label_id as u64) << 32 | slot as u64
25//! ```
26//!
27//! Upper 32 bits are `label_id`, lower 32 bits are the within-label slot number.
28
29use std::collections::HashMap;
30use std::fs;
31use std::path::{Path, PathBuf};
32
33use sparrowdb_common::{Error, NodeId, Result};
34
35// ── Value type ────────────────────────────────────────────────────────────────
36
/// A typed property value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Signed 64-bit integer, stored as raw `u64` bits (two's-complement).
    /// Only the lower 56 bits survive the inline encoding (see [`Value::to_u64`]).
    Int64(i64),
    /// Raw byte blob. Values ≤ 7 bytes are stored inline in the column `u64`
    /// (`TAG_BYTES`); longer blobs are written to the overflow heap via
    /// `NodeStore::encode_value` (`TAG_BYTES_OVERFLOW`, SPA-212).
    Bytes(Vec<u8>),
    /// IEEE-754 double-precision float. Stored as 8 raw bytes in the overflow
    /// heap so that no bits are masked by the type-tag scheme (SPA-267).
    Float(f64),
}

/// Type tag embedded in the top byte (byte index 7 in LE) of a stored `u64`.
///
/// - `0x00` = `Int64` — lower 7 bytes hold the signed integer (56-bit range).
/// - `0x01` = `Bytes` — lower 7 bytes hold up to 7 bytes of inline string data.
/// - `0x02` = `BytesOverflow` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` (SPA-212).
/// - `0x03` = `Float` — lower 7 bytes encode `(offset: u40 LE, len: u16 LE)`
///   pointing into `strings.bin` where 8 raw IEEE-754 bytes are stored (SPA-267).
///
/// The overflow encoding packs a heap pointer into 7 bytes:
///   bytes[0..5] = heap byte offset (u40 LE, max ~1 TiB)
///   bytes[5..7] = byte length (u16 LE, max 65535 bytes)
///
/// This lets `decode_raw_value` reconstruct strings of any length, fixing SPA-212.
const TAG_INT64: u8 = 0x00;
const TAG_BYTES: u8 = 0x01;
/// Tag for strings > 7 bytes stored in the overflow string heap (SPA-212).
const TAG_BYTES_OVERFLOW: u8 = 0x02;
/// Tag for f64 values stored as 8 raw IEEE-754 bytes in the overflow heap (SPA-267).
/// Using the heap ensures all 64 bits of the float are preserved without any masking.
const TAG_FLOAT: u8 = 0x03;
/// Maximum bytes that fit inline in the 7-byte payload (one byte is the tag).
const MAX_INLINE_BYTES: usize = 7;

impl Value {
    /// Encode as a packed `u64` for column storage.
    ///
    /// The top byte (byte 7 in little-endian) is a type tag; the remaining
    /// 7 bytes carry the payload. This allows `from_u64` to reconstruct the
    /// correct variant at read time (SPA-169).
    ///
    /// For `Bytes` values that exceed 7 bytes, this method only encodes the
    /// first 7 bytes inline. Callers that need full overflow support must use
    /// [`NodeStore::encode_value`] instead, which writes long strings to the
    /// heap and returns an overflow-tagged u64 (SPA-212).
    ///
    /// # Int64 range
    /// Only the lower 56 bits of the integer are stored. This covers all
    /// practical node IDs and numeric property values; very large i64 values
    /// (> 2^55 or < -2^55) would be truncated. Full 64-bit range is deferred
    /// to a later overflow encoding.
    ///
    /// # Panics
    /// Panics on `Value::Float`: floats are never inline-encodable, use
    /// [`NodeStore::encode_value`].
    pub fn to_u64(&self) -> u64 {
        match self {
            Value::Int64(v) => {
                // Top byte = TAG_INT64 (0x00); lower 7 bytes = lower 56 bits of v.
                // For TAG_INT64 = 0x00 this is just the value masked to 56 bits,
                // which is a no-op for any i64 whose top byte is already 0x00.
                let payload = (*v as u64) & 0x00FF_FFFF_FFFF_FFFF;
                // Tag byte goes into byte 7 (the most significant byte in LE).
                payload | ((TAG_INT64 as u64) << 56)
            }
            Value::Bytes(b) => {
                let mut arr = [0u8; 8];
                arr[7] = TAG_BYTES; // type tag in top byte
                let len = b.len().min(MAX_INLINE_BYTES);
                arr[..len].copy_from_slice(&b[..len]);
                u64::from_le_bytes(arr)
            }
            Value::Float(_) => {
                // Float values require heap storage — callers must use
                // NodeStore::encode_value instead of Value::to_u64.
                panic!("Value::Float cannot be inline-encoded; use NodeStore::encode_value");
            }
        }
    }

    /// Reconstruct a `Value` from a stored `u64`, using the top byte as a
    /// type tag (SPA-169).
    ///
    /// Only handles inline encodings (`TAG_INT64` and `TAG_BYTES`).
    /// For overflow strings (`TAG_BYTES_OVERFLOW`), use [`NodeStore::decode_raw_value`]
    /// which has access to the string heap (SPA-212).
    pub fn from_u64(v: u64) -> Self {
        let bytes = v.to_le_bytes(); // bytes[7] = top byte = tag
        match bytes[7] {
            TAG_BYTES => {
                // Inline blob: bytes[0..7] hold the data, zero-padded on the
                // right. Strip only *trailing* zeros — interior zero bytes are
                // legitimate payload. (The previous `take_while(!= 0)` cut the
                // value at the FIRST zero byte, corrupting blobs such as
                // [1, 0, 2].) Note the format is still inherently ambiguous for
                // payloads that genuinely end in zero bytes.
                let end = bytes[..7]
                    .iter()
                    .rposition(|&b| b != 0)
                    .map_or(0, |i| i + 1);
                Value::Bytes(bytes[..end].to_vec())
            }
            _ => {
                // TAG_INT64 (0x00) or any unrecognised tag → Int64.
                // Sign-extend from 56 bits: shift left 8 to bring bit 55 into
                // sign position, then arithmetic shift right 8.
                let shifted = (v << 8) as i64;
                Value::Int64(shifted >> 8)
            }
        }
    }

    /// Reconstruct an `Int64` value from a stored `u64`.
    ///
    /// Preserved for callers that know the column type is always Int64 (e.g.
    /// pre-SPA-169 paths). New code should prefer `from_u64`.
    pub fn int64_from_u64(v: u64) -> Self {
        Value::Int64(v as i64)
    }
}
149
150// ── NodeStore ─────────────────────────────────────────────────────────────────
151
/// Persistent node property store rooted at a database directory.
///
/// On-disk layout:
/// ```text
/// {root}/nodes/{label_id}/hwm.bin — high-water mark (u64 LE)
/// {root}/nodes/{label_id}/col_{col_id}.bin — flat u64 column array
/// {root}/strings.bin — overflow string heap (SPA-212)
/// ```
///
/// The overflow heap is an append-only byte file. Each entry is a raw byte
/// sequence (no length prefix); the offset and length are encoded into the
/// `TAG_BYTES_OVERFLOW` u64 stored in the column file.
pub struct NodeStore {
    /// Database root directory; all on-disk paths are derived from it.
    root: PathBuf,
    /// In-memory high-water marks per label. Loaded lazily from disk.
    /// NOTE: may run ahead of the on-disk value when `peek_next_slot` has
    /// reserved slots that are not yet committed.
    hwm: HashMap<u32, u64>,
}
169
170impl NodeStore {
171 /// Open (or create) a node store rooted at `db_root`.
172 pub fn open(db_root: &Path) -> Result<Self> {
173 Ok(NodeStore {
174 root: db_root.to_path_buf(),
175 hwm: HashMap::new(),
176 })
177 }
178
179 // ── Internal helpers ──────────────────────────────────────────────────────
180
181 fn label_dir(&self, label_id: u32) -> PathBuf {
182 self.root.join("nodes").join(label_id.to_string())
183 }
184
185 fn hwm_path(&self, label_id: u32) -> PathBuf {
186 self.label_dir(label_id).join("hwm.bin")
187 }
188
189 fn col_path(&self, label_id: u32, col_id: u32) -> PathBuf {
190 self.label_dir(label_id).join(format!("col_{col_id}.bin"))
191 }
192
193 /// Path to the overflow string heap (shared across all labels).
194 fn strings_bin_path(&self) -> PathBuf {
195 self.root.join("strings.bin")
196 }
197
198 // ── Overflow string heap (SPA-212) ────────────────────────────────────────
199
200 /// Append `bytes` to the overflow string heap and return an
201 /// `TAG_BYTES_OVERFLOW`-tagged `u64` encoding the (offset, len) pair.
202 ///
203 /// Layout of the returned `u64` (little-endian bytes):
204 /// bytes[0..5] = heap byte offset as u40 LE (max ~1 TiB)
205 /// bytes[5..7] = byte length as u16 LE (max 65 535 bytes)
206 /// bytes[7] = `TAG_BYTES_OVERFLOW` (0x02)
207 fn append_to_string_heap(&self, bytes: &[u8]) -> Result<u64> {
208 use std::io::{Seek, SeekFrom, Write};
209 let path = self.strings_bin_path();
210 let mut file = fs::OpenOptions::new()
211 .create(true)
212 .truncate(false)
213 .append(false)
214 .read(true)
215 .write(true)
216 .open(&path)
217 .map_err(Error::Io)?;
218
219 // The heap offset is the current end of the file.
220 let offset = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
221 file.write_all(bytes).map_err(Error::Io)?;
222
223 // Encode (offset, len) into a 7-byte payload.
224 let len = bytes.len() as u64;
225 debug_assert!(
226 offset <= 0x00FF_FFFF_FFFF_u64,
227 "string heap too large for 5-byte offset"
228 );
229 debug_assert!(len <= 0xFFFF, "string longer than 65535 bytes");
230
231 let mut arr = [0u8; 8];
232 // Store offset in bytes[0..5] (40-bit LE).
233 arr[0] = offset as u8;
234 arr[1] = (offset >> 8) as u8;
235 arr[2] = (offset >> 16) as u8;
236 arr[3] = (offset >> 24) as u8;
237 arr[4] = (offset >> 32) as u8;
238 // Store len in bytes[5..7] (16-bit LE).
239 arr[5] = len as u8;
240 arr[6] = (len >> 8) as u8;
241 // Tag byte.
242 arr[7] = TAG_BYTES_OVERFLOW;
243 Ok(u64::from_le_bytes(arr))
244 }
245
246 /// Read string bytes from the overflow heap given an `TAG_BYTES_OVERFLOW`
247 /// tagged `u64` produced by [`append_to_string_heap`].
248 fn read_from_string_heap(&self, tagged: u64) -> Result<Vec<u8>> {
249 let arr = tagged.to_le_bytes();
250 debug_assert_eq!(arr[7], TAG_BYTES_OVERFLOW, "not an overflow pointer");
251
252 // Decode (offset, len).
253 let offset = arr[0] as u64
254 | ((arr[1] as u64) << 8)
255 | ((arr[2] as u64) << 16)
256 | ((arr[3] as u64) << 24)
257 | ((arr[4] as u64) << 32);
258 let len = arr[5] as usize | ((arr[6] as usize) << 8);
259
260 let path = self.strings_bin_path();
261 let file_bytes = fs::read(&path).map_err(Error::Io)?;
262 let end = offset as usize + len;
263 if file_bytes.len() < end {
264 return Err(Error::Corruption(format!(
265 "string heap too short: need {} bytes, have {}",
266 end,
267 file_bytes.len()
268 )));
269 }
270 Ok(file_bytes[offset as usize..end].to_vec())
271 }
272
273 // ── Value encode / decode with overflow support ───────────────────────────
274
275 /// Encode a `Value` for column storage, writing long `Bytes` strings to
276 /// the overflow heap (SPA-212).
277 ///
278 /// - `Int64` → identical to `Value::to_u64()`.
279 /// - `Bytes` ≤ 7 B → inline `TAG_BYTES` encoding, identical to `Value::to_u64()`.
280 /// - `Bytes` > 7 B → appended to `strings.bin`; returns `TAG_BYTES_OVERFLOW` u64.
281 /// - `Float` → 8 raw IEEE-754 bytes appended to `strings.bin`;
282 /// returns a `TAG_FLOAT` u64 so all 64 float bits are preserved (SPA-267).
283 pub fn encode_value(&self, val: &Value) -> Result<u64> {
284 match val {
285 Value::Int64(_) => Ok(val.to_u64()),
286 Value::Bytes(b) if b.len() <= MAX_INLINE_BYTES => Ok(val.to_u64()),
287 Value::Bytes(b) => self.append_to_string_heap(b),
288 // SPA-267: store all 8 float bytes in the heap so no bits are masked.
289 // The heap pointer uses the same (offset: u40, len: u16) layout as
290 // TAG_BYTES_OVERFLOW but with TAG_FLOAT in byte 7.
291 Value::Float(f) => {
292 let bits = f.to_bits().to_le_bytes();
293 let heap_tagged = self.append_to_string_heap(&bits)?;
294 // Replace the TAG_BYTES_OVERFLOW tag byte with TAG_FLOAT.
295 let payload = heap_tagged & 0x00FF_FFFF_FFFF_FFFF;
296 Ok((TAG_FLOAT as u64) << 56 | payload)
297 }
298 }
299 }
300
    /// Decode a raw `u64` column value back to a `Value`, reading the
    /// overflow string heap when the tag is `TAG_BYTES_OVERFLOW` or `TAG_FLOAT` (SPA-212, SPA-267).
    ///
    /// Handles all four tags:
    /// - `TAG_INT64` → `Value::Int64`
    /// - `TAG_BYTES` → `Value::Bytes` (inline, ≤ 7 bytes)
    /// - `TAG_BYTES_OVERFLOW` → `Value::Bytes` (from heap)
    /// - `TAG_FLOAT` → `Value::Float` (8 raw IEEE-754 bytes from heap)
    ///
    /// Deliberately infallible: a heap read failure is logged to stderr and
    /// degrades to an empty `Bytes` / `NaN` placeholder instead of aborting
    /// the caller's scan over otherwise-healthy rows.
    pub fn decode_raw_value(&self, raw: u64) -> Value {
        // The type tag lives in the top byte (byte 7 in LE order).
        let tag = (raw >> 56) as u8;
        match tag {
            TAG_BYTES_OVERFLOW => match self.read_from_string_heap(raw) {
                Ok(bytes) => Value::Bytes(bytes),
                Err(e) => {
                    // Corruption fallback: return empty bytes and log.
                    eprintln!(
                        "WARN: failed to read overflow string from heap (raw={raw:#018x}): {e}"
                    );
                    Value::Bytes(Vec::new())
                }
            },
            // SPA-267: float values are stored as 8-byte IEEE-754 blobs in the heap.
            // Reconstruct the heap pointer by swapping TAG_FLOAT → TAG_BYTES_OVERFLOW
            // so read_from_string_heap can locate the bytes.
            TAG_FLOAT => {
                let payload = raw & 0x00FF_FFFF_FFFF_FFFF;
                let heap_tagged = (TAG_BYTES_OVERFLOW as u64) << 56 | payload;
                match self.read_from_string_heap(heap_tagged) {
                    // Exactly 8 bytes → reinterpret as an f64 bit pattern.
                    Ok(bytes) if bytes.len() == 8 => {
                        let arr: [u8; 8] = bytes.try_into().unwrap();
                        Value::Float(f64::from_bits(u64::from_le_bytes(arr)))
                    }
                    // Wrong blob length: the pointer or the heap is damaged.
                    Ok(bytes) => {
                        eprintln!(
                            "WARN: float heap blob has unexpected length {} (raw={raw:#018x})",
                            bytes.len()
                        );
                        Value::Float(f64::NAN)
                    }
                    Err(e) => {
                        eprintln!("WARN: failed to read float from heap (raw={raw:#018x}): {e}");
                        Value::Float(f64::NAN)
                    }
                }
            }
            // TAG_INT64, TAG_BYTES, or an unknown tag: inline decoding.
            _ => Value::from_u64(raw),
        }
    }
349
350 /// Check whether a raw stored `u64` encodes a string equal to `s`.
351 ///
352 /// Handles both inline (`TAG_BYTES`) and overflow (`TAG_BYTES_OVERFLOW`)
353 /// encodings (SPA-212). Used by WHERE-clause and prop-filter comparison.
354 pub fn raw_str_matches(&self, raw: u64, s: &str) -> bool {
355 let tag = (raw >> 56) as u8;
356 match tag {
357 TAG_BYTES => {
358 // Fast inline comparison: encode s the same way and compare u64s.
359 raw == Value::Bytes(s.as_bytes().to_vec()).to_u64()
360 }
361 TAG_BYTES_OVERFLOW => {
362 // Overflow: read from heap and compare bytes.
363 match self.read_from_string_heap(raw) {
364 Ok(bytes) => bytes == s.as_bytes(),
365 Err(_) => false,
366 }
367 }
368 _ => false, // INT64 or unknown — not a string
369 }
370 }
371
372 /// Read the high-water mark for `label_id` from disk (or return 0).
373 fn load_hwm(&self, label_id: u32) -> Result<u64> {
374 let path = self.hwm_path(label_id);
375 if !path.exists() {
376 return Ok(0);
377 }
378 let bytes = fs::read(&path).map_err(Error::Io)?;
379 if bytes.len() < 8 {
380 return Err(Error::Corruption(format!(
381 "hwm.bin for label {label_id} is truncated"
382 )));
383 }
384 Ok(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
385 }
386
387 /// Write the high-water mark for `label_id` to disk.
388 fn save_hwm(&self, label_id: u32, hwm: u64) -> Result<()> {
389 let path = self.hwm_path(label_id);
390 if let Some(parent) = path.parent() {
391 fs::create_dir_all(parent).map_err(Error::Io)?;
392 }
393 fs::write(&path, hwm.to_le_bytes()).map_err(Error::Io)
394 }
395
396 /// Append a `u64` value to a column file.
397 fn append_col(&self, label_id: u32, col_id: u32, slot: u32, value: u64) -> Result<()> {
398 use std::io::{Seek, SeekFrom, Write};
399 let path = self.col_path(label_id, col_id);
400 if let Some(parent) = path.parent() {
401 fs::create_dir_all(parent).map_err(Error::Io)?;
402 }
403 // Open without truncation so we can inspect the current length.
404 let mut file = fs::OpenOptions::new()
405 .create(true)
406 .truncate(false)
407 .read(true)
408 .write(true)
409 .open(&path)
410 .map_err(Error::Io)?;
411
412 // Pad with zeros for any slots that were skipped (sparse write pattern).
413 // Without padding, a later slot's value would be written at offset 0,
414 // causing earlier slots to incorrectly read that value.
415 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
416 let expected_offset = slot as u64 * 8;
417 if existing_len < expected_offset {
418 file.seek(SeekFrom::Start(existing_len))
419 .map_err(Error::Io)?;
420 const CHUNK: usize = 65536;
421 let zeros = [0u8; CHUNK];
422 let mut remaining = (expected_offset - existing_len) as usize;
423 while remaining > 0 {
424 let n = remaining.min(CHUNK);
425 file.write_all(&zeros[..n]).map_err(Error::Io)?;
426 remaining -= n;
427 }
428 }
429
430 // Seek to the correct slot position and write the value.
431 file.seek(SeekFrom::Start(expected_offset))
432 .map_err(Error::Io)?;
433 file.write_all(&value.to_le_bytes()).map_err(Error::Io)
434 }
435
436 /// Read the `u64` stored at `slot` in the given column file.
437 ///
438 /// Returns `Ok(0)` when the column file does not exist yet — a missing file
439 /// means no value has ever been written for this `(label_id, col_id)` pair,
440 /// which is represented as the zero bit-pattern (SPA-166).
441 fn read_col_slot(&self, label_id: u32, col_id: u32, slot: u32) -> Result<u64> {
442 let path = self.col_path(label_id, col_id);
443 let bytes = match fs::read(&path) {
444 Ok(b) => b,
445 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
446 Err(e) => return Err(Error::Io(e)),
447 };
448 let offset = slot as usize * 8;
449 if bytes.len() < offset + 8 {
450 return Err(Error::NotFound);
451 }
452 Ok(u64::from_le_bytes(
453 bytes[offset..offset + 8].try_into().unwrap(),
454 ))
455 }
456
457 // ── Public API ────────────────────────────────────────────────────────────
458
459 /// Return the high-water mark (slot count) for a label.
460 ///
461 /// Returns `0` if no nodes have been created for that label yet.
462 pub fn hwm_for_label(&self, label_id: u32) -> Result<u64> {
463 if let Some(&h) = self.hwm.get(&label_id) {
464 return Ok(h);
465 }
466 self.load_hwm(label_id)
467 }
468
    /// Discover all column IDs that currently exist on disk for `label_id`.
    ///
    /// Scans the label directory for `col_{id}.bin` files and returns the
    /// parsed `col_id` values, sorted ascending. Used by `create_node` to
    /// zero-pad columns that are not supplied for a new node (SPA-187).
    ///
    /// Returns `Err` when the directory exists but cannot be read (e.g.
    /// permissions failure or I/O error). A missing directory is not an
    /// error — it simply means no nodes of this label have been created yet.
    pub fn col_ids_for_label(&self, label_id: u32) -> Result<Vec<u32>> {
        // Thin public wrapper over the private directory scanner.
        self.existing_col_ids(label_id)
    }
481
482 fn existing_col_ids(&self, label_id: u32) -> Result<Vec<u32>> {
483 let dir = self.label_dir(label_id);
484 let read_dir = match fs::read_dir(&dir) {
485 Ok(rd) => rd,
486 // Directory does not exist yet → no columns on disk.
487 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
488 Err(e) => return Err(Error::Io(e)),
489 };
490 let mut ids: Vec<u32> = read_dir
491 .flatten()
492 .filter_map(|entry| {
493 let name = entry.file_name();
494 let name_str = name.to_string_lossy().into_owned();
495 // Match "col_{col_id}.bin" filenames.
496 let id_str = name_str.strip_prefix("col_")?.strip_suffix(".bin")?;
497 id_str.parse::<u32>().ok()
498 })
499 .collect();
500 ids.sort_unstable();
501 Ok(ids)
502 }
503
    /// Return the **on-disk** high-water mark for a label, bypassing any
    /// in-memory advances made by `peek_next_slot`.
    ///
    /// Used by [`WriteTx::merge_node`] to limit the disk scan to only slots
    /// that have actually been persisted.
    pub fn disk_hwm_for_label(&self, label_id: u32) -> Result<u64> {
        // Deliberately skips the `hwm` cache: the cache may run ahead of disk
        // while reserved-but-uncommitted slots are pending.
        self.load_hwm(label_id)
    }
512
513 /// Reserve the slot index that the *next* `create_node` call will use for
514 /// `label_id`, advancing the in-memory HWM so that the slot is not
515 /// assigned again within the same [`NodeStore`] instance.
516 ///
517 /// This is used by [`WriteTx::create_node`] to pre-compute a [`NodeId`]
518 /// before the actual disk write, so the ID can be returned to the caller
519 /// while the write is deferred until commit (SPA-181).
520 ///
521 /// The on-disk HWM is **not** updated here; it is updated when the
522 /// buffered `NodeCreate` operation is applied in `commit()`.
523 pub fn peek_next_slot(&mut self, label_id: u32) -> Result<u32> {
524 // Load from disk if not cached yet.
525 if !self.hwm.contains_key(&label_id) {
526 let h = self.load_hwm(label_id)?;
527 self.hwm.insert(label_id, h);
528 }
529 let h = *self.hwm.get(&label_id).unwrap();
530 // Advance the in-memory HWM so a subsequent peek returns the next slot.
531 self.hwm.insert(label_id, h + 1);
532 Ok(h as u32)
533 }
534
535 /// Write a node at a pre-reserved `slot` (SPA-181 commit path).
536 ///
537 /// Like [`create_node`] but uses the caller-specified `slot` index instead
538 /// of deriving it from the HWM. Used by [`WriteTx::commit`] to flush
539 /// buffered node-create operations in the exact order they were issued,
540 /// with slots that were already pre-allocated by [`peek_next_slot`].
541 ///
542 /// Advances the on-disk HWM to `slot + 1` (or higher if already past that).
543 pub fn create_node_at_slot(
544 &mut self,
545 label_id: u32,
546 slot: u32,
547 props: &[(u32, Value)],
548 ) -> Result<NodeId> {
549 // Snapshot original column sizes for rollback on partial failure.
550 let original_sizes: Vec<(u32, u64)> = props
551 .iter()
552 .map(|&(col_id, _)| {
553 let path = self.col_path(label_id, col_id);
554 let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
555 (col_id, size)
556 })
557 .collect();
558
559 let write_result = (|| {
560 for &(col_id, ref val) in props {
561 self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
562 }
563 Ok::<(), sparrowdb_common::Error>(())
564 })();
565
566 if let Err(e) = write_result {
567 for (col_id, original_size) in &original_sizes {
568 let path = self.col_path(label_id, *col_id);
569 if path.exists() {
570 if let Err(rollback_err) = fs::OpenOptions::new()
571 .write(true)
572 .open(&path)
573 .and_then(|f| f.set_len(*original_size))
574 {
575 eprintln!(
576 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
577 path.display(),
578 original_size,
579 rollback_err
580 );
581 }
582 }
583 }
584 return Err(e);
585 }
586
587 // Advance the on-disk HWM to at least slot + 1.
588 // Always write to disk; in-memory HWM may have been speculatively
589 // advanced by peek_next_slot but the disk HWM may be lower.
590 let new_hwm = slot as u64 + 1;
591 let disk_hwm = self.load_hwm(label_id)?;
592 if new_hwm > disk_hwm {
593 if let Err(e) = self.save_hwm(label_id, new_hwm) {
594 // Column bytes were already written; roll them back to preserve
595 // the atomicity guarantee (SPA-181).
596 for (col_id, original_size) in &original_sizes {
597 let path = self.col_path(label_id, *col_id);
598 if path.exists() {
599 if let Err(rollback_err) = fs::OpenOptions::new()
600 .write(true)
601 .open(&path)
602 .and_then(|f| f.set_len(*original_size))
603 {
604 eprintln!(
605 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
606 path.display(),
607 original_size,
608 rollback_err
609 );
610 }
611 }
612 }
613 return Err(e);
614 }
615 }
616 // Keep in-memory HWM at least as high as new_hwm.
617 let mem_hwm = self.hwm.get(&label_id).copied().unwrap_or(0);
618 if new_hwm > mem_hwm {
619 self.hwm.insert(label_id, new_hwm);
620 }
621
622 Ok(NodeId((label_id as u64) << 32 | slot as u64))
623 }
624
625 /// Create a new node in `label_id` with the given properties.
626 ///
627 /// Returns the new [`NodeId`] packed as `(label_id << 32) | slot`.
628 ///
629 /// ## Slot alignment guarantee (SPA-187)
630 ///
631 /// Every column file for `label_id` must have exactly `node_count * 8`
632 /// bytes so that slot N always refers to node N across all columns. When
633 /// a node is created without a value for an already-known column, that
634 /// column file is zero-padded to `(slot + 1) * 8` bytes. The zero
635 /// sentinel is recognised by `read_col_slot_nullable` as "absent" and
636 /// surfaces as `Value::Null` in query results.
637 pub fn create_node(&mut self, label_id: u32, props: &[(u32, Value)]) -> Result<NodeId> {
638 // Load or get cached hwm.
639 let hwm = if let Some(h) = self.hwm.get(&label_id) {
640 *h
641 } else {
642 let h = self.load_hwm(label_id)?;
643 self.hwm.insert(label_id, h);
644 h
645 };
646
647 let slot = hwm as u32;
648
649 // Collect the set of col_ids supplied for this node.
650 let supplied_col_ids: std::collections::HashSet<u32> =
651 props.iter().map(|&(col_id, _)| col_id).collect();
652
653 // Discover all columns already on disk for this label. Any that are
654 // NOT in `supplied_col_ids` must be zero-padded so their slot count
655 // stays in sync with the HWM (SPA-187).
656 let existing_col_ids = self.existing_col_ids(label_id)?;
657
658 // Columns that need zero-padding: exist on disk but not in this node's props.
659 let cols_to_zero_pad: Vec<u32> = existing_col_ids
660 .iter()
661 .copied()
662 .filter(|col_id| !supplied_col_ids.contains(col_id))
663 .collect();
664
665 // Build the full list of columns to touch for rollback tracking:
666 // supplied columns + columns that need zero-padding.
667 let all_col_ids_to_touch: Vec<u32> = supplied_col_ids
668 .iter()
669 .copied()
670 .chain(cols_to_zero_pad.iter().copied())
671 .collect();
672
673 // Snapshot the original size of each column file so we can roll back
674 // on partial failure. A column that does not yet exist has size 0.
675 let original_sizes: Vec<(u32, u64)> = all_col_ids_to_touch
676 .iter()
677 .map(|&col_id| {
678 let path = self.col_path(label_id, col_id);
679 let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
680 (col_id, size)
681 })
682 .collect();
683
684 // Write each property column. For columns not in `props`, write a
685 // zero-padded entry (the zero sentinel means "absent" / NULL).
686 // On failure, roll back all columns that were already written to avoid
687 // slot misalignment.
688 let write_result = (|| {
689 // Write supplied columns with their actual values.
690 for &(col_id, ref val) in props {
691 self.append_col(label_id, col_id, slot, self.encode_value(val)?)?;
692 }
693 // Zero-pad existing columns that were NOT supplied for this node.
694 for &col_id in &cols_to_zero_pad {
695 self.append_col(label_id, col_id, slot, 0u64)?;
696 }
697 Ok::<(), sparrowdb_common::Error>(())
698 })();
699
700 if let Err(e) = write_result {
701 // Truncate each column back to its original size.
702 for (col_id, original_size) in &original_sizes {
703 let path = self.col_path(label_id, *col_id);
704 if path.exists() {
705 if let Err(rollback_err) = fs::OpenOptions::new()
706 .write(true)
707 .open(&path)
708 .and_then(|f| f.set_len(*original_size))
709 {
710 eprintln!(
711 "CRITICAL: Failed to roll back column file {} to size {}: {}. Data may be corrupt.",
712 path.display(),
713 original_size,
714 rollback_err
715 );
716 }
717 }
718 }
719 return Err(e);
720 }
721
722 // Update hwm.
723 let new_hwm = hwm + 1;
724 self.save_hwm(label_id, new_hwm)?;
725 *self.hwm.get_mut(&label_id).unwrap() = new_hwm;
726
727 // Pack node ID.
728 let node_id = ((label_id as u64) << 32) | (slot as u64);
729 Ok(NodeId(node_id))
730 }
731
732 /// Write a deletion tombstone (`u64::MAX`) into `col_0.bin` for `node_id`.
733 ///
734 /// Creates `col_0.bin` (and its parent directory) if it does not exist,
735 /// zero-padding all preceding slots. This ensures that nodes which were
736 /// created without any `col_0` property are still properly marked as deleted
737 /// and become invisible to subsequent scans.
738 ///
739 /// Called from [`WriteTx::commit`] when flushing a buffered `NodeDelete`.
740 pub fn tombstone_node(&self, node_id: NodeId) -> Result<()> {
741 use std::io::{Seek, SeekFrom, Write};
742 let label_id = (node_id.0 >> 32) as u32;
743 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
744 let col0 = self.col_path(label_id, 0);
745
746 // Ensure the parent directory exists.
747 if let Some(parent) = col0.parent() {
748 fs::create_dir_all(parent).map_err(Error::Io)?;
749 }
750
751 let mut f = fs::OpenOptions::new()
752 .create(true)
753 .truncate(false)
754 .read(true)
755 .write(true)
756 .open(&col0)
757 .map_err(Error::Io)?;
758
759 // Zero-pad any slots before `slot` that are not yet in the file.
760 let needed_len = (slot as u64 + 1) * 8;
761 let existing_len = f.seek(SeekFrom::End(0)).map_err(Error::Io)?;
762 if existing_len < needed_len {
763 let zeros = vec![0u8; (needed_len - existing_len) as usize];
764 f.write_all(&zeros).map_err(Error::Io)?;
765 }
766
767 // Seek to the slot and write the tombstone value.
768 f.seek(SeekFrom::Start(slot as u64 * 8))
769 .map_err(Error::Io)?;
770 f.write_all(&u64::MAX.to_le_bytes()).map_err(Error::Io)
771 }
772
773 /// Overwrite the value of a single column for an existing node.
774 ///
775 /// Seeks to the slot's offset within `col_{col_id}.bin` and writes the new
776 /// 8-byte little-endian value in-place. Returns `Err(NotFound)` if the
777 /// slot does not exist yet.
778 pub fn set_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
779 use std::io::{Seek, SeekFrom, Write};
780 let label_id = (node_id.0 >> 32) as u32;
781 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
782 let path = self.col_path(label_id, col_id);
783 let mut file = fs::OpenOptions::new()
784 .write(true)
785 .open(&path)
786 .map_err(|_| Error::NotFound)?;
787 let offset = slot as u64 * 8;
788 file.seek(SeekFrom::Start(offset)).map_err(Error::Io)?;
789 file.write_all(&self.encode_value(value)?.to_le_bytes())
790 .map_err(Error::Io)
791 }
792
793 /// Write or create a column value for a node, creating and zero-padding the
794 /// column file if it does not yet exist.
795 ///
796 /// Unlike [`set_node_col`], this method creates the column file and fills all
797 /// slots from 0 to `slot - 1` with zeros before writing the target value.
798 /// This supports adding new property columns to existing nodes (Phase 7
799 /// `set_property` semantics).
800 pub fn upsert_node_col(&self, node_id: NodeId, col_id: u32, value: &Value) -> Result<()> {
801 use std::io::{Seek, SeekFrom, Write};
802 let label_id = (node_id.0 >> 32) as u32;
803 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
804 let path = self.col_path(label_id, col_id);
805
806 // Ensure parent directory exists.
807 if let Some(parent) = path.parent() {
808 fs::create_dir_all(parent).map_err(Error::Io)?;
809 }
810
811 // Open the file (create if absent), then pad with zeros up to and
812 // including the target slot, and finally seek back to write the value.
813 // We must NOT truncate: we only update a specific slot, not the whole file.
814 let mut file = fs::OpenOptions::new()
815 .create(true)
816 .truncate(false)
817 .read(true)
818 .write(true)
819 .open(&path)
820 .map_err(Error::Io)?;
821
822 // How many bytes are already in the file?
823 let existing_len = file.seek(SeekFrom::End(0)).map_err(Error::Io)?;
824 let needed_len = (slot as u64 + 1) * 8;
825 if existing_len < needed_len {
826 // Extend file with zeros from existing_len to needed_len.
827 // Write in fixed-size chunks to avoid a single large allocation
828 // that could OOM when the node slot ID is very high.
829 file.seek(SeekFrom::Start(existing_len))
830 .map_err(Error::Io)?;
831 const CHUNK: usize = 65536; // 64 KiB
832 let zeros = [0u8; CHUNK];
833 let mut remaining = (needed_len - existing_len) as usize;
834 while remaining > 0 {
835 let n = remaining.min(CHUNK);
836 file.write_all(&zeros[..n]).map_err(Error::Io)?;
837 remaining -= n;
838 }
839 }
840
841 // Seek to target slot and overwrite.
842 file.seek(SeekFrom::Start(slot as u64 * 8))
843 .map_err(Error::Io)?;
844 file.write_all(&self.encode_value(value)?.to_le_bytes())
845 .map_err(Error::Io)
846 }
847
848 /// Retrieve all stored properties of a node.
849 ///
850 /// Returns `(col_id, raw_u64)` pairs in the order the columns were defined.
851 /// The caller knows the schema (col IDs) from the catalog.
852 pub fn get_node_raw(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, u64)>> {
853 let label_id = (node_id.0 >> 32) as u32;
854 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
855
856 let mut result = Vec::with_capacity(col_ids.len());
857 for &col_id in col_ids {
858 let val = self.read_col_slot(label_id, col_id, slot)?;
859 result.push((col_id, val));
860 }
861 Ok(result)
862 }
863
864 /// Like [`get_node_raw`] but treats absent columns as `None` rather than
865 /// propagating [`Error::NotFound`].
866 ///
867 /// A column is considered absent when:
868 /// - Its column file does not exist (property never written for the label).
869 /// - Its column file is shorter than `slot + 1` entries (sparse write —
870 /// an earlier node never wrote this column; a later node that did write it
871 /// padded the file, but this slot's value was never explicitly stored).
872 ///
873 /// This is the correct read path for IS NULL evaluation: absent properties
874 /// must appear as `Value::Null`, not as an error or as integer 0.
875 pub fn get_node_raw_nullable(
876 &self,
877 node_id: NodeId,
878 col_ids: &[u32],
879 ) -> Result<Vec<(u32, Option<u64>)>> {
880 let label_id = (node_id.0 >> 32) as u32;
881 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
882
883 let mut result = Vec::with_capacity(col_ids.len());
884 for &col_id in col_ids {
885 let opt_val = self.read_col_slot_nullable(label_id, col_id, slot)?;
886 result.push((col_id, opt_val));
887 }
888 Ok(result)
889 }
890
891 /// Read a single column slot, returning `None` when the column was never
892 /// written for this node (file absent or slot out of bounds / zero-padded).
893 ///
894 /// Unlike [`read_col_slot`], this function distinguishes between:
895 /// - Column file does not exist → `None` (property never set on any node).
896 /// - Slot falls within the file but reads as 0 → the slot was zero-padded
897 /// by `append_col` for a later node's write; treat as `None`.
898 /// - Slot has a non-zero value → `Some(value)`.
899 /// - Slot is beyond the file end → `None` (property not set on this node).
900 ///
901 /// Value 0 is used as the "absent" sentinel: the storage encoding ensures
902 /// that any legitimately stored property (Int64, Bytes, Bool, Float) encodes
903 /// to a non-zero u64. Specifically, `StoreValue::Int64(0)` stores as 0 and
904 /// would be misidentified as absent — callers that need to store integer 0
905 /// should be aware of this limitation.
906 fn read_col_slot_nullable(&self, label_id: u32, col_id: u32, slot: u32) -> Result<Option<u64>> {
907 let path = self.col_path(label_id, col_id);
908 let bytes = match fs::read(&path) {
909 Ok(b) => b,
910 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
911 Err(e) => return Err(Error::Io(e)),
912 };
913 let offset = slot as usize * 8;
914 if bytes.len() < offset + 8 {
915 return Ok(None);
916 }
917 let raw = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
918 // Zero means "never written" (absent). Non-zero means a real value.
919 if raw == 0 {
920 Ok(None)
921 } else {
922 Ok(Some(raw))
923 }
924 }
925
926 /// Retrieve the typed property values for a node.
927 ///
928 /// Convenience wrapper over [`get_node_raw`] that decodes every raw `u64`
929 /// back to a `Value`, reading the overflow string heap when needed (SPA-212).
930 pub fn get_node(&self, node_id: NodeId, col_ids: &[u32]) -> Result<Vec<(u32, Value)>> {
931 let raw = self.get_node_raw(node_id, col_ids)?;
932 Ok(raw
933 .into_iter()
934 .map(|(col_id, v)| (col_id, self.decode_raw_value(v)))
935 .collect())
936 }
937
938 /// Read the entire contents of `col_{col_id}.bin` for `label_id` as a
939 /// flat `Vec<u64>`. Returns an empty vec when the file does not exist yet.
940 ///
941 /// This is used by [`crate::property_index::PropertyIndex::build`] to scan
942 /// all slot values in one `fs::read` call rather than one `read_col_slot`
943 /// per node, making index construction O(n) rather than O(n * syscall-overhead).
944 pub fn read_col_all(&self, label_id: u32, col_id: u32) -> Result<Vec<u64>> {
945 let path = self.col_path(label_id, col_id);
946 let bytes = match fs::read(&path) {
947 Ok(b) => b,
948 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
949 Err(e) => return Err(Error::Io(e)),
950 };
951 // Interpret the byte slice as a flat array of little-endian u64s.
952 let count = bytes.len() / 8;
953 let mut out = Vec::with_capacity(count);
954 for i in 0..count {
955 let offset = i * 8;
956 let v = u64::from_le_bytes(bytes[offset..offset + 8].try_into().unwrap());
957 out.push(v);
958 }
959 Ok(out)
960 }
961}
962
// ── Helpers ───────────────────────────────────────────────────────────────────
964
965/// Unpack `(label_id, slot)` from a [`NodeId`].
966pub fn unpack_node_id(node_id: NodeId) -> (u32, u32) {
967 let label_id = (node_id.0 >> 32) as u32;
968 let slot = (node_id.0 & 0xFFFF_FFFF) as u32;
969 (label_id, slot)
970}
971
972// ── Tests ─────────────────────────────────────────────────────────────────────
973
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_node_create_and_get() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        let label = 1u32;
        let node = store
            .create_node(label, &[(0u32, Value::Int64(42)), (1u32, Value::Int64(100))])
            .unwrap();

        // Packed ID: label in the high half, slot 0 (first node) in the low half.
        let (got_label, got_slot) = unpack_node_id(node);
        assert_eq!(got_label, label);
        assert_eq!(got_slot, 0);

        // Round-trip: both columns decode back to the original values.
        let props = store.get_node(node, &[0, 1]).unwrap();
        assert_eq!(props, vec![(0, Value::Int64(42)), (1, Value::Int64(100))]);
    }

    #[test]
    fn test_node_multiple_nodes_sequential_slots() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        // Create three nodes under the same label with values 10, 20, 30.
        let ids: Vec<NodeId> = (1..=3)
            .map(|i| store.create_node(1, &[(0, Value::Int64(i * 10))]).unwrap())
            .collect();

        // Slots are handed out sequentially within a label: 0, 1, 2.
        for (expected_slot, id) in ids.iter().enumerate() {
            let (_, slot) = unpack_node_id(*id);
            assert_eq!(slot, expected_slot as u32);
        }

        // Each node keeps its own value.
        for (i, id) in ids.iter().enumerate() {
            let got = store.get_node(*id, &[0]).unwrap();
            assert_eq!(got[0].1, Value::Int64((i as i64 + 1) * 10));
        }
    }

    #[test]
    fn test_node_persists_across_reopen() {
        let tmp = tempdir().unwrap();

        // Session 1: create a node, then drop the store entirely.
        let node = {
            let mut store = NodeStore::open(tmp.path()).unwrap();
            store
                .create_node(2, &[(0, Value::Int64(999)), (1, Value::Int64(-1))])
                .unwrap()
        };

        // Session 2: a fresh store instance must read the same values from disk.
        let reopened = NodeStore::open(tmp.path()).unwrap();
        let props = reopened.get_node(node, &[0, 1]).unwrap();
        assert_eq!(props[0].1, Value::Int64(999));
        assert_eq!(props[1].1, Value::Int64(-1));
    }

    #[test]
    fn test_node_hwm_persists_across_reopen() {
        let tmp = tempdir().unwrap();

        // Session 1: consume slots 0..=2.
        {
            let mut store = NodeStore::open(tmp.path()).unwrap();
            for v in 1..=3 {
                store.create_node(0, &[(0, Value::Int64(v))]).unwrap();
            }
        }

        // Session 2: the high-water mark must survive reopen, so the next
        // node gets slot 3 rather than reusing slot 0.
        let mut store = NodeStore::open(tmp.path()).unwrap();
        let fourth = store.create_node(0, &[(0, Value::Int64(4))]).unwrap();
        let (_, slot) = unpack_node_id(fourth);
        assert_eq!(slot, 3);
    }

    #[test]
    fn test_node_different_labels_independent() {
        let tmp = tempdir().unwrap();
        let mut store = NodeStore::open(tmp.path()).unwrap();

        let first = store.create_node(10, &[(0, Value::Int64(1))]).unwrap();
        let second = store.create_node(20, &[(0, Value::Int64(2))]).unwrap();

        // Slot counters are per-label: both nodes land in slot 0 of their label.
        assert_eq!(unpack_node_id(first), (10, 0));
        assert_eq!(unpack_node_id(second), (20, 0));
    }
}
1074}