skyzen_hyper/
service.rs

1use executor_core::AnyExecutor;
2use futures_util::{stream::MapOk, TryStreamExt};
3use http_body_util::{BodyDataStream, StreamBody};
4use hyper::{
5    body::{Frame, Incoming},
6    service::Service,
7};
8use std::{future::Future, pin::Pin, sync::Arc};
9
10use skyzen_core::{BodyError, Endpoint};
11
12type BoxFuture<T> = Pin<Box<dyn 'static + Send + Future<Output = T>>>;
13type Bytes = http_kit::utils::Bytes;
14
15/// Hyper service adapter for skyzen endpoints.
16#[derive(Debug)]
17pub struct IntoService<E> {
18    endpoint: E,
19    executor: Arc<AnyExecutor>,
20}
21
22impl<E: Endpoint + Clone> IntoService<E> {
23    /// Create a new service with the given endpoint and executor.
24    pub const fn new(endpoint: E, executor: Arc<AnyExecutor>) -> Self {
25        Self { endpoint, executor }
26    }
27}
28
29impl<E: Endpoint + Send + Sync + Clone + 'static> Service<hyper::Request<Incoming>>
30    for IntoService<E>
31{
32    type Response =
33        hyper::Response<StreamBody<MapOk<skyzen_core::Body, fn(Bytes) -> Frame<Bytes>>>>;
34    type Error = E::Error;
35    type Future = BoxFuture<Result<Self::Response, Self::Error>>;
36
37    fn call(&self, mut req: hyper::Request<Incoming>) -> Self::Future {
38        // TODO: Rewrite when impl Trait in associated types stabilized
39        let mut endpoint = self.endpoint.clone();
40        let executor = self.executor.clone();
41        let fut = async move {
42            let on_upgrade = hyper::upgrade::on(&mut req);
43            let mut request: skyzen_core::Request =
44                skyzen_core::Request::from(req.map(BodyDataStream::new).map(|body| {
45                    skyzen_core::Body::from_stream(
46                        body.map_err(|error| BodyError::Other(Box::new(error))),
47                    )
48                }));
49            request.extensions_mut().insert(on_upgrade);
50            request.extensions_mut().insert(executor);
51            let response: Result<skyzen_core::Response, _> = endpoint.respond(&mut request).await;
52
53            let response: Result<hyper::Response<skyzen_core::Body>, _> = response;
54
55            response.map(|response| {
56                response.map(|body| {
57                    let body: MapOk<skyzen_core::Body, fn(Bytes) -> Frame<Bytes>> =
58                        body.map_ok(Frame::data);
59
60                    StreamBody::new(body)
61                })
62            })
63        };
64
65        Box::pin(fut)
66    }
67}