connect_rpc/
stream.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use futures_util::{stream, Stream, StreamExt, TryStream, TryStreamExt};
3use http_body::Body;
4use http_body_util::BodyExt;
5
6use crate::{BoxError, Error};
7
8pub struct ConnectFrame {
9    pub compressed: bool,
10    pub end: bool,
11    pub data: Bytes,
12}
13
14const FLAGS_COMPRESSED: u8 = 0b1;
15const FLAGS_END: u8 = 0b01;
16
17impl ConnectFrame {
18    pub fn body_stream<B>(body: B) -> impl Stream<Item = Result<Self, Error>>
19    where
20        B: Body<Error: Into<BoxError>>,
21    {
22        Self::bytes_stream(body.into_data_stream())
23    }
24
25    pub fn bytes_stream<S>(s: S) -> impl Stream<Item = Result<Self, Error>>
26    where
27        S: TryStream<Ok: Buf, Error: Into<BoxError>>,
28    {
29        let mut parse_state = FrameParseState::default();
30        s.map_err(Error::body)
31            .map(Some)
32            .chain(stream::iter([None]))
33            .flat_map(move |item| stream::iter(parse_state.feed(item)))
34    }
35}
36
37#[derive(Default)]
38struct FrameParseState {
39    buf: BytesMut,
40    failed: bool,
41}
42
43impl FrameParseState {
44    fn feed(&mut self, item: Option<Result<impl Buf, Error>>) -> Vec<Result<ConnectFrame, Error>> {
45        if self.failed {
46            return vec![];
47        }
48        let data = match item {
49            Some(Ok(data)) => data,
50            Some(Err(err)) => {
51                self.failed = true;
52                return vec![Err(Error::body(err))];
53            }
54            None => {
55                if !self.buf.is_empty() {
56                    return vec![Err(Error::body("partial frame at end of stream"))];
57                }
58                return vec![];
59            }
60        };
61
62        self.buf.put(data);
63
64        let mut frames = vec![];
65        loop {
66            match self.parse_frame() {
67                Ok(Some(frame)) => frames.push(Ok(frame)),
68                Ok(None) => return frames,
69                Err(err) => {
70                    self.failed = true;
71                    frames.push(Err(err));
72                }
73            }
74        }
75    }
76
77    fn parse_frame(&mut self) -> Result<Option<ConnectFrame>, Error> {
78        if self.buf.len() < 5 {
79            return Ok(None);
80        }
81        let data_len = (&self.buf[1..]).get_u32();
82        let Ok(frame_len) = ((data_len as u64) + 5).try_into() else {
83            return Err(Error::body("frame too large"));
84        };
85        if self.buf.len() < frame_len {
86            return Ok(None);
87        }
88        let mut frame = self.buf.split_to(frame_len);
89        let data = frame.split_off(5).freeze();
90        let flags = frame[0];
91        Ok(Some(ConnectFrame {
92            compressed: flags & FLAGS_COMPRESSED != 0,
93            end: flags & FLAGS_END != 0,
94            data,
95        }))
96    }
97}