io_http/rfc9112/
chunk_stream.rs1use core::{fmt, mem};
8
9use alloc::{
10 string::{String, ToString},
11 vec::Vec,
12};
13
14use log::trace;
15use memchr::{memchr, memmem};
16use thiserror::Error;
17
18use crate::{coroutine::*, rfc9110::chars::CRLF};
19
20#[derive(Debug, Error)]
22pub enum Http11ReadChunksStreamError {
23 #[error("HTTP/1.1 read chunks failed: invalid chunk size `{0}`")]
24 InvalidChunkSize(String),
25}
26
/// Intermediate values produced by [`Http11ReadChunksStream`] before it
/// completes.
#[derive(Debug)]
pub enum Http11ReadChunksStreamYield {
    /// The buffered bytes are not enough to make progress; the caller should
    /// resume the coroutine with more input.
    WantsRead,
    /// One chunk was fully received.
    Frame {
        /// The chunk payload, without its size line and trailing CRLF.
        body: Vec<u8>,
    },
}
34
/// The element of the chunked encoding that the decoder expects next.
#[derive(Debug, Default)]
enum State {
    /// A chunk-size line is expected. This is the initial state.
    #[default]
    ChunkSize,
    /// The payload of a chunk is expected; the value is the chunk size in
    /// bytes as announced by the preceding size line.
    ChunkData(usize),
}
41
42impl fmt::Display for State {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 Self::ChunkSize => f.write_str("read chunk size"),
46 Self::ChunkData(_) => f.write_str("read chunk data"),
47 }
48 }
49}
50
51#[derive(Debug, Default)]
54pub struct Http11ReadChunksStream {
55 state: State,
56 wants_read: bool,
57 done: bool,
58 buf: Vec<u8>,
59}
60
61impl HttpCoroutine for Http11ReadChunksStream {
62 type Yield = Http11ReadChunksStreamYield;
63 type Return = Result<Vec<u8>, Http11ReadChunksStreamError>;
64
65 fn resume(&mut self, arg: Option<&[u8]>) -> HttpCoroutineState<Self::Yield, Self::Return> {
66 if let Some(data) = arg {
67 self.buf.extend_from_slice(data);
68 }
69
70 loop {
71 trace!("http/1.1 stream chunks: {}", self.state);
72
73 if self.wants_read {
74 self.wants_read = false;
75 return HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead);
76 }
77
78 if self.done {
79 let remaining = mem::take(&mut self.buf);
80 return HttpCoroutineState::Complete(Ok(remaining));
81 }
82
83 match self.state {
84 State::ChunkSize => {
85 let Some(crlf) = memmem::find(&self.buf, &CRLF) else {
86 self.wants_read = true;
87 continue;
88 };
89
90 let ext = match memchr(b';', &self.buf[..crlf]) {
91 None => crlf,
92 Some(ext) => {
93 let exts = String::from_utf8_lossy(self.buf[ext..crlf].trim_ascii());
94 trace!("ignore extension(s) `{exts}`");
95 ext
96 }
97 };
98
99 let chunk_size = String::from_utf8_lossy(self.buf[..ext].trim_ascii());
100
101 let Ok(n) = usize::from_str_radix(&chunk_size, 16) else {
102 let chunk_size = chunk_size.to_string();
103 let err = Http11ReadChunksStreamError::InvalidChunkSize(chunk_size);
104 return HttpCoroutineState::Complete(Err(err));
105 };
106
107 self.buf.drain(..crlf + CRLF.len());
108 self.state = State::ChunkData(n);
109 }
110 State::ChunkData(size) if self.buf.len() < size + CRLF.len() => {
111 trace!("received incomplete chunk data {}/{size}", self.buf.len());
112 self.wants_read = true;
113 continue;
114 }
115 State::ChunkData(0) => {
116 self.buf.drain(..CRLF.len());
117 self.state = State::ChunkSize;
118 self.done = true;
119 }
120 State::ChunkData(size) => {
121 let body: Vec<u8> = self.buf.drain(..size).collect();
122 self.buf.drain(..CRLF.len());
123 self.state = State::ChunkSize;
124 return HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame {
125 body,
126 });
127 }
128 }
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
    use alloc::vec;

    use super::*;

    /// Feeds `encoded` in a single call, then keeps resuming without input
    /// and gathers every frame until the coroutine completes.
    ///
    /// Panics on anything other than a frame or a successful completion, and
    /// when bytes are left over after the terminating chunk.
    fn collect_all(encoded: &[u8]) -> Vec<Vec<u8>> {
        let mut stream = Http11ReadChunksStream::default();
        let mut pending: Option<&[u8]> = Some(encoded);
        let mut collected = Vec::new();

        loop {
            let body = match stream.resume(pending.take()) {
                HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => body,
                HttpCoroutineState::Complete(Ok(remaining)) => {
                    assert!(remaining.is_empty(), "unexpected remaining bytes");
                    break collected;
                }
                state => panic!("expected Frame or Complete, got {state:?}"),
            };

            collected.push(body);
        }
    }

    /// Resumes `cor` and requires it to ask for more input.
    fn expect_wants_read(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) {
        let outcome = cor.resume(arg);

        match outcome {
            HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::WantsRead) => {}
            state => panic!("expected WantsRead, got {state:?}"),
        }
    }

    /// Resumes `cor` and requires it to yield a frame, whose body is returned.
    fn expect_frame(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) -> Vec<u8> {
        let outcome = cor.resume(arg);

        match outcome {
            HttpCoroutineState::Yielded(Http11ReadChunksStreamYield::Frame { body }) => body,
            state => panic!("expected Frame, got {state:?}"),
        }
    }

    /// Resumes `cor` and requires a successful completion, returning the
    /// leftover bytes.
    fn expect_complete_ok(cor: &mut Http11ReadChunksStream, arg: Option<&[u8]>) -> Vec<u8> {
        let outcome = cor.resume(arg);

        match outcome {
            HttpCoroutineState::Complete(Ok(remaining)) => remaining,
            state => panic!("expected Complete(Ok), got {state:?}"),
        }
    }

    /// Resumes `cor` and requires a failed completion, returning the error.
    fn expect_complete_err(
        cor: &mut Http11ReadChunksStream,
        arg: Option<&[u8]>,
    ) -> Http11ReadChunksStreamError {
        let outcome = cor.resume(arg);

        match outcome {
            HttpCoroutineState::Complete(Err(err)) => err,
            state => panic!("expected Complete(Err), got {state:?}"),
        }
    }

    #[test]
    fn single_chunk() {
        let expected = vec![b"hello".to_vec()];
        assert_eq!(collect_all(b"5\r\nhello\r\n0\r\n\r\n"), expected);
    }

    #[test]
    fn two_chunks_yielded_separately() {
        let expected = vec![b"hello".to_vec(), b" world".to_vec()];
        assert_eq!(collect_all(b"5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n"), expected);
    }

    #[test]
    fn empty_body() {
        assert!(collect_all(b"0\r\n\r\n").is_empty());
    }

    #[test]
    fn ignored_extension() {
        let expected = vec![b"Hello".to_vec()];
        assert_eq!(collect_all(b"5;ext\r\nHello\r\n0\r\n\r\n"), expected);
    }

    #[test]
    fn invalid_chunk_size() {
        let mut stream = Http11ReadChunksStream::default();

        match expect_complete_err(&mut stream, Some(b":\r\n0\r\n\r\n")) {
            Http11ReadChunksStreamError::InvalidChunkSize(text) => assert_eq!(text, ":"),
        }
    }

    #[test]
    fn incomplete_chunk_size_then_resume() {
        let mut stream = Http11ReadChunksStream::default();

        // The size line is cut in the middle of its CRLF.
        expect_wants_read(&mut stream, Some(b"5\r"));

        let frame = expect_frame(&mut stream, Some(b"\nHello\r\n0\r\n\r\n"));
        assert_eq!(frame, b"Hello");

        let leftover = expect_complete_ok(&mut stream, None);
        assert!(leftover.is_empty());
    }

    #[test]
    fn remaining_bytes_after_terminator() {
        let mut stream = Http11ReadChunksStream::default();

        let frame = expect_frame(&mut stream, Some(b"5\r\nhello\r\n0\r\n\r\nNEXT"));
        assert_eq!(frame, b"hello");

        let leftover = expect_complete_ok(&mut stream, None);
        assert_eq!(leftover, b"NEXT");
    }
}