use crate::{
Command, Control, Error, ErrorKind, ResultExt, StandardError, StandardInput, StandardOutput,
};
use futures::prelude::*;
use std::{marker::PhantomData, mem::ManuallyDrop};
use tokio_codec::{Decoder, FramedRead};
use tokio_io::AsyncRead;
use tokio_process::{ChildStderr, ChildStdin, ChildStdout};
pub struct Capture<'a, C, D, R, Item>
where
R: AsyncRead,
{
command: *mut C,
framed_read: ManuallyDrop<FramedRead<&'a mut R, D>>,
_item: PhantomData<Item>,
}
impl<'a, C, D, Item> Capture<'a, C, D, ChildStdout, Item>
where
C: Control + StandardOutput<'a> + 'a,
D: Decoder<Item = Item>,
{
pub(super) fn new_stdout(command: C, decoder: D) -> Self {
unsafe {
let ptr = Box::into_raw(Box::new(command));
let stdout = (*ptr).standard_output();
let framed_read = ManuallyDrop::new(FramedRead::new(stdout, decoder));
Capture {
command: ptr,
framed_read,
_item: PhantomData,
}
}
}
}
impl<'a, C, D, Item> Capture<'a, C, D, ChildStderr, Item>
where
C: Control + StandardError<'a> + 'a,
D: Decoder<Item = Item>,
{
pub(super) fn new_stderr(command: C, decoder: D) -> Self {
unsafe {
let ptr = Box::into_raw(Box::new(command));
let stderr = (*ptr).standard_error();
let framed_read = ManuallyDrop::new(FramedRead::new(stderr, decoder));
Capture {
command: ptr,
framed_read,
_item: PhantomData,
}
}
}
}
impl<'a, C, D, R, Item> Control for Capture<'a, C, D, R, Item>
where
C: Control,
R: AsyncRead,
{
#[inline]
fn command(&self) -> &Command {
unsafe { (*self.command).command() }
}
#[inline]
fn id(&self) -> u32 {
unsafe { (*self.command).id() }
}
#[inline]
fn kill(&mut self) -> Result<(), Error> {
unsafe { (*self.command).kill() }
}
}
impl<'a, C, D, Item> StandardOutput<'a> for Capture<'a, C, D, ChildStderr, Item>
where
C: StandardOutput<'a>,
D: 'a,
Item: 'a,
{
#[inline]
fn standard_output(&mut self) -> &mut ChildStdout {
unsafe { (*self.command).standard_output() }
}
}
impl<'a, C, D, Item> StandardError<'a> for Capture<'a, C, D, ChildStdout, Item>
where
C: StandardError<'a>,
D: 'a,
Item: 'a,
{
#[inline]
fn standard_error(&mut self) -> &mut ChildStderr {
unsafe { (*self.command).standard_error() }
}
}
impl<'a, C, D, R, Item> StandardInput<'a> for Capture<'a, C, D, R, Item>
where
R: AsyncRead,
C: StandardInput<'a>,
D: 'a,
Item: 'a,
{
#[inline]
fn standard_input(&mut self) -> &mut ChildStdin {
unsafe { (*self.command).standard_input() }
}
}
impl<'a, C, D, R, Item> Drop for Capture<'a, C, D, R, Item>
where
R: AsyncRead,
{
fn drop(&mut self) {
let boxed = unsafe { Box::from_raw(self.command) };
unsafe {
ManuallyDrop::drop(&mut self.framed_read);
}
std::mem::drop(boxed);
}
}
impl<'a, C, D, E, R, Item> Stream for Capture<'a, C, D, R, Item>
where
R: AsyncRead,
D: Decoder<Item = Item, Error = E>,
E: std::error::Error + Send + From<std::io::Error> + 'static,
{
type Item = Item;
type Error = Error;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.framed_read.poll().chain_err(|| ErrorKind::Capture)
}
}
impl<'a, C, D, R, Item> Sink for Capture<'a, C, D, R, Item>
where
C: Sink,
R: AsyncRead,
{
type SinkItem = <C as Sink>::SinkItem;
type SinkError = <C as Sink>::SinkError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
unsafe { (*self.command).start_send(item) }
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
unsafe { (*self.command).poll_complete() }
}
}