remi_s3/
service.rs

1// 🐻‍❄️🧶 remi-rs: Asynchronous Rust crate to handle communication between applications and object storage providers
2// Copyright (c) 2022-2025 Noelware, LLC. <team@noelware.org>
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in all
12// copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20// SOFTWARE.
21
22use crate::StorageConfig;
23use aws_sdk_s3::{
24    primitives::ByteStream,
25    types::{BucketCannedAcl, Object, ObjectCannedAcl},
26    Client, Config,
27};
28use remi::{async_trait, Blob, Bytes, Directory, File, ListBlobsRequest, UploadRequest};
29use std::{borrow::Cow, path::Path};
30
31const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
32
33/// Represents an implementation of [`StorageService`] for Amazon Simple Storage Service.
34#[derive(Debug, Clone)]
35pub struct StorageService {
36    client: Client,
37    config: StorageConfig,
38}
39
40impl StorageService {
41    /// Creates a [`StorageService`] with a given storage service configuration.
42    pub fn new(config: StorageConfig) -> StorageService {
43        let client = Client::from_conf(From::from(config.clone()));
44        StorageService { client, config }
45    }
46
47    /// Creates a new [`StorageService`] with a implementator of [`Config`] that can
48    /// represent the AWS SDK S3 configuration that you want.
49    pub fn with_sdk_conf<I: Into<Config>>(config: I) -> StorageService {
50        let client = Client::from_conf(config.into());
51        StorageService {
52            client,
53            config: StorageConfig::default(),
54        }
55    }
56
57    /// Overwrites a [`StorageConfig`] instance on this service without modifying the
58    /// actual SDK client. This is useful if you used the [`StorageService::with_sdk_conf`]
59    /// method.
60    ///
61    /// If you wish to modify the SDK client with a [`StorageConfig`], then use the [`StorageService::new`]
62    /// method instead.
63    pub fn with_config(self, config: StorageConfig) -> StorageService {
64        StorageService {
65            client: self.client,
66            config,
67        }
68    }
69
70    fn resolve_path<P: AsRef<Path>>(&self, path: P) -> crate::Result<String> {
71        let path = path
72            .as_ref()
73            .to_str()
74            .ok_or_else(|| crate::error::lib("expected valud a utf-8 string as the path"))?;
75
76        // trim `./` and `~/` since S3 doesn't accept ./ or ~/ as valid paths
77        let path = path.trim_start_matches("~/").trim_start_matches("./");
78        let prefix = self.config.prefix.as_deref().unwrap_or_default();
79
80        if prefix.is_empty() {
81            return Ok(path.to_owned());
82        }
83
84        Ok(format!(
85            "{prefix}/{path}",
86            prefix = prefix.trim_start_matches("~/").trim_start_matches("./")
87        ))
88    }
89
90    async fn s3_obj_to_blob(&self, entry: &Object) -> crate::Result<Option<Blob>> {
91        use remi::StorageService;
92
93        match entry.key() {
94            Some(key) if key.ends_with('/') => Ok(Some(Blob::Directory(Directory {
95                created_at: None,
96                name: key.to_owned(),
97                path: format!("s3://{key}"),
98            }))),
99
100            Some(key) => self.blob(key).await,
101            None => Ok(None),
102        }
103    }
104}
105
106#[async_trait]
107impl remi::StorageService for StorageService {
108    type Error = crate::Error;
109
110    fn name(&self) -> Cow<'static, str> {
111        Cow::Borrowed("remi:s3")
112    }
113
114    #[cfg_attr(
115        feature = "tracing",
116        tracing::instrument(
117            name = "remi.s3.init",
118            skip_all,
119            fields(
120                bucket = self.config.bucket,
121                remi.service = "s3"
122            )
123        )
124    )]
125    async fn init(&self) -> crate::Result<()> {
126        #[cfg(feature = "log")]
127        log::info!("ensuring that bucket [{}] exists!", self.config.bucket);
128
129        #[cfg(feature = "tracing")]
130        tracing::info!("ensuring that bucket exists");
131
132        let output = self.client.list_buckets().send().await?;
133        if !output.buckets().iter().any(|x| match x.name() {
134            Some(name) => name == self.config.bucket,
135            None => false,
136        }) {
137            #[cfg(feature = "log")]
138            log::info!(
139                "creating bucket [{}] due to no bucket existing on this AWS account",
140                self.config.bucket
141            );
142
143            #[cfg(feature = "tracing")]
144            tracing::info!("creating bucket due to the bucket not existing on this AWS account");
145
146            #[allow(unused)]
147            self.client
148                .create_bucket()
149                .bucket(&self.config.bucket)
150                .acl(
151                    self.config
152                        .default_bucket_acl
153                        .clone()
154                        .unwrap_or(BucketCannedAcl::Private),
155                )
156                .send()
157                .await
158                .map(|output| {
159                    #[cfg(feature = "log")]
160                    log::info!("bucket [{}] was created successfully", self.config.bucket);
161
162                    #[cfg(feature = "log")]
163                    log::trace!("{output:?}");
164
165                    #[cfg(feature = "tracing")]
166                    tracing::info!(bucket = self.config.bucket, "bucket was created successfully");
167
168                    #[cfg(feature = "tracing")]
169                    tracing::trace!(bucket = self.config.bucket, "{output:?}");
170                })?;
171        }
172
173        Ok(())
174    }
175
176    #[cfg_attr(
177        feature = "tracing",
178        tracing::instrument(
179            name = "remi.s3.blob.open",
180            skip(self, path),
181            fields(
182                remi.service = "s3",
183                path = %path.as_ref().display()
184            )
185        )
186    )]
187    async fn open<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<Option<Bytes>> {
188        let normalized = self.resolve_path(path)?;
189
190        #[cfg(feature = "log")]
191        log::trace!("opening file [{normalized}]");
192
193        #[cfg(feature = "tracing")]
194        tracing::trace!(path = normalized, "opening file");
195
196        let fut = self
197            .client
198            .get_object()
199            .bucket(&self.config.bucket)
200            .key(&normalized)
201            .send();
202
203        match fut.await {
204            Ok(object) => {
205                let stream = object.body;
206                let data = stream.collect().await?.into_bytes();
207
208                Ok(Some(data))
209            }
210
211            Err(e) => {
212                let err = e.into_service_error();
213                if err.is_no_such_key() {
214                    return Ok(None);
215                }
216
217                Err(err.into())
218            }
219        }
220    }
221
222    #[cfg_attr(
223        feature = "tracing",
224        tracing::instrument(
225            name = "remi.s3.blob.get",
226            skip(self, path),
227            fields(
228                remi.service = "s3",
229                path = %path.as_ref().display()
230            )
231        )
232    )]
233    async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<Option<Blob>> {
234        let normalized = self.resolve_path(path)?;
235
236        #[cfg(feature = "log")]
237        log::trace!("locating file [{normalized}]");
238
239        #[cfg(feature = "tracing")]
240        tracing::trace!(path = normalized, "locating file");
241
242        let fut = self
243            .client
244            .get_object()
245            .bucket(&self.config.bucket)
246            .key(&normalized)
247            .send();
248
249        match fut.await {
250            Ok(object) => {
251                // Get metadata before we read the body
252                let content_type = object.content_type().map(|x| x.to_owned());
253                let last_modified_at = object
254                    .last_modified()
255                    .map(|dt| dt.to_millis().expect("cant convert into millis") as u128);
256
257                // Read the entire body of the object itself
258                let stream = object.body;
259                let data = stream.collect().await?.into_bytes();
260                let size = data.len();
261
262                Ok(Some(Blob::File(File {
263                    last_modified_at,
264                    metadata: object.metadata.clone().unwrap_or_default(),
265                    content_type,
266                    created_at: None,
267                    is_symlink: false,
268                    data,
269                    name: normalized.clone(),
270                    path: format!("s3://{normalized}"),
271                    size,
272                })))
273            }
274
275            Err(e) => {
276                let err = e.into_service_error();
277                if err.is_no_such_key() {
278                    return Ok(None);
279                }
280
281                Err(err.into())
282            }
283        }
284    }
285
286    #[cfg_attr(
287        feature = "tracing",
288        tracing::instrument(
289            name = "remi.s3.blob.list",
290            skip(self, path),
291            fields(
292                remi.service = "s3",
293                path = ?path.as_ref().map(|path| path.as_ref().display())
294            )
295        )
296    )]
297    async fn blobs<P: AsRef<Path> + Send>(
298        &self,
299        path: Option<P>,
300        options: Option<ListBlobsRequest>,
301    ) -> crate::Result<Vec<Blob>> {
302        let options = options.unwrap_or_default();
303        let mut blobs = Vec::new();
304        let mut req = match path {
305            Some(path) => self
306                .client
307                .list_objects_v2()
308                .bucket(&self.config.bucket)
309                .max_keys(1000)
310                .prefix(self.resolve_path(path)?),
311
312            None => {
313                let mut req = self.client.list_objects_v2().bucket(&self.config.bucket).max_keys(1000);
314                if let Some(ref prefix) = self.config.prefix {
315                    req = req.prefix(prefix.trim_start_matches("~/").trim_end_matches("./"));
316                }
317
318                req
319            }
320        };
321
322        loop {
323            let resp = req.clone().send().await?;
324            let entries = resp.contents();
325
326            for entry in entries {
327                let Some(name) = entry.key() else {
328                    #[cfg(feature = "log")]
329                    log::warn!("skipping entry due to no name");
330
331                    #[cfg(feature = "log")]
332                    log::trace!("{entry:?}");
333
334                    #[cfg(feature = "tracing")]
335                    tracing::warn!("skipping entry due to no name");
336
337                    #[cfg(feature = "tracing")]
338                    tracing::trace!("{entry:?}");
339
340                    continue;
341                };
342
343                if options.is_excluded(name) {
344                    #[cfg(feature = "log")]
345                    log::warn!("excluding entry [{name}] due to options passed in");
346
347                    #[cfg(feature = "log")]
348                    log::trace!("{entry:?}");
349
350                    #[cfg(feature = "tracing")]
351                    tracing::warn!(name, "skipping entry due to no name");
352
353                    #[cfg(feature = "tracing")]
354                    tracing::trace!("{entry:?}");
355
356                    continue;
357                }
358
359                // most files include a '.'
360                if !name.ends_with('/') && name.contains('.') {
361                    let idx = name.chars().position(|x| x == '.');
362                    if let Some(idx) = idx {
363                        let ext = &name[idx + 1..];
364                        if !options.is_ext_allowed(ext) {
365                            #[cfg(feature = "log")]
366                            log::warn!("excluding entry [{name}] due to extension [{ext}] not being allowed");
367
368                            #[cfg(feature = "log")]
369                            log::trace!("{entry:?}");
370
371                            #[cfg(feature = "tracing")]
372                            tracing::warn!(name, ext = &ext, "skipping entry due to extension not being allowed");
373
374                            #[cfg(feature = "tracing")]
375                            tracing::trace!("{entry:?}");
376
377                            continue;
378                        }
379                    }
380                }
381
382                match self.s3_obj_to_blob(entry).await {
383                    Ok(Some(blob)) => blobs.push(blob),
384                    Ok(None) => continue,
385
386                    #[allow(unused)]
387                    Err(e) => {
388                        #[cfg(feature = "log")]
389                        log::warn!("received SDK error when trying to getting blob information: {e}");
390
391                        #[cfg(feature = "log")]
392                        log::trace!("{entry:?}");
393
394                        #[cfg(feature = "tracing")]
395                        tracing::warn!(
396                            name,
397                            error = %e,
398                            "received SDK error when trying to getting blob information"
399                        );
400
401                        #[cfg(feature = "tracing")]
402                        tracing::trace!("{entry:?}");
403
404                        continue;
405                    }
406                }
407            }
408
409            match resp.continuation_token() {
410                Some(token) => {
411                    req = req.clone().continuation_token(token);
412                }
413
414                None => break,
415            }
416        }
417
418        Ok(blobs)
419    }
420
421    #[cfg_attr(
422        feature = "tracing",
423        tracing::instrument(
424            name = "remi.s3.blob.delete",
425            skip(self, path),
426            fields(
427                remi.service = "s3",
428                path = %path.as_ref().display()
429            )
430        )
431    )]
432    async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<()> {
433        self.client
434            .delete_object()
435            .bucket(&self.config.bucket)
436            .key(self.resolve_path(path)?)
437            .send()
438            .await
439            .map(|_| ())
440            .map_err(From::from)
441    }
442
443    #[cfg_attr(
444        feature = "tracing",
445        tracing::instrument(
446            name = "remi.s3.blob.exists",
447            skip(self, path),
448            fields(
449                remi.service = "s3",
450                path = %path.as_ref().display()
451            )
452        )
453    )]
454    async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> crate::Result<bool> {
455        let fut = self
456            .client
457            .head_object()
458            .bucket(&self.config.bucket)
459            .key(self.resolve_path(path)?)
460            .send();
461
462        match fut.await {
463            Ok(res) => {
464                if res.delete_marker().is_some() {
465                    return Ok(false);
466                }
467
468                Ok(true)
469            }
470
471            Err(e) => {
472                let inner = e.into_service_error();
473                if inner.is_not_found() {
474                    return Ok(false);
475                }
476
477                Err(inner.into())
478            }
479        }
480    }
481
482    #[cfg_attr(
483        feature = "tracing",
484        tracing::instrument(
485            name = "remi.s3.blob.upload",
486            skip(self, path, options),
487            fields(
488                remi.service = "s3",
489                path = %path.as_ref().display()
490            )
491        )
492    )]
493    async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> crate::Result<()> {
494        let normalized = self.resolve_path(path)?;
495        let content_type = options.content_type.unwrap_or(DEFAULT_CONTENT_TYPE.into());
496
497        #[cfg(feature = "log")]
498        log::trace!("uploading object [{normalized}] with content type [{content_type}]");
499
500        #[cfg(feature = "tracing")]
501        tracing::trace!(content_type, "uploading object with content type to Amazon S3");
502
503        let len = options.data.len();
504        let stream = ByteStream::from(options.data);
505
506        self.client
507            .put_object()
508            .bucket(&self.config.bucket)
509            .key(normalized)
510            .acl(
511                self.config
512                    .default_object_acl
513                    .clone()
514                    .unwrap_or(ObjectCannedAcl::BucketOwnerFullControl),
515            )
516            .body(stream)
517            .content_type(content_type)
518            .content_length(len.try_into().expect("unable to convert usize ~> i64"))
519            .set_metadata(match options.metadata.is_empty() {
520                true => None,
521                false => Some(options.metadata.clone()),
522            })
523            .send()
524            .await
525            .map(|_| ())
526            .map_err(From::from)
527    }
528
529    #[cfg(feature = "unstable")]
530    #[cfg_attr(any(noeldoc, docsrs), doc(cfg(feature = "unstable")))]
531    #[cfg_attr(feature = "tracing", tracing::instrument(name = "remi.s3.healthcheck", skip_all))]
532    async fn healthcheck(&self) -> Result<(), Self::Error> {
533        #[cfg(feature = "log")]
534        log::trace!("performing healthcheck...");
535
536        #[cfg(feature = "tracing")]
537        tracing::trace!("performing healthcheck...");
538
539        self.client
540            .head_bucket()
541            .bucket(&self.config.bucket)
542            .send()
543            .await
544            .map(|_| ())
545            .map_err(From::from)
546    }
547}
548
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a service whose configuration only overrides the key prefix.
    fn service_with_prefix(prefix: &str) -> StorageService {
        StorageService::new(StorageConfig {
            prefix: Some(String::from(prefix)),
            ..Default::default()
        })
    }

    /// Asserts that every `(input, expected)` pair resolves to the expected key.
    fn assert_resolves(storage: &StorageService, cases: &[(&str, &str)]) {
        for &(input, expected) in cases {
            assert_eq!(storage.resolve_path(input).unwrap(), String::from(expected));
        }
    }

    #[test]
    fn resolve_path_without_prefix() {
        // No prefix configured: only the leading `./` and `~/` are stripped.
        let storage = StorageService::new(StorageConfig::default());
        assert_resolves(
            &storage,
            &[
                ("./weow.txt", "weow.txt"),
                ("~/weow.txt", "weow.txt"),
                ("weow.txt", "weow.txt"),
                ("~/weow/fluff/wooo.exe", "weow/fluff/wooo.exe"),
            ],
        );

        // A nested prefix is prepended to the trimmed path.
        let storage = service_with_prefix("wow/epic/sauce");
        assert_resolves(
            &storage,
            &[
                ("./weow.txt", "wow/epic/sauce/weow.txt"),
                ("~/weow.txt", "wow/epic/sauce/weow.txt"),
                ("weow.txt", "wow/epic/sauce/weow.txt"),
                ("~/weow/fluff/wooo.exe", "wow/epic/sauce/weow/fluff/wooo.exe"),
            ],
        );
    }

    #[test]
    fn resolve_path_with_prefix() {
        // Single-segment prefix.
        let storage = service_with_prefix("wwww");
        assert_resolves(
            &storage,
            &[
                ("./weow.txt", "wwww/weow.txt"),
                ("~/weow.txt", "wwww/weow.txt"),
                ("weow.txt", "wwww/weow.txt"),
                ("~/weow/fluff/wooo.exe", "wwww/weow/fluff/wooo.exe"),
            ],
        );

        // Multi-segment prefix.
        let storage = service_with_prefix("wwww/wow/epic/sauce");
        assert_resolves(
            &storage,
            &[
                ("./weow.txt", "wwww/wow/epic/sauce/weow.txt"),
                ("~/weow.txt", "wwww/wow/epic/sauce/weow.txt"),
                ("weow.txt", "wwww/wow/epic/sauce/weow.txt"),
                ("~/weow/fluff/wooo.exe", "wwww/wow/epic/sauce/weow/fluff/wooo.exe"),
            ],
        );

        // A leading `~/` on the prefix itself is stripped as well.
        let storage = service_with_prefix("~/hello");
        assert_resolves(
            &storage,
            &[
                ("./weow.txt", "hello/weow.txt"),
                ("~/weow.txt", "hello/weow.txt"),
                ("weow.txt", "hello/weow.txt"),
                ("~/weow/fluff/wooo.exe", "hello/weow/fluff/wooo.exe"),
            ],
        );
    }
}
663}