Skip to main content

floe_core/io/storage/
s3.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::primitives::ByteStream;
8use aws_sdk_s3::Client;
9use tokio::io::AsyncWriteExt;
10use tokio::runtime::Runtime;
11
12use crate::errors::{RunError, StorageError};
13use crate::{config, io, ConfigError, FloeResult};
14
15use super::{planner, ObjectRef, StorageClient};
16
17pub struct S3Client {
18    bucket: String,
19    client: Client,
20    runtime: Runtime,
21}
22
23impl S3Client {
24    pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
25        let runtime = tokio::runtime::Builder::new_current_thread()
26            .enable_all()
27            .build()
28            .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
29        let config = runtime.block_on(async {
30            let region_provider = match region {
31                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
32                    .or_default_provider(),
33                None => RegionProviderChain::default_provider(),
34            };
35            aws_config::defaults(aws_config::BehaviorVersion::latest())
36                .region(region_provider)
37                .load()
38                .await
39        });
40        let client = Client::new(&config);
41        Ok(Self {
42            bucket,
43            client,
44            runtime,
45        })
46    }
47
48    fn bucket(&self) -> &str {
49        self.bucket.as_str()
50    }
51}
52
53impl StorageClient for S3Client {
54    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
55        let bucket = self.bucket().to_string();
56        let prefix = prefix.to_string();
57        self.runtime.block_on(async {
58            let mut refs = Vec::new();
59            let mut continuation = None;
60            loop {
61                let mut request = self.client.list_objects_v2().bucket(&bucket);
62                if !prefix.is_empty() {
63                    request = request.prefix(&prefix);
64                }
65                if let Some(token) = continuation {
66                    request = request.continuation_token(token);
67                }
68                let response = request.send().await.map_err(|err| {
69                    Box::new(StorageError(format!(
70                        "s3 list objects failed for bucket {}: {err}",
71                        bucket
72                    ))) as Box<dyn std::error::Error + Send + Sync>
73                })?;
74                if let Some(contents) = response.contents {
75                    for object in contents {
76                        if let Some(key) = object.key {
77                            let uri = format_s3_uri(&bucket, &key);
78                            refs.push(ObjectRef {
79                                uri,
80                                key,
81                                last_modified: object
82                                    .last_modified
83                                    .as_ref()
84                                    .map(|value| value.to_string()),
85                                size: object.size.map(|value| value as u64),
86                            });
87                        }
88                    }
89                }
90                if response.is_truncated.unwrap_or(false) {
91                    continuation = response.next_continuation_token;
92                    if continuation.is_none() {
93                        break;
94                    }
95                } else {
96                    break;
97                }
98            }
99            Ok(refs)
100        })
101    }
102
103    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
104        let location = parse_s3_uri(uri)?;
105        let bucket = location.bucket;
106        let key = location.key;
107        let dest = temp_path_for_key(temp_dir, &key);
108        let dest_clone = dest.clone();
109        self.runtime.block_on(async move {
110            let response = self
111                .client
112                .get_object()
113                .bucket(bucket)
114                .key(key.clone())
115                .send()
116                .await
117                .map_err(|err| {
118                    Box::new(StorageError(format!("s3 get object failed: {err}")))
119                        as Box<dyn std::error::Error + Send + Sync>
120                })?;
121            if let Some(parent) = dest_clone.parent() {
122                tokio::fs::create_dir_all(parent).await?;
123            }
124            let mut file = tokio::fs::File::create(&dest_clone).await?;
125            let mut reader = response.body.into_async_read();
126            tokio::io::copy(&mut reader, &mut file).await?;
127            file.flush().await?;
128            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
129        })?;
130        Ok(dest)
131    }
132
133    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
134        let location = parse_s3_uri(uri)?;
135        let bucket = location.bucket;
136        let key = location.key;
137        let path = local_path.to_path_buf();
138        self.runtime.block_on(async move {
139            let body = ByteStream::from_path(path).await.map_err(|err| {
140                Box::new(StorageError(format!("s3 upload body failed: {err}")))
141                    as Box<dyn std::error::Error + Send + Sync>
142            })?;
143            self.client
144                .put_object()
145                .bucket(bucket)
146                .key(key)
147                .body(body)
148                .send()
149                .await
150                .map_err(|err| {
151                    Box::new(StorageError(format!("s3 put object failed: {err}")))
152                        as Box<dyn std::error::Error + Send + Sync>
153                })?;
154            Ok(())
155        })
156    }
157
158    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
159        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
160    }
161
162    fn delete(&self, uri: &str) -> FloeResult<()> {
163        let location = parse_s3_uri(uri)?;
164        let bucket = location.bucket;
165        let key = location.key;
166        self.runtime.block_on(async move {
167            self.client
168                .delete_object()
169                .bucket(bucket)
170                .key(key)
171                .send()
172                .await
173                .map_err(|err| {
174                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
175                        as Box<dyn std::error::Error + Send + Sync>
176                })?;
177            Ok(())
178        })
179    }
180}
181
/// Bucket/key pair parsed from an `s3://bucket/key` URI.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct S3Location {
    /// Bucket name; never empty when produced by `parse_s3_uri`.
    pub bucket: String,
    /// Object key (everything after the bucket's `/`); may be empty.
    pub key: String,
}
187
188pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
189    let stripped = uri.strip_prefix("s3://").ok_or_else(|| {
190        Box::new(ConfigError(format!("expected s3 uri, got {}", uri)))
191            as Box<dyn std::error::Error + Send + Sync>
192    })?;
193    let mut parts = stripped.splitn(2, '/');
194    let bucket = parts.next().unwrap_or("").to_string();
195    if bucket.is_empty() {
196        return Err(Box::new(ConfigError(format!(
197            "missing bucket in s3 uri: {}",
198            uri
199        ))));
200    }
201    let key = parts.next().unwrap_or("").to_string();
202    Ok(S3Location { bucket, key })
203}
204
/// Builds `s3://bucket/key`, or just `s3://bucket` when `key` is empty.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::with_capacity("s3://".len() + bucket.len() + 1 + key.len());
    uri.push_str("s3://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
212
213pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
214    let mut refs = Vec::with_capacity(keys.len());
215    for key in keys.drain(..) {
216        refs.push(ObjectRef {
217            uri: key.clone(),
218            key,
219            last_modified: None,
220            size: None,
221        });
222    }
223    let filtered = planner::filter_by_suffixes(refs, suffixes);
224    let sorted = planner::stable_sort_refs(filtered);
225    sorted.into_iter().map(|obj| obj.key).collect()
226}
227
228pub fn temp_path_for_key(temp_dir: &Path, key: &str) -> PathBuf {
229    let mut hasher = DefaultHasher::new();
230    key.hash(&mut hasher);
231    let hash = hasher.finish();
232    let name = file_name_from_key(key).unwrap_or_else(|| "object".to_string());
233    let sanitized = sanitize_filename(&name);
234    temp_dir.join(format!("{hash:016x}_{sanitized}"))
235}
236
/// Returns the last path component of `key` (e.g. `data.csv` for `a/b/data.csv`).
///
/// Trailing separators are ignored. Returns `None` when the key is empty or ends in
/// `..`. Non-UTF-8 bytes are replaced lossily.
pub fn file_name_from_key(key: &str) -> Option<String> {
    let name = Path::new(key).file_name()?;
    Some(name.to_string_lossy().into_owned())
}
242
/// Replaces every character other than ASCII letters, digits, `.`, `-` and `_` with
/// `_`, one replacement per character, so the result is safe as a file name.
fn sanitize_filename(name: &str) -> String {
    let is_safe = |ch: char| ch.is_ascii_alphanumeric() || ch == '.' || ch == '-' || ch == '_';
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        cleaned.push(if is_safe(ch) { ch } else { '_' });
    }
    cleaned
}
254
/// Returns `name` without its final extension (`archive.tar.gz` -> `archive.tar`).
///
/// A leading dot is not treated as an extension (`.hidden` stays `.hidden`). Returns
/// `None` when `name` has no file-name component. Non-UTF-8 bytes are replaced lossily.
pub fn file_stem_from_name(name: &str) -> Option<String> {
    let stem = Path::new(name).file_stem()?;
    let text = stem.to_string_lossy();
    Some(text.into_owned())
}
260
261pub fn build_input_files(
262    client: &dyn StorageClient,
263    bucket: &str,
264    prefix: &str,
265    adapter: &dyn io::format::InputAdapter,
266    temp_dir: &Path,
267    entity: &config::EntityConfig,
268    storage: &str,
269) -> FloeResult<Vec<io::format::InputFile>> {
270    let suffixes = adapter.suffixes()?;
271    let list_refs = client.list(prefix)?;
272    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
273    let filtered = planner::stable_sort_refs(filtered);
274    if filtered.is_empty() {
275        return Err(Box::new(RunError(format!(
276            "entity.name={} source.storage={} no input objects matched (bucket={}, prefix={}, suffixes={})",
277            entity.name,
278            storage,
279            bucket,
280            prefix,
281            suffixes.join(",")
282        ))));
283    }
284    let mut inputs = Vec::with_capacity(filtered.len());
285    for object in filtered {
286        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
287        let source_name = file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
288        let source_stem = file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
289        let source_uri = object.uri;
290        inputs.push(io::format::InputFile {
291            source_uri,
292            source_local_path: local_path,
293            source_name,
294            source_stem,
295        });
296    }
297    Ok(inputs)
298}