s3_algo/
upload.rs

use super::*;
use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
use aws_sdk_s3::primitives::ByteStream;

impl S3Algo {
    /// Upload multiple files to S3.
    ///
    /// `upload_files` reports the number of uploaded files and bytes through the `progress` closure.
    ///
    /// For common use cases it is advised to use [`files_recursive`](files_recursive) for the `files` parameter.
    ///
    /// `progress` is called after the upload of each file, with some data about that upload.
    /// The `seq` field of [`RequestReport`](struct.RequestReport.html) is the index of the file
    /// within the upload, and the report also holds the size in bytes and the duration of the
    /// upload. It is thus possible to report progress in number of files or in number of bytes,
    /// depending on what granularity is desired.
    /// `progress` returns a generic `F: Future` to support async operations such as logging the
    /// results to a file; this future is run as part of the upload algorithm.
    ///
    /// `default_request` constructs the default request builder; only the `bucket`, `key`,
    /// `body` and `content_length` fields are overwritten by the upload algorithm.
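    ///
    /// # Example
    ///
    /// A minimal sketch of a call site, assuming a configured `S3Algo` named `algo`, an
    /// existing bucket, and AWS credentials in the environment (bucket and directory names
    /// are illustrative):
    ///
    /// ```no_run
    /// # async fn example(algo: s3_algo::S3Algo) -> Result<(), s3_algo::Error> {
    /// use s3_algo::files_recursive;
    /// use std::path::PathBuf;
    ///
    /// algo.upload_files(
    ///     "my-bucket".to_string(),
    ///     files_recursive(PathBuf::from("some/dir"), PathBuf::new()),
    ///     |report| async move {
    ///         // `report` also carries size and timing data for byte-level progress.
    ///         println!("finished upload #{}", report.seq);
    ///     },
    ///     |client| client.put_object(),
    /// )
    /// .await?;
    /// # Ok(())
    /// # }
    /// ```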
    pub async fn upload_files<P, F, I, R>(
        &self,
        bucket: String,
        files: I,
        progress: P,
        default_request: R,
    ) -> Result<(), Error>
    where
        P: Fn(RequestReport) -> F + Clone + Send + Sync + 'static,
        F: Future<Output = ()> + Send + 'static,
        I: Iterator<Item = ObjectSource> + Send + 'static,
        R: Fn(&Client) -> PutObjectFluentBuilder + Clone + Unpin + Sync + Send + 'static,
    {
        let copy_parallelization = self.config.copy_parallelization;
        let n_retries = self.config.algorithm.n_retries;

        let timeout_state = Arc::new(Mutex::new(TimeoutState::new(
            self.config.algorithm.clone(),
            self.config.put_requests.clone(),
        )));
        let timeout_state2 = timeout_state.clone();

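        // Turn each source into a boxed upload future wrapped in `s3_request`,
        // which handles retries and timeout estimation.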
        let jobs = files.map(move |src| {
            let (default, bucket, s3) = (default_request.clone(), bucket.clone(), self.s3.clone());
            s3_request(
                move || {
                    src.clone()
                        .create_upload_future(s3.clone(), bucket.clone(), default.clone())
                },
                |_, size| size,
                n_retries,
                timeout_state.clone(),
            )
            .boxed()
        });

        // Run the jobs in parallel with bounded concurrency,
        //  updating the timeout estimator after each file upload,
        //  and reporting the progress
        stream::iter(jobs)
            .buffer_unordered(copy_parallelization)
            .zip(stream::iter(0..))
            .map(|(result, i)| result.map(|result| (i, result)))
            .try_for_each(move |(i, (mut result, _))| {
                let progress = progress.clone();
                let timeout_state = timeout_state2.clone();
                async move {
                    result.seq = i;
                    timeout_state.lock().await.update(&result);
                    progress(result).map(Ok).await
                }
            })
            .await
    }
}

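/// The source of a single object to upload: either a file on disk or an in-memory buffer,
/// together with the destination key.
///
/// Constructing sources from a file path and from an in-memory buffer (paths and keys here
/// are illustrative):
///
/// ```
/// use s3_algo::ObjectSource;
/// use std::path::PathBuf;
///
/// let from_file = ObjectSource::file(PathBuf::from("local/img.tif"), "remote/img.tif".into());
/// let from_data = ObjectSource::data("file contents", "remote/inline.txt".into());
/// ```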
#[derive(Clone, Debug)]
pub enum ObjectSource {
    File { path: PathBuf, key: String },
    Data { data: Vec<u8>, key: String },
}
impl ObjectSource {
    pub fn file(path: PathBuf, key: String) -> Self {
        Self::File { path, key }
    }
    pub fn data<D: Into<Vec<u8>>>(data: D, key: String) -> Self {
        Self::Data {
            data: data.into(),
            key,
        }
    }
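    /// Open the source and return its contents as a `ByteStream`, together with its length in bytes.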
    pub async fn create_stream(&self) -> Result<(ByteStream, usize), Error> {
        match self {
            Self::File { path, .. } => {
                let file = tokio::fs::File::open(path.clone()).await.with_context({
                    let path = path.clone();
                    move || err::Io {
                        description: path.display().to_string(),
                    }
                })?;
                let metadata = file.metadata().await.with_context({
                    let path = path.clone();
                    move || err::Io {
                        description: path.display().to_string(),
                    }
                })?;

                let len = metadata.len() as usize;

                Ok((ByteStream::read_from().file(file).build().await?, len))
            }
            Self::Data { data, .. } => Ok((data.clone().into(), data.len())),
        }
    }
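    /// Create a future that uploads this object to `bucket`, building the request with
    /// `default`. Returns the future together with the object's size in bytes.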
    pub async fn create_upload_future<R>(
        self,
        s3: aws_sdk_s3::Client,
        bucket: String,
        default: R,
    ) -> Result<(impl Future<Output = Result<(), Error>>, usize), Error>
    where
        R: Fn(&Client) -> PutObjectFluentBuilder + Clone + Unpin + Sync + Send + 'static,
    {
        let (stream, len) = self.create_stream().await?;
        let key = self.get_key().to_owned();
        let future = async move {
            default(&s3)
                .set_bucket(Some(bucket))
                .set_key(Some(key))
                .set_body(Some(stream))
                .set_content_length(Some(len as i64))
                .send()
                .await
                .map_err(|e| e.into())
                .map(drop)
        };
        Ok((future, len))
    }
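    /// The destination key of this object.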
    pub fn get_key(&self) -> &str {
        match self {
            Self::File { key, .. } => key,
            Self::Data { key, .. } => key,
        }
    }
}

/// Convenience function (using `walkdir`) to traverse all files in the directory `src_dir`.
/// Returns an iterator that can be used as the `files` input to `S3Algo::upload_files`. Each
/// file is uploaded with a key equal to the file's path with `src_dir` stripped away and
/// `key_prefix` prepended.
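///
/// A sketch of the key mapping (directory and prefix names are illustrative): a file
/// `data/a/b.txt` yields an `ObjectSource` with key `backup/a/b.txt`.
///
/// ```
/// use s3_algo::files_recursive;
/// use std::path::PathBuf;
///
/// for src in files_recursive(PathBuf::from("data"), PathBuf::from("backup")) {
///     println!("{}", src.get_key());
/// }
/// ```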
pub fn files_recursive(
    src_dir: PathBuf,
    key_prefix: PathBuf,
) -> impl Iterator<Item = ObjectSource> {
    #[cfg(windows)]
    use path_slash::PathExt;
    walkdir::WalkDir::new(&src_dir)
        .into_iter()
        .filter_map(move |entry| {
            let src_dir = src_dir.clone();
            let key_prefix = key_prefix.clone();
            entry.ok().and_then(move |entry| {
                if entry.file_type().is_file() {
                    let path = entry.path().to_owned();
                    let key_suffix = path.strip_prefix(&src_dir).unwrap().to_path_buf();
                    let key = key_prefix.join(&key_suffix);
                    Some(ObjectSource::File {
                        path,
                        #[cfg(unix)]
                        key: key.to_string_lossy().to_string(),
                        #[cfg(windows)]
                        key: key.to_slash_lossy().to_string(),
                    })
                } else {
                    None
                }
            })
        })
}

#[cfg(test)]
mod test {
    use super::*;
    use tempdir::TempDir;
    #[test]
    fn test_files_recursive() {
        let tmp_dir = TempDir::new("s3-testing").unwrap();
        let dir = tmp_dir.path();
        for i in 0..10 {
            std::fs::write(dir.join(format!("img_{}.tif", i)), "file contents").unwrap();
        }
        let files = files_recursive(dir.to_owned(), PathBuf::new());
        assert_eq!(files.count(), 10);
    }
}
202}