clia_reqwest_ntex_stream/lib.rs
1use std::io;
2// use std::mem;
3
4use ntex::web;
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use futures::Stream;
9use ntex::http::error::PayloadError;
10
/// Wraps an ntex `Payload` and forcibly marks it as sendable.
///
/// This is what makes it possible to hand the payload to reqwest's
/// streaming request body (see the example below).
///
/// Note: it is best not to actually use this value across threads;
/// doing so would make it genuinely unsafe.
///
/// ## Example
///
/// The snippet is marked `ignore` because `client` and `url` are assumed to
/// come from the surrounding code.
///
/// ```rust,ignore
/// async fn handle(
///     body: ntex::web::types::Payload,
/// ) {
///     let mut builder = client.get(url);
///     builder = builder.body(reqwest::Body::wrap_stream(reqwest_ntex_stream::PayloadStream {
///         payload: body,
///     }));
///     builder.send().await;
/// }
/// ```
pub struct PayloadStream {
    /// The wrapped ntex payload.
    pub payload: web::types::Payload,
}

// SAFETY (asserted by the author, not checked by the compiler): these impls
// exist only so the wrapper satisfies the thread-safety bounds of the
// consuming API. NOTE(review): presumably `Payload` is not `Send`/`Sync` on
// its own, otherwise these impls would be unnecessary; soundness then relies
// on the value never actually being accessed from another thread -- confirm
// against the runtime configuration used by callers.
unsafe impl Send for PayloadStream {}
unsafe impl Sync for PayloadStream {}
35
36impl Stream for PayloadStream {
37 type Item = Result<bytes::Bytes, io::Error>;
38
39 #[inline]
40 fn poll_next(
41 mut self: Pin<&mut Self>,
42 cx: &mut Context<'_>,
43 ) -> Poll<Option<Self::Item>> {
44
45 // 由于 ntex 的 PayloadError 只在体系内,这里需要转换一下才能传给reqwest
46 match Pin::new(&mut self.payload).poll_next(cx) {
47 Poll::Pending => Poll::Pending,
48 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(bytes::Bytes::copy_from_slice(res.as_ref())))),
49 // {
50 // let bytes = unsafe { mem::transmute::<ntex::util::Bytes, bytes::Bytes>(res) };
51 // Poll::Ready(Some(Ok(bytes)))
52 // }
53 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(match e {
54 PayloadError::Incomplete(o) => match o {
55 Some(e) => e,
56 None => io::Error::new(io::ErrorKind::Other, "PayloadError::Incomplete None"),
57 },
58 PayloadError::EncodingCorrupted => io::Error::new(io::ErrorKind::Other, "PayloadError::EncodingCorrupted"),
59 PayloadError::Overflow => io::Error::new(io::ErrorKind::Other, "PayloadError::Overflow"),
60 PayloadError::UnknownLength => io::Error::new(io::ErrorKind::Other, "PayloadError::UnknownLength"),
61 PayloadError::Http2Payload(e) => io::Error::new(io::ErrorKind::Other, format!("PayloadError::Http2Payload {:?}", e)),
62 PayloadError::Parse(e) => io::Error::new(io::ErrorKind::Other, format!("PayloadError::Parse {:?}", e)),
63 PayloadError::Io(e) => e,
64 }))),
65 Poll::Ready(None) => Poll::Ready(None),
66 }
67 }
68}
69
70/// 把 reqwest 的流也封装一下,转换需要的错误类型
71///
72/// ## Example
73///
74/// ```rust
75/// let res = builder.send().await;
76/// let stream = res.bytes_stream();
77/// let mut resp = HttpResponse::build(res.status());
78/// // 这种方式默认会使用 chunked 传输方式
79/// return Ok(resp.streaming(reqwest_ntex_stream::ResponseStream{ stream: stream }));
80/// ```
81pub struct ResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
82 // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
83 // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
84 pub stream: T,
85}
86
87impl<T> Stream for ResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
88 type Item = Result<bytes::Bytes, ntex::web::Error>;
89
90 #[inline]
91 fn poll_next(
92 mut self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 ) -> Poll<Option<Self::Item>> {
95
96 // 由于 ntex 和 reqwest 的错误体系不互通,这里需要转换一下
97 // match Pin::new(&mut self.stream).poll_next(cx) {
98 // match Pin::new(Box::leak(self.stream)).poll_next(cx) {
99 // let s = Pin::new(&mut self.stream);
100 // let s2 = unsafe { Box::from_raw(Box::into_raw(self.stream)) };
101 // match <Pin<Box<_>>>::from(self.stream).as_mut().poll_next(cx) {
102 // match s.as_mut().poll_next(cx) {
103 // match Pin::new(&mut Box::new(&mut self.stream)).poll_next(cx) {
104 // match Pin::new((&mut self.stream).as_mut()).poll_next(cx) {
105 // match self.stream.poll_next(cx) {
106 // match Box::pin(Box::new(self.stream)).as_mut().poll_next(cx) {
107 match Pin::new(&mut self.stream).poll_next(cx) {
108 Poll::Pending => Poll::Pending,
109 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
110 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)).into()))),
111 Poll::Ready(None) => Poll::Ready(None),
112 }
113 }
114}
115
116/// 把 reqwest 的流也封装一下,转换需要的错误类型
117///
118/// ## Example
119///
120/// ```rust
121/// let res = builder.send().await;
122/// let stream = res.bytes_stream();
123/// let mut resp = HttpResponse::build(res.status());
124/// // 这种方式默认会使用 chunked 传输方式
125/// return Ok(resp.streaming(reqwest_ntex_stream::ResponseStream{ stream: stream }));
126/// ```
127pub struct SizedResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
128 // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
129 // stream: Box<dyn Stream<Item = reqwest::Result<web::Bytes>>>,
130 pub stream: T,
131}
132
133impl<T> Stream for SizedResponseStream<T> where T: Stream<Item = reqwest::Result<bytes::Bytes>> + Unpin {
134 type Item = Result<bytes::Bytes, Box<dyn std::error::Error>>;
135
136 #[inline]
137 fn poll_next(
138 mut self: Pin<&mut Self>,
139 cx: &mut Context<'_>,
140 ) -> Poll<Option<Self::Item>> {
141
142 // 由于 ntex 和 reqwest 的错误体系不互通,这里需要转换一下
143 // match Pin::new(&mut self.stream).poll_next(cx) {
144 // match Pin::new(Box::leak(self.stream)).poll_next(cx) {
145 // let s = Pin::new(&mut self.stream);
146 // let s2 = unsafe { Box::from_raw(Box::into_raw(self.stream)) };
147 // match <Pin<Box<_>>>::from(self.stream).as_mut().poll_next(cx) {
148 // match s.as_mut().poll_next(cx) {
149 // match Pin::new(&mut Box::new(&mut self.stream)).poll_next(cx) {
150 // match Pin::new((&mut self.stream).as_mut()).poll_next(cx) {
151 // match self.stream.poll_next(cx) {
152 // match Box::pin(Box::new(self.stream)).as_mut().poll_next(cx) {
153 match Pin::new(&mut self.stream).poll_next(cx) {
154 Poll::Pending => Poll::Pending,
155 Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
156 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)).into()))),
157 Poll::Ready(None) => Poll::Ready(None),
158 }
159 }
160}