use std::collections::{HashMap, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
/// Log sequence number: the value stored in `WalRecordHeader::lsn`.
pub type Lsn = u64;
/// Identifier of a transaction, stored in `WalRecordHeader::txn_id`.
pub type TxnId = u64;
/// Identifier of the page a log record refers to, stored in `WalRecordHeader::page_id`.
pub type PageId = u64;
/// Kind of a WAL record; the discriminant is the byte stored in
/// `WalRecordHeader::record_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
/// Page modification carrying a before image and an after image.
Update = 1,
/// Transaction commit.
Commit = 2,
/// Transaction abort.
Abort = 3,
/// Compensation log record; its payload is the next LSN to undo.
Clr = 4,
/// Checkpoint marker.
Checkpoint = 5,
/// Transaction begin.
Begin = 6,
/// Transaction end.
End = 7,
}
impl WalRecordType {
pub fn to_canonical(self) -> sochdb_core::txn::WalRecordType {
use sochdb_core::txn::WalRecordType as C;
match self {
Self::Update => C::PageUpdate,
Self::Commit => C::TxnCommit,
Self::Abort => C::TxnAbort,
Self::Clr => C::CompensationLogRecord,
Self::Checkpoint => C::Checkpoint,
Self::Begin => C::TxnBegin,
Self::End => C::TxnEnd,
}
}
pub fn from_canonical(rt: sochdb_core::txn::WalRecordType) -> Option<Self> {
use sochdb_core::txn::WalRecordType as C;
match rt {
C::PageUpdate => Some(Self::Update),
C::TxnCommit => Some(Self::Commit),
C::TxnAbort => Some(Self::Abort),
C::CompensationLogRecord => Some(Self::Clr),
C::Checkpoint => Some(Self::Checkpoint),
C::TxnBegin => Some(Self::Begin),
C::TxnEnd => Some(Self::End),
_ => None,
}
}
}
impl TryFrom<u8> for WalRecordType {
type Error = ();
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
1 => Ok(WalRecordType::Update),
2 => Ok(WalRecordType::Commit),
3 => Ok(WalRecordType::Abort),
4 => Ok(WalRecordType::Clr),
5 => Ok(WalRecordType::Checkpoint),
6 => Ok(WalRecordType::Begin),
7 => Ok(WalRecordType::End),
_ => Err(()),
}
}
}
/// Fixed-layout header that precedes the payload of every WAL record.
///
/// The encoded form is produced by [`WalRecordHeader::serialize`]: the public
/// fields are written little-endian, back to back, in declaration order, and
/// the remainder of the [`WalRecordHeader::SIZE`]-byte buffer stays zero.
#[derive(Debug, Clone)]
#[repr(C, packed)]
pub struct WalRecordHeader {
    /// Sequence number of this record.
    pub lsn: u64,
    /// Transaction the record belongs to.
    pub txn_id: u64,
    /// Raw record-type byte (see `WalRecordType`).
    pub record_type: u8,
    /// LSN of the previous record in the same chain; constructors use 0 for "none".
    pub prev_lsn: u64,
    /// Page the record refers to.
    pub page_id: u64,
    /// Offset associated with the change.
    pub offset: u16,
    /// Payload length in bytes (before image plus after image).
    pub data_length: u32,
    /// Length of the before image, i.e. where the payload is split.
    pub before_length: u16,
    /// Not serialized; constructors and `deserialize` set it to zero.
    _reserved: [u8; 5],
}

impl WalRecordHeader {
    /// Size in bytes of an encoded header. Only the first 41 bytes carry
    /// fields; the tail is zero padding.
    pub const SIZE: usize = 48;

    /// Encodes the header into its fixed-size little-endian form.
    pub fn serialize(&self) -> [u8; Self::SIZE] {
        /// Copies `bytes` at `*cursor` and advances the cursor past them.
        fn put(out: &mut [u8], cursor: &mut usize, bytes: &[u8]) {
            let end = *cursor + bytes.len();
            out[*cursor..end].copy_from_slice(bytes);
            *cursor = end;
        }

        let mut out = [0u8; Self::SIZE];
        let mut cursor = 0usize;
        put(&mut out, &mut cursor, &self.lsn.to_le_bytes());
        put(&mut out, &mut cursor, &self.txn_id.to_le_bytes());
        put(&mut out, &mut cursor, &[self.record_type]);
        put(&mut out, &mut cursor, &self.prev_lsn.to_le_bytes());
        put(&mut out, &mut cursor, &self.page_id.to_le_bytes());
        put(&mut out, &mut cursor, &self.offset.to_le_bytes());
        put(&mut out, &mut cursor, &self.data_length.to_le_bytes());
        put(&mut out, &mut cursor, &self.before_length.to_le_bytes());
        out
    }

    /// Decodes a header from the first [`WalRecordHeader::SIZE`] bytes of `buf`.
    ///
    /// Returns `None` when `buf` is shorter than one encoded header.
    pub fn deserialize(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::SIZE {
            return None;
        }
        let u64_at = |at: usize| -> Option<u64> {
            Some(u64::from_le_bytes(buf[at..at + 8].try_into().ok()?))
        };
        let u16_at = |at: usize| -> Option<u16> {
            Some(u16::from_le_bytes(buf[at..at + 2].try_into().ok()?))
        };
        let u32_at = |at: usize| -> Option<u32> {
            Some(u32::from_le_bytes(buf[at..at + 4].try_into().ok()?))
        };

        Some(Self {
            lsn: u64_at(0)?,
            txn_id: u64_at(8)?,
            record_type: buf[16],
            prev_lsn: u64_at(17)?,
            page_id: u64_at(25)?,
            offset: u16_at(33)?,
            data_length: u32_at(35)?,
            before_length: u16_at(39)?,
            _reserved: [0; 5],
        })
    }
}
#[derive(Debug, Clone)]
pub struct WalRecord {
pub header: WalRecordHeader,
pub before_image: Vec<u8>,
pub after_image: Vec<u8>,
}
impl WalRecord {
pub fn update(
lsn: Lsn,
txn_id: TxnId,
prev_lsn: Lsn,
page_id: PageId,
offset: u16,
before: Vec<u8>,
after: Vec<u8>,
) -> Self {
Self {
header: WalRecordHeader {
lsn,
txn_id,
record_type: WalRecordType::Update as u8,
prev_lsn,
page_id,
offset,
data_length: (before.len() + after.len()) as u32,
before_length: before.len() as u16,
_reserved: [0; 5],
},
before_image: before,
after_image: after,
}
}
pub fn commit(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
Self {
header: WalRecordHeader {
lsn,
txn_id,
record_type: WalRecordType::Commit as u8,
prev_lsn,
page_id: 0,
offset: 0,
data_length: 0,
before_length: 0,
_reserved: [0; 5],
},
before_image: Vec::new(),
after_image: Vec::new(),
}
}
pub fn begin(lsn: Lsn, txn_id: TxnId) -> Self {
Self {
header: WalRecordHeader {
lsn,
txn_id,
record_type: WalRecordType::Begin as u8,
prev_lsn: 0,
page_id: 0,
offset: 0,
data_length: 0,
before_length: 0,
_reserved: [0; 5],
},
before_image: Vec::new(),
after_image: Vec::new(),
}
}
pub fn abort(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
Self {
header: WalRecordHeader {
lsn,
txn_id,
record_type: WalRecordType::Abort as u8,
prev_lsn,
page_id: 0,
offset: 0,
data_length: 0,
before_length: 0,
_reserved: [0; 5],
},
before_image: Vec::new(),
after_image: Vec::new(),
}
}
pub fn clr(
lsn: Lsn,
txn_id: TxnId,
prev_lsn: Lsn,
page_id: PageId,
offset: u16,
undo_next_lsn: Lsn, ) -> Self {
Self {
header: WalRecordHeader {
lsn,
txn_id,
record_type: WalRecordType::Clr as u8,
prev_lsn,
page_id,
offset,
data_length: 8,
before_length: 0,
_reserved: [0; 5],
},
before_image: Vec::new(),
after_image: undo_next_lsn.to_le_bytes().to_vec(),
}
}
pub fn serialize(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4,
);
buf.extend_from_slice(&self.header.serialize());
buf.extend_from_slice(&self.before_image);
buf.extend_from_slice(&self.after_image);
let crc = crc32_of(&buf);
buf.extend_from_slice(&crc.to_le_bytes());
buf
}
pub fn deserialize(buf: &[u8]) -> Option<Self> {
if buf.len() < WalRecordHeader::SIZE + 4 {
return None;
}
let header = WalRecordHeader::deserialize(buf)?;
let data_start = WalRecordHeader::SIZE;
let data_end = data_start + header.data_length as usize;
if buf.len() < data_end + 4 {
return None;
}
let expected_crc = u32::from_le_bytes(buf[data_end..data_end + 4].try_into().ok()?);
let actual_crc = crc32_of(&buf[..data_end]);
if expected_crc != actual_crc {
return None;
}
let before_end = data_start + header.before_length as usize;
Some(Self {
header,
before_image: buf[data_start..before_end].to_vec(),
after_image: buf[before_end..data_end].to_vec(),
})
}
pub fn size(&self) -> usize {
WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4
}
}
/// Computes a bitwise CRC-32 of `data` using the reflected polynomial
/// `0xEDB88320`, an all-ones initial value and a final bit inversion.
fn crc32_of(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xEDB88320;

    let state = data.iter().fold(u32::MAX, |acc, &byte| {
        // Fold the byte into the low bits, then shift it out one bit at a time.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 1 {
                shifted ^ REFLECTED_POLY
            } else {
                shifted
            }
        })
    });
    !state
}
/// Records and commit waiters accumulated between two flushes of the log file.
#[derive(Debug)]
pub struct GroupCommitBuffer {
    /// Records added since the buffer was last cleared, in arrival order.
    records: Vec<WalRecord>,
    /// Sum of `WalRecord::size()` over `records`.
    bytes: usize,
    /// Reply channels of committing transactions, answered by the flush.
    waiters: Vec<(TxnId, std::sync::mpsc::Sender<Result<Lsn, WalError>>)>,
    /// Moment the buffer was created or last cleared.
    last_flush: Instant,
}

impl GroupCommitBuffer {
    /// Creates an empty buffer with room for 128 records.
    fn new() -> Self {
        let records = Vec::with_capacity(128);
        let waiters = Vec::new();
        Self {
            records,
            bytes: 0,
            waiters,
            last_flush: Instant::now(),
        }
    }

    /// Queues `record` and accounts for its encoded size.
    fn add_record(&mut self, record: WalRecord) {
        let encoded_len = record.size();
        self.bytes += encoded_len;
        self.records.push(record);
    }

    /// Registers the reply channel of a transaction waiting for the next flush.
    fn add_waiter(
        &mut self,
        txn_id: TxnId,
        sender: std::sync::mpsc::Sender<Result<Lsn, WalError>>,
    ) {
        let waiter = (txn_id, sender);
        self.waiters.push(waiter);
    }

    /// Reports whether any flush trigger has fired: buffered bytes, record
    /// count, or time since the last flush.
    fn should_flush(&self, config: &WalConfig) -> bool {
        if self.bytes >= config.buffer_size {
            return true;
        }
        if self.records.len() >= config.max_batch_size {
            return true;
        }
        self.last_flush.elapsed() >= config.flush_interval
    }

    /// Empties the buffer and restarts the flush-interval clock.
    fn clear(&mut self) {
        self.records.clear();
        self.waiters.clear();
        self.bytes = 0;
        self.last_flush = Instant::now();
    }
}
/// Tunables of the write-ahead log.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Buffered byte count at which the group-commit buffer is flushed.
    pub buffer_size: usize,
    /// Buffered record count at which the group-commit buffer is flushed.
    pub max_batch_size: usize,
    /// Time since the last flush after which the buffer is flushed.
    pub flush_interval: Duration,
    /// How the log file is synced after each batch is written.
    pub sync_mode: SyncMode,
    /// Checkpoint spacing. NOTE(review): not read by the code in this file;
    /// unit (records vs. LSNs) to be confirmed against the caller.
    pub checkpoint_interval: u64,
}

impl Default for WalConfig {
    /// Defaults: 1 MiB buffer, 1000-record batches, 10 ms flush interval,
    /// `Fsync`, checkpoint interval 100 000.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        WalConfig {
            buffer_size: ONE_MIB,
            max_batch_size: 1_000,
            flush_interval: Duration::from_millis(10),
            sync_mode: SyncMode::Fsync,
            checkpoint_interval: 100_000,
        }
    }
}

/// Durability action performed after a batch is written to the log file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Write only; no sync call is issued.
    None,
    /// Call `File::sync_all` after writing.
    Fsync,
    /// Call `File::sync_data` after writing.
    FdataSync,
}
/// Errors reported by the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalError {
    /// An I/O operation failed; carries the stringified cause.
    Io(String),
    /// Log contents failed validation; carries a description.
    Corruption(String),
    /// A record header could not be decoded.
    InvalidRecord,
    /// The buffer cannot accept more data.
    BufferFull,
}

impl std::fmt::Display for WalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidRecord => f.write_str("Invalid WAL record"),
            Self::BufferFull => f.write_str("WAL buffer full"),
            Self::Io(detail) => write!(f, "WAL I/O error: {}", detail),
            Self::Corruption(detail) => write!(f, "WAL corruption: {}", detail),
        }
    }
}

impl std::error::Error for WalError {}
/// Write-ahead log backed by a single append-mode file, with a group-commit
/// buffer in front of it.
#[allow(dead_code)]
pub struct WriteAheadLog {
// Directory that holds the `wal_<number>.log` files.
dir: PathBuf,
// Handle of the current log file, opened in append mode by `open`.
file: Mutex<File>,
// Number embedded in the current log file's name.
file_number: AtomicU64,
// Most recently allocated LSN; `next_lsn` increments it.
lsn: AtomicU64,
// Records and commit waiters not yet written to the file.
buffer: Mutex<GroupCommitBuffer>,
// Committers wait on this (with a timeout); every flush notifies it.
flush_cv: Condvar,
// Flush thresholds and sync mode.
config: WalConfig,
// Counters updated by each flush.
stats: WalStats,
// Set to `true` by `open`; not read by the code visible in this file.
running: AtomicBool,
// LSN recorded by the most recent flush.
flushed_lsn: AtomicU64,
// Latest LSN logged per transaction; becomes `prev_lsn` of its next record.
txn_prev_lsn: RwLock<HashMap<TxnId, Lsn>>,
}
/// Counters describing the flush activity of the log.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Records written to the log file.
    pub records_written: AtomicU64,
    /// Bytes written to the log file.
    pub bytes_written: AtomicU64,
    /// Number of flushes performed.
    pub flushes: AtomicU64,
    /// Records summed over all flushed batches.
    pub total_batch_records: AtomicU64,
    /// Time spent flushing, in microseconds, summed over all flushes.
    pub total_flush_time_us: AtomicU64,
}

impl WalStats {
    /// Mean number of records per flush; `0.0` before the first flush.
    pub fn avg_batch_size(&self) -> f64 {
        self.mean_per_flush(&self.total_batch_records)
    }

    /// Mean flush duration in microseconds; `0.0` before the first flush.
    pub fn avg_flush_time_us(&self) -> f64 {
        self.mean_per_flush(&self.total_flush_time_us)
    }

    /// Divides `total` by the flush count, yielding `0.0` when nothing was flushed.
    fn mean_per_flush(&self, total: &AtomicU64) -> f64 {
        match self.flushes.load(Ordering::Relaxed) {
            0 => 0.0,
            flushes => total.load(Ordering::Relaxed) as f64 / flushes as f64,
        }
    }
}
impl WriteAheadLog {
/// Opens (creating if necessary) the log in `dir`.
///
/// The highest-numbered existing `wal_*.log` file is reused, or file 0 is
/// created. The LSN counter and the flushed LSN both start from the LSN found
/// by scanning that file (0 when nothing is found).
///
/// # Errors
/// Returns `WalError::Io` when the directory or the log file cannot be
/// created/opened.
pub fn open(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self, WalError> {
    let dir = dir.as_ref().to_path_buf();
    if let Err(e) = std::fs::create_dir_all(&dir) {
        return Err(WalError::Io(e.to_string()));
    }

    let file_number = Self::find_latest_file(&dir).unwrap_or_default();
    let file_path = dir.join(format!("wal_{:08}.log", file_number));

    let mut options = OpenOptions::new();
    options.create(true).read(true).append(true);
    let file = match options.open(&file_path) {
        Ok(handle) => handle,
        Err(e) => return Err(WalError::Io(e.to_string())),
    };

    let recovered_lsn = Self::find_last_lsn(&file_path).unwrap_or_default();

    let wal = Self {
        dir,
        file: Mutex::new(file),
        file_number: AtomicU64::new(file_number),
        lsn: AtomicU64::new(recovered_lsn),
        buffer: Mutex::new(GroupCommitBuffer::new()),
        flush_cv: Condvar::new(),
        config,
        stats: WalStats::default(),
        running: AtomicBool::new(true),
        flushed_lsn: AtomicU64::new(recovered_lsn),
        txn_prev_lsn: RwLock::new(HashMap::new()),
    };
    Ok(wal)
}
/// Returns the highest file number among the `wal_<digits>.log` entries of `dir`.
///
/// Only names of the shape written by this module are considered: the part
/// between `wal_` and `.log` must consist solely of ASCII digits and be at
/// least 8 characters long (the width `{:08}` pads to). Anything else —
/// including short names such as `wal_.log`, which previously caused an
/// out-of-range slice panic — is ignored. Returns `None` when the directory
/// cannot be read or holds no matching file.
fn find_latest_file(dir: &Path) -> Option<u64> {
    std::fs::read_dir(dir)
        .ok()?
        .filter_map(Result::ok)
        .filter_map(|entry| {
            let name = entry.file_name();
            let digits = name
                .to_str()?
                .strip_prefix("wal_")?
                .strip_suffix(".log")?;
            let well_formed = digits.len() >= 8 && digits.bytes().all(|b| b.is_ascii_digit());
            if well_formed {
                // Parse the whole number so files past 99_999_999 are not truncated.
                digits.parse::<u64>().ok()
            } else {
                None
            }
        })
        .max()
}
/// Scans the headers of the log file at `path` and returns the highest LSN seen.
///
/// Payloads and CRCs are skipped, not validated. The scan stops at the first
/// header that cannot be read in full or decoded, or when seeking fails.
/// Returns `None` only when the file cannot be opened; an empty file yields
/// `Some(0)`.
fn find_last_lsn(path: &Path) -> Option<Lsn> {
    let mut file = File::open(path).ok()?;
    let mut highest = 0u64;
    let mut header_buf = [0u8; WalRecordHeader::SIZE];

    // `read_exact` keeps reading across short or interrupted reads, so only a
    // genuine end of file (or I/O error) terminates the scan.
    while file.read_exact(&mut header_buf).is_ok() {
        let header = match WalRecordHeader::deserialize(&header_buf) {
            Some(h) => h,
            None => break,
        };
        // LSNs are allocated before the buffer lock is taken, so records are
        // not guaranteed to sit in the file in LSN order; keep the maximum so
        // a restart never hands out an LSN that is already in the log.
        highest = highest.max(header.lsn);

        // Jump over the payload and the trailing 4-byte CRC.
        let skip = i64::from(header.data_length) + 4;
        if file.seek(SeekFrom::Current(skip)).is_err() {
            break;
        }
    }
    Some(highest)
}
/// Allocates and returns the next LSN (the counter's previous value plus one).
pub fn next_lsn(&self) -> Lsn {
    let previous = self.lsn.fetch_add(1, Ordering::SeqCst);
    previous + 1
}
/// Returns the most recently allocated LSN without allocating a new one.
pub fn current_lsn(&self) -> Lsn {
self.lsn.load(Ordering::SeqCst)
}
/// Returns the LSN recorded by the most recent flush (or the LSN found at open).
pub fn flushed_lsn(&self) -> Lsn {
self.flushed_lsn.load(Ordering::Acquire)
}
/// Logs a `Begin` record for `txn_id` and returns its LSN.
///
/// The LSN also becomes the head of the transaction's record chain.
///
/// # Errors
/// Propagates any error from appending (and possibly flushing) the record.
pub fn begin_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
    let begin_lsn = self.next_lsn();
    let record = WalRecord::begin(begin_lsn, txn_id);

    self.txn_prev_lsn
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .insert(txn_id, begin_lsn);

    self.append(record).map(|()| begin_lsn)
}
/// Logs an `Update` record for `txn_id` and returns its LSN.
///
/// The record's `prev_lsn` is the transaction's previous chain head (0 when the
/// transaction is unknown), and the new LSN replaces it as the head.
///
/// # Errors
/// Propagates any error from appending (and possibly flushing) the record.
pub fn log_update(
    &self,
    txn_id: TxnId,
    page_id: PageId,
    offset: u16,
    before: Vec<u8>,
    after: Vec<u8>,
) -> Result<Lsn, WalError> {
    let lsn = self.next_lsn();

    // Swap the chain head in one critical section: `insert` hands back the
    // previous head, so no other writer can slip in between reading the old
    // head and publishing the new one.
    let prev_lsn = self
        .txn_prev_lsn
        .write()
        .unwrap_or_else(|e| e.into_inner())
        .insert(txn_id, lsn)
        .unwrap_or(0);

    let record = WalRecord::update(lsn, txn_id, prev_lsn, page_id, offset, before, after);
    self.append(record)?;
    Ok(lsn)
}
/// Logs a `Commit` record for `txn_id` and blocks until a flush has written it.
///
/// The record joins the group-commit buffer together with a reply channel. If
/// a flush trigger has already fired the caller flushes immediately; otherwise
/// it waits on the flush condvar for the remainder of the flush interval and
/// then flushes whatever is still buffered.
///
/// Returns the LSN reported by the flush that wrote the batch, which can be
/// higher than the commit record's own LSN when later records shared the batch.
///
/// # Errors
/// Returns the flush error when this thread's flush fails, or
/// `WalError::Io("Channel closed")` when the reply channel is dropped unanswered.
/// NOTE(review): on a failed flush the commit record stays in the buffer and a
/// later flush may still write it.
pub fn commit_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
    let lsn = self.next_lsn();

    // Retire the chain head up front, as `abort_txn` does. Removing it only
    // after the flush left the entry behind whenever the flush failed and the
    // function returned early.
    let prev_lsn = self
        .txn_prev_lsn
        .write()
        .unwrap_or_else(|e| e.into_inner())
        .remove(&txn_id)
        .unwrap_or(0);

    let record = WalRecord::commit(lsn, txn_id, prev_lsn);
    let (tx, rx) = std::sync::mpsc::channel();
    {
        let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
        buffer.add_record(record);
        buffer.add_waiter(txn_id, tx);
        if buffer.should_flush(&self.config) {
            self.flush_buffer_locked(&mut buffer)?;
        } else {
            // Give other committers a chance to join the batch, but never wait
            // past the configured flush interval.
            let remaining = self
                .config
                .flush_interval
                .saturating_sub(buffer.last_flush.elapsed());
            let (mut buffer, _timeout) = self
                .flush_cv
                .wait_timeout(buffer, remaining)
                .unwrap_or_else(|e| e.into_inner());
            // Empty means another thread flushed our record while we waited.
            if !buffer.records.is_empty() {
                self.flush_buffer_locked(&mut buffer)?;
            }
        }
    }
    rx.recv()
        .map_err(|_| WalError::Io("Channel closed".to_string()))?
}
/// Logs an `Abort` record for `txn_id`, flushes the buffer, and returns the
/// record's LSN.
///
/// The transaction's chain head is removed and used as the record's
/// `prev_lsn` (0 when the transaction is unknown).
///
/// # Errors
/// Propagates any error from appending or flushing.
pub fn abort_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
    let lsn = self.next_lsn();

    // Take the chain head and retire the entry in one critical section rather
    // than reading under a read lock and removing under a later write lock.
    let prev_lsn = self
        .txn_prev_lsn
        .write()
        .unwrap_or_else(|e| e.into_inner())
        .remove(&txn_id)
        .unwrap_or(0);

    let record = WalRecord::abort(lsn, txn_id, prev_lsn);
    self.append(record)?;
    self.force_flush()?;
    Ok(lsn)
}
/// Adds `record` to the group-commit buffer, flushing it when a flush trigger
/// (bytes, record count, or elapsed time) has fired.
fn append(&self, record: WalRecord) -> Result<(), WalError> {
    let mut guard = self
        .buffer
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.add_record(record);

    if !guard.should_flush(&self.config) {
        return Ok(());
    }
    self.flush_buffer_locked(&mut guard)?;
    Ok(())
}
/// Flushes any buffered records now and returns the flushed LSN afterwards.
///
/// # Errors
/// Propagates the flush error, if any.
pub fn force_flush(&self) -> Result<Lsn, WalError> {
    let mut guard = self
        .buffer
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let has_pending = !guard.records.is_empty();
    if has_pending {
        self.flush_buffer_locked(&mut guard)?;
    }

    let durable = self.flushed_lsn.load(Ordering::Acquire);
    Ok(durable)
}
fn flush_buffer_locked(&self, buffer: &mut GroupCommitBuffer) -> Result<(), WalError> {
if buffer.records.is_empty() {
return Ok(());
}
let start = Instant::now();
let record_count = buffer.records.len() as u64;
let mut data = Vec::with_capacity(buffer.bytes);
let mut last_lsn = 0;
for record in &buffer.records {
last_lsn = record.header.lsn;
data.extend(record.serialize());
}
{
let mut file = self.file.lock().unwrap_or_else(|e| e.into_inner());
file.write_all(&data)
.map_err(|e| WalError::Io(e.to_string()))?;
match self.config.sync_mode {
SyncMode::Fsync => {
file.sync_all().map_err(|e| WalError::Io(e.to_string()))?;
}
SyncMode::FdataSync => {
file.sync_data().map_err(|e| WalError::Io(e.to_string()))?;
}
SyncMode::None => {}
}
}
self.flushed_lsn.store(last_lsn, Ordering::Release);
let elapsed_us = start.elapsed().as_micros() as u64;
self.stats
.records_written
.fetch_add(record_count, Ordering::Relaxed);
self.stats
.bytes_written
.fetch_add(data.len() as u64, Ordering::Relaxed);
self.stats.flushes.fetch_add(1, Ordering::Relaxed);
self.stats
.total_batch_records
.fetch_add(record_count, Ordering::Relaxed);
self.stats
.total_flush_time_us
.fetch_add(elapsed_us, Ordering::Relaxed);
for (_, sender) in buffer.waiters.drain(..) {
let _ = sender.send(Ok(last_lsn));
}
buffer.clear();
self.flush_cv.notify_all();
Ok(())
}
/// Returns the flush counters of this log.
pub fn stats(&self) -> &WalStats {
&self.stats
}
/// Runs the three recovery passes — analysis, redo, undo — over the current
/// log file and reports what was done.
///
/// Redo starts at the smallest first-dirtying LSN found by analysis, or at the
/// last checkpoint LSN when no page was dirtied.
///
/// Note that `RecoveryStats::analysis_time` is measured after the undo pass has
/// finished, so it spans all three passes.
///
/// # Errors
/// Propagates the first error raised by any pass or by `handler`.
pub fn recover<R: RecoveryHandler>(&self, handler: &mut R) -> Result<RecoveryStats, WalError> {
    let started_at = Instant::now();

    let (dirty_pages, active_txns, last_checkpoint) = self.analysis_pass()?;
    let redo_start = match dirty_pages.values().copied().min() {
        Some(first_dirty) => first_dirty,
        None => last_checkpoint,
    };

    let redo_records = self.redo_pass(redo_start, handler)?;
    let undo_records = self.undo_pass(&active_txns, handler)?;

    let stats = RecoveryStats {
        analysis_time: started_at.elapsed(),
        redo_records,
        undo_records,
        dirty_pages: dirty_pages.len(),
        active_txns: active_txns.len(),
    };
    Ok(stats)
}
/// Scans the whole current log file and reconstructs recovery state.
///
/// Returns, in order:
/// * the dirty-page table: page id → LSN of the first `Update` seen for it;
/// * the active-transaction table: transaction id → LSN of its latest
///   `Begin`/`Update`/`Clr`, for transactions with no `Commit`/`Abort`/`End`;
/// * the LSN of the last `Checkpoint` record (0 when there is none).
///
/// Records with an unknown type byte are ignored.
#[allow(clippy::type_complexity)]
fn analysis_pass(&self) -> Result<(HashMap<PageId, Lsn>, HashMap<TxnId, Lsn>, Lsn), WalError> {
    let mut dirty_pages: HashMap<PageId, Lsn> = HashMap::new();
    let mut active_txns: HashMap<TxnId, Lsn> = HashMap::new();
    let mut last_checkpoint = 0;

    for item in self.iter_records()? {
        let record = item?;
        let (lsn, txn_id) = (record.header.lsn, record.header.txn_id);

        let kind = match WalRecordType::try_from(record.header.record_type) {
            Ok(kind) => kind,
            Err(_) => continue,
        };

        match kind {
            WalRecordType::Begin | WalRecordType::Clr => {
                active_txns.insert(txn_id, lsn);
            }
            WalRecordType::Update => {
                // Only the first update of a page determines where redo starts.
                dirty_pages.entry(record.header.page_id).or_insert(lsn);
                active_txns.insert(txn_id, lsn);
            }
            WalRecordType::Commit | WalRecordType::Abort | WalRecordType::End => {
                active_txns.remove(&txn_id);
            }
            WalRecordType::Checkpoint => {
                last_checkpoint = lsn;
            }
        }
    }

    Ok((dirty_pages, active_txns, last_checkpoint))
}
/// Replays the log from `start_lsn`, handing every `Update` record to
/// `handler.redo`.
///
/// Returns the number of `Update` and `Clr` records encountered; `Clr` records
/// are counted but not passed to the handler.
fn redo_pass<R: RecoveryHandler>(
    &self,
    start_lsn: Lsn,
    handler: &mut R,
) -> Result<u64, WalError> {
    let mut seen = 0;
    for item in self.iter_records_from(start_lsn)? {
        let record = item?;
        let kind = WalRecordType::try_from(record.header.record_type).ok();

        if kind == Some(WalRecordType::Update) {
            handler.redo(&record)?;
            seen += 1;
        } else if kind == Some(WalRecordType::Clr) {
            seen += 1;
        }
    }
    Ok(seen)
}
/// Rolls back every transaction in `active_txns`, newest record first.
///
/// Starting from each transaction's last LSN, the pass always processes the
/// highest pending LSN:
/// * `Update` — passed to `handler.undo`, a CLR pointing at the update's
///   `prev_lsn` is appended, and `prev_lsn` is queued;
/// * `Clr` — the stored undo-next LSN is queued, skipping work already undone;
/// * anything else — `prev_lsn` is queued.
///
/// An LSN of 0 ends a chain. The buffer is flushed before returning. Returns
/// the number of records passed to `handler.undo`.
///
/// NOTE(review): every lookup goes through `read_record_at`, which iterates
/// the log from its beginning.
fn undo_pass<R: RecoveryHandler>(
    &self,
    active_txns: &HashMap<TxnId, Lsn>,
    handler: &mut R,
) -> Result<u64, WalError> {
    /// Inserts `(lsn, txn_id)` so the queue stays ordered by descending LSN.
    ///
    /// The entry lands behind existing entries with an equal LSN, which is the
    /// position a push-back followed by a stable descending sort would give —
    /// without re-sorting the whole queue on every insertion.
    fn enqueue(queue: &mut VecDeque<(Lsn, TxnId)>, lsn: Lsn, txn_id: TxnId) {
        let at = queue.partition_point(|&(queued, _)| queued >= lsn);
        queue.insert(at, (lsn, txn_id));
    }

    let mut count = 0;
    let mut undo_list: VecDeque<(Lsn, TxnId)> = active_txns
        .iter()
        .map(|(&txn_id, &lsn)| (lsn, txn_id))
        .collect();
    undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));

    while let Some((lsn, txn_id)) = undo_list.pop_front() {
        if lsn == 0 {
            continue;
        }
        let record = match self.read_record_at(lsn)? {
            Some(record) => record,
            None => continue,
        };
        let prev_lsn = record.header.prev_lsn;

        match WalRecordType::try_from(record.header.record_type) {
            Ok(WalRecordType::Update) => {
                handler.undo(&record)?;
                count += 1;
                let clr_lsn = self.next_lsn();
                let clr = WalRecord::clr(
                    clr_lsn,
                    txn_id,
                    lsn,
                    record.header.page_id,
                    record.header.offset,
                    prev_lsn,
                );
                self.append(clr)?;
                if prev_lsn > 0 {
                    enqueue(&mut undo_list, prev_lsn, txn_id);
                }
            }
            Ok(WalRecordType::Clr) => {
                // The first 8 payload bytes hold the next LSN to undo.
                if let Some(raw) = record.after_image.get(..8) {
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(raw);
                    let undo_next = u64::from_le_bytes(bytes);
                    if undo_next > 0 {
                        enqueue(&mut undo_list, undo_next, txn_id);
                    }
                }
            }
            _ => {
                if prev_lsn > 0 {
                    enqueue(&mut undo_list, prev_lsn, txn_id);
                }
            }
        }
    }
    self.force_flush()?;
    Ok(count)
}
/// Looks up the record with LSN `target_lsn` by scanning the current log file
/// from its beginning.
///
/// The scan gives up at the first record whose LSN is greater than the target,
/// so it relies on records being stored in ascending LSN order. Returns
/// `Ok(None)` when no such record is found.
fn read_record_at(&self, target_lsn: Lsn) -> Result<Option<WalRecord>, WalError> {
    for item in self.iter_records()? {
        let record = item?;
        // Copy the LSN out of the packed header before comparing.
        let lsn = record.header.lsn;
        match lsn.cmp(&target_lsn) {
            std::cmp::Ordering::Equal => return Ok(Some(record)),
            std::cmp::Ordering::Greater => break,
            std::cmp::Ordering::Less => {}
        }
    }
    Ok(None)
}
/// Opens an iterator over every record of the current log file (start LSN 0).
fn iter_records(&self) -> Result<WalIterator, WalError> {
self.iter_records_from(0)
}
/// Opens an iterator over the current log file that skips leading records
/// whose LSN is below `start_lsn`.
///
/// Only the file named after the current file number is read.
///
/// # Errors
/// Returns `WalError::Io` when the file cannot be opened.
fn iter_records_from(&self, start_lsn: Lsn) -> Result<WalIterator, WalError> {
    let current = self.file_number.load(Ordering::Relaxed);
    let file_path = self.dir.join(format!("wal_{:08}.log", current));

    match File::open(&file_path) {
        Ok(file) => Ok(WalIterator {
            file,
            start_lsn,
            started: false,
        }),
        Err(e) => Err(WalError::Io(e.to_string())),
    }
}
}
/// Callbacks through which recovery applies log records to the caller's storage.
pub trait RecoveryHandler {
/// Called by the redo pass for each `Update` record it replays.
fn redo(&mut self, record: &WalRecord) -> Result<(), WalError>;
/// Called by the undo pass for each `Update` record it rolls back.
fn undo(&mut self, record: &WalRecord) -> Result<(), WalError>;
}
/// Recovery handler that accepts every record and does nothing with it.
pub struct NoOpRecoveryHandler;
impl RecoveryHandler for NoOpRecoveryHandler {
// Ignores the record and reports success.
fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
Ok(())
}
// Ignores the record and reports success.
fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
Ok(())
}
}
/// Summary returned by `WriteAheadLog::recover`.
#[derive(Debug, Clone)]
pub struct RecoveryStats {
/// Time elapsed since `recover` started, sampled after the undo pass — it
/// therefore covers all three passes, not just analysis.
pub analysis_time: Duration,
/// Number of `Update` and `Clr` records seen by the redo pass.
pub redo_records: u64,
/// Number of records passed to `RecoveryHandler::undo`.
pub undo_records: u64,
/// Number of distinct pages in the dirty-page table built by analysis.
pub dirty_pages: usize,
/// Number of transactions still active at the end of analysis.
pub active_txns: usize,
}
/// Sequential reader over the records of one log file.
pub struct WalIterator {
    /// Log file, positioned at the next record header.
    file: File,
    /// Leading records with an LSN below this value are skipped.
    start_lsn: Lsn,
    /// Set once the first record at or above `start_lsn` has been reached;
    /// from then on every record is yielded.
    started: bool,
}

impl Iterator for WalIterator {
    type Item = Result<WalRecord, WalError>;

    /// Yields the next record.
    ///
    /// Returns `None` at end of file, including when the file ends in the
    /// middle of a header or payload. Yields `Err` for I/O failures, an
    /// undecodable header, a CRC mismatch, or payload lengths that contradict
    /// each other.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop (rather than recurse) over skipped records so that skipping a
        // long prefix of the log cannot exhaust the stack.
        loop {
            let mut header_buf = [0u8; WalRecordHeader::SIZE];
            match self.file.read_exact(&mut header_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
                Err(e) => return Some(Err(WalError::Io(e.to_string()))),
            }
            let header = match WalRecordHeader::deserialize(&header_buf) {
                Some(h) => h,
                None => return Some(Err(WalError::InvalidRecord)),
            };

            if !self.started {
                if header.lsn < self.start_lsn {
                    // Jump over the payload and its 4-byte CRC without reading them.
                    let skip = i64::from(header.data_length) + 4;
                    if let Err(e) = self.file.seek(SeekFrom::Current(skip)) {
                        return Some(Err(WalError::Io(e.to_string())));
                    }
                    continue;
                }
                self.started = true;
            }

            let data_len = header.data_length as usize;
            let before_len = header.before_length as usize;
            let framed_len = match data_len.checked_add(4) {
                Some(len) => len,
                None => {
                    return Some(Err(WalError::Corruption(
                        "payload length overflows".to_string(),
                    )))
                }
            };

            let mut data_buf = vec![0u8; framed_len];
            match self.file.read_exact(&mut data_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
                Err(e) => return Some(Err(WalError::Io(e.to_string()))),
            }
            let (payload, crc_bytes) = data_buf.split_at(data_len);

            // The CRC covers the header followed by the payload.
            let mut covered = Vec::with_capacity(WalRecordHeader::SIZE + data_len);
            covered.extend_from_slice(&header_buf);
            covered.extend_from_slice(payload);
            let expected_crc =
                u32::from_le_bytes([crc_bytes[0], crc_bytes[1], crc_bytes[2], crc_bytes[3]]);
            if expected_crc != crc32_of(&covered) {
                return Some(Err(WalError::Corruption("CRC mismatch".to_string())));
            }

            // A before image cannot be longer than the payload that holds it;
            // report it instead of panicking on an out-of-range slice.
            if before_len > data_len {
                return Some(Err(WalError::Corruption(
                    "before-image length exceeds payload length".to_string(),
                )));
            }
            let (before, after) = payload.split_at(before_len);
            let before_image = before.to_vec();
            let after_image = after.to_vec();

            return Some(Ok(WalRecord {
                header,
                before_image,
                after_image,
            }));
        }
    }
}
// Unit tests for record encoding, group commit, iteration and recovery.
// Several tests are marked `#[ignore]` and only run when requested explicitly.
#[cfg(test)]
mod tests {
use super::*;
use sochdb_core::ValidityBitmap;
use std::sync::atomic::AtomicU64;
use tempfile::TempDir;
// An update record survives a serialize/deserialize round trip.
#[test]
fn test_wal_record_serialization() {
let record = WalRecord::update(1, 100, 0, 1000, 0, vec![1, 2, 3, 4], vec![5, 6, 7, 8]);
let serialized = record.serialize();
let deserialized = WalRecord::deserialize(&serialized).unwrap();
// Copy the fields out first: the header is a packed struct, so taking
// references to its fields (as `assert_eq!` would) is not allowed.
let lsn = deserialized.header.lsn;
let txn_id = deserialized.header.txn_id;
assert_eq!(lsn, 1);
assert_eq!(txn_id, 100);
assert_eq!(deserialized.before_image, vec![1, 2, 3, 4]);
assert_eq!(deserialized.after_image, vec![5, 6, 7, 8]);
}
// A single committer with no size/count trigger must still complete once the
// flush interval has elapsed, instead of blocking forever.
#[test]
fn test_lone_commit_does_not_hang() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None, buffer_size: 10_000_000, max_batch_size: 1_000_000, flush_interval: Duration::from_millis(20),
..Default::default()
};
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
wal.begin_txn(1).unwrap();
wal.log_update(1, 100, 0, vec![0; 10], vec![1; 10]).unwrap();
// Commit on a helper thread so the test can time out instead of hanging.
let (done_tx, done_rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
let lsn = wal.commit_txn(1);
let _ = done_tx.send(lsn.map(|l| l > 0));
});
match done_rx.recv_timeout(std::time::Duration::from_secs(5)) {
Ok(Ok(true)) => {}
Ok(other) => panic!("commit_txn returned unexpected result: {:?}", other),
Err(_) => panic!("commit_txn hung: lone commit did not flush within timeout"),
}
handle.join().unwrap();
}
// Begin, update and commit return increasing LSNs and are counted as written.
#[test]
#[ignore] fn test_wal_basic_operations() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None, ..Default::default()
};
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
let begin_lsn = wal.begin_txn(1).unwrap();
assert!(begin_lsn > 0);
let update_lsn = wal.log_update(1, 100, 0, vec![0; 10], vec![1; 10]).unwrap();
assert!(update_lsn > begin_lsn);
let commit_lsn = wal.commit_txn(1).unwrap();
assert!(commit_lsn > update_lsn);
assert!(wal.stats().records_written.load(Ordering::Relaxed) >= 3);
}
// Ten transactions (begin + update each) end up written after a forced flush.
#[test]
#[ignore] fn test_wal_group_commit() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None,
buffer_size: 10000, max_batch_size: 100,
flush_interval: Duration::from_secs(10), ..Default::default()
};
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
for i in 0..10 {
wal.begin_txn(i).unwrap();
wal.log_update(i, 100 + i, 0, vec![0; 10], vec![1; 10])
.unwrap();
}
wal.force_flush().unwrap();
let stats = wal.stats();
let flushes = stats.flushes.load(Ordering::Relaxed);
let records = stats.records_written.load(Ordering::Relaxed);
assert!(records >= 20); println!(
"Flushes: {}, Records: {}, Avg batch: {:.1}",
flushes,
records,
stats.avg_batch_size()
);
}
// The CRC is non-zero for this input, deterministic, and sensitive to a
// one-character change.
#[test]
fn test_crc32() {
let data = b"hello world";
let crc = crc32_of(data);
assert_ne!(crc, 0);
let crc2 = crc32_of(data);
assert_eq!(crc, crc2);
let data2 = b"hello World"; let crc3 = crc32_of(data2);
assert_ne!(crc, crc3);
}
// One begin plus two updates are read back as three records.
#[test]
#[ignore] fn test_wal_iterator() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None,
..Default::default()
};
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
wal.begin_txn(1).unwrap();
wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
.unwrap();
wal.log_update(1, 101, 0, vec![7, 8, 9], vec![10, 11, 12])
.unwrap();
wal.force_flush().unwrap();
let count = wal.iter_records().unwrap().count();
assert_eq!(count, 3); }
// Records written by one log instance are visible after reopening the directory.
#[test]
#[ignore] fn test_wal_persistence() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None,
..Default::default()
};
{
let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
wal.begin_txn(1).unwrap();
wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
.unwrap();
wal.commit_txn(1).unwrap();
}
{
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
let count = wal.iter_records().unwrap().count();
assert_eq!(count, 3); }
}
// Analysis treats the committed transaction as finished and the uncommitted
// one as active, and records the page it dirtied.
#[test]
#[ignore] fn test_wal_recovery_analysis() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None,
..Default::default()
};
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
wal.begin_txn(1).unwrap();
wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
wal.commit_txn(1).unwrap();
wal.begin_txn(2).unwrap();
wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
wal.force_flush().unwrap();
let (dirty_pages, active_txns, _) = wal.analysis_pass().unwrap();
assert!(!active_txns.contains_key(&1)); assert!(active_txns.contains_key(&2)); assert!(dirty_pages.contains_key(&200)); }
// Recovery handler that only counts how often each callback is invoked.
struct TestRecoveryHandler {
redo_count: AtomicU64,
undo_count: AtomicU64,
}
impl RecoveryHandler for TestRecoveryHandler {
fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
self.redo_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
self.undo_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
// Full recovery after reopening: all three updates are redone and the two
// updates of the uncommitted transaction are undone.
#[test]
#[ignore] fn test_wal_full_recovery() {
let dir = TempDir::new().unwrap();
let config = WalConfig {
sync_mode: SyncMode::None,
..Default::default()
};
{
let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
wal.begin_txn(1).unwrap();
wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
wal.commit_txn(1).unwrap();
wal.begin_txn(2).unwrap();
wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
wal.log_update(2, 201, 0, vec![9, 10], vec![11, 12])
.unwrap();
wal.force_flush().unwrap();
}
{
let wal = WriteAheadLog::open(dir.path(), config).unwrap();
let mut handler = TestRecoveryHandler {
redo_count: AtomicU64::new(0),
undo_count: AtomicU64::new(0),
};
let stats = wal.recover(&mut handler).unwrap();
assert_eq!(stats.redo_records, 3);
assert_eq!(stats.undo_records, 2);
assert_eq!(stats.active_txns, 1);
}
}
// Exercises `sochdb_core::ValidityBitmap`; it does not touch the WAL.
#[test]
#[ignore]
fn test_validity_bitmap() {
let mut bitmap = ValidityBitmap::new_all_valid(100);
assert_eq!(bitmap.len(), 100);
assert_eq!(bitmap.null_count(), 0);
for i in 0..100 {
assert!(bitmap.is_valid(i));
}
bitmap.set_null(10);
bitmap.set_null(50);
bitmap.set_null(99);
assert_eq!(bitmap.null_count(), 3);
assert!(!bitmap.is_valid(10));
assert!(!bitmap.is_valid(50));
assert!(!bitmap.is_valid(99));
assert!(bitmap.is_valid(11));
}
}