1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use async_trait::async_trait;
5use aws_config::BehaviorVersion;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
8use aws_sdk_s3::operation::head_object::HeadObjectError;
9use aws_sdk_s3::operation::put_object::PutObjectError;
10use aws_sdk_s3::primitives::ByteStream;
11use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
12
13use crate::factory::storage_work_root;
14use crate::fs_utils::{
15 cleanup_temp_file_on_error, create_private_file, ensure_private_directory,
16 finalize_local_temp_without_overwrite,
17};
18use crate::{Result, Storage, StorageError, StorageKey, StorageWriteIntent};
19
20#[derive(Debug, Clone)]
22pub struct S3Storage {
23 client: aws_sdk_s3::Client,
24 bucket: String,
25 prefix: Option<String>,
26 work_root: PathBuf,
27}
28
29#[derive(Debug, Clone)]
31pub struct S3StorageOptions {
32 pub bucket: String,
33 pub prefix: Option<String>,
34 pub region: String,
35 pub endpoint_url: Option<url::Url>,
36 pub force_path_style: bool,
37 pub work_root: Option<PathBuf>,
38}
39
40impl S3Storage {
41 pub async fn from_options(options: S3StorageOptions) -> anyhow::Result<Self> {
43 let bucket = options.bucket.trim().to_string();
44 if bucket.is_empty() {
45 anyhow::bail!("S3 bucket must be set");
46 }
47 let region = Region::new(options.region.trim().to_string());
48 let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(region);
49 if let Some(endpoint) = options.endpoint_url.as_ref() {
50 loader = loader.endpoint_url(endpoint.as_str());
51 }
52 let shared_config = loader.load().await;
53 let mut s3_config = aws_sdk_s3::config::Builder::from(&shared_config);
54 if options.force_path_style {
55 s3_config = s3_config.force_path_style(true);
56 }
57 Ok(Self {
58 client: aws_sdk_s3::Client::from_conf(s3_config.build()),
59 bucket,
60 prefix: normalized_s3_prefix(options.prefix.as_deref())?,
61 work_root: storage_work_root(options.work_root.as_deref())?,
62 })
63 }
64
65 fn object_key(&self, key: &StorageKey) -> String {
66 match &self.prefix {
67 Some(prefix) => format!("{prefix}/{}", key.as_str()),
68 None => key.as_str().to_string(),
69 }
70 }
71
72 async fn object_len(&self, key: &StorageKey) -> Result<Option<u64>> {
73 let object_key = self.object_key(key);
74 match self
75 .client
76 .head_object()
77 .bucket(&self.bucket)
78 .key(&object_key)
79 .send()
80 .await
81 {
82 Ok(output) => {
83 let len = output.content_length().unwrap_or(0);
84 u64::try_from(len)
85 .map(Some)
86 .map_err(|_| StorageError::Internal("negative S3 content length".to_string()))
87 }
88 Err(error) if s3_head_object_error_is_not_found(&error) => Ok(None),
89 Err(error) => Err(StorageError::Backend(format!("{error:?}"))),
90 }
91 }
92
93 async fn verify_object_len(&self, key: &StorageKey, expected: u64) -> Result<()> {
94 let actual = self
95 .object_len(key)
96 .await?
97 .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
98 if actual != expected {
99 return Err(StorageError::Internal(format!(
100 "S3 object length mismatch for {key}: expected {expected}, got {actual}"
101 )));
102 }
103 Ok(())
104 }
105}
106
107#[async_trait]
108impl Storage for S3Storage {
109 async fn put(
110 &self,
111 key: &StorageKey,
112 content: &[u8],
113 intent: StorageWriteIntent,
114 ) -> Result<StorageKey> {
115 let len = u64::try_from(content.len()).map_err(|_| StorageError::ObjectTooLarge {
116 label: intent.label(),
117 actual: u64::MAX,
118 limit: intent.max_bytes(),
119 })?;
120 intent.ensure_len(len)?;
121 let object_key = self.object_key(key);
122 let put_request = self
123 .client
124 .put_object()
125 .bucket(&self.bucket)
126 .key(&object_key)
127 .body(ByteStream::from(content.to_vec()))
128 .if_none_match("*");
129 let put_result = put_request.send().await;
130 if let Err(error) = put_result {
131 if s3_put_object_error_is_conflict(&error) {
132 return Err(StorageError::ObjectConflict(key.to_string()));
133 }
134 return Err(StorageError::Backend(format!("{error:?}")));
135 }
136 if let Err(error) = self.verify_object_len(key, len).await {
137 drop(self.delete(key).await);
138 return Err(error);
139 }
140 Ok(key.clone())
141 }
142
143 async fn put_file(
144 &self,
145 key: &StorageKey,
146 source: &Path,
147 intent: StorageWriteIntent,
148 ) -> Result<StorageKey> {
149 let len = tokio::fs::metadata(source).await?.len();
150 intent.ensure_len(len)?;
151 let object_key = self.object_key(key);
152 let body = ByteStream::from_path(source)
153 .await
154 .map_err(|e| StorageError::Io(std::io::Error::other(e)))?;
155 let put_request = self
156 .client
157 .put_object()
158 .bucket(&self.bucket)
159 .key(&object_key)
160 .body(body)
161 .if_none_match("*");
162 let put_result = put_request.send().await;
163 if let Err(error) = put_result {
164 if s3_put_object_error_is_conflict(&error) {
165 return Err(StorageError::ObjectConflict(key.to_string()));
166 }
167 return Err(StorageError::Backend(format!("{error:?}")));
168 }
169 if let Err(error) = self.verify_object_len(key, len).await {
170 drop(self.delete(key).await);
171 return Err(error);
172 }
173 Ok(key.clone())
174 }
175
176 async fn promote(
177 &self,
178 temporary_key: &StorageKey,
179 durable_key: &StorageKey,
180 ) -> Result<StorageKey> {
181 let source_len = self
182 .object_len(temporary_key)
183 .await?
184 .ok_or_else(|| StorageError::ObjectNotFound(temporary_key.to_string()))?;
185 ensure_private_directory(&self.work_root).await?;
186 let local_temp = self
187 .work_root
188 .join(format!("agentics-s3-promote-{}", uuid::Uuid::new_v4()));
189 let intent = StorageWriteIntent::new("temporary storage object", source_len);
190 let promote_result = async {
191 self.get_to_file(temporary_key, &local_temp, intent).await?;
192 self.put_file(durable_key, &local_temp, intent).await?;
193 drop(self.delete(temporary_key).await);
198 Ok(durable_key.clone())
199 }
200 .await;
201 let cleanup_result = tokio::fs::remove_file(&local_temp).await;
202 match cleanup_result {
203 Ok(()) => promote_result,
204 Err(error) if error.kind() == std::io::ErrorKind::NotFound => promote_result,
205 Err(_cleanup_error) => promote_result,
206 }
207 }
208
209 async fn get(&self, key: &StorageKey, intent: StorageWriteIntent) -> Result<Vec<u8>> {
210 let object_len = self
211 .object_len(key)
212 .await?
213 .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
214 intent.ensure_len(object_len)?;
215 let object_key = self.object_key(key);
216 let output = self
217 .client
218 .get_object()
219 .bucket(&self.bucket)
220 .key(&object_key)
221 .send()
222 .await
223 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
224 let mut body = output.body.into_async_read();
225 let mut bytes = Vec::new();
226 let mut read_total = 0u64;
227 let mut buffer = [0u8; 64 * 1024];
228 loop {
229 let len = body
230 .read(&mut buffer)
231 .await
232 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
233 if len == 0 {
234 break;
235 }
236 let len_u64 = u64::try_from(len).map_err(|_| {
237 StorageError::Internal("S3 download chunk length overflow".to_string())
238 })?;
239 read_total =
240 read_total
241 .checked_add(len_u64)
242 .ok_or_else(|| StorageError::ObjectTooLarge {
243 label: intent.label(),
244 actual: u64::MAX,
245 limit: intent.max_bytes(),
246 })?;
247 intent.ensure_len(read_total)?;
248 let chunk = buffer.get(..len).ok_or_else(|| {
249 StorageError::Internal("S3 download chunk range invalid".to_string())
250 })?;
251 bytes.extend_from_slice(chunk);
252 }
253 if read_total != object_len {
254 return Err(StorageError::Internal(format!(
255 "S3 object length mismatch while reading {key}: expected {object_len}, read {read_total}"
256 )));
257 }
258 Ok(bytes)
259 }
260
261 async fn get_to_file(
262 &self,
263 key: &StorageKey,
264 destination: &Path,
265 intent: StorageWriteIntent,
266 ) -> Result<()> {
267 let expected_len = self
268 .object_len(key)
269 .await?
270 .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
271 intent.ensure_len(expected_len)?;
272 let output = self
273 .client
274 .get_object()
275 .bucket(&self.bucket)
276 .key(self.object_key(key))
277 .send()
278 .await
279 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
280 if let Some(parent) = destination.parent() {
281 tokio::fs::create_dir_all(parent).await?;
282 }
283 let temporary =
284 destination.with_extension(format!("agentics-download-{}", uuid::Uuid::new_v4()));
285 let write_result = async {
286 if tokio::fs::try_exists(destination).await? {
287 return Err(StorageError::ObjectConflict(
288 destination.display().to_string(),
289 ));
290 }
291 let mut file = create_private_file(&temporary).await?;
292 let mut body = output.body.into_async_read();
293 let mut written = 0u64;
294 let mut buffer = [0u8; 64 * 1024];
295 loop {
296 let len = body
297 .read(&mut buffer)
298 .await
299 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
300 if len == 0 {
301 break;
302 }
303 let len_u64 = u64::try_from(len).map_err(|_| {
304 StorageError::Internal("S3 download chunk length overflow".to_string())
305 })?;
306 written = written.checked_add(len_u64).ok_or_else(|| {
307 StorageError::ObjectTooLarge {
308 label: intent.label(),
309 actual: u64::MAX,
310 limit: intent.max_bytes(),
311 }
312 })?;
313 intent.ensure_len(written)?;
314 let chunk = buffer.get(..len).ok_or_else(|| {
315 StorageError::Internal("S3 download chunk range invalid".to_string())
316 })?;
317 file.write_all(chunk).await?;
318 }
319 if written != expected_len {
320 return Err(StorageError::Internal(format!(
321 "S3 object length mismatch while downloading {key}: expected {expected_len}, wrote {written}"
322 )));
323 }
324 file.flush().await?;
325 drop(file);
326 finalize_local_temp_without_overwrite(
327 &temporary,
328 destination,
329 &destination.display().to_string(),
330 )
331 .await?;
332 Ok::<(), StorageError>(())
333 }
334 .await;
335 cleanup_temp_file_on_error(write_result, &temporary).await
336 }
337
338 async fn exists(&self, key: &StorageKey) -> Result<bool> {
339 self.object_len(key).await.map(|value| value.is_some())
340 }
341
342 async fn delete(&self, key: &StorageKey) -> Result<()> {
343 self.client
344 .delete_object()
345 .bucket(&self.bucket)
346 .key(self.object_key(key))
347 .send()
348 .await
349 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
350 Ok(())
351 }
352
353 async fn list_prefix(&self, prefix: &StorageKey) -> Result<Vec<StorageKey>> {
354 let mut continuation_token = None;
355 let mut keys = Vec::new();
356 let physical_prefix = self.object_key(prefix);
357 loop {
358 let output = self
359 .client
360 .list_objects_v2()
361 .bucket(&self.bucket)
362 .prefix(&physical_prefix)
363 .set_continuation_token(continuation_token.clone())
364 .send()
365 .await
366 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
367 for object in output.contents() {
368 if let Some(key) = object.key() {
369 let logical_key = self.strip_prefix(key)?;
370 if storage_key_is_within_prefix(&logical_key, prefix) {
371 keys.push(logical_key);
372 }
373 }
374 }
375 continuation_token = output.next_continuation_token().map(ToOwned::to_owned);
376 if continuation_token.is_none() {
377 break;
378 }
379 }
380 Ok(keys)
381 }
382
383 async fn delete_prefix_older_than(
384 &self,
385 prefix: &StorageKey,
386 older_than: SystemTime,
387 ) -> Result<u64> {
388 let mut continuation_token = None;
389 let physical_prefix = self.object_key(prefix);
390 let mut keys_to_delete = Vec::new();
391 loop {
392 let output = self
393 .client
394 .list_objects_v2()
395 .bucket(&self.bucket)
396 .prefix(&physical_prefix)
397 .set_continuation_token(continuation_token.clone())
398 .send()
399 .await
400 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
401 for object in output.contents() {
402 let Some(key) = object.key() else {
403 continue;
404 };
405 let logical_key = self.strip_prefix(key)?;
406 if !storage_key_is_within_prefix(&logical_key, prefix) {
407 continue;
408 }
409 let Some(last_modified) = object.last_modified() else {
410 continue;
411 };
412 let modified = SystemTime::try_from(*last_modified)
413 .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
414 if modified < older_than {
415 keys_to_delete.push(logical_key);
416 }
417 }
418 continuation_token = output.next_continuation_token().map(ToOwned::to_owned);
419 if continuation_token.is_none() {
420 break;
421 }
422 }
423 let mut deleted = 0u64;
424 for key in keys_to_delete {
425 self.delete(&key).await?;
426 deleted = deleted.checked_add(1).ok_or_else(|| {
427 StorageError::Internal("deleted object count overflow".to_string())
428 })?;
429 }
430 Ok(deleted)
431 }
432}
433
434impl S3Storage {
435 fn strip_prefix(&self, physical_key: &str) -> Result<StorageKey> {
436 let logical = match &self.prefix {
437 Some(prefix) => physical_key
438 .strip_prefix(prefix)
439 .and_then(|value| value.strip_prefix('/'))
440 .ok_or_else(|| {
441 StorageError::Internal(format!(
442 "S3 list returned key outside configured prefix: {physical_key}"
443 ))
444 })?,
445 None => physical_key,
446 };
447 StorageKey::try_new(logical).map_err(|e| StorageError::InvalidKey(e.to_string()))
448 }
449
450 pub async fn create_bucket_if_missing_for_tests(&self) -> Result<()> {
455 let create_bucket = self
456 .client
457 .create_bucket()
458 .bucket(&self.bucket)
459 .send()
460 .await;
461 if let Err(error) = create_bucket {
462 let text = format!("{error} {error:?}");
463 if !(text.contains("BucketAlreadyOwnedByYou")
464 || text.contains("BucketAlreadyExists")
465 || text.contains("Conflict")
466 || text.contains("409"))
467 {
468 return Err(StorageError::Backend(format!("{error:?}")));
469 }
470 }
471 Ok(())
472 }
473}
474
475fn storage_key_is_within_prefix(key: &StorageKey, prefix: &StorageKey) -> bool {
476 key == prefix
477 || key
478 .as_str()
479 .strip_prefix(prefix.as_str())
480 .is_some_and(|remainder| remainder.starts_with('/'))
481}
482
483fn normalized_s3_prefix(value: Option<&str>) -> Result<Option<String>> {
484 let Some(prefix) = value.map(str::trim).filter(|value| !value.is_empty()) else {
485 return Ok(None);
486 };
487 StorageKey::try_new(prefix)
488 .map_err(|e| StorageError::InvalidKey(e.to_string()))
489 .map(|key| Some(key.to_string()))
490}
491
492fn s3_head_object_error_is_not_found(error: &SdkError<HeadObjectError>) -> bool {
493 if error
494 .as_service_error()
495 .is_some_and(HeadObjectError::is_not_found)
496 {
497 return true;
498 }
499 if error
500 .raw_response()
501 .is_some_and(|response| response.status().as_u16() == 404)
502 {
503 return true;
504 }
505 matches!(error.code(), Some("NotFound" | "NoSuchKey" | "404"))
506}
507
508fn s3_put_object_error_is_conflict(error: &SdkError<PutObjectError>) -> bool {
509 if error
510 .raw_response()
511 .is_some_and(|response| response.status().as_u16() == 412)
512 {
513 return true;
514 }
515 if error.raw_response().is_some_and(|response| {
516 response.status().as_u16() == 409
517 && matches!(error.code(), Some("ConditionalRequestConflict"))
518 }) {
519 return true;
520 }
521 matches!(
522 error.code(),
523 Some("ConditionalRequestConflict" | "PreconditionFailed")
524 )
525}