lance_io/object_store/providers/
azure.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::ObjectStore as OSObjectStore;
7use object_store_opendal::OpendalStore;
8use opendal::{services::Azblob, Operator};
9use snafu::location;
10
11use object_store::{
12    azure::{AzureConfigKey, MicrosoftAzureBuilder},
13    RetryConfig,
14};
15use url::Url;
16
17use crate::object_store::{
18    ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
19    DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
20};
21use lance_core::error::{Error, Result};
22
/// Object store provider for Azure Blob Storage (`az://` URLs).
///
/// Stateless: all configuration comes from the `ObjectStoreParams` passed to
/// `new_store` and from the process environment.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
25
26impl AzureBlobStoreProvider {
27    async fn build_opendal_azure_store(
28        &self,
29        base_path: &Url,
30        storage_options: &StorageOptions,
31    ) -> Result<Arc<dyn OSObjectStore>> {
32        let container = base_path
33            .host_str()
34            .ok_or_else(|| {
35                Error::invalid_input("Azure URL must contain container name", location!())
36            })?
37            .to_string();
38
39        let prefix = base_path.path().trim_start_matches('/').to_string();
40
41        // Start with all storage options as the config map
42        // OpenDAL will handle environment variables through its default credentials chain
43        let mut config_map: HashMap<String, String> = storage_options.0.clone();
44
45        // Set required OpenDAL configuration
46        config_map.insert("container".to_string(), container);
47
48        if !prefix.is_empty() {
49            config_map.insert("root".to_string(), format!("/{}", prefix));
50        }
51
52        let operator = Operator::from_iter::<Azblob>(config_map)
53            .map_err(|e| {
54                Error::invalid_input(
55                    format!("Failed to create Azure Blob operator: {:?}", e),
56                    location!(),
57                )
58            })?
59            .finish();
60
61        Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
62    }
63
64    async fn build_microsoft_azure_store(
65        &self,
66        base_path: &Url,
67        storage_options: &StorageOptions,
68    ) -> Result<Arc<dyn OSObjectStore>> {
69        let max_retries = storage_options.client_max_retries();
70        let retry_timeout = storage_options.client_retry_timeout();
71        let retry_config = RetryConfig {
72            backoff: Default::default(),
73            max_retries,
74            retry_timeout: Duration::from_secs(retry_timeout),
75        };
76
77        let mut builder = MicrosoftAzureBuilder::new()
78            .with_url(base_path.as_ref())
79            .with_retry(retry_config);
80        for (key, value) in storage_options.as_azure_options() {
81            builder = builder.with_config(key, value);
82        }
83
84        Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
85    }
86}
87
88#[async_trait::async_trait]
89impl ObjectStoreProvider for AzureBlobStoreProvider {
90    async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
91        let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
92        let mut storage_options =
93            StorageOptions(params.storage_options.clone().unwrap_or_default());
94        storage_options.with_env_azure();
95        let download_retry_count = storage_options.download_retry_count();
96
97        let use_opendal = storage_options
98            .0
99            .get("use_opendal")
100            .map(|v| v.as_str() == "true")
101            .unwrap_or(false);
102
103        let inner = if use_opendal {
104            self.build_opendal_azure_store(&base_path, &storage_options)
105                .await?
106        } else {
107            self.build_microsoft_azure_store(&base_path, &storage_options)
108                .await?
109        };
110
111        Ok(ObjectStore {
112            inner,
113            scheme: String::from("az"),
114            block_size,
115            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
116            use_constant_size_upload_parts: false,
117            list_is_lexically_ordered: true,
118            io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
119            download_retry_count,
120        })
121    }
122}
123
124impl StorageOptions {
125    /// Add values from the environment to storage options
126    pub fn with_env_azure(&mut self) {
127        for (os_key, os_value) in std::env::vars_os() {
128            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
129                if let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase()) {
130                    if !self.0.contains_key(config_key.as_ref()) {
131                        self.0
132                            .insert(config_key.as_ref().to_string(), value.to_string());
133                    }
134                }
135            }
136        }
137    }
138
139    /// Subset of options relevant for azure storage
140    pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
141        self.0
142            .iter()
143            .filter_map(|(key, value)| {
144                let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
145                Some((az_key, value.clone()))
146            })
147            .collect()
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The container (URL host) is stripped; only the blob path remains.
    #[test]
    fn test_azure_store_path() {
        let url = Url::parse("az://bucket/path/to/file").unwrap();

        let extracted = AzureBlobStoreProvider.extract_path(&url).unwrap();

        assert_eq!(extracted, object_store::path::Path::from("path/to/file"));
    }

    /// Building a store with `use_opendal=true` succeeds and reports the `az` scheme.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        let options: HashMap<String, String> = [
            ("use_opendal", "true"),
            ("account_name", "test_account"),
            ("endpoint", "https://test_account.blob.core.windows.net"),
            ("account_key", "12345="),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let params = ObjectStoreParams {
            storage_options: Some(options),
            ..Default::default()
        };
        let url = Url::parse("az://test-container/path").unwrap();

        let store = AzureBlobStoreProvider
            .new_store(url, &params)
            .await
            .unwrap();

        assert_eq!(store.scheme, "az");
    }
}