1use std::{fmt::Display, path::Path, str::FromStr, sync::Arc};
6
7use object_store::{
8 aws::{AmazonS3Builder, AmazonS3ConfigKey, S3CopyIfNotExists},
9 azure::{AzureConfigKey, MicrosoftAzureBuilder},
10 gcp::{GoogleCloudStorageBuilder, GoogleConfigKey},
11 local::LocalFileSystem,
12 memory::InMemory,
13 ObjectStore,
14};
15
16use crate::error::Error;
17
18pub mod parse;
19pub mod store;
20
/// Kind of Azure storage endpoint a location refers to.
///
/// The [`Display`] implementation renders the host label used for the
/// endpoint (`"dfs"` or `"blob"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AzureEndpointType {
    /// Data Lake Storage endpoint, rendered as `dfs`.
    DFS,
    /// Blob storage endpoint, rendered as `blob`.
    Blob,
}
29
30impl Display for AzureEndpointType {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 AzureEndpointType::DFS => write!(f, "dfs"),
34 AzureEndpointType::Blob => write!(f, "blob"),
35 }
36 }
37}
38
/// Storage location extracted from a path or URL, borrowing its string parts
/// from the input.
#[derive(Debug)]
pub enum Bucket<'s> {
    /// S3 bucket name (displayed as `s3://<name>`).
    S3(&'s str),
    /// Google Cloud Storage bucket name (displayed as `gs://<name>`).
    GCS(&'s str),
    /// Azure storage location.
    Azure {
        /// Storage account name; empty when the path did not carry one.
        account: &'s str,
        /// Container (file system) name; may be empty for bare HTTPS URLs.
        container: &'s str,
        /// Which Azure endpoint flavour the location refers to.
        endpoint_type: AzureEndpointType,
    },
    /// Anything that is not recognised as a cloud location.
    Local,
}
58
59impl Display for Bucket<'_> {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Bucket::S3(s) => write!(f, "s3://{s}"),
63 Bucket::GCS(s) => write!(f, "gs://{s}"),
64 Bucket::Azure {
65 account,
66 container,
67 endpoint_type,
68 } => {
69 write!(
70 f,
71 "abfss://{container}@{account}.{endpoint_type}.core.windows.net"
72 )
73 }
74 Bucket::Local => write!(f, ""),
75 }
76 }
77}
78
79impl Bucket<'_> {
80 pub fn from_path(path: &str) -> Result<Bucket<'_>, Error> {
82 let extract_prefix = |path: &str| {
83 path.split("://")
84 .next()
85 .map(|p| format!("{}://", p))
86 .unwrap_or_default()
87 };
88
89 if path.starts_with("s3://") || path.starts_with("s3a://") {
90 let prefix = extract_prefix(path);
91 path.trim_start_matches(prefix.as_str())
92 .split('/')
93 .next()
94 .map(Bucket::S3)
95 .ok_or(Error::NotFound(format!("Bucket in path {path}")))
96 } else if path.starts_with("gcs://") || path.starts_with("gs://") {
97 let prefix = extract_prefix(path);
98 path.trim_start_matches(prefix.as_str())
99 .split('/')
100 .next()
101 .map(Bucket::GCS)
102 .ok_or(Error::NotFound(format!("Bucket in path {path}")))
103 } else if path.starts_with("az://")
104 || path.starts_with("adl://")
105 || path.starts_with("azure://")
106 {
107 let prefix = extract_prefix(path);
109 let container = path
110 .trim_start_matches(prefix.as_str())
111 .split('/')
112 .next()
113 .ok_or(Error::NotFound(format!("Container in path {path}")))?;
114 Ok(Bucket::Azure {
115 account: "",
116 container,
117 endpoint_type: AzureEndpointType::DFS,
118 })
119 } else if path.starts_with("abfs://") || path.starts_with("abfss://") {
120 let prefix = extract_prefix(path);
121 let remainder = path.trim_start_matches(prefix.as_str());
122
123 if remainder.contains('@') {
124 let container = remainder
126 .split('@')
127 .next()
128 .ok_or(Error::NotFound(format!("Container in path {path}")))?;
129 let account = remainder
130 .split('@')
131 .nth(1)
132 .and_then(|s| s.split('.').next())
133 .ok_or(Error::NotFound(format!("Account in path {path}")))?;
134 Ok(Bucket::Azure {
135 account,
136 container,
137 endpoint_type: AzureEndpointType::DFS,
138 })
139 } else {
140 let container = remainder
142 .split('/')
143 .next()
144 .ok_or(Error::NotFound(format!("Container in path {path}")))?;
145 Ok(Bucket::Azure {
146 account: "",
147 container,
148 endpoint_type: AzureEndpointType::DFS,
149 })
150 }
151 } else if path.starts_with("https://")
152 && (path.contains("dfs.core.windows.net")
153 || path.contains("blob.core.windows.net")
154 || path.contains("dfs.fabric.microsoft.com")
155 || path.contains("blob.fabric.microsoft.com"))
156 {
157 let remainder = path.trim_start_matches("https://");
159 let account = remainder
160 .split('.')
161 .next()
162 .ok_or(Error::NotFound(format!("Account in path {path}")))?;
163 let container = remainder.split('/').nth(1).unwrap_or("");
164
165 let endpoint_type = if remainder.contains("blob.") {
166 AzureEndpointType::Blob
167 } else {
168 AzureEndpointType::DFS
169 };
170
171 Ok(Bucket::Azure {
172 account,
173 container,
174 endpoint_type,
175 })
176 } else {
177 Ok(Bucket::Local)
178 }
179 }
180}
181
/// Backend-specific recipe for creating an [`ObjectStore`].
///
/// Cloud variants hold a boxed builder that is cloned and completed with the
/// bucket details on each `build` call; the filesystem and in-memory variants
/// hold an already constructed store that is shared via `Arc`.
#[derive(Debug, Clone)]
pub enum ObjectStoreBuilder {
    /// Microsoft Azure builder.
    Azure(Box<MicrosoftAzureBuilder>),
    /// Amazon S3 builder.
    S3(Box<AmazonS3Builder>),
    /// Google Cloud Storage builder.
    GCS(Box<GoogleCloudStorageBuilder>),
    /// Local filesystem store.
    Filesystem(Arc<LocalFileSystem>),
    /// In-memory store.
    Memory(Arc<InMemory>),
}
196
197pub enum ConfigKey {
199 Azure(AzureConfigKey),
201 AWS(AmazonS3ConfigKey),
203 GCS(GoogleConfigKey),
205}
206
207impl FromStr for ConfigKey {
208 type Err = object_store::Error;
209 fn from_str(s: &str) -> Result<Self, Self::Err> {
210 if let Ok(x) = s.parse() {
211 return Ok(ConfigKey::Azure(x));
212 };
213 if let Ok(x) = s.parse() {
214 return Ok(ConfigKey::AWS(x));
215 };
216 if let Ok(x) = s.parse() {
217 return Ok(ConfigKey::GCS(x));
218 };
219 Err(object_store::Error::UnknownConfigurationKey {
220 store: "",
221 key: s.to_string(),
222 })
223 }
224}
225impl ObjectStoreBuilder {
226 pub fn azure() -> Self {
228 ObjectStoreBuilder::Azure(Box::new(MicrosoftAzureBuilder::from_env()))
229 }
230 pub fn s3() -> Self {
232 ObjectStoreBuilder::S3(Box::new(AmazonS3Builder::from_env()))
233 }
234 pub fn gcs() -> Self {
236 ObjectStoreBuilder::GCS(Box::new(GoogleCloudStorageBuilder::from_env()))
237 }
238 pub fn filesystem(prefix: impl AsRef<Path>) -> Self {
240 ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new_with_prefix(prefix).unwrap()))
241 }
242 pub fn memory() -> Self {
244 ObjectStoreBuilder::Memory(Arc::new(InMemory::new()))
245 }
246 pub fn with_config(
248 self,
249 key: impl Into<String>,
250 value: impl Into<String>,
251 ) -> Result<Self, Error> {
252 match self {
253 ObjectStoreBuilder::Azure(azure) => {
254 let key: AzureConfigKey = key.into().parse()?;
255 Ok(ObjectStoreBuilder::Azure(Box::new(
256 azure.with_config(key, value),
257 )))
258 }
259 ObjectStoreBuilder::S3(aws) => {
260 let key: AmazonS3ConfigKey = key.into().parse()?;
261 Ok(ObjectStoreBuilder::S3(Box::new(
262 aws.with_config(key, value),
263 )))
264 }
265 ObjectStoreBuilder::GCS(gcs) => {
266 let key: GoogleConfigKey = key.into().parse()?;
267 Ok(ObjectStoreBuilder::GCS(Box::new(
268 gcs.with_config(key, value),
269 )))
270 }
271 x => Ok(x),
272 }
273 }
274 pub fn build(&self, bucket: Bucket) -> Result<Arc<dyn ObjectStore>, Error> {
276 match (bucket, self) {
277 (
278 Bucket::Azure {
279 account, container, ..
280 },
281 Self::Azure(builder),
282 ) => Ok::<_, Error>(Arc::new(
283 (**builder)
284 .clone()
285 .with_account(account)
286 .with_container_name(container)
287 .build()
288 .map_err(Error::from)?,
289 )),
290 (Bucket::S3(bucket), Self::S3(builder)) => Ok::<_, Error>(Arc::new(
291 (**builder)
292 .clone()
293 .with_bucket_name(bucket)
294 .with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
295 .build()
296 .map_err(Error::from)?,
297 )),
298 (Bucket::GCS(bucket), Self::GCS(builder)) => Ok::<_, Error>(Arc::new(
299 (**builder)
300 .clone()
301 .with_bucket_name(bucket)
302 .build()
303 .map_err(Error::from)?,
304 )),
305 (Bucket::Local, Self::Filesystem(object_store)) => Ok(object_store.clone()),
306 (Bucket::Local, Self::Memory(object_store)) => Ok(object_store.clone()),
307 _ => Err(Error::NotSupported("Object store protocol".to_owned())),
308 }
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `path` parses to an S3 bucket called `expected`.
    fn expect_s3(path: &str, expected: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::S3(name) => assert_eq!(name, expected),
            _ => panic!("Expected S3 bucket"),
        }
    }

    /// Asserts that `path` parses to a GCS bucket called `expected`.
    fn expect_gcs(path: &str, expected: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::GCS(name) => assert_eq!(name, expected),
            _ => panic!("Expected GCS bucket"),
        }
    }

    /// Asserts that `path` parses to an Azure bucket with the given parts.
    fn expect_azure(
        path: &str,
        expected_account: &str,
        expected_container: &str,
        expected_endpoint: AzureEndpointType,
    ) {
        match Bucket::from_path(path).unwrap() {
            Bucket::Azure {
                account,
                container,
                endpoint_type,
            } => {
                assert_eq!(account, expected_account);
                assert_eq!(container, expected_container);
                assert!(matches!(
                    (endpoint_type, expected_endpoint),
                    (AzureEndpointType::DFS, AzureEndpointType::DFS)
                        | (AzureEndpointType::Blob, AzureEndpointType::Blob)
                ));
            }
            _ => panic!("Expected Azure bucket"),
        }
    }

    /// Asserts that `path` is classified as a local path.
    fn expect_local(path: &str) {
        match Bucket::from_path(path).unwrap() {
            Bucket::Local => {}
            _ => panic!("Expected Local bucket"),
        }
    }

    #[test]
    fn test_from_path_s3() {
        expect_s3("s3://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_s3a() {
        expect_s3("s3a://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_gcs() {
        expect_gcs("gcs://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_gs() {
        expect_gcs("gs://my-bucket/path/to/file", "my-bucket");
    }

    #[test]
    fn test_from_path_azure_abfs_simple() {
        expect_azure(
            "abfs://container/path",
            "",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_abfss_simple() {
        expect_azure(
            "abfss://container/path",
            "",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_abfs_with_account() {
        expect_azure(
            "abfs://myfilesystem@mystorageaccount.dfs.core.windows.net/path/to/file",
            "mystorageaccount",
            "myfilesystem",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_abfss_with_account() {
        expect_azure(
            "abfss://myfilesystem@mystorageaccount.dfs.core.windows.net/path/to/file",
            "mystorageaccount",
            "myfilesystem",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_abfs_fabric() {
        expect_azure(
            "abfs://myfilesystem@mystorageaccount.dfs.fabric.microsoft.com/path/to/file",
            "mystorageaccount",
            "myfilesystem",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_abfss_fabric() {
        expect_azure(
            "abfss://myfilesystem@mystorageaccount.dfs.fabric.microsoft.com/path/to/file",
            "mystorageaccount",
            "myfilesystem",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_az() {
        expect_azure(
            "az://container/path/to/file",
            "",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_adl() {
        expect_azure(
            "adl://container/path/to/file",
            "",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_azure_scheme() {
        expect_azure(
            "azure://container/path/to/file",
            "",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_https_dfs_core() {
        expect_azure(
            "https://mystorageaccount.dfs.core.windows.net",
            "mystorageaccount",
            "",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_core() {
        expect_azure(
            "https://mystorageaccount.blob.core.windows.net",
            "mystorageaccount",
            "",
            AzureEndpointType::Blob,
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_core_with_container() {
        expect_azure(
            "https://mystorageaccount.blob.core.windows.net/container",
            "mystorageaccount",
            "container",
            AzureEndpointType::Blob,
        );
    }

    #[test]
    fn test_from_path_azure_https_dfs_fabric() {
        expect_azure(
            "https://mystorageaccount.dfs.fabric.microsoft.com",
            "mystorageaccount",
            "",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_https_dfs_fabric_with_container() {
        expect_azure(
            "https://mystorageaccount.dfs.fabric.microsoft.com/container",
            "mystorageaccount",
            "container",
            AzureEndpointType::DFS,
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_fabric() {
        expect_azure(
            "https://mystorageaccount.blob.fabric.microsoft.com",
            "mystorageaccount",
            "",
            AzureEndpointType::Blob,
        );
    }

    #[test]
    fn test_from_path_azure_https_blob_fabric_with_container() {
        expect_azure(
            "https://mystorageaccount.blob.fabric.microsoft.com/container",
            "mystorageaccount",
            "container",
            AzureEndpointType::Blob,
        );
    }

    #[test]
    fn test_from_path_local() {
        expect_local("/local/path/to/file");
    }

    #[test]
    fn test_from_path_https_non_azure() {
        expect_local("https://example.com/path");
    }
}