use std::fs::{File, OpenOptions};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use io_uring::{IoUring, opcode, types};
use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
use crate::error::{Result, WalError};
use crate::record::{HEADER_SIZE, WalRecord};
/// Tunables for [`UringWriter`].
#[derive(Debug, Clone)]
pub struct UringWriterConfig {
    /// Size of the staging buffer (passed to `AlignedBuf::new`).
    pub write_buffer_size: usize,
    /// Alignment requested for the staging buffer (passed to `AlignedBuf::new`).
    pub alignment: usize,
    /// Entry count passed to `IoUring::new` when the ring is created.
    pub ring_depth: u32,
    /// When `true`, the WAL file is opened with `O_DIRECT`.
    pub use_direct_io: bool,
}

impl Default for UringWriterConfig {
    /// Crate-wide default buffer size and alignment, a ring depth of 64, and
    /// direct I/O enabled.
    fn default() -> Self {
        let ring_depth = 64;
        UringWriterConfig {
            use_direct_io: true,
            ring_depth,
            alignment: DEFAULT_ALIGNMENT,
            write_buffer_size: crate::writer::DEFAULT_WRITE_BUFFER_SIZE,
        }
    }
}
pub struct UringWriter {
file: File,
buffer: AlignedBuf,
file_offset: u64,
next_lsn: AtomicU64,
ring: IoUring,
sealed: bool,
config: UringWriterConfig,
encryption_key: Option<crate::crypto::WalEncryptionKey>,
}
impl UringWriter {
pub fn open(path: &Path, config: UringWriterConfig) -> Result<Self> {
let mut opts = OpenOptions::new();
opts.create(true).write(true).read(true);
if config.use_direct_io {
opts.custom_flags(libc::O_DIRECT);
}
let file = opts.open(path)?;
let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
let info = crate::recovery::recover(path)?;
(info.end_offset, info.next_lsn())
} else {
(0, 1)
};
let ring = IoUring::new(config.ring_depth).map_err(WalError::Io)?;
Ok(Self {
file,
buffer,
file_offset,
next_lsn: AtomicU64::new(next_lsn),
ring,
sealed: false,
config,
encryption_key: None,
})
}
pub fn open_without_direct_io(path: &Path) -> Result<Self> {
Self::open(
path,
UringWriterConfig {
use_direct_io: false,
..Default::default()
},
)
}
pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) {
self.encryption_key = Some(key);
}
pub fn append(
&mut self,
record_type: u16,
tenant_id: u32,
vshard_id: u16,
payload: &[u8],
) -> Result<u64> {
if self.sealed {
return Err(WalError::Sealed);
}
let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
let record = WalRecord::new(
record_type,
lsn,
tenant_id,
vshard_id,
payload.to_vec(),
self.encryption_key.as_ref(),
)?;
let header_bytes = record.header.to_bytes();
let total_size = HEADER_SIZE + record.payload.len();
if self.buffer.remaining() < total_size {
self.submit_and_wait_write()?;
}
if total_size > self.buffer.capacity() {
return Err(WalError::PayloadTooLarge {
size: record.payload.len(),
max: self.buffer.capacity() - HEADER_SIZE,
});
}
self.buffer.write(&header_bytes);
self.buffer.write(&record.payload);
Ok(lsn)
}
pub fn submit_and_sync(&mut self) -> Result<()> {
if self.buffer.is_empty() {
return Ok(());
}
self.submit_and_wait_write()?;
self.submit_and_wait_fsync()?;
Ok(())
}
fn submit_and_wait_write(&mut self) -> Result<()> {
if self.buffer.is_empty() {
return Ok(());
}
let data = if self.config.use_direct_io {
self.buffer.as_aligned_slice()
} else {
self.buffer.as_slice()
};
let fd = types::Fd(self.file.as_raw_fd());
let write_op = opcode::Write::new(fd, data.as_ptr(), data.len() as u32)
.offset(self.file_offset)
.build()
.user_data(0x01);
unsafe {
self.ring
.submission()
.push(&write_op)
.map_err(|_| WalError::Io(std::io::Error::other("io_uring SQ full")))?;
}
self.ring.submit_and_wait(1).map_err(WalError::Io)?;
let cqe =
self.ring.completion().next().ok_or_else(|| {
WalError::Io(std::io::Error::other("io_uring: no CQE after write"))
})?;
if cqe.result() < 0 {
return Err(WalError::Io(std::io::Error::from_raw_os_error(
-cqe.result(),
)));
}
self.file_offset += self.buffer.len() as u64;
self.buffer.clear();
Ok(())
}
fn submit_and_wait_fsync(&mut self) -> Result<()> {
let fd = types::Fd(self.file.as_raw_fd());
let fsync_op = opcode::Fsync::new(fd).build().user_data(0x02);
unsafe {
self.ring
.submission()
.push(&fsync_op)
.map_err(|_| WalError::Io(std::io::Error::other("io_uring SQ full")))?;
}
self.ring.submit_and_wait(1).map_err(WalError::Io)?;
let cqe =
self.ring.completion().next().ok_or_else(|| {
WalError::Io(std::io::Error::other("io_uring: no CQE after fsync"))
})?;
if cqe.result() < 0 {
return Err(WalError::Io(std::io::Error::from_raw_os_error(
-cqe.result(),
)));
}
Ok(())
}
pub fn seal(&mut self) -> Result<()> {
self.submit_and_sync()?;
self.sealed = true;
Ok(())
}
pub fn next_lsn(&self) -> u64 {
self.next_lsn.load(Ordering::Relaxed)
}
pub fn file_offset(&self) -> u64 {
self.file_offset
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;

    /// Two records with distinct tenant/vshard ids survive a write/read cycle
    /// with their payloads intact and LSNs numbered from 1.
    #[test]
    fn uring_write_and_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("uring.wal");
        let entries: [(u32, u16, &[u8]); 2] =
            [(1, 0, b"hello-uring"), (2, 1, b"world-uring")];

        {
            let mut writer = UringWriter::open_without_direct_io(&wal_path).unwrap();
            for (tenant, vshard, body) in entries {
                writer
                    .append(RecordType::Put as u16, tenant, vshard, body)
                    .unwrap();
            }
            writer.submit_and_sync().unwrap();
        }

        let reader = WalReader::open(&wal_path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 2);
        for (expected_lsn, (record, (_, _, body))) in (1..).zip(records.iter().zip(entries)) {
            assert_eq!(record.payload, body);
            assert_eq!(record.header.lsn, expected_lsn);
        }
    }

    /// A thousand appends followed by a single sync are all persisted.
    #[test]
    fn uring_group_commit_many_records() {
        const RECORD_COUNT: u32 = 1000;
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("uring_batch.wal");

        {
            let mut writer = UringWriter::open_without_direct_io(&wal_path).unwrap();
            for n in 0..RECORD_COUNT {
                let body = format!("record-{n}");
                writer
                    .append(RecordType::Put as u16, 1, 0, body.as_bytes())
                    .unwrap();
            }
            writer.submit_and_sync().unwrap();
        }

        let reader = WalReader::open(&wal_path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), RECORD_COUNT as usize);
        let last = records.last().expect("log should not be empty");
        assert_eq!(last.header.lsn, 1000);
    }

    /// Reopening an existing WAL resumes LSN assignment after the last record.
    #[test]
    fn uring_reopen_continues_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("uring_reopen.wal");

        {
            let mut first_session = UringWriter::open_without_direct_io(&wal_path).unwrap();
            for body in [&b"first"[..], &b"second"[..]] {
                first_session
                    .append(RecordType::Put as u16, 1, 0, body)
                    .unwrap();
            }
            first_session.submit_and_sync().unwrap();
        }

        {
            let mut second_session = UringWriter::open_without_direct_io(&wal_path).unwrap();
            assert_eq!(second_session.next_lsn(), 3);
            let assigned = second_session
                .append(RecordType::Put as u16, 1, 0, b"third")
                .unwrap();
            assert_eq!(assigned, 3);
            second_session.submit_and_sync().unwrap();
        }

        let reader = WalReader::open(&wal_path).unwrap();
        let total = reader
            .records()
            .collect::<crate::error::Result<Vec<_>>>()
            .unwrap()
            .len();
        assert_eq!(total, 3);
    }
}