fast_down/http/
puller.rs

1use crate::http::{
2    FileId, GetRequestError, GetResponse, HttpClient, HttpError, HttpHeaders, HttpRequestBuilder,
3    HttpResponse,
4};
5use bytes::Bytes;
6use fast_pull::{ProgressEntry, PullResult, PullStream, RandPuller, SeqPuller};
7use futures::{Stream, TryFutureExt};
8use std::{
9    fmt::Debug,
10    pin::{Pin, pin},
11    sync::Arc,
12    task::{Context, Poll},
13    time::Duration,
14};
15use tokio::sync::Mutex;
16use url::Url;
17
18#[derive(Clone)]
19pub struct HttpPuller<Client: HttpClient> {
20    pub(crate) client: Client,
21    url: Url,
22    resp: Option<Arc<Mutex<Option<GetResponse<Client>>>>>,
23    file_id: FileId,
24}
25impl<Client: HttpClient> HttpPuller<Client> {
26    pub fn new(
27        url: Url,
28        client: Client,
29        resp: Option<Arc<Mutex<Option<GetResponse<Client>>>>>,
30        file_id: FileId,
31    ) -> Self {
32        Self {
33            client,
34            url,
35            resp,
36            file_id,
37        }
38    }
39}
40impl<Client: HttpClient> Debug for HttpPuller<Client> {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("HttpPuller")
43            .field("client", &"...")
44            .field("url", &self.url)
45            .field("resp", &"...")
46            .field("file_id", &self.file_id)
47            .finish()
48    }
49}
50
51type ResponseFut<Client> = Pin<
52    Box<
53        dyn Future<
54                Output = Result<GetResponse<Client>, (GetRequestError<Client>, Option<Duration>)>,
55            > + Send,
56    >,
57>;
58enum ResponseState<Client: HttpClient> {
59    Pending(ResponseFut<Client>),
60    Ready(GetResponse<Client>),
61    None,
62}
63
64impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
65    type Error = HttpError<Client>;
66    async fn pull(
67        &mut self,
68        range: &ProgressEntry,
69    ) -> PullResult<Self::Error, impl PullStream<Self::Error>> {
70        Ok(RandRequestStream {
71            client: self.client.clone(),
72            url: self.url.clone(),
73            start: range.start,
74            end: range.end,
75            state: if range.start == 0
76                && let Some(resp) = &self.resp
77                && let Some(resp) = resp.lock().await.take()
78            {
79                ResponseState::Ready(resp)
80            } else {
81                ResponseState::None
82            },
83            file_id: self.file_id.clone(),
84        })
85    }
86}
87struct RandRequestStream<Client: HttpClient + 'static> {
88    client: Client,
89    url: Url,
90    start: u64,
91    end: u64,
92    state: ResponseState<Client>,
93    file_id: FileId,
94}
95impl<Client: HttpClient> Stream for RandRequestStream<Client> {
96    type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
97    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98        let chunk_global;
99        match &mut self.state {
100            ResponseState::Pending(resp) => {
101                return match resp.try_poll_unpin(cx) {
102                    Poll::Ready(resp) => match resp {
103                        Ok(resp) => {
104                            let etag = resp.headers().get("etag").ok();
105                            let last_modified = resp.headers().get("last-modified").ok();
106                            let new_file_id = FileId::new(etag, last_modified);
107                            if new_file_id != self.file_id {
108                                self.state = ResponseState::None;
109                                Poll::Ready(Some(Err((
110                                    HttpError::MismatchedBody(new_file_id),
111                                    None,
112                                ))))
113                            } else {
114                                self.state = ResponseState::Ready(resp);
115                                self.poll_next(cx)
116                            }
117                        }
118                        Err((e, d)) => {
119                            self.state = ResponseState::None;
120                            Poll::Ready(Some(Err((HttpError::Request(e), d))))
121                        }
122                    },
123                    Poll::Pending => Poll::Pending,
124                };
125            }
126            ResponseState::None => {
127                let resp = self
128                    .client
129                    .get(self.url.clone(), Some(self.start..self.end))
130                    .send();
131                self.state = ResponseState::Pending(Box::pin(resp));
132                return self.poll_next(cx);
133            }
134            ResponseState::Ready(resp) => {
135                let mut chunk = pin!(resp.chunk());
136                match chunk.try_poll_unpin(cx) {
137                    Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
138                    Poll::Ready(Ok(None)) => return Poll::Ready(None),
139                    Poll::Ready(Err(e)) => chunk_global = Err(e),
140                    Poll::Pending => return Poll::Pending,
141                };
142            }
143        };
144        match chunk_global {
145            Ok(chunk) => {
146                self.start += chunk.len() as u64;
147                Poll::Ready(Some(Ok(chunk)))
148            }
149            Err(e) => {
150                self.state = ResponseState::None;
151                Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
152            }
153        }
154    }
155}
156
157impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
158    type Error = HttpError<Client>;
159    async fn pull(&mut self) -> PullResult<Self::Error, impl PullStream<Self::Error>> {
160        Ok(SeqRequestStream {
161            state: if let Some(resp) = &self.resp
162                && let Some(resp) = resp.lock().await.take()
163            {
164                ResponseState::Ready(resp)
165            } else {
166                let req = self.client.get(self.url.clone(), None).send();
167                ResponseState::Pending(Box::pin(req))
168            },
169            file_id: self.file_id.clone(),
170        })
171    }
172}
173struct SeqRequestStream<Client: HttpClient + 'static> {
174    state: ResponseState<Client>,
175    file_id: FileId,
176}
177impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
178    type Item = Result<Bytes, (HttpError<Client>, Option<Duration>)>;
179    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        let chunk_global;
181        match &mut self.state {
182            ResponseState::Pending(resp) => {
183                return match resp.try_poll_unpin(cx) {
184                    Poll::Ready(resp) => match resp {
185                        Ok(resp) => {
186                            let etag = resp.headers().get("etag").ok();
187                            let last_modified = resp.headers().get("last-modified").ok();
188                            let new_file_id = FileId::new(etag, last_modified);
189                            if new_file_id != self.file_id {
190                                self.state = ResponseState::None;
191                                Poll::Ready(Some(Err((
192                                    HttpError::MismatchedBody(new_file_id),
193                                    None,
194                                ))))
195                            } else {
196                                self.state = ResponseState::Ready(resp);
197                                self.poll_next(cx)
198                            }
199                        }
200                        Err((e, d)) => {
201                            self.state = ResponseState::None;
202                            Poll::Ready(Some(Err((HttpError::Request(e), d))))
203                        }
204                    },
205                    Poll::Pending => Poll::Pending,
206                };
207            }
208            ResponseState::None => return Poll::Ready(Some(Err((HttpError::Irrecoverable, None)))),
209            ResponseState::Ready(resp) => {
210                let mut chunk = pin!(resp.chunk());
211                match chunk.try_poll_unpin(cx) {
212                    Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
213                    Poll::Ready(Ok(None)) => return Poll::Ready(None),
214                    Poll::Ready(Err(e)) => chunk_global = Err(e),
215                    Poll::Pending => return Poll::Pending,
216                };
217            }
218        };
219        match chunk_global {
220            Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
221            Err(e) => {
222                self.state = ResponseState::None;
223                Poll::Ready(Some(Err((HttpError::Chunk(e), None))))
224            }
225        }
226    }
227}