iceberg_rust/object_store/
parse.rs

1/*! Utils for converting standard Iceberg config formats to equivalent `object_store` options
2*/
3
4use crate::error::Error;
5use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
6use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
7use object_store::gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey};
8use object_store::{parse_url_opts, ObjectStore, ObjectStoreScheme, StaticCredentialProvider};
9use std::collections::HashMap;
10use std::sync::Arc;
11use url::Url;
12
// AWS configs

/// Region key not specific to S3; translated to `AmazonS3ConfigKey::Region`, like `s3.region`.
const CLIENT_REGION: &str = "client.region";
/// Translated to `AmazonS3ConfigKey::AccessKeyId`.
const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
/// Translated to `AmazonS3ConfigKey::SecretAccessKey`.
const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
/// Translated to `AmazonS3ConfigKey::Token`.
const AWS_SESSION_TOKEN: &str = "s3.session-token";
/// Translated to `AmazonS3ConfigKey::Region`.
const AWS_REGION: &str = "s3.region";
/// Translated to `AmazonS3ConfigKey::Endpoint`; a value starting with `http://` also enables
/// plain-HTTP access on the builder.
const AWS_ENDPOINT: &str = "s3.endpoint";
/// Translated to `AmazonS3ConfigKey::SkipSignature`.
const AWS_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";

// GCP configs

/// Translated to `GoogleConfigKey::Bucket`.
const GCS_BUCKET: &str = "gcs.bucket";
/// Translated to `GoogleConfigKey::ServiceAccountKey`.
const GCS_CREDENTIALS_JSON: &str = "gcs.credentials-json";
/// Bearer token that is installed on the builder as a static `GcpCredential`.
const GCS_TOKEN: &str = "gcs.oauth2.token";

// Azure configs

/// Translated to `AzureConfigKey::ContainerName`.
const AZURE_CONTAINER_NAME: &str = "azure.container-name";
/// Translated to `AzureConfigKey::Endpoint`; a value starting with `http://` also enables
/// plain-HTTP access on the builder.
const AZURE_ENDPOINT: &str = "azure.endpoint";
/// Translated to `AzureConfigKey::AccessKey`.
const AZURE_STORAGE_ACCESS_KEY: &str = "azure.access-key";
/// Translated to `AzureConfigKey::AccountName`.
const AZURE_STORAGE_ACCOUNT_NAME: &str = "azure.account-name";
32
33/// Parse the url and Iceberg format of variuos storage options into the equivalent `object_store`
34/// options and build the corresponding `ObjectStore`.
35pub fn object_store_from_config(
36    url: Url,
37    config: HashMap<String, String>,
38) -> Result<Arc<dyn ObjectStore>, Error> {
39    let store = match ObjectStoreScheme::parse(&url).map_err(object_store::Error::from)? {
40        (ObjectStoreScheme::AmazonS3, _) => {
41            let mut builder = AmazonS3Builder::new().with_url(url);
42            for (key, option) in config {
43                let s3_key = match key.as_str() {
44                    AWS_ACCESS_KEY_ID => AmazonS3ConfigKey::AccessKeyId,
45                    AWS_SECRET_ACCESS_KEY => AmazonS3ConfigKey::SecretAccessKey,
46                    AWS_SESSION_TOKEN => AmazonS3ConfigKey::Token,
47                    CLIENT_REGION | AWS_REGION => AmazonS3ConfigKey::Region,
48                    AWS_ENDPOINT => {
49                        if option.starts_with("http://") {
50                            // This is mainly used for testing, e.g. against MinIO
51                            builder = builder.with_allow_http(true);
52                        }
53                        AmazonS3ConfigKey::Endpoint
54                    }
55                    AWS_ALLOW_ANONYMOUS => AmazonS3ConfigKey::SkipSignature,
56                    _ => continue,
57                };
58                builder = builder.with_config(s3_key, option);
59            }
60            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
61        }
62
63        (ObjectStoreScheme::GoogleCloudStorage, _) => {
64            let mut builder = GoogleCloudStorageBuilder::new().with_url(url);
65            for (key, option) in config {
66                let gcs_key = match key.as_str() {
67                    GCS_CREDENTIALS_JSON => GoogleConfigKey::ServiceAccountKey,
68                    GCS_BUCKET => GoogleConfigKey::Bucket,
69                    GCS_TOKEN => {
70                        let credential = GcpCredential { bearer: option };
71                        let credential_provider =
72                            Arc::new(StaticCredentialProvider::new(credential)) as _;
73                        builder = builder.with_credentials(credential_provider);
74                        continue;
75                    }
76                    _ => continue,
77                };
78                builder = builder.with_config(gcs_key, option);
79            }
80            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
81        }
82
83        (ObjectStoreScheme::MicrosoftAzure, _) => {
84            let mut builder = MicrosoftAzureBuilder::new().with_url(url);
85            for (key, option) in config {
86                let azure_key = match key.as_str() {
87                    AZURE_CONTAINER_NAME => AzureConfigKey::ContainerName,
88                    AZURE_STORAGE_ACCOUNT_NAME => AzureConfigKey::AccountName,
89                    AZURE_STORAGE_ACCESS_KEY => AzureConfigKey::AccessKey,
90                    AZURE_ENDPOINT => {
91                        if option.starts_with("http://") {
92                            // This is mainly used for testing, e.g. against Azurite
93                            builder = builder.with_allow_http(true);
94                        }
95                        AzureConfigKey::Endpoint
96                    }
97                    _ => continue,
98                };
99                builder = builder.with_config(azure_key, option);
100            }
101            Arc::new(builder.build()?) as Arc<dyn ObjectStore>
102        }
103
104        _ => {
105            let (store, _path) = parse_url_opts(&url, config)?;
106            store.into()
107        }
108    };
109
110    Ok(store)
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;
    use url::Url;

    /// Account access key shared by the Azure tests.
    const AZURE_TEST_ACCESS_KEY: &str = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

    /// Builds a store for `url` from the given Iceberg-style `(key, value)` options and returns
    /// its `Debug` representation, which the tests inspect for the resolved settings.
    fn store_repr(url: &str, options: &[(&str, &str)]) -> String {
        let url = Url::parse(url).unwrap();
        let config: HashMap<String, String> = options
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect();
        let store = object_store_from_config(url, config).unwrap();
        format!("{store:?}")
    }

    /// Asserts that every fragment occurs in `repr`, printing `repr` on failure so no ad-hoc
    /// debug output is needed in the tests themselves.
    fn assert_contains_all(repr: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(
                repr.contains(fragment),
                "expected `{fragment}` in store representation: {repr}"
            );
        }
    }

    /// Asserts that none of the fragments occurs in `repr`, printing `repr` on failure.
    fn assert_contains_none(repr: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(
                !repr.contains(fragment),
                "unexpected `{fragment}` in store representation: {repr}"
            );
        }
    }

    #[test]
    fn test_s3_config_basic() {
        let repr = store_repr(
            "s3://test-bucket/path",
            &[
                (AWS_ACCESS_KEY_ID, "test-key"),
                (AWS_SECRET_ACCESS_KEY, "test-secret"),
                (AWS_SESSION_TOKEN, "test-session"),
                (AWS_REGION, "us-east-1"),
            ],
        );

        assert_contains_all(
            &repr,
            &[
                "region: \"us-east-1\"",
                "bucket: \"test-bucket\"",
                "key_id: \"test-key\"",
                "secret_key: \"******\"",
                "token: Some(\"******\")",
                "endpoint: None",
                "allow_http: Parsed(false)",
                "skip_signature: false",
            ],
        );
    }

    #[test]
    fn test_s3_config_with_http_endpoint() {
        let repr = store_repr(
            "s3://test-bucket/",
            &[
                (AWS_ENDPOINT, "http://localhost:9000"),
                (AWS_ALLOW_ANONYMOUS, "true"),
            ],
        );

        assert_contains_all(
            &repr,
            &[
                "region: \"us-east-1\"",
                "bucket: \"test-bucket\"",
                "endpoint: Some(\"http://localhost:9000\")",
                "allow_http: Parsed(true)",
                "skip_signature: true",
            ],
        );
        // No credentials were supplied, so none may show up in the store.
        assert_contains_none(&repr, &["key_id: ", "secret_key: ", "token: "]);
    }

    #[test]
    fn test_gcs_config_with_service_account() {
        let credentials = json!(
            {
              "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""
            }
        )
        .to_string();

        let repr = store_repr(
            "gs://test-bucket/",
            &[
                (GCS_CREDENTIALS_JSON, credentials.as_str()),
                (GCS_BUCKET, "test-bucket"),
            ],
        );

        assert_contains_all(&repr, &["bearer: \"\"", "bucket_name: \"test-bucket\""]);
    }

    #[test]
    fn test_gcs_config_with_oauth_token() {
        let repr = store_repr(
            "gs://test-bucket/",
            &[(GCS_TOKEN, "oauth-token-123"), (GCS_BUCKET, "test-bucket")],
        );

        assert_contains_all(
            &repr,
            &[
                "bearer: \"oauth-token-123\"",
                "bucket_name: \"test-bucket\"",
            ],
        );
    }

    #[test]
    fn test_azure_config_basic() {
        let repr = store_repr(
            "https://testaccount.blob.core.windows.net/test-container",
            &[
                (AZURE_STORAGE_ACCOUNT_NAME, "testaccount"),
                (AZURE_STORAGE_ACCESS_KEY, AZURE_TEST_ACCESS_KEY),
            ],
        );

        assert_contains_all(
            &repr,
            &[
                "account: \"testaccount\"",
                "container: \"test-container\"",
                "host: Some(Domain(\"testaccount.blob.core.windows.net\"))",
                "port: None",
                "scheme: \"https\"",
                "allow_http: Parsed(false)",
            ],
        );
    }

    #[test]
    fn test_azure_config_with_http_endpoint() {
        let repr = store_repr(
            "https://testaccount.blob.core.windows.net/test-container",
            &[
                (AZURE_ENDPOINT, "http://localhost:9000"),
                (AZURE_STORAGE_ACCOUNT_NAME, "testaccount"),
                (AZURE_STORAGE_ACCESS_KEY, AZURE_TEST_ACCESS_KEY),
            ],
        );

        assert_contains_all(
            &repr,
            &[
                "account: \"testaccount\"",
                "container: \"test-container\"",
                "host: Some(Domain(\"localhost\"))",
                "port: Some(9000)",
                "scheme: \"http\"",
                "allow_http: Parsed(true)",
            ],
        );
    }
}