http_mux/
mux.rs

1use std::fmt;
2use std::pin::Pin;
3use std::future::Future;
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::convert::TryFrom;
7
8use http::{Uri, Request};
9use plumb::{Pipe,PipeExt};
10use plumb::tuple_utils::{Prepend, Pluck, Merge, Call};
11
12use crate::route::{
13    PathParser,
14    Route,
15    RouteUntyped,
16    PathParseError,
17};
18
19type PinBoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
20
21#[derive(Debug)]
22pub enum MuxError {
23    NotFound(String),
24    MethodNotAllowed(http::Method, String),
25    Parse(String, PathParseError),
26}
27
28impl std::error::Error for MuxError {}
29
30impl fmt::Display for MuxError {
31    fn fmt(&self, f : &mut fmt::Formatter) -> Result<(), fmt::Error> {
32        use MuxError::*;
33        match self {
34            NotFound(s) => write!(f, "no handler found for path {}", s),
35            MethodNotAllowed(m, s) => write!(f, "method {} not allowe for path {}", m, s),
36            Parse(s, e) => write!(f, "failed to parse path {}, {}", s, e),
37        }
38    }
39}
40
41/// Convenience function for putting the input into a `Result`, allowing
42/// subsequent calls to functions like `and_then` to Just Work.
43pub fn new_handler<E, I>() -> impl Pipe<Input = I, Output = Result<I, E>>
44where
45    I : Send + 'static,
46    E : 'static,
47{
48    plumb::id().tuple().seq(Ok)
49}
50
51trait HandlerT<E, I, ReqB, Res>
52where
53    E : From<MuxError>,
54    I : Prepend<Request<ReqB>>,
55{
56    fn serve(
57        &self,
58        input : I::Output,
59    ) -> PinBoxFut<Result<Res, E>>;
60}
61
62struct HandlerFn<F, T, Fut> {
63    f : F,
64    _marker : PhantomData<fn() -> Fut>,
65    route : Route<T>,
66}
67
68impl<E, ReqB, Res, I, F, Fut, T> HandlerT<E, I, ReqB, Res> for HandlerFn<F, T, Fut>
69where
70    E : From<MuxError> + 'static,
71    Res : 'static,
72    I : Prepend<Request<ReqB>>,
73    <I as Prepend<Request<ReqB>>>::Output : Merge<T>,
74    F : Call<
75        <I::Output as Merge<T>>::Output,
76        Fut,
77    > + Clone,
78    Fut : Future<Output = Result<Res, E>> + Send + 'static,
79    T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError>,
80{
81    fn serve(
82        &self,
83        po : I::Output,
84    ) -> PinBoxFut<Result<Res, E>> {
85        let (req, po_tail) = po.pluck();
86
87        let res = T::try_from(self.route.parser(req.uri().path()))
88            .map_err(|e| {
89                MuxError::Parse(
90                    req.uri().path().to_string(),
91                    e
92                ).into()
93            })
94            .map(|t| {
95                Call::call(
96                    self.f.clone(),
97                    po_tail.prepend(req).merge(t),
98                )
99            });
100
101        Box::pin(async move {
102            res?.await
103        })
104    }
105}
106
107struct HandlerPipe<P, T> {
108    pipe : P,
109    route : Route<T>,
110}
111
112impl<E, ReqB, Res, I, P, T> HandlerT<E, I, ReqB, Res> for HandlerPipe<P, T>
113where
114    E : From<MuxError> + 'static,
115    Res : 'static,
116    I : Prepend<Request<ReqB>>,
117    <I as Prepend<Request<ReqB>>>::Output : Merge<T>,
118    P : Pipe<
119        Input = <I::Output as Merge<T>>::Output,
120        Output = Result<Res, E>,
121    >,
122    T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError>,
123{
124    fn serve(
125        &self,
126        po : I::Output,
127    ) -> PinBoxFut<Result<Res, E>> {
128        let (req, po_tail) = po.pluck();
129
130        let res = T::try_from(self.route.parser(req.uri().path()))
131            .map_err(|e| {
132                MuxError::Parse(
133                    req.uri().path().to_string(),
134                    e,
135                ).into()
136            })
137            .map(|t| {
138                self.pipe.run(po_tail.prepend(req).merge(t))
139            });
140
141        Box::pin(async move {
142            res?.await
143        })
144    }
145}
146
147struct Handler<E, I, ReqB, Res> {
148    route : RouteUntyped,
149    handler : Arc<dyn HandlerT<E, I, ReqB, Res> + Send + Sync>
150}
151
152pub struct Mux<E, I, ReqB, Res>
153where
154    E : From<MuxError>,
155    I : Prepend<Request<ReqB>>,
156{
157    _error : PhantomData<fn () -> E>,
158    _input : PhantomData<fn (I)>,
159    handlers : Vec<Handler<E, I, ReqB, Res>>,
160}
161
162pub fn new_mux<E, ReqB, Res>() -> Mux<
163    E,
164    (),
165    ReqB,
166    Res
167>
168where
169    E : From<MuxError> + 'static,
170    ReqB : Send + 'static,
171{
172    Mux{
173        _error : Default::default(),
174        _input : Default::default(),
175        handlers : Vec::new(),
176    }
177}
178
179
180
181impl<E, I, ReqB, Res> Mux<E, I, ReqB, Res>
182where
183    E : From<MuxError>,
184    I : Prepend<Request<ReqB>>,
185{
186    pub fn handle_fn<T, Hf, Fut>(
187        mut self,
188        route : Route<T>,
189        f : Hf,
190    ) -> Self
191    where
192        E : 'static,
193        Res : 'static,
194        Hf : Call<
195            <I::Output as Merge<T>>::Output,
196            Fut,
197        > + Clone + Send + Sync + 'static,
198        Fut : Future<Output = Result<Res, E>> + Send + 'static,
199        I::Output : Merge<T>,
200        T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError> + 'static,
201    {
202        self.handlers.push(Handler{
203            route : route.clone().into(),
204            handler : Arc::new(HandlerFn{
205                _marker : Default::default(),
206                f,
207                route
208            }),
209        });
210
211        self
212    }
213    pub fn handle<T, Hp>(
214        mut self,
215        route : Route<T>,
216        pipe : Hp,
217    ) -> Self
218    where
219        E : 'static,
220        I::Output : Merge<T>,
221        Res : 'static,
222        Hp : Pipe<
223            Input = <I::Output as Merge<T>>::Output,
224            Output = Result<Res, E>,
225        > + Send + Sync + 'static,
226        T : for<'a, 'b> TryFrom<PathParser<'a, 'b>, Error = PathParseError> + 'static,
227    {
228        self.handlers.push(Handler{
229            route : route.clone().into(),
230            handler : Arc::new(HandlerPipe{
231                pipe, route
232            }),
233        });
234
235        self
236    }
237
238    pub fn serve(
239        &self,
240        input : I::Output,
241    ) -> impl Future<Output = Result<Res, E>>
242    {
243        let (req, input_tail) = input.pluck();
244
245        let uri_path = req.uri().path();
246        // let mut vars = Vec::new();
247
248        let mut handler_match = None;
249        let mut path_matched = false;
250
251        // TODO: use an automata
252        // * the loop is innefficient
253        // * the `path_matches` call uses the same underlying function as
254        //      `parse`, which results in allocations
255        for Handler{route, handler} in self.handlers.iter() {
256            if route.path_matches(&req) {
257                path_matched |= true;
258                if route.method_matches(&req) {
259                    handler_match = Some(handler);
260                    break
261                }
262            }
263        }
264
265        let handler_result = match handler_match {
266            Some(handler) => {
267                Ok(handler.serve(input_tail.prepend(req)))
268            },
269            None => {
270                Err(if path_matched {
271                    MuxError::MethodNotAllowed(req.method().clone(), uri_path.to_string())
272                } else {
273                    MuxError::NotFound(uri_path.to_string())
274                }.into())
275            }
276        };
277
278        async move {
279            match handler_result {
280                Err(e) => Err(e),
281                Ok(pre_fut) => {
282                    pre_fut.await
283                }
284            }
285        }
286    }
287}
288
289impl<E, I, ReqB, Res> Pipe for Mux<E, I, ReqB, Res>
290where
291    E : From<MuxError> + Send + 'static,
292    I : Prepend<Request<ReqB>> + 'static,
293    ReqB : 'static,
294    Res : 'static,
295{
296    type Input = I::Output;
297    type Output = Result<Res, E>;
298
299    fn run(
300        &self,
301        input : Self::Input
302    ) -> PinBoxFut<Self::Output> {
303        Box::pin(self.serve(input))
304    }
305}
306
307
308pub fn replace_path<Req>(req : &mut Request<Req>, path : &str) {
309    let u = req.uri();
310    let mut tmp = Uri::builder();
311
312    if let Some(scheme) = u.scheme() {
313        tmp = tmp.scheme(scheme.as_str());
314    }
315
316    if let Some(a) = u.authority() {
317        tmp = tmp.authority(a.as_str());
318    }
319
320    let mut pq = String::new();
321    pq.push_str(path);
322
323    if let Some(q) = u.query() {
324        pq.push_str(q);
325    }
326
327    tmp = tmp.path_and_query(pq);
328
329    *req.uri_mut() = tmp.build().unwrap();
330}