1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std;
use tokio;
use futures::{Poll, Async, Future};
use tokio::net::tcp::TcpStream;
use std::net::SocketAddr;
use std::process::{Stdio, Command};
use thrussh;
use std::io::Write;
pub enum Stream {
#[allow(missing_docs)]
Child(std::process::Child),
#[allow(missing_docs)]
Tcp(TcpStream)
}
pub struct ConnectFuture(Option<ConnectFuture_>);
enum ConnectFuture_ {
Tcp(tokio::net::tcp::ConnectFuture),
Child(std::process::Child),
}
impl Future for ConnectFuture {
type Item = Stream;
type Error = tokio::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.take().unwrap() {
ConnectFuture_::Tcp(mut tcp) => {
match tcp.poll()? {
Async::Ready(tcp) => Ok(Async::Ready(Stream::Tcp(tcp))),
Async::NotReady => {
self.0 = Some(ConnectFuture_::Tcp(tcp));
Ok(Async::NotReady)
}
}
},
ConnectFuture_::Child(child) => Ok(Async::Ready(Stream::Child(child)))
}
}
}
impl Stream {
pub fn tcp_connect(addr: &SocketAddr) -> ConnectFuture {
ConnectFuture(Some(ConnectFuture_::Tcp(tokio::net::tcp::TcpStream::connect(addr))))
}
pub fn proxy_command(cmd: &str, args: &[&str]) -> ConnectFuture {
ConnectFuture(Some(ConnectFuture_::Child(
Command::new(cmd)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.args(args)
.spawn().unwrap()
)))
}
}
impl tokio::io::Read for Stream {
fn read(&mut self, r: &mut [u8]) -> std::io::Result<usize> {
match *self {
Stream::Child(ref mut c) => c.stdout.as_mut().unwrap().read(r),
Stream::Tcp(ref mut t) => t.read(r)
}
}
}
impl tokio::io::AsyncWrite for Stream {
fn shutdown(&mut self) -> Result<Async<()>, std::io::Error> {
match *self {
Stream::Child(ref mut c) => {
c.stdin.take();
Ok(Async::Ready(()))
},
Stream::Tcp(ref mut t) => t.shutdown()
}
}
fn poll_write(&mut self, r: &[u8]) -> Result<Async<usize>, std::io::Error> {
match *self {
Stream::Child(ref mut c) => {
match c.stdin.as_mut().unwrap().write(r) {
Ok(n) => Ok(Async::Ready(n)),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Async::NotReady),
Err(e) => Err(e)
}
},
Stream::Tcp(ref mut t) => t.poll_write(r)
}
}
fn poll_flush(&mut self) -> Result<Async<()>, std::io::Error> {
match *self {
Stream::Child(ref mut c) => {
match c.stdin.as_mut().unwrap().flush() {
Ok(n) => Ok(Async::Ready(n)),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(Async::NotReady),
Err(e) => Err(e)
}
},
Stream::Tcp(ref mut t) => t.poll_flush()
}
}
}
impl std::io::Write for Stream {
fn write(&mut self, r: &[u8]) -> std::io::Result<usize> {
match *self {
Stream::Child(ref mut c) => c.stdin.as_mut().unwrap().write(r),
Stream::Tcp(ref mut t) => t.write(r)
}
}
fn flush(&mut self) -> std::io::Result<()> {
match *self {
Stream::Child(ref mut c) => c.stdin.as_mut().unwrap().flush(),
Stream::Tcp(ref mut t) => t.flush()
}
}
}
impl tokio::io::AsyncRead for Stream{}
impl thrussh::Tcp for Stream {
}