1use std::fmt::Display;
2
3use bytes::Bytes;
4use chrono::{DateTime, TimeZone, Utc};
5use futures::channel::oneshot;
6use futures::stream::BoxStream;
7use futures::stream::StreamExt;
8use object_store::PutResult;
9use object_store::{path::Path, ObjectMeta};
10use object_store::{Attributes, PutMode};
11use object_store::{Error, GetOptions, GetRange, GetResult, GetResultPayload, ObjectStore, Result};
12use url::Url;
13use wasm_bindgen_futures::spawn_local;
14use backon::ExponentialBuilder;
16use backon::Retryable;
17
18use async_trait::async_trait;
19use reqwest::{
20 header::{HeaderMap, CONTENT_LENGTH, ETAG, LAST_MODIFIED},
21 Client, Method, RequestBuilder, Response, StatusCode,
22};
23use snafu::{OptionExt, ResultExt, Snafu};
24
/// Controls which response headers `header_meta` insists on, and where it reads an object
/// version from.
#[derive(Debug, Copy, Clone)]
struct HeaderConfig {
    /// When `true`, a response without an `ETag` header is an error; otherwise the resulting
    /// `ObjectMeta::e_tag` is `None`.
    pub etag_required: bool,
    /// When `true`, a response without a `Last-Modified` header is an error; otherwise the
    /// Unix epoch is used as `ObjectMeta::last_modified`.
    pub last_modified_required: bool,

    /// Name of a response header whose value becomes `ObjectMeta::version`, if any.
    pub version_header: Option<&'static str>,
}
40
/// Errors raised while extracting object metadata from HTTP response headers.
#[derive(Debug, Snafu)]
enum HeaderError {
    /// The response carried no `ETag` header.
    #[snafu(display("ETag Header missing from response"))]
    MissingEtag,

    /// A header value could not be converted to a string (`HeaderValue::to_str` failed).
    #[snafu(display("Received header containing non-ASCII data"))]
    BadHeader { source: reqwest::header::ToStrError },

    /// The response carried no `Last-Modified` header although one was required.
    #[snafu(display("Last-Modified Header missing from response"))]
    MissingLastModified,

    /// The response carried no `Content-Length` header.
    #[snafu(display("Content-Length Header missing from response"))]
    MissingContentLength,

    /// `Last-Modified` was present but is not a valid RFC 2822 date.
    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
    InvalidLastModified {
        last_modified: String,
        source: chrono::ParseError,
    },

    /// `Content-Length` was present but is not a valid unsigned integer.
    #[snafu(display("Invalid content length '{}': {}", content_length, source))]
    InvalidContentLength {
        content_length: String,
        source: std::num::ParseIntError,
    },
}
67
68fn get_etag(headers: &HeaderMap) -> Result<String, HeaderError> {
69 let e_tag = headers.get(ETAG).ok_or(HeaderError::MissingEtag)?;
70 Ok(e_tag.to_str().context(BadHeaderSnafu)?.to_string())
71}
72
73fn header_meta(
74 location: &Path,
75 headers: &HeaderMap,
76 cfg: HeaderConfig,
77) -> Result<ObjectMeta, HeaderError> {
78 let last_modified = match headers.get(LAST_MODIFIED) {
79 Some(last_modified) => {
80 let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
81 DateTime::parse_from_rfc2822(last_modified)
82 .context(InvalidLastModifiedSnafu { last_modified })?
83 .with_timezone(&Utc)
84 }
85 None if cfg.last_modified_required => return Err(HeaderError::MissingLastModified),
86 None => Utc.timestamp_nanos(0),
87 };
88
89 let e_tag = match get_etag(headers) {
90 Ok(e_tag) => Some(e_tag),
91 Err(HeaderError::MissingEtag) if !cfg.etag_required => None,
92 Err(e) => return Err(e),
93 };
94
95 let content_length = headers
96 .get(CONTENT_LENGTH)
97 .context(MissingContentLengthSnafu)?;
98
99 let content_length = content_length.to_str().context(BadHeaderSnafu)?;
100 let size = content_length
101 .parse()
102 .context(InvalidContentLengthSnafu { content_length })?;
103
104 let version = match cfg.version_header.and_then(|h| headers.get(h)) {
105 Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()),
106 None => None,
107 };
108
109 Ok(ObjectMeta {
110 location: location.clone(),
111 last_modified,
112 version,
113 size,
114 e_tag,
115 })
116}
117
/// Extension trait for applying `object_store` [`GetOptions`] to an outgoing HTTP request.
pub trait GetOptionsExt {
    /// Returns `self` with request headers derived from `options` attached
    /// (range and conditional headers; see the `RequestBuilder` implementation for the mapping).
    fn with_get_options(self, options: GetOptions) -> Self;
}
121
122impl GetOptionsExt for RequestBuilder {
123 fn with_get_options(mut self, options: GetOptions) -> Self {
124 use reqwest::header::*;
125
126 if let Some(range) = options.range {
127 let range = match range {
128 GetRange::Bounded(range) => {
129 format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
130 }
131 GetRange::Offset(offset) => {
132 format!("bytes={}-", offset)
133 }
134 GetRange::Suffix(upper_limit) => format!("bytes=-{}", upper_limit),
135 };
136 self = self.header(RANGE, range);
137 }
138
139 if let Some(tag) = options.if_match {
140 self = self.header(IF_MATCH, tag);
141 }
142
143 if let Some(tag) = options.if_none_match {
144 self = self.header(IF_NONE_MATCH, tag);
145 }
146
147 const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
148 if let Some(date) = options.if_unmodified_since {
149 self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
150 }
151
152 if let Some(date) = options.if_modified_since {
153 self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
154 }
155
156 self
157 }
158}
159
/// HTTP client bound to a base URL; object paths are appended to `url` to form request URLs.
#[derive(Debug, Clone)]
struct InnerClient {
    /// Base URL that object paths are resolved against.
    url: Url,
    /// Underlying `reqwest` client used to send every request.
    client: Client,
}
165
166impl InnerClient {
167 const STORE: &'static str = "HTTP";
168 const HEADER_CONFIG: HeaderConfig = HeaderConfig {
169 etag_required: false,
170 last_modified_required: false,
171 version_header: None,
172 };
173 fn new(url: Url) -> Self {
174 Self {
175 url,
176 client: Client::new(),
177 }
178 }
179
180 fn path_url(&self, location: &Path) -> Url {
181 let mut url = self.url.clone();
182 url.path_segments_mut()
183 .unwrap()
184 .pop_if_empty()
185 .extend(location.parts());
186 url
187 }
188
189 async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
190 let url = self.path_url(path);
191 let has_range = options.range.is_some();
192 let method = match options.head {
193 true => Method::HEAD,
194 false => Method::GET,
195 };
196 let builder = self.client.request(method, url).with_get_options(options);
197 let res_func = || async {
198 builder
199 .try_clone()
200 .unwrap()
201 .send()
202 .await
203 .and_then(|res| res.error_for_status())
204 };
205 let res = res_func
206 .retry(&ExponentialBuilder::default())
207 .await
208 .map_err(|source| match source.status() {
209 Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => Error::NotFound {
211 source: Box::new(source),
212 path: path.to_string(),
213 },
214 _ => Error::Generic {
215 store: InnerClient::STORE,
216 source: Box::new(source),
217 },
218 })?;
219
220 if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
223 return Err(Error::NotSupported {
224 source: Box::new(Error::NotImplemented {
225 }),
227 });
228 }
229
230 Ok(res)
231 }
232
233 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
234 let range = options.range.clone();
235 let no_body = options.head;
236 let response = self.get_request(location, options).await?;
237 let meta =
238 header_meta(location, response.headers(), InnerClient::HEADER_CONFIG).map_err(|e| {
239 Error::Generic {
240 store: InnerClient::STORE,
241 source: Box::new(e),
242 }
243 })?;
244 if no_body {
245 return Ok(GetResult {
246 range: Default::default(),
247 payload: GetResultPayload::Stream(futures::stream::empty().boxed()),
248 meta,
249 attributes: Attributes::new(),
250 });
251 }
252 let (tx, rx) = futures::channel::mpsc::channel(1);
253 spawn_local(async move {
254 let stream = response.bytes_stream();
255 stream
256 .map(|chunk| {
257 Ok(chunk.map_err(|source| Error::Generic {
258 store: InnerClient::STORE,
259 source: Box::new(source),
260 }))
261 })
262 .forward(tx)
263 .await
264 .unwrap();
265 });
266 let safe_stream = rx.boxed();
267
268 let resolved_range = match range {
269 Some(GetRange::Bounded(inner_range)) => inner_range,
270 Some(GetRange::Offset(lower_limit)) => lower_limit..meta.size,
271 Some(GetRange::Suffix(upper_limit)) => 0..upper_limit,
272 None => 0..meta.size,
273 };
274 Ok(GetResult {
275 range: resolved_range,
276 payload: GetResultPayload::Stream(safe_stream),
277 meta,
278 attributes: Attributes::new(),
279 })
280 }
281 pub async fn delete(&self, path: &Path) -> Result<()> {
282 let url = self.path_url(path);
283 self.client
284 .delete(url)
285 .send()
286 .await
287 .and_then(|res| res.error_for_status())
288 .map_err(|source| match source.status() {
289 Some(StatusCode::NOT_FOUND) => Error::NotFound {
290 source: Box::new(source),
291 path: path.to_string(),
292 },
293 _ => Error::Generic {
295 store: InnerClient::STORE,
296 source: Box::new(source),
297 },
298 })?;
299 Ok(())
300 }
301
302 pub async fn put(&self, _path: &Path, _payload: Bytes) -> Result<Response> {
303 todo!()
304 }
305}
306
/// An object store backed by an HTTP server rooted at a base URL.
///
/// The `ObjectStore` implementation runs its requests on tasks spawned with
/// `wasm_bindgen_futures::spawn_local`. Reads (GET/HEAD) and deletes are implemented; the other
/// operations are not implemented at present.
#[derive(Debug)]
pub struct HttpStore {
    /// Client holding the base URL and the underlying HTTP client.
    client: InnerClient,
}
311
312impl HttpStore {
313 pub fn new(url: Url) -> Self {
314 Self {
315 client: InnerClient::new(url),
316 }
317 }
318}
319
320#[async_trait]
321impl ObjectStore for HttpStore {
322 async fn put_multipart(
323 &self,
324 _location: &Path,
325 ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
326 Err(Error::NotImplemented)
327 }
328
329 async fn put_multipart_opts(
330 &self,
331 _location: &Path,
332 _opts: object_store::PutMultipartOpts,
333 ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
334 Err(Error::NotImplemented)
335 }
336
337 async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
338 todo!()
339 }
340 async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
341 todo!()
342 }
343 async fn delete(&self, _location: &Path) -> object_store::Result<()> {
344 let (sender, receiver) = oneshot::channel();
345 let copied_client = self.client.clone();
346 let cloned_location = _location.clone();
347 spawn_local(async move {
348 let res = copied_client.delete(&cloned_location).await;
349 sender.send(res).unwrap();
350 });
351 receiver.await.unwrap()
352 }
353
354 async fn get_opts(
355 &self,
356 location: &Path,
357 options: object_store::GetOptions,
358 ) -> object_store::Result<object_store::GetResult> {
359 let (sender, receiver) = oneshot::channel();
360 let copied_client = self.client.clone();
361 let copied_location = location.clone();
362 spawn_local(async move {
363 let res = copied_client.get_opts(&copied_location, options).await;
364 sender.send(res).unwrap();
365 });
366
367 receiver.await.unwrap()
368 }
369 async fn put_opts(
370 &self,
371 _location: &Path,
372 payload: object_store::PutPayload,
373 _options: object_store::PutOptions,
374 ) -> object_store::Result<object_store::PutResult> {
375 if _options.mode != PutMode::Overwrite {
376 return Err(Error::NotImplemented);
378 }
379
380 let response = self.client.put(_location, payload.into()).await?;
381 let e_tag = match get_etag(response.headers()) {
382 Ok(e_tag) => Some(e_tag),
383 Err(HeaderError::MissingEtag) => None,
384 Err(source) => {
385 return Err(Error::Generic {
386 store: InnerClient::STORE,
387 source: Box::new(source),
388 })
389 }
390 };
391
392 Ok(PutResult {
393 e_tag,
394 version: None,
395 })
396 }
397 fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
398 todo!()
399 }
400 async fn list_with_delimiter(
401 &self,
402 _prefix: Option<&Path>,
403 ) -> object_store::Result<object_store::ListResult> {
404 todo!()
405 }
406}
407impl Display for HttpStore {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "{:?}", self.client)
410 }
411}