1use super::*;
2use aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder;
3use aws_sdk_s3::primitives::ByteStream;
4
impl S3Algo {
    /// Uploads multiple objects to S3 with retries and bounded parallelism,
    /// reporting per-request statistics to the caller through `progress`.
    ///
    /// `files` supplies the objects to upload; `default_request` builds the
    /// base `PutObject` request for a client (bucket, key, body and
    /// content-length are then filled in per object). Resolves with `Ok(())`
    /// when every upload has completed, or with the first error encountered.
    pub async fn upload_files<P, F, I, R>(
        &self,
        bucket: String,
        files: I,
        progress: P,
        default_request: R,
    ) -> Result<(), Error>
    where
        P: Fn(RequestReport) -> F + Clone + Send + Sync + 'static,
        F: Future<Output = ()> + Send + 'static,
        I: Iterator<Item = ObjectSource> + Send + 'static,
        R: Fn(&Client) -> PutObjectFluentBuilder + Clone + Unpin + Sync + Send + 'static,
    {
        let copy_parallelization = self.config.copy_parallelization;
        let n_retries = self.config.algorithm.n_retries;

        // Shared timeout/backoff state: consulted by `s3_request` for each
        // attempt and fed every completed request's report (see `update` below).
        let timeout_state = Arc::new(Mutex::new(TimeoutState::new(
            self.config.algorithm.clone(),
            self.config.put_requests.clone(),
        )));
        let timeout_state2 = timeout_state.clone();

        // One lazy, boxed, retrying upload future per source object. The
        // factory closure re-clones its captures so each retry attempt gets a
        // fresh upload future.
        let jobs = files.map(move |src| {
            let (default, bucket, s3) = (default_request.clone(), bucket.clone(), self.s3.clone());
            s3_request(
                move || {
                    src.clone()
                        .create_upload_future(s3.clone(), bucket.clone(), default.clone())
                },
                |_, size| size,
                n_retries,
                timeout_state.clone(),
            )
            .boxed()
        });

        // Run at most `copy_parallelization` uploads concurrently; number each
        // completed upload, feed its report back into the shared timeout
        // state, then hand the report to the caller's `progress` callback.
        stream::iter(jobs)
            .buffer_unordered(copy_parallelization)
            .zip(stream::iter(0..))
            .map(|(result, i)| result.map(|result| (i, result)))
            .try_for_each(move |(i, (mut result, _))| {
                let progress = progress.clone();
                let timeout_state = timeout_state2.clone();
                async move {
                    // `seq` reflects completion order, not submission order,
                    // because of `buffer_unordered` above.
                    result.seq = i;
                    timeout_state.lock().await.update(&result);
                    progress(result).map(Ok).await
                }
            })
            .await
    }
}
77
/// Where the contents of an object to upload come from, together with the
/// destination S3 key.
#[derive(Clone, Debug)]
pub enum ObjectSource {
    /// Read the object body from a file on disk.
    File { path: PathBuf, key: String },
    /// Use an in-memory buffer as the object body.
    Data { data: Vec<u8>, key: String },
}
83impl ObjectSource {
84 pub fn file(path: PathBuf, key: String) -> Self {
85 Self::File { path, key }
86 }
87 pub fn data<D: Into<Vec<u8>>>(data: D, key: String) -> Self {
88 Self::Data {
89 data: data.into(),
90 key,
91 }
92 }
93 pub async fn create_stream(&self) -> Result<(ByteStream, usize), Error> {
94 match self {
95 Self::File { path, .. } => {
96 let file = tokio::fs::File::open(path.clone()).await.with_context({
97 let path = path.clone();
98 move || err::Io {
99 description: path.display().to_string(),
100 }
101 })?;
102 let metadata = file.metadata().await.with_context({
103 let path = path.clone();
104 move || err::Io {
105 description: path.display().to_string(),
106 }
107 })?;
108
109 let len = metadata.len() as usize;
110 Ok((ByteStream::read_from().file(file).build().await?, len))
116 }
117 Self::Data { data, .. } => Ok((data.clone().into(), data.len())),
118 }
119 }
120 pub async fn create_upload_future<R>(
121 self,
122 s3: aws_sdk_s3::Client,
123 bucket: String,
124 default: R,
125 ) -> Result<(impl Future<Output = Result<(), Error>>, usize), Error>
126 where
127 R: Fn(&Client) -> PutObjectFluentBuilder + Clone + Unpin + Sync + Send + 'static,
128 {
129 let (stream, len) = self.create_stream().await?;
130 let key = self.get_key().to_owned();
131 let (s3, bucket, default) = (s3.clone(), bucket.clone(), default.clone());
132 let future = async move {
133 default(&s3)
134 .set_bucket(Some(bucket.clone()))
135 .set_key(Some(key.clone()))
136 .set_body(Some(stream))
137 .set_content_length(Some(len as i64))
138 .send()
139 .await
140 .map_err(|e| e.into())
141 .map(drop)
143 };
144 Ok((future, len))
145 }
146 pub fn get_key(&self) -> &str {
147 match self {
148 Self::File { key, .. } => key,
149 Self::Data { key, .. } => key,
150 }
151 }
152}
153
154pub fn files_recursive(
159 src_dir: PathBuf,
160 key_prefix: PathBuf,
161) -> impl Iterator<Item = ObjectSource> {
162 #[cfg(windows)]
163 use path_slash::PathExt;
164 walkdir::WalkDir::new(&src_dir)
165 .into_iter()
166 .filter_map(move |entry| {
167 let src_dir = src_dir.clone();
168 let key_prefix = key_prefix.clone();
169 entry.ok().and_then(move |entry| {
170 if entry.file_type().is_file() {
171 let path = entry.path().to_owned();
172 let key_suffix = path.strip_prefix(&src_dir).unwrap().to_path_buf();
173 let key = key_prefix.join(&key_suffix);
174 Some(ObjectSource::File {
175 path,
176 #[cfg(unix)]
177 key: key.to_string_lossy().to_string(),
178 #[cfg(windows)]
179 key: key.to_slash_lossy().to_string(),
180 })
181 } else {
182 None
183 }
184 })
185 })
186}
187
#[cfg(test)]
mod test {
    use super::*;
    use tempdir::TempDir;

    /// `files_recursive` should yield exactly one source per regular file.
    #[test]
    fn test_files_recursive() {
        let tmp_dir = TempDir::new("s3-testing").unwrap();
        let root = tmp_dir.path().to_owned();
        for n in 0..10 {
            let file_path = root.join(format!("img_{}.tif", n));
            std::fs::write(&file_path, "file contents").unwrap();
        }
        let sources: Vec<_> = files_recursive(root, PathBuf::new()).collect();
        assert_eq!(sources.len(), 10);
    }
}