Skip to main content

iceberg_rust/object_store/
mod.rs

/*!
Defining the [Bucket] enum for specifying buckets for the ObjectStore.
*/
4
5use std::{fmt::Display, path::Path, str::FromStr, sync::Arc};
6
7use object_store::{
8    aws::{AmazonS3Builder, AmazonS3ConfigKey, S3CopyIfNotExists},
9    azure::{AzureConfigKey, MicrosoftAzureBuilder},
10    gcp::{GoogleCloudStorageBuilder, GoogleConfigKey},
11    local::LocalFileSystem,
12    memory::InMemory,
13    ObjectStore,
14};
15
16use crate::error::Error;
17
18pub mod parse;
19pub mod store;
20
/// Azure endpoint types
///
/// Selects which Azure storage endpoint family a location refers to. The
/// [`Display`] implementation renders the lowercase host label (`dfs` or `blob`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AzureEndpointType {
    /// Distributed File System
    DFS,
    /// Binary Large Object
    Blob,
}
29
30impl Display for AzureEndpointType {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            AzureEndpointType::DFS => write!(f, "dfs"),
34            AzureEndpointType::Blob => write!(f, "blob"),
35        }
36    }
37}
38
/// Type for buckets for different cloud providers
///
/// The string fields borrow from the location string the bucket was parsed
/// from (see [`Bucket::from_path`]), hence the `'s` lifetime.
#[derive(Debug)]
pub enum Bucket<'s> {
    /// Aws S3 bucket, identified by its bucket name
    S3(&'s str),
    /// GCS bucket, identified by its bucket name
    GCS(&'s str),
    /// Azure storage container
    Azure {
        /// Account name; empty when the location string does not carry one
        account: &'s str,
        /// Container name; empty when the location string does not carry one
        container: &'s str,
        /// Endpoint type
        endpoint_type: AzureEndpointType,
    },
    /// No bucket; used for locations without a recognized cloud scheme
    Local,
}
58
59impl Display for Bucket<'_> {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            Bucket::S3(s) => write!(f, "s3://{s}"),
63            Bucket::GCS(s) => write!(f, "gs://{s}"),
64            Bucket::Azure {
65                account,
66                container,
67                endpoint_type,
68            } => {
69                write!(
70                    f,
71                    "abfss://{container}@{account}.{endpoint_type}.core.windows.net"
72                )
73            }
74            Bucket::Local => write!(f, ""),
75        }
76    }
77}
78
79impl Bucket<'_> {
80    /// Get the bucket and cloud provider from the location string
81    pub fn from_path(path: &str) -> Result<Bucket<'_>, Error> {
82        let extract_prefix = |path: &str| {
83            path.split("://")
84                .next()
85                .map(|p| format!("{}://", p))
86                .unwrap_or_default()
87        };
88
89        if path.starts_with("s3://") || path.starts_with("s3a://") {
90            let prefix = extract_prefix(path);
91            path.trim_start_matches(prefix.as_str())
92                .split('/')
93                .next()
94                .map(Bucket::S3)
95                .ok_or(Error::NotFound(format!("Bucket in path {path}")))
96        } else if path.starts_with("gcs://") || path.starts_with("gs://") {
97            let prefix = extract_prefix(path);
98            path.trim_start_matches(prefix.as_str())
99                .split('/')
100                .next()
101                .map(Bucket::GCS)
102                .ok_or(Error::NotFound(format!("Bucket in path {path}")))
103        } else if path.starts_with("az://")
104            || path.starts_with("adl://")
105            || path.starts_with("azure://")
106        {
107            // Format: az://container/path or adl://container/path or azure://container/path
108            let prefix = extract_prefix(path);
109            let container = path
110                .trim_start_matches(prefix.as_str())
111                .split('/')
112                .next()
113                .ok_or(Error::NotFound(format!("Container in path {path}")))?;
114            Ok(Bucket::Azure {
115                account: "",
116                container,
117                endpoint_type: AzureEndpointType::DFS,
118            })
119        } else if path.starts_with("abfs://") || path.starts_with("abfss://") {
120            let prefix = extract_prefix(path);
121            let remainder = path.trim_start_matches(prefix.as_str());
122
123            if remainder.contains('@') {
124                // Format: abfs[s]://file_system@account_name.dfs.core.windows.net/path
125                let container = remainder
126                    .split('@')
127                    .next()
128                    .ok_or(Error::NotFound(format!("Container in path {path}")))?;
129                let account = remainder
130                    .split('@')
131                    .nth(1)
132                    .and_then(|s| s.split('.').next())
133                    .ok_or(Error::NotFound(format!("Account in path {path}")))?;
134                Ok(Bucket::Azure {
135                    account,
136                    container,
137                    endpoint_type: AzureEndpointType::DFS,
138                })
139            } else {
140                // Format: abfs[s]://container/path
141                let container = remainder
142                    .split('/')
143                    .next()
144                    .ok_or(Error::NotFound(format!("Container in path {path}")))?;
145                Ok(Bucket::Azure {
146                    account: "",
147                    container,
148                    endpoint_type: AzureEndpointType::DFS,
149                })
150            }
151        } else if path.starts_with("https://")
152            && (path.contains("dfs.core.windows.net")
153                || path.contains("blob.core.windows.net")
154                || path.contains("dfs.fabric.microsoft.com")
155                || path.contains("blob.fabric.microsoft.com"))
156        {
157            // Format: https://account.dfs.core.windows.net/container/path
158            let remainder = path.trim_start_matches("https://");
159            let account = remainder
160                .split('.')
161                .next()
162                .ok_or(Error::NotFound(format!("Account in path {path}")))?;
163            let container = remainder.split('/').nth(1).unwrap_or("");
164
165            let endpoint_type = if remainder.contains("blob.") {
166                AzureEndpointType::Blob
167            } else {
168                AzureEndpointType::DFS
169            };
170
171            Ok(Bucket::Azure {
172                account,
173                container,
174                endpoint_type,
175            })
176        } else {
177            Ok(Bucket::Local)
178        }
179    }
180}
181
/// A wrapper for ObjectStore builders that can be used as a template to generate an ObjectStore given a particular bucket.
///
/// The cloud variants hold a boxed builder that is cloned and completed with
/// the bucket details on every call to `build`. The `Filesystem` and `Memory`
/// variants hold an already constructed store behind an [`Arc`], which `build`
/// hands out by cloning the `Arc`.
#[derive(Debug, Clone)]
pub enum ObjectStoreBuilder {
    /// Microsoft Azure builder
    Azure(Box<MicrosoftAzureBuilder>),
    /// AWS s3 builder
    S3(Box<AmazonS3Builder>),
    /// Google Cloud Storage builder
    GCS(Box<GoogleCloudStorageBuilder>),
    /// Filesystem builder (a shared, ready-to-use local filesystem store)
    Filesystem(Arc<LocalFileSystem>),
    /// In memory builder (a shared, ready-to-use in-memory store)
    Memory(Arc<InMemory>),
}
196
197/// Configuration keys for [ObjectStoreBuilder]
198pub enum ConfigKey {
199    /// Configuration keys for Microsoft Azure
200    Azure(AzureConfigKey),
201    /// Configuration keys for AWS S3
202    AWS(AmazonS3ConfigKey),
203    /// Configuration keys for GCS
204    GCS(GoogleConfigKey),
205}
206
207impl FromStr for ConfigKey {
208    type Err = object_store::Error;
209    fn from_str(s: &str) -> Result<Self, Self::Err> {
210        if let Ok(x) = s.parse() {
211            return Ok(ConfigKey::Azure(x));
212        };
213        if let Ok(x) = s.parse() {
214            return Ok(ConfigKey::AWS(x));
215        };
216        if let Ok(x) = s.parse() {
217            return Ok(ConfigKey::GCS(x));
218        };
219        Err(object_store::Error::UnknownConfigurationKey {
220            store: "",
221            key: s.to_string(),
222        })
223    }
224}
225impl ObjectStoreBuilder {
226    /// Create a new Microsoft Azure ObjectStoreBuilder
227    pub fn azure() -> Self {
228        ObjectStoreBuilder::Azure(Box::new(MicrosoftAzureBuilder::from_env()))
229    }
230    /// Create new AWS S3 Object Store builder
231    pub fn s3() -> Self {
232        ObjectStoreBuilder::S3(Box::new(AmazonS3Builder::from_env()))
233    }
234    /// Create new AWS S3 Object Store builder
235    pub fn gcs() -> Self {
236        ObjectStoreBuilder::GCS(Box::new(GoogleCloudStorageBuilder::from_env()))
237    }
238    /// Create a new FileSystem ObjectStoreBuilder
239    pub fn filesystem(prefix: impl AsRef<Path>) -> Self {
240        ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new_with_prefix(prefix).unwrap()))
241    }
242    /// Create a new InMemory ObjectStoreBuilder
243    pub fn memory() -> Self {
244        ObjectStoreBuilder::Memory(Arc::new(InMemory::new()))
245    }
246    /// Set config value for builder
247    pub fn with_config(
248        self,
249        key: impl Into<String>,
250        value: impl Into<String>,
251    ) -> Result<Self, Error> {
252        match self {
253            ObjectStoreBuilder::Azure(azure) => {
254                let key: AzureConfigKey = key.into().parse()?;
255                Ok(ObjectStoreBuilder::Azure(Box::new(
256                    azure.with_config(key, value),
257                )))
258            }
259            ObjectStoreBuilder::S3(aws) => {
260                let key: AmazonS3ConfigKey = key.into().parse()?;
261                Ok(ObjectStoreBuilder::S3(Box::new(
262                    aws.with_config(key, value),
263                )))
264            }
265            ObjectStoreBuilder::GCS(gcs) => {
266                let key: GoogleConfigKey = key.into().parse()?;
267                Ok(ObjectStoreBuilder::GCS(Box::new(
268                    gcs.with_config(key, value),
269                )))
270            }
271            x => Ok(x),
272        }
273    }
274    /// Create objectstore from template
275    pub fn build(&self, bucket: Bucket) -> Result<Arc<dyn ObjectStore>, Error> {
276        match (bucket, self) {
277            (
278                Bucket::Azure {
279                    account, container, ..
280                },
281                Self::Azure(builder),
282            ) => Ok::<_, Error>(Arc::new(
283                (**builder)
284                    .clone()
285                    .with_account(account)
286                    .with_container_name(container)
287                    .build()
288                    .map_err(Error::from)?,
289            )),
290            (Bucket::S3(bucket), Self::S3(builder)) => Ok::<_, Error>(Arc::new(
291                (**builder)
292                    .clone()
293                    .with_bucket_name(bucket)
294                    .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
295                    .build()
296                    .map_err(Error::from)?,
297            )),
298            (Bucket::GCS(bucket), Self::GCS(builder)) => Ok::<_, Error>(Arc::new(
299                (**builder)
300                    .clone()
301                    .with_bucket_name(bucket)
302                    .build()
303                    .map_err(Error::from)?,
304            )),
305            (Bucket::Local, Self::Filesystem(object_store)) => Ok(object_store.clone()),
306            (Bucket::Local, Self::Memory(object_store)) => Ok(object_store.clone()),
307            _ => Err(Error::NotSupported("Object store protocol".to_owned())),
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `path` parses to an S3 bucket named `expected`.
    #[track_caller]
    fn expect_s3(path: &str, expected: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::S3(name) => assert_eq!(name, expected),
            _ => panic!("Expected S3 bucket"),
        }
    }

    /// Asserts that `path` parses to a GCS bucket named `expected`.
    #[track_caller]
    fn expect_gcs(path: &str, expected: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::GCS(name) => assert_eq!(name, expected),
            _ => panic!("Expected GCS bucket"),
        }
    }

    /// Asserts account and container of an Azure location and returns its endpoint type.
    #[track_caller]
    fn expect_azure(
        path: &str,
        expected_account: &str,
        expected_container: &str,
    ) -> AzureEndpointType {
        match Bucket::from_path(path).unwrap() {
            Bucket::Azure {
                account,
                container,
                endpoint_type,
            } => {
                assert_eq!(account, expected_account);
                assert_eq!(container, expected_container);
                endpoint_type
            }
            _ => panic!("Expected Azure bucket"),
        }
    }

    /// Asserts that `path` parses to an Azure DFS location with the given account and container.
    #[track_caller]
    fn expect_azure_dfs(path: &str, account: &str, container: &str) {
        let endpoint_type = expect_azure(path, account, container);
        assert!(matches!(endpoint_type, AzureEndpointType::DFS));
    }

    /// Asserts that `path` parses to an Azure Blob location with the given account and container.
    #[track_caller]
    fn expect_azure_blob(path: &str, account: &str, container: &str) {
        let endpoint_type = expect_azure(path, account, container);
        assert!(matches!(endpoint_type, AzureEndpointType::Blob));
    }

    /// Asserts that `path` parses to the local bucket.
    #[track_caller]
    fn expect_local(path: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::Local => {}
            _ => panic!("Expected Local bucket"),
        }
    }

    #[test]
    fn test_from_path_s3() {
        expect_s3("s3://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_s3a() {
        expect_s3("s3a://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_gcs() {
        expect_gcs("gcs://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_gs() {
        expect_gcs("gs://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_azure_abfs_simple() {
        expect_azure_dfs("abfs://container/path", "", "container");
    }

    #[test]
    fn test_from_path_azure_abfss_simple() {
        expect_azure_dfs("abfss://container/path", "", "container");
    }

    #[test]
    fn test_from_path_azure_abfs_with_account() {
        expect_azure_dfs(
            "abfs://myfilesystem@mystorageaccount.dfs.core.windows.net/path/to/file",
            "mystorageaccount",
            "myfilesystem",
        );
    }

    #[test]
    fn test_from_path_azure_abfss_with_account() {
        expect_azure_dfs(
            "abfss://myfilesystem@mystorageaccount.dfs.core.windows.net/path/to/file",
            "mystorageaccount",
            "myfilesystem",
        );
    }

    #[test]
    fn test_from_path_azure_abfs_fabric() {
        expect_azure_dfs(
            "abfs://myfilesystem@mystorageaccount.dfs.fabric.microsoft.com/path/to/file",
            "mystorageaccount",
            "myfilesystem",
        );
    }

    #[test]
    fn test_from_path_azure_abfss_fabric() {
        expect_azure_dfs(
            "abfss://myfilesystem@mystorageaccount.dfs.fabric.microsoft.com/path/to/file",
            "mystorageaccount",
            "myfilesystem",
        );
    }

    #[test]
    fn test_from_path_azure_az() {
        expect_azure_dfs("az://container/path/to/file", "", "container");
    }

    #[test]
    fn test_from_path_azure_adl() {
        expect_azure_dfs("adl://container/path/to/file", "", "container");
    }

    #[test]
    fn test_from_path_azure_azure_scheme() {
        expect_azure_dfs("azure://container/path/to/file", "", "container");
    }

    #[test]
    fn test_from_path_azure_https_dfs_core() {
        expect_azure_dfs(
            "https://mystorageaccount.dfs.core.windows.net",
            "mystorageaccount",
            "",
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_core() {
        expect_azure_blob(
            "https://mystorageaccount.blob.core.windows.net",
            "mystorageaccount",
            "",
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_core_with_container() {
        expect_azure_blob(
            "https://mystorageaccount.blob.core.windows.net/container",
            "mystorageaccount",
            "container",
        );
    }

    #[test]
    fn test_from_path_azure_https_dfs_fabric() {
        expect_azure_dfs(
            "https://mystorageaccount.dfs.fabric.microsoft.com",
            "mystorageaccount",
            "",
        );
    }

    #[test]
    fn test_from_path_azure_https_dfs_fabric_with_container() {
        expect_azure_dfs(
            "https://mystorageaccount.dfs.fabric.microsoft.com/container",
            "mystorageaccount",
            "container",
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_fabric() {
        expect_azure_blob(
            "https://mystorageaccount.blob.fabric.microsoft.com",
            "mystorageaccount",
            "",
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_fabric_with_container() {
        expect_azure_blob(
            "https://mystorageaccount.blob.fabric.microsoft.com/container",
            "mystorageaccount",
            "container",
        );
    }

    #[test]
    fn test_from_path_local() {
        expect_local("/local/path/to/file");
    }

    #[test]
    fn test_from_path_https_non_azure() {
        expect_local("https://example.com/path");
    }
}