clia_reqwest_ntex_stream/
lib.rs

1use std::io;
2// use std::mem;
3
4use ntex::web;
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use futures::Stream;
9use ntex::http::error::PayloadError;
10
11/// 把 ntex 的 Payload 强行改成可发送的
12/// 这样才能支持reqwest的流分发方式
13/// 
14/// 注意:此物最好不要跨线程使用,否则就真的不安全了
15/// 
16/// ## Example
17///
18/// ```rust
19/// async fn handle(
20///     body: ntex::web::types::Payload,
21/// ) {
22///     let mut builder = client.get(url);
23///     builder = builder.body(reqwest::Body::wrap_stream(reqwest_ntex_stream::PayloadStream {
24///         payload: body,
25///     }));
26///     builder.send().await;
27/// }
28/// ```
29pub struct PayloadStream {
30    pub payload: web::types::Payload,
31}
32
33unsafe impl Send for PayloadStream {}
34unsafe impl Sync for PayloadStream {}
35
36impl Stream for PayloadStream {
37    type Item = Result<bytes::Bytes, io::Error>;
38
39    #[inline]
40    fn poll_next(
41        mut self: Pin<&mut Self>,
42        cx: &mut Context<'_>,
43    ) -> Poll<Option<Self::Item>> {
44
45        // 由于 ntex 的 PayloadError 只在体系内,这里需要转换一下才能传给reqwest
46        match Pin::new(&mut self.payload).poll_next(cx) {
47            Poll::Pending => Poll::Pending,
48            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(bytes::Bytes::copy_from_slice(res.as_ref())))),
49            // {
50            //     let bytes = unsafe { mem::transmute::<ntex::util::Bytes, bytes::Bytes>(res) };
51            //     Poll::Ready(Some(Ok(bytes)))
52            // }
53            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(match e {
54                PayloadError::Incomplete(o) => match o {
55                    Some(e) => e,
56                    None => io::Error::new(io::ErrorKind::Other, "PayloadError::Incomplete None"),
57                },
58                PayloadError::EncodingCorrupted => io::Error::new(io::ErrorKind::Other, "PayloadError::EncodingCorrupted"),
59                PayloadError::Overflow => io::Error::new(io::ErrorKind::Other, "PayloadError::Overflow"),
60                PayloadError::UnknownLength => io::Error::new(io::ErrorKind::Other, "PayloadError::UnknownLength"),
61                PayloadError::Http2Payload(e) => io::Error::new(io::ErrorKind::Other, format!("PayloadError::Http2Payload {:?}", e)),
62                PayloadError::Parse(e) => io::Error::new(io::ErrorKind::Other, format!("PayloadError::Parse {:?}", e)),
63                PayloadError::Io(e) => e,
64            }))),
65            Poll::Ready(None) => Poll::Ready(None),
66        }
67    }
68}
69
70/// 把 reqwest 的流也封装一下,转换需要的错误类型
71/// 
72/// ## Example
73///
74/// ```rust
75/// let res = builder.send().await;
76/// let stream = res.bytes_stream();
77/// let mut resp = HttpResponse::build(res.status());
78/// // 这种方式默认会使用 chunked 传输方式
79/// return Ok(resp.streaming(reqwest_ntex_stream::ResponseStream{ stream: stream }));
80/// ```
81pub struct ResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
82    // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
83    // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
84    pub stream: T,
85}
86
87impl<T> Stream for ResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
88    type Item = Result<bytes::Bytes, ntex::web::Error>;
89
90    #[inline]
91    fn poll_next(
92        mut self: Pin<&mut Self>,
93        cx: &mut Context<'_>,
94    ) -> Poll<Option<Self::Item>> {
95
96        // 由于 ntex 和 reqwest 的错误体系不互通,这里需要转换一下
97        // match Pin::new(&mut self.stream).poll_next(cx) {
98        // match Pin::new(Box::leak(self.stream)).poll_next(cx) {
99        // let s = Pin::new(&mut self.stream);
100        // let s2 = unsafe { Box::from_raw(Box::into_raw(self.stream)) };
101        // match <Pin<Box<_>>>::from(self.stream).as_mut().poll_next(cx) {
102        // match s.as_mut().poll_next(cx) {
103        // match Pin::new(&mut Box::new(&mut self.stream)).poll_next(cx) {
104        // match Pin::new((&mut self.stream).as_mut()).poll_next(cx) {
105        // match self.stream.poll_next(cx) {
106        // match Box::pin(Box::new(self.stream)).as_mut().poll_next(cx) {
107        match Pin::new(&mut self.stream).poll_next(cx) {
108            Poll::Pending => Poll::Pending,
109            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
110            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)).into()))),
111            Poll::Ready(None) => Poll::Ready(None),
112        }
113    }
114}
115
116/// 把 reqwest 的流也封装一下,转换需要的错误类型
117/// 
118/// ## Example
119///
120/// ```rust
121/// let res = builder.send().await;
122/// let stream = res.bytes_stream();
123/// let mut resp = HttpResponse::build(res.status());
124/// // 这种方式默认会使用 chunked 传输方式
125/// return Ok(resp.streaming(reqwest_ntex_stream::ResponseStream{ stream: stream }));
126/// ```
127pub struct SizedResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
128    // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
129    // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
130    pub stream: T,
131}
132
133impl<T> Stream for SizedResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
134    type Item = Result<bytes::Bytes, Box<dyn std::error::Error>>;
135
136    #[inline]
137    fn poll_next(
138        mut self: Pin<&mut Self>,
139        cx: &mut Context<'_>,
140    ) -> Poll<Option<Self::Item>> {
141
142        // 由于 ntex 和 reqwest 的错误体系不互通,这里需要转换一下
143        // match Pin::new(&mut self.stream).poll_next(cx) {
144        // match Pin::new(Box::leak(self.stream)).poll_next(cx) {
145        // let s = Pin::new(&mut self.stream);
146        // let s2 = unsafe { Box::from_raw(Box::into_raw(self.stream)) };
147        // match <Pin<Box<_>>>::from(self.stream).as_mut().poll_next(cx) {
148        // match s.as_mut().poll_next(cx) {
149        // match Pin::new(&mut Box::new(&mut self.stream)).poll_next(cx) {
150        // match Pin::new((&mut self.stream).as_mut()).poll_next(cx) {
151        // match self.stream.poll_next(cx) {
152        // match Box::pin(Box::new(self.stream)).as_mut().poll_next(cx) {
153        match Pin::new(&mut self.stream).poll_next(cx) {
154            Poll::Pending => Poll::Pending,
155            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
156            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)).into()))),
157            Poll::Ready(None) => Poll::Ready(None),
158        }
159    }
160}