docbox_storage/
s3.rs

1//! # S3 Storage Backend
2//!
3//! Storage backend backed by a [S3](https://docs.aws.amazon.com/s3/) compatible storage solution (AWS S3, MinIO, ...etc)
4//!
5//! # Environment Variables
6//!
7//! * `DOCBOX_S3_ENDPOINT` - URL to use when using a custom S3 endpoint
8//! * `DOCBOX_S3_EXTERNAL_ENDPOINT` - Alternative "external" user facing endpoint, useful when running the server in docker with a different endpoint
9//! * `DOCBOX_S3_ACCESS_KEY_ID` - Access key ID when using a custom S3 endpoint
10//! * `DOCBOX_S3_ACCESS_KEY_SECRET` - Access key secret when using a custom S3 endpoint
11
12use crate::{CreateBucketOutcome, FileStream, StorageLayerError, StorageLayerImpl};
13use aws_config::SdkConfig;
14use aws_sdk_s3::{
15    config::Credentials,
16    error::SdkError,
17    operation::{
18        create_bucket::CreateBucketError, delete_bucket::DeleteBucketError,
19        delete_object::DeleteObjectError, get_object::GetObjectError, head_bucket::HeadBucketError,
20        put_bucket_cors::PutBucketCorsError,
21        put_bucket_notification_configuration::PutBucketNotificationConfigurationError,
22        put_object::PutObjectError,
23    },
24    presigning::{PresignedRequest, PresigningConfig},
25    primitives::ByteStream,
26    types::{
27        BucketLocationConstraint, CorsConfiguration, CorsRule, CreateBucketConfiguration,
28        NotificationConfiguration, QueueConfiguration,
29    },
30};
31use bytes::Bytes;
32use chrono::{DateTime, TimeDelta, Utc};
33use futures::Stream;
34use serde::{Deserialize, Serialize};
35use std::{error::Error, fmt::Debug, time::Duration};
36use thiserror::Error;
37
/// Shorthand alias for the AWS SDK S3 client used throughout this module
type S3Client = aws_sdk_s3::Client;
39
40/// Configuration for the S3 storage layer
/// Configuration for the S3 storage layer
///
/// Serialized/deserialized as part of the service configuration, see
/// [S3StorageLayerFactoryConfig::from_env] for the environment based loader
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct S3StorageLayerFactoryConfig {
    /// Endpoint to use for requests (AWS default or a custom compatible endpoint)
    pub endpoint: S3Endpoint,
}
46
47/// Errors that could occur when loading the S3 storage layer configuration
/// Errors that could occur when loading the S3 storage layer configuration
/// from environment variables
#[derive(Debug, Error)]
pub enum S3StorageLayerFactoryConfigError {
    /// Using a custom endpoint but didn't specify the access key ID
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_ID")]
    MissingAccessKeyId,

    /// Using a custom endpoint but didn't specify the access key secret
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_SECRET")]
    MissingAccessKeySecret,
}
58
59impl S3StorageLayerFactoryConfig {
60    /// Load a [S3StorageLayerFactoryConfig] from the current environment
61    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
62        let endpoint = S3Endpoint::from_env()?;
63
64        Ok(Self { endpoint })
65    }
66}
67
68/// Endpoint to use for S3 operations
/// Endpoint to use for S3 operations
///
/// Serialized with a `type` tag (`"aws"` / `"custom"`) in snake_case
#[derive(Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum S3Endpoint {
    /// AWS default endpoint
    Aws,
    /// Custom endpoint (Minio or other compatible)
    Custom {
        /// Endpoint URL
        endpoint: String,
        /// Endpoint to use for external requests (Presigned requests),
        /// useful when the server-internal endpoint is not reachable by users
        external_endpoint: Option<String>,
        /// Access key ID to use
        access_key_id: String,
        /// Access key secret to use
        access_key_secret: String,
    },
}
86
87impl Debug for S3Endpoint {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::Aws => write!(f, "Aws"),
91            Self::Custom { endpoint, .. } => f
92                .debug_struct("Custom")
93                .field("endpoint", endpoint)
94                .finish(),
95        }
96    }
97}
98
99impl S3Endpoint {
100    /// Load a [S3Endpoint] from the current environment
101    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
102        match std::env::var("DOCBOX_S3_ENDPOINT") {
103            // Using a custom S3 endpoint
104            Ok(endpoint_url) => {
105                let access_key_id = std::env::var("DOCBOX_S3_ACCESS_KEY_ID")
106                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeyId)?;
107                let access_key_secret = std::env::var("DOCBOX_S3_ACCESS_KEY_SECRET")
108                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeySecret)?;
109
110                let external_endpoint = std::env::var("DOCBOX_S3_EXTERNAL_ENDPOINT").ok();
111
112                Ok(S3Endpoint::Custom {
113                    endpoint: endpoint_url,
114                    external_endpoint,
115                    access_key_id,
116                    access_key_secret,
117                })
118            }
119            Err(_) => Ok(S3Endpoint::Aws),
120        }
121    }
122}
123
124/// Storage layer factory backend by a S3 compatible service
/// Storage layer factory backend by a S3 compatible service
///
/// Cheap to clone: the underlying AWS clients are internally shared
#[derive(Clone)]
pub struct S3StorageLayerFactory {
    /// Client to access S3
    client: S3Client,
    /// Optional different client for creating presigned external requests
    /// (only present when a custom external endpoint was configured)
    external_client: Option<S3Client>,
}
132
133impl S3StorageLayerFactory {
134    /// Create a [S3StorageLayerFactory] from a config
135    pub fn from_config(aws_config: &SdkConfig, config: S3StorageLayerFactoryConfig) -> Self {
136        let (client, external_client) = match config.endpoint {
137            S3Endpoint::Aws => {
138                tracing::debug!("using aws s3 storage layer");
139                (S3Client::new(aws_config), None)
140            }
141            S3Endpoint::Custom {
142                endpoint,
143                external_endpoint,
144                access_key_id,
145                access_key_secret,
146            } => {
147                tracing::debug!("using custom s3 storage layer");
148                let credentials = Credentials::new(
149                    access_key_id,
150                    access_key_secret,
151                    None,
152                    None,
153                    "docbox_key_provider",
154                );
155
156                // Enforces the "path" style for S3 bucket access
157                let config_builder = aws_sdk_s3::config::Builder::from(aws_config)
158                    .force_path_style(true)
159                    .endpoint_url(endpoint)
160                    .credentials_provider(credentials);
161
162                // Create an external client for external s3 requests if needed
163                let external_client = match external_endpoint {
164                    Some(external_endpoint) => {
165                        let config = config_builder
166                            .clone()
167                            .endpoint_url(external_endpoint)
168                            .build();
169                        let client = S3Client::from_conf(config);
170                        Some(client)
171                    }
172                    None => None,
173                };
174
175                let config = config_builder.build();
176                let client = S3Client::from_conf(config);
177
178                (client, external_client)
179            }
180        };
181
182        Self {
183            client,
184            external_client,
185        }
186    }
187
188    /// Create a [S3StorageLayer] for the provided `bucket_name`
189    pub fn create_storage_layer(&self, bucket_name: String) -> S3StorageLayer {
190        S3StorageLayer::new(
191            self.client.clone(),
192            self.external_client.clone(),
193            bucket_name,
194        )
195    }
196}
197
198/// Storage layer backend by a S3 compatible service
/// Storage layer backend by a S3 compatible service
///
/// Operates on a single bucket; obtained from
/// [S3StorageLayerFactory::create_storage_layer]
#[derive(Clone)]
pub struct S3StorageLayer {
    /// Name of the bucket to use
    bucket_name: String,

    /// Client to access S3
    client: S3Client,

    /// Optional different client for creating presigned external requests
    external_client: Option<S3Client>,
}
210
211impl S3StorageLayer {
212    /// Create a new S3 storage layer from the client and bucket name
213    fn new(client: S3Client, external_client: Option<S3Client>, bucket_name: String) -> Self {
214        Self {
215            bucket_name,
216            client,
217            external_client,
218        }
219    }
220}
221
222/// User facing storage errors
223///
224/// Should not contain the actual error types, these will be logged
225/// early, only includes the actual error message
/// User facing storage errors
///
/// Should not contain the actual error types, these will be logged
/// early, only includes the actual error message. The two management-layer
/// variants ([S3StorageError::PutBucketNotification] and
/// [S3StorageError::PutBucketCors]) are the documented exceptions.
#[derive(Debug, Error)]
pub enum S3StorageError {
    /// AWS region missing
    #[error("invalid server configuration (region)")]
    MissingRegion,

    /// Failed to create a bucket
    #[error("failed to create storage bucket")]
    CreateBucket(SdkError<CreateBucketError>),

    /// Failed to delete a bucket
    #[error("failed to delete storage bucket")]
    DeleteBucket(SdkError<DeleteBucketError>),

    /// Failed to head a bucket
    #[error("failed to get storage bucket")]
    HeadBucket(SdkError<HeadBucketError>),

    /// Failed to store a file in a bucket
    #[error("failed to store file object")]
    PutObject(SdkError<PutObjectError>),

    /// Failed to calculate future unix timestamps
    #[error("failed to calculate expiry timestamp")]
    UnixTimeCalculation,

    /// Failed to create presigned upload
    #[error("failed to create presigned store file object")]
    PutObjectPresigned(SdkError<PutObjectError>),

    /// Failed to create presigned config
    #[error("failed to create presigned config")]
    PresignedConfig,

    /// Failed to create presigned download
    #[error("failed to get presigned store file object")]
    GetObjectPresigned(SdkError<GetObjectError>),

    /// Failed to create the config for the notification queue
    #[error("failed to create bucket notification queue config")]
    QueueConfig,

    /// Failed to setup a notification queue on the bucket
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to add bucket notification queue: {0}")]
    PutBucketNotification(SdkError<PutBucketNotificationConfigurationError>),

    /// Failed to make the cors config or rules
    #[error("failed to create bucket cors config")]
    CreateCorsConfig,

    /// Failed to put the bucket cors config
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to set bucket cors rules: {0}")]
    PutBucketCors(SdkError<PutBucketCorsError>),

    /// Failed to delete a file object
    #[error("failed to delete file object")]
    DeleteObject(SdkError<DeleteObjectError>),

    /// Failed to get the file storage object
    #[error("failed to get file storage object")]
    GetObject(SdkError<GetObjectError>),
}
296
impl StorageLayerImpl for S3StorageLayer {
    /// Create the S3 bucket for this storage layer.
    ///
    /// Returns [CreateBucketOutcome::Existing] when the bucket is already
    /// owned by this account, [CreateBucketOutcome::New] otherwise.
    async fn create_bucket(&self) -> Result<CreateBucketOutcome, StorageLayerError> {
        // Create the bucket in the same region the client is configured for
        let bucket_region = self
            .client
            .config()
            .region()
            .ok_or(S3StorageError::MissingRegion)?
            .to_string();

        let constraint = BucketLocationConstraint::from(bucket_region.as_str());

        let cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        if let Err(error) = self
            .client
            .create_bucket()
            .create_bucket_configuration(cfg)
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            let already_exists = error
                .as_service_error()
                .is_some_and(|value| value.is_bucket_already_owned_by_you());

            // Bucket has already been created (treated as success, not an error)
            if already_exists {
                tracing::debug!("bucket already exists");
                return Ok(CreateBucketOutcome::Existing);
            }

            tracing::error!(?error, "failed to create bucket");
            return Err(S3StorageError::CreateBucket(error).into());
        }

        Ok(CreateBucketOutcome::New)
    }

    /// Check whether the configured bucket exists using a HeadBucket request.
    ///
    /// A "not found" service error is an indicator (`Ok(false)`), any other
    /// failure is surfaced as [S3StorageError::HeadBucket].
    async fn bucket_exists(&self) -> Result<bool, StorageLayerError> {
        if let Err(error) = self
            .client
            .head_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle not found error (In this case its an indicator and not an error)
            if error
                .as_service_error()
                .is_some_and(|error| error.is_not_found())
            {
                return Ok(false);
            }

            return Err(S3StorageError::HeadBucket(error).into());
        }

        Ok(true)
    }

    /// Delete the configured bucket.
    ///
    /// A "NoSuchBucket" service error is treated as success since the bucket
    /// is already gone.
    async fn delete_bucket(&self) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle the bucket not existing
            // (This is not a failure and indicates the bucket is already deleted)
            if error
                .as_service_error()
                .and_then(|err| err.meta().code())
                .is_some_and(|code| code == "NoSuchBucket")
            {
                tracing::debug!("bucket did not exist");
                return Ok(());
            }

            tracing::error!(?error, "failed to delete bucket");

            return Err(S3StorageError::DeleteBucket(error).into());
        }

        Ok(())
    }

    /// Store `body` at `key` in the bucket with the provided `content_type`.
    async fn upload_file(
        &self,
        key: &str,
        content_type: String,
        body: Bytes,
    ) -> Result<(), StorageLayerError> {
        self.client
            .put_object()
            .bucket(&self.bucket_name)
            .content_type(content_type)
            .key(key)
            .body(body.into())
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to store file object");
                S3StorageError::PutObject(error)
            })?;

        Ok(())
    }

    /// Create a presigned PUT request for uploading `size` bytes to `key`.
    ///
    /// The request expires after 30 minutes; returns the presigned request
    /// together with its expiry timestamp. Uses the external client when one
    /// is configured so the URL is reachable by end users.
    async fn create_presigned(
        &self,
        key: &str,
        size: i64,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        // Expiry window; must stay in sync with the PresigningConfig below
        let expiry_time_minutes = 30;
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::minutes(expiry_time_minutes))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so presigned URLs use the user facing endpoint
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .put_object()
            .bucket(&self.bucket_name)
            .key(key)
            .content_length(size)
            .presigned(
                PresigningConfig::builder()
                    .expires_in(Duration::from_secs(60 * expiry_time_minutes as u64))
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "Failed to create presigned store config");
                        S3StorageError::PresignedConfig
                    })?,
            )
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned store file object");
                S3StorageError::PutObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Create a presigned GET request for downloading `key`, valid for
    /// `expires_in`.
    ///
    /// Returns the presigned request and its expiry timestamp. Uses the
    /// external client when one is configured.
    async fn create_presigned_download(
        &self,
        key: &str,
        expires_in: Duration,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::seconds(expires_in.as_secs() as i64))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so presigned URLs use the user facing endpoint
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .presigned(PresigningConfig::expires_in(expires_in).map_err(|error| {
                tracing::error!(?error, "failed to create presigned download config");
                S3StorageError::PresignedConfig
            })?)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned download");
                S3StorageError::GetObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Configure the bucket to send object-created event notifications to
    /// the SQS queue identified by `sqs_arn`.
    async fn add_bucket_notifications(&self, sqs_arn: &str) -> Result<(), StorageLayerError> {
        // Connect the S3 bucket for file upload notifications
        self.client
            .put_bucket_notification_configuration()
            .bucket(&self.bucket_name)
            .notification_configuration(
                NotificationConfiguration::builder()
                    .set_queue_configurations(Some(vec![
                        QueueConfiguration::builder()
                            .queue_arn(sqs_arn)
                            .events(aws_sdk_s3::types::Event::S3ObjectCreated)
                            .build()
                            .map_err(|error| {
                                tracing::error!(
                                    ?error,
                                    "failed to create bucket notification queue config"
                                );
                                S3StorageError::QueueConfig
                            })?,
                    ]))
                    .build(),
            )
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to add bucket notification queue");
                S3StorageError::PutBucketNotification(error)
            })?;

        Ok(())
    }

    /// Set the bucket CORS rules allowing presigned PUT uploads from the
    /// provided `origins`.
    ///
    /// A 501 "Not Implemented" response is skipped with a warning since some
    /// backends (e.g. MinIO) do not support PutBucketCors.
    async fn set_bucket_cors_origins(&self, origins: Vec<String>) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .put_bucket_cors()
            .bucket(&self.bucket_name)
            .cors_configuration(
                CorsConfiguration::builder()
                    .cors_rules(
                        CorsRule::builder()
                            .allowed_headers("*")
                            // Only PUT is needed: rules exist for presigned uploads
                            .allowed_methods("PUT")
                            .set_allowed_origins(Some(origins))
                            .set_expose_headers(Some(Vec::new()))
                            .build()
                            .map_err(|error| {
                                tracing::error!(?error, "failed to create cors rule");
                                S3StorageError::CreateCorsConfig
                            })?,
                    )
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "failed to create cors config");
                        S3StorageError::CreateCorsConfig
                    })?,
            )
            .send()
            .await
        {
            // Handle "NotImplemented" errors (minio does not have CORS support)
            if error
                .raw_response()
                // (501 Not Implemented)
                .is_some_and(|response| response.status().as_u16() == 501)
            {
                tracing::warn!("storage s3 backend does not support PutBucketCors.. skipping..");
                return Ok(());
            }

            tracing::error!(?error, "failed to add bucket cors");
            return Err(S3StorageError::PutBucketCors(error).into());
        };

        Ok(())
    }

    /// Delete the object at `key`.
    ///
    /// A "NoSuchKey" error is treated as success since the object is already
    /// gone.
    async fn delete_file(&self, key: &str) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
        {
            // Handle keys that don't exist in the bucket
            // (This is not a failure and indicates the file is already deleted)
            if error
                .as_service_error()
                .and_then(|err| err.source())
                .and_then(|source| source.downcast_ref::<aws_sdk_s3::Error>())
                .is_some_and(|err| matches!(err, aws_sdk_s3::Error::NoSuchKey(_)))
            {
                return Ok(());
            }

            tracing::error!(?error, "failed to delete file object");
            return Err(S3StorageError::DeleteObject(error).into());
        }

        Ok(())
    }

    /// Fetch the object at `key`, returning its body as a [FileStream].
    async fn get_file(&self, key: &str) -> Result<FileStream, StorageLayerError> {
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to get file storage object");
                S3StorageError::GetObject(error)
            })?;

        // Wrap the AWS byte stream so it can be consumed as a std-error stream
        let stream = FileStream {
            stream: Box::pin(AwsFileStream { inner: object.body }),
        };

        Ok(stream)
    }
}
601
602/// File stream based on the AWS [ByteStream] type
603pub struct AwsFileStream {
604    inner: ByteStream,
605}
606
607impl AwsFileStream {
608    /// Get the underlying stream
609    pub fn into_inner(self) -> ByteStream {
610        self.inner
611    }
612}
613
614impl Stream for AwsFileStream {
615    type Item = std::io::Result<Bytes>;
616
617    fn poll_next(
618        self: std::pin::Pin<&mut Self>,
619        cx: &mut std::task::Context<'_>,
620    ) -> std::task::Poll<Option<Self::Item>> {
621        let this = self.get_mut();
622        let inner = std::pin::Pin::new(&mut this.inner);
623        inner.poll_next(cx).map_err(std::io::Error::other)
624    }
625}