Skip to main content

floe_core/io/storage/
mod.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod core;
7pub mod object_store;
8pub mod ops;
9pub mod providers;
10pub mod target;
11
12pub use core::{extensions, paths, placement, planner, uri, validation};
13pub use ops::{archive, inputs, output};
14pub use placement::OutputPlacement;
15pub use planner::{
16    filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
17};
18pub use providers::{adls, gcs, local, s3};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
23pub trait StorageClient: Send + Sync {
24    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
25    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
26    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
27    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
28    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
29    fn delete_object(&self, uri: &str) -> FloeResult<()>;
30    fn exists(&self, uri: &str) -> FloeResult<bool>;
31}
32
33pub struct CloudClient {
34    clients: HashMap<String, Box<dyn StorageClient>>,
35}
36
37impl CloudClient {
38    pub fn new() -> Self {
39        Self {
40            clients: HashMap::new(),
41        }
42    }
43
44    pub fn client_for<'a>(
45        &'a mut self,
46        resolver: &config::StorageResolver,
47        storage: &str,
48        entity: &config::EntityConfig,
49    ) -> FloeResult<&'a mut dyn StorageClient> {
50        let context = format!("entity.name={}", entity.name);
51        self.client_for_context(resolver, storage, &context)
52    }
53
54    pub fn client_for_context<'a>(
55        &'a mut self,
56        resolver: &config::StorageResolver,
57        storage: &str,
58        context: &str,
59    ) -> FloeResult<&'a mut dyn StorageClient> {
60        if !self.clients.contains_key(storage) {
61            let definition = resolver.definition(storage).ok_or_else(|| {
62                Box::new(ConfigError(format!(
63                    "{} storage {} is not defined",
64                    context, storage
65                ))) as Box<dyn std::error::Error + Send + Sync>
66            })?;
67            let client = build_client(&definition)?;
68            self.clients.insert(storage.to_string(), client);
69        }
70        Ok(self
71            .clients
72            .get_mut(storage)
73            .expect("storage client inserted")
74            .as_mut())
75    }
76}
77
78impl Default for CloudClient {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
85    let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
86        "local" => Box::new(local::LocalClient::new()),
87        "s3" => {
88            let bucket =
89                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "s3")?;
90            Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
91        }
92        "adls" => Box::new(adls::AdlsClient::new(definition)?),
93        "gcs" => {
94            let bucket =
95                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "gcs")?;
96            Box::new(gcs::GcsClient::new(bucket)?)
97        }
98        other => {
99            return Err(Box::new(ConfigError(format!(
100                "storage type {} is unsupported",
101                other
102            ))))
103        }
104    };
105    Ok(client)
106}