1use crate::http::{
2 GetRequestError, GetResponse, HttpClient, HttpError, HttpRequestBuilder, HttpResponse,
3};
4use bytes::Bytes;
5use fast_pull::{ProgressEntry, RandPuller, SeqPuller};
6use futures::{Stream, TryFutureExt, TryStream};
7use std::{
8 pin::{Pin, pin},
9 task::{Context, Poll},
10};
11use url::Url;
12
13#[derive(Clone)]
14pub struct HttpPuller<Client: HttpClient> {
15 pub(crate) client: Client,
16 url: Url,
17}
18impl<Client: HttpClient> HttpPuller<Client> {
19 pub fn new(url: Url, client: Client) -> Self {
20 Self { client, url }
21 }
22}
23
24type ResponseFut<Client> =
25 Pin<Box<dyn Future<Output = Result<GetResponse<Client>, GetRequestError<Client>>> + Send>>;
26enum ResponseState<Client: HttpClient> {
27 Pending(ResponseFut<Client>),
28 Ready(GetResponse<Client>),
29 None,
30}
31
32impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
33 type Error = HttpError<Client>;
34 fn pull(
35 &mut self,
36 range: &ProgressEntry,
37 ) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
38 RandRequestStream {
39 client: self.client.clone(),
40 url: self.url.clone(),
41 start: range.start,
42 end: range.end,
43 state: ResponseState::None,
44 }
45 }
46}
47struct RandRequestStream<Client: HttpClient + 'static> {
48 client: Client,
49 url: Url,
50 start: u64,
51 end: u64,
52 state: ResponseState<Client>,
53}
54impl<Client: HttpClient> Stream for RandRequestStream<Client> {
55 type Item = Result<Bytes, HttpError<Client>>;
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 let chunk_global;
58 match &mut self.state {
59 ResponseState::Pending(resp) => {
60 return match resp.try_poll_unpin(cx) {
61 Poll::Ready(resp) => match resp {
62 Ok(resp) => {
63 self.state = ResponseState::Ready(resp);
64 self.poll_next(cx)
65 }
66 Err(e) => {
67 self.state = ResponseState::None;
68 Poll::Ready(Some(Err(HttpError::Request(e))))
69 }
70 },
71 Poll::Pending => Poll::Pending,
72 };
73 }
74 ResponseState::None => {
75 let resp = self
76 .client
77 .get(self.url.clone(), Some(self.start..self.end))
78 .send();
79 self.state = ResponseState::Pending(Box::pin(resp));
80 return self.poll_next(cx);
81 }
82 ResponseState::Ready(resp) => {
83 let mut chunk = pin!(resp.chunk());
84 match chunk.try_poll_unpin(cx) {
85 Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
86 Poll::Ready(Ok(None)) => return Poll::Ready(None),
87 Poll::Ready(Err(e)) => chunk_global = Err(e),
88 Poll::Pending => return Poll::Pending,
89 };
90 }
91 };
92 match chunk_global {
93 Ok(chunk) => {
94 self.start += chunk.len() as u64;
95 Poll::Ready(Some(Ok(chunk)))
96 }
97 Err(e) => {
98 self.state = ResponseState::None;
99 Poll::Ready(Some(Err(HttpError::Chunk(e))))
100 }
101 }
102 }
103}
104
105impl<Client: HttpClient + 'static> SeqPuller for HttpPuller<Client> {
106 type Error = HttpError<Client>;
107 fn pull(&mut self) -> impl TryStream<Ok = Bytes, Error = Self::Error> + Send + Unpin {
108 let req = self.client.get(self.url.clone(), None).send();
109 SeqRequestStream {
110 state: ResponseState::Pending(Box::pin(req)),
111 }
112 }
113}
114struct SeqRequestStream<Client: HttpClient + 'static> {
115 state: ResponseState<Client>,
116}
117impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
118 type Item = Result<Bytes, HttpError<Client>>;
119 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120 let chunk_global;
121 match &mut self.state {
122 ResponseState::Pending(resp) => {
123 return match resp.try_poll_unpin(cx) {
124 Poll::Ready(resp) => match resp {
125 Ok(resp) => {
126 self.state = ResponseState::Ready(resp);
127 self.poll_next(cx)
128 }
129 Err(e) => {
130 self.state = ResponseState::None;
131 Poll::Ready(Some(Err(HttpError::Request(e))))
132 }
133 },
134 Poll::Pending => Poll::Pending,
135 };
136 }
137 ResponseState::None => return Poll::Ready(Some(Err(HttpError::Irrecoverable))),
138 ResponseState::Ready(resp) => {
139 let mut chunk = pin!(resp.chunk());
140 match chunk.try_poll_unpin(cx) {
141 Poll::Ready(Ok(Some(chunk))) => chunk_global = Ok(chunk),
142 Poll::Ready(Ok(None)) => return Poll::Ready(None),
143 Poll::Ready(Err(e)) => chunk_global = Err(e),
144 Poll::Pending => return Poll::Pending,
145 };
146 }
147 };
148 match chunk_global {
149 Ok(chunk) => Poll::Ready(Some(Ok(chunk))),
150 Err(e) => {
151 self.state = ResponseState::None;
152 Poll::Ready(Some(Err(HttpError::Chunk(e))))
153 }
154 }
155 }
156}