//! # S3 Storage Backend
//!
//! Storage backend backed by a [S3](https://docs.aws.amazon.com/s3/) compatible storage solution (AWS S3, MinIO, ...etc)
//!
//! # Environment Variables
//!
//! * `DOCBOX_S3_ENDPOINT` - URL to use when using a custom S3 endpoint
//! * `DOCBOX_S3_EXTERNAL_ENDPOINT` - Alternative "external" user facing endpoint, useful when running the server in docker with a different endpoint
//! * `DOCBOX_S3_ACCESS_KEY_ID` - Access key ID when using a custom S3 endpoint
//! * `DOCBOX_S3_ACCESS_KEY_SECRET` - Access key secret when using a custom S3 endpoint
11
12use crate::{
13    CreateBucketOutcome, FileStream, StorageLayerError, StorageLayerImpl, UploadFileOptions,
14    UploadFileTag,
15};
16use aws_config::SdkConfig;
17use aws_sdk_s3::{
18    config::Credentials,
19    error::SdkError,
20    operation::{
21        create_bucket::CreateBucketError, delete_bucket::DeleteBucketError,
22        delete_object::DeleteObjectError,
23        get_bucket_lifecycle_configuration::GetBucketLifecycleConfigurationError,
24        get_object::GetObjectError, head_bucket::HeadBucketError,
25        put_bucket_cors::PutBucketCorsError,
26        put_bucket_lifecycle_configuration::PutBucketLifecycleConfigurationError,
27        put_bucket_notification_configuration::PutBucketNotificationConfigurationError,
28        put_object::PutObjectError,
29    },
30    presigning::{PresignedRequest, PresigningConfig},
31    primitives::ByteStream,
32    types::{
33        BucketLifecycleConfiguration, BucketLocationConstraint, CorsConfiguration, CorsRule,
34        CreateBucketConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleFilter,
35        NotificationConfiguration, QueueConfiguration, Tag,
36    },
37};
38use bytes::Bytes;
39use chrono::{DateTime, TimeDelta, Utc};
40use futures::Stream;
41use serde::{Deserialize, Serialize};
42use std::{error::Error, fmt::Debug, time::Duration};
43use thiserror::Error;
44
/// Alias for the AWS SDK S3 client type used throughout this backend
type S3Client = aws_sdk_s3::Client;
46
/// Configuration for the S3 storage layer
///
/// Deserializable from config files; missing fields fall back to their
/// defaults via `#[serde(default)]` (default endpoint is [S3Endpoint::Aws])
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct S3StorageLayerFactoryConfig {
    /// Endpoint to use for requests
    pub endpoint: S3Endpoint,
}
54
/// Errors that could occur when loading the S3 storage layer configuration
/// from environment variables (see [S3Endpoint::from_env])
#[derive(Debug, Error)]
pub enum S3StorageLayerFactoryConfigError {
    /// Using a custom endpoint but didn't specify the access key ID
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_ID")]
    MissingAccessKeyId,

    /// Using a custom endpoint but didn't specify the access key secret
    #[error("cannot use DOCBOX_S3_ENDPOINT without specifying DOCBOX_S3_ACCESS_KEY_SECRET")]
    MissingAccessKeySecret,
}
66
67impl S3StorageLayerFactoryConfig {
68    /// Load a [S3StorageLayerFactoryConfig] from the current environment
69    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
70        let endpoint = S3Endpoint::from_env()?;
71
72        Ok(Self { endpoint })
73    }
74}
75
/// Endpoint to use for S3 operations
///
/// [Debug] is implemented manually elsewhere in this module so that the
/// credential fields of [S3Endpoint::Custom] are never logged
#[derive(Default, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum S3Endpoint {
    /// AWS default endpoint
    #[default]
    Aws,
    /// Custom endpoint (Minio or other compatible)
    Custom {
        /// Endpoint URL
        endpoint: String,
        /// Endpoint to use for external requests (Presigned requests)
        external_endpoint: Option<String>,
        /// Access key ID to use
        access_key_id: String,
        /// Access key secret to use
        access_key_secret: String,
    },
}
95
96impl Debug for S3Endpoint {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            Self::Aws => write!(f, "Aws"),
100            Self::Custom { endpoint, .. } => f
101                .debug_struct("Custom")
102                .field("endpoint", endpoint)
103                .finish(),
104        }
105    }
106}
107
108impl S3Endpoint {
109    /// Load a [S3Endpoint] from the current environment
110    pub fn from_env() -> Result<Self, S3StorageLayerFactoryConfigError> {
111        match std::env::var("DOCBOX_S3_ENDPOINT") {
112            // Using a custom S3 endpoint
113            Ok(endpoint_url) => {
114                let access_key_id = std::env::var("DOCBOX_S3_ACCESS_KEY_ID")
115                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeyId)?;
116                let access_key_secret = std::env::var("DOCBOX_S3_ACCESS_KEY_SECRET")
117                    .map_err(|_| S3StorageLayerFactoryConfigError::MissingAccessKeySecret)?;
118
119                let external_endpoint = std::env::var("DOCBOX_S3_EXTERNAL_ENDPOINT").ok();
120
121                Ok(S3Endpoint::Custom {
122                    endpoint: endpoint_url,
123                    external_endpoint,
124                    access_key_id,
125                    access_key_secret,
126                })
127            }
128            Err(_) => Ok(S3Endpoint::Aws),
129        }
130    }
131}
132
/// Storage layer factory backend by a S3 compatible service
///
/// Holds the shared S3 clients that are cloned into each
/// [S3StorageLayer] created by [S3StorageLayerFactory::create_storage_layer]
#[derive(Clone)]
pub struct S3StorageLayerFactory {
    /// Client to access S3
    client: S3Client,
    /// Optional different client for creating presigned external requests
    external_client: Option<S3Client>,
}
141
142impl S3StorageLayerFactory {
143    /// Create a [S3StorageLayerFactory] from a config
144    pub fn from_config(aws_config: &SdkConfig, config: S3StorageLayerFactoryConfig) -> Self {
145        let (client, external_client) = match config.endpoint {
146            S3Endpoint::Aws => {
147                tracing::debug!("using aws s3 storage layer");
148                (S3Client::new(aws_config), None)
149            }
150            S3Endpoint::Custom {
151                endpoint,
152                external_endpoint,
153                access_key_id,
154                access_key_secret,
155            } => {
156                tracing::debug!("using custom s3 storage layer");
157                let credentials = Credentials::new(
158                    access_key_id,
159                    access_key_secret,
160                    None,
161                    None,
162                    "docbox_key_provider",
163                );
164
165                // Enforces the "path" style for S3 bucket access
166                let config_builder = aws_sdk_s3::config::Builder::from(aws_config)
167                    .force_path_style(true)
168                    .endpoint_url(endpoint)
169                    .credentials_provider(credentials);
170
171                // Create an external client for external s3 requests if needed
172                let external_client = match external_endpoint {
173                    Some(external_endpoint) => {
174                        let config = config_builder
175                            .clone()
176                            .endpoint_url(external_endpoint)
177                            .build();
178                        let client = S3Client::from_conf(config);
179                        Some(client)
180                    }
181                    None => None,
182                };
183
184                let config = config_builder.build();
185                let client = S3Client::from_conf(config);
186
187                (client, external_client)
188            }
189        };
190
191        Self {
192            client,
193            external_client,
194        }
195    }
196
197    /// Create a [S3StorageLayer] for the provided `bucket_name`
198    pub fn create_storage_layer(&self, bucket_name: String) -> S3StorageLayer {
199        S3StorageLayer::new(
200            self.client.clone(),
201            self.external_client.clone(),
202            bucket_name,
203        )
204    }
205}
206
/// Storage layer backend by a S3 compatible service
///
/// Cheap to clone: the AWS clients are internally reference counted
#[derive(Clone)]
pub struct S3StorageLayer {
    /// Name of the bucket to use
    bucket_name: String,

    /// Client to access S3
    client: S3Client,

    /// Optional different client for creating presigned external requests
    /// (used when the user facing endpoint differs from the server's
    /// internal one, e.g. when running in docker)
    external_client: Option<S3Client>,
}
219
220impl S3StorageLayer {
221    /// Create a new S3 storage layer from the client and bucket name
222    fn new(client: S3Client, external_client: Option<S3Client>, bucket_name: String) -> Self {
223        Self {
224            bucket_name,
225            client,
226            external_client,
227        }
228    }
229
230    /// Migration to add storage lifecycle rules tags to the storage bucket
231    /// to allow expiring objects
232    async fn m1_storage_lifecycle_rules(&self) -> Result<(), StorageLayerError> {
233        let existing_lifecycle_configuration_rules = match self
234            .client
235            .get_bucket_lifecycle_configuration()
236            .bucket(&self.bucket_name)
237            .send()
238            .await
239            .inspect_err(|error| {
240                tracing::error!(
241                    ?error,
242                    "failed to get existing bucket lifecycle configuration"
243                )
244            }) {
245            Ok(value) => value.rules,
246            Err(error) => match error.as_service_error() {
247                // Tolerate NoSuchLifecycleConfiguration error for buckets that have no rules yet
248                Some(error)
249                    if error
250                        .meta()
251                        .code()
252                        .is_some_and(|code| code == "NoSuchLifecycleConfiguration") =>
253                {
254                    None
255                }
256
257                _ => return Err(S3StorageError::GetBucketLifecycleConfiguration(error).into()),
258            },
259        };
260
261        self.client
262            .put_bucket_lifecycle_configuration()
263            .bucket(&self.bucket_name)
264            .lifecycle_configuration(
265                BucketLifecycleConfiguration::builder()
266                    // Copy existing lifecycle rules
267                    .set_rules(existing_lifecycle_configuration_rules)
268                    // expire: 1d (1 day file expiry rule)
269                    .rules(
270                        LifecycleRule::builder()
271                            .id("expire-1d")
272                            .status(aws_sdk_s3::types::ExpirationStatus::Enabled)
273                            .filter(
274                                LifecycleRuleFilter::builder()
275                                    .tag(
276                                        Tag::builder()
277                                            .key("expire")
278                                            .value("1d")
279                                            .build()
280                                            .expect("invalid tag"),
281                                    )
282                                    .build(),
283                            )
284                            .expiration(LifecycleExpiration::builder().days(1).build())
285                            .build()
286                            .expect("invalid lifecycle rule configuration"),
287                    )
288                    // expire: 30d (30 day file expiry rule)
289                    .rules(
290                        LifecycleRule::builder()
291                            .id("expire-30d")
292                            .status(aws_sdk_s3::types::ExpirationStatus::Enabled)
293                            .filter(
294                                LifecycleRuleFilter::builder()
295                                    .tag(
296                                        Tag::builder()
297                                            .key("expire")
298                                            .value("30d")
299                                            .build()
300                                            .expect("invalid tag"),
301                                    )
302                                    .build(),
303                            )
304                            .expiration(LifecycleExpiration::builder().days(30).build())
305                            .build()
306                            .expect("invalid lifecycle rule configuration"),
307                    )
308                    .build()
309                    .expect("invalid lifecycle configuration"),
310            )
311            .send()
312            .await
313            .inspect_err(|error| {
314                tracing::error!(?error, "failed to put bucket lifecycle configuration")
315            })
316            .map_err(S3StorageError::PutBucketLifecycleConfiguration)?;
317
318        Ok(())
319    }
320}
321
/// User facing storage errors
///
/// Should not contain the actual error types, these will be logged
/// early, only includes the actual error message
///
/// Exceptions: the management-only variants explicitly marked below
/// include the inner error details in their display output
#[derive(Debug, Error)]
pub enum S3StorageError {
    /// AWS region missing
    #[error("invalid server configuration (region)")]
    MissingRegion,

    /// Failed to create a bucket
    #[error("failed to create storage bucket")]
    CreateBucket(SdkError<CreateBucketError>),

    /// Failed to delete a bucket
    #[error("failed to delete storage bucket")]
    DeleteBucket(SdkError<DeleteBucketError>),

    /// Failed to head a bucket
    #[error("failed to get storage bucket")]
    HeadBucket(SdkError<HeadBucketError>),

    /// Failed to store a file in a bucket
    #[error("failed to store file object")]
    PutObject(SdkError<PutObjectError>),

    /// Failed to calculate future unix timestamps
    #[error("failed to calculate expiry timestamp")]
    UnixTimeCalculation,

    /// Failed to create presigned upload
    #[error("failed to create presigned store file object")]
    PutObjectPresigned(SdkError<PutObjectError>),

    /// Failed to create presigned config
    #[error("failed to create presigned config")]
    PresignedConfig,

    /// Failed to create presigned download
    #[error("failed to get presigned store file object")]
    GetObjectPresigned(SdkError<GetObjectError>),

    /// Failed to create the config for the notification queue
    #[error("failed to create bucket notification queue config")]
    QueueConfig,

    /// Failed to setup a notification queue on the bucket
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to add bucket notification queue: {0}")]
    PutBucketNotification(SdkError<PutBucketNotificationConfigurationError>),

    /// Failed to make the cors config or rules
    #[error("failed to create bucket cors config")]
    CreateCorsConfig,

    /// Failed to put the bucket cors config
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to set bucket cors rules: {0}")]
    PutBucketCors(SdkError<PutBucketCorsError>),

    /// Failed to delete a file object
    #[error("failed to delete file object")]
    DeleteObject(SdkError<DeleteObjectError>),

    /// Failed to get the file storage object
    #[error("failed to get file storage object")]
    GetObject(SdkError<GetObjectError>),

    /// Failed to get the existing bucket lifecycle configuration
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to get bucket lifecycle configuration: {0}")]
    GetBucketLifecycleConfiguration(SdkError<GetBucketLifecycleConfigurationError>),

    /// Failed to put the bucket lifecycle configuration
    ///
    /// This error is allowed to expose the inner error details as
    /// it is only used by the management layer and these errors are
    /// helpful for management
    #[error("failed to put bucket lifecycle configuration: {0}")]
    PutBucketLifecycleConfiguration(SdkError<PutBucketLifecycleConfigurationError>),
}
412
/// Names of all storage migrations known to this backend, diffed against the
/// already-applied names in [StorageLayerImpl::get_pending_migrations]
const MIGRATION_NAMES: &[&str] = &["m1_storage_lifecycle_rules"];
414
impl StorageLayerImpl for S3StorageLayer {
    /// Name of the S3 bucket this layer operates on
    fn bucket_name(&self) -> String {
        self.bucket_name.clone()
    }

    /// Create the backing bucket, reporting whether it was newly created
    /// or already existed and is owned by us
    async fn create_bucket(&self) -> Result<CreateBucketOutcome, StorageLayerError> {
        // The bucket location constraint must match the client's configured region
        let bucket_region = self
            .client
            .config()
            .region()
            .ok_or(S3StorageError::MissingRegion)?
            .to_string();

        let constraint = BucketLocationConstraint::from(bucket_region.as_str());

        let cfg = CreateBucketConfiguration::builder()
            .location_constraint(constraint)
            .build();

        if let Err(error) = self
            .client
            .create_bucket()
            .create_bucket_configuration(cfg)
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            let already_exists = error
                .as_service_error()
                .is_some_and(|value| value.is_bucket_already_owned_by_you());

            // Bucket has already been created
            if already_exists {
                tracing::debug!("bucket already exists");
                return Ok(CreateBucketOutcome::Existing);
            }

            tracing::error!(?error, "failed to create bucket");
            return Err(S3StorageError::CreateBucket(error).into());
        }

        Ok(CreateBucketOutcome::New)
    }

    /// Check whether the backing bucket exists using a HeadBucket request
    async fn bucket_exists(&self) -> Result<bool, StorageLayerError> {
        if let Err(error) = self
            .client
            .head_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle not found error (In this case its an indicator and not an error)
            if error
                .as_service_error()
                .is_some_and(|error| error.is_not_found())
            {
                return Ok(false);
            }

            return Err(S3StorageError::HeadBucket(error).into());
        }

        Ok(true)
    }

    /// Delete the backing bucket; succeeds if the bucket is already gone
    async fn delete_bucket(&self) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_bucket()
            .bucket(&self.bucket_name)
            .send()
            .await
        {
            // Handle the bucket not existing
            // (This is not a failure and indicates the bucket is already deleted)
            if error
                .as_service_error()
                .and_then(|err| err.meta().code())
                .is_some_and(|code| code == "NoSuchBucket")
            {
                tracing::debug!("bucket did not exist");
                return Ok(());
            }

            tracing::error!(?error, "failed to delete bucket");

            return Err(S3StorageError::DeleteBucket(error).into());
        }

        Ok(())
    }

    /// Upload `body` under `key`, applying the content type and any expiry
    /// tags from `options`
    ///
    /// Tags are encoded as URL query parameters (e.g. "expire=1d") and match
    /// the lifecycle rules installed by the lifecycle migration
    async fn upload_file(
        &self,
        key: &str,
        body: Bytes,
        options: UploadFileOptions,
    ) -> Result<(), StorageLayerError> {
        let tagging = options.tags.map(|tags| {
            use itertools::Itertools;

            tags.into_iter()
                .map(|tag| match tag {
                    UploadFileTag::ExpireDays1 => "expire=1d",
                    UploadFileTag::ExpireDays30 => "expire=30d",
                })
                .join("&")
        });

        self.client
            .put_object()
            .bucket(&self.bucket_name)
            .content_type(options.content_type)
            .key(key)
            .set_tagging(tagging)
            .body(body.into())
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to store file object");
                S3StorageError::PutObject(error)
            })?;

        Ok(())
    }

    /// Create a presigned upload (PutObject) URL for `key`, valid for 30
    /// minutes, returning the request and its expiry timestamp
    ///
    /// Uses the external client when configured so the URL is reachable by
    /// end users
    async fn create_presigned(
        &self,
        key: &str,
        size: i64,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        let expiry_time_minutes = 30;
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::minutes(expiry_time_minutes))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so the presigned URL targets the
        // user facing endpoint
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .put_object()
            .bucket(&self.bucket_name)
            .key(key)
            // Pin the upload size so the signed request only accepts a body
            // of exactly this length
            .content_length(size)
            .presigned(
                PresigningConfig::builder()
                    .expires_in(Duration::from_secs(60 * expiry_time_minutes as u64))
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "Failed to create presigned store config");
                        S3StorageError::PresignedConfig
                    })?,
            )
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned store file object");
                S3StorageError::PutObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Create a presigned download (GetObject) URL for `key`, valid for
    /// `expires_in`, returning the request and its expiry timestamp
    async fn create_presigned_download(
        &self,
        key: &str,
        expires_in: Duration,
    ) -> Result<(PresignedRequest, DateTime<Utc>), StorageLayerError> {
        let expires_at = Utc::now()
            .checked_add_signed(TimeDelta::seconds(expires_in.as_secs() as i64))
            .ok_or(S3StorageError::UnixTimeCalculation)?;

        // Prefer the external client so the presigned URL targets the
        // user facing endpoint
        let client = match self.external_client.as_ref() {
            Some(external_client) => external_client,
            None => &self.client,
        };

        let result = client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .presigned(PresigningConfig::expires_in(expires_in).map_err(|error| {
                tracing::error!(?error, "failed to create presigned download config");
                S3StorageError::PresignedConfig
            })?)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create presigned download");
                S3StorageError::GetObjectPresigned(error)
            })?;

        Ok((result, expires_at))
    }

    /// Configure the bucket to publish object-created events to the SQS
    /// queue identified by `sqs_arn`
    async fn add_bucket_notifications(&self, sqs_arn: &str) -> Result<(), StorageLayerError> {
        // Connect the S3 bucket for file upload notifications
        self.client
            .put_bucket_notification_configuration()
            .bucket(&self.bucket_name)
            .notification_configuration(
                NotificationConfiguration::builder()
                    .set_queue_configurations(Some(vec![
                        QueueConfiguration::builder()
                            .queue_arn(sqs_arn)
                            .events(aws_sdk_s3::types::Event::S3ObjectCreated)
                            .build()
                            .map_err(|error| {
                                tracing::error!(
                                    ?error,
                                    "failed to create bucket notification queue config"
                                );
                                S3StorageError::QueueConfig
                            })?,
                    ]))
                    .build(),
            )
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to add bucket notification queue");
                S3StorageError::PutBucketNotification(error)
            })?;

        Ok(())
    }

    /// Set the bucket CORS rules to allow PUT requests (presigned uploads)
    /// from the provided `origins`
    ///
    /// Backends without CORS support (e.g. MinIO) are tolerated: a 501
    /// response is logged and skipped rather than treated as failure
    async fn set_bucket_cors_origins(&self, origins: Vec<String>) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .put_bucket_cors()
            .bucket(&self.bucket_name)
            .cors_configuration(
                CorsConfiguration::builder()
                    .cors_rules(
                        CorsRule::builder()
                            .allowed_headers("*")
                            .allowed_methods("PUT")
                            .set_allowed_origins(Some(origins))
                            .set_expose_headers(Some(Vec::new()))
                            .build()
                            .map_err(|error| {
                                tracing::error!(?error, "failed to create cors rule");
                                S3StorageError::CreateCorsConfig
                            })?,
                    )
                    .build()
                    .map_err(|error| {
                        tracing::error!(?error, "failed to create cors config");
                        S3StorageError::CreateCorsConfig
                    })?,
            )
            .send()
            .await
        {
            // Handle "NotImplemented" errors (minio does not have CORS support)
            if error
                .raw_response()
                // (501 Not Implemented)
                .is_some_and(|response| response.status().as_u16() == 501)
            {
                tracing::warn!("storage s3 backend does not support PutBucketCors.. skipping..");
                return Ok(());
            }

            tracing::error!(?error, "failed to add bucket cors");
            return Err(S3StorageError::PutBucketCors(error).into());
        };

        Ok(())
    }

    /// Delete the object stored under `key`; succeeds if the key does not exist
    async fn delete_file(&self, key: &str) -> Result<(), StorageLayerError> {
        if let Err(error) = self
            .client
            .delete_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
        {
            // Handle keys that don't exist in the bucket
            // (This is not a failure and indicates the file is already deleted)
            if error
                .as_service_error()
                .and_then(|err| err.source())
                .and_then(|source| source.downcast_ref::<aws_sdk_s3::Error>())
                .is_some_and(|err| matches!(err, aws_sdk_s3::Error::NoSuchKey(_)))
            {
                return Ok(());
            }

            tracing::error!(?error, "failed to delete file object");
            return Err(S3StorageError::DeleteObject(error).into());
        }

        Ok(())
    }

    /// Fetch the object stored under `key` as a streamed body
    async fn get_file(&self, key: &str) -> Result<FileStream, StorageLayerError> {
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket_name)
            .key(key)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to get file storage object");
                S3StorageError::GetObject(error)
            })?;

        // Wrap the SDK body so callers consume a std::io::Result<Bytes> stream
        let stream = FileStream {
            stream: Box::pin(AwsFileStream { inner: object.body }),
        };

        Ok(stream)
    }

    /// List the known migrations that have not yet been applied, preserving
    /// the order of [MIGRATION_NAMES]
    async fn get_pending_migrations(
        &self,
        applied_names: Vec<String>,
    ) -> Result<Vec<String>, StorageLayerError> {
        Ok(MIGRATION_NAMES
            .iter()
            .map(|name| name.to_string())
            .filter(|name| !applied_names.contains(name))
            .collect())
    }

    /// Run the migration with the given `name`; unknown names are a no-op
    async fn apply_migration(&self, name: &str) -> Result<(), StorageLayerError> {
        #[allow(clippy::single_match)]
        match name {
            "m1_storage_lifecycle_rules" => self.m1_storage_lifecycle_rules().await,
            _ => Ok(()),
        }
    }
}
754
/// File stream based on the AWS [ByteStream] type
///
/// Adapts the SDK body stream into a [futures::Stream] yielding
/// `std::io::Result<Bytes>` items
pub struct AwsFileStream {
    // Underlying AWS SDK byte stream being wrapped
    inner: ByteStream,
}
759
760impl AwsFileStream {
761    /// Get the underlying stream
762    pub fn into_inner(self) -> ByteStream {
763        self.inner
764    }
765}
766
767impl Stream for AwsFileStream {
768    type Item = std::io::Result<Bytes>;
769
770    fn poll_next(
771        self: std::pin::Pin<&mut Self>,
772        cx: &mut std::task::Context<'_>,
773    ) -> std::task::Poll<Option<Self::Item>> {
774        let this = self.get_mut();
775        let inner = std::pin::Pin::new(&mut this.inner);
776        inner.poll_next(cx).map_err(std::io::Error::other)
777    }
778}