1use std::{
5 collections::HashMap,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16 RetryConfig,
17 azure::{AzureConfigKey, AzureCredential, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23 ObjectStoreParams, ObjectStoreProvider, StorageOptions, StorageOptionsAccessor,
24 dynamic_credentials::build_dynamic_credential_provider,
25 throttle::{AimdThrottleConfig, AimdThrottledStore},
26};
27use lance_core::error::{Error, Result};
28
/// Object-store provider for Azure Blob Storage (`az://`) and ADLS Gen2 (`abfss://`) URLs.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
31
32impl AzureBlobStoreProvider {
33 fn normalize_opendal_azure_options(
38 options: &HashMap<String, String>,
39 ) -> HashMap<String, String> {
40 let mut config_map = options.clone();
42
43 let alias_groups: &[(&str, &[&str])] = &[
46 ("account_name", &["azure_storage_account_name"]),
47 ("endpoint", &["azure_storage_endpoint", "azure_endpoint"]),
48 (
49 "account_key",
50 &[
51 "azure_storage_account_key",
52 "azure_storage_access_key",
53 "azure_storage_master_key",
54 "access_key",
55 "master_key",
56 ],
57 ),
58 (
59 "sas_token",
60 &[
61 "azure_storage_sas_token",
62 "azure_storage_sas_key",
63 "sas_key",
64 ],
65 ),
66 ];
67
68 for (canonical, aliases) in alias_groups {
69 if !config_map.contains_key(*canonical) {
70 for alias in *aliases {
71 if let Some(value) = config_map.remove(*alias) {
72 config_map.insert(canonical.to_string(), value);
73 break;
74 }
75 }
76 } else {
77 for alias in *aliases {
79 config_map.remove(*alias);
80 }
81 }
82 }
83
84 config_map
85 }
86
87 fn build_opendal_operator(
88 base_path: &Url,
89 storage_options: &StorageOptions,
90 ) -> Result<Operator> {
91 let mut config_map = Self::normalize_opendal_azure_options(&storage_options.0);
94
95 match base_path.scheme() {
96 "az" => {
97 let container = base_path
98 .host_str()
99 .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
100 .to_string();
101
102 config_map.insert("container".to_string(), container);
103
104 let prefix = base_path.path().trim_start_matches('/');
105 if !prefix.is_empty() {
106 config_map.insert("root".to_string(), format!("/{}", prefix));
107 }
108
109 Operator::from_iter::<Azblob>(config_map)
110 .map_err(|e| {
111 Error::invalid_input(format!(
112 "Failed to create Azure Blob operator: {:?}",
113 e
114 ))
115 })
116 .map(|b| b.finish())
117 }
118 "abfss" => {
119 let filesystem = base_path.username();
120 if filesystem.is_empty() {
121 return Err(Error::invalid_input(
122 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
123 ));
124 }
125 let host = base_path.host_str().ok_or_else(|| {
126 Error::invalid_input(
127 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
128 )
129 })?;
130
131 config_map.insert("filesystem".to_string(), filesystem.to_string());
132 config_map.insert("endpoint".to_string(), format!("https://{}", host));
133 config_map
134 .entry("account_name".to_string())
135 .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
136
137 let root_path = base_path.path().trim_start_matches('/');
138 if !root_path.is_empty() {
139 config_map.insert("root".to_string(), format!("/{}", root_path));
140 }
141
142 Operator::from_iter::<Azdls>(config_map)
143 .map_err(|e| {
144 Error::invalid_input(format!(
145 "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
146 e
147 ))
148 })
149 .map(|b| b.finish())
150 }
151 _ => Err(Error::invalid_input(format!(
152 "Unsupported Azure scheme: {}",
153 base_path.scheme()
154 ))),
155 }
156 }
157
158 async fn build_opendal_azure_store(
159 &self,
160 base_path: &Url,
161 storage_options: &StorageOptions,
162 ) -> Result<Arc<dyn OSObjectStore>> {
163 let operator = Self::build_opendal_operator(base_path, storage_options)?;
164 Ok(Arc::new(OpendalStore::new(operator)))
165 }
166
167 async fn build_microsoft_azure_store(
168 &self,
169 base_path: &Url,
170 storage_options: &StorageOptions,
171 accessor: Option<Arc<StorageOptionsAccessor>>,
172 ) -> Result<Arc<dyn OSObjectStore>> {
173 let retry_config = RetryConfig {
176 backoff: Default::default(),
177 max_retries: storage_options.client_max_retries(),
178 retry_timeout: Duration::from_secs(storage_options.client_retry_timeout()),
179 };
180
181 let mut builder = MicrosoftAzureBuilder::new()
182 .with_url(base_path.as_ref())
183 .with_retry(retry_config)
184 .with_client_options(storage_options.client_options()?);
185 for (key, value) in storage_options.as_azure_options() {
186 builder = builder.with_config(key, value);
187 }
188
189 if let Some(credentials) =
190 build_dynamic_credential_provider::<AzureCredential>(accessor).await?
191 {
192 builder = builder.with_credentials(credentials);
193 }
194
195 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
196 }
197
198 fn calculate_object_store_prefix_with_env(
199 url: &Url,
200 storage_options: Option<&HashMap<String, String>>,
201 env_options: &HashMap<String, String>,
202 ) -> Result<String> {
203 let authority = url.authority();
204 let (container, account) = match authority.find("@") {
205 Some(at_index) => {
206 let container = &authority[..at_index];
211 let account = &authority[at_index + 1..];
212 (
213 container,
214 account.split(".").next().unwrap_or_default().to_string(),
215 )
216 }
217 None => {
218 let mut account = match storage_options {
221 Some(opts) => StorageOptions::find_configured_storage_account(opts),
222 None => None,
223 };
224 if account.is_none() {
225 account = StorageOptions::find_configured_storage_account(env_options);
226 }
227 let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
228 (authority, account)
229 }
230 };
231 Ok(format!("{}${}@{}", url.scheme(), container, account))
232 }
233}
234
235#[async_trait::async_trait]
236impl ObjectStoreProvider for AzureBlobStoreProvider {
237 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
238 let scheme = base_path.scheme().to_string();
239 if scheme != "az" && scheme != "abfss" {
240 return Err(Error::invalid_input(format!(
241 "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
242 scheme
243 )));
244 }
245
246 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
247 let mut storage_options =
248 StorageOptions::new(params.storage_options().cloned().unwrap_or_default());
249 storage_options.with_env_azure();
250 let download_retry_count = storage_options.download_retry_count();
251
252 let use_opendal = storage_options
253 .0
254 .get("use_opendal")
255 .map(|v| v.as_str() == "true")
256 .unwrap_or(false);
257
258 let accessor = params.get_accessor();
259
260 let inner: Arc<dyn OSObjectStore> = if use_opendal {
261 self.build_opendal_azure_store(&base_path, &storage_options)
264 .await?
265 } else {
266 self.build_microsoft_azure_store(&base_path, &storage_options, accessor)
267 .await?
268 };
269 let throttle_config = AimdThrottleConfig::from_storage_options(params.storage_options())?;
270 let inner = if throttle_config.is_disabled() {
271 inner
272 } else {
273 Arc::new(AimdThrottledStore::new(inner, throttle_config)?) as Arc<dyn OSObjectStore>
274 };
275
276 Ok(ObjectStore {
277 inner,
278 scheme,
279 block_size,
280 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
281 use_constant_size_upload_parts: false,
282 list_is_lexically_ordered: true,
283 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
284 download_retry_count,
285 io_tracker: Default::default(),
286 store_prefix: self
287 .calculate_object_store_prefix(&base_path, params.storage_options())?,
288 })
289 }
290
291 fn calculate_object_store_prefix(
292 &self,
293 url: &Url,
294 storage_options: Option<&HashMap<String, String>>,
295 ) -> Result<String> {
296 Self::calculate_object_store_prefix_with_env(url, storage_options, &ENV_OPTIONS.0)
297 }
298}
299
/// Azure settings read from the process environment by `StorageOptions::from_env`.
/// They are read once, on first access, and reused for the life of the process.
static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
301
302impl StorageOptions {
303 fn from_env() -> Self {
305 let mut opts = HashMap::<String, String>::new();
306 for (os_key, os_value) in std::env::vars_os() {
307 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
308 && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
309 {
310 opts.insert(config_key.as_ref().to_string(), value.to_string());
311 }
312 }
313 Self(opts)
314 }
315
316 pub fn with_env_azure(&mut self) {
318 for (os_key, os_value) in &ENV_OPTIONS.0 {
319 if !self.0.contains_key(os_key) {
320 self.0.insert(os_key.clone(), os_value.clone());
321 }
322 }
323 }
324
325 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
327 self.0
328 .iter()
329 .filter_map(|(key, value)| {
330 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
331 Some((az_key, value.clone()))
332 })
333 .collect()
334 }
335
336 #[allow(clippy::manual_map)]
337 fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
338 if let Some(account) = map.get("azure_storage_account_name") {
339 Some(account.clone())
340 } else if let Some(account) = map.get("account_name") {
341 Some(account.clone())
342 } else {
343 None
344 }
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;
    use crate::object_store::{ObjectStoreParams, StorageOptionsAccessor};

    /// Builds `ObjectStoreParams` whose accessor serves the given static storage options.
    fn params_with_options(options: &[(&str, &str)]) -> ObjectStoreParams {
        let options = options
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect::<HashMap<String, String>>();
        ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                options,
            ))),
            ..Default::default()
        }
    }

    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    #[tokio::test]
    async fn test_use_opendal_flag() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = params_with_options(&[
            ("use_opendal", "true"),
            ("account_name", "test_account"),
            ("endpoint", "https://test_account.blob.core.windows.net"),
            ("account_key", "dGVzdF9hY2NvdW50X2tleQ=="),
        ]);

        let store = provider.new_store(url, &params_with_flag).await.unwrap();
        assert_eq!(store.scheme, "az");
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
            "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
            inner_desc
        );
    }

    #[tokio::test]
    async fn test_dynamic_azure_credentials_provider() {
        let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
            StaticMockStorageOptionsProvider {
                options: HashMap::from([(
                    "azure_storage_sas_token".to_string(),
                    "?sv=2022-11-02&sp=rl&sig=test".to_string(),
                )]),
            },
        )));

        let credentials = build_dynamic_credential_provider::<AzureCredential>(Some(accessor))
            .await
            .expect("dynamic azure credentials should build")
            .expect("expected credential provider")
            .get_credential()
            .await
            .expect("expected azure credential");

        match credentials.as_ref() {
            AzureCredential::SASToken(pairs) => {
                assert!(
                    pairs
                        .iter()
                        .any(|(key, value)| key == "sig" && value == "test")
                );
            }
            other => panic!("expected SAS token, got {other:?}"),
        }
    }

    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from_iter(
                [
                    ("access_key".to_string(), "myaccesskey".to_string()),
                    (
                        "azure_storage_account_name".to_string(),
                        "myaccount".to_string()
                    )
                ]
                .into_iter()
            ))
        );

        // The short alias is used when the long key is absent...
        assert_eq!(
            Some("short".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([(
                "account_name".to_string(),
                "short".to_string()
            )]))
        );

        // ...but the long key wins when both are present.
        assert_eq!(
            Some("long".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from([
                ("account_name".to_string(), "short".to_string()),
                ("azure_storage_account_name".to_string(), "long".to_string()),
            ]))
        );

        assert_eq!(
            None,
            StorageOptions::find_configured_storage_account(&HashMap::new())
        );
    }

    #[test]
    fn test_normalize_opendal_azure_options() {
        let options = HashMap::from([
            ("azure_storage_account_name".to_string(), "acct".to_string()),
            ("account_key".to_string(), "explicit".to_string()),
            ("azure_storage_account_key".to_string(), "alias".to_string()),
            ("sas_key".to_string(), "sas".to_string()),
            ("root".to_string(), "/keep".to_string()),
        ]);
        let normalized = AzureBlobStoreProvider::normalize_opendal_azure_options(&options);

        // An alias is promoted when the canonical key is missing.
        assert_eq!(
            normalized.get("account_name").map(String::as_str),
            Some("acct")
        );
        assert!(!normalized.contains_key("azure_storage_account_name"));
        // An explicitly set canonical key wins, and the alias is dropped.
        assert_eq!(
            normalized.get("account_key").map(String::as_str),
            Some("explicit")
        );
        assert!(!normalized.contains_key("azure_storage_account_key"));
        assert_eq!(normalized.get("sas_token").map(String::as_str), Some("sas"));
        assert!(!normalized.contains_key("sas_key"));
        // Unrelated keys pass through untouched.
        assert_eq!(normalized.get("root").map(String::as_str), Some("/keep"));
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_short_account() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = AzureBlobStoreProvider::calculate_object_store_prefix_with_env(
            &Url::parse("az://container/path").unwrap(),
            Some(&options),
            &HashMap::new(),
        )
        .expect_err("expected error")
        .to_string();
        // `starts_with` instead of slicing, so a shorter message fails the assertion
        // rather than panicking on an out-of-range slice.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {}",
            result
        );
    }

    #[test]
    fn test_abfss_extract_path() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
            .unwrap();
        let path = provider.extract_path(&url).unwrap();
        assert_eq!(
            path,
            object_store::path::Path::from("path/to/dataset.lance")
        );
    }

    #[test]
    fn test_calculate_abfss_prefix() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();
        let prefix = provider.calculate_object_store_prefix(&url, None).unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    #[test]
    fn test_calculate_abfss_prefix_ignores_storage_options() {
        let provider = AzureBlobStoreProvider;
        let options =
            HashMap::from_iter([("account_name".to_string(), "other_account".to_string())]);
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
        let prefix = provider
            .calculate_object_store_prefix(&url, Some(&options))
            .unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    #[tokio::test]
    async fn test_abfss_default_uses_microsoft_builder() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = params_with_options(&[
            ("account_name", "testaccount"),
            ("account_key", "dGVzdA=="),
        ]);

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("MicrosoftAzure"),
            "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
            inner_desc
        );
    }

    #[tokio::test]
    async fn test_unsupported_scheme_rejected() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
        let params = params_with_options(&[
            ("account_name", "myaccount"),
            ("account_key", "dGVzdA=="),
        ]);

        let err = provider
            .new_store(url, &params)
            .await
            .expect_err("expected error for unsupported scheme");
        assert!(
            err.to_string().contains("Unsupported Azure scheme"),
            "unexpected error: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_abfss_with_opendal_uses_azdls() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = params_with_options(&[
            ("use_opendal", "true"),
            ("account_name", "testaccount"),
            ("account_key", "dGVzdA=="),
        ]);

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
            "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
            inner_desc
        );
    }

    #[test]
    fn test_azdls_capabilities_differ_from_azblob() {
        let common_opts = StorageOptions(HashMap::from([
            ("account_name".to_string(), "testaccount".to_string()),
            ("account_key".to_string(), "dGVzdA==".to_string()),
            (
                "endpoint".to_string(),
                "https://testaccount.blob.core.windows.net".to_string(),
            ),
        ]));

        let az_url = Url::parse("az://test-container/path").unwrap();
        let az_operator =
            AzureBlobStoreProvider::build_opendal_operator(&az_url, &common_opts).unwrap();

        let abfss_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let abfss_operator =
            AzureBlobStoreProvider::build_opendal_operator(&abfss_url, &common_opts).unwrap();

        let azblob_cap = az_operator.info().native_capability();
        let azdls_cap = abfss_operator.info().native_capability();

        assert!(azblob_cap.read);
        assert!(azdls_cap.read);
        assert!(azblob_cap.write);
        assert!(azdls_cap.write);
        assert!(azblob_cap.list);
        assert!(azdls_cap.list);

        assert!(azdls_cap.rename, "Azdls should support rename");
        assert!(azdls_cap.create_dir, "Azdls should support create_dir");
        assert!(!azblob_cap.rename, "Azblob should not support rename");
    }
}