Skip to main content

lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16    RetryConfig,
17    azure::{AzureConfigKey, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22    DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23    ObjectStoreParams, ObjectStoreProvider, StorageOptions,
24};
25use lance_core::error::{Error, Result};
26
/// Object-store provider for Azure storage URLs using the `az://` and `abfss://` schemes.
///
/// The provider carries no state of its own; all configuration arrives per call through
/// `ObjectStoreParams` and the process environment.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
29
30impl AzureBlobStoreProvider {
31    fn build_opendal_operator(
32        base_path: &Url,
33        storage_options: &StorageOptions,
34    ) -> Result<Operator> {
35        // Start with all storage options as the config map
36        // OpenDAL will handle environment variables through its default credentials chain
37        let mut config_map: HashMap<String, String> = storage_options.0.clone();
38
39        match base_path.scheme() {
40            "az" => {
41                let container = base_path
42                    .host_str()
43                    .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
44                    .to_string();
45
46                config_map.insert("container".to_string(), container);
47
48                let prefix = base_path.path().trim_start_matches('/');
49                if !prefix.is_empty() {
50                    config_map.insert("root".to_string(), format!("/{}", prefix));
51                }
52
53                Operator::from_iter::<Azblob>(config_map)
54                    .map_err(|e| {
55                        Error::invalid_input(format!(
56                            "Failed to create Azure Blob operator: {:?}",
57                            e
58                        ))
59                    })
60                    .map(|b| b.finish())
61            }
62            "abfss" => {
63                let filesystem = base_path.username();
64                if filesystem.is_empty() {
65                    return Err(Error::invalid_input(
66                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
67                    ));
68                }
69                let host = base_path.host_str().ok_or_else(|| {
70                    Error::invalid_input(
71                        "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
72                    )
73                })?;
74
75                config_map.insert("filesystem".to_string(), filesystem.to_string());
76                config_map.insert("endpoint".to_string(), format!("https://{}", host));
77                config_map
78                    .entry("account_name".to_string())
79                    .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
80
81                let root_path = base_path.path().trim_start_matches('/');
82                if !root_path.is_empty() {
83                    config_map.insert("root".to_string(), format!("/{}", root_path));
84                }
85
86                Operator::from_iter::<Azdls>(config_map)
87                    .map_err(|e| {
88                        Error::invalid_input(format!(
89                            "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
90                            e
91                        ))
92                    })
93                    .map(|b| b.finish())
94            }
95            _ => Err(Error::invalid_input(format!(
96                "Unsupported Azure scheme: {}",
97                base_path.scheme()
98            ))),
99        }
100    }
101
102    async fn build_opendal_azure_store(
103        &self,
104        base_path: &Url,
105        storage_options: &StorageOptions,
106    ) -> Result<Arc<dyn OSObjectStore>> {
107        let operator = Self::build_opendal_operator(base_path, storage_options)?;
108        Ok(Arc::new(OpendalStore::new(operator)))
109    }
110
111    async fn build_microsoft_azure_store(
112        &self,
113        base_path: &Url,
114        storage_options: &StorageOptions,
115    ) -> Result<Arc<dyn OSObjectStore>> {
116        let max_retries = storage_options.client_max_retries();
117        let retry_timeout = storage_options.client_retry_timeout();
118        let retry_config = RetryConfig {
119            backoff: Default::default(),
120            max_retries,
121            retry_timeout: Duration::from_secs(retry_timeout),
122        };
123
124        let mut builder = MicrosoftAzureBuilder::new()
125            .with_url(base_path.as_ref())
126            .with_retry(retry_config)
127            .with_client_options(storage_options.client_options()?);
128        for (key, value) in storage_options.as_azure_options() {
129            builder = builder.with_config(key, value);
130        }
131
132        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
133    }
134}
135
136#[async_trait::async_trait]
137impl ObjectStoreProvider for AzureBlobStoreProvider {
138    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
139        let scheme = base_path.scheme().to_string();
140        if scheme != "az" && scheme != "abfss" {
141            return Err(Error::invalid_input(format!(
142                "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
143                scheme
144            )));
145        }
146
147        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
148        let mut storage_options =
149            StorageOptions(params.storage_options().cloned().unwrap_or_default());
150        storage_options.with_env_azure();
151        let download_retry_count = storage_options.download_retry_count();
152
153        let use_opendal = storage_options
154            .0
155            .get("use_opendal")
156            .map(|v| v.as_str() == "true")
157            .unwrap_or(false);
158
159        let inner: Arc<dyn OSObjectStore> = if use_opendal {
160            self.build_opendal_azure_store(&base_path, &storage_options)
161                .await?
162        } else {
163            self.build_microsoft_azure_store(&base_path, &storage_options)
164                .await?
165        };
166
167        Ok(ObjectStore {
168            inner,
169            scheme,
170            block_size,
171            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
172            use_constant_size_upload_parts: false,
173            list_is_lexically_ordered: true,
174            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
175            download_retry_count,
176            io_tracker: Default::default(),
177            store_prefix: self
178                .calculate_object_store_prefix(&base_path, params.storage_options())?,
179        })
180    }
181
182    fn calculate_object_store_prefix(
183        &self,
184        url: &Url,
185        storage_options: Option<&HashMap<String, String>>,
186    ) -> Result<String> {
187        let authority = url.authority();
188        let (container, account) = match authority.find("@") {
189            Some(at_index) => {
190                // The URI has an:
191                // - az:// schema type and is similar to 'az://container@account.dfs.core.windows.net/path-part/file
192                //         or possibly 'az://container@account/path-part/file' (the short version).
193                // - abfss:// schema type and is similar to 'abfss://filesystem@account.dfs.core.windows.net/path-part/file'.
194                let container = &authority[..at_index];
195                let account = &authority[at_index + 1..];
196                (
197                    container,
198                    account.split(".").next().unwrap_or_default().to_string(),
199                )
200            }
201            None => {
202                // The URI looks like 'az://container/path-part/file'.
203                // We must look at the storage options to find the account.
204                let mut account = match storage_options {
205                    Some(opts) => StorageOptions::find_configured_storage_account(opts),
206                    None => None,
207                };
208                if account.is_none() {
209                    account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
210                }
211                let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
212                (authority, account)
213            }
214        };
215        Ok(format!("{}${}@{}", url.scheme(), container, account))
216    }
217}
218
/// Azure-related settings read from the process environment by `StorageOptions::from_env`.
///
/// Evaluated once, on first access, through `LazyLock`. Environment changes made after
/// that point are not observed. Consulted by `StorageOptions::with_env_azure` and by the
/// account fallback in `calculate_object_store_prefix`.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
220
221impl StorageOptions {
222    /// Iterate over all environment variables, looking for anything related to Azure.
223    fn from_env() -> Self {
224        let mut opts = HashMap::<String, String>::new();
225        for (os_key, os_value) in std::env::vars_os() {
226            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
227                && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
228            {
229                opts.insert(config_key.as_ref().to_string(), value.to_string());
230            }
231        }
232        Self(opts)
233    }
234
235    /// Add values from the environment to storage options
236    pub fn with_env_azure(&mut self) {
237        for (os_key, os_value) in &ENV_OPTIONS.0 {
238            if !self.0.contains_key(os_key) {
239                self.0.insert(os_key.clone(), os_value.clone());
240            }
241        }
242    }
243
244    /// Subset of options relevant for azure storage
245    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
246        self.0
247            .iter()
248            .filter_map(|(key, value)| {
249                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
250                Some((az_key, value.clone()))
251            })
252            .collect()
253    }
254
255    #[allow(clippy::manual_map)]
256    fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
257        if let Some(account) = map.get("azure_storage_account_name") {
258            Some(account.clone())
259        } else if let Some(account) = map.get("account_name") {
260            Some(account.clone())
261        } else {
262            None
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// `extract_path` on an `az://` URL yields the path that follows the container.
    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// `use_opendal=true` on an `az://` URL selects the OpenDAL Azblob backend.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
            "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
            inner_desc
        );
    }

    /// `azure_storage_account_name` is found among unrelated keys.
    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from_iter(
                [
                    ("access_key".to_string(), "myaccesskey".to_string()),
                    (
                        "azure_storage_account_name".to_string(),
                        "myaccount".to_string()
                    )
                ]
                .into_iter()
            ))
        );
    }

    /// Without `@account` in the URL, the account comes from the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// An account in the URL (long form) takes precedence over the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// An account in the URL (short `container@account` form) takes precedence too.
    #[test]
    fn test_calculate_object_store_prefix_from_url_short_account() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// With no account in the URL and none in the options, prefix calculation fails.
    ///
    /// NOTE: this relies on the process environment not supplying a storage account
    /// name. `calculate_object_store_prefix` falls back to `ENV_OPTIONS`, so one
    /// configured there would make the call succeed.
    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        // `starts_with` rather than slicing `result[..expected.len()]`. The slice panics
        // with an index error when the message is shorter or the cut lands mid-character.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {}",
            result
        );
    }

    // --- abfss:// tests ---

    /// `extract_path` on an `abfss://` URL yields the path after the host.
    #[test]
    fn test_abfss_extract_path() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
            .unwrap();
        let path = provider.extract_path(&url).unwrap();
        assert_eq!(
            path,
            object_store::path::Path::from("path/to/dataset.lance")
        );
    }

    /// The `abfss://` prefix uses the filesystem and the account's first host label.
    #[test]
    fn test_calculate_abfss_prefix() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();
        let prefix = provider.calculate_object_store_prefix(&url, None).unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    /// The account from an `abfss://` URL wins over a configured `account_name`.
    #[test]
    fn test_calculate_abfss_prefix_ignores_storage_options() {
        let provider = AzureBlobStoreProvider;
        let options =
            HashMap::from_iter([("account_name".to_string(), "other_account".to_string())]);
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
        let prefix = provider
            .calculate_object_store_prefix(&url, Some(&options))
            .unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    /// Without `use_opendal`, `abfss://` goes through `MicrosoftAzureBuilder`.
    #[tokio::test]
    async fn test_abfss_default_uses_microsoft_builder() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("MicrosoftAzure"),
            "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
            inner_desc
        );
    }

    /// Schemes other than `az` and `abfss` (here `wasbs`) are rejected up front.
    #[tokio::test]
    async fn test_unsupported_scheme_rejected() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "myaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let err = provider
            .new_store(url, &params)
            .await
            .expect_err("expected error for unsupported scheme");
        assert!(
            err.to_string().contains("Unsupported Azure scheme"),
            "unexpected error: {}",
            err
        );
    }

    /// `use_opendal=true` on an `abfss://` URL selects the OpenDAL Azdls backend.
    #[tokio::test]
    async fn test_abfss_with_opendal_uses_azdls() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
            "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
            inner_desc
        );
    }

    /// The Azdls backend advertises HNS capabilities (rename, create_dir) that Azblob lacks.
    #[test]
    fn test_azdls_capabilities_differ_from_azblob() {
        let common_opts = StorageOptions(HashMap::from([
            ("account_name".to_string(), "testaccount".to_string()),
            ("account_key".to_string(), "dGVzdA==".to_string()),
            (
                "endpoint".to_string(),
                "https://testaccount.blob.core.windows.net".to_string(),
            ),
        ]));

        // Build az:// operator (uses Azblob backend)
        let az_url = Url::parse("az://test-container/path").unwrap();
        let az_operator =
            AzureBlobStoreProvider::build_opendal_operator(&az_url, &common_opts).unwrap();

        // Build abfss:// operator (uses Azdls backend)
        let abfss_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let abfss_operator =
            AzureBlobStoreProvider::build_opendal_operator(&abfss_url, &common_opts).unwrap();

        let azblob_cap = az_operator.info().native_capability();
        let azdls_cap = abfss_operator.info().native_capability();

        // Both support basic operations
        assert!(azblob_cap.read);
        assert!(azdls_cap.read);
        assert!(azblob_cap.write);
        assert!(azdls_cap.write);
        assert!(azblob_cap.list);
        assert!(azdls_cap.list);

        // Azdls supports rename and create_dir (HNS features); Azblob does not
        assert!(azdls_cap.rename, "Azdls should support rename");
        assert!(azdls_cap.create_dir, "Azdls should support create_dir");
        assert!(!azblob_cap.rename, "Azblob should not support rename");
    }
}