#![allow(dead_code)]
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::PathBuf;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::Mutex;
use xxhash_rust::xxh3::xxh3_64;
// Default ring capacity: 4 MiB of frames can be buffered before a flush is forced.
const RING_SIZE: usize = 4 * 1024 * 1024;
// Frame and buffer alignment; 4 KiB matches the typical page/sector size
// required for O_DIRECT transfers.
const ALIGNMENT: usize = 4096;
// Per-frame header bytes: payload len (u32) + header len (u32) + checksum (u32).
const FRAME_HEADER_LEN: usize = 12;
// Largest payload that can ever fit in the default-sized ring.
pub const WAL_MAX_PAYLOAD: usize = RING_SIZE - FRAME_HEADER_LEN;
/// Receipt for an appended frame: where it landed and how big it was.
#[derive(Debug, Clone, Copy)]
pub struct WalTicket {
    /// Absolute file offset at which the frame header begins.
    pub offset: u64,
    /// Payload length in bytes (excludes the header and alignment padding).
    pub len: u32,
}
/// Configuration for opening a [`Wal`].
#[derive(Debug, Clone)]
pub struct WalOptions {
    /// Path of the log file; created if it does not exist.
    pub path: PathBuf,
    /// When true, the file is opened with `O_DIRECT` and writes bypass
    /// the page cache.
    pub direct_io: bool,
    /// Capacity of the in-memory staging ring, in bytes.
    pub ring_bytes: usize,
}
impl Default for WalOptions {
    /// Defaults: `wal.log` in the current directory, direct I/O on, and
    /// a 4 MiB ring.
    fn default() -> Self {
        let path = PathBuf::from("wal.log");
        Self {
            path,
            direct_io: true,
            ring_bytes: RING_SIZE,
        }
    }
}
/// Append-only write-ahead log: frames are staged in an aligned
/// in-memory ring and drained to a file, optionally with `O_DIRECT`.
pub struct Wal {
    /// File handle used for the buffered (non-direct) write path.
    file: Mutex<File>,
    /// Raw descriptor used for direct writes and `fdatasync`.
    fd: RawFd,
    /// Staging ring; frames accumulate here until flushed.
    buffer: Mutex<RingBuffer>,
    /// Bytes handed to the kernel so far (seeded with the file length at
    /// open; not necessarily durable until `sync`).
    committed: AtomicU64,
    /// Whether the direct `write(2)` path is used instead of `File::write_all`.
    direct_io: bool,
    /// Ring capacity in bytes; bounds the largest frame.
    ring_len: usize,
}
impl Wal {
    /// Opens (or creates) the WAL file described by `options`.
    ///
    /// The file is opened read/write in append mode; when
    /// `options.direct_io` is set, `O_DIRECT` is added so flushed chunks
    /// bypass the page cache.  The ring's logical offsets start at the
    /// current file length, so `WalTicket::offset` values are absolute
    /// file offsets.
    ///
    /// # Errors
    /// Propagates failures from opening the file, reading its metadata,
    /// or allocating the aligned ring storage.
    pub fn open(options: WalOptions) -> io::Result<Self> {
        let mut open_options = OpenOptions::new();
        open_options
            .create(true)
            .write(true)
            .read(true)
            .append(true);
        if options.direct_io {
            // NOTE(review): O_DIRECT demands sector-aligned buffers,
            // lengths, AND file offsets.  If the existing file length is
            // not a multiple of ALIGNMENT, direct writes start at a
            // misaligned offset and can fail with EINVAL — confirm that
            // recovery truncates the log to an aligned boundary before
            // reopening.
            open_options.custom_flags(libc::O_DIRECT);
        }
        let file = open_options.open(&options.path)?;
        let fd = file.as_raw_fd();
        let start_offset = file.metadata()?.len();
        Ok(Self {
            file: Mutex::new(file),
            fd,
            buffer: Mutex::new(RingBuffer::new(options.ring_bytes, start_offset)?),
            committed: AtomicU64::new(start_offset),
            direct_io: options.direct_io,
            ring_len: options.ring_bytes,
        })
    }

    /// Buffers `payload` as one frame and returns a ticket holding the
    /// frame's absolute file offset and payload length.
    ///
    /// The frame (header + payload) is rounded up to `ALIGNMENT` bytes.
    /// If the ring cannot currently hold it, the ring is flushed to the
    /// file and the reservation is retried.
    ///
    /// # Errors
    /// `InvalidInput` when the aligned frame can never fit in the ring;
    /// otherwise any error raised while flushing.
    pub fn append(&self, payload: &[u8]) -> io::Result<WalTicket> {
        // Bug fix: validate the *aligned* frame size, not just
        // payload + header.  With a ring size that is not a multiple of
        // ALIGNMENT, a frame could pass the old check yet exceed the
        // ring after rounding; flush() would then drain everything,
        // available() would still never reach frame_len, and the loop
        // below would spin forever.
        let frame_len = align_up(payload.len() + FRAME_HEADER_LEN, ALIGNMENT);
        if frame_len > self.ring_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "payload larger than WAL ring",
            ));
        }
        loop {
            {
                let mut ring = self.buffer.lock();
                if ring.available() >= frame_len {
                    let offset = ring.reserve(frame_len);
                    ring.write_frame(offset, payload);
                    return Ok(WalTicket {
                        offset,
                        len: payload.len() as u32,
                    });
                }
            } // ring lock dropped here; flush() re-acquires it
            self.flush()?;
        }
    }

    /// Drains every pending byte from the ring into the file.
    ///
    /// Lock order is buffer → file, consistent with all other paths.
    /// Data is only written, not synced; call [`Wal::sync`] or
    /// [`Wal::commit`] for durability.
    ///
    /// # Errors
    /// Propagates the first write failure; chunks drained before the
    /// failure have already been consumed from the ring.
    pub fn flush(&self) -> io::Result<()> {
        let mut ring = self.buffer.lock();
        // Single pending check: the ring lock is held for the whole
        // drain, so the value cannot change underneath us (the original
        // re-check after taking the file lock was dead code).
        let pending = ring.pending() as usize;
        if pending == 0 {
            return Ok(());
        }
        let mut file = self.file.lock();
        let mut drained = 0usize;
        while drained < pending {
            // chunk() returns the contiguous run starting at the tail; a
            // region that wraps past the ring end takes two iterations.
            let chunk = ring.chunk();
            if chunk.is_empty() {
                break;
            }
            self.write_chunk(&mut file, chunk)?;
            let chunk_len = chunk.len();
            drained += chunk_len;
            ring.consume(chunk_len as u64);
        }
        Ok(())
    }

    /// Forces previously written data (not metadata) to stable storage
    /// via `fdatasync(2)`.
    ///
    /// # Errors
    /// The OS error reported by `fdatasync` on failure.
    pub fn sync(&self) -> io::Result<()> {
        // SAFETY: `fd` remains valid for the lifetime of `self` because
        // the owning File is kept alive in `self.file`.
        let res = unsafe { libc::fdatasync(self.fd) };
        if res != 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(())
    }

    /// Flushes the ring and then syncs the file: after this returns
    /// `Ok`, every previously appended frame is on stable storage.
    pub fn commit(&self) -> io::Result<()> {
        self.flush()?;
        self.sync()
    }

    /// Writes one contiguous chunk and bumps the committed counter.
    ///
    /// Uses raw `write(2)` when direct I/O is enabled (the std buffered
    /// path may split or stage buffers, which breaks O_DIRECT alignment
    /// expectations) and `Write::write_all` otherwise.
    fn write_chunk(&self, file: &mut File, chunk: &[u8]) -> io::Result<()> {
        if chunk.is_empty() {
            return Ok(());
        }
        if self.direct_io {
            write_all_direct(self.fd, chunk)?;
        } else {
            file.write_all(chunk)?;
        }
        // "Committed" here means handed to the kernel; durability still
        // requires sync().
        self.committed
            .fetch_add(chunk.len() as u64, Ordering::Release);
        Ok(())
    }

    /// Total bytes written to the file so far (including the initial
    /// file length observed at open time); not necessarily synced.
    pub fn committed_bytes(&self) -> u64 {
        self.committed.load(Ordering::Acquire)
    }

    /// Largest payload accepted by [`Wal::append`] before alignment
    /// padding is considered.
    ///
    /// NOTE(review): for ring sizes that are not a multiple of
    /// `ALIGNMENT`, the true limit is slightly smaller because the
    /// aligned frame must also fit — confirm callers use aligned ring
    /// sizes or consult `append`'s error instead.
    pub fn max_payload(&self) -> usize {
        self.ring_len.saturating_sub(FRAME_HEADER_LEN)
    }
}
/// Rounds `len` up to the next multiple of `align`; a value that is
/// already a multiple is returned unchanged.
fn align_up(len: usize, align: usize) -> usize {
    match len % align {
        0 => len,
        rem => len + (align - rem),
    }
}
/// Writes all of `buf` to `fd` with raw `write(2)` calls, as required
/// for O_DIRECT descriptors where std's buffered machinery must be
/// bypassed.
///
/// Bug fixes over the original: `EINTR` is retried instead of aborting
/// the whole write, and a zero-byte return (which previously spun this
/// loop forever) is surfaced as `WriteZero`, matching
/// `io::Write::write_all` semantics.
///
/// NOTE(review): O_DIRECT also requires the pointer, length, and file
/// offset to be sector-aligned; a short write could leave the remainder
/// misaligned — confirm the target filesystem never short-writes
/// aligned buffers.
///
/// # Errors
/// Any non-EINTR OS error from `write(2)`, or `WriteZero` as above.
fn write_all_direct(fd: RawFd, buf: &[u8]) -> io::Result<()> {
    let mut written = 0usize;
    while written < buf.len() {
        let remaining = buf.len() - written;
        // SAFETY: `written < buf.len()`, so pointer and length stay
        // within `buf` for the duration of the call.
        let ret = unsafe {
            libc::write(
                fd,
                buf.as_ptr().add(written) as *const libc::c_void,
                remaining,
            )
        };
        if ret < 0 {
            let err = io::Error::last_os_error();
            // Interrupted before any data was transferred: just retry.
            if err.kind() == io::ErrorKind::Interrupted {
                continue;
            }
            return Err(err);
        }
        if ret == 0 {
            // write(2) returning 0 for a non-empty buffer would loop
            // forever here; report it like io::Write::write_all does.
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to write whole WAL chunk",
            ));
        }
        written += ret as usize;
    }
    Ok(())
}
/// Fixed-capacity byte ring addressed by monotonically increasing
/// absolute offsets; `offset % len` maps an offset to a physical index
/// in `storage`.
struct RingBuffer {
    /// Aligned backing memory of `len` bytes.
    storage: AlignedBuffer,
    /// Next absolute offset to hand out (write cursor).
    head: u64,
    /// Oldest absolute offset not yet drained (read cursor).
    tail: u64,
    /// Capacity in bytes, kept as u64 for offset arithmetic.
    len: u64,
}
impl RingBuffer {
    /// Allocates `len` bytes of aligned ring storage.  `start_offset`
    /// seeds both cursors so that ring offsets coincide with absolute
    /// file offsets.
    fn new(len: usize, start_offset: u64) -> io::Result<Self> {
        let storage = AlignedBuffer::new(len, ALIGNMENT)?;
        Ok(Self {
            storage,
            head: start_offset,
            tail: start_offset,
            len: len as u64,
        })
    }

    /// Claims `size` bytes at the head and returns the absolute offset
    /// where they start.  The caller is responsible for having checked
    /// `available()` first.
    fn reserve(&mut self, size: usize) -> u64 {
        let claimed = self.head;
        self.head = claimed + size as u64;
        claimed
    }

    /// Copies `data` into storage starting at physical index `start`,
    /// wrapping around the end of the buffer when needed.
    fn write_raw(&mut self, start: usize, data: &[u8]) {
        let cap = self.capacity();
        if start + data.len() <= cap {
            // Fits without wrapping: a single copy.
            unsafe { self.storage.as_mut_slice(start, data.len()) }.copy_from_slice(data);
        } else {
            // Wraps: copy the piece that fits before the end, then the
            // remainder at index 0.
            let (front, back) = data.split_at(cap - start);
            unsafe { self.storage.as_mut_slice(start, front.len()) }.copy_from_slice(front);
            unsafe { self.storage.as_mut_slice(0, back.len()) }.copy_from_slice(back);
        }
    }

    /// Serializes one frame at ring position `offset`: a 12-byte header
    /// (payload len, header len, truncated xxh3 checksum, all u32 LE),
    /// the payload, and zero padding up to the alignment boundary.
    fn write_frame(&mut self, offset: u64, payload: &[u8]) {
        let total = align_up(FRAME_HEADER_LEN + payload.len(), ALIGNMENT);
        let mut frame = vec![0u8; total];
        let checksum = xxh3_64(payload) as u32;
        frame[..4].copy_from_slice(&(payload.len() as u32).to_le_bytes());
        frame[4..8].copy_from_slice(&(FRAME_HEADER_LEN as u32).to_le_bytes());
        frame[8..FRAME_HEADER_LEN].copy_from_slice(&checksum.to_le_bytes());
        frame[FRAME_HEADER_LEN..][..payload.len()].copy_from_slice(payload);
        let start = (offset % self.len) as usize;
        self.write_raw(start, &frame);
    }

    /// Bytes that can still be reserved before the ring is full.
    fn available(&self) -> usize {
        (self.len - self.pending()) as usize
    }

    /// Bytes reserved but not yet drained to the file.
    fn pending(&self) -> u64 {
        self.head - self.tail
    }

    /// Total ring capacity in bytes.
    fn capacity(&self) -> usize {
        self.len as usize
    }

    /// Longest contiguous pending region starting at the tail; empty
    /// slice when nothing is pending.  A wrapped region is returned in
    /// two successive calls (interleaved with `consume`).
    fn chunk(&self) -> &[u8] {
        let outstanding = self.pending() as usize;
        if outstanding == 0 {
            return &[];
        }
        let tail = (self.tail % self.len) as usize;
        let run = outstanding.min(self.capacity() - tail);
        unsafe { self.storage.as_slice(tail, run) }
    }

    /// Advances the tail past `len` drained bytes, clamped so it never
    /// overtakes the head.
    fn consume(&mut self, len: u64) {
        self.tail = (self.tail + len).min(self.head);
    }
}
/// Heap allocation with a guaranteed alignment (from `posix_memalign`),
/// suitable as a backing buffer for O_DIRECT transfers.
struct AlignedBuffer {
    /// Start of the allocation; freed with `libc::free` on drop.
    ptr: NonNull<u8>,
    /// Allocation size in bytes.
    len: usize,
    /// Alignment the allocation was requested with (currently unread;
    /// kept for debugging/inspection).
    alignment: usize,
}
// SAFETY: AlignedBuffer exclusively owns its allocation; the raw pointer
// is only dereferenced through &self/&mut self borrows, and in this file
// all access is serialized by the Mutex around RingBuffer, so moving or
// sharing the handle across threads is sound.
unsafe impl Send for AlignedBuffer {}
unsafe impl Sync for AlignedBuffer {}
impl AlignedBuffer {
    /// Allocates `len` bytes aligned to `alignment` via `posix_memalign`.
    ///
    /// # Errors
    /// A non-zero `posix_memalign` return code (e.g. ENOMEM, or EINVAL
    /// for an invalid alignment) is mapped to an `io::Error`.
    fn new(len: usize, alignment: usize) -> io::Result<Self> {
        let mut ptr = std::ptr::null_mut();
        // posix_memalign reports failure through its return value, not
        // errno, hence from_raw_os_error(res) below.
        let res = unsafe { libc::posix_memalign(&mut ptr, alignment, len) };
        if res != 0 {
            return Err(io::Error::from_raw_os_error(res));
        }
        Ok(Self {
            // NOTE(review): with len == 0, posix_memalign may
            // legitimately return a null pointer, which would panic
            // here — confirm callers never pass a zero length.
            ptr: NonNull::new(ptr as *mut u8).expect("posix_memalign returned null"),
            len,
            alignment,
        })
    }
    /// Borrows `len` bytes beginning at byte index `start`.
    ///
    /// # Safety
    /// `start + len` must not exceed the allocation (enforced only by a
    /// debug assertion) and the range must have been initialized by a
    /// prior write.
    unsafe fn as_slice(&self, start: usize, len: usize) -> &[u8] {
        debug_assert!(start + len <= self.len);
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().add(start), len) }
    }
    /// Mutably borrows `len` bytes beginning at byte index `start`.
    ///
    /// # Safety
    /// `start + len` must not exceed the allocation (enforced only by a
    /// debug assertion).
    unsafe fn as_mut_slice(&mut self, start: usize, len: usize) -> &mut [u8] {
        debug_assert!(start + len <= self.len);
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr().add(start), len) }
    }
}
impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: `ptr` came from posix_memalign and is released exactly
        // once here; POSIX specifies free() as the matching deallocator.
        unsafe {
            libc::free(self.ptr.as_ptr() as *mut libc::c_void);
        }
    }
}
</drop_update>