1use std::fs;
4use std::io::{ErrorKind, Read, Write};
5use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use async_trait::async_trait;
10use bincode::config::{Configuration, Fixint, Limit, LittleEndian};
11use bincode::error::{DecodeError, EncodeError};
12use bincode::{Decode, Encode};
13use bytes::Bytes;
14use linked_hash_map::LinkedHashMap;
15use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
16use sha2::{Digest, Sha256};
17use tempfile::NamedTempFile;
18use thiserror::Error;
19use tracing::{trace, warn};
20
21use crate::checksums::IntegrityError;
22use crate::data_cache::DataCacheError;
23use crate::memory::{BufferKind, PagedPool};
24use crate::metrics::defs::{
25 ATTR_CACHE, CACHE_DISK, CACHE_EVICT_LATENCY, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY,
26 CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY, CACHE_TOTAL_SIZE,
27};
28use crate::object::ObjectId;
29use crate::sync::Mutex;
30
31use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
32
/// On-disk format version, used both as a path component under the cache directory
/// and as a marker written at the start of every block file. Bump when the
/// serialized block layout changes.
const CACHE_VERSION: &str = "V2";

/// Number of leading hex characters of the hashed key split off into a parent
/// directory, to avoid placing every block in one huge flat directory.
const HASHED_DIR_SPLIT_INDEX: usize = 2;
38
/// On-disk implementation of [DataCache], storing each block as a separate file
/// under a hashed directory tree rooted at the configured cache directory.
pub struct DiskDataCache {
    config: DiskDataCacheConfig,
    // Pool from which payload buffers are allocated when reading blocks back from disk.
    pool: PagedPool,
    // LRU usage accounting for eviction; `None` when the cache limit is `Unbounded`.
    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
}
46
/// Configuration for a [DiskDataCache].
#[derive(Debug)]
pub struct DiskDataCacheConfig {
    /// Root directory under which cached blocks are stored.
    pub cache_directory: PathBuf,
    /// Size of a single cached block, in bytes.
    pub block_size: u64,
    /// Policy bounding the total size of the cache.
    pub limit: CacheLimit,
}
56
/// Policy for how large the disk cache is allowed to grow.
#[derive(Debug)]
pub enum CacheLimit {
    /// No limit; nothing is ever evicted.
    Unbounded,
    /// Evict when the total tracked cache size exceeds `max_size` bytes.
    TotalSize { max_size: usize },
    /// Evict when the file system's free-block ratio drops below `min_ratio`.
    AvailableSpace { min_ratio: f64 },
}
64
65impl Default for CacheLimit {
66 fn default() -> Self {
67 CacheLimit::AvailableSpace { min_ratio: 0.05 } }
69}
70
/// Metadata written ahead of every cached block's payload on disk.
///
/// The header carries the block's identity (object key, etag, index, offset) plus
/// two checksums so corrupted or misplaced entries can be rejected on read.
/// Field order is part of the serialized format — changing it requires a
/// `CACHE_VERSION` bump.
#[derive(Encode, Decode, Debug)]
struct DiskBlockHeader {
    block_idx: BlockIndex,
    block_offset: u64,
    block_len: u64,
    etag: String,
    s3_key: String,
    // CRC32C of the block payload.
    data_checksum: u32,
    // CRC32C over the other header fields, guarding against header corruption.
    header_checksum: u32,
}
85
/// Upper bound on the serialized header size accepted when decoding.
/// Limits how much a corrupted length field (e.g. for `etag` or `s3_key`)
/// can make the decoder read/allocate.
const BINCODE_HEADER_MAX_SIZE: usize = 10000;

/// Bincode configuration for (de)serializing [DiskBlockHeader].
/// Fixed-width little-endian integers keep the on-disk layout stable;
/// the limit applies only to header decoding, not the payload.
const BINCODE_CONFIG: Configuration<LittleEndian, Fixint, Limit<BINCODE_HEADER_MAX_SIZE>> = bincode::config::standard()
    .with_fixed_int_encoding()
    .with_limit::<BINCODE_HEADER_MAX_SIZE>();
97
/// Error created when a [DiskBlock] could not be constructed from the given bytes.
#[derive(Debug, Error)]
enum DiskBlockCreationError {
    /// The source bytes failed their integrity (checksum) validation.
    #[error(transparent)]
    IntegrityError(#[from] IntegrityError),
}
105
/// Error returned when a [DiskBlock]'s contents fail validation on access.
#[derive(Debug, Error)]
enum DiskBlockAccessError {
    #[error("checksum over the block's fields did not match the field content")]
    ChecksumError,
    #[error("one or more of the fields in this block were incorrect")]
    FieldMismatchError,
}
114
/// Error during serialization of a [DiskBlock] to disk, or deserialization back.
#[derive(Debug, Error)]
enum DiskBlockReadWriteError {
    /// A decoded header declared a payload larger than the cache's block size.
    #[error("Invalid block length: {0}")]
    InvalidBlockLength(u64),
    #[error("Error decoding the block: {0}")]
    DecodeError(DecodeError),
    #[error("Error encoding the block: {0}")]
    EncodeError(EncodeError),
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
127
128impl DiskBlockHeader {
129 pub fn new(
131 block_idx: BlockIndex,
132 block_offset: u64,
133 block_len: usize,
134 etag: String,
135 s3_key: String,
136 data_checksum: Crc32c,
137 ) -> Self {
138 let data_checksum = data_checksum.value();
139 let header_checksum =
140 Self::compute_checksum(block_idx, block_offset, block_len, &etag, &s3_key, data_checksum).value();
141 DiskBlockHeader {
142 block_idx,
143 block_offset,
144 block_len: block_len as u64,
145 etag,
146 s3_key,
147 data_checksum,
148 header_checksum,
149 }
150 }
151
152 fn compute_checksum(
153 block_idx: BlockIndex,
154 block_offset: u64,
155 block_len: usize,
156 etag: &str,
157 s3_key: &str,
158 data_checksum: u32,
159 ) -> Crc32c {
160 let mut hasher = crc32c::Hasher::new();
161 hasher.update(&block_idx.to_be_bytes());
162 hasher.update(&block_offset.to_be_bytes());
163 hasher.update(&block_len.to_be_bytes());
164 hasher.update(etag.as_bytes());
165 hasher.update(s3_key.as_bytes());
166 hasher.update(&data_checksum.to_be_bytes());
167 hasher.finalize()
168 }
169
170 pub fn validate(
174 &self,
175 s3_key: &str,
176 etag: &str,
177 block_idx: BlockIndex,
178 block_offset: u64,
179 block_len: usize,
180 ) -> Result<Crc32c, DiskBlockAccessError> {
181 let s3_key_match = s3_key == self.s3_key;
182 let etag_match = etag == self.etag;
183 let block_idx_match = block_idx == self.block_idx;
184 let block_offset_match = block_offset == self.block_offset;
185 let block_size_match = block_len == self.block_len as usize;
186
187 let data_checksum = self.data_checksum;
188 if s3_key_match && etag_match && block_idx_match && block_offset_match && block_size_match {
189 if Self::compute_checksum(block_idx, block_offset, block_len, etag, s3_key, data_checksum).value()
190 != self.header_checksum
191 {
192 Err(DiskBlockAccessError::ChecksumError)
193 } else {
194 Ok(Crc32c::new(data_checksum))
195 }
196 } else {
197 warn!(
198 s3_key_match,
199 etag_match, block_idx_match, block_size_match, "block data did not match expected values",
200 );
201 Err(DiskBlockAccessError::FieldMismatchError)
202 }
203 }
204}
205
/// A single cached block as stored on disk: validated header plus raw payload.
#[derive(Debug)]
struct DiskBlock {
    /// Identity and checksum metadata; must validate before `data` is trusted.
    header: DiskBlockHeader,
    /// The block's payload bytes.
    data: Bytes,
}
214
impl DiskBlock {
    /// Creates a new [DiskBlock] for the given object block.
    ///
    /// Fails with [DiskBlockCreationError::IntegrityError] if the checksummed
    /// bytes fail their own integrity validation when unpacked.
    fn new(
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
    ) -> Result<Self, DiskBlockCreationError> {
        let s3_key = cache_key.key().to_owned();
        let etag = cache_key.etag().as_str().to_owned();
        let (data, data_checksum) = bytes.into_inner()?;
        let header = DiskBlockHeader::new(block_idx, block_offset, data.len(), etag, s3_key, data_checksum);

        Ok(DiskBlock { data, header })
    }

    /// Extracts the block's payload, validating the header fields (key, etag,
    /// index, offset, length) against what the caller expects first.
    fn data(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
        let data_checksum = self.header.validate(
            cache_key.key(),
            cache_key.etag().as_str(),
            block_idx,
            block_offset,
            self.data.len(),
        )?;
        // Re-attach the recorded payload checksum without recomputing it.
        let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
        Ok(bytes)
    }

    /// Deserializes a block from `reader`: bincode header first, then payload.
    ///
    /// `block_size` bounds the declared payload length so a corrupted header
    /// cannot trigger an oversized allocation; the payload buffer comes from `pool`.
    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
        let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;

        if header.block_len > block_size {
            return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
        }

        let size = header.block_len as usize;
        let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
        buffer.fill_from_reader(reader)?;
        let data = buffer.into_bytes();

        Ok(Self { header, data })
    }

    /// Serializes the block (header followed by payload) into `writer`,
    /// returning the total number of bytes written.
    fn write(&self, writer: &mut impl Write) -> Result<usize, DiskBlockReadWriteError> {
        let header_length = bincode::encode_into_std_write(&self.header, writer, BINCODE_CONFIG)?;
        writer.write_all(&self.data)?;
        Ok(header_length + self.data.len())
    }
}
277
278impl From<DecodeError> for DiskBlockReadWriteError {
279 fn from(value: DecodeError) -> Self {
280 match value {
281 DecodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
282 value => DiskBlockReadWriteError::DecodeError(value),
283 }
284 }
285}
286
287impl From<EncodeError> for DiskBlockReadWriteError {
288 fn from(value: EncodeError) -> Self {
289 match value {
290 EncodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
291 value => DiskBlockReadWriteError::EncodeError(value),
292 }
293 }
294}
295
/// Allows `?` on std I/O errors inside cache operations, wrapping them as
/// [DataCacheError::IoFailure].
impl From<std::io::Error> for DataCacheError {
    fn from(e: std::io::Error) -> Self {
        DataCacheError::IoFailure(e.into())
    }
}
301
302impl From<DiskBlockReadWriteError> for DataCacheError {
303 fn from(value: DiskBlockReadWriteError) -> Self {
304 match value {
305 DiskBlockReadWriteError::IOError(e) => DataCacheError::IoFailure(e.into()),
306 _ => DataCacheError::InvalidBlockContent,
307 }
308 }
309}
310
impl DiskDataCache {
    /// Creates a new [DiskDataCache] with the given configuration, allocating
    /// read buffers from `pool`.
    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
        // LRU usage accounting is only required when a limit may force evictions.
        let usage = match &config.limit {
            CacheLimit::Unbounded => None,
            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
        };
        DiskDataCache { config, pool, usage }
    }

    /// Path where the block for `block_key` is (or would be) stored:
    /// `<cache_directory>/<CACHE_VERSION>/<hash prefix>/<hash rest>/<block index>`.
    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
        let mut path = self.config.cache_directory.join(CACHE_VERSION);
        block_key.append_to_path(&mut path);
        path
    }

    /// Reads and validates the block at `path` against the expected identity.
    ///
    /// Returns `Ok(None)` if no block file exists (a cache miss); any version
    /// mismatch, deserialization failure, or field/checksum mismatch is an error.
    fn read_block(
        &self,
        path: impl AsRef<Path>,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        trace!(
            key = ?cache_key.key(),
            offset = block_offset,
            path = ?path.as_ref(),
            "reading cache block",
        );
        let mut file = match fs::File::open(path.as_ref()) {
            Ok(file) => file,
            // A missing file is a miss, not a failure.
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };

        // Every block file begins with the cache version marker; reject blocks
        // written in a different format before attempting to decode them.
        let mut block_version = [0; CACHE_VERSION.len()];
        file.read_exact(&mut block_version)?;
        if block_version != CACHE_VERSION.as_bytes() {
            warn!(
                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                "stale block format found during reading"
            );
            return Err(DataCacheError::InvalidBlockContent);
        }

        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
            .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
                DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                    DataCacheError::InvalidBlockContent
                }
            })?;

        Ok(Some(bytes))
    }

    /// Serializes `block` into a temporary file in the same directory as `path`,
    /// returning the unpersisted temp file and the number of bytes written.
    ///
    /// Writing to a temp file first means a partially-written block can never be
    /// observed under the final path; the caller persists (renames) it atomically.
    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<(NamedTempFile, usize)> {
        let path = path.as_ref();
        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
        // Cache directories (0700) and files (0600) are private to the owning user.
        fs::DirBuilder::new()
            .mode(0o700)
            .recursive(true)
            .create(cache_path_for_key)?;

        let mut temp_file = tempfile::Builder::new()
            .permissions(fs::Permissions::from_mode(0o600))
            .tempfile_in(cache_path_for_key)?;
        trace!(
            key = block.header.s3_key,
            offset = block.header.block_offset,
            block_path = ?path,
            temp_path = ?temp_file.path(),
            "writing cache block",
        );
        temp_file.write_all(CACHE_VERSION.as_bytes())?;
        let bytes_written = block.write(&mut temp_file)?;
        Ok((temp_file, bytes_written))
    }

    /// Whether the configured limit is exceeded at the given tracked `size`.
    /// Also publishes `size` as the cache-size gauge metric as a side effect.
    fn is_limit_exceeded(&self, size: usize) -> bool {
        metrics::gauge!(CACHE_TOTAL_SIZE, ATTR_CACHE => CACHE_DISK).set(size as f64);
        match self.config.limit {
            CacheLimit::Unbounded => false,
            CacheLimit::TotalSize { max_size } => size > max_size,
            CacheLimit::AvailableSpace { min_ratio } => {
                let stats = match nix::sys::statvfs::statvfs(&self.config.cache_directory) {
                    // Treat a nonsensical statvfs result (0 total blocks) the
                    // same as a failed call: assume not exceeded.
                    Ok(stats) if stats.blocks() == 0 => {
                        warn!("unable to determine available space (0 blocks reported)");
                        return false;
                    }
                    Ok(stats) => stats,
                    Err(error) => {
                        warn!(?error, "unable to determine available space");
                        return false;
                    }
                };
                (stats.blocks_free() as f64) < min_ratio * (stats.blocks() as f64)
            }
        }
    }

    /// Evicts least-recently-used blocks until the cache is back under its limit.
    ///
    /// No-op when the cache is unbounded (no usage tracking). Errs only when the
    /// limit is still exceeded but there is nothing left to evict.
    fn evict_if_needed(&self) -> DataCacheResult<()> {
        let Some(usage) = &self.usage else {
            return Ok(());
        };

        loop {
            let mut usage = usage.lock().unwrap();
            if !self.is_limit_exceeded(usage.size) {
                break;
            }
            let Some(to_remove) = usage.evict_lru() else {
                warn!("cache limit exceeded but nothing to evict");
                return Err(DataCacheError::EvictionFailure);
            };
            let path_to_remove = self.get_path_for_block_key(&to_remove);
            trace!("evicting block at {}", path_to_remove.display());
            // A block that has already disappeared from disk is fine to skip;
            // any other removal failure is logged but does not abort eviction.
            if let Err(remove_err) = fs::remove_file(&path_to_remove)
                && remove_err.kind() != ErrorKind::NotFound
            {
                warn!("unable to evict block: {:?}", remove_err);
            }
        }
        Ok(())
    }
}
440
441fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
443 let s3_key = cache_key.key();
444 let etag = cache_key.etag();
445
446 let mut hasher = Sha256::new();
447 hasher.update(CACHE_VERSION);
448 hasher.update(s3_key);
449 hasher.update(etag.as_str());
450 hasher.finalize().into()
451}
452
#[async_trait]
impl DataCache for DiskDataCache {
    /// Looks up a block in the disk cache, returning `Ok(None)` on a miss.
    ///
    /// Rejects calls where `block_offset` is inconsistent with `block_idx` and
    /// the configured block size. Records get latency, hit size, and error count
    /// metrics, and on a hit marks the block most-recently-used for LRU eviction.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        _object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let block_key = DiskBlockKey::new(cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        let result = match self.read_block(&path, cache_key, block_idx, block_offset) {
            Ok(None) => {
                Ok(None)
            }
            Ok(Some(bytes)) => {
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes.len() as f64);
                // Promote the block to most-recently-used.
                if let Some(usage) = &self.usage {
                    usage.lock().unwrap().refresh(&block_key);
                }
                Ok(Some(bytes))
            }
            Err(err) => {
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        result
    }

    /// Writes a block to the disk cache.
    ///
    /// Evicts older blocks first if the limit is exceeded, serializes the block
    /// to a temp file, then atomically persists it. When usage tracking is
    /// enabled, the persist happens under the usage lock so LRU bookkeeping
    /// stays consistent with what is actually on disk. Records put latency,
    /// size, and error metrics.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        _object_size: usize,
    ) -> DataCacheResult<()> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let bytes_len = bytes.len();
        let block_key = DiskBlockKey::new(&cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        trace!(?cache_key, ?path, "new block will be created in disk cache");

        // Closure captures the fallible body so metrics below see one Result.
        let put_result = (|| -> DataCacheResult<()> {
            let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
                DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
            })?;

            {
                let eviction_start = Instant::now();
                let result = self.evict_if_needed();
                metrics::histogram!(CACHE_EVICT_LATENCY, ATTR_CACHE => CACHE_DISK)
                    .record(eviction_start.elapsed().as_micros() as f64);
                result
            }?;

            let result = self.write_block(&path, block);
            let (temp_file, size) = result?;

            if let Some(usage) = &self.usage {
                // Hold the usage lock across the persist so size accounting
                // cannot race with concurrent puts/evictions of this block.
                let mut usage = usage.lock().unwrap();
                _ = temp_file.persist(path).map_err(|e| e.error)?;
                usage.add(block_key, size);
            } else {
                _ = temp_file.persist(path).map_err(|e| e.error)?;
            }

            Ok(())
        })();

        if put_result.is_ok() {
            metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes_len as f64);
        } else {
            metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
        }
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        put_result
    }

    /// The configured size of a cache block, in bytes.
    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}
550
/// Identifies a single block on disk: the SHA-256 of the object's identity
/// (see [hash_cache_key_raw]) plus the block index within the object.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct DiskBlockKey {
    hashed_key: [u8; 32],
    block_index: BlockIndex,
}
560
561impl DiskBlockKey {
562 fn new(cache_key: &ObjectId, block_index: BlockIndex) -> Self {
563 let hashed_key = hash_cache_key_raw(cache_key);
564 Self {
565 hashed_key,
566 block_index,
567 }
568 }
569
570 fn hex_key(&self) -> String {
571 hex::encode(self.hashed_key)
572 }
573
574 fn append_to_path(&self, path: &mut PathBuf) {
575 let hashed_cache_key = self.hex_key();
576
577 let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
579 path.push(first);
580 path.push(second);
581
582 path.push(format!("{:010}", self.block_index));
584 }
585}
586
/// LRU bookkeeping for cache entries: insertion-ordered map of key -> size,
/// plus the running total of all tracked sizes.
struct UsageInfo<K> {
    // Ordered oldest-first; front is the LRU eviction candidate.
    entries: LinkedHashMap<K, usize>,
    // Sum of all entry sizes, maintained incrementally.
    size: usize,
}
592
593impl<K> UsageInfo<K>
594where
595 K: std::hash::Hash + Eq + std::fmt::Debug,
596{
597 fn new() -> Self {
598 Self {
599 entries: LinkedHashMap::new(),
600 size: 0,
601 }
602 }
603
604 fn refresh(&mut self, key: &K) -> bool {
607 self.entries.get_refresh(key).is_some()
608 }
609
610 fn add(&mut self, key: K, size: usize) {
612 if let Some(previous_size) = self.entries.insert(key, size) {
613 self.size = self.size.saturating_sub(previous_size);
614 }
615
616 self.size = self.size.saturating_add(size);
617 }
618
619 fn evict_lru(&mut self) -> Option<K> {
622 let (key, size) = self.entries.pop_front()?;
623 self.size = self.size.saturating_sub(size);
624 Some(key)
625 }
626}
627
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::{ffi::OsString, io::Cursor};

    use super::*;

    use futures::StreamExt as _;
    use futures::executor::{ThreadPool, block_on};
    use futures::task::SpawnExt;
    use mountpoint_s3_client::types::ETag;
    use rand::rngs::SmallRng;
    use rand::{Rng, SeedableRng};
    use test_case::test_case;

    use crate::sync::Arc;

    // Pins the exact serialized byte layout of a block; if this fails, the
    // on-disk format changed and CACHE_VERSION must be bumped.
    #[test]
    fn test_block_format_version_requires_update() {
        let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
        let data = ChecksummedBytes::new("Foo".into());
        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
        let expected_bytes: Vec<u8> = vec![
            100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116,
            101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114,
            108, 100, 9, 85, 128, 46, 13, 202, 106, 46, 70, 111, 111,
        ];
        let mut serialized_bytes = Vec::new();
        block.write(&mut serialized_bytes).unwrap();
        assert_eq!(
            expected_bytes, serialized_bytes,
            "serialized disk format appears to have changed, version bump required"
        );
    }

    // Pins the hashed-key derivation; a change here silently orphans existing
    // cache entries on disk.
    #[test]
    fn test_hash_cache_key_raw() {
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key, etag);
        let expected_hash = "1cfd611a26062b33e98d48a84e967ddcc2a42957479a8abd541e29cfa3258639";
        let actual_hash = hex::encode(hash_cache_key_raw(&key));
        assert_eq!(expected_hash, actual_hash);
    }

    // Verifies the on-disk path layout: cache dir / version / split hash / padded index.
    #[test]
    fn get_path_for_block_key() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 5);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "0000000005",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // Block indices wider than the 10-digit zero-padding still produce valid paths.
    #[test]
    fn get_path_for_block_key_huge_block_index() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 1000000000000000);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "1000000000000000",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // End-to-end put/get round-trips across multiple keys and block indices,
    // exercising every block-size-vs-pool-buffer-size relationship.
    #[test_case(8 * 1024 * 1024, 8 * 1024 * 1024; "matching block and pool buffer sizes")]
    #[test_case(1024 * 1024, 8 * 1024 * 1024; "block size smaller than pool buffer size")]
    #[test_case(8 * 1024 * 1024, 1024 * 1024; "block size larger than pool buffer size")]
    #[tokio::test]
    async fn test_put_get(block_size: u64, pool_buffer_size: usize) {
        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let data_3 = ChecksummedBytes::new("Baz".into());

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            // A long key stresses the hashed directory layout.
            "long-key_".repeat(100),
            ETag::for_tests(),
        );

        // Empty cache: a get must be a clean miss, not an error.
        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // The first block must still be intact after later puts.
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    // A slice of a larger checksummed buffer must round-trip through the cache.
    #[tokio::test]
    async fn test_checksummed_bytes_slice() {
        let data = ChecksummedBytes::new("0123456789".into());
        let slice = data.slice(1..5);

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: 8 * 1024 * 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());

        cache
            .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key, 0, 0, slice.len())
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            slice.into_bytes().expect("original slice should be valid"),
            entry.into_bytes().expect("returned entry should be valid"),
            "cache entry returned should match original slice after put"
        );
    }

    // With a TotalSize limit equal to one large object, writing a large object
    // then a small one must evict (only) some of the large object's blocks.
    #[tokio::test]
    async fn test_eviction() {
        const BLOCK_SIZE: usize = 100 * 1024;
        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;

        // Deterministic pseudo-random object body.
        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
            let mut rng = SmallRng::seed_from_u64(seed);
            let mut body = vec![0u8; size];
            rng.fill(&mut body[..]);

            ChecksummedBytes::new(body.into())
        }

        // Returns whether the block is present, asserting its content if so.
        async fn is_block_in_cache(
            cache: &DiskDataCache,
            cache_key: &ObjectId,
            block_idx: u64,
            expected_bytes: &ChecksummedBytes,
            object_size: usize,
        ) -> bool {
            if let Some(retrieved) = cache
                .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
                .await
                .expect("cache should be accessible")
            {
                assert_eq!(
                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
                    expected_bytes
                        .clone()
                        .into_bytes()
                        .expect("original bytes should be valid")
                );
                true
            } else {
                false
            }
        }

        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
        let large_object_blocks: Vec<_> = (0..large_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let large_object_key = ObjectId::new("large".into(), ETag::for_tests());

        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
        let small_object_blocks: Vec<_> = (0..small_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
            },
            pool,
        );

        // Fill the cache to its limit with the large object's blocks.
        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    large_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    LARGE_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        // Then write the small object, which must force evictions of LRU blocks.
        for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    small_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    SMALL_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
            })
            .count()
            .await;
        assert_eq!(
            count_small_object_blocks_in_cache,
            small_object_blocks.len(),
            "All blocks for small object should still be in the cache"
        );

        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
            })
            .count()
            .await;
        assert!(
            count_large_object_blocks_in_cache < large_object_blocks.len(),
            "Some blocks for the large object should have been evicted"
        );
    }

    // DiskBlock::data must reject any mismatched identity field and accept a full match.
    #[test]
    fn data_block_extract_checks() {
        let data_1 = ChecksummedBytes::new("Foo".into());

        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
        let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());

        let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
        block
            .data(&cache_key_1, 1, 0)
            .expect_err("should fail due to incorrect block index");
        block
            .data(&cache_key_1, 0, 1024)
            .expect_err("should fail due to incorrect block offset");
        block
            .data(&cache_key_2, 0, 0)
            .expect_err("should fail due to incorrect s3 key in cache key");
        block
            .data(&cache_key_3, 0, 0)
            .expect_err("should fail due to incorrect etag in cache key");
        let unpacked_bytes = block
            .data(&cache_key_1, 0, 0)
            .expect("should be OK as all fields match");
        assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
    }

    // Exercises every validation branch of DiskBlockHeader::validate,
    // including a corrupted header checksum.
    #[test]
    fn validate_block_header() {
        let block_idx = 0;
        let block_offset = 0;
        let block_size = 4;
        let etag = ETag::for_tests();
        let s3_key = String::from("s3/key");
        let data_checksum = Crc32c::new(42);
        let mut header = DiskBlockHeader::new(
            block_idx,
            block_offset,
            block_size,
            etag.as_str().to_owned(),
            s3_key.clone(),
            data_checksum,
        );

        let checksum = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect("should be OK with valid fields and checksum");
        assert_eq!(data_checksum, checksum);

        let err = header
            .validate("hello", etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid s3_key");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, "bad etag", block_idx, block_offset, block_size)
            .expect_err("should fail with invalid etag");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), 5, block_offset, block_size)
            .expect_err("should fail with invalid block idx");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, 1024, block_size)
            .expect_err("should fail with invalid block offset");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, 42)
            .expect_err("should fail with invalid block length");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));

        // Corrupt the header checksum itself: fields match, checksum must not.
        header.header_checksum = 23;
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid checksum");
        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
    }

    // Corrupting a serialized length field (key/etag via bincode limit, data via
    // the block-size bound) must make deserialization fail, not over-allocate.
    #[test_case("key")]
    #[test_case("etag")]
    #[test_case("data")]
    fn read_corrupted_block_should_fail(length_to_corrupt: &str) {
        const MAX_LENGTH: u64 = 1024;

        // Reads a little-endian u64 out of the serialized buffer.
        fn get_u64_at(slice: &[u8], offset: usize) -> u64 {
            u64::from_le_bytes(slice[offset..(offset + 8)].try_into().unwrap())
        }

        // Overwrites a little-endian u64 inside the serialized buffer.
        fn replace_u64_at(slice: &mut [u8], offset: usize, new_value: u64) {
            slice[offset..(offset + 8)].copy_from_slice(&new_value.to_le_bytes());
        }

        let original_length = 42;
        let data = ChecksummedBytes::new(vec![0u8; original_length].into());
        let cache_key = ObjectId::new("k".into(), ETag::from_str("e").unwrap());
        let block = DiskBlock::new(cache_key.clone(), 0, 0, data).expect("should have no checksum err");

        let mut buf = Vec::new();
        block.write(&mut buf).unwrap();

        // Byte offsets of the length fields in the fixed-int bincode layout.
        let (offset, expected) = match length_to_corrupt {
            "key" => (24, cache_key.key().len()),
            "etag" => (32 + cache_key.key().len(), cache_key.etag().as_str().len()),
            "data" => (16, original_length),
            _ => panic!("invalid length: {length_to_corrupt}"),
        };

        assert_eq!(
            get_u64_at(&buf, offset) as usize,
            expected,
            "serialized length should match the expected value (have we changed the serialization format?)"
        );

        replace_u64_at(&mut buf, offset, u64::MAX);

        let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
        match length_to_corrupt {
            "key" | "etag" => assert!(matches!(
                err,
                DiskBlockReadWriteError::DecodeError(DecodeError::LimitExceeded)
            )),
            "data" => assert!(matches!(err, DiskBlockReadWriteError::InvalidBlockLength(_))),
            _ => panic!("invalid length: {length_to_corrupt}"),
        }
    }

    // Many tasks racing get/put on the same block must never observe an error.
    #[test]
    fn test_concurrent_access() {
        let block_size = 1024 * 1024;
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([block_size]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: block_size as u64,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let data_cache = Arc::new(data_cache);

        let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());
        let block_idx = 0;
        let block_offset = 0;
        let object_size = 10 * block_size;

        let pool = ThreadPool::builder().pool_size(32).create().unwrap();

        let mut handles = Vec::new();
        for _ in 0..100 {
            let data_cache = data_cache.clone();
            let cache_key = cache_key.clone();
            let handle = pool
                .spawn_with_handle(async move {
                    let block = data_cache
                        .get_block(&cache_key, block_idx, block_offset, object_size)
                        .await
                        .expect("get_block should not return error");
                    if block.is_none() {
                        let bytes: Bytes = vec![0u8; block_size].into();
                        data_cache
                            .put_block(cache_key, block_idx, block_offset, bytes.into(), object_size)
                            .await
                            .expect("put_block should succeed");
                    }
                })
                .unwrap();
            handles.push(handle);
        }

        block_on(async move {
            for handle in handles {
                handle.await
            }
        });
    }
}