1use executor_core::AnyExecutor;
2use futures_util::{stream::MapOk, TryStreamExt};
3use http_body_util::{BodyDataStream, StreamBody};
4use hyper::{
5 body::{Frame, Incoming},
6 service::Service,
7};
8use std::{future::Future, pin::Pin, sync::Arc};
9
10use skyzen_core::{BodyError, Endpoint};
11
12type BoxFuture<T> = Pin<Box<dyn 'static + Send + Future<Output = T>>>;
13type Bytes = http_kit::utils::Bytes;
14
15#[derive(Debug)]
17pub struct IntoService<E> {
18 endpoint: E,
19 executor: Arc<AnyExecutor>,
20}
21
22impl<E: Endpoint + Clone> IntoService<E> {
23 pub const fn new(endpoint: E, executor: Arc<AnyExecutor>) -> Self {
25 Self { endpoint, executor }
26 }
27}
28
29impl<E: Endpoint + Send + Sync + Clone + 'static> Service<hyper::Request<Incoming>>
30 for IntoService<E>
31{
32 type Response =
33 hyper::Response<StreamBody<MapOk<skyzen_core::Body, fn(Bytes) -> Frame<Bytes>>>>;
34 type Error = E::Error;
35 type Future = BoxFuture<Result<Self::Response, Self::Error>>;
36
37 fn call(&self, mut req: hyper::Request<Incoming>) -> Self::Future {
38 let mut endpoint = self.endpoint.clone();
40 let executor = self.executor.clone();
41 let fut = async move {
42 let on_upgrade = hyper::upgrade::on(&mut req);
43 let mut request: skyzen_core::Request =
44 skyzen_core::Request::from(req.map(BodyDataStream::new).map(|body| {
45 skyzen_core::Body::from_stream(
46 body.map_err(|error| BodyError::Other(Box::new(error))),
47 )
48 }));
49 request.extensions_mut().insert(on_upgrade);
50 request.extensions_mut().insert(executor);
51 let response: Result<skyzen_core::Response, _> = endpoint.respond(&mut request).await;
52
53 let response: Result<hyper::Response<skyzen_core::Body>, _> = response;
54
55 response.map(|response| {
56 response.map(|body| {
57 let body: MapOk<skyzen_core::Body, fn(Bytes) -> Frame<Bytes>> =
58 body.map_ok(Frame::data);
59
60 StreamBody::new(body)
61 })
62 })
63 };
64
65 Box::pin(fut)
66 }
67}