1use std::fmt;
2use std::pin::Pin;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::convert::TryFrom;
7
8use http::{Uri, Request};
9use plumb::{Pipe,PipeExt};
10use plumb::tuple_utils::{Prepend, Pluck, Merge, Call};
11
12use crate::route::{
13 PathParser,
14 Route,
15 RouteUntyped,
16 PathParseError,
17};
18
/// A heap-allocated, pinned, type-erased future that can be sent across
/// threads and resolves to `T`.
type PinBoxFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
20
21#[derive(Debug)]
22pub enum MuxError {
23 NotFound(String),
24 MethodNotAllowed(http::Method, String),
25 Parse(String, PathParseError),
26}
27
28impl std::error::Error for MuxError {}
29
30impl fmt::Display for MuxError {
31 fn fmt(&self, f : &mut fmt::Formatter) -> Result<(), fmt::Error> {
32 use MuxError::*;
33 match self {
34 NotFound(s) => write!(f, "no handler found for path {}", s),
35 MethodNotAllowed(m, s) => write!(f, "method {} not allowe for path {}", m, s),
36 Parse(s, e) => write!(f, "failed to parse path {}, {}", s, e),
37 }
38 }
39}
40
41pub fn new_handler<E, I>() -> impl Pipe<Input = I, Output = Result<I, E>>
44where
45 I : Send + 'static,
46 E : 'static,
47{
48 plumb::id().tuple().seq(Ok)
49}
50
51trait HandlerT<E, I, ReqB, Res>
52where
53 E : From<MuxError>,
54 I : Prepend<Request<ReqB>>,
55{
56 fn serve(
57 &self,
58 input : I::Output,
59 ) -> PinBoxFut<Result<Res, E>>;
60}
61
62struct HandlerFn<F, T, Fut> {
63 f : F,
64 _marker : PhantomData<fn() -> Fut>,
65 route : Route<T>,
66}
67
68impl<E, ReqB, Res, I, F, Fut, T> HandlerT<E, I, ReqB, Res> for HandlerFn<F, T, Fut>
69where
70 E : From<MuxError> + 'static,
71 Res : 'static,
72 I : Prepend<Request<ReqB>>,
73 <I as Prepend<Request<ReqB>>>::Output : Merge<T>,
74 F : Call<
75 <I::Output as Merge<T>>::Output,
76 Fut,
77 > + Clone,
78 Fut : Future<Output = Result<Res, E>> + Send + 'static,
79 T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError>,
80{
81 fn serve(
82 &self,
83 po : I::Output,
84 ) -> PinBoxFut<Result<Res, E>> {
85 let (req, po_tail) = po.pluck();
86
87 let res = T::try_from(self.route.parser(req.uri().path()))
88 .map_err(|e| {
89 MuxError::Parse(
90 req.uri().path().to_string(),
91 e
92 ).into()
93 })
94 .map(|t| {
95 Call::call(
96 self.f.clone(),
97 po_tail.prepend(req).merge(t),
98 )
99 });
100
101 Box::pin(async move {
102 res?.await
103 })
104 }
105}
106
107struct HandlerPipe<P, T> {
108 pipe : P,
109 route : Route<T>,
110}
111
112impl<E, ReqB, Res, I, P, T> HandlerT<E, I, ReqB, Res> for HandlerPipe<P, T>
113where
114 E : From<MuxError> + 'static,
115 Res : 'static,
116 I : Prepend<Request<ReqB>>,
117 <I as Prepend<Request<ReqB>>>::Output : Merge<T>,
118 P : Pipe<
119 Input = <I::Output as Merge<T>>::Output,
120 Output = Result<Res, E>,
121 >,
122 T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError>,
123{
124 fn serve(
125 &self,
126 po : I::Output,
127 ) -> PinBoxFut<Result<Res, E>> {
128 let (req, po_tail) = po.pluck();
129
130 let res = T::try_from(self.route.parser(req.uri().path()))
131 .map_err(|e| {
132 MuxError::Parse(
133 req.uri().path().to_string(),
134 e,
135 ).into()
136 })
137 .map(|t| {
138 self.pipe.run(po_tail.prepend(req).merge(t))
139 });
140
141 Box::pin(async move {
142 res?.await
143 })
144 }
145}
146
147struct Handler<E, I, ReqB, Res> {
148 route : RouteUntyped,
149 handler : Arc<dyn HandlerT<E, I, ReqB, Res> + Send + Sync>
150}
151
152pub struct Mux<E, I, ReqB, Res>
153where
154 E : From<MuxError>,
155 I : Prepend<Request<ReqB>>,
156{
157 _error : PhantomData<fn () -> E>,
158 _input : PhantomData<fn (I)>,
159 handlers : Vec<Handler<E, I, ReqB, Res>>,
160}
161
162pub fn new_mux<E, ReqB, Res>() -> Mux<
163 E,
164 (),
165 ReqB,
166 Res
167>
168where
169 E : From<MuxError> + 'static,
170 ReqB : Send + 'static,
171{
172 Mux{
173 _error : Default::default(),
174 _input : Default::default(),
175 handlers : Vec::new(),
176 }
177}
178
179
180
181impl<E, I, ReqB, Res> Mux<E, I, ReqB, Res>
182where
183 E : From<MuxError>,
184 I : Prepend<Request<ReqB>>,
185{
186 pub fn handle_fn<T, Hf, Fut>(
187 mut self,
188 route : Route<T>,
189 f : Hf,
190 ) -> Self
191 where
192 E : 'static,
193 Res : 'static,
194 Hf : Call<
195 <I::Output as Merge<T>>::Output,
196 Fut,
197 > + Clone + Send + Sync + 'static,
198 Fut : Future<Output = Result<Res, E>> + Send + 'static,
199 I::Output : Merge<T>,
200 T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError> + 'static,
201 {
202 self.handlers.push(Handler{
203 route : route.clone().into(),
204 handler : Arc::new(HandlerFn{
205 _marker : Default::default(),
206 f,
207 route
208 }),
209 });
210
211 self
212 }
213 pub fn handle<T, Hp>(
214 mut self,
215 route : Route<T>,
216 pipe : Hp,
217 ) -> Self
218 where
219 E : 'static,
220 I::Output : Merge<T>,
221 Res : 'static,
222 Hp : Pipe<
223 Input = <I::Output as Merge<T>>::Output,
224 Output = Result<Res, E>,
225 > + Send + Sync + 'static,
226 T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError> + 'static,
227 {
228 self.handlers.push(Handler{
229 route : route.clone().into(),
230 handler : Arc::new(HandlerPipe{
231 pipe, route
232 }),
233 });
234
235 self
236 }
237
238 pub fn serve(
239 &self,
240 input : I::Output,
241 ) -> impl Future<Output = Result<Res, E>>
242 {
243 let (req, input_tail) = input.pluck();
244
245 let uri_path = req.uri().path();
246 let mut handler_match = None;
249 let mut path_matched = false;
250
251 for Handler{route, handler} in self.handlers.iter() {
256 if route.path_matches(&req) {
257 path_matched |= true;
258 if route.method_matches(&req) {
259 handler_match = Some(handler);
260 break
261 }
262 }
263 }
264
265 let handler_result = match handler_match {
266 Some(handler) => {
267 Ok(handler.serve(input_tail.prepend(req)))
268 },
269 None => {
270 Err(if path_matched {
271 MuxError::MethodNotAllowed(req.method().clone(), uri_path.to_string())
272 } else {
273 MuxError::NotFound(uri_path.to_string())
274 }.into())
275 }
276 };
277
278 async move {
279 match handler_result {
280 Err(e) => Err(e),
281 Ok(pre_fut) => {
282 pre_fut.await
283 }
284 }
285 }
286 }
287}
288
289impl<E, I, ReqB, Res> Pipe for Mux<E, I, ReqB, Res>
290where
291 E : From<MuxError> + Send + 'static,
292 I : Prepend<Request<ReqB>> + 'static,
293 ReqB : 'static,
294 Res : 'static,
295{
296 type Input = I::Output;
297 type Output = Result<Res, E>;
298
299 fn run(
300 &self,
301 input : Self::Input
302 ) -> PinBoxFut<Self::Output> {
303 Box::pin(self.serve(input))
304 }
305}
306
307
308pub fn replace_path<Req>(req : &mut Request<Req>, path : &str) {
309 let u = req.uri();
310 let mut tmp = Uri::builder();
311
312 if let Some(scheme) = u.scheme() {
313 tmp = tmp.scheme(scheme.as_str());
314 }
315
316 if let Some(a) = u.authority() {
317 tmp = tmp.authority(a.as_str());
318 }
319
320 let mut pq = String::new();
321 pq.push_str(path);
322
323 if let Some(q) = u.query() {
324 pq.push_str(q);
325 }
326
327 tmp = tmp.path_and_query(pq);
328
329 *req.uri_mut() = tmp.build().unwrap();
330}