Skip to main content

alien_bindings/providers/storage/
aws_s3.rs

1use crate::providers::storage::credential_bridge::AwsCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4    error::{Error, ErrorData},
5    presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6    traits::{Binding, Storage},
7};
8use alien_aws_clients::AwsCredentialProvider;
9use alien_error::{AlienError, Context, IntoAlienError};
10use async_trait::async_trait;
11use bytes::Bytes;
12use chrono::Utc;
13use futures::stream::BoxStream;
14use futures::TryStreamExt as _;
15use object_store::signer::Signer;
16use object_store::{
17    aws::{AmazonS3, AmazonS3Builder},
18    path::Path,
19    GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions,
20    PutPayload, PutResult, Result as ObjectStoreResult,
21};
22use reqwest::Method;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26use url::Url;
27
28/// AWS S3 storage implementation using object_store backend.
29#[derive(Debug)]
30pub struct S3Storage {
31    url: Url,
32    base_dir: Path,
33    inner: AmazonS3,
34}
35
36impl S3Storage {
37    /// Creates a new `S3Storage` instance from bucket configuration.
38    ///
39    /// Uses AWS config for credentials.
40    pub fn new(bucket_name: String, credentials: AwsCredentialProvider) -> Result<Self, Error> {
41        let s3_url = format!("s3://{}", bucket_name);
42        let url =
43            Url::parse(&s3_url)
44                .into_alien_error()
45                .context(ErrorData::InvalidConfigurationUrl {
46                    url: s3_url.clone(),
47                    reason: "Invalid S3 URL format".to_string(),
48                })?;
49
50        // Build the store with credentials bridged from AwsCredentialProvider
51        let region = credentials.region().to_string();
52        let cred_bridge = AwsCredentialBridge::new(credentials);
53        let store = AmazonS3Builder::new()
54            .with_bucket_name(&bucket_name)
55            .with_region(&region)
56            .with_credentials(Arc::new(cred_bridge))
57            .build()
58            .into_alien_error()
59            .context(ErrorData::BindingSetupFailed {
60                binding_type: "AWS S3 storage".to_string(),
61                reason: format!("Failed to build S3 client for bucket: {}", bucket_name),
62            })?;
63
64        // Extract the base path from the URL path segments, handling the None case.
65        let base_dir = match url.path_segments() {
66            Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
67            None => Path::default(), // Use an empty path if there are no segments
68        };
69
70        Ok(Self {
71            url,
72            base_dir,
73            inner: store,
74        })
75    }
76}
77
// `S3Storage` opts into the `Binding` trait with an empty impl: no trait items are overridden here.
impl Binding for S3Storage {}
79
80#[async_trait]
81impl Storage for S3Storage {
82    fn get_base_dir(&self) -> Path {
83        self.base_dir.clone()
84    }
85
86    fn get_url(&self) -> Url {
87        self.url.clone()
88    }
89
90    async fn presigned_put(
91        &self,
92        path: &Path,
93        expires_in: Duration,
94    ) -> crate::error::Result<PresignedRequest> {
95        let dst = prefixed_path(&self.base_dir, path);
96        let signed_url = self
97            .inner
98            .signed_url(Method::PUT, &dst, expires_in)
99            .await
100            .into_alien_error()
101            .context(ErrorData::StorageOperationFailed {
102                binding_name: "aws-s3".to_string(),
103                operation: format!("generate presigned PUT URL for {}", path),
104            })?;
105
106        let headers = HashMap::new();
107
108        Ok(PresignedRequest {
109            backend: PresignedRequestBackend::Http {
110                url: signed_url.to_string(),
111                method: "PUT".to_string(),
112                headers,
113            },
114            expiration: Utc::now()
115                + chrono::Duration::from_std(expires_in).map_err(|e| {
116                    AlienError::new(ErrorData::Other {
117                        message: format!("Invalid duration: {}", e),
118                    })
119                })?,
120            operation: PresignedOperation::Put,
121            path: path.to_string(),
122        })
123    }
124
125    async fn presigned_get(
126        &self,
127        path: &Path,
128        expires_in: Duration,
129    ) -> crate::error::Result<PresignedRequest> {
130        let dst = prefixed_path(&self.base_dir, path);
131        let signed_url = self
132            .inner
133            .signed_url(Method::GET, &dst, expires_in)
134            .await
135            .into_alien_error()
136            .context(ErrorData::StorageOperationFailed {
137                binding_name: "aws-s3".to_string(),
138                operation: format!("generate presigned GET URL for {}", path),
139            })?;
140
141        let headers = HashMap::new();
142
143        Ok(PresignedRequest {
144            backend: PresignedRequestBackend::Http {
145                url: signed_url.to_string(),
146                method: "GET".to_string(),
147                headers,
148            },
149            expiration: Utc::now()
150                + chrono::Duration::from_std(expires_in).map_err(|e| {
151                    AlienError::new(ErrorData::Other {
152                        message: format!("Invalid duration: {}", e),
153                    })
154                })?,
155            operation: PresignedOperation::Get,
156            path: path.to_string(),
157        })
158    }
159
160    async fn presigned_delete(
161        &self,
162        path: &Path,
163        expires_in: Duration,
164    ) -> crate::error::Result<PresignedRequest> {
165        let dst = prefixed_path(&self.base_dir, path);
166        let signed_url = self
167            .inner
168            .signed_url(Method::DELETE, &dst, expires_in)
169            .await
170            .into_alien_error()
171            .context(ErrorData::StorageOperationFailed {
172                binding_name: "aws-s3".to_string(),
173                operation: format!("generate presigned DELETE URL for {}", path),
174            })?;
175
176        let headers = HashMap::new();
177
178        Ok(PresignedRequest {
179            backend: PresignedRequestBackend::Http {
180                url: signed_url.to_string(),
181                method: "DELETE".to_string(),
182                headers,
183            },
184            expiration: Utc::now()
185                + chrono::Duration::from_std(expires_in).map_err(|e| {
186                    AlienError::new(ErrorData::Other {
187                        message: format!("Invalid duration: {}", e),
188                    })
189                })?,
190            operation: PresignedOperation::Delete,
191            path: path.to_string(),
192        })
193    }
194}
195
196// Delegate ObjectStore trait implementation to the inner store,
197// prefixing paths with the base_dir.
198#[async_trait]
199impl ObjectStore for S3Storage {
200    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
201        let dst = prefixed_path(&self.base_dir, location);
202        self.inner.put(&dst, payload).await
203    }
204
205    async fn put_opts(
206        &self,
207        location: &Path,
208        payload: PutPayload,
209        opts: PutOptions,
210    ) -> ObjectStoreResult<PutResult> {
211        let dst = prefixed_path(&self.base_dir, location);
212        self.inner.put_opts(&dst, payload, opts).await
213    }
214
215    async fn put_multipart(
216        &self,
217        location: &Path,
218    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
219        let dst = prefixed_path(&self.base_dir, location);
220        self.inner.put_multipart(&dst).await
221    }
222
223    async fn put_multipart_opts(
224        &self,
225        location: &Path,
226        opts: PutMultipartOptions,
227    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
228        let dst = prefixed_path(&self.base_dir, location);
229        self.inner.put_multipart_opts(&dst, opts).await
230    }
231
232    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
233        let src = prefixed_path(&self.base_dir, location);
234        self.inner.get(&src).await
235    }
236
237    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
238        let src = prefixed_path(&self.base_dir, location);
239        self.inner.get_opts(&src, options).await
240    }
241
242    async fn get_range(
243        &self,
244        location: &Path,
245        range: std::ops::Range<u64>,
246    ) -> ObjectStoreResult<Bytes> {
247        let src = prefixed_path(&self.base_dir, location);
248        self.inner.get_range(&src, range).await
249    }
250
251    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
252        let src = prefixed_path(&self.base_dir, location);
253        let mut meta = self.inner.head(&src).await?;
254        meta.location = relativize_path(&self.base_dir, meta.location, "AwsStorage")?;
255        Ok(meta)
256    }
257
258    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
259        let src = prefixed_path(&self.base_dir, location);
260        self.inner.delete(&src).await
261    }
262
263    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
264        let list_prefix_for_inner = prefix
265            .map(|p| prefixed_path(&self.base_dir, p))
266            .unwrap_or_else(|| self.base_dir.clone());
267
268        let base_dir_for_stream = self.base_dir.clone();
269
270        Box::pin(
271            self.inner
272                .list(Some(&list_prefix_for_inner))
273                .and_then(move |mut meta| {
274                    let captured_base_dir = base_dir_for_stream.clone();
275                    async move {
276                        meta.location =
277                            relativize_path(&captured_base_dir, meta.location, "AwsStorage")?;
278                        Ok(meta)
279                    }
280                }),
281        )
282    }
283
284    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
285        let list_prefix_for_inner = prefix
286            .map(|p| prefixed_path(&self.base_dir, p))
287            .unwrap_or_else(|| self.base_dir.clone());
288        let mut result = self
289            .inner
290            .list_with_delimiter(Some(&list_prefix_for_inner))
291            .await?;
292
293        for meta_obj in &mut result.objects {
294            let original_location = std::mem::take(&mut meta_obj.location);
295            meta_obj.location = relativize_path(&self.base_dir, original_location, "AwsStorage")?;
296        }
297
298        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
299        for cp in result.common_prefixes {
300            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AwsStorage")?);
301        }
302        result.common_prefixes = new_common_prefixes;
303
304        Ok(result)
305    }
306
307    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
308        let src = prefixed_path(&self.base_dir, from);
309        let dst = prefixed_path(&self.base_dir, to);
310        self.inner.copy(&src, &dst).await
311    }
312
313    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
314        let src = prefixed_path(&self.base_dir, from);
315        let dst = prefixed_path(&self.base_dir, to);
316        self.inner.copy_if_not_exists(&src, &dst).await
317    }
318}
319
320impl std::fmt::Display for S3Storage {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        write!(f, "AwsStorage(url={})", self.url)
323    }
324}