fire_http/
server.rs

1use crate::fire::{self, Wood};
2use crate::util::PinnedFuture;
3use crate::{Error, FirePit, Result};
4
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::result::Result as StdResult;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use hyper_util::rt::{TokioExecutor, TokioIo};
13use hyper_util::server::conn::auto::Builder;
14use types::body::BodyHttp;
15
16use hyper::body::{Body, Frame, Incoming, SizeHint};
17use hyper::service::Service;
18use hyper::{Request, Response};
19
20pub type HyperRequest = hyper::Request<HyperBody>;
21
22use tokio::net::TcpListener;
23
24// todo replace this function once hyper-util is ready
25pub(crate) struct Server {
26	listener: TcpListener,
27	wood: Arc<Wood>,
28}
29
30impl Server {
31	pub(crate) async fn bind(
32		addr: SocketAddr,
33		wood: Arc<Wood>,
34	) -> Result<Self> {
35		let listener = TcpListener::bind(&addr)
36			.await
37			.map_err(Error::from_server_error)?;
38
39		Ok(Self { listener, wood })
40	}
41
42	pub fn local_addr(&self) -> Result<SocketAddr> {
43		self.listener.local_addr().map_err(Error::from_server_error)
44	}
45
46	pub async fn serve(self) -> Result<()> {
47		loop {
48			let (stream, address) = self
49				.listener
50				.accept()
51				.await
52				.map_err(Error::from_server_error)?;
53
54			let io = TokioIo::new(stream);
55
56			let service = FireService {
57				wood: self.wood.clone(),
58				address,
59			};
60
61			tokio::task::spawn(async move {
62				if let Err(err) = Builder::new(TokioExecutor::new())
63					.serve_connection_with_upgrades(io, service)
64					.await
65				{
66					tracing::error!(error = ?err, "Error serving connection: {err}");
67				}
68			});
69		}
70	}
71}
72
73pub struct FireService {
74	wood: Arc<Wood>,
75	address: SocketAddr,
76}
77
78impl FireService {
79	/// Creates a new FireService which can be passed to a hyper server.
80	pub fn new(pit: FirePit, address: SocketAddr) -> Self {
81		Self {
82			wood: pit.wood,
83			address,
84		}
85	}
86}
87
88impl Service<Request<Incoming>> for FireService {
89	type Response = Response<BodyHttp>;
90	type Error = Infallible;
91	type Future = PinnedFuture<'static, StdResult<Self::Response, Self::Error>>;
92
93	fn call(&self, req: Request<Incoming>) -> Self::Future {
94		let wood = self.wood.clone();
95		let address = self.address;
96		PinnedFuture::new(async move {
97			fire::route_hyper(&wood, req, address).await
98		})
99	}
100}
101
102#[derive(Debug)]
103pub struct HyperBody {
104	inner: InnerBody,
105}
106
107impl HyperBody {
108	pub fn new() -> Self {
109		Self::default()
110	}
111
112	pub fn take(&mut self) -> Self {
113		std::mem::take(self)
114	}
115}
116
117#[derive(Debug)]
118enum InnerBody {
119	Empty,
120	Incoming(Incoming),
121}
122
123impl Body for HyperBody {
124	type Data = hyper::body::Bytes;
125	type Error = hyper::Error;
126
127	fn poll_frame(
128		self: Pin<&mut Self>,
129		cx: &mut Context<'_>,
130	) -> Poll<Option<StdResult<Frame<Self::Data>, Self::Error>>> {
131		match &mut self.get_mut().inner {
132			InnerBody::Empty => Poll::Ready(None),
133			InnerBody::Incoming(inc) => Pin::new(inc).poll_frame(cx),
134		}
135	}
136
137	fn is_end_stream(&self) -> bool {
138		match &self.inner {
139			InnerBody::Empty => true,
140			InnerBody::Incoming(inc) => inc.is_end_stream(),
141		}
142	}
143
144	fn size_hint(&self) -> SizeHint {
145		match &self.inner {
146			InnerBody::Empty => SizeHint::default(),
147			InnerBody::Incoming(inc) => inc.size_hint(),
148		}
149	}
150}
151
152impl Default for HyperBody {
153	fn default() -> Self {
154		Self {
155			inner: InnerBody::Empty,
156		}
157	}
158}
159
160impl From<Incoming> for HyperBody {
161	fn from(inc: Incoming) -> Self {
162		Self {
163			inner: InnerBody::Incoming(inc),
164		}
165	}
166}
167
168impl From<HyperBody> for types::Body {
169	fn from(hyper_body: HyperBody) -> Self {
170		match hyper_body.inner {
171			InnerBody::Empty => Self::new(),
172			InnerBody::Incoming(inc) => Self::from(inc),
173		}
174	}
175}