use std::collections::VecDeque;
use std::fs::File;
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use crate::common::ascii::{CR, CRLF_LEN, LF};
use crate::common::sock_ctrl_msg::ScmSocket;
use crate::common::Body;
pub use crate::common::{ConnectionError, HttpHeaderError, RequestError};
use crate::headers::Headers;
use crate::request::{find, Request, RequestLine};
use crate::response::{Response, StatusCode};
use crate::server::MAX_PAYLOAD_SIZE;
const BUFFER_SIZE: usize = 1024;
const SCM_MAX_FD: usize = 253;
enum ConnectionState {
WaitingForRequestLine,
WaitingForHeaders,
WaitingForBody,
RequestReady,
}
pub struct HttpConnection<T> {
pending_request: Option<Request>,
stream: T,
state: ConnectionState,
buffer: [u8; BUFFER_SIZE],
read_cursor: usize,
body_vec: Vec<u8>,
body_bytes_to_be_read: u32,
parsed_requests: VecDeque<Request>,
response_queue: VecDeque<Response>,
response_buffer: Option<Vec<u8>>,
files: Vec<File>,
payload_max_size: usize,
}
impl<T> AsRawFd for HttpConnection<T>
where
T: AsRawFd,
{
fn as_raw_fd(&self) -> RawFd {
self.stream.as_raw_fd()
}
}
impl<T: Read + Write + ScmSocket> HttpConnection<T> {
pub fn new(stream: T) -> Self {
Self {
pending_request: None,
stream,
state: ConnectionState::WaitingForRequestLine,
buffer: [0; BUFFER_SIZE],
read_cursor: 0,
body_vec: vec![],
body_bytes_to_be_read: 0,
parsed_requests: VecDeque::new(),
response_queue: VecDeque::new(),
response_buffer: None,
files: Vec::new(),
payload_max_size: MAX_PAYLOAD_SIZE,
}
}
pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) {
self.payload_max_size = request_payload_max_size;
}
pub fn try_read(&mut self) -> Result<(), ConnectionError> {
let end_cursor = self.read_bytes()?;
let mut line_start_index = 0;
loop {
match self.state {
ConnectionState::WaitingForRequestLine => {
if !self.parse_request_line(&mut line_start_index, end_cursor)? {
return Ok(());
}
}
ConnectionState::WaitingForHeaders => {
if !self.parse_headers(&mut line_start_index, end_cursor)? {
return Ok(());
}
}
ConnectionState::WaitingForBody => {
if !self.parse_body(&mut line_start_index, end_cursor)? {
return Ok(());
}
}
ConnectionState::RequestReady => {
self.state = ConnectionState::WaitingForRequestLine;
self.body_bytes_to_be_read = 0;
let mut pending_request = self.pending_request.take().unwrap();
pending_request.files = self.files.drain(..).collect();
self.parsed_requests.push_back(pending_request);
}
};
}
}
fn read_bytes(&mut self) -> Result<usize, ConnectionError> {
if self.read_cursor >= BUFFER_SIZE {
return Err(ConnectionError::ParseError(RequestError::Overflow));
}
let (bytes_read, new_files) = self.recv_with_fds()?;
self.files.extend(new_files);
if bytes_read == 0 {
return Err(ConnectionError::ConnectionClosed);
}
bytes_read
.checked_add(self.read_cursor)
.ok_or(ConnectionError::ParseError(RequestError::Overflow))
}
fn recv_with_fds(&mut self) -> Result<(usize, Vec<File>), ConnectionError> {
let buf = &mut self.buffer[self.read_cursor..];
let mut fds = [0; SCM_MAX_FD];
let mut iovecs = [libc::iovec {
iov_base: buf.as_mut_ptr() as *mut _,
iov_len: buf.len(),
}];
let (read_count, fd_count) = unsafe {
self.stream
.recv_with_fds(&mut iovecs, &mut fds)
.map_err(ConnectionError::StreamReadError)?
};
Ok((
read_count,
fds.iter()
.take(fd_count)
.map(|fd| {
unsafe { File::from_raw_fd(*fd) }
})
.collect(),
))
}
fn parse_request_line(
&mut self,
start: &mut usize,
end: usize,
) -> Result<bool, ConnectionError> {
if end < *start {
return Err(ConnectionError::ParseError(RequestError::Underflow));
}
if end > self.buffer.len() {
return Err(ConnectionError::ParseError(RequestError::Overflow));
}
match find(&self.buffer[*start..end], &[CR, LF]) {
Some(line_end_index) => {
let line = &self.buffer[*start..(*start + line_end_index)];
*start = *start + line_end_index + CRLF_LEN;
self.pending_request = Some(Request {
request_line: RequestLine::try_from(line)
.map_err(ConnectionError::ParseError)?,
headers: Headers::default(),
body: None,
files: Vec::new(),
});
self.state = ConnectionState::WaitingForHeaders;
Ok(true)
}
None => {
if end == BUFFER_SIZE && *start == 0 {
return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
} else {
self.shift_buffer_left(*start, end)
.map_err(ConnectionError::ParseError)?;
}
Ok(false)
}
}
}
fn parse_headers(
&mut self,
line_start_index: &mut usize,
end_cursor: usize,
) -> Result<bool, ConnectionError> {
if end_cursor > self.buffer.len() {
return Err(ConnectionError::ParseError(RequestError::Overflow));
}
if end_cursor < *line_start_index {
return Err(ConnectionError::ParseError(RequestError::Underflow));
}
match find(&self.buffer[*line_start_index..end_cursor], &[CR, LF]) {
Some(0) => {
let request = self
.pending_request
.as_mut()
.ok_or(ConnectionError::ParseError(
RequestError::HeadersWithoutPendingRequest,
))?;
if request.headers.content_length() == 0 {
self.state = ConnectionState::RequestReady;
} else {
if request.headers.content_length() as usize > self.payload_max_size {
return Err(ConnectionError::ParseError(
RequestError::SizeLimitExceeded(
self.payload_max_size,
request.headers.content_length() as usize,
),
));
}
if request.headers.expect() {
let expect_response =
Response::new(request.http_version(), StatusCode::Continue);
self.response_queue.push_back(expect_response);
}
self.body_bytes_to_be_read = request.headers.content_length();
request.body = Some(Body::new(vec![]));
self.state = ConnectionState::WaitingForBody;
}
*line_start_index = line_start_index
.checked_add(CRLF_LEN)
.ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
Ok(true)
}
Some(relative_line_end_index) => {
let request = self
.pending_request
.as_mut()
.ok_or(ConnectionError::ParseError(
RequestError::HeadersWithoutPendingRequest,
))?;
let line_end_index = relative_line_end_index
.checked_add(*line_start_index)
.ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
let line = &self.buffer[*line_start_index..line_end_index];
match request.headers.parse_header_line(line) {
Ok(_)
| Err(RequestError::HeaderError(HttpHeaderError::UnsupportedValue(_, _))) => {}
Err(e) => return Err(ConnectionError::ParseError(e)),
};
*line_start_index = line_end_index
.checked_add(CRLF_LEN)
.ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
Ok(true)
}
None => {
if *line_start_index == 0 && end_cursor == BUFFER_SIZE {
let utf8_string = String::from_utf8_lossy(&self.buffer);
return Err(ConnectionError::ParseError(RequestError::HeaderError(
HttpHeaderError::SizeLimitExceeded(utf8_string.to_string()),
)));
}
self.shift_buffer_left(*line_start_index, end_cursor)
.map_err(ConnectionError::ParseError)?;
Ok(false)
}
}
}
fn parse_body(
&mut self,
line_start_index: &mut usize,
end_cursor: usize,
) -> Result<bool, ConnectionError> {
if end_cursor > self.buffer.len() {
return Err(ConnectionError::ParseError(RequestError::Overflow));
}
let start_to_end = end_cursor
.checked_sub(*line_start_index)
.ok_or(ConnectionError::ParseError(RequestError::Underflow))?
as u32;
if self.body_bytes_to_be_read > start_to_end {
self.body_vec
.extend_from_slice(&self.buffer[*line_start_index..end_cursor]);
self.body_bytes_to_be_read -= start_to_end;
for i in 0..BUFFER_SIZE {
self.buffer[i] = 0;
}
self.read_cursor = 0;
return Ok(false);
}
let line_end = line_start_index
.checked_add(self.body_bytes_to_be_read as usize)
.ok_or(ConnectionError::ParseError(RequestError::Overflow))?;
self.body_vec
.extend_from_slice(&self.buffer[*line_start_index..line_end]);
*line_start_index = line_end;
self.body_bytes_to_be_read = 0;
let request = self
.pending_request
.as_mut()
.ok_or(ConnectionError::ParseError(
RequestError::BodyWithoutPendingRequest,
))?;
let placeholder: Vec<_> = self
.body_vec
.drain(..request.headers.content_length() as usize)
.collect();
request.body = Some(Body::new(placeholder));
if !self.body_vec.is_empty() {
return Err(ConnectionError::ParseError(RequestError::InvalidRequest));
}
self.state = ConnectionState::RequestReady;
Ok(true)
}
pub fn try_write(&mut self) -> Result<(), ConnectionError> {
if self.response_buffer.is_none() {
if let Some(response) = self.response_queue.pop_front() {
let mut response_buffer_vec: Vec<u8> = Vec::new();
response
.write_all(&mut response_buffer_vec)
.map_err(ConnectionError::StreamWriteError)?;
self.response_buffer = Some(response_buffer_vec);
} else {
return Err(ConnectionError::InvalidWrite);
}
}
let mut response_fully_written = false;
let mut connection_closed = false;
if let Some(response_buffer_vec) = self.response_buffer.as_mut() {
let bytes_to_be_written = response_buffer_vec.len();
match self.stream.write(response_buffer_vec.as_slice()) {
Ok(0) => connection_closed = true,
Ok(bytes_written) => {
if bytes_written != bytes_to_be_written {
response_buffer_vec.drain(..bytes_written);
} else {
response_fully_written = true;
}
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(_) => connection_closed = true,
}
}
if connection_closed {
self.clear_write_buffer();
return Err(ConnectionError::ConnectionClosed);
} else if response_fully_written {
self.response_buffer.take();
}
Ok(())
}
pub fn clear_write_buffer(&mut self) {
self.response_queue.clear();
self.response_buffer.take();
}
pub fn enqueue_response(&mut self, response: Response) {
self.response_queue.push_back(response);
}
fn shift_buffer_left(
&mut self,
line_start_index: usize,
end_cursor: usize,
) -> Result<(), RequestError> {
if end_cursor > self.buffer.len() {
return Err(RequestError::Overflow);
}
let delta_bytes = end_cursor
.checked_sub(line_start_index)
.ok_or(RequestError::Underflow)?;
if line_start_index != 0 {
for cursor in 0..delta_bytes {
self.buffer[cursor] = self.buffer[line_start_index + cursor];
}
for cursor in delta_bytes..end_cursor {
self.buffer[cursor] = 0;
}
}
self.read_cursor = delta_bytes;
Ok(())
}
pub fn pop_parsed_request(&mut self) -> Option<Request> {
self.parsed_requests.pop_front()
}
pub fn has_parsed_requests(&self) -> bool {
!self.parsed_requests.is_empty()
}
pub fn pending_write(&self) -> bool {
self.response_buffer.is_some() || !self.response_queue.is_empty()
}
}
#[cfg(test)]
mod tests {
use std::io::{Seek, SeekFrom};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::os::unix::prelude::IntoRawFd;
use super::*;
use crate::common::{Method, Version};
use crate::server::MAX_PAYLOAD_SIZE;
use vmm_sys_util::tempfile::TempFile;
#[test]
fn test_try_read_expect() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Content-Length: 26\r\n\
Transfer-Encoding: chunked\r\n\r\n",
)
.unwrap();
assert!(conn.try_read().is_ok());
sender.write_all(b"this is not\n\r\na json \nbody").unwrap();
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(26, true, true),
body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_long_headers() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Transfer-Encoding: chunked\r\n",
)
.unwrap();
for i in 0..90 {
sender.write_all(b"Custom-Header-Testing: 1").unwrap();
sender.write_all(i.to_string().as_bytes()).unwrap();
sender.write_all(b"\r\n").unwrap();
}
sender
.write_all(b"Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
.unwrap();
assert!(conn.try_read().is_ok());
assert!(conn.try_read().is_ok());
assert!(conn.try_read().is_ok());
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(26, true, true),
body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_split_ending() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Transfer-Encoding: chunked\r\n",
)
.unwrap();
for i in 0..32 {
sender.write_all(b"Custom-Header-Testing: 1").unwrap();
sender.write_all(i.to_string().as_bytes()).unwrap();
sender.write_all(b"\r\n").unwrap();
}
sender
.write_all(b"Head: aaaaa\r\nContent-Length: 26\r\n\r\nthis is not\n\r\na json \nbody")
.unwrap();
assert!(conn.try_read().is_ok());
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(26, true, true),
body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_invalid_request() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Transfer-Encoding: chunked\r\n",
)
.unwrap();
for i in 0..40 {
sender.write_all(b"Custom-Header-Testing: 1").unwrap();
sender.write_all(i.to_string().as_bytes()).unwrap();
sender.write_all(b"\r\n").unwrap();
}
sender
.write_all(b"Content-Length: alpha\r\n\r\nthis is not\n\r\na json \nbody")
.unwrap();
assert!(conn.try_read().is_ok());
let request_error = conn.try_read().unwrap_err();
assert_eq!(
request_error,
ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidValue(
"Content-Length".to_string(),
" alpha".to_string()
)))
);
}
#[test]
fn test_try_read_long_request_body() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Transfer-Encoding: chunked\r\n\
Content-Length: 1400\r\n\r\n",
)
.unwrap();
let mut request_body: Vec<u8> = Vec::with_capacity(1400);
for _ in 0..100 {
request_body.write_all(b"This is a test").unwrap();
}
sender.write_all(request_body.as_slice()).unwrap();
assert!(conn.try_read().is_ok());
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(1400, true, true),
body: Some(Body::new(request_body)),
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_large_req_line() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender.write_all(b"PATCH http://localhost/home").unwrap();
let mut request_body: Vec<u8> = Vec::with_capacity(1400);
for _ in 0..200 {
request_body.write_all(b"/home").unwrap();
}
sender.write_all(request_body.as_slice()).unwrap();
assert_eq!(
conn.try_read().unwrap_err(),
ConnectionError::ParseError(RequestError::InvalidRequest)
);
}
#[test]
fn test_try_read_large_header_line() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(b"PATCH http://localhost/home HTTP/1.1\r\nhead: ")
.unwrap();
let mut request_body: Vec<u8> = Vec::with_capacity(1030);
for _ in 0..86 {
request_body.write_all(b"abcdefghijkl").unwrap();
}
request_body.write_all(b"\r\n\r\n").unwrap();
sender.write_all(request_body.as_slice()).unwrap();
assert!(conn.try_read().is_ok());
let expected_msg = &format!("head: {}", String::from_utf8(request_body).unwrap())[..1024];
assert_eq!(
conn.try_read().unwrap_err(),
ConnectionError::ParseError(RequestError::HeaderError(
HttpHeaderError::SizeLimitExceeded(expected_msg.to_string())
))
);
}
#[test]
fn test_try_read_no_body_request() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Expect: 100-continue\r\n\
Transfer-Encoding: chunked\r\n\r\n",
)
.unwrap();
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(0, true, true),
body: None,
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_segmented_req_line() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender.write_all(b"PATCH http://local").unwrap();
assert!(conn.try_read().is_ok());
sender.write_all(b"host/home HTTP/1.1\r\n\r\n").unwrap();
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(0, false, false),
body: None,
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_long_req_line_b2b() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(b"PATCH http://localhost/home HTTP/1.1\r\n\r\nPATCH http://localhost/")
.unwrap();
let mut request_line: Vec<u8> = Vec::with_capacity(980);
for _ in 0..98 {
request_line.write_all(b"localhost/").unwrap();
}
request_line.write_all(b" HTTP/1.1\r\n\r\n").unwrap();
sender.write_all(request_line.as_slice()).unwrap();
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let expected_request = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(0, false, false),
body: None,
files: Vec::new(),
};
assert_eq!(request, expected_request);
conn.try_read().unwrap();
let request = conn.pop_parsed_request().unwrap();
let mut expected_request_as_bytes = Vec::new();
expected_request_as_bytes
.write_all(b"http://localhost/")
.unwrap();
expected_request_as_bytes.append(request_line.as_mut());
let expected_request = Request {
request_line: RequestLine::new(
Method::Patch,
std::str::from_utf8(&expected_request_as_bytes[..997]).unwrap(),
Version::Http11,
),
headers: Headers::new(0, false, false),
body: None,
files: Vec::new(),
};
assert_eq!(request, expected_request);
}
#[test]
fn test_try_read_double_request() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Transfer-Encoding: chunked\r\n\
Content-Length: 26\r\n\r\nthis is not\n\r\na json \nbody",
)
.unwrap();
sender
.write_all(
b"PUT http://farhost/away HTTP/1.1\r\nContent-Length: 23\r\n\r\nthis is another request",
)
.unwrap();
let expected_request_first = Request {
request_line: RequestLine::new(Method::Patch, "http://localhost/home", Version::Http11),
headers: Headers::new(26, false, true),
body: Some(Body::new(b"this is not\n\r\na json \nbody".to_vec())),
files: Vec::new(),
};
conn.try_read().unwrap();
let request_first = conn.pop_parsed_request().unwrap();
let request_second = conn.pop_parsed_request().unwrap();
let expected_request_second = Request {
request_line: RequestLine::new(Method::Put, "http://farhost/away", Version::Http11),
headers: Headers::new(23, false, false),
body: Some(Body::new(b"this is another request".to_vec())),
files: Vec::new(),
};
assert_eq!(request_first, expected_request_first);
assert_eq!(request_second, expected_request_second);
}
#[test]
fn test_try_read_connection_closed() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PATCH http://localhost/home HTTP/1.1\r\n\
Transfer-Encoding: chunked\r\n\
Content-Len",
)
.unwrap();
conn.try_read().unwrap();
sender.shutdown(std::net::Shutdown::Both).unwrap();
assert_eq!(
conn.try_read().unwrap_err(),
ConnectionError::ConnectionClosed
);
}
#[test]
fn test_enqueue_response() {
let (sender, mut receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(sender);
let response = Response::new(Version::Http11, StatusCode::OK);
let mut expected_response: Vec<u8> = vec![];
response.write_all(&mut expected_response).unwrap();
conn.enqueue_response(response);
assert!(conn.try_write().is_ok());
let mut response_buffer = vec![0u8; expected_response.len()];
receiver.read_exact(&mut response_buffer).unwrap();
assert_eq!(response_buffer, expected_response);
let (sender, mut receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(sender);
let mut response = Response::new(Version::Http11, StatusCode::OK);
let mut body: Vec<u8> = vec![];
body.write_all(br#"{ "json": "body", "hello": "world" }"#)
.unwrap();
response.set_body(Body::new(body));
let mut expected_response: Vec<u8> = vec![];
response.write_all(&mut expected_response).unwrap();
conn.enqueue_response(response);
assert!(conn.try_write().is_ok());
let mut response_buffer = vec![0u8; expected_response.len()];
receiver.read_exact(&mut response_buffer).unwrap();
assert_eq!(response_buffer, expected_response);
}
#[test]
fn test_try_read_negative_content_len() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
sender
.write_all(
b"PUT http://localhost/home HTTP/1.1\r\n\
Content-Length: -1\r\n\r\n",
)
.unwrap();
assert_eq!(
conn.try_read().unwrap_err(),
ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidValue(
"Content-Length".to_string(),
" -1".to_string()
)))
);
}
#[test]
fn test_payload_size_limit() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
conn.set_payload_max_size(5);
sender
.write_all(
b"PUT http://localhost/home HTTP/1.1\r\n\
Content-Length: 51200\r\n\r\naaaaaa",
)
.unwrap();
assert_eq!(
conn.try_read().unwrap_err(),
ConnectionError::ParseError(RequestError::SizeLimitExceeded(5, MAX_PAYLOAD_SIZE))
);
}
#[test]
fn test_read_bytes() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
conn.read_cursor = BUFFER_SIZE;
sender.write_all(b"hello\0").unwrap();
assert_eq!(
conn.read_bytes().unwrap_err(),
ConnectionError::ParseError(RequestError::Overflow)
);
conn.read_cursor = BUFFER_SIZE - 3;
sender.write_all(b"hello\0").unwrap();
assert_eq!(conn.read_bytes(), Ok(BUFFER_SIZE));
conn.read_cursor = 0;
assert_eq!(conn.read_bytes(), Ok(9));
sender.shutdown(Shutdown::Write).unwrap();
assert_eq!(
conn.read_bytes().unwrap_err(),
ConnectionError::ConnectionClosed
);
}
#[test]
fn test_read_bytes_with_files() {
let (sender, receiver) = UnixStream::pair().unwrap();
receiver.set_nonblocking(true).expect("Can't modify socket");
let mut conn = HttpConnection::new(receiver);
let mut file1 = TempFile::new().unwrap().into_file();
let mut file2 = TempFile::new().unwrap().into_file();
let mut file3 = TempFile::new().unwrap().into_file();
file1.write_all(b"foo").unwrap();
file1.seek(SeekFrom::Start(0)).unwrap();
file2.write_all(b"bar").unwrap();
file2.seek(SeekFrom::Start(0)).unwrap();
file3.write_all(b"foobar").unwrap();
file3.seek(SeekFrom::Start(0)).unwrap();
assert_eq!(
sender.send_with_fds(
&[[1, 2, 3].as_ref()],
&[file1.into_raw_fd(), file2.into_raw_fd()]
),
Ok(3)
);
assert_eq!(conn.read_bytes(), Ok(3));
assert_eq!(conn.files.len(), 2);
assert_eq!(conn.buffer[0], 1);
assert_eq!(conn.buffer[1], 2);
assert_eq!(conn.buffer[2], 3);
let mut buf = [0; 10];
assert_eq!(conn.files[0].read(&mut buf).unwrap(), 3);
assert_eq!(&buf[..3], b"foo");
assert_eq!(conn.files[1].read(&mut buf).unwrap(), 3);
assert_eq!(&buf[..3], b"bar");
assert_eq!(
sender.send_with_fds(&[[10].as_ref()], &[file3.into_raw_fd()]),
Ok(1)
);
assert_eq!(conn.read_bytes(), Ok(1));
assert_eq!(conn.files.len(), 3);
assert_eq!(conn.buffer[0], 10);
let mut buf = [0; 10];
assert_eq!(conn.files[2].read(&mut buf).unwrap(), 6);
assert_eq!(&buf[..6], b"foobar");
sender.shutdown(Shutdown::Write).unwrap();
assert_eq!(
conn.read_bytes().unwrap_err(),
ConnectionError::ConnectionClosed
);
}
#[test]
fn test_shift_buffer_left() {
let (_, receiver) = UnixStream::pair().unwrap();
let mut conn = HttpConnection::new(receiver);
assert_eq!(
conn.shift_buffer_left(0, conn.buffer.len() + 1)
.unwrap_err(),
RequestError::Overflow
);
assert_eq!(
conn.shift_buffer_left(1, 0).unwrap_err(),
RequestError::Underflow
);
assert!(conn.shift_buffer_left(1, conn.buffer.len()).is_ok());
}
#[test]
fn test_parse_request_line() {
let (_, receiver) = UnixStream::pair().unwrap();
let mut conn = HttpConnection::new(receiver);
assert_eq!(
conn.parse_request_line(&mut 0, conn.buffer.len() + 1)
.unwrap_err(),
ConnectionError::ParseError(RequestError::Overflow)
);
assert_eq!(
conn.parse_request_line(&mut 1, 0).unwrap_err(),
ConnectionError::ParseError(RequestError::Underflow)
);
assert_eq!(
conn.parse_request_line(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::InvalidRequest)
);
assert_eq!(conn.parse_request_line(&mut 1, BUFFER_SIZE), Ok(false));
conn.buffer[0..8].copy_from_slice(b"foo\r\nbar");
assert_eq!(
conn.parse_request_line(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::InvalidRequest)
);
conn.buffer[0..29].copy_from_slice(b"GET http://foo/bar HTTP/1.1\r\n");
assert_eq!(conn.parse_request_line(&mut 0, BUFFER_SIZE), Ok(true));
}
#[test]
fn test_parse_headers() {
let (_, receiver) = UnixStream::pair().unwrap();
let mut conn = HttpConnection::new(receiver);
assert_eq!(
conn.parse_headers(&mut 0, conn.buffer.len() + 1)
.unwrap_err(),
ConnectionError::ParseError(RequestError::Overflow)
);
assert_eq!(
conn.parse_headers(&mut 1, 0).unwrap_err(),
ConnectionError::ParseError(RequestError::Underflow)
);
conn.buffer[0] = CR;
conn.buffer[1] = LF;
assert_eq!(
conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::HeadersWithoutPendingRequest)
);
conn.buffer[0] = 0;
conn.buffer[1] = CR;
conn.buffer[2] = LF;
assert_eq!(
conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::HeadersWithoutPendingRequest)
);
conn.pending_request = Some(Request {
request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
headers: Headers::new(0, true, true),
body: None,
files: Vec::new(),
});
assert_eq!(
conn.parse_headers(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::HeaderError(HttpHeaderError::InvalidFormat(
"\0".to_string()
)))
);
let hdr = b"Custom-Header-Testing: 1";
conn.buffer[..hdr.len()].copy_from_slice(hdr);
assert_eq!(conn.parse_headers(&mut 0, hdr.len()), Ok(false));
let hdr = b"Custom-Header-Testing: 1\r\n";
conn.buffer[..hdr.len()].copy_from_slice(hdr);
assert_eq!(conn.parse_headers(&mut 0, hdr.len()), Ok(true));
let hdr = b"\r\n";
conn.buffer[..hdr.len()].copy_from_slice(hdr);
assert_eq!(conn.parse_headers(&mut 0, hdr.len()), Ok(true));
}
#[test]
fn test_parse_body() {
let (_, receiver) = UnixStream::pair().unwrap();
let mut conn = HttpConnection::new(receiver);
assert_eq!(
conn.parse_body(&mut 0usize, conn.buffer.len() + 1)
.unwrap_err(),
ConnectionError::ParseError(RequestError::Overflow)
);
assert_eq!(
conn.parse_body(&mut 1usize, 0usize).unwrap_err(),
ConnectionError::ParseError(RequestError::Underflow)
);
conn.body_bytes_to_be_read = 1;
assert_eq!(conn.parse_body(&mut 0usize, 0usize), Ok(false));
assert_eq!(
conn.parse_body(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::BodyWithoutPendingRequest)
);
conn.pending_request = Some(Request {
request_line: RequestLine::new(Method::Get, "http://foo/bar", Version::Http11),
headers: Headers::new(0, true, true),
body: None,
files: Vec::new(),
});
conn.body_vec = vec![0xde, 0xad, 0xbe, 0xef];
assert_eq!(
conn.parse_body(&mut 0, BUFFER_SIZE).unwrap_err(),
ConnectionError::ParseError(RequestError::InvalidRequest)
);
conn.body_vec.clear();
assert_eq!(conn.parse_body(&mut 0, BUFFER_SIZE), Ok(true));
}
}