Skip to main content

floe_core/io/storage/
mod.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod adls;
7pub mod archive;
8pub mod extensions;
9pub mod gcs;
10pub mod inputs;
11pub mod local;
12pub mod object_store;
13pub mod ops;
14pub mod output;
15pub mod paths;
16pub mod planner;
17pub mod s3;
18pub mod target;
19
20pub use archive::archive_input_file;
21pub use planner::{
22    filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
23};
24pub use target::Target;
25
26pub use planner::ObjectRef;
27
28pub trait StorageClient: Send + Sync {
29    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
30    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
31    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
32    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
33    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
34    fn delete_object(&self, uri: &str) -> FloeResult<()>;
35    fn exists(&self, uri: &str) -> FloeResult<bool>;
36}
37
38pub struct CloudClient {
39    clients: HashMap<String, Box<dyn StorageClient>>,
40}
41
42impl CloudClient {
43    pub fn new() -> Self {
44        Self {
45            clients: HashMap::new(),
46        }
47    }
48
49    pub fn client_for<'a>(
50        &'a mut self,
51        resolver: &config::StorageResolver,
52        storage: &str,
53        entity: &config::EntityConfig,
54    ) -> FloeResult<&'a mut dyn StorageClient> {
55        let context = format!("entity.name={}", entity.name);
56        self.client_for_context(resolver, storage, &context)
57    }
58
59    pub fn client_for_context<'a>(
60        &'a mut self,
61        resolver: &config::StorageResolver,
62        storage: &str,
63        context: &str,
64    ) -> FloeResult<&'a mut dyn StorageClient> {
65        if !self.clients.contains_key(storage) {
66            let definition = resolver.definition(storage).ok_or_else(|| {
67                Box::new(ConfigError(format!(
68                    "{} storage {} is not defined",
69                    context, storage
70                ))) as Box<dyn std::error::Error + Send + Sync>
71            })?;
72            let client = build_client(&definition)?;
73            self.clients.insert(storage.to_string(), client);
74        }
75        Ok(self
76            .clients
77            .get_mut(storage)
78            .expect("storage client inserted")
79            .as_mut())
80    }
81}
82
83impl Default for CloudClient {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
90    let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
91        "local" => Box::new(local::LocalClient::new()),
92        "s3" => {
93            let bucket = definition.bucket.clone().ok_or_else(|| {
94                Box::new(ConfigError(format!(
95                    "storage {} requires bucket for type s3",
96                    definition.name
97                ))) as Box<dyn std::error::Error + Send + Sync>
98            })?;
99            Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
100        }
101        "adls" => Box::new(adls::AdlsClient::new(definition)?),
102        "gcs" => {
103            let bucket = definition.bucket.clone().ok_or_else(|| {
104                Box::new(ConfigError(format!(
105                    "storage {} requires bucket for type gcs",
106                    definition.name
107                ))) as Box<dyn std::error::Error + Send + Sync>
108            })?;
109            Box::new(gcs::GcsClient::new(bucket)?)
110        }
111        other => {
112            return Err(Box::new(ConfigError(format!(
113                "storage type {} is unsupported",
114                other
115            ))))
116        }
117    };
118    Ok(client)
119}