// vim: tw=80
use futures::{
Future,
task::{Context, Poll}
};
pub use mio_aio::LioError;
use nix::errno::Errno;
use tokio::io::bsd::{AioSource, Aio};
use std::{fs, io, mem};
use std::os::unix::fs::FileTypeExt;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::pin::Pin;
nix::ioctl_read! {
/// Get the size of the entire device in bytes. This should be a multiple
/// of the sector size.
diocgmediasize, 'd', 129, nix::libc::off_t
}
nix::ioctl_read! {
/// Get the device's sector size in bytes.
///
/// [`File::new`] uses this as the access granularity when no positive stripe
/// size is reported.
// NOTE(review): the numbers match FreeBSD's DIOCGSECTORSIZE in
// <sys/disk.h> -- confirm against the target platform's headers.
diocgsectorsize, 'd', 128, nix::libc::c_uint
}
nix::ioctl_read! {
/// Get the device's stripe size in bytes.
///
/// [`File::new`] prefers this value over the sector size whenever it is
/// greater than zero.
// NOTE(review): the numbers match FreeBSD's DIOCGSTRIPESIZE in
// <sys/disk.h> -- confirm against the target platform's headers.
diocgstripesize, 'd', 139, nix::libc::off_t
}
/// Turn an `io::Error` reported while polling into a ready, failed poll result
/// carrying the matching `Errno`.
///
/// Errors that have no OS error code are mapped through code 0.
fn conv_poll_err<T>(e: io::Error) -> Poll<Result<T, nix::Error>> {
    let code = e.raw_os_error().unwrap_or_default();
    Poll::Ready(Err(Errno::from_i32(code)))
}
/// Resubmit the not-yet-issued parts of a list-I/O operation.
///
/// `$self` is the pinned future (it must have `op` and `state` fields) and
/// `$ev` is the readiness event that was just delivered. Evaluates to
/// `Some(Err(_))` when the future must finish with an error, and to `None` when
/// the caller should go back to polling.
macro_rules! lio_resubmit {
    ($self: ident, $ev: expr) => {
        {
            // Some requests must've completed; now issue the rest.
            let op = $self.op.as_mut().unwrap();
            let resubmitted = op.0.resubmit();
            // Only clear the readiness after resubmitting.
            op.clear_ready($ev);
            match resubmitted {
                Err(LioError::EIO(_)) => Some(Err(Errno::EIO)),
                Err(LioError::EAGAIN) => Some(Err(Errno::EAGAIN)),
                // Still not everything could be issued; stay Incomplete.
                Err(LioError::EINCOMPLETE) => None,
                Ok(()) => {
                    $self.state = AioState::InProgress;
                    None
                },
            }
        }
    }
}
/// Must wrap the real mio::event::Source in order to implement AioSource, which
/// exists because Tokio has a policy of not exposing any mio types in its
/// public API.
///
/// This wrapper holds a single `mio_aio::AioCb`.
#[derive(Debug)]
struct WrappedAioCb<'a>(mio_aio::AioCb<'a>);
impl<'a> AioSource for WrappedAioCb<'a> {
// Forwards the raw descriptor and token unchanged to the wrapped control
// block's `register_raw`.
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
// Forwards to the wrapped control block's `deregister_raw`.
fn deregister(&mut self) {
self.0.deregister_raw()
}
}
/// Must wrap the real mio::event::Source in order to implement AioSource, which
/// exists because Tokio has a policy of not exposing any mio types in its
/// public API.
///
/// This is the list-I/O counterpart of `WrappedAioCb`: it holds a
/// `mio_aio::LioCb` instead of a single `AioCb`.
#[derive(Debug)]
struct WrappedLioCb<'a>(mio_aio::LioCb<'a>);
impl<'a> AioSource for WrappedLioCb<'a> {
// Forwards the raw descriptor and token unchanged to the wrapped list's
// `register_raw`.
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
// Forwards to the wrapped list's `deregister_raw`.
fn deregister(&mut self) {
self.0.deregister_raw()
}
}
// LCOV_EXCL_START
/// The single-control-block operation carried by an [`AioFut`].
#[derive(Debug)]
enum AioOp<'a> {
/// An fsync, as created by `File::sync_all`. It references no caller
/// buffer, hence the `'static` lifetime.
Fsync(Aio<WrappedAioCb<'static>>),
/// A read into a caller-provided buffer, as created by `File::read_at`.
Read(Aio<WrappedAioCb<'a>>),
/// A write from a caller-provided buffer, as created by `File::write_at`.
Write(Aio<WrappedAioCb<'a>>),
}
// LCOV_EXCL_STOP
// LCOV_EXCL_START
/// Represents the progress of a single AIO operation
#[derive(Debug, Eq, PartialEq)]
enum AioState {
/// The future has been allocated, but its operation has not yet been
/// submitted to the system
Allocated,
/// The operation has been submitted (the futures in this file do that from
/// their `poll` methods, not when they are constructed) and is currently in
/// progress, but its status has not been collected yet
InProgress,
/// A `ReadvAt` or `WritevAt` has been submitted and some of its constituent
/// `AioCb`s are in-progress, but some are not due to resource limitations.
Incomplete,
}
// LCOV_EXCL_STOP
// LCOV_EXCL_START
/// A Future representing an AIO operation.
///
/// Returned by `File::read_at`, `File::write_at` and `File::sync_all`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct AioFut<'a> {
/// The operation itself, together with its registered control block.
op: AioOp<'a>,
/// Whether the operation has been submitted yet. Constructors start it at
/// `AioState::Allocated`.
state: AioState,
}
// LCOV_EXCL_STOP
impl<'a> AioFut<'a> {
    // Collects the final status of the operation. Called by
    // `futures::Future::poll` once the operation is ready; not meant for
    // users.
    //
    // An fsync carries no value and yields `Ok(None)`; reads and writes yield
    // `Ok(Some(n))`, where `n` is whatever the control block's `aio_return`
    // reported.
    #[doc(hidden)]
    fn aio_return(&mut self) -> Result<Option<isize>, nix::Error> {
        match &mut self.op {
            AioOp::Read(io) | AioOp::Write(io) => io.0.aio_return().map(Some),
            AioOp::Fsync(io) => io.0.aio_return().map(|_| None),
        }
    }
}
/// Holds the result of an individual aio operation
///
/// This is the success type produced by awaiting an [`AioFut`].
pub struct AioResult {
/// This is what the AIO operation would've returned, had it been
/// synchronous and successful. fsync operations return `None`, read and
/// write operations return an `isize`
pub value: Option<isize>,
}
/// The return value of [`File::readv_at`]
#[must_use = "futures do nothing unless polled"]
// Presumably needed for the nested tuple type of `bufsav` below.
#[allow(clippy::type_complexity)]
pub struct ReadvAt<'a> {
/// The registered list-I/O operation. `poll` takes it out, leaving `None`,
/// when it collects the final results.
op: Option<Aio<WrappedLioCb<'a>>>,
/// If needed, bufsav.0 combines [`File::readv_at`]'s argument slices into a
/// bigger slice that satisfies sectorsize requirements. After completion,
/// the data will be copied back to bufsav.1
bufsav: Option<(Pin<Box<[u8]>>, &'a mut [&'a mut [u8]])>,
/// Submission progress; starts out as `AioState::Allocated`.
state: AioState,
}
/// The return value of [`File::writev_at`]
#[must_use = "futures do nothing unless polled"]
pub struct WritevAt<'a> {
/// The registered list-I/O operation. `poll` takes it out, leaving `None`,
/// when it collects the final results.
op: Option<Aio<WrappedLioCb<'a>>>,
/// If needed, _accumulator combines [`File::writev_at`]'s argument slices
/// into a bigger slice that satisfies sectorsize requirements, and owns the
/// data for the lifetime of [`WritevAt::op`].
_accumulator: Option<Pin<Box<[u8]>>>,
/// Submission progress; starts out as `AioState::Allocated`.
state: AioState,
}
impl<'a> Future for ReadvAt<'a> {
    type Output = Result<usize, nix::Error>;

    /// Drive the scatter read to completion.
    ///
    /// The first poll submits the list. Later polls wait for readiness,
    /// resubmit whatever could not be issued earlier, and finally add up the
    /// per-element results into the total number of bytes read. If a bounce
    /// buffer was used, the data is copied back into the caller's buffers
    /// before the total is returned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.state == AioState::Allocated {
            let submitted = self.op.as_mut().unwrap().0.submit();
            match submitted {
                Ok(()) => self.state = AioState::InProgress,
                // EINCOMPLETE means that some requests failed, but some
                // were initiated
                Err(LioError::EINCOMPLETE) => self.state = AioState::Incomplete,
                Err(LioError::EAGAIN) => return Poll::Ready(Err(Errno::EAGAIN)),
                Err(LioError::EIO(_)) => return Poll::Ready(Err(Errno::EIO)),
            }
        }

        loop {
            let readiness = self.op.as_mut().unwrap().poll_ready(cx);
            let ev = match readiness {
                Poll::Ready(Ok(ev)) => ev,
                Poll::Ready(Err(e)) => return conv_poll_err(e),
                Poll::Pending => return Poll::Pending,
            };

            if self.state == AioState::Incomplete {
                if let Some(failure) = lio_resubmit!(self, ev) {
                    return Poll::Ready(failure);
                }
                // Go around again and wait for the next readiness event.
                continue;
            }

            // Everything was issued and the list is ready: consume the
            // operation and sum up the individual results.
            let outcome = self.op.take()
                .unwrap()
                .into_inner()
                .0
                .into_results(|mut results|
                    results.try_fold(0, |sum, lr|
                        lr.result.map(|n| sum + n as usize)
                    )
                );

            if let Ok(nread) = outcome {
                if let Some((accum, dests)) = &mut self.bufsav {
                    // Copy results back into the individual buffers, filling
                    // each one in order until the bytes read run out.
                    let mut src = &accum[..nread];
                    for dst in dests.iter_mut() {
                        if src.is_empty() {
                            break;
                        }
                        let n = src.len().min(dst.len());
                        let (head, rest) = src.split_at(n);
                        dst[..n].copy_from_slice(head);
                        src = rest;
                    }
                }
            }
            return Poll::Ready(outcome);
        }
    }
}
impl<'a> Future for WritevAt<'a> {
    type Output = Result<usize, nix::Error>;

    /// Drive the gather write to completion.
    ///
    /// The first poll submits the list. Later polls wait for readiness,
    /// resubmit whatever could not be issued earlier, and finally add up the
    /// per-element results into the total number of bytes written.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.state == AioState::Allocated {
            let submitted = self.op.as_mut().unwrap().0.submit();
            match submitted {
                Ok(()) => self.state = AioState::InProgress,
                // EINCOMPLETE means that some requests failed, but some
                // were initiated
                Err(LioError::EINCOMPLETE) => self.state = AioState::Incomplete,
                Err(LioError::EAGAIN) => return Poll::Ready(Err(Errno::EAGAIN)),
                Err(LioError::EIO(_)) => return Poll::Ready(Err(Errno::EIO)),
            }
        }

        loop {
            let readiness = self.op.as_mut().unwrap().poll_ready(cx);
            let ev = match readiness {
                Poll::Ready(Ok(ev)) => ev,
                Poll::Ready(Err(e)) => return conv_poll_err(e),
                Poll::Pending => return Poll::Pending,
            };

            if self.state == AioState::Incomplete {
                if let Some(failure) = lio_resubmit!(self, ev) {
                    return Poll::Ready(failure);
                }
                // Go around again and wait for the next readiness event.
                continue;
            }

            // Everything was issued and the list is ready: consume the
            // operation and sum up the individual results.
            let outcome = self.op.take()
                .unwrap()
                .into_inner()
                .0
                .into_results(|mut results|
                    results.try_fold(0, |sum, lr|
                        lr.result.map(|n| sum + n as usize)
                    )
                );
            return Poll::Ready(outcome);
        }
    }
}
/// Basically a Tokio file handle. This is the starting point for tokio-file.
// LCOV_EXCL_START
#[derive(Debug)]
pub struct File {
/// The underlying standard-library file. Its raw descriptor is what every
/// operation created from this handle is issued against.
file: fs::File,
/// The preferred (not necessarily minimum) sector size for accessing
/// the device
///
/// `File::new` sets this to 1 for anything that is neither a block nor a
/// character device.
sectorsize: usize
}
// LCOV_EXCL_STOP
// is_empty doesn't make much sense for files
#[cfg_attr(feature = "cargo-clippy", allow(clippy::len_without_is_empty))]
impl File {
/// Get the file's size in bytes
///
/// For handles whose `sectorsize` is greater than 1 (which `File::new` only
/// sets for block and character devices) the size is queried from the device
/// with the media-size ioctl. For everything else it is taken from the file's
/// metadata.
///
/// # Errors
///
/// Returns the error from reading the metadata, or the OS error reported by
/// the ioctl.
pub fn len(&self) -> io::Result<u64> {
    let md = self.metadata()?;
    if self.sectorsize > 1 {
        let mut mediasize = mem::MaybeUninit::<nix::libc::off_t>::uninit();
        // SAFETY: the descriptor is owned by `self.file` and therefore open,
        // and the pointer refers to a live local of the type the ioctl
        // writes.
        let queried = unsafe {
            diocgmediasize(self.file.as_raw_fd(), mediasize.as_mut_ptr())
        };
        // Convert the error the ioctl actually returned instead of re-reading
        // the thread's global errno after the fact.
        queried.map_err(io::Error::from)?;
        // SAFETY: the ioctl succeeded, so it has initialized `mediasize`.
        Ok(unsafe { mediasize.assume_init() } as u64)
    } else {
        Ok(md.len())
    }
}
/// Get metadata from the underlying file
///
/// POSIX AIO doesn't provide a way to do this asynchronously, so it must be
/// synchronous.
///
/// # Errors
///
/// Returns whatever error `std::fs::File::metadata` reports for the wrapped
/// file.
pub fn metadata(&self) -> io::Result<fs::Metadata> {
self.file.metadata()
}
/// Create a new Tokio File from an ordinary `std::fs::File` object
///
/// For block and character devices the preferred access size is queried from
/// the device: the stripe size if it is positive, otherwise the sector size.
/// If the device does not answer the sector-size query (for example a
/// character device that is not a disk), or for any other kind of file, the
/// access size is 1, i.e. no alignment is applied.
///
/// # Panics
///
/// Panics if the file's metadata cannot be read.
///
/// # Examples
///
/// ```
/// use std::fs;
/// use tokio_file;
///
/// fs::OpenOptions::new()
///     .read(true)
///     .write(true)
///     .create(true)
///     .open("/tmp/tokio-file-new-example")
///     .map(tokio_file::File::new)
///     .unwrap();
/// # fs::remove_file("/tmp/tokio-file-new-example").unwrap();
/// ```
pub fn new(file: fs::File) -> File {
    let md = file.metadata()
        .expect("reading the metadata of an open file failed");
    let ft = md.file_type();
    let sectorsize = if ft.is_block_device() || ft.is_char_device() {
        let fd = file.as_raw_fd();
        let mut sectorsize = mem::MaybeUninit::<u32>::uninit();
        let mut stripesize = mem::MaybeUninit::<nix::libc::off_t>::uninit();
        // SAFETY: `fd` belongs to `file`, which is open for the whole call,
        // and the pointer refers to a live local of the type the ioctl
        // writes.
        let sector_query = unsafe {
            diocgsectorsize(fd, sectorsize.as_mut_ptr())
        };
        match sector_query {
            // Not every character device is a disk; one that cannot answer
            // the query gets no alignment instead of a panic.
            Err(_) => 1,
            Ok(_) => {
                // SAFETY: the ioctl succeeded, so `sectorsize` is initialized.
                let sector = unsafe { sectorsize.assume_init() };
                // SAFETY: same argument as for the sector-size ioctl above.
                let stripe_query = unsafe {
                    diocgstripesize(fd, stripesize.as_mut_ptr())
                };
                match stripe_query {
                    Ok(_) => {
                        // SAFETY: the ioctl succeeded, so `stripesize` is
                        // initialized.
                        let stripe = unsafe { stripesize.assume_init() };
                        if stripe > 0 {
                            stripe as usize
                        } else {
                            sector as usize
                        }
                    },
                    // No stripe information available; use the sector size.
                    Err(_) => sector as usize,
                }
            },
        }
    } else {
        1
    };
    File {file, sectorsize}
}
/// Open a new Tokio file with mode `O_RDWR | O_CREAT`.
// Technically, std::fs::File::open can block, so we should make a
// nonblocking File::open method and have it return a Future.  That's what
// Seastar does.  But POSIX AIO doesn't have any kind of asynchronous open
// function, so there's no straightforward way to implement such a method.
// Instead, we'll block.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
    let mut options = fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    let file = options.open(path)?;
    Ok(File::new(file))
}
/// Asynchronous equivalent of `std::fs::File::read_at`
///
/// # Examples
///
/// ```
/// use std::fs;
/// use std::io::Write;
/// use tempfile::TempDir;
/// use tokio::runtime;
///
/// const WBUF: &[u8] = b"abcdef";
/// const EXPECT: &[u8] = b"cdef";
/// let mut rbuf = vec![0; 4];
/// let dir = TempDir::new().unwrap();
/// let path = dir.path().join("foo");
/// let mut f = fs::File::create(&path).unwrap();
/// f.write(WBUF).unwrap();
///
/// let file = fs::OpenOptions::new()
/// .read(true)
/// .open(&path)
/// .map(tokio_file::File::new)
/// .unwrap();
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .build()
/// .unwrap();
/// let r = rt.block_on(async {
/// file.read_at(&mut rbuf[..], 2).unwrap().await
/// }).unwrap();
/// assert_eq!(&rbuf[..], &EXPECT[..]);
/// ```
pub fn read_at<'a>(&self, buf: &'a mut [u8], offset: u64)
    -> io::Result<AioFut<'a>>
{
    let fd = self.file.as_raw_fd();
    // Priority 0. The control block is built with LIO_NOP; the read itself is
    // started when the returned future is polled.
    let aiocb = mio_aio::AioCb::from_mut_slice(
        fd,
        offset,
        buf,
        0,
        mio_aio::LioOpcode::LIO_NOP
    );
    let registered = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Read(registered),
        state: AioState::Allocated
    })
}
/// Asynchronous equivalent of `preadv`.
///
/// Similar to
/// [preadv(2)](https://www.freebsd.org/cgi/man.cgi?query=read&sektion=2)
/// but asynchronous. Reads a contiguous portion of a file into a
/// scatter-gather list of buffers. Unlike `preadv`, there is no guarantee
/// of overall atomicity. Each scatter gather element's contents could
/// reflect the state of the file at a different point in time.
///
/// # Parameters
///
/// - `bufs`: The destination for the read. A scatter-gather list of
/// buffers.
/// - `offset`: Offset within the file at which to begin the read
///
/// # Returns
///
/// - `Ok(x)`: The operation was successfully created. The future may be
/// polled and will eventually return the final status of the
/// operation. If the operation was partially successful, the
/// future will complete with an error, with no indication of which
/// parts of `bufs` are valid.
/// - `Err(x)`: An error occurred before issuing the operation. The result
/// may be `drop`ped.
///
/// # Examples
///
/// ```
/// use std::borrow::BorrowMut;
/// use std::fs;
/// use std::io::Write;
/// use tempfile::TempDir;
/// use tokio::runtime;
///
/// const WBUF: &[u8] = b"abcdefghijklmnopqrwtuvwxyz";
/// const EXPECT0: &[u8] = b"cdef";
/// const EXPECT1: &[u8] = b"ghijklmn";
/// let l0 = 4;
/// let l1 = 8;
/// let mut rbuf0 = vec![0; l0];
/// let mut rbuf1 = vec![0; l1];
/// let mut rbufs = [&mut rbuf0[..], &mut rbuf1[..]];
///
/// let dir = TempDir::new().unwrap();
/// let path = dir.path().join("foo");
/// let mut f = fs::File::create(&path).unwrap();
/// f.write(WBUF).unwrap();
///
/// let file = fs::OpenOptions::new()
/// .read(true)
/// .open(&path)
/// .map(tokio_file::File::new)
/// .unwrap();
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .build()
/// .unwrap();
/// let mut r = rt.block_on(async {
/// file.readv_at(&mut rbufs[..], 2).unwrap().await
/// }).unwrap();
///
/// assert_eq!(l0 + l1, r);
/// assert_eq!(&rbuf0[..], &EXPECT0[..]);
/// assert_eq!(&rbuf1[..], &EXPECT1[..]);
/// ```
pub fn readv_at<'a>(&self, bufs: &'a mut [&'a mut [u8]],
                    offset: u64) -> io::Result<ReadvAt<'a>>
{
    let fd = self.file.as_raw_fd();
    let mut builder = mio_aio::LioCbBuilder::with_capacity(bufs.len());
    // A bounce buffer is only needed when the device has a sector size and at
    // least one of the caller's buffers is not a multiple of it.
    let misaligned = self.sectorsize > 1 &&
        bufs.iter().any(|b| b.len() % self.sectorsize != 0);

    let bufsav = if misaligned {
        // Read everything into one contiguous buffer; the future copies the
        // data back into the caller's buffers once the read has completed.
        let total: usize = bufs.iter().map(|b| b.len()).sum();
        let mut bounce: Pin<Box<[u8]>> =
            vec![0; total].into_boxed_slice().into();
        let whole: &'static mut [u8] = unsafe {
            // Safe because the liocb's lifetime is equal to the bounce
            // buffer's (or rather, it will be once both are moved into the
            // ReadvAt struct).
            mem::transmute::<&mut [u8], &'static mut [u8]>(
                bounce.as_mut().get_mut()
            )
        };
        builder = builder.emplace_mut_slice(
            fd,
            offset,
            whole,
            0,
            mio_aio::LioOpcode::LIO_READ
        );
        Some((bounce, bufs))
    } else {
        // One list element per caller buffer, laid out back to back in the
        // file starting at `offset`.
        let mut offs = offset;
        for buf in bufs.iter_mut() {
            let len = buf.len();
            builder = builder.emplace_mut_slice(
                fd,
                offs,
                *buf,
                0,
                mio_aio::LioOpcode::LIO_READ
            );
            offs += len as u64;
        }
        None
    };

    let source = WrappedLioCb(builder.finish());
    let registered = Aio::new_for_lio(source)?;
    Ok(ReadvAt {
        op: Some(registered),
        bufsav,
        state: AioState::Allocated,
    })
}
/// Asynchronous equivalent of `std::fs::File::write_at`.
///
/// # Examples
///
/// ```
/// use std::fs;
/// use std::io::Read;
/// use tempfile::TempDir;
/// use tokio::runtime;
///
/// let contents = b"abcdef";
/// let mut rbuf = Vec::new();
///
/// let dir = TempDir::new().unwrap();
/// let path = dir.path().join("foo");
/// let file = fs::OpenOptions::new()
/// .create(true)
/// .write(true)
/// .open(&path)
/// .map(tokio_file::File::new)
/// .unwrap();
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .build()
/// .unwrap();
/// let r = rt.block_on(async {
/// file.write_at(contents, 0).unwrap().await
/// }).unwrap();
/// assert_eq!(r.value.unwrap() as usize, contents.len());
/// drop(file);
///
/// let mut file = fs::File::open(&path).unwrap();
/// assert_eq!(file.read_to_end(&mut rbuf).unwrap(), contents.len());
/// assert_eq!(&contents[..], &rbuf[..]);
/// ```
pub fn write_at<'a>(&self, buf: &'a [u8],
                    offset: u64) -> io::Result<AioFut<'a>>
{
    // Priority 0. The control block is built with LIO_NOP; the write itself is
    // started when the returned future is polled.
    let aiocb = mio_aio::AioCb::from_slice(
        self.file.as_raw_fd(),
        offset,
        buf,
        0,
        mio_aio::LioOpcode::LIO_NOP
    );
    let registered = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Write(registered),
        state: AioState::Allocated
    })
}
/// Asynchronous equivalent of `pwritev`
///
/// Similar to
/// [pwritev(2)](https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2)
/// but asynchronous. Writes a scatter-gather list of buffers into a
/// contiguous portion of a file. Unlike `pwritev`, there is no guarantee
/// of overall atomicity. Each scatter gather element's contents are
/// written independently.
///
/// # Parameters
///
/// - `bufs`: The data to write. A scatter-gather list of buffers.
/// - `offset`: Offset within the file at which to begin the write
///
/// # Returns
///
/// - `Ok(x)`: The operation was successfully created. The future may be
/// polled and will eventually return the final status of the
/// operation. If the operation was partially successful, the
/// future will complete with an error, with no indication of which
/// parts of the file were actually written.
/// - `Err(x)`: An error occurred before issuing the operation. The result
/// may be `drop`ped.
///
/// # Examples
///
/// ```
/// use std::fs;
/// use std::io::Read;
/// use tempfile::TempDir;
/// use tokio::runtime;
///
/// const EXPECT: &[u8] = b"abcdefghij";
/// let wbuf0 = b"abcdef";
/// let wbuf1 = b"ghij";
/// let wbufs = vec![&wbuf0[..], &wbuf1[..]];
/// let mut rbuf = Vec::new();
///
/// let dir = TempDir::new().unwrap();
/// let path = dir.path().join("foo");
/// let file = fs::OpenOptions::new()
/// .create(true)
/// .write(true)
/// .open(&path)
/// .map(tokio_file::File::new)
/// .unwrap();
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .build()
/// .unwrap();
/// let r = rt.block_on(async {
/// file.writev_at(&wbufs[..], 0).unwrap().await
/// }).unwrap();
///
/// assert_eq!(r, 10);
///
/// let mut f = fs::File::open(&path).unwrap();
/// let len = f.read_to_end(&mut rbuf).unwrap();
/// assert_eq!(len, EXPECT.len());
/// assert_eq!(rbuf, EXPECT);
/// ```
pub fn writev_at<'a>(&self, bufs: &[&'a [u8]], offset: u64)
    -> io::Result<WritevAt<'a>>
{
    let fd = self.file.as_raw_fd();
    let mut builder = mio_aio::LioCbBuilder::with_capacity(bufs.len());
    // A combined buffer is only needed when the device has a sector size and
    // at least one of the caller's buffers is not a multiple of it.
    let misaligned = self.sectorsize > 1 &&
        bufs.iter().any(|b| b.len() % self.sectorsize != 0);

    let accumulator: Option<Pin<Box<[u8]>>> = if misaligned {
        // Join all of the caller's data into one contiguous buffer and write
        // that with a single list element.
        let joined: Pin<Box<[u8]>> = bufs.concat().into_boxed_slice().into();
        let whole: &'static [u8] = unsafe {
            // Safe because the liocb's lifetime is equal to the joined
            // buffer's (or rather, it will be once both are moved into the
            // WritevAt struct).
            mem::transmute::<&[u8], &'static [u8]>(
                joined.as_ref().get_ref()
            )
        };
        builder = builder.emplace_slice(
            fd,
            offset,
            whole,
            0,
            mio_aio::LioOpcode::LIO_WRITE
        );
        Some(joined)
    } else {
        // One list element per caller buffer, laid out back to back in the
        // file starting at `offset`.
        let mut offs = offset;
        for &buf in bufs {
            builder = builder.emplace_slice(
                fd,
                offs,
                buf,
                0,
                mio_aio::LioOpcode::LIO_WRITE
            );
            offs += buf.len() as u64;
        }
        None
    };

    let source = WrappedLioCb(builder.finish());
    let registered = Aio::new_for_lio(source)?;
    Ok(WritevAt {
        _accumulator: accumulator,
        op: Some(registered),
        state: AioState::Allocated,
    })
}
/// Asynchronous equivalent of `std::fs::File::sync_all`
///
/// # Examples
///
/// ```
/// use std::borrow::BorrowMut;
/// use std::fs;
/// use std::io::Write;
/// use tempfile::TempDir;
/// use tokio::runtime;
///
/// let dir = TempDir::new().unwrap();
/// let path = dir.path().join("foo");
///
/// let file = fs::OpenOptions::new()
/// .write(true)
/// .create(true)
/// .open(&path)
/// .map(tokio_file::File::new)
/// .unwrap();
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .build()
/// .unwrap();
/// let r = rt.block_on(async {
/// file.sync_all().unwrap().await
/// }).unwrap();
/// ```
// TODO: add sync_all_data, for supported operating systems
pub fn sync_all(&self) -> io::Result<AioFut<'static>> {
    // Priority 0. No buffer is involved, so the control block borrows nothing
    // from the caller.
    let aiocb = mio_aio::AioCb::from_fd(self.file.as_raw_fd(), 0);
    let registered = Aio::new_for_aio(WrappedAioCb(aiocb))?;
    Ok(AioFut {
        op: AioOp::Fsync(registered),
        state: AioState::Allocated
    })
}
}
// Exposes the raw descriptor of the wrapped `std::fs::File`. The descriptor
// remains owned by this `File`.
impl AsRawFd for File {
fn as_raw_fd(&self) -> RawFd {
self.file.as_raw_fd()
}
}
impl<'a> Future for AioFut<'a> {
    type Output = Result<AioResult, nix::Error>;

    /// Drive the operation to completion.
    ///
    /// Readiness is polled first. While the operation is still in the
    /// `Allocated` state, a pending result triggers the actual submission
    /// (fsync, read or write). Once readiness is reported, the final status is
    /// collected and wrapped in an [`AioResult`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let readiness = match self.op {
            AioOp::Fsync(ref mut io) => io.poll_ready(cx),
            AioOp::Read(ref mut io) | AioOp::Write(ref mut io) =>
                io.poll_ready(cx),
        };
        match readiness {
            Poll::Ready(Ok(_)) => {
                let status = self.aio_return();
                Poll::Ready(status.map(|value| AioResult { value }))
            },
            Poll::Ready(Err(e)) => conv_poll_err(e),
            Poll::Pending => {
                if self.state != AioState::Allocated {
                    // Already submitted; just keep waiting.
                    return Poll::Pending;
                }
                let submitted = match self.op {
                    AioOp::Fsync(ref mut pe) =>
                        pe.0.fsync(mio_aio::AioFsyncMode::O_SYNC),
                    AioOp::Read(ref mut pe) => pe.0.read(),
                    AioOp::Write(ref mut pe) => pe.0.write(),
                };
                match submitted {
                    Err(e) => Poll::Ready(Err(e)),
                    Ok(_) => {
                        self.state = AioState::InProgress;
                        Poll::Pending
                    },
                }
            },
        }
    }
}