xitca_web/service/
tower_http_compat.rs

1use core::{
2    cell::RefCell,
3    convert::Infallible,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use std::borrow::Cow;
9
10use futures_core::stream::Stream;
11use http_body::{Body, Frame, SizeHint};
12use pin_project_lite::pin_project;
13use xitca_http::{
14    body::{BodySize, none_body_hint},
15    util::service::router::{PathGen, RouteGen, RouterMapErr},
16};
17use xitca_unsafe_collection::fake::{FakeSend, FakeSync};
18
19use crate::{
20    body::ResponseBody,
21    bytes::{Buf, Bytes, BytesMut},
22    context::WebContext,
23    http::{Request, RequestExt, Response, WebResponse},
24    service::{Service, ready::ReadyService},
25};
26
27/// A middleware type that bridge `xitca-service` and `tower-service`.
28/// Any `tower-http` type that impl [tower_service::Service] trait can be passed to it and used as xitca-web's service type.
29pub struct TowerHttpCompat<S>(S);
30
31impl<S> TowerHttpCompat<S> {
32    pub const fn new(service: S) -> Self
33    where
34        S: Clone,
35    {
36        Self(service)
37    }
38}
39
40impl<S> Service for TowerHttpCompat<S>
41where
42    S: Clone,
43{
44    type Response = TowerCompatService<S>;
45    type Error = Infallible;
46
47    async fn call(&self, _: ()) -> Result<Self::Response, Self::Error> {
48        let service = self.0.clone();
49        Ok(TowerCompatService(RefCell::new(service)))
50    }
51}
52
53impl<S> PathGen for TowerHttpCompat<S> {}
54
55impl<S> RouteGen for TowerHttpCompat<S> {
56    type Route<R> = RouterMapErr<R>;
57
58    fn route_gen<R>(route: R) -> Self::Route<R> {
59        RouterMapErr(route)
60    }
61}
62
63pub struct TowerCompatService<S>(RefCell<S>);
64
65impl<S> TowerCompatService<S> {
66    pub const fn new(service: S) -> Self {
67        Self(RefCell::new(service))
68    }
69}
70
71impl<'r, C, ReqB, S, ResB> Service<WebContext<'r, C, ReqB>> for TowerCompatService<S>
72where
73    S: tower_service::Service<Request<CompatReqBody<RequestExt<ReqB>, C>>, Response = Response<ResB>>,
74    ResB: Body,
75    C: Clone + 'static,
76    ReqB: Default,
77{
78    type Response = WebResponse<CompatResBody<ResB>>;
79    type Error = S::Error;
80
81    async fn call(&self, mut ctx: WebContext<'r, C, ReqB>) -> Result<Self::Response, Self::Error> {
82        let (parts, ext) = ctx.take_request().into_parts();
83        let ctx = ctx.state().clone();
84        let req = Request::from_parts(parts, CompatReqBody::new(ext, ctx));
85        let fut = tower_service::Service::call(&mut *self.0.borrow_mut(), req);
86        fut.await.map(|res| res.map(CompatResBody::new))
87    }
88}
89
90impl<S> ReadyService for TowerCompatService<S> {
91    type Ready = ();
92
93    #[inline]
94    async fn ready(&self) -> Self::Ready {}
95}
96
97pub struct CompatReqBody<B, C> {
98    body: FakeSend<B>,
99    ctx: FakeSend<FakeSync<C>>,
100}
101
102impl<B, C> CompatReqBody<B, C> {
103    #[inline]
104    pub fn new(body: B, ctx: C) -> Self {
105        Self {
106            body: FakeSend::new(body),
107            ctx: FakeSend::new(FakeSync::new(ctx)),
108        }
109    }
110
111    /// destruct compat body into owned value of body and state context
112    ///
113    /// # Panics
114    /// - When called from a thread not where B is originally constructed.
115    #[inline]
116    pub fn into_parts(self) -> (B, C) {
117        (self.body.into_inner(), self.ctx.into_inner().into_inner())
118    }
119}
120
121impl<B, C, T, E> Body for CompatReqBody<B, C>
122where
123    B: Stream<Item = Result<T, E>> + Unpin,
124    C: Unpin,
125    T: Buf,
126{
127    type Data = T;
128    type Error = E;
129
130    #[inline]
131    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
132        Pin::new(&mut *self.get_mut().body).poll_next(cx).map_ok(Frame::data)
133    }
134
135    #[inline]
136    fn size_hint(&self) -> SizeHint {
137        size_hint(BodySize::from_stream(&*self.body))
138    }
139}
140
141pin_project! {
142    #[derive(Default)]
143    pub struct CompatResBody<B> {
144        #[pin]
145        body: B
146    }
147}
148
149impl<B> CompatResBody<B> {
150    pub const fn new(body: B) -> Self {
151        Self { body }
152    }
153
154    pub fn into_inner(self) -> B {
155        self.body
156    }
157}
158
159// useful shortcuts conversion for B type.
160macro_rules! impl_from {
161    ($ty: ty) => {
162        impl<B> From<$ty> for CompatResBody<B>
163        where
164            B: From<$ty>,
165        {
166            fn from(body: $ty) -> Self {
167                Self::new(B::from(body))
168            }
169        }
170    };
171}
172
173impl_from!(Bytes);
174impl_from!(BytesMut);
175impl_from!(&'static [u8]);
176impl_from!(&'static str);
177impl_from!(Box<[u8]>);
178impl_from!(Vec<u8>);
179impl_from!(String);
180impl_from!(Box<str>);
181impl_from!(Cow<'static, str>);
182impl_from!(ResponseBody);
183
184impl<B, T, E> Body for CompatResBody<B>
185where
186    B: Stream<Item = Result<T, E>>,
187    T: Buf,
188{
189    type Data = T;
190    type Error = E;
191
192    #[inline]
193    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
194        self.project().body.poll_next(cx).map_ok(Frame::data)
195    }
196
197    #[inline]
198    fn size_hint(&self) -> SizeHint {
199        size_hint(BodySize::from_stream(&self.body))
200    }
201}
202
203impl<B> Stream for CompatResBody<B>
204where
205    B: Body,
206{
207    type Item = Result<B::Data, B::Error>;
208
209    #[inline]
210    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211        match ready!(self.project().body.poll_frame(cx)) {
212            Some(res) => Poll::Ready(res.map(|frame| frame.into_data().ok()).transpose()),
213            None => Poll::Ready(None),
214        }
215    }
216
217    fn size_hint(&self) -> (usize, Option<usize>) {
218        let hint = self.body.size_hint();
219        (hint.lower() as usize, hint.upper().map(|num| num as usize))
220    }
221}
222
223fn size_hint(size: BodySize) -> SizeHint {
224    let mut hint = SizeHint::new();
225    match size {
226        BodySize::None => {
227            let (low, upper) = none_body_hint();
228            hint.set_lower(low as u64);
229            hint.set_upper(upper.unwrap() as u64);
230        }
231        BodySize::Sized(size) => hint.set_exact(size as u64),
232        BodySize::Stream => {}
233    }
234
235    hint
236}
237
238#[cfg(test)]
239mod test {
240    use xitca_http::body::{Once, exact_body_hint};
241
242    use super::*;
243
244    #[test]
245    fn body_compat() {
246        let buf = Bytes::from_static(b"996");
247        let len = buf.len();
248        let body = CompatResBody::new(Once::new(buf));
249
250        let size = Body::size_hint(&body);
251
252        assert_eq!(
253            (size.lower() as usize, size.upper().map(|num| num as usize)),
254            exact_body_hint(len)
255        );
256
257        let body = CompatResBody::new(body);
258
259        let size = Stream::size_hint(&body);
260
261        assert_eq!(size, exact_body_hint(len));
262    }
263}