floe_core/io/storage/
mod.rs1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod adls;
7pub mod archive;
8pub mod extensions;
9pub mod gcs;
10pub mod inputs;
11pub mod local;
12pub mod object_store;
13pub mod ops;
14pub mod output;
15pub mod paths;
16pub mod planner;
17pub mod s3;
18pub mod target;
19
20pub use archive::archive_input_file;
21pub use planner::{
22 filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
23};
24pub use target::Target;
25
26pub use planner::ObjectRef;
27
28pub trait StorageClient: Send + Sync {
29 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
30 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
31 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
32 fn resolve_uri(&self, path: &str) -> FloeResult<String>;
33 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
34 fn delete_object(&self, uri: &str) -> FloeResult<()>;
35 fn exists(&self, uri: &str) -> FloeResult<bool>;
36}
37
38pub struct CloudClient {
39 clients: HashMap<String, Box<dyn StorageClient>>,
40}
41
42impl CloudClient {
43 pub fn new() -> Self {
44 Self {
45 clients: HashMap::new(),
46 }
47 }
48
49 pub fn client_for<'a>(
50 &'a mut self,
51 resolver: &config::StorageResolver,
52 storage: &str,
53 entity: &config::EntityConfig,
54 ) -> FloeResult<&'a mut dyn StorageClient> {
55 let context = format!("entity.name={}", entity.name);
56 self.client_for_context(resolver, storage, &context)
57 }
58
59 pub fn client_for_context<'a>(
60 &'a mut self,
61 resolver: &config::StorageResolver,
62 storage: &str,
63 context: &str,
64 ) -> FloeResult<&'a mut dyn StorageClient> {
65 if !self.clients.contains_key(storage) {
66 let definition = resolver.definition(storage).ok_or_else(|| {
67 Box::new(ConfigError(format!(
68 "{} storage {} is not defined",
69 context, storage
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 let client = build_client(&definition)?;
73 self.clients.insert(storage.to_string(), client);
74 }
75 Ok(self
76 .clients
77 .get_mut(storage)
78 .expect("storage client inserted")
79 .as_mut())
80 }
81}
82
83impl Default for CloudClient {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
90 let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
91 "local" => Box::new(local::LocalClient::new()),
92 "s3" => {
93 let bucket = definition.bucket.clone().ok_or_else(|| {
94 Box::new(ConfigError(format!(
95 "storage {} requires bucket for type s3",
96 definition.name
97 ))) as Box<dyn std::error::Error + Send + Sync>
98 })?;
99 Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
100 }
101 "adls" => Box::new(adls::AdlsClient::new(definition)?),
102 "gcs" => {
103 let bucket = definition.bucket.clone().ok_or_else(|| {
104 Box::new(ConfigError(format!(
105 "storage {} requires bucket for type gcs",
106 definition.name
107 ))) as Box<dyn std::error::Error + Send + Sync>
108 })?;
109 Box::new(gcs::GcsClient::new(bucket)?)
110 }
111 other => {
112 return Err(Box::new(ConfigError(format!(
113 "storage type {} is unsupported",
114 other
115 ))))
116 }
117 };
118 Ok(client)
119}