// mountpoint_s3_fs/data_cache/express_data_cache.rs

1use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
2use crate::ServerSideEncryption;
3use crate::metrics::defs::{
4    ATTR_CACHE, CACHE_EXPRESS, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY, CACHE_OVERSIZED_OBJECTS,
5    CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY,
6};
7use crate::object::ObjectId;
8use std::collections::HashMap;
9use std::time::Instant;
10
11use async_trait::async_trait;
12use base64ct::{Base64, Encoding};
13use bytes::{Bytes, BytesMut};
14use futures::{StreamExt, pin_mut};
15use mountpoint_s3_client::ObjectClient;
16use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
17use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
18use mountpoint_s3_client::types::{
19    ChecksumMode, ClientBackpressureHandle, GetBodyPart, GetObjectParams, GetObjectResponse, PutObjectSingleParams,
20    UploadChecksum,
21};
22use sha2::{Digest, Sha256};
23use tracing::Instrument;
24
25use mountpoint_s3_client::checksums::crc32c_from_base64;
26
/// Cache format version. It is mixed into both the S3 key prefix (see [build_prefix]) and the
/// per-block header checksum (see [BlockMetadata]), so bumping it effectively invalidates all
/// previously written cache entries.
const CACHE_VERSION: &str = "V2";
28
/// Configuration for a [ExpressDataCache].
#[derive(Debug)]
pub struct ExpressDataCacheConfig {
    /// Name of the S3 Express bucket to store the blocks.
    pub bucket_name: String,
    /// Name of the mounted bucket.
    pub source_bucket_name: String,
    /// Size of data blocks. Defaults to 1 MiB (see [ExpressDataCacheConfig::new]).
    /// Also feeds into the key prefix, so changing it invalidates existing entries.
    pub block_size: u64,
    /// The maximum size of an object to be cached. Larger objects bypass the cache
    /// entirely on both reads and writes. Defaults to 1 MiB.
    pub max_object_size: usize,
    /// The SSE to be used in PUT requests to the cache bucket.
    pub sse: ServerSideEncryption,
}
43
44impl ExpressDataCacheConfig {
45    pub fn new(bucket_name: &str, source_bucket_name: &str) -> Self {
46        Self {
47            bucket_name: bucket_name.to_owned(),
48            source_bucket_name: source_bucket_name.to_owned(),
49            block_size: 1024 * 1024,      // 1 MiB
50            max_object_size: 1024 * 1024, // 1 MiB
51            sse: ServerSideEncryption::default(),
52        }
53    }
54
55    pub fn block_size(mut self, block_size: u64) -> Self {
56        self.block_size = block_size;
57        self
58    }
59
60    pub fn max_object_size(mut self, max_object_size: usize) -> Self {
61        self.max_object_size = max_object_size;
62        self
63    }
64
65    pub fn sse(mut self, sse: ServerSideEncryption) -> Self {
66        self.sse = sse;
67        self
68    }
69}
70
/// A data cache on S3 Express One Zone that can be shared across Mountpoint instances.
pub struct ExpressDataCache<Client: ObjectClient> {
    /// Client used for GET/PUT requests against the cache bucket.
    client: Client,
    /// Key prefix derived from the source bucket name and block size; see [build_prefix].
    prefix: String,
    /// Cache configuration (bucket names, block size, size limit, SSE).
    config: ExpressDataCacheConfig,
}
77
78impl<S, C> From<ObjectClientError<S, C>> for DataCacheError
79where
80    S: std::error::Error + Send + Sync + 'static,
81    C: std::error::Error + Send + Sync + 'static,
82{
83    fn from(e: ObjectClientError<S, C>) -> Self {
84        DataCacheError::IoFailure(e.into())
85    }
86}
87
impl<Client> ExpressDataCache<Client>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    /// Create a new instance.
    pub fn new(client: Client, config: ExpressDataCacheConfig) -> Self {
        Self {
            client,
            prefix: build_prefix(&config.source_bucket_name, config.block_size),
            config,
        }
    }

    /// Upload `data` to the cache bucket under `object_key`, applying the configured SSE.
    ///
    /// After a successful PUT, the SSE headers in the response are checked against the
    /// configured SSE; on mismatch the whole process is terminated (see the comment below
    /// for the rationale).
    pub async fn make_put_object_request<'a>(
        &self,
        mut params: PutObjectSingleParams,
        object_key: &str,
        data: impl AsRef<[u8]> + Send + 'a,
    ) -> Result<(), DataCacheError> {
        // Resolve the configured SSE into concrete request parameters.
        let (sse_type, key_id) = self
            .config
            .sse
            .clone()
            .into_inner()
            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
        params = params.server_side_encryption(sse_type);
        params = params.ssekms_key_id(key_id);

        let result = self
            .client
            .put_object_single(&self.config.bucket_name, object_key, &params, data)
            .in_current_span()
            .await
            .map_err(|err| DataCacheError::IoFailure(err.into()))?;

        // Verify that headers of the PUT response match the expected SSE
        if let Err(err) = self
            .config
            .sse
            .verify_response(result.sse_type.as_deref(), result.sse_kms_key_id.as_deref())
        {
            tracing::error!(object_key=?object_key, error=?err, "Unexpected SSE in PutObject response from S3. A cache block may be stored with wrong encryption settings.");
            // Reaching this point is very unlikely and means that SSE settings were corrupted in transit or on S3 side, this may be a sign of a bug
            // in CRT code or S3. Thus, we terminate Mountpoint to send the most noticeable signal to customer about the issue. We prefer exiting
            // instead of returning an error because:
            // 1. this error would only be reported to logs because the cache population is an async process
            // 2. the reported error is severe as the object was already uploaded to S3.
            std::process::exit(1);
        }

        Ok(())
    }

    /// Probe the cache bucket by writing a small metadata object, verifying that the cache
    /// is usable (bucket exists, is a directory bucket, and we have write access).
    pub async fn verify_cache_valid(&self) -> Result<(), DataCacheError> {
        let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
        // This data is human-readable, and not expected to be read by Mountpoint.
        // The file format used here is NOT stable.
        // For now, let's just include the data that's guaranteed to be correct as it's what
        // calculates the prefix.
        let data = format!(
            "source_bucket={}\nblock_size={}",
            self.config.source_bucket_name, self.config.block_size
        );

        // put_object is sufficient for validating cache, as S3 Directory buckets only support
        // read-only, or read-write. Write implies read access.
        // Validating we're in a directory bucket by using the `EXPRESS_ONEZONE` storage class.
        let params = PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string());
        self.make_put_object_request(params, &object_key, data).await
    }

    // Ensure the flow-control window is large enough for reading a block of data if backpressure is enabled.
    fn ensure_read_window(&self, backpressure_handle: Option<&mut impl ClientBackpressureHandle>) {
        if let Some(backpressure_handle) = backpressure_handle {
            backpressure_handle.increment_read_window(self.config.block_size as usize);
        }
    }

    /// Read a single block from the cache bucket.
    ///
    /// Returns `Ok(None)` on a cache miss: the key does not exist, or the source object is
    /// larger than `max_object_size` (such objects are never cached). Returns an error if
    /// the block exists but fails offset, checksum, or metadata validation.
    async fn read_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        // Oversized objects bypass the cache entirely; record the event and report a miss.
        if object_size > self.config.max_object_size {
            metrics::counter!(CACHE_OVERSIZED_OBJECTS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
            return Ok(None);
        }

        // Caller-supplied offset must be consistent with the block index.
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }

        let object_key = get_s3_key(&self.prefix, cache_key, block_idx);
        let mut result = match self
            .client
            .get_object(
                &self.config.bucket_name,
                &object_key,
                &GetObjectParams::new().checksum_mode(Some(ChecksumMode::Enabled)),
            )
            .await
        {
            Ok(result) => result,
            Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey(_))) => {
                return Ok(None);
            }
            Err(e) => {
                return Err(DataCacheError::IoFailure(e.into()));
            }
        };
        let mut backpressure_handle = result.backpressure_handle().cloned();

        // Guarantee that the request will start even in case of `initial_read_window == 0`.
        self.ensure_read_window(backpressure_handle.as_mut());

        let mut buffer: Bytes = Bytes::new();
        pin_mut!(result);
        // Stream the body; each chunk's offset must continue exactly where the buffer ends.
        while let Some(chunk) = result.next().await {
            match chunk {
                Ok(GetBodyPart { offset, data: body }) => {
                    if offset != buffer.len() as u64 {
                        return Err(DataCacheError::InvalidBlockOffset);
                    }

                    buffer = if buffer.is_empty() {
                        body
                    } else {
                        // Unlikely: we expect `get_object` to return a single chunk.
                        let mut buffer = BytesMut::from(buffer);
                        buffer.extend_from_slice(&body);
                        buffer.freeze()
                    };

                    // Ensure the flow-control window is large enough.
                    self.ensure_read_window(backpressure_handle.as_mut());
                }
                Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey(_))) => {
                    return Ok(None);
                }
                Err(e) => {
                    return Err(DataCacheError::IoFailure(e.into()));
                }
            }
        }

        let object_metadata = result.get_object_metadata();

        // The block's CRC32C must be present in the response; it authenticates the payload.
        let checksum = result
            .get_object_checksum()
            .map_err(|_| DataCacheError::InvalidBlockChecksum)?;
        let crc32c_b64 = checksum
            .checksum_crc32c
            .ok_or_else(|| DataCacheError::InvalidBlockChecksum)?;
        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?;

        // Rebuild the expected metadata and compare with what was stored, to rule out
        // key collisions or stale/corrupted blocks.
        let block_metadata = BlockMetadata::new(
            block_idx,
            block_offset,
            cache_key,
            &self.config.source_bucket_name,
            crc32c,
        );
        block_metadata.validate_object_metadata(&object_metadata)?;

        Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
    }

    /// Write a single block to the cache bucket.
    ///
    /// Oversized objects are skipped silently (recorded in a metric) — the cache is
    /// best-effort. The block's metadata headers and CRC32C checksum are attached to
    /// the PUT so `read_block` can validate them later.
    async fn write_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        object_size: usize,
    ) -> DataCacheResult<()> {
        if object_size > self.config.max_object_size {
            metrics::counter!(CACHE_OVERSIZED_OBJECTS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
            return Ok(());
        }

        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }

        let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

        let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
        let block_metadata = BlockMetadata::new(
            block_idx,
            block_offset,
            &cache_key,
            &self.config.source_bucket_name,
            checksum,
        );

        self.make_put_object_request(block_metadata.to_put_object_params(), &object_key, data)
            .await
    }
}
289
290#[async_trait]
291impl<Client> DataCache for ExpressDataCache<Client>
292where
293    Client: ObjectClient + Send + Sync + 'static,
294{
295    async fn get_block(
296        &self,
297        cache_key: &ObjectId,
298        block_idx: BlockIndex,
299        block_offset: u64,
300        object_size: usize,
301    ) -> DataCacheResult<Option<ChecksummedBytes>> {
302        let start = Instant::now();
303        let result = match self.read_block(cache_key, block_idx, block_offset, object_size).await {
304            Ok(Some(data)) => {
305                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_EXPRESS).record(data.len() as f64);
306                Ok(Some(data))
307            }
308            Ok(None) => Ok(None),
309            Err(err) => {
310                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
311                Err(err)
312            }
313        };
314        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_EXPRESS).record(start.elapsed().as_micros() as f64);
315        result
316    }
317
318    async fn put_block(
319        &self,
320        cache_key: ObjectId,
321        block_idx: BlockIndex,
322        block_offset: u64,
323        bytes: ChecksummedBytes,
324        object_size: usize,
325    ) -> DataCacheResult<()> {
326        let start = Instant::now();
327        let bytes_len = bytes.len();
328        let result = match self
329            .write_block(cache_key, block_idx, block_offset, bytes, object_size)
330            .await
331        {
332            Ok(()) => {
333                metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_EXPRESS).record(bytes_len as f64);
334                Ok(())
335            }
336            Err(err) => {
337                metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
338                Err(err)
339            }
340        };
341        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_EXPRESS).record(start.elapsed().as_micros() as f64);
342        result
343    }
344
345    fn block_size(&self) -> u64 {
346        self.config.block_size
347    }
348}
349
/// Metadata about the cached object to ensure that the object we've retrieved is the one we were
/// wanting to get (and avoid collisions with the key).
/// On miss, bypass the cache and go to the main data source.
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Clone, Debug, PartialEq, Eq)]
struct BlockMetadata {
    /// Index of this block within the source object.
    block_idx: BlockIndex,
    /// Byte offset of this block within the source object (block_idx * block_size).
    block_offset: u64,
    /// ETag of the source object this block was cached from.
    etag: String,
    /// Key of the source object (stored base64-encoded in headers for ASCII safety).
    source_key: String,
    /// Name of the source (mounted) bucket.
    source_bucket_name: String,
    /// CRC32C of the block payload.
    data_checksum: u32,
    /// CRC32C over all other fields, to detect header corruption.
    header_checksum: u32,
}
364
365impl BlockMetadata {
366    pub fn new(
367        block_idx: BlockIndex,
368        block_offset: u64,
369        cache_key: &ObjectId,
370        source_bucket_name: &str,
371        data_checksum: Crc32c,
372    ) -> Self {
373        let header_checksum =
374            Self::get_header_checksum(block_idx, block_offset, cache_key, source_bucket_name, data_checksum).value();
375        Self {
376            block_idx,
377            block_offset,
378            etag: cache_key.etag().as_str().to_string(),
379            source_key: cache_key.key().to_string(),
380            source_bucket_name: source_bucket_name.to_string(),
381            data_checksum: data_checksum.value(),
382            header_checksum,
383        }
384    }
385
386    /// Build parameters to be used when running a PutObject for this block
387    pub fn to_put_object_params(&self) -> PutObjectSingleParams {
388        // Convert to object metadata that is HTTP header safe (ASCII only)
389        let source_key_encoded = Base64::encode_string(self.source_key.as_bytes());
390        let object_metadata = HashMap::from([
391            ("cache-version".to_string(), CACHE_VERSION.to_string()),
392            ("block-idx".to_string(), format!("{}", self.block_idx)),
393            ("block-offset".to_string(), format!("{}", self.block_offset)),
394            ("etag".to_string(), self.etag.clone()),
395            ("source-key".to_string(), source_key_encoded),
396            ("source-bucket-name".to_string(), self.source_bucket_name.clone()),
397            ("header-checksum".to_string(), format!("{}", self.header_checksum)),
398        ]);
399
400        PutObjectSingleParams::new()
401            .object_metadata(object_metadata)
402            .checksum(Some(UploadChecksum::Crc32c(Crc32c::new(self.data_checksum))))
403    }
404
405    /// Validate the object metadata headers received match this BlockMetadata object.
406    pub fn validate_object_metadata(&self, headers: &HashMap<String, String>) -> DataCacheResult<()> {
407        self.validate_header(headers, "cache-version", |version| version == CACHE_VERSION)?;
408        self.validate_header(headers, "block-idx", |block_idx| {
409            block_idx.parse() == Ok(self.block_idx)
410        })?;
411        self.validate_header(headers, "block-offset", |block_offset| {
412            block_offset.parse() == Ok(self.block_offset)
413        })?;
414        self.validate_header(headers, "etag", |etag| etag == self.etag)?;
415        self.validate_header(headers, "source-key", |source_key| {
416            source_key == Base64::encode_string(self.source_key.as_bytes())
417        })?;
418        self.validate_header(headers, "source-bucket-name", |source_bucket_name| {
419            source_bucket_name == self.source_bucket_name
420        })?;
421        self.validate_header(headers, "header-checksum", |header_checksum| {
422            header_checksum.parse() == Ok(self.header_checksum)
423        })?;
424
425        Ok(())
426    }
427
428    fn validate_header<F: Fn(&str) -> bool>(
429        &self,
430        headers: &HashMap<String, String>,
431        header: &str,
432        is_valid: F,
433    ) -> DataCacheResult<()> {
434        let value = headers
435            .get(header)
436            .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))?;
437        is_valid(value)
438            .then_some(())
439            .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))
440    }
441
442    fn get_header_checksum(
443        block_idx: BlockIndex,
444        block_offset: u64,
445        cache_key: &ObjectId,
446        source_bucket_name: &str,
447        data_checksum: Crc32c,
448    ) -> Crc32c {
449        let mut hasher = crc32c::Hasher::new();
450        hasher.update(CACHE_VERSION.as_bytes());
451        hasher.update(&block_idx.to_be_bytes());
452        hasher.update(&block_offset.to_be_bytes());
453        hasher.update(cache_key.etag().as_str().as_bytes());
454        hasher.update(cache_key.key().as_bytes());
455        hasher.update(source_bucket_name.as_bytes());
456        hasher.update(&data_checksum.value().to_be_bytes());
457        hasher.finalize()
458    }
459}
460
461/// Get the prefix for objects we'll be creating in S3
462pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
463    hex::encode(
464        Sha256::new()
465            .chain_update(CACHE_VERSION.as_bytes())
466            .chain_update(block_size.to_be_bytes())
467            .chain_update(source_bucket_name.as_bytes())
468            .finalize(),
469    )
470}
471
472/// Get the S3 key this block should be written to or read from.
473pub fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
474    let hashed_cache_key = hex::encode(
475        Sha256::new()
476            .chain_update(cache_key.key())
477            .chain_update(cache_key.etag().as_str())
478            .finalize(),
479    );
480    format!("{prefix}/{hashed_cache_key}/{block_idx:010}")
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::checksums::ChecksummedBytes;
    use crate::sync::Arc;
    use proptest::{prop_assert, proptest};

    use mountpoint_s3_client::failure_client::{CountdownFailureConfig, countdown_failure_client};
    use mountpoint_s3_client::mock_client::{MockClient, MockClientError};
    use mountpoint_s3_client::types::ETag;
    use test_case::test_case;

    // Round-trip put/get through the cache against a mock client, exercising both
    // block_size < part_size and block_size > part_size, multiple keys, and multiple blocks.
    #[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
    #[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
    #[tokio::test]
    async fn test_put_get(part_size: usize, block_size: u64) {
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(part_size)
                .enable_backpressure(true)
                .initial_read_window_size(part_size)
                .build(),
        );

        let config = ExpressDataCacheConfig::new(bucket, "unique source description").block_size(block_size);
        let cache = ExpressDataCache::new(client, config);

        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        // A full-block-sized payload, to exercise multi-part streaming in read_block.
        let data_3 = ChecksummedBytes::new("a".repeat(block_size as usize).into());

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            "longkey_".repeat(128), // 1024 bytes, max length for S3 keys
            ETag::for_tests(),
        );

        // A GET before any PUT must be a clean miss, not an error.
        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        // PUT and GET, OK?
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET block for a second key, OK?
        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second block in a cache entry, OK?
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // Entry 1's first block still intact
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    // Objects larger than max_object_size must bypass the cache on both PUT and GET
    // without touching the cache bucket.
    #[tokio::test]
    async fn large_object_bypassed() {
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let cache = ExpressDataCache::new(
            client.clone(),
            ExpressDataCacheConfig::new(bucket, "unique source description"),
        );
        // One byte over the default 1 MiB limit.
        let data_1 = vec![0u8; 1024 * 1024 + 1];
        let data_1 = ChecksummedBytes::new(data_1.into());
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        // PUT and GET for a large object should be no-op
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), data_1.len())
            .await
            .expect("cache should be accessible");
        let get_result = cache
            .get_block(&cache_key_1, 0, 0, data_1.len())
            .await
            .expect("cache should be accessible");
        assert!(get_result.is_none());
        assert_eq!(client.object_count(), 0, "cache must be empty");
    }

    // Writes blocks directly (bypassing the cache API) with various corrupted checksums
    // and metadata, and asserts get_block rejects each corruption with the right error.
    #[tokio::test]
    async fn test_get_validate_failure() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let config = ExpressDataCacheConfig::new(bucket, source_bucket);
        let block_size = config.block_size;
        let cache = ExpressDataCache::new(client.clone(), config);

        let data = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());

        // Setup cache
        let object_key = get_s3_key(&build_prefix(source_bucket, block_size), &cache_key, 0);

        let (data, checksum) = data.into_inner().unwrap();
        let block_metadata = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum);
        let put_params = block_metadata.to_put_object_params();

        let (data_2, checksum_2) = data_2.into_inner().unwrap();
        let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum_2);
        let put_params_2 = block_metadata_2.to_put_object_params();

        // Store with correct metadata and expect a successful get_block
        client
            .put_object_single(bucket, &object_key, &put_params, data.clone())
            .in_current_span()
            .await
            .unwrap();
        let (received_data, _) = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect("get should succeed with intact metadata")
            .expect("block should be non-empty")
            .into_inner()
            .expect("block should be valid");
        assert_eq!(received_data, data);

        // Remove the checksum when writing.
        client
            .put_object_single(bucket, &object_key, &put_params.clone().checksum(None), data.clone())
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if checksum isn't present");
        assert!(matches!(err, DataCacheError::InvalidBlockChecksum));

        // Remove the object metadata when writing.
        client
            .put_object_single(
                bucket,
                &object_key,
                &put_params.clone().object_metadata(HashMap::new()),
                data.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if object metadata isn't present");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // Emulate corrupt data by writing 'incorrect' data.
        client
            .put_object_single(bucket, &object_key, &put_params.clone(), data_2.clone())
            .in_current_span()
            .await
            .expect_err("should fail to write as checksum incorrect");

        // Write data with object metadata header for different object
        client
            .put_object_single(
                bucket,
                &object_key,
                &put_params.clone().checksum(put_params_2.checksum.clone()),
                data_2.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data_2.len())
            .await
            .expect_err("cache should return error if object metadata doesn't match data");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // Write data with object metadata header for object from a different bucket
        let mut corrupted_metadata = block_metadata.clone();
        corrupted_metadata.source_bucket_name = bucket.to_owned();
        client
            .put_object_single(
                bucket,
                &object_key,
                &corrupted_metadata.to_put_object_params(),
                data.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if source bucket does not match");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // Get data that's not been written yet
        let result = cache
            .get_block(&cache_key_non_existent, 0, 0, data.len())
            .await
            .expect("cache should return None if data is not present");
        assert_eq!(result, None);
    }

    // verify_cache_valid succeeds when PUTs to the cache bucket succeed.
    #[tokio::test]
    async fn test_verify_cache_valid_success() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let cache = ExpressDataCache::new(client.clone(), ExpressDataCacheConfig::new(bucket, source_bucket));

        cache.verify_cache_valid().await.expect("cache should work");
    }

    // verify_cache_valid must surface a failure when the PUT fails.
    #[tokio::test]
    async fn test_verify_cache_valid_failure() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );

        // Inject a failure on the first put_object_single call.
        let mut put_single_failures = HashMap::new();
        put_single_failures.insert(1, MockClientError("error".to_owned().into()).into());

        let failure_client = Arc::new(countdown_failure_client(
            client.clone(),
            CountdownFailureConfig {
                put_single_failures,
                ..Default::default()
            },
        ));

        let cache = ExpressDataCache::new(failure_client, ExpressDataCacheConfig::new(bucket, source_bucket));

        cache
            .verify_cache_valid()
            .await
            .expect_err("cache should not report valid if cannot write");
    }

    proptest! {
        #[test]
        fn proptest_creates_small_s3_keys(key: String, etag: String, block_idx: BlockIndex, source_description: String, block_size: u64) {
            // Ensure we can always serialise to S3 keys smaller than 1kb
            let cache_key = ObjectId::new(key, etag.into());
            let prefix = build_prefix(&source_description, block_size);
            prop_assert!(get_s3_key(&prefix, &cache_key, block_idx).len() <= 1024);
        }

        #[test]
        fn proptest_block_metadata_to_headers_s3_key_ascii_only(block_metadata: BlockMetadata) {
            // Validate that even with UTF keys, the source key is always ascii
            let params = block_metadata.to_put_object_params();
            prop_assert!(params.object_metadata.get("source-key").unwrap().is_ascii());
        }

        #[test]
        fn proptest_block_metadata_validates_headers(block_metadata: BlockMetadata, block_metadata2: BlockMetadata) {
            // `to_headers` should contain enough information that `validate_headers` acts as equality
            let params = block_metadata.to_put_object_params();
            if block_metadata == block_metadata2 {
                prop_assert!(block_metadata2.validate_object_metadata(&params.object_metadata).is_ok());
            } else {
                prop_assert!(block_metadata2.validate_object_metadata(&params.object_metadata).is_err());
            }
        }

        #[test]
        fn proptest_block_metadata_validates_headers_is_equal(block_metadata: BlockMetadata) {
            // More checks to verify the equals path, as generating lots of equals examples may be hard
            let params = block_metadata.to_put_object_params();
            prop_assert!(block_metadata.validate_object_metadata(&params.object_metadata).is_ok());
            prop_assert!(matches!(params.checksum, Some(UploadChecksum::Crc32c(x)) if x == Crc32c::new(block_metadata.data_checksum)));
        }
    }
}