use serde::{Deserialize, Serialize};
use crate::index::IndexSpec;
use crate::value::{EdgeId, Value, VertexId};
use super::records::{EdgeRecord, NodeRecord};
/// Size in bytes of the on-disk WAL entry header: a `u32` CRC32 followed
/// by a `u32` payload length.
pub const WAL_ENTRY_HEADER_SIZE: usize = 8;

/// Fixed-size header preceding every WAL entry payload on disk.
///
/// `#[repr(C, packed)]` pins the size to exactly 8 bytes (checked by the
/// tests). Read the fields by copying them into locals; taking references
/// to packed fields is not allowed.
#[repr(C, packed)]
#[derive(Copy, Clone, Debug)]
pub struct WalEntryHeader {
    /// CRC32 of the serialized entry payload that follows the header.
    pub crc32: u32,
    /// Length in bytes of the serialized entry payload.
    pub len: u32,
}

impl WalEntryHeader {
    /// Creates a header from a payload checksum and payload length.
    pub fn new(crc32: u32, len: u32) -> Self {
        Self { crc32, len }
    }

    /// Decodes a header from the first 8 bytes of `bytes`.
    ///
    /// The on-disk layout is little-endian (`crc32` then `len`), matching
    /// the format asserted by the byte-order test. Decoding explicitly as
    /// LE keeps WAL files portable across architectures, unlike a raw
    /// native-endian memory copy.
    ///
    /// # Panics
    /// Panics if `bytes` is shorter than [`WAL_ENTRY_HEADER_SIZE`].
    pub fn from_bytes(bytes: &[u8]) -> Self {
        assert!(
            bytes.len() >= WAL_ENTRY_HEADER_SIZE,
            "Buffer too small for WalEntryHeader"
        );
        // The conversions cannot fail: the assert above guarantees at
        // least 8 bytes.
        let crc32 = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
        let len = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
        Self { crc32, len }
    }

    /// Encodes the header as 8 little-endian bytes: `crc32` then `len`.
    pub fn to_bytes(&self) -> [u8; WAL_ENTRY_HEADER_SIZE] {
        // Copy packed fields to locals before calling methods on them
        // (method calls would autoref, which is illegal on packed fields).
        let crc32 = self.crc32;
        let len = self.len;
        let mut out = [0u8; WAL_ENTRY_HEADER_SIZE];
        out[..4].copy_from_slice(&crc32.to_le_bytes());
        out[4..].copy_from_slice(&len.to_le_bytes());
        out
    }
}
/// A single record in the write-ahead log.
///
/// Entries are bincode-serialized, so the variant order and field order
/// are part of the on-disk format — do not reorder or remove variants.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum WalEntry {
/// Start of transaction `tx_id`; `timestamp` is whole seconds since the
/// Unix epoch (see `WriteAheadLog::now`).
BeginTx {
tx_id: u64,
timestamp: u64,
},
/// A node insertion, with the full record image for redo.
InsertNode {
id: VertexId,
record: SerializableNodeRecord,
},
/// An edge insertion, with the full record image for redo.
InsertEdge {
id: EdgeId,
record: SerializableEdgeRecord,
},
/// A property change. Carries both old and new values (usable for undo
/// as well as redo). `is_vertex` selects whether `element_id` names a
/// vertex or an edge.
UpdateProperty {
is_vertex: bool,
element_id: u64,
key_id: u32,
old_value: Value,
new_value: Value,
},
/// Deletion of the node with the given id.
DeleteNode {
id: VertexId,
},
/// Deletion of the edge with the given id.
DeleteEdge {
id: EdgeId,
},
/// Transaction `tx_id` committed; its earlier entries become durable.
CommitTx {
tx_id: u64,
},
/// Transaction `tx_id` aborted; its earlier entries must be discarded.
AbortTx {
tx_id: u64,
},
/// Checkpoint marker; recovery scanning treats everything before it as
/// settled (see `needs_recovery`).
Checkpoint {
version: u64,
},
/// Raw schema-region update: `data` bytes written at `offset`.
SchemaUpdate {
offset: u64,
data: Vec<u8>,
},
/// Creation of a secondary index described by `spec`.
CreateIndex {
spec: IndexSpec,
},
/// Removal of the secondary index with the given name.
DropIndex {
name: String,
},
}
/// Serde-friendly mirror of the storage-layer `NodeRecord`, embedded in
/// WAL entries.
///
/// Field order is part of the bincode wire format — do not reorder.
/// NOTE(review): `u64::MAX` appears to serve as the "no link" sentinel for
/// the list-head fields (the tests assert `u64::MAX` for fresh records) —
/// confirm against `NodeRecord::new`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SerializableNodeRecord {
pub id: u64,
pub label_id: u32,
pub flags: u32,
pub first_out_edge: u64,
pub first_in_edge: u64,
pub prop_head: u64,
}
impl From<NodeRecord> for SerializableNodeRecord {
    /// Copies every field out of the storage record into its
    /// serde-friendly mirror.
    fn from(record: NodeRecord) -> Self {
        // Bind each field to a local first; this copies the values out
        // rather than referencing them in place (safe even if NodeRecord
        // is a packed struct — the tests use the same pattern).
        let id = record.id;
        let label_id = record.label_id;
        let flags = record.flags;
        let first_out_edge = record.first_out_edge;
        let first_in_edge = record.first_in_edge;
        let prop_head = record.prop_head;
        Self {
            id,
            label_id,
            flags,
            first_out_edge,
            first_in_edge,
            prop_head,
        }
    }
}
impl From<SerializableNodeRecord> for NodeRecord {
fn from(ser: SerializableNodeRecord) -> Self {
let mut record = NodeRecord::new(ser.id, ser.label_id);
record.flags = ser.flags;
record.first_out_edge = ser.first_out_edge;
record.first_in_edge = ser.first_in_edge;
record.prop_head = ser.prop_head;
record
}
}
/// Serde-friendly mirror of the storage-layer `EdgeRecord`, embedded in
/// WAL entries.
///
/// Field order is part of the bincode wire format — do not reorder.
/// NOTE(review): `u64::MAX` appears to be the "no link" sentinel for
/// `next_out`/`next_in`/`prop_head` (asserted by the tests for fresh
/// records) — confirm against `EdgeRecord::new`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SerializableEdgeRecord {
pub id: u64,
pub label_id: u32,
pub flags: u32,
pub src: u64,
pub dst: u64,
pub next_out: u64,
pub next_in: u64,
pub prop_head: u64,
}
impl From<EdgeRecord> for SerializableEdgeRecord {
    /// Copies every field out of the storage record into its
    /// serde-friendly mirror.
    fn from(record: EdgeRecord) -> Self {
        // Copy fields to locals before building the struct (safe even if
        // EdgeRecord is packed — the tests use the same pattern).
        let id = record.id;
        let label_id = record.label_id;
        let flags = record.flags;
        let src = record.src;
        let dst = record.dst;
        let next_out = record.next_out;
        let next_in = record.next_in;
        let prop_head = record.prop_head;
        Self {
            id,
            label_id,
            flags,
            src,
            dst,
            next_out,
            next_in,
            prop_head,
        }
    }
}
impl From<SerializableEdgeRecord> for EdgeRecord {
fn from(ser: SerializableEdgeRecord) -> Self {
let mut record = EdgeRecord::new(ser.id, ser.label_id, ser.src, ser.dst);
record.flags = ser.flags;
record.next_out = ser.next_out;
record.next_in = ser.next_in;
record.prop_head = ser.prop_head;
record
}
}
use crate::error::StorageError;
use std::collections::HashSet;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
/// Append-only write-ahead log backed by a single file.
pub struct WriteAheadLog {
// Underlying log file; the cursor is kept at the end between appends.
file: File,
// Next transaction id to hand out. NOTE(review): every method takes
// &mut self, so the atomic is not currently needed for synchronization;
// presumably it anticipates shared use — confirm before relying on it.
next_tx_id: AtomicU64,
// Reusable serialization buffer, avoids a fresh allocation per entry.
buffer: Vec<u8>,
}
impl WriteAheadLog {
    /// Opens (or creates) the WAL file at `path`.
    ///
    /// Any existing entries are scanned so that transaction-id allocation
    /// resumes after the highest id already on disk; the cursor is then
    /// positioned at the end of the file, ready for appends.
    ///
    /// # Errors
    /// Propagates I/O errors from opening or scanning the file.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StorageError> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;
        let mut wal = Self {
            file,
            next_tx_id: AtomicU64::new(0),
            buffer: Vec::with_capacity(4096),
        };
        if let Some(max_tx_id) = wal.scan_max_tx_id()? {
            wal.next_tx_id = AtomicU64::new(max_tx_id.saturating_add(1));
        }
        wal.file.seek(SeekFrom::End(0))?;
        Ok(wal)
    }

    /// Scans the whole log and returns the highest transaction id seen in
    /// Begin/Commit/Abort entries, or `None` if there are none.
    ///
    /// A clean EOF or a corrupted tail simply ends the scan (the tail is
    /// ignored); any other error is propagated.
    fn scan_max_tx_id(&mut self) -> Result<Option<u64>, StorageError> {
        self.file.seek(SeekFrom::Start(0))?;
        let mut max_tx_id: Option<u64> = None;
        loop {
            match self.read_entry() {
                Ok(entry) => {
                    let tx_id = match &entry {
                        WalEntry::BeginTx { tx_id, .. } => Some(*tx_id),
                        WalEntry::CommitTx { tx_id } => Some(*tx_id),
                        WalEntry::AbortTx { tx_id } => Some(*tx_id),
                        _ => None,
                    };
                    if let Some(id) = tx_id {
                        max_tx_id = Some(max_tx_id.map_or(id, |max| max.max(id)));
                    }
                }
                Err(StorageError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(StorageError::WalCorrupted(_)) => {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        Ok(max_tx_id)
    }

    /// Allocates a fresh transaction id and logs a `BeginTx` entry for it.
    pub fn begin_transaction(&mut self) -> Result<u64, StorageError> {
        let tx_id = self.next_tx_id.fetch_add(1, Ordering::SeqCst);
        self.log(WalEntry::BeginTx {
            tx_id,
            timestamp: Self::now(),
        })?;
        Ok(tx_id)
    }

    /// Appends `entry` to the end of the log and returns the byte offset
    /// it was written at.
    ///
    /// The entry is bincode-encoded into the reusable buffer and prefixed
    /// with a `WalEntryHeader` (payload CRC32 + payload length).
    /// Durability requires a separate call to [`Self::sync`].
    pub fn log(&mut self, entry: WalEntry) -> Result<u64, StorageError> {
        self.buffer.clear();
        bincode::serialize_into(&mut self.buffer, &entry)
            .map_err(|e| StorageError::WalCorrupted(format!("serialization error: {}", e)))?;
        let crc = crc32fast::hash(&self.buffer);
        let header = WalEntryHeader::new(crc, self.buffer.len() as u32);
        let header_bytes = header.to_bytes();
        let offset = self.file.seek(SeekFrom::End(0))?;
        self.file.write_all(&header_bytes)?;
        self.file.write_all(&self.buffer)?;
        Ok(offset)
    }

    /// Flushes written data to stable storage (`sync_data`, i.e. data but
    /// not necessarily all metadata).
    pub fn sync(&mut self) -> Result<(), StorageError> {
        self.file.sync_data()?;
        Ok(())
    }

    /// Current wall-clock time in whole seconds since the Unix epoch
    /// (0 if the system clock reads before the epoch).
    fn now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Test helper: current byte position of the file cursor.
    #[cfg(test)]
    fn position(&mut self) -> Result<u64, StorageError> {
        // stream_position() is the idiomatic form of seek(Current(0)),
        // consistent with its use in needs_recovery().
        Ok(self.file.stream_position()?)
    }

    /// Test helper: the next transaction id that will be handed out.
    #[cfg(test)]
    fn current_tx_id(&self) -> u64 {
        self.next_tx_id.load(Ordering::SeqCst)
    }

    /// Reads the entry at the current cursor position.
    ///
    /// # Errors
    /// - `Io` with `UnexpectedEof` at a clean end of log.
    /// - `WalCorrupted` when the length field is implausibly large, the
    ///   CRC32 does not match, or the payload fails to decode.
    pub fn read_entry(&mut self) -> Result<WalEntry, StorageError> {
        let mut header_bytes = [0u8; WAL_ENTRY_HEADER_SIZE];
        self.file.read_exact(&mut header_bytes)?;
        let header = WalEntryHeader::from_bytes(&header_bytes);
        let crc32 = header.crc32;
        let len = header.len;
        // Guard against allocating a huge buffer from a corrupted length.
        if len > 100_000_000 {
            return Err(StorageError::WalCorrupted(format!(
                "entry length {} exceeds maximum",
                len
            )));
        }
        let mut entry_data = vec![0u8; len as usize];
        self.file.read_exact(&mut entry_data)?;
        let computed_crc = crc32fast::hash(&entry_data);
        if computed_crc != crc32 {
            return Err(StorageError::WalCorrupted(format!(
                "CRC32 mismatch: expected {:08x}, got {:08x}",
                crc32, computed_crc
            )));
        }
        let entry: WalEntry = bincode::deserialize(&entry_data)
            .map_err(|e| StorageError::WalCorrupted(format!("deserialization error: {}", e)))?;
        Ok(entry)
    }

    /// Returns `true` when replay is needed: the log contains transactions
    /// that were begun but never committed/aborted (transactions before a
    /// `Checkpoint` are considered settled), or a corrupted entry is hit.
    ///
    /// Best-effort: I/O failures while probing report `false` rather than
    /// propagating. The cursor is restored before returning.
    pub fn needs_recovery(&mut self) -> bool {
        let original_pos = match self.file.stream_position() {
            Ok(pos) => pos,
            Err(_) => return false,
        };
        if self.file.seek(SeekFrom::Start(0)).is_err() {
            return false;
        }
        let mut active_transactions: HashSet<u64> = HashSet::new();
        loop {
            match self.read_entry() {
                Ok(entry) => match entry {
                    WalEntry::BeginTx { tx_id, .. } => {
                        active_transactions.insert(tx_id);
                    }
                    WalEntry::CommitTx { tx_id } | WalEntry::AbortTx { tx_id } => {
                        active_transactions.remove(&tx_id);
                    }
                    WalEntry::Checkpoint { .. } => {
                        // Everything before a checkpoint is settled.
                        active_transactions.clear();
                    }
                    _ => {
                    }
                },
                Err(StorageError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(_) => {
                    // Corruption (or any other read failure) itself means
                    // recovery is required.
                    let _ = self.file.seek(SeekFrom::Start(original_pos));
                    return true;
                }
            }
        }
        let _ = self.file.seek(SeekFrom::Start(original_pos));
        !active_transactions.is_empty()
    }

    /// Discards all log content and rewinds the cursor to the start.
    pub fn truncate(&mut self) -> Result<(), StorageError> {
        self.file.set_len(0)?;
        self.file.seek(SeekFrom::Start(0))?;
        Ok(())
    }

    /// Rewinds the cursor to the beginning of the log.
    pub fn seek_to_start(&mut self) -> Result<(), StorageError> {
        self.file.seek(SeekFrom::Start(0))?;
        Ok(())
    }

    /// Size of the log file in bytes.
    pub fn file_size(&self) -> Result<u64, StorageError> {
        let metadata = self.file.metadata()?;
        Ok(metadata.len())
    }

    /// Reads every entry from the start of the log until clean EOF.
    ///
    /// # Errors
    /// Unlike `scan_max_tx_id`, corruption is propagated as an error here.
    pub fn read_all_entries(&mut self) -> Result<Vec<WalEntry>, StorageError> {
        self.seek_to_start()?;
        let mut entries = Vec::new();
        loop {
            match self.read_entry() {
                Ok(entry) => entries.push(entry),
                Err(StorageError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        Ok(entries)
    }

    /// Returns, grouped per transaction and in commit order, the payload
    /// entries of every committed transaction.
    ///
    /// Entries of aborted transactions are dropped and `Checkpoint`
    /// entries are ignored. Note that entries logged outside any
    /// transaction (after a commit/abort and before the next `BeginTx`)
    /// are silently discarded.
    pub fn get_committed_entries(&mut self) -> Result<Vec<WalEntry>, StorageError> {
        use std::collections::HashMap;
        self.seek_to_start()?;
        let mut tx_entries: HashMap<u64, Vec<WalEntry>> = HashMap::new();
        let mut committed_tx_ids: Vec<u64> = Vec::new();
        let mut current_tx_id: Option<u64> = None;
        loop {
            match self.read_entry() {
                Ok(entry) => match &entry {
                    WalEntry::BeginTx { tx_id, .. } => {
                        current_tx_id = Some(*tx_id);
                        tx_entries.insert(*tx_id, Vec::new());
                    }
                    WalEntry::CommitTx { tx_id } => {
                        committed_tx_ids.push(*tx_id);
                        current_tx_id = None;
                    }
                    WalEntry::AbortTx { tx_id } => {
                        // Aborted work is discarded entirely.
                        tx_entries.remove(tx_id);
                        current_tx_id = None;
                    }
                    WalEntry::Checkpoint { .. } => {
                    }
                    _ => {
                        if let Some(tx_id) = current_tx_id {
                            if let Some(entries) = tx_entries.get_mut(&tx_id) {
                                entries.push(entry);
                            }
                        }
                    }
                },
                Err(StorageError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        // Concatenate per-transaction entry lists in the order the
        // transactions committed.
        let mut result = Vec::new();
        for tx_id in committed_tx_ids {
            if let Some(entries) = tx_entries.remove(&tx_id) {
                result.extend(entries);
            }
        }
        Ok(result)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
#[test]
fn test_wal_entry_header_size() {
assert_eq!(
std::mem::size_of::<WalEntryHeader>(),
WAL_ENTRY_HEADER_SIZE,
"WalEntryHeader size must be exactly 8 bytes"
);
}
#[test]
fn test_wal_entry_header_alignment() {
assert_eq!(
std::mem::size_of::<WalEntryHeader>(),
4 + 4,
"WalEntryHeader fields should sum to 8 bytes"
);
}
#[test]
fn test_wal_entry_header_new() {
let header = WalEntryHeader::new(0x12345678, 256);
let crc32 = header.crc32;
let len = header.len;
assert_eq!(crc32, 0x12345678);
assert_eq!(len, 256);
}
#[test]
fn test_wal_entry_header_roundtrip() {
let header = WalEntryHeader::new(0xDEADBEEF, 1024);
let orig_crc32 = header.crc32;
let orig_len = header.len;
let bytes = header.to_bytes();
assert_eq!(bytes.len(), WAL_ENTRY_HEADER_SIZE);
let recovered = WalEntryHeader::from_bytes(&bytes);
let rec_crc32 = recovered.crc32;
let rec_len = recovered.len;
assert_eq!(rec_crc32, orig_crc32);
assert_eq!(rec_len, orig_len);
}
#[test]
fn test_wal_entry_header_byte_order() {
let header = WalEntryHeader::new(0x01020304, 0x05060708);
let bytes = header.to_bytes();
let crc_bytes: [u8; 4] = [bytes[0], bytes[1], bytes[2], bytes[3]];
assert_eq!(crc_bytes[0], 0x04); assert_eq!(crc_bytes[3], 0x01);
let len_bytes: [u8; 4] = [bytes[4], bytes[5], bytes[6], bytes[7]];
assert_eq!(len_bytes[0], 0x08); assert_eq!(len_bytes[3], 0x05);
}
#[test]
fn test_begin_tx_serializes() {
let entry = WalEntry::BeginTx {
tx_id: 42,
timestamp: 1704067200,
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_commit_tx_serializes() {
let entry = WalEntry::CommitTx { tx_id: 123 };
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_abort_tx_serializes() {
let entry = WalEntry::AbortTx { tx_id: 456 };
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_checkpoint_serializes() {
let entry = WalEntry::Checkpoint { version: 789 };
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_insert_node_serializes() {
let record = SerializableNodeRecord {
id: 100,
label_id: 5,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: 1024,
};
let entry = WalEntry::InsertNode {
id: VertexId(100),
record,
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_insert_edge_serializes() {
let record = SerializableEdgeRecord {
id: 200,
label_id: 10,
flags: 0,
src: 1,
dst: 2,
next_out: u64::MAX,
next_in: u64::MAX,
prop_head: 2048,
};
let entry = WalEntry::InsertEdge {
id: EdgeId(200),
record,
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_update_property_serializes() {
let entry = WalEntry::UpdateProperty {
is_vertex: true,
element_id: 42,
key_id: 7,
old_value: Value::Int(10),
new_value: Value::Int(20),
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_update_property_with_complex_values() {
let mut old_map = crate::value::ValueMap::new();
old_map.insert("name".to_string(), Value::String("Alice".to_string()));
let mut new_map = crate::value::ValueMap::new();
new_map.insert("name".to_string(), Value::String("Bob".to_string()));
new_map.insert("age".to_string(), Value::Int(30));
let entry = WalEntry::UpdateProperty {
is_vertex: false,
element_id: 99,
key_id: 15,
old_value: Value::Map(old_map),
new_value: Value::Map(new_map),
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_delete_node_serializes() {
let entry = WalEntry::DeleteNode { id: VertexId(555) };
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_delete_edge_serializes() {
let entry = WalEntry::DeleteEdge { id: EdgeId(666) };
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_wal_entry_is_clone() {
let entry = WalEntry::BeginTx {
tx_id: 1,
timestamp: 1000,
};
let cloned = entry.clone();
assert_eq!(entry, cloned);
}
#[test]
fn test_wal_entry_is_debug() {
let entry = WalEntry::BeginTx {
tx_id: 1,
timestamp: 1000,
};
let debug_str = format!("{:?}", entry);
assert!(debug_str.contains("BeginTx"));
assert!(debug_str.contains("tx_id"));
}
#[test]
fn test_serializable_node_record_from_node_record() {
let node = NodeRecord::new(42, 7);
let ser: SerializableNodeRecord = node.into();
assert_eq!(ser.id, 42);
assert_eq!(ser.label_id, 7);
assert_eq!(ser.flags, 0);
assert_eq!(ser.first_out_edge, u64::MAX);
assert_eq!(ser.first_in_edge, u64::MAX);
assert_eq!(ser.prop_head, u64::MAX);
}
#[test]
fn test_node_record_from_serializable() {
let ser = SerializableNodeRecord {
id: 100,
label_id: 10,
flags: 1,
first_out_edge: 200,
first_in_edge: 300,
prop_head: 400,
};
let node: NodeRecord = ser.into();
let id = node.id;
let label_id = node.label_id;
let flags = node.flags;
let first_out_edge = node.first_out_edge;
let first_in_edge = node.first_in_edge;
let prop_head = node.prop_head;
assert_eq!(id, 100);
assert_eq!(label_id, 10);
assert_eq!(flags, 1);
assert_eq!(first_out_edge, 200);
assert_eq!(first_in_edge, 300);
assert_eq!(prop_head, 400);
}
#[test]
fn test_node_record_roundtrip_through_serializable() {
let mut original = NodeRecord::new(50, 5);
original.flags = 3;
original.first_out_edge = 100;
original.first_in_edge = 200;
original.prop_head = 300;
let orig_id = original.id;
let orig_label_id = original.label_id;
let orig_flags = original.flags;
let orig_first_out = original.first_out_edge;
let orig_first_in = original.first_in_edge;
let orig_prop_head = original.prop_head;
let ser: SerializableNodeRecord = original.into();
let recovered: NodeRecord = ser.into();
let rec_id = recovered.id;
let rec_label_id = recovered.label_id;
let rec_flags = recovered.flags;
let rec_first_out = recovered.first_out_edge;
let rec_first_in = recovered.first_in_edge;
let rec_prop_head = recovered.prop_head;
assert_eq!(rec_id, orig_id);
assert_eq!(rec_label_id, orig_label_id);
assert_eq!(rec_flags, orig_flags);
assert_eq!(rec_first_out, orig_first_out);
assert_eq!(rec_first_in, orig_first_in);
assert_eq!(rec_prop_head, orig_prop_head);
}
#[test]
fn test_serializable_edge_record_from_edge_record() {
let edge = EdgeRecord::new(42, 7, 10, 20);
let ser: SerializableEdgeRecord = edge.into();
assert_eq!(ser.id, 42);
assert_eq!(ser.label_id, 7);
assert_eq!(ser.flags, 0);
assert_eq!(ser.src, 10);
assert_eq!(ser.dst, 20);
assert_eq!(ser.next_out, u64::MAX);
assert_eq!(ser.next_in, u64::MAX);
assert_eq!(ser.prop_head, u64::MAX);
}
#[test]
fn test_edge_record_from_serializable() {
let ser = SerializableEdgeRecord {
id: 100,
label_id: 10,
flags: 1,
src: 5,
dst: 15,
next_out: 200,
next_in: 300,
prop_head: 400,
};
let edge: EdgeRecord = ser.into();
let id = edge.id;
let label_id = edge.label_id;
let flags = edge.flags;
let src = edge.src;
let dst = edge.dst;
let next_out = edge.next_out;
let next_in = edge.next_in;
let prop_head = edge.prop_head;
assert_eq!(id, 100);
assert_eq!(label_id, 10);
assert_eq!(flags, 1);
assert_eq!(src, 5);
assert_eq!(dst, 15);
assert_eq!(next_out, 200);
assert_eq!(next_in, 300);
assert_eq!(prop_head, 400);
}
#[test]
fn test_edge_record_roundtrip_through_serializable() {
let mut original = EdgeRecord::new(50, 5, 1, 2);
original.flags = 1;
original.next_out = 100;
original.next_in = 200;
original.prop_head = 300;
let orig_id = original.id;
let orig_label_id = original.label_id;
let orig_flags = original.flags;
let orig_src = original.src;
let orig_dst = original.dst;
let orig_next_out = original.next_out;
let orig_next_in = original.next_in;
let orig_prop_head = original.prop_head;
let ser: SerializableEdgeRecord = original.into();
let recovered: EdgeRecord = ser.into();
let rec_id = recovered.id;
let rec_label_id = recovered.label_id;
let rec_flags = recovered.flags;
let rec_src = recovered.src;
let rec_dst = recovered.dst;
let rec_next_out = recovered.next_out;
let rec_next_in = recovered.next_in;
let rec_prop_head = recovered.prop_head;
assert_eq!(rec_id, orig_id);
assert_eq!(rec_label_id, orig_label_id);
assert_eq!(rec_flags, orig_flags);
assert_eq!(rec_src, orig_src);
assert_eq!(rec_dst, orig_dst);
assert_eq!(rec_next_out, orig_next_out);
assert_eq!(rec_next_in, orig_next_in);
assert_eq!(rec_prop_head, orig_prop_head);
}
#[test]
fn test_all_entry_types_serialize_with_bincode() {
let entries = vec![
WalEntry::BeginTx {
tx_id: 1,
timestamp: 1000,
},
WalEntry::InsertNode {
id: VertexId(1),
record: SerializableNodeRecord {
id: 1,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
},
WalEntry::InsertEdge {
id: EdgeId(1),
record: SerializableEdgeRecord {
id: 1,
label_id: 1,
flags: 0,
src: 0,
dst: 1,
next_out: u64::MAX,
next_in: u64::MAX,
prop_head: u64::MAX,
},
},
WalEntry::UpdateProperty {
is_vertex: true,
element_id: 0,
key_id: 0,
old_value: Value::Null,
new_value: Value::Int(42),
},
WalEntry::DeleteNode { id: VertexId(0) },
WalEntry::DeleteEdge { id: EdgeId(0) },
WalEntry::CommitTx { tx_id: 1 },
WalEntry::AbortTx { tx_id: 2 },
WalEntry::Checkpoint { version: 1 },
];
for entry in entries {
let serialized = bincode::serialize(&entry).expect(&format!("serialize {:?}", entry));
let deserialized: WalEntry =
bincode::deserialize(&serialized).expect(&format!("deserialize {:?}", entry));
assert_eq!(
entry, deserialized,
"Entry {:?} did not roundtrip correctly",
entry
);
}
}
#[test]
fn test_wal_entry_header_size_constant() {
assert_eq!(WAL_ENTRY_HEADER_SIZE, 8);
}
#[test]
fn test_wal_entry_with_all_value_types() {
let value_variants = vec![
Value::Null,
Value::Bool(true),
Value::Bool(false),
Value::Int(i64::MIN),
Value::Int(i64::MAX),
Value::Float(f64::MIN),
Value::Float(f64::MAX),
Value::String("test string".to_string()),
Value::String(String::new()),
Value::List(vec![Value::Int(1), Value::Bool(true)]),
Value::List(vec![]),
Value::Vertex(VertexId(0)),
Value::Vertex(VertexId(u64::MAX)),
Value::Edge(EdgeId(0)),
Value::Edge(EdgeId(u64::MAX)),
];
for old_val in &value_variants {
for new_val in &value_variants {
let entry = WalEntry::UpdateProperty {
is_vertex: true,
element_id: 42,
key_id: 7,
old_value: old_val.clone(),
new_value: new_val.clone(),
};
let serialized = bincode::serialize(&entry)
.expect(&format!("serialize with {:?} -> {:?}", old_val, new_val));
let deserialized: WalEntry = bincode::deserialize(&serialized)
.expect(&format!("deserialize with {:?} -> {:?}", old_val, new_val));
assert_eq!(entry, deserialized);
}
}
}
#[test]
fn test_wal_entry_with_nested_map_value() {
let mut inner_map = crate::value::ValueMap::new();
inner_map.insert("nested_key".to_string(), Value::Int(100));
let mut outer_map = crate::value::ValueMap::new();
outer_map.insert("inner".to_string(), Value::Map(inner_map));
outer_map.insert(
"list".to_string(),
Value::List(vec![Value::Int(1), Value::Int(2)]),
);
let entry = WalEntry::UpdateProperty {
is_vertex: false,
element_id: 1,
key_id: 2,
old_value: Value::Null,
new_value: Value::Map(outer_map),
};
let serialized = bincode::serialize(&entry).expect("serialize");
let deserialized: WalEntry = bincode::deserialize(&serialized).expect("deserialize");
assert_eq!(entry, deserialized);
}
#[test]
fn test_wal_open_creates_new_file() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
assert!(!wal_path.exists(), "WAL file should not exist initially");
let wal = WriteAheadLog::open(&wal_path).expect("open WAL");
drop(wal);
assert!(wal_path.exists(), "WAL file should be created");
}
#[test]
fn test_wal_open_existing_file() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
{
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx_id = wal.begin_transaction().expect("begin tx");
assert_eq!(tx_id, 0, "first tx_id should be 0");
}
let mut wal = WriteAheadLog::open(&wal_path).expect("reopen WAL");
let tx_id = wal.begin_transaction().expect("begin another tx");
assert_eq!(
tx_id, 1,
"tx_id should continue after reopen, not reset to 0"
);
}
#[test]
fn test_wal_tx_id_continues_after_multiple_reopens() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
{
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx1 = wal.begin_transaction().expect("begin tx 1");
wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
let tx2 = wal.begin_transaction().expect("begin tx 2");
wal.log(WalEntry::CommitTx { tx_id: tx2 }).expect("commit");
let tx3 = wal.begin_transaction().expect("begin tx 3");
wal.log(WalEntry::CommitTx { tx_id: tx3 }).expect("commit");
wal.sync().expect("sync");
assert_eq!(tx3, 2, "third tx should be 2");
}
{
let mut wal = WriteAheadLog::open(&wal_path).expect("reopen WAL");
let tx4 = wal.begin_transaction().expect("begin tx 4");
assert_eq!(tx4, 3, "tx_id should be 3 after reopening with max=2");
wal.log(WalEntry::CommitTx { tx_id: tx4 }).expect("commit");
wal.sync().expect("sync");
}
{
let mut wal = WriteAheadLog::open(&wal_path).expect("reopen WAL again");
let tx5 = wal.begin_transaction().expect("begin tx 5");
assert_eq!(tx5, 4, "tx_id should be 4 after second reopen");
}
}
#[test]
fn test_wal_begin_transaction_returns_unique_ids() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx1 = wal.begin_transaction().expect("begin tx 1");
let tx2 = wal.begin_transaction().expect("begin tx 2");
let tx3 = wal.begin_transaction().expect("begin tx 3");
assert_eq!(tx1, 0);
assert_eq!(tx2, 1);
assert_eq!(tx3, 2);
}
#[test]
fn test_wal_begin_transaction_increments_counter() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
assert_eq!(wal.current_tx_id(), 0);
let _ = wal.begin_transaction().expect("begin tx");
assert_eq!(wal.current_tx_id(), 1);
let _ = wal.begin_transaction().expect("begin tx");
assert_eq!(wal.current_tx_id(), 2);
}
#[test]
fn test_wal_log_entry_increases_file_size() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let pos_before = wal.position().expect("get position");
assert_eq!(pos_before, 0, "should start at position 0");
let _ = wal.begin_transaction().expect("begin tx");
let pos_after = wal.position().expect("get position");
assert!(
pos_after > pos_before,
"position should increase after logging"
);
}
#[test]
fn test_wal_log_returns_offset() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let offset1 = wal
.log(WalEntry::BeginTx {
tx_id: 0,
timestamp: 1000,
})
.expect("log entry");
assert_eq!(offset1, 0, "first entry should be at offset 0");
let offset2 = wal.log(WalEntry::CommitTx { tx_id: 0 }).expect("log entry");
assert!(offset2 > offset1, "second entry should be at higher offset");
}
#[test]
fn test_wal_log_multiple_entries() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx_id = wal.begin_transaction().expect("begin tx");
wal.log(WalEntry::InsertNode {
id: VertexId(0),
record: SerializableNodeRecord {
id: 0,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("log insert node");
wal.log(WalEntry::InsertNode {
id: VertexId(1),
record: SerializableNodeRecord {
id: 1,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("log insert node");
wal.log(WalEntry::InsertEdge {
id: EdgeId(0),
record: SerializableEdgeRecord {
id: 0,
label_id: 2,
flags: 0,
src: 0,
dst: 1,
next_out: u64::MAX,
next_in: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("log insert edge");
wal.log(WalEntry::CommitTx { tx_id }).expect("log commit");
let pos = wal.position().expect("get position");
assert!(pos > 0, "WAL should have content");
}
#[test]
fn test_wal_sync_succeeds() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx_id = wal.begin_transaction().expect("begin tx");
wal.log(WalEntry::CommitTx { tx_id }).expect("log commit");
wal.sync().expect("sync should succeed");
}
#[test]
fn test_wal_entries_are_append_only() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let offsets: Vec<u64> = (0..5)
.map(|i| {
wal.log(WalEntry::BeginTx {
tx_id: i,
timestamp: 1000 + i,
})
.expect("log entry")
})
.collect();
for i in 1..offsets.len() {
assert!(
offsets[i] > offsets[i - 1],
"offsets should be strictly increasing"
);
}
}
#[test]
fn test_wal_crc32_is_written_correctly() {
use std::io::Read;
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
{
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
wal.log(WalEntry::BeginTx {
tx_id: 42,
timestamp: 1704067200,
})
.expect("log entry");
wal.sync().expect("sync");
}
let mut file = File::open(&wal_path).expect("open file");
let mut header_bytes = [0u8; WAL_ENTRY_HEADER_SIZE];
file.read_exact(&mut header_bytes).expect("read header");
let header = WalEntryHeader::from_bytes(&header_bytes);
let crc = header.crc32;
let len = header.len;
let mut entry_data = vec![0u8; len as usize];
file.read_exact(&mut entry_data).expect("read entry data");
let computed_crc = crc32fast::hash(&entry_data);
assert_eq!(crc, computed_crc, "CRC32 should match");
let entry: WalEntry = bincode::deserialize(&entry_data).expect("deserialize");
match entry {
WalEntry::BeginTx { tx_id, timestamp } => {
assert_eq!(tx_id, 42);
assert_eq!(timestamp, 1704067200);
}
_ => panic!("Expected BeginTx entry"),
}
}
#[test]
fn test_wal_log_all_entry_types() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let entries = vec![
WalEntry::BeginTx {
tx_id: 0,
timestamp: 1000,
},
WalEntry::InsertNode {
id: VertexId(0),
record: SerializableNodeRecord {
id: 0,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
},
WalEntry::InsertEdge {
id: EdgeId(0),
record: SerializableEdgeRecord {
id: 0,
label_id: 1,
flags: 0,
src: 0,
dst: 1,
next_out: u64::MAX,
next_in: u64::MAX,
prop_head: u64::MAX,
},
},
WalEntry::UpdateProperty {
is_vertex: true,
element_id: 0,
key_id: 1,
old_value: Value::Null,
new_value: Value::Int(42),
},
WalEntry::DeleteNode { id: VertexId(0) },
WalEntry::DeleteEdge { id: EdgeId(0) },
WalEntry::CommitTx { tx_id: 0 },
WalEntry::AbortTx { tx_id: 1 },
WalEntry::Checkpoint { version: 1 },
];
for entry in entries {
wal.log(entry).expect("log entry");
}
let pos = wal.position().expect("get position");
assert!(
pos > 0,
"WAL should have content after logging all entry types"
);
}
#[test]
fn test_wal_log_large_property_value() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let large_string = "x".repeat(100_000);
wal.log(WalEntry::UpdateProperty {
is_vertex: true,
element_id: 0,
key_id: 1,
old_value: Value::Null,
new_value: Value::String(large_string),
})
.expect("log large property");
wal.sync().expect("sync");
let pos = wal.position().expect("get position");
assert!(pos > 100_000, "WAL should contain the large value");
}
#[test]
fn test_wal_multiple_transactions() {
let dir = tempfile::tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
let tx1 = wal.begin_transaction().expect("begin tx1");
wal.log(WalEntry::InsertNode {
id: VertexId(0),
record: SerializableNodeRecord {
id: 0,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("insert node");
wal.log(WalEntry::CommitTx { tx_id: tx1 })
.expect("commit tx1");
let tx2 = wal.begin_transaction().expect("begin tx2");
wal.log(WalEntry::InsertNode {
id: VertexId(1),
record: SerializableNodeRecord {
id: 1,
label_id: 1,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("insert node");
wal.log(WalEntry::AbortTx { tx_id: tx2 })
.expect("abort tx2");
let tx3 = wal.begin_transaction().expect("begin tx3");
wal.log(WalEntry::InsertNode {
id: VertexId(2),
record: SerializableNodeRecord {
id: 2,
label_id: 2,
flags: 0,
first_out_edge: u64::MAX,
first_in_edge: u64::MAX,
prop_head: u64::MAX,
},
})
.expect("insert node");
wal.log(WalEntry::CommitTx { tx_id: tx3 })
.expect("commit tx3");
wal.sync().expect("sync");
assert_eq!(tx1, 0);
assert_eq!(tx2, 1);
assert_eq!(tx3, 2);
}
#[test]
fn test_wal_now_returns_reasonable_timestamp() {
let timestamp = WriteAheadLog::now();
assert!(
timestamp > 1704067200,
"timestamp should be after 2024-01-01"
);
assert!(
timestamp < 4102444800,
"timestamp should be before 2100-01-01"
);
}
#[test]
fn test_read_entry_single() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // Log one entry, flush it, and rewind to the beginning of the file.
    wal.log(WalEntry::BeginTx {
        tx_id: 42,
        timestamp: 1704067200,
    })
    .expect("log entry");
    wal.sync().expect("sync");
    wal.seek_to_start().expect("seek");
    // The entry must round-trip with both fields intact.
    let entry = wal.read_entry().expect("read entry");
    if let WalEntry::BeginTx { tx_id, timestamp } = entry {
        assert_eq!(tx_id, 42);
        assert_eq!(timestamp, 1704067200);
    } else {
        panic!("Expected BeginTx entry");
    }
}
#[test]
fn test_read_entry_multiple() {
    // Local helper: node record with the given id/label, no edges, no props.
    fn rec(id: u64, label_id: u32) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let entries_to_write = vec![
        WalEntry::BeginTx {
            tx_id: 0,
            timestamp: 1000,
        },
        WalEntry::InsertNode {
            id: VertexId(0),
            record: rec(0, 1),
        },
        WalEntry::CommitTx { tx_id: 0 },
    ];
    for entry in &entries_to_write {
        wal.log(entry.clone()).expect("log entry");
    }
    wal.sync().expect("sync");
    wal.seek_to_start().expect("seek");
    // Drain the log until the reader reports end-of-file.
    let mut read_entries = Vec::new();
    loop {
        match wal.read_entry() {
            Ok(entry) => read_entries.push(entry),
            Err(StorageError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }
    // Everything written must come back, in order and unmodified.
    assert_eq!(read_entries, entries_to_write);
}
#[test]
fn test_read_entry_crc_mismatch_detected() {
    use std::io::{Read, Write};
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    // Write one valid entry, then drop the WAL so the bytes are on disk.
    {
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        wal.log(WalEntry::BeginTx {
            tx_id: 0,
            timestamp: 1000,
        })
        .expect("log entry");
        wal.sync().expect("sync");
    }
    // Corrupt one payload byte (offset 10 lies past the 8-byte entry header).
    // Read the byte first and write back its bitwise complement: unlike
    // blindly storing 0xFF, this guarantees the on-disk value actually
    // changes even if the original byte already happened to be 0xFF, so the
    // corruption can never be a silent no-op.
    {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&wal_path)
            .expect("open file");
        file.seek(SeekFrom::Start(10)).expect("seek");
        let mut byte = [0u8; 1];
        file.read_exact(&mut byte).expect("read");
        file.seek(SeekFrom::Start(10)).expect("seek");
        file.write_all(&[!byte[0]]).expect("write");
        file.sync_all().expect("sync");
    }
    // Re-open and read: the checksum validation must reject the entry.
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    wal.seek_to_start().expect("seek");
    let result = wal.read_entry();
    assert!(result.is_err(), "Should detect CRC mismatch");
    match result {
        Err(StorageError::WalCorrupted(msg)) => {
            assert!(
                msg.contains("CRC32 mismatch"),
                "Error should mention CRC mismatch: {}",
                msg
            );
        }
        _ => panic!("Expected WalCorrupted error"),
    }
}
#[test]
fn test_read_entry_eof_error() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // Reading from a brand-new, empty WAL must surface a clean EOF error.
    let result = wal.read_entry();
    match result {
        Err(StorageError::Io(e)) => {
            assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
        }
        _ => panic!("Expected EOF error"),
    }
}
#[test]
fn test_read_all_entries() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("log insert");
    wal.log(WalEntry::CommitTx { tx_id }).expect("log commit");
    wal.sync().expect("sync");
    // Three entries total: the BeginTx, the insert, and the commit.
    let entries = wal.read_all_entries().expect("read all");
    assert_eq!(entries.len(), 3);
    assert!(matches!(entries[0], WalEntry::BeginTx { .. }));
    assert!(matches!(entries[1], WalEntry::InsertNode { .. }));
    assert!(matches!(entries[2], WalEntry::CommitTx { .. }));
}
#[test]
fn test_read_all_entries_empty_file() {
    // A freshly created WAL file must scan as containing zero entries.
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let all = wal.read_all_entries().expect("read all");
    assert!(all.is_empty(), "Empty WAL should have no entries");
}
#[test]
fn test_needs_recovery_empty_wal() {
    // No entries at all means there is nothing to replay on startup.
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    assert!(!wal.needs_recovery(), "Empty WAL should not need recovery");
}
#[test]
fn test_needs_recovery_committed_transaction() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("log insert");
    wal.log(WalEntry::CommitTx { tx_id }).expect("log commit");
    wal.sync().expect("sync");
    // Every started transaction was committed, so nothing needs replaying.
    assert!(
        !wal.needs_recovery(),
        "Committed transaction should not need recovery"
    );
}
#[test]
fn test_needs_recovery_uncommitted_transaction() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // Start a transaction and log an insert, but never commit or abort it.
    let _tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("log insert");
    wal.sync().expect("sync");
    // An open transaction with no terminator must trigger recovery.
    assert!(
        wal.needs_recovery(),
        "Uncommitted transaction should need recovery"
    );
}
#[test]
fn test_needs_recovery_aborted_transaction() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("log insert");
    wal.log(WalEntry::AbortTx { tx_id }).expect("log abort");
    wal.sync().expect("sync");
    // An explicit abort terminates the transaction just like a commit does.
    assert!(
        !wal.needs_recovery(),
        "Aborted transaction should not need recovery"
    );
}
#[test]
fn test_needs_recovery_mixed_transactions() {
    // Local helper: node record with the given id, no edges, no props.
    fn rec(id: u64) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id: 1,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // First transaction completes normally.
    let tx1 = wal.begin_transaction().expect("begin tx1");
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record: rec(0),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx1 })
        .expect("commit tx1");
    // Second transaction is left open — no commit, no abort.
    let _tx2 = wal.begin_transaction().expect("begin tx2");
    wal.log(WalEntry::InsertNode {
        id: VertexId(1),
        record: rec(1),
    })
    .expect("insert");
    wal.sync().expect("sync");
    // One dangling transaction is enough to require recovery.
    assert!(
        wal.needs_recovery(),
        "Mixed transactions with one uncommitted should need recovery"
    );
}
#[test]
fn test_needs_recovery_checkpoint_clears_state() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
    // A checkpoint after the commit marks the log as fully applied.
    wal.log(WalEntry::Checkpoint { version: 1 })
        .expect("checkpoint");
    wal.sync().expect("sync");
    assert!(
        !wal.needs_recovery(),
        "After checkpoint, should not need recovery"
    );
}
#[test]
fn test_truncate_clears_file() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // Put a committed transaction into the log so the file is non-empty.
    let tx_id = wal.begin_transaction().expect("begin tx");
    let record = SerializableNodeRecord {
        id: 0,
        label_id: 1,
        flags: 0,
        first_out_edge: u64::MAX,
        first_in_edge: u64::MAX,
        prop_head: u64::MAX,
    };
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record,
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
    wal.sync().expect("sync");
    let before = wal.file_size().expect("get size");
    assert!(before > 0, "WAL should have content before truncate");
    // Truncation must discard everything, leaving a zero-length file.
    wal.truncate().expect("truncate");
    let after = wal.file_size().expect("get size");
    assert_eq!(after, 0, "WAL should be empty after truncate");
}
#[test]
fn test_truncate_allows_new_writes() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // First transaction is written, then wiped out by the truncate.
    let tx1 = wal.begin_transaction().expect("begin tx1");
    wal.log(WalEntry::CommitTx { tx_id: tx1 })
        .expect("commit tx1");
    wal.truncate().expect("truncate");
    // The log must still accept writes after truncation.
    let tx2 = wal.begin_transaction().expect("begin tx2");
    wal.log(WalEntry::CommitTx { tx_id: tx2 })
        .expect("commit tx2");
    wal.sync().expect("sync");
    // Only tx2's BeginTx + CommitTx survive.
    let entries = wal.read_all_entries().expect("read all");
    assert_eq!(entries.len(), 2, "Should have 2 entries from tx2");
}
#[test]
fn test_file_size() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // A brand-new log file starts out at zero bytes.
    let initial = wal.file_size().expect("get size");
    assert_eq!(initial, 0, "New WAL should be empty");
    // Logging one entry and syncing must grow the file.
    wal.log(WalEntry::BeginTx {
        tx_id: 0,
        timestamp: 1000,
    })
    .expect("log");
    wal.sync().expect("sync");
    let grown = wal.file_size().expect("get size");
    assert!(grown > 0, "WAL should have content after logging");
}
#[test]
fn test_seek_to_start() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    wal.log(WalEntry::BeginTx {
        tx_id: 42,
        timestamp: 1000,
    })
    .expect("log");
    wal.sync().expect("sync");
    // Rewinding must be repeatable: the same first entry is readable twice.
    for _ in 0..2 {
        wal.seek_to_start().expect("seek");
        match wal.read_entry().expect("read") {
            WalEntry::BeginTx { tx_id, .. } => assert_eq!(tx_id, 42),
            _ => panic!("Expected BeginTx"),
        }
    }
}
#[test]
fn test_get_committed_entries_single_committed() {
    // Local helper: node record with the given id/label, no edges, no props.
    fn rec(id: u64, label_id: u32) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // One transaction holding two inserts, then committed.
    let tx_id = wal.begin_transaction().expect("begin tx");
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record: rec(0, 1),
    })
    .expect("insert");
    wal.log(WalEntry::InsertNode {
        id: VertexId(1),
        record: rec(1, 2),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
    wal.sync().expect("sync");
    // Both inserts belong to a committed transaction and must be returned,
    // without the surrounding Begin/Commit bookkeeping entries.
    let committed = wal.get_committed_entries().expect("get committed");
    assert_eq!(committed.len(), 2);
    assert!(matches!(
        committed[0],
        WalEntry::InsertNode {
            id: VertexId(0),
            ..
        }
    ));
    assert!(matches!(
        committed[1],
        WalEntry::InsertNode {
            id: VertexId(1),
            ..
        }
    ));
}
#[test]
fn test_get_committed_entries_excludes_aborted() {
    // Local helper: node record with the given id/label, no edges, no props.
    fn rec(id: u64, label_id: u32) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // tx1 commits its insert.
    let tx1 = wal.begin_transaction().expect("begin tx1");
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record: rec(0, 1),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
    // tx2 aborts, so its insert must be filtered out.
    let tx2 = wal.begin_transaction().expect("begin tx2");
    wal.log(WalEntry::InsertNode {
        id: VertexId(1),
        record: rec(1, 2),
    })
    .expect("insert");
    wal.log(WalEntry::AbortTx { tx_id: tx2 }).expect("abort");
    wal.sync().expect("sync");
    let committed = wal.get_committed_entries().expect("get committed");
    assert_eq!(committed.len(), 1);
    assert!(matches!(
        committed[0],
        WalEntry::InsertNode {
            id: VertexId(0),
            ..
        }
    ));
}
#[test]
fn test_get_committed_entries_excludes_uncommitted() {
    // Local helper: node record with the given id/label, no edges, no props.
    fn rec(id: u64, label_id: u32) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // tx1 commits its insert.
    let tx1 = wal.begin_transaction().expect("begin tx1");
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record: rec(0, 1),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
    // tx2 is left dangling — its insert must not surface as committed.
    let _tx2 = wal.begin_transaction().expect("begin tx2");
    wal.log(WalEntry::InsertNode {
        id: VertexId(1),
        record: rec(1, 2),
    })
    .expect("insert");
    wal.sync().expect("sync");
    let committed = wal.get_committed_entries().expect("get committed");
    assert_eq!(committed.len(), 1);
    assert!(matches!(
        committed[0],
        WalEntry::InsertNode {
            id: VertexId(0),
            ..
        }
    ));
}
#[test]
fn test_get_committed_entries_preserves_order() {
    // Local helper: node record with the given id/label, no edges, no props.
    fn rec(id: u64, label_id: u32) -> SerializableNodeRecord {
        SerializableNodeRecord {
            id,
            label_id,
            flags: 0,
            first_out_edge: u64::MAX,
            first_in_edge: u64::MAX,
            prop_head: u64::MAX,
        }
    }
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // Three back-to-back transactions, each inserting one node and committing.
    let tx1 = wal.begin_transaction().expect("begin tx1");
    wal.log(WalEntry::InsertNode {
        id: VertexId(0),
        record: rec(0, 1),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
    let tx2 = wal.begin_transaction().expect("begin tx2");
    wal.log(WalEntry::InsertNode {
        id: VertexId(1),
        record: rec(1, 2),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx2 }).expect("commit");
    let tx3 = wal.begin_transaction().expect("begin tx3");
    wal.log(WalEntry::InsertNode {
        id: VertexId(2),
        record: rec(2, 3),
    })
    .expect("insert");
    wal.log(WalEntry::CommitTx { tx_id: tx3 }).expect("commit");
    wal.sync().expect("sync");
    // Committed entries must come back in the order they were logged.
    let committed = wal.get_committed_entries().expect("get committed");
    assert_eq!(committed.len(), 3);
    assert!(matches!(
        committed[0],
        WalEntry::InsertNode {
            id: VertexId(0),
            ..
        }
    ));
    assert!(matches!(
        committed[1],
        WalEntry::InsertNode {
            id: VertexId(1),
            ..
        }
    ));
    assert!(matches!(
        committed[2],
        WalEntry::InsertNode {
            id: VertexId(2),
            ..
        }
    ));
}
#[test]
fn test_get_committed_entries_empty_wal() {
    // With no log entries there can be no committed work to report.
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    let committed = wal.get_committed_entries().expect("get committed");
    assert!(committed.is_empty());
}
#[test]
fn test_roundtrip_write_read_all_entry_types() {
    let dir = tempfile::tempdir().expect("create temp dir");
    let wal_path = dir.path().join("test.wal");
    let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
    // One sample of each WalEntry variant that the serializer must handle.
    let written = vec![
        WalEntry::BeginTx {
            tx_id: 0,
            timestamp: 1000,
        },
        WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 1,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: 100,
            },
        },
        WalEntry::InsertEdge {
            id: EdgeId(0),
            record: SerializableEdgeRecord {
                id: 0,
                label_id: 2,
                flags: 0,
                src: 0,
                dst: 1,
                next_out: u64::MAX,
                next_in: u64::MAX,
                prop_head: 200,
            },
        },
        WalEntry::UpdateProperty {
            is_vertex: true,
            element_id: 0,
            key_id: 3,
            old_value: Value::Null,
            new_value: Value::String("hello".to_string()),
        },
        WalEntry::DeleteNode { id: VertexId(0) },
        WalEntry::DeleteEdge { id: EdgeId(0) },
        WalEntry::CommitTx { tx_id: 0 },
        WalEntry::AbortTx { tx_id: 1 },
        WalEntry::Checkpoint { version: 42 },
    ];
    for entry in &written {
        wal.log(entry.clone()).expect("log entry");
    }
    wal.sync().expect("sync");
    // Every variant must deserialize back to an identical value.
    let read_back = wal.read_all_entries().expect("read all");
    assert_eq!(read_back, written);
}
}