1use crate::StorageConfig;
23use aws_sdk_s3::{
24 primitives::ByteStream,
25 types::{BucketCannedAcl, Object, ObjectCannedAcl},
26 Client, Config,
27};
28use remi::{async_trait, Blob, Bytes, Directory, File, ListBlobsRequest, UploadRequest};
29use std::{borrow::Cow, path::Path};
30
/// Content type used for an upload when the request does not carry one.
const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
32
/// S3-backed storage service: an AWS SDK [`Client`] paired with the
/// [`StorageConfig`] (bucket, optional key prefix, default ACLs) it operates on.
#[derive(Debug, Clone)]
pub struct StorageService {
    /// SDK client every request is sent through.
    client: Client,
    /// Settings read on each call (bucket name, key prefix, default ACLs).
    config: StorageConfig,
}
39
40impl StorageService {
41 pub fn new(config: StorageConfig) -> StorageService {
43 let client = Client::from_conf(From::from(config.clone()));
44 StorageService { client, config }
45 }
46
47 pub fn with_sdk_conf<I: Into<Config>>(config: I) -> StorageService {
50 let client = Client::from_conf(config.into());
51 StorageService {
52 client,
53 config: StorageConfig::default(),
54 }
55 }
56
57 pub fn with_config(self, config: StorageConfig) -> StorageService {
64 StorageService {
65 client: self.client,
66 config,
67 }
68 }
69
70 fn resolve_path<P: AsRef<Path>>(&self, path: P) -> crate::Result<String> {
71 let path = path
72 .as_ref()
73 .to_str()
74 .ok_or_else(|| crate::error::lib("expected valud a utf-8 string as the path"))?;
75
76 let path = path.trim_start_matches("~/").trim_start_matches("./");
78 let prefix = self.config.prefix.as_deref().unwrap_or_default();
79
80 if prefix.is_empty() {
81 return Ok(path.to_owned());
82 }
83
84 Ok(format!(
85 "{prefix}/{path}",
86 prefix = prefix.trim_start_matches("~/").trim_start_matches("./")
87 ))
88 }
89
90 async fn s3_obj_to_blob(&self, entry: &Object) -> crate::Result<Option<Blob>> {
91 use remi::StorageService;
92
93 match entry.key() {
94 Some(key) if key.ends_with('/') => Ok(Some(Blob::Directory(Directory {
95 created_at: None,
96 name: key.to_owned(),
97 path: format!("s3://{key}"),
98 }))),
99
100 Some(key) => self.blob(key).await,
101 None => Ok(None),
102 }
103 }
104}
105
106#[async_trait]
107impl remi::StorageService for StorageService {
108 type Error = crate::Error;
109
110 fn name(&self) -> Cow<'static, str> {
111 Cow::Borrowed("remi:s3")
112 }
113
114 #[cfg_attr(
115 feature = "tracing",
116 tracing::instrument(
117 name = "remi.s3.init",
118 skip_all,
119 fields(
120 bucket = self.config.bucket,
121 remi.service = "s3"
122 )
123 )
124 )]
125 async fn init(&self) -> crate::Result<()> {
126 #[cfg(feature = "log")]
127 log::info!("ensuring that bucket [{}] exists!", self.config.bucket);
128
129 #[cfg(feature = "tracing")]
130 tracing::info!("ensuring that bucket exists");
131
132 let output = self.client.list_buckets().send().await?;
133 if !output.buckets().iter().any(|x| match x.name() {
134 Some(name) => name == self.config.bucket,
135 None => false,
136 }) {
137 #[cfg(feature = "log")]
138 log::info!(
139 "creating bucket [{}] due to no bucket existing on this AWS account",
140 self.config.bucket
141 );
142
143 #[cfg(feature = "tracing")]
144 tracing::info!("creating bucket due to the bucket not existing on this AWS account");
145
146 #[allow(unused)]
147 self.client
148 .create_bucket()
149 .bucket(&self.config.bucket)
150 .acl(
151 self.config
152 .default_bucket_acl
153 .clone()
154 .unwrap_or(BucketCannedAcl::Private),
155 )
156 .send()
157 .await
158 .map(|output| {
159 #[cfg(feature = "log")]
160 log::info!("bucket [{}] was created successfully", self.config.bucket);
161
162 #[cfg(feature = "log")]
163 log::trace!("{output:?}");
164
165 #[cfg(feature = "tracing")]
166 tracing::info!(bucket = self.config.bucket, "bucket was created successfully");
167
168 #[cfg(feature = "tracing")]
169 tracing::trace!(bucket = self.config.bucket, "{output:?}");
170 })?;
171 }
172
173 Ok(())
174 }
175
176 #[cfg_attr(
177 feature = "tracing",
178 tracing::instrument(
179 name = "remi.s3.blob.open",
180 skip(self, path),
181 fields(
182 remi.service = "s3",
183 path = %path.as_ref().display()
184 )
185 )
186 )]
187 async fn open<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<Option<Bytes>> {
188 let normalized = self.resolve_path(path)?;
189
190 #[cfg(feature = "log")]
191 log::trace!("opening file [{normalized}]");
192
193 #[cfg(feature = "tracing")]
194 tracing::trace!(path = normalized, "opening file");
195
196 let fut = self
197 .client
198 .get_object()
199 .bucket(&self.config.bucket)
200 .key(&normalized)
201 .send();
202
203 match fut.await {
204 Ok(object) => {
205 let stream = object.body;
206 let data = stream.collect().await?.into_bytes();
207
208 Ok(Some(data))
209 }
210
211 Err(e) => {
212 let err = e.into_service_error();
213 if err.is_no_such_key() {
214 return Ok(None);
215 }
216
217 Err(err.into())
218 }
219 }
220 }
221
222 #[cfg_attr(
223 feature = "tracing",
224 tracing::instrument(
225 name = "remi.s3.blob.get",
226 skip(self, path),
227 fields(
228 remi.service = "s3",
229 path = %path.as_ref().display()
230 )
231 )
232 )]
233 async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<Option<Blob>> {
234 let normalized = self.resolve_path(path)?;
235
236 #[cfg(feature = "log")]
237 log::trace!("locating file [{normalized}]");
238
239 #[cfg(feature = "tracing")]
240 tracing::trace!(path = normalized, "locating file");
241
242 let fut = self
243 .client
244 .get_object()
245 .bucket(&self.config.bucket)
246 .key(&normalized)
247 .send();
248
249 match fut.await {
250 Ok(object) => {
251 let content_type = object.content_type().map(|x| x.to_owned());
253 let last_modified_at = object
254 .last_modified()
255 .map(|dt| dt.to_millis().expect("cant convert into millis") as u128);
256
257 let stream = object.body;
259 let data = stream.collect().await?.into_bytes();
260 let size = data.len();
261
262 Ok(Some(Blob::File(File {
263 last_modified_at,
264 metadata: object.metadata.clone().unwrap_or_default(),
265 content_type,
266 created_at: None,
267 is_symlink: false,
268 data,
269 name: normalized.clone(),
270 path: format!("s3://{normalized}"),
271 size,
272 })))
273 }
274
275 Err(e) => {
276 let err = e.into_service_error();
277 if err.is_no_such_key() {
278 return Ok(None);
279 }
280
281 Err(err.into())
282 }
283 }
284 }
285
286 #[cfg_attr(
287 feature = "tracing",
288 tracing::instrument(
289 name = "remi.s3.blob.list",
290 skip(self, path),
291 fields(
292 remi.service = "s3",
293 path = ?path.as_ref().map(|path| path.as_ref().display())
294 )
295 )
296 )]
297 async fn blobs<P: AsRef<Path> + Send>(
298 &self,
299 path: Option<P>,
300 options: Option<ListBlobsRequest>,
301 ) -> crate::Result<Vec<Blob>> {
302 let options = options.unwrap_or_default();
303 let mut blobs = Vec::new();
304 let mut req = match path {
305 Some(path) => self
306 .client
307 .list_objects_v2()
308 .bucket(&self.config.bucket)
309 .max_keys(1000)
310 .prefix(self.resolve_path(path)?),
311
312 None => {
313 let mut req = self.client.list_objects_v2().bucket(&self.config.bucket).max_keys(1000);
314 if let Some(ref prefix) = self.config.prefix {
315 req = req.prefix(prefix.trim_start_matches("~/").trim_end_matches("./"));
316 }
317
318 req
319 }
320 };
321
322 loop {
323 let resp = req.clone().send().await?;
324 let entries = resp.contents();
325
326 for entry in entries {
327 let Some(name) = entry.key() else {
328 #[cfg(feature = "log")]
329 log::warn!("skipping entry due to no name");
330
331 #[cfg(feature = "log")]
332 log::trace!("{entry:?}");
333
334 #[cfg(feature = "tracing")]
335 tracing::warn!("skipping entry due to no name");
336
337 #[cfg(feature = "tracing")]
338 tracing::trace!("{entry:?}");
339
340 continue;
341 };
342
343 if options.is_excluded(name) {
344 #[cfg(feature = "log")]
345 log::warn!("excluding entry [{name}] due to options passed in");
346
347 #[cfg(feature = "log")]
348 log::trace!("{entry:?}");
349
350 #[cfg(feature = "tracing")]
351 tracing::warn!(name, "skipping entry due to no name");
352
353 #[cfg(feature = "tracing")]
354 tracing::trace!("{entry:?}");
355
356 continue;
357 }
358
359 if !name.ends_with('/') && name.contains('.') {
361 let idx = name.chars().position(|x| x == '.');
362 if let Some(idx) = idx {
363 let ext = &name[idx + 1..];
364 if !options.is_ext_allowed(ext) {
365 #[cfg(feature = "log")]
366 log::warn!("excluding entry [{name}] due to extension [{ext}] not being allowed");
367
368 #[cfg(feature = "log")]
369 log::trace!("{entry:?}");
370
371 #[cfg(feature = "tracing")]
372 tracing::warn!(name, ext = &ext, "skipping entry due to extension not being allowed");
373
374 #[cfg(feature = "tracing")]
375 tracing::trace!("{entry:?}");
376
377 continue;
378 }
379 }
380 }
381
382 match self.s3_obj_to_blob(entry).await {
383 Ok(Some(blob)) => blobs.push(blob),
384 Ok(None) => continue,
385
386 #[allow(unused)]
387 Err(e) => {
388 #[cfg(feature = "log")]
389 log::warn!("received SDK error when trying to getting blob information: {e}");
390
391 #[cfg(feature = "log")]
392 log::trace!("{entry:?}");
393
394 #[cfg(feature = "tracing")]
395 tracing::warn!(
396 name,
397 error = %e,
398 "received SDK error when trying to getting blob information"
399 );
400
401 #[cfg(feature = "tracing")]
402 tracing::trace!("{entry:?}");
403
404 continue;
405 }
406 }
407 }
408
409 match resp.continuation_token() {
410 Some(token) => {
411 req = req.clone().continuation_token(token);
412 }
413
414 None => break,
415 }
416 }
417
418 Ok(blobs)
419 }
420
421 #[cfg_attr(
422 feature = "tracing",
423 tracing::instrument(
424 name = "remi.s3.blob.delete",
425 skip(self, path),
426 fields(
427 remi.service = "s3",
428 path = %path.as_ref().display()
429 )
430 )
431 )]
432 async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<()> {
433 self.client
434 .delete_object()
435 .bucket(&self.config.bucket)
436 .key(self.resolve_path(path)?)
437 .send()
438 .await
439 .map(|_| ())
440 .map_err(From::from)
441 }
442
443 #[cfg_attr(
444 feature = "tracing",
445 tracing::instrument(
446 name = "remi.s3.blob.exists",
447 skip(self, path),
448 fields(
449 remi.service = "s3",
450 path = %path.as_ref().display()
451 )
452 )
453 )]
454 async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<bool> {
455 let fut = self
456 .client
457 .head_object()
458 .bucket(&self.config.bucket)
459 .key(self.resolve_path(path)?)
460 .send();
461
462 match fut.await {
463 Ok(res) => {
464 if res.delete_marker().is_some() {
465 return Ok(false);
466 }
467
468 Ok(true)
469 }
470
471 Err(e) => {
472 let inner = e.into_service_error();
473 if inner.is_not_found() {
474 return Ok(false);
475 }
476
477 Err(inner.into())
478 }
479 }
480 }
481
482 #[cfg_attr(
483 feature = "tracing",
484 tracing::instrument(
485 name = "remi.s3.blob.upload",
486 skip(self, path, options),
487 fields(
488 remi.service = "s3",
489 path = %path.as_ref().display()
490 )
491 )
492 )]
493 async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> crate::Result<()> {
494 let normalized = self.resolve_path(path)?;
495 let content_type = options.content_type.unwrap_or(DEFAULT_CONTENT_TYPE.into());
496
497 #[cfg(feature = "log")]
498 log::trace!("uploading object [{normalized}] with content type [{content_type}]");
499
500 #[cfg(feature = "tracing")]
501 tracing::trace!(content_type, "uploading object with content type to Amazon S3");
502
503 let len = options.data.len();
504 let stream = ByteStream::from(options.data);
505
506 self.client
507 .put_object()
508 .bucket(&self.config.bucket)
509 .key(normalized)
510 .acl(
511 self.config
512 .default_object_acl
513 .clone()
514 .unwrap_or(ObjectCannedAcl::BucketOwnerFullControl),
515 )
516 .body(stream)
517 .content_type(content_type)
518 .content_length(len.try_into().expect("unable to convert usize ~> i64"))
519 .set_metadata(match options.metadata.is_empty() {
520 true => None,
521 false => Some(options.metadata.clone()),
522 })
523 .send()
524 .await
525 .map(|_| ())
526 .map_err(From::from)
527 }
528
/// Performs a healthcheck by sending a `HeadBucket` request for the
/// configured bucket.
///
/// # Errors
/// Fails when the request fails.
#[cfg(feature = "unstable")]
#[cfg_attr(any(noeldoc, docsrs), doc(cfg(feature = "unstable")))]
#[cfg_attr(feature = "tracing", tracing::instrument(name = "remi.s3.healthcheck", skip_all))]
async fn healthcheck(&self) -> Result<(), Self::Error> {
    #[cfg(feature = "log")]
    log::trace!("performing healthcheck...");

    #[cfg(feature = "tracing")]
    tracing::trace!("performing healthcheck...");

    // Only the outcome matters; the response itself is discarded.
    self.client
        .head_bucket()
        .bucket(&self.config.bucket)
        .send()
        .await?;

    Ok(())
}
547}
548
#[cfg(test)]
mod tests {
    use super::*;

    /// Input paths paired with the key they resolve to when no prefix applies.
    const CASES: [(&str, &str); 4] = [
        ("./weow.txt", "weow.txt"),
        ("~/weow.txt", "weow.txt"),
        ("weow.txt", "weow.txt"),
        ("~/weow/fluff/wooo.exe", "weow/fluff/wooo.exe"),
    ];

    /// Configuration that only sets the key prefix.
    fn prefixed(prefix: &str) -> StorageConfig {
        StorageConfig {
            prefix: Some(String::from(prefix)),
            ..Default::default()
        }
    }

    /// Asserts that every case resolves to `root` followed by its bare key.
    fn assert_resolves(config: StorageConfig, root: &str) {
        let storage = StorageService::new(config);

        for &(input, bare) in &CASES {
            assert_eq!(storage.resolve_path(input).unwrap(), format!("{root}{bare}"));
        }
    }

    #[test]
    fn resolve_path_without_prefix() {
        assert_resolves(StorageConfig::default(), "");
        assert_resolves(prefixed("wow/epic/sauce"), "wow/epic/sauce/");
    }

    #[test]
    fn resolve_path_with_prefix() {
        assert_resolves(prefixed("wwww"), "wwww/");
        assert_resolves(prefixed("wwww/wow/epic/sauce"), "wwww/wow/epic/sauce/");

        // A leading `~/` on the prefix itself is stripped as well.
        assert_resolves(prefixed("~/hello"), "hello/");
    }
}
663}