docbox_storage/
s3.rs

1//! # S3 Storage Backend
2//!
//! Storage backend backed by an [S3](https://docs.aws.amazon.com/s3/) compatible storage solution (AWS S3, MinIO, etc.)
4//!
5//! # Environment Variables
6//!
7//! * `DOCBOX_S3_ENDPOINT` - URL to use when using a custom S3 endpoint
8//! * `DOCBOX_S3_ACCESS_KEY_ID` - Access key ID when using a custom S3 endpoint
9//! * `DOCBOX_S3_ACCESS_KEY_SECRET` - Access key secret when using a custom S3 endpoint
10
11use crate::{CreateBucketOutcome, FileStream, StorageLayerError, StorageLayerImpl};
12use aws_config::SdkConfig;
13use aws_sdk_s3::{
14    config::Credentials,
15    error::SdkError,
16    operation::{
17        create_bucket::CreateBucketError, delete_bucket::DeleteBucketError,
18        delete_object::DeleteObjectError, get_object::GetObjectError, head_bucket::HeadBucketError,
19        put_bucket_cors::PutBucketCorsError,
20        put_bucket_notification_configuration::PutBucketNotificationConfigurationError,
21        put_object::PutObjectError,
22    },
23    presigning::{PresignedRequest, PresigningConfig},
24    primitives::ByteStream,
25    types::{
26        BucketLocationConstraint, CorsConfiguration, CorsRule, CreateBucketConfiguration,
27        NotificationConfiguration, QueueConfiguration,
28    },
29};
30use bytes::Bytes;
31use chrono::{DateTime, TimeDelta, Utc};
32use futures::Stream;
33use serde::{Deserialize, Serialize};
34use std::{error::Error, fmt::Debug, time::Duration};
35use thiserror::Error;
36
/// Shorthand for the AWS SDK S3 client used throughout this module
type S3Client = aws_sdk_s3::Client;
38
/// Configuration for the S3 storage layer
///
/// Can be deserialized with serde or loaded from environment
/// variables via [S3StorageLayerFactoryConfig::from_env]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct S3StorageLayerFactoryConfig {
    /// Endpoint to use for requests (AWS default or a custom
    /// S3-compatible endpoint such as MinIO)
    pub endpoint: S3Endpoint,
}
45
/// Errors that could occur when loading the S3 storage layer configuration
/// from the environment (see [S3Endpoint::from_env])
#[derive(Debug, Error)]
pub enum S3StorageLayerFactoryConfigError {
    /// Using a custom endpoint (`DOCBOX_S3_ENDPOINT` set) but didn't
    /// specify the access key ID (`DOCBOX_S3_ACCESS_KEY_ID`)
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_ID")]
    MissingAccessKeyId,

    /// Using a custom endpoint (`DOCBOX_S3_ENDPOINT` set) but didn't
    /// specify the access key secret (`DOCBOX_S3_ACCESS_KEY_SECRET`)
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_SECRET")]
    MissingAccessKeySecret,
}
57
58impl S3StorageLayerFactoryConfig {
59    /// Load a [S3StorageLayerFactoryConfig] from the current environment
60    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
61        let endpoint = S3Endpoint::from_env()?;
62
63        Ok(Self { endpoint })
64    }
65}
66
/// Endpoint to use for S3 operations
///
/// Serialized with a `type` tag (`"aws"` / `"custom"`). [Debug] is
/// implemented manually elsewhere in this file so that the access key
/// fields of [S3Endpoint::Custom] are not included in debug output.
#[derive(Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum S3Endpoint {
    /// AWS default endpoint
    Aws,
    /// Custom endpoint (Minio or other compatible)
    Custom {
        /// Endpoint URL
        endpoint: String,
        /// Access key ID to use
        access_key_id: String,
        /// Access key secret to use
        access_key_secret: String,
    },
}
83
84impl Debug for S3Endpoint {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            Self::Aws => write!(f, "Aws"),
88            Self::Custom { endpoint, .. } => f
89                .debug_struct("Custom")
90                .field("endpoint", endpoint)
91                .finish(),
92        }
93    }
94}
95
96impl S3Endpoint {
97    /// Load a [S3Endpoint] from the current environment
98    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
99        match std::env::var("DOCBOX_S3_ENDPOINT") {
100            // Using a custom S3 endpoint
101            Ok(endpoint_url) => {
102                let access_key_id = std::env::var("DOCBOX_S3_ACCESS_KEY_ID")
103                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeyId)?;
104                let access_key_secret = std::env::var("DOCBOX_S3_ACCESS_KEY_SECRET")
105                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeySecret)?;
106
107                Ok(S3Endpoint::Custom {
108                    endpoint: endpoint_url,
109                    access_key_id,
110                    access_key_secret,
111                })
112            }
113            Err(_) => Ok(S3Endpoint::Aws),
114        }
115    }
116}
117
/// Storage layer factory backend by a S3 compatible service
///
/// Holds a single shared [S3Client]; per-bucket layers are produced
/// with [S3StorageLayerFactory::create_storage_layer]
#[derive(Clone)]
pub struct S3StorageLayerFactory {
    /// Client to access S3
    client: S3Client,
}
124
125impl S3StorageLayerFactory {
126    /// Create a [S3StorageLayerFactory] from a config
127    pub fn from_config(aws_config: &SdkConfig, config: S3StorageLayerFactoryConfig) -> Self {
128        let client = match config.endpoint {
129            S3Endpoint::Aws => {
130                tracing::debug!("using aws s3 storage layer");
131                S3Client::new(aws_config)
132            }
133            S3Endpoint::Custom {
134                endpoint,
135                access_key_id,
136                access_key_secret,
137            } => {
138                tracing::debug!("using custom s3 storage layer");
139                let credentials = Credentials::new(
140                    access_key_id,
141                    access_key_secret,
142                    None,
143                    None,
144                    "docbox_key_provider",
145                );
146
147                // Enforces the "path" style for S3 bucket access
148                let config = aws_sdk_s3::config::Builder::from(aws_config)
149                    .force_path_style(true)
150                    .endpoint_url(endpoint)
151                    .credentials_provider(credentials)
152                    .build();
153                S3Client::from_conf(config)
154            }
155        };
156
157        Self { client }
158    }
159
160    /// Create a [S3StorageLayer] for the provided `bucket_name`
161    pub fn create_storage_layer(&self, bucket_name: String) -> S3StorageLayer {
162        S3StorageLayer::new(self.client.clone(), bucket_name)
163    }
164}
165
/// Storage layer backend by a S3 compatible service
///
/// Bound to a single bucket; all operations in the [StorageLayerImpl]
/// implementation target `bucket_name`
#[derive(Clone)]
pub struct S3StorageLayer {
    /// Client to access S3
    client: S3Client,

    /// Name of the bucket to use
    bucket_name: String,
}
175
176impl S3StorageLayer {
177    /// Create a new S3 storage layer from the client and bucket name
178    fn new(client: S3Client, bucket_name: String) -> Self {
179        Self {
180            client,
181            bucket_name,
182        }
183    }
184}
185
/// User facing storage errors
///
/// Should not contain the actual error types, these will be logged
/// early, only includes the actual error message. The wrapped SDK
/// errors are retained for logging/inspection but their details are
/// not exposed in the [Display] output (except where noted below).
#[derive(Debug, Error)]
pub enum S3StorageError {
    /// AWS region missing from the client configuration
    #[error("invalid server configuration (region)")]
    MissingRegion,

    /// Failed to create a bucket
    #[error("failed to create storage bucket")]
    CreateBucket(SdkError<CreateBucketError>),

    /// Failed to delete a bucket
    #[error("failed to delete storage bucket")]
    DeleteBucket(SdkError<DeleteBucketError>),

    /// Failed to head a bucket (existence check)
    #[error("failed to get storage bucket")]
    HeadBucket(SdkError<HeadBucketError>),

    /// Failed to store a file in a bucket
    #[error("failed to store file object")]
    PutObject(SdkError<PutObjectError>),

    /// Failed to calculate future unix timestamps
    /// (presigned URL expiry computation overflowed)
    #[error("failed to calculate expiry timestamp")]
    UnixTimeCalculation,

    /// Failed to create presigned upload
    #[error("failed to create presigned store file object")]
    PutObjectPresigned(SdkError<PutObjectError>),

    /// Failed to create presigned config
    #[error("failed to create presigned config")]
    PresignedConfig,

    /// Failed to create presigned download
    #[error("failed to get presigned store file object")]
    GetObjectPresigned(SdkError<GetObjectError>),

    /// Failed to create the config for the notification queue
    #[error("failed to create bucket notification queue config")]
    QueueConfig,

    /// Failed to setup a notification queue on the bucket
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to add bucket notification queue: {0}")]
    PutBucketNotification(SdkError<PutBucketNotificationConfigurationError>),

    /// Failed to make the cors config or rules
    #[error("failed to create bucket cors config")]
    CreateCorsConfig,

    /// Failed to put the bucket cors config
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to set bucket cors rules: {0}")]
    PutBucketCors(SdkError<PutBucketCorsError>),

    /// Failed to delete a file object
    #[error("failed to delete file object")]
    DeleteObject(SdkError<DeleteObjectError>),

    /// Failed to get the file storage object
    #[error("failed to get file storage object")]
    GetObject(SdkError<GetObjectError>),
}
260
impl StorageLayerImpl for S3StorageLayer {
    /// Create the backing S3 bucket for this storage layer.
    ///
    /// Returns [CreateBucketOutcome::Existing] when the bucket is already
    /// owned by this account (not treated as an error), otherwise
    /// [CreateBucketOutcome::New] on success.
    async fn create_bucket(&self) -> Result<CreateBucketOutcome, StorageLayerError> {
        // The region is required to build the bucket location constraint;
        // a client config without one is a server misconfiguration
        let bucket_region = self
            .client
            .config()
            .region()
            .ok_or(S3StorageError::MissingRegion)?
            .to_string();

        let constraint = BucketLocationConstraint::from(bucket_region.as_str());

        let cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        if let Err(error) = self
            .client
            .create_bucket()
            .create_bucket_configuration(cfg)
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            let already_exists = error
                .as_service_error()
                .is_some_and(|value| value.is_bucket_already_owned_by_you());

            // Bucket has already been created
            if already_exists {
                tracing::debug!("bucket already exists");
                return Ok(CreateBucketOutcome::Existing);
            }

            tracing::error!(?error, "failed to create bucket");
            return Err(S3StorageError::CreateBucket(error).into());
        }

        Ok(CreateBucketOutcome::New)
    }

    /// Check whether the backing bucket exists via a HeadBucket request.
    ///
    /// A "not found" service error maps to `Ok(false)`; any other failure
    /// is surfaced as [S3StorageError::HeadBucket].
    async fn bucket_exists(&self) -> Result<bool, StorageLayerError> {
        if let Err(error) = self
            .client
            .head_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle not found error (In this case its an indicator and not an error)
            if error
                .as_service_error()
                .is_some_and(|error| error.is_not_found())
            {
                return Ok(false);
            }

            return Err(S3StorageError::HeadBucket(error).into());
        }

        Ok(true)
    }

    /// Delete the backing bucket.
    ///
    /// A missing bucket is treated as success (already deleted).
    async fn delete_bucket(&self) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle the bucket not existing
            // (This is not a failure and indicates the bucket is already deleted)
            // Matched on the raw error code string from the error metadata
            if error
                .as_service_error()
                .and_then(|err| err.meta().code())
                .is_some_and(|code| code == "NoSuchBucket")
            {
                tracing::debug!("bucket did not exist");
                return Ok(());
            }

            tracing::error!(?error, "failed to delete bucket");

            return Err(S3StorageError::DeleteBucket(error).into());
        }

        Ok(())
    }

    /// Upload `body` to the bucket under `key` with the provided
    /// `content_type`, overwriting any existing object at that key.
    async fn upload_file(
        &self,
        key: &str,
        content_type: String,
        body: Bytes,
    ) -> Result<(), StorageLayerError> {
        self.client
            .put_object()
            .bucket(&self.bucket_name)
            .content_type(content_type)
            .key(key)
            .body(body.into())
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to store file object");
                S3StorageError::PutObject(error)
            })?;

        Ok(())
    }

    /// Create a presigned PUT (upload) URL for `key` requiring an exact
    /// content length of `size`.
    ///
    /// Returns the presigned request together with its expiry timestamp
    /// (fixed at 30 minutes from now).
    async fn create_presigned(
        &self,
        key: &str,
        size: i64,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        // Expiry window shared by both the returned timestamp and the
        // presigning config below
        let expiry_time_minutes = 30;
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::minutes(expiry_time_minutes))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        let result = self
            .client
            .put_object()
            .bucket(&self.bucket_name)
            .key(key)
            .content_length(size)
            .presigned(
                PresigningConfig::builder()
                    .expires_in(Duration::from_secs(60 * expiry_time_minutes as u64))
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "Failed to create presigned store config");
                        S3StorageError::PresignedConfig
                    })?,
            )
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned store file object");
                S3StorageError::PutObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Create a presigned GET (download) URL for `key` valid for
    /// `expires_in`, returning the request and its expiry timestamp.
    async fn create_presigned_download(
        &self,
        key: &str,
        expires_in: Duration,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        // NOTE(review): `as i64` cast assumes expires_in fits in i64
        // seconds; fine for realistic expiry windows but would wrap for
        // absurdly large durations
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::seconds(expires_in.as_secs() as i64))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        let result = self
            .client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .presigned(PresigningConfig::expires_in(expires_in).map_err(|error| {
                tracing::error!(?error, "failed to create presigned download config");
                S3StorageError::PresignedConfig
            })?)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned download");
                S3StorageError::GetObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Configure the bucket to publish object-created events to the SQS
    /// queue identified by `sqs_arn`.
    ///
    /// Replaces the bucket's notification configuration with a single
    /// queue configuration for `S3ObjectCreated` events.
    async fn add_bucket_notifications(&self, sqs_arn: &str) -> Result<(), StorageLayerError> {
        // Connect the S3 bucket for file upload notifications
        self.client
            .put_bucket_notification_configuration()
            .bucket(&self.bucket_name)
            .notification_configuration(
                NotificationConfiguration::builder()
                    .set_queue_configurations(Some(vec![
                        QueueConfiguration::builder()
                            .queue_arn(sqs_arn)
                            .events(aws_sdk_s3::types::Event::S3ObjectCreated)
                            .build()
                            .map_err(|error| {
                                tracing::error!(
                                    ?error,
                                    "failed to create bucket notification queue config"
                                );
                                S3StorageError::QueueConfig
                            })?,
                    ]))
                    .build(),
            )
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to add bucket notification queue");
                S3StorageError::PutBucketNotification(error)
            })?;

        Ok(())
    }

    /// Set the CORS configuration on the bucket allowing PUT requests
    /// (any header, no exposed headers) from the provided `origins`.
    ///
    /// Backends without CORS support (e.g. MinIO responding with
    /// 501 Not Implemented) are tolerated with a warning.
    async fn set_bucket_cors_origins(&self, origins: Vec<String>) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .put_bucket_cors()
            .bucket(&self.bucket_name)
            .cors_configuration(
                CorsConfiguration::builder()
                    .cors_rules(
                        CorsRule::builder()
                            .allowed_headers("*")
                            .allowed_methods("PUT")
                            .set_allowed_origins(Some(origins))
                            .set_expose_headers(Some(Vec::new()))
                            .build()
                            .map_err(|error| {
                                tracing::error!(?error, "failed to create cors rule");
                                S3StorageError::CreateCorsConfig
                            })?,
                    )
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "failed to create cors config");
                        S3StorageError::CreateCorsConfig
                    })?,
            )
            .send()
            .await
        {
            // Handle "NotImplemented" errors (minio does not have CORS support)
            if error
                .raw_response()
                // (501 Not Implemented)
                .is_some_and(|response| response.status().as_u16() == 501)
            {
                tracing::warn!("storage s3 backend does not support PutBucketCors.. skipping..");
                return Ok(());
            }

            tracing::error!(?error, "failed to add bucket cors");
            return Err(S3StorageError::PutBucketCors(error).into());
        };

        Ok(())
    }

    /// Delete the object at `key`.
    ///
    /// A missing key is treated as success (already deleted).
    async fn delete_file(&self, key: &str) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
        {
            // Handle keys that don't exist in the bucket
            // (This is not a failure and indicates the file is already deleted)
            // Detected by downcasting the error source to the SDK's
            // top-level error type and matching the NoSuchKey variant
            if error
                .as_service_error()
                .and_then(|err| err.source())
                .and_then(|source| source.downcast_ref::<aws_sdk_s3::Error>())
                .is_some_and(|err| matches!(err, aws_sdk_s3::Error::NoSuchKey(_)))
            {
                return Ok(());
            }

            tracing::error!(?error, "failed to delete file object");
            return Err(S3StorageError::DeleteObject(error).into());
        }

        Ok(())
    }

    /// Fetch the object at `key` as a [FileStream], wrapping the AWS
    /// response body in [AwsFileStream] for streamed consumption.
    async fn get_file(&self, key: &str) -> Result<FileStream, StorageLayerError> {
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to get file storage object");
                S3StorageError::GetObject(error)
            })?;

        let stream = FileStream {
            stream: Box::pin(AwsFileStream { inner: object.body }),
        };

        Ok(stream)
    }
}
557
/// File stream based on the AWS [ByteStream] type
///
/// Adapts the SDK body stream to the [Stream] trait used by [FileStream]
pub struct AwsFileStream {
    /// The wrapped AWS response body stream
    inner: ByteStream,
}
562
563impl AwsFileStream {
564    /// Get the underlying stream
565    pub fn into_inner(self) -> ByteStream {
566        self.inner
567    }
568}
569
570impl Stream for AwsFileStream {
571    type Item = std::io::Result<Bytes>;
572
573    fn poll_next(
574        self: std::pin::Pin<&mut Self>,
575        cx: &mut std::task::Context<'_>,
576    ) -> std::task::Poll<Option<Self::Item>> {
577        let this = self.get_mut();
578        let inner = std::pin::Pin::new(&mut this.inner);
579        inner.poll_next(cx).map_err(std::io::Error::other)
580    }
581}