solarsail/
lib.rs

1#![forbid(unsafe_code)]
2// #![warn(missing_docs)]
3
4pub mod body;
5mod redirect;
6mod request;
7pub mod response;
8mod result_ext;
9pub mod route;
10
11use std::convert::{Infallible, TryFrom};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub use body::Body;
17use headers::HeaderMapExt;
18use http_body::Body as _;
19pub use http_body_util::{BodyExt, Full};
20pub use hyper::body::Bytes;
21use hyper::body::Incoming;
22use hyper::service::Service;
23use hyper_util::rt::TokioExecutor;
24use hyper_util::server::conn::auto;
25use hyper_util::service::TowerToHyperService;
26pub use redirect::Redirect;
27pub use request::{DecodeQueryError, Request, RequestExt};
28pub use response::{IntoResponse, Response};
29pub use result_ext::ResultExt;
30pub use {http, hyper};
31
/// An HTTP server built from a piece of shared state and a request handler.
///
/// `S` is the application state and `H` is the handler. The serving code in this crate clones
/// the state and passes it to the handler together with each request, so `S` should be cheap
/// to clone.
#[derive(Clone, Debug)]
pub struct SolarSail<S, H> {
    /// Application state; a clone is handed to the handler for every request.
    state: S,
    /// Callable invoked with the cloned state and the request.
    handler: H,
}
37
38impl<S, H> SolarSail<S, H> {
39    pub fn new(state: S, handler: H) -> Self {
40        SolarSail { state, handler }
41    }
42
43    async fn serve<F>(&self, req: Request) -> Response
44    where
45        S: Clone,
46        H: Fn(S, Request) -> F,
47        F: Future<Output = Response>,
48    {
49        (self.handler)(self.state.clone(), req)
50            .await
51            .into_response()
52    }
53
54    pub async fn run<F>(
55        self,
56        addr: &std::net::SocketAddr,
57    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
58    where
59        S: Clone + Send + Sync + 'static,
60        H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
61        F: Future<Output = Response> + Send + 'static,
62    {
63        use hyper_util::rt::TokioIo;
64        use tokio::net::TcpListener;
65
66        let listener = TcpListener::bind(addr).await?;
67        println!("Listening on http://{}", addr);
68        loop {
69            let (stream, _) = listener.accept().await?;
70            let service = self.clone();
71            tokio::task::spawn(async move {
72                if let Err(err) = auto::Builder::new(TokioExecutor::new())
73                    .serve_connection(TokioIo::new(stream), service)
74                    .await
75                {
76                    // Ignore certain network errors
77                    if let Some(err) = err.downcast_ref::<hyper::Error>() {
78                        if err.is_incomplete_message() {
79                            return;
80                        }
81                    }
82
83                    #[cfg(not(feature = "tracing"))]
84                    let _ = err;
85
86                    #[cfg(feature = "tracing")]
87                    tracing::error!(%err, "error serving connection");
88                }
89            });
90        }
91    }
92
93    pub async fn run_in<F, L, B>(
94        self,
95        addr: &std::net::SocketAddr,
96        layer: L,
97    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
98    where
99        S: Clone + Send + Sync + 'static,
100        H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
101        F: Future<Output = Response> + Send + 'static,
102        L: tower_layer::Layer<Self>,
103        L::Service: tower_service::Service<http::Request<Incoming>, Response = http::Response<B>>
104            + Clone
105            + Send
106            + Sync
107            + 'static,
108        <L::Service as tower_service::Service<http::Request<Incoming>>>::Future: Send,
109        <L::Service as tower_service::Service<http::Request<Incoming>>>::Error:
110            std::error::Error + Send + Sync,
111        B: http_body::Body + Send + 'static,
112        B::Data: Send,
113        B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
114    {
115        use hyper_util::rt::TokioIo;
116        use tokio::net::TcpListener;
117
118        let svc = TowerToHyperService::new(layer.layer(self));
119        let listener = TcpListener::bind(addr).await?;
120        println!("Listening on http://{}", addr);
121        loop {
122            let (stream, _) = listener.accept().await?;
123            let svc = svc.clone();
124            tokio::task::spawn(async move {
125                if let Err(err) = auto::Builder::new(TokioExecutor::new())
126                    .serve_connection(TokioIo::new(stream), svc)
127                    .await
128                {
129                    // Ignore certain network errors
130                    if let Some(err) = err.downcast_ref::<hyper::Error>() {
131                        if err.is_incomplete_message() {
132                            return;
133                        }
134                    }
135
136                    #[cfg(not(feature = "tracing"))]
137                    let _ = err;
138
139                    #[cfg(feature = "tracing")]
140                    tracing::error!(%err, "error serving connection");
141                }
142            });
143        }
144    }
145}
146
/// A heap-allocated, pinned, `Send` future that resolves to `Result<R, E>`.
type BoxTrySendFuture<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
148
149impl<S, H, F> Service<http::Request<Incoming>> for SolarSail<S, H>
150where
151    S: Clone + Send + Sync + 'static,
152    H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
153    F: Future<Output = Response> + Send + 'static,
154{
155    type Response = Response;
156    type Error = Infallible;
157    // TODO: get rid of Box?
158    type Future = BoxTrySendFuture<Self::Response, Self::Error>;
159
160    fn call(&self, req: http::Request<Incoming>) -> Self::Future {
161        let (parts, body) = req.into_parts();
162        let body = if body.is_end_stream() {
163            body::Body::empty()
164        } else {
165            body::Body::new(
166                body,
167                parts
168                    .headers
169                    .typed_get()
170                    .map(|h: headers::ContentLength| usize::try_from(h.0).unwrap_or(usize::MAX)),
171                parts.headers.get(http::header::CONTENT_TYPE).cloned(),
172            )
173        };
174
175        let svc = self.clone();
176        let req = Request::from_parts(parts, body);
177        Box::pin(async move { Ok(svc.serve(req).await) })
178    }
179}
180
181impl<S, H, F> tower_service::Service<http::Request<Incoming>> for SolarSail<S, H>
182where
183    S: Clone + Send + Sync + 'static,
184    H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
185    F: Future<Output = Response> + Send + 'static,
186{
187    type Response = Response;
188    type Error = Infallible;
189    // TODO: get rid of Box?
190    type Future = BoxTrySendFuture<Self::Response, Self::Error>;
191
192    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
193        Poll::Ready(Ok(()))
194    }
195
196    fn call(&mut self, req: http::Request<Incoming>) -> Self::Future {
197        Service::call(&*self, req)
198    }
199}