use std::collections::HashMap;
use std::io::{ErrorKind, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use crate::common::sock_ctrl_msg::ScmSocket;
use crate::common::{Body, ConnectionError, ServerError, Version};
use crate::connection::HttpConnection;
use crate::request::Request;
use crate::response::{Response, StatusCode};
/// Canned HTTP 503 response that `HttpServer::requests` writes verbatim to a
/// freshly accepted client when `handle_new_connection` reports
/// `ServerError::ServerFull`.
static SERVER_FULL_ERROR_MESSAGE: &[u8] = b"HTTP/1.1 503\r\n\
Server: Firecracker API\r\n\
Connection: close\r\n\
Content-Length: 40\r\n\r\n{ \"error\": \"Too many open connections\" }";
/// Number of tracked client connections at which `handle_new_connection`
/// starts refusing new ones (value used on Linux).
#[cfg(target_os = "linux")]
const MAX_CONNECTIONS: usize = 256;
/// Same limit, value used on every non-Linux target.
#[cfg(not(target_os = "linux"))]
const MAX_CONNECTIONS: usize = 10;
/// Capacity of the `mio::Events` buffer allocated for each call to
/// `HttpServer::requests`.
const MAX_EVENTS: usize = 64;
/// Default request payload limit stored in a new `HttpServer` and handed to
/// every accepted connection (presumably a byte count - confirm against
/// `HttpConnection::set_payload_max_size`).
pub(crate) const MAX_PAYLOAD_SIZE: usize = 51200;
/// Result alias used in this file; the error type is always `ServerError`.
type Result<T> = std::result::Result<T, ServerError>;
/// An HTTP request paired with the token of the connection it arrived on.
#[derive(Debug)]
pub struct ServerRequest {
    /// The wrapped HTTP request.
    pub request: Request,
    /// Token of the originating connection; copied into the response built by
    /// `process` so the server can route it back.
    id: Token,
}

impl ServerRequest {
    /// Bundles `request` with the connection token `id`.
    pub fn new(request: Request, id: Token) -> Self {
        ServerRequest { request, id }
    }

    /// Borrows the wrapped request.
    pub fn inner(&self) -> &Request {
        &self.request
    }

    /// Invokes `callable` on the wrapped request and tags the `Response` it
    /// returns with this request's connection token.
    pub fn process<F>(&self, mut callable: F) -> ServerResponse
    where
        F: FnMut(&Request) -> Response,
    {
        ServerResponse::new(callable(&self.request), self.id)
    }
}
/// An HTTP response paired with the token of the connection it must be sent on.
#[derive(Debug)]
pub struct ServerResponse {
    /// The response to deliver.
    response: Response,
    /// Token of the destination connection.
    id: Token,
}

impl ServerResponse {
    /// Bundles `response` with the connection token `id`.
    fn new(response: Response, id: Token) -> Self {
        ServerResponse { response, id }
    }
}
/// Lifecycle state of a `ClientConnection`, used by `HttpServer` to decide
/// which poll interest to register for the connection.
#[derive(PartialOrd, PartialEq)]
enum ClientConnectionState {
/// Initial state; also restored by `ClientConnection::write` once nothing is
/// left to send.
AwaitingIncoming,
/// There is response data waiting to be written to the client.
AwaitingOutgoing,
/// The connection was closed (peer closed, write failure, or `close()`); it
/// is removed from the server once `is_done()` holds.
Closed,
}
/// Server-side bookkeeping wrapped around one `HttpConnection`.
struct ClientConnection<T> {
/// The underlying HTTP connection used for reading requests and writing
/// responses.
connection: HttpConnection<T>,
/// Current lifecycle state of the connection.
state: ClientConnectionState,
/// Requests handed out by `read` that have not yet been answered through
/// `enqueue_response`.
in_flight_response_count: u32,
}
impl<T: Read + Write + ScmSocket> ClientConnection<T> {
fn new(connection: HttpConnection<T>) -> Self {
Self {
connection,
state: ClientConnectionState::AwaitingIncoming,
in_flight_response_count: 0,
}
}
/// Reads from the connection until at least one request has been parsed, the
/// peer has closed, or a failure has been converted into an error response.
///
/// Returns the parsed requests (possibly none). When the connection is
/// reported closed the state becomes `Closed` and an empty vector is
/// returned. Read and parse failures are not returned as errors: a 500 or
/// 400 response is enqueued on the connection instead.
///
/// # Errors
/// `ServerError::Overflow` if the in-flight response counter would overflow.
fn read(&mut self) -> Result<Vec<Request>> {
    let mut requests = Vec::new();
    // Linux only: number of remaining retries after an `EAGAIN` read error.
    let mut retries_left = 32;
    loop {
        match self.connection.try_read() {
            Ok(()) => {
                // Nothing complete yet: keep reading.
                if !self.connection.has_parsed_requests() {
                    continue;
                }
                // Drain everything the parser has produced so far.
                requests.extend(std::iter::from_fn(|| {
                    self.connection.pop_parsed_request()
                }));
                break;
            }
            Err(ConnectionError::ConnectionClosed) => {
                self.state = ClientConnectionState::Closed;
                return Ok(Vec::new());
            }
            Err(ConnectionError::StreamReadError(inner)) => {
                // Back off briefly and retry while the retry budget lasts.
                #[cfg(target_os = "linux")]
                if inner.errno() == libc::EAGAIN && retries_left > 0 {
                    sleep(Duration::from_micros(20));
                    retries_left -= 1;
                    continue;
                }
                // Report the read failure to the client as a 500.
                let mut server_error =
                    Response::new(Version::Http11, StatusCode::InternalServerError);
                server_error.set_body(Body::new(inner.to_string()));
                self.connection.enqueue_response(server_error);
                break;
            }
            Err(ConnectionError::ParseError(inner)) => {
                // Throw away every request parsed before the malformed one.
                while self.connection.pop_parsed_request().is_some() {}
                let body = Body::new(format!(
                    "{{ \"error\": \"{}\nAll previous unanswered requests will be dropped.\" }}",
                    inner
                ));
                let mut bad_request = Response::new(Version::Http11, StatusCode::BadRequest);
                bad_request.set_body(body);
                self.connection.enqueue_response(bad_request);
                break;
            }
            Err(ConnectionError::InvalidWrite) | Err(ConnectionError::StreamWriteError(_)) => {
                // Write-side errors are not expected from a read attempt.
                unreachable!();
            }
        }
    }
    // Every request handed to the caller is owed exactly one response.
    match self
        .in_flight_response_count
        .checked_add(requests.len() as u32)
    {
        Some(total) => self.in_flight_response_count = total,
        None => return Err(ServerError::Overflow),
    }
    // An error response may have been queued above; make sure it gets sent.
    if self.connection.pending_write() {
        self.state = ClientConnectionState::AwaitingOutgoing;
    }
    Ok(requests)
}
/// Keeps calling `try_write` until nothing is pending or the connection is
/// closed.
///
/// On completion the state is `AwaitingIncoming` (everything sent) or
/// `Closed` (peer closed or the stream write failed).
///
/// # Errors
/// `ServerError::ConnectionError(ConnectionError::InvalidWrite)` when the
/// connection reports an invalid write.
fn write(&mut self) -> Result<()> {
    loop {
        if self.state == ClientConnectionState::Closed {
            return Ok(());
        }
        match self.connection.try_write() {
            Err(ConnectionError::InvalidWrite) => {
                return Err(ServerError::ConnectionError(ConnectionError::InvalidWrite));
            }
            Err(ConnectionError::ConnectionClosed)
            | Err(ConnectionError::StreamWriteError(_)) => {
                // The Closed check at the top of the loop ends the call.
                self.state = ClientConnectionState::Closed;
            }
            _ => {
                if !self.connection.pending_write() {
                    // Everything was flushed: go back to waiting for requests.
                    self.state = ClientConnectionState::AwaitingIncoming;
                    return Ok(());
                }
            }
        }
    }
}
/// Queues `response` for sending (unless the connection is already closed)
/// and marks one in-flight request as answered.
///
/// # Errors
/// `ServerError::Underflow` when no request was in flight.
fn enqueue_response(&mut self, response: Response) -> Result<()> {
    // A closed connection cannot send anything, but the request still counts
    // as answered below.
    if self.state != ClientConnectionState::Closed {
        self.connection.enqueue_response(response);
    }
    match self.in_flight_response_count.checked_sub(1) {
        Some(remaining) => {
            self.in_flight_response_count = remaining;
            Ok(())
        }
        None => Err(ServerError::Underflow),
    }
}
/// Forwards to `HttpConnection::clear_write_buffer` on the wrapped connection.
fn clear_write_buffer(&mut self) {
self.connection.clear_write_buffer();
}
/// Returns `true` when the connection can be dropped by the server: it is
/// closed, has nothing left to write, and no request is awaiting a response.
fn is_done(&self) -> bool {
    if self.state != ClientConnectionState::Closed {
        return false;
    }
    !self.connection.pending_write() && self.in_flight_response_count == 0
}
/// Discards any unsent output and marks the connection as `Closed`.
fn close(&mut self) {
    self.connection.clear_write_buffer();
    self.state = ClientConnectionState::Closed;
}
}
/// HTTP server that listens on a Unix domain socket and multiplexes its
/// client connections through a `mio::Poll` instance.
pub struct HttpServer {
/// Listening socket on which new connections are accepted.
socket: UnixListener,
/// Poll instance on which the listener and all client streams are registered.
poll: Poll,
/// Live client connections, keyed by the token built from the stream's raw fd.
connections: HashMap<Token, ClientConnection<UnixStream>>,
/// Payload limit applied to every newly accepted connection.
payload_max_size: usize,
}
impl HttpServer {
/// Binds a Unix listener at `path_to_socket` and builds a server around it.
///
/// # Errors
/// `ServerError::IOError` if binding or the subsequent setup fails.
pub fn new<P: AsRef<Path>>(path_to_socket: P) -> Result<Self> {
    UnixListener::bind(path_to_socket)
        .map_err(ServerError::IOError)
        .and_then(Self::new_from_socket)
}
/// Builds a server around an already-created listening socket given by its
/// raw file descriptor. The resulting `UnixListener` takes ownership of the
/// descriptor.
///
/// # Errors
/// `ServerError::IOError` if the subsequent setup fails.
pub fn new_from_fd(socket_fd: RawFd) -> Result<Self> {
    // SAFETY: not checked here - the caller must pass an open Unix listening
    // socket descriptor that nothing else owns or closes.
    Self::new_from_socket(unsafe { UnixListener::from_raw_fd(socket_fd) })
}
/// Shared constructor: switches `socket` to non-blocking mode, creates the
/// poll instance and starts with no connections and the default payload limit.
///
/// # Errors
/// `ServerError::IOError` if either step fails.
fn new_from_socket(socket: UnixListener) -> Result<Self> {
    if let Err(e) = socket.set_nonblocking(true) {
        return Err(ServerError::IOError(e));
    }
    let poll = match Poll::new() {
        Ok(poll) => poll,
        Err(e) => return Err(ServerError::IOError(e)),
    };
    let connections = HashMap::new();
    Ok(HttpServer {
        socket,
        poll,
        connections,
        payload_max_size: MAX_PAYLOAD_SIZE,
    })
}
/// Sets the payload limit handed to connections accepted from now on;
/// connections that already exist keep the limit they were created with.
pub fn set_payload_max_size(&mut self, request_payload_max_size: usize) {
self.payload_max_size = request_payload_max_size;
}
/// Registers the listening socket with the poll instance for readable
/// events, using its raw fd as the token.
///
/// # Errors
/// `ServerError::IOError` if the registration fails.
pub fn start_server(&mut self) -> Result<()> {
    let listener_fd = self.socket.as_raw_fd();
    let listener_token = Token(listener_fd as usize);
    Self::epoll_add(&self.poll, listener_token, listener_fd)
}
/// Blocks (no timeout) until the poll instance fills `events`, transparently
/// retrying when the poll is interrupted or would block.
///
/// # Errors
/// `ServerError::IOError` for any other poll failure.
fn poll_events(&mut self, events: &mut Events) -> Result<()> {
    loop {
        match self.poll.poll(events, None) {
            Ok(()) => return Ok(()),
            Err(e) => match e.kind() {
                ErrorKind::Interrupted | ErrorKind::WouldBlock => continue,
                _ => return Err(ServerError::IOError(e)),
            },
        }
    }
}
/// Waits for poll events and services them: accepts new clients, reads and
/// parses incoming requests, and writes queued responses.
///
/// Returns every request parsed during this call, each tagged with the token
/// of its connection. Connections that are finished (`is_done`) are
/// deregistered and dropped before returning.
///
/// # Errors
/// Propagates `ServerError` values from polling, accepting, reading, writing
/// and re-registering streams.
///
/// # Panics
/// Panics if an event carries a token that is neither the listener's nor a
/// known connection's, or if deregistering a finished connection fails.
pub fn requests(&mut self) -> Result<Vec<ServerRequest>> {
    let mut parsed_requests: Vec<ServerRequest> = vec![];
    let mut events = mio::Events::with_capacity(MAX_EVENTS);
    self.poll_events(&mut events)?;
    for e in events.iter() {
        match e.token() {
            // Event on the listening socket: new client(s) waiting.
            Token(fd) if fd == self.socket.as_raw_fd() as usize => {
                match self.handle_new_connection() {
                    Err(ServerError::ServerFull) => {
                        // Accept the client only to tell it we are full; the
                        // stream is dropped (closed) right after the write.
                        // `write_all` guarantees the whole canned response is
                        // sent, whereas a bare `write` may stop short.
                        self.socket
                            .accept()
                            .map_err(ServerError::IOError)
                            .and_then(move |(mut stream, _)| {
                                stream
                                    .write_all(SERVER_FULL_ERROR_MESSAGE)
                                    .map_err(ServerError::IOError)
                            })?;
                    }
                    Err(error) => return Err(error),
                    Ok(()) => {}
                }
            }
            // Event on a client stream.
            t => {
                let client_connection = self.connections.get_mut(&t).unwrap();
                // Any error or half-close ends the connection; it is removed
                // below once it has no response still owed.
                if e.is_error() || e.is_read_closed() || e.is_write_closed() {
                    client_connection.close();
                    continue;
                }
                if e.is_readable() {
                    parsed_requests.extend(
                        client_connection
                            .read()?
                            .into_iter()
                            .map(|request| ServerRequest::new(request, e.token())),
                    );
                    // Reading may have queued an error response: switch the
                    // stream to writable interest so it gets sent.
                    if client_connection.state == ClientConnectionState::AwaitingOutgoing {
                        Self::epoll_mod(
                            &self.poll,
                            client_connection.connection.as_raw_fd(),
                            t,
                            Interest::WRITABLE,
                        )?;
                    }
                } else if e.is_writable() {
                    client_connection.write()?;
                    // Everything was sent: go back to waiting for requests.
                    if client_connection.state == ClientConnectionState::AwaitingIncoming {
                        Self::epoll_mod(
                            &self.poll,
                            client_connection.connection.as_raw_fd(),
                            t,
                            Interest::READABLE,
                        )?;
                    }
                }
            }
        }
    }
    // Drop every connection that is closed and fully settled.
    let epoll = &self.poll;
    self.connections.retain(|_token, client_connection| {
        if client_connection.is_done() {
            Self::epoll_del(epoll, client_connection.connection.as_raw_fd()).unwrap();
            false
        } else {
            true
        }
    });
    Ok(parsed_requests)
}
/// Tries to send all pending output on every connection that is currently
/// in the `AwaitingOutgoing` state.
///
/// Write errors are not reported to the caller; the affected connection is
/// skipped and the remaining connections are still flushed.
pub fn flush_outgoing_writes(&mut self) {
    for connection in self.connections.values_mut() {
        while connection.state == ClientConnectionState::AwaitingOutgoing {
            if connection.write().is_err() {
                break;
            }
        }
    }
}
/// Returns a reference to the server's `Poll` instance.
pub fn epoll(&self) -> &Poll {
&self.poll
}
/// Enqueues each response in order via `respond`, stopping at the first
/// failure.
///
/// # Errors
/// The first error returned by `respond`; later responses are not processed.
pub fn enqueue_responses(&mut self, responses: Vec<ServerResponse>) -> Result<()> {
    responses
        .into_iter()
        .try_for_each(|response| self.respond(response))
}
/// Queues `response` on the connection it belongs to and makes sure the
/// stream is polled for writability.
///
/// A response whose connection is no longer tracked is silently dropped.
///
/// # Errors
/// `ServerError::IOError` if re-registering the stream fails, or
/// `ServerError::Underflow` if the connection had no request in flight.
pub fn respond(&mut self, response: ServerResponse) -> Result<()> {
    let client_connection = match self.connections.get_mut(&response.id) {
        Some(connection) => connection,
        None => return Ok(()),
    };
    // A connection idling for input has to be switched to writable interest.
    if client_connection.state == ClientConnectionState::AwaitingIncoming {
        client_connection.state = ClientConnectionState::AwaitingOutgoing;
        let stream_fd = client_connection.connection.as_raw_fd();
        Self::epoll_mod(&self.poll, stream_fd, response.id, Interest::WRITABLE)?;
    }
    client_connection.enqueue_response(response.response)
}
/// Accepts every connection currently pending on the listening socket.
///
/// Each accepted stream is switched to non-blocking mode, registered with
/// the poll instance for readable events under a token built from its raw
/// fd, and stored in `connections` with the configured payload limit.
///
/// The capacity check runs once per call, before accepting. A burst of
/// pending clients can therefore push the count past `MAX_CONNECTIONS`
/// within a single call; later calls then refuse until it drops below the
/// limit.
///
/// # Errors
/// * `ServerError::ServerFull` if the server is already at (or above) its
///   connection limit; nothing is accepted in that case.
/// * `ServerError::IOError` for accept/registration failures other than
///   `Interrupted` (retried) and `WouldBlock` (backlog drained).
fn handle_new_connection(&mut self) -> Result<()> {
    // `>=` rather than `==`: the loop below may have overshot the limit on
    // an earlier call, and an equality test would then never fire again.
    if self.connections.len() >= MAX_CONNECTIONS {
        return Err(ServerError::ServerFull);
    }
    loop {
        if let Err(e) = self
            .socket
            .accept()
            .and_then(|(stream, _)| stream.set_nonblocking(true).map(|_| stream))
            .and_then(|stream| {
                let raw_fd = stream.as_raw_fd();
                let token = Token(raw_fd as usize);
                self.poll.registry().register(
                    &mut SourceFd(&raw_fd),
                    token,
                    Interest::READABLE,
                )?;
                let mut conn = HttpConnection::new(stream);
                conn.set_payload_max_size(self.payload_max_size);
                self.connections.insert(token, ClientConnection::new(conn));
                Ok(())
            })
        {
            // Interrupted by a signal: just try again.
            if e.kind() == ErrorKind::Interrupted {
                continue;
            }
            // No more pending connections.
            if e.kind() == ErrorKind::WouldBlock {
                break;
            }
            return Err(ServerError::IOError(e));
        }
    }
    Ok(())
}
/// Re-registers `stream_fd` on `epoll` under `token` with the interest set
/// `evset`.
///
/// # Errors
/// `ServerError::IOError` if the re-registration fails.
fn epoll_mod(epoll: &Poll, stream_fd: RawFd, token: Token, evset: Interest) -> Result<()> {
    let mut source = SourceFd(&stream_fd);
    match epoll.registry().reregister(&mut source, token, evset) {
        Ok(()) => Ok(()),
        Err(e) => Err(ServerError::IOError(e)),
    }
}
/// Registers `stream_fd` on `poll` under `token` for readable events.
///
/// # Errors
/// `ServerError::IOError` if the registration fails.
fn epoll_add(poll: &Poll, token: Token, stream_fd: RawFd) -> Result<()> {
    let mut source = SourceFd(&stream_fd);
    match poll.registry().register(&mut source, token, Interest::READABLE) {
        Ok(()) => Ok(()),
        Err(e) => Err(ServerError::IOError(e)),
    }
}
/// Removes `stream_fd` from `poll`.
///
/// # Errors
/// `ServerError::IOError` if the deregistration fails.
fn epoll_del(poll: &Poll, stream_fd: RawFd) -> Result<()> {
    let mut source = SourceFd(&stream_fd);
    match poll.registry().deregister(&mut source) {
        Ok(()) => Ok(()),
        Err(e) => Err(ServerError::IOError(e)),
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Read, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use crate::common::Body;
use vmm_sys_util::tempfile::TempFile;
// Returns a `TempFile` handle whose file has already been removed, so the
// tests can bind a Unix socket at that path.
fn get_temp_socket_file() -> TempFile {
let mut path_to_socket = TempFile::new().unwrap();
// Free the path; only the name is needed.
path_to_socket.remove().unwrap();
path_to_socket
}
// One client sends one request and receives one response.
#[test]
fn test_wait_one_connection() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// First pass only accepts the connection; no request has been sent yet.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
// This pass reads and parses the request.
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// This pass writes the queued response; no new request is expected.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
}
// A request with a body larger than a trivial one is still parsed and
// answered.
#[test]
fn test_large_payload() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
let mut packets = String::from(
"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 1028\r\n\
Content-Type: application/json\r\n\r\n",
);
// Appends the decimal text of every number in 0..1028; the result is longer
// than the 1028 bytes announced in Content-Length.
for i in 0..1028 {
packets.push_str(&i.to_string());
}
socket.write_all(packets.as_bytes()).unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the response.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
}
// A Content-Length one above the default limit (51200) is rejected with a
// 400 response.
#[test]
fn test_connection_size_limit_exceeded() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 51201\r\n\
Content-Type: application/json\r\n\r\naaaaa",
)
.unwrap();
// First pass reads the request and queues the error response, second pass
// writes it; neither yields a request.
assert!(server.requests().unwrap().is_empty());
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 265] = [0; 265];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
// The expected text stops before the closing `" }` of the JSON body because
// the comparison covers exactly the 265-byte buffer.
let error_message = b"HTTP/1.1 400 \r\n\
Server: Firecracker API\r\n\
Connection: keep-alive\r\n\
Content-Type: application/json\r\n\
Content-Length: 149\r\n\r\n{ \"error\": \"\
Request payload with size 51201 is larger than \
the limit of 51200 allowed by server.\nAll \
previous unanswered requests will be dropped.";
assert_eq!(&buf[..], &error_message[..]);
}
// A payload limit set through `set_payload_max_size` is enforced on
// connections accepted afterwards.
#[test]
fn test_set_payload_size() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
// Lower the limit to 4 before any client connects.
server.set_payload_max_size(4);
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 5\r\n\
Content-Type: application/json\r\n\r\naaaaa",
)
.unwrap();
// First pass queues the error response, second pass writes it.
assert!(server.requests().unwrap().is_empty());
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 260] = [0; 260];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
let error_message = b"HTTP/1.1 400 \r\n\
Server: Firecracker API\r\n\
Connection: keep-alive\r\n\
Content-Type: application/json\r\n\
Content-Length: 141\r\n\r\n{ \"error\": \"\
Request payload with size 5 is larger than the \
limit of 4 allowed by server.\nAll previous \
unanswered requests will be dropped.\" }";
assert_eq!(&buf[..], &error_message[..]);
}
// Same exchange as the single-connection test, but the server is built from
// a raw listener fd via `new_from_fd`.
#[test]
fn test_wait_one_fd_connection() {
use std::os::unix::io::IntoRawFd;
let path_to_socket = get_temp_socket_file();
let socket_listener = UnixListener::bind(path_to_socket.as_path()).unwrap();
// `into_raw_fd` gives up ownership so the server can take over the fd.
let socket_fd = socket_listener.into_raw_fd();
let mut server = HttpServer::new_from_fd(socket_fd).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|request| {
// The handler sees the body exactly as sent.
assert_eq!(
std::str::from_utf8(&request.body.as_ref().unwrap().body).unwrap(),
"whatever body"
);
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the response.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
assert!(String::from_utf8_lossy(&buf).contains("response body"));
}
// Two clients are connected at the same time; each gets its own response.
#[test]
fn test_wait_concurrent_connections() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the first connection.
assert!(server.requests().unwrap().is_empty());
first_socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
// The second client connects before the server is polled again.
let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
second_socket
.write_all(
b"GET /machine-config HTTP/1.1\r\n\
Content-Type: application/json\r\n\r\n",
)
.unwrap();
let mut req_vec = server.requests().unwrap();
let second_server_request = req_vec.remove(0);
// The request received from the second client matches what was sent.
assert_eq!(
second_server_request.request,
Request::try_from(
b"GET /machine-config HTTP/1.1\r\n\
Content-Type: application/json\r\n\r\n",
None
)
.unwrap()
);
let mut buf: [u8; 1024] = [0; 1024];
assert!(first_socket.read(&mut buf[..]).unwrap() > 0);
first_socket.shutdown(std::net::Shutdown::Both).unwrap();
server
.respond(second_server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response second body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the second response.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
second_socket.shutdown(std::net::Shutdown::Both).unwrap();
assert!(server.requests().unwrap().is_empty());
}
// A request carrying `Expect: 100-continue` whose body is sent only after the
// server has answered the headers.
#[test]
fn test_wait_expect_connection() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
// Headers only; the 13-byte body is withheld for now.
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Expect: 100-continue\r\n\r\n",
)
.unwrap();
// Neither pass yields a complete request while the body is missing.
let req_vec = server.requests().unwrap();
assert!(req_vec.is_empty());
let req_vec = server.requests().unwrap();
assert!(req_vec.is_empty());
// The server has written something back (presumably the interim
// 100 Continue response - the content is not checked here).
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
// Now send the body; the request becomes complete.
socket.write_all(b"whatever body").unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the final response.
let req_vec = server.requests().unwrap();
assert!(req_vec.is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
}
// Fills the server to `MAX_CONNECTIONS`, checks that one more client is
// refused with the canned 503, then checks that closed clients are removed.
#[test]
fn test_wait_many_connections() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut sockets: Vec<UnixStream> = Vec::with_capacity(MAX_CONNECTIONS + 1);
// Connect one client at a time, letting the server accept each.
for _ in 0..MAX_CONNECTIONS {
sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
assert!(server.requests().unwrap().is_empty());
}
// One client too many.
sockets.push(UnixStream::connect(path_to_socket.as_path()).unwrap());
assert!(server.requests().unwrap().is_empty());
// The refused client receives exactly the 120-byte 503 message.
let mut buf: [u8; 120] = [0; 120];
sockets[MAX_CONNECTIONS].read_exact(&mut buf).unwrap();
assert_eq!(&buf[..], SERVER_FULL_ERROR_MESSAGE);
assert_eq!(server.connections.len(), MAX_CONNECTIONS);
// Dropping the refused client changes nothing on the server side.
{
let _refused_stream = sockets.pop().unwrap();
}
assert_eq!(server.connections.len(), MAX_CONNECTIONS);
// Shutting down both directions removes the connection.
let sock: &UnixStream = sockets.get(0).unwrap();
sock.shutdown(Shutdown::Both).unwrap();
assert!(server.requests().unwrap().is_empty());
assert_eq!(server.connections.len(), MAX_CONNECTIONS - 1);
// Dropping a client stream removes the connection as well.
{
let _sock = sockets.pop().unwrap();
}
assert!(server.requests().unwrap().is_empty());
assert_eq!(server.connections.len(), MAX_CONNECTIONS - 2);
// Shutting down each direction separately has the same effect.
let sock: &UnixStream = sockets.get(1).unwrap();
sock.shutdown(Shutdown::Read).unwrap();
sock.shutdown(Shutdown::Write).unwrap();
assert!(server.requests().unwrap().is_empty());
assert_eq!(server.connections.len(), MAX_CONNECTIONS - 3);
}
// A malformed request (non-numeric Content-Length) yields a 400 response and
// no request is handed to the caller.
#[test]
fn test_wait_parse_error() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
socket.set_nonblocking(true).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: alpha\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
// First pass queues the error response, second pass writes it.
assert!(server.requests().unwrap().is_empty());
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 255] = [0; 255];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
let error_message = b"HTTP/1.1 400 \r\n\
Server: Firecracker API\r\n\
Connection: keep-alive\r\n\
Content-Type: application/json\r\n\
Content-Length: 136\r\n\r\n{ \"error\": \"Invalid header. \
Reason: Invalid value. Key:Content-Length; Value: alpha\nAll previous unanswered requests will be dropped.\" }";
assert_eq!(&buf[..], &error_message[..]);
// Sending the same malformed request again on this connection still
// yields no request.
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: alpha\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
assert!(server.requests().unwrap().is_empty());
assert!(server.requests().unwrap().is_empty());
}
// A connection closed by the client stays tracked while a response for it is
// still owed, and is removed once that response has been enqueued.
#[test]
fn test_wait_in_flight_responses() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut first_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the first connection.
assert!(server.requests().unwrap().is_empty());
first_socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
// The client goes away before the response has been produced.
first_socket.shutdown(std::net::Shutdown::Both).unwrap();
assert!(server.requests().unwrap().is_empty());
// Still tracked: one response is in flight.
assert_eq!(server.connections.len(), 1);
let mut second_socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
second_socket.set_nonblocking(true).unwrap();
assert!(server.requests().unwrap().is_empty());
assert_eq!(server.connections.len(), 2);
// Answer the request of the already-closed first connection.
server
.enqueue_responses(vec![server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
})])
.unwrap();
// Removal only happens inside `requests()`, so both are still tracked.
assert_eq!(server.connections.len(), 2);
let mut buf: [u8; 1024] = [0; 1024];
// Nothing was sent to the second (non-blocking) client, so the read fails.
assert!(second_socket.read(&mut buf[..]).is_err());
second_socket
.write_all(
b"GET /machine-config HTTP/1.1\r\n\
Content-Type: application/json\r\n\r\n",
)
.unwrap();
let mut req_vec = server.requests().unwrap();
let second_server_request = req_vec.remove(0);
// The first connection has now been dropped.
assert_eq!(server.connections.len(), 1);
assert_eq!(
second_server_request.request,
Request::try_from(
b"GET /machine-config HTTP/1.1\r\n\
Content-Type: application/json\r\n\r\n",
None
)
.unwrap()
);
server
.respond(second_server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response second body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the second response.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(second_socket.read(&mut buf[..]).unwrap() > 0);
second_socket.shutdown(std::net::Shutdown::Both).unwrap();
assert!(server.requests().is_ok());
}
// Two requests sent back to back on one connection are both parsed and
// answered in turn.
#[test]
fn test_wait_two_messages() {
let path_to_socket = get_temp_socket_file();
let mut server = HttpServer::new(path_to_socket.as_path()).unwrap();
server.start_server().unwrap();
let mut socket = UnixStream::connect(path_to_socket.as_path()).unwrap();
// Accept the connection.
assert!(server.requests().unwrap().is_empty());
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
// The second request is sent before the first one has been answered.
socket
.write_all(
b"PATCH /machine-config HTTP/1.1\r\n\
Content-Length: 13\r\n\
Content-Type: application/json\r\n\r\nwhatever body",
)
.unwrap();
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// This pass writes the first response and yields no request.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
// Only now is the second request read and returned.
let mut req_vec = server.requests().unwrap();
let server_request = req_vec.remove(0);
server
.respond(server_request.process(|_request| {
let mut response = Response::new(Version::Http11, StatusCode::OK);
let response_body = b"response body";
response.set_body(Body::new(response_body.to_vec()));
response
}))
.unwrap();
// Let the server write the second response.
assert!(server.requests().unwrap().is_empty());
let mut buf: [u8; 1024] = [0; 1024];
assert!(socket.read(&mut buf[..]).unwrap() > 0);
}
}