floe_core/io/storage/providers/
s3.rs1use std::path::{Path, PathBuf};
2
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::config::Region;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::Client;
7use tokio::io::AsyncWriteExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
12use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
13use crate::FloeResult;
14
/// Storage client backed by Amazon S3 (or an S3-compatible endpoint).
///
/// The AWS SDK is async; this type owns a Tokio runtime and drives every SDK
/// call to completion with `block_on`, so its methods are synchronous.
pub struct S3Client {
    /// Bucket used by `list` and `resolve_uri`; other operations take the
    /// bucket from the URI they are given.
    bucket: String,
    /// AWS SDK S3 client through which all requests are issued.
    client: Client,
    /// Runtime on which the SDK futures are executed.
    runtime: Runtime,
}
20
21impl S3Client {
22 pub fn new(
23 bucket: String,
24 region: Option<&str>,
25 endpoint: Option<&str>,
26 path_style_access: Option<bool>,
27 ) -> FloeResult<Self> {
28 let runtime = tokio::runtime::Builder::new_current_thread()
29 .enable_all()
30 .build()
31 .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
32 let endpoint = endpoint.map(ToOwned::to_owned);
33 let config = runtime.block_on(async {
34 let region_provider = match region {
35 Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
36 .or_default_provider(),
37 None => RegionProviderChain::default_provider(),
38 };
39 let mut builder =
40 aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region_provider);
41 if let Some(ep) = endpoint {
42 builder = builder.endpoint_url(ep);
43 }
44 builder.load().await
45 });
46 let mut s3_builder = aws_sdk_s3::config::Builder::from(&config);
47 if path_style_access.unwrap_or(false) {
48 s3_builder = s3_builder.force_path_style(true);
49 }
50 let client = Client::from_conf(s3_builder.build());
51 Ok(Self {
52 bucket,
53 client,
54 runtime,
55 })
56 }
57
58 fn bucket(&self) -> &str {
59 self.bucket.as_str()
60 }
61}
62
63impl StorageClient for S3Client {
64 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
65 let bucket = self.bucket().to_string();
66 let prefix = prefix.to_string();
67 self.runtime.block_on(async {
68 let mut refs = Vec::new();
69 let mut continuation = None;
70 loop {
71 let mut request = self.client.list_objects_v2().bucket(&bucket);
72 if !prefix.is_empty() {
73 request = request.prefix(&prefix);
74 }
75 if let Some(token) = continuation {
76 request = request.continuation_token(token);
77 }
78 let response = request.send().await.map_err(|err| {
79 Box::new(StorageError(format!(
80 "s3 list objects failed for bucket {}: {err}",
81 bucket
82 ))) as Box<dyn std::error::Error + Send + Sync>
83 })?;
84 if let Some(contents) = response.contents {
85 for object in contents {
86 if let Some(key) = object.key {
87 let uri = format_s3_uri(&bucket, &key);
88 refs.push(planner::object_ref(
89 uri,
90 key,
91 object.last_modified.as_ref().map(|value| value.to_string()),
92 object.size.map(|value| value as u64),
93 ));
94 }
95 }
96 }
97 if response.is_truncated.unwrap_or(false) {
98 continuation = response.next_continuation_token;
99 if continuation.is_none() {
100 break;
101 }
102 } else {
103 break;
104 }
105 }
106 Ok(refs)
107 })
108 }
109
110 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
111 let location = parse_s3_uri(uri)?;
112 let bucket = location.bucket;
113 let key = location.key;
114 let dest = planner::temp_path_for_key(temp_dir, &key);
115 let dest_clone = dest.clone();
116 self.runtime.block_on(async move {
117 let response = self
118 .client
119 .get_object()
120 .bucket(bucket)
121 .key(key.clone())
122 .send()
123 .await
124 .map_err(|err| {
125 Box::new(StorageError(format!("s3 get object failed: {err}")))
126 as Box<dyn std::error::Error + Send + Sync>
127 })?;
128 if let Some(parent) = dest_clone.parent() {
129 tokio::fs::create_dir_all(parent).await?;
130 }
131 let mut file = tokio::fs::File::create(&dest_clone).await?;
132 let mut reader = response.body.into_async_read();
133 tokio::io::copy(&mut reader, &mut file).await?;
134 file.flush().await?;
135 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
136 })?;
137 Ok(dest)
138 }
139
140 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
141 let location = parse_s3_uri(uri)?;
142 let bucket = location.bucket;
143 let key = location.key;
144 let path = local_path.to_path_buf();
145 self.runtime.block_on(async move {
146 let body = ByteStream::from_path(path).await.map_err(|err| {
147 Box::new(StorageError(format!("s3 upload body failed: {err}")))
148 as Box<dyn std::error::Error + Send + Sync>
149 })?;
150 self.client
151 .put_object()
152 .bucket(bucket)
153 .key(key)
154 .body(body)
155 .send()
156 .await
157 .map_err(|err| {
158 Box::new(StorageError(format!("s3 put object failed: {err}")))
159 as Box<dyn std::error::Error + Send + Sync>
160 })?;
161 Ok(())
162 })
163 }
164
165 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
166 Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
167 }
168
169 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
170 let src = parse_s3_uri(src_uri)?;
171 let dst = parse_s3_uri(dst_uri)?;
172 let copy_source = format!("{}/{}", src.bucket, src.key);
173 self.runtime.block_on(async move {
174 self.client
175 .copy_object()
176 .bucket(dst.bucket)
177 .key(dst.key)
178 .copy_source(copy_source)
179 .send()
180 .await
181 .map_err(|err| {
182 Box::new(StorageError(format!("s3 copy object failed: {err}")))
183 as Box<dyn std::error::Error + Send + Sync>
184 })?;
185 Ok(())
186 })
187 }
188
189 fn delete_object(&self, uri: &str) -> FloeResult<()> {
190 let location = parse_s3_uri(uri)?;
191 let bucket = location.bucket;
192 let key = location.key;
193 self.runtime.block_on(async move {
194 self.client
195 .delete_object()
196 .bucket(bucket)
197 .key(key)
198 .send()
199 .await
200 .map_err(|err| {
201 Box::new(StorageError(format!("s3 delete object failed: {err}")))
202 as Box<dyn std::error::Error + Send + Sync>
203 })?;
204 Ok(())
205 })
206 }
207
208 fn exists(&self, uri: &str) -> FloeResult<bool> {
209 let location = parse_s3_uri(uri)?;
210 planner::exists_by_key(self, &location.key)
211 }
212
213 fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
214 let location = parse_s3_uri(uri)?;
215 self.runtime.block_on(async move {
216 let response = self
217 .client
218 .get_object()
219 .bucket(location.bucket)
220 .key(location.key)
221 .send()
222 .await;
223 let response = match response {
224 Ok(response) => response,
225 Err(err) if is_not_found(&err) => return Ok(None),
226 Err(err) => {
227 return Err(
228 Box::new(StorageError(format!("s3 get object failed: {err}")))
229 as Box<dyn std::error::Error + Send + Sync>,
230 )
231 }
232 };
233 let version = response.e_tag.unwrap_or_default();
234 let body = response.body.collect().await.map_err(|err| {
235 Box::new(StorageError(format!("s3 read object body failed: {err}")))
236 as Box<dyn std::error::Error + Send + Sync>
237 })?;
238 Ok(Some(StoredObject {
239 body: body.into_bytes().to_vec(),
240 version,
241 }))
242 })
243 }
244
245 fn write_object_conditional(
246 &self,
247 uri: &str,
248 expected_version: Option<&str>,
249 body: &[u8],
250 ) -> FloeResult<ConditionalWrite> {
251 let location = parse_s3_uri(uri)?;
252 let body = ByteStream::from(body.to_vec());
253 self.runtime.block_on(async move {
254 let mut request = self
255 .client
256 .put_object()
257 .bucket(location.bucket)
258 .key(location.key)
259 .body(body);
260 request = match expected_version {
261 Some(version) => request.if_match(version),
262 None => request.if_none_match("*"),
263 };
264 match request.send().await {
265 Ok(output) => Ok(ConditionalWrite::Written {
266 version: output.e_tag.unwrap_or_default(),
267 }),
268 Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
269 Err(err) => Err(
270 Box::new(StorageError(format!("s3 put object failed: {err}")))
271 as Box<dyn std::error::Error + Send + Sync>,
272 ),
273 }
274 })
275 }
276
277 fn delete_object_conditional(
278 &self,
279 uri: &str,
280 expected_version: Option<&str>,
281 ) -> FloeResult<ConditionalWrite> {
282 let Some(expected_version) = expected_version else {
283 return Ok(ConditionalWrite::Written {
286 version: "deleted".to_string(),
287 });
288 };
289 let location = parse_s3_uri(uri)?;
290 self.runtime.block_on(async move {
291 match self
292 .client
293 .delete_object()
294 .bucket(location.bucket)
295 .key(location.key)
296 .if_match(expected_version)
297 .send()
298 .await
299 {
300 Ok(_) => Ok(ConditionalWrite::Written {
301 version: "deleted".to_string(),
302 }),
303 Err(err) if is_precondition_or_conflict(&err) => Ok(ConditionalWrite::Conflict),
304 Err(err) => Err(
305 Box::new(StorageError(format!("s3 delete object failed: {err}")))
306 as Box<dyn std::error::Error + Send + Sync>,
307 ),
308 }
309 })
310 }
311}
312
/// Returns `true` when the rendered text of `err` contains one of the
/// "missing object" markers: `NoSuchKey`, `NotFound` or `404`.
///
/// The match is a case-sensitive substring test on `err`'s `Display` output.
fn is_not_found<E: std::fmt::Display>(err: &E) -> bool {
    const MARKERS: [&str; 3] = ["NoSuchKey", "NotFound", "404"];
    let rendered = err.to_string();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}
317
/// Returns `true` when the rendered text of `err` contains one of the
/// precondition/conflict markers: `PreconditionFailed`,
/// `ConditionalRequestConflict`, `412` or `409`.
///
/// The match is a case-sensitive substring test on `err`'s `Display` output.
fn is_precondition_or_conflict<E: std::fmt::Display>(err: &E) -> bool {
    const MARKERS: [&str; 4] = [
        "PreconditionFailed",
        "ConditionalRequestConflict",
        "412",
        "409",
    ];
    let rendered = err.to_string();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}
325
/// Parses `uri` into its bucket and key by delegating to `parse_bucket_uri`
/// with the `s3` scheme.
///
/// # Errors
/// Propagates whatever error `parse_bucket_uri` returns for `uri`.
pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
    parse_bucket_uri("s3", uri)
}
329
/// Formats the URI for `key` in `bucket` by delegating to `format_bucket_uri`
/// with the `s3` scheme.
pub fn format_s3_uri(bucket: &str, key: &str) -> String {
    format_bucket_uri("s3", bucket, key)
}
333
/// Bucket/key pair produced by [`parse_s3_uri`]; an alias for the shared
/// `BucketLocation` type.
pub type S3Location = BucketLocation;
335
336pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
337 let mut refs = Vec::with_capacity(keys.len());
338 for key in keys.drain(..) {
339 refs.push(ObjectRef {
340 uri: key.clone(),
341 key,
342 last_modified: None,
343 size: None,
344 });
345 }
346 let filtered = planner::filter_by_suffixes(refs, suffixes);
347 let sorted = planner::stable_sort_refs(filtered);
348 sorted.into_iter().map(|obj| obj.key).collect()
349}