floe_core/io/storage/
mod.rs1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::{config, ConfigError, FloeResult};
5
6pub mod core;
7pub mod object_store;
8pub mod ops;
9pub mod providers;
10pub mod target;
11
12pub use core::{extensions, paths, placement, planner, uri, validation};
13pub use ops::{archive, inputs, output};
14pub use placement::OutputPlacement;
15pub use planner::{
16 filter_by_suffixes, join_prefix, normalize_separators, stable_sort_refs, temp_path_for_key,
17};
18pub use providers::{adls, gcs, local, s3};
19pub use target::Target;
20
21pub use planner::ObjectRef;
22
/// An object's raw bytes paired with the version token it was read at.
///
/// Returned by [`StorageClient::read_object`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredObject {
    /// Raw object contents.
    pub body: Vec<u8>,
    /// Backend-supplied version token for `body`.
    // NOTE(review): presumably an ETag/generation-style value meant to be passed
    // back as `expected_version` on conditional operations — confirm with backends.
    pub version: String,
}
28
/// Outcome of a version-guarded storage operation.
///
/// Returned by [`StorageClient::write_object_conditional`] and
/// [`StorageClient::delete_object_conditional`].
///
/// Marked `#[must_use]`: once `?` has unwrapped the surrounding `FloeResult`, nothing
/// else would stop a caller from dropping a `Conflict` on the floor.
#[must_use = "a conditional storage operation may report `Conflict`; inspect the outcome"]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConditionalWrite {
    /// The operation was applied.
    // NOTE(review): `version` is presumably the token of the object after the
    // operation — confirm what backends report for conditional deletes.
    Written {
        /// Version token reported by the backend.
        version: String,
    },
    /// The operation was not applied because the version precondition did not hold.
    // NOTE(review): inferred from the variant name; confirm with backend implementations.
    Conflict,
}
34
35pub trait StorageClient: Send + Sync {
36 fn list(&self, prefix_or_path: &str) -> FloeResult<Vec<ObjectRef>>;
37 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf>;
38 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()>;
39 fn resolve_uri(&self, path: &str) -> FloeResult<String>;
40 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()>;
41 fn delete_object(&self, uri: &str) -> FloeResult<()>;
42 fn exists(&self, uri: &str) -> FloeResult<bool>;
43
44 fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
45 let _ = uri;
46 Err(Box::new(ConfigError(
47 "storage backend does not support object reads with version tokens".to_string(),
48 )))
49 }
50
51 fn write_object_conditional(
52 &self,
53 uri: &str,
54 expected_version: Option<&str>,
55 body: &[u8],
56 ) -> FloeResult<ConditionalWrite> {
57 let _ = (uri, expected_version, body);
58 Err(Box::new(ConfigError(
59 "storage backend does not support conditional object writes".to_string(),
60 )))
61 }
62
63 fn delete_object_conditional(
64 &self,
65 uri: &str,
66 expected_version: Option<&str>,
67 ) -> FloeResult<ConditionalWrite> {
68 let _ = (uri, expected_version);
69 Err(Box::new(ConfigError(
70 "storage backend does not support conditional object deletes".to_string(),
71 )))
72 }
73}
74
75pub struct CloudClient {
76 clients: HashMap<String, Box<dyn StorageClient>>,
77}
78
79impl CloudClient {
80 pub fn new() -> Self {
81 Self {
82 clients: HashMap::new(),
83 }
84 }
85
86 pub fn client_for<'a>(
87 &'a mut self,
88 resolver: &config::StorageResolver,
89 storage: &str,
90 entity: &config::EntityConfig,
91 ) -> FloeResult<&'a mut dyn StorageClient> {
92 let context = format!("entity.name={}", entity.name);
93 self.client_for_context(resolver, storage, &context)
94 }
95
96 pub fn client_for_context<'a>(
97 &'a mut self,
98 resolver: &config::StorageResolver,
99 storage: &str,
100 context: &str,
101 ) -> FloeResult<&'a mut dyn StorageClient> {
102 if !self.clients.contains_key(storage) {
103 let definition = resolver.definition(storage).ok_or_else(|| {
104 Box::new(ConfigError(format!(
105 "{} storage {} is not defined",
106 context, storage
107 ))) as Box<dyn std::error::Error + Send + Sync>
108 })?;
109 let client = build_client(&definition)?;
110 self.clients.insert(storage.to_string(), client);
111 }
112 Ok(self
113 .clients
114 .get_mut(storage)
115 .expect("storage client inserted")
116 .as_mut())
117 }
118}
119
120impl Default for CloudClient {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126fn build_client(definition: &config::StorageDefinition) -> FloeResult<Box<dyn StorageClient>> {
127 let client: Box<dyn StorageClient> = match definition.fs_type.as_str() {
128 "local" => Box::new(local::LocalClient::new()),
129 "s3" => {
130 let bucket =
131 validation::require_field(definition, definition.bucket.as_ref(), "bucket", "s3")?;
132 Box::new(s3::S3Client::new(bucket, definition.region.as_deref())?)
133 }
134 "adls" => Box::new(adls::AdlsClient::new(definition)?),
135 "gcs" => {
136 let bucket =
137 validation::require_field(definition, definition.bucket.as_ref(), "bucket", "gcs")?;
138 Box::new(gcs::GcsClient::new(bucket)?)
139 }
140 other => {
141 return Err(Box::new(ConfigError(format!(
142 "storage type {} is unsupported",
143 other
144 ))))
145 }
146 };
147 Ok(client)
148}