use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
/// Capacity, in bytes, of the in-memory buffer placed in front of the WAL file.
const WAL_BUFFER_BYTES: usize = 64 * 1024;
/// Granularity, in bytes, at which disk space is reserved ahead of the write position.
const WAL_SEGMENT_BYTES: u64 = 16 * 1024 * 1024;

/// Returns the first multiple of [`WAL_SEGMENT_BYTES`] that is strictly greater than `pos`.
///
/// A position that already sits exactly on a boundary is rounded up to the
/// *next* boundary, so the result is always at least one byte past `pos`.
#[inline]
fn next_wal_segment_boundary(pos: u64) -> u64 {
    // Snap down to the start of the segment containing `pos`, then step one
    // whole segment forward.
    let segment_start = pos - pos % WAL_SEGMENT_BYTES;
    segment_start + WAL_SEGMENT_BYTES
}
/// Reserves disk blocks for the byte range `[offset, offset + len)` of `file`
/// without changing the file's reported length.
///
/// Implemented with `fallocate(2)` using `FALLOC_FL_KEEP_SIZE`. A zero `len`
/// is treated as "nothing to do" and succeeds without issuing the syscall.
///
/// # Errors
///
/// Returns `io::ErrorKind::InvalidInput` when `offset` or `len` cannot be
/// represented as `off_t`; otherwise returns the OS error reported by
/// `fallocate`.
#[cfg(target_os = "linux")]
fn reserve_wal_blocks(file: &File, offset: u64, len: u64) -> io::Result<()> {
    use std::os::unix::io::AsRawFd;

    if len == 0 {
        return Ok(());
    }

    // `off_t` is signed (and may be narrower than 64 bits on some targets), so
    // an unchecked `as` cast could wrap into a negative or truncated value.
    // Reject such ranges explicitly instead of handing garbage to the kernel.
    let off_t_max = libc::off_t::MAX as u64;
    if offset > off_t_max || len > off_t_max {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "WAL preallocation range does not fit in off_t",
        ));
    }

    // SAFETY: `file` is borrowed for the whole call, so the descriptor returned
    // by `as_raw_fd` stays open; `fallocate` receives only integer arguments
    // and does not read or write memory owned by this process.
    let ret = unsafe {
        libc::fallocate(
            file.as_raw_fd(),
            libc::FALLOC_FL_KEEP_SIZE,
            offset as libc::off_t,
            len as libc::off_t,
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
/// Fallback for targets other than Linux: block reservation is not
/// implemented, so every call fails with `io::ErrorKind::Unsupported`.
#[cfg(not(target_os = "linux"))]
fn reserve_wal_blocks(_file: &File, _offset: u64, _len: u64) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "WAL preallocation is only implemented on linux",
    );
    Err(unsupported)
}
/// Reports whether `err` means block reservation is unavailable, as opposed to
/// having failed for some other reason.
///
/// An error qualifies when its kind is `io::ErrorKind::Unsupported` or, on
/// Linux, when its raw OS code is `EOPNOTSUPP`, `ENOSYS`, or `EINVAL`.
fn fallocate_unsupported(err: &io::Error) -> bool {
    let unsupported_kind = err.kind() == io::ErrorKind::Unsupported;

    #[cfg(target_os = "linux")]
    let unsupported_errno = matches!(
        err.raw_os_error(),
        Some(libc::EOPNOTSUPP | libc::ENOSYS | libc::EINVAL)
    );
    // Other targets have no errno values to inspect here.
    #[cfg(not(target_os = "linux"))]
    let unsupported_errno = false;

    unsupported_kind || unsupported_errno
}
/// Buffered, append-only writer for a write-ahead-log file.
///
/// Log sequence numbers (LSNs) handed out by this type are byte offsets into
/// the file: a record's LSN is the offset at which its first byte is written.
pub struct WalWriter {
    /// Buffered handle through which header and record bytes are written.
    file: BufWriter<File>,
    /// Clone of the underlying file handle, handed out by `drain_for_group_sync`.
    sync_handle: Arc<File>,
    /// Offset that the next appended record will receive as its LSN.
    current_lsn: u64,
    /// Highest LSN recorded as synced; updated by `sync`, `flush_until`,
    /// `mark_durable`, and reset by `truncate`.
    durable_lsn: u64,
    /// End offset of the range successfully reserved so far (0 = nothing reserved).
    preallocated_to: u64,
    /// Cleared once a reservation attempt reports that preallocation is unsupported.
    prealloc_supported: bool,
}
impl WalWriter {
    /// Size in bytes of the fixed file header; also the LSN of the first record.
    const HEADER_LEN: u64 = 8;

    /// Builds the file header: the magic bytes, the version byte, and three
    /// zero padding bytes.
    fn header_bytes() -> Vec<u8> {
        let mut header = Vec::with_capacity(Self::HEADER_LEN as usize);
        header.extend_from_slice(WAL_MAGIC);
        header.push(WAL_VERSION);
        header.extend_from_slice(&[0u8; 3]);
        header
    }

    /// Opens the WAL at `path` for appending, creating it if necessary.
    ///
    /// An empty (or newly created) file gets the header written and synced, and
    /// the writer starts at LSN 8. A non-empty file is left untouched and the
    /// writer starts at its current length. In both cases `durable_lsn` starts
    /// equal to `current_lsn`, and the first segment reservation is attempted.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from opening, inspecting, writing, syncing, or
    /// cloning the file handle.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let mut raw = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(path)?;

        // Decide "fresh vs. existing" solely from the handle we actually hold.
        // A separate `exists()` probe before opening is racy: if the file gains
        // content between the probe and the open, the header would be appended
        // after that content and `current_lsn` reset to 8.
        // NOTE(review): a non-empty file shorter than the header is accepted
        // as-is; the header is not validated here.
        let current_lsn = if raw.metadata()?.len() == 0 {
            raw.write_all(&Self::header_bytes())?;
            raw.sync_all()?;
            Self::HEADER_LEN
        } else {
            raw.seek(SeekFrom::End(0))?
        };

        let sync_handle = Arc::new(raw.try_clone()?);
        let mut writer = Self {
            file: BufWriter::with_capacity(WAL_BUFFER_BYTES, raw),
            sync_handle,
            current_lsn,
            durable_lsn: current_lsn,
            preallocated_to: 0,
            prealloc_supported: true,
        };
        writer.ensure_preallocated()?;
        Ok(writer)
    }

    /// Best-effort: makes sure disk blocks are reserved up to the segment
    /// boundary following `current_lsn`.
    ///
    /// Never fails in practice: an "unsupported" error disables further
    /// attempts, and any other reservation error is ignored, leaving
    /// `preallocated_to` unchanged so the next call tries again.
    fn ensure_preallocated(&mut self) -> io::Result<()> {
        if !self.prealloc_supported {
            return Ok(());
        }
        let target = next_wal_segment_boundary(self.current_lsn);
        if target <= self.preallocated_to {
            return Ok(());
        }
        let from = self.preallocated_to;
        match reserve_wal_blocks(self.file.get_ref(), from, target - from) {
            Ok(()) => self.preallocated_to = target,
            Err(ref e) if fallocate_unsupported(e) => self.prealloc_supported = false,
            // Reservation is an optimisation only; a transient failure must
            // not fail the append that triggered it.
            Err(_) => {}
        }
        Ok(())
    }

    /// Encodes `record` and appends it to the buffer.
    ///
    /// Returns the LSN assigned to the record (the offset of its first byte).
    /// The bytes are buffered; they are not guaranteed to be on disk until
    /// `sync` or `flush_until` succeeds.
    pub fn append(&mut self, record: &WalRecord) -> io::Result<u64> {
        self.append_bytes(&record.encode())
    }

    /// Appends already-encoded `bytes` to the buffer and returns the LSN at
    /// which they start.
    ///
    /// `current_lsn` advances by `bytes.len()` only after the buffered write
    /// succeeds.
    pub fn append_bytes(&mut self, bytes: &[u8]) -> io::Result<u64> {
        self.file.write_all(bytes)?;
        let record_lsn = self.current_lsn;
        self.current_lsn += bytes.len() as u64;
        self.ensure_preallocated()?;
        Ok(record_lsn)
    }

    /// Overrides the write position used for LSN assignment.
    ///
    /// Only the in-memory counter changes; the file, `durable_lsn`, and the
    /// preallocation state are left as they are.
    pub fn set_current_lsn(&mut self, lsn: u64) {
        self.current_lsn = lsn;
    }

    /// Flushes buffered bytes, syncs the file, and records everything written
    /// so far as durable.
    pub fn sync(&mut self) -> io::Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.durable_lsn = self.current_lsn;
        Ok(())
    }

    /// Ensures the log is durable at least up to `target`.
    ///
    /// Does nothing when `durable_lsn` already covers `target`; otherwise
    /// performs a full `sync`, which advances `durable_lsn` to `current_lsn`.
    pub fn flush_until(&mut self, target: u64) -> io::Result<()> {
        if self.durable_lsn >= target {
            return Ok(());
        }
        self.sync()
    }

    /// Highest LSN currently recorded as durable.
    pub fn durable_lsn(&self) -> u64 {
        self.durable_lsn
    }

    /// LSN that the next appended record will receive.
    pub fn current_lsn(&self) -> u64 {
        self.current_lsn
    }

    /// Flushes buffered bytes to the file (without syncing) and returns the
    /// current LSN together with a shared file handle.
    ///
    /// Presumably the caller syncs via the returned handle and then reports
    /// the result through `mark_durable` — confirm against callers.
    pub fn drain_for_group_sync(&mut self) -> io::Result<(u64, Arc<File>)> {
        self.file.flush()?;
        Ok((self.current_lsn, Arc::clone(&self.sync_handle)))
    }

    /// Raises `durable_lsn` to `lsn` if that is an advance; never lowers it.
    ///
    /// NOTE(review): `lsn` is not checked against `current_lsn`. A stale value
    /// reported after `truncate` would leave `durable_lsn` ahead of
    /// `current_lsn` and make `flush_until` skip syncing — confirm callers
    /// cannot do this.
    pub fn mark_durable(&mut self, lsn: u64) {
        if lsn > self.durable_lsn {
            self.durable_lsn = lsn;
        }
    }

    /// Discards all records, leaving a file that contains only a fresh,
    /// synced header.
    ///
    /// Pending buffered bytes are flushed first so they cannot be written
    /// after the reset. Both LSN counters return to 8 and the first segment
    /// reservation is attempted again.
    pub fn truncate(&mut self) -> io::Result<()> {
        self.file.flush()?;
        {
            let raw = self.file.get_mut();
            raw.set_len(0)?;
            raw.seek(SeekFrom::Start(0))?;
        }
        self.file.write_all(&Self::header_bytes())?;
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        self.current_lsn = Self::HEADER_LEN;
        self.durable_lsn = Self::HEADER_LEN;
        // Forget the old reservation so the first segment is reserved again.
        self.preallocated_to = 0;
        self.ensure_preallocated()?;
        Ok(())
    }
}
// Unit tests for `WalWriter`: LSN bookkeeping, durability tracking, buffering
// behaviour, and block preallocation.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    // Removes the file at `path` when dropped, so each test cleans up after itself.
    struct FileGuard {
        path: PathBuf,
    }
    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best-effort cleanup; a missing file is not an error here.
            let _ = std::fs::remove_file(&self.path);
        }
    }
    // Builds a per-test, per-process path in the temp directory, removes any
    // leftover file at that path, and returns a cleanup guard with the path.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
        std::env::temp_dir().join(format!("rb_wal_writer_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }
    // Opening a missing path creates the file and starts at LSN 8.
    #[test]
    fn test_create_new_wal() {
        let (_guard, path) = temp_wal("create");
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.current_lsn(), 8);
        assert!(path.exists());
    }
    // The first record gets LSN 8; the assertions pin a `Begin` record at 21 bytes.
    #[test]
    fn test_append_record() {
        let (_guard, path) = temp_wal("append");
        let mut writer = WalWriter::open(&path).unwrap();
        let record = WalRecord::Begin { tx_id: 42 };
        let lsn = writer.append(&record).unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + 21);
    }
    // Consecutive records receive consecutive byte offsets as LSNs.
    #[test]
    fn test_append_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let mut writer = WalWriter::open(&path).unwrap();
        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lsn2 = writer.append(&WalRecord::Begin { tx_id: 2 }).unwrap();
        let lsn3 = writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        assert_eq!(lsn1, 8);
        assert_eq!(lsn2, 8 + 21);
        assert_eq!(lsn3, 8 + 21 + 21);
    }
    // A `PageWrite` with a 5-byte payload advances the LSN by 34 bytes.
    #[test]
    fn test_page_write_lsn() {
        let (_guard, path) = temp_wal("pagewrite");
        let mut writer = WalWriter::open(&path).unwrap();
        let lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        assert_eq!(lsn1, 8);
        let data = vec![1, 2, 3, 4, 5];
        let lsn2 = writer
        .append(&WalRecord::PageWrite {
        tx_id: 1,
        page_id: 100,
        data: data.clone(),
        })
        .unwrap();
        assert_eq!(lsn2, 8 + 21);
        assert_eq!(writer.current_lsn(), 8 + 21 + 34);
    }
    // `sync` succeeds after an append; only the file's existence is checked.
    #[test]
    fn test_sync() {
        let (_guard, path) = temp_wal("sync");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        assert!(path.exists());
    }
    // `truncate` resets the LSN to 8 and leaves an 8-byte file on disk.
    #[test]
    fn test_truncate() {
        let (_guard, path) = temp_wal("truncate");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer
        .append(&WalRecord::PageWrite {
        tx_id: 1,
        page_id: 0,
        data: vec![0; 100],
        })
        .unwrap();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let lsn_before = writer.current_lsn();
        assert!(lsn_before > 8);
        writer.truncate().unwrap();
        assert_eq!(writer.current_lsn(), 8);
        let len = std::fs::metadata(&path).unwrap().len();
        assert_eq!(len, 8);
    }
    // Reopening resumes at the LSN the previous writer ended on. The first
    // writer is dropped without an explicit `sync`.
    #[test]
    fn test_reopen_existing() {
        let (_guard, path) = temp_wal("reopen");
        let lsn_after_write;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            lsn_after_write = writer.current_lsn();
        }
        {
            let writer = WalWriter::open(&path).unwrap();
            assert_eq!(writer.current_lsn(), lsn_after_write);
        }
    }
    // A `Checkpoint` record is assigned LSN 8 and occupies 21 bytes.
    #[test]
    fn test_checkpoint_record() {
        let (_guard, path) = temp_wal("checkpoint");
        let mut writer = WalWriter::open(&path).unwrap();
        let lsn = writer
        .append(&WalRecord::Checkpoint { lsn: 12345 })
        .unwrap();
        assert_eq!(lsn, 8);
        assert_eq!(writer.current_lsn(), 8 + 21);
    }
    // A fresh WAL reports both durable and current LSN as 8.
    #[test]
    fn fresh_wal_has_durable_lsn_at_header_end() {
        let (_guard, path) = temp_wal("durable_init");
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }
    // Targets at or below the durable LSN leave it unchanged.
    #[test]
    fn flush_until_below_durable_is_noop() {
        let (_guard, path) = temp_wal("flush_noop");
        let mut writer = WalWriter::open(&path).unwrap();
        let before = writer.durable_lsn();
        writer.flush_until(0).unwrap();
        writer.flush_until(8).unwrap();
        assert_eq!(writer.durable_lsn(), before);
    }
    // Appends alone do not move the durable LSN; `flush_until` does.
    #[test]
    fn flush_until_advances_durable_to_current() {
        let (_guard, path) = temp_wal("flush_advance");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 7 }).unwrap();
        writer.append(&WalRecord::Commit { tx_id: 7 }).unwrap();
        let target = writer.current_lsn();
        assert_eq!(writer.durable_lsn(), 8);
        writer.flush_until(target).unwrap();
        assert_eq!(writer.durable_lsn(), target);
    }
    // A later flush to a lower target must not move the durable LSN backwards.
    #[test]
    fn flush_until_is_monotonic() {
        let (_guard, path) = temp_wal("flush_monotonic");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        let lo = writer.current_lsn();
        writer.flush_until(lo).unwrap();
        let durable_after_lo = writer.durable_lsn();
        writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
        let hi = writer.current_lsn();
        writer.flush_until(hi).unwrap();
        assert!(writer.durable_lsn() >= durable_after_lo);
        writer.flush_until(lo).unwrap();
        assert_eq!(writer.durable_lsn(), hi);
    }
    // `sync` advances the durable LSN to the current LSN.
    #[test]
    fn sync_advances_durable_lsn_too() {
        let (_guard, path) = temp_wal("sync_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 9 }).unwrap();
        let before = writer.durable_lsn();
        let after_append = writer.current_lsn();
        assert!(after_append > before);
        writer.sync().unwrap();
        assert_eq!(writer.durable_lsn(), after_append);
    }
    // `truncate` resets both the durable and the current LSN to 8.
    #[test]
    fn truncate_resets_durable_lsn() {
        let (_guard, path) = temp_wal("truncate_durable");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        assert!(writer.durable_lsn() > 8);
        writer.truncate().unwrap();
        assert_eq!(writer.durable_lsn(), 8);
        assert_eq!(writer.current_lsn(), 8);
    }
    // On reopen the durable LSN starts equal to the current LSN.
    #[test]
    fn reopen_initialises_durable_to_current() {
        let (_guard, path) = temp_wal("reopen_durable");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let writer = WalWriter::open(&path).unwrap();
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }
    // 100 small appends fit within the buffer, so the file on disk still
    // holds only the 8-byte header.
    #[test]
    fn bufwriter_coalesces_until_sync() {
        let (_guard, path) = temp_wal("bufwriter_coalesce");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..100u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        assert_eq!(writer.current_lsn(), 8 + 100 * 21);
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8, "BufWriter leaked bytes to disk before sync");
    }
    // After `sync` the on-disk length matches the current LSN.
    #[test]
    fn sync_drains_bufwriter_before_fsync() {
        let (_guard, path) = temp_wal("sync_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, writer.current_lsn());
        assert_eq!(writer.durable_lsn(), writer.current_lsn());
    }
    // `flush_until` also pushes buffered bytes out to the file.
    #[test]
    fn flush_until_drains_bufwriter_too() {
        let (_guard, path) = temp_wal("flush_until_drains");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..30u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        let target = writer.current_lsn();
        writer.flush_until(target).unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, target);
        assert_eq!(writer.durable_lsn(), target);
    }
    // Bytes still buffered at `truncate` time must not reappear afterwards:
    // the file ends up header-only, and a later append lands right after it.
    #[test]
    fn truncate_drains_pending_bufwriter_bytes_first() {
        let (_guard, path) = temp_wal("truncate_drain");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..200u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
        writer.truncate().unwrap();
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert_eq!(on_disk, 8);
        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(writer.durable_lsn(), 8);
        writer.append(&WalRecord::Begin { tx_id: 99 }).unwrap();
        writer.sync().unwrap();
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8 + 21);
    }
    // After reopening, the durable LSN is at least the last explicitly synced
    // LSN. Only this lower bound is asserted.
    #[test]
    fn reopen_sees_only_synced_records() {
        let (_guard, path) = temp_wal("reopen_synced_only");
        let synced_lsn;
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
            synced_lsn = writer.current_lsn();
            for tx in 100..120u64 {
                writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
            }
        }
        let writer = WalWriter::open(&path).unwrap();
        assert!(writer.durable_lsn() >= synced_lsn);
    }
    // Returns the size reported by `fs2::FileExt::allocated_size` for `path`.
    fn allocated_bytes(path: &std::path::Path) -> u64 {
        use fs2::FileExt;
        let f = std::fs::File::open(path).unwrap();
        f.allocated_size().unwrap()
    }
    // The boundary helper rounds strictly up: an exact boundary maps to the next one.
    #[test]
    fn segment_boundary_rounds_strictly_up() {
        assert_eq!(next_wal_segment_boundary(0), WAL_SEGMENT_BYTES);
        assert_eq!(next_wal_segment_boundary(8), WAL_SEGMENT_BYTES);
        assert_eq!(
        next_wal_segment_boundary(WAL_SEGMENT_BYTES - 1),
        WAL_SEGMENT_BYTES
        );
        assert_eq!(
        next_wal_segment_boundary(WAL_SEGMENT_BYTES),
        2 * WAL_SEGMENT_BYTES
        );
        assert_eq!(
        next_wal_segment_boundary(WAL_SEGMENT_BYTES + 1),
        2 * WAL_SEGMENT_BYTES
        );
    }
    // Where preallocation is supported, `open` reserves one whole segment
    // while the logical file length stays at 8 bytes.
    #[test]
    fn open_preallocates_first_segment() {
        let (_guard, path) = temp_wal("prealloc_open");
        let writer = WalWriter::open(&path).unwrap();
        // Nothing to verify when the platform or filesystem cannot preallocate.
        if !writer.prealloc_supported {
        return; }
        assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
        assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
    }
    // Reserved blocks must not show up in the file's logical length.
    #[test]
    fn preallocation_does_not_grow_logical_length() {
        let (_guard, path) = temp_wal("prealloc_logical");
        let mut writer = WalWriter::open(&path).unwrap();
        for tx in 0..50u64 {
            writer.append(&WalRecord::Begin { tx_id: tx }).unwrap();
        }
        writer.sync().unwrap();
        let logical = std::fs::metadata(&path).unwrap().len();
        assert_eq!(logical, 8 + 50 * 21, "preallocation inflated i_size");
        assert_eq!(writer.current_lsn(), logical);
    }
    // After `truncate`, the first segment is reserved again (when supported).
    #[test]
    fn truncate_re_extends_a_fresh_segment() {
        let (_guard, path) = temp_wal("prealloc_truncate");
        let mut writer = WalWriter::open(&path).unwrap();
        writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
        writer.sync().unwrap();
        writer.truncate().unwrap();
        assert_eq!(writer.current_lsn(), 8);
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 8);
        if writer.prealloc_supported {
            assert_eq!(writer.preallocated_to, WAL_SEGMENT_BYTES);
            assert!(allocated_bytes(&path) >= WAL_SEGMENT_BYTES);
        }
    }
    // A reader over a preallocated WAL yields exactly the written records and
    // then stops without error.
    #[test]
    fn preallocated_wal_recovers_records_without_trailing_garbage() {
        use super::super::reader::WalReader;
        let (_guard, path) = temp_wal("prealloc_recover");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            writer
            .append(&WalRecord::PageWrite {
            tx_id: 1,
            page_id: 7,
            data: vec![1, 2, 3, 4],
            })
            .unwrap();
            writer.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            writer.sync().unwrap();
        }
        let records: Vec<_> = WalReader::open(&path)
        .unwrap()
        .iter()
        .collect::<Result<_, _>>()
        .expect("reader must stop cleanly at real EOF, not in reserved tail");
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].1, WalRecord::Begin { tx_id: 1 });
        assert_eq!(records[2].1, WalRecord::Commit { tx_id: 1 });
    }
}