1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use futures::{Sink, Stream};
use std::io;
use tokio_codec::{FramedRead, FramedWrite};
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_process::{Child, ChildStdin, ChildStdout};
use super::codec::{BlockCodec, BlockMessage, ChannelCodec, ChannelMessage};
/// A two-way message channel that can be taken apart into its halves and
/// put back together again.
///
/// The receiving half yields `ChannelMessage`s, the sending half accepts
/// `BlockMessage`s, and both halves report failures as `io::Error`.
pub trait Connection: Send + 'static {
    /// Receiving half: a stream of `ChannelMessage` items.
    type Rx: Stream<Item = ChannelMessage, Error = io::Error> + Send;

    /// Sending half: a sink that consumes `BlockMessage` items.
    type Tx: Sink<SinkItem = BlockMessage, SinkError = io::Error> + Send;

    /// Any additional state that travels with the two halves.
    type Aux: Send;

    /// Reassembles a connection from the pieces returned by `into_parts`.
    fn from_parts(rx: Self::Rx, tx: Self::Tx, aux: Self::Aux) -> Self;

    /// Consumes the connection and hands back `(rx, tx, aux)`.
    fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux);
}
/// A connection that talks to a child process over its standard pipes.
#[derive(Debug)]
pub struct PipeConnection {
    /// Framed reader over the child's stdout, using `ChannelCodec`.
    rx: FramedRead<ChildStdout, ChannelCodec>,
    /// Framed writer over the child's stdin, using `BlockCodec`.
    tx: FramedWrite<ChildStdin, BlockCodec>,
    /// Handle of the child process whose pipes are wrapped above.
    // NOTE(review): presumably kept so the process handle lives as long as
    // the connection does -- confirm against callers.
    child: Child,
}
impl PipeConnection {
    /// Builds a connection from a spawned child process.
    ///
    /// The child's stdout is wrapped in a `FramedRead` using `ChannelCodec`
    /// and its stdin in a `FramedWrite` using `BlockCodec`. Both handles are
    /// taken out of `child`, which is then stored in the connection.
    ///
    /// # Panics
    ///
    /// Panics if the child's stdout or stdin handle is absent, i.e. the
    /// process was not spawned with piped stdout/stdin or the handle has
    /// already been taken by someone else.
    pub fn new(mut child: Child) -> PipeConnection {
        // Stdout is taken first, matching the original evaluation order, so
        // a child missing both pipes still fails on stdout.
        let stdout = child
            .stdout()
            .take()
            .expect("PipeConnection::new: child stdout is not piped or was already taken");
        let stdin = child
            .stdin()
            .take()
            .expect("PipeConnection::new: child stdin is not piped or was already taken");

        let rx = FramedRead::new(stdout, ChannelCodec::new());
        let tx = FramedWrite::new(stdin, BlockCodec::new());
        PipeConnection { rx, tx, child }
    }
}
/// `Connection` over child-process pipes; the auxiliary part is the `Child`
/// handle itself.
impl Connection for PipeConnection {
    type Rx = FramedRead<ChildStdout, ChannelCodec>;
    type Tx = FramedWrite<ChildStdin, BlockCodec>;
    type Aux = Child;

    /// Reassembles a pipe connection from its reader, writer and child handle.
    fn from_parts(rx: Self::Rx, tx: Self::Tx, child: Self::Aux) -> Self {
        Self { rx, tx, child }
    }

    /// Splits the connection into `(reader, writer, child)`.
    fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux) {
        let PipeConnection { rx, tx, child } = self;
        (rx, tx, child)
    }
}
#[cfg(unix)]
pub use self::unix::UnixConnection;
#[cfg(unix)]
mod unix {
    use super::*;
    use std::os::unix::io::{AsRawFd, RawFd};
    use tokio_io::AsyncRead;
    use tokio_uds::UnixStream;

    /// A connection carried over a Unix domain socket.
    #[derive(Debug)]
    pub struct UnixConnection {
        /// Framed reader over the socket's read half, using `ChannelCodec`.
        rx: FramedRead<ReadHalf<UnixStream>, ChannelCodec>,
        /// Framed writer over the socket's write half, using `BlockCodec`.
        tx: FramedWrite<WriteHalf<UnixStream>, BlockCodec>,
        /// Raw descriptor recorded from the stream before it was split.
        raw_fd: RawFd,
    }

    impl UnixConnection {
        /// Wraps `stream` in framed read/write halves.
        ///
        /// The stream's raw file descriptor is recorded before splitting so
        /// that it can still be returned by `as_raw_fd` afterwards.
        pub fn new(stream: UnixStream) -> Self {
            // `split` consumes the stream, so the descriptor is read first.
            let raw_fd = stream.as_raw_fd();
            let (read_half, write_half) = stream.split();

            UnixConnection {
                rx: FramedRead::new(read_half, ChannelCodec::new()),
                tx: FramedWrite::new(write_half, BlockCodec::new()),
                raw_fd,
            }
        }
    }

    /// `Connection` over a Unix socket; the auxiliary part is the recorded
    /// raw file descriptor.
    impl Connection for UnixConnection {
        type Rx = FramedRead<ReadHalf<UnixStream>, ChannelCodec>;
        type Tx = FramedWrite<WriteHalf<UnixStream>, BlockCodec>;
        type Aux = RawFd;

        /// Reassembles a connection from its halves and a raw descriptor.
        ///
        /// The descriptor is stored as given; nothing here checks that it
        /// belongs to the stream behind `rx` and `tx`.
        fn from_parts(rx: Self::Rx, tx: Self::Tx, raw_fd: Self::Aux) -> Self {
            Self { rx, tx, raw_fd }
        }

        /// Splits the connection into `(reader, writer, raw_fd)`.
        fn into_parts(self) -> (Self::Rx, Self::Tx, Self::Aux) {
            let UnixConnection { rx, tx, raw_fd } = self;
            (rx, tx, raw_fd)
        }
    }

    impl AsRawFd for UnixConnection {
        /// Returns the descriptor recorded when the connection was built.
        fn as_raw_fd(&self) -> RawFd {
            let UnixConnection { raw_fd, .. } = *self;
            raw_fd
        }
    }
}