1use std::sync::Arc;
7
8use aws_config::BehaviorVersion;
9use aws_sdk_s3::{
10 config::{Credentials, Region},
11 primitives::ByteStream,
12 Client,
13};
14use bytes::Bytes;
15use tokio::runtime::Runtime;
16
17use super::StorageBackend;
18use crate::error::{Error, Result};
19
20#[derive(Debug, Clone)]
22pub enum CredentialSource {
23 Environment,
25 Static {
27 access_key: String,
29 secret_key: String,
31 },
32 Anonymous,
34}
35
36pub struct S3Backend {
68 client: Client,
69 bucket: String,
70 runtime: Arc<Runtime>,
71}
72
73impl std::fmt::Debug for S3Backend {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("S3Backend")
76 .field("bucket", &self.bucket)
77 .finish_non_exhaustive()
78 }
79}
80
81impl S3Backend {
82 pub fn new(
95 bucket: impl Into<String>,
96 region: impl Into<String>,
97 endpoint: Option<String>,
98 credentials: CredentialSource,
99 ) -> Result<Self> {
100 let runtime =
101 Runtime::new().map_err(|e| Error::storage(format!("Failed to create runtime: {e}")))?;
102
103 let bucket = bucket.into();
104 let region = region.into();
105
106 let client = runtime
107 .block_on(async { Self::create_client(®ion, endpoint, credentials).await })?;
108
109 Ok(Self {
110 client,
111 bucket,
112 runtime: Arc::new(runtime),
113 })
114 }
115
116 async fn create_client(
117 region: &str,
118 endpoint: Option<String>,
119 credentials: CredentialSource,
120 ) -> Result<Client> {
121 let region = Region::new(region.to_string());
122
123 let mut config_loader = aws_config::defaults(BehaviorVersion::latest()).region(region);
124
125 match credentials {
127 CredentialSource::Environment => {
128 }
130 CredentialSource::Static {
131 access_key,
132 secret_key,
133 } => {
134 let creds = Credentials::new(access_key, secret_key, None, None, "alimentar");
135 config_loader = config_loader.credentials_provider(creds);
136 }
137 CredentialSource::Anonymous => {
138 let creds = Credentials::new("", "", None, None, "anonymous");
139 config_loader = config_loader.credentials_provider(creds);
140 }
141 }
142
143 let sdk_config = config_loader.load().await;
144
145 let mut s3_config = aws_sdk_s3::config::Builder::from(&sdk_config);
146
147 if let Some(endpoint_url) = endpoint {
149 s3_config = s3_config.endpoint_url(&endpoint_url).force_path_style(true);
150 }
152
153 Ok(Client::from_conf(s3_config.build()))
154 }
155
156 pub fn bucket(&self) -> &str {
158 &self.bucket
159 }
160
161 fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
162 self.runtime.block_on(future)
163 }
164}
165
166impl StorageBackend for S3Backend {
167 fn list(&self, prefix: &str) -> Result<Vec<String>> {
168 self.block_on(async {
169 let mut keys = Vec::new();
170 let mut continuation_token: Option<String> = None;
171
172 loop {
173 let mut request = self
174 .client
175 .list_objects_v2()
176 .bucket(&self.bucket)
177 .prefix(prefix);
178
179 if let Some(token) = continuation_token.take() {
180 request = request.continuation_token(token);
181 }
182
183 let response = request
184 .send()
185 .await
186 .map_err(|e| Error::storage(format!("S3 list error: {e}")))?;
187
188 if let Some(contents) = response.contents {
189 for object in contents {
190 if let Some(key) = object.key {
191 keys.push(key);
192 }
193 }
194 }
195
196 if response.is_truncated.unwrap_or(false) {
197 continuation_token = response.next_continuation_token;
198 } else {
199 break;
200 }
201 }
202
203 Ok(keys)
204 })
205 }
206
207 fn get(&self, key: &str) -> Result<Bytes> {
208 self.block_on(async {
209 let response = self
210 .client
211 .get_object()
212 .bucket(&self.bucket)
213 .key(key)
214 .send()
215 .await
216 .map_err(|e| Error::storage(format!("S3 get error for key '{}': {}", key, e)))?;
217
218 let body = response
219 .body
220 .collect()
221 .await
222 .map_err(|e| Error::storage(format!("S3 body read error: {e}")))?;
223
224 Ok(body.into_bytes())
225 })
226 }
227
228 fn put(&self, key: &str, data: Bytes) -> Result<()> {
229 self.block_on(async {
230 self.client
231 .put_object()
232 .bucket(&self.bucket)
233 .key(key)
234 .body(ByteStream::from(data))
235 .send()
236 .await
237 .map_err(|e| Error::storage(format!("S3 put error for key '{}': {}", key, e)))?;
238
239 Ok(())
240 })
241 }
242
243 fn delete(&self, key: &str) -> Result<()> {
244 self.block_on(async {
245 self.client
246 .delete_object()
247 .bucket(&self.bucket)
248 .key(key)
249 .send()
250 .await
251 .map_err(|e| Error::storage(format!("S3 delete error for key '{}': {}", key, e)))?;
252
253 Ok(())
254 })
255 }
256
257 fn exists(&self, key: &str) -> Result<bool> {
258 self.block_on(async {
259 match self
260 .client
261 .head_object()
262 .bucket(&self.bucket)
263 .key(key)
264 .send()
265 .await
266 {
267 Ok(_) => Ok(true),
268 Err(e) => {
269 let service_error = e.into_service_error();
271 if service_error.is_not_found() {
272 Ok(false)
273 } else {
274 Err(Error::storage(format!(
275 "S3 exists error for key '{}': {}",
276 key, service_error
277 )))
278 }
279 }
280 }
281 })
282 }
283
284 fn size(&self, key: &str) -> Result<u64> {
285 self.block_on(async {
286 let response = self
287 .client
288 .head_object()
289 .bucket(&self.bucket)
290 .key(key)
291 .send()
292 .await
293 .map_err(|e| Error::storage(format!("S3 head error for key '{}': {}", key, e)))?;
294
295 let size = response
296 .content_length
297 .and_then(|l| u64::try_from(l).ok())
298 .unwrap_or(0);
299 Ok(size)
300 })
301 }
302}
303
304#[cfg(test)]
310mod tests {
311 use super::*;
312
313 #[test]
314 fn test_credential_source_environment() {
315 let creds = CredentialSource::Environment;
316 assert!(matches!(creds, CredentialSource::Environment));
317 }
318
319 #[test]
320 fn test_credential_source_static() {
321 let creds = CredentialSource::Static {
322 access_key: "test".to_string(),
323 secret_key: "secret".to_string(),
324 };
325 assert!(matches!(creds, CredentialSource::Static { .. }));
326 }
327
328 #[test]
329 fn test_credential_source_anonymous() {
330 let creds = CredentialSource::Anonymous;
331 assert!(matches!(creds, CredentialSource::Anonymous));
332 }
333
334 }