// floe_core/io/storage/s3.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::primitives::ByteStream;
8use aws_sdk_s3::Client;
9use tokio::io::AsyncWriteExt;
10use tokio::runtime::Runtime;
11
12use crate::errors::{RunError, StorageError};
13use crate::{config, io, ConfigError, FloeResult};
14
15use super::{planner, ObjectRef, StorageClient};
16
17pub struct S3Client {
18    bucket: String,
19    client: Client,
20    runtime: Runtime,
21}
22
23impl S3Client {
24    pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
25        let runtime = tokio::runtime::Builder::new_current_thread()
26            .enable_all()
27            .build()
28            .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
29        let config = runtime.block_on(async {
30            let region_provider = match region {
31                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
32                    .or_default_provider(),
33                None => RegionProviderChain::default_provider(),
34            };
35            aws_config::defaults(aws_config::BehaviorVersion::latest())
36                .region(region_provider)
37                .load()
38                .await
39        });
40        let client = Client::new(&config);
41        Ok(Self {
42            bucket,
43            client,
44            runtime,
45        })
46    }
47
48    fn bucket(&self) -> &str {
49        self.bucket.as_str()
50    }
51}
52
53impl StorageClient for S3Client {
54    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
55        let bucket = self.bucket().to_string();
56        let prefix = prefix.to_string();
57        self.runtime.block_on(async {
58            let mut refs = Vec::new();
59            let mut continuation = None;
60            loop {
61                let mut request = self.client.list_objects_v2().bucket(&bucket);
62                if !prefix.is_empty() {
63                    request = request.prefix(&prefix);
64                }
65                if let Some(token) = continuation {
66                    request = request.continuation_token(token);
67                }
68                let response = request.send().await.map_err(|err| {
69                    Box::new(StorageError(format!(
70                        "s3 list objects failed for bucket {}: {err}",
71                        bucket
72                    ))) as Box<dyn std::error::Error + Send + Sync>
73                })?;
74                if let Some(contents) = response.contents {
75                    for object in contents {
76                        if let Some(key) = object.key {
77                            let uri = format_s3_uri(&bucket, &key);
78                            refs.push(ObjectRef {
79                                uri,
80                                key,
81                                last_modified: object
82                                    .last_modified
83                                    .as_ref()
84                                    .map(|value| value.to_string()),
85                                size: object.size.map(|value| value as u64),
86                            });
87                        }
88                    }
89                }
90                if response.is_truncated.unwrap_or(false) {
91                    continuation = response.next_continuation_token;
92                    if continuation.is_none() {
93                        break;
94                    }
95                } else {
96                    break;
97                }
98            }
99            Ok(refs)
100        })
101    }
102
103    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
104        let location = parse_s3_uri(uri)?;
105        let bucket = location.bucket;
106        let key = location.key;
107        let dest = temp_path_for_key(temp_dir, &key);
108        let dest_clone = dest.clone();
109        self.runtime.block_on(async move {
110            let response = self
111                .client
112                .get_object()
113                .bucket(bucket)
114                .key(key.clone())
115                .send()
116                .await
117                .map_err(|err| {
118                    Box::new(StorageError(format!("s3 get object failed: {err}")))
119                        as Box<dyn std::error::Error + Send + Sync>
120                })?;
121            if let Some(parent) = dest_clone.parent() {
122                tokio::fs::create_dir_all(parent).await?;
123            }
124            let mut file = tokio::fs::File::create(&dest_clone).await?;
125            let mut reader = response.body.into_async_read();
126            tokio::io::copy(&mut reader, &mut file).await?;
127            file.flush().await?;
128            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
129        })?;
130        Ok(dest)
131    }
132
133    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
134        let location = parse_s3_uri(uri)?;
135        let bucket = location.bucket;
136        let key = location.key;
137        let path = local_path.to_path_buf();
138        self.runtime.block_on(async move {
139            let body = ByteStream::from_path(path).await.map_err(|err| {
140                Box::new(StorageError(format!("s3 upload body failed: {err}")))
141                    as Box<dyn std::error::Error + Send + Sync>
142            })?;
143            self.client
144                .put_object()
145                .bucket(bucket)
146                .key(key)
147                .body(body)
148                .send()
149                .await
150                .map_err(|err| {
151                    Box::new(StorageError(format!("s3 put object failed: {err}")))
152                        as Box<dyn std::error::Error + Send + Sync>
153                })?;
154            Ok(())
155        })
156    }
157
158    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
159        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
160    }
161
162    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
163        let src = parse_s3_uri(src_uri)?;
164        let dst = parse_s3_uri(dst_uri)?;
165        let copy_source = format!("{}/{}", src.bucket, src.key);
166        self.runtime.block_on(async move {
167            self.client
168                .copy_object()
169                .bucket(dst.bucket)
170                .key(dst.key)
171                .copy_source(copy_source)
172                .send()
173                .await
174                .map_err(|err| {
175                    Box::new(StorageError(format!("s3 copy object failed: {err}")))
176                        as Box<dyn std::error::Error + Send + Sync>
177                })?;
178            Ok(())
179        })
180    }
181
182    fn delete_object(&self, uri: &str) -> FloeResult<()> {
183        let location = parse_s3_uri(uri)?;
184        let bucket = location.bucket;
185        let key = location.key;
186        self.runtime.block_on(async move {
187            self.client
188                .delete_object()
189                .bucket(bucket)
190                .key(key)
191                .send()
192                .await
193                .map_err(|err| {
194                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
195                        as Box<dyn std::error::Error + Send + Sync>
196                })?;
197            Ok(())
198        })
199    }
200
201    fn exists(&self, uri: &str) -> FloeResult<bool> {
202        let location = parse_s3_uri(uri)?;
203        if location.key.is_empty() {
204            return Ok(false);
205        }
206        let refs = self.list(&location.key)?;
207        Ok(refs.iter().any(|object| object.key == location.key))
208    }
209}
210
/// A parsed `s3://bucket/key` location.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct S3Location {
    /// Bucket name (non-empty when produced by [`parse_s3_uri`]).
    pub bucket: String,
    /// Object key or prefix; may be empty.
    pub key: String,
}
216
217pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
218    let stripped = uri.strip_prefix("s3://").ok_or_else(|| {
219        Box::new(ConfigError(format!("expected s3 uri, got {}", uri)))
220            as Box<dyn std::error::Error + Send + Sync>
221    })?;
222    let mut parts = stripped.splitn(2, '/');
223    let bucket = parts.next().unwrap_or("").to_string();
224    if bucket.is_empty() {
225        return Err(Box::new(ConfigError(format!(
226            "missing bucket in s3 uri: {}",
227            uri
228        ))));
229    }
230    let key = parts.next().unwrap_or("").to_string();
231    Ok(S3Location { bucket, key })
232}
233
/// Builds an `s3://` URI from `bucket` and `key`.
///
/// An empty key yields `s3://bucket`, with no trailing `/`. The key is appended
/// verbatim.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        _ => format!("s3://{}/{}", bucket, key),
    }
}
241
242pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
243    let mut refs = Vec::with_capacity(keys.len());
244    for key in keys.drain(..) {
245        refs.push(ObjectRef {
246            uri: key.clone(),
247            key,
248            last_modified: None,
249            size: None,
250        });
251    }
252    let filtered = planner::filter_by_suffixes(refs, suffixes);
253    let sorted = planner::stable_sort_refs(filtered);
254    sorted.into_iter().map(|obj| obj.key).collect()
255}
256
257pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
258    let mut hasher = DefaultHasher::new();
259    key.hash(&mut hasher);
260    let hash = hasher.finish();
261    let name = file_name_from_key(key).unwrap_or_else(|| "object".to_string());
262    let sanitized = sanitize_filename(&name);
263    temp_dir.join(format!("{hash:016x}_{sanitized}"))
264}
265
/// Returns the last path component of an object key (e.g. `c.csv` for
/// `a/b/c.csv`).
///
/// Returns `None` when the key has no such component, e.g. an empty key or one
/// ending in `..`.
pub fn file_name_from_key(key: &str) -> Option<String> {
    let name = Path::new(key).file_name()?;
    Some(name.to_string_lossy().into_owned())
}
271
/// Makes `name` safe to use as one local file name.
///
/// ASCII letters, digits, `.`, `-` and `_` are kept. Every other character,
/// including each non-ASCII `char` and any path separator, becomes `_`.
fn sanitize_filename(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
283
/// Returns `name` without its final extension (e.g. `data.tar` for `data.tar.gz`).
///
/// Uses [`Path::file_stem`] rules: a leading-dot name such as `.hidden` is kept
/// whole, and an empty name yields `None`.
pub fn file_stem_from_name(name: &str) -> Option<String> {
    Path::new(name)
        .file_stem()
        .map(|stem| String::from(stem.to_string_lossy()))
}
289
290pub fn build_input_files(
291    client: &dyn StorageClient,
292    bucket: &str,
293    prefix: &str,
294    adapter: &dyn io::format::InputAdapter,
295    temp_dir: &Path,
296    entity: &config::EntityConfig,
297    storage: &str,
298) -> FloeResult<Vec<io::format::InputFile>> {
299    let suffixes = adapter.suffixes()?;
300    let list_refs = client.list(prefix)?;
301    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
302    let filtered = planner::stable_sort_refs(filtered);
303    if filtered.is_empty() {
304        return Err(Box::new(RunError(format!(
305            "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
306            entity.name,
307            storage,
308            bucket,
309            prefix,
310            suffixes.join(",")
311        ))));
312    }
313    let mut inputs = Vec::with_capacity(filtered.len());
314    for object in filtered {
315        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
316        let source_name = file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
317        let source_stem = file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
318        let source_uri = object.uri;
319        inputs.push(io::format::InputFile {
320            source_uri,
321            source_local_path: local_path,
322            source_name,
323            source_stem,
324        });
325    }
326    Ok(inputs)
327}