1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
//! Abstraction to support both socket-like streams and pipes.

use futures::{Sink, Stream};
use std::io;
use tokio_codec::{FramedRead, FramedWrite};
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_process::{Child, ChildStdin, ChildStdout};

use super::codec::{BlockCodec, BlockMessage, ChannelCodec, ChannelMessage};

/// Leaky abstraction to hold connection to a command server.
pub trait Connection: Send + 'static {
    type Rx: Stream<Item = ChannelMessage, Error = io::Error> + Send;
    type Tx: Sink<SinkItem = BlockMessage, SinkError = io::Error> + Send;
    type Aux: Send;

    fn from_parts(rx: Self::Rx, tx: Self::Tx, aux: Self::Aux) -> Self;
    fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux);
}

/// Wrapper for `Child` process to keep it alive and provide stdio channels.
#[derive(Debug)]
pub struct PipeConnection {
    rx: FramedRead<ChildStdout, ChannelCodec>,
    tx: FramedWrite<ChildStdin, BlockCodec>,
    child: Child,
}

impl PipeConnection {
    pub fn new(mut child: Child) -> PipeConnection {
        let stdout = child.stdout().take().unwrap();
        let stdin = child.stdin().take().unwrap();
        let rx = FramedRead::new(stdout, ChannelCodec::new());
        let tx = FramedWrite::new(stdin, BlockCodec::new());
        PipeConnection { rx, tx, child }
    }
}

impl Connection for PipeConnection {
    type Rx = FramedRead<ChildStdout, ChannelCodec>;
    type Tx = FramedWrite<ChildStdin, BlockCodec>;
    type Aux = Child;

    fn from_parts(rx: Self::Rx, tx: Self::Tx, child: Self::Aux) -> Self {
        PipeConnection { rx, tx, child }
    }

    fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux) {
        (self.rx, self.tx, self.child)
    }
}

#[cfg(unix)]
pub use self::unix::UnixConnection;

#[cfg(unix)]
mod unix {
    use super::*;
    use std::os::unix::io::{AsRawFd, RawFd};
    use tokio_io::AsyncRead;
    use tokio_uds::UnixStream;

    /// Wrapper for `UnixStream` which has no separate read/write channels.
    #[derive(Debug)]
    pub struct UnixConnection {
        rx: FramedRead<ReadHalf<UnixStream>, ChannelCodec>,
        tx: FramedWrite<WriteHalf<UnixStream>, BlockCodec>,
        raw_fd: RawFd,
    }

    impl UnixConnection {
        pub fn new(stream: UnixStream) -> UnixConnection {
            // remember raw_fd here because rx and tx can't be reunited as of Tokio 0.1.8
            // (TODO: keep (rx, tx) reunited in UnixConnection, and remove raw_fd field)
            let raw_fd = stream.as_raw_fd();
            let (r, w) = stream.split();
            let rx = FramedRead::new(r, ChannelCodec::new());
            let tx = FramedWrite::new(w, BlockCodec::new());
            UnixConnection { rx, tx, raw_fd }
        }
    }

    impl Connection for UnixConnection {
        type Rx = FramedRead<ReadHalf<UnixStream>, ChannelCodec>;
        type Tx = FramedWrite<WriteHalf<UnixStream>, BlockCodec>;
        type Aux = RawFd;

        fn from_parts(rx: Self::Rx, tx: Self::Tx, raw_fd: Self::Aux) -> Self {
            UnixConnection { rx, tx, raw_fd }
        }

        fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux) {
            (self.rx, self.tx, self.raw_fd)
        }
    }

    impl AsRawFd for UnixConnection {
        fn as_raw_fd(&self) -> RawFd {
            self.raw_fd
        }
    }
}