use std::{
collections::{HashMap, VecDeque},
io,
mem::ManuallyDrop,
os,
path::Path,
ptr::{self, NonNull},
};
use io_uring::{IoUring, opcode, types};
use crate::{
Completions, DirEntry, FstatHandle, Io, IoBuf, IoBufMut, OpHandle, OpenOptions, ReadHandle,
Statx, WriteHandle,
};
/// Builds the error returned when an entry cannot be pushed because the
/// submission queue has no free slot.
///
/// The error kind is `WouldBlock`, so callers can distinguish "try again
/// later" from a hard failure.
fn sq_full_error() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    io::Error::new(kind, "submission queue is full")
}
/// A pool of equally sized buffers carved out of one contiguous `mmap`
/// region. Buffers are handed out by index and returned to a free list.
struct FixedBufPool {
/// Start address of the mapped region; buffer `i` begins at
/// `base + i * buf_size`.
base: NonNull<u8>,
/// Size in bytes of every buffer in the pool.
buf_size: usize,
/// Number of buffers the region was sized for.
count: u16,
/// Indices of buffers that are currently available to `acquire`.
free: VecDeque<u16>,
}
impl FixedBufPool {
    /// Maps one anonymous, private, read/write region large enough to hold
    /// `count` buffers of `buf_size` bytes each and marks every buffer free.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `buf_size * count` does not fit in `usize`
    /// (previously this wrapped silently in release builds, producing a
    /// mapping smaller than the buffers later carved out of it). Otherwise
    /// returns the OS error reported by `mmap` when the mapping fails.
    pub fn new(count: u16, buf_size: usize) -> io::Result<Self> {
        let total = buf_size.checked_mul(usize::from(count)).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "fixed buffer pool size overflows usize",
            )
        })?;
        // SAFETY: a null hint with MAP_ANONYMOUS | MAP_PRIVATE asks the kernel
        // for a fresh mapping; no existing memory is touched. The result is
        // checked against MAP_FAILED before it is used.
        let ptr = unsafe {
            libc::mmap(
                ptr::null_mut(),
                total,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_ANONYMOUS | libc::MAP_PRIVATE,
                -1,
                0,
            )
        };
        if ptr == libc::MAP_FAILED {
            return Err(io::Error::last_os_error());
        }
        let base = NonNull::new(ptr as *mut u8).expect("mmap succeeded");
        // Every buffer starts out free, in ascending index order.
        let free = (0..count).collect();
        Ok(Self {
            base,
            buf_size,
            count,
            free,
        })
    }

    /// Returns the pool index of the buffer that starts exactly at `ptr`, or
    /// `None` if `ptr` lies outside the pool or is not on a buffer boundary.
    fn buf_index(&self, ptr: *const u8) -> Option<u16> {
        let base = self.base.as_ptr() as usize;
        // A pointer below `base` wraps to a huge offset and is rejected by the
        // range check, so no separate lower-bound test is needed.
        let offset = (ptr as usize).wrapping_sub(base);
        let pool_len = self.buf_size * usize::from(self.count);
        if offset < pool_len && offset % self.buf_size == 0 {
            // offset < buf_size * count, so the quotient is < count and fits u16.
            Some((offset / self.buf_size) as u16)
        } else {
            None
        }
    }

    /// Takes the next free buffer from the front of the free list, or returns
    /// `None` when every buffer is in use. The returned buffer reports zero
    /// initialized bytes.
    fn acquire(&mut self) -> Option<FixedBuf> {
        let index = self.free.pop_front()?;
        let offset = self.buf_size * usize::from(index);
        // SAFETY: `index` came from the free list, which only ever holds
        // values below `count`, so `offset` stays inside the mapped region.
        let ptr = unsafe { self.base.add(offset) };
        Some(FixedBuf {
            ptr,
            buf_index: index,
            capacity: self.buf_size,
            len: 0,
        })
    }

    /// Describes every buffer in the pool as an `iovec`, in index order, so
    /// that position `i` in the returned vector corresponds to buffer `i`.
    fn iovecs(&self) -> Vec<libc::iovec> {
        (0..self.count)
            .map(|i| {
                let offset = self.buf_size * usize::from(i);
                // SAFETY: `i < count`, so `offset` is within the mapped region.
                let start = unsafe { self.base.as_ptr().add(offset) };
                libc::iovec {
                    iov_base: start.cast(),
                    iov_len: self.buf_size,
                }
            })
            .collect()
    }
}
impl Drop for FixedBufPool {
    /// Unmaps the pool's backing region. A failing `munmap` is logged rather
    /// than propagated, since `drop` cannot return an error.
    fn drop(&mut self) {
        let mapped_len = self.buf_size * self.count as usize;
        let region = self.base.as_ptr().cast::<libc::c_void>();
        // SAFETY: `region` and `mapped_len` are the address and length this
        // pool computes for its mapping; the pool is being destroyed, so the
        // region is not handed out again afterwards.
        if unsafe { libc::munmap(region, mapped_len) } == 0 {
            return;
        }
        error!(
            "munmap failed for fixed buffer pool: {}",
            io::Error::last_os_error()
        );
    }
}
/// One buffer borrowed from a `FixedBufPool`, identified by its index in
/// that pool.
pub struct FixedBuf {
/// Address of the first byte of this buffer inside the pool's region.
ptr: NonNull<u8>,
/// Index of this buffer within the pool; pushed back onto the pool's free
/// list when the buffer is released.
buf_index: u16,
/// Total size of the buffer in bytes.
capacity: usize,
/// Number of bytes reported as initialized via `bytes_init`.
len: usize,
}
unsafe impl IoBuf for FixedBuf {
    /// Address of the first byte of the buffer.
    fn stable_ptr(&self) -> *const u8 {
        self.ptr.as_ptr().cast_const()
    }

    /// Number of bytes currently marked as initialized.
    fn bytes_init(&self) -> usize {
        self.len
    }

    /// Full capacity of the buffer in bytes.
    fn bytes_total(&self) -> usize {
        self.capacity
    }
}
unsafe impl IoBufMut for FixedBuf {
    /// Writable address of the first byte of the buffer.
    fn stable_mut_ptr(&mut self) -> *mut u8 {
        self.ptr.as_ptr()
    }

    /// Records `pos` as the number of initialized bytes.
    ///
    /// # Safety
    ///
    /// No bounds or initialization check is performed here.
    /// NOTE(review): presumably the caller must guarantee that the first
    /// `pos` bytes are initialized and `pos <= capacity` — confirm against
    /// the `IoBufMut` trait contract.
    unsafe fn set_init(&mut self, pos: usize) {
        self.len = pos;
    }
}
impl Statx for libc::statx {
    /// Exposes the `stx_size` field of the raw `statx` record.
    fn stx_size(&self) -> u64 {
        self.stx_size
    }
}
/// Size in bytes of each buffer in the fixed buffer pool; also the value
/// reported by `block_size`.
const BLOCK_SIZE: usize = 4096;
/// Construction parameters for `LinuxIo`.
pub struct LinuxIoConfig {
/// Entry count passed to the io_uring builder when the ring is created.
pub entries: u32,
/// Number of `BLOCK_SIZE` buffers to allocate and register with the ring.
pub buf_count: u16,
}
impl Default for LinuxIoConfig {
fn default() -> Self {
Self {
entries: 256,
buf_count: 256,
}
}
}
/// io_uring-backed implementation of the `Io` trait.
pub struct LinuxIo {
/// The ring itself. Wrapped in `ManuallyDrop` so the `Drop` impl can drop
/// it explicitly before the buffer pool.
ring: ManuallyDrop<IoUring>,
/// C strings referenced by path-based operations, keyed by operation
/// handle; an entry is removed when that handle's completion is reaped in
/// `poll`.
pending_paths: HashMap<OpHandle, Vec<std::ffi::CString>>,
/// Completed operations collected by `poll`, as (handle, result) pairs.
completions: Completions,
/// Pool of buffers registered with the ring. Wrapped in `ManuallyDrop` so
/// it is dropped explicitly after the ring.
pool: ManuallyDrop<FixedBufPool>,
/// Count of entries pushed to the ring whose completions have not yet been
/// reaped by `poll`.
in_flight: usize,
}
impl LinuxIo {
pub fn new(config: LinuxIoConfig) -> io::Result<Self> {
let ring = IoUring::builder()
.setup_sqpoll(2000) .build(config.entries)?;
let pool = FixedBufPool::new(config.buf_count, BLOCK_SIZE)?;
let iovecs = pool.iovecs();
unsafe { ring.submitter().register_buffers(&iovecs)? };
Ok(Self {
ring: ManuallyDrop::new(ring),
pending_paths: HashMap::new(),
completions: Vec::new(),
pool: ManuallyDrop::new(pool),
in_flight: 0,
})
}
}
impl Drop for LinuxIo {
    /// Tears down the ring first and the buffer pool second, so the ring is
    /// gone before the memory registered with it is unmapped.
    fn drop(&mut self) {
        // SAFETY: each field is dropped exactly once, here, and neither is
        // touched again afterwards.
        unsafe {
            ManuallyDrop::drop(&mut self.ring);
            ManuallyDrop::drop(&mut self.pool);
        }
    }
}
impl Io for LinuxIo {
    /// Size in bytes of one I/O block (`BLOCK_SIZE`).
    fn block_size(&self) -> usize {
        BLOCK_SIZE
    }

    /// Wall-clock time elapsed since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the Unix epoch.
    fn now(&self) -> std::time::Duration {
        // Fully qualified: the file's `use` block does not import these names.
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("time does not go backwards")
    }

    type Fd = os::unix::io::RawFd;

    /// Reinterprets a successful completion result as a raw file descriptor.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `result` must be the result of a completed
    /// open operation — confirm against the `Io` trait contract.
    unsafe fn into_fd(result: u32) -> Self::Fd {
        result as os::unix::io::RawFd
    }

    type RegisteredBuf = FixedBuf;

    /// Takes a free buffer from the registered pool, or `None` if all are in use.
    fn acquire_buf(&mut self) -> Option<Self::RegisteredBuf> {
        self.pool.acquire()
    }

    /// Returns `buf` to the pool by appending its index to the free list.
    fn release_buf(&mut self, buf: Self::RegisteredBuf) {
        self.pool.free.push_back(buf.buf_index);
    }

    type Statx = libc::statx;

    /// Queues a `statx` request for `fd` that asks only for the file size.
    ///
    /// The output record is heap-allocated so its address stays fixed while
    /// the request is pending; ownership of it moves into the returned handle.
    ///
    /// # Errors
    ///
    /// Returns a `WouldBlock` error if the submission queue is full.
    fn fstat(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<FstatHandle<Self::Statx>> {
        // SAFETY: `libc::statx` is a plain C record for which all-zero bytes
        // are a valid value.
        let mut statx = Box::new(unsafe { std::mem::zeroed::<libc::statx>() });
        // An empty path combined with AT_EMPTY_PATH makes `fd` itself the target.
        let entry = opcode::Statx::new(
            types::Fd(fd),
            b"\0".as_ptr() as _,
            std::ptr::from_mut(statx.as_mut()) as *mut _,
        )
        .flags(libc::AT_EMPTY_PATH)
        .mask(libc::STATX_SIZE)
        .build()
        .user_data(handle.0);
        // SAFETY: the path is a static literal and the output record lives in
        // the box that is handed to the caller on success.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        Ok(FstatHandle::new(statx))
    }

    /// Queues an `openat` of `path` relative to the current working directory,
    /// using the flags derived from `opts` and a creation mode of `0o644`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `path` contains an interior NUL byte, or a
    /// `WouldBlock` error if the submission queue is full.
    fn open(&mut self, path: &Path, opts: OpenOptions, handle: OpHandle) -> io::Result<()> {
        let flags = opts.to_libc_flags();
        let mode = 0o644u32;
        let path_c = path_to_cstring(path)?;
        let entry = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path_c.as_ptr())
            .flags(flags)
            .mode(mode)
            .build()
            .user_data(handle.0);
        // SAFETY: `path_c` is stored in `pending_paths` below, which keeps the
        // string alive until the completion for `handle` is reaped.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        self.pending_paths.insert(handle, vec![path_c]);
        Ok(())
    }

    /// Queues a close of `fd`.
    ///
    /// # Errors
    ///
    /// Returns a `WouldBlock` error if the submission queue is full.
    fn close(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()> {
        let entry = opcode::Close::new(types::Fd(fd))
            .build()
            .user_data(handle.0);
        // SAFETY: the entry references no memory.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        Ok(())
    }

    /// Queues a read of up to `buf.bytes_total()` bytes from `fd` at `offset`.
    ///
    /// If `buf` starts exactly at one of the pool's registered buffers the
    /// fixed-buffer opcode is used; otherwise a plain read is queued. Lengths
    /// larger than `u32::MAX` are clamped rather than truncated.
    ///
    /// # Errors
    ///
    /// If the submission queue is full, returns a `WouldBlock` error together
    /// with `buf` so the caller keeps ownership.
    fn read_at<B: IoBufMut>(
        &mut self,
        fd: Self::Fd,
        mut buf: B,
        offset: u64,
        handle: OpHandle,
    ) -> Result<ReadHandle<B>, (std::io::Error, B)> {
        let len = sqe_len(buf.bytes_total());
        let entry = match self.pool.buf_index(buf.stable_mut_ptr()) {
            Some(idx) => opcode::ReadFixed::new(types::Fd(fd), buf.stable_mut_ptr(), len, idx)
                .offset(offset)
                .build(),
            None => opcode::Read::new(types::Fd(fd), buf.stable_mut_ptr(), len)
                .offset(offset)
                .build(),
        }
        .user_data(handle.0);
        // SAFETY: `buf` is moved into the returned handle on success, so the
        // memory the entry points at stays owned for as long as that handle does.
        unsafe {
            if self.ring.submission().push(&entry).is_err() {
                return Err((sq_full_error(), buf));
            }
        }
        self.in_flight += 1;
        Ok(ReadHandle::new(buf))
    }

    /// Queues a write of the first `buf.bytes_init()` bytes of `buf` to `fd`
    /// at `offset`.
    ///
    /// If `buf` starts exactly at one of the pool's registered buffers the
    /// fixed-buffer opcode is used; otherwise a plain write is queued. Lengths
    /// larger than `u32::MAX` are clamped rather than truncated.
    ///
    /// # Errors
    ///
    /// If the submission queue is full, returns a `WouldBlock` error together
    /// with `buf` so the caller keeps ownership.
    fn write_at<B: IoBuf>(
        &mut self,
        fd: Self::Fd,
        buf: B,
        offset: u64,
        handle: OpHandle,
    ) -> Result<WriteHandle<B>, (std::io::Error, B)> {
        let len = sqe_len(buf.bytes_init());
        let entry = match self.pool.buf_index(buf.stable_ptr()) {
            Some(idx) => opcode::WriteFixed::new(types::Fd(fd), buf.stable_ptr(), len, idx)
                .offset(offset)
                .build(),
            None => opcode::Write::new(types::Fd(fd), buf.stable_ptr(), len)
                .offset(offset)
                .build(),
        }
        .user_data(handle.0);
        // SAFETY: `buf` is moved into the returned handle on success, so the
        // memory the entry points at stays owned for as long as that handle does.
        unsafe {
            if self.ring.submission().push(&entry).is_err() {
                return Err((sq_full_error(), buf));
            }
        }
        self.in_flight += 1;
        Ok(WriteHandle::new(buf))
    }

    /// Queues an fsync of `fd`.
    ///
    /// # Errors
    ///
    /// Returns a `WouldBlock` error if the submission queue is full.
    fn fsync(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()> {
        let entry = opcode::Fsync::new(types::Fd(fd))
            .build()
            .user_data(handle.0);
        // SAFETY: the entry references no memory.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        Ok(())
    }

    /// Queues a rename of `from` to `to`, both resolved relative to the
    /// current working directory.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if either path contains an interior NUL byte, or
    /// a `WouldBlock` error if the submission queue is full.
    fn rename(&mut self, from: &Path, to: &Path, handle: OpHandle) -> io::Result<()> {
        let from_c = path_to_cstring(from)?;
        let to_c = path_to_cstring(to)?;
        let entry = opcode::RenameAt::new(
            types::Fd(libc::AT_FDCWD),
            from_c.as_ptr(),
            types::Fd(libc::AT_FDCWD),
            to_c.as_ptr(),
        )
        .build()
        .user_data(handle.0);
        // SAFETY: both strings are stored in `pending_paths` below, which
        // keeps them alive until the completion for `handle` is reaped.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        self.pending_paths.insert(handle, vec![from_c, to_c]);
        Ok(())
    }

    /// Queues an unlink of `path`, resolved relative to the current working
    /// directory.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `path` contains an interior NUL byte, or a
    /// `WouldBlock` error if the submission queue is full.
    fn remove(&mut self, path: &Path, handle: OpHandle) -> io::Result<()> {
        let path_c = path_to_cstring(path)?;
        let entry = opcode::UnlinkAt::new(types::Fd(libc::AT_FDCWD), path_c.as_ptr())
            .build()
            .user_data(handle.0);
        // SAFETY: `path_c` is stored in `pending_paths` below, which keeps the
        // string alive until the completion for `handle` is reaped.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| sq_full_error())?;
        }
        self.in_flight += 1;
        self.pending_paths.insert(handle, vec![path_c]);
        Ok(())
    }

    /// Submits queued entries without waiting, then moves every available
    /// completion into `completions` as a `(handle, result)` pair.
    ///
    /// A negative completion result is converted into an `io::Error` from the
    /// negated value; a non-negative one is passed through as `u32`. Any path
    /// strings held for the handle are released.
    fn poll(&mut self) -> io::Result<()> {
        self.ring.submission().sync();
        self.ring.submit_and_wait(0)?;
        let mut cq = self.ring.completion();
        cq.sync();
        self.in_flight -= cq.len();
        for cqe in cq {
            let handle = OpHandle(cqe.user_data());
            let code = cqe.result();
            let result = if code < 0 {
                Err(io::Error::from_raw_os_error(-code))
            } else {
                Ok(code as u32)
            };
            // The operation is finished, so its path strings can be freed.
            self.pending_paths.remove(&handle);
            self.completions.push((handle, result));
        }
        Ok(())
    }

    /// Number of pushed operations whose completions have not been reaped yet.
    fn in_flight(&self) -> usize {
        self.in_flight
    }

    /// Submits queued entries and waits for at least one completion.
    ///
    /// NOTE(review): this appears to block indefinitely when nothing is in
    /// flight — confirm callers only park with pending work.
    fn park(&mut self) -> io::Result<()> {
        self.ring.submission().sync();
        self.ring.submit_and_wait(1).map(|_| ())
    }

    /// Mutable access to the completions gathered by `poll`.
    fn completions(&mut self) -> &mut Completions {
        &mut self.completions
    }

    /// Synchronously lists the entries of `path`, recording for each its full
    /// path and whether its metadata reports a directory.
    fn list_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
        let mut results = Vec::new();
        for entry in std::fs::read_dir(path)? {
            let entry = entry?;
            let metadata = entry.metadata()?;
            results.push(DirEntry {
                path: entry.path(),
                is_dir: metadata.is_dir(),
            });
        }
        Ok(results)
    }

    /// Synchronously creates `path` and any missing parent directories.
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        std::fs::create_dir_all(path)
    }
}

/// Converts `path` into a NUL-terminated C string, reporting an interior NUL
/// byte as `InvalidInput` instead of panicking.
fn path_to_cstring(path: &Path) -> io::Result<std::ffi::CString> {
    std::ffi::CString::new(path.as_os_str().as_encoded_bytes())
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
}

/// Narrows a buffer length to the `u32` length of a submission entry,
/// clamping to `u32::MAX` so oversized lengths never wrap to a small value.
fn sqe_len(len: usize) -> u32 {
    u32::try_from(len).unwrap_or(u32::MAX)
}