use crate::txn::version_store::{TransactionId, Timestamp};
use crate::types::{Row, RowId, PartitionId};
use crate::{Result, StorageError};
use crate::config::DurabilityLevel;
use crate::storage::checksum::{Checksum, ChecksumType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;
use dashmap::DashMap;
pub type LogSequenceNumber = u64;
#[derive(Debug, Clone)]
#[derive(Default)]
pub struct WALConfig {
pub durability_level: DurabilityLevel,
}
impl From<crate::config::WALConfig> for WALConfig {
fn from(config: crate::config::WALConfig) -> Self {
Self {
durability_level: config.durability_level,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WALRecord {
Insert {
table_name: String, row_id: RowId,
partition: PartitionId,
data: Row,
},
Update {
table_name: String, row_id: RowId,
partition: PartitionId,
old_data: Row, new_data: Row,
},
Delete {
table_name: String, row_id: RowId,
partition: PartitionId,
old_data: Row, timestamp: u64, },
Begin {
txn_id: TransactionId,
isolation_level: u8, },
Commit {
txn_id: TransactionId,
commit_ts: Timestamp,
},
Rollback {
txn_id: TransactionId,
},
Checkpoint { lsn: LogSequenceNumber },
}
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WALEntry {
lsn: LogSequenceNumber,
record: WALRecord,
checksum: u32, }
struct PartitionWAL {
path: PathBuf,
file: File,
next_lsn: LogSequenceNumber,
last_checkpoint: LogSequenceNumber,
config: WALConfig,
}
impl PartitionWAL {
#[allow(dead_code)]
fn create(path: PathBuf) -> Result<Self> {
Self::create_with_config(path, WALConfig::default())
}
fn create_with_config(path: PathBuf, config: WALConfig) -> Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.open(&path)?;
Ok(Self {
path,
file,
next_lsn: 0,
last_checkpoint: 0,
config,
})
}
#[allow(dead_code)]
fn open(path: PathBuf) -> Result<Self> {
Self::open_with_config(path, WALConfig::default())
}
fn open_with_config(path: PathBuf, config: WALConfig) -> Result<Self> {
let mut file = OpenOptions::new()
.append(true)
.read(true)
.open(&path)?;
let mut next_lsn = 0;
let mut last_checkpoint = 0;
let mut corrupted_count = 0;
file.seek(SeekFrom::Start(0))?;
loop {
let mut len_buf = [0u8; 4];
match file.read_exact(&mut len_buf) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
}
let _total_len = u32::from_le_bytes(len_buf) as usize;
let mut header = [0u8; 16];
match file.read_exact(&mut header) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
debug_log!("WAL open: Detected partial write (header)");
break;
}
Err(e) => return Err(e.into()),
}
let lsn = u64::from_le_bytes(header[0..8].try_into().unwrap());
let checksum = u32::from_le_bytes(header[8..12].try_into().unwrap());
let record_len = u32::from_le_bytes(header[12..16].try_into().unwrap()) as usize;
let mut record_data = vec![0u8; record_len];
match file.read_exact(&mut record_data) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
debug_log!("WAL open: Detected partial write at end of file");
break;
}
Err(e) => return Err(e.into()),
}
if Checksum::verify(ChecksumType::CRC32C, &record_data, checksum).is_err() {
corrupted_count += 1;
continue;
}
next_lsn = lsn + 1;
if let Ok(record) = bincode::deserialize::<WALRecord>(&record_data) {
if let WALRecord::Checkpoint { lsn: cp_lsn } = record {
last_checkpoint = cp_lsn;
}
}
}
if corrupted_count > 0 {
debug_log!("WAL open: Found {} corrupted records (will skip during recovery)", corrupted_count);
}
Ok(Self {
path,
file,
next_lsn,
last_checkpoint,
config,
})
}
fn append(&mut self, record: WALRecord) -> Result<LogSequenceNumber> {
let lsn = self.next_lsn;
self.next_lsn += 1;
let record_data = bincode::serialize(&record)?;
let checksum = Checksum::compute(ChecksumType::CRC32C, &record_data);
let header_size = 4 + 8 + 4 + 4; let total_len = (header_size + record_data.len()) as u32;
self.file.write_all(&total_len.to_le_bytes())?;
self.file.write_all(&lsn.to_le_bytes())?;
self.file.write_all(&checksum.to_le_bytes())?;
self.file.write_all(&(record_data.len() as u32).to_le_bytes())?;
self.file.write_all(&record_data)?;
match self.config.durability_level {
DurabilityLevel::Synchronous => {
self.file.sync_data()?;
}
DurabilityLevel::GroupCommit { .. } => {
}
DurabilityLevel::Periodic { .. } => {
}
DurabilityLevel::NoSync => {}
}
Ok(lsn)
}
fn batch_append(&mut self, records: Vec<WALRecord>) -> Result<Vec<LogSequenceNumber>> {
if records.is_empty() {
return Ok(Vec::new());
}
let mut lsns = Vec::with_capacity(records.len());
let mut buffer = Vec::new();
for record in records {
let lsn = self.next_lsn;
self.next_lsn += 1;
lsns.push(lsn);
let record_data = bincode::serialize(&record)?;
let checksum = Checksum::compute(ChecksumType::CRC32C, &record_data);
let header_size = 4 + 8 + 4 + 4; let total_len = (header_size + record_data.len()) as u32;
buffer.extend_from_slice(&total_len.to_le_bytes());
buffer.extend_from_slice(&lsn.to_le_bytes());
buffer.extend_from_slice(&checksum.to_le_bytes());
buffer.extend_from_slice(&(record_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(&record_data);
}
self.file.write_all(&buffer)?;
match self.config.durability_level {
DurabilityLevel::Synchronous | DurabilityLevel::GroupCommit { .. } => {
self.file.sync_data()?;
}
DurabilityLevel::Periodic { .. } => {
}
DurabilityLevel::NoSync => {
}
}
Ok(lsns)
}
fn checkpoint(&mut self) -> Result<()> {
if self.next_lsn == 0 {
return Ok(());
}
let lsn = self.next_lsn - 1;
self.append(WALRecord::Checkpoint { lsn })?;
self.last_checkpoint = lsn;
self.file.set_len(0)?;
self.file.sync_all()?;
self.next_lsn = 0;
self.last_checkpoint = 0;
Ok(())
}
fn recover(&mut self) -> Result<Vec<WALRecord>> {
let mut records = Vec::new();
let mut file = File::open(&self.path)?;
file.seek(SeekFrom::Start(0))?;
let mut skipped_corrupted = 0;
loop {
let mut len_buf = [0u8; 4];
match file.read_exact(&mut len_buf) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
}
let _total_len = u32::from_le_bytes(len_buf) as usize;
let mut header = [0u8; 16];
match file.read_exact(&mut header) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
debug_log!("WAL recovery: Detected partial write (header), skipping");
break;
}
Err(e) => return Err(e.into()),
}
let lsn = u64::from_le_bytes(header[0..8].try_into().unwrap());
let checksum = u32::from_le_bytes(header[8..12].try_into().unwrap());
let record_len = u32::from_le_bytes(header[12..16].try_into().unwrap()) as usize;
let mut record_data = vec![0u8; record_len];
match file.read_exact(&mut record_data) {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
debug_log!("WAL recovery: Detected partial write (record), skipping");
break;
}
Err(e) => return Err(e.into()),
}
if let Err(e) = Checksum::verify(ChecksumType::CRC32C, &record_data, checksum) {
debug_log!("WAL recovery: Checksum verification failed for LSN {}: {}", lsn, e);
skipped_corrupted += 1;
continue;
}
let record: WALRecord = match bincode::deserialize(&record_data) {
Ok(r) => r,
Err(e) => {
debug_log!("WAL recovery: Failed to deserialize record: {}", e);
skipped_corrupted += 1;
continue;
}
};
if lsn >= self.last_checkpoint {
if !matches!(record, WALRecord::Checkpoint { .. }) {
records.push(record);
}
}
}
if skipped_corrupted > 0 {
debug_log!("WAL recovery: Skipped {} corrupted records", skipped_corrupted);
}
Ok(records)
}
}
pub struct WALManager {
_base_path: PathBuf,
partitions: Arc<DashMap<PartitionId, parking_lot::Mutex<PartitionWAL>>>,
#[allow(dead_code)]
num_partitions: u8,
_config: WALConfig,
flush_thread: Option<FlushThread>,
}
struct FlushThread {
handle: Option<thread::JoinHandle<()>>,
should_stop: Arc<AtomicBool>,
}
impl WALManager {
pub fn create<P: AsRef<Path>>(base_path: P, num_partitions: u8) -> Result<Self> {
Self::create_with_config(base_path, num_partitions, WALConfig::default())
}
pub fn create_with_config<P: AsRef<Path>>(
base_path: P,
num_partitions: u8,
config: WALConfig,
) -> Result<Self> {
let base_path = base_path.as_ref().to_path_buf();
std::fs::create_dir_all(&base_path)?;
let partitions = DashMap::new();
for partition_id in 0..num_partitions {
let wal_path = base_path.join(format!("partition_{}.wal", partition_id));
let wal = PartitionWAL::create_with_config(wal_path, config.clone())?;
partitions.insert(partition_id, parking_lot::Mutex::new(wal));
}
let partitions = Arc::new(partitions);
let flush_thread = Self::start_flush_thread_if_needed(&config, partitions.clone());
Ok(Self {
_base_path: base_path,
partitions,
num_partitions,
_config: config,
flush_thread,
})
}
pub fn open<P: AsRef<Path>>(base_path: P, num_partitions: u8) -> Result<Self> {
Self::open_with_config(base_path, num_partitions, WALConfig::default())
}
pub fn open_with_config<P: AsRef<Path>>(
base_path: P,
num_partitions: u8,
config: WALConfig,
) -> Result<Self> {
let base_path = base_path.as_ref().to_path_buf();
let partitions = DashMap::new();
for partition_id in 0..num_partitions {
let wal_path = base_path.join(format!("partition_{}.wal", partition_id));
if wal_path.exists() {
let wal = PartitionWAL::open_with_config(wal_path, config.clone())?;
partitions.insert(partition_id, parking_lot::Mutex::new(wal));
} else {
let wal = PartitionWAL::create_with_config(wal_path, config.clone())?;
partitions.insert(partition_id, parking_lot::Mutex::new(wal));
}
}
let partitions = Arc::new(partitions);
let flush_thread = Self::start_flush_thread_if_needed(&config, partitions.clone());
Ok(Self {
_base_path: base_path,
partitions,
num_partitions,
_config: config,
flush_thread,
})
}
fn start_flush_thread_if_needed(
config: &WALConfig,
partitions: Arc<DashMap<PartitionId, parking_lot::Mutex<PartitionWAL>>>,
) -> Option<FlushThread> {
if let DurabilityLevel::Periodic { interval_ms } = config.durability_level {
let should_stop = Arc::new(AtomicBool::new(false));
let should_stop_clone = should_stop.clone();
let interval = Duration::from_millis(interval_ms);
let handle = thread::spawn(move || {
while !should_stop_clone.load(Ordering::Relaxed) {
thread::sleep(interval);
for entry in partitions.iter() {
let wal = entry.value().lock();
let _ = wal.file.sync_data();
}
}
});
Some(FlushThread {
handle: Some(handle),
should_stop,
})
} else {
None
}
}
pub fn log_insert(
&self,
table_name: &str,
partition: PartitionId,
row_id: RowId,
data: Row,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Insert {
table_name: table_name.to_string(),
row_id,
partition,
data,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn log_update(
&self,
table_name: &str,
partition: PartitionId,
row_id: RowId,
old_data: Row,
new_data: Row,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Update {
table_name: table_name.to_string(),
row_id,
partition,
old_data,
new_data,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn log_delete(
&self,
table_name: &str,
partition: PartitionId,
row_id: RowId,
old_data: Row,
timestamp: u64,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Delete {
table_name: table_name.to_string(),
row_id,
partition,
old_data,
timestamp,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn log_begin(
&self,
partition: PartitionId,
txn_id: TransactionId,
isolation_level: u8,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Begin {
txn_id,
isolation_level,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn log_commit(
&self,
partition: PartitionId,
txn_id: TransactionId,
commit_ts: Timestamp,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Commit {
txn_id,
commit_ts,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn log_rollback(
&self,
partition: PartitionId,
txn_id: TransactionId,
) -> Result<LogSequenceNumber> {
let record = WALRecord::Rollback {
txn_id,
};
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.append(record)
}
pub fn batch_append(
&self,
partition: PartitionId,
records: Vec<WALRecord>,
) -> Result<Vec<LogSequenceNumber>> {
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.batch_append(records)
}
pub fn checkpoint(&self, partition: PartitionId) -> Result<()> {
let entry = self.partitions.get(&partition)
.ok_or_else(|| StorageError::Transaction("Invalid partition ID".to_string()))?;
let mut wal = entry.value().lock();
wal.checkpoint()
}
pub fn checkpoint_all(&self) -> Result<()> {
for entry in self.partitions.iter() {
let mut wal = entry.value().lock();
wal.checkpoint()?;
}
Ok(())
}
pub fn recover(&self) -> Result<HashMap<PartitionId, Vec<WALRecord>>> {
let mut result = HashMap::new();
for entry in self.partitions.iter() {
let partition_id = *entry.key();
let mut wal = entry.value().lock();
let records = wal.recover()?;
result.insert(partition_id, records);
}
Ok(result)
}
}
impl Drop for WALManager {
fn drop(&mut self) {
if let Some(mut flush_thread) = self.flush_thread.take() {
flush_thread.should_stop.store(true, Ordering::Relaxed);
if let Some(handle) = flush_thread.handle.take() {
let _ = handle.join();
}
}
for entry in self.partitions.iter() {
let wal = entry.value().lock();
let _ = wal.file.sync_data();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{Value, Timestamp};
use tempfile::TempDir;
#[test]
fn test_wal_create() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 4).unwrap();
assert_eq!(wal.num_partitions, 4);
}
#[test]
fn test_wal_log_insert() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 4).unwrap();
let row = vec![Value::Null];
let lsn = wal.log_insert("test_table", 0, 1, row).unwrap();
assert_eq!(lsn, 0);
}
#[test]
fn test_wal_checkpoint() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 4).unwrap();
wal.log_insert("test_table", 0, 1, vec![Value::Null]).unwrap();
wal.checkpoint(0).unwrap();
}
#[test]
fn test_wal_recovery() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path();
{
let wal = WALManager::create(path, 2).unwrap();
wal.log_insert("test_table", 0, 1, vec![Value::Null]).unwrap();
wal.log_insert("test_table", 0, 2, vec![Value::Null]).unwrap();
wal.log_insert("test_table", 1, 3, vec![Value::Null]).unwrap();
}
{
let wal = WALManager::open(path, 2).unwrap();
let recovered = wal.recover().unwrap();
assert_eq!(recovered.len(), 2);
let count_inserts = |records: &[WALRecord]| -> usize {
records.iter().filter(|r| matches!(r, WALRecord::Insert { .. })).count()
};
assert_eq!(count_inserts(recovered.get(&0).unwrap()), 2);
assert_eq!(count_inserts(recovered.get(&1).unwrap()), 1);
}
}
#[test]
fn test_wal_update_operation() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let old_data = vec![Value::Null];
let new_data = vec![Value::Null];
let lsn = wal.log_update("test_table", 0, 1, old_data.clone(), new_data.clone()).unwrap();
assert_eq!(lsn, 0);
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 1);
assert!(matches!(records[0], WALRecord::Update { .. }));
}
#[test]
fn test_wal_delete_operation() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let old_data = vec![Value::Null];
let lsn = wal.log_delete("test_table", 0, 1, old_data.clone(), 12345).unwrap();
assert_eq!(lsn, 0);
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 1);
assert!(matches!(records[0], WALRecord::Delete { .. }));
}
#[test]
fn test_wal_transaction_boundaries() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let lsn1 = wal.log_begin(0, 1, 1).unwrap();
assert_eq!(lsn1, 0);
let lsn2 = wal.log_insert("test_table", 0, 10, vec![Value::Null]).unwrap();
assert_eq!(lsn2, 1);
let lsn3 = wal.log_commit(0, 1, 100).unwrap();
assert_eq!(lsn3, 2);
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 3);
assert!(matches!(records[0], WALRecord::Begin { txn_id: 1, .. }));
assert!(matches!(records[1], WALRecord::Insert { row_id: 10, .. }));
assert!(matches!(records[2], WALRecord::Commit { txn_id: 1, .. }));
}
#[test]
fn test_wal_transaction_rollback() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
wal.log_begin(0, 1, 1).unwrap();
wal.log_insert("test_table", 0, 10, vec![Value::Null]).unwrap();
wal.log_rollback(0, 1).unwrap();
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 3);
assert!(matches!(records[0], WALRecord::Begin { txn_id: 1, .. }));
assert!(matches!(records[1], WALRecord::Insert { row_id: 10, .. }));
assert!(matches!(records[2], WALRecord::Rollback { txn_id: 1 }));
}
#[test]
fn test_wal_complete_transaction_flow() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path();
{
let wal = WALManager::create(path, 2).unwrap();
wal.log_begin(0, 1, 2).unwrap();
wal.log_insert("test_table", 0, 100, vec![Value::Null]).unwrap();
wal.log_update("test_table", 0, 100, vec![Value::Null], vec![Value::Null]).unwrap();
wal.log_commit(0, 1, 1000).unwrap();
wal.log_begin(0, 2, 2).unwrap();
wal.log_insert("test_table", 0, 200, vec![Value::Null]).unwrap();
wal.log_rollback(0, 2).unwrap();
wal.log_begin(0, 3, 2).unwrap();
wal.log_delete("test_table", 0, 100, vec![Value::Null], 12345).unwrap();
wal.log_commit(0, 3, 2000).unwrap();
}
{
let wal = WALManager::open(path, 2).unwrap();
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 10);
let count_type = |records: &[WALRecord], pred: fn(&WALRecord) -> bool| -> usize {
records.iter().filter(|r| pred(r)).count()
};
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Begin { .. })), 3);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Insert { .. })), 2);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Update { .. })), 1);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Delete { .. })), 1);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Commit { .. })), 2);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Rollback { .. })), 1);
}
}
#[test]
fn test_wal_batch_append() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let records = vec![
WALRecord::Begin { txn_id: 1, isolation_level: 2 },
WALRecord::Insert {
table_name: "test_table".to_string(),
row_id: 100,
partition: 0,
data: vec![Value::Timestamp(Timestamp::from_micros(42))]
},
WALRecord::Insert {
table_name: "test_table".to_string(),
row_id: 101,
partition: 0,
data: vec![Value::Timestamp(Timestamp::from_micros(43))]
},
WALRecord::Update {
table_name: "test_table".to_string(),
row_id: 100,
partition: 0,
old_data: vec![Value::Timestamp(Timestamp::from_micros(42))],
new_data: vec![Value::Timestamp(Timestamp::from_micros(100))],
},
WALRecord::Commit { txn_id: 1, commit_ts: 1000 },
];
let lsns = wal.batch_append(0, records).unwrap();
assert_eq!(lsns.len(), 5);
assert_eq!(lsns[0], 0);
assert_eq!(lsns[1], 1);
assert_eq!(lsns[2], 2);
assert_eq!(lsns[3], 3);
assert_eq!(lsns[4], 4);
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 5);
assert!(matches!(records[0], WALRecord::Begin { txn_id: 1, .. }));
assert!(matches!(records[1], WALRecord::Insert { row_id: 100, .. }));
assert!(matches!(records[2], WALRecord::Insert { row_id: 101, .. }));
assert!(matches!(records[3], WALRecord::Update { row_id: 100, .. }));
assert!(matches!(records[4], WALRecord::Commit { txn_id: 1, .. }));
}
#[test]
fn test_wal_batch_append_empty() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let lsns = wal.batch_append(0, vec![]).unwrap();
assert_eq!(lsns.len(), 0);
}
#[test]
fn test_wal_batch_append_multiple_transactions() {
let temp_dir = TempDir::new().unwrap();
let wal = WALManager::create(temp_dir.path(), 2).unwrap();
let records1 = vec![
WALRecord::Begin { txn_id: 1, isolation_level: 2 },
WALRecord::Insert { table_name: "test_table".to_string(), row_id: 100, partition: 0, data: vec![Value::Null] },
WALRecord::Commit { txn_id: 1, commit_ts: 1000 },
];
wal.batch_append(0, records1).unwrap();
let records2 = vec![
WALRecord::Begin { txn_id: 2, isolation_level: 2 },
WALRecord::Insert { table_name: "test_table".to_string(), row_id: 200, partition: 0, data: vec![Value::Null] },
WALRecord::Insert { table_name: "test_table".to_string(), row_id: 201, partition: 0, data: vec![Value::Null] },
WALRecord::Commit { txn_id: 2, commit_ts: 2000 },
];
wal.batch_append(0, records2).unwrap();
let records3 = vec![
WALRecord::Begin { txn_id: 3, isolation_level: 2 },
WALRecord::Delete { table_name: "test_table".to_string(), row_id: 100, partition: 0, old_data: vec![Value::Null], timestamp: 0 },
WALRecord::Rollback { txn_id: 3 },
];
wal.batch_append(0, records3).unwrap();
let recovered = wal.recover().unwrap();
let records = recovered.get(&0).unwrap();
assert_eq!(records.len(), 10);
let count_type = |records: &[WALRecord], pred: fn(&WALRecord) -> bool| -> usize {
records.iter().filter(|r| pred(r)).count()
};
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Begin { .. })), 3);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Commit { .. })), 2);
assert_eq!(count_type(records, |r| matches!(r, WALRecord::Rollback { .. })), 1);
}
}