Skip to main content

lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob};
14
15use object_store::{
16    RetryConfig,
17    azure::{AzureConfigKey, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22    DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23    ObjectStoreParams, ObjectStoreProvider, StorageOptions,
24};
25use lance_core::error::{Error, Result};
26
/// Stateless [`ObjectStoreProvider`] that creates object stores for Azure Blob Storage
/// URLs (stores it builds report the `az` scheme).
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
29
30impl AzureBlobStoreProvider {
31    async fn build_opendal_azure_store(
32        &self,
33        base_path: &Url,
34        storage_options: &StorageOptions,
35    ) -> Result<Arc<dyn OSObjectStore>> {
36        let container = base_path
37            .host_str()
38            .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
39            .to_string();
40
41        let prefix = base_path.path().trim_start_matches('/').to_string();
42
43        // Start with all storage options as the config map
44        // OpenDAL will handle environment variables through its default credentials chain
45        let mut config_map: HashMap<String, String> = storage_options.0.clone();
46
47        // Set required OpenDAL configuration
48        config_map.insert("container".to_string(), container);
49
50        if !prefix.is_empty() {
51            config_map.insert("root".to_string(), format!("/{}", prefix));
52        }
53
54        let operator = Operator::from_iter::<Azblob>(config_map)
55            .map_err(|e| {
56                Error::invalid_input(format!("Failed to create Azure Blob operator: {:?}", e))
57            })?
58            .finish();
59
60        Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
61    }
62
63    async fn build_microsoft_azure_store(
64        &self,
65        base_path: &Url,
66        storage_options: &StorageOptions,
67    ) -> Result<Arc<dyn OSObjectStore>> {
68        let max_retries = storage_options.client_max_retries();
69        let retry_timeout = storage_options.client_retry_timeout();
70        let retry_config = RetryConfig {
71            backoff: Default::default(),
72            max_retries,
73            retry_timeout: Duration::from_secs(retry_timeout),
74        };
75
76        let mut builder = MicrosoftAzureBuilder::new()
77            .with_url(base_path.as_ref())
78            .with_retry(retry_config)
79            .with_client_options(storage_options.client_options()?);
80        for (key, value) in storage_options.as_azure_options() {
81            builder = builder.with_config(key, value);
82        }
83
84        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
85    }
86}
87
88#[async_trait::async_trait]
89impl ObjectStoreProvider for AzureBlobStoreProvider {
90    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
91        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
92        let mut storage_options =
93            StorageOptions(params.storage_options().cloned().unwrap_or_default());
94        storage_options.with_env_azure();
95        let download_retry_count = storage_options.download_retry_count();
96
97        let use_opendal = storage_options
98            .0
99            .get("use_opendal")
100            .map(|v| v.as_str() == "true")
101            .unwrap_or(false);
102
103        let inner = if use_opendal {
104            self.build_opendal_azure_store(&base_path, &storage_options)
105                .await?
106        } else {
107            self.build_microsoft_azure_store(&base_path, &storage_options)
108                .await?
109        };
110
111        Ok(ObjectStore {
112            inner,
113            scheme: String::from("az"),
114            block_size,
115            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
116            use_constant_size_upload_parts: false,
117            list_is_lexically_ordered: true,
118            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
119            download_retry_count,
120            io_tracker: Default::default(),
121            store_prefix: self
122                .calculate_object_store_prefix(&base_path, params.storage_options())?,
123        })
124    }
125
126    fn calculate_object_store_prefix(
127        &self,
128        url: &Url,
129        storage_options: Option<&HashMap<String, String>>,
130    ) -> Result<String> {
131        let authority = url.authority();
132        let (container, account) = match authority.find("@") {
133            Some(at_index) => {
134                // The URI looks like 'az://container@account.dfs.core.windows.net/path-part/file',
135                // or possibly 'az://container@account/path-part/file'.
136                let container = &authority[..at_index];
137                let account = &authority[at_index + 1..];
138                (
139                    container,
140                    account.split(".").next().unwrap_or_default().to_string(),
141                )
142            }
143            None => {
144                // The URI looks like 'az://container/path-part/file'.
145                // We must look at the storage options to find the account.
146                let mut account = match storage_options {
147                    Some(opts) => StorageOptions::find_configured_storage_account(opts),
148                    None => None,
149                };
150                if account.is_none() {
151                    account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
152                }
153                let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
154                (authority, account)
155            }
156        };
157        Ok(format!("{}${}@{}", url.scheme(), container, account))
158    }
159}
160
161static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
162
163impl StorageOptions {
164    /// Iterate over all environment variables, looking for anything related to Azure.
165    fn from_env() -> Self {
166        let mut opts = HashMap::<String, String>::new();
167        for (os_key, os_value) in std::env::vars_os() {
168            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
169                && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
170            {
171                opts.insert(config_key.as_ref().to_string(), value.to_string());
172            }
173        }
174        Self(opts)
175    }
176
177    /// Add values from the environment to storage options
178    pub fn with_env_azure(&mut self) {
179        for (os_key, os_value) in &ENV_OPTIONS.0 {
180            if !self.0.contains_key(os_key) {
181                self.0.insert(os_key.clone(), os_value.clone());
182            }
183        }
184    }
185
186    /// Subset of options relevant for azure storage
187    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
188        self.0
189            .iter()
190            .filter_map(|(key, value)| {
191                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
192                Some((az_key, value.clone()))
193            })
194            .collect()
195    }
196
197    #[allow(clippy::manual_map)]
198    fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
199        if let Some(account) = map.get("azure_storage_account_name") {
200            Some(account.clone())
201        } else if let Some(account) = map.get("account_name") {
202            Some(account.clone())
203        } else {
204            None
205        }
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params_with_flag).await.unwrap();
        assert_eq!(store.scheme, "az");
    }

    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([
                ("access_key".to_string(), "myaccesskey".to_string()),
                (
                    "azure_storage_account_name".to_string(),
                    "myaccount".to_string()
                ),
            ]))
        );
    }

    #[test]
    fn test_find_configured_storage_account_precedence_and_fallback() {
        // The canonical key wins over the short alias when both are present.
        assert_eq!(
            Some("canonical".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([
                ("account_name".to_string(), "alias".to_string()),
                (
                    "azure_storage_account_name".to_string(),
                    "canonical".to_string()
                ),
            ]))
        );
        // The short alias is used on its own.
        assert_eq!(
            Some("alias".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([(
                "account_name".to_string(),
                "alias".to_string()
            )]))
        );
        // No account key at all.
        assert_eq!(
            None,
            StorageOptions::find_configured_storage_account(&HashMap::new())
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_without_domain_suffix() {
        let provider = AzureBlobStoreProvider;
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account/path").unwrap(),
                    None
                )
                .unwrap()
        );
    }

    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        // NOTE: assumes no Azure account-name variable is set in the test environment, since
        // `calculate_object_store_prefix` falls back to the environment-derived options.
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        assert!(
            result.starts_with(expected),
            "unexpected error message: {result}"
        );
    }
}