1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use futures_util::{stream, Stream, StreamExt, TryStream, TryStreamExt};
3use http_body::Body;
4use http_body_util::BodyExt;
5
6use crate::{BoxError, Error};
7
8pub struct ConnectFrame {
9 pub compressed: bool,
10 pub end: bool,
11 pub data: Bytes,
12}
13
/// Bit of the frame's flags byte that marks the payload as compressed.
const FLAGS_COMPRESSED: u8 = 0b0000_0001;
/// Bit of the frame's flags byte that marks the end-of-stream frame.
///
/// This must be a different bit from [`FLAGS_COMPRESSED`]; otherwise every
/// compressed frame would also be reported as the end of the stream.
const FLAGS_END: u8 = 0b0000_0010;
16
17impl ConnectFrame {
18 pub fn body_stream<B>(body: B) -> impl Stream<Item = Result<Self, Error>>
19 where
20 B: Body<Error: Into<BoxError>>,
21 {
22 Self::bytes_stream(body.into_data_stream())
23 }
24
25 pub fn bytes_stream<S>(s: S) -> impl Stream<Item = Result<Self, Error>>
26 where
27 S: TryStream<Ok: Buf, Error: Into<BoxError>>,
28 {
29 let mut parse_state = FrameParseState::default();
30 s.map_err(Error::body)
31 .map(Some)
32 .chain(stream::iter([None]))
33 .flat_map(move |item| stream::iter(parse_state.feed(item)))
34 }
35}
36
37#[derive(Default)]
38struct FrameParseState {
39 buf: BytesMut,
40 failed: bool,
41}
42
43impl FrameParseState {
44 fn feed(&mut self, item: Option<Result<impl Buf, Error>>) -> Vec<Result<ConnectFrame, Error>> {
45 if self.failed {
46 return vec![];
47 }
48 let data = match item {
49 Some(Ok(data)) => data,
50 Some(Err(err)) => {
51 self.failed = true;
52 return vec![Err(Error::body(err))];
53 }
54 None => {
55 if !self.buf.is_empty() {
56 return vec![Err(Error::body("partial frame at end of stream"))];
57 }
58 return vec![];
59 }
60 };
61
62 self.buf.put(data);
63
64 let mut frames = vec![];
65 loop {
66 match self.parse_frame() {
67 Ok(Some(frame)) => frames.push(Ok(frame)),
68 Ok(None) => return frames,
69 Err(err) => {
70 self.failed = true;
71 frames.push(Err(err));
72 }
73 }
74 }
75 }
76
77 fn parse_frame(&mut self) -> Result<Option<ConnectFrame>, Error> {
78 if self.buf.len() < 5 {
79 return Ok(None);
80 }
81 let data_len = (&self.buf[1..]).get_u32();
82 let Ok(frame_len) = ((data_len as u64) + 5).try_into() else {
83 return Err(Error::body("frame too large"));
84 };
85 if self.buf.len() < frame_len {
86 return Ok(None);
87 }
88 let mut frame = self.buf.split_to(frame_len);
89 let data = frame.split_off(5).freeze();
90 let flags = frame[0];
91 Ok(Some(ConnectFrame {
92 compressed: flags & FLAGS_COMPRESSED != 0,
93 end: flags & FLAGS_END != 0,
94 data,
95 }))
96 }
97}