iceberg_rust/object_store/
parse.rs1use crate::error::Error;
5use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
6use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
7use object_store::gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey};
8use object_store::{parse_url_opts, ObjectStore, ObjectStoreScheme, StaticCredentialProvider};
9use std::collections::HashMap;
10use std::sync::Arc;
11use url::Url;
12
13const CLIENT_REGION: &str = "client.region";
15const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id";
16const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
17const AWS_SESSION_TOKEN: &str = "s3.session-token";
18const AWS_REGION: &str = "s3.region";
19const AWS_ENDPOINT: &str = "s3.endpoint";
20const AWS_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
21
22const GCS_BUCKET: &str = "gcs.bucket";
24const GCS_CREDENTIALS_JSON: &str = "gcs.credentials-json";
25const GCS_TOKEN: &str = "gcs.oauth2.token";
26
27const AZURE_CONTAINER_NAME: &str = "azure.container-name";
29const AZURE_ENDPOINT: &str = "azure.endpoint";
30const AZURE_STORAGE_ACCESS_KEY: &str = "azure.access-key";
31const AZURE_STORAGE_ACCOUNT_NAME: &str = "azure.account-name";
32
33pub fn object_store_from_config(
36 url: Url,
37 config: HashMap<String, String>,
38) -> Result<Arc<dyn ObjectStore>, Error> {
39 let store = match ObjectStoreScheme::parse(&url).map_err(object_store::Error::from)? {
40 (ObjectStoreScheme::AmazonS3, _) => {
41 let mut builder = AmazonS3Builder::new().with_url(url);
42 for (key, option) in config {
43 let s3_key = match key.as_str() {
44 AWS_ACCESS_KEY_ID => AmazonS3ConfigKey::AccessKeyId,
45 AWS_SECRET_ACCESS_KEY => AmazonS3ConfigKey::SecretAccessKey,
46 AWS_SESSION_TOKEN => AmazonS3ConfigKey::Token,
47 CLIENT_REGION | AWS_REGION => AmazonS3ConfigKey::Region,
48 AWS_ENDPOINT => {
49 if option.starts_with("http://") {
50 builder = builder.with_allow_http(true);
52 }
53 AmazonS3ConfigKey::Endpoint
54 }
55 AWS_ALLOW_ANONYMOUS => AmazonS3ConfigKey::SkipSignature,
56 _ => continue,
57 };
58 builder = builder.with_config(s3_key, option);
59 }
60 Arc::new(builder.build()?) as Arc<dyn ObjectStore>
61 }
62
63 (ObjectStoreScheme::GoogleCloudStorage, _) => {
64 let mut builder = GoogleCloudStorageBuilder::new().with_url(url);
65 for (key, option) in config {
66 let gcs_key = match key.as_str() {
67 GCS_CREDENTIALS_JSON => GoogleConfigKey::ServiceAccountKey,
68 GCS_BUCKET => GoogleConfigKey::Bucket,
69 GCS_TOKEN => {
70 let credential = GcpCredential { bearer: option };
71 let credential_provider =
72 Arc::new(StaticCredentialProvider::new(credential)) as _;
73 builder = builder.with_credentials(credential_provider);
74 continue;
75 }
76 _ => continue,
77 };
78 builder = builder.with_config(gcs_key, option);
79 }
80 Arc::new(builder.build()?) as Arc<dyn ObjectStore>
81 }
82
83 (ObjectStoreScheme::MicrosoftAzure, _) => {
84 let mut builder = MicrosoftAzureBuilder::new().with_url(url);
85 for (key, option) in config {
86 let azure_key = match key.as_str() {
87 AZURE_CONTAINER_NAME => AzureConfigKey::ContainerName,
88 AZURE_STORAGE_ACCOUNT_NAME => AzureConfigKey::AccountName,
89 AZURE_STORAGE_ACCESS_KEY => AzureConfigKey::AccessKey,
90 AZURE_ENDPOINT => {
91 if option.starts_with("http://") {
92 builder = builder.with_allow_http(true);
94 }
95 AzureConfigKey::Endpoint
96 }
97 _ => continue,
98 };
99 builder = builder.with_config(azure_key, option);
100 }
101 Arc::new(builder.build()?) as Arc<dyn ObjectStore>
102 }
103
104 _ => {
105 let (store, _path) = parse_url_opts(&url, config)?;
106 store.into()
107 }
108 };
109
110 Ok(store)
111}
112
/// Tests for `object_store_from_config`.
///
/// Each test builds a store from a URL plus Iceberg-style properties and then
/// inspects the store's `Debug` representation for the expected settings.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;
    use url::Url;

    /// Well-known development-storage account key used by the Azure tests.
    const AZURE_TEST_ACCESS_KEY: &str = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

    /// Builds an owned property map from borrowed key/value pairs.
    fn config_of(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Builds the store for `url` and `config` and returns its `Debug` output.
    fn store_debug(url: &str, config: HashMap<String, String>) -> String {
        let url = Url::parse(url).unwrap();
        let store = object_store_from_config(url, config).unwrap();
        format!("{store:?}")
    }

    #[test]
    fn test_s3_config_basic() {
        let config = config_of(&[
            (AWS_ACCESS_KEY_ID, "test-key"),
            (AWS_SECRET_ACCESS_KEY, "test-secret"),
            (AWS_SESSION_TOKEN, "test-session"),
            (AWS_REGION, "us-east-1"),
        ]);

        let store_repr = store_debug("s3://test-bucket/path", config);

        assert!(store_repr.contains("region: \"us-east-1\""));
        assert!(store_repr.contains("bucket: \"test-bucket\""));
        assert!(store_repr.contains("key_id: \"test-key\""));
        assert!(store_repr.contains("secret_key: \"******\""));
        assert!(store_repr.contains("token: Some(\"******\")"));
        assert!(store_repr.contains("endpoint: None"));
        assert!(store_repr.contains("allow_http: Parsed(false)"));
        assert!(store_repr.contains("skip_signature: false"));
    }

    #[test]
    fn test_s3_config_with_http_endpoint() {
        let config = config_of(&[
            (AWS_ENDPOINT, "http://localhost:9000"),
            (AWS_ALLOW_ANONYMOUS, "true"),
        ]);

        let store_repr = store_debug("s3://test-bucket/", config);

        assert!(store_repr.contains("region: \"us-east-1\""));
        assert!(store_repr.contains("bucket: \"test-bucket\""));
        assert!(!store_repr.contains("key_id: "));
        assert!(!store_repr.contains("secret_key: "));
        assert!(!store_repr.contains("token: "));
        assert!(store_repr.contains("endpoint: Some(\"http://localhost:9000\")"));
        assert!(store_repr.contains("allow_http: Parsed(true)"));
        assert!(store_repr.contains("skip_signature: true"));
    }

    #[test]
    fn test_gcs_config_with_service_account() {
        let credentials = json!(
            {
                "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""
            }
        )
        .to_string();
        let config = config_of(&[
            (GCS_CREDENTIALS_JSON, credentials.as_str()),
            (GCS_BUCKET, "test-bucket"),
        ]);

        let store_repr = store_debug("gs://test-bucket/", config);

        assert!(store_repr.contains("bearer: \"\""));
        assert!(store_repr.contains("bucket_name: \"test-bucket\""));
    }

    #[test]
    fn test_gcs_config_with_oauth_token() {
        let config = config_of(&[
            (GCS_TOKEN, "oauth-token-123"),
            (GCS_BUCKET, "test-bucket"),
        ]);

        let store_repr = store_debug("gs://test-bucket/", config);

        assert!(store_repr.contains("bearer: \"oauth-token-123\""));
        assert!(store_repr.contains("bucket_name: \"test-bucket\""));
    }

    #[test]
    fn test_azure_config_basic() {
        let config = config_of(&[
            (AZURE_STORAGE_ACCOUNT_NAME, "testaccount"),
            (AZURE_STORAGE_ACCESS_KEY, AZURE_TEST_ACCESS_KEY),
        ]);

        let store_repr = store_debug(
            "https://testaccount.blob.core.windows.net/test-container",
            config,
        );

        assert!(store_repr.contains("account: \"testaccount\""));
        assert!(store_repr.contains("container: \"test-container\""));
        assert!(store_repr.contains("host: Some(Domain(\"testaccount.blob.core.windows.net\"))"));
        assert!(store_repr.contains("port: None"));
        assert!(store_repr.contains("scheme: \"https\""));
        assert!(store_repr.contains("allow_http: Parsed(false)"));
    }

    #[test]
    fn test_azure_config_with_http_endpoint() {
        let config = config_of(&[
            (AZURE_ENDPOINT, "http://localhost:9000"),
            (AZURE_STORAGE_ACCOUNT_NAME, "testaccount"),
            (AZURE_STORAGE_ACCESS_KEY, AZURE_TEST_ACCESS_KEY),
        ]);

        let store_repr = store_debug(
            "https://testaccount.blob.core.windows.net/test-container",
            config,
        );

        assert!(store_repr.contains("account: \"testaccount\""));
        assert!(store_repr.contains("container: \"test-container\""));
        assert!(store_repr.contains("host: Some(Domain(\"localhost\"))"));
        assert!(store_repr.contains("port: Some(9000)"));
        assert!(store_repr.contains("scheme: \"http\""));
        assert!(store_repr.contains("allow_http: Parsed(true)"));
    }
}