iceberg_rust/object_store/
mod.rs

1/*!
Defines the [Bucket] enum for specifying buckets and the [ObjectStoreBuilder] template for creating an ObjectStore for a given bucket.
3*/
4
5use std::{fmt::Display, path::Path, str::FromStr, sync::Arc};
6
7use object_store::{
8    aws::{AmazonS3Builder, AmazonS3ConfigKey, S3CopyIfNotExists},
9    azure::{AzureConfigKey, MicrosoftAzureBuilder},
10    gcp::{GoogleCloudStorageBuilder, GoogleConfigKey},
11    local::LocalFileSystem,
12    memory::InMemory,
13    ObjectStore,
14};
15
16use crate::error::Error;
17
18pub mod parse;
19pub mod store;
20
/// Bucket (or container) of a cloud provider, borrowed from a location string.
///
/// The payload of each cloud variant is the bucket/container name only; it
/// borrows from the string it was parsed from, so the value is cheap to copy
/// and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bucket<'s> {
    /// Aws S3 bucket
    S3(&'s str),
    /// GCS bucket
    GCS(&'s str),
    /// Azure container
    Azure(&'s str),
    /// No bucket
    Local,
}
33
34impl Display for Bucket<'_> {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Bucket::S3(s) => write!(f, "s3://{s}"),
38            Bucket::GCS(s) => write!(f, "gs://{s}"),
39            Bucket::Azure(s) => write!(f, "https://{s}"),
40            Bucket::Local => write!(f, ""),
41        }
42    }
43}
44
45impl Bucket<'_> {
46    /// Get the bucket and cloud provider from the location string
47    pub fn from_path(path: &str) -> Result<Bucket<'_>, Error> {
48        if path.starts_with("s3://") || path.starts_with("s3a://") {
49            let prefix = if path.starts_with("s3://") {
50                "s3://"
51            } else {
52                "s3a://"
53            };
54            path.trim_start_matches(prefix)
55                .split('/')
56                .next()
57                .map(Bucket::S3)
58                .ok_or(Error::NotFound(format!("Bucket in path {path}")))
59        } else if path.starts_with("gcs://") || path.starts_with("gs://") {
60            let prefix = if path.starts_with("gcs://") {
61                "gcs://"
62            } else {
63                "gs://"
64            };
65            path.trim_start_matches(prefix)
66                .split('/')
67                .next()
68                .map(Bucket::GCS)
69                .ok_or(Error::NotFound(format!("Bucket in path {path}")))
70        } else if path.starts_with("https://")
71            && (path.contains("dfs.core.windows.net")
72                || path.contains("blob.core.windows.net")
73                || path.contains("dfs.fabric.microsoft.com")
74                || path.contains("blob.fabric.microsoft.com"))
75        {
76            path.trim_start_matches("https://")
77                .split('/')
78                .nth(1)
79                .map(Bucket::Azure)
80                .ok_or(Error::NotFound(format!("Bucket in path {path}")))
81        } else {
82            Ok(Bucket::Local)
83        }
84    }
85}
86
87/// A wrapper for ObjectStore builders that can be used as a template to generate an ObjectStore given a particular bucket.
88#[derive(Debug, Clone)]
89pub enum ObjectStoreBuilder {
90    /// Microsoft Azure builder
91    Azure(Box<MicrosoftAzureBuilder>),
92    /// AWS s3 builder
93    S3(Box<AmazonS3Builder>),
94    /// Google Cloud Storage builder
95    GCS(Box<GoogleCloudStorageBuilder>),
96    /// Filesystem builder
97    Filesystem(Arc<LocalFileSystem>),
98    /// In memory builder
99    Memory(Arc<InMemory>),
100}
101
102/// Configuration keys for [ObjectStoreBuilder]
103pub enum ConfigKey {
104    /// Configuration keys for Microsoft Azure
105    Azure(AzureConfigKey),
106    /// Configuration keys for AWS S3
107    AWS(AmazonS3ConfigKey),
108    /// Configuration keys for GCS
109    GCS(GoogleConfigKey),
110}
111
112impl FromStr for ConfigKey {
113    type Err = object_store::Error;
114    fn from_str(s: &str) -> Result<Self, Self::Err> {
115        if let Ok(x) = s.parse() {
116            return Ok(ConfigKey::Azure(x));
117        };
118        if let Ok(x) = s.parse() {
119            return Ok(ConfigKey::AWS(x));
120        };
121        if let Ok(x) = s.parse() {
122            return Ok(ConfigKey::GCS(x));
123        };
124        Err(object_store::Error::UnknownConfigurationKey {
125            store: "",
126            key: s.to_string(),
127        })
128    }
129}
130impl ObjectStoreBuilder {
131    /// Create a new Microsoft Azure ObjectStoreBuilder
132    pub fn azure() -> Self {
133        ObjectStoreBuilder::Azure(Box::new(MicrosoftAzureBuilder::from_env()))
134    }
135    /// Create new AWS S3 Object Store builder
136    pub fn s3() -> Self {
137        ObjectStoreBuilder::S3(Box::new(AmazonS3Builder::from_env()))
138    }
139    /// Create new AWS S3 Object Store builder
140    pub fn gcs() -> Self {
141        ObjectStoreBuilder::GCS(Box::new(GoogleCloudStorageBuilder::from_env()))
142    }
143    /// Create a new FileSystem ObjectStoreBuilder
144    pub fn filesystem(prefix: impl AsRef<Path>) -> Self {
145        ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new_with_prefix(prefix).unwrap()))
146    }
147    /// Create a new InMemory ObjectStoreBuilder
148    pub fn memory() -> Self {
149        ObjectStoreBuilder::Memory(Arc::new(InMemory::new()))
150    }
151    /// Set config value for builder
152    pub fn with_config(
153        self,
154        key: impl Into<String>,
155        value: impl Into<String>,
156    ) -> Result<Self, Error> {
157        match self {
158            ObjectStoreBuilder::Azure(azure) => {
159                let key: AzureConfigKey = key.into().parse()?;
160                Ok(ObjectStoreBuilder::Azure(Box::new(
161                    azure.with_config(key, value),
162                )))
163            }
164            ObjectStoreBuilder::S3(aws) => {
165                let key: AmazonS3ConfigKey = key.into().parse()?;
166                Ok(ObjectStoreBuilder::S3(Box::new(
167                    aws.with_config(key, value),
168                )))
169            }
170            ObjectStoreBuilder::GCS(gcs) => {
171                let key: GoogleConfigKey = key.into().parse()?;
172                Ok(ObjectStoreBuilder::GCS(Box::new(
173                    gcs.with_config(key, value),
174                )))
175            }
176            x => Ok(x),
177        }
178    }
179    /// Create objectstore from template
180    pub fn build(&self, bucket: Bucket) -> Result<Arc<dyn ObjectStore>, Error> {
181        match (bucket, self) {
182            (Bucket::Azure(bucket), Self::Azure(builder)) => Ok::<_, Error>(Arc::new(
183                (**builder)
184                    .clone()
185                    .with_container_name(bucket)
186                    .build()
187                    .map_err(Error::from)?,
188            )),
189            (Bucket::S3(bucket), Self::S3(builder)) => Ok::<_, Error>(Arc::new(
190                (**builder)
191                    .clone()
192                    .with_bucket_name(bucket)
193                    .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
194                    .build()
195                    .map_err(Error::from)?,
196            )),
197            (Bucket::GCS(bucket), Self::GCS(builder)) => Ok::<_, Error>(Arc::new(
198                (**builder)
199                    .clone()
200                    .with_bucket_name(bucket)
201                    .build()
202                    .map_err(Error::from)?,
203            )),
204            (Bucket::Local, Self::Filesystem(object_store)) => Ok(object_store.clone()),
205            (Bucket::Local, Self::Memory(object_store)) => Ok(object_store.clone()),
206            _ => Err(Error::NotSupported("Object store protocol".to_owned())),
207        }
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    // Each test parses one location and checks both the detected provider and
    // the extracted bucket/container name.

    #[test]
    fn test_from_path_s3() {
        let bucket = Bucket::from_path("s3://my-bucket/path/to/file").unwrap();
        assert!(matches!(bucket, Bucket::S3("my-bucket")), "Expected S3 bucket");
    }

    #[test]
    fn test_from_path_s3a() {
        let bucket = Bucket::from_path("s3a://my-bucket/path/to/file").unwrap();
        assert!(matches!(bucket, Bucket::S3("my-bucket")), "Expected S3 bucket");
    }

    #[test]
    fn test_from_path_gcs() {
        let bucket = Bucket::from_path("gcs://my-bucket/path/to/file").unwrap();
        assert!(
            matches!(bucket, Bucket::GCS("my-bucket")),
            "Expected GCS bucket"
        );
    }

    #[test]
    fn test_from_path_gs() {
        let bucket = Bucket::from_path("gs://my-bucket/path/to/file").unwrap();
        assert!(
            matches!(bucket, Bucket::GCS("my-bucket")),
            "Expected GCS bucket"
        );
    }

    #[test]
    fn test_from_path_azure_dfs() {
        let location = "https://mystorageaccount.dfs.core.windows.net/container/path";
        let bucket = Bucket::from_path(location).unwrap();
        assert!(
            matches!(bucket, Bucket::Azure("container")),
            "Expected Azure bucket"
        );
    }

    #[test]
    fn test_from_path_azure_blob() {
        let location = "https://mystorageaccount.blob.core.windows.net/container/path";
        let bucket = Bucket::from_path(location).unwrap();
        assert!(
            matches!(bucket, Bucket::Azure("container")),
            "Expected Azure bucket"
        );
    }

    #[test]
    fn test_from_path_azure_fabric_dfs() {
        let location = "https://mystorageaccount.dfs.fabric.microsoft.com/container/path";
        let bucket = Bucket::from_path(location).unwrap();
        assert!(
            matches!(bucket, Bucket::Azure("container")),
            "Expected Azure bucket"
        );
    }

    #[test]
    fn test_from_path_azure_fabric_blob() {
        let location = "https://mystorageaccount.blob.fabric.microsoft.com/container/path";
        let bucket = Bucket::from_path(location).unwrap();
        assert!(
            matches!(bucket, Bucket::Azure("container")),
            "Expected Azure bucket"
        );
    }

    #[test]
    fn test_from_path_local() {
        let bucket = Bucket::from_path("/local/path/to/file").unwrap();
        assert!(matches!(bucket, Bucket::Local), "Expected Local bucket");
    }

    #[test]
    fn test_from_path_https_non_azure() {
        // An https location whose host is not an Azure endpoint is not a cloud bucket.
        let bucket = Bucket::from_path("https://example.com/path").unwrap();
        assert!(matches!(bucket, Bucket::Local), "Expected Local bucket");
    }
}