alien_bindings/providers/storage/
aws_s3.rs1use crate::providers::storage::credential_bridge::AwsCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4 error::{Error, ErrorData},
5 presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6 traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16 aws::{AmazonS3, AmazonS3Builder},
17 path::Path,
18 GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions,
19 PutPayload, PutResult, Result as ObjectStoreResult,
20};
21use reqwest::Method;
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::Duration;
25use url::Url;
26
27#[derive(Debug)]
29pub struct S3Storage {
30 url: Url,
31 base_dir: Path,
32 inner: AmazonS3,
33}
34
35impl S3Storage {
36 pub fn new(
40 bucket_name: String,
41 aws_config: &alien_core::AwsClientConfig,
42 ) -> Result<Self, Error> {
43 let s3_url = format!("s3://{}", bucket_name);
44 let url =
45 Url::parse(&s3_url)
46 .into_alien_error()
47 .context(ErrorData::InvalidConfigurationUrl {
48 url: s3_url.clone(),
49 reason: "Invalid S3 URL format".to_string(),
50 })?;
51
52 let credentials = AwsCredentialBridge::new(aws_config.clone());
54 let store = AmazonS3Builder::new()
55 .with_bucket_name(&bucket_name)
56 .with_region(&aws_config.region)
57 .with_credentials(Arc::new(credentials))
58 .build()
59 .into_alien_error()
60 .context(ErrorData::BindingSetupFailed {
61 binding_type: "AWS S3 storage".to_string(),
62 reason: format!("Failed to build S3 client for bucket: {}", bucket_name),
63 })?;
64
65 let base_dir = match url.path_segments() {
67 Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
68 None => Path::default(), };
70
71 Ok(Self {
72 url,
73 base_dir,
74 inner: store,
75 })
76 }
77}
78
79impl Binding for S3Storage {}
80
81#[async_trait]
82impl Storage for S3Storage {
83 fn get_base_dir(&self) -> Path {
84 self.base_dir.clone()
85 }
86
87 fn get_url(&self) -> Url {
88 self.url.clone()
89 }
90
91 async fn presigned_put(
92 &self,
93 path: &Path,
94 expires_in: Duration,
95 ) -> crate::error::Result<PresignedRequest> {
96 let dst = prefixed_path(&self.base_dir, path);
97 let signed_url = self
98 .inner
99 .signed_url(Method::PUT, &dst, expires_in)
100 .await
101 .into_alien_error()
102 .context(ErrorData::StorageOperationFailed {
103 binding_name: "aws-s3".to_string(),
104 operation: format!("generate presigned PUT URL for {}", path),
105 })?;
106
107 let headers = HashMap::new();
108
109 Ok(PresignedRequest {
110 backend: PresignedRequestBackend::Http {
111 url: signed_url.to_string(),
112 method: "PUT".to_string(),
113 headers,
114 },
115 expiration: Utc::now()
116 + chrono::Duration::from_std(expires_in).map_err(|e| {
117 AlienError::new(ErrorData::Other {
118 message: format!("Invalid duration: {}", e),
119 })
120 })?,
121 operation: PresignedOperation::Put,
122 path: path.to_string(),
123 })
124 }
125
126 async fn presigned_get(
127 &self,
128 path: &Path,
129 expires_in: Duration,
130 ) -> crate::error::Result<PresignedRequest> {
131 let dst = prefixed_path(&self.base_dir, path);
132 let signed_url = self
133 .inner
134 .signed_url(Method::GET, &dst, expires_in)
135 .await
136 .into_alien_error()
137 .context(ErrorData::StorageOperationFailed {
138 binding_name: "aws-s3".to_string(),
139 operation: format!("generate presigned GET URL for {}", path),
140 })?;
141
142 let headers = HashMap::new();
143
144 Ok(PresignedRequest {
145 backend: PresignedRequestBackend::Http {
146 url: signed_url.to_string(),
147 method: "GET".to_string(),
148 headers,
149 },
150 expiration: Utc::now()
151 + chrono::Duration::from_std(expires_in).map_err(|e| {
152 AlienError::new(ErrorData::Other {
153 message: format!("Invalid duration: {}", e),
154 })
155 })?,
156 operation: PresignedOperation::Get,
157 path: path.to_string(),
158 })
159 }
160
161 async fn presigned_delete(
162 &self,
163 path: &Path,
164 expires_in: Duration,
165 ) -> crate::error::Result<PresignedRequest> {
166 let dst = prefixed_path(&self.base_dir, path);
167 let signed_url = self
168 .inner
169 .signed_url(Method::DELETE, &dst, expires_in)
170 .await
171 .into_alien_error()
172 .context(ErrorData::StorageOperationFailed {
173 binding_name: "aws-s3".to_string(),
174 operation: format!("generate presigned DELETE URL for {}", path),
175 })?;
176
177 let headers = HashMap::new();
178
179 Ok(PresignedRequest {
180 backend: PresignedRequestBackend::Http {
181 url: signed_url.to_string(),
182 method: "DELETE".to_string(),
183 headers,
184 },
185 expiration: Utc::now()
186 + chrono::Duration::from_std(expires_in).map_err(|e| {
187 AlienError::new(ErrorData::Other {
188 message: format!("Invalid duration: {}", e),
189 })
190 })?,
191 operation: PresignedOperation::Delete,
192 path: path.to_string(),
193 })
194 }
195}
196
197#[async_trait]
200impl ObjectStore for S3Storage {
201 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
202 let dst = prefixed_path(&self.base_dir, location);
203 self.inner.put(&dst, payload).await
204 }
205
206 async fn put_opts(
207 &self,
208 location: &Path,
209 payload: PutPayload,
210 opts: PutOptions,
211 ) -> ObjectStoreResult<PutResult> {
212 let dst = prefixed_path(&self.base_dir, location);
213 self.inner.put_opts(&dst, payload, opts).await
214 }
215
216 async fn put_multipart(
217 &self,
218 location: &Path,
219 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
220 let dst = prefixed_path(&self.base_dir, location);
221 self.inner.put_multipart(&dst).await
222 }
223
224 async fn put_multipart_opts(
225 &self,
226 location: &Path,
227 opts: PutMultipartOpts,
228 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
229 let dst = prefixed_path(&self.base_dir, location);
230 self.inner.put_multipart_opts(&dst, opts).await
231 }
232
233 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
234 let src = prefixed_path(&self.base_dir, location);
235 self.inner.get(&src).await
236 }
237
238 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
239 let src = prefixed_path(&self.base_dir, location);
240 self.inner.get_opts(&src, options).await
241 }
242
243 async fn get_range(
244 &self,
245 location: &Path,
246 range: std::ops::Range<u64>,
247 ) -> ObjectStoreResult<Bytes> {
248 let src = prefixed_path(&self.base_dir, location);
249 self.inner.get_range(&src, range).await
250 }
251
252 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
253 let src = prefixed_path(&self.base_dir, location);
254 let mut meta = self.inner.head(&src).await?;
255 meta.location = relativize_path(&self.base_dir, meta.location, "AwsStorage")?;
256 Ok(meta)
257 }
258
259 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
260 let src = prefixed_path(&self.base_dir, location);
261 self.inner.delete(&src).await
262 }
263
264 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
265 let list_prefix_for_inner = prefix
266 .map(|p| prefixed_path(&self.base_dir, p))
267 .unwrap_or_else(|| self.base_dir.clone());
268
269 let base_dir_for_stream = self.base_dir.clone();
270
271 Box::pin(
272 self.inner
273 .list(Some(&list_prefix_for_inner))
274 .and_then(move |mut meta| {
275 let captured_base_dir = base_dir_for_stream.clone();
276 async move {
277 meta.location =
278 relativize_path(&captured_base_dir, meta.location, "AwsStorage")?;
279 Ok(meta)
280 }
281 }),
282 )
283 }
284
285 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
286 let list_prefix_for_inner = prefix
287 .map(|p| prefixed_path(&self.base_dir, p))
288 .unwrap_or_else(|| self.base_dir.clone());
289 let mut result = self
290 .inner
291 .list_with_delimiter(Some(&list_prefix_for_inner))
292 .await?;
293
294 for meta_obj in &mut result.objects {
295 let original_location = std::mem::take(&mut meta_obj.location);
296 meta_obj.location = relativize_path(&self.base_dir, original_location, "AwsStorage")?;
297 }
298
299 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
300 for cp in result.common_prefixes {
301 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AwsStorage")?);
302 }
303 result.common_prefixes = new_common_prefixes;
304
305 Ok(result)
306 }
307
308 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
309 let src = prefixed_path(&self.base_dir, from);
310 let dst = prefixed_path(&self.base_dir, to);
311 self.inner.copy(&src, &dst).await
312 }
313
314 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
315 let src = prefixed_path(&self.base_dir, from);
316 let dst = prefixed_path(&self.base_dir, to);
317 self.inner.copy_if_not_exists(&src, &dst).await
318 }
319}
320
321impl std::fmt::Display for S3Storage {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 write!(f, "AwsStorage(url={})", self.url)
324 }
325}