tokio_hglib/
connection.rs

1//! Abstraction to support both socket-like streams and pipes.
2
3use futures::{Sink, Stream};
4use std::io;
5use tokio::process::{Child, ChildStdin, ChildStdout};
6use tokio_util::codec::{FramedRead, FramedWrite};
7
8use crate::codec::{BlockEncoder, BlockMessage, ChannelDecoder, ChannelMessage};
9
10/// Leaky abstraction to hold connection to a command server.
11pub trait Connection {
12    type Rx: Stream<Item = io::Result<ChannelMessage>> + Unpin;
13    type Tx: Sink<BlockMessage, Error = io::Error> + Unpin;
14
15    fn get_rx_mut(&mut self) -> &mut Self::Rx;
16    fn get_tx_mut(&mut self) -> &mut Self::Tx;
17}
18
/// Wrapper for `Child` process to keep it alive and provide stdio channels.
///
/// The process handle is stored next to the framed reader and writer built
/// from its stdout and stdin, so all three share the lifetime of this value.
#[derive(Debug)]
pub struct PipeConnection {
    /// Framed reader over the child's stdout, decoding with `ChannelDecoder`.
    rx: FramedRead<ChildStdout, ChannelDecoder>,
    /// Framed writer over the child's stdin, encoding with `BlockEncoder`.
    tx: FramedWrite<ChildStdin, BlockEncoder>,
    /// Handle of the process whose pipes are used above; only stored in this
    /// file, never read back.
    child: Child,
}
26
27impl PipeConnection {
28    pub fn new(mut child: Child) -> PipeConnection {
29        let stdout = child.stdout.take().unwrap();
30        let stdin = child.stdin.take().unwrap();
31        let rx = FramedRead::new(stdout, ChannelDecoder::new());
32        let tx = FramedWrite::new(stdin, BlockEncoder::new());
33        PipeConnection { rx, tx, child }
34    }
35}
36
37impl Connection for PipeConnection {
38    type Rx = FramedRead<ChildStdout, ChannelDecoder>;
39    type Tx = FramedWrite<ChildStdin, BlockEncoder>;
40
41    fn get_rx_mut(&mut self) -> &mut Self::Rx {
42        &mut self.rx
43    }
44
45    fn get_tx_mut(&mut self) -> &mut Self::Tx {
46        &mut self.tx
47    }
48}
49
50#[cfg(unix)]
51pub use self::unix::UnixConnection;
52
53#[cfg(unix)]
54mod unix {
55    use std::os::unix::io::{AsRawFd, RawFd};
56    use tokio::net::UnixStream;
57    use tokio_util::codec::{Decoder, Framed};
58
59    use super::*;
60    use crate::codec::ClientCodec;
61
    /// Wrapper for `UnixStream` which has no separate read/write channels.
    ///
    /// A single `Framed` object serves as both the receiving and the sending
    /// half of the connection.
    #[derive(Debug)]
    pub struct UnixConnection {
        /// The stream combined with `ClientCodec`; used for both directions.
        framed: Framed<UnixStream, ClientCodec>,
    }
67
68    impl UnixConnection {
69        pub fn new(stream: UnixStream) -> UnixConnection {
70            let framed = ClientCodec::new().framed(stream);
71            UnixConnection { framed }
72        }
73    }
74
75    impl Connection for UnixConnection {
76        type Rx = Framed<UnixStream, ClientCodec>;
77        type Tx = Framed<UnixStream, ClientCodec>;
78
79        fn get_rx_mut(&mut self) -> &mut Self::Rx {
80            &mut self.framed
81        }
82
83        fn get_tx_mut(&mut self) -> &mut Self::Tx {
84            &mut self.framed
85        }
86    }
87
88    impl AsRawFd for UnixConnection {
89        fn as_raw_fd(&self) -> RawFd {
90            self.framed.get_ref().as_raw_fd()
91        }
92    }
93}