containers_api/conn/
tty.rs

1//! Types for working with TTY streams
2
3use crate::conn::{Error, Result};
4use futures_util::{
5    io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadHalf},
6    stream::{Stream, TryStreamExt},
7};
8use pin_project::pin_project;
9use std::{convert::TryInto, io};
10use std::{
11    pin::Pin,
12    task::{Context, Poll},
13};
14
/// An enum representing a chunk of TTY text streamed from a Podman container.
///
/// The variant records which standard stream the bytes belong to; the payload is the raw,
/// undecoded bytes of the chunk.
///
/// For convenience, this type can deref to the contained `Vec<u8>`.
#[derive(Debug, Clone, PartialEq, Eq)]
// Every variant intentionally shares the `Std` prefix to mirror the stream names.
#[allow(clippy::enum_variant_names)]
pub enum TtyChunk {
    /// Bytes belonging to the standard input stream.
    StdIn(Vec<u8>),
    /// Bytes belonging to the standard output stream.
    StdOut(Vec<u8>),
    /// Bytes belonging to the standard error stream.
    StdErr(Vec<u8>),
}
25
26impl From<TtyChunk> for Vec<u8> {
27    fn from(tty_chunk: TtyChunk) -> Self {
28        match tty_chunk {
29            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
30        }
31    }
32}
33
34impl AsRef<Vec<u8>> for TtyChunk {
35    fn as_ref(&self) -> &Vec<u8> {
36        match self {
37            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
38        }
39    }
40}
41
42impl std::ops::Deref for TtyChunk {
43    type Target = Vec<u8>;
44    fn deref(&self) -> &Self::Target {
45        self.as_ref()
46    }
47}
48
49impl std::ops::DerefMut for TtyChunk {
50    fn deref_mut(&mut self) -> &mut Vec<u8> {
51        match self {
52            TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
53        }
54    }
55}
56
57pub async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
58where
59    S: AsyncRead + Unpin,
60{
61    let mut header_bytes = [0u8; 8];
62
63    match stream.read_exact(&mut header_bytes).await {
64        Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
65        Err(e) => return Some((Err(Error::IO(e)), stream)),
66        _ => (),
67    }
68
69    let size_bytes = &header_bytes[4..];
70    let data_length = u32::from_be_bytes(size_bytes.try_into().ok()?);
71
72    let mut data = vec![0u8; data_length as usize];
73
74    if stream.read_exact(&mut data).await.is_err() {
75        return None;
76    }
77
78    let chunk = match header_bytes[0] {
79        0 => TtyChunk::StdIn(data),
80        1 => TtyChunk::StdOut(data),
81        2 => TtyChunk::StdErr(data),
82        n => panic!("invalid stream number from podman daemon: '{n}'"),
83    };
84
85    Some((Ok(chunk), stream))
86}
87
88/// Decodes a TTY chunk from a stream.
89pub fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
90where
91    S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
92{
93    let stream = hyper_chunk_stream
94        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
95        .into_async_read();
96
97    futures_util::stream::unfold(stream, decode_chunk)
98}
99
100pub async fn decode_raw<S>(stream: S) -> Option<(Result<TtyChunk>, S)>
101where
102    S: AsyncRead + Unpin,
103{
104    use futures_util::io::AsyncBufReadExt;
105    let mut reader = futures_util::io::BufReader::new(stream);
106    match reader.fill_buf().await {
107        Ok(buf) if buf.is_empty() => None,
108        Ok(buf) => Some((Ok(TtyChunk::StdOut(buf.to_vec())), reader.into_inner())),
109        Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => None,
110        Err(e) => Some((Err(Error::IO(e)), reader.into_inner())),
111    }
112}
113
114type TtyReader = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'static>>;
115type TtyWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
116
117/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin.
118#[pin_project]
119pub struct Multiplexer {
120    #[pin]
121    reader: TtyReader,
122    #[pin]
123    writer: TtyWriter,
124}
125
126impl Multiplexer {
127    pub fn new<Con, F, Fut>(tcp_connection: Con, mut read_fn: F) -> Self
128    where
129        Con: AsyncRead + AsyncWrite + Send + 'static,
130        F: FnMut(ReadHalf<Con>) -> Fut + Send + 'static,
131        Fut: futures_util::Future<Output = Option<(Result<TtyChunk>, ReadHalf<Con>)>>
132            + Send
133            + 'static,
134    {
135        let (reader, writer) = tcp_connection.split();
136
137        Self {
138            reader: Box::pin(futures_util::stream::unfold(reader, move |reader| {
139                read_fn(reader)
140            })),
141            writer: Box::pin(writer),
142        }
143    }
144}
145
146impl Stream for Multiplexer {
147    type Item = Result<TtyChunk>;
148    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149        self.project().reader.poll_next(cx)
150    }
151}
152
153impl AsyncWrite for Multiplexer {
154    fn poll_write(
155        self: Pin<&mut Self>,
156        cx: &mut Context<'_>,
157        buf: &[u8],
158    ) -> Poll<io::Result<usize>> {
159        self.project().writer.poll_write(cx, buf)
160    }
161    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
162        self.project().writer.poll_flush(cx)
163    }
164    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
165        self.project().writer.poll_close(cx)
166    }
167}
168
169impl Multiplexer {
170    /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts
171    pub fn split(self) -> (impl Stream<Item = Result<TtyChunk>>, impl AsyncWrite + Send) {
172        (self.reader, self.writer)
173    }
174}