use std::collections::hash_map::{self, HashMap};
use std::io::{Error, Result};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use io_uring::{opcode, types, IoUring};
use super::{Operation, Request};
use crate::channel::parallel;
use crate::channel::prelude::*;
use crate::net::Registry;
impl Request {
pub fn uring_entry(&self) -> io_uring::squeue::Entry {
let entry = match self.operation {
Operation::Open { path, flags, mode } => {
opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path).flags(flags).mode(mode as libc::mode_t).build()
},
Operation::Read { fd, buf, len, offset } => {
opcode::Read::new(types::Fd(fd), buf, len as u32).offset(offset as u64).build()
},
Operation::Write { fd, buf, len, offset } => {
opcode::Write::new(types::Fd(fd), buf, len as u32).offset(offset as u64).build()
},
Operation::Fsync { fd, only_data } => {
let flags = if only_data { types::FsyncFlags::DATASYNC } else { types::FsyncFlags::empty() };
opcode::Fsync::new(types::Fd(fd)).flags(flags).build()
},
Operation::Stat { fd, path, metadata } => {
let stat = unsafe { &(*metadata).stat as *const _ as *mut _ };
opcode::Statx::new(types::Fd(fd), path, stat)
.flags(libc::AT_SYMLINK_NOFOLLOW | libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH)
.mask(libc::STATX_ALL)
.build()
},
Operation::Truncate { .. } => unreachable!("no truncate support on linux uring"),
Operation::Cancel { user_data } => opcode::AsyncCancel::new(user_data as u64).build(),
};
entry.user_data(self.result.user_data() as u64)
}
}
/// Bookkeeping for requests handed to the uring backend.
struct State {
/// Submission entries built by `push` that have not yet been copied into the
/// ring's submission queue; `Uring::submit` pushes and then clears them.
entries: Vec<io_uring::squeue::Entry>,
/// Requests awaiting a result, keyed by `Request::result.user_data()`.
/// An entry is removed when its completion is consumed or when it is
/// cancelled before reaching the ring.
requests: HashMap<usize, Request>,
}
impl State {
    /// Creates empty bookkeeping with room for 1024 staged entries and 4096 requests.
    fn new() -> Self {
        let entries = Vec::with_capacity(1024);
        let requests = HashMap::with_capacity(4096);
        Self { entries, requests }
    }

    /// Stages `request` for submission.
    ///
    /// A cancel request is resolved locally whenever possible:
    /// * if its target is not tracked, the cancel is woken with `Ok(0)`;
    /// * if its target is still only staged (not yet pushed to the ring), the staged
    ///   entry is dropped, the target is woken with `ECANCELED` and the cancel with
    ///   `Ok(0)`.
    ///
    /// Otherwise the cancel is staged like any other request. Every staged request is
    /// recorded under its own `user_data`.
    fn push(&mut self, request: Request) {
        if let Operation::Cancel { user_data } = request.operation {
            let hash_map::Entry::Occupied(pending) = self.requests.entry(user_data) else {
                // Nothing tracked under that key, so there is nothing to cancel.
                request.result.wake(Ok(0));
                return;
            };
            let target = user_data as u64;
            let staged_at = self.entries.iter().position(|sqe| sqe.get_user_data() == target);
            if let Some(index) = staged_at {
                // The target never reached the ring: cancel it without a kernel round trip.
                request.result.wake(Ok(0));
                self.entries.swap_remove(index);
                let cancelled = pending.remove();
                cancelled.result.wake(Err(Error::from_raw_os_error(libc::ECANCELED)));
                return;
            }
        }
        let sqe = request.uring_entry();
        let key = request.result.user_data();
        self.requests.insert(key, request);
        self.entries.push(sqe);
    }
}
/// io_uring-backed executor for `Request`s.
pub struct Uring {
/// Staged submission entries and the requests awaiting a result.
state: State,
/// The underlying io_uring instance.
uring: IoUring,
/// Eventfd registered with the ring in `start` and with the `Registry` as a
/// reader in `new`.
eventfd: OwnedFd,
}
impl Uring {
    /// Creates the ring together with the eventfd used for completion notification.
    ///
    /// The eventfd is registered with `registry` as a reader; the receiver returned by
    /// that registration is handed back to the caller. The ring is created disabled
    /// (`setup_r_disabled`) and must be enabled with [`Uring::start`].
    ///
    /// # Errors
    ///
    /// Returns the OS error if the eventfd cannot be created, or any error from the
    /// registry registration or the ring construction.
    pub fn new(registry: &Registry) -> Result<(Self, parallel::Receiver<()>)> {
        // SAFETY: `eventfd` takes plain integer arguments and has no memory-safety
        // preconditions.
        let raw = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
        if raw < 0 {
            // `OwnedFd` must never hold -1, so a failed call is reported instead of wrapped.
            return Err(Error::last_os_error());
        }
        // SAFETY: `raw` was just returned by a successful `eventfd` call, so it is a
        // valid open descriptor that nothing else owns.
        let eventfd = unsafe { OwnedFd::from_raw_fd(raw) };
        let (_token, readable) = registry.register_reader(&eventfd.as_raw_fd())?;
        let uring = IoUring::builder()
            .dontfork()
            .setup_clamp()
            .setup_r_disabled()
            .setup_single_issuer()
            .setup_coop_taskrun()
            .setup_submit_all()
            .build(4096)?;
        Ok((Self { uring, eventfd, state: State::new() }, readable))
    }

    /// Returns `true` when no request is awaiting a result.
    pub fn is_empty(&self) -> bool {
        self.state.requests.is_empty()
    }

    /// Registers the eventfd with the ring and enables the ring.
    ///
    /// # Errors
    ///
    /// Returns the error of whichever registration call fails.
    pub fn start(&self) -> Result<()> {
        let submitter = self.uring.submitter();
        submitter.register_eventfd_async(self.eventfd.as_raw_fd())?;
        submitter.register_enable_rings()?;
        Ok(())
    }

    /// Submits `request`, batching in further requests from `requester`.
    ///
    /// After staging `request`, more requests are taken from `requester` with
    /// `try_recv` while the number of staged entries is below the free space of the
    /// submission queue. The batch is pushed and submitted, then completions are
    /// consumed.
    ///
    /// # Panics
    ///
    /// Panics if the staged entries do not fit into the submission queue or if
    /// submitting to the kernel fails.
    pub fn submit(&mut self, request: Request, requester: &mut parallel::Receiver<Request>) {
        let mut submission = self.uring.submission();
        // Free slots as observed before anything from this batch is pushed.
        let max = submission.capacity() - submission.len();
        self.state.push(request);
        while self.state.entries.len() < max {
            let Ok(request) = requester.try_recv() else {
                break;
            };
            self.state.push(request);
        }
        // NOTE(review): the entries reference buffers and paths owned by the requesters;
        // this relies on them staying valid until completion - confirm against callers.
        unsafe {
            submission.push_multiple(&self.state.entries).unwrap();
        }
        self.state.entries.clear();
        drop(submission);
        self.uring.submit().unwrap();
        self.consume();
    }

    /// Drains the eventfd and wakes every request whose completion is available.
    ///
    /// A negative completion result is turned into the matching OS error; any other
    /// value is passed through as `Ok`.
    ///
    /// # Panics
    ///
    /// Panics if a completion carries user data with no tracked request.
    pub fn consume(&mut self) {
        // An eventfd read transfers an 8-byte counter, so the buffer must be exactly
        // that large. The result is deliberately ignored: the descriptor is
        // non-blocking and the read fails when there is nothing to drain.
        let mut buf = [0u8; 8];
        // SAFETY: `buf` is a live, writable buffer of `buf.len()` bytes and the
        // descriptor is owned by `self`.
        _ = unsafe {
            libc::read(self.eventfd.as_raw_fd(), buf.as_mut_ptr() as *mut libc::c_void, buf.len())
        };
        for entry in self.uring.completion() {
            let user_data = entry.user_data() as usize;
            let request = self.state.requests.remove(&user_data).expect("no uring request found");
            let result = entry.result();
            let result = if result < 0 { Err(Error::from_raw_os_error(-result)) } else { Ok(result) };
            request.result.wake(result);
        }
    }
}