// pyo3_object_store/azure/store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder};
5use object_store::ObjectStoreScheme;
6use pyo3::prelude::*;
7use pyo3::pybacked::PyBackedStr;
8use pyo3::types::{PyDict, PyString, PyTuple, PyType};
9use pyo3::{intern, IntoPyObjectExt};
10use url::Url;
11
12use crate::azure::credentials::PyAzureCredentialProvider;
13use crate::client::PyClientOptions;
14use crate::config::PyConfigValue;
15use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
16use crate::path::PyPath;
17use crate::retry::PyRetryConfig;
18use crate::{MaybePrefixedStore, PyUrl};
19
20#[derive(Debug, Clone, PartialEq)]
21struct AzureConfig {
22    prefix: Option<PyPath>,
23    config: PyAzureConfig,
24    client_options: Option<PyClientOptions>,
25    retry_config: Option<PyRetryConfig>,
26    credential_provider: Option<PyAzureCredentialProvider>,
27}
28
29impl AzureConfig {
30    fn account_name(&self) -> &str {
31        self.config
32            .0
33            .get(&PyAzureConfigKey(AzureConfigKey::AccountName))
34            .expect("Account name should always exist in the config")
35            .as_ref()
36    }
37
38    fn container_name(&self) -> &str {
39        self.config
40            .0
41            .get(&PyAzureConfigKey(AzureConfigKey::ContainerName))
42            .expect("Container should always exist in the config")
43            .as_ref()
44    }
45
46    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
47        let args = PyTuple::empty(py).into_bound_py_any(py)?;
48        let kwargs = PyDict::new(py);
49
50        if let Some(prefix) = &self.prefix {
51            kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
52        }
53        kwargs.set_item(intern!(py, "config"), &self.config)?;
54        if let Some(client_options) = &self.client_options {
55            kwargs.set_item(intern!(py, "client_options"), client_options)?;
56        }
57        if let Some(retry_config) = &self.retry_config {
58            kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
59        }
60        if let Some(credential_provider) = &self.credential_provider {
61            kwargs.set_item("credential_provider", credential_provider)?;
62        }
63
64        PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
65    }
66}
67
68/// A Python-facing wrapper around a [`MicrosoftAzure`].
69#[derive(Debug, Clone)]
70#[pyclass(name = "AzureStore", frozen, subclass, from_py_object)]
71pub struct PyAzureStore {
72    store: Arc<MaybePrefixedStore<MicrosoftAzure>>,
73    /// A config used for pickling. This must stay in sync with the underlying store's config.
74    config: AzureConfig,
75}
76
77impl AsRef<Arc<MaybePrefixedStore<MicrosoftAzure>>> for PyAzureStore {
78    fn as_ref(&self) -> &Arc<MaybePrefixedStore<MicrosoftAzure>> {
79        &self.store
80    }
81}
82
83impl PyAzureStore {
84    /// Consume self and return the underlying [`MicrosoftAzure`].
85    pub fn into_inner(self) -> Arc<MaybePrefixedStore<MicrosoftAzure>> {
86        self.store
87    }
88}
89
90#[pymethods]
91impl PyAzureStore {
92    // Create from parameters
93    #[new]
94    #[pyo3(signature = (container_name=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
95    fn new(
96        container_name: Option<String>,
97        mut prefix: Option<PyPath>,
98        config: Option<PyAzureConfig>,
99        client_options: Option<PyClientOptions>,
100        retry_config: Option<PyRetryConfig>,
101        credential_provider: Option<PyAzureCredentialProvider>,
102        kwargs: Option<PyAzureConfig>,
103    ) -> PyObjectStoreResult<Self> {
104        let mut builder = MicrosoftAzureBuilder::from_env();
105        let mut config = config.unwrap_or_default();
106
107        if let Some(container_name) = container_name {
108            // Note: we apply the bucket to the config, not directly to the builder, so they stay
109            // in sync.
110            config.insert_raising_if_exists(AzureConfigKey::ContainerName, container_name)?;
111        }
112
113        let mut combined_config = combine_config_kwargs(Some(config), kwargs)?;
114
115        if let Some(client_options) = client_options.clone() {
116            builder = builder.with_client_options(client_options.into())
117        }
118        if let Some(retry_config) = retry_config.clone() {
119            builder = builder.with_retry(retry_config.into())
120        }
121
122        if let Some(credential_provider) = credential_provider.clone() {
123            // Apply credential provider config onto main config
124            if let Some(credential_config) = credential_provider.config() {
125                for (key, val) in credential_config.0.iter() {
126                    // Give precedence to passed-in config values
127                    combined_config.insert_if_not_exists(key.clone(), val.clone());
128                }
129            }
130
131            if let Some(passed_down_prefix) = credential_provider.prefix() {
132                // Don't override a prefix manually passed in to AzureStore
133                //
134                // If a user wishes to override a prefix passed down by a credential provider, they
135                // can pass `prefix=""` or `prefix="/"` to the AzureStore.
136                if prefix.is_none() {
137                    prefix = Some(passed_down_prefix.clone());
138                }
139            }
140
141            builder = builder.with_credentials(Arc::new(credential_provider));
142        }
143
144        builder = combined_config.clone().apply_config(builder);
145
146        Ok(Self {
147            store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
148            config: AzureConfig {
149                prefix,
150                config: combined_config,
151                client_options,
152                retry_config,
153                credential_provider,
154            },
155        })
156    }
157
158    #[classmethod]
159    #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
160    pub(crate) fn from_url<'py>(
161        cls: &Bound<'py, PyType>,
162        url: PyUrl,
163        config: Option<PyAzureConfig>,
164        client_options: Option<PyClientOptions>,
165        retry_config: Option<PyRetryConfig>,
166        credential_provider: Option<PyAzureCredentialProvider>,
167        kwargs: Option<PyAzureConfig>,
168    ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
169        // We manually parse the URL to find the prefix because `parse_url` does not apply the
170        // prefix.
171        let (_, prefix) =
172            ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
173        let prefix: Option<String> = if prefix.parts().count() != 0 {
174            Some(prefix.into())
175        } else {
176            None
177        };
178        let config = parse_url(config, url.as_ref())?;
179
180        // Note: we pass **back** through Python so that if cls is a subclass, we instantiate the
181        // subclass
182        let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
183        kwargs.set_item("prefix", prefix)?;
184        kwargs.set_item("config", config)?;
185        kwargs.set_item("client_options", client_options)?;
186        kwargs.set_item("retry_config", retry_config)?;
187        kwargs.set_item("credential_provider", credential_provider)?;
188        Ok(cls.call((), Some(&kwargs))?)
189    }
190
191    fn __eq__(&self, other: &Bound<PyAny>) -> bool {
192        // Ensure we never error on __eq__ by returning false if the other object is not the same
193        // type
194        other
195            .cast::<PyAzureStore>()
196            .map(|other| self.config == other.get().config)
197            .unwrap_or(false)
198    }
199
200    fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
201        self.config.__getnewargs_ex__(py)
202    }
203
204    fn __repr__(&self) -> String {
205        let account_name = self.config.account_name();
206        let container_name = self.config.container_name();
207        if let Some(prefix) = &self.config.prefix {
208            format!(
209                "AzureStore(container_name=\"{}\", account_name=\"{}\", prefix=\"{}\")",
210                container_name,
211                account_name,
212                prefix.as_ref()
213            )
214        } else {
215            format!(
216                "AzureStore(container_name=\"{container_name}\", account_name=\"{account_name}\")"
217            )
218        }
219    }
220
221    #[getter]
222    fn prefix(&self) -> Option<&PyPath> {
223        self.config.prefix.as_ref()
224    }
225
226    #[getter]
227    fn config(&self) -> &PyAzureConfig {
228        &self.config.config
229    }
230
231    #[getter]
232    fn client_options(&self) -> Option<&PyClientOptions> {
233        self.config.client_options.as_ref()
234    }
235
236    #[getter]
237    fn credential_provider(&self) -> Option<&PyAzureCredentialProvider> {
238        self.config.credential_provider.as_ref()
239    }
240
241    #[getter]
242    fn retry_config(&self) -> Option<&PyRetryConfig> {
243        self.config.retry_config.as_ref()
244    }
245}
246
247#[derive(Clone, Debug, PartialEq, Eq, Hash)]
248pub struct PyAzureConfigKey(AzureConfigKey);
249
250impl<'py> FromPyObject<'_, 'py> for PyAzureConfigKey {
251    type Error = PyErr;
252
253    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
254        let s = obj.extract::<PyBackedStr>()?.to_lowercase();
255        let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
256        Ok(Self(key))
257    }
258}
259
260impl AsRef<str> for PyAzureConfigKey {
261    fn as_ref(&self) -> &str {
262        self.0.as_ref()
263    }
264}
265
266impl<'py> IntoPyObject<'py> for &PyAzureConfigKey {
267    type Target = PyString;
268    type Output = Bound<'py, PyString>;
269    type Error = PyErr;
270
271    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
272        let s = self.0.as_ref();
273        // Anything with an `azure_storage_` prefix we can fully strip
274        if let Some(stripped) = s.strip_prefix("azure_storage_") {
275            return Ok(PyString::new(py, stripped));
276        }
277        Ok(PyString::new(
278            py,
279            s.strip_prefix("azure_")
280                .expect("Expected config prefix to start with azure_"),
281        ))
282    }
283}
284
285impl<'py> IntoPyObject<'py> for PyAzureConfigKey {
286    type Target = PyString;
287    type Output = Bound<'py, PyString>;
288    type Error = PyErr;
289
290    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
291        (&self).into_pyobject(py)
292    }
293}
294
295impl From<AzureConfigKey> for PyAzureConfigKey {
296    fn from(value: AzureConfigKey) -> Self {
297        Self(value)
298    }
299}
300
301impl From<PyAzureConfigKey> for AzureConfigKey {
302    fn from(value: PyAzureConfigKey) -> Self {
303        value.0
304    }
305}
306
307#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
308pub struct PyAzureConfig(HashMap<PyAzureConfigKey, PyConfigValue>);
309
310// Note: we manually impl FromPyObject instead of deriving it so that we can raise an
311// UnknownConfigurationKeyError instead of a `TypeError` on invalid config keys.
312//
313// We also manually impl this so that we can raise on duplicate keys.
314impl<'py> FromPyObject<'_, 'py> for PyAzureConfig {
315    type Error = PyErr;
316
317    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
318        let mut slf = Self::new();
319        for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
320            slf.insert_raising_if_exists(
321                key.extract::<PyAzureConfigKey>()?,
322                val.extract::<PyConfigValue>()?,
323            )?;
324        }
325        Ok(slf)
326    }
327}
328
329impl PyAzureConfig {
330    fn new() -> Self {
331        Self(HashMap::new())
332    }
333
334    fn apply_config(self, mut builder: MicrosoftAzureBuilder) -> MicrosoftAzureBuilder {
335        for (key, value) in self.0.into_iter() {
336            builder = builder.with_config(key.0, value.0);
337        }
338        builder
339    }
340
341    fn merge(mut self, other: PyAzureConfig) -> PyObjectStoreResult<PyAzureConfig> {
342        for (key, val) in other.0.into_iter() {
343            self.insert_raising_if_exists(key, val)?;
344        }
345
346        Ok(self)
347    }
348
349    fn insert_raising_if_exists(
350        &mut self,
351        key: impl Into<PyAzureConfigKey>,
352        val: impl Into<String>,
353    ) -> PyObjectStoreResult<()> {
354        let key = key.into();
355        let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
356        if old_value.is_some() {
357            return Err(GenericError::new_err(format!(
358                "Duplicate key {} provided",
359                key.0.as_ref()
360            ))
361            .into());
362        }
363
364        Ok(())
365    }
366
367    /// Insert a key only if it does not already exist.
368    ///
369    /// This is used for URL parsing, where any parts of the URL **do not** override any
370    /// configuration keys passed manually.
371    fn insert_if_not_exists(&mut self, key: impl Into<PyAzureConfigKey>, val: impl Into<String>) {
372        self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
373    }
374}
375
376fn combine_config_kwargs(
377    config: Option<PyAzureConfig>,
378    kwargs: Option<PyAzureConfig>,
379) -> PyObjectStoreResult<PyAzureConfig> {
380    match (config, kwargs) {
381        (None, None) => Ok(Default::default()),
382        (Some(x), None) | (None, Some(x)) => Ok(x),
383        (Some(config), Some(kwargs)) => Ok(config.merge(kwargs)?),
384    }
385}
386
387/// Sets properties on this builder based on a URL
388///
389/// This is vendored from
390/// https://github.com/apache/arrow-rs/blob/f7263e253655b2ee613be97f9d00e063444d3df5/object_store/src/azure/builder.rs#L639-L705
391///
392/// We do our own URL parsing so that we can keep our own config in sync with what is passed to the
393/// underlying ObjectStore builder. Passing the URL on verbatim makes it hard because the URL
394/// parsing only happens in `build()`. Then the config parameters we have don't include any config
395/// applied from the URL.
396fn parse_url(config: Option<PyAzureConfig>, parsed: &Url) -> object_store::Result<PyAzureConfig> {
397    let host = parsed
398        .host_str()
399        .ok_or_else(|| ParseUrlError::UrlNotRecognised {
400            url: parsed.as_str().to_string(),
401        })?;
402    let mut config = config.unwrap_or_default();
403
404    let validate = |s: &str| match s.contains('.') {
405        true => Err(ParseUrlError::UrlNotRecognised {
406            url: parsed.as_str().to_string(),
407        }),
408        false => Ok(s.to_string()),
409    };
410
411    match parsed.scheme() {
412        "adl" | "azure" => {
413            config.insert_if_not_exists(AzureConfigKey::ContainerName, validate(host)?);
414        }
415        "az" | "abfs" | "abfss" => {
416            // abfs(s) might refer to the fsspec convention abfs://<container>/<path>
417            // or the convention for the hadoop driver abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>
418            if parsed.username().is_empty() {
419                config.insert_if_not_exists(AzureConfigKey::ContainerName, validate(host)?);
420            } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") {
421                config.insert_if_not_exists(
422                    AzureConfigKey::ContainerName,
423                    validate(parsed.username())?,
424                );
425                config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
426            } else if let Some(a) = host.strip_suffix(".dfs.fabric.microsoft.com") {
427                config.insert_if_not_exists(
428                    AzureConfigKey::ContainerName,
429                    validate(parsed.username())?,
430                );
431                config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
432                config.insert_if_not_exists(AzureConfigKey::UseFabricEndpoint, "true");
433            } else {
434                return Err(ParseUrlError::UrlNotRecognised {
435                    url: parsed.as_str().to_string(),
436                }
437                .into());
438            }
439        }
440        "https" => match host.split_once('.') {
441            Some((a, "dfs.core.windows.net")) | Some((a, "blob.core.windows.net")) => {
442                config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
443                if let Some(container) = parsed.path_segments().unwrap().next() {
444                    config
445                        .insert_if_not_exists(AzureConfigKey::ContainerName, validate(container)?);
446                }
447            }
448            Some((a, "dfs.fabric.microsoft.com")) | Some((a, "blob.fabric.microsoft.com")) => {
449                config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
450                // Attempt to infer the container name from the URL
451                // - https://onelake.dfs.fabric.microsoft.com/<workspaceGUID>/<itemGUID>/Files/test.csv
452                // - https://onelake.dfs.fabric.microsoft.com/<workspace>/<item>.<itemtype>/<path>/<fileName>
453                //
454                // See <https://learn.microsoft.com/en-us/fabric/onelake/onelake-access-api>
455                if let Some(workspace) = parsed.path_segments().unwrap().next() {
456                    if !workspace.is_empty() {
457                        config.insert_if_not_exists(AzureConfigKey::ContainerName, workspace);
458                    }
459                }
460                config.insert_if_not_exists(AzureConfigKey::UseFabricEndpoint, "true");
461            }
462            _ => {
463                return Err(ParseUrlError::UrlNotRecognised {
464                    url: parsed.as_str().to_string(),
465                }
466                .into())
467            }
468        },
469        scheme => {
470            let scheme = scheme.into();
471            return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
472        }
473    }
474
475    Ok(config)
476}