Skip to main content

io_http/rfc9112/
chunk_stream.rs

1//! I/O-free coroutine decoding a `Transfer-Encoding: chunked` body
2//! ([RFC 9112 §7.1]) one chunk at a time; suitable for SSE and other
3//! long-lived streams.
4//!
5//! [RFC 9112 §7.1]: https://www.rfc-editor.org/rfc/rfc9112#section-7.1
6
7use core::{fmt, mem};
8
9use alloc::{
10    string::{String, ToString},
11    vec::Vec,
12};
13
14use log::trace;
15use memchr::{memchr, memmem};
16use thiserror::Error;
17
18use crate::{coroutine::*, rfc9110::chars::CRLF};
19
/// Failure causes during the HTTP/1.1 chunked-body streaming flow.
#[derive(Debug, Error)]
pub enum Http11ReadChunksStreamError {
    /// The chunk-size line did not hold a usable hexadecimal size. Carries the
    /// offending size text: the part of the line before any `;` extension,
    /// ASCII-trimmed and lossily decoded as UTF-8.
    #[error("HTTP/1.1 read chunks failed: invalid chunk size `{0}`")]
    InvalidChunkSize(String),
}
26
/// Per-step yield emitted by [`Http11ReadChunksStream`]; adds
/// [`Self::Frame`] to the standard [`HttpYield`] shape.
#[derive(Debug)]
pub enum Http11ReadChunksStreamYield {
    /// The buffered bytes do not yet hold a complete element (size line or
    /// chunk data plus its CRLF); resume with more input.
    WantsRead,
    /// One decoded chunk. `body` holds the chunk data only: the size line and
    /// the CRLF that follows the data are stripped.
    Frame { body: Vec<u8> },
}
34
/// Decoding step the stream is currently in.
#[derive(Debug, Default)]
enum State {
    /// Waiting for a complete chunk-size line (initial state).
    #[default]
    ChunkSize,
    /// Waiting for the given number of chunk-data bytes plus the CRLF that
    /// follows them.
    ChunkData(usize),
}

/// Short human-readable label of the current step, used in trace logs.
impl fmt::Display for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::ChunkSize => "read chunk size",
            Self::ChunkData(_) => "read chunk data",
        };

        f.write_str(label)
    }
}
50
/// I/O-free streaming chunked-body decoder. `Complete(Ok(remaining))`
/// carries bytes already buffered past the zero-length terminator.
#[derive(Debug, Default)]
pub struct Http11ReadChunksStream {
    /// Decoding step to run next.
    state: State,
    /// Set when the buffer lacks the bytes needed by the current step; the
    /// next loop iteration clears it and yields `WantsRead`.
    wants_read: bool,
    /// Set once the terminating zero-size chunk has been consumed; the next
    /// loop iteration completes with whatever is left in `buf`.
    done: bool,
    /// Bytes received through `resume` that have not been consumed yet.
    buf: Vec<u8>,
}
60
61impl HttpCoroutine for Http11ReadChunksStream {
62    type Yield = Http11ReadChunksStreamYield;
63    type Return = Result<Vec<u8>, Http11ReadChunksStreamError>;
64
65    fn resume(&mut self, arg: Option<&[u8]>) -> HttpCoroutineState<Self::Yield, Self::Return> {
66        if let Some(data) = arg {
67            self.buf.extend_from_slice(data);
68        }
69
70        loop {
71            trace!("http/1.1 stream chunks: {}", self.state);
72
73            if self.wants_read {
74                self.wants_read = false;
75                return HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead);
76            }
77
78            if self.done {
79                let remaining = mem::take(&mut self.buf);
80                return HttpCoroutineState::Complete(Ok(remaining));
81            }
82
83            match self.state {
84                State::ChunkSize => {
85                    let Some(crlf) = memmem::find(&self.buf, &CRLF) else {
86                        self.wants_read = true;
87                        continue;
88                    };
89
90                    let ext = match memchr(b';', &self.buf[..crlf]) {
91                        None => crlf,
92                        Some(ext) => {
93                            let exts = String::from_utf8_lossy(self.buf[ext..crlf].trim_ascii());
94                            trace!("ignore extension(s) `{exts}`");
95                            ext
96                        }
97                    };
98
99                    let chunk_size = String::from_utf8_lossy(self.buf[..ext].trim_ascii());
100
101                    let Ok(n) = usize::from_str_radix(&chunk_size, 16) else {
102                        let chunk_size = chunk_size.to_string();
103                        let err = Http11ReadChunksStreamError::InvalidChunkSize(chunk_size);
104                        return HttpCoroutineState::Complete(Err(err));
105                    };
106
107                    self.buf.drain(..crlf + CRLF.len());
108                    self.state = State::ChunkData(n);
109                }
110                State::ChunkData(size) if self.buf.len() < size + CRLF.len() => {
111                    trace!("received incomplete chunk data {}/{size}", self.buf.len());
112                    self.wants_read = true;
113                    continue;
114                }
115                State::ChunkData(0) => {
116                    self.buf.drain(..CRLF.len());
117                    self.state = State::ChunkSize;
118                    self.done = true;
119                }
120                State::ChunkData(size) => {
121                    let body: Vec<u8> = self.buf.drain(..size).collect();
122                    self.buf.drain(..CRLF.len());
123                    self.state = State::ChunkSize;
124                    return HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame {
125                        body,
126                    });
127                }
128            }
129        }
130    }
131}
132
#[cfg(test)]
mod tests {
    use alloc::vec;

    use super::*;

    /// A lone chunk followed by the terminator yields exactly one frame.
    #[test]
    fn single_chunk() {
        let decoded = collect_all(b"5\r\nhello\r\n0\r\n\r\n");
        let expected = vec![b"hello".to_vec()];
        assert_eq!(decoded, expected);
    }

    /// Consecutive chunks come out as distinct frames, in order.
    #[test]
    fn two_chunks_yielded_separately() {
        let decoded = collect_all(b"5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n");
        let expected = vec![b"hello".to_vec(), b" world".to_vec()];
        assert_eq!(decoded, expected);
    }

    /// A body made of the terminator only yields no frame at all.
    #[test]
    fn empty_body() {
        let decoded = collect_all(b"0\r\n\r\n");
        assert!(decoded.is_empty());
    }

    /// Chunk extensions after `;` do not affect the decoded data.
    #[test]
    fn ignored_extension() {
        let decoded = collect_all(b"5;ext\r\nHello\r\n0\r\n\r\n");
        let expected = vec![b"Hello".to_vec()];
        assert_eq!(decoded, expected);
    }

    /// A non-hexadecimal size line ends the coroutine with the offending text.
    #[test]
    fn invalid_chunk_size() {
        let mut decoder = Http11ReadChunksStream::default();
        let failure = expect_complete_err(&mut decoder, Some(b":\r\n0\r\n\r\n"));
        let Http11ReadChunksStreamError::InvalidChunkSize(rejected) = failure;
        assert_eq!(rejected, ":");
    }

    /// A size line split in the middle of its CRLF asks for more input, then
    /// decoding proceeds once the rest arrives.
    #[test]
    fn incomplete_chunk_size_then_resume() {
        let mut decoder = Http11ReadChunksStream::default();
        expect_wants_read(&mut decoder, Some(b"5\r"));

        let frame = expect_frame(&mut decoder, Some(b"\nHello\r\n0\r\n\r\n"));
        assert_eq!(frame, b"Hello");

        let leftover = expect_complete_ok(&mut decoder, None);
        assert!(leftover.is_empty());
    }

    /// Bytes buffered after the terminator are handed back on completion.
    #[test]
    fn remaining_bytes_after_terminator() {
        let mut decoder = Http11ReadChunksStream::default();

        let frame = expect_frame(&mut decoder, Some(b"5\r\nhello\r\n0\r\n\r\nNEXT"));
        assert_eq!(frame, b"hello");

        let leftover = expect_complete_ok(&mut decoder, None);
        assert_eq!(leftover, b"NEXT");
    }

    // --- utils

    /// Feeds `encoded` on the first resume, then resumes without input,
    /// gathering every frame until the coroutine completes with an empty
    /// remainder. Any other outcome panics.
    fn collect_all(encoded: &[u8]) -> Vec<Vec<u8>> {
        let mut decoder = Http11ReadChunksStream::default();
        let mut collected = Vec::new();
        let mut input: Option<&[u8]> = Some(encoded);

        loop {
            let step = decoder.resume(input.take());

            match step {
                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => {
                    collected.push(body);
                }
                HttpCoroutineState::Complete(Ok(remaining)) => {
                    assert!(remaining.is_empty(), "unexpected remaining bytes");
                    return collected;
                }
                state => panic!("expected Frame or Complete, got {state:?}"),
            }
        }
    }

    /// Resumes once and requires a `WantsRead` yield.
    fn expect_wants_read(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) {
        let step = cor.resume(arg);

        match step {
            HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead) => {}
            state => panic!("expected WantsRead, got {state:?}"),
        }
    }

    /// Resumes once and returns the body of the yielded frame.
    fn expect_frame(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) -> Vec<u8> {
        let step = cor.resume(arg);

        match step {
            HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => body,
            state => panic!("expected Frame, got {state:?}"),
        }
    }

    /// Resumes once and returns the remainder of a successful completion.
    fn expect_complete_ok(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) -> Vec<u8> {
        let step = cor.resume(arg);

        match step {
            HttpCoroutineState::Complete(Ok(remaining)) => remaining,
            state => panic!("expected Complete(Ok), got {state:?}"),
        }
    }

    /// Resumes once and returns the error of a failed completion.
    fn expect_complete_err(
        cor: &mut Http11ReadChunksStream,
        arg: Option<&[u8]>,
    ) -> Http11ReadChunksStreamError {
        let step = cor.resume(arg);

        match step {
            HttpCoroutineState::Complete(Err(err)) => err,
            state => panic!("expected Complete(Err), got {state:?}"),
        }
    }
}