floe_core/io/storage/providers/
s3.rs1use std::path::{Path, PathBuf};
2
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::config::Region;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::Client;
7use tokio::io::AsyncWriteExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
12use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
13use crate::FloeResult;
14
15pub struct S3Client {
16 bucket: String,
17 client: Client,
18 runtime: Runtime,
19}
20
21impl S3Client {
22 pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
23 let runtime = tokio::runtime::Builder::new_current_thread()
24 .enable_all()
25 .build()
26 .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
27 let config = runtime.block_on(async {
28 let region_provider = match region {
29 Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
30 .or_default_provider(),
31 None => RegionProviderChain::default_provider(),
32 };
33 aws_config::defaults(aws_config::BehaviorVersion::latest())
34 .region(region_provider)
35 .load()
36 .await
37 });
38 let client = Client::new(&config);
39 Ok(Self {
40 bucket,
41 client,
42 runtime,
43 })
44 }
45
46 fn bucket(&self) -> &str {
47 self.bucket.as_str()
48 }
49}
50
51impl StorageClient for S3Client {
52 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
53 let bucket = self.bucket().to_string();
54 let prefix = prefix.to_string();
55 self.runtime.block_on(async {
56 let mut refs = Vec::new();
57 let mut continuation = None;
58 loop {
59 let mut request = self.client.list_objects_v2().bucket(&bucket);
60 if !prefix.is_empty() {
61 request = request.prefix(&prefix);
62 }
63 if let Some(token) = continuation {
64 request = request.continuation_token(token);
65 }
66 let response = request.send().await.map_err(|err| {
67 Box::new(StorageError(format!(
68 "s3 list objects failed for bucket {}: {err}",
69 bucket
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 if let Some(contents) = response.contents {
73 for object in contents {
74 if let Some(key) = object.key {
75 let uri = format_s3_uri(&bucket, &key);
76 refs.push(planner::object_ref(
77 uri,
78 key,
79 object.last_modified.as_ref().map(|value| value.to_string()),
80 object.size.map(|value| value as u64),
81 ));
82 }
83 }
84 }
85 if response.is_truncated.unwrap_or(false) {
86 continuation = response.next_continuation_token;
87 if continuation.is_none() {
88 break;
89 }
90 } else {
91 break;
92 }
93 }
94 Ok(refs)
95 })
96 }
97
98 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
99 let location = parse_s3_uri(uri)?;
100 let bucket = location.bucket;
101 let key = location.key;
102 let dest = planner::temp_path_for_key(temp_dir, &key);
103 let dest_clone = dest.clone();
104 self.runtime.block_on(async move {
105 let response = self
106 .client
107 .get_object()
108 .bucket(bucket)
109 .key(key.clone())
110 .send()
111 .await
112 .map_err(|err| {
113 Box::new(StorageError(format!("s3 get object failed: {err}")))
114 as Box<dyn std::error::Error + Send + Sync>
115 })?;
116 if let Some(parent) = dest_clone.parent() {
117 tokio::fs::create_dir_all(parent).await?;
118 }
119 let mut file = tokio::fs::File::create(&dest_clone).await?;
120 let mut reader = response.body.into_async_read();
121 tokio::io::copy(&mut reader, &mut file).await?;
122 file.flush().await?;
123 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124 })?;
125 Ok(dest)
126 }
127
128 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129 let location = parse_s3_uri(uri)?;
130 let bucket = location.bucket;
131 let key = location.key;
132 let path = local_path.to_path_buf();
133 self.runtime.block_on(async move {
134 let body = ByteStream::from_path(path).await.map_err(|err| {
135 Box::new(StorageError(format!("s3 upload body failed: {err}")))
136 as Box<dyn std::error::Error + Send + Sync>
137 })?;
138 self.client
139 .put_object()
140 .bucket(bucket)
141 .key(key)
142 .body(body)
143 .send()
144 .await
145 .map_err(|err| {
146 Box::new(StorageError(format!("s3 put object failed: {err}")))
147 as Box<dyn std::error::Error + Send + Sync>
148 })?;
149 Ok(())
150 })
151 }
152
153 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
154 Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
155 }
156
157 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
158 let src = parse_s3_uri(src_uri)?;
159 let dst = parse_s3_uri(dst_uri)?;
160 let copy_source = format!("{}/{}", src.bucket, src.key);
161 self.runtime.block_on(async move {
162 self.client
163 .copy_object()
164 .bucket(dst.bucket)
165 .key(dst.key)
166 .copy_source(copy_source)
167 .send()
168 .await
169 .map_err(|err| {
170 Box::new(StorageError(format!("s3 copy object failed: {err}")))
171 as Box<dyn std::error::Error + Send + Sync>
172 })?;
173 Ok(())
174 })
175 }
176
177 fn delete_object(&self, uri: &str) -> FloeResult<()> {
178 let location = parse_s3_uri(uri)?;
179 let bucket = location.bucket;
180 let key = location.key;
181 self.runtime.block_on(async move {
182 self.client
183 .delete_object()
184 .bucket(bucket)
185 .key(key)
186 .send()
187 .await
188 .map_err(|err| {
189 Box::new(StorageError(format!("s3 delete object failed: {err}")))
190 as Box<dyn std::error::Error + Send + Sync>
191 })?;
192 Ok(())
193 })
194 }
195
196 fn exists(&self, uri: &str) -> FloeResult<bool> {
197 let location = parse_s3_uri(uri)?;
198 planner::exists_by_key(self, &location.key)
199 }
200
201 fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
202 let location = parse_s3_uri(uri)?;
203 self.runtime.block_on(async move {
204 let response = self
205 .client
206 .get_object()
207 .bucket(location.bucket)
208 .key(location.key)
209 .send()
210 .await;
211 let response = match response {
212 Ok(response) => response,
213 Err(err) if is_not_found(&err) => return Ok(None),
214 Err(err) => {
215 return Err(
216 Box::new(StorageError(format!("s3 get object failed: {err}")))
217 as Box<dyn std::error::Error + Send + Sync>,
218 )
219 }
220 };
221 let version = response.e_tag.unwrap_or_default();
222 let body = response.body.collect().await.map_err(|err| {
223 Box::new(StorageError(format!("s3 read object body failed: {err}")))
224 as Box<dyn std::error::Error + Send + Sync>
225 })?;
226 Ok(Some(StoredObject {
227 body: body.into_bytes().to_vec(),
228 version,
229 }))
230 })
231 }
232
233 fn write_object_conditional(
234 &self,
235 uri: &str,
236 expected_version: Option<&str>,
237 body: &[u8],
238 ) -> FloeResult<ConditionalWrite> {
239 let location = parse_s3_uri(uri)?;
240 let body = ByteStream::from(body.to_vec());
241 self.runtime.block_on(async move {
242 let mut request = self
243 .client
244 .put_object()
245 .bucket(location.bucket)
246 .key(location.key)
247 .body(body);
248 request = match expected_version {
249 Some(version) => request.if_match(version),
250 None => request.if_none_match("*"),
251 };
252 match request.send().await {
253 Ok(output) => Ok(ConditionalWrite::Written {
254 version: output.e_tag.unwrap_or_default(),
255 }),
256 Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
257 Err(err) => Err(
258 Box::new(StorageError(format!("s3 put object failed: {err}")))
259 as Box<dyn std::error::Error + Send + Sync>,
260 ),
261 }
262 })
263 }
264
265 fn delete_object_conditional(
266 &self,
267 uri: &str,
268 expected_version: Option<&str>,
269 ) -> FloeResult<ConditionalWrite> {
270 let Some(expected_version) = expected_version else {
271 return Ok(ConditionalWrite::Written {
274 version: "deleted".to_string(),
275 });
276 };
277 let location = parse_s3_uri(uri)?;
278 self.runtime.block_on(async move {
279 match self
280 .client
281 .delete_object()
282 .bucket(location.bucket)
283 .key(location.key)
284 .if_match(expected_version)
285 .send()
286 .await
287 {
288 Ok(_) => Ok(ConditionalWrite::Written {
289 version: "deleted".to_string(),
290 }),
291 Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
292 Err(err) => Err(
293 Box::new(StorageError(format!("s3 delete object failed: {err}")))
294 as Box<dyn std::error::Error + Send + Sync>,
295 ),
296 }
297 })
298 }
299}
300
/// Returns `true` if the message of `err`, or of any error in its `source()`
/// chain, contains one of `needles`.
///
/// Wrapper errors often print only a generic top-level message (for example
/// "service error") and keep the service error code in a nested source, so
/// checking the outermost message alone misses it.
fn error_chain_contains<E: std::error::Error>(err: &E, needles: &[&str]) -> bool {
    let matches = |text: String| needles.iter().any(|needle| text.contains(needle));
    if matches(err.to_string()) {
        return true;
    }
    let mut source = std::error::Error::source(err);
    while let Some(cause) = source {
        if matches(cause.to_string()) {
            return true;
        }
        source = cause.source();
    }
    false
}

/// Reports whether `err` (or one of its sources) describes a missing object.
///
/// Matches the markers `NoSuchKey`, `NotFound` and `404` in the error text.
fn is_not_found<E: std::error::Error>(err: &E) -> bool {
    error_chain_contains(err, &["NoSuchKey", "NotFound", "404"])
}

/// Reports whether `err` (or one of its sources) describes a failed
/// precondition or a write conflict.
///
/// Matches the markers `PreconditionFailed`, `ConditionalRequestConflict`,
/// `412` and `409` in the error text.
fn is_precondition_or_conflict<E: std::error::Error>(err: &E) -> bool {
    error_chain_contains(
        err,
        &["PreconditionFailed", "ConditionalRequestConflict", "412", "409"],
    )
}
313
314pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
315 parse_bucket_uri("s3", uri)
316}
317
318pub fn format_s3_uri(bucket: &str, key: &str) -> String {
319 format_bucket_uri("s3", bucket, key)
320}
321
322pub type S3Location = BucketLocation;
323
324pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
325 let mut refs = Vec::with_capacity(keys.len());
326 for key in keys.drain(..) {
327 refs.push(ObjectRef {
328 uri: key.clone(),
329 key,
330 last_modified: None,
331 size: None,
332 });
333 }
334 let filtered = planner::filter_by_suffixes(refs, suffixes);
335 let sorted = planner::stable_sort_refs(filtered);
336 sorted.into_iter().map(|obj| obj.key).collect()
337}