1use crate::fire::{self, Wood};
2use crate::util::PinnedFuture;
3use crate::{Error, FirePit, Result};
4
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::result::Result as StdResult;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use hyper_util::rt::{TokioExecutor, TokioIo};
13use hyper_util::server::conn::auto::Builder;
14use types::body::BodyHttp;
15
16use hyper::body::{Body, Frame, Incoming, SizeHint};
17use hyper::service::Service;
18use hyper::{Request, Response};
19
20pub type HyperRequest = hyper::Request<HyperBody>;
21
22use tokio::net::TcpListener;
23
24pub(crate) struct Server {
26 listener: TcpListener,
27 wood: Arc<Wood>,
28}
29
30impl Server {
31 pub(crate) async fn bind(
32 addr: SocketAddr,
33 wood: Arc<Wood>,
34 ) -> Result<Self> {
35 let listener = TcpListener::bind(&addr)
36 .await
37 .map_err(Error::from_server_error)?;
38
39 Ok(Self { listener, wood })
40 }
41
42 pub fn local_addr(&self) -> Result<SocketAddr> {
43 self.listener.local_addr().map_err(Error::from_server_error)
44 }
45
46 pub async fn serve(self) -> Result<()> {
47 loop {
48 let (stream, address) = self
49 .listener
50 .accept()
51 .await
52 .map_err(Error::from_server_error)?;
53
54 let io = TokioIo::new(stream);
55
56 let service = FireService {
57 wood: self.wood.clone(),
58 address,
59 };
60
61 tokio::task::spawn(async move {
62 if let Err(err) = Builder::new(TokioExecutor::new())
63 .serve_connection_with_upgrades(io, service)
64 .await
65 {
66 tracing::error!(error = ?err, "Error serving connection: {err}");
67 }
68 });
69 }
70 }
71}
72
73pub struct FireService {
74 wood: Arc<Wood>,
75 address: SocketAddr,
76}
77
78impl FireService {
79 pub fn new(pit: FirePit, address: SocketAddr) -> Self {
81 Self {
82 wood: pit.wood,
83 address,
84 }
85 }
86}
87
88impl Service<Request<Incoming>> for FireService {
89 type Response = Response<BodyHttp>;
90 type Error = Infallible;
91 type Future = PinnedFuture<'static, StdResult<Self::Response, Self::Error>>;
92
93 fn call(&self, req: Request<Incoming>) -> Self::Future {
94 let wood = self.wood.clone();
95 let address = self.address;
96 PinnedFuture::new(async move {
97 fire::route_hyper(&wood, req, address).await
98 })
99 }
100}
101
102#[derive(Debug)]
103pub struct HyperBody {
104 inner: InnerBody,
105}
106
107impl HyperBody {
108 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn take(&mut self) -> Self {
113 std::mem::take(self)
114 }
115}
116
117#[derive(Debug)]
118enum InnerBody {
119 Empty,
120 Incoming(Incoming),
121}
122
123impl Body for HyperBody {
124 type Data = hyper::body::Bytes;
125 type Error = hyper::Error;
126
127 fn poll_frame(
128 self: Pin<&mut Self>,
129 cx: &mut Context<'_>,
130 ) -> Poll<Option<StdResult<Frame<Self::Data>, Self::Error>>> {
131 match &mut self.get_mut().inner {
132 InnerBody::Empty => Poll::Ready(None),
133 InnerBody::Incoming(inc) => Pin::new(inc).poll_frame(cx),
134 }
135 }
136
137 fn is_end_stream(&self) -> bool {
138 match &self.inner {
139 InnerBody::Empty => true,
140 InnerBody::Incoming(inc) => inc.is_end_stream(),
141 }
142 }
143
144 fn size_hint(&self) -> SizeHint {
145 match &self.inner {
146 InnerBody::Empty => SizeHint::default(),
147 InnerBody::Incoming(inc) => inc.size_hint(),
148 }
149 }
150}
151
152impl Default for HyperBody {
153 fn default() -> Self {
154 Self {
155 inner: InnerBody::Empty,
156 }
157 }
158}
159
160impl From<Incoming> for HyperBody {
161 fn from(inc: Incoming) -> Self {
162 Self {
163 inner: InnerBody::Incoming(inc),
164 }
165 }
166}
167
168impl From<HyperBody> for types::Body {
169 fn from(hyper_body: HyperBody) -> Self {
170 match hyper_body.inner {
171 InnerBody::Empty => Self::new(),
172 InnerBody::Incoming(inc) => Self::from(inc),
173 }
174 }
175}