// pyo3_object_store/aws/store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use itertools::Itertools;
5use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey};
6use object_store::ObjectStoreScheme;
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedStr;
9use pyo3::types::{PyDict, PyString, PyTuple, PyType};
10use pyo3::{intern, IntoPyObjectExt};
11use url::Url;
12
13use crate::aws::credentials::PyAWSCredentialProvider;
14use crate::client::PyClientOptions;
15use crate::config::PyConfigValue;
16use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
17use crate::path::PyPath;
18use crate::prefix::MaybePrefixedStore;
19use crate::retry::PyRetryConfig;
20use crate::PyUrl;
21
22#[derive(Debug, Clone, PartialEq)]
23struct S3Config {
24    prefix: Option<PyPath>,
25    config: PyAmazonS3Config,
26    client_options: Option<PyClientOptions>,
27    retry_config: Option<PyRetryConfig>,
28    credential_provider: Option<PyAWSCredentialProvider>,
29}
30
31impl S3Config {
32    fn bucket(&self) -> &str {
33        self.config
34            .0
35            .get(&PyAmazonS3ConfigKey(AmazonS3ConfigKey::Bucket))
36            .expect("bucket should always exist in the config")
37            .as_ref()
38    }
39
40    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
41        let args = PyTuple::empty(py).into_bound_py_any(py)?;
42        let kwargs = PyDict::new(py);
43
44        if let Some(prefix) = &self.prefix {
45            kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
46        }
47        kwargs.set_item(intern!(py, "config"), &self.config)?;
48        if let Some(client_options) = &self.client_options {
49            kwargs.set_item(intern!(py, "client_options"), client_options)?;
50        }
51        if let Some(retry_config) = &self.retry_config {
52            kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
53        }
54        if let Some(credential_provider) = &self.credential_provider {
55            kwargs.set_item("credential_provider", credential_provider)?;
56        }
57
58        PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
59    }
60}
61
62/// A Python-facing wrapper around an [`AmazonS3`].
63#[derive(Debug, Clone)]
64#[pyclass(name = "S3Store", frozen, subclass, from_py_object)]
65pub struct PyS3Store {
66    store: Arc<MaybePrefixedStore<AmazonS3>>,
67    /// A config used for pickling. This must stay in sync with the underlying store's config.
68    config: S3Config,
69}
70
71impl AsRef<Arc<MaybePrefixedStore<AmazonS3>>> for PyS3Store {
72    fn as_ref(&self) -> &Arc<MaybePrefixedStore<AmazonS3>> {
73        &self.store
74    }
75}
76
77impl PyS3Store {
78    /// Consume self and return the underlying [`AmazonS3`].
79    pub fn into_inner(self) -> Arc<MaybePrefixedStore<AmazonS3>> {
80        self.store
81    }
82}
83
84#[pymethods]
85impl PyS3Store {
86    // Create from parameters
87    #[new]
88    #[pyo3(signature = (bucket=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
89    fn new(
90        bucket: Option<String>,
91        prefix: Option<PyPath>,
92        config: Option<PyAmazonS3Config>,
93        client_options: Option<PyClientOptions>,
94        retry_config: Option<PyRetryConfig>,
95        credential_provider: Option<PyAWSCredentialProvider>,
96        kwargs: Option<PyAmazonS3Config>,
97    ) -> PyObjectStoreResult<Self> {
98        let mut builder = AmazonS3Builder::from_env();
99        let mut config = config.unwrap_or_default();
100
101        if let Some(bucket) = bucket {
102            // Note: we apply the bucket to the config, not directly to the builder, so they stay
103            // in sync.
104            config.insert_raising_if_exists(AmazonS3ConfigKey::Bucket, bucket)?;
105        }
106
107        let mut combined_config = combine_config_kwargs(config, kwargs)?;
108
109        if let Some(client_options) = client_options.clone() {
110            builder = builder.with_client_options(client_options.into())
111        }
112        if let Some(retry_config) = retry_config.clone() {
113            builder = builder.with_retry(retry_config.into())
114        }
115
116        if let Some(credential_provider) = credential_provider.clone() {
117            // Apply credential provider config onto main config
118            if let Some(credential_config) = credential_provider.config() {
119                for (key, val) in credential_config.0.iter() {
120                    // Give precedence to passed-in config values
121                    combined_config.insert_if_not_exists(key.clone(), val.clone());
122                }
123            }
124            builder = builder.with_credentials(Arc::new(credential_provider));
125        }
126
127        builder = combined_config.clone().apply_config(builder);
128
129        Ok(Self {
130            store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
131            config: S3Config {
132                prefix,
133                config: combined_config,
134                client_options,
135                retry_config,
136                credential_provider,
137            },
138        })
139    }
140
141    #[classmethod]
142    #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
143    pub(crate) fn from_url<'py>(
144        cls: &Bound<'py, PyType>,
145        url: PyUrl,
146        config: Option<PyAmazonS3Config>,
147        client_options: Option<PyClientOptions>,
148        retry_config: Option<PyRetryConfig>,
149        credential_provider: Option<PyAWSCredentialProvider>,
150        kwargs: Option<PyAmazonS3Config>,
151    ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
152        // We manually parse the URL to find the prefix because `with_url` does not apply the
153        // prefix.
154        let (_, prefix) =
155            ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
156        let prefix: Option<String> = if prefix.parts().count() != 0 {
157            Some(prefix.into())
158        } else {
159            None
160        };
161        let config = parse_url(config, url.as_ref())?;
162
163        // Note: we pass **back** through Python so that if cls is a subclass, we instantiate the
164        // subclass
165        let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
166        kwargs.set_item("prefix", prefix)?;
167        kwargs.set_item("config", config)?;
168        kwargs.set_item("client_options", client_options)?;
169        kwargs.set_item("retry_config", retry_config)?;
170        kwargs.set_item("credential_provider", credential_provider)?;
171        Ok(cls.call((), Some(&kwargs))?)
172    }
173
174    fn __eq__(&self, other: &Bound<PyAny>) -> bool {
175        // Ensure we never error on __eq__ by returning false if the other object is not an S3Store
176        other
177            .cast::<PyS3Store>()
178            .map(|other| self.config == other.get().config)
179            .unwrap_or(false)
180    }
181
182    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
183        self.config.__getnewargs_ex__(py)
184    }
185
186    fn __repr__(&self) -> String {
187        let bucket = self.config.bucket();
188        if let Some(prefix) = &self.config.prefix {
189            format!(
190                "S3Store(bucket=\"{}\", prefix=\"{}\")",
191                bucket,
192                prefix.as_ref()
193            )
194        } else {
195            format!("S3Store(bucket=\"{bucket}\")")
196        }
197    }
198
199    #[getter]
200    fn prefix(&self) -> Option<&PyPath> {
201        self.config.prefix.as_ref()
202    }
203
204    #[getter]
205    fn config(&self) -> &PyAmazonS3Config {
206        &self.config.config
207    }
208
209    #[getter]
210    fn client_options(&self) -> Option<&PyClientOptions> {
211        self.config.client_options.as_ref()
212    }
213
214    #[getter]
215    fn credential_provider(&self) -> Option<&PyAWSCredentialProvider> {
216        self.config.credential_provider.as_ref()
217    }
218
219    #[getter]
220    fn retry_config(&self) -> Option<&PyRetryConfig> {
221        self.config.retry_config.as_ref()
222    }
223}
224
225#[derive(Clone, Debug, PartialEq, Eq, Hash)]
226pub struct PyAmazonS3ConfigKey(AmazonS3ConfigKey);
227
228impl<'py> FromPyObject<'_, 'py> for PyAmazonS3ConfigKey {
229    type Error = PyErr;
230
231    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
232        let s = obj.extract::<PyBackedStr>()?.to_lowercase();
233        let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
234        Ok(Self(key))
235    }
236}
237
238impl AsRef<str> for PyAmazonS3ConfigKey {
239    fn as_ref(&self) -> &str {
240        self.0.as_ref()
241    }
242}
243
244impl<'py> IntoPyObject<'py> for &PyAmazonS3ConfigKey {
245    type Target = PyString;
246    type Output = Bound<'py, PyString>;
247    type Error = PyErr;
248
249    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
250        let s = self
251            .0
252            .as_ref()
253            .strip_prefix("aws_")
254            .expect("Expected config prefix to start with aws_");
255        Ok(PyString::new(py, s))
256    }
257}
258
259impl<'py> IntoPyObject<'py> for PyAmazonS3ConfigKey {
260    type Target = PyString;
261    type Output = Bound<'py, PyString>;
262    type Error = PyErr;
263
264    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
265        (&self).into_pyobject(py)
266    }
267}
268
269impl From<AmazonS3ConfigKey> for PyAmazonS3ConfigKey {
270    fn from(value: AmazonS3ConfigKey) -> Self {
271        Self(value)
272    }
273}
274
275impl From<PyAmazonS3ConfigKey> for AmazonS3ConfigKey {
276    fn from(value: PyAmazonS3ConfigKey) -> Self {
277        value.0
278    }
279}
280
281#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
282pub struct PyAmazonS3Config(HashMap<PyAmazonS3ConfigKey, PyConfigValue>);
283
284// Note: we manually impl FromPyObject instead of deriving it so that we can raise an
285// UnknownConfigurationKeyError instead of a `TypeError` on invalid config keys.
286//
287// We also manually impl this so that we can raise on duplicate keys.
288impl<'py> FromPyObject<'_, 'py> for PyAmazonS3Config {
289    type Error = PyErr;
290
291    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
292        let mut slf = Self::new();
293        for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
294            slf.insert_raising_if_exists(
295                key.extract::<PyAmazonS3ConfigKey>()?,
296                val.extract::<PyConfigValue>()?,
297            )?;
298        }
299        Ok(slf)
300    }
301}
302
303impl PyAmazonS3Config {
304    fn new() -> Self {
305        Self(HashMap::new())
306    }
307
308    fn apply_config(self, mut builder: AmazonS3Builder) -> AmazonS3Builder {
309        for (key, value) in self.0.into_iter() {
310            builder = builder.with_config(key.0, value.0);
311        }
312        builder
313    }
314
315    fn merge(mut self, other: PyAmazonS3Config) -> PyObjectStoreResult<PyAmazonS3Config> {
316        for (key, val) in other.0.into_iter() {
317            self.insert_raising_if_exists(key, val)?;
318        }
319
320        Ok(self)
321    }
322
323    fn insert_raising_if_exists(
324        &mut self,
325        key: impl Into<PyAmazonS3ConfigKey>,
326        val: impl Into<String>,
327    ) -> PyObjectStoreResult<()> {
328        let key = key.into();
329        let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
330        if old_value.is_some() {
331            return Err(GenericError::new_err(format!(
332                "Duplicate key {} provided",
333                key.0.as_ref()
334            ))
335            .into());
336        }
337
338        Ok(())
339    }
340
341    /// Insert a key only if it does not already exist.
342    ///
343    /// This is used for URL parsing, where any parts of the URL **do not** override any
344    /// configuration keys passed manually.
345    fn insert_if_not_exists(
346        &mut self,
347        key: impl Into<PyAmazonS3ConfigKey>,
348        val: impl Into<String>,
349    ) {
350        self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
351    }
352}
353
354fn combine_config_kwargs(
355    config: PyAmazonS3Config,
356    kwargs: Option<PyAmazonS3Config>,
357) -> PyObjectStoreResult<PyAmazonS3Config> {
358    if let Some(kwargs) = kwargs {
359        config.merge(kwargs)
360    } else {
361        Ok(config)
362    }
363}
364
365/// Sets properties on a configuration based on a URL
366///
367/// This is vendored from
368/// https://github.com/apache/arrow-rs/blob/f7263e253655b2ee613be97f9d00e063444d3df5/object_store/src/aws/builder.rs#L600-L647
369///
370/// We do our own URL parsing so that we can keep our own config in sync with what is passed to the
371/// underlying ObjectStore builder. Passing the URL on verbatim makes it hard because the URL
372/// parsing only happens in `build()`. Then the config parameters we have don't include any config
373/// applied from the URL.
374fn parse_url(
375    config: Option<PyAmazonS3Config>,
376    parsed: &Url,
377) -> object_store::Result<PyAmazonS3Config> {
378    let host = parsed
379        .host_str()
380        .ok_or_else(|| ParseUrlError::UrlNotRecognised {
381            url: parsed.as_str().to_string(),
382        })?;
383    let mut config = config.unwrap_or_default();
384
385    match parsed.scheme() {
386        "s3" | "s3a" => {
387            config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, host);
388        }
389        "https" => match host.splitn(4, '.').collect_tuple() {
390            Some(("s3", region, "amazonaws", "com")) => {
391                config.insert_if_not_exists(AmazonS3ConfigKey::Region, region);
392                let bucket = parsed.path_segments().into_iter().flatten().next();
393                if let Some(bucket) = bucket {
394                    config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
395                }
396            }
397            Some((bucket, "s3", "amazonaws", "com")) => {
398                config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
399                config.insert_if_not_exists(AmazonS3ConfigKey::VirtualHostedStyleRequest, "true");
400            }
401            Some((bucket, "s3", region, "amazonaws.com")) => {
402                config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
403                config.insert_if_not_exists(AmazonS3ConfigKey::Region, region);
404                config.insert_if_not_exists(AmazonS3ConfigKey::VirtualHostedStyleRequest, "true");
405            }
406            Some((account, "r2", "cloudflarestorage", "com")) => {
407                config.insert_if_not_exists(AmazonS3ConfigKey::Region, "auto");
408                let endpoint = format!("https://{account}.r2.cloudflarestorage.com");
409                config.insert_if_not_exists(AmazonS3ConfigKey::Endpoint, endpoint);
410
411                let bucket = parsed.path_segments().into_iter().flatten().next();
412                if let Some(bucket) = bucket {
413                    config.insert_if_not_exists(AmazonS3ConfigKey::Bucket, bucket);
414                }
415            }
416            _ => {
417                return Err(ParseUrlError::UrlNotRecognised {
418                    url: parsed.as_str().to_string(),
419                }
420                .into())
421            }
422        },
423        scheme => {
424            let scheme = scheme.into();
425            return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
426        }
427    };
428
429    Ok(config)
430}