1use crate::http::{
2 FileId, GetRequestError, GetResponse, HttpClient, HttpError, HttpHeaders, HttpRequestBuilder,
3 HttpResponse,
4};
5use bytes::Bytes;
6use fast_pull::{ProgressEntry, PullResult, PullStream, RandPuller, SeqPuller};
7use futures::{Stream, TryFutureExt};
8use std::{
9 fmt::Debug,
10 pin::{Pin, pin},
11 sync::Arc,
12 task::{Context, Poll},
13 time::Duration,
14};
15use tokio::sync::Mutex;
16use url::Url;
17
18#[derive(Clone)]
19pub struct HttpPuller<Client: HttpClient> {
20 pub(crate) client: Client,
21 url: Url,
22 resp: Option<Arc<Mutex<Option<GetResponse<Client>>>>>,
23 file_id: FileId,
24}
25impl<Client: HttpClient> HttpPuller<Client> {
26 pub fn new(
27 url: Url,
28 client: Client,
29 resp: Option<Arc<Mutex<Option<GetResponse<Client>>>>>,
30 file_id: FileId,
31 ) -> Self {
32 Self {
33 client,
34 url,
35 resp,
36 file_id,
37 }
38 }
39}
40impl<Client: HttpClient> Debug for HttpPuller<Client> {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("HttpPuller")
43 .field("client", &"...")
44 .field("url", &self.url)
45 .field("resp", &"...")
46 .field("file_id", &self.file_id)
47 .finish()
48 }
49}
50
51type ResponseFut<Client> = Pin<
52 Box<
53 dyn Future<
54 Output = Result<GetResponse<Client>, (GetRequestError<Client>, Option<Duration>)>,
55 > + Send,
56 >,
57>;
58enum ResponseState<Client: HttpClient> {
59 Pending(ResponseFut<Client>),
60 Ready(GetResponse<Client>),
61 None,
62}
63
64impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
65 type Error = HttpError<Client>;
66 async fn pull(
67 &mut self,
68 range: &ProgressEntry,
69 ) -> PullResult<Self::Error, impl PullStream<Self::Error>> {
70 Ok(RandRequestStream {
71 client: self.client.clone(),
72 url: self.url.clone(),
73 start: range.start,
74 end: range.end,
75 state: if range.start == 0
76 && let Some(resp) = &self.resp
77 && let Some(resp) = resp.lock().await.take()
78 {
79 ResponseState::Ready(resp)
80 } else {
81 ResponseState::None
82 },
83 file_id: self.file_id.clone(),
84 })
85 }
86}
87struct RandRequestStream<Client: HttpClient + 'static> {
88 client: Client,
89 url: Url,
90 start: u64,
91 end: u64,
92 state: ResponseState<Client>,
93 file_id: FileId,
94}
95impl<Client: HttpClient> Stream for RandRequestStream<Client> {
96 type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
97 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98 let chunk_global;
99 match &mut self.state {
100 ResponseState::Pending(resp) => {
101 return match resp.try_poll_unpin(cx) {
102 Poll::Ready(resp) => match resp {
103 Ok(resp) => {
104 let etag = resp.headers().get("etag").ok();
105 let last_modified = resp.headers().get("last-modified").ok();
106 let new_file_id = FileId::new(etag, last_modified);
107 if new_file_id != self.file_id {
108 self.state = ResponseState::None;
109 Poll::Ready(Some(Err((
110 HttpError::MismatchedBody(new_file_id),
111 None,
112 ))))
113 } else {
114 self.state = ResponseState::Ready(resp);
115 self.poll_next(cx)
116 }
117 }
118 Err((e, d)) => {
119 self.state = ResponseState::None;
120 Poll::Ready(Some(Err((HttpError::Request(e), d))))
121 }
122 },
123 Poll::Pending => Poll::Pending,
124 };
125 }
126 ResponseState::None => {
127 let resp = self
128 .client
129 .get(self.url.clone(), Some(self.start..self.end))
130 .send();
131 self.state = ResponseState::Pending(Box::pin(resp));
132 return self.poll_next(cx);
133 }
134 ResponseState::Ready(resp) => {
135 let mut chunk = pin!(resp.chunk());
136 match chunk.try_poll_unpin(cx) {
137 Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
138 Poll::Ready(Ok(None)) => return Poll::Ready(None),
139 Poll::Ready(Err(e)) => chunk_global = Err(e),
140 Poll::Pending => return Poll::Pending,
141 };
142 }
143 };
144 match chunk_global {
145 Ok(chunk) => {
146 self.start += chunk.len() as u64;
147 Poll::Ready(Some(Ok(chunk)))
148 }
149 Err(e) => {
150 self.state = ResponseState::None;
151 Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
152 }
153 }
154 }
155}
156
157impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
158 type Error = HttpError<Client>;
159 async fn pull(&mut self) -> PullResult<Self::Error, impl PullStream<Self::Error>> {
160 Ok(SeqRequestStream {
161 state: if let Some(resp) = &self.resp
162 && let Some(resp) = resp.lock().await.take()
163 {
164 ResponseState::Ready(resp)
165 } else {
166 let req = self.client.get(self.url.clone(), None).send();
167 ResponseState::Pending(Box::pin(req))
168 },
169 file_id: self.file_id.clone(),
170 })
171 }
172}
173struct SeqRequestStream<Client: HttpClient + 'static> {
174 state: ResponseState<Client>,
175 file_id: FileId,
176}
177impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
178 type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
179 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 let chunk_global;
181 match &mut self.state {
182 ResponseState::Pending(resp) => {
183 return match resp.try_poll_unpin(cx) {
184 Poll::Ready(resp) => match resp {
185 Ok(resp) => {
186 let etag = resp.headers().get("etag").ok();
187 let last_modified = resp.headers().get("last-modified").ok();
188 let new_file_id = FileId::new(etag, last_modified);
189 if new_file_id != self.file_id {
190 self.state = ResponseState::None;
191 Poll::Ready(Some(Err((
192 HttpError::MismatchedBody(new_file_id),
193 None,
194 ))))
195 } else {
196 self.state = ResponseState::Ready(resp);
197 self.poll_next(cx)
198 }
199 }
200 Err((e, d)) => {
201 self.state = ResponseState::None;
202 Poll::Ready(Some(Err((HttpError::Request(e), d))))
203 }
204 },
205 Poll::Pending => Poll::Pending,
206 };
207 }
208 ResponseState::None => return Poll::Ready(Some(Err((HttpError::Irrecoverable, None)))),
209 ResponseState::Ready(resp) => {
210 let mut chunk = pin!(resp.chunk());
211 match chunk.try_poll_unpin(cx) {
212 Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
213 Poll::Ready(Ok(None)) => return Poll::Ready(None),
214 Poll::Ready(Err(e)) => chunk_global = Err(e),
215 Poll::Pending => return Poll::Pending,
216 };
217 }
218 };
219 match chunk_global {
220 Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
221 Err(e) => {
222 self.state = ResponseState::None;
223 Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
224 }
225 }
226 }
227}