ssstar_testing/
test_data.rs

1//! Create test data in S3-compatible object storage
2use crate::Result;
3use aws_sdk_s3::{primitives::ByteStream, Client};
4use bytes::Bytes;
5use futures::StreamExt;
6use rand::prelude::*;
7use sha2::Digest;
8use std::{
9    borrow::Cow,
10    collections::{HashMap, HashSet},
11    path::Path,
12};
13use tokio::io::AsyncReadExt;
14use tracing::instrument;
15use url::Url;
16
/// Max concurrent S3 operations (uploads or validation downloads) when dealing with test data
const MAX_CONCURRENCY: usize = 10;
19
/// Specification of a single test object to create: its key and its size.
#[derive(Clone, Debug)]
pub struct TestObject {
    /// Object key (path within the bucket)
    pub key: String,
    /// Size of the object's randomly-generated contents, in bytes
    pub size: usize,
}
25
26impl TestObject {
27    /// Make a new test object spec with the size specified as a string so we can use
28    /// human-friendly units like "10 KB" or "20 MiB"
29    pub fn new(key: impl Into<String>, size: impl AsRef<str>) -> Self {
30        let key = key.into();
31
32        let size = byte_unit::Byte::from_str(size).unwrap();
33
34        Self {
35            key,
36            size: size.get_bytes() as usize,
37        }
38    }
39}
40
/// The same test object spec as in [`TestObject`], but with the data that is written to the object
/// as well
#[derive(Clone, Debug)]
pub struct TestObjectWithData {
    /// Object key (without bucket or scheme)
    pub key: String,
    /// Full `s3://bucket/key` URL of the object
    pub url: Url,
    /// The randomly-generated contents that were uploaded to the object
    pub data: Vec<u8>,
    /// SHA-256 hash of `data`, used later to verify round-tripped contents
    pub hash: [u8; 32],
}
50
51/// Generate a unique prefix ending in a `/` character, and prepend it to the `key` in a collection
52/// of [`TestObject`]s.
53///
54/// Returns the unique prefix an an iterator that yields the modified test objects.
55///
56/// This is useful when running tests against a real S3 bucket, where multiple runs of the same
57/// test may write to the bucket so each test's object keys must be unique
58pub fn prepend_unique_prefix(
59    objects: impl IntoIterator<Item = TestObject>,
60) -> (String, impl IntoIterator<Item = TestObject>) {
61    let prefix = format!("{:08x}/", rand::thread_rng().next_u32());
62
63    let objects = {
64        let prefix = prefix.clone();
65
66        objects.into_iter().map(move |mut object| {
67            object.key = format!("{}{}", prefix, object.key);
68
69            object
70        })
71    };
72
73    (prefix, objects)
74}
75
76/// Generate one or more test objects in a bucket.
77///
78/// Each object has a size specified.  Random data will be generated for each object.
79///
80/// The return value upon success is the same test objects specified on input, but with test data
81/// included.  The key to the hash table is the object key
82pub async fn make_test_data(
83    client: &Client,
84    bucket: &str,
85    objects: impl IntoIterator<Item = TestObject>,
86) -> Result<HashMap<String, TestObjectWithData>> {
87    let create_futs = objects.into_iter().map(|test_object| async move {
88        let data =
89            make_test_data_object(client, bucket, &test_object.key.clone(), test_object.size)
90                .await?;
91
92        Result::<_>::Ok((test_object.key, data))
93    });
94
95    // Run these futures in parallel as part of a stream
96    let mut test_data_stream = futures::stream::iter(create_futs).buffer_unordered(MAX_CONCURRENCY);
97
98    let mut test_objects = HashMap::new();
99
100    while let Some(result) = test_data_stream.next().await {
101        let (key, data) = result?;
102
103        let mut hasher = sha2::Sha256::new();
104        hasher.update(&data);
105        let mut hash = [0u8; 32];
106        hash.copy_from_slice(&hasher.finalize());
107
108        let object = TestObjectWithData {
109            url: format!("s3://{}/{}", bucket, key).parse().unwrap(),
110            key: key.clone(),
111            data,
112            hash,
113        };
114        assert!(
115            test_objects.insert(object.key.clone(), object).is_none(),
116            "BUG: test data contains the same key '{}' more than once",
117            key
118        );
119    }
120
121    Ok(test_objects)
122}
123
124/// Generate test data for and upload a single test object
125pub async fn make_test_data_object(
126    client: &Client,
127    bucket: &str,
128    key: &str,
129    size: usize,
130) -> Result<Vec<u8>> {
131    let mut rand = rand::thread_rng();
132    let mut data = vec![0u8; size];
133
134    rand.fill(&mut data[..]);
135
136    client
137        .put_object()
138        .bucket(bucket)
139        .key(key.to_string())
140        .body(ByteStream::from(Bytes::from(data.clone())))
141        .send()
142        .await?;
143
144    Result::<_>::Ok(data)
145}
146
/// Validate the test data in a hash map against files in a directory where the file paths relative
/// to `path` correspond to the keys in [`TestObjectWithData`].
///
/// This assumes that the tar archive has been extracted to local storage somewhere for validation
/// purposes.
///
/// # Parameters
///
/// - `test_data`: map of object key to the test object (with data and hash) produced when the
///   test data was generated
/// - `path`: root directory the archive was extracted to; file paths relative to this directory
///   are compared against object keys
/// - `expected_keys`: the subset of keys in `test_data` that are expected to be present on disk
///
/// # Panics
///
/// Panics (failing the test) if the files on disk and the expected keys don't match exactly, or
/// if a file's contents don't hash to the value recorded at generation time.
pub async fn validate_test_data_in_dir<Keys, Item>(
    test_data: &HashMap<String, TestObjectWithData>,
    path: &Path,
    expected_keys: Keys,
) -> Result<()>
where
    Keys: IntoIterator<Item = Item>,
    Item: Into<Cow<'static, str>>,
{
    // First recursively list all files in `path` (with their paths relative to `path` so it will
    // match the corresponding object store keys) and verify the files on disk and the expected
    // test data objects match exactly
    println!(
        "Test data dir  {} contains the following files:",
        path.display()
    );
    let files = walkdir::WalkDir::new(path)
        .into_iter()
        .filter(|result| {
            // We are not interested in directories, just files
            if let Ok(entry) = &result {
                !entry.file_type().is_dir()
            } else {
                // Errors should always be passed on to the next stage so they get reported
                true
            }
        })
        .map(|result| {
            let entry = result?;

            let relative_path = entry.path().strip_prefix(path)?.to_owned();

            println!(
                "  {} ({} bytes)",
                relative_path.display(),
                entry.path().metadata()?.len()
            );
            Result::<_>::Ok(relative_path)
        })
        .collect::<Result<Vec<_>>>()?;

    // Verify all of the expected keys are actually present in the `test_data` hash table, and make
    // a new hash table of just the expected keys
    let mut expected_test_data: HashMap<String, &TestObjectWithData> = expected_keys.into_iter()
        .map(|item| {
            let key = item.into();
            let data = test_data.get(key.as_ref())
                .unwrap_or_else(|| panic!("BUG: test specifies expected key '{key}' but the `test_data` collection doesn't have such an entry"));

            // On Windows, the file paths listed in `files` will use `\` path separators, but our
            // tests always specify expected keys using `/` separators.  Rewrite the expected keys
            // here
            #[cfg(windows)]
            let key = key.replace('/', "\\");

            #[cfg(not(windows))]
            let key = key.to_string();

            (key, data)
        })
        .collect();

    // Make a set of all expected object keys, and go file by file making sure there was an object key with
    // the same name, then remove it from the set so that we can also list excess object keys that
    // don't correspond to any files
    let mut expected_keys = expected_test_data
        .keys()
        .map(|key| key.to_string())
        .collect::<HashSet<_>>();

    for relative_path in files {
        let key = relative_path.to_string_lossy();
        let test_data = expected_test_data.remove(key.as_ref()).unwrap_or_else(|| {
            // There was no object with this name
            panic!(
                "Tar archive contains file `{}` which is not among the expected test data",
                relative_path.display()
            );
        });
        expected_keys.remove(key.as_ref());

        // Verify the file contents matches the expected data by re-hashing the file and
        // comparing with the hash recorded when the test data was generated
        let mut file = tokio::fs::File::open(path.join(&relative_path)).await?;
        let metadata = file.metadata().await?;
        let mut data = Vec::with_capacity(metadata.len() as usize);

        file.read_to_end(&mut data).await?;

        let mut hasher = sha2::Sha256::new();
        hasher.update(&data);
        let mut hash = [0u8; 32];
        hash.copy_from_slice(&hasher.finalize());

        assert_eq!(
            hash,
            test_data.hash,
            "File '{}' (key '{}') hash doesn't match expected value",
            relative_path.display(),
            key
        );
    }

    if !expected_keys.is_empty() {
        // Some test data objects were not present in the path
        panic!(
            "One or more test data objects were not found in the archive: {}",
            expected_keys.into_iter().collect::<Vec<_>>().join(",")
        )
    }

    Ok(())
}
264
265/// Validate the test data in a hash map against an S3 bucket to which an archive containing the
266/// test data has been extracted.
267#[instrument(err, skip_all, fields(bucket, prefix))]
268pub async fn validate_test_data_in_s3<Keys, Item>(
269    client: &aws_sdk_s3::Client,
270    test_data: &HashMap<String, TestObjectWithData>,
271    bucket: &str,
272    prefix: &str,
273    expected_keys: Keys,
274) -> Result<()>
275where
276    Keys: IntoIterator<Item = Item>,
277    Item: Into<Cow<'static, str>>,
278{
279    // First list all objects in the bucket and prefix
280    let mut objects = HashMap::new();
281
282    let mut pages = client
283        .list_objects_v2()
284        .bucket(bucket)
285        .prefix(prefix)
286        .into_paginator()
287        .send();
288
289    // Translate this stream of pages of object listings into a stream of AWS SDK
290    // 'Object' structs so we can process them one at a time
291    let mut results = vec![];
292
293    while let Some(result) = pages.next().await {
294        let page = result?;
295        let result: Result<Vec<aws_sdk_s3::types::Object>> = Ok(page.contents.unwrap_or_default());
296
297        results.push(result?);
298    }
299
300    for object in results.into_iter().flatten() {
301        objects.insert(object.key().unwrap().to_owned(), object);
302    }
303
304    // Verify all of the expected keys are actually present in the `test_data` hash table, and make
305    // a new hash table of just the expected keys
306    let mut expected_test_data: HashMap<Cow<'static, str>, &TestObjectWithData> = expected_keys.into_iter()
307        .map(|item| {
308            let key = item.into();
309            let data = test_data.get(key.as_ref())
310                .unwrap_or_else(|| panic!("BUG: test specifies expected key '{key}' but the `test_data` collection doesn't have such an entry"));
311
312            (key, data)
313        })
314        .collect();
315
316    // Make a set of all expected object keys, and go object by object making sure there was an object key with
317    // the same name, then remove it from the set so that we can also list excess object keys that
318    // don't correspond to any objects
319    let mut expected_keys = expected_test_data
320        .keys()
321        .map(|key| key.to_string())
322        .collect::<HashSet<_>>();
323
324    for (key, _object) in objects {
325        // In the test data hashmap, keys are identified without whatever prefix was used when
326        // extracting, so use that form here
327        let relative_key = key.strip_prefix(prefix).unwrap();
328
329        let test_data = expected_test_data.remove(relative_key).unwrap_or_else(|| {
330            // There was no object with this name
331            panic!(
332                "Bucket contains object `{}` which is not among the expected test data",
333                relative_key
334            );
335        });
336        expected_keys.remove(relative_key);
337
338        // Verify the object contents matches the expected data.
339        //
340        // This usually requires reading the entire object back and calculating the hash, even
341        // though the extract operation populates the SHA256 hash header.  That's not enough to
342        // avoid having to read the whole data because:
343        //
344        // - For multi-part uploads, the SHA256 checksum that gets reported is a hash of the hashes
345        // of all of the parts, not the hash of the contents.  When we generate test data we don't
346        // know how it will be broken up into parts, so we dont' know these per-part hashes.  That
347        // means for the biggest of objects (by nature multipart uploads are performed on objects
348        // large enough to exceed the multipart threshold) we still have to download and recompute
349        // the contents
350        // - As of January 2023, the latest MinIO is a new and interesting kind of broken.  It
351        // computes SHA256 hashes for multipart uploads, but it does it in a way that doesn't match
352        // the AWS behavior, so rather than try to guess which S3 impl this is and calculate the
353        // hash in the appropriate way, it's easier to just download the object and compute its
354        // hash.
355        let hash = {
356            let response = client.get_object().bucket(bucket).key(&key).send().await?;
357
358            let mut body = response.body;
359            let mut hasher = sha2::Sha256::new();
360            while let Some(bytes) = body.try_next().await? {
361                hasher.update(bytes);
362            }
363            let mut hash = [0u8; 32];
364            hash.copy_from_slice(&hasher.finalize());
365            hash
366        };
367
368        assert_eq!(
369            hash, test_data.hash,
370            "S3 object '{}' (key '{}') hash doesn't match expected value",
371            relative_key, key
372        );
373    }
374
375    if !expected_keys.is_empty() {
376        // Some test data objects were not present in the path
377        panic!(
378            "One or more test data objects were not found in the archive: {}",
379            expected_keys.into_iter().collect::<Vec<_>>().join(",")
380        )
381    }
382
383    Ok(())
384}