// File: object_store_wasm/http/mod.rs

1use std::fmt::Display;
2
3use bytes::Bytes;
4use chrono::{DateTime, TimeZone, Utc};
5use futures::channel::oneshot;
6use futures::stream::BoxStream;
7use futures::stream::StreamExt;
8use object_store::PutResult;
9use object_store::{path::Path, ObjectMeta};
10use object_store::{Attributes, PutMode};
11use object_store::{Error, GetOptions, GetRange, GetResult, GetResultPayload, ObjectStore, Result};
12use url::Url;
13use wasm_bindgen_futures::spawn_local;
14// use tracing::info;
15use backon::ExponentialBuilder;
16use backon::Retryable;
17
18use async_trait::async_trait;
19use reqwest::{
20    header::{HeaderMap, CONTENT_LENGTH, ETAG, LAST_MODIFIED},
21    Client, Method, RequestBuilder, Response, StatusCode,
22};
23use snafu::{OptionExt, ResultExt, Snafu};
24
/// Controls which response headers [`header_meta`] insists on when it builds an
/// [`ObjectMeta`].
#[derive(Clone, Copy, Debug)]
struct HeaderConfig {
    /// When `true`, a response without an `ETag` header is rejected; when `false`
    /// the resulting `e_tag` is simply `None`.
    pub etag_required: bool,
    /// When `true`, a response without a `Last-Modified` header is rejected; when
    /// `false` the Unix epoch is used as the modification time.
    pub last_modified_required: bool,
    /// Name of the response header carrying the object version, or `None` when no
    /// version should be read.
    pub version_header: Option<&'static str>,
}
40
41#[derive(Debug, Snafu)]
42enum HeaderError {
43    #[snafu(display("ETag Header missing from response"))]
44    MissingEtag,
45
46    #[snafu(display("Received header containing non-ASCII data"))]
47    BadHeader { source: reqwest::header::ToStrError },
48
49    #[snafu(display("Last-Modified Header missing from response"))]
50    MissingLastModified,
51
52    #[snafu(display("Content-Length Header missing from response"))]
53    MissingContentLength,
54
55    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
56    InvalidLastModified {
57        last_modified: String,
58        source: chrono::ParseError,
59    },
60
61    #[snafu(display("Invalid content length '{}': {}", content_length, source))]
62    InvalidContentLength {
63        content_length: String,
64        source: std::num::ParseIntError,
65    },
66}
67
68fn get_etag(headers: &HeaderMap) -> Result<String, HeaderError> {
69    let e_tag = headers.get(ETAG).ok_or(HeaderError::MissingEtag)?;
70    Ok(e_tag.to_str().context(BadHeaderSnafu)?.to_string())
71}
72
73fn header_meta(
74    location: &Path,
75    headers: &HeaderMap,
76    cfg: HeaderConfig,
77) -> Result<ObjectMeta, HeaderError> {
78    let last_modified = match headers.get(LAST_MODIFIED) {
79        Some(last_modified) => {
80            let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
81            DateTime::parse_from_rfc2822(last_modified)
82                .context(InvalidLastModifiedSnafu { last_modified })?
83                .with_timezone(&Utc)
84        }
85        None if cfg.last_modified_required => return Err(HeaderError::MissingLastModified),
86        None => Utc.timestamp_nanos(0),
87    };
88
89    let e_tag = match get_etag(headers) {
90        Ok(e_tag) => Some(e_tag),
91        Err(HeaderError::MissingEtag) if !cfg.etag_required => None,
92        Err(e) => return Err(e),
93    };
94
95    let content_length = headers
96        .get(CONTENT_LENGTH)
97        .context(MissingContentLengthSnafu)?;
98
99    let content_length = content_length.to_str().context(BadHeaderSnafu)?;
100    let size = content_length
101        .parse()
102        .context(InvalidContentLengthSnafu { content_length })?;
103
104    let version = match cfg.version_header.and_then(|h| headers.get(h)) {
105        Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()),
106        None => None,
107    };
108
109    Ok(ObjectMeta {
110        location: location.clone(),
111        last_modified,
112        version,
113        size,
114        e_tag,
115    })
116}
117
118pub trait GetOptionsExt {
119    fn with_get_options(self, options: GetOptions) -> Self;
120}
121
122impl GetOptionsExt for RequestBuilder {
123    fn with_get_options(mut self, options: GetOptions) -> Self {
124        use reqwest::header::*;
125
126        if let Some(range) = options.range {
127            let range = match range {
128                GetRange::Bounded(range) => {
129                    format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
130                }
131                GetRange::Offset(offset) => {
132                    format!("bytes={}-", offset)
133                }
134                GetRange::Suffix(upper_limit) => format!("bytes=-{}", upper_limit),
135            };
136            self = self.header(RANGE, range);
137        }
138
139        if let Some(tag) = options.if_match {
140            self = self.header(IF_MATCH, tag);
141        }
142
143        if let Some(tag) = options.if_none_match {
144            self = self.header(IF_NONE_MATCH, tag);
145        }
146
147        const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
148        if let Some(date) = options.if_unmodified_since {
149            self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
150        }
151
152        if let Some(date) = options.if_modified_since {
153            self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
154        }
155
156        self
157    }
158}
159
160#[derive(Debug, Clone)]
161struct InnerClient {
162    url: Url,
163    client: Client,
164}
165
166impl InnerClient {
167    const STORE: &'static str = "HTTP";
168    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
169        etag_required: false,
170        last_modified_required: false,
171        version_header: None,
172    };
173    fn new(url: Url) -> Self {
174        Self {
175            url,
176            client: Client::new(),
177        }
178    }
179
180    fn path_url(&self, location: &Path) -> Url {
181        let mut url = self.url.clone();
182        url.path_segments_mut()
183            .unwrap()
184            .pop_if_empty()
185            .extend(location.parts());
186        url
187    }
188
189    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
190        let url = self.path_url(path);
191        let has_range = options.range.is_some();
192        let method = match options.head {
193            true => Method::HEAD,
194            false => Method::GET,
195        };
196        let builder = self.client.request(method, url).with_get_options(options);
197        let res_func = || async {
198            builder
199                .try_clone()
200                .unwrap()
201                .send()
202                .await
203                .and_then(|res| res.error_for_status())
204        };
205        let res = res_func
206            .retry(&ExponentialBuilder::default())
207            .await
208            .map_err(|source| match source.status() {
209                // Some stores return METHOD_NOT_ALLOWED for get on directories
210                Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => Error::NotFound {
211                    source: Box::new(source),
212                    path: path.to_string(),
213                },
214                _ => Error::Generic {
215                    store: InnerClient::STORE,
216                    source: Box::new(source),
217                },
218            })?;
219
220        // We expect a 206 Partial Content response if a range was requested
221        // a 200 OK response would indicate the server did not fulfill the request
222        if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
223            return Err(Error::NotSupported {
224                source: Box::new(Error::NotImplemented {
225                    // href: path.to_string(),
226                }),
227            });
228        }
229
230        Ok(res)
231    }
232
233    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
234        let range = options.range.clone();
235        let no_body = options.head;
236        let response = self.get_request(location, options).await?;
237        let meta =
238            header_meta(location, response.headers(), InnerClient::HEADER_CONFIG).map_err(|e| {
239                Error::Generic {
240                    store: InnerClient::STORE,
241                    source: Box::new(e),
242                }
243            })?;
244        if no_body {
245            return Ok(GetResult {
246                range: Default::default(),
247                payload: GetResultPayload::Stream(futures::stream::empty().boxed()),
248                meta,
249                attributes: Attributes::new(),
250            });
251        }
252        let (tx, rx) = futures::channel::mpsc::channel(1);
253        spawn_local(async move {
254            let stream = response.bytes_stream();
255            stream
256                .map(|chunk| {
257                    Ok(chunk.map_err(|source| Error::Generic {
258                        store: InnerClient::STORE,
259                        source: Box::new(source),
260                    }))
261                })
262                .forward(tx)
263                .await
264                .unwrap();
265        });
266        let safe_stream = rx.boxed();
267
268        let resolved_range = match range {
269            Some(GetRange::Bounded(inner_range)) => inner_range,
270            Some(GetRange::Offset(lower_limit)) => lower_limit..meta.size,
271            Some(GetRange::Suffix(upper_limit)) => 0..upper_limit,
272            None => 0..meta.size,
273        };
274        Ok(GetResult {
275            range: resolved_range,
276            payload: GetResultPayload::Stream(safe_stream),
277            meta,
278            attributes: Attributes::new(),
279        })
280    }
281    pub async fn delete(&self, path: &Path) -> Result<()> {
282        let url = self.path_url(path);
283        self.client
284            .delete(url)
285            .send()
286            .await
287            .and_then(|res| res.error_for_status())
288            .map_err(|source| match source.status() {
289                Some(StatusCode::NOT_FOUND) => Error::NotFound {
290                    source: Box::new(source),
291                    path: path.to_string(),
292                },
293                // TODO: de-genericize
294                _ => Error::Generic {
295                    store: InnerClient::STORE,
296                    source: Box::new(source),
297                },
298            })?;
299        Ok(())
300    }
301
302    pub async fn put(&self, _path: &Path, _payload: Bytes) -> Result<Response> {
303        todo!()
304    }
305}
306
307#[derive(Debug)]
308pub struct HttpStore {
309    client: InnerClient,
310}
311
312impl HttpStore {
313    pub fn new(url: Url) -> Self {
314        Self {
315            client: InnerClient::new(url),
316        }
317    }
318}
319
320#[async_trait]
321impl ObjectStore for HttpStore {
322    async fn put_multipart(
323        &self,
324        _location: &Path,
325    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
326        Err(Error::NotImplemented)
327    }
328
329    async fn put_multipart_opts(
330        &self,
331        _location: &Path,
332        _opts: object_store::PutMultipartOpts,
333    ) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
334        Err(Error::NotImplemented)
335    }
336
337    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
338        todo!()
339    }
340    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
341        todo!()
342    }
343    async fn delete(&self, _location: &Path) -> object_store::Result<()> {
344        let (sender, receiver) = oneshot::channel();
345        let copied_client = self.client.clone();
346        let cloned_location = _location.clone();
347        spawn_local(async move {
348            let res = copied_client.delete(&cloned_location).await;
349            sender.send(res).unwrap();
350        });
351        receiver.await.unwrap()
352    }
353
354    async fn get_opts(
355        &self,
356        location: &Path,
357        options: object_store::GetOptions,
358    ) -> object_store::Result<object_store::GetResult> {
359        let (sender, receiver) = oneshot::channel();
360        let copied_client = self.client.clone();
361        let copied_location = location.clone();
362        spawn_local(async move {
363            let res = copied_client.get_opts(&copied_location, options).await;
364            sender.send(res).unwrap();
365        });
366
367        receiver.await.unwrap()
368    }
369    async fn put_opts(
370        &self,
371        _location: &Path,
372        payload: object_store::PutPayload,
373        _options: object_store::PutOptions,
374    ) -> object_store::Result<object_store::PutResult> {
375        if _options.mode != PutMode::Overwrite {
376            // TODO: Add support for If header - https://datatracker.ietf.org/doc/html/rfc2518#section-9.4
377            return Err(Error::NotImplemented);
378        }
379
380        let response = self.client.put(_location, payload.into()).await?;
381        let e_tag = match get_etag(response.headers()) {
382            Ok(e_tag) => Some(e_tag),
383            Err(HeaderError::MissingEtag) => None,
384            Err(source) => {
385                return Err(Error::Generic {
386                    store: InnerClient::STORE,
387                    source: Box::new(source),
388                })
389            }
390        };
391
392        Ok(PutResult {
393            e_tag,
394            version: None,
395        })
396    }
397    fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
398        todo!()
399    }
400    async fn list_with_delimiter(
401        &self,
402        _prefix: Option<&Path>,
403    ) -> object_store::Result<object_store::ListResult> {
404        todo!()
405    }
406}
407impl Display for HttpStore {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "{:?}", self.client)
410    }
411}