1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use crate::tcp;
use futures::task::{Context, Poll};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::pin::Pin;
use std::process::Command;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

/// A type to implement either a TCP socket, or proxying through an external command.
///
/// Both variants are driven through the same `AsyncRead`/`AsyncWrite`
/// implementations, so callers do not need to know which transport is in use.
pub enum Stream {
    /// A spawned proxy command. Reads are served from the child's standard
    /// output and writes are sent to its standard input.
    Child(std::process::Child),
    /// A direct TCP connection.
    Tcp(TcpStream),
}

impl Stream {
    /// Connect a direct TCP stream (as opposed to a proxied one).
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `TcpStream::connect` if the
    /// connection to `addr` cannot be established.
    pub async fn tcp_connect(addr: &SocketAddr) -> Result<Stream, tokio::io::Error> {
        TcpStream::connect(addr).await.map(Stream::Tcp)
    }

    /// Connect through a proxy command.
    ///
    /// Spawns `cmd` with `args` and wraps the resulting child process. The
    /// child's standard input and standard output are piped so that the
    /// stream can write to and read from the command.
    ///
    /// # Errors
    ///
    /// Returns an error if the command cannot be spawned.
    pub fn proxy_connect(cmd: &str, args: &[&str]) -> Result<Stream, anyhow::Error> {
        // `spawn` inherits the parent's stdio unless told otherwise, which
        // leaves `Child::stdin`/`Child::stdout` set to `None`. The read and
        // write paths need those handles, so request pipes explicitly.
        let child = Command::new(cmd)
            .args(args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .spawn()?;
        Ok(Stream::Child(child))
    }
}

impl AsyncRead for Stream {
    /// Reads into `buf` from the underlying transport.
    ///
    /// * `Stream::Tcp` delegates to the socket's own `poll_read`.
    /// * `Stream::Child` performs a synchronous `Read::read` on the child's
    ///   standard output and always returns `Poll::Ready`; no waker is
    ///   registered, so the call blocks until the read returns.
    ///
    /// # Errors
    ///
    /// For the `Child` variant, returns a `BrokenPipe` error if the child has
    /// no standard-output handle (for example because it was not spawned with
    /// a piped stdout), instead of panicking.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<Result<usize, tokio::io::Error>> {
        match self.get_mut() {
            Stream::Child(child) => Poll::Ready(match child.stdout.as_mut() {
                Some(stdout) => stdout.read(buf),
                None => Err(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "proxy command stdout is not available",
                )),
            }),
            Stream::Tcp(tcp) => AsyncRead::poll_read(Pin::new(tcp), cx, buf),
        }
    }
}

impl AsyncWrite for Stream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<Result<usize, tokio::io::Error>> {
        match *self {
            Stream::Child(ref mut c) => Poll::Ready(c.stdin.as_mut().unwrap().write(buf)),
            Stream::Tcp(ref mut t) => AsyncWrite::poll_write(Pin::new(t), cx, buf),
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Result<(), tokio::io::Error>> {
        match *self {
            Stream::Child(_) => Poll::Ready(Ok(())),
            Stream::Tcp(ref mut t) => AsyncWrite::poll_flush(Pin::new(t), cx),
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Result<(), tokio::io::Error>> {
        match *self {
            Stream::Child(_) => Poll::Ready(Ok(())),
            Stream::Tcp(ref mut t) => AsyncWrite::poll_shutdown(Pin::new(t), cx),
        }
    }
}

impl tcp::Tcp for Stream {
    /// Shuts down the underlying TCP socket, if there is one.
    ///
    /// A proxied (`Child`) stream has no socket, so this returns `Ok(())`
    /// without doing anything; a `Tcp` stream forwards to the socket's own
    /// `tcp_shutdown`.
    fn tcp_shutdown(&mut self) -> Result<(), std::io::Error> {
        match self {
            Stream::Tcp(socket) => socket.tcp_shutdown(),
            Stream::Child(_) => Ok(()),
        }
    }
}