1use std::path::PathBuf;
2use std::str::FromStr;
3use std::time::Duration;
4
5use aws_sdk_s3::model::{Delete, ObjectIdentifier};
6use aws_sdk_s3::presigning::config::PresigningConfig;
7use aws_sdk_s3::types::ByteStream;
8use aws_sdk_s3::{Client, Config, Endpoint};
10use aws_types::region::Region;
11use aws_types::Credentials;
12use http::{HeaderMap, Uri};
14use tokio::fs::File;
15use tokio::io::AsyncWriteExt;
16
17pub use attachment::{Attachment, AttachmentData};
18pub use attachment_blob::AttachmentBlob;
19
20mod attachment;
21mod attachment_blob;
22mod schema;
23
24#[tsync::tsync]
25type ID = i32;
26
27#[tsync::tsync]
28#[cfg(not(feature = "database_sqlite"))]
29type Utc = chrono::DateTime<chrono::Utc>;
30#[cfg(feature = "database_sqlite")]
31type Utc = chrono::NaiveDateTime;
32
33#[derive(Clone)]
34pub struct Storage {
35 client: Option<Client>,
36 bucket: String,
37 host: String,
38}
39
40pub struct UploadURI {
41 pub headers: HeaderMap,
42 pub uri: Uri,
43}
44impl Default for Storage {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49impl Storage {
50 pub async fn download(&self, key: String, to_path: PathBuf) -> Result<(), String> {
60 let client = self.client_or_error()?;
61
62 let response = client
63 .get_object()
64 .bucket(&self.bucket)
65 .key(key.clone())
66 .send()
67 .await
68 .map_err(|err| self.error_string("Could not download object", &key, err))?;
69
70 let data = response
71 .body
72 .collect()
73 .await
74 .map_err(|err| self.error_string("Could not download object", &key, err))?;
75
76 let mut file = File::create(to_path)
77 .await
78 .map_err(|err| self.error_string("Could not download object", &key, err))?;
79
80 file.write_all(&data.into_bytes())
81 .await
82 .map_err(|err| self.error_string("Could not download object", key, err))?;
83
84 Ok(())
85 }
86
87 pub async fn download_uri(
98 &self,
99 key: String,
100 expires_in: Option<Duration>,
101 ) -> Result<String, String> {
102 let expires_in = match expires_in {
103 None => {
104 let host = self.host.clone();
105 let host = if host.ends_with('/') {
106 host
107 } else {
108 format!("{host}/")
109 };
110 let bucket = &self.bucket;
111 return Ok(format!("{host}{bucket}/{key}"));
112 }
113 Some(time) => time,
114 };
115
116 let client = self.client_or_error()?;
117
118 let response =
119 client
120 .get_object()
121 .bucket(&self.bucket)
122 .key(key.clone())
123 .presigned(PresigningConfig::expires_in(expires_in).map_err(|err| {
124 self.error_string("Could not retrieve download URI", &key, err)
125 })?)
126 .await
127 .map_err(|err| self.error_string("Could not retrieve download URI", key, err))?;
128
129 Ok(response.uri().to_string())
130 }
131
132 pub async fn upload(
143 &self,
144 key: String,
145 bytes: Vec<u8>,
146 content_type: String,
147 _content_md5: String,
148 ) -> Result<(), String> {
149 let stream = ByteStream::from(bytes);
150
151 let client = self.client_or_error()?;
152
153 client
154 .put_object()
155 .bucket(&self.bucket)
156 .key(&key)
157 .body(stream)
158 .content_type(content_type)
159 .send()
162 .await
163 .map_err(|err| self.error_string("Could not upload object", key, err))?;
164
165 Ok(())
166 }
167
168 pub async fn upload_uri(&self, key: String, expires_in: Duration) -> Result<UploadURI, String> {
177 let client = self.client_or_error()?;
178
179 let response = client
180 .put_object()
181 .bucket(&self.bucket)
182 .key(&key)
183 .presigned(
184 PresigningConfig::expires_in(expires_in)
185 .map_err(|err| self.error_string("Could not retrieve upload URI", &key, err))?,
186 )
187 .await
188 .map_err(|err| self.error_string("Could not retrieve upload URI", key, err))?;
189
190 let upload_uri = UploadURI {
191 uri: response.uri().clone(),
192 headers: response.headers().clone(),
193 };
194
195 Ok(upload_uri)
196 }
197
198 pub async fn delete(&self, key: String) -> Result<(), String> {
206 let client = self.client_or_error()?;
207
208 client
209 .delete_object()
210 .bucket(&self.bucket)
211 .key(&key)
212 .send()
213 .await
214 .map_err(|err| self.error_string("Could not delete object", key, err))?;
215
216 Ok(())
217 }
218
219 pub async fn delete_many(&self, keys: Vec<String>) -> Result<(), String> {
227 let client = self.client_or_error()?;
228
229 let ids = keys
230 .iter()
231 .map(|k| {
232 ObjectIdentifier::builder()
233 .set_key(Some(k.to_string()))
234 .build()
235 })
236 .collect::<Vec<ObjectIdentifier>>();
237 let delete = Delete::builder().set_objects(Some(ids)).build();
238
239 client
240 .delete_objects()
241 .bucket(&self.bucket)
242 .delete(delete)
243 .send()
244 .await
245 .map_err(|err| {
246 self.error_string("Could not delete objects", format!("{keys:#?}"), err)
247 })?;
248
249 Ok(())
250 }
251
252 fn error_string(
253 &self,
254 message: &'static str,
255 key: impl std::fmt::Display,
256 error: impl std::fmt::Display,
257 ) -> String {
258 let bucket = &self.bucket;
259 format!("{message} (bucket: '{bucket}', key: '{key}', error: '{error}')")
260 }
261
262 fn client_or_error(&self) -> Result<&Client, String> {
263 self.client.as_ref().ok_or_else(|| {
264 "The storage is not available; did you set the right environment variables?".to_string()
265 })
266 }
267
268 fn check_environment_variables() {
269 let vars = vec![
270 "S3_HOST",
271 "S3_REGION",
272 "S3_BUCKET",
273 "S3_ACCESS_KEY_ID",
274 "S3_SECRET_ACCESS_KEY",
275 ];
276
277 let unset_vars = vars
278 .into_iter()
279 .filter(|v| std::env::var(v).is_err())
280 .collect::<Vec<_>>();
281
282 if !unset_vars.is_empty() {
283 println!(
284 "Warning: Storage disabled; the following variables must be set: {}",
285 unset_vars.join(", ")
286 );
287 }
288 }
289
290 fn init(
291 host: &str,
292 region: Region,
293 access_key_id: String,
294 secret_access_key: String,
295 ) -> Result<Option<Client>, String> {
296 Self::check_environment_variables();
297
298 let s3_config = Config::builder()
299 .region(region)
300 .endpoint_resolver(Endpoint::immutable(Uri::from_str(host).map_err(|err| {
301 let error = err.to_string();
302 format!("Could not initialize storage (error: '{error}')")
303 })?))
304 .credentials_provider(Credentials::new(
305 access_key_id,
306 secret_access_key,
307 None,
308 None,
309 "UNNAMED_PROVIDER",
310 ))
311 .build();
312 let client = Client::from_conf(s3_config);
313
314 Ok(Some(client))
315 }
316
317 #[must_use]
318 pub fn new() -> Self {
319 let host = std::env::var("S3_HOST").unwrap_or_else(|_| String::new());
320 let region = std::env::var("S3_REGION").unwrap_or_else(|_| String::new());
321 let bucket = std::env::var("S3_BUCKET").unwrap_or_else(|_| String::new());
322 let access_key_id = std::env::var("S3_ACCESS_KEY_ID").unwrap_or_else(|_| String::new());
323 let secret_access_key =
324 std::env::var("S3_SECRET_ACCESS_KEY").unwrap_or_else(|_| String::new());
325
326 let client = Self::init(&host, Region::new(region), access_key_id, secret_access_key)
327 .unwrap_or(None);
328
329 Self {
330 client,
331 bucket,
332 host,
333 }
334 }
335}