#![cfg(target_os = "linux")]
#![allow(warnings)]
use std::cell::UnsafeCell;
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::time::Duration;
use crate::driver::{CompletionEntry, Driver, ERROR_TRANSPORT, Interest, SubmitEntry};
const MIN_IOURING_SIZE: u32 = 32;
const MAX_CQES: u32 = 256;
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct IoUringParams {
sq_entries: u32,
cq_entries: u32,
flags: u32,
_resv: [u32; 5],
sq_off: IoUringOffsets,
cq_off: IoUringOffsets,
}
#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
struct IoUringOffsets {
head: u32,
tail: u32,
ring_mask: u32,
ring_entries: u32,
flags: u32,
dropped: u32,
array: u32,
overflow: u32,
cqes: u32,
_resv: [u32; 2],
}
#[repr(C)]
#[derive(Clone, Copy)]
struct SubmissionQueueEntry {
opcode: u8,
flags: u8,
ioprio: u16,
fd: i32,
offset: u64,
addr: u64,
len: u32,
rw_flags: i32,
user_data: u64,
buf_index: u16,
personality: u16,
_spare: [u64; 3],
}
#[repr(C)]
#[derive(Clone, Copy)]
struct CompletionQueueEntry {
user_data: u64,
res: i32,
flags: u32,
_resv: [u64; 2],
}
struct SubmissionQueue {
head: *const u32,
tail: *const u32,
ring_mask: *const u32,
ring_entries: *const u32,
flags: *const u32,
array: *mut u32,
sqes: *mut SubmissionQueueEntry,
ring_mask_value: u32,
entries: u32,
}
unsafe impl Send for SubmissionQueue {}
unsafe impl Sync for SubmissionQueue {}
struct CompletionQueue {
head: *const u32,
tail: *const u32,
ring_mask: *const u32,
ring_entries: *const u32,
overflow: *const u32,
cqes: *const CompletionQueueEntry,
ring_mask_value: u32,
}
unsafe impl Send for CompletionQueue {}
unsafe impl Sync for CompletionQueue {}
struct IoUringState {
sq_head: AtomicU32,
sq_tail: AtomicU32,
cq_head: AtomicU32,
cq_tail: AtomicU32,
sq_len: AtomicUsize,
}
pub struct IoUringDriver {
ring_fd: RawFd,
sq_ring: *mut u8,
cq_ring: *mut u8,
sqes: *mut SubmissionQueueEntry,
sq: SubmissionQueue,
cq: CompletionQueue,
capacity: usize,
state: Arc<IoUringState>,
submit_queue: UnsafeCell<Vec<SubmitEntry>>,
completion_queue: UnsafeCell<Vec<Option<CompletionEntry>>>,
sq_ring_mmap_size: usize,
cq_ring_mmap_size: usize,
#[cfg(debug_assertions)]
owner_thread: std::thread::ThreadId,
}
unsafe impl Send for IoUringDriver {}
unsafe impl Sync for IoUringDriver {}
impl IoUringDriver {
#[inline]
#[cfg(debug_assertions)]
fn assert_owner(&self) {
assert_eq!(
std::thread::current().id(),
self.owner_thread,
"IoUringDriver accessed from wrong thread: thread-per-core violation"
);
}
#[inline]
#[cfg(not(debug_assertions))]
fn assert_owner(&self) {}
pub fn new() -> std::io::Result<Self> {
Self::with_config(crate::driver::DriverConfig::default())
}
pub fn with_config(config: crate::driver::DriverConfig) -> std::io::Result<Self> {
let entries = config.entries.max(MIN_IOURING_SIZE);
let mut params = IoUringParams {
sq_entries: entries,
cq_entries: entries,
flags: 0, ..Default::default()
};
let ring_fd = unsafe {
libc::syscall(
425, entries as libc::c_long,
&mut params as *mut _ as libc::c_long,
) as RawFd
};
if ring_fd < 0 {
return Err(std::io::Error::last_os_error());
}
let sq_ring_size = unsafe {
(params.sq_off.array as usize) + (params.sq_entries as usize) * 4
};
let cq_ring_size = unsafe {
(params.cq_off.array as usize) + (params.cq_entries as usize) * 16
};
let sqes_size = (params.sq_entries as usize) * size_of::<SubmissionQueueEntry>();
let sq_ring = unsafe {
libc::mmap(
std::ptr::null_mut(),
sq_ring_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED | libc::MAP_POPULATE,
ring_fd,
0, ) as *mut u8
};
if sq_ring as *mut libc::c_void == libc::MAP_FAILED {
unsafe { libc::close(ring_fd) };
return Err(std::io::Error::last_os_error());
}
let cq_ring = unsafe {
libc::mmap(
std::ptr::null_mut(),
cq_ring_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED | libc::MAP_POPULATE,
ring_fd,
sq_ring_size as libc::off_t, ) as *mut u8
};
if cq_ring as *mut libc::c_void == libc::MAP_FAILED {
unsafe {
libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
libc::close(ring_fd);
}
return Err(std::io::Error::last_os_error());
}
let sqes = unsafe {
libc::mmap(
std::ptr::null_mut(),
sqes_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED | libc::MAP_POPULATE,
ring_fd,
0x8000_0000_usize as libc::off_t, ) as *mut SubmissionQueueEntry
};
if sqes as *mut libc::c_void == libc::MAP_FAILED {
unsafe {
libc::munmap(sq_ring as *mut libc::c_void, sq_ring_size);
libc::munmap(cq_ring as *mut libc::c_void, cq_ring_size);
libc::close(ring_fd);
}
return Err(std::io::Error::last_os_error());
}
let sq = unsafe {
let sq_ptr = sq_ring as *const u8;
SubmissionQueue {
head: sq_ptr.add(params.sq_off.head as usize) as *const u32,
tail: sq_ptr.add(params.sq_off.tail as usize) as *const u32,
ring_mask: sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32,
ring_entries: sq_ptr.add(params.sq_off.ring_entries as usize) as *const u32,
flags: sq_ptr.add(params.sq_off.flags as usize) as *const u32,
array: sq_ptr.add(params.sq_off.array as usize) as *mut u32,
sqes: sqes as *mut SubmissionQueueEntry,
ring_mask_value: *(sq_ptr.add(params.sq_off.ring_mask as usize) as *const u32),
entries: params.sq_entries,
}
};
let cq = unsafe {
let cq_ptr = cq_ring as *const u8;
CompletionQueue {
head: cq_ptr.add(params.cq_off.head as usize) as *const u32,
tail: cq_ptr.add(params.cq_off.tail as usize) as *const u32,
ring_mask: cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32,
ring_entries: cq_ptr.add(params.cq_off.ring_entries as usize) as *const u32,
overflow: cq_ptr.add(params.cq_off.overflow as usize) as *const u32,
cqes: cq_ptr.add(params.cq_off.cqes as usize) as *const CompletionQueueEntry,
ring_mask_value: *(cq_ptr.add(params.cq_off.ring_mask as usize) as *const u32),
}
};
let capacity = entries as usize;
Ok(Self {
ring_fd,
sq_ring,
cq_ring,
sqes,
sq,
cq,
capacity,
state: Arc::new(IoUringState {
sq_head: AtomicU32::new(0),
sq_tail: AtomicU32::new(0),
cq_head: AtomicU32::new(0),
cq_tail: AtomicU32::new(0),
sq_len: AtomicUsize::new(0),
}),
submit_queue: UnsafeCell::new(vec![SubmitEntry::new(-1, 0, 0); capacity]),
completion_queue: UnsafeCell::new(vec![None; capacity]),
sq_ring_mmap_size: sq_ring_size,
cq_ring_mmap_size: cq_ring_size,
#[cfg(debug_assertions)]
owner_thread: std::thread::current().id(),
})
}
#[inline]
fn sq_pos(&self, index: u32) -> u32 {
index & self.sq.ring_mask_value
}
#[inline]
fn cq_pos(&self, index: u32) -> u32 {
index & self.cq.ring_mask_value
}
fn submit_to_kernel(&self) -> std::io::Result<usize> {
let head = unsafe { *self.sq.head };
let tail = self.state.sq_tail.load(Ordering::Acquire);
let to_submit = tail - head;
if to_submit == 0 {
return Ok(0);
}
unsafe {
*(self.sq.tail as *mut u32) = tail;
}
let result = unsafe {
libc::syscall(
426, self.ring_fd as libc::c_long,
to_submit as libc::c_long,
0, 1, std::ptr::null_mut::<libc::sigset_t>(),
) as libc::c_long
};
if result < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(result as usize)
}
}
fn get_free_sqe(&self) -> Option<*mut SubmissionQueueEntry> {
let head = unsafe { *self.sq.head };
let tail = self.state.sq_tail.load(Ordering::Acquire);
let next_tail = tail + 1;
if next_tail - head >= self.sq.entries {
return None;
}
let index = self.sq_pos(tail);
unsafe { Some(self.sq.sqes.add(index as usize)) }
}
}
impl Drop for IoUringDriver {
    /// Unmaps the SQ ring, CQ ring and SQE array, then closes the ring fd.
    fn drop(&mut self) {
        // The SQE array was mapped with the kernel-reported `sq_entries`
        // (stored in `sq.entries`). That value can exceed `capacity` because
        // the kernel rounds the requested size up to a power of two.
        let sqes_size = self.sq.entries as usize * size_of::<SubmissionQueueEntry>();
        // SAFETY: each pointer/length pair is exactly what `with_config`
        // mapped, and nothing uses the mappings after drop.
        unsafe {
            libc::munmap(self.sq_ring as *mut libc::c_void, self.sq_ring_mmap_size);
            libc::munmap(self.cq_ring as *mut libc::c_void, self.cq_ring_mmap_size);
            libc::munmap(self.sqes as *mut libc::c_void, sqes_size);
            libc::close(self.ring_fd);
        }
    }
}
impl AsRawFd for IoUringDriver {
    /// Exposes the io_uring instance descriptor. Ownership stays with the
    /// driver, which closes the descriptor when dropped.
    fn as_raw_fd(&self) -> RawFd {
        let Self { ring_fd, .. } = self;
        *ring_fd
    }
}
impl Driver for IoUringDriver {
fn submit(&self) -> std::io::Result<usize> {
let mut submitted = 0;
self.assert_owner();
let len = self.state.sq_len.load(Ordering::Acquire);
for i in 0..len {
let submit_queue = unsafe { &*self.submit_queue.get() };
let entry = &submit_queue[i];
if entry.fd >= 0 {
if let Some(sqe) = self.get_free_sqe() {
unsafe {
(*sqe).opcode = entry.opcode;
(*sqe).flags = 0;
(*sqe).ioprio = 0;
(*sqe).fd = entry.fd;
(*sqe).offset = entry.offset as u64;
(*sqe).addr = entry.buf_ptr.map_or(0, |p| p.as_ptr() as u64);
(*sqe).len = entry.buf_len;
(*sqe).rw_flags = 0;
(*sqe).user_data = entry.user_data;
(*sqe).buf_index = 0;
(*sqe).personality = 0;
let tail = self.state.sq_tail.load(Ordering::Acquire);
let index = self.sq_pos(tail);
*self.sq.array.add(index as usize) = index;
self.state.sq_tail.store(tail + 1, Ordering::Release);
}
submitted += 1;
}
}
}
self.state.sq_len.store(0, Ordering::Release);
let _kernel_submitted = self.submit_to_kernel()?;
Ok(submitted)
}
fn wait(&self) -> std::io::Result<usize> {
self.wait_timeout(Duration::from_secs(1)).map(|(n, _)| n)
}
fn wait_timeout(&self, duration: Duration) -> std::io::Result<(usize, bool)> {
let ts = libc::timespec {
tv_sec: duration.as_secs() as libc::time_t,
tv_nsec: duration.subsec_nanos() as libc::c_long,
};
let result = unsafe {
libc::syscall(
426, self.ring_fd as libc::c_long,
0, 1, 2, &ts as *const _ as *const libc::sigset_t,
) as libc::c_long
};
if result < 0 {
return Err(std::io::Error::last_os_error());
}
self.assert_owner();
let mut completed = 0;
let head = self.state.cq_head.load(Ordering::Acquire);
let tail = unsafe { *self.cq.tail };
while head != tail {
let index = self.cq_pos(head);
let cqe = unsafe { &*self.cq.cqes.add(index as usize) };
unsafe {
let completion_queue = &mut *self.completion_queue.get();
let pos = self.state.cq_tail.load(Ordering::Acquire) as usize % self.capacity;
completion_queue[pos] = Some(CompletionEntry {
user_data: (*cqe).user_data,
result: if (*cqe).res < 0 {
ERROR_TRANSPORT
} else {
(*cqe).res
},
flags: (*cqe).flags,
});
self.state.cq_tail.fetch_add(1, Ordering::Release);
}
completed += 1;
unsafe {
*(self.cq.head as *mut u32) = head + 1;
}
}
self.state.cq_head.store(tail, Ordering::Release);
let timed_out = completed == 0;
Ok((completed, timed_out))
}
fn get_submission(&self) -> Option<&mut SubmitEntry> {
self.assert_owner();
let len = self.state.sq_len.load(Ordering::Acquire);
if len >= self.capacity {
return None;
}
self.state.sq_len.fetch_add(1, Ordering::Release);
unsafe {
let submit_queue = &mut *self.submit_queue.get();
Some(&mut submit_queue[len])
}
}
fn get_completion(&self) -> Option<&CompletionEntry> {
self.assert_owner();
let head = self.state.cq_head.load(Ordering::Acquire);
let tail = self.state.cq_tail.load(Ordering::Acquire);
if head == tail {
return None;
}
unsafe {
let completion_queue = &*self.completion_queue.get();
let pos = head as usize % self.capacity;
completion_queue[pos].as_ref()
}
}
fn advance_completion(&self) {
self.assert_owner();
let head = self.state.cq_head.load(Ordering::Acquire);
let tail = self.state.cq_tail.load(Ordering::Acquire);
if head != tail {
unsafe {
let completion_queue = &mut *self.completion_queue.get();
let pos = head as usize % self.capacity;
completion_queue[pos] = None;
}
self.state.cq_head.fetch_add(1, Ordering::Release);
}
}
fn register(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
let mut events = 0i16;
if interest.readable {
events |= libc::POLLIN as i16;
}
if interest.writable {
events |= libc::POLLOUT as i16;
}
let sqe = self.get_free_sqe().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
})?;
unsafe {
(*sqe).opcode = 6; (*sqe).fd = fd;
(*sqe).addr = events as u64;
(*sqe).len = 0;
(*sqe).user_data = fd as u64;
let tail = self.state.sq_tail.load(Ordering::Acquire);
let index = self.sq_pos(tail);
*self.sq.array.add(index as usize) = index;
self.state.sq_tail.store(tail + 1, Ordering::Release);
}
Ok(())
}
fn deregister(&self, fd: RawFd) -> std::io::Result<()> {
let sqe = self.get_free_sqe().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::WouldBlock, "Submission queue full")
})?;
unsafe {
(*sqe).opcode = 7; (*sqe).fd = -1;
(*sqe).addr = fd as u64;
(*sqe).user_data = fd as u64;
let tail = self.state.sq_tail.load(Ordering::Acquire);
let index = self.sq_pos(tail);
*self.sq.array.add(index as usize) = index;
self.state.sq_tail.store(tail + 1, Ordering::Release);
}
Ok(())
}
fn modify(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
self.deregister(fd)?;
self.register(fd, interest)
}
fn submission_capacity(&self) -> usize {
self.capacity
}
fn completion_capacity(&self) -> usize {
self.capacity
}
fn supports_operation(&self, opcode: u8) -> bool {
matches!(
opcode,
crate::driver::opcode::READ
| crate::driver::opcode::WRITE
| crate::driver::opcode::FSYNC
| crate::driver::opcode::CLOSE
)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Ring creation must either succeed or fail with an OS error. io_uring
    /// may be unavailable, e.g. blocked by a sandbox seccomp profile. It must
    /// never panic.
    #[test]
    fn test_iouring_driver_creation() {
        match IoUringDriver::new() {
            Ok(driver) => {
                assert!(driver.ring_fd >= 0);
                assert!(driver.capacity >= MIN_IOURING_SIZE as usize);
            }
            Err(err) => assert!(err.raw_os_error().is_some(), "unexpected error: {err}"),
        }
    }

    /// `struct io_uring_params` is 120 bytes: ten u32 header fields followed
    /// by the 40-byte SQ and CQ offset structs.
    #[test]
    fn test_iouring_params_size() {
        assert_eq!(size_of::<IoUringParams>(), 120);
        assert_eq!(std::mem::offset_of!(IoUringParams, sq_off), 40);
        assert_eq!(std::mem::offset_of!(IoUringParams, cq_off), 80);
    }

    /// `struct io_uring_sqe` is 64 bytes; spot-check fields the driver writes.
    #[test]
    fn test_submission_queue_entry_size() {
        assert_eq!(size_of::<SubmissionQueueEntry>(), 64);
        assert_eq!(std::mem::offset_of!(SubmissionQueueEntry, addr), 16);
        assert_eq!(std::mem::offset_of!(SubmissionQueueEntry, rw_flags), 28);
        assert_eq!(std::mem::offset_of!(SubmissionQueueEntry, user_data), 32);
    }

    /// `struct io_uring_cqe` (without `IORING_SETUP_CQE32`) is 16 bytes.
    #[test]
    fn test_completion_queue_entry_size() {
        assert_eq!(size_of::<CompletionQueueEntry>(), 16);
        assert_eq!(std::mem::offset_of!(CompletionQueueEntry, res), 8);
        assert_eq!(std::mem::offset_of!(CompletionQueueEntry, flags), 12);
    }
}