#![allow(unknown_lints)]
use {
super::{
input::Input,
output::{Output, Receive},
},
crate::CritError,
cookie::Cookie,
futures::{Future, Poll},
http::{
header::{COOKIE, SET_COOKIE},
Request, Response,
},
hyper::body::Payload,
std::{collections::HashMap, mem},
tsukuyomi_service::{MakeService, Service},
};
#[derive(Debug)]
pub struct Server<S, Rt = tokio::runtime::Runtime> {
make_service: S,
runtime: Rt,
}
impl<S, Rt> Server<S, Rt>
where
S: MakeService<(), Request<hyper::Body>>,
{
pub fn new(make_service: S, runtime: Rt) -> Self {
Self {
make_service,
runtime,
}
}
}
#[derive(Debug)]
#[allow(explicit_outlives_requirements)]
pub struct Session<'a, S, Rt: 'a> {
service: S,
cookies: Option<HashMap<String, String>>,
runtime: &'a mut Rt,
}
impl<'a, S, Rt> Session<'a, S, Rt>
where
S: Service<Request<hyper::Body>>,
{
fn new(service: S, runtime: &'a mut Rt) -> Self {
Session {
service,
runtime,
cookies: None,
}
}
pub fn save_cookies(mut self, enabled: bool) -> Self {
if enabled {
self.cookies.get_or_insert_with(Default::default);
} else {
self.cookies.take();
}
self
}
pub fn cookie(&self, name: &str) -> Option<&str> {
self.cookies.as_ref()?.get(name).map(|s| s.as_str())
}
pub fn runtime(&mut self) -> &mut Rt {
&mut *self.runtime
}
fn build_request<T>(&self, input: T) -> crate::Result<Request<hyper::Body>>
where
T: Input,
{
let mut request = input.build_request()?;
if let Some(cookies) = &self.cookies {
for (k, v) in cookies {
request.headers_mut().append(
COOKIE,
Cookie::new(k.to_owned(), v.to_owned())
.to_string()
.parse()?,
);
}
}
Ok(request)
}
fn handle_set_cookies(&mut self, response: &Response<Output>) -> crate::Result<()> {
if let Some(ref mut cookies) = &mut self.cookies {
for set_cookie in response.headers().get_all(SET_COOKIE) {
let cookie = Cookie::parse_encoded(set_cookie.to_str()?)?;
if cookie.value().is_empty() {
cookies.remove(cookie.name());
} else {
cookies.insert(cookie.name().to_owned(), cookie.value().to_owned());
}
}
}
Ok(())
}
}
mod threadpool {
use {
super::*,
std::panic::{resume_unwind, AssertUnwindSafe},
tokio::runtime::Runtime,
};
fn block_on<F>(runtime: &mut Runtime, future: F) -> Result<F::Item, F::Error>
where
F: Future + Send + 'static,
F::Item: Send + 'static,
F::Error: Send + 'static,
{
match runtime.block_on(AssertUnwindSafe(future).catch_unwind()) {
Ok(result) => result,
Err(err) => resume_unwind(Box::new(err)),
}
}
impl<S, Bd> Server<S, Runtime>
where
S: MakeService<(), Request<hyper::Body>, Response = Response<Bd>>,
Bd: Payload,
S::Error: Into<CritError>,
S::Future: Send + 'static,
S::MakeError: Into<CritError> + Send + 'static,
S::Service: Send + 'static,
{
pub fn new_session(&mut self) -> crate::Result<Session<'_, S::Service, Runtime>> {
let service = block_on(
&mut self.runtime,
self.make_service.make_service(()).map_err(Into::into),
)
.map_err(failure::Error::from_boxed_compat)?;
Ok(Session::new(service, &mut self.runtime))
}
pub fn perform<T>(&mut self, input: T) -> crate::Result<Response<Output>>
where
T: Input,
<S::Service as Service<Request<hyper::Body>>>::Future: Send + 'static,
{
let mut session = self.new_session()?;
session.perform(input)
}
}
impl<'a, S, Bd> Session<'a, S, Runtime>
where
S: Service<Request<hyper::Body>, Response = Response<Bd>>,
Bd: Payload,
S::Error: Into<CritError>,
S::Future: Send + 'static,
{
pub fn perform<T>(&mut self, input: T) -> crate::Result<Response<Output>>
where
T: Input,
{
let request = self.build_request(input)?;
let future = TestResponseFuture::Initial(self.service.call(request));
let response =
block_on(&mut self.runtime, future).map_err(failure::Error::from_boxed_compat)?;
self.handle_set_cookies(&response)?;
Ok(response)
}
}
}
mod current_thread {
use {super::*, tokio::runtime::current_thread::Runtime};
impl<S, Bd> Server<S, Runtime>
where
S: MakeService<(), Request<hyper::Body>, Response = Response<Bd>>,
Bd: Payload,
S::Error: Into<CritError>,
S::MakeError: Into<CritError>,
{
pub fn new_session(&mut self) -> crate::Result<Session<'_, S::Service, Runtime>> {
let service = self
.runtime
.block_on(self.make_service.make_service(()))
.map_err(|err| failure::Error::from_boxed_compat(err.into()))?;
Ok(Session::new(service, &mut self.runtime))
}
pub fn perform<T>(&mut self, input: T) -> crate::Result<Response<Output>>
where
T: Input,
{
let mut session = self.new_session()?;
session.perform(input)
}
}
impl<'a, S, Bd> Session<'a, S, Runtime>
where
S: Service<Request<hyper::Body>, Response = Response<Bd>>,
Bd: Payload,
S::Error: Into<CritError>,
{
pub fn perform<T>(&mut self, input: T) -> crate::Result<Response<Output>>
where
T: Input,
{
let request = self.build_request(input)?;
let future = TestResponseFuture::Initial(self.service.call(request));
let response = self
.runtime
.block_on(future)
.map_err(failure::Error::from_boxed_compat)?;
self.handle_set_cookies(&response)?;
Ok(response)
}
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum TestResponseFuture<F, Bd: Payload> {
Initial(F),
Receive(http::response::Parts, Receive<Bd>),
Done,
}
impl<F, Bd> Future for TestResponseFuture<F, Bd>
where
F: Future<Item = Response<Bd>>,
F::Error: Into<CritError>,
Bd: Payload,
{
type Item = Response<Output>;
type Error = CritError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use self::TestResponseFuture::*;
loop {
let response = match *self {
Initial(ref mut f) => {
let response = futures::try_ready!(f.poll().map_err(Into::into));
Some(response)
}
Receive(_, ref mut receive) => {
futures::try_ready!(receive.poll_ready().map_err(Into::into));
None
}
_ => unreachable!("unexpected state"),
};
match mem::replace(self, TestResponseFuture::Done) {
TestResponseFuture::Initial(..) => {
let response = response.expect("unexpected condition");
let (parts, body) = response.into_parts();
let receive = self::Receive::new(body);
*self = TestResponseFuture::Receive(parts, receive);
}
TestResponseFuture::Receive(parts, receive) => {
let data = receive.into_data().expect("unexpected condition");
let response = Response::from_parts(parts, data);
return Ok(response.into());
}
_ => unreachable!("unexpected state"),
}
}
}
}