//! S3 storage provider (`floe_core/io/storage/providers/s3.rs`).

1use std::path::{Path, PathBuf};
2
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::config::Region;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::Client;
7use tokio::io::AsyncWriteExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
12use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
13use crate::FloeResult;
14
15pub struct S3Client {
16    bucket: String,
17    client: Client,
18    runtime: Runtime,
19}
20
21impl S3Client {
22    pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
23        let runtime = tokio::runtime::Builder::new_current_thread()
24            .enable_all()
25            .build()
26            .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
27        let config = runtime.block_on(async {
28            let region_provider = match region {
29                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
30                    .or_default_provider(),
31                None => RegionProviderChain::default_provider(),
32            };
33            aws_config::defaults(aws_config::BehaviorVersion::latest())
34                .region(region_provider)
35                .load()
36                .await
37        });
38        let client = Client::new(&config);
39        Ok(Self {
40            bucket,
41            client,
42            runtime,
43        })
44    }
45
46    fn bucket(&self) -> &str {
47        self.bucket.as_str()
48    }
49}
50
51impl StorageClient for S3Client {
52    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
53        let bucket = self.bucket().to_string();
54        let prefix = prefix.to_string();
55        self.runtime.block_on(async {
56            let mut refs = Vec::new();
57            let mut continuation = None;
58            loop {
59                let mut request = self.client.list_objects_v2().bucket(&bucket);
60                if !prefix.is_empty() {
61                    request = request.prefix(&prefix);
62                }
63                if let Some(token) = continuation {
64                    request = request.continuation_token(token);
65                }
66                let response = request.send().await.map_err(|err| {
67                    Box::new(StorageError(format!(
68                        "s3 list objects failed for bucket {}: {err}",
69                        bucket
70                    ))) as Box<dyn std::error::Error + Send + Sync>
71                })?;
72                if let Some(contents) = response.contents {
73                    for object in contents {
74                        if let Some(key) = object.key {
75                            let uri = format_s3_uri(&bucket, &key);
76                            refs.push(planner::object_ref(
77                                uri,
78                                key,
79                                object.last_modified.as_ref().map(|value| value.to_string()),
80                                object.size.map(|value| value as u64),
81                            ));
82                        }
83                    }
84                }
85                if response.is_truncated.unwrap_or(false) {
86                    continuation = response.next_continuation_token;
87                    if continuation.is_none() {
88                        break;
89                    }
90                } else {
91                    break;
92                }
93            }
94            Ok(refs)
95        })
96    }
97
98    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
99        let location = parse_s3_uri(uri)?;
100        let bucket = location.bucket;
101        let key = location.key;
102        let dest = planner::temp_path_for_key(temp_dir, &key);
103        let dest_clone = dest.clone();
104        self.runtime.block_on(async move {
105            let response = self
106                .client
107                .get_object()
108                .bucket(bucket)
109                .key(key.clone())
110                .send()
111                .await
112                .map_err(|err| {
113                    Box::new(StorageError(format!("s3 get object failed: {err}")))
114                        as Box<dyn std::error::Error + Send + Sync>
115                })?;
116            if let Some(parent) = dest_clone.parent() {
117                tokio::fs::create_dir_all(parent).await?;
118            }
119            let mut file = tokio::fs::File::create(&dest_clone).await?;
120            let mut reader = response.body.into_async_read();
121            tokio::io::copy(&mut reader, &mut file).await?;
122            file.flush().await?;
123            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124        })?;
125        Ok(dest)
126    }
127
128    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129        let location = parse_s3_uri(uri)?;
130        let bucket = location.bucket;
131        let key = location.key;
132        let path = local_path.to_path_buf();
133        self.runtime.block_on(async move {
134            let body = ByteStream::from_path(path).await.map_err(|err| {
135                Box::new(StorageError(format!("s3 upload body failed: {err}")))
136                    as Box<dyn std::error::Error + Send + Sync>
137            })?;
138            self.client
139                .put_object()
140                .bucket(bucket)
141                .key(key)
142                .body(body)
143                .send()
144                .await
145                .map_err(|err| {
146                    Box::new(StorageError(format!("s3 put object failed: {err}")))
147                        as Box<dyn std::error::Error + Send + Sync>
148                })?;
149            Ok(())
150        })
151    }
152
153    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
154        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
155    }
156
157    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
158        let src = parse_s3_uri(src_uri)?;
159        let dst = parse_s3_uri(dst_uri)?;
160        let copy_source = format!("{}/{}", src.bucket, src.key);
161        self.runtime.block_on(async move {
162            self.client
163                .copy_object()
164                .bucket(dst.bucket)
165                .key(dst.key)
166                .copy_source(copy_source)
167                .send()
168                .await
169                .map_err(|err| {
170                    Box::new(StorageError(format!("s3 copy object failed: {err}")))
171                        as Box<dyn std::error::Error + Send + Sync>
172                })?;
173            Ok(())
174        })
175    }
176
177    fn delete_object(&self, uri: &str) -> FloeResult<()> {
178        let location = parse_s3_uri(uri)?;
179        let bucket = location.bucket;
180        let key = location.key;
181        self.runtime.block_on(async move {
182            self.client
183                .delete_object()
184                .bucket(bucket)
185                .key(key)
186                .send()
187                .await
188                .map_err(|err| {
189                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
190                        as Box<dyn std::error::Error + Send + Sync>
191                })?;
192            Ok(())
193        })
194    }
195
196    fn exists(&self, uri: &str) -> FloeResult<bool> {
197        let location = parse_s3_uri(uri)?;
198        planner::exists_by_key(self, &location.key)
199    }
200
201    fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
202        let location = parse_s3_uri(uri)?;
203        self.runtime.block_on(async move {
204            let response = self
205                .client
206                .get_object()
207                .bucket(location.bucket)
208                .key(location.key)
209                .send()
210                .await;
211            let response = match response {
212                Ok(response) => response,
213                Err(err) if is_not_found(&err) => return Ok(None),
214                Err(err) => {
215                    return Err(
216                        Box::new(StorageError(format!("s3 get object failed: {err}")))
217                            as Box<dyn std::error::Error + Send + Sync>,
218                    )
219                }
220            };
221            let version = response.e_tag.unwrap_or_default();
222            let body = response.body.collect().await.map_err(|err| {
223                Box::new(StorageError(format!("s3 read object body failed: {err}")))
224                    as Box<dyn std::error::Error + Send + Sync>
225            })?;
226            Ok(Some(StoredObject {
227                body: body.into_bytes().to_vec(),
228                version,
229            }))
230        })
231    }
232
233    fn write_object_conditional(
234        &self,
235        uri: &str,
236        expected_version: Option<&str>,
237        body: &[u8],
238    ) -> FloeResult<ConditionalWrite> {
239        let location = parse_s3_uri(uri)?;
240        let body = ByteStream::from(body.to_vec());
241        self.runtime.block_on(async move {
242            let mut request = self
243                .client
244                .put_object()
245                .bucket(location.bucket)
246                .key(location.key)
247                .body(body);
248            request = match expected_version {
249                Some(version) => request.if_match(version),
250                None => request.if_none_match("*"),
251            };
252            match request.send().await {
253                Ok(output) => Ok(ConditionalWrite::Written {
254                    version: output.e_tag.unwrap_or_default(),
255                }),
256                Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
257                Err(err) => Err(
258                    Box::new(StorageError(format!("s3 put object failed: {err}")))
259                        as Box<dyn std::error::Error + Send + Sync>,
260                ),
261            }
262        })
263    }
264
265    fn delete_object_conditional(
266        &self,
267        uri: &str,
268        expected_version: Option<&str>,
269    ) -> FloeResult<ConditionalWrite> {
270        let Some(expected_version) = expected_version else {
271            // expected_version == None means we expected no object to exist.
272            // Do not delete — any object present was created by a concurrent runner.
273            return Ok(ConditionalWrite::Written {
274                version: "deleted".to_string(),
275            });
276        };
277        let location = parse_s3_uri(uri)?;
278        self.runtime.block_on(async move {
279            match self
280                .client
281                .delete_object()
282                .bucket(location.bucket)
283                .key(location.key)
284                .if_match(expected_version)
285                .send()
286                .await
287            {
288                Ok(_) => Ok(ConditionalWrite::Written {
289                    version: "deleted".to_string(),
290                }),
291                Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
292                Err(err) => Err(
293                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
294                        as Box<dyn std::error::Error + Send + Sync>,
295                ),
296            }
297        })
298    }
299}
300
301fn is_not_found<E: std::fmt::Display>(err: &E) -> bool {
302    let text = err.to_string();
303    text.contains("NoSuchKey") || text.contains("NotFound") || text.contains("404")
304}
305
306fn is_precondition_or_conflict<E: std::fmt::Display>(err: &E) -> bool {
307    let text = err.to_string();
308    text.contains("PreconditionFailed")
309        || text.contains("ConditionalRequestConflict")
310        || text.contains("412")
311        || text.contains("409")
312}
313
314pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
315    parse_bucket_uri("s3", uri)
316}
317
318pub fn format_s3_uri(bucket: &str, key: &str) -> String {
319    format_bucket_uri("s3", bucket, key)
320}
321
322pub type S3Location = BucketLocation;
323
324pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
325    let mut refs = Vec::with_capacity(keys.len());
326    for key in keys.drain(..) {
327        refs.push(ObjectRef {
328            uri: key.clone(),
329            key,
330            last_modified: None,
331            size: None,
332        });
333    }
334    let filtered = planner::filter_by_suffixes(refs, suffixes);
335    let sorted = planner::stable_sort_refs(filtered);
336    sorted.into_iter().map(|obj| obj.key).collect()
337}