docker_sdk/
tty.rs

1//! Types for working with docker TTY streams
2
3use crate::{Error, Result};
4use futures_util::{
5    io::{AsyncRead, AsyncReadExt, AsyncWrite},
6    stream::{Stream, TryStreamExt},
7};
8use pin_project::pin_project;
9use std::{convert::TryInto, io};
10
/// A chunk of TTY data streamed to or from a Docker container, tagged with the
/// standard stream it belongs to.
///
/// For convenience, this type can deref to the contained `Vec<u8>`.
///
/// Two chunks compare equal only when both the stream tag and the payload bytes match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TtyChunk {
    /// Bytes belonging to the standard input stream.
    StdIn(Vec<u8>),
    /// Bytes belonging to the standard output stream.
    StdOut(Vec<u8>),
    /// Bytes belonging to the standard error stream.
    StdErr(Vec<u8>),
}
20
21impl From<TtyChunk> for Vec<u8> {
22    fn from(tty_chunk: TtyChunk) -> Self {
23        match tty_chunk {
24            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
25        }
26    }
27}
28
29impl AsRef<Vec<u8>> for TtyChunk {
30    fn as_ref(&self) -> &Vec<u8> {
31        match self {
32            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
33        }
34    }
35}
36
37impl std::ops::Deref for TtyChunk {
38    type Target = Vec<u8>;
39    fn deref(&self) -> &Self::Target {
40        self.as_ref()
41    }
42}
43
44impl std::ops::DerefMut for TtyChunk {
45    fn deref_mut(&mut self) -> &mut Vec<u8> {
46        match self {
47            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
48        }
49    }
50}
51
52async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
53where
54    S: AsyncRead + Unpin,
55{
56    let mut header_bytes = [0u8; 8];
57
58    match stream.read_exact(&mut header_bytes).await {
59        Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
60        Err(e) => return Some((Err(Error::IO(e)), stream)),
61        _ => (),
62    }
63
64    let size_bytes = &header_bytes[4..];
65    let data_length = u32::from_be_bytes(size_bytes.try_into().unwrap());
66
67    let mut data = vec![0u8; data_length as usize];
68
69    if stream.read_exact(&mut data).await.is_err() {
70        return None;
71    }
72
73    let chunk = match header_bytes[0] {
74        0 => TtyChunk::StdIn(data),
75        1 => TtyChunk::StdOut(data),
76        2 => TtyChunk::StdErr(data),
77        n => panic!("invalid stream number from docker daemon: '{}'", n),
78    };
79
80    Some((Ok(chunk), stream))
81}
82
83pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
84where
85    S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
86{
87    let stream = hyper_chunk_stream
88        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
89        .into_async_read();
90
91    futures_util::stream::unfold(stream, decode_chunk)
92}
93
94type TtyReader<'a> = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'a>>;
95type TtyWriter<'a> = Pin<Box<dyn AsyncWrite + Send + 'a>>;
96
97/// TTY multiplexer returned by the `attach` method.
98///
99/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin.
100#[pin_project]
101pub struct Multiplexer<'a> {
102    #[pin]
103    reader: TtyReader<'a>,
104    #[pin]
105    writer: TtyWriter<'a>,
106}
107
108impl<'a> Multiplexer<'a> {
109    pub(crate) fn new<T>(tcp_connection: T) -> Self
110    where
111        T: AsyncRead + AsyncWrite + Send + 'a,
112    {
113        let (reader, writer) = tcp_connection.split();
114
115        Self {
116            reader: Box::pin(futures_util::stream::unfold(reader, |reader| {
117                decode_chunk(reader)
118            })),
119            writer: Box::pin(writer),
120        }
121    }
122}
123
124use std::{
125    pin::Pin,
126    task::{Context, Poll},
127};
128
129impl<'a> Stream for Multiplexer<'a> {
130    type Item = Result<TtyChunk>;
131    fn poll_next(
132        self: Pin<&mut Self>,
133        cx: &mut Context<'_>,
134    ) -> Poll<Option<Self::Item>> {
135        self.project().reader.poll_next(cx)
136    }
137}
138
139impl<'a> AsyncWrite for Multiplexer<'a> {
140    fn poll_write(
141        self: Pin<&mut Self>,
142        cx: &mut Context<'_>,
143        buf: &[u8],
144    ) -> Poll<io::Result<usize>> {
145        self.project().writer.poll_write(cx, buf)
146    }
147    fn poll_flush(
148        self: Pin<&mut Self>,
149        cx: &mut Context<'_>,
150    ) -> Poll<io::Result<()>> {
151        self.project().writer.poll_flush(cx)
152    }
153    fn poll_close(
154        self: Pin<&mut Self>,
155        cx: &mut Context<'_>,
156    ) -> Poll<io::Result<()>> {
157        self.project().writer.poll_close(cx)
158    }
159}
160
161impl<'a> Multiplexer<'a> {
162    /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts
163    pub fn split(
164        self
165    ) -> (
166        impl Stream<Item = Result<TtyChunk>> + 'a,
167        impl AsyncWrite + Send + 'a,
168    ) {
169        (self.reader, self.writer)
170    }
171}