1use crate::{
13 CreateBucketOutcome, FileStream, StorageLayerError, StorageLayerImpl, UploadFileOptions,
14 UploadFileTag,
15};
16use aws_config::SdkConfig;
17use aws_sdk_s3::{
18 config::Credentials,
19 error::SdkError,
20 operation::{
21 create_bucket::CreateBucketError, delete_bucket::DeleteBucketError,
22 delete_object::DeleteObjectError,
23 get_bucket_lifecycle_configuration::GetBucketLifecycleConfigurationError,
24 get_object::GetObjectError, head_bucket::HeadBucketError,
25 put_bucket_cors::PutBucketCorsError,
26 put_bucket_lifecycle_configuration::PutBucketLifecycleConfigurationError,
27 put_bucket_notification_configuration::PutBucketNotificationConfigurationError,
28 put_object::PutObjectError,
29 },
30 presigning::{PresignedRequest, PresigningConfig},
31 primitives::ByteStream,
32 types::{
33 BucketLifecycleConfiguration, BucketLocationConstraint, CorsConfiguration, CorsRule,
34 CreateBucketConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleFilter,
35 NotificationConfiguration, QueueConfiguration, Tag,
36 },
37};
38use bytes::Bytes;
39use chrono::{DateTime, TimeDelta, Utc};
40use futures::Stream;
41use serde::{Deserialize, Serialize};
42use std::{error::Error, fmt::Debug, time::Duration};
43use thiserror::Error;
44
/// Alias for the AWS SDK S3 client used throughout this storage layer.
type S3Client = aws_sdk_s3::Client;
46
/// Configuration for [`S3StorageLayerFactory`], typically loaded from the
/// environment via [`S3StorageLayerFactoryConfig::from_env`].
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct S3StorageLayerFactoryConfig {
    /// Which S3 endpoint to talk to (real AWS or a custom S3-compatible server).
    pub endpoint: S3Endpoint,
}
54
/// Errors that can occur while reading the S3 factory configuration from
/// environment variables.
///
/// Both variants indicate that `DOCBOX_S3_ENDPOINT` was set while one of the
/// credential variables it requires was missing.
#[derive(Debug, Error)]
pub enum S3StorageLayerFactoryConfigError {
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_ID")]
    MissingAccessKeyId,

    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_SECRET")]
    MissingAccessKeySecret,
}
66
67impl S3StorageLayerFactoryConfig {
68 pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
70 let endpoint = S3Endpoint::from_env()?;
71
72 Ok(Self { endpoint })
73 }
74}
75
/// Target S3 endpoint selection.
///
/// Serialized with an internally tagged `type` field in snake_case
/// (`aws` / `custom`).
#[derive(Default, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum S3Endpoint {
    /// Standard AWS S3 endpoint using the ambient SDK credentials.
    #[default]
    Aws,
    /// Custom S3-compatible endpoint with static credentials.
    Custom {
        /// Endpoint URL used for server-side requests.
        endpoint: String,
        /// Optional separate endpoint used when creating presigned URLs
        /// handed to external clients.
        external_endpoint: Option<String>,
        access_key_id: String,
        access_key_secret: String,
    },
}
95
96impl Debug for S3Endpoint {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 Self::Aws => write!(f, "Aws"),
100 Self::Custom { endpoint, .. } => f
101 .debug_struct("Custom")
102 .field("endpoint", endpoint)
103 .finish(),
104 }
105 }
106}
107
108impl S3Endpoint {
109 pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
111 match std::env::var("DOCBOX_S3_ENDPOINT") {
112 Ok(endpoint_url) => {
114 let access_key_id = std::env::var("DOCBOX_S3_ACCESS_KEY_ID")
115 .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeyId)?;
116 let access_key_secret = std::env::var("DOCBOX_S3_ACCESS_KEY_SECRET")
117 .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeySecret)?;
118
119 let external_endpoint = std::env::var("DOCBOX_S3_EXTERNAL_ENDPOINT").ok();
120
121 Ok(S3Endpoint::Custom {
122 endpoint: endpoint_url,
123 external_endpoint,
124 access_key_id,
125 access_key_secret,
126 })
127 }
128 Err(_) => Ok(S3Endpoint::Aws),
129 }
130 }
131}
132
/// Factory producing [`S3StorageLayer`] instances that share the same
/// underlying S3 clients.
#[derive(Clone)]
pub struct S3StorageLayerFactory {
    /// Client used for all server-side S3 operations.
    client: S3Client,
    /// Optional client pointed at an externally reachable endpoint, used when
    /// creating presigned URLs for external consumers.
    external_client: Option<S3Client>,
}
141
142impl S3StorageLayerFactory {
143 pub fn from_config(aws_config: &SdkConfig, config: S3StorageLayerFactoryConfig) -> Self {
145 let (client, external_client) = match config.endpoint {
146 S3Endpoint::Aws => {
147 tracing::debug!("using aws s3 storage layer");
148 (S3Client::new(aws_config), None)
149 }
150 S3Endpoint::Custom {
151 endpoint,
152 external_endpoint,
153 access_key_id,
154 access_key_secret,
155 } => {
156 tracing::debug!("using custom s3 storage layer");
157 let credentials = Credentials::new(
158 access_key_id,
159 access_key_secret,
160 None,
161 None,
162 "docbox_key_provider",
163 );
164
165 let config_builder = aws_sdk_s3::config::Builder::from(aws_config)
167 .force_path_style(true)
168 .endpoint_url(endpoint)
169 .credentials_provider(credentials);
170
171 let external_client = match external_endpoint {
173 Some(external_endpoint) => {
174 let config = config_builder
175 .clone()
176 .endpoint_url(external_endpoint)
177 .build();
178 let client = S3Client::from_conf(config);
179 Some(client)
180 }
181 None => None,
182 };
183
184 let config = config_builder.build();
185 let client = S3Client::from_conf(config);
186
187 (client, external_client)
188 }
189 };
190
191 Self {
192 client,
193 external_client,
194 }
195 }
196
197 pub fn create_storage_layer(&self, bucket_name: String) -> S3StorageLayer {
199 S3StorageLayer::new(
200 self.client.clone(),
201 self.external_client.clone(),
202 bucket_name,
203 )
204 }
205}
206
/// Storage layer implementation backed by a single S3 bucket.
#[derive(Clone)]
pub struct S3StorageLayer {
    /// Name of the bucket this layer operates on.
    bucket_name: String,

    /// Client used for server-side S3 operations.
    client: S3Client,

    /// Optional client for generating presigned URLs against an external
    /// endpoint; falls back to `client` when absent.
    external_client: Option<S3Client>,
}
219
220impl S3StorageLayer {
221 fn new(client: S3Client, external_client: Option<S3Client>, bucket_name: String) -> Self {
223 Self {
224 bucket_name,
225 client,
226 external_client,
227 }
228 }
229
230 async fn m1_storage_lifecycle_rules(&self) -> Result<(), StorageLayerError> {
233 let existing_lifecycle_configuration_rules = match self
234 .client
235 .get_bucket_lifecycle_configuration()
236 .bucket(&self.bucket_name)
237 .send()
238 .await
239 .inspect_err(|error| {
240 tracing::error!(
241 ?error,
242 "failed to get existing bucket lifecycle configuration"
243 )
244 }) {
245 Ok(value) => value.rules,
246 Err(error) => match error.as_service_error() {
247 Some(error)
249 if error
250 .meta()
251 .code()
252 .is_some_and(|code| code == "NoSuchLifecycleConfiguration") =>
253 {
254 None
255 }
256
257 _ => return Err(S3StorageError::GetBucketLifecycleConfiguration(error).into()),
258 },
259 };
260
261 self.client
262 .put_bucket_lifecycle_configuration()
263 .bucket(&self.bucket_name)
264 .lifecycle_configuration(
265 BucketLifecycleConfiguration::builder()
266 .set_rules(existing_lifecycle_configuration_rules)
268 .rules(
270 LifecycleRule::builder()
271 .id("expire-1d")
272 .status(aws_sdk_s3::types::ExpirationStatus::Enabled)
273 .filter(
274 LifecycleRuleFilter::builder()
275 .tag(
276 Tag::builder()
277 .key("expire")
278 .value("1d")
279 .build()
280 .expect("invalid tag"),
281 )
282 .build(),
283 )
284 .expiration(LifecycleExpiration::builder().days(1).build())
285 .build()
286 .expect("invalid lifecycle rule configuration"),
287 )
288 .rules(
290 LifecycleRule::builder()
291 .id("expire-30d")
292 .status(aws_sdk_s3::types::ExpirationStatus::Enabled)
293 .filter(
294 LifecycleRuleFilter::builder()
295 .tag(
296 Tag::builder()
297 .key("expire")
298 .value("30d")
299 .build()
300 .expect("invalid tag"),
301 )
302 .build(),
303 )
304 .expiration(LifecycleExpiration::builder().days(30).build())
305 .build()
306 .expect("invalid lifecycle rule configuration"),
307 )
308 .build()
309 .expect("invalid lifecycle configuration"),
310 )
311 .send()
312 .await
313 .inspect_err(|error| {
314 tracing::error!(?error, "failed to put bucket lifecycle configuration")
315 })
316 .map_err(S3StorageError::PutBucketLifecycleConfiguration)?;
317
318 Ok(())
319 }
320}
321
322#[derive(Debug, Error)]
327pub enum S3StorageError {
328 #[error("invalid server configuration (region)")]
330 MissingRegion,
331
332 #[error("failed to create storage bucket")]
334 CreateBucket(SdkError<CreateBucketError>),
335
336 #[error("failed to delete storage bucket")]
338 DeleteBucket(SdkError<DeleteBucketError>),
339
340 #[error("failed to get storage bucket")]
342 HeadBucket(SdkError<HeadBucketError>),
343
344 #[error("failed to store file object")]
346 PutObject(SdkError<PutObjectError>),
347
348 #[error("failed to calculate expiry timestamp")]
350 UnixTimeCalculation,
351
352 #[error("failed to create presigned store file object")]
354 PutObjectPresigned(SdkError<PutObjectError>),
355
356 #[error("failed to create presigned config")]
358 PresignedConfig,
359
360 #[error("failed to get presigned store file object")]
362 GetObjectPresigned(SdkError<GetObjectError>),
363
364 #[error("failed to create bucket notification queue config")]
366 QueueConfig,
367
368 #[error("failed to add bucket notification queue: {0}")]
374 PutBucketNotification(SdkError<PutBucketNotificationConfigurationError>),
375
376 #[error("failed to create bucket cors config")]
378 CreateCorsConfig,
379
380 #[error("failed to set bucket cors rules: {0}")]
386 PutBucketCors(SdkError<PutBucketCorsError>),
387
388 #[error("failed to delete file object")]
390 DeleteObject(SdkError<DeleteObjectError>),
391
392 #[error("failed to get file storage object")]
394 GetObject(SdkError<GetObjectError>),
395
396 #[error("failed to get bucket lifecycle configuration: {0}")]
402 GetBucketLifecycleConfiguration(SdkError<GetBucketLifecycleConfigurationError>),
403
404 #[error("failed to put bucket lifecycle configuration: {0}")]
410 PutBucketLifecycleConfiguration(SdkError<PutBucketLifecycleConfigurationError>),
411}
412
/// Names of the storage migrations this layer knows how to apply, in order.
const MIGRATION_NAMES: &[&str] = &["m1_storage_lifecycle_rules"];
414
impl StorageLayerImpl for S3StorageLayer {
    /// Returns the name of the bucket this layer operates on.
    fn bucket_name(&self) -> String {
        self.bucket_name.clone()
    }

    /// Creates the backing bucket in the client's configured region.
    ///
    /// A bucket that is already owned by this account is treated as success
    /// and reported as `CreateBucketOutcome::Existing`.
    async fn create_bucket(&self) -> Result<CreateBucketOutcome, StorageLayerError> {
        // The location constraint must match the client's region.
        let bucket_region = self
            .client
            .config()
            .region()
            .ok_or(S3StorageError::MissingRegion)?
            .to_string();

        let constraint = BucketLocationConstraint::from(bucket_region.as_str());

        let cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        if let Err(error) = self
            .client
            .create_bucket()
            .create_bucket_configuration(cfg)
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Owning the bucket already is not a failure.
            let already_exists = error
                .as_service_error()
                .is_some_and(|value| value.is_bucket_already_owned_by_you());

            if already_exists {
                tracing::debug!("bucket already exists");
                return Ok(CreateBucketOutcome::Existing);
            }

            tracing::error!(?error, "failed to create bucket");
            return Err(S3StorageError::CreateBucket(error).into());
        }

        Ok(CreateBucketOutcome::New)
    }

    /// Checks whether the bucket exists via a `HeadBucket` request.
    ///
    /// Returns `Ok(false)` on a not-found response; any other error is
    /// propagated.
    async fn bucket_exists(&self) -> Result<bool, StorageLayerError> {
        if let Err(error) = self
            .client
            .head_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            if error
                .as_service_error()
                .is_some_and(|error| error.is_not_found())
            {
                return Ok(false);
            }

            return Err(S3StorageError::HeadBucket(error).into());
        }

        Ok(true)
    }

    /// Deletes the bucket, treating a missing bucket (`NoSuchBucket`) as
    /// success so the operation is idempotent.
    async fn delete_bucket(&self) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // DeleteBucketError has no typed variant for this case, so match
            // on the raw error code.
            if error
                .as_service_error()
                .and_then(|err| err.meta().code())
                .is_some_and(|code| code == "NoSuchBucket")
            {
                tracing::debug!("bucket did not exist");
                return Ok(());
            }

            tracing::error!(?error, "failed to delete bucket");

            return Err(S3StorageError::DeleteBucket(error).into());
        }

        Ok(())
    }

    /// Uploads `body` to `key`, applying the content type and optional
    /// expiry tags from `options`.
    ///
    /// Tags are encoded as an S3 tagging query string (`expire=1d&...`) that
    /// the lifecycle rules installed by `m1_storage_lifecycle_rules` match on.
    async fn upload_file(
        &self,
        key: &str,
        body: Bytes,
        options: UploadFileOptions,
    ) -> Result<(), StorageLayerError> {
        let tagging = options.tags.map(|tags| {
            use itertools::Itertools;

            tags.into_iter()
                .map(|tag| match tag {
                    UploadFileTag::ExpireDays1 => "expire=1d",
                    UploadFileTag::ExpireDays30 => "expire=30d",
                })
                .join("&")
        });

        self.client
            .put_object()
            .bucket(&self.bucket_name)
            .content_type(options.content_type)
            .key(key)
            .set_tagging(tagging)
            .body(body.into())
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to store file object");
                S3StorageError::PutObject(error)
            })?;

        Ok(())
    }

    /// Creates a presigned PUT request for uploading `size` bytes to `key`.
    ///
    /// The URL is valid for 30 minutes; the returned timestamp is the
    /// computed expiry. Uses the external client when one is configured so
    /// the URL is reachable by outside consumers.
    async fn create_presigned(
        &self,
        key: &str,
        size: i64,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        let expiry_time_minutes = 30;
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::minutes(expiry_time_minutes))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so presigned URLs use the externally
        // reachable endpoint.
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .put_object()
            .bucket(&self.bucket_name)
            .key(key)
            .content_length(size)
            .presigned(
                PresigningConfig::builder()
                    .expires_in(Duration::from_secs(60 * expiry_time_minutes as u64))
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "Failed to create presigned store config");
                        S3StorageError::PresignedConfig
                    })?,
            )
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned store file object");
                S3StorageError::PutObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Creates a presigned GET request for downloading `key`, valid for
    /// `expires_in`, returning the request alongside the computed expiry
    /// timestamp. Uses the external client when one is configured.
    async fn create_presigned_download(
        &self,
        key: &str,
        expires_in: Duration,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::seconds(expires_in.as_secs() as i64))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so presigned URLs use the externally
        // reachable endpoint.
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .presigned(PresigningConfig::expires_in(expires_in).map_err(|error| {
                tracing::error!(?error, "failed to create presigned download config");
                S3StorageError::PresignedConfig
            })?)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned download");
                S3StorageError::GetObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Configures the bucket to publish `s3:ObjectCreated:*` events to the
    /// SQS queue identified by `sqs_arn`.
    ///
    /// NOTE(review): `put_bucket_notification_configuration` replaces the
    /// bucket's whole notification configuration — any previously configured
    /// targets would be overwritten; confirm this is intended.
    async fn add_bucket_notifications(&self, sqs_arn: &str) -> Result<(), StorageLayerError> {
        self.client
            .put_bucket_notification_configuration()
            .bucket(&self.bucket_name)
            .notification_configuration(
                NotificationConfiguration::builder()
                    .set_queue_configurations(Some(vec![
                        QueueConfiguration::builder()
                            .queue_arn(sqs_arn)
                            .events(aws_sdk_s3::types::Event::S3ObjectCreated)
                            .build()
                            .map_err(|error| {
                                tracing::error!(
                                    ?error,
                                    "failed to create bucket notification queue config"
                                );
                                S3StorageError::QueueConfig
                            })?,
                    ]))
                    .build(),
            )
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to add bucket notification queue");
                S3StorageError::PutBucketNotification(error)
            })?;

        Ok(())
    }

    /// Sets the bucket CORS rules to allow PUT requests (any header) from
    /// the provided origins.
    ///
    /// Backends that answer 501 Not Implemented for `PutBucketCors` (some
    /// S3-compatible servers) are tolerated with a warning.
    async fn set_bucket_cors_origins(&self, origins: Vec<String>) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .put_bucket_cors()
            .bucket(&self.bucket_name)
            .cors_configuration(
                CorsConfiguration::builder()
                    .cors_rules(
                        CorsRule::builder()
                            .allowed_headers("*")
                            .allowed_methods("PUT")
                            .set_allowed_origins(Some(origins))
                            .set_expose_headers(Some(Vec::new()))
                            .build()
                            .map_err(|error| {
                                tracing::error!(?error, "failed to create cors rule");
                                S3StorageError::CreateCorsConfig
                            })?,
                    )
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "failed to create cors config");
                        S3StorageError::CreateCorsConfig
                    })?,
            )
            .send()
            .await
        {
            // 501 -> backend does not implement PutBucketCors; skip silently.
            if error
                .raw_response()
                .is_some_and(|response| response.status().as_u16() == 501)
            {
                tracing::warn!("storage s3 backend does not support PutBucketCors.. skipping..");
                return Ok(());
            }

            tracing::error!(?error, "failed to add bucket cors");
            return Err(S3StorageError::PutBucketCors(error).into());
        };

        Ok(())
    }

    /// Deletes the object at `key`, treating a missing key as success so the
    /// operation is idempotent.
    async fn delete_file(&self, key: &str) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
        {
            // NoSuchKey is only reachable via the error's source chain, so
            // downcast to the SDK-wide error type to detect it.
            if error
                .as_service_error()
                .and_then(|err| err.source())
                .and_then(|source| source.downcast_ref::<aws_sdk_s3::Error>())
                .is_some_and(|err| matches!(err, aws_sdk_s3::Error::NoSuchKey(_)))
            {
                return Ok(());
            }

            tracing::error!(?error, "failed to delete file object");
            return Err(S3StorageError::DeleteObject(error).into());
        }

        Ok(())
    }

    /// Fetches the object at `key` and returns its body as a [`FileStream`]
    /// of `std::io::Result<Bytes>` chunks (via [`AwsFileStream`]).
    async fn get_file(&self, key: &str) -> Result<FileStream, StorageLayerError> {
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to get file storage object");
                S3StorageError::GetObject(error)
            })?;

        let stream = FileStream {
            stream: Box::pin(AwsFileStream { inner: object.body }),
        };

        Ok(stream)
    }

    /// Returns the known migration names that are not present in
    /// `applied_names`, preserving the order of `MIGRATION_NAMES`.
    async fn get_pending_migrations(
        &self,
        applied_names: Vec<String>,
    ) -> Result<Vec<String>, StorageLayerError> {
        Ok(MIGRATION_NAMES
            .iter()
            .map(|name| name.to_string())
            .filter(|name| !applied_names.contains(name))
            .collect())
    }

    /// Applies the named migration; unknown names are a no-op.
    async fn apply_migration(&self, name: &str) -> Result<(), StorageLayerError> {
        #[allow(clippy::single_match)]
        match name {
            "m1_storage_lifecycle_rules" => self.m1_storage_lifecycle_rules().await,
            _ => Ok(()),
        }
    }
}
754
/// Adapter exposing an AWS [`ByteStream`] as a [`Stream`] of
/// `std::io::Result<Bytes>` chunks.
pub struct AwsFileStream {
    // The underlying AWS SDK byte stream being adapted.
    inner: ByteStream,
}
759
760impl AwsFileStream {
761 pub fn into_inner(self) -> ByteStream {
763 self.inner
764 }
765}
766
767impl Stream for AwsFileStream {
768 type Item = std::io::Result<Bytes>;
769
770 fn poll_next(
771 self: std::pin::Pin<&mut Self>,
772 cx: &mut std::task::Context<'_>,
773 ) -> std::task::Poll<Option<Self::Item>> {
774 let this = self.get_mut();
775 let inner = std::pin::Pin::new(&mut this.inner);
776 inner.poll_next(cx).map_err(std::io::Error::other)
777 }
778}