1use crate::{CreateBucketOutcome, FileStream, StorageLayerError, StorageLayerImpl};
13use aws_config::SdkConfig;
14use aws_sdk_s3::{
15 config::Credentials,
16 error::SdkError,
17 operation::{
18 create_bucket::CreateBucketError, delete_bucket::DeleteBucketError,
19 delete_object::DeleteObjectError, get_object::GetObjectError, head_bucket::HeadBucketError,
20 put_bucket_cors::PutBucketCorsError,
21 put_bucket_notification_configuration::PutBucketNotificationConfigurationError,
22 put_object::PutObjectError,
23 },
24 presigning::{PresignedRequest, PresigningConfig},
25 primitives::ByteStream,
26 types::{
27 BucketLocationConstraint, CorsConfiguration, CorsRule, CreateBucketConfiguration,
28 NotificationConfiguration, QueueConfiguration,
29 },
30};
31use bytes::Bytes;
32use chrono::{DateTime, TimeDelta, Utc};
33use futures::Stream;
34use serde::{Deserialize, Serialize};
35use std::{error::Error, fmt::Debug, time::Duration};
36use thiserror::Error;
37
/// Shorthand for the AWS SDK S3 client type used throughout this file.
type S3Client = aws_sdk_s3::Client;
39
40#[derive(Debug, Clone, Deserialize, Serialize)]
42pub struct S3StorageLayerFactoryConfig {
43 pub endpoint: S3Endpoint,
45}
46
47#[derive(Debug, Error)]
49pub enum S3StorageLayerFactoryConfigError {
50 #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_ID")]
52 MissingAccessKeyId,
53
54 #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_SECRET")]
56 MissingAccessKeySecret,
57}
58
59impl S3StorageLayerFactoryConfig {
60 pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
62 let endpoint = S3Endpoint::from_env()?;
63
64 Ok(Self { endpoint })
65 }
66}
67
68#[derive(Clone, Deserialize, Serialize)]
70#[serde(tag = "type", rename_all = "snake_case")]
71pub enum S3Endpoint {
72 Aws,
74 Custom {
76 endpoint: String,
78 external_endpoint: Option<String>,
80 access_key_id: String,
82 access_key_secret: String,
84 },
85}
86
87impl Debug for S3Endpoint {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 Self::Aws => write!(f, "Aws"),
91 Self::Custom { endpoint, .. } => f
92 .debug_struct("Custom")
93 .field("endpoint", endpoint)
94 .finish(),
95 }
96 }
97}
98
99impl S3Endpoint {
100 pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
102 match std::env::var("DOCBOX_S3_ENDPOINT") {
103 Ok(endpoint_url) => {
105 let access_key_id = std::env::var("DOCBOX_S3_ACCESS_KEY_ID")
106 .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeyId)?;
107 let access_key_secret = std::env::var("DOCBOX_S3_ACCESS_KEY_SECRET")
108 .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeySecret)?;
109
110 let external_endpoint = std::env::var("DOCBOX_S3_EXTERNAL_ENDPOINT").ok();
111
112 Ok(S3Endpoint::Custom {
113 endpoint: endpoint_url,
114 external_endpoint,
115 access_key_id,
116 access_key_secret,
117 })
118 }
119 Err(_) => Ok(S3Endpoint::Aws),
120 }
121 }
122}
123
124#[derive(Clone)]
126pub struct S3StorageLayerFactory {
127 client: S3Client,
129 external_client: Option<S3Client>,
131}
132
133impl S3StorageLayerFactory {
134 pub fn from_config(aws_config: &SdkConfig, config: S3StorageLayerFactoryConfig) -> Self {
136 let (client, external_client) = match config.endpoint {
137 S3Endpoint::Aws => {
138 tracing::debug!("using aws s3 storage layer");
139 (S3Client::new(aws_config), None)
140 }
141 S3Endpoint::Custom {
142 endpoint,
143 external_endpoint,
144 access_key_id,
145 access_key_secret,
146 } => {
147 tracing::debug!("using custom s3 storage layer");
148 let credentials = Credentials::new(
149 access_key_id,
150 access_key_secret,
151 None,
152 None,
153 "docbox_key_provider",
154 );
155
156 let config_builder = aws_sdk_s3::config::Builder::from(aws_config)
158 .force_path_style(true)
159 .endpoint_url(endpoint)
160 .credentials_provider(credentials);
161
162 let external_client = match external_endpoint {
164 Some(external_endpoint) => {
165 let config = config_builder
166 .clone()
167 .endpoint_url(external_endpoint)
168 .build();
169 let client = S3Client::from_conf(config);
170 Some(client)
171 }
172 None => None,
173 };
174
175 let config = config_builder.build();
176 let client = S3Client::from_conf(config);
177
178 (client, external_client)
179 }
180 };
181
182 Self {
183 client,
184 external_client,
185 }
186 }
187
188 pub fn create_storage_layer(&self, bucket_name: String) -> S3StorageLayer {
190 S3StorageLayer::new(
191 self.client.clone(),
192 self.external_client.clone(),
193 bucket_name,
194 )
195 }
196}
197
198#[derive(Clone)]
200pub struct S3StorageLayer {
201 bucket_name: String,
203
204 client: S3Client,
206
207 external_client: Option<S3Client>,
209}
210
211impl S3StorageLayer {
212 fn new(client: S3Client, external_client: Option<S3Client>, bucket_name: String) -> Self {
214 Self {
215 bucket_name,
216 client,
217 external_client,
218 }
219 }
220}
221
222#[derive(Debug, Error)]
227pub enum S3StorageError {
228 #[error("invalid server configuration (region)")]
230 MissingRegion,
231
232 #[error("failed to create storage bucket")]
234 CreateBucket(SdkError<CreateBucketError>),
235
236 #[error("failed to delete storage bucket")]
238 DeleteBucket(SdkError<DeleteBucketError>),
239
240 #[error("failed to get storage bucket")]
242 HeadBucket(SdkError<HeadBucketError>),
243
244 #[error("failed to store file object")]
246 PutObject(SdkError<PutObjectError>),
247
248 #[error("failed to calculate expiry timestamp")]
250 UnixTimeCalculation,
251
252 #[error("failed to create presigned store file object")]
254 PutObjectPresigned(SdkError<PutObjectError>),
255
256 #[error("failed to create presigned config")]
258 PresignedConfig,
259
260 #[error("failed to get presigned store file object")]
262 GetObjectPresigned(SdkError<GetObjectError>),
263
264 #[error("failed to create bucket notification queue config")]
266 QueueConfig,
267
268 #[error("failed to add bucket notification queue: {0}")]
274 PutBucketNotification(SdkError<PutBucketNotificationConfigurationError>),
275
276 #[error("failed to create bucket cors config")]
278 CreateCorsConfig,
279
280 #[error("failed to set bucket cors rules: {0}")]
286 PutBucketCors(SdkError<PutBucketCorsError>),
287
288 #[error("failed to delete file object")]
290 DeleteObject(SdkError<DeleteObjectError>),
291
292 #[error("failed to get file storage object")]
294 GetObject(SdkError<GetObjectError>),
295}
296
297impl StorageLayerImpl for S3StorageLayer {
298 async fn create_bucket(&self) -> Result<CreateBucketOutcome, StorageLayerError> {
299 let bucket_region = self
300 .client
301 .config()
302 .region()
303 .ok_or(S3StorageError::MissingRegion)?
304 .to_string();
305
306 let constraint = BucketLocationConstraint::from(bucket_region.as_str());
307
308 let cfg = CreateBucketConfiguration::builder()
309 .location_constraint(constraint)
310 .build();
311
312 if let Err(error) = self
313 .client
314 .create_bucket()
315 .create_bucket_configuration(cfg)
316 .bucket(&self.bucket_name)
317 .send()
318 .await
319 {
320 let already_exists = error
321 .as_service_error()
322 .is_some_and(|value| value.is_bucket_already_owned_by_you());
323
324 if already_exists {
326 tracing::debug!("bucket already exists");
327 return Ok(CreateBucketOutcome::Existing);
328 }
329
330 tracing::error!(?error, "failed to create bucket");
331 return Err(S3StorageError::CreateBucket(error).into());
332 }
333
334 Ok(CreateBucketOutcome::New)
335 }
336
337 async fn bucket_exists(&self) -> Result<bool, StorageLayerError> {
338 if let Err(error) = self
339 .client
340 .head_bucket()
341 .bucket(&self.bucket_name)
342 .send()
343 .await
344 {
345 if error
347 .as_service_error()
348 .is_some_and(|error| error.is_not_found())
349 {
350 return Ok(false);
351 }
352
353 return Err(S3StorageError::HeadBucket(error).into());
354 }
355
356 Ok(true)
357 }
358
359 async fn delete_bucket(&self) -> Result<(), StorageLayerError> {
360 if let Err(error) = self
361 .client
362 .delete_bucket()
363 .bucket(&self.bucket_name)
364 .send()
365 .await
366 {
367 if error
370 .as_service_error()
371 .and_then(|err| err.meta().code())
372 .is_some_and(|code| code == "NoSuchBucket")
373 {
374 tracing::debug!("bucket did not exist");
375 return Ok(());
376 }
377
378 tracing::error!(?error, "failed to delete bucket");
379
380 return Err(S3StorageError::DeleteBucket(error).into());
381 }
382
383 Ok(())
384 }
385
386 async fn upload_file(
387 &self,
388 key: &str,
389 content_type: String,
390 body: Bytes,
391 ) -> Result<(), StorageLayerError> {
392 self.client
393 .put_object()
394 .bucket(&self.bucket_name)
395 .content_type(content_type)
396 .key(key)
397 .body(body.into())
398 .send()
399 .await
400 .map_err(|error| {
401 tracing::error!(?error, "failed to store file object");
402 S3StorageError::PutObject(error)
403 })?;
404
405 Ok(())
406 }
407
408 async fn create_presigned(
409 &self,
410 key: &str,
411 size: i64,
412 ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
413 let expiry_time_minutes = 30;
414 let expires_at = Utc::now()
415 .checked_add_signed(TimeDelta::minutes(expiry_time_minutes))
416 .ok_or(S3StorageError::UnixTimeCalculation)?;
417
418 let client = match self.external_client.as_ref() {
419 Some(external_client) => external_client,
420 None => &self.client,
421 };
422
423 let result = client
424 .put_object()
425 .bucket(&self.bucket_name)
426 .key(key)
427 .content_length(size)
428 .presigned(
429 PresigningConfig::builder()
430 .expires_in(Duration::from_secs(60 * expiry_time_minutes as u64))
431 .build()
432 .map_err(|error| {
433 tracing::error!(?error, "Failed to create presigned store config");
434 S3StorageError::PresignedConfig
435 })?,
436 )
437 .await
438 .map_err(|error| {
439 tracing::error!(?error, "failed to create presigned store file object");
440 S3StorageError::PutObjectPresigned(error)
441 })?;
442
443 Ok((result, expires_at))
444 }
445
446 async fn create_presigned_download(
447 &self,
448 key: &str,
449 expires_in: Duration,
450 ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
451 let expires_at = Utc::now()
452 .checked_add_signed(TimeDelta::seconds(expires_in.as_secs() as i64))
453 .ok_or(S3StorageError::UnixTimeCalculation)?;
454
455 let client = match self.external_client.as_ref() {
456 Some(external_client) => external_client,
457 None => &self.client,
458 };
459
460 let result = client
461 .get_object()
462 .bucket(&self.bucket_name)
463 .key(key)
464 .presigned(PresigningConfig::expires_in(expires_in).map_err(|error| {
465 tracing::error!(?error, "failed to create presigned download config");
466 S3StorageError::PresignedConfig
467 })?)
468 .await
469 .map_err(|error| {
470 tracing::error!(?error, "failed to create presigned download");
471 S3StorageError::GetObjectPresigned(error)
472 })?;
473
474 Ok((result, expires_at))
475 }
476
477 async fn add_bucket_notifications(&self, sqs_arn: &str) -> Result<(), StorageLayerError> {
478 self.client
480 .put_bucket_notification_configuration()
481 .bucket(&self.bucket_name)
482 .notification_configuration(
483 NotificationConfiguration::builder()
484 .set_queue_configurations(Some(vec![
485 QueueConfiguration::builder()
486 .queue_arn(sqs_arn)
487 .events(aws_sdk_s3::types::Event::S3ObjectCreated)
488 .build()
489 .map_err(|error| {
490 tracing::error!(
491 ?error,
492 "failed to create bucket notification queue config"
493 );
494 S3StorageError::QueueConfig
495 })?,
496 ]))
497 .build(),
498 )
499 .send()
500 .await
501 .map_err(|error| {
502 tracing::error!(?error, "failed to add bucket notification queue");
503 S3StorageError::PutBucketNotification(error)
504 })?;
505
506 Ok(())
507 }
508
509 async fn set_bucket_cors_origins(&self, origins: Vec<String>) -> Result<(), StorageLayerError> {
510 if let Err(error) = self
511 .client
512 .put_bucket_cors()
513 .bucket(&self.bucket_name)
514 .cors_configuration(
515 CorsConfiguration::builder()
516 .cors_rules(
517 CorsRule::builder()
518 .allowed_headers("*")
519 .allowed_methods("PUT")
520 .set_allowed_origins(Some(origins))
521 .set_expose_headers(Some(Vec::new()))
522 .build()
523 .map_err(|error| {
524 tracing::error!(?error, "failed to create cors rule");
525 S3StorageError::CreateCorsConfig
526 })?,
527 )
528 .build()
529 .map_err(|error| {
530 tracing::error!(?error, "failed to create cors config");
531 S3StorageError::CreateCorsConfig
532 })?,
533 )
534 .send()
535 .await
536 {
537 if error
539 .raw_response()
540 .is_some_and(|response| response.status().as_u16() == 501)
542 {
543 tracing::warn!("storage s3 backend does not support PutBucketCors.. skipping..");
544 return Ok(());
545 }
546
547 tracing::error!(?error, "failed to add bucket cors");
548 return Err(S3StorageError::PutBucketCors(error).into());
549 };
550
551 Ok(())
552 }
553
554 async fn delete_file(&self, key: &str) -> Result<(), StorageLayerError> {
555 if let Err(error) = self
556 .client
557 .delete_object()
558 .bucket(&self.bucket_name)
559 .key(key)
560 .send()
561 .await
562 {
563 if error
566 .as_service_error()
567 .and_then(|err| err.source())
568 .and_then(|source| source.downcast_ref::<aws_sdk_s3::Error>())
569 .is_some_and(|err| matches!(err, aws_sdk_s3::Error::NoSuchKey(_)))
570 {
571 return Ok(());
572 }
573
574 tracing::error!(?error, "failed to delete file object");
575 return Err(S3StorageError::DeleteObject(error).into());
576 }
577
578 Ok(())
579 }
580
581 async fn get_file(&self, key: &str) -> Result<FileStream, StorageLayerError> {
582 let object = self
583 .client
584 .get_object()
585 .bucket(&self.bucket_name)
586 .key(key)
587 .send()
588 .await
589 .map_err(|error| {
590 tracing::error!(?error, "failed to get file storage object");
591 S3StorageError::GetObject(error)
592 })?;
593
594 let stream = FileStream {
595 stream: Box::pin(AwsFileStream { inner: object.body }),
596 };
597
598 Ok(stream)
599 }
600}
601
602pub struct AwsFileStream {
604 inner: ByteStream,
605}
606
607impl AwsFileStream {
608 pub fn into_inner(self) -> ByteStream {
610 self.inner
611 }
612}
613
614impl Stream for AwsFileStream {
615 type Item = std::io::Result<Bytes>;
616
617 fn poll_next(
618 self: std::pin::Pin<&mut Self>,
619 cx: &mut std::task::Context<'_>,
620 ) -> std::task::Poll<Option<Self::Item>> {
621 let this = self.get_mut();
622 let inner = std::pin::Pin::new(&mut this.inner);
623 inner.poll_next(cx).map_err(std::io::Error::other)
624 }
625}