#![deny(missing_docs)]
use core::cell::Cell;
use std::io;
use std::mem;
use std::os::unix::io::RawFd;
use libc;
use liburing_sys::{
io_uring, io_uring_cqe, io_uring_cqe_seen, io_uring_get_sqe, io_uring_peek_cqe,
io_uring_prep_readv, io_uring_prep_writev, io_uring_queue_exit, io_uring_queue_init,
io_uring_register, io_uring_submit, io_uring_wait_cqe,
};
/// Opcode passed to `io_uring_register` to attach an eventfd to a ring
/// (see `UringQueue::register_eventfd`).
pub const IORING_REGISTER_EVENTFD: u32 = 4;
/// Presumably the opcode that detaches a previously registered eventfd;
/// it is exported for callers but not referenced anywhere in this file.
pub const IORING_UNREGISTER_EVENTFD: u32 = 5;
/// Direction of an I/O request tracked by the queue.
///
/// The type is a plain tag, so it is cheap to copy and compare; `Debug` is
/// derived so values can appear in assertions and log output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestType {
    /// Data is read from the file into the request's buffer.
    Read,
    /// Data is written from the request's buffer to the file.
    Write,
}
// Bookkeeping for one request handed to the ring. A pointer to this
// structure is stored in the submission entry's `user_data` and read back
// from the completion entry, so it has to stay allocated until the request
// has completed.
#[repr(C)]
struct UringRequest {
// I/O vector passed to `io_uring_prep_readv` / `io_uring_prep_writev`.
iov: libc::iovec,
// File descriptor the request operates on; read back when the request is
// queued again after a short transfer.
fd: RawFd,
// File offset of the request; read back together with `fd`.
offset: i64,
// Address and length of the buffer supplied by the caller. Data is copied
// back to it on completion of a read that went through an aligned copy.
client_iov: libc::iovec,
// Opaque value supplied by the caller and returned by `get_completion`.
client_cookie: u64,
// True when `iov` points at a page-aligned buffer obtained from
// `posix_memalign` instead of the caller's buffer; that buffer must be freed.
self_allocated: bool,
// Whether this is a read or a write.
request_type: RequestType,
}
/// Errors reported by `UringQueue` operations.
///
/// Every variant carries the underlying `io::Error`, which is also exposed
/// through `std::error::Error::source`.
#[derive(Debug)]
pub enum Error {
    /// The io_uring instance could not be created.
    CantInitializeQueue(io::Error),
    /// The eventfd could not be registered with the ring.
    CantRegisterEventFd(io::Error),
    /// A page-aligned intermediate buffer could not be allocated.
    CantAllocateAlignedBuffer(io::Error),
    /// A request could not be submitted to the kernel.
    CantSubmitRequest(io::Error),
    /// The completion queue could not be inspected.
    CantCheckCompletionQueue(io::Error),
    /// A request completed with an error.
    IOError(io::Error),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (what, cause) = match self {
            Error::CantInitializeQueue(e) => ("can't initialize queue", e),
            Error::CantRegisterEventFd(e) => ("can't register eventfd", e),
            Error::CantAllocateAlignedBuffer(e) => ("can't allocate aligned buffer", e),
            Error::CantSubmitRequest(e) => ("can't submit request", e),
            Error::CantCheckCompletionQueue(e) => ("can't check completion queue", e),
            Error::IOError(e) => ("I/O error", e),
        };
        write!(f, "{}: {}", what, cause)
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::CantInitializeQueue(e)
            | Error::CantRegisterEventFd(e)
            | Error::CantAllocateAlignedBuffer(e)
            | Error::CantSubmitRequest(e)
            | Error::CantCheckCompletionQueue(e)
            | Error::IOError(e) => Some(e),
        }
    }
}
/// A queue of read and write requests backed by an io_uring instance.
///
/// Requests are queued with `prepare_read` / `prepare_write`, handed to the
/// kernel with `submit_requests` and collected with `get_completion`.
pub struct UringQueue {
// The liburing ring state, set up by `io_uring_queue_init`.
ring: io_uring,
// System page size from `sysconf(_SC_PAGESIZE)`; buffers whose address is
// not a multiple of it go through an aligned copy.
page_size: usize,
}
impl UringQueue {
pub fn new(queue_depth: u32) -> Result<Self, Error> {
let mut ring: io_uring = unsafe { mem::uninitialized() };
let ret = unsafe { io_uring_queue_init(queue_depth, &mut ring, 0) };
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };
if ret < 0 {
Err(Error::CantInitializeQueue(io::Error::from_raw_os_error(
ret,
)))
} else {
Ok(UringQueue { ring, page_size })
}
}
pub fn register_eventfd(&mut self, efd: RawFd) -> Result<(), Error> {
let ret = unsafe {
io_uring_register(
self.ring.ring_fd,
IORING_REGISTER_EVENTFD,
Cell::new(efd).as_ptr() as *mut i32 as *mut core::ffi::c_void,
1,
)
};
if ret < 0 {
Err(Error::CantRegisterEventFd(io::Error::from_raw_os_error(
ret,
)))
} else {
Ok(())
}
}
unsafe fn prepare_request(
&self,
fd: RawFd,
buf_ptr: usize,
buf_len: usize,
offset: i64,
cookie: u64,
request_type: RequestType,
) -> Result<*mut UringRequest, Error> {
let req_ptr = libc::malloc(mem::size_of::<UringRequest>());
let mut req = req_ptr as *mut UringRequest;
if buf_ptr % self.page_size == 0 {
(*req).iov.iov_base = buf_ptr as *mut core::ffi::c_void;
(*req).iov.iov_len = buf_len;
(*req).self_allocated = false;
} else {
let mut aligned_buf = mem::uninitialized();
let ret = libc::posix_memalign(&mut aligned_buf, self.page_size, buf_len);
if ret != 0 {
return Err(Error::CantAllocateAlignedBuffer(
io::Error::from_raw_os_error(ret),
));
}
if request_type == RequestType::Write {
libc::memcpy(aligned_buf, buf_ptr as *mut core::ffi::c_void, buf_len);
}
(*req).client_iov.iov_base = buf_ptr as *mut core::ffi::c_void;
(*req).client_iov.iov_len = buf_len;
(*req).iov.iov_base = aligned_buf;
(*req).iov.iov_len = buf_len;
(*req).fd = fd;
(*req).offset = offset;
(*req).self_allocated = true;
}
(*req).client_cookie = cookie;
(*req).request_type = request_type;
Ok(req)
}
pub fn prepare_read(
&mut self,
fd: RawFd,
buf: &mut [u8],
offset: i64,
cookie: u64,
) -> Result<(), Error> {
let req = unsafe {
self.prepare_request(
fd,
buf.as_ptr() as usize,
buf.len(),
offset,
cookie,
RequestType::Read,
)?
};
let sqe = unsafe { io_uring_get_sqe(&mut self.ring) };
if sqe.is_null() {
panic!("can't get sqe");
}
unsafe { io_uring_prep_readv(sqe, fd, &mut (*req).iov, 1, offset) };
unsafe { (*sqe).user_data = req as u64 };
Ok(())
}
pub fn prepare_write(
&mut self,
fd: RawFd,
buf: &[u8],
offset: i64,
cookie: u64,
) -> Result<(), Error> {
let req = unsafe {
self.prepare_request(
fd,
buf.as_ptr() as usize,
buf.len(),
offset,
cookie,
RequestType::Write,
)?
};
let sqe = unsafe { io_uring_get_sqe(&mut self.ring) };
if sqe.is_null() {
panic!("can't get sqe");
}
unsafe { io_uring_prep_writev(sqe, fd, &mut (*req).iov, 1, offset) };
unsafe { (*sqe).user_data = req as u64 };
Ok(())
}
pub fn submit_requests(&mut self) -> Result<(), Error> {
let ret = unsafe { io_uring_submit(&mut self.ring) };
if ret < 0 {
return Err(Error::CantSubmitRequest(io::Error::from_raw_os_error(ret)));
}
Ok(())
}
pub fn get_completion(&mut self, wait: bool) -> Result<Option<u64>, Error> {
let mut cqe: *mut io_uring_cqe = unsafe { std::mem::zeroed() };
let ret = if wait {
unsafe { io_uring_wait_cqe(&mut self.ring, &mut cqe) }
} else {
unsafe { io_uring_peek_cqe(&mut self.ring, &mut cqe) }
};
if ret < 0 {
return Err(Error::CantCheckCompletionQueue(
io::Error::from_raw_os_error(ret),
));
}
if cqe.is_null() {
return Ok(None);
}
let mut req = unsafe { &mut *((*cqe).user_data as *mut UringRequest) };
unsafe {
if (*cqe).res < 0 {
return Err(Error::IOError(io::Error::from_raw_os_error((*cqe).res)));
} else if ((*cqe).res as usize) < req.iov.iov_len {
req.iov.iov_base =
((req.iov.iov_base as usize) + (*cqe).res as usize) as *mut core::ffi::c_void;
req.iov.iov_len -= (*cqe).res as usize;
let sqe = io_uring_get_sqe(&mut self.ring);
if sqe.is_null() {
panic!("can't get sqe");
}
match req.request_type {
RequestType::Read => {
io_uring_prep_readv(sqe, req.fd, &mut req.iov, 1, req.offset)
}
RequestType::Write => {
io_uring_prep_writev(sqe, req.fd, &mut req.iov, 1, req.offset)
}
};
(*sqe).user_data = req as *const _ as u64;
if !wait {
return Ok(None);
} else {
return self.get_completion(wait);
}
}
}
if req.self_allocated {
if req.request_type == RequestType::Read {
unsafe {
libc::memcpy(
req.client_iov.iov_base,
req.iov.iov_base,
req.client_iov.iov_len,
)
};
}
unsafe { libc::free(req.iov.iov_base) };
}
unsafe { libc::free(req as *const _ as *mut core::ffi::c_void) };
unsafe { io_uring_cqe_seen(&mut self.ring, cqe) };
Ok(Some(req.client_cookie))
}
}
/// Tears the ring down when a `UringQueue` goes out of scope.
impl Drop for UringQueue {
fn drop(&mut self) {
// Only the ring itself is released here, through liburing's
// `io_uring_queue_exit`; requests whose completion was never retrieved
// with `get_completion` are not freed by this method.
unsafe { io_uring_queue_exit(&mut self.ring) };
}
}
#[cfg(test)]
mod tests {
    use crate::{Error, UringQueue};
    use std::fs::File;
    use std::io::Write;
    use std::os::unix::io::AsRawFd;
    use tempfile::tempfile;

    /// Creates an anonymous temporary file containing the line "Test file".
    fn prepare_test_file() -> Result<File, Error> {
        let mut file = tempfile().expect("failed to create temporary file");
        writeln!(file, "Test file").expect("failed to write test data");
        Ok(file)
    }

    #[test]
    fn create_queue() -> Result<(), Error> {
        let _queue = UringQueue::new(128)?;
        Ok(())
    }

    #[test]
    fn submit_read() -> Result<(), Error> {
        let mut queue = UringQueue::new(128)?;
        let mut buf = [0u8; 1];
        let file = prepare_test_file()?;
        queue.prepare_read(file.as_raw_fd(), &mut buf[..], 0, 0)?;
        queue.submit_requests()?;
        // Wait for the request so no I/O is in flight against `buf` when it
        // goes out of scope.
        queue.get_completion(true)?;
        Ok(())
    }

    #[test]
    fn submit_write() -> Result<(), Error> {
        let mut queue = UringQueue::new(128)?;
        let buf = [0u8; 1];
        let file = prepare_test_file()?;
        queue.prepare_write(file.as_raw_fd(), &buf[..], 0, 0)?;
        queue.submit_requests()?;
        // Wait for the request so no I/O is in flight against `buf` when it
        // goes out of scope.
        queue.get_completion(true)?;
        Ok(())
    }

    #[test]
    fn submit_read_and_wait_for_completion() -> Result<(), Error> {
        let cookie: u64 = 1234;
        let mut queue = UringQueue::new(128)?;
        let mut buf = [0u8; 1];
        let file = prepare_test_file()?;
        queue.prepare_read(file.as_raw_fd(), &mut buf[..], 0, cookie)?;
        queue.submit_requests()?;
        assert_eq!(queue.get_completion(true)?, Some(cookie));
        Ok(())
    }

    #[test]
    fn submit_read_and_peek_for_completion() -> Result<(), Error> {
        let cookie: u64 = 1234;
        let mut queue = UringQueue::new(128)?;
        let mut buf = [0u8; 1];
        let file = prepare_test_file()?;
        queue.prepare_read(file.as_raw_fd(), &mut buf[..], 0, cookie)?;
        queue.submit_requests()?;
        assert_eq!(queue.get_completion(true)?, Some(cookie));
        // The only request has been collected, so a peek finds nothing.
        assert_eq!(queue.get_completion(false)?, None);
        Ok(())
    }

    #[test]
    fn submit_write_and_check_contents() -> Result<(), Error> {
        let wcookie: u64 = 1234;
        let rcookie: u64 = 4321;
        let mut queue = UringQueue::new(128)?;
        let test_str = "write test";
        let file = prepare_test_file()?;

        queue.prepare_write(file.as_raw_fd(), test_str.as_bytes(), 0, wcookie)?;
        queue.submit_requests()?;
        assert_eq!(queue.get_completion(true)?, Some(wcookie));

        let mut buf = vec![0u8; test_str.len()];
        queue.prepare_read(file.as_raw_fd(), &mut buf[..], 0, rcookie)?;
        queue.submit_requests()?;
        assert_eq!(queue.get_completion(true)?, Some(rcookie));

        let result_str = std::str::from_utf8(&buf).expect("read back invalid UTF-8");
        assert_eq!(result_str, test_str);
        Ok(())
    }
}