#[cfg(target_os = "linux")]
use rustix_uring::{IoUring, cqueue, opcode, squeue, types};
#[cfg(target_os = "linux")]
use crate::error::{DbError, DbResult};
#[cfg(target_os = "linux")]
pub struct UringWriter {
ring: IoUring<squeue::Entry, cqueue::Entry>,
fd: Option<std::os::unix::io::RawFd>,
}
#[cfg(target_os = "linux")]
impl UringWriter {
pub fn new() -> DbResult<Self> {
let ring = IoUring::builder()
.setup_sqpoll(2000)
.build(256)
.map_err(|e| DbError::Io(e.into()))?;
Ok(Self { ring, fd: None })
}
pub fn set_file(&mut self, fd: std::os::unix::io::RawFd) {
self.fd = Some(fd);
}
fn submit_write(&mut self, data: &[u8], offset: u64) -> DbResult<u32> {
let fd = self.fd.expect("No file set in UringWriter");
let write_e = opcode::Write::new(types::Fd(fd), data.as_ptr(), data.len() as u32)
.offset(offset)
.build()
.user_data(1);
unsafe {
if self.ring.submission().push(&write_e).is_err() {
self.ring.submit().map_err(|e| DbError::Io(e.into()))?;
self.ring.submission().push(&write_e).expect("Queue full");
}
}
self.ring
.submit_and_wait(1)
.map_err(|e| DbError::Io(e.into()))?;
let mut cq = self.ring.completion();
let mut written = 0u32;
for cqe in &mut cq {
written = cqe.result().map_err(|e| DbError::Io(e.into()))?;
}
Ok(written)
}
pub fn write_at(&mut self, data: &[u8], file_offset: u64) -> DbResult<()> {
debug_assert!(
data.len() <= u32::MAX as usize,
"write_at data exceeds u32::MAX"
);
if data.is_empty() {
return Ok(());
}
let mut written = 0usize;
while written < data.len() {
let remaining = &data[written..];
let n = self.submit_write(remaining, file_offset + written as u64)?;
if n == 0 {
return Err(DbError::Io(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"uring write returned zero bytes",
)));
}
written += n as usize;
}
Ok(())
}
pub fn fsync(&mut self) -> DbResult<()> {
let fd = self.fd.expect("No file set in UringWriter");
let fsync_e = opcode::Fsync::new(types::Fd(fd)).build().user_data(2);
unsafe {
if self.ring.submission().push(&fsync_e).is_err() {
self.ring.submit().map_err(|e| DbError::Io(e.into()))?;
self.ring.submission().push(&fsync_e).expect("Queue full");
}
}
self.ring
.submit_and_wait(1)
.map_err(|e| DbError::Io(e.into()))?;
let mut cq = self.ring.completion();
for cqe in &mut cq {
if let Err(err) = cqe.result() {
return Err(DbError::Io(err.into()));
}
}
Ok(())
}
}