use crate::Result;
use crate::errno::Errno;
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
use std::fmt;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::ptr::{null, null_mut};
use crate::sys::signal::*;
use std::thread;
use crate::sys::time::TimeSpec;
libc_enum! {
/// Mode argument accepted by [`AioCb::fsync`].
///
/// The chosen variant is cast to `libc::c_int` and handed to
/// `libc::aio_fsync` as its first argument. The precise meaning of each flag
/// is defined by the platform; see the POSIX `aio_fsync` specification.
#[repr(i32)]
#[non_exhaustive]
pub enum AioFsyncMode {
/// Presumably bound to the platform's `O_SYNC` constant by `libc_enum!`
/// (the macro definition is not visible here).
O_SYNC,
/// Presumably bound to the platform's `O_DSYNC` constant; only compiled on
/// the targets listed in the `cfg` below.
#[cfg(any(target_os = "ios",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"))]
#[cfg_attr(docsrs, doc(cfg(all())))]
O_DSYNC
}
}
libc_enum! {
/// Operation code stored in an [`AioCb`]'s `aio_lio_opcode` field.
///
/// The constructors that take a buffer write the chosen variant (cast to
/// `libc::c_int`) into the control block, and [`AioCb::lio_opcode`] decodes
/// it again by comparing against `libc::LIO_READ`, `libc::LIO_WRITE` and
/// `libc::LIO_NOP`.
#[repr(i32)]
#[non_exhaustive]
pub enum LioOpcode {
/// Decoded from `libc::LIO_NOP`.
LIO_NOP,
/// Decoded from `libc::LIO_WRITE`.
LIO_WRITE,
/// Decoded from `libc::LIO_READ`. Rejected by the constructors that take an
/// immutable slice.
LIO_READ,
}
}
libc_enum! {
/// Mode argument for `LioCb::listio` and `LioCb::listio_resubmit`.
///
/// The chosen variant is cast to `i32` and passed as the first argument of
/// `libc::lio_listio`; see the POSIX `lio_listio` specification for the
/// semantics of each mode.
#[repr(i32)]
pub enum LioMode {
/// Presumably bound to `libc::LIO_WAIT` by `libc_enum!` (macro not visible
/// here).
LIO_WAIT,
/// Presumably bound to `libc::LIO_NOWAIT` by `libc_enum!` (macro not
/// visible here).
LIO_NOWAIT,
}
}
/// Successful outcome of `libc::aio_cancel`, as returned by [`AioCb::cancel`]
/// and [`aio_cancel_all`].
///
/// Each variant's discriminant is the matching `libc::AIO_*` constant.
#[repr(i32)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum AioCancelStat {
/// `aio_cancel` returned `libc::AIO_CANCELED`.
AioCanceled = libc::AIO_CANCELED,
/// `aio_cancel` returned `libc::AIO_NOTCANCELED`.
AioNotCanceled = libc::AIO_NOTCANCELED,
/// `aio_cancel` returned `libc::AIO_ALLDONE`.
AioAllDone = libc::AIO_ALLDONE,
}
/// Newtype around `libc::aiocb` so that `Send` and `Sync` can be implemented
/// for it inside this crate.
///
/// `repr(transparent)` gives the wrapper the same layout as the wrapped
/// `libc::aiocb`, which the pointer casts elsewhere in this file rely on.
#[repr(transparent)]
struct LibcAiocb(libc::aiocb);
// NOTE(review): these impls are needed because `libc::aiocb` holds a raw
// `aio_buf` pointer. Presumably they are sound because the pointed-to buffer
// is tied to the owning `AioCb`'s lifetime parameter — confirm for the
// raw-pointer constructors (`from_ptr` / `from_mut_ptr`).
unsafe impl Send for LibcAiocb {}
unsafe impl Sync for LibcAiocb {}
/// AIO control block: one asynchronous I/O request together with the
/// bookkeeping needed to use it safely.
///
/// The lifetime `'a` is that of the data buffer (if any) referenced by the
/// underlying `libc::aiocb`.
//
// `repr(C)` pins `aiocb` at offset 0. Code in this file casts `*mut AioCb`
// and `Pin<&AioCb>` straight to `*mut libc::aiocb` / `*const libc::aiocb`;
// with the default representation the field order is unspecified, so those
// casts would only work by accident.
#[repr(C)]
pub struct AioCb<'a> {
    /// The raw control block handed to the `libc::aio_*` functions. Must stay
    /// the first field (see the `repr(C)` note above).
    aiocb: LibcAiocb,
    /// `true` when the block was built from a mutable buffer or pointer;
    /// `read` asserts on this flag.
    mutable: bool,
    /// Set after a successful submission and cleared when the result is
    /// collected with `aio_return`; `Drop` asserts that it is `false`.
    in_progress: bool,
    /// Ties the control block to the borrow of its data buffer.
    _buffer: std::marker::PhantomData<&'a [u8]>,
    /// Makes the type `!Unpin`, since the raw control block is handed to libc
    /// by address.
    _pin: std::marker::PhantomPinned,
}
impl<'a> AioCb<'a> {
pub fn fd(&self) -> RawFd {
self.aiocb.0.aio_fildes
}
pub fn from_fd(fd: RawFd, prio: libc::c_int,
sigev_notify: SigevNotify) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.0.aio_offset = 0;
a.0.aio_nbytes = 0;
a.0.aio_buf = null_mut();
Box::pin(AioCb {
aiocb: a,
mutable: false,
in_progress: false,
_buffer: PhantomData,
_pin: std::marker::PhantomPinned
})
}
/// Builds an unpinned control block that refers to a mutable byte slice.
///
/// The returned value is flagged mutable, so it may be used for reads. It is
/// not pinned; the caller must place it at its final address before it is
/// submitted (in this file it is only pushed into an `LioCbBuilder`).
///
/// * `fd` - file descriptor the request applies to.
/// * `offs` - file offset stored in `aio_offset`.
/// * `buf` - data buffer; its address and length are stored in the block.
/// * `prio` - value stored in `aio_reqprio`.
/// * `sigev_notify` - completion notification, converted through `SigEvent`.
/// * `opcode` - value stored in `aio_lio_opcode`.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8],
                           prio: libc::c_int, sigev_notify: SigevNotify,
                           opcode: LioOpcode) -> AioCb<'a>
{
    let mut a = AioCb::common_init(fd, prio, sigev_notify);
    a.0.aio_offset = offs;
    a.0.aio_nbytes = buf.len() as size_t;
    // The buffer may be written through this pointer, so it has to be derived
    // from the mutable borrow (`as_mut_ptr`). `as_ptr` would go through a
    // shared reborrow, and writing through that pointer is not permitted.
    a.0.aio_buf = buf.as_mut_ptr() as *mut c_void;
    a.0.aio_lio_opcode = opcode as libc::c_int;
    AioCb {
        aiocb: a,
        mutable: true,
        in_progress: false,
        _buffer: PhantomData,
        _pin: std::marker::PhantomPinned,
    }
}
/// Builds a pinned, boxed control block that refers to a mutable byte slice.
///
/// The block is flagged mutable, so it may be submitted with `read` as well as
/// `write`. The borrow of `buf` lasts for `'a`, i.e. as long as the returned
/// `AioCb` can be used.
///
/// * `fd` - file descriptor the request applies to.
/// * `offs` - file offset stored in `aio_offset`.
/// * `buf` - data buffer; its address and length are stored in the block.
/// * `prio` - value stored in `aio_reqprio`.
/// * `sigev_notify` - completion notification, converted through `SigEvent`.
/// * `opcode` - value stored in `aio_lio_opcode`.
pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
                      prio: libc::c_int, sigev_notify: SigevNotify,
                      opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
    let mut a = AioCb::common_init(fd, prio, sigev_notify);
    a.0.aio_offset = offs;
    a.0.aio_nbytes = buf.len() as size_t;
    // The buffer may be written through this pointer, so it has to be derived
    // from the mutable borrow (`as_mut_ptr`). `as_ptr` would go through a
    // shared reborrow, and writing through that pointer is not permitted.
    a.0.aio_buf = buf.as_mut_ptr() as *mut c_void;
    a.0.aio_lio_opcode = opcode as libc::c_int;
    Box::pin(AioCb {
        aiocb: a,
        mutable: true,
        in_progress: false,
        _buffer: PhantomData,
        _pin: std::marker::PhantomPinned,
    })
}
pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t,
buf: *mut c_void, len: usize,
prio: libc::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.0.aio_offset = offs;
a.0.aio_nbytes = len;
a.0.aio_buf = buf;
a.0.aio_lio_opcode = opcode as libc::c_int;
Box::pin(AioCb {
aiocb: a,
mutable: true,
in_progress: false,
_buffer: PhantomData,
_pin: std::marker::PhantomPinned,
})
}
pub unsafe fn from_ptr(fd: RawFd, offs: off_t,
buf: *const c_void, len: usize,
prio: libc::c_int, sigev_notify: SigevNotify,
opcode: LioOpcode) -> Pin<Box<AioCb<'a>>> {
let mut a = AioCb::common_init(fd, prio, sigev_notify);
a.0.aio_offset = offs;
a.0.aio_nbytes = len;
a.0.aio_buf = buf as *mut c_void;
a.0.aio_lio_opcode = opcode as libc::c_int;
Box::pin(AioCb {
aiocb: a,
mutable: false,
in_progress: false,
_buffer: PhantomData,
_pin: std::marker::PhantomPinned
})
}
/// Builds an unpinned control block that refers to an immutable byte slice.
///
/// The block is flagged immutable. The returned value is not pinned; the
/// caller must place it at its final address before it is submitted.
///
/// * `fd` - file descriptor the request applies to.
/// * `offs` - file offset stored in `aio_offset`.
/// * `buf` - data buffer; its address and length are stored in the block.
/// * `prio` - value stored in `aio_reqprio`.
/// * `sigev_notify` - completion notification, converted through `SigEvent`.
/// * `opcode` - value stored in `aio_lio_opcode`.
///
/// # Panics
///
/// Panics if `opcode` is `LioOpcode::LIO_READ`, because the buffer cannot be
/// written to.
fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8],
                       prio: libc::c_int, sigev_notify: SigevNotify,
                       opcode: LioOpcode) -> AioCb<'a>
{
    // Check the precondition before doing any other work.
    assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
    let mut a = AioCb::common_init(fd, prio, sigev_notify);
    a.0.aio_offset = offs;
    a.0.aio_nbytes = buf.len() as size_t;
    // libc's field is `*mut`, but the assertion above rules out any operation
    // that writes to this buffer.
    a.0.aio_buf = buf.as_ptr() as *mut c_void;
    a.0.aio_lio_opcode = opcode as libc::c_int;
    AioCb {
        aiocb: a,
        mutable: false,
        in_progress: false,
        _buffer: PhantomData,
        _pin: std::marker::PhantomPinned,
    }
}
/// Builds a pinned, boxed control block that refers to an immutable byte
/// slice.
///
/// This boxes and pins the result of `from_slice_unpinned`; the borrow of
/// `buf` lasts for `'a`, i.e. as long as the returned `AioCb` can be used.
///
/// # Panics
///
/// Panics if `opcode` is `LioOpcode::LIO_READ` (checked by
/// `from_slice_unpinned`).
pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
                  prio: libc::c_int, sigev_notify: SigevNotify,
                  opcode: LioOpcode) -> Pin<Box<AioCb<'a>>>
{
    let cb = AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify, opcode);
    Box::pin(cb)
}
/// Creates a zero-initialised `libc::aiocb` and fills in the fields shared by
/// every constructor: file descriptor, request priority and signal event.
fn common_init(fd: RawFd, prio: libc::c_int,
               sigev_notify: SigevNotify) -> LibcAiocb {
    // NOTE(review): relies on an all-zero bit pattern being a valid
    // `libc::aiocb` (a plain C struct) — confirm for every supported target.
    let mut raw: libc::aiocb = unsafe { mem::zeroed() };
    raw.aio_fildes = fd;
    raw.aio_reqprio = prio;
    let event = SigEvent::new(sigev_notify);
    raw.aio_sigevent = event.sigevent();
    LibcAiocb(raw)
}
pub fn set_sigev_notify(self: &mut Pin<Box<Self>>,
sigev_notify: SigevNotify)
{
let selfp = unsafe {
self.as_mut().get_unchecked_mut()
};
selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
}
pub fn cancel(self: &mut Pin<Box<Self>>) -> Result<AioCancelStat> {
let r = unsafe {
let selfp = self.as_mut().get_unchecked_mut();
libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0)
};
match r {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Errno::last()),
_ => panic!("unknown aio_cancel return value")
}
}
/// Calls `libc::aio_error` on this control block and translates the result.
///
/// * `0` becomes `Ok(())`.
/// * A positive value becomes `Err(Errno::from_i32(value))`.
/// * `-1` becomes `Err(Errno::last())`.
///
/// # Panics
///
/// Panics on any other (negative) return value.
fn error_unpinned(&mut self) -> Result<()> {
    let cb: *mut libc::aiocb = &mut self.aiocb.0;
    let code = unsafe { libc::aio_error(cb) };
    if code == 0 {
        Ok(())
    } else if code > 0 {
        Err(Errno::from_i32(code))
    } else if code == -1 {
        Err(Errno::last())
    } else {
        panic!("unknown aio_error return value {:?}", code)
    }
}
pub fn error(self: &mut Pin<Box<Self>>) -> Result<()> {
let selfp = unsafe {
self.as_mut().get_unchecked_mut()
};
selfp.error_unpinned()
}
pub fn fsync(self: &mut Pin<Box<Self>>, mode: AioFsyncMode) -> Result<()> {
unsafe {
let selfp = self.as_mut().get_unchecked_mut();
Errno::result({
let p: *mut libc::aiocb = &mut selfp.aiocb.0;
libc::aio_fsync(mode as libc::c_int, p)
}).map(|_| {
selfp.in_progress = true;
})
}
}
/// Decodes the `aio_lio_opcode` field into a [`LioOpcode`].
///
/// Returns `None` when the stored value matches none of `libc::LIO_READ`,
/// `libc::LIO_WRITE` or `libc::LIO_NOP`.
pub fn lio_opcode(&self) -> Option<LioOpcode> {
    let raw = self.aiocb.0.aio_lio_opcode;
    let decoded = match raw {
        libc::LIO_READ => LioOpcode::LIO_READ,
        libc::LIO_WRITE => LioOpcode::LIO_WRITE,
        libc::LIO_NOP => LioOpcode::LIO_NOP,
        _ => return None,
    };
    Some(decoded)
}
pub fn nbytes(&self) -> usize {
self.aiocb.0.aio_nbytes
}
pub fn offset(&self) -> off_t {
self.aiocb.0.aio_offset
}
pub fn priority(&self) -> libc::c_int {
self.aiocb.0.aio_reqprio
}
pub fn read(self: &mut Pin<Box<Self>>) -> Result<()> {
assert!(self.mutable, "Can't read into an immutable buffer");
let selfp = unsafe {
self.as_mut().get_unchecked_mut()
};
Errno::result({
let p: *mut libc::aiocb = &mut selfp.aiocb.0;
unsafe { libc::aio_read(p) }
}).map(|_| {
selfp.in_progress = true;
})
}
/// Returns a `SigEvent` built from the control block's `aio_sigevent` field.
pub fn sigevent(&self) -> SigEvent {
    let raw_event = &self.aiocb.0.aio_sigevent;
    SigEvent::from(raw_event)
}
/// Collects the operation's final status with `libc::aio_return`.
///
/// The in-progress flag is cleared unconditionally before the call, so the
/// block may be dropped afterwards whatever the outcome. The return value is
/// passed through `Errno::result`.
fn aio_return_unpinned(&mut self) -> Result<isize> {
    self.in_progress = false;
    let cb: *mut libc::aiocb = &mut self.aiocb.0;
    let rc = unsafe { libc::aio_return(cb) };
    Errno::result(rc)
}
pub fn aio_return(self: &mut Pin<Box<Self>>) -> Result<isize> {
let selfp = unsafe {
self.as_mut().get_unchecked_mut()
};
selfp.aio_return_unpinned()
}
pub fn write(self: &mut Pin<Box<Self>>) -> Result<()> {
let selfp = unsafe {
self.as_mut().get_unchecked_mut()
};
Errno::result({
let p: *mut libc::aiocb = &mut selfp.aiocb.0;
unsafe{ libc::aio_write(p) }
}).map(|_| {
selfp.in_progress = true;
})
}
}
/// Calls `libc::aio_cancel` with a null control block pointer for `fd`.
///
/// Returns the matching [`AioCancelStat`] for the three documented status
/// codes, or the current `errno` when `aio_cancel` returns `-1`.
///
/// # Panics
///
/// Panics if `aio_cancel` returns any other value.
pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
    let status = unsafe { libc::aio_cancel(fd, null_mut()) };
    let stat = match status {
        libc::AIO_CANCELED => AioCancelStat::AioCanceled,
        libc::AIO_NOTCANCELED => AioCancelStat::AioNotCanceled,
        libc::AIO_ALLDONE => AioCancelStat::AioAllDone,
        -1 => return Err(Errno::last()),
        _ => panic!("unknown aio_cancel return value"),
    };
    Ok(stat)
}
/// Calls `libc::aio_suspend` on the given control blocks.
///
/// * `list` - control blocks to pass to `aio_suspend`.
/// * `timeout` - optional timeout; `None` passes a null `timespec` pointer.
///
/// Returns `Ok(())` when `aio_suspend` succeeds, otherwise the error produced
/// by `Errno::result`.
pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option<TimeSpec>) -> Result<()> {
    // Reinterpret the slice of pinned references as an array of raw `aiocb`
    // pointers. This relies on `Pin<&AioCb>` being pointer-sized and on the
    // `libc::aiocb` living at the start of `AioCb`.
    let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb];
    let p = plist as *const *const libc::aiocb;
    // Borrow from `timeout`, which lives until the function returns. Binding
    // the `TimeSpec` by value would make the pointer refer to an arm-local
    // variable that is already dead when `aio_suspend` is called.
    let timep = match timeout {
        None => null::<libc::timespec>(),
        Some(ref ts) => ts.as_ref() as *const libc::timespec,
    };
    Errno::result(unsafe {
        libc::aio_suspend(p, list.len() as i32, timep)
    }).map(drop)
}
/// Formats the raw control block together with the `mutable` and
/// `in_progress` flags; the marker fields are omitted.
impl<'a> Debug for AioCb<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("AioCb");
        out.field("aiocb", &self.aiocb.0);
        out.field("mutable", &self.mutable);
        out.field("in_progress", &self.in_progress);
        out.finish()
    }
}
/// Dropping a control block that is still marked in progress is treated as a
/// programming error and panics, unless the thread is already unwinding.
impl<'a> Drop for AioCb<'a> {
    fn drop(&mut self) {
        let unwinding = thread::panicking();
        if !unwinding && self.in_progress {
            panic!("Dropped an in-progress AioCb");
        }
    }
}
/// A fixed list of [`AioCb`]s that is submitted as a unit through
/// `libc::lio_listio`.
///
/// Not compiled on iOS or macOS.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(docsrs, doc(cfg(all())))]
pub struct LioCb<'a> {
/// The owned control blocks. Boxed as a slice, so their addresses stay fixed
/// for the lifetime of the `LioCb`.
aiocbs: Box<[AioCb<'a>]>,
/// Scratch list of raw pointers into `aiocbs`; cleared and refilled on every
/// submission and then passed to `libc::lio_listio`.
list: Vec<*mut libc::aiocb>,
/// Per-operation results cached by `listio_resubmit` for operations that had
/// already completed. `None` (or a missing entry) means nothing is cached.
results: Vec<Option<Result<isize>>>
}
// NOTE(review): these manual impls are needed because `list` holds raw
// pointers. Presumably they are sound because those pointers only ever refer
// to elements of the `aiocbs` slice owned by the same `LioCb` — confirm.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
unsafe impl<'a> Send for LioCb<'a> {}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
unsafe impl<'a> Sync for LioCb<'a> {}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(docsrs, doc(cfg(all())))]
impl<'a> LioCb<'a> {
    /// Returns `true` when the list holds no control blocks.
    pub fn is_empty(&self) -> bool {
        self.aiocbs.is_empty()
    }

    /// Returns the number of control blocks in the list.
    pub fn len(&self) -> usize {
        self.aiocbs.len()
    }

    /// Submits every control block in the list with `libc::lio_listio`.
    ///
    /// Every block is marked in progress before the call, whether or not the
    /// call then succeeds.
    ///
    /// * `mode` - passed as the `mode` argument of `lio_listio`.
    /// * `sigev_notify` - converted to a `sigevent` and passed as the
    ///   list-wide notification.
    pub fn listio(&mut self, mode: LioMode,
                  sigev_notify: SigevNotify) -> Result<()> {
        // Keep the sigevent in a named local so the pointer handed to libc
        // visibly outlives the call.
        let mut sigevent = SigEvent::new(sigev_notify).sigevent();
        let sigevp: *mut libc::sigevent = &mut sigevent;
        self.list.clear();
        for a in self.aiocbs.iter_mut() {
            a.in_progress = true;
            self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
        }
        let p = self.list.as_ptr();
        Errno::result(unsafe {
            libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
        }).map(drop)
    }

    /// Resubmits the operations that were not accepted by an earlier
    /// submission.
    ///
    /// Each operation without a cached result is queried with `aio_error`:
    ///
    /// * completed successfully: its `aio_return` value is cached;
    /// * `EAGAIN`: it is queued for resubmission;
    /// * `EINPROGRESS`: it is left alone;
    /// * any other error: the operation has finished with that error, so it
    ///   is finalized with `aio_return` and the error is cached. It can then
    ///   be read back through [`LioCb::error`] or [`LioCb::aio_return`].
    ///
    /// # Panics
    ///
    /// Panics if `aio_error` reports `EINVAL` for an operation, which means
    /// it was never submitted or has already been finalized.
    pub fn listio_resubmit(&mut self, mode: LioMode,
                           sigev_notify: SigevNotify) -> Result<()> {
        let mut sigevent = SigEvent::new(sigev_notify).sigevent();
        let sigevp: *mut libc::sigevent = &mut sigevent;
        self.list.clear();
        // Make sure there is one result slot per control block.
        while self.results.len() < self.aiocbs.len() {
            self.results.push(None);
        }
        for (a, slot) in self.aiocbs.iter_mut().zip(self.results.iter_mut()) {
            if slot.is_some() {
                // Already finalized by an earlier call.
                continue;
            }
            match a.error_unpinned() {
                Ok(()) => {
                    *slot = Some(a.aio_return_unpinned());
                },
                Err(Errno::EAGAIN) => {
                    self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
                },
                Err(Errno::EINPROGRESS) => {
                    // Still running; nothing to do.
                },
                Err(Errno::EINVAL) => panic!(
                    "AioCb was never submitted, or already finalized"),
                Err(e) => {
                    // The operation completed with a real I/O error. Finalize
                    // it so the block is no longer marked in progress, and
                    // cache the error code from `aio_error`; the value from
                    // `aio_return` is discarded because it would only say
                    // that the operation failed.
                    let _ = a.aio_return_unpinned();
                    *slot = Some(Err(e));
                },
            }
        }
        let p = self.list.as_ptr();
        Errno::result(unsafe {
            libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
        }).map(drop)
    }

    /// Returns the final status of operation `i`.
    ///
    /// A result cached by `listio_resubmit` is returned as is; otherwise the
    /// operation is finalized with `aio_return`.
    ///
    /// # Panics
    ///
    /// Panics if nothing is cached for `i` and `i` is out of bounds.
    pub fn aio_return(&mut self, i: usize) -> Result<isize> {
        match self.results.get(i) {
            Some(Some(cached)) => *cached,
            _ => self.aiocbs[i].aio_return_unpinned(),
        }
    }

    /// Returns the error status of operation `i`.
    ///
    /// If a result is cached for `i`, it decides the answer: `Ok(())` for a
    /// cached success and the cached error for a cached failure. Otherwise
    /// the operation is queried with `aio_error`.
    ///
    /// # Panics
    ///
    /// Panics if nothing is cached for `i` and `i` is out of bounds.
    pub fn error(&mut self, i: usize) -> Result<()> {
        match self.results.get(i) {
            Some(Some(Ok(_))) => Ok(()),
            Some(Some(Err(e))) => Err(*e),
            _ => self.aiocbs[i].error_unpinned(),
        }
    }
}
/// Formats only the owned control blocks; the pointer scratch list and the
/// cached results are omitted.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> Debug for LioCb<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("LioCb");
        out.field("aiocbs", &self.aiocbs);
        out.finish()
    }
}
/// Builder that collects [`AioCb`]s and turns them into an [`LioCb`].
///
/// Not compiled on iOS or macOS.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(docsrs, doc(cfg(all())))]
#[derive(Debug)]
pub struct LioCbBuilder<'a> {
/// Control blocks collected so far, in insertion order.
pub aiocbs: Vec<AioCb<'a>>,
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(docsrs, doc(cfg(all())))]
impl<'a> LioCbBuilder<'a> {
    /// Creates an empty builder with room reserved for `capacity` control
    /// blocks.
    pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> {
        let aiocbs = Vec::with_capacity(capacity);
        LioCbBuilder { aiocbs }
    }

    /// Appends a control block that refers to an immutable slice and returns
    /// the builder.
    ///
    /// # Panics
    ///
    /// Panics if `opcode` is `LioOpcode::LIO_READ`.
    #[must_use]
    pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8],
                         prio: libc::c_int, sigev_notify: SigevNotify,
                         opcode: LioOpcode) -> Self
    {
        let cb = AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify,
                                            opcode);
        self.aiocbs.push(cb);
        self
    }

    /// Appends a control block that refers to a mutable slice and returns the
    /// builder.
    #[must_use]
    pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t,
                             buf: &'a mut [u8], prio: libc::c_int,
                             sigev_notify: SigevNotify, opcode: LioOpcode)
        -> Self
    {
        let cb = AioCb::from_mut_slice_unpinned(fd, offs, buf, prio,
                                                sigev_notify, opcode);
        self.aiocbs.push(cb);
        self
    }

    /// Consumes the builder and produces the [`LioCb`].
    ///
    /// The collected control blocks are converted into a boxed slice; the
    /// pointer list and result cache start empty with matching capacity.
    pub fn finish(self) -> LioCb<'a> {
        let LioCbBuilder { aiocbs } = self;
        let count = aiocbs.len();
        LioCb {
            aiocbs: aiocbs.into(),
            list: Vec::with_capacity(count),
            results: Vec::with_capacity(count),
        }
    }
}
// Unit tests for the `LioCb` API; compiled only for test builds on targets
// where `LioCb` exists.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg(test)]
mod t {
use super::*;
// Checks, via the `assert_impl!` macro, that `LioCb` implements `Unpin`
// even though the `AioCb`s it owns carry a `PhantomPinned` marker.
#[test]
fn liocb_is_unpin() {
use assert_impl::assert_impl;
assert_impl!(Unpin: LioCb);
}
}
}