containers_api/conn/
tty.rs1use crate::conn::{Error, Result};
4use futures_util::{
5 io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadHalf},
6 stream::{Stream, TryStreamExt},
7};
8use pin_project::pin_project;
9use std::{convert::TryInto, io};
10use std::{
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15#[derive(Debug, Clone)]
16#[allow(clippy::enum_variant_names)]
17pub enum TtyChunk {
21 StdIn(Vec<u8>),
22 StdOut(Vec<u8>),
23 StdErr(Vec<u8>),
24}
25
26impl From<TtyChunk> for Vec<u8> {
27 fn from(tty_chunk: TtyChunk) -> Self {
28 match tty_chunk {
29 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
30 }
31 }
32}
33
34impl AsRef<Vec<u8>> for TtyChunk {
35 fn as_ref(&self) -> &Vec<u8> {
36 match self {
37 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
38 }
39 }
40}
41
42impl std::ops::Deref for TtyChunk {
43 type Target = Vec<u8>;
44 fn deref(&self) -> &Self::Target {
45 self.as_ref()
46 }
47}
48
49impl std::ops::DerefMut for TtyChunk {
50 fn deref_mut(&mut self) -> &mut Vec<u8> {
51 match self {
52 TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
53 }
54 }
55}
56
57pub async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
58where
59 S: AsyncRead + Unpin,
60{
61 let mut header_bytes = [0u8; 8];
62
63 match stream.read_exact(&mut header_bytes).await {
64 Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
65 Err(e) => return Some((Err(Error::IO(e)), stream)),
66 _ => (),
67 }
68
69 let size_bytes = &header_bytes[4..];
70 let data_length = u32::from_be_bytes(size_bytes.try_into().ok()?);
71
72 let mut data = vec![0u8; data_length as usize];
73
74 if stream.read_exact(&mut data).await.is_err() {
75 return None;
76 }
77
78 let chunk = match header_bytes[0] {
79 0 => TtyChunk::StdIn(data),
80 1 => TtyChunk::StdOut(data),
81 2 => TtyChunk::StdErr(data),
82 n => panic!("invalid stream number from podman daemon: '{n}'"),
83 };
84
85 Some((Ok(chunk), stream))
86}
87
88pub fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
90where
91 S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
92{
93 let stream = hyper_chunk_stream
94 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
95 .into_async_read();
96
97 futures_util::stream::unfold(stream, decode_chunk)
98}
99
100pub async fn decode_raw<S>(stream: S) -> Option<(Result<TtyChunk>, S)>
101where
102 S: AsyncRead + Unpin,
103{
104 use futures_util::io::AsyncBufReadExt;
105 let mut reader = futures_util::io::BufReader::new(stream);
106 match reader.fill_buf().await {
107 Ok(buf) if buf.is_empty() => None,
108 Ok(buf) => Some((Ok(TtyChunk::StdOut(buf.to_vec())), reader.into_inner())),
109 Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => None,
110 Err(e) => Some((Err(Error::IO(e)), reader.into_inner())),
111 }
112}
113
114type TtyReader = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'static>>;
115type TtyWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
116
117#[pin_project]
119pub struct Multiplexer {
120 #[pin]
121 reader: TtyReader,
122 #[pin]
123 writer: TtyWriter,
124}
125
126impl Multiplexer {
127 pub fn new<Con, F, Fut>(tcp_connection: Con, mut read_fn: F) -> Self
128 where
129 Con: AsyncRead + AsyncWrite + Send + 'static,
130 F: FnMut(ReadHalf<Con>) -> Fut + Send + 'static,
131 Fut: futures_util::Future<Output = Option<(Result<TtyChunk>, ReadHalf<Con>)>>
132 + Send
133 + 'static,
134 {
135 let (reader, writer) = tcp_connection.split();
136
137 Self {
138 reader: Box::pin(futures_util::stream::unfold(reader, move |reader| {
139 read_fn(reader)
140 })),
141 writer: Box::pin(writer),
142 }
143 }
144}
145
146impl Stream for Multiplexer {
147 type Item = Result<TtyChunk>;
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 self.project().reader.poll_next(cx)
150 }
151}
152
153impl AsyncWrite for Multiplexer {
154 fn poll_write(
155 self: Pin<&mut Self>,
156 cx: &mut Context<'_>,
157 buf: &[u8],
158 ) -> Poll<io::Result<usize>> {
159 self.project().writer.poll_write(cx, buf)
160 }
161 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
162 self.project().writer.poll_flush(cx)
163 }
164 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
165 self.project().writer.poll_close(cx)
166 }
167}
168
169impl Multiplexer {
170 pub fn split(self) -> (impl Stream<Item = Result<TtyChunk>>, impl AsyncWrite + Send) {
172 (self.reader, self.writer)
173 }
174}