1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::{
6 LogStore, LogStoreFactory, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
7 client_options_from_certificate, default_logstore, logstore_factories, object_store_factories,
8};
9use deltalake_core::{DeltaResult, DeltaTableError, Path};
10use object_store::ObjectStoreScheme;
11use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
12use object_store::client::SpawnedReqwestConnector;
13use url::Url;
14
15mod config;
16pub mod error;
17
18trait AzureOptions {
19 fn as_azure_options(&self) -> HashMap<AzureConfigKey, String>;
20}
21
22impl AzureOptions for HashMap<String, String> {
23 fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
24 self.iter()
25 .filter_map(|(key, value)| {
26 Some((
27 AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
28 value.clone(),
29 ))
30 })
31 .collect()
32 }
33}
34
/// Stateless factory type for Azure storage.
///
/// It has no fields; the trait implementations in this file attach the
/// object-store and log-store construction logic to it.
#[derive(Debug, Default, Clone)]
pub struct AzureFactory {}
37
38impl ObjectStoreFactory for AzureFactory {
39 fn parse_url_opts(
40 &self,
41 url: &Url,
42 config: &StorageConfig,
43 ) -> DeltaResult<(ObjectStoreRef, Path)> {
44 let mut builder = MicrosoftAzureBuilder::new()
45 .with_url(url.to_string())
46 .with_retry(config.retry.clone());
47 if let Some(runtime) = &config.runtime {
48 builder =
49 builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
50 }
51
52 if let Some(ref cert_config) = config.certificate
53 && let Some(ref path) = cert_config.certificate_path
54 {
55 builder = builder.with_client_options(client_options_from_certificate(path)?);
56 }
57
58 let config = config::AzureConfigHelper::try_new(config.raw.as_azure_options())?.build()?;
59
60 for (key, value) in config.iter() {
61 builder = builder.with_config(*key, value.clone());
62 }
63
64 let store = builder.build()?;
65
66 let (_, path) =
67 ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
68 source: Box::new(e),
69 })?;
70 let prefix = Path::parse(path)?;
71
72 Ok((Arc::new(store), prefix))
73 }
74}
75
76impl LogStoreFactory for AzureFactory {
77 fn with_options(
78 &self,
79 prefixed_store: ObjectStoreRef,
80 root_store: ObjectStoreRef,
81 location: &Url,
82 options: &StorageConfig,
83 ) -> DeltaResult<Arc<dyn LogStore>> {
84 Ok(default_logstore(
85 prefixed_store,
86 root_store,
87 location,
88 options,
89 ))
90 }
91}
92
93pub fn register_handlers(_additional_prefixes: Option<Url>) {
95 let factory = Arc::new(AzureFactory {});
96 for scheme in ["az", "adl", "azure", "abfs", "abfss"].iter() {
97 let url = Url::parse(&format!("{scheme}://")).unwrap();
98 object_store_factories().insert(url.clone(), factory.clone());
99 logstore_factories().insert(url.clone(), factory.clone());
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// An upper-case option name must still resolve to its typed key, with
    /// the value carried over unchanged.
    #[test]
    fn test_as_azure_options() {
        use object_store::azure::AzureConfigKey;

        let expected = "value".to_string();
        let options = HashMap::from([(
            "AZURE_STORAGE_ACCOUNT_KEY".to_string(),
            expected.clone(),
        )]);

        let converted = options.as_azure_options();

        assert_eq!(converted.get(&AzureConfigKey::AccessKey), Some(&expected));
    }
}