1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
//! This crate provides a zero-allocation iterator for the payload bytes in an HTTP
//! [chunked-encoded](https://tools.ietf.org/html/rfc7230#section-4.1) body. It wraps a
//! given iterator over raw HTTP body bytes, decodes the chunked transfer protocol, and
//! yields the data bytes from each chunk. The result can be fed, for example, into a
//! byte-based parser such as
//! [serde_json::from_iter](https://docs.serde.rs/serde_json/de/fn.from_iter.html).
//!
//! This implementation supports chunk sizes up to the largest value representable by
//! `usize` on the target platform. Chunk extension parameters are discarded, and trailing
//! headers aren't processed, although they can be retrieved from the wrapped source
//! iterator once chunked payload iteration has ended.
//!
//! ## Example
//!
//! ```rust
//! use uhttp_chunked_bytes::ChunkedBytes;
//!
//! // Create a sample json body `{"key": 42}`, split over two chunks.
//! let body = b"4\r\n{\"ke\r\n7\r\ny\": 42}\r\n0\r\n\r\n";
//! let mut stream = body.iter().map(|&b| Ok(b));
//!
//! let mut bytes = ChunkedBytes::new(&mut stream);
//! assert_eq!(bytes.next().unwrap().unwrap(), b'{');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'"');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'k');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'e');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'y');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'"');
//! assert_eq!(bytes.next().unwrap().unwrap(), b':');
//! assert_eq!(bytes.next().unwrap().unwrap(), b' ');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'4');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'2');
//! assert_eq!(bytes.next().unwrap().unwrap(), b'}');
//! assert!(bytes.next().is_none());
//! ```

/// A 64-bit usize number can have at most 16 hex digits.
#[cfg(target_pointer_width = "64")]
type DigitBuf = [u8; 16];

/// A 32-bit usize number can have at most 8 hex digits.
#[cfg(target_pointer_width = "32")]
type DigitBuf = [u8; 8];

/// Iterator over payload bytes in a chunked-encoded stream.
///
/// Each item is either a decoded payload byte or the first error encountered, which is
/// either an error reported by the wrapped stream or a framing error detected here:
///
/// * `InvalidData` — the chunk size is empty or contains a non-hex-digit byte, or a CR/LF
///   terminator is missing where one is required.
/// * `UnexpectedEof` — the wrapped stream ended inside a chunk header, chunk payload, or
///   chunk terminator.
/// * `Other` — the chunk size has more hex digits than fit in a `usize`.
///
/// When the iterator returns `None`, the wrapped stream will typically contain a final
/// CRLF to end the body, but it may also contain [trailing header
/// fields](https://tools.ietf.org/html/rfc7230#section-4.1.2) before the final CRLF.
pub struct ChunkedBytes<I: Iterator<Item = std::io::Result<u8>>> {
    /// Underlying byte stream in chunked transfer-encoding format.
    stream: I,
    /// Number of remaining bytes in the current chunk.
    remain: usize,
}

impl<I: Iterator<Item = std::io::Result<u8>>> ChunkedBytes<I> {
    /// Create a new `ChunkedBytes` iterator over the given byte stream.
    ///
    /// The stream must be positioned at the first byte of a chunk-size line.
    pub fn new(stream: I) -> Self {
        ChunkedBytes {
            stream: stream,
            remain: 0,
        }
    }

    /// Parse the number of bytes in the next chunk.
    ///
    /// Returns `None` if the stream is already exhausted at the start of the chunk
    /// header, `Some(Ok(size))` on success, and `Some(Err(_))` if the header is malformed
    /// or the stream fails.
    fn parse_size(&mut self) -> Option<std::io::Result<usize>> {
        let mut digits = DigitBuf::default();

        let hex = match self.parse_digits(&mut digits[..]) {
            Some(Ok(s)) => s,
            Some(Err(e)) => return Some(Err(e)),
            None => return None,
        };

        // RFC 7230 §4.1 requires at least one hex digit (`chunk-size = 1*HEXDIG`).
        if hex.is_empty() {
            return Some(Err(std::io::ErrorKind::InvalidData.into()));
        }

        // Decode the digits by hand instead of going through `str`: the bytes come
        // straight off the wire and need not be valid UTF-8, and a generic integer
        // parser would also accept a leading `+`, which is not a legal chunk size.
        let mut size: usize = 0;

        for &b in hex {
            let digit = match (b as char).to_digit(16) {
                // A hex digit is always below 16, so widening to usize is lossless.
                Some(d) => d as usize,
                None => return Some(Err(std::io::ErrorKind::InvalidData.into())),
            };

            // `DigitBuf` already caps the digit count so this cannot overflow, but the
            // checked arithmetic keeps the parser correct should that cap ever change.
            size = match size.checked_mul(16).and_then(|s| s.checked_add(digit)) {
                Some(s) => s,
                None => return Some(Err(std::io::ErrorKind::InvalidData.into())),
            };
        }

        Some(Ok(size))
    }

    /// Extract the raw chunk-size bytes for the current chunk into `digits`.
    ///
    /// Reads up to and including the CRLF that ends the chunk header, discarding any
    /// chunk extension introduced by `;`. The returned slice is the filled prefix of
    /// `digits`; its bytes are *not* validated as hex digits here. Returns `None` if the
    /// stream ends before any header byte is read.
    fn parse_digits<'a>(&mut self, digits: &'a mut [u8])
        -> Option<std::io::Result<&'a [u8]>>
    {
        // Number of size bytes that have been extracted.
        let mut len = 0;

        loop {
            let b = match self.stream.next() {
                Some(Ok(b)) => b,
                Some(Err(e)) => return Some(Err(e)),
                // If EOF at the beginning of a new chunk, the stream is finished.
                None if len == 0 => return None,
                None => return Some(Err(std::io::ErrorKind::UnexpectedEof.into())),
            };

            // Both CR and `;` end the size field; each consumes the rest of the line.
            let end_of_size = match b {
                b'\r' => Some(self.consume_lf()),
                b';' => Some(self.consume_ext()),
                _ => None,
            };

            match end_of_size {
                Some(Ok(())) => break,
                Some(Err(e)) => return Some(Err(e)),
                None => {},
            }

            match digits.get_mut(len) {
                Some(d) => *d = b,
                // More digits than a usize can hold.
                None => return Some(Err(std::io::ErrorKind::Other.into())),
            }

            len += 1;
        }

        Some(Ok(&digits[..len]))
    }

    /// Consume and discard current chunk extension.
    ///
    /// This doesn't check whether the characters up to CRLF actually have correct syntax.
    fn consume_ext(&mut self) -> std::io::Result<()> {
        loop {
            match self.stream.next() {
                Some(Ok(b'\r')) => return self.consume_lf(),
                Some(Ok(_)) => {},
                Some(Err(e)) => return Err(e),
                None => return Err(std::io::ErrorKind::UnexpectedEof.into()),
            }
        }
    }

    /// Verify the next bytes in the stream are CRLF.
    fn consume_crlf(&mut self) -> std::io::Result<()> {
        match self.stream.next() {
            Some(Ok(b'\r')) => self.consume_lf(),
            Some(Ok(_)) => Err(std::io::ErrorKind::InvalidData.into()),
            Some(Err(e)) => Err(e),
            None => Err(std::io::ErrorKind::UnexpectedEof.into()),
        }
    }

    /// Verify the next byte in the stream is LF.
    fn consume_lf(&mut self) -> std::io::Result<()> {
        match self.stream.next() {
            Some(Ok(b'\n')) => Ok(()),
            Some(Ok(_)) => Err(std::io::ErrorKind::InvalidData.into()),
            Some(Err(e)) => Err(e),
            None => Err(std::io::ErrorKind::UnexpectedEof.into()),
        }
    }
}

impl<I: Iterator<Item = std::io::Result<u8>>> Iterator for ChunkedBytes<I> {
    type Item = std::io::Result<u8>;

    /// Yield the next payload byte.
    ///
    /// Returns `None` after the zero-length final chunk header has been consumed, or if
    /// the wrapped stream is exhausted exactly on a chunk boundary. A stream that ends
    /// inside a chunk yields `Some(Err(UnexpectedEof))` rather than `None`, so a
    /// truncated body can't be mistaken for a complete one.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remain == 0 {
            let size = match self.parse_size() {
                Some(Ok(s)) => s,
                Some(Err(e)) => return Some(Err(e)),
                None => return None,
            };

            // If chunk size is zero (final chunk), the stream is finished [RFC7230§4.1].
            if size == 0 {
                return None;
            }

            self.remain = size;
        }

        let byte = match self.stream.next() {
            Some(Ok(b)) => b,
            // No payload byte was consumed, so leave the chunk accounting untouched.
            Some(Err(e)) => return Some(Err(e)),
            None => {
                // The chunk promised more bytes than the stream delivered. Drop the
                // outstanding count so later calls don't keep reporting the same error.
                self.remain = 0;
                return Some(Err(std::io::ErrorKind::UnexpectedEof.into()));
            },
        };

        self.remain -= 1;

        // If current chunk is finished, verify it ends with CRLF [RFC7230§4.1].
        if self.remain == 0 {
            if let Err(e) = self.consume_crlf() {
                return Some(Err(e));
            }
        }

        Some(Ok(byte))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std;

    /// Pull one item per byte of `expected` from `iter`, asserting that each item is
    /// `Ok` and equal to the corresponding expected byte.
    fn expect_bytes<T>(iter: &mut T, expected: &[u8])
        where T: Iterator<Item = std::io::Result<u8>>
    {
        for &want in expected {
            assert_eq!(iter.next().unwrap().unwrap(), want);
        }
    }

    #[test]
    fn test_chunked_bytes() {
        // Two chunks with an upper-case hex size; stream ends on a chunk boundary.
        let raw = b"A\r\nabcdefghij\r\n2\r\n42\r\n";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        expect_bytes(&mut decoded, b"abcdefghij42");
        assert!(decoded.next().is_none());

        // Lower-case hex size, and a CRLF that is part of the chunk payload.
        let raw = b"a\r\nabc\r\nfghij\r\n2\r\n42\r\n";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        expect_bytes(&mut decoded, b"abc\r\nfghij42");
        assert!(decoded.next().is_none());

        // After the final chunk, the closing CRLF is left in the wrapped iterator.
        let raw = b"4\r\nabcd\r\n0\r\n\r\n";
        let mut source = raw.iter().map(|&byte| Ok(byte));

        {
            let mut decoded = ChunkedBytes::new(&mut source);
            expect_bytes(&mut decoded, b"abcd");
            assert!(decoded.next().is_none());
        }

        expect_bytes(&mut source, b"\r\n");
        assert!(source.next().is_none());

        // Trailing header fields are likewise left in the wrapped iterator.
        let raw = b"4\r\nabcd\r\n0\r\nA: B\r\n\r\n";
        let mut source = raw.iter().map(|&byte| Ok(byte));

        {
            let mut decoded = ChunkedBytes::new(&mut source);
            expect_bytes(&mut decoded, b"abcd");
            assert!(decoded.next().is_none());
        }

        expect_bytes(&mut source, b"A: B\r\n\r\n");
        assert!(source.next().is_none());

        // Empty input yields nothing.
        let raw = b"";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().is_none());

        // A lone final chunk yields nothing.
        let raw = b"0\r\n\r\n";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().is_none());

        // A non-hex size is an error.
        let raw = b"h\r\n";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().unwrap().is_err());

        // A missing size is an error.
        let raw = b"\r\na";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().unwrap().is_err());

        // A chunk that isn't followed by CRLF is an error on its last byte.
        let raw = b"4\r\nabcdefg";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        expect_bytes(&mut decoded, b"abc");
        assert!(decoded.next().unwrap().is_err());
    }


    #[cfg(target_pointer_width = "64")]
    #[test]
    fn test_max_size() {
        // Largest size that fits in a 64-bit usize.
        let raw = b"FFFFFFFFFFFFFFFF\r\na";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        expect_bytes(&mut decoded, b"a");
        assert_eq!(decoded.remain, std::usize::MAX - 1);

        // One hex digit too many.
        let raw = b"FFFFFFFFFFFFFFFFF\r\na";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().unwrap().is_err());
    }

    #[cfg(target_pointer_width = "32")]
    #[test]
    fn test_max_size() {
        // Largest size that fits in a 32-bit usize.
        let raw = b"FFFFFFFF\r\na";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        expect_bytes(&mut decoded, b"a");
        assert_eq!(decoded.remain, std::usize::MAX - 1);

        // One hex digit too many.
        let raw = b"FFFFFFFFF\r\na";
        let mut decoded = ChunkedBytes::new(raw.iter().map(|&byte| Ok(byte)));
        assert!(decoded.next().unwrap().is_err());
    }
}