1use crate::{Error, Result};
4use futures_util::{
5 io::{AsyncRead, AsyncReadExt, AsyncWrite},
6 stream::{Stream, TryStreamExt},
7};
8use pin_project::pin_project;
9use std::{convert::TryInto, io};
10
/// One chunk of data from a multiplexed TTY stream, tagged with the stream it belongs to.
///
/// Every variant owns its payload as a `Vec<u8>`. Two chunks compare equal only when both
/// the variant and the payload bytes match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TtyChunk {
    /// Payload tagged as standard input.
    StdIn(Vec<u8>),
    /// Payload tagged as standard output.
    StdOut(Vec<u8>),
    /// Payload tagged as standard error.
    StdErr(Vec<u8>),
}
20
21impl From<TtyChunk> for Vec<u8> {
22 fn from(tty_chunk: TtyChunk) -> Self {
23 match tty_chunk {
24 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
25 }
26 }
27}
28
29impl AsRef<Vec<u8>> for TtyChunk {
30 fn as_ref(&self) -> &Vec<u8> {
31 match self {
32 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
33 }
34 }
35}
36
37impl std::ops::Deref for TtyChunk {
38 type Target = Vec<u8>;
39 fn deref(&self) -> &Self::Target {
40 self.as_ref()
41 }
42}
43
44impl std::ops::DerefMut for TtyChunk {
45 fn deref_mut(&mut self) -> &mut Vec<u8> {
46 match self {
47 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
48 }
49 }
50}
51
52async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
53where
54 S: AsyncRead + Unpin,
55{
56 let mut header_bytes = [0u8; 8];
57
58 match stream.read_exact(&mut header_bytes).await {
59 Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
60 Err(e) => return Some((Err(Error::IO(e)), stream)),
61 _ => (),
62 }
63
64 let size_bytes = &header_bytes[4..];
65 let data_length = u32::from_be_bytes(size_bytes.try_into().unwrap());
66
67 let mut data = vec![0u8; data_length as usize];
68
69 if stream.read_exact(&mut data).await.is_err() {
70 return None;
71 }
72
73 let chunk = match header_bytes[0] {
74 0 => TtyChunk::StdIn(data),
75 1 => TtyChunk::StdOut(data),
76 2 => TtyChunk::StdErr(data),
77 n => panic!("invalid stream number from docker daemon: '{}'", n),
78 };
79
80 Some((Ok(chunk), stream))
81}
82
83pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
84where
85 S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
86{
87 let stream = hyper_chunk_stream
88 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
89 .into_async_read();
90
91 futures_util::stream::unfold(stream, decode_chunk)
92}
93
/// Pinned, boxed, `Send` stream yielding decoded [`TtyChunk`] results; the type of the
/// read side stored in [`Multiplexer`].
type TtyReader<'a> = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'a>>;
/// Pinned, boxed, `Send` asynchronous writer; the type of the write side stored in
/// [`Multiplexer`].
type TtyWriter<'a> = Pin<Box<dyn AsyncWrite + Send + 'a>>;
96
/// A bidirectional TTY connection: a stream of decoded [`TtyChunk`]s on the read side and
/// an asynchronous byte sink on the write side.
///
/// The type implements both `Stream` and `AsyncWrite` by forwarding to the two fields
/// below, and can be taken apart into those halves with `split`.
#[pin_project]
pub struct Multiplexer<'a> {
    /// Read side: yields one `Result<TtyChunk>` per decoded frame.
    #[pin]
    reader: TtyReader<'a>,
    /// Write side: accepts raw bytes.
    #[pin]
    writer: TtyWriter<'a>,
}
107
108impl<'a> Multiplexer<'a> {
109 pub(crate) fn new<T>(tcp_connection: T) -> Self
110 where
111 T: AsyncRead + AsyncWrite + Send + 'a,
112 {
113 let (reader, writer) = tcp_connection.split();
114
115 Self {
116 reader: Box::pin(futures_util::stream::unfold(reader, |reader| {
117 decode_chunk(reader)
118 })),
119 writer: Box::pin(writer),
120 }
121 }
122}
123
124use std::{
125 pin::Pin,
126 task::{Context, Poll},
127};
128
129impl<'a> Stream for Multiplexer<'a> {
130 type Item = Result<TtyChunk>;
131 fn poll_next(
132 self: Pin<&mut Self>,
133 cx: &mut Context<'_>,
134 ) -> Poll<Option<Self::Item>> {
135 self.project().reader.poll_next(cx)
136 }
137}
138
139impl<'a> AsyncWrite for Multiplexer<'a> {
140 fn poll_write(
141 self: Pin<&mut Self>,
142 cx: &mut Context<'_>,
143 buf: &[u8],
144 ) -> Poll<io::Result<usize>> {
145 self.project().writer.poll_write(cx, buf)
146 }
147 fn poll_flush(
148 self: Pin<&mut Self>,
149 cx: &mut Context<'_>,
150 ) -> Poll<io::Result<()>> {
151 self.project().writer.poll_flush(cx)
152 }
153 fn poll_close(
154 self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 ) -> Poll<io::Result<()>> {
157 self.project().writer.poll_close(cx)
158 }
159}
160
161impl<'a> Multiplexer<'a> {
162 pub fn split(
164 self
165 ) -> (
166 impl Stream<Item = Result<TtyChunk>> + 'a,
167 impl AsyncWrite + Send + 'a,
168 ) {
169 (self.reader, self.writer)
170 }
171}