lance_io/object_store/providers/
azure.rs1use std::{
5 collections::HashMap,
6 str::FromStr,
7 sync::{Arc, LazyLock},
8 time::Duration,
9};
10
11use object_store::ObjectStore as OSObjectStore;
12use object_store_opendal::OpendalStore;
13use opendal::{services::Azblob, Operator};
14use snafu::location;
15
16use object_store::{
17 azure::{AzureConfigKey, MicrosoftAzureBuilder},
18 RetryConfig,
19};
20use url::Url;
21
22use crate::object_store::{
23 ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE,
24 DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE,
25};
26use lance_core::error::{Error, Result};
27
/// Object store provider for Azure Blob Storage URLs.
///
/// This is a stateless unit struct; per-store configuration is supplied through the
/// `ObjectStoreParams` passed to `new_store`.
#[derive(Debug, Default)]
pub struct AzureBlobStoreProvider;
30
31impl AzureBlobStoreProvider {
32 async fn build_opendal_azure_store(
33 &self,
34 base_path: &Url,
35 storage_options: &StorageOptions,
36 ) -> Result<Arc<dyn OSObjectStore>> {
37 let container = base_path
38 .host_str()
39 .ok_or_else(|| {
40 Error::invalid_input("Azure URL must contain container name", location!())
41 })?
42 .to_string();
43
44 let prefix = base_path.path().trim_start_matches('/').to_string();
45
46 let mut config_map: HashMap<String, String> = storage_options.0.clone();
49
50 config_map.insert("container".to_string(), container);
52
53 if !prefix.is_empty() {
54 config_map.insert("root".to_string(), format!("/{}", prefix));
55 }
56
57 let operator = Operator::from_iter::<Azblob>(config_map)
58 .map_err(|e| {
59 Error::invalid_input(
60 format!("Failed to create Azure Blob operator: {:?}", e),
61 location!(),
62 )
63 })?
64 .finish();
65
66 Ok(Arc::new(OpendalStore::new(operator)) as Arc<dyn OSObjectStore>)
67 }
68
69 async fn build_microsoft_azure_store(
70 &self,
71 base_path: &Url,
72 storage_options: &StorageOptions,
73 ) -> Result<Arc<dyn OSObjectStore>> {
74 let max_retries = storage_options.client_max_retries();
75 let retry_timeout = storage_options.client_retry_timeout();
76 let retry_config = RetryConfig {
77 backoff: Default::default(),
78 max_retries,
79 retry_timeout: Duration::from_secs(retry_timeout),
80 };
81
82 let mut builder = MicrosoftAzureBuilder::new()
83 .with_url(base_path.as_ref())
84 .with_retry(retry_config);
85 for (key, value) in storage_options.as_azure_options() {
86 builder = builder.with_config(key, value);
87 }
88
89 Ok(Arc::new(builder.build()?) as Arc<dyn OSObjectStore>)
90 }
91}
92
93#[async_trait::async_trait]
94impl ObjectStoreProvider for AzureBlobStoreProvider {
95 async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
96 let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE);
97 let mut storage_options =
98 StorageOptions(params.storage_options().cloned().unwrap_or_default());
99 storage_options.with_env_azure();
100 let download_retry_count = storage_options.download_retry_count();
101
102 let use_opendal = storage_options
103 .0
104 .get("use_opendal")
105 .map(|v| v.as_str() == "true")
106 .unwrap_or(false);
107
108 let inner = if use_opendal {
109 self.build_opendal_azure_store(&base_path, &storage_options)
110 .await?
111 } else {
112 self.build_microsoft_azure_store(&base_path, &storage_options)
113 .await?
114 };
115
116 Ok(ObjectStore {
117 inner,
118 scheme: String::from("az"),
119 block_size,
120 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
121 use_constant_size_upload_parts: false,
122 list_is_lexically_ordered: true,
123 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
124 download_retry_count,
125 io_tracker: Default::default(),
126 store_prefix: self
127 .calculate_object_store_prefix(&base_path, params.storage_options())?,
128 })
129 }
130
131 fn calculate_object_store_prefix(
132 &self,
133 url: &Url,
134 storage_options: Option<&HashMap<String, String>>,
135 ) -> Result<String> {
136 let authority = url.authority();
137 let (container, account) = match authority.find("@") {
138 Some(at_index) => {
139 let container = &authority[..at_index];
142 let account = &authority[at_index + 1..];
143 (
144 container,
145 account.split(".").next().unwrap_or_default().to_string(),
146 )
147 }
148 None => {
149 let mut account = match storage_options {
152 Some(opts) => StorageOptions::find_configured_storage_account(opts),
153 None => None,
154 };
155 if account.is_none() {
156 account = StorageOptions::find_configured_storage_account(&ENV_OPTIONS.0);
157 }
158 let account = account.ok_or(Error::invalid_input(
159 "Unable to find object store prefix: no Azure account name in URI, and no storage account configured.",
160 location!(),
161 ))?;
162 (authority, account)
163 }
164 };
165 Ok(format!("{}${}@{}", url.scheme(), container, account))
166 }
167}
168
169static ENV_OPTIONS: LazyLock<StorageOptions> = LazyLock::new(StorageOptions::from_env);
170
171impl StorageOptions {
172 fn from_env() -> Self {
174 let mut opts = HashMap::<String, String>::new();
175 for (os_key, os_value) in std::env::vars_os() {
176 if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
177 if let Ok(config_key) = AzureConfigKey::from_str(&key.to_ascii_lowercase()) {
178 opts.insert(config_key.as_ref().to_string(), value.to_string());
179 }
180 }
181 }
182 Self(opts)
183 }
184
185 pub fn with_env_azure(&mut self) {
187 for (os_key, os_value) in &ENV_OPTIONS.0 {
188 if !self.0.contains_key(os_key) {
189 self.0.insert(os_key.clone(), os_value.clone());
190 }
191 }
192 }
193
194 pub fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
196 self.0
197 .iter()
198 .filter_map(|(key, value)| {
199 let az_key = AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?;
200 Some((az_key, value.clone()))
201 })
202 .collect()
203 }
204
205 #[allow(clippy::manual_map)]
206 fn find_configured_storage_account(map: &HashMap<String, String>) -> Option<String> {
207 if let Some(account) = map.get("azure_storage_account_name") {
208 Some(account.clone())
209 } else if let Some(account) = map.get("account_name") {
210 Some(account.clone())
211 } else {
212 None
213 }
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::ObjectStoreParams;
    use std::collections::HashMap;

    #[test]
    fn test_azure_store_path() {
        let provider = AzureBlobStoreProvider;

        let url = Url::parse("az://bucket/path/to/file").unwrap();
        let path = provider.extract_path(&url).unwrap();
        let expected_path = object_store::path::Path::from("path/to/file");
        assert_eq!(path, expected_path);
    }

    #[tokio::test]
    async fn test_use_opendal_flag() {
        use crate::object_store::StorageOptionsAccessor;
        let provider = AzureBlobStoreProvider;
        let url = Url::parse("az://test-container/path").unwrap();
        let params_with_flag = ObjectStoreParams {
            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
                HashMap::from([
                    ("use_opendal".to_string(), "true".to_string()),
                    ("account_name".to_string(), "test_account".to_string()),
                    (
                        "endpoint".to_string(),
                        "https://test_account.blob.core.windows.net".to_string(),
                    ),
                    (
                        "account_key".to_string(),
                        "dGVzdF9hY2NvdW50X2tleQ==".to_string(),
                    ),
                ]),
            ))),
            ..Default::default()
        };

        let store = provider
            .new_store(url.clone(), &params_with_flag)
            .await
            .unwrap();
        assert_eq!(store.scheme, "az");
    }

    #[test]
    fn test_find_configured_storage_account() {
        assert_eq!(
            Some("myaccount".to_string()),
            StorageOptions::find_configured_storage_account(&HashMap::from_iter(
                [
                    ("access_key".to_string(), "myaccesskey".to_string()),
                    (
                        "azure_storage_account_name".to_string(),
                        "myaccount".to_string()
                    )
                ]
                .into_iter()
            ))
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@bob",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_calculate_object_store_prefix_from_url_and_ignored_options() {
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("account_name".to_string(), "bob".to_string())]);
        assert_eq!(
            "az$container@account",
            provider
                .calculate_object_store_prefix(
                    &Url::parse("az://container@account.dfs.core.windows.net/path").unwrap(),
                    Some(&options)
                )
                .unwrap()
        );
    }

    #[test]
    fn test_fail_to_calculate_object_store_prefix_from_url() {
        // NOTE: this relies on no Azure account name being set in the process environment
        // (e.g. AZURE_STORAGE_ACCOUNT_NAME), because the provider falls back to it.
        let provider = AzureBlobStoreProvider;
        let options = HashMap::from_iter([("access_key".to_string(), "myaccesskey".to_string())]);
        let expected = "Invalid user input: Unable to find object store prefix: no Azure account name in URI, and no storage account configured.";
        let result = provider
            .calculate_object_store_prefix(
                &Url::parse("az://container/path").unwrap(),
                Some(&options),
            )
            .expect_err("expected error")
            .to_string();
        // `starts_with` avoids a slicing panic when the message is shorter than expected
        // and reports the actual message on failure.
        assert!(
            result.starts_with(expected),
            "unexpected error message: {result}"
        );
    }
}