floe_core/io/storage/
mod.rs1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod core;
7pub mod object_store;
8pub mod ops;
9pub mod providers;
10pub mod target;
11
12pub use core::{extensions, paths, placement, planner, uri, validation};
13pub use ops::{archive, inputs, output};
14pub use placement::OutputPlacement;
15pub use planner::{
16 filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
17};
18pub use providers::{adls, gcs, local, s3};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
/// Common interface implemented by the storage backends in this module.
///
/// Implementations must be `Send + Sync`. Every method reports failure through
/// [`FloeResult`].
///
/// NOTE(review): the per-method summaries below are derived from the method
/// names and signatures only; confirm exact semantics against the provider
/// implementations.
pub trait StorageClient: Send + Sync {
    /// Returns the [`ObjectRef`]s found for `prefix_or_path` (presumably a
    /// listing of everything under that prefix or directory — confirm).
    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
    /// Takes a `uri` and a `temp_dir` and returns a local [`PathBuf`]
    /// (presumably the downloaded copy placed inside `temp_dir` — confirm).
    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
    /// Takes a `local_path` and a destination `uri` (presumably uploads the
    /// file's contents to that URI — confirm).
    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
    /// Maps `path` to a URI string for this backend.
    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
    /// Takes a source and a destination URI (presumably copies the object
    /// within the backend — confirm).
    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
    /// Takes the URI of an object to remove (behavior for a missing object is
    /// implementation-defined — confirm).
    fn delete_object(&self, uri: &str) -> FloeResult<()>;
    /// Returns a `bool` for `uri` (presumably whether an object exists there —
    /// confirm).
    fn exists(&self, uri: &str) -> FloeResult<bool>;
}
32
/// Holds one boxed [`StorageClient`] per storage name so that each backend
/// client is built at most once and then reused.
pub struct CloudClient {
    /// Built clients, keyed by the storage name they were requested under.
    clients: HashMap<String, Box<dyn StorageClient>>,
}
36
37impl CloudClient {
38 pub fn new() -> Self {
39 Self {
40 clients: HashMap::new(),
41 }
42 }
43
44 pub fn client_for<'a>(
45 &'a mut self,
46 resolver: &config::StorageResolver,
47 storage: &str,
48 entity: &config::EntityConfig,
49 ) -> FloeResult<&'a mut dyn StorageClient> {
50 let context = format!("entity.name={}", entity.name);
51 self.client_for_context(resolver, storage, &context)
52 }
53
54 pub fn client_for_context<'a>(
55 &'a mut self,
56 resolver: &config::StorageResolver,
57 storage: &str,
58 context: &str,
59 ) -> FloeResult<&'a mut dyn StorageClient> {
60 if !self.clients.contains_key(storage) {
61 let definition = resolver.definition(storage).ok_or_else(|| {
62 Box::new(ConfigError(format!(
63 "{} storage {} is not defined",
64 context, storage
65 ))) as Box<dyn std::error::Error + Send + Sync>
66 })?;
67 let client = build_client(&definition)?;
68 self.clients.insert(storage.to_string(), client);
69 }
70 Ok(self
71 .clients
72 .get_mut(storage)
73 .expect("storage client inserted")
74 .as_mut())
75 }
76}
77
78impl Default for CloudClient {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
85 let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
86 "local" => Box::new(local::LocalClient::new()),
87 "s3" => {
88 let bucket =
89 validation::require_field(definition, definition.bucket.as_ref(), "bucket", "s3")?;
90 Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
91 }
92 "adls" => Box::new(adls::AdlsClient::new(definition)?),
93 "gcs" => {
94 let bucket =
95 validation::require_field(definition, definition.bucket.as_ref(), "bucket", "gcs")?;
96 Box::new(gcs::GcsClient::new(bucket)?)
97 }
98 other => {
99 return Err(Box::new(ConfigError(format!(
100 "storage type {} is unsupported",
101 other
102 ))))
103 }
104 };
105 Ok(client)
106}