floe_core/io/storage/
s3.rs1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::primitives::ByteStream;
8use aws_sdk_s3::Client;
9use tokio::io::AsyncWriteExt;
10use tokio::runtime::Runtime;
11
12use crate::errors::{RunError, StorageError};
13use crate::{config, io, ConfigError, FloeResult};
14
15use super::{planner, ObjectRef, StorageClient};
16
17pub struct S3Client {
18 bucket: String,
19 client: Client,
20 runtime: Runtime,
21}
22
23impl S3Client {
24 pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
25 let runtime = tokio::runtime::Builder::new_current_thread()
26 .enable_all()
27 .build()
28 .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
29 let config = runtime.block_on(async {
30 let region_provider = match region {
31 Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
32 .or_default_provider(),
33 None => RegionProviderChain::default_provider(),
34 };
35 aws_config::defaults(aws_config::BehaviorVersion::latest())
36 .region(region_provider)
37 .load()
38 .await
39 });
40 let client = Client::new(&config);
41 Ok(Self {
42 bucket,
43 client,
44 runtime,
45 })
46 }
47
48 fn bucket(&self) -> &str {
49 self.bucket.as_str()
50 }
51}
52
53impl StorageClient for S3Client {
54 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
55 let bucket = self.bucket().to_string();
56 let prefix = prefix.to_string();
57 self.runtime.block_on(async {
58 let mut refs = Vec::new();
59 let mut continuation = None;
60 loop {
61 let mut request = self.client.list_objects_v2().bucket(&bucket);
62 if !prefix.is_empty() {
63 request = request.prefix(&prefix);
64 }
65 if let Some(token) = continuation {
66 request = request.continuation_token(token);
67 }
68 let response = request.send().await.map_err(|err| {
69 Box::new(StorageError(format!(
70 "s3 list objects failed for bucket {}: {err}",
71 bucket
72 ))) as Box<dyn std::error::Error + Send + Sync>
73 })?;
74 if let Some(contents) = response.contents {
75 for object in contents {
76 if let Some(key) = object.key {
77 let uri = format_s3_uri(&bucket, &key);
78 refs.push(ObjectRef {
79 uri,
80 key,
81 last_modified: object
82 .last_modified
83 .as_ref()
84 .map(|value| value.to_string()),
85 size: object.size.map(|value| value as u64),
86 });
87 }
88 }
89 }
90 if response.is_truncated.unwrap_or(false) {
91 continuation = response.next_continuation_token;
92 if continuation.is_none() {
93 break;
94 }
95 } else {
96 break;
97 }
98 }
99 Ok(refs)
100 })
101 }
102
103 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
104 let location = parse_s3_uri(uri)?;
105 let bucket = location.bucket;
106 let key = location.key;
107 let dest = temp_path_for_key(temp_dir, &key);
108 let dest_clone = dest.clone();
109 self.runtime.block_on(async move {
110 let response = self
111 .client
112 .get_object()
113 .bucket(bucket)
114 .key(key.clone())
115 .send()
116 .await
117 .map_err(|err| {
118 Box::new(StorageError(format!("s3 get object failed: {err}")))
119 as Box<dyn std::error::Error + Send + Sync>
120 })?;
121 if let Some(parent) = dest_clone.parent() {
122 tokio::fs::create_dir_all(parent).await?;
123 }
124 let mut file = tokio::fs::File::create(&dest_clone).await?;
125 let mut reader = response.body.into_async_read();
126 tokio::io::copy(&mut reader, &mut file).await?;
127 file.flush().await?;
128 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
129 })?;
130 Ok(dest)
131 }
132
133 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
134 let location = parse_s3_uri(uri)?;
135 let bucket = location.bucket;
136 let key = location.key;
137 let path = local_path.to_path_buf();
138 self.runtime.block_on(async move {
139 let body = ByteStream::from_path(path).await.map_err(|err| {
140 Box::new(StorageError(format!("s3 upload body failed: {err}")))
141 as Box<dyn std::error::Error + Send + Sync>
142 })?;
143 self.client
144 .put_object()
145 .bucket(bucket)
146 .key(key)
147 .body(body)
148 .send()
149 .await
150 .map_err(|err| {
151 Box::new(StorageError(format!("s3 put object failed: {err}")))
152 as Box<dyn std::error::Error + Send + Sync>
153 })?;
154 Ok(())
155 })
156 }
157
158 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
159 Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
160 }
161
162 fn delete(&self, uri: &str) -> FloeResult<()> {
163 let location = parse_s3_uri(uri)?;
164 let bucket = location.bucket;
165 let key = location.key;
166 self.runtime.block_on(async move {
167 self.client
168 .delete_object()
169 .bucket(bucket)
170 .key(key)
171 .send()
172 .await
173 .map_err(|err| {
174 Box::new(StorageError(format!("s3 delete object failed: {err}")))
175 as Box<dyn std::error::Error + Send + Sync>
176 })?;
177 Ok(())
178 })
179 }
180}
181
/// Bucket and key components of an `s3://bucket/key` URI.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Location {
    /// Bucket name (the part between `s3://` and the first `/`).
    pub bucket: String,
    /// Object key (everything after the first `/`; may be empty).
    pub key: String,
}
187
188pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
189 let stripped = uri.strip_prefix("s3://").ok_or_else(|| {
190 Box::new(ConfigError(format!("expected s3 uri, got {}", uri)))
191 as Box<dyn std::error::Error + Send + Sync>
192 })?;
193 let mut parts = stripped.splitn(2, '/');
194 let bucket = parts.next().unwrap_or("").to_string();
195 if bucket.is_empty() {
196 return Err(Box::new(ConfigError(format!(
197 "missing bucket in s3 uri: {}",
198 uri
199 ))));
200 }
201 let key = parts.next().unwrap_or("").to_string();
202 Ok(S3Location { bucket, key })
203}
204
/// Formats `bucket` and `key` as an `s3://` URI.
///
/// An empty `key` produces `s3://bucket` with no trailing slash.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        _ => format!("s3://{}/{}", bucket, key),
    }
}
212
213pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
214 let mut refs = Vec::with_capacity(keys.len());
215 for key in keys.drain(..) {
216 refs.push(ObjectRef {
217 uri: key.clone(),
218 key,
219 last_modified: None,
220 size: None,
221 });
222 }
223 let filtered = planner::filter_by_suffixes(refs, suffixes);
224 let sorted = planner::stable_sort_refs(filtered);
225 sorted.into_iter().map(|obj| obj.key).collect()
226}
227
228pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
229 let mut hasher = DefaultHasher::new();
230 key.hash(&mut hasher);
231 let hash = hasher.finish();
232 let name = file_name_from_key(key).unwrap_or_else(|| "object".to_string());
233 let sanitized = sanitize_filename(&name);
234 temp_dir.join(format!("{hash:016x}_{sanitized}"))
235}
236
/// Returns the final path component of `key`, if it has one.
///
/// The key is interpreted with `std::path::Path`, so the result is `None`
/// whenever `Path::file_name` is (for example an empty key or one ending in
/// `..`).
pub fn file_name_from_key(key: &str) -> Option<String> {
    let last = Path::new(key).file_name()?;
    Some(last.to_string_lossy().into_owned())
}
242
/// Replaces every character of `name` that is not an ASCII letter, an ASCII
/// digit, `.`, `-` or `_` with `_`.
fn sanitize_filename(name: &str) -> String {
    // Each replaced character becomes a single-byte `_`, so the output never
    // needs more bytes than the input.
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
254
/// Returns `name` without its last extension, as defined by
/// `std::path::Path::file_stem`, or `None` when `name` has no file name.
pub fn file_stem_from_name(name: &str) -> Option<String> {
    let stem = Path::new(name).file_stem()?;
    Some(stem.to_string_lossy().into_owned())
}
260
261pub fn build_input_files(
262 client: &dyn StorageClient,
263 bucket: &str,
264 prefix: &str,
265 adapter: &dyn io::format::InputAdapter,
266 temp_dir: &Path,
267 entity: &config::EntityConfig,
268 storage: &str,
269) -> FloeResult<Vec<io::format::InputFile>> {
270 let suffixes = adapter.suffixes()?;
271 let list_refs = client.list(prefix)?;
272 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
273 let filtered = planner::stable_sort_refs(filtered);
274 if filtered.is_empty() {
275 return Err(Box::new(RunError(format!(
276 "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
277 entity.name,
278 storage,
279 bucket,
280 prefix,
281 suffixes.join(",")
282 ))));
283 }
284 let mut inputs = Vec::with_capacity(filtered.len());
285 for object in filtered {
286 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
287 let source_name = file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
288 let source_stem = file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
289 let source_uri = object.uri;
290 inputs.push(io::format::InputFile {
291 source_uri,
292 source_local_path: local_path,
293 source_name,
294 source_stem,
295 });
296 }
297 Ok(inputs)
298}