use {Error, Result};
use errno::Errno;
use std::os::unix::io::RawFd;
use libc::{c_void, off_t, size_t};
use libc;
use std::borrow::{Borrow, BorrowMut};
use std::fmt;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::ptr::{null, null_mut};
use sys::signal::*;
use std::thread;
use sys::time::TimeSpec;
libc_enum! {
/// Mode selector for `AioCb::fsync`, which casts it to a C `int` and passes
/// it to `libc::aio_fsync`.
#[repr(i32)]
pub enum AioFsyncMode {
/// Presumably mirrors libc's `O_SYNC` (sync like `fsync`) -- TODO confirm
/// against the `libc_enum!` macro definition.
O_SYNC,
/// Only compiled on the platforms listed in the `cfg` attribute below.
/// Presumably mirrors libc's `O_DSYNC` (sync like `fdatasync`) -- TODO
/// confirm against the `libc_enum!` macro definition.
#[cfg(any(target_os = "ios",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"))]
O_DSYNC
}
}
libc_enum! {
/// Operation code stored in an `AioCb`'s `aio_lio_opcode` field by the
/// buffer-taking constructors and reported back by `AioCb::lio_opcode`.
#[repr(i32)]
pub enum LioOpcode {
/// Matched against `libc::LIO_NOP` by `AioCb::lio_opcode`.
LIO_NOP,
/// Matched against `libc::LIO_WRITE` by `AioCb::lio_opcode`.
LIO_WRITE,
/// Matched against `libc::LIO_READ` by `AioCb::lio_opcode`.
/// `AioCb::from_slice` rejects this opcode because its buffer is immutable.
LIO_READ,
}
}
libc_enum! {
/// Mode argument for `LioCb::listio` and `LioCb::listio_resubmit`, which
/// cast it to `i32` and pass it to `libc::lio_listio`.
#[repr(i32)]
pub enum LioMode {
/// Per POSIX `lio_listio`, the call blocks until all requested operations
/// have completed -- NOTE(review): semantics come from the C API, not from
/// anything visible in this file.
LIO_WAIT,
/// Per POSIX `lio_listio`, the call returns once the operations have been
/// queued -- NOTE(review): semantics come from the C API, not from anything
/// visible in this file.
LIO_NOWAIT,
}
}
/// Successful outcomes of `AioCb::cancel` and `aio_cancel_all`.
///
/// Each variant's discriminant is the corresponding `libc::AIO_*` constant
/// returned by `libc::aio_cancel`.
#[repr(i32)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum AioCancelStat {
/// `libc::aio_cancel` returned `AIO_CANCELED`.
AioCanceled = libc::AIO_CANCELED,
/// `libc::aio_cancel` returned `AIO_NOTCANCELED`.
AioNotCanceled = libc::AIO_NOTCANCELED,
/// `libc::aio_cancel` returned `AIO_ALLDONE`.
AioAllDone = libc::AIO_ALLDONE,
}
/// Bookkeeping for the memory an `AioCb` operates on.
///
/// The raw address handed to the kernel lives in the `aiocb` itself; this
/// enum only records what, if anything, the `AioCb` holds on to.
pub enum Buffer<'a> {
/// Nothing is held.  Used by `from_fd`, the raw-pointer constructors and
/// `from_slice`, and left behind after a buffer has been taken out.
None,
/// No data is stored; the marker only carries the `'a` lifetime of a
/// mutable slice.  Used by `from_mut_slice`.
Phantom(PhantomData<&'a mut [u8]>),
/// An owned, immutably borrowable buffer.  Used by `from_boxed_slice`.
BoxedSlice(Box<dyn Borrow<[u8]>>),
/// An owned, mutably borrowable buffer.  Used by `from_boxed_mut_slice`.
BoxedMutSlice(Box<dyn BorrowMut<[u8]>>),
}
impl<'a> Debug for Buffer<'a> {
    /// Writes a short diagnostic description of the buffer variant.
    ///
    /// The boxed variants cannot print their contents through the trait
    /// object, so the address of the trait object is printed instead.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Buffer::None => fmt.write_str("None"),
            Buffer::Phantom(marker) => Debug::fmt(&marker, fmt),
            Buffer::BoxedSlice(ref boxed) => {
                let target: &dyn Borrow<[u8]> = boxed.borrow();
                let addr = target as *const dyn Borrow<[u8]>;
                write!(fmt, "BoxedSlice({:?})", addr)
            }
            Buffer::BoxedMutSlice(ref boxed) => {
                let target: &dyn BorrowMut<[u8]> = boxed.borrow();
                let addr = target as *const dyn BorrowMut<[u8]>;
                write!(fmt, "BoxedMutSlice({:?})", addr)
            }
        }
    }
}
/// A single asynchronous I/O request: the C control block plus the Rust-side
/// state that tracks it.
pub struct AioCb<'a> {
// The control block whose address is passed to the `libc::aio_*` functions.
aiocb: libc::aiocb,
// Set by the constructors that take a mutable buffer; `read` asserts it.
mutable: bool,
// Set once an operation has been submitted, cleared by `aio_return`.
// Dropping the `AioCb` while it is set panics.
in_progress: bool,
// What this `AioCb` holds on to for the memory behind `aiocb.aio_buf`.
buffer: Buffer<'a>
}
impl<'a> AioCb<'a> {
    /// Takes the buffer bookkeeping out of this `AioCb`, leaving
    /// `Buffer::None` in its place.
    ///
    /// # Panics
    ///
    /// Panics if the `AioCb` is still marked in-progress.
    pub fn buffer(&mut self) -> Buffer<'a> {
        assert!(!self.in_progress);
        mem::replace(&mut self.buffer, Buffer::None)
    }

    /// Takes ownership of the boxed immutable buffer, if one is held.
    ///
    /// Returns `None` and leaves the stored buffer untouched when the
    /// `AioCb` holds any other kind of `Buffer`.
    ///
    /// # Panics
    ///
    /// Panics if the `AioCb` is still marked in-progress.
    pub fn boxed_slice(&mut self) -> Option<Box<dyn Borrow<[u8]>>> {
        assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
        match mem::replace(&mut self.buffer, Buffer::None) {
            Buffer::BoxedSlice(inner) => Some(inner),
            other => {
                // Not the variant asked for: put it back unchanged.
                self.buffer = other;
                None
            }
        }
    }

    /// Takes ownership of the boxed mutable buffer, if one is held.
    ///
    /// Returns `None` and leaves the stored buffer untouched when the
    /// `AioCb` holds any other kind of `Buffer`.
    ///
    /// # Panics
    ///
    /// Panics if the `AioCb` is still marked in-progress.
    pub fn boxed_mut_slice(&mut self) -> Option<Box<dyn BorrowMut<[u8]>>> {
        assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?");
        match mem::replace(&mut self.buffer, Buffer::None) {
            Buffer::BoxedMutSlice(inner) => Some(inner),
            other => {
                // Not the variant asked for: put it back unchanged.
                self.buffer = other;
                None
            }
        }
    }

    /// Returns the file descriptor stored in the control block.
    pub fn fd(&self) -> RawFd {
        self.aiocb.aio_fildes
    }

    /// Builds an `AioCb` with no data buffer (offset 0, length 0, null
    /// buffer pointer), e.g. for use with [`fsync`](#method.fsync).
    ///
    /// * `fd` - file descriptor stored in the control block
    /// * `prio` - stored as `aio_reqprio`
    /// * `sigev_notify` - converted into the control block's `aio_sigevent`
    pub fn from_fd(fd: RawFd, prio: libc::c_int,
                   sigev_notify: SigevNotify) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        a.aio_offset = 0;
        a.aio_nbytes = 0;
        a.aio_buf = null_mut();
        AioCb {
            aiocb: a,
            mutable: false,
            in_progress: false,
            buffer: Buffer::None
        }
    }

    /// Builds an `AioCb` over a mutably borrowed slice.
    ///
    /// The slice's lifetime `'a` is carried by the returned `AioCb`, and the
    /// control block is marked as mutable so that [`read`](#method.read) is
    /// permitted.
    ///
    /// * `offs` - stored as `aio_offset`
    /// * `opcode` - stored as `aio_lio_opcode`
    pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8],
                          prio: libc::c_int, sigev_notify: SigevNotify,
                          opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        a.aio_offset = offs;
        a.aio_nbytes = buf.len() as size_t;
        // Data may be written through this pointer, so it must be derived
        // from the mutable borrow rather than from a shared one.
        a.aio_buf = buf.as_mut_ptr() as *mut c_void;
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: true,
            in_progress: false,
            buffer: Buffer::Phantom(PhantomData),
        }
    }

    /// Builds an `AioCb` that owns an immutably borrowable boxed buffer.
    ///
    /// The box is kept inside the `AioCb` and can be recovered with
    /// [`boxed_slice`](#method.boxed_slice).  The control block is marked as
    /// immutable, so [`read`](#method.read) will panic on it.
    pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<dyn Borrow<[u8]>>,
                            prio: libc::c_int, sigev_notify: SigevNotify,
                            opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        {
            // Record the address and length of the slice behind the box; the
            // box itself is moved into the `AioCb` below.
            let borrowed : &dyn Borrow<[u8]> = buf.borrow();
            let slice : &[u8] = borrowed.borrow();
            a.aio_nbytes = slice.len() as size_t;
            a.aio_buf = slice.as_ptr() as *mut c_void;
        }
        a.aio_offset = offs;
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: false,
            in_progress: false,
            buffer: Buffer::BoxedSlice(buf),
        }
    }

    /// Builds an `AioCb` that owns a mutably borrowable boxed buffer.
    ///
    /// The box is kept inside the `AioCb` and can be recovered with
    /// [`boxed_mut_slice`](#method.boxed_mut_slice).  The control block is
    /// marked as mutable so that [`read`](#method.read) is permitted.
    pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t,
                                mut buf: Box<dyn BorrowMut<[u8]>>,
                                prio: libc::c_int, sigev_notify: SigevNotify,
                                opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        {
            // Record the address and length of the slice behind the box; the
            // box itself is moved into the `AioCb` below.
            let borrowed : &mut dyn BorrowMut<[u8]> = buf.borrow_mut();
            let slice : &mut [u8] = borrowed.borrow_mut();
            a.aio_nbytes = slice.len() as size_t;
            a.aio_buf = slice.as_mut_ptr() as *mut c_void;
        }
        a.aio_offset = offs;
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: true,
            in_progress: false,
            buffer: Buffer::BoxedMutSlice(buf),
        }
    }

    /// Builds an `AioCb` over a raw mutable pointer and length.
    ///
    /// # Safety
    ///
    /// The pointer and length are stored as given, with no lifetime or
    /// ownership tracking (`Buffer::None`).  The caller must make sure the
    /// memory stays valid for as long as an operation on this `AioCb` may
    /// use it.
    pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t,
                               buf: *mut c_void, len: usize,
                               prio: libc::c_int, sigev_notify: SigevNotify,
                               opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        a.aio_offset = offs;
        a.aio_nbytes = len;
        a.aio_buf = buf;
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: true,
            in_progress: false,
            buffer: Buffer::None
        }
    }

    /// Builds an `AioCb` over a raw const pointer and length.
    ///
    /// The control block is marked as immutable, so [`read`](#method.read)
    /// will panic on it.
    ///
    /// # Safety
    ///
    /// The pointer and length are stored as given, with no lifetime or
    /// ownership tracking (`Buffer::None`).  The caller must make sure the
    /// memory stays valid for as long as an operation on this `AioCb` may
    /// use it.
    pub unsafe fn from_ptr(fd: RawFd, offs: off_t,
                           buf: *const c_void, len: usize,
                           prio: libc::c_int, sigev_notify: SigevNotify,
                           opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        a.aio_offset = offs;
        a.aio_nbytes = len;
        // The C struct only has a mutable pointer field; `mutable: false`
        // below is what stops `read` from writing through it.
        a.aio_buf = buf as *mut c_void;
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: false,
            in_progress: false,
            buffer: Buffer::None
        }
    }

    /// Builds an `AioCb` over an immutably borrowed slice.
    ///
    /// The control block is marked as immutable, so [`read`](#method.read)
    /// will panic on it.
    ///
    /// # Panics
    ///
    /// Panics if `opcode` is `LioOpcode::LIO_READ`.
    pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8],
                      prio: libc::c_int, sigev_notify: SigevNotify,
                      opcode: LioOpcode) -> AioCb<'a> {
        let mut a = AioCb::common_init(fd, prio, sigev_notify);
        a.aio_offset = offs;
        a.aio_nbytes = buf.len() as size_t;
        // The C struct only has a mutable pointer field; the assertion below
        // and `mutable: false` keep the buffer from being written to.
        a.aio_buf = buf.as_ptr() as *mut c_void;
        assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer");
        a.aio_lio_opcode = opcode as libc::c_int;
        AioCb {
            aiocb: a,
            mutable: false,
            in_progress: false,
            buffer: Buffer::None,
        }
    }

    /// Creates a zeroed control block and fills in the fields shared by all
    /// constructors: file descriptor, request priority and sigevent.
    fn common_init(fd: RawFd, prio: libc::c_int,
                   sigev_notify: SigevNotify) -> libc::aiocb {
        // Zero the whole struct rather than naming fields one by one, so that
        // any fields not set explicitly here also start out as zero.
        let mut a = unsafe { mem::zeroed::<libc::aiocb>()};
        a.aio_fildes = fd;
        a.aio_reqprio = prio;
        a.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
        a
    }

    /// Replaces the control block's `aio_sigevent` with one built from
    /// `sigev_notify`.
    pub fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) {
        self.aiocb.aio_sigevent = SigEvent::new(sigev_notify).sigevent();
    }

    /// Calls `libc::aio_cancel` for this control block.
    ///
    /// Returns the matching `AioCancelStat` for the three `AIO_*` results,
    /// or `Err(Error::last())` when the call returns -1.
    ///
    /// # Panics
    ///
    /// Panics if `aio_cancel` returns any other value.
    pub fn cancel(&mut self) -> Result<AioCancelStat> {
        match unsafe { libc::aio_cancel(self.aiocb.aio_fildes, &mut self.aiocb) } {
            libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
            libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
            libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
            -1 => Err(Error::last()),
            _ => panic!("unknown aio_cancel return value")
        }
    }

    /// Calls `libc::aio_error` for this control block.
    ///
    /// Returns `Ok(())` when it reports 0, an error built from the reported
    /// errno when it reports a positive value, and `Err(Error::last())` when
    /// it returns -1.
    ///
    /// # Panics
    ///
    /// Panics if `aio_error` returns any other negative value.
    pub fn error(&mut self) -> Result<()> {
        match unsafe { libc::aio_error(&mut self.aiocb as *mut libc::aiocb) } {
            0 => Ok(()),
            num if num > 0 => Err(Error::from_errno(Errno::from_i32(num))),
            -1 => Err(Error::last()),
            num => panic!("unknown aio_error return value {:?}", num)
        }
    }

    /// Submits the control block with `libc::aio_fsync` using `mode`.
    ///
    /// On success the `AioCb` is marked in-progress.
    pub fn fsync(&mut self, mode: AioFsyncMode) -> Result<()> {
        let p: *mut libc::aiocb = &mut self.aiocb;
        Errno::result(unsafe {
            libc::aio_fsync(mode as libc::c_int, p)
        }).map(|_| {
            self.in_progress = true;
        })
    }

    /// Returns the stored opcode, or `None` if `aio_lio_opcode` does not
    /// match any of `LIO_READ`, `LIO_WRITE` or `LIO_NOP`.
    pub fn lio_opcode(&self) -> Option<LioOpcode> {
        match self.aiocb.aio_lio_opcode {
            libc::LIO_READ => Some(LioOpcode::LIO_READ),
            libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE),
            libc::LIO_NOP => Some(LioOpcode::LIO_NOP),
            _ => None
        }
    }

    /// Returns the byte count stored in the control block (`aio_nbytes`).
    pub fn nbytes(&self) -> usize {
        self.aiocb.aio_nbytes
    }

    /// Returns the file offset stored in the control block (`aio_offset`).
    pub fn offset(&self) -> off_t {
        self.aiocb.aio_offset
    }

    /// Returns the request priority stored in the control block
    /// (`aio_reqprio`).
    pub fn priority(&self) -> libc::c_int {
        self.aiocb.aio_reqprio
    }

    /// Submits the control block with `libc::aio_read`.
    ///
    /// On success the `AioCb` is marked in-progress.
    ///
    /// # Panics
    ///
    /// Panics if the `AioCb` was built over an immutable buffer.
    pub fn read(&mut self) -> Result<()> {
        assert!(self.mutable, "Can't read into an immutable buffer");
        let p: *mut libc::aiocb = &mut self.aiocb;
        Errno::result(unsafe {
            libc::aio_read(p)
        }).map(|_| {
            self.in_progress = true;
        })
    }

    /// Returns a `SigEvent` built from the control block's `aio_sigevent`.
    pub fn sigevent(&self) -> SigEvent {
        SigEvent::from(&self.aiocb.aio_sigevent)
    }

    /// Calls `libc::aio_return` for this control block and returns its
    /// result.
    ///
    /// The in-progress mark is cleared before the call, whatever the
    /// outcome, so the `AioCb` may be dropped afterwards.
    pub fn aio_return(&mut self) -> Result<isize> {
        let p: *mut libc::aiocb = &mut self.aiocb;
        self.in_progress = false;
        Errno::result(unsafe { libc::aio_return(p) })
    }

    /// Submits the control block with `libc::aio_write`.
    ///
    /// On success the `AioCb` is marked in-progress.
    pub fn write(&mut self) -> Result<()> {
        let p: *mut libc::aiocb = &mut self.aiocb;
        Errno::result(unsafe {
            libc::aio_write(p)
        }).map(|_| {
            self.in_progress = true;
        })
    }
}
pub fn aio_cancel_all(fd: RawFd) -> Result<AioCancelStat> {
match unsafe { libc::aio_cancel(fd, null_mut()) } {
libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled),
libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled),
libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone),
-1 => Err(Error::last()),
_ => panic!("unknown aio_cancel return value")
}
}
/// Calls `libc::aio_suspend` on the control blocks of every `AioCb` in
/// `list`.
///
/// * `list` - the operations to wait on
/// * `timeout` - passed through as the timeout argument; `None` is passed as
///   a null pointer
///
/// Returns `Ok(())` when the call succeeds and the errno-derived error
/// otherwise.
pub fn aio_suspend(list: &[&AioCb], timeout: Option<TimeSpec>) -> Result<()> {
    // Collect the address of each embedded control block explicitly.
    // Reinterpreting `&AioCb` as `*const libc::aiocb` would depend on the
    // field layout of `AioCb`, which Rust does not guarantee for a struct
    // without `repr(C)`.
    let ptrs: Vec<*const libc::aiocb> = list
        .iter()
        .map(|cb| &cb.aiocb as *const libc::aiocb)
        .collect();
    // Borrow the timespec from `timeout` itself (`ref`), so the pointer stays
    // valid until the call below; a by-value binding would go out of scope
    // at the end of the match arm and leave the pointer dangling.
    let timep = match timeout {
        None => null::<libc::timespec>(),
        Some(ref ts) => ts.as_ref() as *const libc::timespec
    };
    Errno::result(unsafe {
        libc::aio_suspend(ptrs.as_ptr(), ptrs.len() as i32, timep)
    }).map(drop)
}
impl<'a> Debug for AioCb<'a> {
    /// Formats the control block together with the `mutable` and
    /// `in_progress` flags; the held buffer is not included.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("AioCb");
        out.field("aiocb", &self.aiocb);
        out.field("mutable", &self.mutable);
        out.field("in_progress", &self.in_progress);
        out.finish()
    }
}
impl<'a> Drop for AioCb<'a> {
    /// Refuses to drop an `AioCb` that is still marked in-progress.
    ///
    /// The check is skipped while the thread is already panicking, so that
    /// unwinding does not trigger a second panic.
    fn drop(&mut self) {
        if self.in_progress && !thread::panicking() {
            panic!("Dropped an in-progress AioCb");
        }
    }
}
/// A batch of `AioCb`s submitted together through `libc::lio_listio`.
///
/// Not compiled on iOS or macOS.
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
pub struct LioCb<'a> {
/// The operations in the batch.
pub aiocbs: Vec<AioCb<'a>>,
// Scratch array of control-block pointers, rebuilt on every submission and
// passed to `lio_listio`.
list: Vec<*mut libc::aiocb>,
// Final statuses collected by `listio_resubmit`, indexed like `aiocbs`;
// `None` means no status has been collected for that operation yet.
results: Vec<Option<Result<isize>>>
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> LioCb<'a> {
pub fn with_capacity(capacity: usize) -> LioCb<'a> {
LioCb {
aiocbs: Vec::with_capacity(capacity),
list: Vec::with_capacity(capacity),
results: Vec::with_capacity(capacity)
}
}
pub fn listio(&mut self, mode: LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
for a in &mut self.aiocbs {
a.in_progress = true;
self.list.push(a as *mut AioCb<'a>
as *mut libc::aiocb);
}
let p = self.list.as_ptr();
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(drop)
}
pub fn listio_resubmit(&mut self, mode:LioMode,
sigev_notify: SigevNotify) -> Result<()> {
let sigev = SigEvent::new(sigev_notify);
let sigevp = &mut sigev.sigevent() as *mut libc::sigevent;
self.list.clear();
while self.results.len() < self.aiocbs.len() {
self.results.push(None);
}
for (i, a) in self.aiocbs.iter_mut().enumerate() {
if self.results[i].is_some() {
continue;
}
match a.error() {
Ok(()) => {
self.results[i] = Some(a.aio_return());
},
Err(Error::Sys(Errno::EAGAIN)) => {
self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb);
},
Err(Error::Sys(Errno::EINPROGRESS)) => {
},
Err(Error::Sys(Errno::EINVAL)) => panic!(
"AioCb was never submitted, or already finalized"),
_ => unreachable!()
}
}
let p = self.list.as_ptr();
Errno::result(unsafe {
libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp)
}).map(drop)
}
pub fn aio_return(&mut self, i: usize) -> Result<isize> {
if i >= self.results.len() || self.results[i].is_none() {
self.aiocbs[i].aio_return()
} else {
self.results[i].unwrap()
}
}
pub fn error(&mut self, i: usize) -> Result<()> {
if i >= self.results.len() || self.results[i].is_none() {
self.aiocbs[i].error()
} else {
Ok(())
}
}
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> Debug for LioCb<'a> {
    /// Formats only the `aiocbs` field; the pointer list and collected
    /// results are left out.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("LioCb");
        out.field("aiocbs", &self.aiocbs);
        out.finish()
    }
}
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> {
    /// Wraps an existing vector of operations, preallocating the internal
    /// vectors to the same capacity as `src`.
    fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> {
        // Read the capacity before `src` is moved into the struct.
        let cap = src.capacity();
        LioCb {
            aiocbs: src,
            list: Vec::with_capacity(cap),
            results: Vec::with_capacity(cap),
        }
    }
}