Skip to main content

lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    str::FromStr,
7    sync::{Arc, LazyLock},
8    time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{services::Azblob, Operator};
14use snafu::location;
15
16use object_store::{
17    azure::{AzureConfigKey, MicrosoftAzureBuilder},
18    RetryConfig,
19};
20use url::Url;
21
22use crate::object_store::{
23    ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
24    DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
25};
26use lance_core::error::{Error, Result};
27
/// Object store provider for Azure Blob Storage (`az://...` URLs).
///
/// Stores are built with `object_store`'s `MicrosoftAzureBuilder` by default, or with
/// OpenDAL's `Azblob` service when the `use_opendal` storage option is set to `"true"`.
#[derive(Default, Debug)]
pub struct AzureBlobStoreProvider;
30
31impl AzureBlobStoreProvider {
32    async fn build_opendal_azure_store(
33        &self,
34        base_path: &Url,
35        storage_options: &StorageOptions,
36    ) -> Result<Arc<dyn OSObjectStore>> {
37        let container = base_path
38            .host_str()
39            .ok_or_else(|| {
40                Error::invalid_input("Azure URL must contain container name", location!())
41            })?
42            .to_string();
43
44        let prefix = base_path.path().trim_start_matches('/').to_string();
45
46        // Start with all storage options as the config map
47        // OpenDAL will handle environment variables through its default credentials chain
48        let mut config_map: HashMap<String, String> = storage_options.0.clone();
49
50        // Set required OpenDAL configuration
51        config_map.insert("container".to_string(), container);
52
53        if !prefix.is_empty() {
54            config_map.insert("root".to_string(), format!("/{}", prefix));
55        }
56
57        let operator = Operator::from_iter::<Azblob>(config_map)
58            .map_err(|e| {
59                Error::invalid_input(
60                    format!("Failed to create Azure Blob operator: {:?}", e),
61                    location!(),
62                )
63            })?
64            .finish();
65
66        Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
67    }
68
69    async fn build_microsoft_azure_store(
70        &self,
71        base_path: &Url,
72        storage_options: &StorageOptions,
73    ) -> Result<Arc<dyn OSObjectStore>> {
74        let max_retries = storage_options.client_max_retries();
75        let retry_timeout = storage_options.client_retry_timeout();
76        let retry_config = RetryConfig {
77            backoff: Default::default(),
78            max_retries,
79            retry_timeout: Duration::from_secs(retry_timeout),
80        };
81
82        let mut builder = MicrosoftAzureBuilder::new()
83            .with_url(base_path.as_ref())
84            .with_retry(retry_config);
85        for (key, value) in storage_options.as_azure_options() {
86            builder = builder.with_config(key, value);
87        }
88
89        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
90    }
91}
92
93#[async_trait::async_trait]
94impl ObjectStoreProvider for AzureBlobStoreProvider {
95    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
96        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
97        let mut storage_options =
98            StorageOptions(params.storage_options().cloned().unwrap_or_default());
99        storage_options.with_env_azure();
100        let download_retry_count = storage_options.download_retry_count();
101
102        let use_opendal = storage_options
103            .0
104            .get("use_opendal")
105            .map(|v| v.as_str() == "true")
106            .unwrap_or(false);
107
108        let inner = if use_opendal {
109            self.build_opendal_azure_store(&base_path, &storage_options)
110                .await?
111        } else {
112            self.build_microsoft_azure_store(&base_path, &storage_options)
113                .await?
114        };
115
116        Ok(ObjectStore {
117            inner,
118            scheme: String::from("az"),
119            block_size,
120            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
121            use_constant_size_upload_parts: false,
122            list_is_lexically_ordered: true,
123            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
124            download_retry_count,
125            io_tracker: Default::default(),
126            store_prefix: self
127                .calculate_object_store_prefix(&base_path, params.storage_options())?,
128        })
129    }
130
131    fn calculate_object_store_prefix(
132        &self,
133        url: &Url,
134        storage_options: Option<&HashMap<String, String>>,
135    ) -> Result<String> {
136        let authority = url.authority();
137        let (container, account) = match authority.find("@") {
138            Some(at_index) => {
139                // The URI looks like 'az://container@account.dfs.core.windows.net/path-part/file',
140                // or possibly 'az://container@account/path-part/file'.
141                let container = &authority[..at_index];
142                let account = &authority[at_index + 1..];
143                (
144                    container,
145                    account.split(".").next().unwrap_or_default().to_string(),
146                )
147            }
148            None => {
149                // The URI looks like 'az://container/path-part/file'.
150                // We must look at the storage options to find the account.
151                let mut account = match storage_options {
152                    Some(opts) => StorageOptions::find_configured_storage_account(opts),
153                    None => None,
154                };
155                if account.is_none() {
156                    account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
157                }
158                let account = account.ok_or(Error::invalid_input(
159                    "Unable to find object store prefix: no Azure account name in URI, and no storage account configured.",
160                    location!(),
161                ))?;
162                (authority, account)
163            }
164        };
165        Ok(format!("{}${}@{}", url.scheme(), container, account))
166    }
167}
168
/// Azure-related settings read from the process environment.
///
/// Populated once, on first access, by `StorageOptions::from_env`.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
170
171impl StorageOptions {
172    /// Iterate over all environment variables, looking for anything related to Azure.
173    fn from_env() -> Self {
174        let mut opts = HashMap::<String, String>::new();
175        for (os_key, os_value) in std::env::vars_os() {
176            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
177                if let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase()) {
178                    opts.insert(config_key.as_ref().to_string(), value.to_string());
179                }
180            }
181        }
182        Self(opts)
183    }
184
185    /// Add values from the environment to storage options
186    pub fn with_env_azure(&mut self) {
187        for (os_key, os_value) in &ENV_OPTIONS.0 {
188            if !self.0.contains_key(os_key) {
189                self.0.insert(os_key.clone(), os_value.clone());
190            }
191        }
192    }
193
194    /// Subset of options relevant for azure storage
195    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
196        self.0
197            .iter()
198            .filter_map(|(key, value)| {
199                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
200                Some((az_key, value.clone()))
201            })
202            .collect()
203    }
204
205    #[allow(clippy::manual_map)]
206    fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
207        if let Some(account) = map.get("azure_storage_account_name") {
208            Some(account.clone())
209        } else if let Some(account) = map.get("account_name") {
210            Some(account.clone())
211        } else {
212            None
213        }
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The URL path (without the leading slash) becomes the object path.
    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// A store can be created through the OpenDAL backend when `use_opendal` is `"true"`.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
    }

    #[test]
    fn test_find_configured_storage_account() {
        let options = HashMap::from([
            ("access_key".to_string(), "myaccesskey".to_string()),
            (
                "azure_storage_account_name".to_string(),
                "myaccount".to_string(),
            ),
        ]);
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&options)
        );
    }

    /// `account_name` is used when the long key is absent; no account yields `None`.
    #[test]
    fn test_find_configured_storage_account_fallbacks() {
        let short_key = HashMap::from([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            Some("bob".to_string()),
            StorageOptions::find_configured_storage_account(&short_key)
        );

        let both_keys = HashMap::from([
            ("account_name".to_string(), "bob".to_string()),
            (
                "azure_storage_account_name".to_string(),
                "alice".to_string(),
            ),
        ]);
        assert_eq!(
            Some("alice".to_string()),
            StorageOptions::find_configured_storage_account(&both_keys)
        );

        let no_account = HashMap::from([("access_key".to_string(), "myaccesskey".to_string())]);
        assert_eq!(
            None,
            StorageOptions::find_configured_storage_account(&no_account)
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// An account named in the URL wins over one given in the options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    // NOTE: the lookup falls back to Azure settings from the environment, so this test
    // expects no storage account to be configured there.
    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        // `starts_with` reports a mismatch cleanly; slicing would panic on a shorter message.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {}",
            result
        );
    }
}