1use crate::Result;
3use aws_sdk_s3::{primitives::ByteStream, Client};
4use bytes::Bytes;
5use futures::StreamExt;
6use rand::prelude::*;
7use sha2::Digest;
8use std::{
9 borrow::Cow,
10 collections::{HashMap, HashSet},
11 path::Path,
12};
13use tokio::io::AsyncReadExt;
14use tracing::instrument;
15use url::Url;
16
/// Limit passed to `buffer_unordered` when creating test objects, i.e. the maximum number of
/// object-creation futures that are polled concurrently.
const MAX_CONCURRENCY: usize = 10;
19
20#[derive(Clone, Debug)]
21pub struct TestObject {
22 pub key: String,
23 pub size: usize,
24}
25
26impl TestObject {
27 pub fn new(key: impl Into<String>, size: impl AsRef<str>) -> Self {
30 let key = key.into();
31
32 let size = byte_unit::Byte::from_str(size).unwrap();
33
34 Self {
35 key,
36 size: size.get_bytes() as usize,
37 }
38 }
39}
40
/// A test object together with the payload that was generated for it.
#[derive(Clone, Debug)]
pub struct TestObjectWithData {
    /// Key of the object; `make_test_data` also uses this value as the key of its result map.
    pub key: String,
    /// URL of the object; `make_test_data` builds it as `s3://{bucket}/{key}`.
    pub url: Url,
    /// The object's contents.
    pub data: Vec<u8>,
    /// Digest of `data`; `make_test_data` fills it with the SHA-256 of the payload.
    pub hash: [u8; 32],
}
50
51pub fn prepend_unique_prefix(
59 objects: impl IntoIterator<Item = TestObject>,
60) -> (String, impl IntoIterator<Item = TestObject>) {
61 let prefix = format!("{:08x}/", rand::thread_rng().next_u32());
62
63 let objects = {
64 let prefix = prefix.clone();
65
66 objects.into_iter().map(move |mut object| {
67 object.key = format!("{}{}", prefix, object.key);
68
69 object
70 })
71 };
72
73 (prefix, objects)
74}
75
76pub async fn make_test_data(
83 client: &Client,
84 bucket: &str,
85 objects: impl IntoIterator<Item = TestObject>,
86) -> Result<HashMap<String, TestObjectWithData>> {
87 let create_futs = objects.into_iter().map(|test_object| async move {
88 let data =
89 make_test_data_object(client, bucket, &test_object.key.clone(), test_object.size)
90 .await?;
91
92 Result::<_>::Ok((test_object.key, data))
93 });
94
95 let mut test_data_stream = futures::stream::iter(create_futs).buffer_unordered(MAX_CONCURRENCY);
97
98 let mut test_objects = HashMap::new();
99
100 while let Some(result) = test_data_stream.next().await {
101 let (key, data) = result?;
102
103 let mut hasher = sha2::Sha256::new();
104 hasher.update(&data);
105 let mut hash = [0u8; 32];
106 hash.copy_from_slice(&hasher.finalize());
107
108 let object = TestObjectWithData {
109 url: format!("s3://{}/{}", bucket, key).parse().unwrap(),
110 key: key.clone(),
111 data,
112 hash,
113 };
114 assert!(
115 test_objects.insert(object.key.clone(), object).is_none(),
116 "BUG: test data contains the same key '{}' more than once",
117 key
118 );
119 }
120
121 Ok(test_objects)
122}
123
124pub async fn make_test_data_object(
126 client: &Client,
127 bucket: &str,
128 key: &str,
129 size: usize,
130) -> Result<Vec<u8>> {
131 let mut rand = rand::thread_rng();
132 let mut data = vec![0u8; size];
133
134 rand.fill(&mut data[..]);
135
136 client
137 .put_object()
138 .bucket(bucket)
139 .key(key.to_string())
140 .body(ByteStream::from(Bytes::from(data.clone())))
141 .send()
142 .await?;
143
144 Result::<_>::Ok(data)
145}
146
147pub async fn validate_test_data_in_dir<Keys, Item>(
153 test_data: &HashMap<String, TestObjectWithData>,
154 path: &Path,
155 expected_keys: Keys,
156) -> Result<()>
157where
158 Keys: IntoIterator<Item = Item>,
159 Item: Into<Cow<'static, str>>,
160{
161 println!(
165 "Test data dir {} contains the following files:",
166 path.display()
167 );
168 let files = walkdir::WalkDir::new(path)
169 .into_iter()
170 .filter(|result| {
171 if let Ok(entry) = &result {
173 !entry.file_type().is_dir()
174 } else {
175 true
177 }
178 })
179 .map(|result| {
180 let entry = result?;
181
182 let relative_path = entry.path().strip_prefix(path)?.to_owned();
183
184 println!(
185 " {} ({} bytes)",
186 relative_path.display(),
187 entry.path().metadata()?.len()
188 );
189 Result::<_>::Ok(relative_path)
190 })
191 .collect::<Result<Vec<_>>>()?;
192
193 let mut expected_test_data: HashMap<String, &TestObjectWithData> = expected_keys.into_iter()
196 .map(|item| {
197 let key = item.into();
198 let data = test_data.get(key.as_ref())
199 .unwrap_or_else(|| panic!("BUG: test specifies expected key '{key}' but the `test_data` collection doesn't have such an entry"));
200
201 #[cfg(windows)]
205 let key = key.replace('/', "\\");
206
207 #[cfg(not(windows))]
208 let key = key.to_string();
209
210 (key, data)
211 })
212 .collect();
213
214 let mut expected_keys = expected_test_data
218 .keys()
219 .map(|key| key.to_string())
220 .collect::<HashSet<_>>();
221
222 for relative_path in files {
223 let key = relative_path.to_string_lossy();
224 let test_data = expected_test_data.remove(key.as_ref()).unwrap_or_else(|| {
225 panic!(
227 "Tar archive contains file `{}` which is not among the expected test data",
228 relative_path.display()
229 );
230 });
231 expected_keys.remove(key.as_ref());
232
233 let mut file = tokio::fs::File::open(path.join(&relative_path)).await?;
235 let metadata = file.metadata().await?;
236 let mut data = Vec::with_capacity(metadata.len() as usize);
237
238 file.read_to_end(&mut data).await?;
239
240 let mut hasher = sha2::Sha256::new();
241 hasher.update(&data);
242 let mut hash = [0u8; 32];
243 hash.copy_from_slice(&hasher.finalize());
244
245 assert_eq!(
246 hash,
247 test_data.hash,
248 "File '{}' (key '{}') hash doesn't match expected value",
249 relative_path.display(),
250 key
251 );
252 }
253
254 if !expected_keys.is_empty() {
255 panic!(
257 "One or more test data objects were not found in the archive: {}",
258 expected_keys.into_iter().collect::<Vec<_>>().join(",")
259 )
260 }
261
262 Ok(())
263}
264
265#[instrument(err, skip_all, fields(bucket, prefix))]
268pub async fn validate_test_data_in_s3<Keys, Item>(
269 client: &aws_sdk_s3::Client,
270 test_data: &HashMap<String, TestObjectWithData>,
271 bucket: &str,
272 prefix: &str,
273 expected_keys: Keys,
274) -> Result<()>
275where
276 Keys: IntoIterator<Item = Item>,
277 Item: Into<Cow<'static, str>>,
278{
279 let mut objects = HashMap::new();
281
282 let mut pages = client
283 .list_objects_v2()
284 .bucket(bucket)
285 .prefix(prefix)
286 .into_paginator()
287 .send();
288
289 let mut results = vec![];
292
293 while let Some(result) = pages.next().await {
294 let page = result?;
295 let result: Result<Vec<aws_sdk_s3::types::Object>> = Ok(page.contents.unwrap_or_default());
296
297 results.push(result?);
298 }
299
300 for object in results.into_iter().flatten() {
301 objects.insert(object.key().unwrap().to_owned(), object);
302 }
303
304 let mut expected_test_data: HashMap<Cow<'static, str>, &TestObjectWithData> = expected_keys.into_iter()
307 .map(|item| {
308 let key = item.into();
309 let data = test_data.get(key.as_ref())
310 .unwrap_or_else(|| panic!("BUG: test specifies expected key '{key}' but the `test_data` collection doesn't have such an entry"));
311
312 (key, data)
313 })
314 .collect();
315
316 let mut expected_keys = expected_test_data
320 .keys()
321 .map(|key| key.to_string())
322 .collect::<HashSet<_>>();
323
324 for (key, _object) in objects {
325 let relative_key = key.strip_prefix(prefix).unwrap();
328
329 let test_data = expected_test_data.remove(relative_key).unwrap_or_else(|| {
330 panic!(
332 "Bucket contains object `{}` which is not among the expected test data",
333 relative_key
334 );
335 });
336 expected_keys.remove(relative_key);
337
338 let hash = {
356 let response = client.get_object().bucket(bucket).key(&key).send().await?;
357
358 let mut body = response.body;
359 let mut hasher = sha2::Sha256::new();
360 while let Some(bytes) = body.try_next().await? {
361 hasher.update(bytes);
362 }
363 let mut hash = [0u8; 32];
364 hash.copy_from_slice(&hasher.finalize());
365 hash
366 };
367
368 assert_eq!(
369 hash, test_data.hash,
370 "S3 object '{}' (key '{}') hash doesn't match expected value",
371 relative_key, key
372 );
373 }
374
375 if !expected_keys.is_empty() {
376 panic!(
378 "One or more test data objects were not found in the archive: {}",
379 expected_keys.into_iter().collect::<Vec<_>>().join(",")
380 )
381 }
382
383 Ok(())
384}