use crate::callback_worker::Worker;
use nix::fcntl::{FallocateFlags, fallocate};
use nix::unistd::fsync;
use crate::tasks::{IOAction, IOCallback, IOEvent};
use crossfire::{BlockingRxTrait, Rx, SendError, Tx, spsc};
use nix::errno::Errno;
use std::collections::VecDeque;
use std::fs::File;
use std::os::fd::RawFd;
use std::os::unix::io::BorrowedFd;
use std::{
cell::UnsafeCell, io, mem::transmute, os::fd::AsRawFd, sync::Arc, thread, time::Duration,
};
/// One submission slot: a kernel `iocb` control block paired with the event it is carrying.
pub struct AioSlot<C: IOCallback> {
/// Control block whose address is handed to `io_submit`; its `aio_data` field stores the slot id.
pub(crate) iocb: iocb,
/// Event currently occupying the slot. It is taken out again by `set_result` / `set_error`,
/// and is left untouched (normally `None`) when the slot is filled as the exit marker.
pub(crate) event: Option<Box<IOEvent<C>>>,
}
impl<C: IOCallback> AioSlot<C> {
    /// Builds an empty slot whose control block is tagged with `slot_id` and request priority 1.
    pub fn new(slot_id: u64) -> Self {
        let control = iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() };
        Self { iocb: control, event: None }
    }

    /// Programs the control block as a zero-length read on `fd`.
    ///
    /// The driver submits this request as its shutdown marker; no event is attached to the slot.
    pub fn fill_exit_slot(&mut self, slot_id: u16, fd: RawFd) {
        let control = &mut self.iocb;
        control.aio_lio_opcode = IOAction::Read as u16;
        control.aio_fildes = fd as libc::__u32;
        control.aio_data = slot_id as libc::__u64;
        control.aio_offset = 0;
        control.aio_nbytes = 0;
        control.aio_buf = 0;
    }

    /// Programs the control block from `event` and parks the event in the slot until completion.
    ///
    /// The descriptor is read before `get_param_for_io` and the action after it, mirroring the
    /// order in which the event is queried.
    #[inline(always)]
    pub fn fill_buffer_slot(&mut self, slot_id: u16, mut event: Box<IOEvent<C>>) {
        let fildes = event.fd as libc::__u32;
        let (offset, buf_ptr, buf_len) = event.get_param_for_io();
        let opcode = event.action as u16;

        let control = &mut self.iocb;
        control.aio_data = slot_id as libc::__u64;
        control.aio_fildes = fildes;
        control.aio_lio_opcode = opcode;
        control.aio_buf = buf_ptr as u64;
        control.aio_nbytes = buf_len as u64;
        control.aio_offset = offset as i64;

        self.event = Some(event);
    }

    /// Completes the parked event successfully with `written` bytes and hands it to `cb`.
    ///
    /// Does nothing when the slot holds no event.
    #[inline(always)]
    pub fn set_result<W: Worker<C>>(&mut self, written: usize, cb: &W) {
        match self.event.take() {
            Some(mut finished) => {
                finished.set_copied(written);
                cb.done(finished);
            }
            None => {}
        }
    }

    /// Completes the parked event with the error code `errno` and hands it to `cb`.
    ///
    /// Does nothing when the slot holds no event.
    #[inline(always)]
    pub fn set_error<W: Worker<C>>(&mut self, errno: i32, cb: &W) {
        match self.event.take() {
            Some(mut failed) => {
                failed.set_error(errno);
                cb.done(failed);
            }
            None => {}
        }
    }
}
/// State shared (through an `Arc`) by the submit thread and the poll thread of one AIO context.
struct AioInner<C: IOCallback> {
/// Number of slots; also the event capacity requested from `io_setup`.
depth: usize,
/// Kernel AIO context handle filled in by `io_setup`.
context: aio_context_t,
/// Slot table indexed by slot id. Both threads obtain mutable access through the `UnsafeCell`.
slots: UnsafeCell<Vec<AioSlot<C>>>,
/// Open handle on `/dev/null`; its descriptor is the target of the exit-marker read.
null_file: File,
}
// SAFETY (NOTE(review) — reasoning inferred from this file, please confirm): `slots` lives in an
// `UnsafeCell` and is mutated from both the submit and the poll thread. The submit thread only
// touches a slot after receiving its id from the free-slot queue, and the poll thread only
// touches it after the kernel reported its completion and before sending the id back, so a
// given slot is presumably never accessed by both threads at once.
unsafe impl<C: IOCallback> Send for AioInner<C> {}
unsafe impl<C: IOCallback> Sync for AioInner<C> {}
/// Namespace type for the Linux AIO backend.
///
/// It stores no data; `start` is an associated function that spawns the worker threads.
/// `C` is the callback type of the events, `Q` the queue events are received from and `W` the
/// worker that completed events are handed to.
pub struct AioDriver<C: IOCallback, Q: BlockingRxTrait<Box<IOEvent<C>>>, W: Worker<C>> {
// Ties the otherwise unused type parameters to the struct.
_marker: std::marker::PhantomData<(C, Q, W)>,
}
impl<
C: IOCallback,
Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static,
W: Worker<C> + Send + 'static,
> AioDriver<C, Q, W>
{
pub fn start(depth: usize, rx: Q, cb_workers: W) -> io::Result<()> {
let mut aio_context: aio_context_t = 0;
if io_setup(depth as c_long, &mut aio_context) != 0 {
return Err(io::Error::last_os_error());
}
let mut slots = Vec::with_capacity(depth);
for slot_id in 0..depth {
slots.push(AioSlot::new(slot_id as u64));
}
let null_file;
match File::open("/dev/null") {
Err(e) => {
error!("cannot open /dev/null: {}", e);
return Err(e);
}
Ok(f) => {
null_file = f;
}
}
let inner = Arc::new(AioInner {
depth,
context: aio_context,
slots: UnsafeCell::new(slots),
null_file,
});
let (s_free, r_free) = spsc::bounded_blocking::<u16>(depth);
for i in 0..depth {
let _ = s_free.send(i as u16);
}
let inner_submit = inner.clone();
thread::spawn(move || Self::submit_loop(inner_submit, rx, r_free));
thread::spawn(move || Self::poll_loop(inner, cb_workers, s_free));
Ok(())
}
/// Runs blocking `fallocate` / `fsync` requests received on `rx` until the channel is closed.
///
/// Each event is completed in place: on success its copied size is set to the event size, on
/// failure its error is set to the positive errno, and then its callback is invoked. Any
/// action other than `Alloc` or `Fsync` is failed with `EINVAL`.
fn background_worker(rx: Rx<spsc::Array<Box<IOEvent<C>>>>) {
    while let Ok(mut request) = rx.recv() {
        let size = request.get_size() as i64;
        // `status` is 0 on success and a negated errno otherwise.
        let status = match request.action {
            IOAction::Alloc => {
                let fd = unsafe { BorrowedFd::borrow_raw(request.fd) };
                match fallocate(fd, FallocateFlags::empty(), request.offset, size) {
                    Ok(_) => 0,
                    Err(e) => -(e as i32),
                }
            }
            IOAction::Fsync => {
                let fd = unsafe { BorrowedFd::borrow_raw(request.fd) };
                match fsync(fd) {
                    Ok(_) => 0,
                    Err(e) => -(e as i32),
                }
            }
            _ => -libc::EINVAL,
        };
        if status != 0 {
            request.set_error(-status);
        } else {
            request.set_copied(size as usize);
        }
        request.callback_unchecked(true);
    }
}
fn submit_loop(inner: Arc<AioInner<C>>, rx: Q, free_recv: Rx<spsc::Array<u16>>) {
let depth = inner.depth;
let mut iocbs = Vec::<*mut iocb>::with_capacity(depth);
let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
let aio_context = inner.context;
let mut events_to_process = VecDeque::with_capacity(depth);
let mut background_channel_tx: Option<Tx<spsc::Array<Box<IOEvent<C>>>>> = None;
loop {
if events_to_process.is_empty() {
match rx.recv() {
Ok(event) => match event.action {
IOAction::Alloc | IOAction::Fsync => {
if background_channel_tx.is_none() {
let (tx, rx) = spsc::bounded_blocking::<Box<IOEvent<C>>>(depth);
thread::spawn(move || Self::background_worker(rx));
background_channel_tx = Some(tx);
}
if let Some(tx) = &background_channel_tx {
if let Err(SendError(mut event)) = tx.send(event) {
event.set_error(Errno::ESHUTDOWN as i32);
event.callback_unchecked(true);
}
}
}
_ => events_to_process.push_back(event),
},
Err(_) => {
let slot_id = free_recv.recv().unwrap();
let slot = &mut slots_ref[slot_id as usize];
slot.fill_exit_slot(slot_id, inner.null_file.as_raw_fd());
let mut iocb_ptr: *mut iocb = &mut slot.iocb as *mut _;
let res = io_submit(aio_context, 1, &mut iocb_ptr);
if res != 1 {
error!("Failed to submit exit signal: {}", res);
}
info!("io_submit worker exit due to queue closing");
break;
}
}
}
while events_to_process.len() < depth {
if let Ok(event) = rx.try_recv() {
match event.action {
IOAction::Alloc | IOAction::Fsync => {
if let Some(tx) = &background_channel_tx {
if let Err(SendError(mut event)) = tx.send(event) {
event.set_error(Errno::ESHUTDOWN as i32);
event.callback_unchecked(true);
}
}
}
_ => events_to_process.push_back(event),
}
} else {
break;
}
}
let mut first = true;
while !events_to_process.is_empty() {
let slot_id_opt =
if first { Some(free_recv.recv().unwrap()) } else { free_recv.try_recv().ok() };
if let Some(slot_id) = slot_id_opt {
first = false;
let event = events_to_process.pop_front().unwrap();
let slot = &mut slots_ref[slot_id as usize];
slot.fill_buffer_slot(slot_id, event);
iocbs.push(&mut slot.iocb as *mut iocb);
} else {
break;
}
}
if !iocbs.is_empty() {
let mut done: libc::c_long = 0;
let mut left = iocbs.len();
'submit: loop {
let result = unsafe {
let arr = iocbs.as_mut_ptr().add(done as usize);
io_submit(aio_context, left as libc::c_long, arr)
};
if result < 0 {
if -result == Errno::EINTR as i64 {
continue 'submit;
}
error!("io_submit error: {}", result);
break 'submit;
} else {
if result == left as libc::c_long {
trace!("io submit {} events", result);
break 'submit;
} else {
done += result;
left -= result as usize;
trace!("io submit {}/{} events", result, left);
}
}
}
iocbs.clear();
}
}
}
fn poll_loop(inner: Arc<AioInner<C>>, cb_workers: W, free_sender: Tx<spsc::Array<u16>>) {
let depth = inner.depth;
let mut infos = Vec::<io_event>::with_capacity(depth);
let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
let aio_context = inner.context;
let mut exit_received = false;
loop {
infos.clear();
let result = io_getevents(
aio_context,
1,
depth as i64,
infos.as_mut_ptr(),
std::ptr::null_mut(),
);
if result < 0 {
if -result == Errno::EINTR as i64 {
continue;
}
error!("io_getevents errno: {}", -result);
thread::sleep(Duration::from_millis(10));
continue;
}
assert!(result > 0);
unsafe {
infos.set_len(result as usize);
}
for ref info in &infos {
let slot_id = (*info).data as usize;
let slot = &mut slots_ref[slot_id];
if slot.iocb.aio_nbytes == 0 && (*info).res == 0 {
exit_received = true;
let _ = free_sender.send(slot_id as u16);
continue;
}
Self::verify_result(&cb_workers, slot, info);
let _ = free_sender.send(slot_id as u16);
}
if exit_received {
info!("io_poll worker exit gracefully");
break;
}
}
info!("io_poll worker exit cleaning up");
let _ = io_destroy(aio_context);
}
/// Completes the event parked in `slot` according to the kernel result in `info`.
///
/// A negative `res` is reported as an error with the sign flipped; any other value is
/// reported as the number of bytes transferred.
// NOTE(review): the error branch prints to stdout with `println!` while the rest of the file
// uses the logging macros — consider switching.
#[inline(always)]
fn verify_result(cb_workers: &W, slot: &mut AioSlot<C>, info: &io_event) {
    let outcome = info.res;
    if outcome >= 0 {
        slot.set_result(outcome as usize, cb_workers);
    } else {
        println!("set error {:?}", outcome);
        slot.set_error((-outcome) as i32, cb_workers);
    }
}
}
use io_engine_aio_bindings::{
__NR_io_destroy, __NR_io_getevents, __NR_io_setup, __NR_io_submit, aio_context_t, io_event,
iocb, syscall, timespec,
};
use libc::c_long;
/// Invokes the `io_setup` system call: requests a context for `nr` events, stored through `ctxp`.
///
/// Returns the value of `syscall` unchanged.
/// NOTE(review): callers treat non-zero as failure and then read `errno`, which presumes the
/// libc -1/`errno` convention — confirm against the bindings crate.
#[inline(always)]
fn io_setup(nr: c_long, ctxp: *mut aio_context_t) -> c_long {
    let sysno = __NR_io_setup as c_long;
    unsafe { syscall(sysno, nr, ctxp) }
}
/// Invokes the `io_destroy` system call on the context `ctx`.
///
/// Returns the value of `syscall` unchanged.
#[inline(always)]
fn io_destroy(ctx: aio_context_t) -> c_long {
    let sysno = __NR_io_destroy as c_long;
    unsafe { syscall(sysno, ctx) }
}
/// Invokes the `io_submit` system call: queues the `nr` control blocks pointed to by the
/// array `iocbpp` on the context `ctx`.
///
/// Returns the value of `syscall` unchanged.
/// NOTE(review): presumably the number of queued requests, or -1 with `errno` set — confirm
/// against the bindings crate.
#[inline(always)]
fn io_submit(ctx: aio_context_t, nr: c_long, iocbpp: *mut *mut iocb) -> c_long {
    let sysno = __NR_io_submit as c_long;
    unsafe { syscall(sysno, ctx, nr, iocbpp) }
}
/// Invokes the `io_getevents` system call: asks for between `min_nr` and `max_nr` completions
/// from `ctx`, written to `events`, waiting according to `timeout`.
///
/// Returns the value of `syscall` unchanged.
/// NOTE(review): presumably the number of events written, or -1 with `errno` set — confirm
/// against the bindings crate.
#[inline(always)]
fn io_getevents(
    ctx: aio_context_t, min_nr: c_long, max_nr: c_long, events: *mut io_event,
    timeout: *mut timespec,
) -> c_long {
    let sysno = __NR_io_getevents as c_long;
    unsafe { syscall(sysno, ctx, min_nr, max_nr, events, timeout) }
}