podman_rest_client/
attach_frame_stream.rs1use std::io;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::Stream;
6use hyper::body::Bytes;
7use tokio::io::AsyncRead;
8use tokio::io::ReadBuf;
9
10#[derive(Debug, PartialEq, Eq)]
11pub enum AttachFrame {
12 Stdin(Bytes),
13 Stdout(Bytes),
14 Stderr(Bytes),
15}
16
17pub struct AttachFrameStream<R> {
18 reader: R,
19 buf: Vec<u8>,
20 pos: usize,
21 state: AttachFrameStreamState,
22}
23
/// Decoder position within the current frame.
enum AttachFrameStreamState {
    /// Collecting the fixed-size frame header.
    ReadingHeader,
    /// Collecting the payload announced by the most recent header.
    ReadingData {
        /// Frame-type byte taken from the header (first header byte).
        frame_type: u8,
        /// Payload length in bytes taken from the header.
        size: usize,
    },
}
28
29impl<R: AsyncRead + Unpin> AttachFrameStream<R> {
30 pub fn new(reader: R) -> Self {
31 AttachFrameStream {
32 reader,
33 buf: vec![0; 8],
34 pos: 0,
35 state: AttachFrameStreamState::ReadingHeader,
36 }
37 }
38}
39
40impl<R: AsyncRead + Unpin> Stream for AttachFrameStream<R> {
41 type Item = io::Result<AttachFrame>;
42
43 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 loop {
45 let AttachFrameStream {
46 reader, buf, pos, ..
47 } = &mut *self;
48
49 let reader = Pin::new(reader);
50 let mut read_buf = ReadBuf::new(&mut buf[*pos..]);
51 let poll = reader.poll_read(cx, &mut read_buf);
52 let n = read_buf.filled().len();
53 self.pos += n;
54
55 match poll {
56 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
57 Poll::Pending => return Poll::Pending,
58 Poll::Ready(Ok(())) => {
59 if n == 0 {
60 return Poll::Ready(None);
61 }
62
63 match self.state {
64 AttachFrameStreamState::ReadingHeader => {
65 if self.pos == self.buf.len() {
66 let frame_type = self.buf[0];
67 let frame_size = u32::from_be_bytes([
68 self.buf[4],
69 self.buf[5],
70 self.buf[6],
71 self.buf[7],
72 ]) as usize;
73 self.buf = vec![0; frame_size];
74 self.pos = 0;
75
76 self.state = AttachFrameStreamState::ReadingData {
77 frame_type,
78 size: frame_size,
79 };
80 }
81 }
82 AttachFrameStreamState::ReadingData { frame_type, size } => {
83 if self.pos == size {
84 let frame = match frame_type {
85 0x00 => AttachFrame::Stdin(self.buf.clone().into()),
86 0x01 => AttachFrame::Stdout(self.buf.clone().into()),
87 0x02 => AttachFrame::Stderr(self.buf.clone().into()),
88 _ => {
89 return Poll::Ready(Some(Err(io::Error::new(
90 io::ErrorKind::InvalidData,
91 "Unknown frame type",
92 ))))
93 }
94 };
95 self.buf = vec![0; 8];
96 self.pos = 0;
97
98 self.state = AttachFrameStreamState::ReadingHeader;
99 return Poll::Ready(Some(Ok(frame)));
100 }
101 }
102 }
103 }
104 }
105 }
106 }
107}