Skip to main content

floe_core/io/storage/providers/
s3.rs

use std::path::{Path, PathBuf};

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;
use tokio::io::AsyncWriteExt;
use tokio::runtime::Runtime;

use crate::errors::StorageError;
use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
use crate::FloeResult;
14
/// S3-backed storage client bound to a single bucket.
///
/// The client owns the Tokio runtime that its blocking `StorageClient` methods use
/// (via `block_on`) to drive the async AWS SDK requests.
pub struct S3Client {
    /// Name of the bucket this client was constructed for.
    bucket: String,
    /// AWS SDK S3 client built in `S3Client::new`.
    client: Client,
    /// Runtime created in `S3Client::new` and reused for every request.
    // NOTE(review): fields drop in declaration order, so `client` is dropped before
    // `runtime`; keep `runtime` last in case the SDK client holds runtime-bound resources.
    runtime: Runtime,
}
20
21impl S3Client {
22    pub fn new(
23        bucket: String,
24        region: Option<&str>,
25        endpoint: Option<&str>,
26        path_style_access: Option<bool>,
27    ) -> FloeResult<Self> {
28        let runtime = tokio::runtime::Builder::new_current_thread()
29            .enable_all()
30            .build()
31            .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
32        let endpoint = endpoint.map(ToOwned::to_owned);
33        let config = runtime.block_on(async {
34            let region_provider = match region {
35                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
36                    .or_default_provider(),
37                None => RegionProviderChain::default_provider(),
38            };
39            let mut builder =
40                aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region_provider);
41            if let Some(ep) = endpoint {
42                builder = builder.endpoint_url(ep);
43            }
44            builder.load().await
45        });
46        let mut s3_builder = aws_sdk_s3::config::Builder::from(&config);
47        if path_style_access.unwrap_or(false) {
48            s3_builder = s3_builder.force_path_style(true);
49        }
50        let client = Client::from_conf(s3_builder.build());
51        Ok(Self {
52            bucket,
53            client,
54            runtime,
55        })
56    }
57
58    fn bucket(&self) -> &str {
59        self.bucket.as_str()
60    }
61}
62
63impl StorageClient for S3Client {
64    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
65        let bucket = self.bucket().to_string();
66        let prefix = prefix.to_string();
67        self.runtime.block_on(async {
68            let mut refs = Vec::new();
69            let mut continuation = None;
70            loop {
71                let mut request = self.client.list_objects_v2().bucket(&bucket);
72                if !prefix.is_empty() {
73                    request = request.prefix(&prefix);
74                }
75                if let Some(token) = continuation {
76                    request = request.continuation_token(token);
77                }
78                let response = request.send().await.map_err(|err| {
79                    Box::new(StorageError(format!(
80                        "s3 list objects failed for bucket {}: {err}",
81                        bucket
82                    ))) as Box<dyn std::error::Error + Send + Sync>
83                })?;
84                if let Some(contents) = response.contents {
85                    for object in contents {
86                        if let Some(key) = object.key {
87                            let uri = format_s3_uri(&bucket, &key);
88                            refs.push(planner::object_ref(
89                                uri,
90                                key,
91                                object.last_modified.as_ref().map(|value| value.to_string()),
92                                object.size.map(|value| value as u64),
93                            ));
94                        }
95                    }
96                }
97                if response.is_truncated.unwrap_or(false) {
98                    continuation = response.next_continuation_token;
99                    if continuation.is_none() {
100                        break;
101                    }
102                } else {
103                    break;
104                }
105            }
106            Ok(refs)
107        })
108    }
109
110    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
111        let location = parse_s3_uri(uri)?;
112        let bucket = location.bucket;
113        let key = location.key;
114        let dest = planner::temp_path_for_key(temp_dir, &key);
115        let dest_clone = dest.clone();
116        self.runtime.block_on(async move {
117            let response = self
118                .client
119                .get_object()
120                .bucket(bucket)
121                .key(key.clone())
122                .send()
123                .await
124                .map_err(|err| {
125                    Box::new(StorageError(format!("s3 get object failed: {err}")))
126                        as Box<dyn std::error::Error + Send + Sync>
127                })?;
128            if let Some(parent) = dest_clone.parent() {
129                tokio::fs::create_dir_all(parent).await?;
130            }
131            let mut file = tokio::fs::File::create(&dest_clone).await?;
132            let mut reader = response.body.into_async_read();
133            tokio::io::copy(&mut reader, &mut file).await?;
134            file.flush().await?;
135            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
136        })?;
137        Ok(dest)
138    }
139
140    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
141        let location = parse_s3_uri(uri)?;
142        let bucket = location.bucket;
143        let key = location.key;
144        let path = local_path.to_path_buf();
145        self.runtime.block_on(async move {
146            let body = ByteStream::from_path(path).await.map_err(|err| {
147                Box::new(StorageError(format!("s3 upload body failed: {err}")))
148                    as Box<dyn std::error::Error + Send + Sync>
149            })?;
150            self.client
151                .put_object()
152                .bucket(bucket)
153                .key(key)
154                .body(body)
155                .send()
156                .await
157                .map_err(|err| {
158                    Box::new(StorageError(format!("s3 put object failed: {err}")))
159                        as Box<dyn std::error::Error + Send + Sync>
160                })?;
161            Ok(())
162        })
163    }
164
165    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
166        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
167    }
168
169    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
170        let src = parse_s3_uri(src_uri)?;
171        let dst = parse_s3_uri(dst_uri)?;
172        let copy_source = format!("{}/{}", src.bucket, src.key);
173        self.runtime.block_on(async move {
174            self.client
175                .copy_object()
176                .bucket(dst.bucket)
177                .key(dst.key)
178                .copy_source(copy_source)
179                .send()
180                .await
181                .map_err(|err| {
182                    Box::new(StorageError(format!("s3 copy object failed: {err}")))
183                        as Box<dyn std::error::Error + Send + Sync>
184                })?;
185            Ok(())
186        })
187    }
188
189    fn delete_object(&self, uri: &str) -> FloeResult<()> {
190        let location = parse_s3_uri(uri)?;
191        let bucket = location.bucket;
192        let key = location.key;
193        self.runtime.block_on(async move {
194            self.client
195                .delete_object()
196                .bucket(bucket)
197                .key(key)
198                .send()
199                .await
200                .map_err(|err| {
201                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
202                        as Box<dyn std::error::Error + Send + Sync>
203                })?;
204            Ok(())
205        })
206    }
207
208    fn exists(&self, uri: &str) -> FloeResult<bool> {
209        let location = parse_s3_uri(uri)?;
210        planner::exists_by_key(self, &location.key)
211    }
212
213    fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
214        let location = parse_s3_uri(uri)?;
215        self.runtime.block_on(async move {
216            let response = self
217                .client
218                .get_object()
219                .bucket(location.bucket)
220                .key(location.key)
221                .send()
222                .await;
223            let response = match response {
224                Ok(response) => response,
225                Err(err) if is_not_found(&err) => return Ok(None),
226                Err(err) => {
227                    return Err(
228                        Box::new(StorageError(format!("s3 get object failed: {err}")))
229                            as Box<dyn std::error::Error + Send + Sync>,
230                    )
231                }
232            };
233            let version = response.e_tag.unwrap_or_default();
234            let body = response.body.collect().await.map_err(|err| {
235                Box::new(StorageError(format!("s3 read object body failed: {err}")))
236                    as Box<dyn std::error::Error + Send + Sync>
237            })?;
238            Ok(Some(StoredObject {
239                body: body.into_bytes().to_vec(),
240                version,
241            }))
242        })
243    }
244
245    fn write_object_conditional(
246        &self,
247        uri: &str,
248        expected_version: Option<&str>,
249        body: &[u8],
250    ) -> FloeResult<ConditionalWrite> {
251        let location = parse_s3_uri(uri)?;
252        let body = ByteStream::from(body.to_vec());
253        self.runtime.block_on(async move {
254            let mut request = self
255                .client
256                .put_object()
257                .bucket(location.bucket)
258                .key(location.key)
259                .body(body);
260            request = match expected_version {
261                Some(version) => request.if_match(version),
262                None => request.if_none_match("*"),
263            };
264            match request.send().await {
265                Ok(output) => Ok(ConditionalWrite::Written {
266                    version: output.e_tag.unwrap_or_default(),
267                }),
268                Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
269                Err(err) => Err(
270                    Box::new(StorageError(format!("s3 put object failed: {err}")))
271                        as Box<dyn std::error::Error + Send + Sync>,
272                ),
273            }
274        })
275    }
276
277    fn delete_object_conditional(
278        &self,
279        uri: &str,
280        expected_version: Option<&str>,
281    ) -> FloeResult<ConditionalWrite> {
282        let Some(expected_version) = expected_version else {
283            // expected_version == None means we expected no object to exist.
284            // Do not delete — any object present was created by a concurrent runner.
285            return Ok(ConditionalWrite::Written {
286                version: "deleted".to_string(),
287            });
288        };
289        let location = parse_s3_uri(uri)?;
290        self.runtime.block_on(async move {
291            match self
292                .client
293                .delete_object()
294                .bucket(location.bucket)
295                .key(location.key)
296                .if_match(expected_version)
297                .send()
298                .await
299            {
300                Ok(_) => Ok(ConditionalWrite::Written {
301                    version: "deleted".to_string(),
302                }),
303                Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
304                Err(err) => Err(
305                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
306                        as Box<dyn std::error::Error + Send + Sync>,
307                ),
308            }
309        })
310    }
311}
312
/// Returns `true` when `err` reports that the requested object does not exist
/// (error code `NoSuchKey` or `NotFound`).
///
/// The whole `source()` chain is inspected. An AWS SDK `SdkError` only displays a generic
/// summary such as "service error"; the S3 error code appears in the display text of the
/// wrapped service error further down the chain.
fn is_not_found<E: std::error::Error>(err: &E) -> bool {
    error_chain_has_code(err, &["NoSuchKey", "NotFound"])
}

/// Returns `true` when `err` reports a failed conditional request: `PreconditionFailed`
/// (an `If-Match` / `If-None-Match` condition did not hold) or `ConditionalRequestConflict`
/// (a conflicting concurrent conditional request on the same key).
///
/// Like `is_not_found`, this walks the whole `source()` chain to find the S3 error code.
fn is_precondition_or_conflict<E: std::error::Error>(err: &E) -> bool {
    error_chain_has_code(err, &["PreconditionFailed", "ConditionalRequestConflict"])
}

/// Reports whether `err` or any error in its `source()` chain displays one of `codes` as a
/// whole alphanumeric word.
///
/// Whole-word matching avoids false positives from request IDs and other identifiers that
/// merely contain a code's characters. For the same reason, bare HTTP status digits such as
/// "404" or "412" are not matched: they could appear inside unrelated identifiers.
fn error_chain_has_code<E: std::error::Error>(err: &E, codes: &[&str]) -> bool {
    let mut current: Option<&dyn std::error::Error> = Some(err);
    while let Some(error) = current {
        let text = error.to_string();
        let mentions_code = text
            .split(|ch: char| !ch.is_ascii_alphanumeric())
            .any(|token| codes.contains(&token));
        if mentions_code {
            return true;
        }
        current = error.source();
    }
    false
}
325
326pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
327    parse_bucket_uri("s3", uri)
328}
329
330pub fn format_s3_uri(bucket: &str, key: &str) -> String {
331    format_bucket_uri("s3", bucket, key)
332}
333
/// Bucket/key location returned by `parse_s3_uri`; an alias of the shared `BucketLocation`.
pub type S3Location = BucketLocation;
335
336pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
337    let mut refs = Vec::with_capacity(keys.len());
338    for key in keys.drain(..) {
339        refs.push(ObjectRef {
340            uri: key.clone(),
341            key,
342            last_modified: None,
343            size: None,
344        });
345    }
346    let filtered = planner::filter_by_suffixes(refs, suffixes);
347    let sorted = planner::stable_sort_refs(filtered);
348    sorted.into_iter().map(|obj| obj.key).collect()
349}