floe_core/io/storage/
mod.rs1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod adls;
7pub mod extensions;
8pub mod gcs;
9pub mod inputs;
10pub mod local;
11pub mod object_store;
12pub mod output;
13pub mod paths;
14pub mod planner;
15pub mod s3;
16pub mod target;
17
18pub use planner::{filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
23pub trait StorageClient: Send + Sync {
24 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
25 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
26 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
27 fn resolve_uri(&self, path: &str) -> FloeResult<String>;
28 fn delete(&self, uri: &str) -> FloeResult<()>;
29}
30
31pub struct CloudClient {
32 clients: HashMap<String, Box<dyn StorageClient>>,
33}
34
35impl CloudClient {
36 pub fn new() -> Self {
37 Self {
38 clients: HashMap::new(),
39 }
40 }
41
42 pub fn client_for<'a>(
43 &'a mut self,
44 resolver: &config::StorageResolver,
45 storage: &str,
46 entity: &config::EntityConfig,
47 ) -> FloeResult<&'a mut dyn StorageClient> {
48 if !self.clients.contains_key(storage) {
49 let definition = resolver.definition(storage).ok_or_else(|| {
50 Box::new(ConfigError(format!(
51 "entity.name={} storage {} is not defined",
52 entity.name, storage
53 ))) as Box<dyn std::error::Error + Send + Sync>
54 })?;
55 let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
56 "local" => Box::new(local::LocalClient::new()),
57 "s3" => {
58 let bucket = definition.bucket.clone().ok_or_else(|| {
59 Box::new(ConfigError(format!(
60 "storage {} requires bucket for type s3",
61 definition.name
62 ))) as Box<dyn std::error::Error + Send + Sync>
63 })?;
64 Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
65 }
66 "adls" => Box::new(adls::AdlsClient::new(&definition)?),
67 "gcs" => {
68 let bucket = definition.bucket.clone().ok_or_else(|| {
69 Box::new(ConfigError(format!(
70 "storage {} requires bucket for type gcs",
71 definition.name
72 ))) as Box<dyn std::error::Error + Send + Sync>
73 })?;
74 Box::new(gcs::GcsClient::new(bucket)?)
75 }
76 other => {
77 return Err(Box::new(ConfigError(format!(
78 "storage type {} is unsupported",
79 other
80 ))))
81 }
82 };
83 self.clients.insert(storage.to_string(), client);
84 }
85 Ok(self
86 .clients
87 .get_mut(storage)
88 .expect("storage client inserted")
89 .as_mut())
90 }
91}
92
93impl Default for CloudClient {
94 fn default() -> Self {
95 Self::new()
96 }
97}