1use std::collections::HashMap;
2use std::sync::Arc;
3
4use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder};
5use object_store::ObjectStoreScheme;
6use pyo3::prelude::*;
7use pyo3::pybacked::PyBackedStr;
8use pyo3::types::{PyDict, PyString, PyTuple, PyType};
9use pyo3::{intern, IntoPyObjectExt};
10use url::Url;
11
12use crate::azure::credentials::PyAzureCredentialProvider;
13use crate::client::PyClientOptions;
14use crate::config::PyConfigValue;
15use crate::error::{GenericError, ParseUrlError, PyObjectStoreError, PyObjectStoreResult};
16use crate::path::PyPath;
17use crate::retry::PyRetryConfig;
18use crate::{MaybePrefixedStore, PyUrl};
19
/// Snapshot of the arguments an Azure store was constructed with.
///
/// The values are retained so the store can be compared for equality and
/// rebuilt from keyword arguments (see `__getnewargs_ex__`).
#[derive(Debug, Clone, PartialEq)]
struct AzureConfig {
    /// Optional path prefix applied to the wrapped store.
    prefix: Option<PyPath>,
    /// Azure configuration keys and their values.
    config: PyAzureConfig,
    /// HTTP client options supplied by the caller, if any.
    client_options: Option<PyClientOptions>,
    /// Retry configuration supplied by the caller, if any.
    retry_config: Option<PyRetryConfig>,
    /// Credential provider supplied by the caller, if any.
    credential_provider: Option<PyAzureCredentialProvider>,
}
28
29impl AzureConfig {
30 fn account_name(&self) -> &str {
31 self.config
32 .0
33 .get(&PyAzureConfigKey(AzureConfigKey::AccountName))
34 .expect("Account name should always exist in the config")
35 .as_ref()
36 }
37
38 fn container_name(&self) -> &str {
39 self.config
40 .0
41 .get(&PyAzureConfigKey(AzureConfigKey::ContainerName))
42 .expect("Container should always exist in the config")
43 .as_ref()
44 }
45
46 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
47 let args = PyTuple::empty(py).into_bound_py_any(py)?;
48 let kwargs = PyDict::new(py);
49
50 if let Some(prefix) = &self.prefix {
51 kwargs.set_item(intern!(py, "prefix"), prefix.as_ref().as_ref())?;
52 }
53 kwargs.set_item(intern!(py, "config"), &self.config)?;
54 if let Some(client_options) = &self.client_options {
55 kwargs.set_item(intern!(py, "client_options"), client_options)?;
56 }
57 if let Some(retry_config) = &self.retry_config {
58 kwargs.set_item(intern!(py, "retry_config"), retry_config)?;
59 }
60 if let Some(credential_provider) = &self.credential_provider {
61 kwargs.set_item("credential_provider", credential_provider)?;
62 }
63
64 PyTuple::new(py, [args, kwargs.into_bound_py_any(py)?])
65 }
66}
67
// Azure object store exposed to Python under the class name `AzureStore`.
#[derive(Debug, Clone)]
#[pyclass(name = "AzureStore", frozen, subclass, from_py_object)]
pub struct PyAzureStore {
    // The underlying Azure store, wrapped so an optional prefix can be applied.
    store: Arc<MaybePrefixedStore<MicrosoftAzure>>,
    // The arguments this store was built from; used for equality, repr and
    // `__getnewargs_ex__`.
    config: AzureConfig,
}
76
77impl AsRef<Arc<MaybePrefixedStore<MicrosoftAzure>>> for PyAzureStore {
78 fn as_ref(&self) -> &Arc<MaybePrefixedStore<MicrosoftAzure>> {
79 &self.store
80 }
81}
82
83impl PyAzureStore {
84 pub fn into_inner(self) -> Arc<MaybePrefixedStore<MicrosoftAzure>> {
86 self.store
87 }
88}
89
90#[pymethods]
91impl PyAzureStore {
92 #[new]
94 #[pyo3(signature = (container_name=None, *, prefix=None, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
95 fn new(
96 container_name: Option<String>,
97 mut prefix: Option<PyPath>,
98 config: Option<PyAzureConfig>,
99 client_options: Option<PyClientOptions>,
100 retry_config: Option<PyRetryConfig>,
101 credential_provider: Option<PyAzureCredentialProvider>,
102 kwargs: Option<PyAzureConfig>,
103 ) -> PyObjectStoreResult<Self> {
104 let mut builder = MicrosoftAzureBuilder::from_env();
105 let mut config = config.unwrap_or_default();
106
107 if let Some(container_name) = container_name {
108 config.insert_raising_if_exists(AzureConfigKey::ContainerName, container_name)?;
111 }
112
113 let mut combined_config = combine_config_kwargs(Some(config), kwargs)?;
114
115 if let Some(client_options) = client_options.clone() {
116 builder = builder.with_client_options(client_options.into())
117 }
118 if let Some(retry_config) = retry_config.clone() {
119 builder = builder.with_retry(retry_config.into())
120 }
121
122 if let Some(credential_provider) = credential_provider.clone() {
123 if let Some(credential_config) = credential_provider.config() {
125 for (key, val) in credential_config.0.iter() {
126 combined_config.insert_if_not_exists(key.clone(), val.clone());
128 }
129 }
130
131 if let Some(passed_down_prefix) = credential_provider.prefix() {
132 if prefix.is_none() {
137 prefix = Some(passed_down_prefix.clone());
138 }
139 }
140
141 builder = builder.with_credentials(Arc::new(credential_provider));
142 }
143
144 builder = combined_config.clone().apply_config(builder);
145
146 Ok(Self {
147 store: Arc::new(MaybePrefixedStore::new(builder.build()?, prefix.clone())),
148 config: AzureConfig {
149 prefix,
150 config: combined_config,
151 client_options,
152 retry_config,
153 credential_provider,
154 },
155 })
156 }
157
158 #[classmethod]
159 #[pyo3(signature = (url, *, config=None, client_options=None, retry_config=None, credential_provider=None, **kwargs))]
160 pub(crate) fn from_url<'py>(
161 cls: &Bound<'py, PyType>,
162 url: PyUrl,
163 config: Option<PyAzureConfig>,
164 client_options: Option<PyClientOptions>,
165 retry_config: Option<PyRetryConfig>,
166 credential_provider: Option<PyAzureCredentialProvider>,
167 kwargs: Option<PyAzureConfig>,
168 ) -> PyObjectStoreResult<Bound<'py, PyAny>> {
169 let (_, prefix) =
172 ObjectStoreScheme::parse(url.as_ref()).map_err(object_store::Error::from)?;
173 let prefix: Option<String> = if prefix.parts().count() != 0 {
174 Some(prefix.into())
175 } else {
176 None
177 };
178 let config = parse_url(config, url.as_ref())?;
179
180 let kwargs = kwargs.unwrap_or_default().into_pyobject(cls.py())?;
183 kwargs.set_item("prefix", prefix)?;
184 kwargs.set_item("config", config)?;
185 kwargs.set_item("client_options", client_options)?;
186 kwargs.set_item("retry_config", retry_config)?;
187 kwargs.set_item("credential_provider", credential_provider)?;
188 Ok(cls.call((), Some(&kwargs))?)
189 }
190
191 fn __eq__(&self, other: &Bound<PyAny>) -> bool {
192 other
195 .cast::<PyAzureStore>()
196 .map(|other| self.config == other.get().config)
197 .unwrap_or(false)
198 }
199
200 fn __getnewargs_ex__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
201 self.config.__getnewargs_ex__(py)
202 }
203
204 fn __repr__(&self) -> String {
205 let account_name = self.config.account_name();
206 let container_name = self.config.container_name();
207 if let Some(prefix) = &self.config.prefix {
208 format!(
209 "AzureStore(container_name=\"{}\", account_name=\"{}\", prefix=\"{}\")",
210 container_name,
211 account_name,
212 prefix.as_ref()
213 )
214 } else {
215 format!(
216 "AzureStore(container_name=\"{container_name}\", account_name=\"{account_name}\")"
217 )
218 }
219 }
220
221 #[getter]
222 fn prefix(&self) -> Option<&PyPath> {
223 self.config.prefix.as_ref()
224 }
225
226 #[getter]
227 fn config(&self) -> &PyAzureConfig {
228 &self.config.config
229 }
230
231 #[getter]
232 fn client_options(&self) -> Option<&PyClientOptions> {
233 self.config.client_options.as_ref()
234 }
235
236 #[getter]
237 fn credential_provider(&self) -> Option<&PyAzureCredentialProvider> {
238 self.config.credential_provider.as_ref()
239 }
240
241 #[getter]
242 fn retry_config(&self) -> Option<&PyRetryConfig> {
243 self.config.retry_config.as_ref()
244 }
245}
246
/// Newtype around [`AzureConfigKey`] that can be extracted from, and
/// converted into, a Python string.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PyAzureConfigKey(AzureConfigKey);
249
250impl<'py> FromPyObject<'_, 'py> for PyAzureConfigKey {
251 type Error = PyErr;
252
253 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
254 let s = obj.extract::<PyBackedStr>()?.to_lowercase();
255 let key = s.parse().map_err(PyObjectStoreError::ObjectStoreError)?;
256 Ok(Self(key))
257 }
258}
259
260impl AsRef<str> for PyAzureConfigKey {
261 fn as_ref(&self) -> &str {
262 self.0.as_ref()
263 }
264}
265
266impl<'py> IntoPyObject<'py> for &PyAzureConfigKey {
267 type Target = PyString;
268 type Output = Bound<'py, PyString>;
269 type Error = PyErr;
270
271 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
272 let s = self.0.as_ref();
273 if let Some(stripped) = s.strip_prefix("azure_storage_") {
275 return Ok(PyString::new(py, stripped));
276 }
277 Ok(PyString::new(
278 py,
279 s.strip_prefix("azure_")
280 .expect("Expected config prefix to start with azure_"),
281 ))
282 }
283}
284
285impl<'py> IntoPyObject<'py> for PyAzureConfigKey {
286 type Target = PyString;
287 type Output = Bound<'py, PyString>;
288 type Error = PyErr;
289
290 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
291 (&self).into_pyobject(py)
292 }
293}
294
295impl From<AzureConfigKey> for PyAzureConfigKey {
296 fn from(value: AzureConfigKey) -> Self {
297 Self(value)
298 }
299}
300
301impl From<PyAzureConfigKey> for AzureConfigKey {
302 fn from(value: PyAzureConfigKey) -> Self {
303 value.0
304 }
305}
306
/// Map from Azure configuration keys to their values.
///
/// It can be extracted from a Python `dict` and is applied to a
/// `MicrosoftAzureBuilder` when a store is constructed.
#[derive(Clone, Debug, Default, PartialEq, Eq, IntoPyObject, IntoPyObjectRef)]
pub struct PyAzureConfig(HashMap<PyAzureConfigKey, PyConfigValue>);
309
310impl<'py> FromPyObject<'_, 'py> for PyAzureConfig {
315 type Error = PyErr;
316
317 fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
318 let mut slf = Self::new();
319 for (key, val) in obj.extract::<Bound<'py, PyDict>>()?.iter() {
320 slf.insert_raising_if_exists(
321 key.extract::<PyAzureConfigKey>()?,
322 val.extract::<PyConfigValue>()?,
323 )?;
324 }
325 Ok(slf)
326 }
327}
328
329impl PyAzureConfig {
330 fn new() -> Self {
331 Self(HashMap::new())
332 }
333
334 fn apply_config(self, mut builder: MicrosoftAzureBuilder) -> MicrosoftAzureBuilder {
335 for (key, value) in self.0.into_iter() {
336 builder = builder.with_config(key.0, value.0);
337 }
338 builder
339 }
340
341 fn merge(mut self, other: PyAzureConfig) -> PyObjectStoreResult<PyAzureConfig> {
342 for (key, val) in other.0.into_iter() {
343 self.insert_raising_if_exists(key, val)?;
344 }
345
346 Ok(self)
347 }
348
349 fn insert_raising_if_exists(
350 &mut self,
351 key: impl Into<PyAzureConfigKey>,
352 val: impl Into<String>,
353 ) -> PyObjectStoreResult<()> {
354 let key = key.into();
355 let old_value = self.0.insert(key.clone(), PyConfigValue::new(val.into()));
356 if old_value.is_some() {
357 return Err(GenericError::new_err(format!(
358 "Duplicate key {} provided",
359 key.0.as_ref()
360 ))
361 .into());
362 }
363
364 Ok(())
365 }
366
367 fn insert_if_not_exists(&mut self, key: impl Into<PyAzureConfigKey>, val: impl Into<String>) {
372 self.0.entry(key.into()).or_insert(PyConfigValue::new(val));
373 }
374}
375
376fn combine_config_kwargs(
377 config: Option<PyAzureConfig>,
378 kwargs: Option<PyAzureConfig>,
379) -> PyObjectStoreResult<PyAzureConfig> {
380 match (config, kwargs) {
381 (None, None) => Ok(Default::default()),
382 (Some(x), None) | (None, Some(x)) => Ok(x),
383 (Some(config), Some(kwargs)) => Ok(config.merge(kwargs)?),
384 }
385}
386
387fn parse_url(config: Option<PyAzureConfig>, parsed: &Url) -> object_store::Result<PyAzureConfig> {
397 let host = parsed
398 .host_str()
399 .ok_or_else(|| ParseUrlError::UrlNotRecognised {
400 url: parsed.as_str().to_string(),
401 })?;
402 let mut config = config.unwrap_or_default();
403
404 let validate = |s: &str| match s.contains('.') {
405 true => Err(ParseUrlError::UrlNotRecognised {
406 url: parsed.as_str().to_string(),
407 }),
408 false => Ok(s.to_string()),
409 };
410
411 match parsed.scheme() {
412 "adl" | "azure" => {
413 config.insert_if_not_exists(AzureConfigKey::ContainerName, validate(host)?);
414 }
415 "az" | "abfs" | "abfss" => {
416 if parsed.username().is_empty() {
419 config.insert_if_not_exists(AzureConfigKey::ContainerName, validate(host)?);
420 } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") {
421 config.insert_if_not_exists(
422 AzureConfigKey::ContainerName,
423 validate(parsed.username())?,
424 );
425 config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
426 } else if let Some(a) = host.strip_suffix(".dfs.fabric.microsoft.com") {
427 config.insert_if_not_exists(
428 AzureConfigKey::ContainerName,
429 validate(parsed.username())?,
430 );
431 config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
432 config.insert_if_not_exists(AzureConfigKey::UseFabricEndpoint, "true");
433 } else {
434 return Err(ParseUrlError::UrlNotRecognised {
435 url: parsed.as_str().to_string(),
436 }
437 .into());
438 }
439 }
440 "https" => match host.split_once('.') {
441 Some((a, "dfs.core.windows.net")) | Some((a, "blob.core.windows.net")) => {
442 config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
443 if let Some(container) = parsed.path_segments().unwrap().next() {
444 config
445 .insert_if_not_exists(AzureConfigKey::ContainerName, validate(container)?);
446 }
447 }
448 Some((a, "dfs.fabric.microsoft.com")) | Some((a, "blob.fabric.microsoft.com")) => {
449 config.insert_if_not_exists(AzureConfigKey::AccountName, validate(a)?);
450 if let Some(workspace) = parsed.path_segments().unwrap().next() {
456 if !workspace.is_empty() {
457 config.insert_if_not_exists(AzureConfigKey::ContainerName, workspace);
458 }
459 }
460 config.insert_if_not_exists(AzureConfigKey::UseFabricEndpoint, "true");
461 }
462 _ => {
463 return Err(ParseUrlError::UrlNotRecognised {
464 url: parsed.as_str().to_string(),
465 }
466 .into())
467 }
468 },
469 scheme => {
470 let scheme = scheme.into();
471 return Err(ParseUrlError::UnknownUrlScheme { scheme }.into());
472 }
473 }
474
475 Ok(config)
476}