use byteorder::{LittleEndian, ReadBytesExt};
use parking_lot::Mutex;
use std::cell::Cell;
use std::collections::HashSet;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use sochdb_core::{Result, SochDBError, WalRecordType};
/// How long (in nanoseconds) a thread's cached wall-clock reading is reused
/// before `SystemTime::now()` is consulted again.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

/// Reads the wall clock as microseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of panicking.
#[inline]
fn wall_clock_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_micros() as u64)
        .unwrap_or(0)
}

thread_local! {
    // Per-thread (monotonic anchor, wall-clock µs at that anchor) pair.
    // Seeded with a real wall-clock reading: seeding it with 0 made every call
    // during a thread's first millisecond return a near-zero timestamp.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), wall_clock_us()));
}

/// Returns the current wall-clock time in microseconds since the Unix epoch,
/// using a per-thread cache to avoid calling `SystemTime::now()` every time.
///
/// Within `CACHE_VALIDITY_NS` of the last refresh, the result is the cached
/// wall-clock value advanced by the monotonic time elapsed since then. After
/// that window, the cache is refreshed from the system clock.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (anchor, ts) = cache.get();
        let elapsed_ns = anchor.elapsed().as_nanos() as u64;
        if elapsed_ns < CACHE_VALIDITY_NS {
            ts + elapsed_ns / 1000
        } else {
            let now_us = wall_clock_us();
            cache.set((Instant::now(), now_us));
            now_us
        }
    })
}
/// Fixed prefix of every encoded WAL record, in order: content length (u32),
/// record type (u8), txn id (u64), timestamp (u64), key length (u32) and
/// value length (u32).
const RECORD_HEADER_SIZE: usize = std::mem::size_of::<u32>()
    + std::mem::size_of::<u8>()
    + std::mem::size_of::<u64>()
    + std::mem::size_of::<u64>()
    + std::mem::size_of::<u32>()
    + std::mem::size_of::<u32>();
/// Trailing CRC32 written after the key and value bytes.
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Initial byte capacity reserved by `TxnWalBuffer::new` (32 KiB).
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 << 10;
/// Per-transaction staging buffer of fully encoded `Data` WAL records.
///
/// Records are encoded by `TxnWalBuffer::append` and handed to a `TxnWal` in
/// a single write by `TxnWalBuffer::flush_to_wal`.
#[derive(Debug)]
pub struct TxnWalBuffer {
// Transaction id stamped into every record appended to this buffer.
txn_id: u64,
// Concatenated encoded records, laid out like `TxnWalEntry::to_bytes`.
buffer: Vec<u8>,
// Number of records currently held in `buffer`.
entry_count: usize,
}
impl TxnWalBuffer {
    /// Creates an empty buffer for `txn_id`, preallocating
    /// `DEFAULT_TXN_BUFFER_CAPACITY` bytes.
    #[inline]
    pub fn new(txn_id: u64) -> Self {
        Self::with_capacity(txn_id, DEFAULT_TXN_BUFFER_CAPACITY)
    }

    /// Creates an empty buffer for `txn_id` with room for `capacity` bytes.
    #[inline]
    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
        let buffer = Vec::with_capacity(capacity);
        Self {
            txn_id,
            buffer,
            entry_count: 0,
        }
    }

    /// Encodes one `Data` record for `(key, value)` and appends it to the buffer.
    ///
    /// Layout (little-endian):
    /// - content length (u32, excluding itself)
    /// - record type (u8)
    /// - txn id (u64)
    /// - timestamp in µs (u64)
    /// - key length (u32), then value length (u32)
    /// - key bytes, then value bytes
    /// - CRC32 over every field after the length prefix
    #[inline]
    pub fn append(&mut self, key: &[u8], value: &[u8]) {
        let timestamp_us = cached_timestamp_us();
        let record_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
        let length_prefix = ((record_len - 4) as u32).to_le_bytes();
        let kind = [WalRecordType::Data as u8];
        let txn = self.txn_id.to_le_bytes();
        let stamp = timestamp_us.to_le_bytes();
        let key_len = (key.len() as u32).to_le_bytes();
        let value_len = (value.len() as u32).to_le_bytes();

        // The length prefix is written first but is not covered by the checksum.
        self.buffer.extend_from_slice(&length_prefix);
        let mut crc = crc32fast::Hasher::new();
        let checksummed: [&[u8]; 7] = [&kind, &txn, &stamp, &key_len, &value_len, key, value];
        for field in checksummed {
            self.buffer.extend_from_slice(field);
            crc.update(field);
        }
        self.buffer.extend_from_slice(&crc.finalize().to_le_bytes());
        self.entry_count += 1;
    }

    /// Writes every buffered record to `wal` through `TxnWal::flush_buffer`
    /// and returns the sequence number it reports.
    #[inline]
    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
        wal.flush_buffer(self)
    }

    /// Discards all buffered records, keeping the allocation for reuse.
    #[inline]
    pub fn clear(&mut self) {
        self.entry_count = 0;
        self.buffer.clear();
    }

    /// Number of records currently buffered.
    #[inline]
    pub fn entry_count(&self) -> usize {
        self.entry_count
    }

    /// Number of encoded bytes currently buffered.
    #[inline]
    pub fn bytes_buffered(&self) -> usize {
        self.buffer.len()
    }

    /// `true` when no bytes are buffered.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }
}
/// A single WAL record, either freshly constructed or decoded from the log.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
/// Kind of record (data, transaction begin/commit/abort, checkpoint, ...).
pub record_type: WalRecordType,
/// Owning transaction id; checkpoint records written by `TxnWal` use `0`.
pub txn_id: u64,
/// Creation time in microseconds since the Unix epoch.
pub timestamp_us: u64,
/// Record key. Empty for begin/commit/abort/checkpoint/schema records; CLRs store the undo-next LSN here.
pub key: Vec<u8>,
/// Record payload (data value, schema bytes, checkpoint data or undo data).
pub value: Vec<u8>,
}
impl TxnWalEntry {
pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
Self {
record_type: WalRecordType::Data,
txn_id,
timestamp_us: Self::now_us(),
key,
value,
}
}
pub fn txn_begin(txn_id: u64) -> Self {
Self {
record_type: WalRecordType::TxnBegin,
txn_id,
timestamp_us: Self::now_us(),
key: Vec::new(),
value: Vec::new(),
}
}
pub fn txn_commit(txn_id: u64) -> Self {
Self {
record_type: WalRecordType::TxnCommit,
txn_id,
timestamp_us: Self::now_us(),
key: Vec::new(),
value: Vec::new(),
}
}
pub fn txn_abort(txn_id: u64) -> Self {
Self {
record_type: WalRecordType::TxnAbort,
txn_id,
timestamp_us: Self::now_us(),
key: Vec::new(),
value: Vec::new(),
}
}
pub fn checkpoint(txn_id: u64) -> Self {
Self {
record_type: WalRecordType::Checkpoint,
txn_id,
timestamp_us: Self::now_us(),
key: Vec::new(),
value: Vec::new(),
}
}
pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
Self {
record_type: WalRecordType::SchemaChange,
txn_id,
timestamp_us: Self::now_us(),
key: Vec::new(),
value: schema_data,
}
}
#[inline]
fn now_us() -> u64 {
cached_timestamp_us()
}
pub fn checksum(&self) -> u32 {
let mut hasher = crc32fast::Hasher::new();
hasher.update(&[self.record_type as u8]);
hasher.update(&self.txn_id.to_le_bytes());
hasher.update(&self.timestamp_us.to_le_bytes());
hasher.update(&(self.key.len() as u32).to_le_bytes());
hasher.update(&(self.value.len() as u32).to_le_bytes());
hasher.update(&self.key);
hasher.update(&self.value);
hasher.finalize()
}
pub fn to_bytes(&self) -> Vec<u8> {
let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
let mut buf = Vec::with_capacity(total_len);
let mut hasher = crc32fast::Hasher::new();
let content_len = (total_len - 4) as u32;
buf.extend_from_slice(&content_len.to_le_bytes());
let record_type_byte = self.record_type as u8;
buf.push(record_type_byte);
hasher.update(&[record_type_byte]);
let txn_bytes = self.txn_id.to_le_bytes();
buf.extend_from_slice(&txn_bytes);
hasher.update(&txn_bytes);
let ts_bytes = self.timestamp_us.to_le_bytes();
buf.extend_from_slice(&ts_bytes);
hasher.update(&ts_bytes);
let key_len_bytes = (self.key.len() as u32).to_le_bytes();
buf.extend_from_slice(&key_len_bytes);
hasher.update(&key_len_bytes);
let val_len_bytes = (self.value.len() as u32).to_le_bytes();
buf.extend_from_slice(&val_len_bytes);
hasher.update(&val_len_bytes);
buf.extend_from_slice(&self.key);
hasher.update(&self.key);
buf.extend_from_slice(&self.value);
hasher.update(&self.value);
buf.extend_from_slice(&hasher.finalize().to_le_bytes());
buf
}
pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
let content_len = reader.read_u32::<LittleEndian>()?;
if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
return Err(SochDBError::Corruption("WAL entry too short".into()));
}
let record_type_byte = reader.read_u8()?;
let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
})?;
let txn_id = reader.read_u64::<LittleEndian>()?;
let timestamp_us = reader.read_u64::<LittleEndian>()?;
let key_len = reader.read_u32::<LittleEndian>()? as usize;
let value_len = reader.read_u32::<LittleEndian>()? as usize;
let mut key = vec![0u8; key_len];
reader.read_exact(&mut key)?;
let mut value = vec![0u8; value_len];
reader.read_exact(&mut value)?;
let stored_checksum = reader.read_u32::<LittleEndian>()?;
let entry = Self {
record_type,
txn_id,
timestamp_us,
key,
value,
};
if entry.checksum() != stored_checksum {
return Err(SochDBError::Corruption(format!(
"WAL checksum mismatch for txn_id {}: expected {}, got {}",
txn_id,
entry.checksum(),
stored_checksum
)));
}
Ok(entry)
}
}
/// Single-file, append-only write-ahead log for transactional key/value writes.
///
/// All writes go through one mutex-protected `BufWriter`; counters are atomics.
pub struct TxnWal {
// Location of the log file on disk.
path: PathBuf,
// Buffered writer over the log file (opened in append mode); the mutex serializes writers.
writer: Mutex<BufWriter<File>>,
// Next transaction id handed out by `alloc_txn_id`.
next_txn_id: AtomicU64,
// Number of records appended (seeded from the existing log on open); also the next record's sequence number.
sequence: AtomicU64,
// Bytes appended since the last `sync` / `truncate`.
bytes_since_sync: AtomicU64,
// Cached wall-clock timestamp (µs since the Unix epoch) used by `get_cached_timestamp`.
cached_timestamp_us: AtomicU64,
}
impl TxnWal {
    /// Opens the WAL at `path` and restores the id and sequence counters from
    /// any existing records.
    ///
    /// Missing parent directories and the file itself are created. The file is
    /// opened in append mode, so every write lands at the current end of the file.
    ///
    /// # Errors
    /// Propagates I/O errors from creating directories, opening the file, or
    /// re-opening it for the recovery scan.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(&path)?;
        let now_us = cached_timestamp_us();
        let wal = Self {
            path,
            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
            next_txn_id: AtomicU64::new(1),
            sequence: AtomicU64::new(0),
            bytes_since_sync: AtomicU64::new(0),
            cached_timestamp_us: AtomicU64::new(now_us),
        };
        wal.recover_state()?;
        Ok(wal)
    }

    /// Scans the existing log to seed `next_txn_id` and `sequence`.
    ///
    /// `next_txn_id` becomes the highest id seen plus one, and `sequence`
    /// becomes the number of readable records. Scanning stops at the first
    /// record that cannot be decoded: clean EOF, a torn tail, or corruption.
    ///
    /// NOTE(review): a torn tail is left in place, so records appended later
    /// sit behind it and are unreachable by the readers in this file, which
    /// all stop at the first bad record. Consider truncating to the last good
    /// offset on open.
    fn recover_state(&self) -> Result<()> {
        let mut reader = BufReader::new(File::open(&self.path)?);
        let mut max_txn_id: u64 = 0;
        let mut count: u64 = 0;
        // Any decode error (EOF or corruption) ends the scan.
        while let Ok(entry) = TxnWalEntry::from_reader(&mut reader) {
            max_txn_id = max_txn_id.max(entry.txn_id);
            count += 1;
        }
        self.next_txn_id.store(max_txn_id + 1, Ordering::SeqCst);
        self.sequence.store(count, Ordering::SeqCst);
        Ok(())
    }

    /// Timestamp (µs since the Unix epoch) for records written by
    /// [`TxnWal::write_no_flush_refs`].
    ///
    /// Delegates to the thread-local [`cached_timestamp_us`] cache and records
    /// the value in `self.cached_timestamp_us`. Refreshing only when `sequence`
    /// hit a multiple of 1024 could hand out timestamps that were arbitrarily
    /// old after a quiet period.
    #[inline]
    fn get_cached_timestamp(&self) -> u64 {
        let now_us = cached_timestamp_us();
        self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
        now_us
    }

    /// Encodes `entry` and writes it into the buffered writer.
    ///
    /// The bytes are *not* flushed or fsynced; use [`TxnWal::flush`],
    /// [`TxnWal::sync`] or [`TxnWal::append_sync`] for that.
    ///
    /// Returns the sequence number assigned to the record.
    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
        let bytes = entry.to_bytes();
        let mut writer = self.writer.lock();
        writer.write_all(&bytes)?;
        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
        self.bytes_since_sync
            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
        Ok(seq)
    }

    /// Identical to [`TxnWal::append`], which never flushes either. This is
    /// kept as an explicit "buffered only" entry point.
    #[inline]
    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
        self.append(entry)
    }

    /// Buffers a `Data` record for `txn_id` without flushing; returns its sequence number.
    #[inline]
    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
        let entry = TxnWalEntry::data(txn_id, key, value);
        self.append_no_flush(&entry)
    }

    /// Buffers a `Data` record for `txn_id` straight from borrowed slices,
    /// skipping the intermediate `TxnWalEntry` allocation.
    ///
    /// The bytes written match `TxnWalEntry::to_bytes` for the same fields.
    /// Returns the sequence number assigned to the record.
    #[inline]
    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
        let timestamp_us = self.get_cached_timestamp();
        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
        let mut hasher = crc32fast::Hasher::new();
        let mut writer = self.writer.lock();
        // Length prefix: not covered by the checksum.
        let content_len = (total_len - 4) as u32;
        writer.write_all(&content_len.to_le_bytes())?;
        let record_type_byte = WalRecordType::Data as u8;
        writer.write_all(&[record_type_byte])?;
        hasher.update(&[record_type_byte]);
        let txn_bytes = txn_id.to_le_bytes();
        writer.write_all(&txn_bytes)?;
        hasher.update(&txn_bytes);
        let ts_bytes = timestamp_us.to_le_bytes();
        writer.write_all(&ts_bytes)?;
        hasher.update(&ts_bytes);
        let key_len_bytes = (key.len() as u32).to_le_bytes();
        writer.write_all(&key_len_bytes)?;
        hasher.update(&key_len_bytes);
        let val_len_bytes = (value.len() as u32).to_le_bytes();
        writer.write_all(&val_len_bytes)?;
        hasher.update(&val_len_bytes);
        writer.write_all(key)?;
        hasher.update(key);
        writer.write_all(value)?;
        hasher.update(value);
        writer.write_all(&hasher.finalize().to_le_bytes())?;
        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
        self.bytes_since_sync
            .fetch_add(total_len as u64, Ordering::Relaxed);
        Ok(seq)
    }

    /// Pushes buffered bytes to the OS; does not fsync.
    pub fn flush(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        writer.flush()?;
        Ok(())
    }

    /// Appends `entry`, then flushes and fsyncs the log. Returns the record's
    /// sequence number.
    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
        let seq = self.append(entry)?;
        self.sync()?;
        Ok(seq)
    }

    /// Flushes buffered records to the OS and fsyncs the file with
    /// `File::sync_all`, then resets `bytes_since_sync`.
    pub fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        // `sync_all` only persists bytes the OS has already received. Data still
        // in the `BufWriter` must be flushed first, or it is not durable.
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.bytes_since_sync.store(0, Ordering::Relaxed);
        Ok(())
    }

    /// Writes every record staged in `buffer` with a single locked write.
    ///
    /// Returns the sequence number of the buffer's first record. An empty
    /// buffer returns `0` without writing anything; note that `0` is also a
    /// valid sequence number.
    #[inline]
    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
        if buffer.is_empty() {
            return Ok(0);
        }
        let mut writer = self.writer.lock();
        writer.write_all(&buffer.buffer)?;
        let seq = self
            .sequence
            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
        self.bytes_since_sync
            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
        Ok(seq)
    }

    /// On-disk size of the log file, or `0` if its metadata cannot be read.
    ///
    /// Bytes still held in the `BufWriter` are not counted.
    pub fn size_bytes(&self) -> u64 {
        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
    }

    /// Reserves and returns a fresh transaction id.
    pub fn alloc_txn_id(&self) -> u64 {
        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
    }

    /// Allocates a transaction id and buffers its `TxnBegin` record.
    pub fn begin_transaction(&self) -> Result<u64> {
        let txn_id = self.alloc_txn_id();
        let entry = TxnWalEntry::txn_begin(txn_id);
        self.append(&entry)?;
        Ok(txn_id)
    }

    /// Appends the `TxnCommit` record for `txn_id`, then flushes and fsyncs.
    ///
    /// On success, the commit record and every record buffered before it are
    /// on stable storage.
    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
        let entry = TxnWalEntry::txn_commit(txn_id);
        // `append_sync` -> `sync` flushes the BufWriter before fsyncing, so no
        // separate `flush` is needed here.
        self.append_sync(&entry)?;
        Ok(())
    }

    /// Buffers a `TxnAbort` record for `txn_id`; it is not flushed.
    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
        let entry = TxnWalEntry::txn_abort(txn_id);
        self.append(&entry)?;
        Ok(())
    }

    /// Buffers a `Data` record for `txn_id`; returns its sequence number.
    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
        let entry = TxnWalEntry::data(txn_id, key, value);
        self.append(&entry)
    }

    /// Reads the on-disk log and returns the writes of committed transactions,
    /// plus how many transactions contributed.
    ///
    /// A transaction's `Data` records are kept when all of these hold:
    /// - it has a `TxnBegin` record;
    /// - it has a `TxnCommit` record;
    /// - no `TxnAbort` discarded its pending writes.
    ///
    /// Transactions are emitted in the order of their commit records, and each
    /// transaction's writes in log order, so replaying the result applies later
    /// commits after earlier ones. Reading stops at the first undecodable record.
    /// Records still in the in-process `BufWriter` are not visible.
    // The tuple type is spelled out to keep the public signature unchanged.
    #[allow(clippy::type_complexity)]
    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
        let mut reader = BufReader::new(File::open(&self.path)?);
        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
            std::collections::HashMap::new();
        // Commit records in log order. This keeps the result deterministic,
        // unlike iterating the HashMap.
        let mut commit_order: Vec<u64> = Vec::new();

        while let Ok(entry) = TxnWalEntry::from_reader(&mut reader) {
            match entry.record_type {
                WalRecordType::TxnBegin => {
                    pending_writes.insert(entry.txn_id, Vec::new());
                }
                WalRecordType::Data => {
                    if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
                        writes.push((entry.key, entry.value));
                    }
                }
                WalRecordType::TxnCommit => commit_order.push(entry.txn_id),
                WalRecordType::TxnAbort => {
                    pending_writes.remove(&entry.txn_id);
                }
                _ => {}
            }
        }

        let mut result = Vec::new();
        let mut txn_count = 0;
        for txn_id in commit_order {
            // `remove` also guards against counting a duplicated commit twice.
            if let Some(writes) = pending_writes.remove(&txn_id) {
                result.extend(writes);
                txn_count += 1;
            }
        }
        Ok((result, txn_count))
    }

    /// Feeds every decodable on-disk record to `callback`, in log order.
    ///
    /// Returns the number of records delivered. Stops at clean EOF, or at the
    /// first decode error, which is reported on stderr. An error from
    /// `callback` is propagated immediately.
    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
    where
        F: FnMut(TxnWalEntry) -> Result<()>,
    {
        let file = File::open(&self.path)?;
        let mut reader = BufReader::new(file);
        let mut count = 0u64;
        loop {
            match TxnWalEntry::from_reader(&mut reader) {
                Ok(entry) => {
                    callback(entry)?;
                    count += 1;
                }
                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => {
                    eprintln!("WAL replay warning: {:?}", e);
                    break;
                }
            }
        }
        Ok(count)
    }

    /// Empties the log file, fsyncs it, and resets `sequence` and
    /// `bytes_since_sync`; `next_txn_id` is left untouched.
    pub fn truncate(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        // Drain buffered records *before* truncating. Otherwise they would be
        // written after `set_len(0)` and reappear in the supposedly empty log.
        writer.flush()?;
        let file = writer.get_ref();
        file.set_len(0)?;
        file.sync_all()?;
        self.sequence.store(0, Ordering::SeqCst);
        self.bytes_since_sync.store(0, Ordering::Relaxed);
        Ok(())
    }

    /// Appends an empty `Checkpoint` record (txn id `0`) and syncs.
    pub fn write_checkpoint(&self) -> Result<u64> {
        let entry = TxnWalEntry::checkpoint(0);
        self.append_sync(&entry)
    }

    /// Buffers a `CompensationLogRecord` for `txn_id`.
    ///
    /// The key holds `undo_next_lsn` (little-endian u64, `0` when `None`) and
    /// the value holds `undo_data`. `_original_lsn` is currently not recorded.
    pub fn append_clr(
        &self,
        txn_id: u64,
        _original_lsn: u64,
        undo_next_lsn: Option<u64>,
        undo_data: &[u8],
    ) -> Result<u64> {
        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
        let entry = TxnWalEntry {
            record_type: WalRecordType::CompensationLogRecord,
            txn_id,
            timestamp_us: TxnWalEntry::now_us(),
            key,
            value: undo_data.to_vec(),
        };
        self.append(&entry)
    }

    /// Appends a `Checkpoint` record (txn id `0`) carrying `checkpoint_data`, then syncs.
    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
        let entry = TxnWalEntry {
            record_type: WalRecordType::Checkpoint,
            txn_id: 0,
            timestamp_us: TxnWalEntry::now_us(),
            key: Vec::new(),
            value: checkpoint_data.to_vec(),
        };
        self.append_sync(&entry)
    }

    /// Appends a `CheckpointEnd` record (txn id `0`) carrying `checkpoint_data`, then syncs.
    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
        let entry = TxnWalEntry {
            record_type: WalRecordType::CheckpointEnd,
            txn_id: 0,
            timestamp_us: TxnWalEntry::now_us(),
            key: Vec::new(),
            value: checkpoint_data.to_vec(),
        };
        self.append_sync(&entry)
    }

    /// Number of records appended, which is also the next sequence number.
    pub fn sequence(&self) -> u64 {
        self.sequence.load(Ordering::SeqCst)
    }

    /// Bytes appended since the last `sync` / `truncate`.
    pub fn bytes_since_sync(&self) -> u64 {
        self.bytes_since_sync.load(Ordering::Relaxed)
    }

    /// Location of the log file.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
/// Point-in-time snapshot of `TxnWal` counters, returned by `TxnWal::stats`.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
/// Value of the WAL's `sequence` counter (records appended; reset by `truncate`).
pub entries_written: u64,
/// Bytes appended since the last `sync` / `truncate`.
pub bytes_since_sync: u64,
/// Transaction id the next `alloc_txn_id` call will return.
pub next_txn_id: u64,
}
/// WAL front-end that stages encoded records in per-shard in-memory buffers
/// (shard chosen from `txn_id`) and writes them to one log file on flush.
// NOTE(review): `path` is stored but not read by any visible method, which is
// presumably why `dead_code` is allowed here.
#[allow(dead_code)]
pub struct ShardedWal {
// Shard buffers; their count is a power of two so `txn_id & (num_shards - 1)` picks one.
shards: Vec<parking_lot::Mutex<WalShard>>,
num_shards: usize,
// Buffered writer over the shared log file (opened in append mode).
central_writer: parking_lot::Mutex<BufWriter<File>>,
// Next transaction id handed out by `alloc_txn_id`.
next_txn_id: AtomicU64,
// Count of records appended across all shards.
sequence: AtomicU64,
path: PathBuf,
}
/// In-memory staging area for one `ShardedWal` shard.
struct WalShard {
// Concatenated `TxnWalEntry::to_bytes` encodings awaiting a flush.
buffer: Vec<u8>,
// Number of records currently in `buffer`.
entry_count: usize,
}
impl WalShard {
fn new() -> Self {
Self {
buffer: Vec::with_capacity(64 * 1024), entry_count: 0,
}
}
fn append(&mut self, entry: &TxnWalEntry) {
let bytes = entry.to_bytes();
self.buffer.extend_from_slice(&bytes);
self.entry_count += 1;
}
fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
fn drain(&mut self) -> Vec<u8> {
self.entry_count = 0;
std::mem::take(&mut self.buffer)
}
}
impl ShardedWal {
pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
let path = path.as_ref().to_path_buf();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.read(true)
.open(&path)?;
let num_shards = num_shards.next_power_of_two();
let shards: Vec<_> = (0..num_shards)
.map(|_| parking_lot::Mutex::new(WalShard::new()))
.collect();
Ok(Self {
shards,
num_shards,
central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
next_txn_id: AtomicU64::new(1),
sequence: AtomicU64::new(0),
path,
})
}
#[inline]
fn shard_idx(&self, txn_id: u64) -> usize {
(txn_id as usize) & (self.num_shards - 1)
}
pub fn append(&self, entry: &TxnWalEntry) -> u64 {
let shard_idx = self.shard_idx(entry.txn_id);
let mut shard = self.shards[shard_idx].lock();
shard.append(entry);
self.sequence.fetch_add(1, Ordering::SeqCst)
}
pub fn alloc_txn_id(&self) -> u64 {
self.next_txn_id.fetch_add(1, Ordering::SeqCst)
}
pub fn flush(&self) -> Result<()> {
let mut central = self.central_writer.lock();
for shard in &self.shards {
let mut shard_guard = shard.lock();
if !shard_guard.is_empty() {
let data = shard_guard.drain();
central.write_all(&data)?;
}
}
central.flush()?;
Ok(())
}
pub fn sync(&self) -> Result<()> {
self.flush()?;
let central = self.central_writer.lock();
central.get_ref().sync_all()?;
Ok(())
}
pub fn begin_transaction(&self) -> Result<u64> {
let txn_id = self.alloc_txn_id();
let entry = TxnWalEntry::txn_begin(txn_id);
self.append(&entry);
Ok(txn_id)
}
pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
let entry = TxnWalEntry::data(txn_id, key, value);
Ok(self.append(&entry))
}
pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
let entry = TxnWalEntry::txn_commit(txn_id);
let seq = self.append(&entry);
self.sync()?; Ok(seq)
}
pub fn stats(&self) -> ShardedWalStats {
let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
for shard in &self.shards {
shard_entry_counts.push(shard.lock().entry_count);
}
ShardedWalStats {
num_shards: self.num_shards,
total_entries: self.sequence.load(Ordering::SeqCst),
next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
shard_entry_counts,
}
}
}
/// Point-in-time snapshot of `ShardedWal` counters, returned by `ShardedWal::stats`.
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
/// Number of shards (a power of two).
pub num_shards: usize,
/// Value of the WAL's `sequence` counter (records appended).
pub total_entries: u64,
/// Transaction id the next `alloc_txn_id` call will return.
pub next_txn_id: u64,
/// Records currently staged in each shard and not yet flushed, indexed by shard.
pub shard_entry_counts: Vec<usize>,
}
/// Statistics gathered by `TxnWal::crash_recovery`.
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
/// Records successfully decoded.
pub total_records: u64,
/// Committed transactions whose writes were recovered.
pub committed_txns: u64,
/// Begun transactions counted as neither committed nor aborted (in flight at crash time).
pub rolled_back_txns: u64,
/// Distinct transaction ids that have an abort record.
pub aborted_txns: u64,
/// Number of data writes returned for replay.
pub recovered_writes: u64,
/// Undecodable records at the tail (at most 1, because scanning stops there).
pub torn_records: u64,
/// Size of the WAL file when recovery started (not the number of bytes successfully decoded).
pub bytes_read: u64,
/// Wall time spent in recovery, in microseconds.
pub recovery_duration_us: u64,
/// Highest transaction id among decoded records.
pub max_txn_id: u64,
}
impl TxnWal {
    /// Snapshot of the WAL's counters.
    pub fn stats(&self) -> TxnWalStats {
        TxnWalStats {
            entries_written: self.sequence.load(Ordering::SeqCst),
            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
        }
    }

    /// Replays the on-disk log and returns the writes of committed
    /// transactions, together with recovery statistics.
    ///
    /// A transaction's `Data` records are kept when all of these hold:
    /// - it has a `TxnBegin` record;
    /// - it has a `TxnCommit` record;
    /// - no `TxnAbort` discarded its pending writes.
    ///
    /// Transactions are emitted in commit-record order, and each transaction's
    /// writes in log order. Reading stops at a clean end of file, or at the
    /// first record that fails to decode; in the latter case that record is
    /// counted in `torn_records`. A record cut off mid-payload counts as torn too.
    ///
    /// # Errors
    /// Propagates I/O errors from opening, stat-ing or reading the file.
    // The tuple type is spelled out to keep the public signature unchanged.
    #[allow(clippy::type_complexity)]
    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
        let start_time = std::time::Instant::now();
        let file = File::open(&self.path)?;
        let file_size = file.metadata()?.len();
        let mut reader = BufReader::new(file);
        let mut stats = CrashRecoveryStats {
            bytes_read: file_size,
            ..Default::default()
        };
        let mut committed_txns: HashSet<u64> = HashSet::new();
        let mut aborted_txns: HashSet<u64> = HashSet::new();
        let mut all_txns: HashSet<u64> = HashSet::new();
        // First commit record of each transaction, in log order.
        let mut commit_order: Vec<u64> = Vec::new();
        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
            std::collections::HashMap::new();

        loop {
            // No bytes left means a clean end of log. If bytes remain and the
            // record fails to decode (including EOF mid-record), it is torn.
            if std::io::BufRead::fill_buf(&mut reader)?.is_empty() {
                break;
            }
            let entry = match TxnWalEntry::from_reader(&mut reader) {
                Ok(entry) => entry,
                Err(_) => {
                    stats.torn_records += 1;
                    break;
                }
            };
            stats.total_records += 1;
            stats.max_txn_id = stats.max_txn_id.max(entry.txn_id);
            match entry.record_type {
                WalRecordType::TxnBegin => {
                    pending_writes.insert(entry.txn_id, Vec::new());
                    all_txns.insert(entry.txn_id);
                }
                WalRecordType::Data => {
                    if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
                        writes.push((entry.key, entry.value));
                    }
                }
                WalRecordType::TxnCommit => {
                    if committed_txns.insert(entry.txn_id) {
                        commit_order.push(entry.txn_id);
                    }
                }
                WalRecordType::TxnAbort => {
                    pending_writes.remove(&entry.txn_id);
                    aborted_txns.insert(entry.txn_id);
                }
                _ => {}
            }
        }

        let mut result = Vec::new();
        for txn_id in &commit_order {
            // Moving the writes out avoids cloning every key/value.
            if let Some(writes) = pending_writes.remove(txn_id) {
                stats.committed_txns += 1;
                stats.recovered_writes += writes.len() as u64;
                result.extend(writes);
            }
        }
        stats.aborted_txns = aborted_txns.len() as u64;
        // Count in-flight transactions directly. The old `all - committed - aborted`
        // subtraction underflowed (a panic in debug builds) whenever an abort
        // record had no matching begin.
        stats.rolled_back_txns = all_txns
            .iter()
            .filter(|&&txn_id| !committed_txns.contains(&txn_id) && !aborted_txns.contains(&txn_id))
            .count() as u64;
        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
        Ok((result, stats))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens (or reopens) the WAL stored at `path`.
    fn open_wal(path: &Path) -> TxnWal {
        TxnWal::new(path).unwrap()
    }

    /// Data entry with a pinned timestamp, so checksums are reproducible.
    fn data_at(txn_id: u64, key: &[u8], value: &[u8], timestamp_us: u64) -> TxnWalEntry {
        let mut entry = TxnWalEntry::data(txn_id, key.to_vec(), value.to_vec());
        entry.timestamp_us = timestamp_us;
        entry
    }

    #[test]
    fn test_wal_entry_roundtrip() {
        let original = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        let mut input = std::io::Cursor::new(original.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut input).unwrap();
        assert_eq!(decoded.record_type, WalRecordType::Data);
        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.key, b"key");
        assert_eq!(decoded.value, b"value");
    }

    #[test]
    fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Session 1: one committed transaction; dropping the WAL flushes it.
        {
            let wal = open_wal(&path);
            let txn = wal.begin_transaction().unwrap();
            for (k, v) in [(b"k1", b"v1"), (b"k2", b"v2")] {
                wal.write(txn, k.to_vec(), v.to_vec()).unwrap();
            }
            wal.commit_transaction(txn).unwrap();
        }

        // Session 2: both writes come back, in write order.
        let (writes, txn_count) = open_wal(&path).replay_for_recovery().unwrap();
        assert_eq!(txn_count, 1);
        assert_eq!(
            writes,
            vec![
                (b"k1".to_vec(), b"v1".to_vec()),
                (b"k2".to_vec(), b"v2".to_vec()),
            ]
        );
    }

    #[test]
    fn test_uncommitted_transaction_rollback() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");
        {
            let wal = open_wal(&path);
            let committed = wal.begin_transaction().unwrap();
            wal.write(committed, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(committed).unwrap();

            // Never committed: must not survive recovery.
            let in_flight = wal.begin_transaction().unwrap();
            wal.write(in_flight, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
        }
        let (writes, txn_count) = open_wal(&path).replay_for_recovery().unwrap();
        assert_eq!(txn_count, 1);
        assert_eq!(writes, vec![(b"committed".to_vec(), b"yes".to_vec())]);
    }

    #[test]
    fn test_aborted_transaction() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");
        {
            let wal = open_wal(&path);
            let doomed = wal.begin_transaction().unwrap();
            wal.write(doomed, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(doomed).unwrap();
        }
        let (writes, txn_count) = open_wal(&path).replay_for_recovery().unwrap();
        assert_eq!(txn_count, 0);
        assert!(writes.is_empty());
    }

    #[test]
    fn test_checksum_validation() {
        let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
        let mut encoded = entry.to_bytes();
        // Corrupt the final byte, which belongs to the stored CRC.
        if let Some(last) = encoded.last_mut() {
            *last ^= 0xFF;
        }
        let outcome = TxnWalEntry::from_reader(&mut std::io::Cursor::new(encoded));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_crash_recovery_with_stats() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");
        {
            let wal = open_wal(&path);

            let first = wal.begin_transaction().unwrap();
            wal.write(first, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(first, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(first).unwrap();

            let aborted = wal.begin_transaction().unwrap();
            wal.write(aborted, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(aborted).unwrap();

            let third = wal.begin_transaction().unwrap();
            wal.write(third, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(third).unwrap();

            let in_flight = wal.begin_transaction().unwrap();
            wal.write(in_flight, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
        }
        let (writes, stats) = open_wal(&path).crash_recovery().unwrap();
        assert_eq!(writes.len(), 3);
        assert_eq!(stats.committed_txns, 2);
        assert_eq!(stats.aborted_txns, 1);
        assert_eq!(stats.rolled_back_txns, 1);
        assert_eq!(stats.recovered_writes, 3);
        assert!(stats.recovery_duration_us > 0);
    }

    #[test]
    fn test_torn_write_detection() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");
        {
            let wal = open_wal(&path);
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Simulate a crash mid-write: a length prefix followed by two stray bytes.
        {
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            std::io::Write::write_all(&mut file, &[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
                .unwrap();
        }

        let (writes, stats) = open_wal(&path).crash_recovery().unwrap();
        assert_eq!(writes.len(), 1);
        assert_eq!(stats.committed_txns, 1);
        assert_eq!(stats.torn_records, 1);
    }

    #[test]
    fn test_crc32_determinism() {
        let base = data_at(42, b"key", b"value", 12345);
        let twin = data_at(42, b"key", b"value", 12345);
        assert_eq!(base.checksum(), twin.checksum());

        let other = data_at(42, b"key", b"different", 12345);
        assert_ne!(base.checksum(), other.checksum());

        let mut input = std::io::Cursor::new(base.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut input).unwrap();
        assert_eq!(decoded.checksum(), base.checksum());
    }
}