use lio_uring::{
Entry, LioUring,
operation::{
self, Accept, Bind, Close, Connect, Fsync, Ftruncate, LinkAt, Listen,
OpenAt, Read, Recv, Send, Shutdown, Socket, SymlinkAt, Tee, Timeout, Write,
},
};
use crate::{
backends::{IoBackend, OpCompleted},
op::{Op, RawBuf},
};
use std::io;
use std::os::fd::AsRawFd;
use std::time::Duration;
fn create_io_uring_entry(op: &Op) -> Entry {
match op {
Op::Nop => operation::Nop::new().build(),
Op::Read { fd, buffer } => {
let RawBuf { ptr, len } = unsafe { buffer.peek::<RawBuf>() };
Read::new(fd.as_raw_fd(), ptr, len as u32).build()
}
Op::Write { fd, buffer } => {
let (ptr, len) = unsafe { buffer.peek::<(*const u8, usize)>() };
Write::new(fd.as_raw_fd(), ptr, len as u32).build()
}
Op::ReadAt { fd, offset, buffer } => {
let RawBuf { ptr, len } = unsafe { buffer.peek::<RawBuf>() };
Read::new(fd.as_raw_fd(), ptr, len as u32).offset(*offset as u64).build()
}
Op::WriteAt { fd, offset, buffer } => {
let (ptr, len) = unsafe { buffer.peek::<(*const u8, usize)>() };
Write::new(fd.as_raw_fd(), ptr, len as u32).offset(*offset as u64).build()
}
Op::Send { fd, flags, buffer } => {
let (ptr, len) = unsafe { buffer.peek::<(*const u8, usize)>() };
Send::new(fd.as_raw_fd(), ptr, len as u32).flags(*flags).build()
}
Op::Recv { fd, flags, buffer } => {
let RawBuf { ptr, len } = unsafe { buffer.peek::<RawBuf>() };
Recv::new(fd.as_raw_fd(), ptr, len as u32).flags(*flags).build()
}
Op::Accept { fd, addr, len } => {
Accept::new(fd.as_raw_fd(), (*addr) as *mut libc::sockaddr, *len).build()
}
Op::Connect { fd, addr, len, .. } => {
Connect::new(fd.as_raw_fd(), (*addr) as *const libc::sockaddr, *len)
.build()
}
Op::Bind { fd, addr, addrlen } => {
Bind::new(fd.as_raw_fd(), (*addr) as *const libc::sockaddr, *addrlen)
.build()
}
Op::Listen { fd, backlog } => Listen::new(fd.as_raw_fd(), *backlog).build(),
Op::Shutdown { fd, how } => Shutdown::new(fd.as_raw_fd(), *how).build(),
Op::Socket { domain, ty, proto } => {
Socket::new(*domain, *ty, *proto).build()
}
Op::OpenAt { dir_fd, path, flags } => {
OpenAt::new(dir_fd.as_raw_fd(), *path).flags(*flags).build()
}
Op::Close { fd } => Close::new(*fd).build(),
Op::Fsync { fd } => Fsync::new(fd.as_raw_fd()).build(),
Op::Truncate { fd, size } => Ftruncate::new(fd.as_raw_fd(), *size).build(),
Op::LinkAt { old_dir_fd, old_path, new_dir_fd, new_path } => LinkAt::new(
old_dir_fd.as_raw_fd(),
*old_path,
new_dir_fd.as_raw_fd(),
*new_path,
)
.build(),
Op::SymlinkAt { target, linkpath, dir_fd } => {
SymlinkAt::new(dir_fd.as_raw_fd(), *target, *linkpath).build()
}
#[cfg(target_os = "linux")]
Op::Tee { fd_in, fd_out, size } => {
Tee::new(fd_in.as_raw_fd(), fd_out.as_raw_fd(), *size).build()
}
Op::Timeout { timespec, .. } => {
Timeout::new(*timespec as *const _).build()
}
}
}
#[derive(Default)]
pub struct IoUring {
ring: Option<LioUring>,
completed: Vec<OpCompleted>,
}
impl IoUring {
pub fn new() -> Self {
Self::default()
}
#[inline]
fn ring(&mut self) -> &mut LioUring {
self.ring.as_mut().expect("IoUring not initialized - call init() first")
}
fn poll_inner(
&mut self,
timeout: Option<Duration>,
) -> io::Result<&[OpCompleted]> {
self.completed.clear();
let ring = self.ring.as_mut().expect("IoUring not initialized");
match timeout {
None => {
let first = ring.wait()?;
self
.completed
.push(OpCompleted::new(first.user_data(), first.result() as isize));
}
Some(d) if d.is_zero() => {
match ring.try_wait()? {
Some(op) => {
self
.completed
.push(OpCompleted::new(op.user_data(), op.result() as isize));
}
None => return Ok(&[]),
}
}
Some(d) => {
match ring.wait_timeout(d)? {
Some(first) => {
self.completed.push(OpCompleted::new(
first.user_data(),
first.result() as isize,
));
}
None => return Ok(&[]), }
}
}
let ring = self.ring.as_mut().expect("IoUring not initialized");
while let Ok(Some(op)) = ring.try_wait() {
self
.completed
.push(OpCompleted::new(op.user_data(), op.result() as isize));
}
Ok(&self.completed)
}
}
impl IoBackend for IoUring {
fn init(&mut self, cap: usize) -> io::Result<()> {
let ring = LioUring::new(cap as u32)?;
self.ring = Some(ring);
self.completed = Vec::with_capacity(cap.min(256));
Ok(())
}
fn push(&mut self, id: u64, op: Op) -> io::Result<()> {
let entry = create_io_uring_entry(&op);
unsafe { self.ring().push(entry, id) }.map_err(|_| {
io::Error::new(io::ErrorKind::WouldBlock, "submission queue full")
})?;
Ok(())
}
fn flush(&mut self) -> io::Result<usize> {
let submitted = self.ring().submit()?;
Ok(submitted)
}
fn wait_timeout(
&mut self,
timeout: Option<Duration>,
) -> io::Result<&[OpCompleted]> {
self.poll_inner(timeout)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_init() {
let mut backend = IoUring::new();
backend.init(64).unwrap();
}
}