lance_io/object_store/providers/
azure.rs1use std::{
5 collections::HashMap,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob};
14
15use object_store::{
16 RetryConfig,
17 azure::{AzureConfigKey, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23 ObjectStoreParams, ObjectStoreProvider, StorageOptions,
24};
25use lance_core::error::{Error, Result};
26
/// [`ObjectStoreProvider`] implementation for Azure Blob Storage URLs.
///
/// The provider is a stateless unit struct: everything it needs is taken from
/// the URL and the [`ObjectStoreParams`] passed to each call.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
29
30impl AzureBlobStoreProvider {
31 async fn build_opendal_azure_store(
32 &self,
33 base_path: &Url,
34 storage_options: &StorageOptions,
35 ) -> Result<Arc<dyn OSObjectStore>> {
36 let container = base_path
37 .host_str()
38 .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
39 .to_string();
40
41 let prefix = base_path.path().trim_start_matches('/').to_string();
42
43 let mut config_map: HashMap<String, String> = storage_options.0.clone();
46
47 config_map.insert("container".to_string(), container);
49
50 if !prefix.is_empty() {
51 config_map.insert("root".to_string(), format!("/{}", prefix));
52 }
53
54 let operator = Operator::from_iter::<Azblob>(config_map)
55 .map_err(|e| {
56 Error::invalid_input(format!("Failed to create Azure Blob operator: {:?}", e))
57 })?
58 .finish();
59
60 Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
61 }
62
63 async fn build_microsoft_azure_store(
64 &self,
65 base_path: &Url,
66 storage_options: &StorageOptions,
67 ) -> Result<Arc<dyn OSObjectStore>> {
68 let max_retries = storage_options.client_max_retries();
69 let retry_timeout = storage_options.client_retry_timeout();
70 let retry_config = RetryConfig {
71 backoff: Default::default(),
72 max_retries,
73 retry_timeout: Duration::from_secs(retry_timeout),
74 };
75
76 let mut builder = MicrosoftAzureBuilder::new()
77 .with_url(base_path.as_ref())
78 .with_retry(retry_config)
79 .with_client_options(storage_options.client_options()?);
80 for (key, value) in storage_options.as_azure_options() {
81 builder = builder.with_config(key, value);
82 }
83
84 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
85 }
86}
87
88#[async_trait::async_trait]
89impl ObjectStoreProvider for AzureBlobStoreProvider {
90 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
91 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
92 let mut storage_options =
93 StorageOptions(params.storage_options().cloned().unwrap_or_default());
94 storage_options.with_env_azure();
95 let download_retry_count = storage_options.download_retry_count();
96
97 let use_opendal = storage_options
98 .0
99 .get("use_opendal")
100 .map(|v| v.as_str() == "true")
101 .unwrap_or(false);
102
103 let inner = if use_opendal {
104 self.build_opendal_azure_store(&base_path, &storage_options)
105 .await?
106 } else {
107 self.build_microsoft_azure_store(&base_path, &storage_options)
108 .await?
109 };
110
111 Ok(ObjectStore {
112 inner,
113 scheme: String::from("az"),
114 block_size,
115 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
116 use_constant_size_upload_parts: false,
117 list_is_lexically_ordered: true,
118 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
119 download_retry_count,
120 io_tracker: Default::default(),
121 store_prefix: self
122 .calculate_object_store_prefix(&base_path, params.storage_options())?,
123 })
124 }
125
126 fn calculate_object_store_prefix(
127 &self,
128 url: &Url,
129 storage_options: Option<&HashMap<String, String>>,
130 ) -> Result<String> {
131 let authority = url.authority();
132 let (container, account) = match authority.find("@") {
133 Some(at_index) => {
134 let container = &authority[..at_index];
137 let account = &authority[at_index + 1..];
138 (
139 container,
140 account.split(".").next().unwrap_or_default().to_string(),
141 )
142 }
143 None => {
144 let mut account = match storage_options {
147 Some(opts) => StorageOptions::find_configured_storage_account(opts),
148 None => None,
149 };
150 if account.is_none() {
151 account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
152 }
153 let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
154 (authority, account)
155 }
156 };
157 Ok(format!("{}${}@{}", url.scheme(), container, account))
158 }
159}
160
/// Azure-related settings read from the process environment.
///
/// Populated by `StorageOptions::from_env` the first time the static is
/// accessed and reused for every later access.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
162
163impl StorageOptions {
164 fn from_env() -> Self {
166 let mut opts = HashMap::<String, String>::new();
167 for (os_key, os_value) in std::env::vars_os() {
168 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
169 && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
170 {
171 opts.insert(config_key.as_ref().to_string(), value.to_string());
172 }
173 }
174 Self(opts)
175 }
176
177 pub fn with_env_azure(&mut self) {
179 for (os_key, os_value) in &ENV_OPTIONS.0 {
180 if !self.0.contains_key(os_key) {
181 self.0.insert(os_key.clone(), os_value.clone());
182 }
183 }
184 }
185
186 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
188 self.0
189 .iter()
190 .filter_map(|(key, value)| {
191 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
192 Some((az_key, value.clone()))
193 })
194 .collect()
195 }
196
197 #[allow(clippy::manual_map)]
198 fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
199 if let Some(account) = map.get("azure_storage_account_name") {
200 Some(account.clone())
201 } else if let Some(account) = map.get("account_name") {
202 Some(account.clone())
203 } else {
204 None
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// The object path is the URL path without its leading slash.
    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// Setting `use_opendal=true` still yields a store with the `az` scheme.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url, &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
    }

    /// `azure_storage_account_name` is picked up from the option map.
    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from_iter(
                [
                    ("access_key".to_string(), "myaccesskey".to_string()),
                    (
                        "azure_storage_account_name".to_string(),
                        "myaccount".to_string()
                    )
                ]
                .into_iter()
            ))
        );
    }

    /// Without an `@` in the URL, the account comes from the options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// With `<container>@<account-host>` in the URL, the options are ignored.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// No account in the URL and none in the options produces an error.
    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        // Prefix comparison without slicing: a shorter-than-expected message
        // fails the assertion with the actual text instead of panicking on an
        // out-of-range slice.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {result}"
        );
    }
}