1use std::{
5 collections::HashMap,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{Operator, services::Azblob, services::Azdls};
14
15use object_store::{
16 RetryConfig,
17 azure::{AzureConfigKey, MicrosoftAzureBuilder},
18};
19use url::Url;
20
21use crate::object_store::{
22 DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore,
23 ObjectStoreParams, ObjectStoreProvider, StorageOptions,
24};
25use lance_core::error::{Error, Result};
26
/// Object store provider for Azure storage URLs.
///
/// Handles `az://` URLs (Blob Storage) and `abfss://` URLs (ADLS Gen2 / DFS endpoint).
/// The provider itself is stateless; all configuration comes from the URL and the
/// storage options passed to each call.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
29
30impl AzureBlobStoreProvider {
31 fn build_opendal_operator(
32 base_path: &Url,
33 storage_options: &StorageOptions,
34 ) -> Result<Operator> {
35 let mut config_map: HashMap<String, String> = storage_options.0.clone();
38
39 match base_path.scheme() {
40 "az" => {
41 let container = base_path
42 .host_str()
43 .ok_or_else(|| Error::invalid_input("Azure URL must contain container name"))?
44 .to_string();
45
46 config_map.insert("container".to_string(), container);
47
48 let prefix = base_path.path().trim_start_matches('/');
49 if !prefix.is_empty() {
50 config_map.insert("root".to_string(), format!("/{}", prefix));
51 }
52
53 Operator::from_iter::<Azblob>(config_map)
54 .map_err(|e| {
55 Error::invalid_input(format!(
56 "Failed to create Azure Blob operator: {:?}",
57 e
58 ))
59 })
60 .map(|b| b.finish())
61 }
62 "abfss" => {
63 let filesystem = base_path.username();
64 if filesystem.is_empty() {
65 return Err(Error::invalid_input(
66 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path",
67 ));
68 }
69 let host = base_path.host_str().ok_or_else(|| {
70 Error::invalid_input(
71 "abfss:// URL must include account: abfss://<filesystem>@<account>.dfs.core.windows.net/path"
72 )
73 })?;
74
75 config_map.insert("filesystem".to_string(), filesystem.to_string());
76 config_map.insert("endpoint".to_string(), format!("https://{}", host));
77 config_map
78 .entry("account_name".to_string())
79 .or_insert_with(|| host.split('.').next().unwrap_or(host).to_string());
80
81 let root_path = base_path.path().trim_start_matches('/');
82 if !root_path.is_empty() {
83 config_map.insert("root".to_string(), format!("/{}", root_path));
84 }
85
86 Operator::from_iter::<Azdls>(config_map)
87 .map_err(|e| {
88 Error::invalid_input(format!(
89 "Failed to create Azure DFS (ADLS Gen2) operator: {:?}",
90 e
91 ))
92 })
93 .map(|b| b.finish())
94 }
95 _ => Err(Error::invalid_input(format!(
96 "Unsupported Azure scheme: {}",
97 base_path.scheme()
98 ))),
99 }
100 }
101
102 async fn build_opendal_azure_store(
103 &self,
104 base_path: &Url,
105 storage_options: &StorageOptions,
106 ) -> Result<Arc<dyn OSObjectStore>> {
107 let operator = Self::build_opendal_operator(base_path, storage_options)?;
108 Ok(Arc::new(OpendalStore::new(operator)))
109 }
110
111 async fn build_microsoft_azure_store(
112 &self,
113 base_path: &Url,
114 storage_options: &StorageOptions,
115 ) -> Result<Arc<dyn OSObjectStore>> {
116 let max_retries = storage_options.client_max_retries();
117 let retry_timeout = storage_options.client_retry_timeout();
118 let retry_config = RetryConfig {
119 backoff: Default::default(),
120 max_retries,
121 retry_timeout: Duration::from_secs(retry_timeout),
122 };
123
124 let mut builder = MicrosoftAzureBuilder::new()
125 .with_url(base_path.as_ref())
126 .with_retry(retry_config)
127 .with_client_options(storage_options.client_options()?);
128 for (key, value) in storage_options.as_azure_options() {
129 builder = builder.with_config(key, value);
130 }
131
132 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
133 }
134}
135
136#[async_trait::async_trait]
137impl ObjectStoreProvider for AzureBlobStoreProvider {
138 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
139 let scheme = base_path.scheme().to_string();
140 if scheme != "az" && scheme != "abfss" {
141 return Err(Error::invalid_input(format!(
142 "Unsupported Azure scheme '{}', expected 'az' or 'abfss'",
143 scheme
144 )));
145 }
146
147 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
148 let mut storage_options =
149 StorageOptions(params.storage_options().cloned().unwrap_or_default());
150 storage_options.with_env_azure();
151 let download_retry_count = storage_options.download_retry_count();
152
153 let use_opendal = storage_options
154 .0
155 .get("use_opendal")
156 .map(|v| v.as_str() == "true")
157 .unwrap_or(false);
158
159 let inner: Arc<dyn OSObjectStore> = if use_opendal {
160 self.build_opendal_azure_store(&base_path, &storage_options)
161 .await?
162 } else {
163 self.build_microsoft_azure_store(&base_path, &storage_options)
164 .await?
165 };
166
167 Ok(ObjectStore {
168 inner,
169 scheme,
170 block_size,
171 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
172 use_constant_size_upload_parts: false,
173 list_is_lexically_ordered: true,
174 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
175 download_retry_count,
176 io_tracker: Default::default(),
177 store_prefix: self
178 .calculate_object_store_prefix(&base_path, params.storage_options())?,
179 })
180 }
181
182 fn calculate_object_store_prefix(
183 &self,
184 url: &Url,
185 storage_options: Option<&HashMap<String, String>>,
186 ) -> Result<String> {
187 let authority = url.authority();
188 let (container, account) = match authority.find("@") {
189 Some(at_index) => {
190 let container = &authority[..at_index];
195 let account = &authority[at_index + 1..];
196 (
197 container,
198 account.split(".").next().unwrap_or_default().to_string(),
199 )
200 }
201 None => {
202 let mut account = match storage_options {
205 Some(opts) => StorageOptions::find_configured_storage_account(opts),
206 None => None,
207 };
208 if account.is_none() {
209 account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
210 }
211 let account = account.ok_or(Error::invalid_input("Unable to find object store prefix: no Azure account name in URI, and no storage account configured."))?;
212 (authority, account)
213 }
214 };
215 Ok(format!("{}${}@{}", url.scheme(), container, account))
216 }
217}
218
219static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
220
221impl StorageOptions {
222 fn from_env() -> Self {
224 let mut opts = HashMap::<String, String>::new();
225 for (os_key, os_value) in std::env::vars_os() {
226 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str())
227 && let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase())
228 {
229 opts.insert(config_key.as_ref().to_string(), value.to_string());
230 }
231 }
232 Self(opts)
233 }
234
235 pub fn with_env_azure(&mut self) {
237 for (os_key, os_value) in &ENV_OPTIONS.0 {
238 if !self.0.contains_key(os_key) {
239 self.0.insert(os_key.clone(), os_value.clone());
240 }
241 }
242 }
243
244 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
246 self.0
247 .iter()
248 .filter_map(|(key, value)| {
249 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
250 Some((az_key, value.clone()))
251 })
252 .collect()
253 }
254
255 #[allow(clippy::manual_map)]
256 fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
257 if let Some(account) = map.get("azure_storage_account_name") {
258 Some(account.clone())
259 } else if let Some(account) = map.get("account_name") {
260 Some(account.clone())
261 } else {
262 None
263 }
264 }
265}
266
/// Unit tests for URL parsing, store-prefix calculation and backend selection of
/// [`AzureBlobStoreProvider`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    /// `az://<container>/<path>` yields the path without the container.
    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    /// `use_opendal=true` on an `az://` URL selects the OpenDAL Azblob backend.
    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azblob"),
            "az:// with use_opendal=true should use OpenDAL Azblob, got: {}",
            inner_desc
        );
    }

    /// `azure_storage_account_name` is found among unrelated options.
    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from_iter(
                [
                    ("access_key".to_string(), "myaccesskey".to_string()),
                    (
                        "azure_storage_account_name".to_string(),
                        "myaccount".to_string()
                    )
                ]
                .into_iter()
            ))
        );
    }

    /// Without an account in the URL, the account comes from the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// An account in the URL takes precedence over the storage options.
    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// A bare account name (no domain) after `@` is used as-is.
    #[test]
    fn test_calculate_object_store_prefix_from_url_short_account() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    /// With no account anywhere, prefix calculation fails with a descriptive error.
    /// NOTE(review): assumes the test environment has no Azure account variables set.
    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        // `starts_with` avoids a slicing panic if the message is shorter than expected.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {}",
            result
        );
    }

    /// `abfss://<fs>@<account>.dfs.core.windows.net/<path>` yields the path only.
    #[test]
    fn test_abfss_extract_path() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/dataset.lance")
            .unwrap();
        let path = provider.extract_path(&url).unwrap();
        assert_eq!(
            path,
            object_store::path::Path::from("path/to/dataset.lance")
        );
    }

    /// The abfss prefix uses the filesystem and the account from the URL.
    #[test]
    fn test_calculate_abfss_prefix() {
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path/to/data").unwrap();
        let prefix = provider.calculate_object_store_prefix(&url, None).unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    /// The account in an abfss URL wins over a configured `account_name`.
    #[test]
    fn test_calculate_abfss_prefix_ignores_storage_options() {
        let provider = AzureBlobStoreProvider;
        let options =
            HashMap::from_iter([("account_name".to_string(), "other_account".to_string())]);
        let url = Url::parse("abfss://myfs@myaccount.dfs.core.windows.net/path").unwrap();
        let prefix = provider
            .calculate_object_store_prefix(&url, Some(&options))
            .unwrap();
        assert_eq!(prefix, "abfss$myfs@myaccount");
    }

    /// Without `use_opendal`, abfss URLs are served by `MicrosoftAzureBuilder`.
    #[tokio::test]
    async fn test_abfss_default_uses_microsoft_builder() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("MicrosoftAzure"),
            "abfss:// without use_opendal should use MicrosoftAzureBuilder, got: {}",
            inner_desc
        );
    }

    /// Schemes other than `az`/`abfss` (e.g. `wasbs`) are rejected.
    #[tokio::test]
    async fn test_unsupported_scheme_rejected() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("wasbs://container@myaccount.blob.core.windows.net/path").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("account_name".to_string(), "myaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let err = provider
            .new_store(url, &params)
            .await
            .expect_err("expected error for unsupported scheme");
        assert!(
            err.to_string().contains("Unsupported Azure scheme"),
            "unexpected error: {}",
            err
        );
    }

    /// `use_opendal=true` on an abfss URL selects the OpenDAL Azdls backend.
    #[tokio::test]
    async fn test_abfss_with_opendal_uses_azdls() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let params = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "testaccount".to_string()),
                    ("account_key".to_string(), "dGVzdA==".to_string()),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider.new_store(url, &params).await.unwrap();
        assert_eq!(store.scheme, "abfss");
        assert!(!store.is_local());
        assert!(store.is_cloud());
        let inner_desc = store.inner.to_string();
        assert!(
            inner_desc.contains("Opendal") && inner_desc.contains("azdls"),
            "abfss:// with use_opendal=true should use OpenDAL Azdls, got: {}",
            inner_desc
        );
    }

    /// Azdls (abfss) operators expose rename/create_dir, unlike Azblob (az).
    #[test]
    fn test_azdls_capabilities_differ_from_azblob() {
        let common_opts = StorageOptions(HashMap::from([
            ("account_name".to_string(), "testaccount".to_string()),
            ("account_key".to_string(), "dGVzdA==".to_string()),
            (
                "endpoint".to_string(),
                "https://testaccount.blob.core.windows.net".to_string(),
            ),
        ]));

        let az_url = Url::parse("az://test-container/path").unwrap();
        let az_operator =
            AzureBlobStoreProvider::build_opendal_operator(&az_url, &common_opts).unwrap();

        let abfss_url = Url::parse("abfss://testfs@testaccount.dfs.core.windows.net/data").unwrap();
        let abfss_operator =
            AzureBlobStoreProvider::build_opendal_operator(&abfss_url, &common_opts).unwrap();

        let azblob_cap = az_operator.info().native_capability();
        let azdls_cap = abfss_operator.info().native_capability();

        assert!(azblob_cap.read);
        assert!(azdls_cap.read);
        assert!(azblob_cap.write);
        assert!(azdls_cap.write);
        assert!(azblob_cap.list);
        assert!(azdls_cap.list);

        assert!(azdls_cap.rename, "Azdls should support rename");
        assert!(azdls_cap.create_dir, "Azdls should support create_dir");
        assert!(!azblob_cap.rename, "Azblob should not support rename");
    }
}