use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use crate::core::error::{Error, Result, StorageError};
use crate::core::hlc::HybridTimestamp;
use crate::core::id::{EdgeId, NodeId, VersionId};
use crate::core::property::PropertyMap;
use super::serialization::{
OP_CHECKPOINT, OP_CREATE_EDGE, OP_CREATE_NODE, OP_DELETE_EDGE, OP_DELETE_NODE, OP_UPDATE_EDGE,
OP_UPDATE_NODE,
};
use super::{LSN, WalEntry, WalOperation};
/// Magic bytes identifying a WAL segment file.
pub(crate) const WAL_MAGIC: [u8; 4] = *b"GWAL";
/// Plaintext WAL format version.
pub(crate) const WAL_VERSION: u8 = 1;
/// Encrypted WAL format version: entries are length-prefixed ciphertext frames.
pub(crate) const WAL_VERSION_ENCRYPTED: u8 = 2;
/// Highest format version this reader understands.
const WAL_VERSION_MAX: u8 = WAL_VERSION_ENCRYPTED;
/// Segment header size: 4 magic bytes plus 1 version byte.
pub(crate) const WAL_HEADER_SIZE: usize = 5;
/// Upper bound on segment size (1 GiB); larger files are treated as corrupted.
pub(crate) const MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
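/// Reads every entry with `lsn >= start_lsn` from the `<segment_id>.log` files
/// in `wal_dir`, returned in LSN order. Plaintext-only wrapper around
/// [`read_entries_from_dir_with_cipher`].
///
/// A minimal usage sketch (the directory path is illustrative):
///
/// ```ignore
/// let entries = read_entries_from_dir(Path::new("data/wal"), LSN(0))?;
/// for entry in &entries {
///     println!("replaying LSN {}", entry.lsn.0);
/// }
/// ```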
pub fn read_entries_from_dir(wal_dir: &Path, start_lsn: LSN) -> Result<Vec<WalEntry>> {
read_entries_from_dir_with_cipher(wal_dir, start_lsn, None)
}
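/// Like [`read_entries_from_dir`], but a supplied cipher allows encrypted
/// (version-2) segments to be read. Segment files are visited in ascending
/// segment-id order and the merged entries are sorted by LSN.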
pub fn read_entries_from_dir_with_cipher(
wal_dir: &Path,
start_lsn: LSN,
cipher: Option<&Arc<dyn crate::encryption::cipher::Cipher>>,
) -> Result<Vec<WalEntry>> {
let mut entries = Vec::new();
    let mut segments = Vec::with_capacity(16);
    if let Ok(dir_entries) = std::fs::read_dir(wal_dir) {
for entry in dir_entries.flatten() {
if let Some(name) = entry.file_name().to_str()
&& name.ends_with(".log")
&& let Some(seg_id) = name
.strip_suffix(".log")
.and_then(|s| s.parse::<u64>().ok())
{
segments.push((seg_id, entry.path()));
}
}
}
segments.sort_by_key(|(id, _)| *id);
for (_, path) in segments {
let segment_entries = read_segment_with_cipher(&path, start_lsn, cipher)?;
entries.extend(segment_entries);
}
entries.sort_by_key(|entry| entry.lsn);
Ok(entries)
}
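/// Reads a single plaintext segment; see [`read_segment_with_cipher`].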
pub fn read_segment(path: &Path, start_lsn: LSN) -> Result<Vec<WalEntry>> {
read_segment_with_cipher(path, start_lsn, None)
}
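/// Reads one WAL segment file. A missing file yields an empty result, while
/// oversized files and bad headers are rejected as corruption. The header's
/// version byte selects plaintext or encrypted entry parsing; reading an
/// encrypted segment without a cipher is an error.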
pub fn read_segment_with_cipher(
path: &Path,
start_lsn: LSN,
cipher: Option<&Arc<dyn crate::encryption::cipher::Cipher>>,
) -> Result<Vec<WalEntry>> {
let file = match File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => {
return Err(StorageError::IoError(format!(
"Failed to open WAL segment {:?}: {}",
path, e
))
.into());
}
};
let metadata = file
.metadata()
.map_err(|e| StorageError::IoError(format!("Failed to get file metadata: {}", e)))?;
if metadata.len() > MAX_SEGMENT_SIZE {
return Err(StorageError::CorruptedData(format!(
"WAL segment too large: {} bytes (max: {} bytes)",
metadata.len(),
MAX_SEGMENT_SIZE
))
.into());
}
if metadata.len() == 0 {
return Ok(Vec::new());
}
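    // SAFETY: the mapping is read-only and lives only for the duration of this
    // call. Mutating or truncating the segment while mapped would be undefined
    // behavior; the WAL's append-only, single-writer usage is assumed to rule
    // that out.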
let mmap = unsafe {
memmap2::Mmap::map(&file).map_err(|e| {
StorageError::IoError(format!("Failed to memory-map WAL segment: {}", e))
})?
};
let buffer = &mmap[..];
    // Pre-size the result assuming roughly 128 bytes per entry.
    let capacity_hint = (buffer.len() / 128).max(1);
let mut entries = Vec::with_capacity(capacity_hint);
let (version, mut offset) = if buffer.len() >= WAL_HEADER_SIZE && buffer[0..4] == WAL_MAGIC {
let ver = buffer[4];
if ver > WAL_VERSION_MAX {
return Err(StorageError::CorruptedData(format!(
"Unsupported WAL version: {} (max supported: {})",
ver, WAL_VERSION_MAX
))
.into());
}
(ver, WAL_HEADER_SIZE)
} else if !buffer.is_empty() {
return Err(StorageError::CorruptedData(
"Invalid WAL segment: missing GWAL magic header".to_string(),
)
.into());
    } else {
        // Unreachable in practice: zero-length files returned early above.
        return Ok(Vec::new());
    };
if version == WAL_VERSION_ENCRYPTED && cipher.is_none() {
return Err(StorageError::Encryption(
"Cannot read encrypted WAL segment (version 2) without a cipher".to_string(),
)
.into());
}
if version == WAL_VERSION_ENCRYPTED {
let cipher = cipher.expect("cipher presence checked above");
parse_encrypted_entries(buffer, &mut offset, start_lsn, cipher, path, &mut entries)?;
} else {
parse_plaintext_entries(buffer, &mut offset, version, start_lsn, path, &mut entries)?;
}
Ok(entries)
}
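/// Sequentially parses plaintext WAL entries from `buffer`, keeping those with
/// `lsn >= start_lsn`.
///
/// Two failure shapes end the scan without error: fewer than 24 bytes left
/// (a torn final write) and a fully zeroed 24-byte header region
/// (pre-allocated, never-written space). Anything else is surfaced as
/// corruption.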
fn parse_plaintext_entries(
buffer: &[u8],
offset: &mut usize,
version: u8,
start_lsn: LSN,
path: &Path,
entries: &mut Vec<WalEntry>,
) -> Result<()> {
while *offset < buffer.len() {
match parse_entry_at(buffer, *offset, version) {
Ok((entry, bytes_consumed)) => {
if entry.lsn >= start_lsn {
entries.push(entry);
}
*offset += bytes_consumed;
}
Err(e) => {
if *offset + 24 > buffer.len() {
#[cfg(feature = "observability")]
tracing::debug!(
"Partial entry at end of WAL segment {:?} (offset {}/{}), stopping read",
path,
offset,
buffer.len()
);
break;
} else {
let header_slice = &buffer[*offset..*offset + 24];
if header_slice.iter().all(|&b| b == 0) {
#[cfg(feature = "observability")]
tracing::debug!(
"Zeroed region at end of WAL segment {:?} (offset {}/{}), stopping read",
path,
offset,
buffer.len()
);
break;
}
#[cfg(feature = "observability")]
tracing::error!(
"Failed to parse WAL entry in segment {:?} at offset {}: {}",
path,
offset,
e
);
#[cfg(not(feature = "observability"))]
{
eprintln!(
"CRITICAL: Failed to parse WAL entry in segment {:?} at offset {}: {}",
path, offset, e
);
eprintln!("Header slice: {:?}", header_slice);
}
return Err(e);
}
}
}
}
Ok(())
}
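/// Sequentially parses encrypted (version-2) WAL entries: each entry is a
/// little-endian `u32` length prefix followed by that many ciphertext bytes.
/// A zero length or a truncated frame ends the scan; decrypted payloads are
/// parsed using the plaintext version-1 layout.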
fn parse_encrypted_entries(
buffer: &[u8],
offset: &mut usize,
start_lsn: LSN,
cipher: &Arc<dyn crate::encryption::cipher::Cipher>,
path: &Path,
entries: &mut Vec<WalEntry>,
) -> Result<()> {
while *offset < buffer.len() {
if *offset + 4 > buffer.len() {
#[cfg(feature = "observability")]
tracing::debug!(
"Partial length prefix at end of encrypted WAL segment {:?} (offset {}/{}), stopping read",
path,
offset,
buffer.len()
);
break;
}
let len_bytes: [u8; 4] = buffer[*offset..*offset + 4]
.try_into()
.expect("slice length verified above");
let entry_len = u32::from_le_bytes(len_bytes) as usize;
if entry_len == 0 {
break;
}
*offset += 4;
if *offset + entry_len > buffer.len() {
#[cfg(feature = "observability")]
tracing::debug!(
"Truncated encrypted entry at end of WAL segment {:?} (offset {}, entry_len {}, buf_len {}), stopping read",
path,
offset,
entry_len,
buffer.len()
);
break;
}
let encrypted_entry = &buffer[*offset..*offset + entry_len];
*offset += entry_len;
let decrypted =
crate::encryption::wal_encryption::decrypt_wal_payload(encrypted_entry, cipher)
.map_err(|e| {
Error::Storage(StorageError::Encryption(format!(
"Failed to decrypt WAL entry in segment {:?}: {}",
path, e
)))
})?;
match parse_entry_at(&decrypted, 0, WAL_VERSION) {
Ok((entry, _bytes_consumed)) => {
if entry.lsn >= start_lsn {
entries.push(entry);
}
}
Err(e) => {
#[cfg(feature = "observability")]
tracing::error!(
"Failed to parse decrypted WAL entry in segment {:?}: {}",
path,
e
);
#[cfg(not(feature = "observability"))]
eprintln!(
"CRITICAL: Failed to parse decrypted WAL entry in segment {:?}: {}",
path, e
);
return Err(e);
}
}
}
Ok(())
}
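/// Advances `offset` by `n`, failing with a corruption error on overflow.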
#[inline]
fn advance(offset: &mut usize, n: usize) -> Result<()> {
*offset = offset.checked_add(n).ok_or_else(|| {
Error::Storage(StorageError::CorruptedData(
"WAL offset overflow".to_string(),
))
})?;
Ok(())
}
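/// Checks that at least `n` bytes are available at `offset`, guarding against
/// both arithmetic overflow and reads past the end of `buffer`.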
#[inline]
fn require_bytes(buffer: &[u8], offset: usize, n: usize, context: &str) -> Result<()> {
let end = offset.checked_add(n).ok_or_else(|| {
Error::Storage(StorageError::CorruptedData(
"WAL offset overflow".to_string(),
))
})?;
if end > buffer.len() {
return Err(StorageError::CorruptedData(format!(
"Insufficient buffer size for {}",
context
))
.into());
}
Ok(())
}
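/// Reads a 4-byte little-endian interned-label id and advances the offset.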
#[inline]
fn read_label(
buffer: &[u8],
offset: &mut usize,
context: &str,
) -> Result<crate::core::interning::InternedString> {
require_bytes(buffer, *offset, 4, context)?;
let label_id = u32::from_le_bytes(buffer[*offset..*offset + 4].try_into().unwrap());
advance(offset, 4)?;
Ok(crate::core::interning::InternedString::from_raw(label_id))
}
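/// Reads the property map and `valid_from` timestamp carried by version-1+
/// entries. Version-0 entries have neither, so they fall back to an empty map
/// and the transaction timestamp.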
fn read_props_and_valid_from(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<(PropertyMap, HybridTimestamp)> {
if version >= WAL_VERSION {
let (props, props_len) = PropertyMap::deserialize(&buffer[*offset..])?;
advance(offset, props_len)?;
let (valid_from, ts_len) = HybridTimestamp::deserialize(&buffer[*offset..])?;
advance(offset, ts_len)?;
Ok((props, valid_from))
} else {
Ok((PropertyMap::new(), tx_timestamp))
}
}
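/// Payload: 8-byte node id, 4-byte label id, then (v1+) properties and
/// `valid_from`.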
fn parse_create_node_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let node_id = deserialize_node_id(buffer, *offset, "CreateNode")?;
advance(offset, 8)?;
let label = read_label(buffer, offset, "CreateNode label")?;
let (properties, valid_from) =
read_props_and_valid_from(buffer, offset, version, tx_timestamp)?;
Ok(WalOperation::CreateNode {
node_id,
label,
properties,
valid_from,
})
}
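/// Payload: 8-byte edge id, 8-byte source and target node ids, 4-byte label
/// id, then (v1+) properties and `valid_from`.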
fn parse_create_edge_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let edge_id = deserialize_edge_id(buffer, *offset, "CreateEdge")?;
advance(offset, 8)?;
let source = deserialize_node_id(buffer, *offset, "CreateEdge source")?;
advance(offset, 8)?;
let target = deserialize_node_id(buffer, *offset, "CreateEdge target")?;
advance(offset, 8)?;
let label = read_label(buffer, offset, "CreateEdge label")?;
let (properties, valid_from) =
read_props_and_valid_from(buffer, offset, version, tx_timestamp)?;
Ok(WalOperation::CreateEdge {
edge_id,
source,
target,
label,
properties,
valid_from,
})
}
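/// Payload: 8-byte node id and version id; v1+ additionally carries the label,
/// properties, and `valid_from`.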
fn parse_update_node_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let node_id = deserialize_node_id(buffer, *offset, "UpdateNode")?;
advance(offset, 8)?;
let version_id = deserialize_version_id(buffer, *offset, "UpdateNode")?;
advance(offset, 8)?;
let (label, properties, valid_from) = if version >= WAL_VERSION {
let label = read_label(buffer, offset, "UpdateNode label")?;
let (props, valid_from) = read_props_and_valid_from(buffer, offset, version, tx_timestamp)?;
(label, props, valid_from)
} else {
(
crate::core::interning::InternedString::from_raw(0),
PropertyMap::new(),
tx_timestamp,
)
};
Ok(WalOperation::UpdateNode {
node_id,
version_id,
label,
properties,
valid_from,
})
}
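/// Payload mirrors [`parse_update_node_op`] for edges, except the fixed-width
/// portion is validated up front (20 bytes for v1+, 16 for v0).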
fn parse_update_edge_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let required = if version >= WAL_VERSION { 20 } else { 16 };
require_bytes(buffer, *offset, required, "UpdateEdge")?;
let edge_id = deserialize_edge_id(buffer, *offset, "UpdateEdge")?;
advance(offset, 8)?;
let version_id = deserialize_version_id(buffer, *offset, "UpdateEdge")?;
advance(offset, 8)?;
let (label, properties, valid_from) = if version >= WAL_VERSION {
let label = read_label(buffer, offset, "UpdateEdge label")?;
let (props, valid_from) = read_props_and_valid_from(buffer, offset, version, tx_timestamp)?;
(label, props, valid_from)
} else {
(
crate::core::interning::InternedString::from_raw(0),
PropertyMap::new(),
tx_timestamp,
)
};
Ok(WalOperation::UpdateEdge {
edge_id,
version_id,
label,
properties,
valid_from,
})
}
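/// Payload: 8-byte node id, plus a `valid_from` timestamp in v1+.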
fn parse_delete_node_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let node_id = deserialize_node_id(buffer, *offset, "DeleteNode")?;
advance(offset, 8)?;
let valid_from = if version >= WAL_VERSION {
let (ts, ts_len) = HybridTimestamp::deserialize(&buffer[*offset..])?;
advance(offset, ts_len)?;
ts
} else {
tx_timestamp
};
Ok(WalOperation::DeleteNode {
node_id,
valid_from,
})
}
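/// Payload: 8-byte edge id, plus a `valid_from` timestamp in v1+.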
fn parse_delete_edge_op(
buffer: &[u8],
offset: &mut usize,
version: u8,
tx_timestamp: HybridTimestamp,
) -> Result<WalOperation> {
let edge_id = deserialize_edge_id(buffer, *offset, "DeleteEdge")?;
advance(offset, 8)?;
let valid_from = if version >= WAL_VERSION {
let (ts, ts_len) = HybridTimestamp::deserialize(&buffer[*offset..])?;
advance(offset, ts_len)?;
ts
} else {
tx_timestamp
};
Ok(WalOperation::DeleteEdge {
edge_id,
valid_from,
})
}
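/// Payload: the 8-byte checkpointed LSN followed by its timestamp.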
fn parse_checkpoint_op(buffer: &[u8], offset: &mut usize) -> Result<WalOperation> {
require_bytes(buffer, *offset, 20, "Checkpoint")?;
let cp_lsn = LSN(u64::from_le_bytes(
buffer[*offset..*offset + 8].try_into().unwrap(),
));
advance(offset, 8)?;
let (cp_timestamp, consumed) = HybridTimestamp::deserialize(&buffer[*offset..])?;
advance(offset, consumed)?;
Ok(WalOperation::Checkpoint {
lsn: cp_lsn,
timestamp: cp_timestamp,
})
}
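/// Parses one WAL entry at `offset`, returning it together with the number of
/// bytes consumed.
///
/// Wire layout: 8-byte little-endian LSN, 12-byte `HybridTimestamp`, 4-byte
/// CRC32 checksum, 1-byte operation type, then the operation payload. The
/// checksum covers the LSN, timestamp, and payload (skipping the checksum
/// field itself) and is verified before the entry is returned.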
pub(crate) fn parse_entry_at(
buffer: &[u8],
offset: usize,
version: u8,
) -> Result<(WalEntry, usize)> {
let start_offset = offset;
let mut cur = offset;
require_bytes(buffer, cur, 24, "WAL entry header")?;
let lsn = LSN(u64::from_le_bytes(
        buffer[cur..cur + 8].try_into().unwrap(),
    ));
advance(&mut cur, 8)?;
let (timestamp, _) = HybridTimestamp::deserialize(&buffer[cur..]).map_err(|e| {
StorageError::CorruptedData(format!("Failed to deserialize timestamp: {}", e))
})?;
    // The timestamp occupies a fixed 12 bytes of the 24-byte entry header.
    advance(&mut cur, 12)?;
let checksum = u32::from_le_bytes(
        buffer[cur..cur + 4].try_into().unwrap(),
    );
advance(&mut cur, 4)?;
if cur >= buffer.len() {
return Err(StorageError::CorruptedData(
"Insufficient buffer size for operation type".to_string(),
)
.into());
}
let op_type = buffer[cur];
advance(&mut cur, 1)?;
let operation = match op_type {
OP_CREATE_NODE => parse_create_node_op(buffer, &mut cur, version, timestamp)?,
OP_CREATE_EDGE => parse_create_edge_op(buffer, &mut cur, version, timestamp)?,
OP_UPDATE_NODE => parse_update_node_op(buffer, &mut cur, version, timestamp)?,
OP_UPDATE_EDGE => parse_update_edge_op(buffer, &mut cur, version, timestamp)?,
OP_DELETE_NODE => parse_delete_node_op(buffer, &mut cur, version, timestamp)?,
OP_DELETE_EDGE => parse_delete_edge_op(buffer, &mut cur, version, timestamp)?,
OP_CHECKPOINT => parse_checkpoint_op(buffer, &mut cur)?,
_ => {
return Err(StorageError::CorruptedData(format!(
"Unknown WAL operation type: {}",
op_type
))
.into());
}
};
let mut hasher = crc32fast::Hasher::new();
hasher.update(&buffer[start_offset..start_offset + 20]);
hasher.update(&buffer[start_offset + 24..cur]);
let computed_checksum = hasher.finalize();
if checksum != computed_checksum {
return Err(StorageError::CorruptedData(format!(
"WAL entry checksum mismatch for LSN {}: expected {:#x}, got {:#x}. Entry is corrupted.",
lsn.0, checksum, computed_checksum
))
.into());
}
let entry = WalEntry {
lsn,
timestamp,
operation,
checksum,
};
let bytes_consumed = cur - start_offset;
Ok((entry, bytes_consumed))
}
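/// Reads an 8-byte little-endian id at `offset` and validates it through
/// `NodeId::new`; the edge and version helpers below follow the same pattern.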
#[inline]
fn deserialize_node_id(buffer: &[u8], offset: usize, context: &str) -> Result<NodeId> {
let bytes = buffer.get(offset..offset + 8).ok_or_else(|| {
Error::Storage(StorageError::CorruptedData(format!(
"Insufficient buffer size for NodeId in {}",
context
)))
})?;
let raw_id = u64::from_le_bytes(bytes.try_into().unwrap());
NodeId::new(raw_id).map_err(|e| {
Error::Storage(StorageError::CorruptedData(format!(
"Invalid node ID in WAL {}: {}",
context, e
)))
})
}
#[inline]
fn deserialize_edge_id(buffer: &[u8], offset: usize, context: &str) -> Result<EdgeId> {
let bytes = buffer.get(offset..offset + 8).ok_or_else(|| {
Error::Storage(StorageError::CorruptedData(format!(
"Insufficient buffer size for EdgeId in {}",
context
)))
})?;
let raw_id = u64::from_le_bytes(bytes.try_into().unwrap());
EdgeId::new(raw_id).map_err(|e| {
Error::Storage(StorageError::CorruptedData(format!(
"Invalid edge ID in WAL {}: {}",
context, e
)))
})
}
#[inline]
fn deserialize_version_id(buffer: &[u8], offset: usize, context: &str) -> Result<VersionId> {
let bytes = buffer.get(offset..offset + 8).ok_or_else(|| {
Error::Storage(StorageError::CorruptedData(format!(
"Insufficient buffer size for VersionId in {}",
context
)))
})?;
let raw_id = u64::from_le_bytes(bytes.try_into().unwrap());
VersionId::new(raw_id).map_err(|e| {
Error::Storage(StorageError::CorruptedData(format!(
"Invalid version ID in WAL {}: {}",
context, e
)))
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::temporal::time;
use crate::storage::wal::serialization::serialize_entry_into;
use tempfile::TempDir;
#[test]
fn test_read_empty_directory() {
let dir = TempDir::new().unwrap();
let entries = read_entries_from_dir(dir.path(), LSN(1)).unwrap();
assert!(entries.is_empty());
}
#[test]
fn test_read_nonexistent_segment() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("nonexistent.log");
let entries = read_segment(&path, LSN(1)).unwrap();
assert!(entries.is_empty());
}
#[test]
fn test_parse_entry_at_create_node() {
let node_id = NodeId::new(42).unwrap();
let operation = WalOperation::CreateNode {
node_id,
label: GLOBAL_INTERNER.intern("Person").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(1), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(1));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::CreateNode {
node_id: parsed_id,
label,
..
} => {
assert_eq!(parsed_id, node_id);
assert_eq!(label, GLOBAL_INTERNER.intern("Person").unwrap());
}
_ => panic!("Expected CreateNode operation"),
}
}
#[test]
fn test_parse_entry_at_create_edge() {
let edge_id = EdgeId::new(100).unwrap();
let source = NodeId::new(1).unwrap();
let target = NodeId::new(2).unwrap();
let operation = WalOperation::CreateEdge {
edge_id,
source,
target,
label: GLOBAL_INTERNER.intern("KNOWS").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(2), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(2));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::CreateEdge {
edge_id: parsed_id,
source: parsed_source,
target: parsed_target,
label,
..
} => {
assert_eq!(parsed_id, edge_id);
assert_eq!(parsed_source, source);
assert_eq!(parsed_target, target);
assert_eq!(label, GLOBAL_INTERNER.intern("KNOWS").unwrap());
}
_ => panic!("Expected CreateEdge operation"),
}
}
#[test]
fn test_parse_entry_at_update_node() {
let node_id = NodeId::new(42).unwrap();
let version_id = VersionId::new(1).unwrap();
let operation = WalOperation::UpdateNode {
node_id,
version_id,
label: GLOBAL_INTERNER.intern("UpdatedPerson").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(3), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(3));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::UpdateNode {
node_id: parsed_id,
version_id: parsed_version,
label,
..
} => {
assert_eq!(parsed_id, node_id);
assert_eq!(parsed_version, version_id);
assert_eq!(label, GLOBAL_INTERNER.intern("UpdatedPerson").unwrap());
}
_ => panic!("Expected UpdateNode operation"),
}
}
#[test]
fn test_parse_entry_at_update_edge() {
let edge_id = EdgeId::new(100).unwrap();
let version_id = VersionId::new(1).unwrap();
let operation = WalOperation::UpdateEdge {
edge_id,
version_id,
label: GLOBAL_INTERNER.intern("UPDATED_KNOWS").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(4), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(4));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::UpdateEdge {
edge_id: parsed_id,
version_id: parsed_version,
label,
..
} => {
assert_eq!(parsed_id, edge_id);
assert_eq!(parsed_version, version_id);
assert_eq!(label, GLOBAL_INTERNER.intern("UPDATED_KNOWS").unwrap());
}
_ => panic!("Expected UpdateEdge operation"),
}
}
#[test]
fn test_parse_entry_at_delete_node() {
let node_id = NodeId::new(42).unwrap();
let operation = WalOperation::DeleteNode {
node_id,
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(5), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(5));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::DeleteNode {
node_id: parsed_id, ..
} => {
assert_eq!(parsed_id, node_id);
}
_ => panic!("Expected DeleteNode operation"),
}
}
#[test]
fn test_parse_entry_at_delete_edge() {
let edge_id = EdgeId::new(100).unwrap();
let operation = WalOperation::DeleteEdge {
edge_id,
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(6), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(6));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::DeleteEdge {
edge_id: parsed_id, ..
} => {
assert_eq!(parsed_id, edge_id);
}
_ => panic!("Expected DeleteEdge operation"),
}
}
#[test]
fn test_parse_entry_at_checkpoint() {
let cp_timestamp = time::now();
let operation = WalOperation::Checkpoint {
lsn: LSN(100),
timestamp: cp_timestamp,
};
let entry = WalEntry::new(LSN(7), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(7));
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::Checkpoint { lsn, .. } => {
assert_eq!(lsn, LSN(100));
}
_ => panic!("Expected Checkpoint operation"),
}
}
#[test]
fn test_parse_entry_at_with_offset() {
let operation1 = WalOperation::CreateNode {
node_id: NodeId::new(1).unwrap(),
label: GLOBAL_INTERNER.intern("First").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry1 = WalEntry::new(LSN(1), operation1);
let operation2 = WalOperation::CreateNode {
node_id: NodeId::new(2).unwrap(),
label: GLOBAL_INTERNER.intern("Second").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry2 = WalEntry::new(LSN(2), operation2);
let mut buffer = Vec::new();
serialize_entry_into(&entry1, &mut buffer).unwrap();
let offset1_end = buffer.len();
let mut buffer2 = Vec::new();
serialize_entry_into(&entry2, &mut buffer2).unwrap();
buffer.extend_from_slice(&buffer2);
let (parsed_entry, bytes_consumed) =
parse_entry_at(&buffer, offset1_end, WAL_VERSION).unwrap();
assert_eq!(parsed_entry.lsn, LSN(2));
match parsed_entry.operation {
WalOperation::CreateNode { label, .. } => {
assert_eq!(label, GLOBAL_INTERNER.intern("Second").unwrap());
}
_ => panic!("Expected CreateNode operation"),
}
assert_eq!(bytes_consumed, buffer.len() - offset1_end);
}
#[test]
fn test_parse_entry_at_insufficient_buffer() {
let buffer = vec![0u8; 10];
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
}
#[test]
fn test_parse_entry_at_unknown_operation_type() {
let mut buffer = Vec::new();
buffer.extend_from_slice(&1u64.to_le_bytes());
let timestamp = time::now();
timestamp.serialize_into(&mut buffer);
buffer.extend_from_slice(&0u32.to_le_bytes());
buffer.push(255);
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
}
#[test]
fn test_parse_entry_at_truncated_operation_data() {
let mut buffer = Vec::new();
buffer.extend_from_slice(&1u64.to_le_bytes());
let timestamp = time::now();
timestamp.serialize_into(&mut buffer);
buffer.extend_from_slice(&0u32.to_le_bytes());
buffer.push(1);
buffer.extend_from_slice(&[1, 2, 3, 4]);
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
}
#[test]
fn test_parse_entry_at_version_0_compatibility() {
let mut buffer = Vec::new();
buffer.extend_from_slice(&42u64.to_le_bytes());
let timestamp = time::now();
timestamp.serialize_into(&mut buffer);
let checksum_offset = buffer.len();
buffer.extend_from_slice(&0u32.to_le_bytes());
buffer.push(1);
buffer.extend_from_slice(&123u64.to_le_bytes());
let label_id = GLOBAL_INTERNER.intern("TestNode").unwrap().as_u32();
buffer.extend_from_slice(&label_id.to_le_bytes());
let mut hasher = crc32fast::Hasher::new();
        hasher.update(&buffer[0..checksum_offset]);
        hasher.update(&buffer[checksum_offset + 4..]);
        let checksum = hasher.finalize();
buffer[checksum_offset..checksum_offset + 4].copy_from_slice(&checksum.to_le_bytes());
let (parsed_entry, bytes_consumed) = parse_entry_at(&buffer, 0, 0).unwrap();
assert_eq!(parsed_entry.lsn.0, 42);
assert_eq!(bytes_consumed, buffer.len());
match parsed_entry.operation {
WalOperation::CreateNode {
node_id,
label: parsed_label,
properties,
valid_from,
} => {
assert_eq!(node_id.as_u64(), 123);
assert_eq!(parsed_label, GLOBAL_INTERNER.intern("TestNode").unwrap());
assert!(properties.is_empty());
assert_eq!(valid_from, timestamp);
}
_ => panic!("Expected CreateNode operation"),
}
}
#[test]
fn test_parse_entry_at_checksum_mismatch() {
let node_id = NodeId::new(42).unwrap();
let operation = WalOperation::CreateNode {
node_id,
label: GLOBAL_INTERNER.intern("Person").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(1), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
        // Byte 20 is the first checksum byte; flipping it forces a mismatch.
        buffer[20] ^= 0xFF;
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
if let Err(e) = result {
let error_msg = format!("{}", e);
assert!(error_msg.contains("checksum mismatch"));
}
}
#[test]
fn test_parse_entry_at_update_edge_truncated_label() {
let mut buffer = Vec::new();
buffer.extend_from_slice(&1u64.to_le_bytes());
let timestamp = time::now();
timestamp.serialize_into(&mut buffer);
let checksum_offset = buffer.len();
buffer.extend_from_slice(&0u32.to_le_bytes());
buffer.push(4);
buffer.extend_from_slice(&100u64.to_le_bytes());
buffer.extend_from_slice(&1u64.to_le_bytes());
let mut hasher = crc32fast::Hasher::new();
        hasher.update(&buffer[0..checksum_offset]);
        hasher.update(&buffer[checksum_offset + 4..]);
        let checksum = hasher.finalize();
buffer[checksum_offset..checksum_offset + 4].copy_from_slice(&checksum.to_le_bytes());
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
let err = result.unwrap_err();
let err_msg = format!("{}", err);
assert!(err_msg.contains("Insufficient buffer size"));
}
#[test]
fn test_parse_entry_at_update_node_truncated_label() {
let mut buffer = Vec::new();
buffer.extend_from_slice(&1u64.to_le_bytes());
let timestamp = time::now();
timestamp.serialize_into(&mut buffer);
let checksum_offset = buffer.len();
buffer.extend_from_slice(&0u32.to_le_bytes());
buffer.push(3);
buffer.extend_from_slice(&100u64.to_le_bytes());
buffer.extend_from_slice(&1u64.to_le_bytes());
let mut hasher = crc32fast::Hasher::new();
        hasher.update(&buffer[0..checksum_offset]);
        hasher.update(&buffer[checksum_offset + 4..]);
        let checksum = hasher.finalize();
buffer[checksum_offset..checksum_offset + 4].copy_from_slice(&checksum.to_le_bytes());
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
let err = result.unwrap_err();
let err_msg = format!("{}", err);
assert!(err_msg.contains("Insufficient buffer size"));
}
#[test]
fn test_read_large_segment_memory_efficient() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("large_segment.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
let num_entries = 1000;
let mut expected_lsns = Vec::new();
for i in 0..num_entries {
let lsn = LSN(i + 1);
expected_lsns.push(lsn);
let operation = WalOperation::CreateNode {
node_id: NodeId::new(i + 1).unwrap(),
label: GLOBAL_INTERNER.intern(format!("Node_{}", i)).unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(lsn, operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
file.write_all(&buffer).unwrap();
}
file.sync_all().unwrap();
drop(file);
let entries = read_segment(&segment_path, LSN(1)).unwrap();
assert_eq!(entries.len(), num_entries as usize);
for (i, entry) in entries.iter().enumerate() {
assert_eq!(entry.lsn, LSN(i as u64 + 1));
}
}
#[test]
fn test_read_multiple_segments_sequentially() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let num_segments = 5;
let entries_per_segment = 100;
for seg_id in 0..num_segments {
let segment_path = dir.path().join(format!("{}.log", seg_id));
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
for i in 0..entries_per_segment {
let lsn = LSN((seg_id * entries_per_segment) + i + 1);
let operation = WalOperation::CreateNode {
node_id: NodeId::new(lsn.0).unwrap(),
label: GLOBAL_INTERNER
.intern(format!("Node_seg{}_entry{}", seg_id, i))
.unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(lsn, operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
file.write_all(&buffer).unwrap();
}
file.sync_all().unwrap();
}
let entries = read_entries_from_dir(dir.path(), LSN(1)).unwrap();
assert_eq!(entries.len(), (num_segments * entries_per_segment) as usize);
for i in 0..entries.len() - 1 {
assert!(entries[i].lsn <= entries[i + 1].lsn);
}
}
#[test]
fn test_read_segment_with_start_lsn_filter() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("filtered_segment.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
for i in 1..=100 {
let lsn = LSN(i);
let operation = WalOperation::CreateNode {
node_id: NodeId::new(i).unwrap(),
label: GLOBAL_INTERNER.intern(format!("Node_{}", i)).unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(lsn, operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
file.write_all(&buffer).unwrap();
}
file.sync_all().unwrap();
drop(file);
let entries = read_segment(&segment_path, LSN(50)).unwrap();
        // LSNs 50..=100 survive the filter: 51 entries.
        assert_eq!(entries.len(), 51);
        assert_eq!(entries[0].lsn, LSN(50));
assert_eq!(entries[entries.len() - 1].lsn, LSN(100));
}
#[test]
fn test_read_empty_segment_efficient() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("empty_segment.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
file.sync_all().unwrap();
drop(file);
let entries = read_segment(&segment_path, LSN(1)).unwrap();
assert!(entries.is_empty());
}
#[test]
fn test_read_segment_with_truncated_entry() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("truncated_segment.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
let operation = WalOperation::CreateNode {
node_id: NodeId::new(1).unwrap(),
label: GLOBAL_INTERNER.intern("Node_1").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(1), operation);
let mut buffer = Vec::new();
serialize_entry_into(&entry, &mut buffer).unwrap();
file.write_all(&buffer).unwrap();
        // Append 8 stray bytes: too short for an entry header, so the reader
        // should treat them as a truncated tail.
        file.write_all(&42u64.to_le_bytes()).unwrap();
file.sync_all().unwrap();
drop(file);
let entries = read_segment(&segment_path, LSN(1)).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].lsn, LSN(1));
}
#[test]
fn test_read_nonexistent_file_returns_empty() {
let dir = TempDir::new().unwrap();
let nonexistent = dir.path().join("does_not_exist.log");
let result = read_segment(&nonexistent, LSN(1));
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn test_read_segment_rejects_oversized_file() {
use std::io::Write;
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("oversized_segment.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
        const OVERSIZED: u64 = 1024 * 1024 * 1024 + 1;
        file.set_len(OVERSIZED).unwrap();
file.sync_all().unwrap();
drop(file);
let result = read_segment(&segment_path, LSN(1));
assert!(result.is_err());
let error_msg = format!("{}", result.unwrap_err());
assert!(
error_msg.contains("too large"),
"Expected 'too large' error, got: {}",
error_msg
);
}
#[test]
fn test_wal_offset_overflow_protection() {
let buffer = [0u8; 100];
let offset = usize::MAX - 10;
let result = parse_entry_at(&buffer, offset, 1);
assert!(result.is_err());
match result {
Err(Error::Storage(StorageError::CorruptedData(msg))) => {
assert_eq!(msg, "WAL offset overflow");
}
_ => panic!("Expected WAL offset overflow error, got: {:?}", result),
}
}
#[test]
fn test_update_node_insufficient_buffer_for_label() {
let node_id = NodeId::new(42).unwrap();
let version_id = VersionId::new(1).unwrap();
let operation = WalOperation::UpdateNode {
node_id,
version_id,
label: GLOBAL_INTERNER.intern("UpdatedPerson").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(1), operation);
let mut full_buffer = Vec::new();
serialize_entry_into(&entry, &mut full_buffer).unwrap();
let truncated_buffer = &full_buffer[0..41];
let result = parse_entry_at(truncated_buffer, 0, WAL_VERSION);
assert!(result.is_err());
if let Err(Error::Storage(StorageError::CorruptedData(msg))) = result {
assert_eq!(msg, "Insufficient buffer size for UpdateNode label");
} else {
panic!("Expected specific CorruptedData error, got: {:?}", result);
}
}
#[test]
fn test_update_edge_insufficient_buffer_for_label() {
let edge_id = EdgeId::new(100).unwrap();
let version_id = VersionId::new(1).unwrap();
let operation = WalOperation::UpdateEdge {
edge_id,
version_id,
label: GLOBAL_INTERNER.intern("UPDATED_KNOWS").unwrap(),
properties: PropertyMap::new(),
valid_from: time::now(),
};
let entry = WalEntry::new(LSN(1), operation);
let mut full_buffer = Vec::new();
serialize_entry_into(&entry, &mut full_buffer).unwrap();
let truncated_buffer = &full_buffer[0..41];
let result = parse_entry_at(truncated_buffer, 0, WAL_VERSION);
assert!(result.is_err());
if let Err(Error::Storage(StorageError::CorruptedData(msg))) = result {
assert_eq!(msg, "Insufficient buffer size for UpdateEdge");
} else {
panic!("Expected specific CorruptedData error, got: {:?}", result);
}
}
#[test]
fn test_update_edge_offset_overflow_before_label() {
}
#[test]
fn test_advance_overflow_protection() {
let mut offset = usize::MAX;
let result = advance(&mut offset, 1);
assert!(result.is_err());
match result {
Err(Error::Storage(StorageError::CorruptedData(msg))) => {
assert_eq!(msg, "WAL offset overflow");
}
_ => panic!("Expected WAL offset overflow error, got: {:?}", result),
}
}
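    /// Builds a minimal version-0 entry by hand: LSN 1, the given op byte and
    /// payload, and a CRC32 computed over the header-minus-checksum plus
    /// payload.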
fn make_v0_buffer(
op_byte: u8,
op_data: &[u8],
timestamp: crate::core::hlc::HybridTimestamp,
) -> Vec<u8> {
let mut buf = Vec::new();
        buf.extend_from_slice(&1u64.to_le_bytes());
        timestamp.serialize_into(&mut buf);
        let checksum_off = buf.len();
        buf.extend_from_slice(&0u32.to_le_bytes());
        buf.push(op_byte);
buf.extend_from_slice(op_data);
let mut hasher = crc32fast::Hasher::new();
hasher.update(&buf[0..checksum_off]);
hasher.update(&buf[checksum_off + 4..]);
let cs = hasher.finalize();
buf[checksum_off..checksum_off + 4].copy_from_slice(&cs.to_le_bytes());
buf
}
#[test]
fn test_parse_entry_at_version_0_delete_node() {
let timestamp = time::now();
let node_id = NodeId::new(55).unwrap();
        // Op byte 6 encodes DeleteNode; the payload is the raw node id.
        let buf = make_v0_buffer(6, &55u64.to_le_bytes(), timestamp);
        let (entry, consumed) = parse_entry_at(&buf, 0, 0).unwrap();
assert_eq!(consumed, buf.len());
match entry.operation {
WalOperation::DeleteNode {
node_id: parsed_id,
valid_from,
} => {
assert_eq!(parsed_id, node_id);
assert_eq!(valid_from, timestamp);
}
_ => panic!("Expected DeleteNode"),
}
}
#[test]
fn test_parse_entry_at_version_0_delete_edge() {
let timestamp = time::now();
let edge_id = EdgeId::new(200).unwrap();
        // Op byte 7 encodes DeleteEdge; the payload is the raw edge id.
        let buf = make_v0_buffer(7, &200u64.to_le_bytes(), timestamp);
        let (entry, consumed) = parse_entry_at(&buf, 0, 0).unwrap();
assert_eq!(consumed, buf.len());
match entry.operation {
WalOperation::DeleteEdge {
edge_id: parsed_id,
valid_from,
} => {
assert_eq!(parsed_id, edge_id);
assert_eq!(valid_from, timestamp);
}
_ => panic!("Expected DeleteEdge"),
}
}
#[test]
fn test_parse_entry_at_version_0_update_node() {
let timestamp = time::now();
let node_id = NodeId::new(42).unwrap();
let version_id = VersionId::new(7).unwrap();
let mut op_data = Vec::new();
op_data.extend_from_slice(&42u64.to_le_bytes());
op_data.extend_from_slice(&7u64.to_le_bytes());
        // Op byte 3 encodes UpdateNode; the v0 payload is node id + version id.
        let buf = make_v0_buffer(3, &op_data, timestamp);
        let (entry, consumed) = parse_entry_at(&buf, 0, 0).unwrap();
assert_eq!(consumed, buf.len());
match entry.operation {
WalOperation::UpdateNode {
node_id: parsed_node,
version_id: parsed_ver,
properties,
valid_from,
..
} => {
assert_eq!(parsed_node, node_id);
assert_eq!(parsed_ver, version_id);
assert!(properties.is_empty());
assert_eq!(valid_from, timestamp);
}
_ => panic!("Expected UpdateNode"),
}
}
#[test]
fn test_parse_entry_at_version_0_update_edge() {
let timestamp = time::now();
let edge_id = EdgeId::new(300).unwrap();
let version_id = VersionId::new(5).unwrap();
let mut op_data = Vec::new();
op_data.extend_from_slice(&300u64.to_le_bytes());
op_data.extend_from_slice(&5u64.to_le_bytes());
        // Op byte 4 encodes UpdateEdge; the v0 payload is edge id + version id.
        let buf = make_v0_buffer(4, &op_data, timestamp);
        let (entry, consumed) = parse_entry_at(&buf, 0, 0).unwrap();
assert_eq!(consumed, buf.len());
match entry.operation {
WalOperation::UpdateEdge {
edge_id: parsed_edge,
version_id: parsed_ver,
properties,
valid_from,
..
} => {
assert_eq!(parsed_edge, edge_id);
assert_eq!(parsed_ver, version_id);
assert!(properties.is_empty());
assert_eq!(valid_from, timestamp);
}
_ => panic!("Expected UpdateEdge"),
}
}
}
#[cfg(test)]
mod regression_tests {
use super::*;
#[test]
fn test_repro_fuzz_update_edge_panic() {
let data = vec![
            71, 87, 65, 76, 1, 190, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
            1, 1, 40, 1, 1, 1, 1, 1, 71, 87, 65, 76, 0, 4, 0, 0,
            0, 1, 40, 1, 1, 1, 1, 1, 71, 87, 65, 76, 76, 0,
        ];
let result = parse_entry_at(&data, 5, 1);
assert!(
result.is_err(),
"Should return error for truncated buffer, got {:?}",
result
);
}
}
#[cfg(test)]
mod fuzz_tests {
use super::*;
use proptest::prelude::*;
proptest! {
#[test]
fn fuzz_parse_entry_at(
bytes in prop::collection::vec(any::<u8>(), 0..2048),
offset in 0..100usize,
version in 0..2u8
) {
let _ = parse_entry_at(&bytes, offset, version);
}
}
}
#[cfg(test)]
mod sentry_tests {
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
#[test]
fn test_read_segment_exactly_max_size_allowed() {
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("max_size.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
file.set_len(MAX_SEGMENT_SIZE).unwrap();
drop(file);
let result = read_segment(&segment_path, LSN(1));
match result {
            // Mapping a sparse file of exactly MAX_SEGMENT_SIZE may succeed;
            // the only requirement is that it is not rejected as too large.
            Ok(_) => {}
Err(e) => {
let msg = e.to_string();
assert!(
!msg.contains("too large"),
"Should not reject max size file. Error was: {}",
msg
);
}
}
}
#[test]
fn test_read_segment_header_only() {
let dir = TempDir::new().unwrap();
let segment_path = dir.path().join("header_only.log");
let mut file = File::create(&segment_path).unwrap();
file.write_all(&WAL_MAGIC).unwrap();
file.write_all(&[WAL_VERSION]).unwrap();
drop(file);
let result = read_segment(&segment_path, LSN(1));
assert!(result.is_ok(), "Should accept header-only segment");
assert!(result.unwrap().is_empty());
}
#[test]
fn test_parse_entry_at_exact_header_size() {
let buffer = vec![0u8; 24];
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(
msg.contains("operation type"),
"Should fail at op type check, not header check. Got: {}",
msg
);
}
#[test]
fn test_parse_entry_at_exact_header_and_op_type() {
let mut buffer = vec![0u8; 25];
buffer[24] = 255;
let result = parse_entry_at(&buffer, 0, WAL_VERSION);
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(
msg.contains("Unknown WAL operation type"),
"Should read op type and fail validation. Got: {}",
msg
);
}
#[test]
fn test_bolt_pre_allocate_segment_capacity() {
use std::io::Write;
let dir = tempfile::TempDir::new().unwrap();
let file_path = dir.path().join("1.log");
let mut file = std::fs::File::create(&file_path).unwrap();
let mut buffer = Vec::new();
buffer.extend_from_slice(&super::WAL_MAGIC);
buffer.push(super::WAL_VERSION);
        buffer.resize(1024, 0);
file.write_all(&buffer).unwrap();
file.sync_all().unwrap();
let entries = read_segment(&file_path, crate::storage::LSN(1)).unwrap();
assert!(
entries.capacity() >= 8,
"⚡ Bolt: Vector should be pre-allocated with capacity based on file size. Capacity was {}",
entries.capacity()
);
}
}