object_store/http/client.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::client::get::GetClient;
19use crate::client::header::HeaderConfig;
20use crate::client::retry::{self, RetryConfig, RetryExt};
21use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse};
22use crate::path::{Path, DELIMITER};
23use crate::util::deserialize_rfc1123;
24use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
25use async_trait::async_trait;
26use bytes::Buf;
27use chrono::{DateTime, Utc};
28use http::header::{
29    CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
30    CONTENT_TYPE,
31};
32use percent_encoding::percent_decode_str;
33use reqwest::{Method, StatusCode};
34use serde::Deserialize;
35use url::Url;
36
37#[derive(Debug, thiserror::Error)]
38enum Error {
39    #[error("Request error: {}", source)]
40    Request { source: retry::RetryError },
41
42    #[error("Request error: {}", source)]
43    Reqwest { source: HttpError },
44
45    #[error("Range request not supported by {}", href)]
46    RangeNotSupported { href: String },
47
48    #[error("Error decoding PROPFIND response: {}", source)]
49    InvalidPropFind { source: quick_xml::de::DeError },
50
51    #[error("Missing content size for {}", href)]
52    MissingSize { href: String },
53
54    #[error("Error getting properties of \"{}\" got \"{}\"", href, status)]
55    PropStatus { href: String, status: String },
56
57    #[error("Failed to parse href \"{}\": {}", href, source)]
58    InvalidHref {
59        href: String,
60        source: url::ParseError,
61    },
62
63    #[error("Path \"{}\" contained non-unicode characters: {}", path, source)]
64    NonUnicode {
65        path: String,
66        source: std::str::Utf8Error,
67    },
68
69    #[error("Encountered invalid path \"{}\": {}", path, source)]
70    InvalidPath {
71        path: String,
72        source: crate::path::Error,
73    },
74}
75
76impl From<Error> for crate::Error {
77    fn from(err: Error) -> Self {
78        Self::Generic {
79            store: "HTTP",
80            source: Box::new(err),
81        }
82    }
83}
84
85/// Internal client for HttpStore
86#[derive(Debug)]
87pub(crate) struct Client {
88    url: Url,
89    client: HttpClient,
90    retry_config: RetryConfig,
91    client_options: ClientOptions,
92}
93
94impl Client {
95    pub(crate) fn new(
96        url: Url,
97        client: HttpClient,
98        client_options: ClientOptions,
99        retry_config: RetryConfig,
100    ) -> Self {
101        Self {
102            url,
103            retry_config,
104            client_options,
105            client,
106        }
107    }
108
109    pub(crate) fn base_url(&self) -> &Url {
110        &self.url
111    }
112
113    fn path_url(&self, location: &Path) -> String {
114        let mut url = self.url.clone();
115        url.path_segments_mut().unwrap().extend(location.parts());
116        url.to_string()
117    }
118
119    /// Create a directory with `path` using MKCOL
120    async fn make_directory(&self, path: &str) -> Result<(), Error> {
121        let method = Method::from_bytes(b"MKCOL").unwrap();
122        let mut url = self.url.clone();
123        url.path_segments_mut()
124            .unwrap()
125            .extend(path.split(DELIMITER));
126
127        self.client
128            .request(method, String::from(url))
129            .send_retry(&self.retry_config)
130            .await
131            .map_err(|source| Error::Request { source })?;
132
133        Ok(())
134    }
135
136    /// Recursively create parent directories
137    async fn create_parent_directories(&self, location: &Path) -> Result<()> {
138        let mut stack = vec![];
139
140        // Walk backwards until a request succeeds
141        let mut last_prefix = location.as_ref();
142        while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
143            last_prefix = prefix;
144
145            match self.make_directory(prefix).await {
146                Ok(_) => break,
147                Err(Error::Request { source })
148                    if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
149                {
150                    // Need to create parent
151                    stack.push(prefix)
152                }
153                Err(e) => return Err(e.into()),
154            }
155        }
156
157        // Retry the failed requests, which should now succeed
158        for prefix in stack.into_iter().rev() {
159            self.make_directory(prefix).await?;
160        }
161
162        Ok(())
163    }
164
165    pub(crate) async fn put(
166        &self,
167        location: &Path,
168        payload: PutPayload,
169        attributes: Attributes,
170    ) -> Result<HttpResponse> {
171        let mut retry = false;
172        loop {
173            let url = self.path_url(location);
174            let mut builder = self.client.put(url);
175
176            let mut has_content_type = false;
177            for (k, v) in &attributes {
178                builder = match k {
179                    Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
180                    Attribute::ContentDisposition => {
181                        builder.header(CONTENT_DISPOSITION, v.as_ref())
182                    }
183                    Attribute::ContentEncoding => builder.header(CONTENT_ENCODING, v.as_ref()),
184                    Attribute::ContentLanguage => builder.header(CONTENT_LANGUAGE, v.as_ref()),
185                    Attribute::ContentType => {
186                        has_content_type = true;
187                        builder.header(CONTENT_TYPE, v.as_ref())
188                    }
189                    // Ignore metadata attributes
190                    Attribute::Metadata(_) => builder,
191                };
192            }
193
194            if !has_content_type {
195                if let Some(value) = self.client_options.get_content_type(location) {
196                    builder = builder.header(CONTENT_TYPE, value);
197                }
198            }
199
200            let resp = builder
201                .header(CONTENT_LENGTH, payload.content_length())
202                .retryable(&self.retry_config)
203                .idempotent(true)
204                .payload(Some(payload.clone()))
205                .send()
206                .await;
207
208            match resp {
209                Ok(response) => return Ok(response),
210                Err(source) => match source.status() {
211                    // Some implementations return 404 instead of 409
212                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
213                        retry = true;
214                        self.create_parent_directories(location).await?
215                    }
216                    _ => return Err(Error::Request { source }.into()),
217                },
218            }
219        }
220    }
221
222    pub(crate) async fn list(&self, location: Option<&Path>, depth: &str) -> Result<MultiStatus> {
223        let url = location
224            .map(|path| self.path_url(path))
225            .unwrap_or_else(|| self.url.to_string());
226
227        let method = Method::from_bytes(b"PROPFIND").unwrap();
228        let result = self
229            .client
230            .request(method, url)
231            .header("Depth", depth)
232            .retryable(&self.retry_config)
233            .idempotent(true)
234            .send()
235            .await;
236
237        let response = match result {
238            Ok(result) => result
239                .into_body()
240                .bytes()
241                .await
242                .map_err(|source| Error::Reqwest { source })?,
243            Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
244                return match depth {
245                    "0" => {
246                        let path = location.map(|x| x.as_ref()).unwrap_or("");
247                        Err(crate::Error::NotFound {
248                            path: path.to_string(),
249                            source: Box::new(e),
250                        })
251                    }
252                    _ => {
253                        // If prefix not found, return empty result set
254                        Ok(Default::default())
255                    }
256                };
257            }
258            Err(source) => return Err(Error::Request { source }.into()),
259        };
260
261        let status = quick_xml::de::from_reader(response.reader())
262            .map_err(|source| Error::InvalidPropFind { source })?;
263
264        Ok(status)
265    }
266
267    pub(crate) async fn delete(&self, path: &Path) -> Result<()> {
268        let url = self.path_url(path);
269        self.client
270            .delete(url)
271            .send_retry(&self.retry_config)
272            .await
273            .map_err(|source| match source.status() {
274                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
275                    source: Box::new(source),
276                    path: path.to_string(),
277                },
278                _ => Error::Request { source }.into(),
279            })?;
280        Ok(())
281    }
282
283    pub(crate) async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
284        let mut retry = false;
285        loop {
286            let method = Method::from_bytes(b"COPY").unwrap();
287
288            let mut builder = self
289                .client
290                .request(method, self.path_url(from))
291                .header("Destination", self.path_url(to).as_str());
292
293            if !overwrite {
294                // While the Overwrite header appears to duplicate
295                // the functionality of the If-Match: * header of HTTP/1.1, If-Match
296                // applies only to the Request-URI, and not to the Destination of a COPY
297                // or MOVE.
298                builder = builder.header("Overwrite", "F");
299            }
300
301            return match builder.send_retry(&self.retry_config).await {
302                Ok(_) => Ok(()),
303                Err(source) => Err(match source.status() {
304                    Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
305                        crate::Error::AlreadyExists {
306                            path: to.to_string(),
307                            source: Box::new(source),
308                        }
309                    }
310                    // Some implementations return 404 instead of 409
311                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
312                        retry = true;
313                        self.create_parent_directories(to).await?;
314                        continue;
315                    }
316                    _ => Error::Request { source }.into(),
317                }),
318            };
319        }
320    }
321}
322
323#[async_trait]
324impl GetClient for Client {
325    const STORE: &'static str = "HTTP";
326
327    /// Override the [`HeaderConfig`] to be less strict to support a
328    /// broader range of HTTP servers (#4831)
329    const HEADER_CONFIG: HeaderConfig = HeaderConfig {
330        etag_required: false,
331        last_modified_required: false,
332        version_header: None,
333        user_defined_metadata_prefix: None,
334    };
335
336    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
337        let url = self.path_url(path);
338        let method = match options.head {
339            true => Method::HEAD,
340            false => Method::GET,
341        };
342        let has_range = options.range.is_some();
343        let builder = self.client.request(method, url);
344
345        let res = builder
346            .with_get_options(options)
347            .send_retry(&self.retry_config)
348            .await
349            .map_err(|source| match source.status() {
350                // Some stores return METHOD_NOT_ALLOWED for get on directories
351                Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
352                    crate::Error::NotFound {
353                        source: Box::new(source),
354                        path: path.to_string(),
355                    }
356                }
357                _ => Error::Request { source }.into(),
358            })?;
359
360        // We expect a 206 Partial Content response if a range was requested
361        // a 200 OK response would indicate the server did not fulfill the request
362        if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
363            return Err(crate::Error::NotSupported {
364                source: Box::new(Error::RangeNotSupported {
365                    href: path.to_string(),
366                }),
367            });
368        }
369
370        Ok(res)
371    }
372}
373
374/// The response returned by a PROPFIND request, i.e. list
375#[derive(Deserialize, Default)]
376pub(crate) struct MultiStatus {
377    pub response: Vec<MultiStatusResponse>,
378}
379
380#[derive(Deserialize)]
381pub(crate) struct MultiStatusResponse {
382    href: String,
383    #[serde(rename = "propstat")]
384    prop_stat: PropStat,
385}
386
387impl MultiStatusResponse {
388    /// Returns an error if this response is not OK
389    pub(crate) fn check_ok(&self) -> Result<()> {
390        match self.prop_stat.status.contains("200 OK") {
391            true => Ok(()),
392            false => Err(Error::PropStatus {
393                href: self.href.clone(),
394                status: self.prop_stat.status.clone(),
395            }
396            .into()),
397        }
398    }
399
400    /// Returns the resolved path of this element relative to `base_url`
401    pub(crate) fn path(&self, base_url: &Url) -> Result<Path> {
402        let url = Url::options()
403            .base_url(Some(base_url))
404            .parse(&self.href)
405            .map_err(|source| Error::InvalidHref {
406                href: self.href.clone(),
407                source,
408            })?;
409
410        // Reverse any percent encoding
411        let path = percent_decode_str(url.path())
412            .decode_utf8()
413            .map_err(|source| Error::NonUnicode {
414                path: url.path().into(),
415                source,
416            })?;
417
418        Ok(Path::parse(path.as_ref()).map_err(|source| {
419            let path = path.into();
420            Error::InvalidPath { path, source }
421        })?)
422    }
423
424    fn size(&self) -> Result<u64> {
425        let size = self
426            .prop_stat
427            .prop
428            .content_length
429            .ok_or_else(|| Error::MissingSize {
430                href: self.href.clone(),
431            })?;
432
433        Ok(size)
434    }
435
436    /// Returns this objects metadata as [`ObjectMeta`]
437    pub(crate) fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
438        let last_modified = self.prop_stat.prop.last_modified;
439        Ok(ObjectMeta {
440            location: self.path(base_url)?,
441            last_modified,
442            size: self.size()?,
443            e_tag: self.prop_stat.prop.e_tag.clone(),
444            version: None,
445        })
446    }
447
448    /// Returns true if this is a directory / collection
449    pub(crate) fn is_dir(&self) -> bool {
450        self.prop_stat.prop.resource_type.collection.is_some()
451    }
452}
453
454#[derive(Deserialize)]
455pub(crate) struct PropStat {
456    prop: Prop,
457    status: String,
458}
459
460#[derive(Deserialize)]
461pub(crate) struct Prop {
462    #[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
463    last_modified: DateTime<Utc>,
464
465    #[serde(rename = "getcontentlength")]
466    content_length: Option<u64>,
467
468    #[serde(rename = "resourcetype")]
469    resource_type: ResourceType,
470
471    #[serde(rename = "getetag")]
472    e_tag: Option<String>,
473}
474
475#[derive(Deserialize)]
476pub(crate) struct ResourceType {
477    collection: Option<()>,
478}