use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::ServerSideEncryption;
use crate::metrics::defs::{
    ATTR_CACHE, CACHE_EXPRESS, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY, CACHE_OVERSIZED_OBJECTS,
    CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY,
};
use crate::object::ObjectId;
use std::collections::HashMap;
use std::time::Instant;

use async_trait::async_trait;
use base64ct::{Base64, Encoding};
use bytes::{Bytes, BytesMut};
use futures::{StreamExt, pin_mut};
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::{
    ChecksumMode, ClientBackpressureHandle, GetBodyPart, GetObjectParams, GetObjectResponse, PutObjectSingleParams,
    UploadChecksum,
};
use sha2::{Digest, Sha256};
use tracing::Instrument;

use mountpoint_s3_client::checksums::crc32c_from_base64;

const CACHE_VERSION: &str = "V2";

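/// Configuration for an [`ExpressDataCache`].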
#[derive(Debug)]
pub struct ExpressDataCacheConfig {
    /// Name of the bucket where cache blocks are stored.
    pub bucket_name: String,
    /// Name of the bucket the cached objects originate from.
    pub source_bucket_name: String,
    /// Size of a cache block in bytes.
    pub block_size: u64,
    /// Objects larger than this size (in bytes) bypass the cache.
    pub max_object_size: usize,
    /// Server-side encryption settings to apply to cache blocks.
    pub sse: ServerSideEncryption,
}

impl ExpressDataCacheConfig {
    pub fn new(bucket_name: &str, source_bucket_name: &str) -> Self {
        Self {
            bucket_name: bucket_name.to_owned(),
            source_bucket_name: source_bucket_name.to_owned(),
            block_size: 1024 * 1024,
            max_object_size: 1024 * 1024,
            sse: ServerSideEncryption::default(),
        }
    }

    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = block_size;
        self
    }

    pub fn max_object_size(mut self, max_object_size: usize) -> Self {
        self.max_object_size = max_object_size;
        self
    }

    pub fn sse(mut self, sse: ServerSideEncryption) -> Self {
        self.sse = sse;
        self
    }
}

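/// A [`DataCache`] that stores cache blocks as objects in an S3 Express One Zone bucket.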
pub struct ExpressDataCache<Client: ObjectClient> {
    client: Client,
    prefix: String,
    config: ExpressDataCacheConfig,
}

impl<S, C> From<ObjectClientError<S, C>> for DataCacheError
where
    S: std::error::Error + Send + Sync + 'static,
    C: std::error::Error + Send + Sync + 'static,
{
    fn from(e: ObjectClientError<S, C>) -> Self {
        DataCacheError::IoFailure(e.into())
    }
}

impl<Client> ExpressDataCache<Client>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    pub fn new(client: Client, config: ExpressDataCacheConfig) -> Self {
        Self {
            client,
            prefix: build_prefix(&config.source_bucket_name, config.block_size),
            config,
        }
    }

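    /// Upload an object to the cache bucket with this cache's SSE settings applied, and verify
    /// that the response reports the expected encryption.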
    pub async fn make_put_object_request<'a>(
        &self,
        mut params: PutObjectSingleParams,
        object_key: &str,
        data: impl AsRef<[u8]> + Send + 'a,
    ) -> Result<(), DataCacheError> {
        let (sse_type, key_id) = self
            .config
            .sse
            .clone()
            .into_inner()
            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
        params = params.server_side_encryption(sse_type);
        params = params.ssekms_key_id(key_id);

        let result = self
            .client
            .put_object_single(&self.config.bucket_name, object_key, &params, data)
            .in_current_span()
            .await
            .map_err(|err| DataCacheError::IoFailure(err.into()))?;

        if let Err(err) = self
            .config
            .sse
            .verify_response(result.sse_type.as_deref(), result.sse_kms_key_id.as_deref())
        {
            tracing::error!(object_key=?object_key, error=?err, "Unexpected SSE in PutObject response from S3. A cache block may be stored with wrong encryption settings.");
            // A block may already have been written with unexpected encryption settings and
            // cannot be reliably removed here, so terminate rather than keep using the cache.
            std::process::exit(1);
        }

        Ok(())
    }

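    /// Verify that the cache bucket is usable by writing a small metadata object describing
    /// this cache's source bucket and block size.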
    pub async fn verify_cache_valid(&self) -> Result<(), DataCacheError> {
        let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
        let data = format!(
            "source_bucket={}\nblock_size={}",
            self.config.source_bucket_name, self.config.block_size
        );

        let params = PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string());
        self.make_put_object_request(params, &object_key, data).await
    }

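    /// Increment the read window by one block size if a backpressure handle is available.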
    fn ensure_read_window(&self, backpressure_handle: Option<&mut impl ClientBackpressureHandle>) {
        if let Some(backpressure_handle) = backpressure_handle {
            backpressure_handle.increment_read_window(self.config.block_size as usize);
        }
    }

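    /// Read a block from the cache bucket, returning `Ok(None)` if the block is not present
    /// or the object is too large to be cached.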
    async fn read_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        if object_size > self.config.max_object_size {
            metrics::counter!(CACHE_OVERSIZED_OBJECTS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
            return Ok(None);
        }

        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }

        let object_key = get_s3_key(&self.prefix, cache_key, block_idx);
        let mut result = match self
            .client
            .get_object(
                &self.config.bucket_name,
                &object_key,
                &GetObjectParams::new().checksum_mode(Some(ChecksumMode::Enabled)),
            )
            .await
        {
            Ok(result) => result,
            Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey(_))) => {
                return Ok(None);
            }
            Err(e) => {
                return Err(DataCacheError::IoFailure(e.into()));
            }
        };
        let mut backpressure_handle = result.backpressure_handle().cloned();

        self.ensure_read_window(backpressure_handle.as_mut());

        let mut buffer: Bytes = Bytes::new();
        pin_mut!(result);
        while let Some(chunk) = result.next().await {
            match chunk {
                Ok(GetBodyPart { offset, data: body }) => {
                    if offset != buffer.len() as u64 {
                        return Err(DataCacheError::InvalidBlockOffset);
                    }

                    buffer = if buffer.is_empty() {
                        body
                    } else {
                        let mut buffer = BytesMut::from(buffer);
                        buffer.extend_from_slice(&body);
                        buffer.freeze()
                    };

                    self.ensure_read_window(backpressure_handle.as_mut());
                }
                Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey(_))) => {
                    return Ok(None);
                }
                Err(e) => {
                    return Err(DataCacheError::IoFailure(e.into()));
                }
            }
        }

        let object_metadata = result.get_object_metadata();

        let checksum = result
            .get_object_checksum()
            .map_err(|_| DataCacheError::InvalidBlockChecksum)?;
        let crc32c_b64 = checksum
            .checksum_crc32c
            .ok_or_else(|| DataCacheError::InvalidBlockChecksum)?;
        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?;

        let block_metadata = BlockMetadata::new(
            block_idx,
            block_offset,
            cache_key,
            &self.config.source_bucket_name,
            crc32c,
        );
        block_metadata.validate_object_metadata(&object_metadata)?;

        Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
    }

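    /// Write a block to the cache bucket, skipping objects larger than the configured maximum.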
    async fn write_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        object_size: usize,
    ) -> DataCacheResult<()> {
        if object_size > self.config.max_object_size {
            metrics::counter!(CACHE_OVERSIZED_OBJECTS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
            return Ok(());
        }

        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }

        let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

        let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
        let block_metadata = BlockMetadata::new(
            block_idx,
            block_offset,
            &cache_key,
            &self.config.source_bucket_name,
            checksum,
        );

        self.make_put_object_request(block_metadata.to_put_object_params(), &object_key, data)
            .await
    }
}

#[async_trait]
impl<Client> DataCache for ExpressDataCache<Client>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        let start = Instant::now();
        let result = match self.read_block(cache_key, block_idx, block_offset, object_size).await {
            Ok(Some(data)) => {
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_EXPRESS).record(data.len() as f64);
                Ok(Some(data))
            }
            Ok(None) => Ok(None),
            Err(err) => {
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_EXPRESS).record(start.elapsed().as_micros() as f64);
        result
    }

    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        object_size: usize,
    ) -> DataCacheResult<()> {
        let start = Instant::now();
        let bytes_len = bytes.len();
        let result = match self
            .write_block(cache_key, block_idx, block_offset, bytes, object_size)
            .await
        {
            Ok(()) => {
                metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_EXPRESS).record(bytes_len as f64);
                Ok(())
            }
            Err(err) => {
                metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_EXPRESS).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_EXPRESS).record(start.elapsed().as_micros() as f64);
        result
    }

    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}

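/// Metadata stored on each cache block object, used to validate the block on retrieval.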
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Clone, Debug, PartialEq, Eq)]
struct BlockMetadata {
    block_idx: BlockIndex,
    block_offset: u64,
    etag: String,
    source_key: String,
    source_bucket_name: String,
    data_checksum: u32,
    header_checksum: u32,
}

impl BlockMetadata {
    pub fn new(
        block_idx: BlockIndex,
        block_offset: u64,
        cache_key: &ObjectId,
        source_bucket_name: &str,
        data_checksum: Crc32c,
    ) -> Self {
        let header_checksum =
            Self::get_header_checksum(block_idx, block_offset, cache_key, source_bucket_name, data_checksum).value();
        Self {
            block_idx,
            block_offset,
            etag: cache_key.etag().as_str().to_string(),
            source_key: cache_key.key().to_string(),
            source_bucket_name: source_bucket_name.to_string(),
            data_checksum: data_checksum.value(),
            header_checksum,
        }
    }

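    /// Build the [`PutObjectSingleParams`] carrying this metadata as S3 object metadata,
    /// along with a CRC32C upload checksum for the block data.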
    pub fn to_put_object_params(&self) -> PutObjectSingleParams {
        // Object metadata values must survive being sent as HTTP headers, so the source key is
        // base64-encoded to keep it ASCII-safe.
        let source_key_encoded = Base64::encode_string(self.source_key.as_bytes());
        let object_metadata = HashMap::from([
            ("cache-version".to_string(), CACHE_VERSION.to_string()),
            ("block-idx".to_string(), format!("{}", self.block_idx)),
            ("block-offset".to_string(), format!("{}", self.block_offset)),
            ("etag".to_string(), self.etag.clone()),
            ("source-key".to_string(), source_key_encoded),
            ("source-bucket-name".to_string(), self.source_bucket_name.clone()),
            ("header-checksum".to_string(), format!("{}", self.header_checksum)),
        ]);

        PutObjectSingleParams::new()
            .object_metadata(object_metadata)
            .checksum(Some(UploadChecksum::Crc32c(Crc32c::new(self.data_checksum))))
    }

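    /// Check that the object metadata returned by S3 matches this block's expected metadata.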
    pub fn validate_object_metadata(&self, headers: &HashMap<String, String>) -> DataCacheResult<()> {
        self.validate_header(headers, "cache-version", |version| version == CACHE_VERSION)?;
        self.validate_header(headers, "block-idx", |block_idx| {
            block_idx.parse() == Ok(self.block_idx)
        })?;
        self.validate_header(headers, "block-offset", |block_offset| {
            block_offset.parse() == Ok(self.block_offset)
        })?;
        self.validate_header(headers, "etag", |etag| etag == self.etag)?;
        self.validate_header(headers, "source-key", |source_key| {
            source_key == Base64::encode_string(self.source_key.as_bytes())
        })?;
        self.validate_header(headers, "source-bucket-name", |source_bucket_name| {
            source_bucket_name == self.source_bucket_name
        })?;
        self.validate_header(headers, "header-checksum", |header_checksum| {
            header_checksum.parse() == Ok(self.header_checksum)
        })?;

        Ok(())
    }

    fn validate_header<F: Fn(&str) -> bool>(
        &self,
        headers: &HashMap<String, String>,
        header: &str,
        is_valid: F,
    ) -> DataCacheResult<()> {
        let value = headers
            .get(header)
            .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))?;
        is_valid(value)
            .then_some(())
            .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))
    }

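    /// Compute a CRC32C over the header fields so corrupted or mismatched metadata can be detected.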
    fn get_header_checksum(
        block_idx: BlockIndex,
        block_offset: u64,
        cache_key: &ObjectId,
        source_bucket_name: &str,
        data_checksum: Crc32c,
    ) -> Crc32c {
        let mut hasher = crc32c::Hasher::new();
        hasher.update(CACHE_VERSION.as_bytes());
        hasher.update(&block_idx.to_be_bytes());
        hasher.update(&block_offset.to_be_bytes());
        hasher.update(cache_key.etag().as_str().as_bytes());
        hasher.update(cache_key.key().as_bytes());
        hasher.update(source_bucket_name.as_bytes());
        hasher.update(&data_checksum.value().to_be_bytes());
        hasher.finalize()
    }
}

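/// Build the cache key prefix from the cache version, block size, and source bucket name.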
pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
    hex::encode(
        Sha256::new()
            .chain_update(CACHE_VERSION.as_bytes())
            .chain_update(block_size.to_be_bytes())
            .chain_update(source_bucket_name.as_bytes())
            .finalize(),
    )
}

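/// Derive the S3 key for a cache block by hashing the source key and ETag, keeping the
/// resulting key short regardless of the source key length.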
pub fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
    let hashed_cache_key = hex::encode(
        Sha256::new()
            .chain_update(cache_key.key())
            .chain_update(cache_key.etag().as_str())
            .finalize(),
    );
    format!("{prefix}/{hashed_cache_key}/{block_idx:010}")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::checksums::ChecksummedBytes;
    use crate::sync::Arc;
    use proptest::{prop_assert, proptest};

    use mountpoint_s3_client::failure_client::{CountdownFailureConfig, countdown_failure_client};
    use mountpoint_s3_client::mock_client::{MockClient, MockClientError};
    use mountpoint_s3_client::types::ETag;
    use test_case::test_case;

    #[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
    #[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
    #[tokio::test]
    async fn test_put_get(part_size: usize, block_size: u64) {
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(part_size)
                .enable_backpressure(true)
                .initial_read_window_size(part_size)
                .build(),
        );

        let config = ExpressDataCacheConfig::new(bucket, "unique source description").block_size(block_size);
        let cache = ExpressDataCache::new(client, config);

        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let data_3 = ChecksummedBytes::new("a".repeat(block_size as usize).into());

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            "longkey_".repeat(128),
            ETag::for_tests(),
        );

        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert!(entry.validate().is_ok(), "CRC32C should match");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    #[tokio::test]
    async fn large_object_bypassed() {
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let cache = ExpressDataCache::new(
            client.clone(),
            ExpressDataCacheConfig::new(bucket, "unique source description"),
        );
        let data_1 = vec![0u8; 1024 * 1024 + 1];
        let data_1 = ChecksummedBytes::new(data_1.into());
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), data_1.len())
            .await
            .expect("cache should be accessible");
        let get_result = cache
            .get_block(&cache_key_1, 0, 0, data_1.len())
            .await
            .expect("cache should be accessible");
        assert!(get_result.is_none());
        assert_eq!(client.object_count(), 0, "cache must be empty");
    }

    #[tokio::test]
    async fn test_get_validate_failure() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let config = ExpressDataCacheConfig::new(bucket, source_bucket);
        let block_size = config.block_size;
        let cache = ExpressDataCache::new(client.clone(), config);

        let data = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_non_existent = ObjectId::new("does-not-exist".into(), ETag::for_tests());

        let object_key = get_s3_key(&build_prefix(source_bucket, block_size), &cache_key, 0);

        let (data, checksum) = data.into_inner().unwrap();
        let block_metadata = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum);
        let put_params = block_metadata.to_put_object_params();

        let (data_2, checksum_2) = data_2.into_inner().unwrap();
        let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum_2);
        let put_params_2 = block_metadata_2.to_put_object_params();

        // A block written with intact metadata and checksum should be readable.
        client
            .put_object_single(bucket, &object_key, &put_params, data.clone())
            .in_current_span()
            .await
            .unwrap();
        let (received_data, _) = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect("get should succeed with intact metadata")
            .expect("block should be non-empty")
            .into_inner()
            .expect("block should be valid");
        assert_eq!(received_data, data);

        // A block stored without an upload checksum should be rejected.
        client
            .put_object_single(bucket, &object_key, &put_params.clone().checksum(None), data.clone())
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if checksum isn't present");
        assert!(matches!(err, DataCacheError::InvalidBlockChecksum));

        // A block stored without object metadata should be rejected.
        client
            .put_object_single(
                bucket,
                &object_key,
                &put_params.clone().object_metadata(HashMap::new()),
                data.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if object metadata isn't present");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // Writing different data against the original checksum should fail outright.
        client
            .put_object_single(bucket, &object_key, &put_params.clone(), data_2.clone())
            .in_current_span()
            .await
            .expect_err("should fail to write as checksum incorrect");

        // A block whose stored metadata does not match its data should be rejected.
        client
            .put_object_single(
                bucket,
                &object_key,
                &put_params.clone().checksum(put_params_2.checksum.clone()),
                data_2.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data_2.len())
            .await
            .expect_err("cache should return error if object metadata doesn't match data");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // A block whose metadata names a different source bucket should be rejected.
        let mut corrupted_metadata = block_metadata.clone();
        corrupted_metadata.source_bucket_name = bucket.to_owned();
        client
            .put_object_single(
                bucket,
                &object_key,
                &corrupted_metadata.to_put_object_params(),
                data.clone(),
            )
            .in_current_span()
            .await
            .unwrap();
        let err = cache
            .get_block(&cache_key, 0, 0, data.len())
            .await
            .expect_err("cache should return error if source bucket does not match");
        assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

        // A missing block should return `None` rather than an error.
        let result = cache
            .get_block(&cache_key_non_existent, 0, 0, data.len())
            .await
            .expect("cache should return None if data is not present");
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_verify_cache_valid_success() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );
        let cache = ExpressDataCache::new(client.clone(), ExpressDataCacheConfig::new(bucket, source_bucket));

        cache.verify_cache_valid().await.expect("cache should work");
    }

    #[tokio::test]
    async fn test_verify_cache_valid_failure() {
        let source_bucket = "source-bucket";
        let bucket = "test-bucket";
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket)
                .part_size(8 * 1024 * 1024)
                .enable_backpressure(true)
                .initial_read_window_size(8 * 1024 * 1024)
                .build(),
        );

        let mut put_single_failures = HashMap::new();
        put_single_failures.insert(1, MockClientError("error".to_owned().into()).into());

        let failure_client = Arc::new(countdown_failure_client(
            client.clone(),
            CountdownFailureConfig {
                put_single_failures,
                ..Default::default()
            },
        ));

        let cache = ExpressDataCache::new(failure_client, ExpressDataCacheConfig::new(bucket, source_bucket));

        cache
            .verify_cache_valid()
            .await
            .expect_err("cache should not report valid if cannot write");
    }

    proptest! {
        #[test]
        fn proptest_creates_small_s3_keys(key: String, etag: String, block_idx: BlockIndex, source_description: String, block_size: u64) {
            let cache_key = ObjectId::new(key, etag.into());
            let prefix = build_prefix(&source_description, block_size);
            prop_assert!(get_s3_key(&prefix, &cache_key, block_idx).len() <= 1024);
        }

        #[test]
        fn proptest_block_metadata_to_headers_s3_key_ascii_only(block_metadata: BlockMetadata) {
            let params = block_metadata.to_put_object_params();
            prop_assert!(params.object_metadata.get("source-key").unwrap().is_ascii());
        }

        #[test]
        fn proptest_block_metadata_validates_headers(block_metadata: BlockMetadata, block_metadata2: BlockMetadata) {
            let params = block_metadata.to_put_object_params();
            if block_metadata == block_metadata2 {
                prop_assert!(block_metadata2.validate_object_metadata(&params.object_metadata).is_ok());
            } else {
                prop_assert!(block_metadata2.validate_object_metadata(&params.object_metadata).is_err());
            }
        }

        #[test]
        fn proptest_block_metadata_validates_headers_is_equal(block_metadata: BlockMetadata) {
            let params = block_metadata.to_put_object_params();
            prop_assert!(block_metadata.validate_object_metadata(&params.object_metadata).is_ok());
            prop_assert!(matches!(params.checksum, Some(UploadChecksum::Crc32c(x)) if x == Crc32c::new(block_metadata.data_checksum)));
        }
    }
}