Skip to main content

lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16    RetryConfig,
17    azure::{AzureConfigKey, AzureCredential, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22    DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23    ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
24    dynamic_credentials::build_dynamic_credential_provider,
25    throttle::{AimdThrottleConfig, AimdThrottledStore},
26};
27use lance_core::error::{Error, Result};
28
/// Object store provider for Azure storage.
///
/// `new_store` accepts `az://` and `abfss://` URLs and builds either a native
/// `object_store` Azure client or, when the `use_opendal` storage option is
/// `"true"`, an OpenDAL-backed store.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
31
32impl AzureBlobStoreProvider {
33    /// Normalize Azure storage options for OpenDAL, resolving aliases for
34    /// well-known keys while passing through all other options (e.g.
35    /// `client_id`, `tenant_id`, `encryption_key`, etc.) so that OpenDAL
36    /// can use them directly.
37    fn normalize_opendal_azure_options(
38        options: &HashMap<String, String>,
39    ) -> HashMap<String, String> {
40        // Start with all options so unknown keys are forwarded to OpenDAL.
41        let mut config_map = options.clone();
42
43        // Normalize well-known aliases into canonical OpenDAL key names.
44        // Remove the alias after resolving to avoid duplicate/conflicting entries.
45        let alias_groups: &[(&str, &[&str])] = &[
46            ("account_name", &["azure_storage_account_name"]),
47            ("endpoint", &["azure_storage_endpoint", "azure_endpoint"]),
48            (
49                "account_key",
50                &[
51                    "azure_storage_account_key",
52                    "azure_storage_access_key",
53                    "azure_storage_master_key",
54                    "access_key",
55                    "master_key",
56                ],
57            ),
58            (
59                "sas_token",
60                &[
61                    "azure_storage_sas_token",
62                    "azure_storage_sas_key",
63                    "sas_key",
64                ],
65            ),
66        ];
67
68        for (canonical, aliases) in alias_groups {
69            if !config_map.contains_key(*canonical) {
70                for alias in *aliases {
71                    if let Some(value) = config_map.remove(*alias) {
72                        config_map.insert(canonical.to_string(), value);
73                        break;
74                    }
75                }
76            } else {
77                // Canonical key exists; remove aliases to avoid conflicts.
78                for alias in *aliases {
79                    config_map.remove(*alias);
80                }
81            }
82        }
83
84        config_map
85    }
86
87    fn build_opendal_operator(
88        base_path: &Url,
89        storage_options: &StorageOptions,
90    ) -> Result<Operator> {
91        // Start with all storage options as the config map
92        // OpenDAL will handle environment variables through its default credentials chain
93        let mut config_map = Self::normalize_opendal_azure_options(&storage_options.0);
94
95        match base_path.scheme() {
96            "az" => {
97                let container = base_path
98                    .host_str()
99                    .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
100                    .to_string();
101
102                config_map.insert("container".to_string(), container);
103
104                let prefix = base_path.path().trim_start_matches('/');
105                if !prefix.is_empty() {
106                    config_map.insert("root".to_string(), format!("/{}", prefix));
107                }
108
109                Operator::from_iter::<Azblob>(config_map)
110                    .map_err(|e| {
111                        Error::invalid_input(format!(
112                            "Failed to create Azure Blob operator: {:?}",
113                            e
114                        ))
115                    })
116                    .map(|b| b.finish())
117            }
118            "abfss" => {
119                let filesystem = base_path.username();
120                if filesystem.is_empty() {
121                    return Err(Error::invalid_input(
122                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
123                    ));
124                }
125                let host = base_path.host_str().ok_or_else(|| {
126                    Error::invalid_input(
127                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
128                    )
129                })?;
130
131                config_map.insert("filesystem".to_string(), filesystem.to_string());
132                config_map.insert("endpoint".to_string(), format!("https://{}", host));
133                config_map
134                    .entry("account_name".to_string())
135                    .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
136
137                let root_path = base_path.path().trim_start_matches('/');
138                if !root_path.is_empty() {
139                    config_map.insert("root".to_string(), format!("/{}", root_path));
140                }
141
142                Operator::from_iter::<Azdls>(config_map)
143                    .map_err(|e| {
144                        Error::invalid_input(format!(
145                            "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
146                            e
147                        ))
148                    })
149                    .map(|b| b.finish())
150            }
151            _ => Err(Error::invalid_input(format!(
152                "Unsupported Azure scheme: {}",
153                base_path.scheme()
154            ))),
155        }
156    }
157
158    async fn build_opendal_azure_store(
159        &self,
160        base_path: &Url,
161        storage_options: &StorageOptions,
162    ) -> Result<Arc<dyn OSObjectStore>> {
163        let operator = Self::build_opendal_operator(base_path, storage_options)?;
164        Ok(Arc::new(OpendalStore::new(operator)))
165    }
166
167    async fn build_microsoft_azure_store(
168        &self,
169        base_path: &Url,
170        storage_options: &StorageOptions,
171        accessor: Option<Arc<StorageOptionsAccessor>>,
172    ) -> Result<Arc<dyn OSObjectStore>> {
173        // Use a low retry count since the AIMD throttle layer handles
174        // throttle recovery with its own retry loop.
175        let retry_config = RetryConfig {
176            backoff: Default::default(),
177            max_retries: storage_options.client_max_retries(),
178            retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
179        };
180
181        let mut builder = MicrosoftAzureBuilder::new()
182            .with_url(base_path.as_ref())
183            .with_retry(retry_config)
184            .with_client_options(storage_options.client_options()?);
185        for (key, value) in storage_options.as_azure_options() {
186            builder = builder.with_config(key, value);
187        }
188
189        if let Some(credentials) =
190            build_dynamic_credential_provider::<AzureCredential>(accessor).await?
191        {
192            builder = builder.with_credentials(credentials);
193        }
194
195        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
196    }
197
198    fn calculate_object_store_prefix_with_env(
199        url: &Url,
200        storage_options: Option<&HashMap<String, String>>,
201        env_options: &HashMap<String, String>,
202    ) -> Result<String> {
203        let authority = url.authority();
204        let (container, account) = match authority.find("@") {
205            Some(at_index) => {
206                // The URI has an:
207                // - az:// schema type and is similar to 'az://container@account.dfs.core.windows.net/path-part/file
208                //         or possibly 'az://container@account/path-part/file' (the short version).
209                // - abfss:// schema type and is similar to 'abfss://filesystem@account.dfs.core.windows.net/path-part/file'.
210                let container = &authority[..at_index];
211                let account = &authority[at_index + 1..];
212                (
213                    container,
214                    account.split(".").next().unwrap_or_default().to_string(),
215                )
216            }
217            None => {
218                // The URI looks like 'az://container/path-part/file'.
219                // We must look at the storage options to find the account.
220                let mut account = match storage_options {
221                    Some(opts) => StorageOptions::find_configured_storage_account(opts),
222                    None => None,
223                };
224                if account.is_none() {
225                    account = StorageOptions::find_configured_storage_account(env_options);
226                }
227                let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
228                (authority, account)
229            }
230        };
231        Ok(format!("{}${}@{}", url.scheme(), container, account))
232    }
233}
234
235#[async_trait::async_trait]
236impl ObjectStoreProvider for AzureBlobStoreProvider {
237    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
238        let scheme = base_path.scheme().to_string();
239        if scheme != "az" && scheme != "abfss" {
240            return Err(Error::invalid_input(format!(
241                "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
242                scheme
243            )));
244        }
245
246        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
247        let mut storage_options =
248            StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
249        storage_options.with_env_azure();
250        let download_retry_count = storage_options.download_retry_count();
251
252        let use_opendal = storage_options
253            .0
254            .get("use_opendal")
255            .map(|v| v.as_str() == "true")
256            .unwrap_or(false);
257
258        let accessor = params.get_accessor();
259
260        let inner: Arc<dyn OSObjectStore> = if use_opendal {
261            // OpenDAL Azure intentionally uses static/environment-backed configuration only.
262            // Namespace-vended dynamic credentials are supported on the native object_store path.
263            self.build_opendal_azure_store(&base_path, &storage_options)
264                .await?
265        } else {
266            self.build_microsoft_azure_store(&base_path, &storage_options, accessor)
267                .await?
268        };
269        let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
270        let inner = if throttle_config.is_disabled() {
271            inner
272        } else if storage_options.client_max_retries() == 0 {
273            log::warn!(
274                "AIMD throttle disabled: the current implementation relies on the object store \
275                 client surfacing retry errors, which requires client_max_retries > 0. \
276                 No throttle or retry layer will be applied."
277            );
278            inner
279        } else {
280            Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
281        };
282
283        Ok(ObjectStore {
284            inner,
285            scheme,
286            block_size,
287            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
288            use_constant_size_upload_parts: false,
289            list_is_lexically_ordered: true,
290            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
291            download_retry_count,
292            io_tracker: Default::default(),
293            store_prefix: self
294                .calculate_object_store_prefix(&base_path, params.storage_options())?,
295        })
296    }
297
298    fn calculate_object_store_prefix(
299        &self,
300        url: &Url,
301        storage_options: Option<&HashMap<String, String>>,
302    ) -> Result<String> {
303        Self::calculate_object_store_prefix_with_env(url, storage_options, &ENV_OPTIONS.0)
304    }
305}
306
/// Azure-related options read from the process environment by
/// [`StorageOptions::from_env`]; initialized lazily on first access.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
308
309impl StorageOptions {
310    /// Iterate over all environment variables, looking for anything related to Azure.
311    fn from_env() -> Self {
312        let mut opts = HashMap::<String, String>::new();
313        for (os_key, os_value) in std::env::vars_os() {
314            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
315                && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
316            {
317                opts.insert(config_key.as_ref().to_string(), value.to_string());
318            }
319        }
320        Self(opts)
321    }
322
323    /// Add values from the environment to storage options
324    pub fn with_env_azure(&mut self) {
325        for (os_key, os_value) in &ENV_OPTIONS.0 {
326            if !self.0.contains_key(os_key) {
327                self.0.insert(os_key.clone(), os_value.clone());
328            }
329        }
330    }
331
332    /// Subset of options relevant for azure storage
333    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
334        self.0
335            .iter()
336            .filter_map(|(key, value)| {
337                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
338                Some((az_key, value.clone()))
339            })
340            .collect()
341    }
342
343    #[allow(clippy::manual_map)]
344    fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
345        if let Some(account) = map.get("azure_storage_account_name") {
346            Some(account.clone())
347        } else if let Some(account) = map.get("account_name") {
348            Some(account.clone())
349        } else {
350            None
351        }
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
    use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};
    use std::collections::HashMap;

    // The URL path of an az:// URL becomes the object path.
    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    // use_opendal=true on az:// selects the OpenDAL Azblob backend.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
            "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
            inner_desc
        );
    }

    // A SAS token vended by the options provider surfaces as an Azure SAS credential.
    #[tokio::test]
    async fn test_dynamic_azure_credentials_provider() {
        let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
            StaticMockStorageOptionsProvider {
                options: HashMap::from([(
                    "azure_storage_sas_token".to_string(),
                    "?sv=2022-11-02&sp=rl&sig=test".to_string(),
                )]),
            },
        )));

        let credentials = build_dynamic_credential_provider::<AzureCredential>(Some(accessor))
            .await
            .expect("dynamic azure credentials should build")
            .expect("expected credential provider")
            .get_credential()
            .await
            .expect("expected azure credential");

        match credentials.as_ref() {
            AzureCredential::SASToken(pairs) => {
                assert!(
                    pairs
                        .iter()
                        .any(|(key, value)| key == "sig" && value == "test")
                );
            }
            other => panic!("expected SAS token, got {other:?}"),
        }
    }

    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([
                ("access_key".to_string(), "myaccesskey".to_string()),
                (
                    "azure_storage_account_name".to_string(),
                    "myaccount".to_string()
                )
            ]))
        );
    }

    // Without an account in the URL, the account comes from the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    // An account in the URL takes precedence over the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    // The short 'container@account' form is accepted as well.
    #[test]
    fn test_calculate_object_store_prefix_from_url_short_account() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    // No account in the URL, the options, or the (empty) environment map is an error.
    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = AzureBlobStoreProvider::calculate_object_store_prefix_with_env(
            &Url::parse("az://container/path").unwrap(),
            Some(&options),
            &HashMap::new(),
        )
        .expect_err("expected error")
        .to_string();
        // `starts_with` reports a readable failure even when the message is
        // shorter than `expected`, where slicing would panic.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {}",
            result
        );
    }

    // --- abfss:// tests ---

    #[test]
    fn test_abfss_extract_path() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
            .unwrap();
        let path = provider.extract_path(&url).unwrap();
        assert_eq!(
            path,
            object_store::path::Path::from("path/to/dataset.lance")
        );
    }

    #[test]
    fn test_calculate_abfss_prefix() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();
        let prefix = provider.calculate_object_store_prefix(&url, None).unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    #[test]
    fn test_calculate_abfss_prefix_ignores_storage_options() {
        let provider = AzureBlobStoreProvider;
        let options =
            HashMap::from_iter([("account_name".to_string(), "other_account".to_string())]);
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
        let prefix = provider
            .calculate_object_store_prefix(&url, Some(&options))
            .unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    // Without use_opendal, abfss:// goes through the native object_store builder.
    #[tokio::test]
    async fn test_abfss_default_uses_microsoft_builder() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("MicrosoftAzure"),
            "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
            inner_desc
        );
    }

    // Schemes other than az:// and abfss:// are rejected up front.
    #[tokio::test]
    async fn test_unsupported_scheme_rejected() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "myaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let err = provider
            .new_store(url, &params)
            .await
            .expect_err("expected error for unsupported scheme");
        assert!(
            err.to_string().contains("Unsupported Azure scheme"),
            "unexpected error: {}",
            err
        );
    }

    // use_opendal=true on abfss:// selects the OpenDAL Azdls backend.
    #[tokio::test]
    async fn test_abfss_with_opendal_uses_azdls() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
            "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
            inner_desc
        );
    }

    #[test]
    fn test_azdls_capabilities_differ_from_azblob() {
        let common_opts = StorageOptions(HashMap::from([
            ("account_name".to_string(), "testaccount".to_string()),
            ("account_key".to_string(), "dGVzdA==".to_string()),
            (
                "endpoint".to_string(),
                "https://testaccount.blob.core.windows.net".to_string(),
            ),
        ]));

        // Build az:// operator (uses Azblob backend)
        let az_url = Url::parse("az://test-container/path").unwrap();
        let az_operator =
            AzureBlobStoreProvider::build_opendal_operator(&az_url, &common_opts).unwrap();

        // Build abfss:// operator (uses Azdls backend)
        let abfss_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let abfss_operator =
            AzureBlobStoreProvider::build_opendal_operator(&abfss_url, &common_opts).unwrap();

        let azblob_cap = az_operator.info().native_capability();
        let azdls_cap = abfss_operator.info().native_capability();

        // Both support basic operations
        assert!(azblob_cap.read);
        assert!(azdls_cap.read);
        assert!(azblob_cap.write);
        assert!(azdls_cap.write);
        assert!(azblob_cap.list);
        assert!(azdls_cap.list);

        // Azdls supports rename and create_dir (HNS features); Azblob does not
        assert!(azdls_cap.rename, "Azdls should support rename");
        assert!(azdls_cap.create_dir, "Azdls should support create_dir");
        assert!(!azblob_cap.rename, "Azblob should not support rename");
    }
}
668}