Skip to main content

deltalake_azure/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use deltalake_core::logstore::{
6    LogStore, LogStoreFactory, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
7    client_options_from_certificate, default_logstore, logstore_factories, object_store_factories,
8};
9use deltalake_core::{DeltaResult, DeltaTableError, Path};
10use object_store::ObjectStoreScheme;
11use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
12use object_store::client::SpawnedReqwestConnector;
13use url::Url;
14
15mod config;
16pub mod error;
17
18trait AzureOptions {
19    fn as_azure_options(&self) -> HashMap<AzureConfigKey, String>;
20}
21
22impl AzureOptions for HashMap<String, String> {
23    fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
24        self.iter()
25            .filter_map(|(key, value)| {
26                Some((
27                    AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
28                    value.clone(),
29                ))
30            })
31            .collect()
32    }
33}
34
35#[derive(Clone, Default, Debug)]
36pub struct AzureFactory {}
37
38impl ObjectStoreFactory for AzureFactory {
39    fn parse_url_opts(
40        &self,
41        url: &Url,
42        config: &StorageConfig,
43    ) -> DeltaResult<(ObjectStoreRef, Path)> {
44        let mut builder = MicrosoftAzureBuilder::new()
45            .with_url(url.to_string())
46            .with_retry(config.retry.clone());
47        if let Some(runtime) = &config.runtime {
48            builder =
49                builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
50        }
51
52        if let Some(ref cert_config) = config.certificate
53            && let Some(ref path) = cert_config.certificate_path
54        {
55            builder = builder.with_client_options(client_options_from_certificate(path)?);
56        }
57
58        let config = config::AzureConfigHelper::try_new(config.raw.as_azure_options())?.build()?;
59
60        for (key, value) in config.iter() {
61            builder = builder.with_config(*key, value.clone());
62        }
63
64        let store = builder.build()?;
65
66        let (_, path) =
67            ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
68                source: Box::new(e),
69            })?;
70        let prefix = Path::parse(path)?;
71
72        Ok((Arc::new(store), prefix))
73    }
74}
75
76impl LogStoreFactory for AzureFactory {
77    fn with_options(
78        &self,
79        prefixed_store: ObjectStoreRef,
80        root_store: ObjectStoreRef,
81        location: &Url,
82        options: &StorageConfig,
83    ) -> DeltaResult<Arc<dyn LogStore>> {
84        Ok(default_logstore(
85            prefixed_store,
86            root_store,
87            location,
88            options,
89        ))
90    }
91}
92
93/// Register an [ObjectStoreFactory] for common Azure [Url] schemes
94pub fn register_handlers(_additional_prefixes: Option<Url>) {
95    let factory = Arc::new(AzureFactory {});
96    for scheme in ["az", "adl", "azure", "abfs", "abfss"].iter() {
97        let url = Url::parse(&format!("{scheme}://")).unwrap();
98        object_store_factories().insert(url.clone(), factory.clone());
99        logstore_factories().insert(url.clone(), factory.clone());
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use std::collections::HashMap;
107
108    #[test]
109    fn test_as_azure_options() {
110        use object_store::azure::AzureConfigKey;
111        let mut options = HashMap::default();
112        let key = "AZURE_STORAGE_ACCOUNT_KEY".to_string();
113        let value = "value".to_string();
114        options.insert(key, value.clone());
115
116        let converted = options.as_azure_options();
117        assert_eq!(converted.get(&AzureConfigKey::AccessKey), Some(&value));
118    }
119}