// floe_core/io/storage/mod.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod adls;
7pub mod extensions;
8pub mod gcs;
9pub mod inputs;
10pub mod local;
11pub mod object_store;
12pub mod output;
13pub mod paths;
14pub mod planner;
15pub mod s3;
16pub mod target;
17
18pub use planner::{filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
23pub trait StorageClient: Send + Sync {
24    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
25    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
26    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
27    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
28    fn delete(&self, uri: &str) -> FloeResult<()>;
29}
30
31pub struct CloudClient {
32    clients: HashMap<String, Box<dyn StorageClient>>,
33}
34
35impl CloudClient {
36    pub fn new() -> Self {
37        Self {
38            clients: HashMap::new(),
39        }
40    }
41
42    pub fn client_for<'a>(
43        &'a mut self,
44        resolver: &config::StorageResolver,
45        storage: &str,
46        entity: &config::EntityConfig,
47    ) -> FloeResult<&'a mut dyn StorageClient> {
48        if !self.clients.contains_key(storage) {
49            let definition = resolver.definition(storage).ok_or_else(|| {
50                Box::new(ConfigError(format!(
51                    "entity.name={} storage {} is not defined",
52                    entity.name, storage
53                ))) as Box<dyn std::error::Error + Send + Sync>
54            })?;
55            let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
56                "local" => Box::new(local::LocalClient::new()),
57                "s3" => {
58                    let bucket = definition.bucket.clone().ok_or_else(|| {
59                        Box::new(ConfigError(format!(
60                            "storage {} requires bucket for type s3",
61                            definition.name
62                        ))) as Box<dyn std::error::Error + Send + Sync>
63                    })?;
64                    Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
65                }
66                "adls" => Box::new(adls::AdlsClient::new(&definition)?),
67                "gcs" => {
68                    let bucket = definition.bucket.clone().ok_or_else(|| {
69                        Box::new(ConfigError(format!(
70                            "storage {} requires bucket for type gcs",
71                            definition.name
72                        ))) as Box<dyn std::error::Error + Send + Sync>
73                    })?;
74                    Box::new(gcs::GcsClient::new(bucket)?)
75                }
76                other => {
77                    return Err(Box::new(ConfigError(format!(
78                        "storage type {} is unsupported",
79                        other
80                    ))))
81                }
82            };
83            self.clients.insert(storage.to_string(), client);
84        }
85        Ok(self
86            .clients
87            .get_mut(storage)
88            .expect("storage client inserted")
89            .as_mut())
90    }
91}
92
93impl Default for CloudClient {
94    fn default() -> Self {
95        Self::new()
96    }
97}