1#![forbid(unsafe_code)]
2pub mod body;
5mod redirect;
6mod request;
7pub mod response;
8mod result_ext;
9pub mod route;
10
11use std::convert::{Infallible, TryFrom};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub use body::Body;
17use headers::HeaderMapExt;
18use http_body::Body as _;
19pub use http_body_util::{BodyExt, Full};
20pub use hyper::body::Bytes;
21use hyper::body::Incoming;
22use hyper::service::Service;
23use hyper_util::rt::TokioExecutor;
24use hyper_util::server::conn::auto;
25use hyper_util::service::TowerToHyperService;
26pub use redirect::Redirect;
27pub use request::{DecodeQueryError, Request, RequestExt};
28pub use response::{IntoResponse, Response};
29pub use result_ext::ResultExt;
30pub use {http, hyper};
31
32#[derive(Clone)]
33pub struct SolarSail<S, H> {
34 state: S,
35 handler: H,
36}
37
38impl<S, H> SolarSail<S, H> {
39 pub fn new(state: S, handler: H) -> Self {
40 SolarSail { state, handler }
41 }
42
43 async fn serve<F>(&self, req: Request) -> Response
44 where
45 S: Clone,
46 H: Fn(S, Request) -> F,
47 F: Future<Output = Response>,
48 {
49 (self.handler)(self.state.clone(), req)
50 .await
51 .into_response()
52 }
53
54 pub async fn run<F>(
55 self,
56 addr: &std::net::SocketAddr,
57 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
58 where
59 S: Clone + Send + Sync + 'static,
60 H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
61 F: Future<Output = Response> + Send + 'static,
62 {
63 use hyper_util::rt::TokioIo;
64 use tokio::net::TcpListener;
65
66 let listener = TcpListener::bind(addr).await?;
67 println!("Listening on http://{}", addr);
68 loop {
69 let (stream, _) = listener.accept().await?;
70 let service = self.clone();
71 tokio::task::spawn(async move {
72 if let Err(err) = auto::Builder::new(TokioExecutor::new())
73 .serve_connection(TokioIo::new(stream), service)
74 .await
75 {
76 if let Some(err) = err.downcast_ref::<hyper::Error>() {
78 if err.is_incomplete_message() {
79 return;
80 }
81 }
82
83 #[cfg(not(feature = "tracing"))]
84 let _ = err;
85
86 #[cfg(feature = "tracing")]
87 tracing::error!(%err, "error serving connection");
88 }
89 });
90 }
91 }
92
93 pub async fn run_in<F, L, B>(
94 self,
95 addr: &std::net::SocketAddr,
96 layer: L,
97 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
98 where
99 S: Clone + Send + Sync + 'static,
100 H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
101 F: Future<Output = Response> + Send + 'static,
102 L: tower_layer::Layer<Self>,
103 L::Service: tower_service::Service<http::Request<Incoming>, Response = http::Response<B>>
104 + Clone
105 + Send
106 + Sync
107 + 'static,
108 <L::Service as tower_service::Service<http::Request<Incoming>>>::Future: Send,
109 <L::Service as tower_service::Service<http::Request<Incoming>>>::Error:
110 std::error::Error + Send + Sync,
111 B: http_body::Body + Send + 'static,
112 B::Data: Send,
113 B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
114 {
115 use hyper_util::rt::TokioIo;
116 use tokio::net::TcpListener;
117
118 let svc = TowerToHyperService::new(layer.layer(self));
119 let listener = TcpListener::bind(addr).await?;
120 println!("Listening on http://{}", addr);
121 loop {
122 let (stream, _) = listener.accept().await?;
123 let svc = svc.clone();
124 tokio::task::spawn(async move {
125 if let Err(err) = auto::Builder::new(TokioExecutor::new())
126 .serve_connection(TokioIo::new(stream), svc)
127 .await
128 {
129 if let Some(err) = err.downcast_ref::<hyper::Error>() {
131 if err.is_incomplete_message() {
132 return;
133 }
134 }
135
136 #[cfg(not(feature = "tracing"))]
137 let _ = err;
138
139 #[cfg(feature = "tracing")]
140 tracing::error!(%err, "error serving connection");
141 }
142 });
143 }
144 }
145}
146
147type BoxTrySendFuture<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send>>;
148
149impl<S, H, F> Service<http::Request<Incoming>> for SolarSail<S, H>
150where
151 S: Clone + Send + Sync + 'static,
152 H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
153 F: Future<Output = Response> + Send + 'static,
154{
155 type Response = Response;
156 type Error = Infallible;
157 type Future = BoxTrySendFuture<Self::Response, Self::Error>;
159
160 fn call(&self, req: http::Request<Incoming>) -> Self::Future {
161 let (parts, body) = req.into_parts();
162 let body = if body.is_end_stream() {
163 body::Body::empty()
164 } else {
165 body::Body::new(
166 body,
167 parts
168 .headers
169 .typed_get()
170 .map(|h: headers::ContentLength| usize::try_from(h.0).unwrap_or(usize::MAX)),
171 parts.headers.get(http::header::CONTENT_TYPE).cloned(),
172 )
173 };
174
175 let svc = self.clone();
176 let req = Request::from_parts(parts, body);
177 Box::pin(async move { Ok(svc.serve(req).await) })
178 }
179}
180
181impl<S, H, F> tower_service::Service<http::Request<Incoming>> for SolarSail<S, H>
182where
183 S: Clone + Send + Sync + 'static,
184 H: Fn(S, Request) -> F + Clone + Send + Sync + 'static,
185 F: Future<Output = Response> + Send + 'static,
186{
187 type Response = Response;
188 type Error = Infallible;
189 type Future = BoxTrySendFuture<Self::Response, Self::Error>;
191
192 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
193 Poll::Ready(Ok(()))
194 }
195
196 fn call(&mut self, req: http::Request<Incoming>) -> Self::Future {
197 Service::call(&*self, req)
198 }
199}