alien_bindings/providers/storage/
aws_s3.rs1use crate::providers::storage::credential_bridge::AwsCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4 error::{Error, ErrorData},
5 presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6 traits::{Binding, Storage},
7};
8use alien_aws_clients::AwsCredentialProvider;
9use alien_error::{AlienError, Context, IntoAlienError};
10use async_trait::async_trait;
11use bytes::Bytes;
12use chrono::Utc;
13use futures::stream::BoxStream;
14use futures::TryStreamExt as _;
15use object_store::signer::Signer;
16use object_store::{
17 aws::{AmazonS3, AmazonS3Builder},
18 path::Path,
19 GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions,
20 PutPayload, PutResult, Result as ObjectStoreResult,
21};
22use reqwest::Method;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26use url::Url;
27
28#[derive(Debug)]
30pub struct S3Storage {
31 url: Url,
32 base_dir: Path,
33 inner: AmazonS3,
34}
35
36impl S3Storage {
37 pub fn new(bucket_name: String, credentials: AwsCredentialProvider) -> Result<Self, Error> {
41 let s3_url = format!("s3://{}", bucket_name);
42 let url =
43 Url::parse(&s3_url)
44 .into_alien_error()
45 .context(ErrorData::InvalidConfigurationUrl {
46 url: s3_url.clone(),
47 reason: "Invalid S3 URL format".to_string(),
48 })?;
49
50 let region = credentials.region().to_string();
52 let cred_bridge = AwsCredentialBridge::new(credentials);
53 let store = AmazonS3Builder::new()
54 .with_bucket_name(&bucket_name)
55 .with_region(®ion)
56 .with_credentials(Arc::new(cred_bridge))
57 .build()
58 .into_alien_error()
59 .context(ErrorData::BindingSetupFailed {
60 binding_type: "AWS S3 storage".to_string(),
61 reason: format!("Failed to build S3 client for bucket: {}", bucket_name),
62 })?;
63
64 let base_dir = match url.path_segments() {
66 Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
67 None => Path::default(), };
69
70 Ok(Self {
71 url,
72 base_dir,
73 inner: store,
74 })
75 }
76}
77
78impl Binding for S3Storage {}
79
80#[async_trait]
81impl Storage for S3Storage {
82 fn get_base_dir(&self) -> Path {
83 self.base_dir.clone()
84 }
85
86 fn get_url(&self) -> Url {
87 self.url.clone()
88 }
89
90 async fn presigned_put(
91 &self,
92 path: &Path,
93 expires_in: Duration,
94 ) -> crate::error::Result<PresignedRequest> {
95 let dst = prefixed_path(&self.base_dir, path);
96 let signed_url = self
97 .inner
98 .signed_url(Method::PUT, &dst, expires_in)
99 .await
100 .into_alien_error()
101 .context(ErrorData::StorageOperationFailed {
102 binding_name: "aws-s3".to_string(),
103 operation: format!("generate presigned PUT URL for {}", path),
104 })?;
105
106 let headers = HashMap::new();
107
108 Ok(PresignedRequest {
109 backend: PresignedRequestBackend::Http {
110 url: signed_url.to_string(),
111 method: "PUT".to_string(),
112 headers,
113 },
114 expiration: Utc::now()
115 + chrono::Duration::from_std(expires_in).map_err(|e| {
116 AlienError::new(ErrorData::Other {
117 message: format!("Invalid duration: {}", e),
118 })
119 })?,
120 operation: PresignedOperation::Put,
121 path: path.to_string(),
122 })
123 }
124
125 async fn presigned_get(
126 &self,
127 path: &Path,
128 expires_in: Duration,
129 ) -> crate::error::Result<PresignedRequest> {
130 let dst = prefixed_path(&self.base_dir, path);
131 let signed_url = self
132 .inner
133 .signed_url(Method::GET, &dst, expires_in)
134 .await
135 .into_alien_error()
136 .context(ErrorData::StorageOperationFailed {
137 binding_name: "aws-s3".to_string(),
138 operation: format!("generate presigned GET URL for {}", path),
139 })?;
140
141 let headers = HashMap::new();
142
143 Ok(PresignedRequest {
144 backend: PresignedRequestBackend::Http {
145 url: signed_url.to_string(),
146 method: "GET".to_string(),
147 headers,
148 },
149 expiration: Utc::now()
150 + chrono::Duration::from_std(expires_in).map_err(|e| {
151 AlienError::new(ErrorData::Other {
152 message: format!("Invalid duration: {}", e),
153 })
154 })?,
155 operation: PresignedOperation::Get,
156 path: path.to_string(),
157 })
158 }
159
160 async fn presigned_delete(
161 &self,
162 path: &Path,
163 expires_in: Duration,
164 ) -> crate::error::Result<PresignedRequest> {
165 let dst = prefixed_path(&self.base_dir, path);
166 let signed_url = self
167 .inner
168 .signed_url(Method::DELETE, &dst, expires_in)
169 .await
170 .into_alien_error()
171 .context(ErrorData::StorageOperationFailed {
172 binding_name: "aws-s3".to_string(),
173 operation: format!("generate presigned DELETE URL for {}", path),
174 })?;
175
176 let headers = HashMap::new();
177
178 Ok(PresignedRequest {
179 backend: PresignedRequestBackend::Http {
180 url: signed_url.to_string(),
181 method: "DELETE".to_string(),
182 headers,
183 },
184 expiration: Utc::now()
185 + chrono::Duration::from_std(expires_in).map_err(|e| {
186 AlienError::new(ErrorData::Other {
187 message: format!("Invalid duration: {}", e),
188 })
189 })?,
190 operation: PresignedOperation::Delete,
191 path: path.to_string(),
192 })
193 }
194}
195
196#[async_trait]
199impl ObjectStore for S3Storage {
200 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
201 let dst = prefixed_path(&self.base_dir, location);
202 self.inner.put(&dst, payload).await
203 }
204
205 async fn put_opts(
206 &self,
207 location: &Path,
208 payload: PutPayload,
209 opts: PutOptions,
210 ) -> ObjectStoreResult<PutResult> {
211 let dst = prefixed_path(&self.base_dir, location);
212 self.inner.put_opts(&dst, payload, opts).await
213 }
214
215 async fn put_multipart(
216 &self,
217 location: &Path,
218 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
219 let dst = prefixed_path(&self.base_dir, location);
220 self.inner.put_multipart(&dst).await
221 }
222
223 async fn put_multipart_opts(
224 &self,
225 location: &Path,
226 opts: PutMultipartOpts,
227 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
228 let dst = prefixed_path(&self.base_dir, location);
229 self.inner.put_multipart_opts(&dst, opts).await
230 }
231
232 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
233 let src = prefixed_path(&self.base_dir, location);
234 self.inner.get(&src).await
235 }
236
237 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
238 let src = prefixed_path(&self.base_dir, location);
239 self.inner.get_opts(&src, options).await
240 }
241
242 async fn get_range(
243 &self,
244 location: &Path,
245 range: std::ops::Range<u64>,
246 ) -> ObjectStoreResult<Bytes> {
247 let src = prefixed_path(&self.base_dir, location);
248 self.inner.get_range(&src, range).await
249 }
250
251 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
252 let src = prefixed_path(&self.base_dir, location);
253 let mut meta = self.inner.head(&src).await?;
254 meta.location = relativize_path(&self.base_dir, meta.location, "AwsStorage")?;
255 Ok(meta)
256 }
257
258 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
259 let src = prefixed_path(&self.base_dir, location);
260 self.inner.delete(&src).await
261 }
262
263 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
264 let list_prefix_for_inner = prefix
265 .map(|p| prefixed_path(&self.base_dir, p))
266 .unwrap_or_else(|| self.base_dir.clone());
267
268 let base_dir_for_stream = self.base_dir.clone();
269
270 Box::pin(
271 self.inner
272 .list(Some(&list_prefix_for_inner))
273 .and_then(move |mut meta| {
274 let captured_base_dir = base_dir_for_stream.clone();
275 async move {
276 meta.location =
277 relativize_path(&captured_base_dir, meta.location, "AwsStorage")?;
278 Ok(meta)
279 }
280 }),
281 )
282 }
283
284 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
285 let list_prefix_for_inner = prefix
286 .map(|p| prefixed_path(&self.base_dir, p))
287 .unwrap_or_else(|| self.base_dir.clone());
288 let mut result = self
289 .inner
290 .list_with_delimiter(Some(&list_prefix_for_inner))
291 .await?;
292
293 for meta_obj in &mut result.objects {
294 let original_location = std::mem::take(&mut meta_obj.location);
295 meta_obj.location = relativize_path(&self.base_dir, original_location, "AwsStorage")?;
296 }
297
298 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
299 for cp in result.common_prefixes {
300 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AwsStorage")?);
301 }
302 result.common_prefixes = new_common_prefixes;
303
304 Ok(result)
305 }
306
307 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
308 let src = prefixed_path(&self.base_dir, from);
309 let dst = prefixed_path(&self.base_dir, to);
310 self.inner.copy(&src, &dst).await
311 }
312
313 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
314 let src = prefixed_path(&self.base_dir, from);
315 let dst = prefixed_path(&self.base_dir, to);
316 self.inner.copy_if_not_exists(&src, &dst).await
317 }
318}
319
320impl std::fmt::Display for S3Storage {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 write!(f, "AwsStorage(url={})", self.url)
323 }
324}