Skip to main content

floe_core/io/storage/
mod.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod core;
7pub mod object_store;
8pub mod ops;
9pub mod providers;
10pub mod target;
11
12pub use core::{extensions, paths, placement, planner, uri, validation};
13pub use ops::{archive, inputs, output};
14pub use placement::OutputPlacement;
15pub use planner::{
16    filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
17};
18pub use providers::{adls, gcs, local, s3};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
/// An object's payload paired with the version string it was read at.
///
/// Returned (wrapped in `Option`) by [`StorageClient::read_object`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredObject {
    /// Raw bytes of the object.
    pub body: Vec<u8>,
    /// Version token associated with `body`.
    // NOTE(review): presumably an ETag/generation-style token supplied by the
    // backend — confirm against the provider implementations.
    pub version: String,
}
28
/// Result of a conditional write or delete issued through [`StorageClient`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConditionalWrite {
    /// The operation was applied; `version` carries the version string
    /// reported for it.
    // NOTE(review): what `version` holds after a conditional delete is not
    // visible in this module — confirm with the backends.
    Written { version: String },
    /// The operation was not applied.
    // NOTE(review): presumably because the expected version did not match the
    // stored one — confirm with the backends.
    Conflict,
}
34
35pub trait StorageClient: Send + Sync {
36    fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
37    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
38    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
39    fn resolve_uri(&self, path: &str) -> FloeResult<String>;
40    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
41    fn delete_object(&self, uri: &str) -> FloeResult<()>;
42    fn exists(&self, uri: &str) -> FloeResult<bool>;
43
44    fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
45        let _ = uri;
46        Err(Box::new(ConfigError(
47            "storage backend does not support object reads with version tokens".to_string(),
48        )))
49    }
50
51    fn write_object_conditional(
52        &self,
53        uri: &str,
54        expected_version: Option<&str>,
55        body: &[u8],
56    ) -> FloeResult<ConditionalWrite> {
57        let _ = (uri, expected_version, body);
58        Err(Box::new(ConfigError(
59            "storage backend does not support conditional object writes".to_string(),
60        )))
61    }
62
63    fn delete_object_conditional(
64        &self,
65        uri: &str,
66        expected_version: Option<&str>,
67    ) -> FloeResult<ConditionalWrite> {
68        let _ = (uri, expected_version);
69        Err(Box::new(ConfigError(
70            "storage backend does not support conditional object deletes".to_string(),
71        )))
72    }
73}
74
75pub struct CloudClient {
76    clients: HashMap<String, Box<dyn StorageClient>>,
77}
78
79impl CloudClient {
80    pub fn new() -> Self {
81        Self {
82            clients: HashMap::new(),
83        }
84    }
85
86    pub fn client_for<'a>(
87        &'a mut self,
88        resolver: &config::StorageResolver,
89        storage: &str,
90        entity: &config::EntityConfig,
91    ) -> FloeResult<&'a mut dyn StorageClient> {
92        let context = format!("entity.name={}", entity.name);
93        self.client_for_context(resolver, storage, &context)
94    }
95
96    pub fn client_for_context<'a>(
97        &'a mut self,
98        resolver: &config::StorageResolver,
99        storage: &str,
100        context: &str,
101    ) -> FloeResult<&'a mut dyn StorageClient> {
102        if !self.clients.contains_key(storage) {
103            let definition = resolver.definition(storage).ok_or_else(|| {
104                Box::new(ConfigError(format!(
105                    "{} storage {} is not defined",
106                    context, storage
107                ))) as Box<dyn std::error::Error + Send + Sync>
108            })?;
109            let client = build_client(&definition)?;
110            self.clients.insert(storage.to_string(), client);
111        }
112        Ok(self
113            .clients
114            .get_mut(storage)
115            .expect("storage client inserted")
116            .as_mut())
117    }
118}
119
120impl Default for CloudClient {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
127    let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
128        "local" => Box::new(local::LocalClient::new()),
129        "s3" => {
130            let bucket =
131                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "s3")?;
132            Box::new(s3::S3Client::new(
133                bucket,
134                definition.region.as_deref(),
135                definition.endpoint.as_deref(),
136                definition.path_style_access,
137            )?)
138        }
139        "adls" => Box::new(adls::AdlsClient::new(definition)?),
140        "gcs" => {
141            let bucket =
142                validation::require_field(definition, definition.bucket.as_ref(), "bucket", "gcs")?;
143            Box::new(gcs::GcsClient::new(bucket)?)
144        }
145        other => {
146            return Err(Box::new(ConfigError(format!(
147                "storage type {} is unsupported",
148                other
149            ))))
150        }
151    };
152    Ok(client)
153}