floe_core/io/storage/providers/
s3.rs1use std::path::{Path, PathBuf};
2
3use aws_config::meta::region::RegionProviderChain;
4use aws_sdk_s3::config::Region;
5use aws_sdk_s3::primitives::ByteStream;
6use aws_sdk_s3::Client;
7use tokio::io::AsyncWriteExt;
8use tokio::runtime::Runtime;
9
10use crate::errors::StorageError;
11use crate::io::storage::uri::{format_bucket_uri, parse_bucket_uri, BucketLocation};
12use crate::io::storage::{planner, ObjectRef, StorageClient};
13use crate::FloeResult;
14
15pub struct S3Client {
16 bucket: String,
17 client: Client,
18 runtime: Runtime,
19}
20
21impl S3Client {
22 pub fn new(bucket: String, region: Option<&str>) -> FloeResult<Self> {
23 let runtime = tokio::runtime::Builder::new_current_thread()
24 .enable_all()
25 .build()
26 .map_err(|err| Box::new(StorageError(format!("failed to build aws runtime: {err}"))))?;
27 let config = runtime.block_on(async {
28 let region_provider = match region {
29 Some(region) => RegionProviderChain::first_try(Region::new(region.to_string()))
30 .or_default_provider(),
31 None => RegionProviderChain::default_provider(),
32 };
33 aws_config::defaults(aws_config::BehaviorVersion::latest())
34 .region(region_provider)
35 .load()
36 .await
37 });
38 let client = Client::new(&config);
39 Ok(Self {
40 bucket,
41 client,
42 runtime,
43 })
44 }
45
46 fn bucket(&self) -> &str {
47 self.bucket.as_str()
48 }
49}
50
51impl StorageClient for S3Client {
52 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
53 let bucket = self.bucket().to_string();
54 let prefix = prefix.to_string();
55 self.runtime.block_on(async {
56 let mut refs = Vec::new();
57 let mut continuation = None;
58 loop {
59 let mut request = self.client.list_objects_v2().bucket(&bucket);
60 if !prefix.is_empty() {
61 request = request.prefix(&prefix);
62 }
63 if let Some(token) = continuation {
64 request = request.continuation_token(token);
65 }
66 let response = request.send().await.map_err(|err| {
67 Box::new(StorageError(format!(
68 "s3 list objects failed for bucket {}: {err}",
69 bucket
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 if let Some(contents) = response.contents {
73 for object in contents {
74 if let Some(key) = object.key {
75 let uri = format_s3_uri(&bucket, &key);
76 refs.push(planner::object_ref(
77 uri,
78 key,
79 object.last_modified.as_ref().map(|value| value.to_string()),
80 object.size.map(|value| value as u64),
81 ));
82 }
83 }
84 }
85 if response.is_truncated.unwrap_or(false) {
86 continuation = response.next_continuation_token;
87 if continuation.is_none() {
88 break;
89 }
90 } else {
91 break;
92 }
93 }
94 Ok(refs)
95 })
96 }
97
98 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
99 let location = parse_s3_uri(uri)?;
100 let bucket = location.bucket;
101 let key = location.key;
102 let dest = planner::temp_path_for_key(temp_dir, &key);
103 let dest_clone = dest.clone();
104 self.runtime.block_on(async move {
105 let response = self
106 .client
107 .get_object()
108 .bucket(bucket)
109 .key(key.clone())
110 .send()
111 .await
112 .map_err(|err| {
113 Box::new(StorageError(format!("s3 get object failed: {err}")))
114 as Box<dyn std::error::Error + Send + Sync>
115 })?;
116 if let Some(parent) = dest_clone.parent() {
117 tokio::fs::create_dir_all(parent).await?;
118 }
119 let mut file = tokio::fs::File::create(&dest_clone).await?;
120 let mut reader = response.body.into_async_read();
121 tokio::io::copy(&mut reader, &mut file).await?;
122 file.flush().await?;
123 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
124 })?;
125 Ok(dest)
126 }
127
128 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
129 let location = parse_s3_uri(uri)?;
130 let bucket = location.bucket;
131 let key = location.key;
132 let path = local_path.to_path_buf();
133 self.runtime.block_on(async move {
134 let body = ByteStream::from_path(path).await.map_err(|err| {
135 Box::new(StorageError(format!("s3 upload body failed: {err}")))
136 as Box<dyn std::error::Error + Send + Sync>
137 })?;
138 self.client
139 .put_object()
140 .bucket(bucket)
141 .key(key)
142 .body(body)
143 .send()
144 .await
145 .map_err(|err| {
146 Box::new(StorageError(format!("s3 put object failed: {err}")))
147 as Box<dyn std::error::Error + Send + Sync>
148 })?;
149 Ok(())
150 })
151 }
152
153 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
154 Ok(format_s3_uri(self.bucket(), path.trim_start_matches('/')))
155 }
156
157 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
158 let src = parse_s3_uri(src_uri)?;
159 let dst = parse_s3_uri(dst_uri)?;
160 let copy_source = format!("{}/{}", src.bucket, src.key);
161 self.runtime.block_on(async move {
162 self.client
163 .copy_object()
164 .bucket(dst.bucket)
165 .key(dst.key)
166 .copy_source(copy_source)
167 .send()
168 .await
169 .map_err(|err| {
170 Box::new(StorageError(format!("s3 copy object failed: {err}")))
171 as Box<dyn std::error::Error + Send + Sync>
172 })?;
173 Ok(())
174 })
175 }
176
177 fn delete_object(&self, uri: &str) -> FloeResult<()> {
178 let location = parse_s3_uri(uri)?;
179 let bucket = location.bucket;
180 let key = location.key;
181 self.runtime.block_on(async move {
182 self.client
183 .delete_object()
184 .bucket(bucket)
185 .key(key)
186 .send()
187 .await
188 .map_err(|err| {
189 Box::new(StorageError(format!("s3 delete object failed: {err}")))
190 as Box<dyn std::error::Error + Send + Sync>
191 })?;
192 Ok(())
193 })
194 }
195
196 fn exists(&self, uri: &str) -> FloeResult<bool> {
197 let location = parse_s3_uri(uri)?;
198 planner::exists_by_key(self, &location.key)
199 }
200}
201
202pub fn parse_s3_uri(uri: &str) -> FloeResult<S3Location> {
203 parse_bucket_uri("s3", uri)
204}
205
206pub fn format_s3_uri(bucket: &str, key: &str) -> String {
207 format_bucket_uri("s3", bucket, key)
208}
209
210pub type S3Location = BucketLocation;
211
212pub fn filter_keys_by_suffixes(mut keys: Vec<String>, suffixes: &[String]) -> Vec<String> {
213 let mut refs = Vec::with_capacity(keys.len());
214 for key in keys.drain(..) {
215 refs.push(ObjectRef {
216 uri: key.clone(),
217 key,
218 last_modified: None,
219 size: None,
220 });
221 }
222 let filtered = planner::filter_by_suffixes(refs, suffixes);
223 let sorted = planner::stable_sort_refs(filtered);
224 sorted.into_iter().map(|obj| obj.key).collect()
225}