lance_io/object_store/providers/
azure.rs1use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
5
6use object_store::{
7 azure::{AzureConfigKey, MicrosoftAzureBuilder},
8 RetryConfig,
9};
10use url::Url;
11
12use crate::object_store::{
13 ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
14 DEFAULT_CLOUD_IO_PARALLELISM,
15};
16use lance_core::error::Result;
17
/// Object store provider that builds stores backed by Microsoft Azure Blob Storage.
///
/// The type carries no state; all configuration is supplied when a store is created.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
20
21#[async_trait::async_trait]
22impl ObjectStoreProvider for AzureBlobStoreProvider {
23 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
24 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
25 let mut storage_options =
26 StorageOptions(params.storage_options.clone().unwrap_or_default());
27 let download_retry_count = storage_options.download_retry_count();
28
29 let max_retries = storage_options.client_max_retries();
30 let retry_timeout = storage_options.client_retry_timeout();
31 let retry_config = RetryConfig {
32 backoff: Default::default(),
33 max_retries,
34 retry_timeout: Duration::from_secs(retry_timeout),
35 };
36
37 storage_options.with_env_azure();
38 let mut builder = MicrosoftAzureBuilder::new()
39 .with_url(base_path.as_ref())
40 .with_retry(retry_config);
41 for (key, value) in storage_options.as_azure_options() {
42 builder = builder.with_config(key, value);
43 }
44 let inner = Arc::new(builder.build()?);
45
46 Ok(ObjectStore {
47 inner,
48 scheme: String::from("az"),
49 block_size,
50 use_constant_size_upload_parts: false,
51 list_is_lexically_ordered: true,
52 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
53 download_retry_count,
54 })
55 }
56}
57
58impl StorageOptions {
59 pub fn with_env_azure(&mut self) {
61 for (os_key, os_value) in std::env::vars_os() {
62 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
63 if let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase()) {
64 if !self.0.contains_key(config_key.as_ref()) {
65 self.0
66 .insert(config_key.as_ref().to_string(), value.to_string());
67 }
68 }
69 }
70 }
71 }
72
73 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
75 self.0
76 .iter()
77 .filter_map(|(key, value)| {
78 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
79 Some((az_key, value.clone()))
80 })
81 .collect()
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `extract_path` maps `az://bucket/path/to/file` to `path/to/file`.
    #[test]
    fn test_azure_store_path() {
        let url = Url::parse("az://bucket/path/to/file").unwrap();

        let actual = AzureBlobStoreProvider.extract_path(&url);

        assert_eq!(actual, object_store::path::Path::from("path/to/file"));
    }
}