filesync/
s3.rs

//! Provides a [`FileSource`] for a path in an S3 bucket.
2
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use aws_sdk_s3::Client;
7use chrono::{DateTime, NaiveDateTime, Utc};
8use thiserror::Error as ErrorTrait;
9
10use crate::{FileEntry, FileSource};
11
12/// Error type for `S3Files` errors.
13#[derive(Debug, ErrorTrait)]
14pub enum S3Error {
15    #[error("One of the objects returned does not have a key")]
16    ObjectMissingKey,
17
18    #[error("One of the objects returned has an incorrect prefix")]
19    ObjectWrongPrefix,
20
21    #[error(transparent)]
22    ByteStreamError(#[from] aws_sdk_s3::primitives::ByteStreamError),
23
24    #[error(transparent)]
25    S3Error(#[from] aws_sdk_s3::Error),
26}
27
28/// A [`FileSource`] for files under a path in an S3 bucket.
29///
30/// Depends on the `aws-sdk-s3` crate to read and write files.
31pub struct S3Files {
32    client: Client,
33    bucket: String,
34    prefix: PathBuf,
35    use_etag_as_hash: bool,
36}
37
38impl S3Files {
39    /// Create a new `S3Files` for a path in an S3 bucket.
40    ///
41    /// If the `use_etag_as_hash` flag is set, the ETag of each S3 object will be assumed to
42    /// be an MD5 hash of the contents (if it is a 128 hex value).
43    pub fn new<S: AsRef<str>, P: AsRef<Path>>(
44        client: Client,
45        bucket: S,
46        prefix: P,
47        use_etag_as_hash: bool,
48    ) -> Self {
49        S3Files {
50            client,
51            bucket: bucket.as_ref().to_owned(),
52            prefix: prefix.as_ref().to_owned(),
53            use_etag_as_hash,
54        }
55    }
56}
57
58#[async_trait]
59impl FileSource for S3Files {
60    type Error = S3Error;
61
62    async fn list_files(&mut self) -> Result<Vec<FileEntry>, Self::Error> {
63        let empty_path: PathBuf = PathBuf::new();
64
65        let response = self
66            .client
67            .list_objects_v2()
68            .bucket(self.bucket.clone())
69            .prefix(self.prefix.display().to_string())
70            .send()
71            .await
72            .map_err(aws_sdk_s3::Error::from)?;
73
74        let mut files = vec![];
75
76        if let Some(contents) = response.contents {
77            for object in contents {
78                let key: PathBuf = object
79                    .key
80                    .as_ref()
81                    .map(PathBuf::from)
82                    .ok_or(S3Error::ObjectMissingKey)?
83                    .strip_prefix(&self.prefix)
84                    .map_err(|_| S3Error::ObjectWrongPrefix)?
85                    .to_owned();
86
87                if key != empty_path {
88                    let modified = object.last_modified.and_then(|date_time| {
89                        NaiveDateTime::from_timestamp_opt(
90                            date_time.secs(),
91                            date_time.subsec_nanos(),
92                        )
93                        .map(|x| x.and_utc())
94                    });
95
96                    let md5_hash = match self.use_etag_as_hash {
97                        true => object.e_tag.and_then(|etag| {
98                            let digest: Option<u128> =
99                                u128::from_str_radix(etag.trim_matches('"'), 16).ok();
100
101                            digest
102                        }),
103                        false => None,
104                    };
105
106                    files.push(FileEntry {
107                        path: key,
108                        size: u64::try_from(object.size).ok(),
109                        modified,
110                        md5_hash,
111                    });
112                }
113            }
114        }
115
116        Ok(files)
117    }
118
119    async fn read_file<P: AsRef<Path> + Send>(&mut self, path: P) -> Result<Vec<u8>, Self::Error> {
120        let mut key = self.prefix.clone();
121        key.push(path.as_ref());
122        let key = key.display().to_string();
123
124        let output = self
125            .client
126            .get_object()
127            .bucket(self.bucket.clone())
128            .key(key)
129            .send()
130            .await
131            .map_err(aws_sdk_s3::Error::from)?;
132
133        let stream = output.body.collect().await?.to_vec();
134
135        Ok(stream)
136    }
137
138    async fn write_file<P: AsRef<Path> + Send>(
139        &mut self,
140        path: P,
141        bytes: &[u8],
142    ) -> Result<(), Self::Error> {
143        let mut key = self.prefix.clone();
144        key.push(path.as_ref());
145        let key = key.display().to_string();
146
147        let stream = aws_sdk_s3::primitives::ByteStream::from(bytes.to_owned());
148
149        self.client
150            .put_object()
151            .bucket(self.bucket.clone())
152            .key(key)
153            .body(stream)
154            .send()
155            .await
156            .map_err(aws_sdk_s3::Error::from)?;
157
158        Ok(())
159    }
160
161    async fn set_modified<P: AsRef<Path> + Send>(
162        &mut self,
163        _path: P,
164        _modified: Option<DateTime<Utc>>,
165    ) -> Result<bool, Self::Error> {
166        Ok(false)
167    }
168}