1use std::{
5 collections::HashMap,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16 RetryConfig,
17 azure::{AzureConfigKey, AzureCredential, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23 ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
24 dynamic_credentials::build_dynamic_credential_provider,
25 throttle::{AimdThrottleConfig, AimdThrottledStore},
26};
27use lance_core::error::{Error, Result};
28
/// Object store provider for Azure storage URLs.
///
/// The provider itself is stateless; all configuration is taken from the URL
/// and the storage options passed to `new_store`.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
31
32impl AzureBlobStoreProvider {
33 fn normalize_opendal_azure_options(
38 options: &HashMap<String, String>,
39 ) -> HashMap<String, String> {
40 let mut config_map = options.clone();
42
43 let alias_groups: &[(&str, &[&str])] = &[
46 ("account_name", &["azure_storage_account_name"]),
47 ("endpoint", &["azure_storage_endpoint", "azure_endpoint"]),
48 (
49 "account_key",
50 &[
51 "azure_storage_account_key",
52 "azure_storage_access_key",
53 "azure_storage_master_key",
54 "access_key",
55 "master_key",
56 ],
57 ),
58 (
59 "sas_token",
60 &[
61 "azure_storage_sas_token",
62 "azure_storage_sas_key",
63 "sas_key",
64 ],
65 ),
66 ];
67
68 for (canonical, aliases) in alias_groups {
69 if !config_map.contains_key(*canonical) {
70 for alias in *aliases {
71 if let Some(value) = config_map.remove(*alias) {
72 config_map.insert(canonical.to_string(), value);
73 break;
74 }
75 }
76 } else {
77 for alias in *aliases {
79 config_map.remove(*alias);
80 }
81 }
82 }
83
84 config_map
85 }
86
87 fn build_opendal_operator(
88 base_path: &Url,
89 storage_options: &StorageOptions,
90 ) -> Result<Operator> {
91 let mut config_map = Self::normalize_opendal_azure_options(&storage_options.0);
94
95 match base_path.scheme() {
96 "az" => {
97 let container = base_path
98 .host_str()
99 .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
100 .to_string();
101
102 config_map.insert("container".to_string(), container);
103
104 let prefix = base_path.path().trim_start_matches('/');
105 if !prefix.is_empty() {
106 config_map.insert("root".to_string(), format!("/{}", prefix));
107 }
108
109 Operator::from_iter::<Azblob>(config_map)
110 .map_err(|e| {
111 Error::invalid_input(format!(
112 "Failed to create Azure Blob operator: {:?}",
113 e
114 ))
115 })
116 .map(|b| b.finish())
117 }
118 "abfss" => {
119 let filesystem = base_path.username();
120 if filesystem.is_empty() {
121 return Err(Error::invalid_input(
122 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
123 ));
124 }
125 let host = base_path.host_str().ok_or_else(|| {
126 Error::invalid_input(
127 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
128 )
129 })?;
130
131 config_map.insert("filesystem".to_string(), filesystem.to_string());
132 config_map.insert("endpoint".to_string(), format!("https://{}", host));
133 config_map
134 .entry("account_name".to_string())
135 .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
136
137 let root_path = base_path.path().trim_start_matches('/');
138 if !root_path.is_empty() {
139 config_map.insert("root".to_string(), format!("/{}", root_path));
140 }
141
142 Operator::from_iter::<Azdls>(config_map)
143 .map_err(|e| {
144 Error::invalid_input(format!(
145 "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
146 e
147 ))
148 })
149 .map(|b| b.finish())
150 }
151 _ => Err(Error::invalid_input(format!(
152 "Unsupported Azure scheme: {}",
153 base_path.scheme()
154 ))),
155 }
156 }
157
158 async fn build_opendal_azure_store(
159 &self,
160 base_path: &Url,
161 storage_options: &StorageOptions,
162 ) -> Result<Arc<dyn OSObjectStore>> {
163 let operator = Self::build_opendal_operator(base_path, storage_options)?;
164 Ok(Arc::new(OpendalStore::new(operator)))
165 }
166
167 async fn build_microsoft_azure_store(
168 &self,
169 base_path: &Url,
170 storage_options: &StorageOptions,
171 accessor: Option<Arc<StorageOptionsAccessor>>,
172 ) -> Result<Arc<dyn OSObjectStore>> {
173 let retry_config = RetryConfig {
176 backoff: Default::default(),
177 max_retries: storage_options.client_max_retries(),
178 retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
179 };
180
181 let mut builder = MicrosoftAzureBuilder::new()
182 .with_url(base_path.as_ref())
183 .with_retry(retry_config)
184 .with_client_options(storage_options.client_options()?);
185 for (key, value) in storage_options.as_azure_options() {
186 builder = builder.with_config(key, value);
187 }
188
189 if let Some(credentials) =
190 build_dynamic_credential_provider::<AzureCredential>(accessor).await?
191 {
192 builder = builder.with_credentials(credentials);
193 }
194
195 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
196 }
197
198 fn calculate_object_store_prefix_with_env(
199 url: &Url,
200 storage_options: Option<&HashMap<String, String>>,
201 env_options: &HashMap<String, String>,
202 ) -> Result<String> {
203 let authority = url.authority();
204 let (container, account) = match authority.find("@") {
205 Some(at_index) => {
206 let container = &authority[..at_index];
211 let account = &authority[at_index + 1..];
212 (
213 container,
214 account.split(".").next().unwrap_or_default().to_string(),
215 )
216 }
217 None => {
218 let mut account = match storage_options {
221 Some(opts) => StorageOptions::find_configured_storage_account(opts),
222 None => None,
223 };
224 if account.is_none() {
225 account = StorageOptions::find_configured_storage_account(env_options);
226 }
227 let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
228 (authority, account)
229 }
230 };
231 Ok(format!("{}${}@{}", url.scheme(), container, account))
232 }
233}
234
235#[async_trait::async_trait]
236impl ObjectStoreProvider for AzureBlobStoreProvider {
237 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
238 let scheme = base_path.scheme().to_string();
239 if scheme != "az" && scheme != "abfss" {
240 return Err(Error::invalid_input(format!(
241 "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
242 scheme
243 )));
244 }
245
246 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
247 let mut storage_options =
248 StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
249 storage_options.with_env_azure();
250 let download_retry_count = storage_options.download_retry_count();
251
252 let use_opendal = storage_options
253 .0
254 .get("use_opendal")
255 .map(|v| v.as_str() == "true")
256 .unwrap_or(false);
257
258 let accessor = params.get_accessor();
259
260 let inner: Arc<dyn OSObjectStore> = if use_opendal {
261 self.build_opendal_azure_store(&base_path, &storage_options)
264 .await?
265 } else {
266 self.build_microsoft_azure_store(&base_path, &storage_options, accessor)
267 .await?
268 };
269 let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
270 let inner = if throttle_config.is_disabled() {
271 inner
272 } else if storage_options.client_max_retries() == 0 {
273 log::warn!(
274 "AIMD throttle disabled: the current implementation relies on the object store \
275 client surfacing retry errors, which requires client_max_retries > 0. \
276 No throttle or retry layer will be applied."
277 );
278 inner
279 } else {
280 Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
281 };
282
283 Ok(ObjectStore {
284 inner,
285 scheme,
286 block_size,
287 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
288 use_constant_size_upload_parts: false,
289 list_is_lexically_ordered: true,
290 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
291 download_retry_count,
292 io_tracker: Default::default(),
293 store_prefix: self
294 .calculate_object_store_prefix(&base_path, params.storage_options())?,
295 })
296 }
297
298 fn calculate_object_store_prefix(
299 &self,
300 url: &Url,
301 storage_options: Option<&HashMap<String, String>>,
302 ) -> Result<String> {
303 Self::calculate_object_store_prefix_with_env(url, storage_options, &ENV_OPTIONS.0)
304 }
305}
306
/// Azure-related settings read from the process environment.
///
/// Populated by `StorageOptions::from_env` the first time it is accessed and
/// reused afterwards.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
308
309impl StorageOptions {
310 fn from_env() -> Self {
312 let mut opts = HashMap::<String, String>::new();
313 for (os_key, os_value) in std::env::vars_os() {
314 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
315 && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
316 {
317 opts.insert(config_key.as_ref().to_string(), value.to_string());
318 }
319 }
320 Self(opts)
321 }
322
323 pub fn with_env_azure(&mut self) {
325 for (os_key, os_value) in &ENV_OPTIONS.0 {
326 if !self.0.contains_key(os_key) {
327 self.0.insert(os_key.clone(), os_value.clone());
328 }
329 }
330 }
331
332 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
334 self.0
335 .iter()
336 .filter_map(|(key, value)| {
337 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
338 Some((az_key, value.clone()))
339 })
340 .collect()
341 }
342
343 #[allow(clippy::manual_map)]
344 fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
345 if let Some(account) = map.get("azure_storage_account_name") {
346 Some(account.clone())
347 } else if let Some(account) = map.get("account_name") {
348 Some(account.clone())
349 } else {
350 None
351 }
352 }
353}
354
355#[cfg(test)]
356mod tests {
357 use super::*;
358 use std::sync::Arc;
359
360 use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
361 use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};
362 use std::collections::HashMap;
363
364 #[test]
365 fn test_azure_store_path() {
366 let provider = AzureBlobStoreProvider;
367
368 let url = Url::parse("az://bucket/path/to/file").unwrap();
369 let path = provider.extract_path(&url).unwrap();
370 let expected_path = object_store::path::Path::from("path/to/file");
371 assert_eq!(path, expected_path);
372 }
373
374 #[tokio::test]
375 async fn test_use_opendal_flag() {
376 let provider = AzureBlobStoreProvider;
377 let url = Url::parse("az://test-container/path").unwrap();
378 let params_with_flag = ObjectStoreParams {
379 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
380 HashMap::from([
381 ("use_opendal".to_string(), "true".to_string()),
382 ("account_name".to_string(), "test_account".to_string()),
383 (
384 "endpoint".to_string(),
385 "https://test_account.blob.core.windows.net".to_string(),
386 ),
387 (
388 "account_key".to_string(),
389 "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
390 ),
391 ]),
392 ))),
393 ..Default::default()
394 };
395
396 let store = provider
397 .new_store(url.clone(), ¶ms_with_flag)
398 .await
399 .unwrap();
400 assert_eq!(store.scheme, "az");
401 let inner_desc = store.inner.to_string();
402 assert!(
403 inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
404 "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
405 inner_desc
406 );
407 }
408
409 #[tokio::test]
410 async fn test_dynamic_azure_credentials_provider() {
411 let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
412 StaticMockStorageOptionsProvider {
413 options: HashMap::from([(
414 "azure_storage_sas_token".to_string(),
415 "?sv=2022-11-02&sp=rl&sig=test".to_string(),
416 )]),
417 },
418 )));
419
420 let credentials = build_dynamic_credential_provider::<AzureCredential>(Some(accessor))
421 .await
422 .expect("dynamic azure credentials should build")
423 .expect("expected credential provider")
424 .get_credential()
425 .await
426 .expect("expected azure credential");
427
428 match credentials.as_ref() {
429 AzureCredential::SASToken(pairs) => {
430 assert!(
431 pairs
432 .iter()
433 .any(|(key, value)| key == "sig" && value == "test")
434 );
435 }
436 other => panic!("expected SAS token, got {other:?}"),
437 }
438 }
439
440 #[test]
441 fn test_find_configured_storage_account() {
442 assert_eq!(
443 Some("myaccount".to_string()),
444 StorageOptions::find_configured_storage_account(&HashMap::from_iter(
445 [
446 ("access_key".to_string(), "myaccesskey".to_string()),
447 (
448 "azure_storage_account_name".to_string(),
449 "myaccount".to_string()
450 )
451 ]
452 .into_iter()
453 ))
454 );
455 }
456
457 #[test]
458 fn test_calculate_object_store_prefix_from_url_and_options() {
459 let provider = AzureBlobStoreProvider;
460 let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
461 assert_eq!(
462 "az$container@bob",
463 provider
464 .calculate_object_store_prefix(
465 &Url::parse("az://container/path").unwrap(),
466 Some(&options)
467 )
468 .unwrap()
469 );
470 }
471
472 #[test]
473 fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
474 let provider = AzureBlobStoreProvider;
475 let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
476 assert_eq!(
477 "az$container@account",
478 provider
479 .calculate_object_store_prefix(
480 &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
481 Some(&options)
482 )
483 .unwrap()
484 );
485 }
486
487 #[test]
488 fn test_calculate_object_store_prefix_from_url_short_account() {
489 let provider = AzureBlobStoreProvider;
490 let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
491 assert_eq!(
492 "az$container@account",
493 provider
494 .calculate_object_store_prefix(
495 &Url::parse("az://container@account/path").unwrap(),
496 Some(&options)
497 )
498 .unwrap()
499 );
500 }
501
502 #[test]
503 fn test_fail_to_calculate_object_store_prefix_from_url() {
504 let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
505 let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
506 let result = AzureBlobStoreProvider::calculate_object_store_prefix_with_env(
507 &Url::parse("az://container/path").unwrap(),
508 Some(&options),
509 &HashMap::new(),
510 )
511 .expect_err("expected error")
512 .to_string();
513 assert_eq!(expected, &result[..expected.len()]);
514 }
515
516 #[test]
519 fn test_abfss_extract_path() {
520 let provider = AzureBlobStoreProvider;
521 let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
522 .unwrap();
523 let path = provider.extract_path(&url).unwrap();
524 assert_eq!(
525 path,
526 object_store::path::Path::from("path/to/dataset.lance")
527 );
528 }
529
530 #[test]
531 fn test_calculate_abfss_prefix() {
532 let provider = AzureBlobStoreProvider;
533 let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();
534 let prefix = provider.calculate_object_store_prefix(&url, None).unwrap();
535 assert_eq!(prefix, "abfss$myfs@myaccount");
536 }
537
538 #[test]
539 fn test_calculate_abfss_prefix_ignores_storage_options() {
540 let provider = AzureBlobStoreProvider;
541 let options =
542 HashMap::from_iter([("account_name".to_string(), "other_account".to_string())]);
543 let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
544 let prefix = provider
545 .calculate_object_store_prefix(&url, Some(&options))
546 .unwrap();
547 assert_eq!(prefix, "abfss$myfs@myaccount");
548 }
549
550 #[tokio::test]
551 async fn test_abfss_default_uses_microsoft_builder() {
552 use crate::object_store::StorageOptionsAccessor;
553 let provider = AzureBlobStoreProvider;
554 let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
555 let params = ObjectStoreParams {
556 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
557 HashMap::from([
558 ("account_name".to_string(), "testaccount".to_string()),
559 ("account_key".to_string(), "dGVzdA==".to_string()),
560 ]),
561 ))),
562 ..Default::default()
563 };
564
565 let store = provider.new_store(url, ¶ms).await.unwrap();
566 assert_eq!(store.scheme, "abfss");
567 assert!(!store.is_local());
568 assert!(store.is_cloud());
569 let inner_desc = store.inner.to_string();
570 assert!(
571 inner_desc.contains("MicrosoftAzure"),
572 "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
573 inner_desc
574 );
575 }
576
577 #[tokio::test]
578 async fn test_unsupported_scheme_rejected() {
579 use crate::object_store::StorageOptionsAccessor;
580 let provider = AzureBlobStoreProvider;
581 let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
582 let params = ObjectStoreParams {
583 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
584 HashMap::from([
585 ("account_name".to_string(), "myaccount".to_string()),
586 ("account_key".to_string(), "dGVzdA==".to_string()),
587 ]),
588 ))),
589 ..Default::default()
590 };
591
592 let err = provider
593 .new_store(url, ¶ms)
594 .await
595 .expect_err("expected error for unsupported scheme");
596 assert!(
597 err.to_string().contains("Unsupported Azure scheme"),
598 "unexpected error: {}",
599 err
600 );
601 }
602
603 #[tokio::test]
604 async fn test_abfss_with_opendal_uses_azdls() {
605 use crate::object_store::StorageOptionsAccessor;
606 let provider = AzureBlobStoreProvider;
607 let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
608 let params = ObjectStoreParams {
609 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
610 HashMap::from([
611 ("use_opendal".to_string(), "true".to_string()),
612 ("account_name".to_string(), "testaccount".to_string()),
613 ("account_key".to_string(), "dGVzdA==".to_string()),
614 ]),
615 ))),
616 ..Default::default()
617 };
618
619 let store = provider.new_store(url, ¶ms).await.unwrap();
620 assert_eq!(store.scheme, "abfss");
621 assert!(!store.is_local());
622 assert!(store.is_cloud());
623 let inner_desc = store.inner.to_string();
624 assert!(
625 inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
626 "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
627 inner_desc
628 );
629 }
630
631 #[test]
632 fn test_azdls_capabilities_differ_from_azblob() {
633 let common_opts = StorageOptions(HashMap::from([
634 ("account_name".to_string(), "testaccount".to_string()),
635 ("account_key".to_string(), "dGVzdA==".to_string()),
636 (
637 "endpoint".to_string(),
638 "https://testaccount.blob.core.windows.net".to_string(),
639 ),
640 ]));
641
642 let az_url = Url::parse("az://test-container/path").unwrap();
644 let az_operator =
645 AzureBlobStoreProvider::build_opendal_operator(&az_url, &common_opts).unwrap();
646
647 let abfss_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
649 let abfss_operator =
650 AzureBlobStoreProvider::build_opendal_operator(&abfss_url, &common_opts).unwrap();
651
652 let azblob_cap = az_operator.info().native_capability();
653 let azdls_cap = abfss_operator.info().native_capability();
654
655 assert!(azblob_cap.read);
657 assert!(azdls_cap.read);
658 assert!(azblob_cap.write);
659 assert!(azdls_cap.write);
660 assert!(azblob_cap.list);
661 assert!(azdls_cap.list);
662
663 assert!(azdls_cap.rename, "Azdls should support rename");
665 assert!(azdls_cap.create_dir, "Azdls should support create_dir");
666 assert!(!azblob_cap.rename, "Azblob should not support rename");
667 }
668}