floe_core/io/storage/
s3.rs1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::primitives::ByteStream;
8use aws_sdk_s3::Client;
9use tokio::io::AsyncWriteExt;
10use tokio::runtime::Runtime;
11
12use crate::errors::{RunError, StorageError};
13use crate::{config, io, ConfigError, FloeResult};
14
15use super::{planner, ObjectRef, StorageClient};
16
17pub struct S3Client {
18 bucket: String,
19 client: Client,
20 runtime: Runtime,
21}
22
23impl S3Client {
24 pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
25 let runtime = tokio::runtime::Builder::new_current_thread()
26 .enable_all()
27 .build()
28 .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
29 let config = runtime.block_on(async {
30 let region_provider = match region {
31 Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
32 .or_default_provider(),
33 None => RegionProviderChain::default_provider(),
34 };
35 aws_config::defaults(aws_config::BehaviorVersion::latest())
36 .region(region_provider)
37 .load()
38 .await
39 });
40 let client = Client::new(&config);
41 Ok(Self {
42 bucket,
43 client,
44 runtime,
45 })
46 }
47
48 fn bucket(&self) -> &str {
49 self.bucket.as_str()
50 }
51}
52
53impl StorageClient for S3Client {
54 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
55 let bucket = self.bucket().to_string();
56 let prefix = prefix.to_string();
57 self.runtime.block_on(async {
58 let mut refs = Vec::new();
59 let mut continuation = None;
60 loop {
61 let mut request = self.client.list_objects_v2().bucket(&bucket);
62 if !prefix.is_empty() {
63 request = request.prefix(&prefix);
64 }
65 if let Some(token) = continuation {
66 request = request.continuation_token(token);
67 }
68 let response = request.send().await.map_err(|err| {
69 Box::new(StorageError(format!(
70 "s3 list objects failed for bucket {}: {err}",
71 bucket
72 ))) as Box<dyn std::error::Error + Send + Sync>
73 })?;
74 if let Some(contents) = response.contents {
75 for object in contents {
76 if let Some(key) = object.key {
77 let uri = format_s3_uri(&bucket, &key);
78 refs.push(ObjectRef {
79 uri,
80 key,
81 last_modified: object
82 .last_modified
83 .as_ref()
84 .map(|value| value.to_string()),
85 size: object.size.map(|value| value as u64),
86 });
87 }
88 }
89 }
90 if response.is_truncated.unwrap_or(false) {
91 continuation = response.next_continuation_token;
92 if continuation.is_none() {
93 break;
94 }
95 } else {
96 break;
97 }
98 }
99 Ok(refs)
100 })
101 }
102
103 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
104 let location = parse_s3_uri(uri)?;
105 let bucket = location.bucket;
106 let key = location.key;
107 let dest = temp_path_for_key(temp_dir, &key);
108 let dest_clone = dest.clone();
109 self.runtime.block_on(async move {
110 let response = self
111 .client
112 .get_object()
113 .bucket(bucket)
114 .key(key.clone())
115 .send()
116 .await
117 .map_err(|err| {
118 Box::new(StorageError(format!("s3 get object failed: {err}")))
119 as Box<dyn std::error::Error + Send + Sync>
120 })?;
121 if let Some(parent) = dest_clone.parent() {
122 tokio::fs::create_dir_all(parent).await?;
123 }
124 let mut file = tokio::fs::File::create(&dest_clone).await?;
125 let mut reader = response.body.into_async_read();
126 tokio::io::copy(&mut reader, &mut file).await?;
127 file.flush().await?;
128 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
129 })?;
130 Ok(dest)
131 }
132
133 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
134 let location = parse_s3_uri(uri)?;
135 let bucket = location.bucket;
136 let key = location.key;
137 let path = local_path.to_path_buf();
138 self.runtime.block_on(async move {
139 let body = ByteStream::from_path(path).await.map_err(|err| {
140 Box::new(StorageError(format!("s3 upload body failed: {err}")))
141 as Box<dyn std::error::Error + Send + Sync>
142 })?;
143 self.client
144 .put_object()
145 .bucket(bucket)
146 .key(key)
147 .body(body)
148 .send()
149 .await
150 .map_err(|err| {
151 Box::new(StorageError(format!("s3 put object failed: {err}")))
152 as Box<dyn std::error::Error + Send + Sync>
153 })?;
154 Ok(())
155 })
156 }
157
158 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
159 Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
160 }
161
162 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
163 let src = parse_s3_uri(src_uri)?;
164 let dst = parse_s3_uri(dst_uri)?;
165 let copy_source = format!("{}/{}", src.bucket, src.key);
166 self.runtime.block_on(async move {
167 self.client
168 .copy_object()
169 .bucket(dst.bucket)
170 .key(dst.key)
171 .copy_source(copy_source)
172 .send()
173 .await
174 .map_err(|err| {
175 Box::new(StorageError(format!("s3 copy object failed: {err}")))
176 as Box<dyn std::error::Error + Send + Sync>
177 })?;
178 Ok(())
179 })
180 }
181
182 fn delete_object(&self, uri: &str) -> FloeResult<()> {
183 let location = parse_s3_uri(uri)?;
184 let bucket = location.bucket;
185 let key = location.key;
186 self.runtime.block_on(async move {
187 self.client
188 .delete_object()
189 .bucket(bucket)
190 .key(key)
191 .send()
192 .await
193 .map_err(|err| {
194 Box::new(StorageError(format!("s3 delete object failed: {err}")))
195 as Box<dyn std::error::Error + Send + Sync>
196 })?;
197 Ok(())
198 })
199 }
200
201 fn exists(&self, uri: &str) -> FloeResult<bool> {
202 let location = parse_s3_uri(uri)?;
203 if location.key.is_empty() {
204 return Ok(false);
205 }
206 let refs = self.list(&location.key)?;
207 Ok(refs.iter().any(|object| object.key == location.key))
208 }
209}
210
/// Bucket and key extracted from an `s3://bucket/key` URI.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct S3Location {
    /// Bucket name; `parse_s3_uri` rejects URIs where this would be empty.
    pub bucket: String,
    /// Object key (everything after the first `/`); empty for bucket-only URIs.
    pub key: String,
}
216
217pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
218 let stripped = uri.strip_prefix("s3://").ok_or_else(|| {
219 Box::new(ConfigError(format!("expected s3 uri, got {}", uri)))
220 as Box<dyn std::error::Error + Send + Sync>
221 })?;
222 let mut parts = stripped.splitn(2, '/');
223 let bucket = parts.next().unwrap_or("").to_string();
224 if bucket.is_empty() {
225 return Err(Box::new(ConfigError(format!(
226 "missing bucket in s3 uri: {}",
227 uri
228 ))));
229 }
230 let key = parts.next().unwrap_or("").to_string();
231 Ok(S3Location { bucket, key })
232}
233
/// Builds an `s3://` URI from a bucket and key.
///
/// An empty `key` yields the bare bucket URI (`s3://bucket`) with no trailing slash.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{bucket}"),
        _ => format!("s3://{bucket}/{key}"),
    }
}
241
242pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
243 let mut refs = Vec::with_capacity(keys.len());
244 for key in keys.drain(..) {
245 refs.push(ObjectRef {
246 uri: key.clone(),
247 key,
248 last_modified: None,
249 size: None,
250 });
251 }
252 let filtered = planner::filter_by_suffixes(refs, suffixes);
253 let sorted = planner::stable_sort_refs(filtered);
254 sorted.into_iter().map(|obj| obj.key).collect()
255}
256
257pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
258 let mut hasher = DefaultHasher::new();
259 key.hash(&mut hasher);
260 let hash = hasher.finish();
261 let name = file_name_from_key(key).unwrap_or_else(|| "object".to_string());
262 let sanitized = sanitize_filename(&name);
263 temp_dir.join(format!("{hash:016x}_{sanitized}"))
264}
265
/// Returns the last path component of `key` (e.g. `c.csv` for `a/b/c.csv`).
///
/// Yields `None` when `Path::file_name` does, e.g. for an empty key or one ending
/// in `..`. Non-UTF-8 bytes are replaced lossily.
pub fn file_name_from_key(key: &str) -> Option<String> {
    let file_name = Path::new(key).file_name()?;
    Some(file_name.to_string_lossy().into_owned())
}
271
/// Replaces every character outside `[A-Za-z0-9._-]` with `_`.
///
/// The output has the same number of characters as the input and is pure ASCII.
fn sanitize_filename(name: &str) -> String {
    let is_safe = |c: char| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_';
    name.chars()
        .map(|c| if is_safe(c) { c } else { '_' })
        .collect()
}
283
/// Returns `name` without its final extension, as `Path::file_stem` defines it.
///
/// `archive.tar.gz` becomes `archive.tar`; a leading-dot name like `.hidden` is
/// kept whole; an empty name yields `None`.
pub fn file_stem_from_name(name: &str) -> Option<String> {
    let stem = Path::new(name).file_stem()?;
    Some(String::from(stem.to_string_lossy()))
}
289
290pub fn build_input_files(
291 client: &dyn StorageClient,
292 bucket: &str,
293 prefix: &str,
294 adapter: &dyn io::format::InputAdapter,
295 temp_dir: &Path,
296 entity: &config::EntityConfig,
297 storage: &str,
298) -> FloeResult<Vec<io::format::InputFile>> {
299 let suffixes = adapter.suffixes()?;
300 let list_refs = client.list(prefix)?;
301 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
302 let filtered = planner::stable_sort_refs(filtered);
303 if filtered.is_empty() {
304 return Err(Box::new(RunError(format!(
305 "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
306 entity.name,
307 storage,
308 bucket,
309 prefix,
310 suffixes.join(",")
311 ))));
312 }
313 let mut inputs = Vec::with_capacity(filtered.len());
314 for object in filtered {
315 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
316 let source_name = file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
317 let source_stem = file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
318 let source_uri = object.uri;
319 inputs.push(io::format::InputFile {
320 source_uri,
321 source_local_path: local_path,
322 source_name,
323 source_stem,
324 });
325 }
326 Ok(inputs)
327}