1use crate::{RequestBuilder, Response};
2use std::{
3 future::Future,
4 io::{self},
5 pin::{pin, Pin},
6 task::Poll,
7};
8type ResponseStream = Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>;
9
10pub struct LazyResponseReader {
11 request: Option<RequestBuilder>,
12 stream: Option<ResponseStream>,
13 reader: Option<ResponseReader>,
14}
15
16impl From<RequestBuilder> for LazyResponseReader {
17 fn from(value: RequestBuilder) -> Self {
18 Self::new(value)
19 }
20}
21impl From<ResponseStream> for LazyResponseReader {
22 fn from(value: ResponseStream) -> Self {
23 Self {
24 request: None,
25 stream: Some(value),
26 reader: None,
27 }
28 }
29}
30impl LazyResponseReader {
31 pub fn new(builder: RequestBuilder) -> Self {
32 Self {
33 request: Some(builder),
34 stream: None,
35 reader: None,
36 }
37 }
38}
39impl Unpin for LazyResponseReader {}
40
41macro_rules! ready {
42 ($n:expr) => {
43 if ($n == 0) {
44 return Poll::Ready(Ok(()));
45 }
46 };
47 ($v1:expr, $v2:expr) => {
48 if ($v1 == $v2) {
49 return Poll::Ready(Ok(()));
50 }
51 };
52}
53
54impl tokio::io::AsyncRead for LazyResponseReader {
55 fn poll_read(
56 self: std::pin::Pin<&mut Self>,
57 cx: &mut std::task::Context<'_>,
58 buf: &mut tokio::io::ReadBuf<'_>,
59 ) -> std::task::Poll<io::Result<()>> {
60 ready!(buf.remaining());
61
62 let this = self.get_mut();
63
64 if this.request.is_some() {
65 let request = this.request.take().unwrap();
66 this.stream = Some(Box::new(request.send()));
67 }
68 if let Some(send) = &mut this.stream {
69 match Future::poll(Pin::new(send), cx) {
70 Poll::Ready(data) => match data {
71 Ok(response) => {
72 if !response.status().is_success() {
73 return Poll::Ready(Err(io::Error::other(
74 response.status().to_string(),
75 )));
76 }
77 this.stream = None;
78 this.reader = Some(ResponseReader::new(response))
79 }
80 Err(e) => return Poll::Ready(Err(io::Error::other(e.to_string()))),
81 },
82 Poll::Pending => return Poll::Pending,
83 }
84 }
85
86 if let Some(val) = &mut this.reader {
87 return Pin::new(val).poll_read(cx, buf);
88 }
89
90 Poll::Ready(Ok(()))
91 }
92}
93
94#[derive(Default)]
95pub struct ResponseReader {
96 inner: Option<Response>,
97 buf: Vec<u8>,
98}
99
100impl std::fmt::Debug for ResponseReader {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 match &self.inner {
103 Some(res) => res.fmt(f),
104 _ => f.write_str("None"),
105 }
106 }
107}
108impl From<Response> for ResponseReader {
109 fn from(value: Response) -> Self {
110 Self::new(value)
111 }
112}
113impl ResponseReader {
114 pub fn new(response: Response) -> ResponseReader {
115 Self {
116 inner: Some(response),
117 ..Default::default()
118 }
119 }
120}
121
122impl tokio::io::AsyncRead for ResponseReader {
123 fn poll_read(
124 self: std::pin::Pin<&mut Self>,
125 cx: &mut std::task::Context<'_>,
126 buf: &mut tokio::io::ReadBuf<'_>,
127 ) -> std::task::Poll<io::Result<()>> {
128 ready!(buf.remaining());
129 let this = self.get_mut();
130 if let Some(res) = &mut this.inner {
131 loop {
132 let chunk = res.chunk();
133 match Future::poll(pin!(chunk), cx) {
134 Poll::Ready(Ok(bytes)) => {
135 if let Some(bytes) = bytes {
136 this.buf.extend_from_slice(&bytes.slice(0..bytes.len()));
137 let remain = buf.remaining();
138 if this.buf.len() >= remain {
139 buf.put_slice(&this.buf[..remain]);
140 this.buf = this.buf[remain..].to_owned();
141 return Poll::Ready(Ok(()));
142 }
143 } else {
144 let remain = buf.remaining();
145 if this.buf.len() >= remain {
146 buf.put_slice(&this.buf[..remain]);
147 this.buf = this.buf[remain..].to_owned();
148 } else {
149 buf.put_slice(&this.buf);
150 this.buf = Vec::new();
151 }
152 return Poll::Ready(Ok(()));
153 }
154 }
155 Poll::Ready(Err(err)) => {
156 return Poll::Ready(Err(io::Error::other(err.to_string())));
157 }
158 Poll::Pending => return Poll::Pending,
159 }
160 }
161 } else {
162 Poll::Pending
163 }
164 }
165}
166
167impl Unpin for ResponseReader {}