podman_rest_client/
attach_frame_stream.rs

1use std::io;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::Stream;
6use hyper::body::Bytes;
7use tokio::io::AsyncRead;
8use tokio::io::ReadBuf;
9
10#[derive(Debug, PartialEq, Eq)]
11pub enum AttachFrame {
12    Stdin(Bytes),
13    Stdout(Bytes),
14    Stderr(Bytes),
15}
16
17pub struct AttachFrameStream<R> {
18    reader: R,
19    buf: Vec<u8>,
20    pos: usize,
21    state: AttachFrameStreamState,
22}
23
/// Position of the parser within the current frame.
enum AttachFrameStreamState {
    /// Collecting the fixed-size frame header.
    ReadingHeader,
    /// Header parsed; collecting the payload it announced.
    ReadingData {
        /// Stream-type byte taken from the first header byte.
        frame_type: u8,
        /// Payload length in bytes, decoded from the header.
        size: usize,
    },
}
28
29impl<R: AsyncRead + Unpin> AttachFrameStream<R> {
30    pub fn new(reader: R) -> Self {
31        AttachFrameStream {
32            reader,
33            buf: vec![0; 8],
34            pos: 0,
35            state: AttachFrameStreamState::ReadingHeader,
36        }
37    }
38}
39
40impl<R: AsyncRead + Unpin> Stream for AttachFrameStream<R> {
41    type Item = io::Result<AttachFrame>;
42
43    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44        loop {
45            let AttachFrameStream {
46                reader, buf, pos, ..
47            } = &mut *self;
48
49            let reader = Pin::new(reader);
50            let mut read_buf = ReadBuf::new(&mut buf[*pos..]);
51            let poll = reader.poll_read(cx, &mut read_buf);
52            let n = read_buf.filled().len();
53            self.pos += n;
54
55            match poll {
56                Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
57                Poll::Pending => return Poll::Pending,
58                Poll::Ready(Ok(())) => {
59                    if n == 0 {
60                        return Poll::Ready(None);
61                    }
62
63                    match self.state {
64                        AttachFrameStreamState::ReadingHeader => {
65                            if self.pos == self.buf.len() {
66                                let frame_type = self.buf[0];
67                                let frame_size = u32::from_be_bytes([
68                                    self.buf[4],
69                                    self.buf[5],
70                                    self.buf[6],
71                                    self.buf[7],
72                                ]) as usize;
73                                self.buf = vec![0; frame_size];
74                                self.pos = 0;
75
76                                self.state = AttachFrameStreamState::ReadingData {
77                                    frame_type,
78                                    size: frame_size,
79                                };
80                            }
81                        }
82                        AttachFrameStreamState::ReadingData { frame_type, size } => {
83                            if self.pos == size {
84                                let frame = match frame_type {
85                                    0x00 => AttachFrame::Stdin(self.buf.clone().into()),
86                                    0x01 => AttachFrame::Stdout(self.buf.clone().into()),
87                                    0x02 => AttachFrame::Stderr(self.buf.clone().into()),
88                                    _ => {
89                                        return Poll::Ready(Some(Err(io::Error::new(
90                                            io::ErrorKind::InvalidData,
91                                            "Unknown frame type",
92                                        ))))
93                                    }
94                                };
95                                self.buf = vec![0; 8];
96                                self.pos = 0;
97
98                                self.state = AttachFrameStreamState::ReadingHeader;
99                                return Poll::Ready(Some(Ok(frame)));
100                            }
101                        }
102                    }
103                }
104            }
105        }
106    }
107}