docbox_core/storage/
mod.rs

1use aws_config::SdkConfig;
2use aws_sdk_s3::presigning::PresignedRequest;
3use bytes::{Buf, Bytes};
4use bytes_utils::SegmentedBuf;
5use chrono::{DateTime, Utc};
6use docbox_database::models::tenant::Tenant;
7use futures::{Stream, StreamExt};
8use s3::{S3StorageLayer, S3StorageLayerFactory};
9use serde::Deserialize;
10use std::pin::Pin;
11
12use crate::storage::s3::S3StorageLayerFactoryConfig;
13
14pub mod s3;
15
16#[derive(Debug, Clone, Deserialize)]
17#[serde(tag = "provider", rename_all = "snake_case")]
18pub enum StorageLayerFactoryConfig {
19    S3(S3StorageLayerFactoryConfig),
20}
21
22impl StorageLayerFactoryConfig {
23    pub fn from_env() -> anyhow::Result<Self> {
24        S3StorageLayerFactoryConfig::from_env().map(Self::S3)
25    }
26}
27
28#[derive(Clone)]
29pub enum StorageLayerFactory {
30    S3(S3StorageLayerFactory),
31}
32
33impl StorageLayerFactory {
34    pub fn from_config(aws_config: &SdkConfig, config: StorageLayerFactoryConfig) -> Self {
35        match config {
36            StorageLayerFactoryConfig::S3(config) => {
37                Self::S3(S3StorageLayerFactory::from_config(aws_config, config))
38            }
39        }
40    }
41
42    pub fn create_storage_layer(&self, tenant: &Tenant) -> TenantStorageLayer {
43        match self {
44            StorageLayerFactory::S3(s3) => {
45                let bucket_name = tenant.s3_name.clone();
46                let layer = s3.create_storage_layer(bucket_name);
47                TenantStorageLayer::S3(layer)
48            }
49        }
50    }
51}
52
53#[derive(Clone)]
54pub enum TenantStorageLayer {
55    /// Storage layer backed by S3
56    S3(S3StorageLayer),
57}
58
59impl TenantStorageLayer {
60    /// Creates the tenant S3 bucket
61    pub async fn create_bucket(&self) -> anyhow::Result<()> {
62        match self {
63            TenantStorageLayer::S3(layer) => layer.create_bucket().await,
64        }
65    }
66
67    /// Deletes the tenant S3 bucket
68    pub async fn delete_bucket(&self) -> anyhow::Result<()> {
69        match self {
70            TenantStorageLayer::S3(layer) => layer.delete_bucket().await,
71        }
72    }
73
74    /// Create a presigned file upload URL
75    pub async fn create_presigned(
76        &self,
77        key: &str,
78        size: i64,
79    ) -> anyhow::Result<(PresignedRequest, DateTime<Utc>)> {
80        match self {
81            TenantStorageLayer::S3(layer) => layer.create_presigned(key, size).await,
82        }
83    }
84
85    /// Uploads a file to the S3 bucket for the tenant
86    pub async fn upload_file(
87        &self,
88        key: &str,
89        content_type: String,
90        body: Bytes,
91    ) -> anyhow::Result<()> {
92        match self {
93            TenantStorageLayer::S3(layer) => layer.upload_file(key, content_type, body).await,
94        }
95    }
96
97    /// Add the SNS notification to a bucket
98    pub async fn add_bucket_notifications(&self, sns_arn: &str) -> anyhow::Result<()> {
99        match self {
100            TenantStorageLayer::S3(layer) => layer.add_bucket_notifications(sns_arn).await,
101        }
102    }
103
104    /// Applies CORS rules for a bucket
105    pub async fn add_bucket_cors(&self, origins: Vec<String>) -> anyhow::Result<()> {
106        match self {
107            TenantStorageLayer::S3(layer) => layer.add_bucket_cors(origins).await,
108        }
109    }
110
111    /// Deletes the S3 file
112    pub async fn delete_file(&self, key: &str) -> anyhow::Result<()> {
113        match self {
114            TenantStorageLayer::S3(layer) => layer.delete_file(key).await,
115        }
116    }
117
118    /// Gets a byte stream for a file from S3
119    pub async fn get_file(&self, key: &str) -> anyhow::Result<FileStream> {
120        match self {
121            TenantStorageLayer::S3(layer) => layer.get_file(key).await,
122        }
123    }
124}
125
126pub(crate) trait StorageLayer {
127    /// Creates the tenant S3 bucket
128    async fn create_bucket(&self) -> anyhow::Result<()>;
129
130    /// Deletes the tenant S3 bucket
131    async fn delete_bucket(&self) -> anyhow::Result<()>;
132
133    /// Create a presigned file upload URL
134    async fn create_presigned(
135        &self,
136        key: &str,
137        size: i64,
138    ) -> anyhow::Result<(PresignedRequest, DateTime<Utc>)>;
139
140    /// Uploads a file to the S3 bucket for the tenant
141    async fn upload_file(&self, key: &str, content_type: String, body: Bytes)
142    -> anyhow::Result<()>;
143
144    /// Add the SNS notification to a bucket
145    async fn add_bucket_notifications(&self, sns_arn: &str) -> anyhow::Result<()>;
146
147    /// Applies CORS rules for a bucket
148    async fn add_bucket_cors(&self, origins: Vec<String>) -> anyhow::Result<()>;
149
150    /// Deletes the S3 file
151    async fn delete_file(&self, key: &str) -> anyhow::Result<()>;
152
153    /// Gets a byte stream for a file from S3
154    async fn get_file(&self, key: &str) -> anyhow::Result<FileStream>;
155}
156
157/// Stream of bytes from a file
158pub struct FileStream {
159    pub stream: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
160}
161
162impl Stream for FileStream {
163    type Item = std::io::Result<Bytes>;
164
165    fn poll_next(
166        mut self: std::pin::Pin<&mut Self>,
167        cx: &mut std::task::Context<'_>,
168    ) -> std::task::Poll<Option<Self::Item>> {
169        // Pin projection to the underlying stream
170        let stream = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.stream) };
171        stream.poll_next(cx)
172    }
173}
174
175impl FileStream {
176    /// Collect the stream to completion as a single [Bytes] buffer
177    pub async fn collect_bytes(mut self) -> anyhow::Result<Bytes> {
178        let mut output = SegmentedBuf::new();
179
180        while let Some(result) = self.next().await {
181            let chunk = result?;
182            output.push(chunk);
183        }
184
185        Ok(output.copy_to_bytes(output.remaining()))
186    }
187}