iceberg_rust/object_store/
mod.rs1use std::{fmt::Display, path::Path, str::FromStr, sync::Arc};
6
7use object_store::{
8 aws::{AmazonS3Builder, AmazonS3ConfigKey, S3CopyIfNotExists},
9 azure::{AzureConfigKey, MicrosoftAzureBuilder},
10 gcp::{GoogleCloudStorageBuilder, GoogleConfigKey},
11 local::LocalFileSystem,
12 memory::InMemory,
13 ObjectStore,
14};
15
16use crate::error::Error;
17
18pub mod parse;
19pub mod store;
20
/// The storage location a path refers to, as produced by [`Bucket::from_path`].
///
/// Remote variants borrow the bucket (or Azure container) name from the path
/// they were parsed from; `Local` carries no name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bucket<'s> {
    /// An Amazon S3 bucket name (parsed from `s3://` or `s3a://` paths).
    S3(&'s str),
    /// A Google Cloud Storage bucket name (parsed from `gs://` or `gcs://` paths).
    GCS(&'s str),
    /// An Azure container name (parsed from `https://` paths on Azure / Fabric hosts).
    Azure(&'s str),
    /// Any path that is not recognised as one of the remote stores above.
    Local,
}
33
34impl Display for Bucket<'_> {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Bucket::S3(s) => write!(f, "s3://{s}"),
38 Bucket::GCS(s) => write!(f, "gs://{s}"),
39 Bucket::Azure(s) => write!(f, "https://{s}"),
40 Bucket::Local => write!(f, ""),
41 }
42 }
43}
44
45impl Bucket<'_> {
46 pub fn from_path(path: &str) -> Result<Bucket<'_>, Error> {
48 if path.starts_with("s3://") || path.starts_with("s3a://") {
49 let prefix = if path.starts_with("s3://") {
50 "s3://"
51 } else {
52 "s3a://"
53 };
54 path.trim_start_matches(prefix)
55 .split('/')
56 .next()
57 .map(Bucket::S3)
58 .ok_or(Error::NotFound(format!("Bucket in path {path}")))
59 } else if path.starts_with("gcs://") || path.starts_with("gs://") {
60 let prefix = if path.starts_with("gcs://") {
61 "gcs://"
62 } else {
63 "gs://"
64 };
65 path.trim_start_matches(prefix)
66 .split('/')
67 .next()
68 .map(Bucket::GCS)
69 .ok_or(Error::NotFound(format!("Bucket in path {path}")))
70 } else if path.starts_with("https://")
71 && (path.contains("dfs.core.windows.net")
72 || path.contains("blob.core.windows.net")
73 || path.contains("dfs.fabric.microsoft.com")
74 || path.contains("blob.fabric.microsoft.com"))
75 {
76 path.trim_start_matches("https://")
77 .split('/')
78 .nth(1)
79 .map(Bucket::Azure)
80 .ok_or(Error::NotFound(format!("Bucket in path {path}")))
81 } else {
82 Ok(Bucket::Local)
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
89pub enum ObjectStoreBuilder {
90 Azure(Box<MicrosoftAzureBuilder>),
92 S3(Box<AmazonS3Builder>),
94 GCS(Box<GoogleCloudStorageBuilder>),
96 Filesystem(Arc<LocalFileSystem>),
98 Memory(Arc<InMemory>),
100}
101
102pub enum ConfigKey {
104 Azure(AzureConfigKey),
106 AWS(AmazonS3ConfigKey),
108 GCS(GoogleConfigKey),
110}
111
112impl FromStr for ConfigKey {
113 type Err = object_store::Error;
114 fn from_str(s: &str) -> Result<Self, Self::Err> {
115 if let Ok(x) = s.parse() {
116 return Ok(ConfigKey::Azure(x));
117 };
118 if let Ok(x) = s.parse() {
119 return Ok(ConfigKey::AWS(x));
120 };
121 if let Ok(x) = s.parse() {
122 return Ok(ConfigKey::GCS(x));
123 };
124 Err(object_store::Error::UnknownConfigurationKey {
125 store: "",
126 key: s.to_string(),
127 })
128 }
129}
130impl ObjectStoreBuilder {
131 pub fn azure() -> Self {
133 ObjectStoreBuilder::Azure(Box::new(MicrosoftAzureBuilder::from_env()))
134 }
135 pub fn s3() -> Self {
137 ObjectStoreBuilder::S3(Box::new(AmazonS3Builder::from_env()))
138 }
139 pub fn gcs() -> Self {
141 ObjectStoreBuilder::GCS(Box::new(GoogleCloudStorageBuilder::from_env()))
142 }
143 pub fn filesystem(prefix: impl AsRef<Path>) -> Self {
145 ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new_with_prefix(prefix).unwrap()))
146 }
147 pub fn memory() -> Self {
149 ObjectStoreBuilder::Memory(Arc::new(InMemory::new()))
150 }
151 pub fn with_config(
153 self,
154 key: impl Into<String>,
155 value: impl Into<String>,
156 ) -> Result<Self, Error> {
157 match self {
158 ObjectStoreBuilder::Azure(azure) => {
159 let key: AzureConfigKey = key.into().parse()?;
160 Ok(ObjectStoreBuilder::Azure(Box::new(
161 azure.with_config(key, value),
162 )))
163 }
164 ObjectStoreBuilder::S3(aws) => {
165 let key: AmazonS3ConfigKey = key.into().parse()?;
166 Ok(ObjectStoreBuilder::S3(Box::new(
167 aws.with_config(key, value),
168 )))
169 }
170 ObjectStoreBuilder::GCS(gcs) => {
171 let key: GoogleConfigKey = key.into().parse()?;
172 Ok(ObjectStoreBuilder::GCS(Box::new(
173 gcs.with_config(key, value),
174 )))
175 }
176 x => Ok(x),
177 }
178 }
179 pub fn build(&self, bucket: Bucket) -> Result<Arc<dyn ObjectStore>, Error> {
181 match (bucket, self) {
182 (Bucket::Azure(bucket), Self::Azure(builder)) => Ok::<_, Error>(Arc::new(
183 (**builder)
184 .clone()
185 .with_container_name(bucket)
186 .build()
187 .map_err(Error::from)?,
188 )),
189 (Bucket::S3(bucket), Self::S3(builder)) => Ok::<_, Error>(Arc::new(
190 (**builder)
191 .clone()
192 .with_bucket_name(bucket)
193 .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
194 .build()
195 .map_err(Error::from)?,
196 )),
197 (Bucket::GCS(bucket), Self::GCS(builder)) => Ok::<_, Error>(Arc::new(
198 (**builder)
199 .clone()
200 .with_bucket_name(bucket)
201 .build()
202 .map_err(Error::from)?,
203 )),
204 (Bucket::Local, Self::Filesystem(object_store)) => Ok(object_store.clone()),
205 (Bucket::Local, Self::Memory(object_store)) => Ok(object_store.clone()),
206 _ => Err(Error::NotSupported("Object store protocol".to_owned())),
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `path` with `Bucket::from_path`, panicking (with the path) if it is rejected.
    fn parse(path: &str) -> Bucket<'_> {
        Bucket::from_path(path).unwrap_or_else(|_| panic!("failed to parse bucket from {path}"))
    }

    #[test]
    fn test_from_path_s3() {
        let bucket = parse("s3://my-bucket/path/to/file");
        assert!(matches!(bucket, Bucket::S3("my-bucket")), "expected S3 bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_s3a() {
        let bucket = parse("s3a://my-bucket/path/to/file");
        assert!(matches!(bucket, Bucket::S3("my-bucket")), "expected S3 bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_gcs() {
        let bucket = parse("gcs://my-bucket/path/to/file");
        assert!(matches!(bucket, Bucket::GCS("my-bucket")), "expected GCS bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_gs() {
        let bucket = parse("gs://my-bucket/path/to/file");
        assert!(matches!(bucket, Bucket::GCS("my-bucket")), "expected GCS bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_azure_dfs() {
        let bucket = parse("https://mystorageaccount.dfs.core.windows.net/container/path");
        assert!(matches!(bucket, Bucket::Azure("container")), "expected Azure bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_azure_blob() {
        let bucket = parse("https://mystorageaccount.blob.core.windows.net/container/path");
        assert!(matches!(bucket, Bucket::Azure("container")), "expected Azure bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_azure_fabric_dfs() {
        let bucket = parse("https://mystorageaccount.dfs.fabric.microsoft.com/container/path");
        assert!(matches!(bucket, Bucket::Azure("container")), "expected Azure bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_azure_fabric_blob() {
        let bucket = parse("https://mystorageaccount.blob.fabric.microsoft.com/container/path");
        assert!(matches!(bucket, Bucket::Azure("container")), "expected Azure bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_azure_missing_container() {
        assert!(Bucket::from_path("https://mystorageaccount.blob.core.windows.net").is_err());
    }

    #[test]
    fn test_from_path_local() {
        let bucket = parse("/local/path/to/file");
        assert!(matches!(bucket, Bucket::Local), "expected Local bucket, got {bucket:?}");
    }

    #[test]
    fn test_from_path_https_non_azure() {
        let bucket = parse("https://example.com/path");
        assert!(matches!(bucket, Bucket::Local), "expected Local bucket, got {bucket:?}");
    }

    #[test]
    fn test_display() {
        assert_eq!(Bucket::S3("my-bucket").to_string(), "s3://my-bucket");
        assert_eq!(Bucket::GCS("my-bucket").to_string(), "gs://my-bucket");
        assert_eq!(Bucket::Local.to_string(), "");
    }

    #[test]
    fn test_memory_builder_only_serves_local() {
        let builder = ObjectStoreBuilder::memory();
        assert!(builder.build(Bucket::Local).is_ok());
        assert!(builder.build(Bucket::S3("my-bucket")).is_err());
    }

    #[test]
    fn test_config_key_parsing() {
        assert!(matches!(
            "aws_access_key_id".parse::<ConfigKey>(),
            Ok(ConfigKey::AWS(_))
        ));
        assert!("definitely_not_a_config_key".parse::<ConfigKey>().is_err());
    }
}