use crate::client::{HttpResponse, HttpResponseBody};
use futures_util::FutureExt;
use futures_util::future::BoxFuture;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task::{JoinHandle, JoinSet};
pub(crate) type ResponseFn =
Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, HttpResponse> + Send>;
pub(crate) struct MockServer {
responses: Arc<Mutex<VecDeque<ResponseFn>>>,
shutdown: oneshot::Sender<()>,
handle: JoinHandle<()>,
url: String,
}
impl MockServer {
pub(crate) async fn new() -> Self {
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(10)));
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr).await.unwrap();
let (shutdown, mut rx) = oneshot::channel::<()>();
let url = format!("http://{}", listener.local_addr().unwrap());
let r = Arc::clone(&responses);
let handle = tokio::spawn(async move {
let mut set = JoinSet::new();
loop {
let (stream, _) = tokio::select! {
conn = listener.accept() => conn.unwrap(),
_ = &mut rx => break,
};
let r = Arc::clone(&r);
set.spawn(async move {
let _ = http1::Builder::new()
.serve_connection(
TokioIo::new(stream),
service_fn(move |req| {
let r = Arc::clone(&r);
let next = r.lock().pop_front();
async move {
Ok::<_, Infallible>(match next {
Some(r) => r(req).await,
None => HttpResponse::new("Hello World".to_string().into()),
})
}
}),
)
.await;
});
}
set.abort_all();
});
Self {
responses,
shutdown,
handle,
url,
}
}
pub(crate) fn url(&self) -> &str {
&self.url
}
pub(crate) fn push<B: Into<HttpResponseBody>>(&self, response: Response<B>) {
let resp = response.map(Into::into);
self.push_fn(|_| resp)
}
pub(crate) fn push_fn<F, B>(&self, f: F)
where
F: FnOnce(Request<Incoming>) -> Response<B> + Send + 'static,
B: Into<HttpResponseBody>,
{
let f = Box::new(|req| async move { f(req).map(Into::into) }.boxed());
self.responses.lock().push_back(f)
}
pub(crate) fn push_async_fn<F, Fut>(&self, f: F)
where
F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
Fut: Future<Output = Response<String>> + Send + 'static,
{
let f = Box::new(|r| f(r).map(|b| b.map(Into::into)).boxed());
self.responses.lock().push_back(f)
}
pub(crate) async fn shutdown(self) {
let _ = self.shutdown.send(());
self.handle.await.unwrap()
}
}