use std::future::Future;
use std::io::{self, IoSlice};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::process::{ChildStderr, ChildStdin, ChildStdout};
use std::task::{self, Poll};
use heph::actor;
use mio::unix::pipe;
use mio::Interest;
use crate::bytes::{Bytes, BytesVectored, MaybeUninitSlice};
use crate::{self as rt, Bound};
/// Creates a new Unix pipe and registers both ends with the runtime.
///
/// The sending end is registered with writable interest and the receiving end
/// with readable interest, both on the runtime reached through `ctx`.
///
/// # Errors
///
/// Returns an error if creating the pipe fails or if registering either end
/// with the runtime fails.
pub fn new<M, RT>(ctx: &mut actor::Context<M, RT>) -> io::Result<(Sender, Receiver)>
where
    RT: rt::Access,
{
    let (mut tx, mut rx) = pipe::new()?;

    let runtime = ctx.runtime();
    runtime.register(&mut tx, Interest::WRITABLE)?;
    runtime.register(&mut rx, Interest::READABLE)?;

    let sender = Sender { inner: tx };
    let receiver = Receiver { inner: rx };
    Ok((sender, receiver))
}
/// Sending (writing) end of a Unix pipe.
///
/// Created by [`new`] or from a child process' standard input using
/// [`Sender::from_child_stdin`].
#[derive(Debug)]
pub struct Sender {
/// The underlying `mio` pipe end all writes go through.
inner: pipe::Sender,
}
impl Sender {
    /// Converts a child process' standard input into a `Sender`.
    ///
    /// The pipe is switched to non-blocking mode and then registered with the
    /// runtime of `ctx` with writable interest.
    ///
    /// # Errors
    ///
    /// Returns an error if changing the blocking mode or the registration
    /// fails.
    pub fn from_child_stdin<M, RT>(
        ctx: &mut actor::Context<M, RT>,
        stdin: ChildStdin,
    ) -> io::Result<Sender>
    where
        RT: rt::Access,
    {
        let mut inner = pipe::Sender::from(stdin);
        inner.set_nonblocking(true)?;
        ctx.runtime().register(&mut inner, Interest::WRITABLE)?;
        Ok(Self { inner })
    }

    /// Attempts a single write of `buf` into the pipe.
    ///
    /// Returns the number of bytes written, or the error of the underlying
    /// write as is.
    pub fn try_write(&mut self, buf: &[u8]) -> io::Result<usize> {
        <pipe::Sender as io::Write>::write(&mut self.inner, buf)
    }

    /// Returns a [`Future`] that writes the bytes in `buf` into the pipe,
    /// resolving to the number of bytes written.
    pub fn write<'a, 'b>(&'a mut self, buf: &'b [u8]) -> Write<'a, 'b> {
        let sender = self;
        Write { sender, buf }
    }

    /// Returns a [`Future`] that keeps writing until all bytes in `buf` are
    /// written into the pipe.
    pub fn write_all<'a, 'b>(&'a mut self, buf: &'b [u8]) -> WriteAll<'a, 'b> {
        let sender = self;
        WriteAll { sender, buf }
    }

    /// Attempts a single vectored write of `bufs` into the pipe.
    ///
    /// Returns the number of bytes written, or the error of the underlying
    /// write as is.
    pub fn try_write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        <pipe::Sender as io::Write>::write_vectored(&mut self.inner, bufs)
    }

    /// Returns a [`Future`] that writes the bytes in `bufs` into the pipe,
    /// resolving to the number of bytes written.
    pub fn write_vectored<'a, 'b>(
        &'a mut self,
        bufs: &'b mut [IoSlice<'b>],
    ) -> WriteVectored<'a, 'b> {
        let sender = self;
        WriteVectored { sender, bufs }
    }

    /// Returns a [`Future`] that keeps writing until all bytes in `bufs` are
    /// written into the pipe.
    pub fn write_vectored_all<'a, 'b>(
        &'a mut self,
        bufs: &'b mut [IoSlice<'b>],
    ) -> WriteVectoredAll<'a, 'b> {
        let sender = self;
        WriteVectoredAll { sender, bufs }
    }
}
/// The [`Future`] behind [`Sender::write`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Write<'a, 'b> {
/// Pipe end the bytes are written to.
sender: &'a mut Sender,
/// Bytes to write.
buf: &'b [u8],
}
impl<'a, 'b> Future for Write<'a, 'b> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
let Write { sender, buf } = Pin::into_inner(self);
try_io!(sender.try_write(*buf))
}
}
/// The [`Future`] behind [`Sender::write_all`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAll<'a, 'b> {
/// Pipe end the bytes are written to.
sender: &'a mut Sender,
/// Bytes that still have to be written; shrinks from the front as writes
/// make progress.
buf: &'b [u8],
}
impl<'a, 'b> Future for WriteAll<'a, 'b> {
    type Output = io::Result<()>;

    /// Writes until the entire buffer has been written.
    ///
    /// An empty buffer resolves to `Ok(())` without touching the pipe, the
    /// same way [`std::io::Write::write_all`] treats empty input.
    ///
    /// # Errors
    ///
    /// Resolves to [`io::ErrorKind::WriteZero`] if a write of a non-empty
    /// buffer reports zero bytes written, or to any other error returned by
    /// the write, except `WouldBlock` (which returns [`Poll::Pending`]) and
    /// `Interrupted` (which is retried).
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let WriteAll { sender, buf } = Pin::into_inner(self);
        // Only write while there is something left. Without this guard an
        // empty buffer results in a zero sized write, which would be
        // misreported as a `WriteZero` error.
        while !buf.is_empty() {
            match sender.try_write(*buf) {
                Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                Ok(n) if buf.len() <= n => return Poll::Ready(Ok(())),
                // Partial write, continue with the remainder.
                Ok(n) => *buf = &buf[n..],
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Poll::Ready(Err(err)),
            }
        }
        Poll::Ready(Ok(()))
    }
}
/// The [`Future`] behind [`Sender::write_vectored`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVectored<'a, 'b> {
/// Pipe end the bytes are written to.
sender: &'a mut Sender,
/// Buffers to write.
bufs: &'b mut [IoSlice<'b>],
}
impl<'a, 'b> Future for WriteVectored<'a, 'b> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
let WriteVectored { sender, bufs } = Pin::into_inner(self);
try_io!(sender.try_write_vectored(*bufs))
}
}
/// The [`Future`] behind [`Sender::write_vectored_all`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVectoredAll<'a, 'b> {
/// Pipe end the bytes are written to.
sender: &'a mut Sender,
/// Buffers that still have to be written; advanced as writes make
/// progress.
bufs: &'b mut [IoSlice<'b>],
}
impl<'a, 'b> Future for WriteVectoredAll<'a, 'b> {
    type Output = io::Result<()>;

    /// Writes until all buffers have been written entirely.
    ///
    /// Buffers that contain no data at all (no slices, or only empty slices)
    /// resolve to `Ok(())` without touching the pipe.
    ///
    /// # Errors
    ///
    /// Resolves to [`io::ErrorKind::WriteZero`] if a write reports zero bytes
    /// written while data is left, or to any other error returned by the
    /// write, except `WouldBlock` (which returns [`Poll::Pending`]) and
    /// `Interrupted` (which is retried).
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let WriteVectoredAll { sender, bufs } = Pin::into_inner(self);
        // Drop leading empty slices so that `bufs` is empty if it holds no
        // data. Otherwise a list of only empty slices would cause a zero
        // sized write that is misreported as a `WriteZero` error. This
        // mirrors what `std::io::Write::write_all_vectored` does.
        IoSlice::advance_slices(bufs, 0);
        while !bufs.is_empty() {
            match sender.try_write_vectored(*bufs) {
                Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                // Skip over what has been written.
                Ok(n) => IoSlice::advance_slices(bufs, n),
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Poll::Ready(Err(err)),
            }
        }
        Poll::Ready(Ok(()))
    }
}
impl<RT: rt::Access> Bound<RT> for Sender {
    type Error = io::Error;

    /// Re-registers the sending end with the runtime of `ctx`, using writable
    /// interest.
    fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> io::Result<()> {
        let pipe_end = &mut self.inner;
        ctx.runtime().reregister(pipe_end, Interest::WRITABLE)
    }
}
/// Receiving (reading) end of a Unix pipe.
///
/// Created by [`new`] or from a child process' standard output or error
/// using [`Receiver::from_child_stdout`] or [`Receiver::from_child_stderr`].
#[derive(Debug)]
pub struct Receiver {
/// The underlying `mio` pipe end all reads go through.
inner: pipe::Receiver,
}
impl Receiver {
pub fn from_child_stdout<M, RT>(
ctx: &mut actor::Context<M, RT>,
stdout: ChildStdout,
) -> io::Result<Receiver>
where
RT: rt::Access,
{
let mut receiver = pipe::Receiver::from(stdout);
receiver.set_nonblocking(true)?;
ctx.runtime().register(&mut receiver, Interest::READABLE)?;
Ok(Receiver { inner: receiver })
}
pub fn from_child_stderr<M, RT>(
ctx: &mut actor::Context<M, RT>,
stderr: ChildStderr,
) -> io::Result<Receiver>
where
RT: rt::Access,
{
let mut receiver = pipe::Receiver::from(stderr);
receiver.set_nonblocking(true)?;
ctx.runtime().register(&mut receiver, Interest::READABLE)?;
Ok(Receiver { inner: receiver })
}
pub fn try_read<B>(&mut self, mut buf: B) -> io::Result<usize>
where
B: Bytes,
{
debug_assert!(
buf.has_spare_capacity(),
"called `Receiver::try_read` with an empty buffer"
);
let buf_bytes = unsafe { &mut *(buf.as_bytes() as *mut [MaybeUninit<u8>] as *mut [u8]) };
io::Read::read(&mut self.inner, buf_bytes).map(|read| {
unsafe { buf.update_length(read) }
read
})
}
pub fn read<'a, B>(&'a mut self, buf: B) -> Read<'a, B>
where
B: Bytes,
{
Read {
receiver: self,
buf,
}
}
pub fn read_n<'a, B>(&'a mut self, buf: B, n: usize) -> ReadN<'a, B>
where
B: Bytes,
{
debug_assert!(
buf.spare_capacity() >= n,
"called `Reader::read_n` with a buffer smaller then `n`",
);
ReadN {
receiver: self,
buf,
left: n,
}
}
pub fn try_read_vectored<B>(&mut self, mut bufs: B) -> io::Result<usize>
where
B: BytesVectored,
{
debug_assert!(
bufs.has_spare_capacity(),
"called `Receiver::try_read_vectored` with empty buffers"
);
let mut buffers = bufs.as_bufs();
let bufs_bytes = unsafe { MaybeUninitSlice::as_io(buffers.as_mut()) };
match io::Read::read_vectored(&mut self.inner, bufs_bytes) {
Ok(read) => {
drop(buffers);
unsafe { bufs.update_lengths(read) }
Ok(read)
}
Err(err) => Err(err),
}
}
pub fn read_vectored<B>(&mut self, bufs: B) -> ReadVectored<'_, B>
where
B: BytesVectored,
{
debug_assert!(
bufs.has_spare_capacity(),
"called `Receiver::read_vectored` with empty buffers"
);
ReadVectored {
receiver: self,
bufs,
}
}
pub fn read_n_vectored<B>(&mut self, bufs: B, n: usize) -> ReadNVectored<'_, B>
where
B: BytesVectored,
{
debug_assert!(
bufs.spare_capacity() >= n,
"called `Receiver::read_n_vectored` with a buffer smaller then `n`"
);
ReadNVectored {
receiver: self,
bufs,
left: n,
}
}
}
/// The [`Future`] behind [`Receiver::read`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Read<'b, B> {
/// Pipe end the bytes are read from.
receiver: &'b mut Receiver,
/// Buffer the bytes are read into.
buf: B,
}
impl<'b, B> Future for Read<'b, B>
where
    B: Bytes + Unpin,
{
    type Output = io::Result<usize>;

    /// Attempts the read via [`Receiver::try_read`], resolving to the number
    /// of bytes read.
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // NOTE(review): `try_io!` is defined elsewhere in the crate; it
        // presumably turns `WouldBlock` into `Poll::Pending` and retries on
        // `Interrupted` -- confirm against the macro definition.
        try_io!(this.receiver.try_read(&mut this.buf))
    }
}
/// The [`Future`] behind [`Receiver::read_n`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadN<'b, B> {
/// Pipe end the bytes are read from.
receiver: &'b mut Receiver,
/// Buffer the bytes are read into.
buf: B,
/// Number of bytes that still have to be read before the future resolves.
left: usize,
}
impl<'b, B> Future for ReadN<'b, B>
where
    B: Bytes + Unpin,
{
    type Output = io::Result<()>;

    /// Reads until at least the requested number of bytes have been read.
    ///
    /// If no bytes are left to read (including a request for zero bytes) this
    /// resolves to `Ok(())` without touching the pipe.
    ///
    /// # Errors
    ///
    /// Resolves to [`io::ErrorKind::UnexpectedEof`] if a read returns zero
    /// bytes before enough bytes were read, or to any other error returned by
    /// the read, except `WouldBlock` (which returns [`Poll::Pending`]) and
    /// `Interrupted` (which is retried).
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let ReadN {
            receiver,
            buf,
            left,
        } = Pin::into_inner(self);
        // Only read while bytes are still owed. Reading with nothing left
        // would wait for data that was never asked for, or report an
        // unexpected EOF (or trip the empty buffer assertion in `try_read`)
        // for a request that is already fulfilled.
        while *left != 0 {
            match receiver.try_read(&mut *buf) {
                Ok(0) => return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())),
                // A read may return more than what was left, as the buffer
                // can be larger than `n`, hence the saturating subtraction.
                Ok(n) => *left = left.saturating_sub(n),
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Poll::Ready(Err(err)),
            }
        }
        Poll::Ready(Ok(()))
    }
}
/// The [`Future`] behind [`Receiver::read_vectored`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadVectored<'b, B> {
/// Pipe end the bytes are read from.
receiver: &'b mut Receiver,
/// Buffers the bytes are read into.
bufs: B,
}
impl<'b, B> Future for ReadVectored<'b, B>
where
    B: BytesVectored + Unpin,
{
    type Output = io::Result<usize>;

    /// Attempts the vectored read via [`Receiver::try_read_vectored`],
    /// resolving to the number of bytes read.
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // NOTE(review): `try_io!` is defined elsewhere in the crate; it
        // presumably turns `WouldBlock` into `Poll::Pending` and retries on
        // `Interrupted` -- confirm against the macro definition.
        try_io!(this.receiver.try_read_vectored(&mut this.bufs))
    }
}
/// The [`Future`] behind [`Receiver::read_n_vectored`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadNVectored<'b, B> {
/// Pipe end the bytes are read from.
receiver: &'b mut Receiver,
/// Buffers the bytes are read into.
bufs: B,
/// Number of bytes that still have to be read before the future resolves.
left: usize,
}
impl<'b, B> Future for ReadNVectored<'b, B>
where
    B: BytesVectored + Unpin,
{
    type Output = io::Result<()>;

    /// Reads until at least the requested number of bytes have been read.
    ///
    /// If no bytes are left to read (including a request for zero bytes) this
    /// resolves to `Ok(())` without touching the pipe.
    ///
    /// # Errors
    ///
    /// Resolves to [`io::ErrorKind::UnexpectedEof`] if a read returns zero
    /// bytes before enough bytes were read, or to any other error returned by
    /// the read, except `WouldBlock` (which returns [`Poll::Pending`]) and
    /// `Interrupted` (which is retried).
    fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
        let ReadNVectored {
            receiver,
            bufs,
            left,
        } = Pin::into_inner(self);
        // Only read while bytes are still owed. Reading with nothing left
        // would wait for data that was never asked for, or report an
        // unexpected EOF (or trip the empty buffers assertion in
        // `try_read_vectored`) for a request that is already fulfilled.
        while *left != 0 {
            match receiver.try_read_vectored(&mut *bufs) {
                Ok(0) => return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())),
                // A read may return more than what was left, as the buffers
                // can be larger than `n`, hence the saturating subtraction.
                Ok(n) => *left = left.saturating_sub(n),
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Poll::Ready(Err(err)),
            }
        }
        Poll::Ready(Ok(()))
    }
}
impl<RT: rt::Access> Bound<RT> for Receiver {
    type Error = io::Error;

    /// Re-registers the receiving end with the runtime of `ctx`, using
    /// readable interest.
    fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> io::Result<()> {
        let pipe_end = &mut self.inner;
        ctx.runtime().reregister(pipe_end, Interest::READABLE)
    }
}