docbox_core/storage/
mod.rs1use aws_config::SdkConfig;
2use aws_sdk_s3::presigning::PresignedRequest;
3use bytes::{Buf, Bytes};
4use bytes_utils::SegmentedBuf;
5use chrono::{DateTime, Utc};
6use docbox_database::models::tenant::Tenant;
7use futures::{Stream, StreamExt};
8use s3::{S3StorageLayer, S3StorageLayerFactory};
9use serde::Deserialize;
10use std::pin::Pin;
11
12use crate::storage::s3::S3StorageLayerFactoryConfig;
13
14pub mod s3;
15
16#[derive(Debug, Clone, Deserialize)]
17#[serde(tag = "provider", rename_all = "snake_case")]
18pub enum StorageLayerFactoryConfig {
19 S3(S3StorageLayerFactoryConfig),
20}
21
22impl StorageLayerFactoryConfig {
23 pub fn from_env() -> anyhow::Result<Self> {
24 S3StorageLayerFactoryConfig::from_env().map(Self::S3)
25 }
26}
27
28#[derive(Clone)]
29pub enum StorageLayerFactory {
30 S3(S3StorageLayerFactory),
31}
32
33impl StorageLayerFactory {
34 pub fn from_config(aws_config: &SdkConfig, config: StorageLayerFactoryConfig) -> Self {
35 match config {
36 StorageLayerFactoryConfig::S3(config) => {
37 Self::S3(S3StorageLayerFactory::from_config(aws_config, config))
38 }
39 }
40 }
41
42 pub fn create_storage_layer(&self, tenant: &Tenant) -> TenantStorageLayer {
43 match self {
44 StorageLayerFactory::S3(s3) => {
45 let bucket_name = tenant.s3_name.clone();
46 let layer = s3.create_storage_layer(bucket_name);
47 TenantStorageLayer::S3(layer)
48 }
49 }
50 }
51}
52
53#[derive(Clone)]
54pub enum TenantStorageLayer {
55 S3(S3StorageLayer),
57}
58
59impl TenantStorageLayer {
60 pub async fn create_bucket(&self) -> anyhow::Result<()> {
62 match self {
63 TenantStorageLayer::S3(layer) => layer.create_bucket().await,
64 }
65 }
66
67 pub async fn delete_bucket(&self) -> anyhow::Result<()> {
69 match self {
70 TenantStorageLayer::S3(layer) => layer.delete_bucket().await,
71 }
72 }
73
74 pub async fn create_presigned(
76 &self,
77 key: &str,
78 size: i64,
79 ) -> anyhow::Result<(PresignedRequest, DateTime<Utc>)> {
80 match self {
81 TenantStorageLayer::S3(layer) => layer.create_presigned(key, size).await,
82 }
83 }
84
85 pub async fn upload_file(
87 &self,
88 key: &str,
89 content_type: String,
90 body: Bytes,
91 ) -> anyhow::Result<()> {
92 match self {
93 TenantStorageLayer::S3(layer) => layer.upload_file(key, content_type, body).await,
94 }
95 }
96
97 pub async fn add_bucket_notifications(&self, sns_arn: &str) -> anyhow::Result<()> {
99 match self {
100 TenantStorageLayer::S3(layer) => layer.add_bucket_notifications(sns_arn).await,
101 }
102 }
103
104 pub async fn add_bucket_cors(&self, origins: Vec<String>) -> anyhow::Result<()> {
106 match self {
107 TenantStorageLayer::S3(layer) => layer.add_bucket_cors(origins).await,
108 }
109 }
110
111 pub async fn delete_file(&self, key: &str) -> anyhow::Result<()> {
113 match self {
114 TenantStorageLayer::S3(layer) => layer.delete_file(key).await,
115 }
116 }
117
118 pub async fn get_file(&self, key: &str) -> anyhow::Result<FileStream> {
120 match self {
121 TenantStorageLayer::S3(layer) => layer.get_file(key).await,
122 }
123 }
124}
125
126pub(crate) trait StorageLayer {
127 async fn create_bucket(&self) -> anyhow::Result<()>;
129
130 async fn delete_bucket(&self) -> anyhow::Result<()>;
132
133 async fn create_presigned(
135 &self,
136 key: &str,
137 size: i64,
138 ) -> anyhow::Result<(PresignedRequest, DateTime<Utc>)>;
139
140 async fn upload_file(&self, key: &str, content_type: String, body: Bytes)
142 -> anyhow::Result<()>;
143
144 async fn add_bucket_notifications(&self, sns_arn: &str) -> anyhow::Result<()>;
146
147 async fn add_bucket_cors(&self, origins: Vec<String>) -> anyhow::Result<()>;
149
150 async fn delete_file(&self, key: &str) -> anyhow::Result<()>;
152
153 async fn get_file(&self, key: &str) -> anyhow::Result<FileStream>;
155}
156
157pub struct FileStream {
159 pub stream: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
160}
161
162impl Stream for FileStream {
163 type Item = std::io::Result<Bytes>;
164
165 fn poll_next(
166 mut self: std::pin::Pin<&mut Self>,
167 cx: &mut std::task::Context<'_>,
168 ) -> std::task::Poll<Option<Self::Item>> {
169 let stream = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.stream) };
171 stream.poll_next(cx)
172 }
173}
174
175impl FileStream {
176 pub async fn collect_bytes(mut self) -> anyhow::Result<Bytes> {
178 let mut output = SegmentedBuf::new();
179
180 while let Some(result) = self.next().await {
181 let chunk = result?;
182 output.push(chunk);
183 }
184
185 Ok(output.copy_to_bytes(output.remaining()))
186 }
187}