1use std::collections::VecDeque;
5use std::fs::File;
6use std::io::{Read, Write};
7use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
8
9use crate::common::ascii::{CR, CRLF_LEN, LF};
10use crate::common::sock_ctrl_msg::ScmSocket;
11use crate::common::Body;
12pub use crate::common::{ConnectionError, HttpHeaderError, RequestError};
13use crate::headers::Headers;
14use crate::request::{find, Request, RequestLine};
15use crate::response::{Response, StatusCode};
16use crate::server::MAX_PAYLOAD_SIZE;
17
/// Size, in bytes, of the fixed buffer that incoming stream data is read into.
const BUFFER_SIZE: usize = 1024;
/// Number of slots in the array offered to `recv_with_fds` for received file descriptors.
const SCM_MAX_FD: usize = 253;
20
/// Stage the request parser of an `HttpConnection` is currently in.
enum ConnectionState {
    /// No complete request line has been parsed yet for the next request.
    WaitingForRequestLine,
    /// The request line was parsed; header lines are consumed until an empty line is found.
    WaitingForHeaders,
    /// The headers announced a non-zero content length; body bytes are being collected.
    WaitingForBody,
    /// A full request has been assembled and can be moved to the parsed queue.
    RequestReady,
}
28
29pub struct HttpConnection<T> {
31 pending_request: Option<Request>,
33 stream: T,
36 state: ConnectionState,
39 buffer: [u8; BUFFER_SIZE],
41 read_cursor: usize,
44 body_vec: Vec<u8>,
47 body_bytes_to_be_read: u32,
50 parsed_requests: VecDeque<Request>,
52 response_queue: VecDeque<Response>,
54 response_buffer: Option<Vec<u8>>,
57 files: Vec<File>,
60 payload_max_size: usize,
62}
63
64impl<T> AsRawFd for HttpConnection<T>
65where
66 T: AsRawFd,
67{
68 fn as_raw_fd(&self) -> RawFd {
69 self.stream.as_raw_fd()
70 }
71}
72
73impl<T: Read + Write + ScmSocket> HttpConnection<T> {
75 pub fn new(stream: T) -> Self {
77 Self {
78 pending_request: None,
79 stream,
80 state: ConnectionState::WaitingForRequestLine,
81 buffer: [0; BUFFER_SIZE],
82 read_cursor: 0,
83 body_vec: vec![],
84 body_bytes_to_be_read: 0,
85 parsed_requests: VecDeque::new(),
86 response_queue: VecDeque::new(),
87 response_buffer: None,
88 files: Vec::new(),
89 payload_max_size: MAX_PAYLOAD_SIZE,
90 }
91 }
92
93 pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) {
96 self.payload_max_size = request_payload_max_size;
97 }
98
99 pub fn try_read(&mut self) -> Result<(), ConnectionError> {
108 let end_cursor = self.read_bytes()?;
112
113 let mut line_start_index = 0;
114 loop {
115 match self.state {
116 ConnectionState::WaitingForRequestLine => {
117 if !self.parse_request_line(&mut line_start_index, end_cursor)? {
118 return Ok(());
119 }
120 }
121 ConnectionState::WaitingForHeaders => {
122 if !self.parse_headers(&mut line_start_index, end_cursor)? {
123 return Ok(());
124 }
125 }
126 ConnectionState::WaitingForBody => {
127 if !self.parse_body(&mut line_start_index, end_cursor)? {
128 return Ok(());
129 }
130 }
131 ConnectionState::RequestReady => {
132 self.state = ConnectionState::WaitingForRequestLine;
136 self.body_bytes_to_be_read = 0;
137 let mut pending_request = self.pending_request.take().unwrap();
138 pending_request.files = self.files.drain(..).collect();
139 self.parsed_requests.push_back(pending_request);
140 }
141 };
142 }
143 }
144
145 fn read_bytes(&mut self) -> Result<usize, ConnectionError> {
153 if self.read_cursor >= BUFFER_SIZE {
154 return Err(ConnectionError::ParseError(RequestError::Overflow));
155 }
156 let (bytes_read, new_files) = self.recv_with_fds()?;
159
160 self.files.extend(new_files);
163
164 if bytes_read == 0 {
166 return Err(ConnectionError::ConnectionClosed);
167 }
168 bytes_read
169 .checked_add(self.read_cursor)
170 .ok_or(ConnectionError::ParseError(RequestError::Overflow))
171 }
172
173 fn recv_with_fds(&mut self) -> Result<(usize, Vec<File>), ConnectionError> {
179 let buf = &mut self.buffer[self.read_cursor..];
180 let mut fds = [0; SCM_MAX_FD];
184 let mut iovecs = [libc::iovec {
185 iov_base: buf.as_mut_ptr() as *mut _,
186 iov_len: buf.len(),
187 }];
188
189 let (read_count, fd_count) = unsafe {
190 self.stream
191 .recv_with_fds(&mut iovecs, &mut fds)
192 .map_err(ConnectionError::StreamReadError)?
193 };
194
195 Ok((
196 read_count,
197 fds.iter()
198 .take(fd_count)
199 .map(|fd| {
200 unsafe { File::from_raw_fd(*fd) }
203 })
204 .collect(),
205 ))
206 }
207
208 fn parse_request_line(
214 &mut self,
215 start: &mut usize,
216 end: usize,
217 ) -> Result<bool, ConnectionError> {
218 if end < *start {
219 return Err(ConnectionError::ParseError(RequestError::Underflow));
220 }
221 if end > self.buffer.len() {
222 return Err(ConnectionError::ParseError(RequestError::Overflow));
223 }
224 match find(&self.buffer[*start..end], &[CR, LF]) {
227 Some(line_end_index) => {
228 let line = &self.buffer[*start..(*start + line_end_index)];
232
233 *start = *start + line_end_index + CRLF_LEN;
235
236 self.pending_request = Some(Request {
239 request_line: RequestLine::try_from(line)
240 .map_err(ConnectionError::ParseError)?,
241 headers: Headers::default(),
242 body: None,
243 files: Vec::new(),
244 });
245 self.state = ConnectionState::WaitingForHeaders;
246 Ok(true)
247 }
248 None => {
249 if end == BUFFER_SIZE && *start == 0 {
251 return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
252 } else {
253 self.shift_buffer_left(*start, end)
258 .map_err(ConnectionError::ParseError)?;
259 }
260 Ok(false)
261 }
262 }
263 }
264
265 fn parse_headers(
271 &mut self,
272 line_start_index: &mut usize,
273 end_cursor: usize,
274 ) -> Result<bool, ConnectionError> {
275 if end_cursor > self.buffer.len() {
276 return Err(ConnectionError::ParseError(RequestError::Overflow));
277 }
278 if end_cursor < *line_start_index {
279 return Err(ConnectionError::ParseError(RequestError::Underflow));
280 }
281 match find(&self.buffer[*line_start_index..end_cursor], &[CR, LF]) {
283 Some(0) => {
290 let request = self
293 .pending_request
294 .as_mut()
295 .ok_or(ConnectionError::ParseError(
296 RequestError::HeadersWithoutPendingRequest,
297 ))?;
298 if request.headers.content_length() == 0 {
299 self.state = ConnectionState::RequestReady;
300 } else {
301 if request.headers.content_length() as usize > self.payload_max_size {
302 return Err(ConnectionError::ParseError(
303 RequestError::SizeLimitExceeded(
304 self.payload_max_size,
305 request.headers.content_length() as usize,
306 ),
307 ));
308 }
309 if request.headers.expect() {
310 let expect_response =
312 Response::new(request.http_version(), StatusCode::Continue);
313 self.response_queue.push_back(expect_response);
314 }
315
316 self.body_bytes_to_be_read = request.headers.content_length();
317 request.body = Some(Body::new(vec![]));
318 self.state = ConnectionState::WaitingForBody;
319 }
320
321 *line_start_index = line_start_index
323 .checked_add(CRLF_LEN)
324 .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
325 Ok(true)
326 }
327 Some(relative_line_end_index) => {
329 let request = self
330 .pending_request
331 .as_mut()
332 .ok_or(ConnectionError::ParseError(
333 RequestError::HeadersWithoutPendingRequest,
334 ))?;
335 let line_end_index = relative_line_end_index
337 .checked_add(*line_start_index)
338 .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
339
340 let line = &self.buffer[*line_start_index..line_end_index];
345 match request.headers.parse_header_line(line) {
346 Ok(_)
348 | Err(RequestError::HeaderError(HttpHeaderError::UnsupportedValue(_, _))) => {}
349 Err(e) => return Err(ConnectionError::ParseError(e)),
352 };
353
354 *line_start_index = line_end_index
356 .checked_add(CRLF_LEN)
357 .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
358 Ok(true)
359 }
360 None => {
362 if *line_start_index == 0 && end_cursor == BUFFER_SIZE {
365 let utf8_string = String::from_utf8_lossy(&self.buffer);
367 return Err(ConnectionError::ParseError(RequestError::HeaderError(
368 HttpHeaderError::SizeLimitExceeded(utf8_string.to_string()),
369 )));
370 }
371 self.shift_buffer_left(*line_start_index, end_cursor)
375 .map_err(ConnectionError::ParseError)?;
376 Ok(false)
377 }
378 }
379 }
380
381 fn parse_body(
387 &mut self,
388 line_start_index: &mut usize,
389 end_cursor: usize,
390 ) -> Result<bool, ConnectionError> {
391 if end_cursor > self.buffer.len() {
394 return Err(ConnectionError::ParseError(RequestError::Overflow));
395 }
396 let start_to_end = end_cursor
397 .checked_sub(*line_start_index)
398 .ok_or(ConnectionError::ParseError(RequestError::Underflow))?
399 as u32;
400 if self.body_bytes_to_be_read > start_to_end {
401 self.body_vec
405 .extend_from_slice(&self.buffer[*line_start_index..end_cursor]);
406 self.body_bytes_to_be_read -= start_to_end;
408
409 for i in 0..BUFFER_SIZE {
411 self.buffer[i] = 0;
412 }
413 self.read_cursor = 0;
414
415 return Ok(false);
416 }
417
418 let line_end = line_start_index
420 .checked_add(self.body_bytes_to_be_read as usize)
421 .ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
422 self.body_vec
424 .extend_from_slice(&self.buffer[*line_start_index..line_end]);
425 *line_start_index = line_end;
426 self.body_bytes_to_be_read = 0;
427
428 let request = self
429 .pending_request
430 .as_mut()
431 .ok_or(ConnectionError::ParseError(
432 RequestError::BodyWithoutPendingRequest,
433 ))?;
434 let placeholder: Vec<_> = self
437 .body_vec
438 .drain(..request.headers.content_length() as usize)
439 .collect();
440 request.body = Some(Body::new(placeholder));
441
442 if !self.body_vec.is_empty() {
444 return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
445 }
446
447 self.state = ConnectionState::RequestReady;
448 Ok(true)
449 }
450
451 pub fn try_write(&mut self) -> Result<(), ConnectionError> {
464 if self.response_buffer.is_none() {
465 if let Some(response) = self.response_queue.pop_front() {
466 let mut response_buffer_vec: Vec<u8> = Vec::new();
467 response
468 .write_all(&mut response_buffer_vec)
469 .map_err(ConnectionError::StreamWriteError)?;
470 self.response_buffer = Some(response_buffer_vec);
471 } else {
472 return Err(ConnectionError::InvalidWrite);
473 }
474 }
475
476 let mut response_fully_written = false;
477 let mut connection_closed = false;
478
479 if let Some(response_buffer_vec) = self.response_buffer.as_mut() {
480 let bytes_to_be_written = response_buffer_vec.len();
481 match self.stream.write(response_buffer_vec.as_slice()) {
482 Ok(0) => connection_closed = true,
483 Ok(bytes_written) => {
484 if bytes_written != bytes_to_be_written {
485 response_buffer_vec.drain(..bytes_written);
486 } else {
487 response_fully_written = true;
488 }
489 }
490 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
491 Err(_) => connection_closed = true,
492 }
493 }
494
495 if connection_closed {
496 self.clear_write_buffer();
497 return Err(ConnectionError::ConnectionClosed);
498 } else if response_fully_written {
499 self.response_buffer.take();
500 }
501
502 Ok(())
503 }
504
505 pub fn clear_write_buffer(&mut self) {
507 self.response_queue.clear();
508 self.response_buffer.take();
509 }
510
511 pub fn enqueue_response(&mut self, response: Response) {
513 self.response_queue.push_back(response);
514 }
515
516 fn shift_buffer_left(
517 &mut self,
518 line_start_index: usize,
519 end_cursor: usize,
520 ) -> Result<(), RequestError> {
521 if end_cursor > self.buffer.len() {
522 return Err(RequestError::Overflow);
523 }
524 let delta_bytes = end_cursor
526 .checked_sub(line_start_index)
527 .ok_or(RequestError::Underflow)?;
528 if line_start_index != 0 {
529 for cursor in 0..delta_bytes {
531 self.buffer[cursor] = self.buffer[line_start_index + cursor];
536 }
537
538 for cursor in delta_bytes..end_cursor {
540 self.buffer[cursor] = 0;
541 }
542 }
543
544 self.read_cursor = delta_bytes;
546 Ok(())
547 }
548
549 pub fn pop_parsed_request(&mut self) -> Option<Request> {
552 self.parsed_requests.pop_front()
553 }
554
555 pub fn has_parsed_requests(&self) -> bool {
557 !self.parsed_requests.is_empty()
558 }
559
560 pub fn pending_write(&self) -> bool {
562 self.response_buffer.is_some() || !self.response_queue.is_empty()
563 }
564}
565
#[cfg(test)]
mod tests {
    use std::io::{Seek, SeekFrom};
    use std::net::Shutdown;
    use std::os::unix::net::UnixStream;
    use std::os::unix::prelude::IntoRawFd;

    use super::*;
    use crate::common::{Method, Version};
    use crate::server::MAX_PAYLOAD_SIZE;

    use vmm_sys_util::tempfile::TempFile;

    /// Body shared by most of the requests sent in these tests (26 bytes).
    const TEST_BODY: &[u8] = b"this is not\n\r\na json \nbody";

    /// Returns the sending end of a socket pair and a connection that wraps the
    /// non-blocking receiving end.
    fn sender_and_connection() -> (UnixStream, HttpConnection<UnixStream>) {
        let (sender, receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        (sender, HttpConnection::new(receiver))
    }

    /// Returns a connection whose peer has already been dropped; used by the tests that
    /// call the parsing helpers directly.
    fn detached_connection() -> HttpConnection<UnixStream> {
        let (_, receiver) = UnixStream::pair().unwrap();
        HttpConnection::new(receiver)
    }

    /// Builds the `PATCH http://localhost/home` request expected by most tests.
    fn patch_home_request(headers: Headers, body: Option<Vec<u8>>) -> Request {
        Request {
            request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
            headers,
            body: body.map(Body::new),
            files: Vec::new(),
        }
    }

    /// Sends `count` numbered `Custom-Header-Testing` header lines.
    fn send_custom_headers(sender: &mut UnixStream, count: u32) {
        for i in 0..count {
            sender.write_all(b"Custom-Header-Testing: 1").unwrap();
            sender.write_all(i.to_string().as_bytes()).unwrap();
            sender.write_all(b"\r\n").unwrap();
        }
    }

    /// Creates a temporary file holding `content`, rewound to its start.
    fn temp_file_containing(content: &[u8]) -> File {
        let mut file = TempFile::new().unwrap().into_file();
        file.write_all(content).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file
    }

    /// Enqueues `response` on a fresh connection, writes it once and checks that the
    /// peer receives exactly the serialized response.
    fn assert_response_is_written(response: Response) {
        let (sender, mut receiver) = UnixStream::pair().unwrap();
        receiver.set_nonblocking(true).expect("Can't modify socket");
        let mut conn = HttpConnection::new(sender);

        let mut expected_response: Vec<u8> = vec![];
        response.write_all(&mut expected_response).unwrap();

        conn.enqueue_response(response);
        assert!(conn.try_write().is_ok());

        let mut received = vec![0u8; expected_response.len()];
        receiver.read_exact(&mut received).unwrap();
        assert_eq!(received, expected_response);
    }

    #[test]
    fn test_try_read_expect() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Content-Length: 26\r\n\
                  Transfer-Encoding: chunked\r\n\r\n",
            )
            .unwrap();
        assert!(conn.try_read().is_ok());

        sender.write_all(b"this is not\n\r\na json \nbody").unwrap();
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request =
            patch_home_request(Headers::new(26, true, true), Some(TEST_BODY.to_vec()));
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_long_headers() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Transfer-Encoding: chunked\r\n",
            )
            .unwrap();
        send_custom_headers(&mut sender, 90);
        sender
            .write_all(b"Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();

        // The request does not fit in one buffer, so several reads are needed.
        for _ in 0..3 {
            assert!(conn.try_read().is_ok());
        }
        let request = conn.pop_parsed_request().unwrap();

        let expected_request =
            patch_home_request(Headers::new(26, true, true), Some(TEST_BODY.to_vec()));
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_split_ending() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Transfer-Encoding: chunked\r\n",
            )
            .unwrap();
        send_custom_headers(&mut sender, 32);
        sender
            .write_all(b"Head: aaaaa\r\nContent-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();

        assert!(conn.try_read().is_ok());
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request =
            patch_home_request(Headers::new(26, true, true), Some(TEST_BODY.to_vec()));
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_invalid_request() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Transfer-Encoding: chunked\r\n",
            )
            .unwrap();
        send_custom_headers(&mut sender, 40);
        sender
            .write_all(b"Content-Length: alpha\r\n\r\nthis is not\n\r\na json \nbody")
            .unwrap();

        assert!(conn.try_read().is_ok());
        let request_error = conn.try_read().unwrap_err();
        assert_eq!(
            request_error,
            ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidValue(
                "Content-Length".to_string(),
                " alpha".to_string()
            )))
        );
    }

    #[test]
    fn test_try_read_long_request_body() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Transfer-Encoding: chunked\r\n\
                  Content-Length: 1400\r\n\r\n",
            )
            .unwrap();

        // 100 repetitions of a 14-byte chunk: exactly the announced 1400 bytes.
        let request_body: Vec<u8> = b"This is a test".repeat(100);
        sender.write_all(request_body.as_slice()).unwrap();

        assert!(conn.try_read().is_ok());
        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request =
            patch_home_request(Headers::new(1400, true, true), Some(request_body));
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_large_req_line() {
        let (mut sender, mut conn) = sender_and_connection();
        sender.write_all(b"PATCH http://localhost/home").unwrap();

        let oversized_path: Vec<u8> = b"/home".repeat(200);
        sender.write_all(oversized_path.as_slice()).unwrap();

        assert_eq!(
            conn.try_read().unwrap_err(),
            ConnectionError::ParseError(RequestError::InvalidRequest)
        );
    }

    #[test]
    fn test_try_read_large_header_line() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(b"PATCH http://localhost/home HTTP/1.1\r\nhead: ")
            .unwrap();

        let mut header_value: Vec<u8> = b"abcdefghijkl".repeat(86);
        header_value.extend_from_slice(b"\r\n\r\n");
        sender.write_all(header_value.as_slice()).unwrap();
        assert!(conn.try_read().is_ok());

        let full_line = format!("head: {}", String::from_utf8(header_value).unwrap());
        let expected_msg = &full_line[..1024];
        assert_eq!(
            conn.try_read().unwrap_err(),
            ConnectionError::ParseError(RequestError::HeaderError(
                HttpHeaderError::SizeLimitExceeded(expected_msg.to_string())
            ))
        );
    }

    #[test]
    fn test_try_read_no_body_request() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Expect: 100-continue\r\n\
                  Transfer-Encoding: chunked\r\n\r\n",
            )
            .unwrap();

        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request = patch_home_request(Headers::new(0, true, true), None);
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_segmented_req_line() {
        let (mut sender, mut conn) = sender_and_connection();
        sender.write_all(b"PATCH http://local").unwrap();
        assert!(conn.try_read().is_ok());

        sender.write_all(b"host/home HTTP/1.1\r\n\r\n").unwrap();

        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let expected_request = patch_home_request(Headers::new(0, false, false), None);
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_long_req_line_b2b() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(b"PATCH http://localhost/home HTTP/1.1\r\n\r\nPATCH http://localhost/")
            .unwrap();

        // Rest of the second request: a 980-byte path tail followed by the version.
        let mut second_request_tail: Vec<u8> = b"localhost/".repeat(98);
        second_request_tail.extend_from_slice(b" HTTP/1.1\r\n\r\n");
        sender.write_all(second_request_tail.as_slice()).unwrap();

        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();
        let expected_request = patch_home_request(Headers::new(0, false, false), None);
        assert_eq!(request, expected_request);

        conn.try_read().unwrap();
        let request = conn.pop_parsed_request().unwrap();

        let mut expected_uri_bytes = b"http://localhost/".to_vec();
        expected_uri_bytes.extend_from_slice(&second_request_tail);
        let expected_request = Request {
            request_line: RequestLine::new(
                Method::Patch,
                std::str::from_utf8(&expected_uri_bytes[..997]).unwrap(),
                Version::Http11,
            ),
            headers: Headers::new(0, false, false),
            body: None,
            files: Vec::new(),
        };
        assert_eq!(request, expected_request);
    }

    #[test]
    fn test_try_read_double_request() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Transfer-Encoding: chunked\r\n\
                  Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody",
            )
            .unwrap();
        sender
            .write_all(
                b"PUT http://farhost/away HTTP/1.1\r\nContent-Length: 23\r\n\r\nthis is another request",
            )
            .unwrap();

        // A single read is enough to parse both requests.
        conn.try_read().unwrap();
        let request_first = conn.pop_parsed_request().unwrap();
        let request_second = conn.pop_parsed_request().unwrap();

        let expected_request_first =
            patch_home_request(Headers::new(26, false, true), Some(TEST_BODY.to_vec()));
        let expected_request_second = Request {
            request_line: RequestLine::new(Method::Put, "http://farhost/away", Version::Http11),
            headers: Headers::new(23, false, false),
            body: Some(Body::new(b"this is another request".to_vec())),
            files: Vec::new(),
        };
        assert_eq!(request_first, expected_request_first);
        assert_eq!(request_second, expected_request_second);
    }

    #[test]
    fn test_try_read_connection_closed() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PATCH http://localhost/home HTTP/1.1\r\n\
                  Transfer-Encoding: chunked\r\n\
                  Content-Len",
            )
            .unwrap();

        conn.try_read().unwrap();
        sender.shutdown(Shutdown::Both).unwrap();

        assert_eq!(
            conn.try_read().unwrap_err(),
            ConnectionError::ConnectionClosed
        );
    }

    #[test]
    fn test_enqueue_response() {
        // Response without a body.
        assert_response_is_written(Response::new(Version::Http11, StatusCode::OK));

        // Response with a body.
        let mut response = Response::new(Version::Http11, StatusCode::OK);
        let body: Vec<u8> = br#"{ "json": "body", "hello": "world" }"#.to_vec();
        response.set_body(Body::new(body));
        assert_response_is_written(response);
    }

    #[test]
    fn test_try_read_negative_content_len() {
        let (mut sender, mut conn) = sender_and_connection();
        sender
            .write_all(
                b"PUT http://localhost/home HTTP/1.1\r\n\
                  Content-Length: -1\r\n\r\n",
            )
            .unwrap();

        assert_eq!(
            conn.try_read().unwrap_err(),
            ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidValue(
                "Content-Length".to_string(),
                " -1".to_string()
            )))
        );
    }

    #[test]
    fn test_payload_size_limit() {
        let (mut sender, mut conn) = sender_and_connection();
        conn.set_payload_max_size(5);
        sender
            .write_all(
                b"PUT http://localhost/home HTTP/1.1\r\n\
                  Content-Length: 51200\r\n\r\naaaaaa",
            )
            .unwrap();

        assert_eq!(
            conn.try_read().unwrap_err(),
            ConnectionError::ParseError(RequestError::SizeLimitExceeded(5, MAX_PAYLOAD_SIZE))
        );
    }

    #[test]
    fn test_read_bytes() {
        let (mut sender, mut conn) = sender_and_connection();

        // A full buffer cannot take any more bytes.
        conn.read_cursor = BUFFER_SIZE;
        sender.write_all(b"hello\0").unwrap();
        assert_eq!(
            conn.read_bytes().unwrap_err(),
            ConnectionError::ParseError(RequestError::Overflow)
        );

        // Only three bytes of space are left, so only three bytes are read.
        conn.read_cursor = BUFFER_SIZE - 3;
        sender.write_all(b"hello\0").unwrap();
        assert_eq!(conn.read_bytes(), Ok(BUFFER_SIZE));

        // The remaining nine bytes fit into an empty buffer.
        conn.read_cursor = 0;
        assert_eq!(conn.read_bytes(), Ok(9));

        sender.shutdown(Shutdown::Write).unwrap();
        assert_eq!(
            conn.read_bytes().unwrap_err(),
            ConnectionError::ConnectionClosed
        );
    }

    #[test]
    fn test_read_bytes_with_files() {
        let (sender, mut conn) = sender_and_connection();

        let file1 = temp_file_containing(b"foo");
        let file2 = temp_file_containing(b"bar");
        let file3 = temp_file_containing(b"foobar");

        // Three bytes of data accompanied by two files.
        assert_eq!(
            sender.send_with_fds(
                &[[1, 2, 3].as_ref()],
                &[file1.into_raw_fd(), file2.into_raw_fd()]
            ),
            Ok(3)
        );

        assert_eq!(conn.read_bytes(), Ok(3));
        assert_eq!(conn.files.len(), 2);
        assert_eq!(&conn.buffer[..3], &[1u8, 2, 3]);

        let mut content = [0; 10];
        assert_eq!(conn.files[0].read(&mut content).unwrap(), 3);
        assert_eq!(&content[..3], b"foo");
        assert_eq!(conn.files[1].read(&mut content).unwrap(), 3);
        assert_eq!(&content[..3], b"bar");

        // One more byte accompanied by one more file.
        assert_eq!(
            sender.send_with_fds(&[[10].as_ref()], &[file3.into_raw_fd()]),
            Ok(1)
        );

        assert_eq!(conn.read_bytes(), Ok(1));
        assert_eq!(conn.files.len(), 3);
        assert_eq!(conn.buffer[0], 10);

        let mut content = [0; 10];
        assert_eq!(conn.files[2].read(&mut content).unwrap(), 6);
        assert_eq!(&content[..6], b"foobar");

        sender.shutdown(Shutdown::Write).unwrap();
        assert_eq!(
            conn.read_bytes().unwrap_err(),
            ConnectionError::ConnectionClosed
        );
    }

    #[test]
    fn test_shift_buffer_left() {
        let mut conn = detached_connection();

        assert_eq!(
            conn.shift_buffer_left(0, BUFFER_SIZE + 1).unwrap_err(),
            RequestError::Overflow
        );
        assert_eq!(
            conn.shift_buffer_left(1, 0).unwrap_err(),
            RequestError::Underflow
        );
        assert!(conn.shift_buffer_left(1, BUFFER_SIZE).is_ok());
    }

    #[test]
    fn test_parse_request_line() {
        let mut conn = detached_connection();

        // End cursor past the buffer.
        assert_eq!(
            conn.parse_request_line(&mut 0, BUFFER_SIZE + 1)
                .unwrap_err(),
            ConnectionError::ParseError(RequestError::Overflow)
        );

        // End cursor before the start cursor.
        assert_eq!(
            conn.parse_request_line(&mut 1, 0).unwrap_err(),
            ConnectionError::ParseError(RequestError::Underflow)
        );

        // A full buffer without CRLF.
        assert_eq!(
            conn.parse_request_line(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::InvalidRequest)
        );

        // No CRLF, but part of the buffer was already consumed.
        assert_eq!(conn.parse_request_line(&mut 1, BUFFER_SIZE), Ok(false));

        // A line that is not a valid request line.
        conn.buffer[0..8].copy_from_slice(b"foo\r\nbar");
        assert_eq!(
            conn.parse_request_line(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::InvalidRequest)
        );

        // A valid request line.
        conn.buffer[0..29].copy_from_slice(b"GET http://foo/bar HTTP/1.1\r\n");
        assert_eq!(conn.parse_request_line(&mut 0, BUFFER_SIZE), Ok(true));
    }

    #[test]
    fn test_parse_headers() {
        let mut conn = detached_connection();

        // End cursor past the buffer.
        assert_eq!(
            conn.parse_headers(&mut 0, BUFFER_SIZE + 1).unwrap_err(),
            ConnectionError::ParseError(RequestError::Overflow)
        );

        // End cursor before the start cursor.
        assert_eq!(
            conn.parse_headers(&mut 1, 0).unwrap_err(),
            ConnectionError::ParseError(RequestError::Underflow)
        );

        // Empty line without a pending request.
        conn.buffer[..2].copy_from_slice(&[CR, LF]);
        assert_eq!(
            conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::HeadersWithoutPendingRequest)
        );

        // Non-empty line without a pending request.
        conn.buffer[..3].copy_from_slice(&[0, CR, LF]);
        assert_eq!(
            conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::HeadersWithoutPendingRequest)
        );

        // Same line with a pending request: the header itself is malformed.
        conn.pending_request = Some(Request {
            request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
            headers: Headers::new(0, true, true),
            body: None,
            files: Vec::new(),
        });
        assert_eq!(
            conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidFormat(
                "\0".to_string()
            )))
        );

        // Header without its CRLF yet.
        let partial_header = b"Custom-Header-Testing: 1";
        conn.buffer[..partial_header.len()].copy_from_slice(partial_header);
        assert_eq!(conn.parse_headers(&mut 0, partial_header.len()), Ok(false));

        // Complete header line.
        let full_header = b"Custom-Header-Testing: 1\r\n";
        conn.buffer[..full_header.len()].copy_from_slice(full_header);
        assert_eq!(conn.parse_headers(&mut 0, full_header.len()), Ok(true));

        // Empty line that ends the headers.
        let end_of_headers = b"\r\n";
        conn.buffer[..end_of_headers.len()].copy_from_slice(end_of_headers);
        assert_eq!(conn.parse_headers(&mut 0, end_of_headers.len()), Ok(true));
    }

    #[test]
    fn test_parse_body() {
        let mut conn = detached_connection();

        // End cursor past the buffer.
        assert_eq!(
            conn.parse_body(&mut 0usize, BUFFER_SIZE + 1).unwrap_err(),
            ConnectionError::ParseError(RequestError::Overflow)
        );

        // End cursor before the start cursor.
        assert_eq!(
            conn.parse_body(&mut 1usize, 0usize).unwrap_err(),
            ConnectionError::ParseError(RequestError::Underflow)
        );

        // More bytes expected than available.
        conn.body_bytes_to_be_read = 1;
        assert_eq!(conn.parse_body(&mut 0usize, 0usize), Ok(false));

        // Enough bytes, but no pending request to attach them to.
        assert_eq!(
            conn.parse_body(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::BodyWithoutPendingRequest)
        );

        // Leftover bytes in the accumulator beyond the content length.
        conn.pending_request = Some(Request {
            request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
            headers: Headers::new(0, true, true),
            body: None,
            files: Vec::new(),
        });
        conn.body_vec = vec![0xde, 0xad, 0xbe, 0xef];
        assert_eq!(
            conn.parse_body(&mut 0, BUFFER_SIZE).unwrap_err(),
            ConnectionError::ParseError(RequestError::InvalidRequest)
        );

        // Consistent state.
        conn.body_vec.clear();
        assert_eq!(conn.parse_body(&mut 0, BUFFER_SIZE), Ok(true));
    }
}