fast_down/http/
puller.rs

1use crate::http::{
2    GetRequestError, GetResponse, HttpClient, HttpError, HttpRequestBuilder, HttpResponse,
3};
4use bytes::Bytes;
5use fast_pull::{ProgressEntry, RandPuller, SeqPuller};
6use futures::{Stream, TryFutureExt, TryStream};
7use std::{
8    pin::{Pin, pin},
9    task::{Context, Poll},
10};
11use url::Url;
12
13#[derive(Clone)]
14pub struct HttpPuller<Client: HttpClient> {
15    pub(crate) client: Client,
16    url: Url,
17}
18impl<Client: HttpClient> HttpPuller<Client> {
19    pub fn new(url: Url, client: Client) -> Self {
20        Self { client, url }
21    }
22}
23
24type ResponseFut<Client> =
25    Pin<Box<dyn Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Send>>;
26enum ResponseState<Client: HttpClient> {
27    Pending(ResponseFut<Client>),
28    Ready(GetResponse<Client>),
29    None,
30}
31
32impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
33    type Error = HttpError<Client>;
34    fn pull(
35        &mut self,
36        range: &ProgressEntry,
37    ) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
38        RandRequestStream {
39            client: self.client.clone(),
40            url: self.url.clone(),
41            start: range.start,
42            end: range.end,
43            state: ResponseState::None,
44        }
45    }
46}
47struct RandRequestStream<Client: HttpClient + 'static> {
48    client: Client,
49    url: Url,
50    start: u64,
51    end: u64,
52    state: ResponseState<Client>,
53}
54impl<Client: HttpClient> Stream for RandRequestStream<Client> {
55    type Item = Result<Bytes, HttpError<Client>>;
56    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        let chunk_global;
58        match &mut self.state {
59            ResponseState::Pending(resp) => {
60                return match resp.try_poll_unpin(cx) {
61                    Poll::Ready(resp) => match resp {
62                        Ok(resp) => {
63                            self.state = ResponseState::Ready(resp);
64                            self.poll_next(cx)
65                        }
66                        Err(e) => {
67                            self.state = ResponseState::None;
68                            Poll::Ready(Some(Err(HttpError::Request(e))))
69                        }
70                    },
71                    Poll::Pending => Poll::Pending,
72                };
73            }
74            ResponseState::None => {
75                let resp = self
76                    .client
77                    .get(self.url.clone(), Some(self.start..self.end))
78                    .send();
79                self.state = ResponseState::Pending(Box::pin(resp));
80                return self.poll_next(cx);
81            }
82            ResponseState::Ready(resp) => {
83                let mut chunk = pin!(resp.chunk());
84                match chunk.try_poll_unpin(cx) {
85                    Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
86                    Poll::Ready(Ok(None)) => return Poll::Ready(None),
87                    Poll::Ready(Err(e)) => chunk_global = Err(e),
88                    Poll::Pending => return Poll::Pending,
89                };
90            }
91        };
92        match chunk_global {
93            Ok(chunk) => {
94                self.start += chunk.len() as u64;
95                Poll::Ready(Some(Ok(chunk)))
96            }
97            Err(e) => {
98                self.state = ResponseState::None;
99                Poll::Ready(Some(Err(HttpError::Chunk(e))))
100            }
101        }
102    }
103}
104
105impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
106    type Error = HttpError<Client>;
107    fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
108        let req = self.client.get(self.url.clone(), None).send();
109        SeqRequestStream {
110            state: ResponseState::Pending(Box::pin(req)),
111        }
112    }
113}
114struct SeqRequestStream<Client: HttpClient + 'static> {
115    state: ResponseState<Client>,
116}
117impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
118    type Item = Result<Bytes, HttpError<Client>>;
119    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120        let chunk_global;
121        match &mut self.state {
122            ResponseState::Pending(resp) => {
123                return match resp.try_poll_unpin(cx) {
124                    Poll::Ready(resp) => match resp {
125                        Ok(resp) => {
126                            self.state = ResponseState::Ready(resp);
127                            self.poll_next(cx)
128                        }
129                        Err(e) => {
130                            self.state = ResponseState::None;
131                            Poll::Ready(Some(Err(HttpError::Request(e))))
132                        }
133                    },
134                    Poll::Pending => Poll::Pending,
135                };
136            }
137            ResponseState::None => return Poll::Ready(Some(Err(HttpError::Irrecoverable))),
138            ResponseState::Ready(resp) => {
139                let mut chunk = pin!(resp.chunk());
140                match chunk.try_poll_unpin(cx) {
141                    Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
142                    Poll::Ready(Ok(None)) => return Poll::Ready(None),
143                    Poll::Ready(Err(e)) => chunk_global = Err(e),
144                    Poll::Pending => return Poll::Pending,
145                };
146            }
147        };
148        match chunk_global {
149            Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
150            Err(e) => {
151                self.state = ResponseState::None;
152                Poll::Ready(Some(Err(HttpError::Chunk(e))))
153            }
154        }
155    }
156}