1use core::{
2 cell::RefCell,
3 convert::Infallible,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use std::borrow::Cow;
9
10use futures_core::stream::Stream;
11use http_body::{Body, Frame, SizeHint};
12use pin_project_lite::pin_project;
13use xitca_http::{
14 body::{BodySize, none_body_hint},
15 util::service::router::{PathGen, RouteGen, RouterMapErr},
16};
17use xitca_unsafe_collection::fake::{FakeSend, FakeSync};
18
19use crate::{
20 body::ResponseBody,
21 bytes::{Buf, Bytes, BytesMut},
22 context::WebContext,
23 http::{Request, RequestExt, Response, WebResponse},
24 service::{Service, ready::ReadyService},
25};
26
27pub struct TowerHttpCompat<S>(S);
30
31impl<S> TowerHttpCompat<S> {
32 pub const fn new(service: S) -> Self
33 where
34 S: Clone,
35 {
36 Self(service)
37 }
38}
39
40impl<S> Service for TowerHttpCompat<S>
41where
42 S: Clone,
43{
44 type Response = TowerCompatService<S>;
45 type Error = Infallible;
46
47 async fn call(&self, _: ()) -> Result<Self::Response, Self::Error> {
48 let service = self.0.clone();
49 Ok(TowerCompatService(RefCell::new(service)))
50 }
51}
52
53impl<S> PathGen for TowerHttpCompat<S> {}
54
55impl<S> RouteGen for TowerHttpCompat<S> {
56 type Route<R> = RouterMapErr<R>;
57
58 fn route_gen<R>(route: R) -> Self::Route<R> {
59 RouterMapErr(route)
60 }
61}
62
63pub struct TowerCompatService<S>(RefCell<S>);
64
65impl<S> TowerCompatService<S> {
66 pub const fn new(service: S) -> Self {
67 Self(RefCell::new(service))
68 }
69}
70
71impl<'r, C, ReqB, S, ResB> Service<WebContext<'r, C, ReqB>> for TowerCompatService<S>
72where
73 S: tower_service::Service<Request<CompatReqBody<RequestExt<ReqB>, C>>, Response = Response<ResB>>,
74 ResB: Body,
75 C: Clone + 'static,
76 ReqB: Default,
77{
78 type Response = WebResponse<CompatResBody<ResB>>;
79 type Error = S::Error;
80
81 async fn call(&self, mut ctx: WebContext<'r, C, ReqB>) -> Result<Self::Response, Self::Error> {
82 let (parts, ext) = ctx.take_request().into_parts();
83 let ctx = ctx.state().clone();
84 let req = Request::from_parts(parts, CompatReqBody::new(ext, ctx));
85 let fut = tower_service::Service::call(&mut *self.0.borrow_mut(), req);
86 fut.await.map(|res| res.map(CompatResBody::new))
87 }
88}
89
90impl<S> ReadyService for TowerCompatService<S> {
91 type Ready = ();
92
93 #[inline]
94 async fn ready(&self) -> Self::Ready {}
95}
96
97pub struct CompatReqBody<B, C> {
98 body: FakeSend<B>,
99 ctx: FakeSend<FakeSync<C>>,
100}
101
102impl<B, C> CompatReqBody<B, C> {
103 #[inline]
104 pub fn new(body: B, ctx: C) -> Self {
105 Self {
106 body: FakeSend::new(body),
107 ctx: FakeSend::new(FakeSync::new(ctx)),
108 }
109 }
110
111 #[inline]
116 pub fn into_parts(self) -> (B, C) {
117 (self.body.into_inner(), self.ctx.into_inner().into_inner())
118 }
119}
120
121impl<B, C, T, E> Body for CompatReqBody<B, C>
122where
123 B: Stream<Item = Result<T, E>> + Unpin,
124 C: Unpin,
125 T: Buf,
126{
127 type Data = T;
128 type Error = E;
129
130 #[inline]
131 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
132 Pin::new(&mut *self.get_mut().body).poll_next(cx).map_ok(Frame::data)
133 }
134
135 #[inline]
136 fn size_hint(&self) -> SizeHint {
137 size_hint(BodySize::from_stream(&*self.body))
138 }
139}
140
141pin_project! {
142 #[derive(Default)]
143 pub struct CompatResBody<B> {
144 #[pin]
145 body: B
146 }
147}
148
149impl<B> CompatResBody<B> {
150 pub const fn new(body: B) -> Self {
151 Self { body }
152 }
153
154 pub fn into_inner(self) -> B {
155 self.body
156 }
157}
158
159macro_rules! impl_from {
161 ($ty: ty) => {
162 impl<B> From<$ty> for CompatResBody<B>
163 where
164 B: From<$ty>,
165 {
166 fn from(body: $ty) -> Self {
167 Self::new(B::from(body))
168 }
169 }
170 };
171}
172
173impl_from!(Bytes);
174impl_from!(BytesMut);
175impl_from!(&'static [u8]);
176impl_from!(&'static str);
177impl_from!(Box<[u8]>);
178impl_from!(Vec<u8>);
179impl_from!(String);
180impl_from!(Box<str>);
181impl_from!(Cow<'static, str>);
182impl_from!(ResponseBody);
183
184impl<B, T, E> Body for CompatResBody<B>
185where
186 B: Stream<Item = Result<T, E>>,
187 T: Buf,
188{
189 type Data = T;
190 type Error = E;
191
192 #[inline]
193 fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
194 self.project().body.poll_next(cx).map_ok(Frame::data)
195 }
196
197 #[inline]
198 fn size_hint(&self) -> SizeHint {
199 size_hint(BodySize::from_stream(&self.body))
200 }
201}
202
203impl<B> Stream for CompatResBody<B>
204where
205 B: Body,
206{
207 type Item = Result<B::Data, B::Error>;
208
209 #[inline]
210 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 match ready!(self.project().body.poll_frame(cx)) {
212 Some(res) => Poll::Ready(res.map(|frame| frame.into_data().ok()).transpose()),
213 None => Poll::Ready(None),
214 }
215 }
216
217 fn size_hint(&self) -> (usize, Option<usize>) {
218 let hint = self.body.size_hint();
219 (hint.lower() as usize, hint.upper().map(|num| num as usize))
220 }
221}
222
223fn size_hint(size: BodySize) -> SizeHint {
224 let mut hint = SizeHint::new();
225 match size {
226 BodySize::None => {
227 let (low, upper) = none_body_hint();
228 hint.set_lower(low as u64);
229 hint.set_upper(upper.unwrap() as u64);
230 }
231 BodySize::Sized(size) => hint.set_exact(size as u64),
232 BodySize::Stream => {}
233 }
234
235 hint
236}
237
238#[cfg(test)]
239mod test {
240 use xitca_http::body::{Once, exact_body_hint};
241
242 use super::*;
243
244 #[test]
245 fn body_compat() {
246 let buf = Bytes::from_static(b"996");
247 let len = buf.len();
248 let body = CompatResBody::new(Once::new(buf));
249
250 let size = Body::size_hint(&body);
251
252 assert_eq!(
253 (size.lower() as usize, size.upper().map(|num| num as usize)),
254 exact_body_hint(len)
255 );
256
257 let body = CompatResBody::new(body);
258
259 let size = Stream::size_hint(&body);
260
261 assert_eq!(size, exact_body_hint(len));
262 }
263}