use futures::{
Future,
task::{Context, Poll}
};
pub use mio_aio::LioError;
use nix::errno::Errno;
use tokio::io::bsd::{AioSource, Aio};
use std::{fs, io, mem};
use std::os::unix::fs::FileTypeExt;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::pin::Pin;
// Read-type ioctl wrappers generated by `nix::ioctl_read!`.  Each invocation
// defines an `unsafe fn NAME(fd, *mut T)` for the ioctl group 'd' with the
// given sequence number.
// NOTE(review): the numbers appear to match FreeBSD's <sys/disk.h>
// (DIOCGSECTORSIZE = 128, DIOCGMEDIASIZE = 129, DIOCGSTRIPESIZE = 139) --
// confirm against the target OS headers.

// Media size query; `File::len` returns the value it yields as the length of
// a device-backed file.
nix::ioctl_read! {
diocgmediasize, 'd', 129, nix::libc::off_t
}
// Sector size query; used by `File::new` when the stripe size is not positive.
nix::ioctl_read! {
diocgsectorsize, 'd', 128, nix::libc::c_uint
}
// Stripe size query; `File::new` prefers this value when it is positive.
nix::ioctl_read! {
diocgstripesize, 'd', 139, nix::libc::off_t
}
/// Converts an `io::Error` produced while polling into an immediately-ready
/// `Poll` carrying the equivalent `nix` errno.
///
/// An error that has no underlying OS error code is mapped to errno value 0.
fn conv_poll_err<T>(e: io::Error) -> Poll<Result<T, nix::Error>> {
    let code = match e.raw_os_error() {
        Some(code) => code,
        None => 0,
    };
    Poll::Ready(Err(Errno::from_i32(code)))
}
// Shared resubmission step for the `ReadvAt` and `WritevAt` futures.
//
// `$self` must have an `op: Option<Aio<WrappedLioCb>>` field that is currently
// `Some`, and a `state: AioState` field.  `$ev` is the readiness event that was
// returned by `poll_ready`.
//
// Evaluates to `None` when the caller should keep polling, or to
// `Some(Err(errno))` when the resubmission failed with an error that should be
// returned to the future's owner.
macro_rules! lio_resubmit {
($self: ident, $ev: expr) => {
{
// Resubmit first, and keep the result so readiness can be cleared
// regardless of the outcome.
let result = (*$self.op.as_mut().unwrap()).0.resubmit();
$self.op
.as_mut()
.unwrap()
.clear_ready($ev);
match result {
Ok(()) => {
// Everything was accepted this time.
$self.state = AioState::InProgress;
None
},
// Still only partially submitted: the state stays `Incomplete`, so the
// caller will resubmit again after the next readiness event.
Err(LioError::EINCOMPLETE) => None,
Err(LioError::EAGAIN) => Some(Err(Errno::EAGAIN)),
// The payload of `EIO` is discarded; only the errno is reported.
Err(LioError::EIO(_)) => Some(Err(Errno::EIO)),
}
}
}
}
/// Newtype around `mio_aio::AioCb` so that it can implement tokio's
/// `AioSource` trait in this crate.
#[derive(Debug)]
struct WrappedAioCb<'a>(mio_aio::AioCb<'a>);
impl<'a> AioSource for WrappedAioCb<'a> {
// Forwards registration to the wrapped control block.
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
// Forwards deregistration to the wrapped control block.
fn deregister(&mut self) {
self.0.deregister_raw()
}
}
/// Newtype around `mio_aio::LioCb` so that it can implement tokio's
/// `AioSource` trait in this crate.
#[derive(Debug)]
struct WrappedLioCb<'a>(mio_aio::LioCb<'a>);
impl<'a> AioSource for WrappedLioCb<'a> {
// Forwards registration to the wrapped list control block.
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
// Forwards deregistration to the wrapped list control block.
fn deregister(&mut self) {
self.0.deregister_raw()
}
}
/// The single operation driven by an `AioFut`, together with the tokio `Aio`
/// handle that wraps its control block.
#[derive(Debug)]
enum AioOp<'a> {
/// Created by `File::sync_all`; borrows no buffer, hence `'static`.
Fsync(Aio<WrappedAioCb<'static>>),
/// Created by `File::read_at`; borrows the destination buffer for `'a`.
Read(Aio<WrappedAioCb<'a>>),
/// Created by `File::write_at`; borrows the source buffer for `'a`.
Write(Aio<WrappedAioCb<'a>>),
}
/// Submission state shared by the futures in this file.
#[derive(Debug, Eq, PartialEq)]
enum AioState {
/// Initial state set by the `File` methods that create a future; nothing has
/// been submitted yet.
Allocated,
/// Submission (or resubmission) succeeded.
InProgress,
/// A list submission returned `LioError::EINCOMPLETE`; the list futures call
/// `lio_resubmit!` while in this state.
Incomplete,
}
/// Future returned by `File::read_at`, `File::write_at` and `File::sync_all`.
///
/// Resolves to an `AioResult` on success or a `nix::Error` on failure.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct AioFut<'a> {
// The wrapped operation and its control block.
op: AioOp<'a>,
// `Allocated` until the operation has been submitted from `poll`.
state: AioState,
}
impl<'a> AioFut<'a> {
    /// Collects the final status of the wrapped operation.
    ///
    /// Returns `Ok(None)` for an fsync (its return value is discarded) and
    /// `Ok(Some(n))` for a read or write, where `n` is whatever the control
    /// block's `aio_return` reported.  Errors are passed through unchanged.
    #[doc(hidden)]
    fn aio_return(&mut self) -> Result<Option<isize>, nix::Error> {
        match &mut self.op {
            AioOp::Fsync(io) => {
                let status = io.0.aio_return();
                status.map(|_| None)
            }
            AioOp::Read(io) | AioOp::Write(io) => {
                let status = io.0.aio_return();
                status.map(Some)
            }
        }
    }
}
/// Successful output of an `AioFut`.
pub struct AioResult {
/// `Some(n)` with the operation's return value for reads and writes;
/// `None` for fsync operations.
pub value: Option<isize>,
}
/// Future returned by `File::readv_at`.  Resolves to the total of the
/// per-operation results, or a `nix::Error`.
#[must_use = "futures do nothing unless polled"]
#[allow(clippy::type_complexity)]
pub struct ReadvAt<'a> {
// The list operation; taken (set to `None`) when the results are collected.
op: Option<Aio<WrappedLioCb<'a>>>,
// Only `Some` when `readv_at` had to read into a single temporary
// accumulator: holds that accumulator plus the caller's original buffers,
// into which `poll` copies the data on completion.
bufsav: Option<(Pin<Box<[u8]>>, &'a mut [&'a mut [u8]])>,
// Submission state; starts as `Allocated`.
state: AioState,
}
/// Future returned by `File::writev_at`.  Resolves to the total of the
/// per-operation results, or a `nix::Error`.
#[must_use = "futures do nothing unless polled"]
pub struct WritevAt<'a> {
// The list operation; taken (set to `None`) when the results are collected.
op: Option<Aio<WrappedLioCb<'a>>>,
// Only `Some` when `writev_at` copied the caller's buffers into a single
// temporary buffer.  Never read; it is stored so that the buffer stays
// allocated for as long as this future exists.
_accumulator: Option<Pin<Box<[u8]>>>,
// Submission state; starts as `Allocated`.
state: AioState,
}
impl<'a> Future for ReadvAt<'a> {
    type Output = Result<usize, nix::Error>;

    /// Drives the vectored read.
    ///
    /// The list is submitted on the first poll.  After each readiness event
    /// the list is either resubmitted (if the previous submission was
    /// incomplete) or its results are collected and summed.  If the read went
    /// through a temporary accumulator, the data is scattered back into the
    /// caller's buffers before the total is returned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.state == AioState::Allocated {
            let submitted = (*self.op.as_mut().unwrap()).0.submit();
            match submitted {
                Ok(()) => self.state = AioState::InProgress,
                // Part of the list was accepted; the rest is resubmitted
                // after the next readiness event.
                Err(LioError::EINCOMPLETE) => self.state = AioState::Incomplete,
                Err(LioError::EAGAIN) => return Poll::Ready(Err(Errno::EAGAIN)),
                Err(LioError::EIO(_)) => return Poll::Ready(Err(Errno::EIO)),
            }
        }

        loop {
            let readiness = self.op.as_mut().unwrap().poll_ready(cx);
            let ev = match readiness {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return conv_poll_err(e),
                Poll::Ready(Ok(ev)) => ev,
            };

            if self.state == AioState::Incomplete {
                if let Some(outcome) = lio_resubmit!(self, ev) {
                    return Poll::Ready(outcome);
                }
                // Resubmitted (fully or partially): wait for more readiness.
                continue;
            }

            // Consume the operation and add up the per-element results,
            // stopping at the first error.
            let outcome = self.op.take()
                .unwrap()
                .into_inner()
                .0
                .into_results(|mut iter|
                    iter.try_fold(0, |total, lr|
                        lr.result.map(|r| total + r as usize)
                    )
                );

            if let Ok(nread) = outcome {
                if let Some((accum, ob)) = &mut self.bufsav {
                    // Scatter the first `nread` accumulated bytes across the
                    // caller's buffers, in order.
                    let mut pending: &[u8] = &accum[..nread];
                    for dst in ob.iter_mut() {
                        if pending.is_empty() {
                            break;
                        }
                        let n = pending.len().min(dst.len());
                        let (head, tail) = pending.split_at(n);
                        dst[..n].copy_from_slice(head);
                        pending = tail;
                    }
                }
            }
            return Poll::Ready(outcome);
        }
    }
}
impl<'a> Future for WritevAt<'a> {
    type Output = Result<usize, nix::Error>;

    /// Drives the vectored write.
    ///
    /// The list is submitted on the first poll.  After each readiness event
    /// the list is either resubmitted (if the previous submission was
    /// incomplete) or its results are collected and summed into the output.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.state == AioState::Allocated {
            let submitted = (*self.op.as_mut().unwrap()).0.submit();
            match submitted {
                Ok(()) => self.state = AioState::InProgress,
                // Part of the list was accepted; the rest is resubmitted
                // after the next readiness event.
                Err(LioError::EINCOMPLETE) => self.state = AioState::Incomplete,
                Err(LioError::EAGAIN) => return Poll::Ready(Err(Errno::EAGAIN)),
                Err(LioError::EIO(_)) => return Poll::Ready(Err(Errno::EIO)),
            }
        }

        loop {
            let readiness = self.op.as_mut().unwrap().poll_ready(cx);
            let ev = match readiness {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return conv_poll_err(e),
                Poll::Ready(Ok(ev)) => ev,
            };

            if self.state == AioState::Incomplete {
                if let Some(outcome) = lio_resubmit!(self, ev) {
                    return Poll::Ready(outcome);
                }
                // Resubmitted (fully or partially): wait for more readiness.
                continue;
            }

            // Consume the operation and add up the per-element results,
            // stopping at the first error.
            let outcome = self.op.take()
                .unwrap()
                .into_inner()
                .0
                .into_results(|mut iter|
                    iter.try_fold(0, |total, lr|
                        lr.result.map(|r| total + r as usize)
                    )
                );
            return Poll::Ready(outcome);
        }
    }
}
/// A file handle on which asynchronous reads, writes and syncs can be issued.
#[derive(Debug)]
pub struct File {
// The underlying standard-library file; its raw fd is used for every
// operation.
file: fs::File,
// Set by `File::new`: 1 for anything that is not a block or character
// device; otherwise the device's stripe size if positive, else its sector
// size.
sectorsize: usize
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::len_without_is_empty))]
impl File {
pub fn len(&self) -> io::Result<u64> {
let md = self.metadata()?;
if self.sectorsize > 1 {
let mut mediasize = mem::MaybeUninit::<nix::libc::off_t>::uninit();
unsafe {
diocgmediasize(self.file.as_raw_fd(), mediasize.as_mut_ptr())
}.map_err(|_| io::Error::from_raw_os_error(nix::errno::errno()))?;
unsafe { Ok(mediasize.assume_init() as u64) }
} else {
Ok(md.len())
}
}
/// Returns the metadata of the underlying file, as reported by
/// `std::fs::File::metadata`.
pub fn metadata(&self) -> io::Result<fs::Metadata> {
self.file.metadata()
}
/// Wraps an already-open `std::fs::File`.
///
/// For block and character devices the handle's sector size is queried from
/// the device: the stripe size is used when it is positive, otherwise the
/// sector size.  For every other file type the sector size is recorded as 1.
///
/// # Panics
///
/// Panics if the file's metadata cannot be read, or if either device ioctl
/// fails.
pub fn new(file: fs::File) -> File {
    let ft = file.metadata().unwrap().file_type();
    let is_device = ft.is_block_device() || ft.is_char_device();
    let sectorsize = if !is_device {
        1
    } else {
        let fd = file.as_raw_fd();
        let mut sector = mem::MaybeUninit::<u32>::uninit();
        let mut stripe = mem::MaybeUninit::<nix::libc::off_t>::uninit();
        // SAFETY: both pointers refer to live locals, and each value is only
        // read after its ioctl has returned successfully (a failure panics
        // in `unwrap`).
        unsafe {
            diocgsectorsize(fd, sector.as_mut_ptr()).unwrap();
            diocgstripesize(fd, stripe.as_mut_ptr()).unwrap();
            let stripe = stripe.assume_init();
            if stripe > 0 {
                stripe as usize
            } else {
                sector.assume_init() as usize
            }
        }
    };
    File { file, sectorsize }
}
/// Opens the file at `path` for reading and writing, creating it if it does
/// not exist, and wraps it with `File::new`.
///
/// # Errors
///
/// Returns the `io::Error` from opening the file.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
    let mut options = fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    let file = options.open(path)?;
    Ok(File::new(file))
}
/// Prepares an asynchronous read into `buf` starting at `offset`.
///
/// Nothing is submitted here: the returned future starts in the `Allocated`
/// state.
///
/// # Errors
///
/// Returns the `io::Error` from `Aio::new_for_aio` if the operation cannot be
/// wrapped.
pub fn read_at<'a>(&self, buf: &'a mut [u8], offset: u64)
    -> io::Result<AioFut<'a>>
{
    let fd = self.file.as_raw_fd();
    let aiocb = mio_aio::AioCb::from_mut_slice(
        fd,
        offset,
        buf,
        0,
        mio_aio::LioOpcode::LIO_NOP,
    );
    let pe = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Read(pe),
        state: AioState::Allocated,
    })
}
/// Prepares an asynchronous vectored read into `bufs`, starting at `offset`.
///
/// Normally one `LIO_READ` element is queued per buffer, at consecutive
/// offsets.  If this handle's sector size is greater than 1 and any buffer's
/// length is not a multiple of it, a single read into a temporary buffer of
/// the combined length is queued instead, and the caller's buffers are kept
/// in the returned future alongside that temporary buffer.
///
/// Nothing is submitted here: the returned future starts in the `Allocated`
/// state.
///
/// # Errors
///
/// Returns the `io::Error` from `Aio::new_for_lio` if the list cannot be
/// wrapped.
pub fn readv_at<'a>(&self, bufs: &'a mut [&'a mut [u8]],
offset: u64) -> io::Result<ReadvAt<'a>>
{
let mut builder = mio_aio::LioCbBuilder::with_capacity(bufs.len());
let mut offs = offset;
let fd = self.file.as_raw_fd();
let mut bufsav = None;
if self.sectorsize > 1 &&
bufs.iter().any(|buf| buf.len() % self.sectorsize != 0)
{
// Accumulate: read everything into one zero-initialized buffer whose
// length is the sum of the caller's buffer lengths.
let l = bufs.iter().map(|buf| buf.len()).sum();
let mut accumulator: Pin<Box<[u8]>> =
vec![0; l].into_boxed_slice().into();
let original_buffers = bufs;
// Extend the borrow of the accumulator's contents to `'static` so that it
// can be handed to the builder while the accumulator itself is moved into
// `bufsav`.
// NOTE(review): this relies on the boxed allocation staying put when the
// `Pin<Box<..>>` is moved, and on the returned future keeping the
// accumulator alive for as long as the operation can use it -- confirm.
let buf: &'static mut [u8] = unsafe{
mem::transmute::<&mut [u8], &'static mut [u8]>(
&mut (accumulator.as_mut())
)
};
bufsav = Some((accumulator, original_buffers));
builder = builder.emplace_mut_slice(
fd,
offs,
buf,
0,
mio_aio::LioOpcode::LIO_READ
);
} else {
// Direct path: one element per caller buffer, laid out back to back.
for buf in bufs.iter_mut() {
let l = buf.len();
builder = builder.emplace_mut_slice(
fd,
offs,
*buf,
0,
mio_aio::LioOpcode::LIO_READ
);
offs += l as u64;
}
}
let liocb = builder.finish();
let source = WrappedLioCb(liocb);
Aio::new_for_lio(source)
.map(|pe| ReadvAt {
op: Some(pe),
bufsav,
state: AioState::Allocated,
})
}
/// Prepares an asynchronous write of `buf` starting at `offset`.
///
/// Nothing is submitted here: the returned future starts in the `Allocated`
/// state.
///
/// # Errors
///
/// Returns the `io::Error` from `Aio::new_for_aio` if the operation cannot be
/// wrapped.
pub fn write_at<'a>(&self, buf: &'a [u8],
                    offset: u64) -> io::Result<AioFut<'a>>
{
    let aiocb = mio_aio::AioCb::from_slice(
        self.file.as_raw_fd(),
        offset,
        buf,
        0,
        mio_aio::LioOpcode::LIO_NOP,
    );
    let pe = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Write(pe),
        state: AioState::Allocated,
    })
}
/// Prepares an asynchronous vectored write of `bufs`, starting at `offset`.
///
/// Normally one `LIO_WRITE` element is queued per buffer, at consecutive
/// offsets.  If this handle's sector size is greater than 1 and any buffer's
/// length is not a multiple of it, the buffers are first copied into a single
/// temporary buffer and one write of that buffer is queued instead.
///
/// Nothing is submitted here: the returned future starts in the `Allocated`
/// state.
///
/// # Errors
///
/// Returns the `io::Error` from `Aio::new_for_lio` if the list cannot be
/// wrapped.
pub fn writev_at<'a>(&self, bufs: &[&'a [u8]], offset: u64)
-> io::Result<WritevAt<'a>>
{
let mut builder = mio_aio::LioCbBuilder::with_capacity(bufs.len());
let mut offs = offset;
let fd = self.file.as_raw_fd();
let mut accumulator: Option<Pin<Box<[u8]>>> = None;
if self.sectorsize > 1 &&
bufs.iter().any(|buf| buf.len() % self.sectorsize != 0)
{
// Accumulate: concatenate the caller's buffers, in order, into one
// contiguous buffer.
let mut accum = Vec::<u8>::new();
for buf in bufs.iter() {
accum.extend_from_slice(&buf[..]);
}
accumulator = Some(accum.into_boxed_slice().into());
// Extend the borrow of the accumulator's contents to `'static` so that it
// can be handed to the builder while the accumulator itself is later moved
// into the returned future.
// NOTE(review): this relies on the boxed allocation staying put when the
// `Pin<Box<..>>` is moved, and on the returned future keeping the
// accumulator alive for as long as the operation can use it -- confirm.
let buf: &'static [u8] = unsafe{
mem::transmute::<&[u8], &'static [u8]>(
&(accumulator.as_ref().unwrap().as_ref())
)
};
builder = builder.emplace_slice(
fd,
offs,
buf,
0,
mio_aio::LioOpcode::LIO_WRITE
);
} else {
// Direct path: one element per caller buffer, laid out back to back.
for buf in bufs {
let l = buf.len();
builder = builder.emplace_slice(
fd,
offs,
buf,
0,
mio_aio::LioOpcode::LIO_WRITE
);
offs += l as u64;
}
}
let liocb = builder.finish();
let source = WrappedLioCb(liocb);
Aio::new_for_lio(source)
.map(|pe|
WritevAt {
_accumulator: accumulator,
op: Some(pe),
state: AioState::Allocated,
}
)
}
/// Prepares an asynchronous fsync of the file.
///
/// Nothing is submitted here: the returned future starts in the `Allocated`
/// state.
///
/// # Errors
///
/// Returns the `io::Error` from `Aio::new_for_aio` if the operation cannot be
/// wrapped.
pub fn sync_all(&self) -> io::Result<AioFut<'static>> {
    let fd = self.file.as_raw_fd();
    let aiocb = mio_aio::AioCb::from_fd(fd, 0);
    let pe = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Fsync(pe),
        state: AioState::Allocated,
    })
}
}
// Exposes the raw descriptor of the wrapped `std::fs::File`.
impl AsRawFd for File {
fn as_raw_fd(&self) -> RawFd {
self.file.as_raw_fd()
}
}
impl<'a> Future for AioFut<'a> {
    type Output = Result<AioResult, nix::Error>;

    /// Drives the single operation.
    ///
    /// Readiness is polled first.  If nothing is ready and the operation has
    /// not been submitted yet, it is submitted now; a submission error
    /// completes the future immediately.  Once readiness is reported, the
    /// operation's final status is collected and returned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let readiness = match self.op {
            AioOp::Fsync(ref mut io) => io.poll_ready(cx),
            AioOp::Read(ref mut io) | AioOp::Write(ref mut io) => io.poll_ready(cx),
        };

        match readiness {
            Poll::Ready(Ok(_ev)) => {
                let status = self.aio_return();
                Poll::Ready(status.map(|value| AioResult { value }))
            }
            Poll::Ready(Err(e)) => conv_poll_err(e),
            Poll::Pending => {
                if self.state != AioState::Allocated {
                    // Already submitted: keep waiting.
                    return Poll::Pending;
                }
                // First poll: issue the operation to the kernel.
                let submitted = match self.op {
                    AioOp::Fsync(ref mut pe) => {
                        pe.0.fsync(mio_aio::AioFsyncMode::O_SYNC)
                    }
                    AioOp::Read(ref mut pe) => pe.0.read(),
                    AioOp::Write(ref mut pe) => pe.0.write(),
                };
                match submitted {
                    Err(e) => Poll::Ready(Err(e)),
                    Ok(_) => {
                        self.state = AioState::InProgress;
                        Poll::Pending
                    }
                }
            }
        }
    }
}