#![forbid(unsafe_code)]
pub mod body;
mod redirect;
mod request;
pub mod response;
mod result_ext;
pub mod route;
use std::convert::{Infallible, TryFrom};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub use body::Body;
use headers::HeaderMapExt;
use http_body::Body as _;
pub use http_body_util::{BodyExt, Full};
pub use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::service::Service;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto;
use hyper_util::service::TowerToHyperService;
pub use redirect::Redirect;
pub use request::{DecodeQueryError, Request, RequestExt};
pub use response::{IntoResponse, Response};
pub use result_ext::ResultExt;
pub use {http, hyper};
#[derive(Clone)]
pub struct SolarSail<S, H> {
state: S,
handler: H,
}
impl<S, H> SolarSail<S, H> {
pub fn new(state: S, handler: H) -> Self {
SolarSail { state, handler }
}
async fn serve<F>(&self, req: Request) -> Response
where
S: Clone,
H: Fn(S, Request) -> F,
F: Future<Output = Response>,
{
(self.handler)(self.state.clone(), req)
.await
.into_response()
}
pub async fn run<F>(
self,
addr: &std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
S: Clone + Send + Sync + 'static,
H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
F: Future<Output = Response> + Send + 'static,
{
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);
loop {
let (stream, _) = listener.accept().await?;
let service = self.clone();
tokio::task::spawn(async move {
if let Err(err) = auto::Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(stream), service)
.await
{
if let Some(err) = err.downcast_ref::<hyper::Error>() {
if err.is_incomplete_message() {
return;
}
}
#[cfg(not(feature = "tracing"))]
let _ = err;
#[cfg(feature = "tracing")]
tracing::error!(%err, "error serving connection");
}
});
}
}
pub async fn run_in<F, L, B>(
self,
addr: &std::net::SocketAddr,
layer: L,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
S: Clone + Send + Sync + 'static,
H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
F: Future<Output = Response> + Send + 'static,
L: tower_layer::Layer<Self>,
L::Service: tower_service::Service<http::Request<Incoming>, Response = http::Response<B>>
+ Clone
+ Send
+ Sync
+ 'static,
<L::Service as tower_service::Service<http::Request<Incoming>>>::Future: Send,
<L::Service as tower_service::Service<http::Request<Incoming>>>::Error:
std::error::Error + Send + Sync,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
{
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
let svc = TowerToHyperService::new(layer.layer(self));
let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);
loop {
let (stream, _) = listener.accept().await?;
let svc = svc.clone();
tokio::task::spawn(async move {
if let Err(err) = auto::Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(stream), svc)
.await
{
if let Some(err) = err.downcast_ref::<hyper::Error>() {
if err.is_incomplete_message() {
return;
}
}
#[cfg(not(feature = "tracing"))]
let _ = err;
#[cfg(feature = "tracing")]
tracing::error!(%err, "error serving connection");
}
});
}
}
}
type BoxTrySendFuture<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send>>;
impl<S, H, F> Service<http::Request<Incoming>> for SolarSail<S, H>
where
S: Clone + Send + Sync + 'static,
H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
F: Future<Output = Response> + Send + 'static,
{
type Response = Response;
type Error = Infallible;
type Future = BoxTrySendFuture<Self::Response, Self::Error>;
fn call(&self, req: http::Request<Incoming>) -> Self::Future {
let (parts, body) = req.into_parts();
let body = if body.is_end_stream() {
body::Body::empty()
} else {
body::Body::new(
body,
parts
.headers
.typed_get()
.map(|h: headers::ContentLength| usize::try_from(h.0).unwrap_or(usize::MAX)),
parts.headers.get(http::header::CONTENT_TYPE).cloned(),
)
};
let svc = self.clone();
let req = Request::from_parts(parts, body);
Box::pin(async move { Ok(svc.serve(req).await) })
}
}
impl<S, H, F> tower_service::Service<http::Request<Incoming>> for SolarSail<S, H>
where
S: Clone + Send + Sync + 'static,
H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
F: Future<Output = Response> + Send + 'static,
{
type Response = Response;
type Error = Infallible;
type Future = BoxTrySendFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<Incoming>) -> Self::Future {
Service::call(&*self, req)
}
}