Skip to main content

lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16    RetryConfig,
17    azure::{AzureConfigKey, AzureCredential, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22    DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23    ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
24    dynamic_credentials::build_dynamic_credential_provider,
25    throttle::{AimdThrottleConfig, AimdThrottledStore},
26};
27use lance_core::error::{Error, Result};
28
/// Object store provider for Azure storage URLs.
///
/// Accepts the `az://` and `abfss://` schemes. Stores are built with the
/// native `object_store` Azure client unless the `use_opendal` storage option
/// is set to `"true"`, in which case OpenDAL is used instead.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
31
32impl AzureBlobStoreProvider {
/// Normalize Azure storage options for OpenDAL.
///
/// Every option is forwarded as-is (e.g. `client_id`, `tenant_id`,
/// `encryption_key`) so that OpenDAL can consume keys this function does not
/// know about. Well-known aliases are folded into the canonical OpenDAL key
/// names `account_name`, `endpoint`, `account_key` and `sas_token`.
///
/// Resolution rules, applied per canonical key:
/// - a canonical key that is already present always wins;
/// - otherwise the first alias present, in the order listed in
///   `ALIAS_GROUPS`, supplies the value;
/// - in both cases *every* alias of the group is removed from the result, so
///   an alias is never forwarded next to (or instead of) its canonical key.
fn normalize_opendal_azure_options(
    options: &HashMap<String, String>,
) -> HashMap<String, String> {
    // Canonical OpenDAL key followed by its accepted aliases, highest
    // priority first.
    const ALIAS_GROUPS: &[(&str, &[&str])] = &[
        ("account_name", &["azure_storage_account_name"]),
        ("endpoint", &["azure_storage_endpoint", "azure_endpoint"]),
        (
            "account_key",
            &[
                "azure_storage_account_key",
                "azure_storage_access_key",
                "azure_storage_master_key",
                "access_key",
                "master_key",
            ],
        ),
        (
            "sas_token",
            &[
                "azure_storage_sas_token",
                "azure_storage_sas_key",
                "sas_key",
            ],
        ),
    ];

    // Start with all options so unknown keys are forwarded to OpenDAL.
    let mut config_map = options.clone();

    for (canonical, aliases) in ALIAS_GROUPS {
        // Drain the whole group, keeping only the highest-priority value.
        // Removing all aliases (not just the first hit) is what prevents
        // leftover duplicates from reaching OpenDAL.
        let mut first_alias_value: Option<String> = None;
        for alias in *aliases {
            let removed = config_map.remove(*alias);
            first_alias_value = first_alias_value.or(removed);
        }

        // `or_insert` leaves an explicitly supplied canonical value untouched.
        if let Some(value) = first_alias_value {
            config_map.entry((*canonical).to_string()).or_insert(value);
        }
    }

    config_map
}
86
87    fn build_opendal_operator(
88        base_path: &Url,
89        storage_options: &StorageOptions,
90    ) -> Result<Operator> {
91        // Start with all storage options as the config map
92        // OpenDAL will handle environment variables through its default credentials chain
93        let mut config_map = Self::normalize_opendal_azure_options(&storage_options.0);
94
95        match base_path.scheme() {
96            "az" => {
97                let container = base_path
98                    .host_str()
99                    .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
100                    .to_string();
101
102                config_map.insert("container".to_string(), container);
103
104                let prefix = base_path.path().trim_start_matches('/');
105                if !prefix.is_empty() {
106                    config_map.insert("root".to_string(), format!("/{}", prefix));
107                }
108
109                Operator::from_iter::<Azblob>(config_map)
110                    .map_err(|e| {
111                        Error::invalid_input(format!(
112                            "Failed to create Azure Blob operator: {:?}",
113                            e
114                        ))
115                    })
116                    .map(|b| b.finish())
117            }
118            "abfss" => {
119                let filesystem = base_path.username();
120                if filesystem.is_empty() {
121                    return Err(Error::invalid_input(
122                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
123                    ));
124                }
125                let host = base_path.host_str().ok_or_else(|| {
126                    Error::invalid_input(
127                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
128                    )
129                })?;
130
131                config_map.insert("filesystem".to_string(), filesystem.to_string());
132                config_map.insert("endpoint".to_string(), format!("https://{}", host));
133                config_map
134                    .entry("account_name".to_string())
135                    .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
136
137                let root_path = base_path.path().trim_start_matches('/');
138                if !root_path.is_empty() {
139                    config_map.insert("root".to_string(), format!("/{}", root_path));
140                }
141
142                Operator::from_iter::<Azdls>(config_map)
143                    .map_err(|e| {
144                        Error::invalid_input(format!(
145                            "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
146                            e
147                        ))
148                    })
149                    .map(|b| b.finish())
150            }
151            _ => Err(Error::invalid_input(format!(
152                "Unsupported Azure scheme: {}",
153                base_path.scheme()
154            ))),
155        }
156    }
157
158    async fn build_opendal_azure_store(
159        &self,
160        base_path: &Url,
161        storage_options: &StorageOptions,
162    ) -> Result<Arc<dyn OSObjectStore>> {
163        let operator = Self::build_opendal_operator(base_path, storage_options)?;
164        Ok(Arc::new(OpendalStore::new(operator)))
165    }
166
167    async fn build_microsoft_azure_store(
168        &self,
169        base_path: &Url,
170        storage_options: &StorageOptions,
171        accessor: Option<Arc<StorageOptionsAccessor>>,
172    ) -> Result<Arc<dyn OSObjectStore>> {
173        // Use a low retry count since the AIMD throttle layer handles
174        // throttle recovery with its own retry loop.
175        let retry_config = RetryConfig {
176            backoff: Default::default(),
177            max_retries: storage_options.client_max_retries(),
178            retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
179        };
180
181        let mut builder = MicrosoftAzureBuilder::new()
182            .with_url(base_path.as_ref())
183            .with_retry(retry_config)
184            .with_client_options(storage_options.client_options()?);
185        for (key, value) in storage_options.as_azure_options() {
186            builder = builder.with_config(key, value);
187        }
188
189        if let Some(credentials) =
190            build_dynamic_credential_provider::<AzureCredential>(accessor).await?
191        {
192            builder = builder.with_credentials(credentials);
193        }
194
195        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
196    }
197
198    fn calculate_object_store_prefix_with_env(
199        url: &Url,
200        storage_options: Option<&HashMap<String, String>>,
201        env_options: &HashMap<String, String>,
202    ) -> Result<String> {
203        let authority = url.authority();
204        let (container, account) = match authority.find("@") {
205            Some(at_index) => {
206                // The URI has an:
207                // - az:// schema type and is similar to 'az://container@account.dfs.core.windows.net/path-part/file
208                //         or possibly 'az://container@account/path-part/file' (the short version).
209                // - abfss:// schema type and is similar to 'abfss://filesystem@account.dfs.core.windows.net/path-part/file'.
210                let container = &authority[..at_index];
211                let account = &authority[at_index + 1..];
212                (
213                    container,
214                    account.split(".").next().unwrap_or_default().to_string(),
215                )
216            }
217            None => {
218                // The URI looks like 'az://container/path-part/file'.
219                // We must look at the storage options to find the account.
220                let mut account = match storage_options {
221                    Some(opts) => StorageOptions::find_configured_storage_account(opts),
222                    None => None,
223                };
224                if account.is_none() {
225                    account = StorageOptions::find_configured_storage_account(env_options);
226                }
227                let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
228                (authority, account)
229            }
230        };
231        Ok(format!("{}${}@{}", url.scheme(), container, account))
232    }
233}
234
235#[async_trait::async_trait]
236impl ObjectStoreProvider for AzureBlobStoreProvider {
237    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
238        let scheme = base_path.scheme().to_string();
239        if scheme != "az" && scheme != "abfss" {
240            return Err(Error::invalid_input(format!(
241                "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
242                scheme
243            )));
244        }
245
246        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
247        let mut storage_options =
248            StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
249        storage_options.with_env_azure();
250        let download_retry_count = storage_options.download_retry_count();
251
252        let use_opendal = storage_options
253            .0
254            .get("use_opendal")
255            .map(|v| v.as_str() == "true")
256            .unwrap_or(false);
257
258        let accessor = params.get_accessor();
259
260        let inner: Arc<dyn OSObjectStore> = if use_opendal {
261            // OpenDAL Azure intentionally uses static/environment-backed configuration only.
262            // Namespace-vended dynamic credentials are supported on the native object_store path.
263            self.build_opendal_azure_store(&base_path, &storage_options)
264                .await?
265        } else {
266            self.build_microsoft_azure_store(&base_path, &storage_options, accessor)
267                .await?
268        };
269        let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
270        let inner = if throttle_config.is_disabled() {
271            inner
272        } else {
273            Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
274        };
275
276        Ok(ObjectStore {
277            inner,
278            scheme,
279            block_size,
280            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
281            use_constant_size_upload_parts: false,
282            list_is_lexically_ordered: true,
283            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
284            download_retry_count,
285            io_tracker: Default::default(),
286            store_prefix: self
287                .calculate_object_store_prefix(&base_path, params.storage_options())?,
288        })
289    }
290
291    fn calculate_object_store_prefix(
292        &self,
293        url: &Url,
294        storage_options: Option<&HashMap<String, String>>,
295    ) -> Result<String> {
296        Self::calculate_object_store_prefix_with_env(url, storage_options, &ENV_OPTIONS.0)
297    }
298}
299
300static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
301
302impl StorageOptions {
303    /// Iterate over all environment variables, looking for anything related to Azure.
304    fn from_env() -> Self {
305        let mut opts = HashMap::<String, String>::new();
306        for (os_key, os_value) in std::env::vars_os() {
307            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
308                && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
309            {
310                opts.insert(config_key.as_ref().to_string(), value.to_string());
311            }
312        }
313        Self(opts)
314    }
315
316    /// Add values from the environment to storage options
317    pub fn with_env_azure(&mut self) {
318        for (os_key, os_value) in &ENV_OPTIONS.0 {
319            if !self.0.contains_key(os_key) {
320                self.0.insert(os_key.clone(), os_value.clone());
321            }
322        }
323    }
324
325    /// Subset of options relevant for azure storage
326    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
327        self.0
328            .iter()
329            .filter_map(|(key, value)| {
330                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
331                Some((az_key, value.clone()))
332            })
333            .collect()
334    }
335
/// Look up the configured storage account name in `map`.
///
/// `azure_storage_account_name` takes precedence over `account_name`.
/// Returns `None` when neither key is present.
fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
    map.get("azure_storage_account_name")
        .or_else(|| map.get("account_name"))
        .cloned()
}
346}
347
348#[cfg(test)]
349mod tests {
350    use super::*;
351    use std::sync::Arc;
352
353    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
354    use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};
355    use std::collections::HashMap;
356
/// `extract_path` should return only the path component of an `az://` URL.
#[test]
fn test_azure_store_path() {
    let url = Url::parse("az://bucket/path/to/file").unwrap();
    let provider = AzureBlobStoreProvider;

    let extracted = provider.extract_path(&url).unwrap();

    assert_eq!(extracted, object_store::path::Path::from("path/to/file"));
}
366
367    #[tokio::test]
368    async fn test_use_opendal_flag() {
369        let provider = AzureBlobStoreProvider;
370        let url = Url::parse("az://test-container/path").unwrap();
371        let params_with_flag = ObjectStoreParams {
372            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
373                HashMap::from([
374                    ("use_opendal".to_string(), "true".to_string()),
375                    ("account_name".to_string(), "test_account".to_string()),
376                    (
377                        "endpoint".to_string(),
378                        "https://test_account.blob.core.windows.net".to_string(),
379                    ),
380                    (
381                        "account_key".to_string(),
382                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
383                    ),
384                ]),
385            ))),
386            ..Default::default()
387        };
388
389        let store = provider
390            .new_store(url.clone(), &params_with_flag)
391            .await
392            .unwrap();
393        assert_eq!(store.scheme, "az");
394        let inner_desc = store.inner.to_string();
395        assert!(
396            inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
397            "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
398            inner_desc
399        );
400    }
401
402    #[tokio::test]
403    async fn test_dynamic_azure_credentials_provider() {
404        let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
405            StaticMockStorageOptionsProvider {
406                options: HashMap::from([(
407                    "azure_storage_sas_token".to_string(),
408                    "?sv=2022-11-02&sp=rl&sig=test".to_string(),
409                )]),
410            },
411        )));
412
413        let credentials = build_dynamic_credential_provider::<AzureCredential>(Some(accessor))
414            .await
415            .expect("dynamic azure credentials should build")
416            .expect("expected credential provider")
417            .get_credential()
418            .await
419            .expect("expected azure credential");
420
421        match credentials.as_ref() {
422            AzureCredential::SASToken(pairs) => {
423                assert!(
424                    pairs
425                        .iter()
426                        .any(|(key, value)| key == "sig" && value == "test")
427                );
428            }
429            other => panic!("expected SAS token, got {other:?}"),
430        }
431    }
432
/// The account name is picked up from `azure_storage_account_name` even when
/// unrelated keys are present.
#[test]
fn test_find_configured_storage_account() {
    let options = HashMap::from([
        ("access_key".to_string(), "myaccesskey".to_string()),
        (
            "azure_storage_account_name".to_string(),
            "myaccount".to_string(),
        ),
    ]);

    let found = StorageOptions::find_configured_storage_account(&options);

    assert_eq!(Some("myaccount".to_string()), found);
}
449
/// Without an account in the URL, the account comes from the storage options.
#[test]
fn test_calculate_object_store_prefix_from_url_and_options() {
    let url = Url::parse("az://container/path").unwrap();
    let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);

    let prefix = AzureBlobStoreProvider
        .calculate_object_store_prefix(&url, Some(&options))
        .unwrap();

    assert_eq!("az$container@bob", prefix);
}
464
/// An account embedded in the URL takes precedence over the storage options.
#[test]
fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
    let url = Url::parse("az://container@account.dfs.core.windows.net/path").unwrap();
    let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);

    let prefix = AzureBlobStoreProvider
        .calculate_object_store_prefix(&url, Some(&options))
        .unwrap();

    assert_eq!("az$container@account", prefix);
}
479
/// The short `container@account` form (no domain suffix) is also recognised.
#[test]
fn test_calculate_object_store_prefix_from_url_short_account() {
    let url = Url::parse("az://container@account/path").unwrap();
    let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);

    let prefix = AzureBlobStoreProvider
        .calculate_object_store_prefix(&url, Some(&options))
        .unwrap();

    assert_eq!("az$container@account", prefix);
}
494
/// With no account in the URL, the options, or the environment map, prefix
/// calculation must fail with the documented message.
#[test]
fn test_fail_to_calculate_object_store_prefix_from_url() {
    let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
    let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
    let result = AzureBlobStoreProvider::calculate_object_store_prefix_with_env(
        &Url::parse("az://container/path").unwrap(),
        Some(&options),
        &HashMap::new(),
    )
    .expect_err("expected error")
    .to_string();
    // Only the prefix is compared because the rendered error may carry
    // trailing details. `starts_with` reports a readable failure instead of
    // panicking on an out-of-range slice when the message is shorter.
    assert!(
        result.starts_with(expected),
        "unexpected error message: {result}"
    );
}
508
509    // --- abfss:// tests ---
510
/// `extract_path` should return only the path component of an `abfss://` URL.
#[test]
fn test_abfss_extract_path() {
    let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
        .unwrap();
    let expected = object_store::path::Path::from("path/to/dataset.lance");

    let extracted = AzureBlobStoreProvider.extract_path(&url).unwrap();

    assert_eq!(extracted, expected);
}
522
/// The `abfss://` prefix is built from the filesystem and the first label of
/// the account host.
#[test]
fn test_calculate_abfss_prefix() {
    let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();

    let prefix = AzureBlobStoreProvider
        .calculate_object_store_prefix(&url, None)
        .unwrap();

    assert_eq!(prefix, "abfss$myfs@myaccount");
}
530
/// An `account_name` option must not override the account in an `abfss://` URL.
#[test]
fn test_calculate_abfss_prefix_ignores_storage_options() {
    let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
    let options = HashMap::from([("account_name".to_string(), "other_account".to_string())]);

    let prefix = AzureBlobStoreProvider
        .calculate_object_store_prefix(&url, Some(&options))
        .unwrap();

    assert_eq!(prefix, "abfss$myfs@myaccount");
}
542
543    #[tokio::test]
544    async fn test_abfss_default_uses_microsoft_builder() {
545        use crate::object_store::StorageOptionsAccessor;
546        let provider = AzureBlobStoreProvider;
547        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
548        let params = ObjectStoreParams {
549            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
550                HashMap::from([
551                    ("account_name".to_string(), "testaccount".to_string()),
552                    ("account_key".to_string(), "dGVzdA==".to_string()),
553                ]),
554            ))),
555            ..Default::default()
556        };
557
558        let store = provider.new_store(url, &params).await.unwrap();
559        assert_eq!(store.scheme, "abfss");
560        assert!(!store.is_local());
561        assert!(store.is_cloud());
562        let inner_desc = store.inner.to_string();
563        assert!(
564            inner_desc.contains("MicrosoftAzure"),
565            "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
566            inner_desc
567        );
568    }
569
570    #[tokio::test]
571    async fn test_unsupported_scheme_rejected() {
572        use crate::object_store::StorageOptionsAccessor;
573        let provider = AzureBlobStoreProvider;
574        let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
575        let params = ObjectStoreParams {
576            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
577                HashMap::from([
578                    ("account_name".to_string(), "myaccount".to_string()),
579                    ("account_key".to_string(), "dGVzdA==".to_string()),
580                ]),
581            ))),
582            ..Default::default()
583        };
584
585        let err = provider
586            .new_store(url, &params)
587            .await
588            .expect_err("expected error for unsupported scheme");
589        assert!(
590            err.to_string().contains("Unsupported Azure scheme"),
591            "unexpected error: {}",
592            err
593        );
594    }
595
596    #[tokio::test]
597    async fn test_abfss_with_opendal_uses_azdls() {
598        use crate::object_store::StorageOptionsAccessor;
599        let provider = AzureBlobStoreProvider;
600        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
601        let params = ObjectStoreParams {
602            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
603                HashMap::from([
604                    ("use_opendal".to_string(), "true".to_string()),
605                    ("account_name".to_string(), "testaccount".to_string()),
606                    ("account_key".to_string(), "dGVzdA==".to_string()),
607                ]),
608            ))),
609            ..Default::default()
610        };
611
612        let store = provider.new_store(url, &params).await.unwrap();
613        assert_eq!(store.scheme, "abfss");
614        assert!(!store.is_local());
615        assert!(store.is_cloud());
616        let inner_desc = store.inner.to_string();
617        assert!(
618            inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
619            "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
620            inner_desc
621        );
622    }
623
/// The two OpenDAL backends share basic capabilities, but only Azdls is
/// expected to report rename and create_dir support.
#[test]
fn test_azdls_capabilities_differ_from_azblob() {
    let shared_options = StorageOptions(HashMap::from([
        ("account_name".to_string(), "testaccount".to_string()),
        ("account_key".to_string(), "dGVzdA==".to_string()),
        (
            "endpoint".to_string(),
            "https://testaccount.blob.core.windows.net".to_string(),
        ),
    ]));

    // az:// is routed to the Azblob backend.
    let blob_url = Url::parse("az://test-container/path").unwrap();
    let blob_operator =
        AzureBlobStoreProvider::build_opendal_operator(&blob_url, &shared_options).unwrap();

    // abfss:// is routed to the Azdls backend.
    let dfs_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
    let dfs_operator =
        AzureBlobStoreProvider::build_opendal_operator(&dfs_url, &shared_options).unwrap();

    let azblob_cap = blob_operator.info().native_capability();
    let azdls_cap = dfs_operator.info().native_capability();

    // Basic operations are available on both backends.
    assert!(azblob_cap.read);
    assert!(azdls_cap.read);
    assert!(azblob_cap.write);
    assert!(azdls_cap.write);
    assert!(azblob_cap.list);
    assert!(azdls_cap.list);

    // Rename and create_dir are HNS features: present on Azdls, rename absent on Azblob.
    assert!(azdls_cap.rename, "Azdls should support rename");
    assert!(azdls_cap.create_dir, "Azdls should support create_dir");
    assert!(!azblob_cap.rename, "Azblob should not support rename");
}
661}