#[cfg(feature = "runtime")]
mod runtime;
mod future;
mod stream;
use crate::{
join, join_all, Context, Error, Middleware, Next, Request, Response, Result, State,
};
use future::SendFuture;
use http::{Request as HttpRequest, Response as HttpResponse};
use hyper::service::Service;
use hyper::Body as HyperBody;
use hyper::Server;
use std::error::Error as StdError;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::task::Poll;
use crate::Accept;
use crate::{Executor, Spawn};
pub use stream::AddrStream;
pub struct App<S> {
middleware: Arc<dyn Middleware<S>>,
exec: Executor,
pub(crate) state: S,
}
pub struct HttpService<S> {
middleware: Arc<dyn Middleware<S>>,
remote_addr: SocketAddr,
exec: Executor,
pub(crate) state: S,
}
impl<S: State> App<S> {
pub fn with_exec(state: S, exec: impl 'static + Send + Sync + Spawn) -> Self {
Self {
middleware: Arc::new(join_all(Vec::new())),
exec: Executor(Arc::new(exec)),
state,
}
}
pub fn gate(&mut self, middleware: impl Middleware<S>) -> &mut Self {
self.middleware = Arc::new(join(self.middleware.clone(), middleware));
self
}
pub fn gate_fn<F>(
&mut self,
middleware: impl 'static + Sync + Send + Fn(Context<S>, Next) -> F,
) -> &mut Self
where
F: 'static + Future<Output = Result>,
{
self.gate(middleware)
}
pub fn end<F>(&mut self, endpoint: fn(Context<S>) -> F) -> &mut Self
where
F: 'static + Future<Output = Result>,
{
self.gate(endpoint)
}
pub fn accept<I>(&self, incoming: I) -> Server<I, Self, Executor>
where
I: Accept<Conn = AddrStream>,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Server::builder(incoming)
.executor(self.exec.clone())
.serve(self.clone())
}
#[cfg(test)]
pub fn fake_service(&self) -> HttpService<S> {
let middleware = self.middleware.clone();
let addr = ([127, 0, 0, 1], 0);
let state = self.state.clone();
let exec = self.exec.clone();
HttpService::new(middleware, addr.into(), exec, state)
}
}
macro_rules! impl_poll_ready {
() => {
#[inline]
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<StdResult<(), Self::Error>> {
Poll::Ready(Ok(()))
}
};
}
type AppFuture<S> =
Pin<Box<dyn 'static + Future<Output = std::io::Result<HttpService<S>>> + Send>>;
impl<S: State> Service<&AddrStream> for App<S> {
type Response = HttpService<S>;
type Error = std::io::Error;
type Future = AppFuture<S>;
impl_poll_ready!();
#[inline]
fn call(&mut self, stream: &AddrStream) -> Self::Future {
let middleware = self.middleware.clone();
let addr = stream.remote_addr();
let state = self.state.clone();
let exec = self.exec.clone();
Box::pin(async move { Ok(HttpService::new(middleware, addr, exec, state)) })
}
}
type HttpFuture =
Pin<Box<dyn 'static + Future<Output = Result<HttpResponse<HyperBody>>> + Send>>;
impl<S: State> Service<HttpRequest<HyperBody>> for HttpService<S> {
type Response = HttpResponse<HyperBody>;
type Error = Error;
type Future = HttpFuture;
impl_poll_ready!();
#[inline]
fn call(&mut self, req: HttpRequest<HyperBody>) -> Self::Future {
let service = self.clone();
Box::pin(async move {
let serve_future = SendFuture(Box::pin(service.serve(req.into())));
Ok(serve_future.await?.into())
})
}
}
impl<S: State> HttpService<S> {
pub fn new(
middleware: Arc<dyn Middleware<S>>,
remote_addr: SocketAddr,
exec: Executor,
state: S,
) -> Self {
Self {
middleware,
remote_addr,
exec,
state,
}
}
pub async fn serve(self, req: Request) -> Result<Response> {
let Self {
middleware,
remote_addr,
exec,
state,
} = self;
let mut context = Context::new(req, state, exec, remote_addr);
if let Err(err) = middleware.end(unsafe { context.unsafe_clone() }).await {
context.resp_mut().status = err.status_code;
if err.expose && !err.need_throw() {
context.resp_mut().write(err.message);
} else if err.expose && err.need_throw() {
context.resp_mut().write(err.message.clone());
return Err(err);
} else if err.need_throw() {
return Err(err);
}
}
Ok(std::mem::take(&mut *context.resp_mut()))
}
}
impl<S: State> Clone for App<S> {
fn clone(&self) -> Self {
Self {
middleware: self.middleware.clone(),
exec: self.exec.clone(),
state: self.state.clone(),
}
}
}
impl<S: State> Clone for HttpService<S> {
fn clone(&self) -> Self {
Self {
middleware: self.middleware.clone(),
state: self.state.clone(),
exec: self.exec.clone(),
remote_addr: self.remote_addr,
}
}
}
#[cfg(all(test, feature = "runtime"))]
mod tests {
use crate::{App, Request};
use http::StatusCode;
use std::time::Instant;
#[async_std::test]
async fn gate_simple() -> Result<(), Box<dyn std::error::Error>> {
let service = App::new(())
.gate_fn(|_ctx, next| async move {
let inbound = Instant::now();
next.await?;
println!("time elapsed: {} ms", inbound.elapsed().as_millis());
Ok(())
})
.fake_service();
let resp = service.serve(Request::default()).await?;
assert_eq!(StatusCode::OK, resp.status);
Ok(())
}
}