dbs_uhttp/
connection.rs

1// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::VecDeque;
5use std::fs::File;
6use std::io::{Read, Write};
7use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
8
9use crate::common::ascii::{CR, CRLF_LEN, LF};
10use crate::common::sock_ctrl_msg::ScmSocket;
11use crate::common::Body;
12pub use crate::common::{ConnectionError, HttpHeaderError, RequestError};
13use crate::headers::Headers;
14use crate::request::{find, Request, RequestLine};
15use crate::response::{Response, StatusCode};
16use crate::server::MAX_PAYLOAD_SIZE;
17
18const BUFFER_SIZE: usize = 1024;
19const SCM_MAX_FD: usize = 253;
20
/// Describes the state machine of an HTTP connection.
enum ConnectionState {
    /// No request line has been parsed yet for the request being received.
    WaitingForRequestLine,
    /// The request line was parsed; header lines are being consumed.
    WaitingForHeaders,
    /// The headers announced a non-empty body that is not complete yet.
    WaitingForBody,
    /// The request is complete and can be moved to the parsed requests queue.
    RequestReady,
}
28
/// A wrapper over a HTTP Connection.
pub struct HttpConnection<T> {
    /// A partial request that is still being received.
    pending_request: Option<Request>,
    /// Stream implementing `Read` and `Write`, capable of sending and
    /// receiving bytes.
    stream: T,
    /// The state of the connection regarding the current request that
    /// is being processed.
    state: ConnectionState,
    /// Buffer where we store the bytes we read from the stream.
    buffer: [u8; BUFFER_SIZE],
    /// Number of bytes at the start of `buffer` carried over from the
    /// previous `try_read` call; newly received bytes are appended starting
    /// at this index.
    read_cursor: usize,
    /// Contains all bytes pertaining to the body of the request that
    /// is currently being processed.
    body_vec: Vec<u8>,
    /// Represents how many bytes from the body of the request are still
    /// to be read.
    body_bytes_to_be_read: u32,
    /// A queue of all requests that have been fully received and parsed.
    parsed_requests: VecDeque<Request>,
    /// A queue of responses that are waiting to be sent.
    response_queue: VecDeque<Response>,
    /// A buffer containing the bytes of a response that is currently
    /// being sent.
    response_buffer: Option<Vec<u8>>,
    /// The list of files that has been received and which must be associated
    /// with the pending request.
    files: Vec<File>,
    /// Largest `Content-Length` accepted for a request body. Initialized to
    /// `MAX_PAYLOAD_SIZE` by `new`.
    payload_max_size: usize,
}
63
// Exposes the file descriptor of the wrapped stream as the descriptor of the
// connection itself.
impl<T> AsRawFd for HttpConnection<T>
where
    T: AsRawFd,
{
    fn as_raw_fd(&self) -> RawFd {
        // Pure delegation: the connection owns no descriptor of its own.
        self.stream.as_raw_fd()
    }
}
72
// The stream must support reading, writing and receiving file descriptors (`ScmSocket`).
74impl<T: Read + Write + ScmSocket> HttpConnection<T> {
75    /// Creates an empty connection.
76    pub fn new(stream: T) -> Self {
77        Self {
78            pending_request: None,
79            stream,
80            state: ConnectionState::WaitingForRequestLine,
81            buffer: [0; BUFFER_SIZE],
82            read_cursor: 0,
83            body_vec: vec![],
84            body_bytes_to_be_read: 0,
85            parsed_requests: VecDeque::new(),
86            response_queue: VecDeque::new(),
87            response_buffer: None,
88            files: Vec::new(),
89            payload_max_size: MAX_PAYLOAD_SIZE,
90        }
91    }
92
    /// Sets the maximum `Content-Length` accepted for a request body,
    /// replacing the default limit (`MAX_PAYLOAD_SIZE`) with the one allowed
    /// by the server. Requests announcing a larger body are rejected while
    /// their headers are parsed.
    pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) {
        self.payload_max_size = request_payload_max_size;
    }
98
99    /// Tries to read new bytes from the stream and automatically update the request.
100    /// Meant to be used only with non-blocking streams and an `EPOLL` structure.
101    /// Should be called whenever an `EPOLLIN` event is signaled.
102    ///
103    /// # Errors
104    /// `StreamError` is returned when an IO operation fails.
105    /// `ConnectionClosed` is returned when a client prematurely closes the connection.
106    /// `ParseError` is returned when a parsing operation fails.
107    pub fn try_read(&mut self) -> Result<(), ConnectionError> {
108        // Read some bytes from the stream, which will be appended to what is already
109        // present in the buffer from a previous call of `try_read`. There are already
110        // `read_cursor` bytes present in the buffer.
111        let end_cursor = self.read_bytes()?;
112
113        let mut line_start_index = 0;
114        loop {
115            match self.state {
116                ConnectionState::WaitingForRequestLine => {
117                    if !self.parse_request_line(&mut line_start_index, end_cursor)? {
118                        return Ok(());
119                    }
120                }
121                ConnectionState::WaitingForHeaders => {
122                    if !self.parse_headers(&mut line_start_index, end_cursor)? {
123                        return Ok(());
124                    }
125                }
126                ConnectionState::WaitingForBody => {
127                    if !self.parse_body(&mut line_start_index, end_cursor)? {
128                        return Ok(());
129                    }
130                }
131                ConnectionState::RequestReady => {
132                    // This request is ready to be passed for handling.
133                    // Update the state machine to expect a new request and push this request into
134                    // the `parsed_requests` queue.
135                    self.state = ConnectionState::WaitingForRequestLine;
136                    self.body_bytes_to_be_read = 0;
137                    let mut pending_request = self.pending_request.take().unwrap();
138                    pending_request.files = self.files.drain(..).collect();
139                    self.parsed_requests.push_back(pending_request);
140                }
141            };
142        }
143    }
144
145    /// Reads a maximum of 1024 bytes from the stream into `buffer`.
146    /// The return value represents the end index of what we have just appended.
147    ///
148    /// # Errors
149    /// `StreamError` is returned if any error occurred while reading the stream.
150    /// `ConnectionClosed` is returned if the client closed the connection.
151    /// `Overflow` is returned if an arithmetic overflow occurs while parsing the request.
152    fn read_bytes(&mut self) -> Result<usize, ConnectionError> {
153        if self.read_cursor >= BUFFER_SIZE {
154            return Err(ConnectionError::ParseError(RequestError::Overflow));
155        }
156        // Append new bytes to what we already have in the buffer.
157        // The slice access is safe, the index is checked above.
158        let (bytes_read, new_files) = self.recv_with_fds()?;
159
160        // Update the internal list of files that must be associated with the
161        // request.
162        self.files.extend(new_files);
163
164        // If the read returned 0 then the client has closed the connection.
165        if bytes_read == 0 {
166            return Err(ConnectionError::ConnectionClosed);
167        }
168        bytes_read
169            .checked_add(self.read_cursor)
170            .ok_or(ConnectionError::ParseError(RequestError::Overflow))
171    }
172
173    /// Receive data along with optional files descriptors.
174    /// It is a wrapper around the same function from vmm-sys-util.
175    ///
176    /// # Errors
177    /// `StreamError` is returned if any error occurred while reading the stream.
178    fn recv_with_fds(&mut self) -> Result<(usize, Vec<File>), ConnectionError> {
179        let buf = &mut self.buffer[self.read_cursor..];
180        // We must allocate the maximum number of receivable file descriptors
181        // if don't want to miss any of them. Allocating a too small number
182        // would lead to the incapacity of receiving the file descriptors.
183        let mut fds = [0; SCM_MAX_FD];
184        let mut iovecs = [libc::iovec {
185            iov_base: buf.as_mut_ptr() as *mut _,
186            iov_len: buf.len(),
187        }];
188
189        let (read_count, fd_count) = unsafe {
190            self.stream
191                .recv_with_fds(&mut iovecs, &mut fds)
192                .map_err(ConnectionError::StreamReadError)?
193        };
194
195        Ok((
196            read_count,
197            fds.iter()
198                .take(fd_count)
199                .map(|fd| {
200                    // Safe because all fds are owned by us after they have been
201                    // received through the socket.
202                    unsafe { File::from_raw_fd(*fd) }
203                })
204                .collect(),
205        ))
206    }
207
208    /// Parses bytes in `buffer` for a valid request line.
209    /// Returns `false` if there are no more bytes to be parsed in the buffer.
210    ///
211    /// # Errors
212    /// `ParseError` is returned if unable to parse request line or line longer than BUFFER_SIZE.
213    fn parse_request_line(
214        &mut self,
215        start: &mut usize,
216        end: usize,
217    ) -> Result<bool, ConnectionError> {
218        if end < *start {
219            return Err(ConnectionError::ParseError(RequestError::Underflow));
220        }
221        if end > self.buffer.len() {
222            return Err(ConnectionError::ParseError(RequestError::Overflow));
223        }
224        // The slice access is safe because `end` is checked to be smaller than the buffer size
225        // and larger than `start`.
226        match find(&self.buffer[*start..end], &[CR, LF]) {
227            Some(line_end_index) => {
228                // The unchecked addition `start + line_end_index` is safe because `line_end_index`
229                // is returned by `find` and thus guaranteed to be in-bounds. This also makes the
230                // slice access safe.
231                let line = &self.buffer[*start..(*start + line_end_index)];
232
233                // The unchecked addition is safe because of the previous `find()`.
234                *start = *start + line_end_index + CRLF_LEN;
235
236                // Form the request with a valid request line, which is the bare minimum
237                // for a valid request.
238                self.pending_request = Some(Request {
239                    request_line: RequestLine::try_from(line)
240                        .map_err(ConnectionError::ParseError)?,
241                    headers: Headers::default(),
242                    body: None,
243                    files: Vec::new(),
244                });
245                self.state = ConnectionState::WaitingForHeaders;
246                Ok(true)
247            }
248            None => {
249                // The request line is longer than BUFFER_SIZE bytes, so the request is invalid.
250                if end == BUFFER_SIZE && *start == 0 {
251                    return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
252                } else {
253                    // Move the incomplete request line to the beginning of the buffer and wait
254                    // for the next `try_read` call to complete it.
255                    // This can only happen if another request was sent before this one, as the
256                    // limit for the length of a request line in this implementation is 1024 bytes.
257                    self.shift_buffer_left(*start, end)
258                        .map_err(ConnectionError::ParseError)?;
259                }
260                Ok(false)
261            }
262        }
263    }
264
265    /// Parses bytes in `buffer` for header fields.
266    /// Returns `false` if there are no more bytes to be parsed in the buffer.
267    ///
268    /// # Errors
269    /// `ParseError` is returned if unable to parse header or line longer than BUFFER_SIZE.
270    fn parse_headers(
271        &mut self,
272        line_start_index: &mut usize,
273        end_cursor: usize,
274    ) -> Result<bool, ConnectionError> {
275        if end_cursor > self.buffer.len() {
276            return Err(ConnectionError::ParseError(RequestError::Overflow));
277        }
278        if end_cursor < *line_start_index {
279            return Err(ConnectionError::ParseError(RequestError::Underflow));
280        }
281        // Safe to access the slice as the bounds are checked above.
282        match find(&self.buffer[*line_start_index..end_cursor], &[CR, LF]) {
283            // `line_start_index` points to the end of the most recently found CR LF
284            // sequence. That means that if we found the next CR LF sequence at this index,
285            // they are, in fact, a CR LF CR LF sequence, which marks the end of the header
286            // fields, per HTTP specification.
287
288            // We have found the end of the header.
289            Some(0) => {
290                // The current state is `WaitingForHeaders`, ensuring a valid request formed from a
291                // request line.
292                let request = self
293                    .pending_request
294                    .as_mut()
295                    .ok_or(ConnectionError::ParseError(
296                        RequestError::HeadersWithoutPendingRequest,
297                    ))?;
298                if request.headers.content_length() == 0 {
299                    self.state = ConnectionState::RequestReady;
300                } else {
301                    if request.headers.content_length() as usize > self.payload_max_size {
302                        return Err(ConnectionError::ParseError(
303                            RequestError::SizeLimitExceeded(
304                                self.payload_max_size,
305                                request.headers.content_length() as usize,
306                            ),
307                        ));
308                    }
309                    if request.headers.expect() {
310                        // Send expect.
311                        let expect_response =
312                            Response::new(request.http_version(), StatusCode::Continue);
313                        self.response_queue.push_back(expect_response);
314                    }
315
316                    self.body_bytes_to_be_read = request.headers.content_length();
317                    request.body = Some(Body::new(vec![]));
318                    self.state = ConnectionState::WaitingForBody;
319                }
320
321                // Update the index for the next header.
322                *line_start_index = line_start_index
323                    .checked_add(CRLF_LEN)
324                    .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
325                Ok(true)
326            }
327            // We have found the end of a header line.
328            Some(relative_line_end_index) => {
329                let request = self
330                    .pending_request
331                    .as_mut()
332                    .ok_or(ConnectionError::ParseError(
333                        RequestError::HeadersWithoutPendingRequest,
334                    ))?;
335                // The `line_end_index` relative to the whole buffer.
336                let line_end_index = relative_line_end_index
337                    .checked_add(*line_start_index)
338                    .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
339
340                // Get the line slice and parse it.
341                // The slice access is safe because `line_end_index` is a sum of `line_end_index`
342                // and something else, and `line_end_index` itself is guaranteed to be within
343                // `self.buffer`'s bounds by the `find()`.
344                let line = &self.buffer[*line_start_index..line_end_index];
345                match request.headers.parse_header_line(line) {
346                    // If a header is unsupported we ignore it.
347                    Ok(_)
348                    | Err(RequestError::HeaderError(HttpHeaderError::UnsupportedValue(_, _))) => {}
349                    // If parsing the header invalidates the request, we propagate
350                    // the error.
351                    Err(e) => return Err(ConnectionError::ParseError(e)),
352                };
353
354                // Update the `line_start_index` to where we finished parsing.
355                *line_start_index = line_end_index
356                    .checked_add(CRLF_LEN)
357                    .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
358                Ok(true)
359            }
360            // If we have an incomplete header line.
361            None => {
362                // If we have parsed BUFFER_SIZE bytes and still haven't found the header
363                // line end sequence.
364                if *line_start_index == 0 && end_cursor == BUFFER_SIZE {
365                    // Header line is longer than BUFFER_SIZE bytes, so it is invalid.
366                    let utf8_string = String::from_utf8_lossy(&self.buffer);
367                    return Err(ConnectionError::ParseError(RequestError::HeaderError(
368                        HttpHeaderError::SizeLimitExceeded(utf8_string.to_string()),
369                    )));
370                }
371                // Move the incomplete header line from the end of the buffer to
372                // the beginning, so that we can append the rest of the line and
373                // parse it in the next `try_read` call.
374                self.shift_buffer_left(*line_start_index, end_cursor)
375                    .map_err(ConnectionError::ParseError)?;
376                Ok(false)
377            }
378        }
379    }
380
381    /// Parses bytes in `buffer` to be put into the request body, if there should be one.
382    /// Returns `false` if there are no more bytes to be parsed in the buffer.
383    ///
384    /// # Errors
385    /// `ParseError` is returned when the body is larger than the specified content-length.
386    fn parse_body(
387        &mut self,
388        line_start_index: &mut usize,
389        end_cursor: usize,
390    ) -> Result<bool, ConnectionError> {
391        // If what we have just read is not enough to complete the request and
392        // there are more bytes pertaining to the body of the request.
393        if end_cursor > self.buffer.len() {
394            return Err(ConnectionError::ParseError(RequestError::Overflow));
395        }
396        let start_to_end = end_cursor
397            .checked_sub(*line_start_index)
398            .ok_or(ConnectionError::ParseError(RequestError::Underflow))?
399            as u32;
400        if self.body_bytes_to_be_read > start_to_end {
401            // Append everything that we read to our current incomplete body and update
402            // `body_bytes_to_be_read`.
403            // The slice access is safe, otherwise `checked_sub` would have failed.
404            self.body_vec
405                .extend_from_slice(&self.buffer[*line_start_index..end_cursor]);
406            // Safe to subtract directly as the `if` condition prevents underflow.
407            self.body_bytes_to_be_read -= start_to_end;
408
409            // Clear the buffer and reset the starting index.
410            for i in 0..BUFFER_SIZE {
411                self.buffer[i] = 0;
412            }
413            self.read_cursor = 0;
414
415            return Ok(false);
416        }
417
418        // Append only the remaining necessary bytes to the body of the request.
419        let line_end = line_start_index
420            .checked_add(self.body_bytes_to_be_read as usize)
421            .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
422        // The slice access is safe as `line_end` is a sum of `line_start_index` + something else.
423        self.body_vec
424            .extend_from_slice(&self.buffer[*line_start_index..line_end]);
425        *line_start_index = line_end;
426        self.body_bytes_to_be_read = 0;
427
428        let request = self
429            .pending_request
430            .as_mut()
431            .ok_or(ConnectionError::ParseError(
432                RequestError::BodyWithoutPendingRequest,
433            ))?;
434        // If there are no more bytes to be read for this request.
435        // Assign the body of the request.
436        let placeholder: Vec<_> = self
437            .body_vec
438            .drain(..request.headers.content_length() as usize)
439            .collect();
440        request.body = Some(Body::new(placeholder));
441
442        // If we read more bytes than we should have into the body of the request.
443        if !self.body_vec.is_empty() {
444            return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
445        }
446
447        self.state = ConnectionState::RequestReady;
448        Ok(true)
449    }
450
451    /// Tries to write the first available response to the provided stream.
452    /// Meant to be used only with non-blocking streams and an `EPOLL` structure.
453    /// Should be called whenever an `EPOLLOUT` event is signaled. If no bytes
454    /// were written to the stream or error occurred while trying to write to stream,
455    /// we will discard all responses from response_queue because there is no way
456    /// to deliver it to client.
457    ///
458    /// # Errors
459    /// `StreamError` is returned when an IO operation fails.
460    /// `ConnectionClosed` is returned when trying to write on a closed connection.
461    /// `InvalidWrite` is returned when trying to write on a connection with an
462    /// empty outgoing buffer.
463    pub fn try_write(&mut self) -> Result<(), ConnectionError> {
464        if self.response_buffer.is_none() {
465            if let Some(response) = self.response_queue.pop_front() {
466                let mut response_buffer_vec: Vec<u8> = Vec::new();
467                response
468                    .write_all(&mut response_buffer_vec)
469                    .map_err(ConnectionError::StreamWriteError)?;
470                self.response_buffer = Some(response_buffer_vec);
471            } else {
472                return Err(ConnectionError::InvalidWrite);
473            }
474        }
475
476        let mut response_fully_written = false;
477        let mut connection_closed = false;
478
479        if let Some(response_buffer_vec) = self.response_buffer.as_mut() {
480            let bytes_to_be_written = response_buffer_vec.len();
481            match self.stream.write(response_buffer_vec.as_slice()) {
482                Ok(0) => connection_closed = true,
483                Ok(bytes_written) => {
484                    if bytes_written != bytes_to_be_written {
485                        response_buffer_vec.drain(..bytes_written);
486                    } else {
487                        response_fully_written = true;
488                    }
489                }
490                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
491                Err(_) => connection_closed = true,
492            }
493        }
494
495        if connection_closed {
496            self.clear_write_buffer();
497            return Err(ConnectionError::ConnectionClosed);
498        } else if response_fully_written {
499            self.response_buffer.take();
500        }
501
502        Ok(())
503    }
504
505    /// Discards all pending writes from the connection.
506    pub fn clear_write_buffer(&mut self) {
507        self.response_queue.clear();
508        self.response_buffer.take();
509    }
510
    /// Queues a response to be sent back to the source of a request.
    /// Nothing is written to the stream here; queued responses are sent, in
    /// order, by `try_write`.
    pub fn enqueue_response(&mut self, response: Response) {
        self.response_queue.push_back(response);
    }
515
516    fn shift_buffer_left(
517        &mut self,
518        line_start_index: usize,
519        end_cursor: usize,
520    ) -> Result<(), RequestError> {
521        if end_cursor > self.buffer.len() {
522            return Err(RequestError::Overflow);
523        }
524        // We don't want to shift something that is already at the beginning.
525        let delta_bytes = end_cursor
526            .checked_sub(line_start_index)
527            .ok_or(RequestError::Underflow)?;
528        if line_start_index != 0 {
529            // Move the bytes from `line_start_index` to the beginning of the buffer.
530            for cursor in 0..delta_bytes {
531                // The unchecked addition is safe, guaranteed by the result of the substraction
532                // above.
533                // The slice access is safe, as `line_start_index + cursor` is <= `end_cursor`,
534                // checked at the start of the function.
535                self.buffer[cursor] = self.buffer[line_start_index + cursor];
536            }
537
538            // Clear the rest of the buffer.
539            for cursor in delta_bytes..end_cursor {
540                self.buffer[cursor] = 0;
541            }
542        }
543
544        // Update `read_cursor`.
545        self.read_cursor = delta_bytes;
546        Ok(())
547    }
548
    /// Removes and returns the oldest fully parsed request in the queue, or
    /// `None` if the queue is empty.
    pub fn pop_parsed_request(&mut self) -> Option<Request> {
        self.parsed_requests.pop_front()
    }
554
    /// Returns `true` if at least one fully parsed request is waiting in the
    /// queue.
    pub fn has_parsed_requests(&self) -> bool {
        !self.parsed_requests.is_empty()
    }
559
560    /// Returns `true` if there are bytes waiting to be written into the stream.
561    pub fn pending_write(&self) -> bool {
562        self.response_buffer.is_some() || !self.response_queue.is_empty()
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use std::io::{Seek, SeekFrom};
569    use std::net::Shutdown;
570    use std::os::unix::net::UnixStream;
571    use std::os::unix::prelude::IntoRawFd;
572
573    use super::*;
574    use crate::common::{Method, Version};
575    use crate::server::MAX_PAYLOAD_SIZE;
576
577    use vmm_sys_util::tempfile::TempFile;
578
    #[test]
    fn test_try_read_expect() {
        // Test request with `Expect` header.
        let (mut sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(receiver);
        // First send only the request line and the headers.
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                                 Expect: 100-continue\r\n\
                                 Content-Length: 26\r\n\
                                 Transfer-Encoding: chunked\r\n\r\n",
            )
            .unwrap();
        assert!(conn.try_read().is_ok());

        // Then send the 26 bytes of the body; the request completes on this read.
        sender.write_all(b"this is not\n\r\na json \nbody").unwrap();
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request = Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(26, true, true),
            body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
            files: Vec::new(),
        };

        assert_eq!(request, expected_request);
    }
608
    #[test]
    fn test_try_read_long_headers() {
        // Long request headers.
        let (mut sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(receiver);
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                                 Expect: 100-continue\r\n\
                                 Transfer-Encoding: chunked\r\n",
            )
            .unwrap();

        // Pad the header section with custom headers so that it is larger than
        // the read buffer of the connection.
        for i in 0..90 {
            sender.write_all(b"Custom-Header-Testing: 1").unwrap();
            sender.write_all(i.to_string().as_bytes()).unwrap();
            sender.write_all(b"\r\n").unwrap();
        }
        sender
            .write_all(b"Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();
        // The request does not fit in one buffer, so it takes several reads to
        // receive it entirely.
        assert!(conn.try_read().is_ok());
        assert!(conn.try_read().is_ok());
        assert!(conn.try_read().is_ok());
        let request = conn.pop_parsed_request().unwrap();

        let expected_request = Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(26, true, true),
            body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
            files: Vec::new(),
        };
        assert_eq!(request, expected_request);
    }
644
    #[test]
    fn test_try_read_split_ending() {
        // Long request with '\r\n' on BUFFER_SIZEth and 1025th positions in the request.
        let (mut sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(receiver);
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                                 Expect: 100-continue\r\n\
                                 Transfer-Encoding: chunked\r\n",
            )
            .unwrap();

        // Filler headers pushing the end of the request towards the end of the
        // first buffer.
        for i in 0..32 {
            sender.write_all(b"Custom-Header-Testing: 1").unwrap();
            sender.write_all(i.to_string().as_bytes()).unwrap();
            sender.write_all(b"\r\n").unwrap();
        }
        sender
            .write_all(b"Head: aaaaa\r\nContent-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();
        // The request is longer than the buffer, so a second read is needed to
        // receive its last bytes.
        assert!(conn.try_read().is_ok());
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();
        let expected_request = Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(26, true, true),
            body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
            files: Vec::new(),
        };
        assert_eq!(request, expected_request);
    }
678
    #[test]
    fn test_try_read_invalid_request() {
        // Invalid request.
        let (mut sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(receiver);
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                                 Expect: 100-continue\r\n\
                                 Transfer-Encoding: chunked\r\n",
            )
            .unwrap();

        // Enough valid headers to fill the first read, so that the invalid
        // header below is only seen by the second one.
        for i in 0..40 {
            sender.write_all(b"Custom-Header-Testing: 1").unwrap();
            sender.write_all(i.to_string().as_bytes()).unwrap();
            sender.write_all(b"\r\n").unwrap();
        }
        // `Content-Length` must be a number: "alpha" invalidates the request.
        sender
            .write_all(b"Content-Length: alpha\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();
        assert!(conn.try_read().is_ok());
        let request_error = conn.try_read().unwrap_err();
        assert_eq!(
            request_error,
            ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidValue(
                "Content-Length".to_string(),
                " alpha".to_string()
            )))
        );
    }
711
    #[test]
    fn test_try_read_long_request_body() {
        // Long request body.
        let (mut sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(receiver);
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                                 Expect: 100-continue\r\n\
                                 Transfer-Encoding: chunked\r\n\
                                 Content-Length: 1400\r\n\r\n",
            )
            .unwrap();

        // Build a body of 100 * 14 = 1400 bytes, matching `Content-Length`.
        let mut request_body: Vec<u8> = Vec::with_capacity(1400);
        for _ in 0..100 {
            request_body.write_all(b"This is a test").unwrap();
        }
        sender.write_all(request_body.as_slice()).unwrap();
        // The body alone is larger than the read buffer, so it is received
        // over two reads.
        assert!(conn.try_read().is_ok());
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();
        let expected_request = Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(1400, true, true),
            body: Some(Body::new(request_body)),
            files: Vec::new(),
        };

        assert_eq!(request, expected_request);
    }
744
#[test]
fn test_try_read_large_req_line() {
    // A request line that is longer than BUFFER_SIZE bytes and never terminated.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // Start of the request line, then 200 more "/home" path segments
    // (1000 bytes) with no CRLF anywhere.
    tx.write_all(b"PATCH http://localhost/home").unwrap();
    let uri_tail: Vec<u8> = b"/home".repeat(200);
    tx.write_all(&uri_tail).unwrap();

    let error = conn.try_read().unwrap_err();
    assert_eq!(
        error,
        ConnectionError::ParseError(RequestError::InvalidRequest)
    );
}
763
#[test]
fn test_try_read_large_header_line() {
    // A single header line that is longer than BUFFER_SIZE bytes.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // Valid request line plus the beginning of the oversized header.
    tx.write_all(b"PATCH http://localhost/home HTTP/1.1\r\nhead: ")
        .unwrap();

    // 86 * 12 = 1032 bytes of header value, then the end of the header section.
    let mut header_tail: Vec<u8> = b"abcdefghijkl".repeat(86);
    header_tail.extend_from_slice(b"\r\n\r\n");
    tx.write_all(&header_tail).unwrap();
    assert!(conn.try_read().is_ok());

    // The error is expected to carry the first 1024 bytes of the header line.
    let full_line = format!("head: {}", String::from_utf8(header_tail).unwrap());
    let truncated_line = full_line[..1024].to_string();
    assert_eq!(
        conn.try_read().unwrap_err(),
        ConnectionError::ParseError(RequestError::HeaderError(
            HttpHeaderError::SizeLimitExceeded(truncated_line)
        ))
    );
}
790
#[test]
fn test_try_read_no_body_request() {
    // A complete request that has headers but no body.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    tx.write_all(b"PATCH http://localhost/home HTTP/1.1\r\n")
        .unwrap();
    tx.write_all(b"Expect: 100-continue\r\n").unwrap();
    // The trailing blank line ends the header section.
    tx.write_all(b"Transfer-Encoding: chunked\r\n\r\n").unwrap();

    // A single read is enough to get the request parsed and queued.
    conn.try_read().unwrap();
    let parsed = conn.pop_parsed_request().unwrap();
    assert_eq!(
        parsed,
        Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(0, true, true),
            body: None,
            files: Vec::new(),
        }
    );
}
814
#[test]
fn test_try_read_segmented_req_line() {
    // The request line arrives in two separate pieces.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // First piece: an incomplete request line, which is not an error.
    tx.write_all(b"PATCH http://local").unwrap();
    assert!(conn.try_read().is_ok());

    // Second piece: the rest of the line and the blank line ending the request.
    tx.write_all(b"host/home HTTP/1.1\r\n\r\n").unwrap();
    conn.try_read().unwrap();

    let parsed = conn.pop_parsed_request().unwrap();
    assert_eq!(
        parsed,
        Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(0, false, false),
            body: None,
            files: Vec::new(),
        }
    );
}
836
#[test]
fn test_try_read_long_req_line_b2b() {
    // A long request line sent right after another, complete request.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // The first request takes 40 bytes, which leaves 984 bytes of a
    // BUFFER_SIZE (1024) read for the second one. The second request line is
    // 23 bytes of prefix + 98 * 10 bytes of path + 13 bytes of version and
    // line endings.
    tx.write_all(b"PATCH http://localhost/home HTTP/1.1\r\n\r\nPATCH http://localhost/")
        .unwrap();

    let mut long_tail: Vec<u8> = b"localhost/".repeat(98);
    long_tail.extend_from_slice(b" HTTP/1.1\r\n\r\n");
    tx.write_all(&long_tail).unwrap();

    // The first read yields the short request.
    conn.try_read().unwrap();
    let first = conn.pop_parsed_request().unwrap();
    assert_eq!(
        first,
        Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(0, false, false),
            body: None,
            files: Vec::new(),
        }
    );

    // The second read yields the request with the long URI.
    conn.try_read().unwrap();
    let second = conn.pop_parsed_request().unwrap();

    // The expected URI is the 17-byte prefix plus the 980 path bytes, i.e. the
    // first 997 bytes of prefix + tail.
    let mut uri_bytes: Vec<u8> = b"http://localhost/".to_vec();
    uri_bytes.extend_from_slice(&long_tail);
    let long_uri = std::str::from_utf8(&uri_bytes[..997]).unwrap();
    assert_eq!(
        second,
        Request {
            request_line: RequestLine::new(Method::Patch, long_uri, Version::Http11),
            headers: Headers::new(0, false, false),
            body: None,
            files: Vec::new(),
        }
    );
}
884
#[test]
fn test_try_read_double_request() {
    // Two complete requests that are picked up by a single read.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // First request: PATCH with a 26-byte body.
    tx.write_all(b"PATCH http://localhost/home HTTP/1.1\r\n")
        .unwrap();
    tx.write_all(b"Transfer-Encoding: chunked\r\n").unwrap();
    tx.write_all(b"Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
        .unwrap();

    // Second request: PUT with a 23-byte body.
    tx.write_all(
        b"PUT http://farhost/away HTTP/1.1\r\nContent-Length: 23\r\n\r\nthis is another request",
    )
    .unwrap();

    // One read must leave both requests in the parsed queue, in order.
    conn.try_read().unwrap();
    let first = conn.pop_parsed_request().unwrap();
    let second = conn.pop_parsed_request().unwrap();

    assert_eq!(
        first,
        Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers: Headers::new(26, false, true),
            body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
            files: Vec::new(),
        }
    );
    assert_eq!(
        second,
        Request {
            request_line: RequestLine::new(Method::Put, "http://farhost/away", Version::Http11),
            headers: Headers::new(23, false, false),
            body: Some(Body::new(b"this is another request".to_vec())),
            files: Vec::new(),
        }
    );
}
924
#[test]
fn test_try_read_connection_closed() {
    // The peer goes away in the middle of a header line.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // Send a request that stops half-way through the `Content-Length` header.
    tx.write_all(b"PATCH http://localhost/home HTTP/1.1\r\n")
        .unwrap();
    tx.write_all(b"Transfer-Encoding: chunked\r\n").unwrap();
    tx.write_all(b"Content-Len").unwrap();

    // The partial request is accepted...
    conn.try_read().unwrap();

    // ...but once the sender shuts down, the next read reports the closure.
    tx.shutdown(std::net::Shutdown::Both).unwrap();
    let error = conn.try_read().unwrap_err();
    assert_eq!(error, ConnectionError::ConnectionClosed);
}
947
#[test]
fn test_enqueue_response() {
    // Scenario 1: response without a body.
    let (tx, mut rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(tx);

    // Serialize the response locally to learn the bytes expected on the wire.
    let plain = Response::new(Version::Http11, StatusCode::OK);
    let mut wire_bytes: Vec<u8> = Vec::new();
    plain.write_all(&mut wire_bytes).unwrap();

    conn.enqueue_response(plain);
    assert!(conn.try_write().is_ok());

    // The peer must receive exactly the serialized bytes.
    let mut received = vec![0u8; wire_bytes.len()];
    rx.read_exact(&mut received).unwrap();
    assert_eq!(received, wire_bytes);

    // Scenario 2: response with a body, on a fresh socket pair.
    let (tx, mut rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(tx);

    let mut with_body = Response::new(Version::Http11, StatusCode::OK);
    let payload: Vec<u8> = br#"{ "json": "body", "hello": "world" }"#.to_vec();
    with_body.set_body(Body::new(payload));
    let mut wire_bytes: Vec<u8> = Vec::new();
    with_body.write_all(&mut wire_bytes).unwrap();

    conn.enqueue_response(with_body);
    assert!(conn.try_write().is_ok());

    let mut received = vec![0u8; wire_bytes.len()];
    rx.read_exact(&mut received).unwrap();
    assert_eq!(received, wire_bytes);
}
985
#[test]
fn test_try_read_negative_content_len() {
    // A negative `Content-Length` header value must be rejected.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    tx.write_all(b"PUT http://localhost/home HTTP/1.1\r\n")
        .unwrap();
    tx.write_all(b"Content-Length: -1\r\n\r\n").unwrap();

    // The error is expected to carry the header name and the raw value,
    // leading space included.
    let expected_error = ConnectionError::ParseError(RequestError::HeaderError(
        HttpHeaderError::InvalidValue("Content-Length".to_string(), " -1".to_string()),
    ));
    assert_eq!(conn.try_read().unwrap_err(), expected_error);
}
1006
#[test]
fn test_payload_size_limit() {
    // A request advertising more payload than the configured maximum is rejected.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // Lower the accepted payload size to 5 bytes for this connection.
    conn.set_payload_max_size(5);

    tx.write_all(b"PUT http://localhost/home HTTP/1.1\r\n")
        .unwrap();
    tx.write_all(b"Content-Length: 51200\r\n\r\naaaaaa").unwrap();

    // NOTE(review): the request advertises a Content-Length of 51200 while the
    // expected error carries MAX_PAYLOAD_SIZE as its second field; presumably
    // the two values coincide - confirm against the constant's definition.
    let error = conn.try_read().unwrap_err();
    assert_eq!(
        error,
        ConnectionError::ParseError(RequestError::SizeLimitExceeded(5, MAX_PAYLOAD_SIZE))
    );
}
1024
#[test]
fn test_read_bytes() {
    // Exercises `read_bytes` with the read cursor at different positions.
    let (mut tx, rx) = UnixStream::pair().unwrap();
    rx.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(rx);

    // With the cursor at the very end of the buffer the read must fail.
    conn.read_cursor = BUFFER_SIZE;
    tx.write_all(b"hello\0").unwrap();
    assert_eq!(
        conn.read_bytes(),
        Err(ConnectionError::ParseError(RequestError::Overflow))
    );

    // With 3 bytes of room left, a partial read succeeds and the returned
    // value is BUFFER_SIZE.
    conn.read_cursor = BUFFER_SIZE - 3;
    tx.write_all(b"hello\0").unwrap();
    assert_eq!(conn.read_bytes(), Ok(BUFFER_SIZE));

    // 9 bytes are still pending: the 3 left over from the first 6-byte
    // message plus the whole second message.
    conn.read_cursor = 0;
    assert_eq!(conn.read_bytes(), Ok(9));

    // Once the sender shuts down its write side, reads report a closed connection.
    tx.shutdown(Shutdown::Write).unwrap();
    assert_eq!(conn.read_bytes(), Err(ConnectionError::ConnectionClosed));
}
1053
#[test]
fn test_read_bytes_with_files() {
    // `read_bytes` must collect file descriptors sent along with the data.
    let (sender, receiver) = UnixStream::pair().unwrap();
    receiver.set_nonblocking(true).expect("Can't modify socket");
    let mut conn = HttpConnection::new(receiver);

    // Creates a temporary file holding `contents`, rewound to the start so
    // the content can be read back through a passed descriptor.
    let make_file = |contents: &[u8]| {
        let mut file = TempFile::new().unwrap().into_file();
        file.write_all(contents).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file
    };
    let foo_file = make_file(b"foo");
    let bar_file = make_file(b"bar");
    let foobar_file = make_file(b"foobar");

    // First message: 3 data bytes accompanied by 2 file descriptors.
    let first_fds = [foo_file.into_raw_fd(), bar_file.into_raw_fd()];
    assert_eq!(
        sender.send_with_fds(&[[1, 2, 3].as_ref()], &first_fds),
        Ok(3)
    );

    // Both the byte count and the number of received files must match.
    assert_eq!(conn.read_bytes(), Ok(3));
    assert_eq!(conn.files.len(), 2);

    // The data bytes land at the start of the buffer.
    assert_eq!(&conn.buffer[..3], &[1u8, 2, 3][..]);

    // The received descriptors are usable: each file's content can be read back.
    let mut scratch = [0; 10];
    assert_eq!(conn.files[0].read(&mut scratch).unwrap(), 3);
    assert_eq!(&scratch[..3], b"foo");
    assert_eq!(conn.files[1].read(&mut scratch).unwrap(), 3);
    assert_eq!(&scratch[..3], b"bar");

    // Second message: 1 data byte accompanied by the third file descriptor.
    let second_fds = [foobar_file.into_raw_fd()];
    assert_eq!(sender.send_with_fds(&[[10].as_ref()], &second_fds), Ok(1));

    // The new file is added to the ones already received.
    assert_eq!(conn.read_bytes(), Ok(1));
    assert_eq!(conn.files.len(), 3);

    // The new data byte is found at the start of the buffer.
    assert_eq!(conn.buffer[0], 10);

    // The latest descriptor is usable as well.
    let mut scratch = [0; 10];
    assert_eq!(conn.files[2].read(&mut scratch).unwrap(), 6);
    assert_eq!(&scratch[..6], b"foobar");

    // After the sender shuts down its write side, reads report a closed connection.
    sender.shutdown(Shutdown::Write).unwrap();
    assert_eq!(conn.read_bytes(), Err(ConnectionError::ConnectionClosed));
}
1124
#[test]
fn test_shift_buffer_left() {
    // Checks the bounds validation performed by `shift_buffer_left`.
    let (_, receiver) = UnixStream::pair().unwrap();
    let mut conn = HttpConnection::new(receiver);
    let buffer_len = conn.buffer.len();

    // An end index past the buffer is an overflow.
    let overflow = conn.shift_buffer_left(0, buffer_len + 1).unwrap_err();
    assert_eq!(overflow, RequestError::Overflow);

    // A start index past the end index is an underflow.
    let underflow = conn.shift_buffer_left(1, 0).unwrap_err();
    assert_eq!(underflow, RequestError::Underflow);

    // A range that lies inside the buffer is accepted.
    assert!(conn.shift_buffer_left(1, buffer_len).is_ok());
}
1141
#[test]
fn test_parse_request_line() {
    // Drives `parse_request_line` directly on the connection's buffer.
    let (_, receiver) = UnixStream::pair().unwrap();
    let mut conn = HttpConnection::new(receiver);
    let past_buffer_end = conn.buffer.len() + 1;

    // Error case: end past buffer end.
    assert_eq!(
        conn.parse_request_line(&mut 0, past_buffer_end),
        Err(ConnectionError::ParseError(RequestError::Overflow))
    );

    // Error case: start is past end.
    assert_eq!(
        conn.parse_request_line(&mut 1, 0),
        Err(ConnectionError::ParseError(RequestError::Underflow))
    );

    // Error case: the request line is longer than BUFFER_SIZE.
    assert_eq!(
        conn.parse_request_line(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(RequestError::InvalidRequest))
    );

    // OK case: starting at index 1, the line is merely incomplete.
    assert_eq!(conn.parse_request_line(&mut 1, BUFFER_SIZE), Ok(false));

    // Error case: a terminated line whose content is not a request line.
    conn.buffer[..8].copy_from_slice(b"foo\r\nbar");
    assert_eq!(
        conn.parse_request_line(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(RequestError::InvalidRequest))
    );

    // OK case: a well-formed, terminated request line.
    conn.buffer[..29].copy_from_slice(b"GET http://foo/bar HTTP/1.1\r\n");
    assert_eq!(conn.parse_request_line(&mut 0, BUFFER_SIZE), Ok(true));
}
1180
#[test]
fn test_parse_headers() {
    // Drives `parse_headers` directly on the connection's buffer. The steps
    // below build on each other's buffer and connection state, so their
    // order matters.
    let (_, receiver) = UnixStream::pair().unwrap();
    let mut conn = HttpConnection::new(receiver);
    let past_buffer_end = conn.buffer.len() + 1;

    // Error case: end_cursor past buffer end.
    assert_eq!(
        conn.parse_headers(&mut 0, past_buffer_end),
        Err(ConnectionError::ParseError(RequestError::Overflow))
    );

    // Error case: line_start_index is past end_cursor.
    assert_eq!(
        conn.parse_headers(&mut 1, 0),
        Err(ConnectionError::ParseError(RequestError::Underflow))
    );

    // Error case: no request pending.
    // The CRLF can sit at the very start of the buffer...
    conn.buffer[..2].copy_from_slice(&[CR, LF]);
    assert_eq!(
        conn.parse_headers(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(
            RequestError::HeadersWithoutPendingRequest
        ))
    );
    // ...or be preceded by other bytes.
    conn.buffer[..3].copy_from_slice(&[0, CR, LF]);
    assert_eq!(
        conn.parse_headers(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(
            RequestError::HeadersWithoutPendingRequest
        ))
    );

    // Error case: invalid header. With a pending request in place, the NUL
    // byte left in front of the CRLF is treated as a header line.
    conn.pending_request = Some(Request {
        request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
        headers: Headers::new(0, true, true),
        body: None,
        files: Vec::new(),
    });
    assert_eq!(
        conn.parse_headers(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(RequestError::HeaderError(
            HttpHeaderError::InvalidFormat("\0".to_string())
        )))
    );

    // OK case: incomplete header line.
    let partial_line = b"Custom-Header-Testing: 1";
    conn.buffer[..partial_line.len()].copy_from_slice(partial_line);
    assert_eq!(conn.parse_headers(&mut 0, partial_line.len()), Ok(false));

    // OK case: complete header line.
    let complete_line = b"Custom-Header-Testing: 1\r\n";
    conn.buffer[..complete_line.len()].copy_from_slice(complete_line);
    assert_eq!(conn.parse_headers(&mut 0, complete_line.len()), Ok(true));

    // OK case: complete header line, end of header.
    let blank_line = b"\r\n";
    conn.buffer[..blank_line.len()].copy_from_slice(blank_line);
    assert_eq!(conn.parse_headers(&mut 0, blank_line.len()), Ok(true));
}
1245
#[test]
fn test_parse_body() {
    // Drives `parse_body` directly. The steps below build on each other's
    // connection state, so their order matters.
    let (_, receiver) = UnixStream::pair().unwrap();
    let mut conn = HttpConnection::new(receiver);
    let past_buffer_end = conn.buffer.len() + 1;

    // Error case: end_cursor past buffer end.
    assert_eq!(
        conn.parse_body(&mut 0usize, past_buffer_end),
        Err(ConnectionError::ParseError(RequestError::Overflow))
    );

    // Error case: line_start_index is past end_cursor.
    assert_eq!(
        conn.parse_body(&mut 1usize, 0usize),
        Err(ConnectionError::ParseError(RequestError::Underflow))
    );

    // OK case: one body byte is still expected but the range is empty, so
    // the buffer is consumed and the body is not complete yet.
    conn.body_bytes_to_be_read = 1;
    assert_eq!(conn.parse_body(&mut 0usize, 0usize), Ok(false));

    // Error case: there's more body to be parsed, but no pending request set.
    assert_eq!(
        conn.parse_body(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(
            RequestError::BodyWithoutPendingRequest
        ))
    );

    // Error case: read more bytes than we should have into the body of the request.
    conn.pending_request = Some(Request {
        request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
        headers: Headers::new(0, true, true),
        body: None,
        files: Vec::new(),
    });
    conn.body_vec = vec![0xde, 0xad, 0xbe, 0xef];
    assert_eq!(
        conn.parse_body(&mut 0, BUFFER_SIZE),
        Err(ConnectionError::ParseError(RequestError::InvalidRequest))
    );

    // OK case: with the accumulated body cleared the request completes.
    conn.body_vec.clear();
    assert_eq!(conn.parse_body(&mut 0, BUFFER_SIZE), Ok(true));
}
1291}