Skip to main content

agentics_storage/
s3.rs

1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use async_trait::async_trait;
5use aws_config::BehaviorVersion;
6use aws_sdk_s3::config::Region;
7use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
8use aws_sdk_s3::operation::head_object::HeadObjectError;
9use aws_sdk_s3::operation::put_object::PutObjectError;
10use aws_sdk_s3::primitives::ByteStream;
11use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
12
13use crate::factory::storage_work_root;
14use crate::fs_utils::{
15    cleanup_temp_file_on_error, create_private_file, ensure_private_directory,
16    finalize_local_temp_without_overwrite,
17};
18use crate::{Result, Storage, StorageError, StorageKey, StorageWriteIntent};
19
/// S3-compatible durable object storage.
///
/// Logical [`StorageKey`]s are mapped to physical object keys by optionally
/// prepending `prefix` followed by `/`.
#[derive(Debug, Clone)]
pub struct S3Storage {
    // SDK client used for every bucket and object request.
    client: aws_sdk_s3::Client,
    // Bucket name; trimmed and checked non-empty by `from_options`.
    bucket: String,
    // Optional physical key prefix, stored without a trailing `/`.
    prefix: Option<String>,
    // Local directory in which `promote` stages its temporary download.
    work_root: PathBuf,
}
28
/// Connection settings for S3-compatible durable object storage.
#[derive(Debug, Clone)]
pub struct S3StorageOptions {
    // Bucket name; surrounding whitespace is trimmed and an empty value is rejected.
    pub bucket: String,
    // Optional key prefix; a blank value is treated as "no prefix".
    pub prefix: Option<String>,
    // Region name; surrounding whitespace is trimmed before use.
    pub region: String,
    // Optional endpoint override passed to the AWS config loader.
    pub endpoint_url: Option<url::Url>,
    // When true, path-style addressing is enabled on the S3 client config.
    pub force_path_style: bool,
    // Optional local work directory, resolved through `storage_work_root`.
    pub work_root: Option<PathBuf>,
}
39
40impl S3Storage {
41    /// Build an S3 storage client from explicit options.
42    pub async fn from_options(options: S3StorageOptions) -> anyhow::Result<Self> {
43        let bucket = options.bucket.trim().to_string();
44        if bucket.is_empty() {
45            anyhow::bail!("S3 bucket must be set");
46        }
47        let region = Region::new(options.region.trim().to_string());
48        let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(region);
49        if let Some(endpoint) = options.endpoint_url.as_ref() {
50            loader = loader.endpoint_url(endpoint.as_str());
51        }
52        let shared_config = loader.load().await;
53        let mut s3_config = aws_sdk_s3::config::Builder::from(&shared_config);
54        if options.force_path_style {
55            s3_config = s3_config.force_path_style(true);
56        }
57        Ok(Self {
58            client: aws_sdk_s3::Client::from_conf(s3_config.build()),
59            bucket,
60            prefix: normalized_s3_prefix(options.prefix.as_deref())?,
61            work_root: storage_work_root(options.work_root.as_deref())?,
62        })
63    }
64
65    fn object_key(&self, key: &StorageKey) -> String {
66        match &self.prefix {
67            Some(prefix) => format!("{prefix}/{}", key.as_str()),
68            None => key.as_str().to_string(),
69        }
70    }
71
72    async fn object_len(&self, key: &StorageKey) -> Result<Option<u64>> {
73        let object_key = self.object_key(key);
74        match self
75            .client
76            .head_object()
77            .bucket(&self.bucket)
78            .key(&object_key)
79            .send()
80            .await
81        {
82            Ok(output) => {
83                let len = output.content_length().unwrap_or(0);
84                u64::try_from(len)
85                    .map(Some)
86                    .map_err(|_| StorageError::Internal("negative S3 content length".to_string()))
87            }
88            Err(error) if s3_head_object_error_is_not_found(&error) => Ok(None),
89            Err(error) => Err(StorageError::Backend(format!("{error:?}"))),
90        }
91    }
92
93    async fn verify_object_len(&self, key: &StorageKey, expected: u64) -> Result<()> {
94        let actual = self
95            .object_len(key)
96            .await?
97            .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
98        if actual != expected {
99            return Err(StorageError::Internal(format!(
100                "S3 object length mismatch for {key}: expected {expected}, got {actual}"
101            )));
102        }
103        Ok(())
104    }
105}
106
107#[async_trait]
108impl Storage for S3Storage {
109    async fn put(
110        &self,
111        key: &StorageKey,
112        content: &[u8],
113        intent: StorageWriteIntent,
114    ) -> Result<StorageKey> {
115        let len = u64::try_from(content.len()).map_err(|_| StorageError::ObjectTooLarge {
116            label: intent.label(),
117            actual: u64::MAX,
118            limit: intent.max_bytes(),
119        })?;
120        intent.ensure_len(len)?;
121        let object_key = self.object_key(key);
122        let put_request = self
123            .client
124            .put_object()
125            .bucket(&self.bucket)
126            .key(&object_key)
127            .body(ByteStream::from(content.to_vec()))
128            .if_none_match("*");
129        let put_result = put_request.send().await;
130        if let Err(error) = put_result {
131            if s3_put_object_error_is_conflict(&error) {
132                return Err(StorageError::ObjectConflict(key.to_string()));
133            }
134            return Err(StorageError::Backend(format!("{error:?}")));
135        }
136        if let Err(error) = self.verify_object_len(key, len).await {
137            drop(self.delete(key).await);
138            return Err(error);
139        }
140        Ok(key.clone())
141    }
142
143    async fn put_file(
144        &self,
145        key: &StorageKey,
146        source: &Path,
147        intent: StorageWriteIntent,
148    ) -> Result<StorageKey> {
149        let len = tokio::fs::metadata(source).await?.len();
150        intent.ensure_len(len)?;
151        let object_key = self.object_key(key);
152        let body = ByteStream::from_path(source)
153            .await
154            .map_err(|e| StorageError::Io(std::io::Error::other(e)))?;
155        let put_request = self
156            .client
157            .put_object()
158            .bucket(&self.bucket)
159            .key(&object_key)
160            .body(body)
161            .if_none_match("*");
162        let put_result = put_request.send().await;
163        if let Err(error) = put_result {
164            if s3_put_object_error_is_conflict(&error) {
165                return Err(StorageError::ObjectConflict(key.to_string()));
166            }
167            return Err(StorageError::Backend(format!("{error:?}")));
168        }
169        if let Err(error) = self.verify_object_len(key, len).await {
170            drop(self.delete(key).await);
171            return Err(error);
172        }
173        Ok(key.clone())
174    }
175
176    async fn promote(
177        &self,
178        temporary_key: &StorageKey,
179        durable_key: &StorageKey,
180    ) -> Result<StorageKey> {
181        let source_len = self
182            .object_len(temporary_key)
183            .await?
184            .ok_or_else(|| StorageError::ObjectNotFound(temporary_key.to_string()))?;
185        ensure_private_directory(&self.work_root).await?;
186        let local_temp = self
187            .work_root
188            .join(format!("agentics-s3-promote-{}", uuid::Uuid::new_v4()));
189        let intent = StorageWriteIntent::new("temporary storage object", source_len);
190        let promote_result = async {
191            self.get_to_file(temporary_key, &local_temp, intent).await?;
192            self.put_file(durable_key, &local_temp, intent).await?;
193            // After `put_file` returns, the durable object has been created and
194            // length-verified. Temp cleanup is intentionally best-effort so a
195            // transient delete failure cannot make callers roll back the DB row
196            // while the durable object already exists.
197            drop(self.delete(temporary_key).await);
198            Ok(durable_key.clone())
199        }
200        .await;
201        let cleanup_result = tokio::fs::remove_file(&local_temp).await;
202        match cleanup_result {
203            Ok(()) => promote_result,
204            Err(error) if error.kind() == std::io::ErrorKind::NotFound => promote_result,
205            Err(_cleanup_error) => promote_result,
206        }
207    }
208
209    async fn get(&self, key: &StorageKey, intent: StorageWriteIntent) -> Result<Vec<u8>> {
210        let object_len = self
211            .object_len(key)
212            .await?
213            .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
214        intent.ensure_len(object_len)?;
215        let object_key = self.object_key(key);
216        let output = self
217            .client
218            .get_object()
219            .bucket(&self.bucket)
220            .key(&object_key)
221            .send()
222            .await
223            .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
224        let mut body = output.body.into_async_read();
225        let mut bytes = Vec::new();
226        let mut read_total = 0u64;
227        let mut buffer = [0u8; 64 * 1024];
228        loop {
229            let len = body
230                .read(&mut buffer)
231                .await
232                .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
233            if len == 0 {
234                break;
235            }
236            let len_u64 = u64::try_from(len).map_err(|_| {
237                StorageError::Internal("S3 download chunk length overflow".to_string())
238            })?;
239            read_total =
240                read_total
241                    .checked_add(len_u64)
242                    .ok_or_else(|| StorageError::ObjectTooLarge {
243                        label: intent.label(),
244                        actual: u64::MAX,
245                        limit: intent.max_bytes(),
246                    })?;
247            intent.ensure_len(read_total)?;
248            let chunk = buffer.get(..len).ok_or_else(|| {
249                StorageError::Internal("S3 download chunk range invalid".to_string())
250            })?;
251            bytes.extend_from_slice(chunk);
252        }
253        if read_total != object_len {
254            return Err(StorageError::Internal(format!(
255                "S3 object length mismatch while reading {key}: expected {object_len}, read {read_total}"
256            )));
257        }
258        Ok(bytes)
259    }
260
    /// Download the object under `key` to `destination` on the local filesystem.
    ///
    /// The body is streamed into a sibling temporary file and only moved to
    /// `destination` once the byte count matches the length reported by HEAD.
    /// An already existing `destination` is reported as
    /// `StorageError::ObjectConflict`.
    async fn get_to_file(
        &self,
        key: &StorageKey,
        destination: &Path,
        intent: StorageWriteIntent,
    ) -> Result<()> {
        // HEAD first: yields the expected length and rejects oversized objects
        // before any body bytes are requested.
        let expected_len = self
            .object_len(key)
            .await?
            .ok_or_else(|| StorageError::ObjectNotFound(key.to_string()))?;
        intent.ensure_len(expected_len)?;
        let output = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(self.object_key(key))
            .send()
            .await
            .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
        if let Some(parent) = destination.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Temporary path next to the destination; note that `with_extension`
        // replaces any extension the destination already has.
        let temporary =
            destination.with_extension(format!("agentics-download-{}", uuid::Uuid::new_v4()));
        // Everything that may leave the temporary file behind runs inside this
        // block so its result can be passed to the cleanup helper below.
        let write_result = async {
            if tokio::fs::try_exists(destination).await? {
                return Err(StorageError::ObjectConflict(
                    destination.display().to_string(),
                ));
            }
            let mut file = create_private_file(&temporary).await?;
            let mut body = output.body.into_async_read();
            let mut written = 0u64;
            let mut buffer = [0u8; 64 * 1024];
            loop {
                let len = body
                    .read(&mut buffer)
                    .await
                    .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
                // A zero-length read marks the end of the body.
                if len == 0 {
                    break;
                }
                let len_u64 = u64::try_from(len).map_err(|_| {
                    StorageError::Internal("S3 download chunk length overflow".to_string())
                })?;
                written = written.checked_add(len_u64).ok_or_else(|| {
                    StorageError::ObjectTooLarge {
                        label: intent.label(),
                        actual: u64::MAX,
                        limit: intent.max_bytes(),
                    }
                })?;
                // Re-check the limit against the bytes actually received.
                intent.ensure_len(written)?;
                let chunk = buffer.get(..len).ok_or_else(|| {
                    StorageError::Internal("S3 download chunk range invalid".to_string())
                })?;
                file.write_all(chunk).await?;
            }
            // The object may have changed between HEAD and GET; refuse a body
            // whose size differs from the advertised length.
            if written != expected_len {
                return Err(StorageError::Internal(format!(
                    "S3 object length mismatch while downloading {key}: expected {expected_len}, wrote {written}"
                )));
            }
            file.flush().await?;
            // Close the handle before the temporary file is moved into place.
            drop(file);
            finalize_local_temp_without_overwrite(
                &temporary,
                destination,
                &destination.display().to_string(),
            )
            .await?;
            Ok::<(), StorageError>(())
        }
        .await;
        cleanup_temp_file_on_error(write_result, &temporary).await
    }
337
338    async fn exists(&self, key: &StorageKey) -> Result<bool> {
339        self.object_len(key).await.map(|value| value.is_some())
340    }
341
342    async fn delete(&self, key: &StorageKey) -> Result<()> {
343        self.client
344            .delete_object()
345            .bucket(&self.bucket)
346            .key(self.object_key(key))
347            .send()
348            .await
349            .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
350        Ok(())
351    }
352
353    async fn list_prefix(&self, prefix: &StorageKey) -> Result<Vec<StorageKey>> {
354        let mut continuation_token = None;
355        let mut keys = Vec::new();
356        let physical_prefix = self.object_key(prefix);
357        loop {
358            let output = self
359                .client
360                .list_objects_v2()
361                .bucket(&self.bucket)
362                .prefix(&physical_prefix)
363                .set_continuation_token(continuation_token.clone())
364                .send()
365                .await
366                .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
367            for object in output.contents() {
368                if let Some(key) = object.key() {
369                    let logical_key = self.strip_prefix(key)?;
370                    if storage_key_is_within_prefix(&logical_key, prefix) {
371                        keys.push(logical_key);
372                    }
373                }
374            }
375            continuation_token = output.next_continuation_token().map(ToOwned::to_owned);
376            if continuation_token.is_none() {
377                break;
378            }
379        }
380        Ok(keys)
381    }
382
383    async fn delete_prefix_older_than(
384        &self,
385        prefix: &StorageKey,
386        older_than: SystemTime,
387    ) -> Result<u64> {
388        let mut continuation_token = None;
389        let physical_prefix = self.object_key(prefix);
390        let mut keys_to_delete = Vec::new();
391        loop {
392            let output = self
393                .client
394                .list_objects_v2()
395                .bucket(&self.bucket)
396                .prefix(&physical_prefix)
397                .set_continuation_token(continuation_token.clone())
398                .send()
399                .await
400                .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
401            for object in output.contents() {
402                let Some(key) = object.key() else {
403                    continue;
404                };
405                let logical_key = self.strip_prefix(key)?;
406                if !storage_key_is_within_prefix(&logical_key, prefix) {
407                    continue;
408                }
409                let Some(last_modified) = object.last_modified() else {
410                    continue;
411                };
412                let modified = SystemTime::try_from(*last_modified)
413                    .map_err(|e| StorageError::Backend(format!("{e:?}")))?;
414                if modified < older_than {
415                    keys_to_delete.push(logical_key);
416                }
417            }
418            continuation_token = output.next_continuation_token().map(ToOwned::to_owned);
419            if continuation_token.is_none() {
420                break;
421            }
422        }
423        let mut deleted = 0u64;
424        for key in keys_to_delete {
425            self.delete(&key).await?;
426            deleted = deleted.checked_add(1).ok_or_else(|| {
427                StorageError::Internal("deleted object count overflow".to_string())
428            })?;
429        }
430        Ok(deleted)
431    }
432}
433
434impl S3Storage {
435    fn strip_prefix(&self, physical_key: &str) -> Result<StorageKey> {
436        let logical = match &self.prefix {
437            Some(prefix) => physical_key
438                .strip_prefix(prefix)
439                .and_then(|value| value.strip_prefix('/'))
440                .ok_or_else(|| {
441                    StorageError::Internal(format!(
442                        "S3 list returned key outside configured prefix: {physical_key}"
443                    ))
444                })?,
445            None => physical_key,
446        };
447        StorageKey::try_new(logical).map_err(|e| StorageError::InvalidKey(e.to_string()))
448    }
449
450    /// Create the configured bucket when a test harness owns the object store.
451    ///
452    /// Application startup intentionally does not create buckets; production
453    /// Compose and external S3 deployments provision storage outside the app.
454    pub async fn create_bucket_if_missing_for_tests(&self) -> Result<()> {
455        let create_bucket = self
456            .client
457            .create_bucket()
458            .bucket(&self.bucket)
459            .send()
460            .await;
461        if let Err(error) = create_bucket {
462            let text = format!("{error} {error:?}");
463            if !(text.contains("BucketAlreadyOwnedByYou")
464                || text.contains("BucketAlreadyExists")
465                || text.contains("Conflict")
466                || text.contains("409"))
467            {
468                return Err(StorageError::Backend(format!("{error:?}")));
469            }
470        }
471        Ok(())
472    }
473}
474
475fn storage_key_is_within_prefix(key: &StorageKey, prefix: &StorageKey) -> bool {
476    key == prefix
477        || key
478            .as_str()
479            .strip_prefix(prefix.as_str())
480            .is_some_and(|remainder| remainder.starts_with('/'))
481}
482
483fn normalized_s3_prefix(value: Option<&str>) -> Result<Option<String>> {
484    let Some(prefix) = value.map(str::trim).filter(|value| !value.is_empty()) else {
485        return Ok(None);
486    };
487    StorageKey::try_new(prefix)
488        .map_err(|e| StorageError::InvalidKey(e.to_string()))
489        .map(|key| Some(key.to_string()))
490}
491
492fn s3_head_object_error_is_not_found(error: &SdkError<HeadObjectError>) -> bool {
493    if error
494        .as_service_error()
495        .is_some_and(HeadObjectError::is_not_found)
496    {
497        return true;
498    }
499    if error
500        .raw_response()
501        .is_some_and(|response| response.status().as_u16() == 404)
502    {
503        return true;
504    }
505    matches!(error.code(), Some("NotFound" | "NoSuchKey" | "404"))
506}
507
508fn s3_put_object_error_is_conflict(error: &SdkError<PutObjectError>) -> bool {
509    if error
510        .raw_response()
511        .is_some_and(|response| response.status().as_u16() == 412)
512    {
513        return true;
514    }
515    if error.raw_response().is_some_and(|response| {
516        response.status().as_u16() == 409
517            && matches!(error.code(), Some("ConditionalRequestConflict"))
518    }) {
519        return true;
520    }
521    matches!(
522        error.code(),
523        Some("ConditionalRequestConflict" | "PreconditionFailed")
524    )
525}