Skip to main content

floe_core/io/storage/providers/
s3.rs

1use std::path::{Path, PathBuf};
2
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::config::Region;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::Client;
7use tokio::io::AsyncWriteExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
12use crate::io::storage::{planner, ObjectRef, StorageClient};
13use crate::FloeResult;
14
15pub struct S3Client {
16    bucket: String,
17    client: Client,
18    runtime: Runtime,
19}
20
21impl S3Client {
22    pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
23        let runtime = tokio::runtime::Builder::new_current_thread()
24            .enable_all()
25            .build()
26            .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
27        let config = runtime.block_on(async {
28            let region_provider = match region {
29                Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
30                    .or_default_provider(),
31                None => RegionProviderChain::default_provider(),
32            };
33            aws_config::defaults(aws_config::BehaviorVersion::latest())
34                .region(region_provider)
35                .load()
36                .await
37        });
38        let client = Client::new(&config);
39        Ok(Self {
40            bucket,
41            client,
42            runtime,
43        })
44    }
45
46    fn bucket(&self) -> &str {
47        self.bucket.as_str()
48    }
49}
50
51impl StorageClient for S3Client {
52    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
53        let bucket = self.bucket().to_string();
54        let prefix = prefix.to_string();
55        self.runtime.block_on(async {
56            let mut refs = Vec::new();
57            let mut continuation = None;
58            loop {
59                let mut request = self.client.list_objects_v2().bucket(&bucket);
60                if !prefix.is_empty() {
61                    request = request.prefix(&prefix);
62                }
63                if let Some(token) = continuation {
64                    request = request.continuation_token(token);
65                }
66                let response = request.send().await.map_err(|err| {
67                    Box::new(StorageError(format!(
68                        "s3 list objects failed for bucket {}: {err}",
69                        bucket
70                    ))) as Box<dyn std::error::Error + Send + Sync>
71                })?;
72                if let Some(contents) = response.contents {
73                    for object in contents {
74                        if let Some(key) = object.key {
75                            let uri = format_s3_uri(&bucket, &key);
76                            refs.push(planner::object_ref(
77                                uri,
78                                key,
79                                object.last_modified.as_ref().map(|value| value.to_string()),
80                                object.size.map(|value| value as u64),
81                            ));
82                        }
83                    }
84                }
85                if response.is_truncated.unwrap_or(false) {
86                    continuation = response.next_continuation_token;
87                    if continuation.is_none() {
88                        break;
89                    }
90                } else {
91                    break;
92                }
93            }
94            Ok(refs)
95        })
96    }
97
98    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
99        let location = parse_s3_uri(uri)?;
100        let bucket = location.bucket;
101        let key = location.key;
102        let dest = planner::temp_path_for_key(temp_dir, &key);
103        let dest_clone = dest.clone();
104        self.runtime.block_on(async move {
105            let response = self
106                .client
107                .get_object()
108                .bucket(bucket)
109                .key(key.clone())
110                .send()
111                .await
112                .map_err(|err| {
113                    Box::new(StorageError(format!("s3 get object failed: {err}")))
114                        as Box<dyn std::error::Error + Send + Sync>
115                })?;
116            if let Some(parent) = dest_clone.parent() {
117                tokio::fs::create_dir_all(parent).await?;
118            }
119            let mut file = tokio::fs::File::create(&dest_clone).await?;
120            let mut reader = response.body.into_async_read();
121            tokio::io::copy(&mut reader, &mut file).await?;
122            file.flush().await?;
123            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124        })?;
125        Ok(dest)
126    }
127
128    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129        let location = parse_s3_uri(uri)?;
130        let bucket = location.bucket;
131        let key = location.key;
132        let path = local_path.to_path_buf();
133        self.runtime.block_on(async move {
134            let body = ByteStream::from_path(path).await.map_err(|err| {
135                Box::new(StorageError(format!("s3 upload body failed: {err}")))
136                    as Box<dyn std::error::Error + Send + Sync>
137            })?;
138            self.client
139                .put_object()
140                .bucket(bucket)
141                .key(key)
142                .body(body)
143                .send()
144                .await
145                .map_err(|err| {
146                    Box::new(StorageError(format!("s3 put object failed: {err}")))
147                        as Box<dyn std::error::Error + Send + Sync>
148                })?;
149            Ok(())
150        })
151    }
152
153    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
154        Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
155    }
156
157    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
158        let src = parse_s3_uri(src_uri)?;
159        let dst = parse_s3_uri(dst_uri)?;
160        let copy_source = format!("{}/{}", src.bucket, src.key);
161        self.runtime.block_on(async move {
162            self.client
163                .copy_object()
164                .bucket(dst.bucket)
165                .key(dst.key)
166                .copy_source(copy_source)
167                .send()
168                .await
169                .map_err(|err| {
170                    Box::new(StorageError(format!("s3 copy object failed: {err}")))
171                        as Box<dyn std::error::Error + Send + Sync>
172                })?;
173            Ok(())
174        })
175    }
176
177    fn delete_object(&self, uri: &str) -> FloeResult<()> {
178        let location = parse_s3_uri(uri)?;
179        let bucket = location.bucket;
180        let key = location.key;
181        self.runtime.block_on(async move {
182            self.client
183                .delete_object()
184                .bucket(bucket)
185                .key(key)
186                .send()
187                .await
188                .map_err(|err| {
189                    Box::new(StorageError(format!("s3 delete object failed: {err}")))
190                        as Box<dyn std::error::Error + Send + Sync>
191                })?;
192            Ok(())
193        })
194    }
195
196    fn exists(&self, uri: &str) -> FloeResult<bool> {
197        let location = parse_s3_uri(uri)?;
198        planner::exists_by_key(self, &location.key)
199    }
200}
201
202pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
203    parse_bucket_uri("s3", uri)
204}
205
206pub fn format_s3_uri(bucket: &str, key: &str) -> String {
207    format_bucket_uri("s3", bucket, key)
208}
209
210pub type S3Location = BucketLocation;
211
212pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
213    let mut refs = Vec::with_capacity(keys.len());
214    for key in keys.drain(..) {
215        refs.push(ObjectRef {
216            uri: key.clone(),
217            key,
218            last_modified: None,
219            size: None,
220        });
221    }
222    let filtered = planner::filter_by_suffixes(refs, suffixes);
223    let sorted = planner::stable_sort_refs(filtered);
224    sorted.into_iter().map(|obj| obj.key).collect()
225}