use std::fs::File;
use crate::error::StorageError;
use crate::value::{EdgeId, VertexId};
use super::records::{EdgeRecord, NodeRecord, EDGE_RECORD_SIZE, HEADER_SIZE, NODE_RECORD_SIZE};
use super::wal::{WalEntry, WriteAheadLog};
/// Replays all committed WAL entries into `data_file`, then truncates the WAL.
///
/// Entries belonging to uncommitted or aborted transactions are excluded by
/// `wal.get_committed_entries()`, so only durable work is reapplied. The data
/// file is flushed with `sync_data` *before* the WAL is truncated, preserving
/// the recovery invariant: the log is only discarded once its effects are
/// safely on disk. On error the WAL is left intact, so recovery is retryable.
///
/// # Arguments
/// * `wal` - write-ahead log to replay and (on success) truncate
/// * `data_file` - fixed-layout data file (header, node table, edge table)
/// * `node_capacity` - number of node slots; needed to locate the edge table
///
/// # Errors
/// Returns `StorageError` if reading the WAL or writing/syncing the data
/// file fails.
pub fn recover(
    wal: &mut WriteAheadLog,
    data_file: &File,
    node_capacity: u64,
) -> Result<RecoveryStats, StorageError> {
    let mut stats = RecoveryStats::default();
    let entries = wal.get_committed_entries()?;
    if entries.is_empty() {
        // Nothing committed to replay; still clear any leftover entries
        // (e.g. from aborted transactions) so the WAL does not grow across
        // restarts.
        if wal.file_size()? > 0 {
            wal.truncate()?;
        }
        return Ok(stats);
    }
    for entry in entries {
        match entry {
            WalEntry::InsertNode { id, record } => {
                write_node_to_file(data_file, id, &record.into())?;
                stats.nodes_recovered += 1;
            }
            WalEntry::InsertEdge { id, record } => {
                write_edge_to_file(data_file, id, &record.into(), node_capacity)?;
                stats.edges_recovered += 1;
            }
            WalEntry::DeleteNode { id } => {
                mark_node_deleted(data_file, id)?;
                stats.nodes_deleted += 1;
            }
            WalEntry::DeleteEdge { id } => {
                mark_edge_deleted(data_file, id, node_capacity)?;
                stats.edges_deleted += 1;
            }
            WalEntry::UpdateProperty { .. } => {
                // NOTE(review): property updates are only counted here, not
                // applied to the data file — presumably the property store
                // is recovered elsewhere; confirm this is intentional.
                stats.properties_updated += 1;
            }
            WalEntry::SchemaUpdate { offset, data } => {
                write_schema_to_file(data_file, offset, &data)?;
                stats.schema_updates += 1;
            }
            // Transaction markers and checkpoints carry no data-file payload.
            WalEntry::BeginTx { .. }
            | WalEntry::CommitTx { .. }
            | WalEntry::AbortTx { .. }
            | WalEntry::Checkpoint { .. } => {}
            // Index DDL has no effect on the node/edge tables.
            WalEntry::CreateIndex { .. } | WalEntry::DropIndex { .. } => {}
        }
    }
    // Durability barrier: flush the data file before discarding the log it
    // was recovered from.
    data_file.sync_data()?;
    wal.truncate()?;
    Ok(stats)
}
/// Counters describing the work performed by a single recovery pass.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Node insertions replayed from committed WAL entries.
    pub nodes_recovered: u64,
    /// Edge insertions replayed from committed WAL entries.
    pub edges_recovered: u64,
    /// Nodes tombstoned via replayed delete entries.
    pub nodes_deleted: u64,
    /// Edges tombstoned via replayed delete entries.
    pub edges_deleted: u64,
    /// Property-update entries observed (counted, not applied here).
    pub properties_updated: u64,
    /// Schema blobs written back to the data file.
    pub schema_updates: u64,
}

impl RecoveryStats {
    /// Returns `true` if the recovery pass replayed no operations at all.
    pub fn is_empty(&self) -> bool {
        // All counters are unsigned, so "every counter is zero" is exactly
        // "the total is zero". Defined via total_operations() so the two
        // methods cannot drift apart if fields are added later.
        self.total_operations() == 0
    }

    /// Total number of WAL operations replayed, across all categories.
    pub fn total_operations(&self) -> u64 {
        self.nodes_recovered
            + self.edges_recovered
            + self.nodes_deleted
            + self.edges_deleted
            + self.properties_updated
            + self.schema_updates
    }
}
/// Byte offset of the node record slot for `id`: the node table starts
/// immediately after the file header and is indexed by fixed-size records.
#[inline]
fn node_offset(id: VertexId) -> u64 {
    let node_table_base = HEADER_SIZE as u64;
    node_table_base + id.0 * NODE_RECORD_SIZE as u64
}
/// Byte offset of the edge record slot for `id`: the edge table begins where
/// the node table (sized by `node_capacity`) ends, after the file header.
#[inline]
fn edge_offset(id: EdgeId, node_capacity: u64) -> u64 {
    let edge_table_base = HEADER_SIZE as u64 + node_capacity * NODE_RECORD_SIZE as u64;
    edge_table_base + id.0 * EDGE_RECORD_SIZE as u64
}
/// Writes a serialized node record at its computed slot in the data file.
///
/// On Unix, positional `write_all_at` is used so the shared `File` cursor is
/// untouched; elsewhere the fallback seeks and writes, which moves the file
/// cursor and is not safe under concurrent use of the same handle. Does not
/// sync — the caller is responsible for durability.
fn write_node_to_file(file: &File, id: VertexId, record: &NodeRecord) -> Result<(), StorageError> {
    let offset = node_offset(id);
    let bytes = record.to_bytes();
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(&bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Seek, SeekFrom, Write};
        // `&File` implements Seek/Write, so rebinding the shared reference
        // as mutable is sufficient for the fallback path.
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&bytes)?;
    }
    Ok(())
}
/// Writes a serialized edge record at its computed slot in the data file.
///
/// `node_capacity` is required because the edge table is laid out after the
/// node table. Same platform split as `write_node_to_file`: positional I/O
/// on Unix, cursor-moving seek+write elsewhere. Does not sync.
fn write_edge_to_file(
    file: &File,
    id: EdgeId,
    record: &EdgeRecord,
    node_capacity: u64,
) -> Result<(), StorageError> {
    let offset = edge_offset(id, node_capacity);
    let bytes = record.to_bytes();
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(&bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Seek, SeekFrom, Write};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&bytes)?;
    }
    Ok(())
}
/// Tombstones the node record at `id` via read-modify-write of its slot.
///
/// Reads the existing record, sets its deleted flag, and writes the whole
/// record back so all other fields are preserved. The two file operations
/// are not atomic; a crash in between leaves the old record on disk, which
/// is recoverable because the WAL is only truncated after a full successful
/// recovery pass.
fn mark_node_deleted(file: &File, id: VertexId) -> Result<(), StorageError> {
    let offset = node_offset(id);
    let mut bytes = [0u8; NODE_RECORD_SIZE];
    // Read the current record bytes from the slot.
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.read_exact_at(&mut bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut bytes)?;
    }
    let mut record = NodeRecord::from_bytes(&bytes);
    record.mark_deleted();
    let updated_bytes = record.to_bytes();
    // Write the tombstoned record back to the same slot.
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(&updated_bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Seek, SeekFrom, Write};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&updated_bytes)?;
    }
    Ok(())
}
/// Tombstones the edge record at `id` via read-modify-write of its slot.
///
/// Mirrors `mark_node_deleted` for the edge table: read the record, set the
/// deleted flag, write the full record back. Not atomic across the read and
/// write, but safe to retry since recovery replays the delete.
fn mark_edge_deleted(file: &File, id: EdgeId, node_capacity: u64) -> Result<(), StorageError> {
    let offset = edge_offset(id, node_capacity);
    let mut bytes = [0u8; EDGE_RECORD_SIZE];
    // Read the current record bytes from the slot.
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.read_exact_at(&mut bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut bytes)?;
    }
    let mut record = EdgeRecord::from_bytes(&bytes);
    record.mark_deleted();
    let updated_bytes = record.to_bytes();
    // Write the tombstoned record back to the same slot.
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(&updated_bytes, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Seek, SeekFrom, Write};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&updated_bytes)?;
    }
    Ok(())
}
/// Writes an arbitrary schema blob at `offset`, growing the file if needed.
///
/// Unlike node/edge writes, schema data may land past the current end of the
/// file, so the file is extended with `set_len` first (`set_len` zero-fills
/// the added bytes). Does not sync — the caller handles durability.
///
/// NOTE(review): `offset + data.len() as u64` could wrap for absurd offsets;
/// WAL-supplied offsets are presumably bounded — confirm upstream validation.
fn write_schema_to_file(file: &File, offset: u64, data: &[u8]) -> Result<(), StorageError> {
    let metadata = file.metadata()?;
    let required_size = offset + data.len() as u64;
    if required_size > metadata.len() {
        file.set_len(required_size)?;
    }
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        file.write_all_at(data, offset)?;
    }
    #[cfg(not(unix))]
    {
        use std::io::{Seek, SeekFrom, Write};
        let mut file = file;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::mmap::records::FileHeader;
    use crate::storage::mmap::wal::{SerializableEdgeRecord, SerializableNodeRecord, WalEntry};
    use std::fs::OpenOptions;
    use tempfile::TempDir;

    /// Creates a data file in `dir` with a written header and pre-sized room
    /// for `node_capacity` node slots and `edge_capacity` edge slots.
    fn create_test_db(dir: &TempDir, node_capacity: u64, edge_capacity: u64) -> File {
        let db_path = dir.path().join("test.db");
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&db_path)
            .expect("create db file");
        let node_table_size = node_capacity * NODE_RECORD_SIZE as u64;
        let edge_table_size = edge_capacity * EDGE_RECORD_SIZE as u64;
        let file_size = HEADER_SIZE as u64 + node_table_size + edge_table_size;
        file.set_len(file_size).expect("set file length");
        let mut header = FileHeader::new();
        header.node_capacity = node_capacity;
        header.edge_capacity = edge_capacity;
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.write_all_at(&header.to_bytes(), 0)
                .expect("write header");
        }
        #[cfg(not(unix))]
        {
            use std::io::{Seek, SeekFrom, Write};
            let mut f = &file;
            f.seek(SeekFrom::Start(0)).unwrap();
            f.write_all(&header.to_bytes()).unwrap();
        }
        file.sync_all().expect("sync file");
        file
    }

    /// Reads the node record at `id` straight from the file, bypassing any
    /// in-memory state.
    fn read_node_record(file: &File, id: VertexId) -> NodeRecord {
        let offset = node_offset(id);
        let mut bytes = [0u8; NODE_RECORD_SIZE];
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_exact_at(&mut bytes, offset).expect("read node");
        }
        #[cfg(not(unix))]
        {
            use std::io::{Read, Seek, SeekFrom};
            let mut f = file;
            f.seek(SeekFrom::Start(offset)).unwrap();
            f.read_exact(&mut bytes).unwrap();
        }
        NodeRecord::from_bytes(&bytes)
    }

    /// Reads the edge record at `id` straight from the file.
    fn read_edge_record(file: &File, id: EdgeId, node_capacity: u64) -> EdgeRecord {
        let offset = edge_offset(id, node_capacity);
        let mut bytes = [0u8; EDGE_RECORD_SIZE];
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_exact_at(&mut bytes, offset).expect("read edge");
        }
        #[cfg(not(unix))]
        {
            use std::io::{Read, Seek, SeekFrom};
            let mut f = file;
            f.seek(SeekFrom::Start(offset)).unwrap();
            f.read_exact(&mut bytes).unwrap();
        }
        EdgeRecord::from_bytes(&bytes)
    }

    // Node slot offsets grow linearly from the end of the header.
    #[test]
    fn test_node_offset_calculation() {
        assert_eq!(node_offset(VertexId(0)), HEADER_SIZE as u64);
        assert_eq!(
            node_offset(VertexId(1)),
            HEADER_SIZE as u64 + NODE_RECORD_SIZE as u64
        );
        assert_eq!(
            node_offset(VertexId(100)),
            HEADER_SIZE as u64 + 100 * NODE_RECORD_SIZE as u64
        );
    }

    // Edge slot offsets start after the node table and grow linearly.
    #[test]
    fn test_edge_offset_calculation() {
        let node_capacity = 1000u64;
        let expected_base = HEADER_SIZE as u64 + node_capacity * NODE_RECORD_SIZE as u64;
        assert_eq!(edge_offset(EdgeId(0), node_capacity), expected_base);
        assert_eq!(
            edge_offset(EdgeId(1), node_capacity),
            expected_base + EDGE_RECORD_SIZE as u64
        );
        assert_eq!(
            edge_offset(EdgeId(500), node_capacity),
            expected_base + 500 * EDGE_RECORD_SIZE as u64
        );
    }

    // Default stats report empty with zero total.
    #[test]
    fn test_recovery_stats_default_is_empty() {
        let stats = RecoveryStats::default();
        assert!(stats.is_empty());
        assert_eq!(stats.total_operations(), 0);
    }

    // Any nonzero counter makes stats non-empty.
    #[test]
    fn test_recovery_stats_not_empty_with_operations() {
        let mut stats = RecoveryStats::default();
        stats.nodes_recovered = 5;
        assert!(!stats.is_empty());
        assert_eq!(stats.total_operations(), 5);
    }

    // total_operations sums all counter categories.
    #[test]
    fn test_recovery_stats_total_operations() {
        let stats = RecoveryStats {
            nodes_recovered: 10,
            edges_recovered: 20,
            nodes_deleted: 3,
            edges_deleted: 2,
            properties_updated: 5,
            schema_updates: 0,
        };
        assert_eq!(stats.total_operations(), 40);
    }

    // A written node record round-trips through the file.
    #[test]
    fn test_write_node_to_file() {
        let dir = TempDir::new().unwrap();
        let file = create_test_db(&dir, 100, 100);
        let node = NodeRecord::new(42, 7);
        write_node_to_file(&file, VertexId(42), &node).expect("write node");
        file.sync_all().expect("sync");
        let read_back = read_node_record(&file, VertexId(42));
        let id = read_back.id;
        let label_id = read_back.label_id;
        assert_eq!(id, 42);
        assert_eq!(label_id, 7);
    }

    // A written edge record round-trips through the file.
    #[test]
    fn test_write_edge_to_file() {
        let dir = TempDir::new().unwrap();
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let edge = EdgeRecord::new(10, 5, 1, 2);
        write_edge_to_file(&file, EdgeId(10), &edge, node_capacity).expect("write edge");
        file.sync_all().expect("sync");
        let read_back = read_edge_record(&file, EdgeId(10), node_capacity);
        let id = read_back.id;
        let label_id = read_back.label_id;
        let src = read_back.src;
        let dst = read_back.dst;
        assert_eq!(id, 10);
        assert_eq!(label_id, 5);
        assert_eq!(src, 1);
        assert_eq!(dst, 2);
    }

    // mark_node_deleted flips the on-disk deleted flag.
    #[test]
    fn test_mark_node_deleted() {
        let dir = TempDir::new().unwrap();
        let file = create_test_db(&dir, 100, 100);
        let node = NodeRecord::new(5, 3);
        write_node_to_file(&file, VertexId(5), &node).expect("write node");
        file.sync_all().expect("sync");
        let before = read_node_record(&file, VertexId(5));
        assert!(!before.is_deleted());
        mark_node_deleted(&file, VertexId(5)).expect("mark deleted");
        file.sync_all().expect("sync");
        let after = read_node_record(&file, VertexId(5));
        assert!(after.is_deleted());
    }

    // mark_edge_deleted flips the on-disk deleted flag.
    #[test]
    fn test_mark_edge_deleted() {
        let dir = TempDir::new().unwrap();
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let edge = EdgeRecord::new(7, 2, 0, 1);
        write_edge_to_file(&file, EdgeId(7), &edge, node_capacity).expect("write edge");
        file.sync_all().expect("sync");
        let before = read_edge_record(&file, EdgeId(7), node_capacity);
        assert!(!before.is_deleted());
        mark_edge_deleted(&file, EdgeId(7), node_capacity).expect("mark deleted");
        file.sync_all().expect("sync");
        let after = read_edge_record(&file, EdgeId(7), node_capacity);
        assert!(after.is_deleted());
    }

    // Recovery over an empty WAL is a no-op with empty stats.
    #[test]
    fn test_recover_empty_wal() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert!(stats.is_empty());
        assert_eq!(stats.total_operations(), 0);
    }

    // A single committed insert is replayed and the WAL is emptied.
    #[test]
    fn test_recover_single_committed_node() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx_id = wal.begin_transaction().expect("begin tx");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 42,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log insert");
        wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 1);
        assert_eq!(stats.edges_recovered, 0);
        assert_eq!(stats.total_operations(), 1);
        let node = read_node_record(&file, VertexId(0));
        let id = node.id;
        let label_id = node.label_id;
        assert_eq!(id, 0);
        assert_eq!(label_id, 42);
        assert_eq!(wal.file_size().expect("file size"), 0);
    }

    // Multiple committed transactions are all replayed.
    #[test]
    fn test_recover_multiple_committed_transactions() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx1 = wal.begin_transaction().expect("begin tx1");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 10,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
        let tx2 = wal.begin_transaction().expect("begin tx2");
        wal.log(WalEntry::InsertNode {
            id: VertexId(1),
            record: SerializableNodeRecord {
                id: 1,
                label_id: 20,
                flags: 0,
                first_out_edge: 0,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::InsertEdge {
            id: EdgeId(0),
            record: SerializableEdgeRecord {
                id: 0,
                label_id: 5,
                flags: 0,
                src: 0,
                dst: 1,
                next_out: u64::MAX,
                next_in: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx2 }).expect("commit");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 2);
        assert_eq!(stats.edges_recovered, 1);
        let node0 = read_node_record(&file, VertexId(0));
        let (node0_id, node0_label) = (node0.id, node0.label_id);
        assert_eq!(node0_id, 0);
        assert_eq!(node0_label, 10);
        let node1 = read_node_record(&file, VertexId(1));
        let (node1_id, node1_label) = (node1.id, node1.label_id);
        assert_eq!(node1_id, 1);
        assert_eq!(node1_label, 20);
        let edge0 = read_edge_record(&file, EdgeId(0), node_capacity);
        let (edge0_id, edge0_label, edge0_src, edge0_dst) =
            (edge0.id, edge0.label_id, edge0.src, edge0.dst);
        assert_eq!(edge0_id, 0);
        assert_eq!(edge0_label, 5);
        assert_eq!(edge0_src, 0);
        assert_eq!(edge0_dst, 1);
    }

    // Entries from a transaction that never committed are not replayed.
    #[test]
    fn test_recover_discards_uncommitted_transactions() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx1 = wal.begin_transaction().expect("begin tx1");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 100,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
        let _tx2 = wal.begin_transaction().expect("begin tx2");
        wal.log(WalEntry::InsertNode {
            id: VertexId(1),
            record: SerializableNodeRecord {
                id: 1,
                label_id: 999, flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 1);
        let node0 = read_node_record(&file, VertexId(0));
        let (node0_id, node0_label) = (node0.id, node0.label_id);
        assert_eq!(node0_id, 0);
        assert_eq!(node0_label, 100);
        let node1 = read_node_record(&file, VertexId(1));
        let node1_id = node1.id;
        let node1_label = node1.label_id;
        assert!(
            node1_id != 1 || node1_label != 999,
            "Uncommitted transaction should not be recovered"
        );
    }

    // Aborted transactions are skipped; surrounding commits still apply.
    #[test]
    fn test_recover_discards_aborted_transactions() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx1 = wal.begin_transaction().expect("begin tx1");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 50,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
        let tx2 = wal.begin_transaction().expect("begin tx2");
        wal.log(WalEntry::InsertNode {
            id: VertexId(1),
            record: SerializableNodeRecord {
                id: 1,
                label_id: 888, flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::AbortTx { tx_id: tx2 }).expect("abort");
        let tx3 = wal.begin_transaction().expect("begin tx3");
        wal.log(WalEntry::InsertNode {
            id: VertexId(2),
            record: SerializableNodeRecord {
                id: 2,
                label_id: 60,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx3 }).expect("commit");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 2);
        let node0 = read_node_record(&file, VertexId(0));
        let node0_label = node0.label_id;
        assert_eq!(node0_label, 50);
        let node2 = read_node_record(&file, VertexId(2));
        let node2_label = node2.label_id;
        assert_eq!(node2_label, 60);
        let node1 = read_node_record(&file, VertexId(1));
        assert!(
            node1.label_id != 888,
            "Aborted transaction should not be recovered"
        );
    }

    // Delete entries within a committed transaction tombstone the record.
    #[test]
    fn test_recover_handles_delete_operations() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx1 = wal.begin_transaction().expect("begin tx1");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 10,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::InsertNode {
            id: VertexId(1),
            record: SerializableNodeRecord {
                id: 1,
                label_id: 20,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::DeleteNode { id: VertexId(0) })
            .expect("log delete");
        wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 2);
        assert_eq!(stats.nodes_deleted, 1);
        let node0 = read_node_record(&file, VertexId(0));
        assert!(node0.is_deleted(), "Node 0 should be marked as deleted");
        let node1 = read_node_record(&file, VertexId(1));
        assert!(!node1.is_deleted(), "Node 1 should not be deleted");
    }

    // A second recovery over the truncated WAL is a no-op and preserves data.
    #[test]
    fn test_recover_is_idempotent() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        {
            let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
            let tx_id = wal.begin_transaction().expect("begin tx");
            wal.log(WalEntry::InsertNode {
                id: VertexId(0),
                record: SerializableNodeRecord {
                    id: 0,
                    label_id: 123,
                    flags: 0,
                    first_out_edge: u64::MAX,
                    first_in_edge: u64::MAX,
                    prop_head: u64::MAX,
                },
            })
            .expect("log");
            wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
            wal.sync().expect("sync");
            let stats = recover(&mut wal, &file, node_capacity).expect("recover");
            assert_eq!(stats.nodes_recovered, 1);
        }
        {
            let mut wal = WriteAheadLog::open(&wal_path).expect("reopen WAL");
            assert_eq!(wal.file_size().expect("file size"), 0);
            let stats = recover(&mut wal, &file, node_capacity).expect("recover again");
            assert!(stats.is_empty());
        }
        let node = read_node_record(&file, VertexId(0));
        let node_label = node.label_id;
        assert_eq!(node_label, 123);
    }

    // Successful recovery leaves the WAL empty.
    #[test]
    fn test_recover_truncates_wal_on_success() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx_id = wal.begin_transaction().expect("begin tx");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 1,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
        wal.sync().expect("sync");
        assert!(wal.file_size().expect("file size") > 0);
        recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(wal.file_size().expect("file size"), 0);
    }

    // Later committed writes to the same slot win over earlier ones.
    #[test]
    fn test_recover_preserves_order() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx1 = wal.begin_transaction().expect("begin tx1");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 1,
                flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx1 }).expect("commit");
        let tx2 = wal.begin_transaction().expect("begin tx2");
        wal.log(WalEntry::InsertNode {
            id: VertexId(0),
            record: SerializableNodeRecord {
                id: 0,
                label_id: 2, flags: 0,
                first_out_edge: u64::MAX,
                first_in_edge: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log");
        wal.log(WalEntry::CommitTx { tx_id: tx2 }).expect("commit");
        wal.sync().expect("sync");
        recover(&mut wal, &file, node_capacity).expect("recover");
        let node = read_node_record(&file, VertexId(0));
        let node_label = node.label_id;
        assert_eq!(
            node_label, 2,
            "Later transaction should overwrite earlier one"
        );
    }

    // A transaction mixing node and edge inserts replays both tables.
    #[test]
    fn test_recover_mixed_nodes_and_edges() {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        let node_capacity = 100;
        let file = create_test_db(&dir, node_capacity, 100);
        let mut wal = WriteAheadLog::open(&wal_path).expect("open WAL");
        let tx_id = wal.begin_transaction().expect("begin tx");
        for i in 0..3 {
            wal.log(WalEntry::InsertNode {
                id: VertexId(i),
                record: SerializableNodeRecord {
                    id: i,
                    label_id: (i * 10) as u32,
                    flags: 0,
                    first_out_edge: u64::MAX,
                    first_in_edge: u64::MAX,
                    prop_head: u64::MAX,
                },
            })
            .expect("log node");
        }
        wal.log(WalEntry::InsertEdge {
            id: EdgeId(0),
            record: SerializableEdgeRecord {
                id: 0,
                label_id: 100,
                flags: 0,
                src: 0,
                dst: 1,
                next_out: u64::MAX,
                next_in: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log edge");
        wal.log(WalEntry::InsertEdge {
            id: EdgeId(1),
            record: SerializableEdgeRecord {
                id: 1,
                label_id: 101,
                flags: 0,
                src: 1,
                dst: 2,
                next_out: u64::MAX,
                next_in: u64::MAX,
                prop_head: u64::MAX,
            },
        })
        .expect("log edge");
        wal.log(WalEntry::CommitTx { tx_id }).expect("commit");
        wal.sync().expect("sync");
        let stats = recover(&mut wal, &file, node_capacity).expect("recover");
        assert_eq!(stats.nodes_recovered, 3);
        assert_eq!(stats.edges_recovered, 2);
        for i in 0..3 {
            let node = read_node_record(&file, VertexId(i));
            let (node_id, node_label) = (node.id, node.label_id);
            assert_eq!(node_id, i);
            assert_eq!(node_label, (i * 10) as u32);
        }
        let edge0 = read_edge_record(&file, EdgeId(0), node_capacity);
        let (edge0_id, edge0_src, edge0_dst) = (edge0.id, edge0.src, edge0.dst);
        assert_eq!(edge0_id, 0);
        assert_eq!(edge0_src, 0);
        assert_eq!(edge0_dst, 1);
        let edge1 = read_edge_record(&file, EdgeId(1), node_capacity);
        let (edge1_id, edge1_src, edge1_dst) = (edge1.id, edge1.src, edge1.dst);
        assert_eq!(edge1_id, 1);
        assert_eq!(edge1_src, 1);
        assert_eq!(edge1_dst, 2);
    }
}