Skip to main content

pyo3_object_store/gcp/
store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder, GoogleConfigKey};
5use object_store::ObjectStoreScheme;
6use pyo3::prelude::*;
7use pyo3::pybacked::PyBackedStr;
8use pyo3::types::{PyDict, PyString, PyTuple, PyType};
9use pyo3::{intern, IntoPyObjectExt};
10use url::Url;
11
12use crate::client::PyClientOptions;
13use crate::config::PyConfigValue;
14use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
15use crate::gcp::credentials::PyGcpCredentialProvider;
16use crate::path::PyPath;
17use crate::retry::PyRetryConfig;
18use crate::{MaybePrefixedStore, PyUrl};
19
20#[derive(Debug, Clone, PartialEq)]
21struct GCSConfig {
22    prefix: Option<PyPath>,
23    config: PyGoogleConfig,
24    client_options: Option<PyClientOptions>,
25    retry_config: Option<PyRetryConfig>,
26    credential_provider: Option<PyGcpCredentialProvider>,
27}
28
29impl GCSConfig {
30    fn bucket(&self) -> &str {
31        self.config
32            .0
33            .get(&PyGoogleConfigKey(GoogleConfigKey::Bucket))
34            .expect("Bucket should always exist in the config")
35            .as_ref()
36    }
37
38    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
39        let args = PyTuple::empty(py).into_bound_py_any(py)?;
40        let kwargs = PyDict::new(py);
41
42        if let Some(prefix) = &self.prefix {
43            kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
44        }
45        kwargs.set_item(intern!(py, "config"), &self.config)?;
46        if let Some(client_options) = &self.client_options {
47            kwargs.set_item(intern!(py, "client_options"), client_options)?;
48        }
49        if let Some(retry_config) = &self.retry_config {
50            kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
51        }
52        if let Some(credential_provider) = &self.credential_provider {
53            kwargs.set_item("credential_provider", credential_provider)?;
54        }
55
56        PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
57    }
58}
59
60/// A Python-facing wrapper around a [`GoogleCloudStorage`].
61#[derive(Debug, Clone)]
62#[pyclass(name = "GCSStore", frozen, subclass, from_py_object)]
63pub struct PyGCSStore {
64    store: Arc<MaybePrefixedStore<GoogleCloudStorage>>,
65    /// A config used for pickling. This must stay in sync with the underlying store's config.
66    config: GCSConfig,
67}
68
69impl AsRef<Arc<MaybePrefixedStore<GoogleCloudStorage>>> for PyGCSStore {
70    fn as_ref(&self) -> &Arc<MaybePrefixedStore<GoogleCloudStorage>> {
71        &self.store
72    }
73}
74
75impl PyGCSStore {
76    /// Consume self and return the underlying [`GoogleCloudStorage`].
77    pub fn into_inner(self) -> Arc<MaybePrefixedStore<GoogleCloudStorage>> {
78        self.store
79    }
80}
81
82#[pymethods]
83impl PyGCSStore {
84    // Create from parameters
85    #[new]
86    #[pyo3(signature = (bucket=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
87    fn new(
88        bucket: Option<String>,
89        prefix: Option<PyPath>,
90        config: Option<PyGoogleConfig>,
91        client_options: Option<PyClientOptions>,
92        retry_config: Option<PyRetryConfig>,
93        credential_provider: Option<PyGcpCredentialProvider>,
94        kwargs: Option<PyGoogleConfig>,
95    ) -> PyObjectStoreResult<Self> {
96        let mut builder = GoogleCloudStorageBuilder::from_env();
97        let mut config = config.unwrap_or_default();
98        if let Some(bucket) = bucket.clone() {
99            // Note: we apply the bucket to the config, not directly to the builder, so they stay
100            // in sync.
101            config.insert_raising_if_exists(GoogleConfigKey::Bucket, bucket)?;
102        }
103        let combined_config = combine_config_kwargs(Some(config), kwargs)?;
104        builder = combined_config.clone().apply_config(builder);
105        if let Some(client_options) = client_options.clone() {
106            builder = builder.with_client_options(client_options.into())
107        }
108        if let Some(retry_config) = retry_config.clone() {
109            builder = builder.with_retry(retry_config.into())
110        }
111        if let Some(credential_provider) = credential_provider.clone() {
112            builder = builder.with_credentials(Arc::new(credential_provider));
113        }
114        Ok(Self {
115            store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
116            config: GCSConfig {
117                prefix,
118                config: combined_config,
119                client_options,
120                retry_config,
121                credential_provider,
122            },
123        })
124    }
125
126    #[classmethod]
127    #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
128    pub(crate) fn from_url<'py>(
129        cls: &Bound<'py, PyType>,
130        url: PyUrl,
131        config: Option<PyGoogleConfig>,
132        client_options: Option<PyClientOptions>,
133        retry_config: Option<PyRetryConfig>,
134        credential_provider: Option<PyGcpCredentialProvider>,
135        kwargs: Option<PyGoogleConfig>,
136    ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
137        // We manually parse the URL to find the prefix because `parse_url` does not apply the
138        // prefix.
139        let (_, prefix) =
140            ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
141        let prefix: Option<String> = if prefix.parts().count() != 0 {
142            Some(prefix.into())
143        } else {
144            None
145        };
146        let config = parse_url(config, url.as_ref())?;
147
148        // Note: we pass **back** through Python so that if cls is a subclass, we instantiate the
149        // subclass
150        let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
151        kwargs.set_item("prefix", prefix)?;
152        kwargs.set_item("config", config)?;
153        kwargs.set_item("client_options", client_options)?;
154        kwargs.set_item("retry_config", retry_config)?;
155        kwargs.set_item("credential_provider", credential_provider)?;
156        Ok(cls.call((), Some(&kwargs))?)
157    }
158
159    fn __eq__(&self, other: &Bound<PyAny>) -> bool {
160        // Ensure we never error on __eq__ by returning false if the other object is not the same
161        // type
162        other
163            .cast::<PyGCSStore>()
164            .map(|other| self.config == other.get().config)
165            .unwrap_or(false)
166    }
167
168    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
169        self.config.__getnewargs_ex__(py)
170    }
171
172    fn __repr__(&self) -> String {
173        let bucket = self.config.bucket();
174        if let Some(prefix) = &self.config.prefix {
175            format!(
176                "GCSStore(bucket=\"{}\", prefix=\"{}\")",
177                bucket,
178                prefix.as_ref()
179            )
180        } else {
181            format!("GCSStore(bucket=\"{bucket}\")")
182        }
183    }
184
185    #[getter]
186    fn prefix(&self) -> Option<&PyPath> {
187        self.config.prefix.as_ref()
188    }
189
190    #[getter]
191    fn config(&self) -> &PyGoogleConfig {
192        &self.config.config
193    }
194
195    #[getter]
196    fn client_options(&self) -> Option<&PyClientOptions> {
197        self.config.client_options.as_ref()
198    }
199
200    #[getter]
201    fn credential_provider(&self) -> Option<&PyGcpCredentialProvider> {
202        self.config.credential_provider.as_ref()
203    }
204
205    #[getter]
206    fn retry_config(&self) -> Option<&PyRetryConfig> {
207        self.config.retry_config.as_ref()
208    }
209}
210
211#[derive(Clone, Debug, PartialEq, Eq, Hash)]
212pub struct PyGoogleConfigKey(GoogleConfigKey);
213
214impl<'py> FromPyObject<'_, 'py> for PyGoogleConfigKey {
215    type Error = PyErr;
216
217    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
218        let s = obj.extract::<PyBackedStr>()?.to_lowercase();
219        // https://github.com/apache/arrow-rs-object-store/pull/467
220        if s == "application_credentials" {
221            return Ok(Self(GoogleConfigKey::ApplicationCredentials));
222        }
223
224        let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
225        Ok(Self(key))
226    }
227}
228
229impl AsRef<str> for PyGoogleConfigKey {
230    fn as_ref(&self) -> &str {
231        self.0.as_ref()
232    }
233}
234
235impl<'py> IntoPyObject<'py> for PyGoogleConfigKey {
236    type Target = PyString;
237    type Output = Bound<'py, PyString>;
238    type Error = PyErr;
239
240    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
241        (&self).into_pyobject(py)
242    }
243}
244
245impl<'py> IntoPyObject<'py> for &PyGoogleConfigKey {
246    type Target = PyString;
247    type Output = Bound<'py, PyString>;
248    type Error = PyErr;
249
250    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
251        let s = self
252            .0
253            .as_ref()
254            .strip_prefix("google_")
255            .expect("Expected config prefix to start with google_");
256        Ok(PyString::new(py, s))
257    }
258}
259
260impl From<GoogleConfigKey> for PyGoogleConfigKey {
261    fn from(value: GoogleConfigKey) -> Self {
262        Self(value)
263    }
264}
265
266impl From<PyGoogleConfigKey> for GoogleConfigKey {
267    fn from(value: PyGoogleConfigKey) -> Self {
268        value.0
269    }
270}
271
272#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
273pub struct PyGoogleConfig(HashMap<PyGoogleConfigKey, PyConfigValue>);
274
275// Note: we manually impl FromPyObject instead of deriving it so that we can raise an
276// UnknownConfigurationKeyError instead of a `TypeError` on invalid config keys.
277//
278// We also manually impl this so that we can raise on duplicate keys.
279impl<'py> FromPyObject<'_, 'py> for PyGoogleConfig {
280    type Error = PyErr;
281
282    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
283        let mut slf = Self::new();
284        for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
285            slf.insert_raising_if_exists(
286                key.extract::<PyGoogleConfigKey>()?,
287                val.extract::<PyConfigValue>()?,
288            )?;
289        }
290        Ok(slf)
291    }
292}
293
294impl PyGoogleConfig {
295    fn new() -> Self {
296        Self(HashMap::new())
297    }
298
299    fn apply_config(self, mut builder: GoogleCloudStorageBuilder) -> GoogleCloudStorageBuilder {
300        for (key, value) in self.0.into_iter() {
301            builder = builder.with_config(key.0, value.0);
302        }
303        builder
304    }
305
306    fn merge(mut self, other: PyGoogleConfig) -> PyObjectStoreResult<PyGoogleConfig> {
307        for (key, val) in other.0.into_iter() {
308            self.insert_raising_if_exists(key, val)?;
309        }
310
311        Ok(self)
312    }
313
314    fn insert_raising_if_exists(
315        &mut self,
316        key: impl Into<PyGoogleConfigKey>,
317        val: impl Into<String>,
318    ) -> PyObjectStoreResult<()> {
319        let key = key.into();
320        let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
321        if old_value.is_some() {
322            return Err(GenericError::new_err(format!(
323                "Duplicate key {} provided",
324                key.0.as_ref()
325            ))
326            .into());
327        }
328
329        Ok(())
330    }
331
332    /// Insert a key only if it does not already exist.
333    ///
334    /// This is used for URL parsing, where any parts of the URL **do not** override any
335    /// configuration keys passed manually.
336    fn insert_if_not_exists(&mut self, key: impl Into<PyGoogleConfigKey>, val: impl Into<String>) {
337        self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
338    }
339}
340
341fn combine_config_kwargs(
342    config: Option<PyGoogleConfig>,
343    kwargs: Option<PyGoogleConfig>,
344) -> PyObjectStoreResult<PyGoogleConfig> {
345    match (config, kwargs) {
346        (None, None) => Ok(Default::default()),
347        (Some(x), None) | (None, Some(x)) => Ok(x),
348        (Some(config), Some(kwargs)) => Ok(config.merge(kwargs)?),
349    }
350}
351
352/// Sets properties on this builder based on a URL
353///
354/// This is vendored from
355/// https://github.com/apache/arrow-rs/blob/f7263e253655b2ee613be97f9d00e063444d3df5/object_store/src/gcp/builder.rs#L316-L338
356///
357/// We do our own URL parsing so that we can keep our own config in sync with what is passed to the
358/// underlying ObjectStore builder. Passing the URL on verbatim makes it hard because the URL
359/// parsing only happens in `build()`. Then the config parameters we have don't include any config
360/// applied from the URL.
361fn parse_url(config: Option<PyGoogleConfig>, parsed: &Url) -> object_store::Result<PyGoogleConfig> {
362    let host = parsed
363        .host_str()
364        .ok_or_else(|| ParseUrlError::UrlNotRecognised {
365            url: parsed.as_str().to_string(),
366        })?;
367    let mut config = config.unwrap_or_default();
368
369    match parsed.scheme() {
370        "gs" => {
371            config.insert_if_not_exists(GoogleConfigKey::Bucket, host);
372        }
373        scheme => {
374            let scheme = scheme.to_string();
375            return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
376        }
377    }
378
379    Ok(config)
380}