Skip to main content

alien_bindings/providers/storage/
aws_s3.rs

1use crate::providers::storage::credential_bridge::AwsCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4    error::{Error, ErrorData},
5    presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6    traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16    aws::{AmazonS3, AmazonS3Builder},
17    path::Path,
18    GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions,
19    PutPayload, PutResult, Result as ObjectStoreResult,
20};
21use reqwest::Method;
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::Duration;
25use url::Url;
26
/// AWS S3 storage implementation using object_store backend.
///
/// Wraps an `object_store` [`AmazonS3`] client and applies `base_dir` as a
/// prefix to every location before delegating to it.
#[derive(Debug)]
pub struct S3Storage {
    /// URL of the form `s3://<bucket>` parsed in `new`; returned by `get_url`.
    url: Url,
    /// Prefix derived from the non-empty path segments of `url`; prepended to
    /// caller-supplied locations and stripped from returned ones.
    base_dir: Path,
    /// Underlying S3 client built in `new`; all object operations are delegated to it.
    inner: AmazonS3,
}
34
35impl S3Storage {
36    /// Creates a new `S3Storage` instance from bucket configuration.
37    ///
38    /// Uses AWS config for credentials.
39    pub fn new(
40        bucket_name: String,
41        aws_config: &alien_core::AwsClientConfig,
42    ) -> Result<Self, Error> {
43        let s3_url = format!("s3://{}", bucket_name);
44        let url =
45            Url::parse(&s3_url)
46                .into_alien_error()
47                .context(ErrorData::InvalidConfigurationUrl {
48                    url: s3_url.clone(),
49                    reason: "Invalid S3 URL format".to_string(),
50                })?;
51
52        // Build the store with credentials bridged from AwsClientConfig
53        let credentials = AwsCredentialBridge::new(aws_config.clone());
54        let store = AmazonS3Builder::new()
55            .with_bucket_name(&bucket_name)
56            .with_region(&aws_config.region)
57            .with_credentials(Arc::new(credentials))
58            .build()
59            .into_alien_error()
60            .context(ErrorData::BindingSetupFailed {
61                binding_type: "AWS S3 storage".to_string(),
62                reason: format!("Failed to build S3 client for bucket: {}", bucket_name),
63            })?;
64
65        // Extract the base path from the URL path segments, handling the None case.
66        let base_dir = match url.path_segments() {
67            Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
68            None => Path::default(), // Use an empty path if there are no segments
69        };
70
71        Ok(Self {
72            url,
73            base_dir,
74            inner: store,
75        })
76    }
77}
78
// `Binding` is implemented with an empty body; no trait items are defined here.
impl Binding for S3Storage {}
80
81#[async_trait]
82impl Storage for S3Storage {
83    fn get_base_dir(&self) -> Path {
84        self.base_dir.clone()
85    }
86
87    fn get_url(&self) -> Url {
88        self.url.clone()
89    }
90
91    async fn presigned_put(
92        &self,
93        path: &Path,
94        expires_in: Duration,
95    ) -> crate::error::Result<PresignedRequest> {
96        let dst = prefixed_path(&self.base_dir, path);
97        let signed_url = self
98            .inner
99            .signed_url(Method::PUT, &dst, expires_in)
100            .await
101            .into_alien_error()
102            .context(ErrorData::StorageOperationFailed {
103                binding_name: "aws-s3".to_string(),
104                operation: format!("generate presigned PUT URL for {}", path),
105            })?;
106
107        let headers = HashMap::new();
108
109        Ok(PresignedRequest {
110            backend: PresignedRequestBackend::Http {
111                url: signed_url.to_string(),
112                method: "PUT".to_string(),
113                headers,
114            },
115            expiration: Utc::now()
116                + chrono::Duration::from_std(expires_in).map_err(|e| {
117                    AlienError::new(ErrorData::Other {
118                        message: format!("Invalid duration: {}", e),
119                    })
120                })?,
121            operation: PresignedOperation::Put,
122            path: path.to_string(),
123        })
124    }
125
126    async fn presigned_get(
127        &self,
128        path: &Path,
129        expires_in: Duration,
130    ) -> crate::error::Result<PresignedRequest> {
131        let dst = prefixed_path(&self.base_dir, path);
132        let signed_url = self
133            .inner
134            .signed_url(Method::GET, &dst, expires_in)
135            .await
136            .into_alien_error()
137            .context(ErrorData::StorageOperationFailed {
138                binding_name: "aws-s3".to_string(),
139                operation: format!("generate presigned GET URL for {}", path),
140            })?;
141
142        let headers = HashMap::new();
143
144        Ok(PresignedRequest {
145            backend: PresignedRequestBackend::Http {
146                url: signed_url.to_string(),
147                method: "GET".to_string(),
148                headers,
149            },
150            expiration: Utc::now()
151                + chrono::Duration::from_std(expires_in).map_err(|e| {
152                    AlienError::new(ErrorData::Other {
153                        message: format!("Invalid duration: {}", e),
154                    })
155                })?,
156            operation: PresignedOperation::Get,
157            path: path.to_string(),
158        })
159    }
160
161    async fn presigned_delete(
162        &self,
163        path: &Path,
164        expires_in: Duration,
165    ) -> crate::error::Result<PresignedRequest> {
166        let dst = prefixed_path(&self.base_dir, path);
167        let signed_url = self
168            .inner
169            .signed_url(Method::DELETE, &dst, expires_in)
170            .await
171            .into_alien_error()
172            .context(ErrorData::StorageOperationFailed {
173                binding_name: "aws-s3".to_string(),
174                operation: format!("generate presigned DELETE URL for {}", path),
175            })?;
176
177        let headers = HashMap::new();
178
179        Ok(PresignedRequest {
180            backend: PresignedRequestBackend::Http {
181                url: signed_url.to_string(),
182                method: "DELETE".to_string(),
183                headers,
184            },
185            expiration: Utc::now()
186                + chrono::Duration::from_std(expires_in).map_err(|e| {
187                    AlienError::new(ErrorData::Other {
188                        message: format!("Invalid duration: {}", e),
189                    })
190                })?,
191            operation: PresignedOperation::Delete,
192            path: path.to_string(),
193        })
194    }
195}
196
197// Delegate ObjectStore trait implementation to the inner store,
198// prefixing paths with the base_dir.
199#[async_trait]
200impl ObjectStore for S3Storage {
201    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
202        let dst = prefixed_path(&self.base_dir, location);
203        self.inner.put(&dst, payload).await
204    }
205
206    async fn put_opts(
207        &self,
208        location: &Path,
209        payload: PutPayload,
210        opts: PutOptions,
211    ) -> ObjectStoreResult<PutResult> {
212        let dst = prefixed_path(&self.base_dir, location);
213        self.inner.put_opts(&dst, payload, opts).await
214    }
215
216    async fn put_multipart(
217        &self,
218        location: &Path,
219    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
220        let dst = prefixed_path(&self.base_dir, location);
221        self.inner.put_multipart(&dst).await
222    }
223
224    async fn put_multipart_opts(
225        &self,
226        location: &Path,
227        opts: PutMultipartOpts,
228    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
229        let dst = prefixed_path(&self.base_dir, location);
230        self.inner.put_multipart_opts(&dst, opts).await
231    }
232
233    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
234        let src = prefixed_path(&self.base_dir, location);
235        self.inner.get(&src).await
236    }
237
238    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
239        let src = prefixed_path(&self.base_dir, location);
240        self.inner.get_opts(&src, options).await
241    }
242
243    async fn get_range(
244        &self,
245        location: &Path,
246        range: std::ops::Range<u64>,
247    ) -> ObjectStoreResult<Bytes> {
248        let src = prefixed_path(&self.base_dir, location);
249        self.inner.get_range(&src, range).await
250    }
251
252    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
253        let src = prefixed_path(&self.base_dir, location);
254        let mut meta = self.inner.head(&src).await?;
255        meta.location = relativize_path(&self.base_dir, meta.location, "AwsStorage")?;
256        Ok(meta)
257    }
258
259    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
260        let src = prefixed_path(&self.base_dir, location);
261        self.inner.delete(&src).await
262    }
263
264    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
265        let list_prefix_for_inner = prefix
266            .map(|p| prefixed_path(&self.base_dir, p))
267            .unwrap_or_else(|| self.base_dir.clone());
268
269        let base_dir_for_stream = self.base_dir.clone();
270
271        Box::pin(
272            self.inner
273                .list(Some(&list_prefix_for_inner))
274                .and_then(move |mut meta| {
275                    let captured_base_dir = base_dir_for_stream.clone();
276                    async move {
277                        meta.location =
278                            relativize_path(&captured_base_dir, meta.location, "AwsStorage")?;
279                        Ok(meta)
280                    }
281                }),
282        )
283    }
284
285    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
286        let list_prefix_for_inner = prefix
287            .map(|p| prefixed_path(&self.base_dir, p))
288            .unwrap_or_else(|| self.base_dir.clone());
289        let mut result = self
290            .inner
291            .list_with_delimiter(Some(&list_prefix_for_inner))
292            .await?;
293
294        for meta_obj in &mut result.objects {
295            let original_location = std::mem::take(&mut meta_obj.location);
296            meta_obj.location = relativize_path(&self.base_dir, original_location, "AwsStorage")?;
297        }
298
299        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
300        for cp in result.common_prefixes {
301            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AwsStorage")?);
302        }
303        result.common_prefixes = new_common_prefixes;
304
305        Ok(result)
306    }
307
308    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
309        let src = prefixed_path(&self.base_dir, from);
310        let dst = prefixed_path(&self.base_dir, to);
311        self.inner.copy(&src, &dst).await
312    }
313
314    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
315        let src = prefixed_path(&self.base_dir, from);
316        let dst = prefixed_path(&self.base_dir, to);
317        self.inner.copy_if_not_exists(&src, &dst).await
318    }
319}
320
321impl std::fmt::Display for S3Storage {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        write!(f, "AwsStorage(url={})", self.url)
324    }
325}