1use std::fs;
4use std::io::{ErrorKind, Read, Write};
5use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use async_trait::async_trait;
10use bincode::config::{Configuration, Fixint, Limit, LittleEndian};
11use bincode::error::{DecodeError, EncodeError};
12use bincode::{Decode, Encode};
13use bytes::Bytes;
14use linked_hash_map::LinkedHashMap;
15use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
16use sha2::{Digest, Sha256};
17use tempfile::NamedTempFile;
18use thiserror::Error;
19use tracing::{trace, warn};
20
21use crate::checksums::IntegrityError;
22use crate::data_cache::DataCacheError;
23use crate::memory::{BufferKind, PagedPool};
24use crate::metrics::defs::{
25 ATTR_CACHE, CACHE_DISK, CACHE_EVICT_LATENCY, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY,
26 CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY, CACHE_TOTAL_SIZE,
27};
28use crate::object::ObjectId;
29use crate::sync::Mutex;
30
31use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
32
/// Version identifier for the on-disk block format. Written as a prefix of
/// every block file and mixed into the cache key hash, so bumping it
/// invalidates all previously written blocks.
const CACHE_VERSION: &str = "V2";

/// Number of leading hex characters of the hashed key used as the first
/// directory level, fanning block files out across subdirectories.
const HASHED_DIR_SPLIT_INDEX: usize = 2;
38
/// On-disk cache of object blocks, laid out under a configured cache directory.
pub struct DiskDataCache {
    config: DiskDataCacheConfig,
    /// Pool used to allocate buffers when reading blocks back from disk.
    pool: PagedPool,
    /// LRU usage tracking for eviction; `None` when the cache is unbounded.
    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
}
46
/// Configuration for a [DiskDataCache].
#[derive(Debug)]
pub struct DiskDataCacheConfig {
    /// Root directory under which cache blocks are stored.
    pub cache_directory: PathBuf,
    /// Size of a cache block, in bytes.
    pub block_size: u64,
    /// Limit on how large the cache may grow.
    pub limit: CacheLimit,
}
56
/// Limit on the size of the disk cache.
#[derive(Debug)]
pub enum CacheLimit {
    /// No limit: the cache may grow without bound.
    Unbounded,
    /// Evict once the total cached bytes exceed `max_size`.
    TotalSize { max_size: usize },
    /// Evict once the file system's available blocks fall below `min_ratio`
    /// of its total blocks.
    AvailableSpace { min_ratio: f64 },
}
64
/// Default minimum fraction of file-system space that must remain available
/// (see [CacheLimit::AvailableSpace]).
pub const DEFAULT_CACHE_MIN_AVAILABLE_RATIO: f64 = 0.05;
68
69impl Default for CacheLimit {
70 fn default() -> Self {
71 CacheLimit::AvailableSpace {
72 min_ratio: DEFAULT_CACHE_MIN_AVAILABLE_RATIO,
73 }
74 }
75}
76
/// On-disk header written in front of each block's data.
///
/// `header_checksum` is a CRC32C computed over all other fields, so corruption
/// of the header itself can be detected independently of the data checksum.
/// NOTE: the serialized layout of this struct is pinned by
/// `test_block_format_version_requires_update`; changing it requires a
/// CACHE_VERSION bump.
#[derive(Encode, Decode, Debug)]
struct DiskBlockHeader {
    block_idx: BlockIndex,
    block_offset: u64,
    block_len: u64,
    etag: String,
    s3_key: String,
    /// CRC32C over the block's data.
    data_checksum: u32,
    /// CRC32C over the fields above.
    header_checksum: u32,
}
91
/// Maximum number of bytes `bincode` may consume while decoding a header,
/// bounding allocations when a corrupted length field is encountered.
const BINCODE_HEADER_MAX_SIZE: usize = 10000;

/// Fixed-width, little-endian `bincode` configuration so the serialized header
/// layout stays stable across releases.
const BINCODE_CONFIG: Configuration<LittleEndian, Fixint, Limit<BINCODE_HEADER_MAX_SIZE>> = bincode::config::standard()
    .with_fixed_int_encoding()
    .with_limit::<BINCODE_HEADER_MAX_SIZE>();
103
/// Errors that can occur while creating a [DiskBlock] from checksummed bytes.
#[derive(Debug, Error)]
enum DiskBlockCreationError {
    /// The bytes' integrity check failed while extracting their checksum.
    #[error(transparent)]
    IntegrityError(#[from] IntegrityError),
}
111
/// Errors that can occur while validating a block read back from disk.
#[derive(Debug, Error)]
enum DiskBlockAccessError {
    #[error("checksum over the block's fields did not match the field content")]
    ChecksumError,
    #[error("one or more of the fields in this block were incorrect")]
    FieldMismatchError,
}
120
/// Errors that can occur while (de)serializing a block to or from disk.
#[derive(Debug, Error)]
enum DiskBlockReadWriteError {
    /// The header claimed a data length larger than the cache's block size.
    #[error("Invalid block length: {0}")]
    InvalidBlockLength(u64),
    #[error("Error decoding the block: {0}")]
    DecodeError(DecodeError),
    #[error("Error encoding the block: {0}")]
    EncodeError(EncodeError),
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
133
134impl DiskBlockHeader {
135 pub fn new(
137 block_idx: BlockIndex,
138 block_offset: u64,
139 block_len: usize,
140 etag: String,
141 s3_key: String,
142 data_checksum: Crc32c,
143 ) -> Self {
144 let data_checksum = data_checksum.value();
145 let header_checksum =
146 Self::compute_checksum(block_idx, block_offset, block_len, &etag, &s3_key, data_checksum).value();
147 DiskBlockHeader {
148 block_idx,
149 block_offset,
150 block_len: block_len as u64,
151 etag,
152 s3_key,
153 data_checksum,
154 header_checksum,
155 }
156 }
157
158 fn compute_checksum(
159 block_idx: BlockIndex,
160 block_offset: u64,
161 block_len: usize,
162 etag: &str,
163 s3_key: &str,
164 data_checksum: u32,
165 ) -> Crc32c {
166 let mut hasher = crc32c::Hasher::new();
167 hasher.update(&block_idx.to_be_bytes());
168 hasher.update(&block_offset.to_be_bytes());
169 hasher.update(&block_len.to_be_bytes());
170 hasher.update(etag.as_bytes());
171 hasher.update(s3_key.as_bytes());
172 hasher.update(&data_checksum.to_be_bytes());
173 hasher.finalize()
174 }
175
176 pub fn validate(
180 &self,
181 s3_key: &str,
182 etag: &str,
183 block_idx: BlockIndex,
184 block_offset: u64,
185 block_len: usize,
186 ) -> Result<Crc32c, DiskBlockAccessError> {
187 let s3_key_match = s3_key == self.s3_key;
188 let etag_match = etag == self.etag;
189 let block_idx_match = block_idx == self.block_idx;
190 let block_offset_match = block_offset == self.block_offset;
191 let block_size_match = block_len == self.block_len as usize;
192
193 let data_checksum = self.data_checksum;
194 if s3_key_match && etag_match && block_idx_match && block_offset_match && block_size_match {
195 if Self::compute_checksum(block_idx, block_offset, block_len, etag, s3_key, data_checksum).value()
196 != self.header_checksum
197 {
198 Err(DiskBlockAccessError::ChecksumError)
199 } else {
200 Ok(Crc32c::new(data_checksum))
201 }
202 } else {
203 warn!(
204 s3_key_match,
205 etag_match, block_idx_match, block_size_match, "block data did not match expected values",
206 );
207 Err(DiskBlockAccessError::FieldMismatchError)
208 }
209 }
210}
211
/// A single cached block as stored on disk: a validated header followed by the
/// block's raw bytes.
#[derive(Debug)]
struct DiskBlock {
    /// Metadata and checksums describing `data`.
    header: DiskBlockHeader,
    /// The block's contents.
    data: Bytes,
}
220
impl DiskBlock {
    /// Create a block for the given object, index, and offset from checksummed
    /// bytes, carrying the data checksum into the header.
    ///
    /// # Errors
    ///
    /// Fails with [DiskBlockCreationError::IntegrityError] if the bytes'
    /// integrity cannot be verified while extracting their checksum.
    fn new(
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
    ) -> Result<Self, DiskBlockCreationError> {
        let s3_key = cache_key.key().to_owned();
        let etag = cache_key.etag().as_str().to_owned();
        let (data, data_checksum) = bytes.into_inner()?;
        let header = DiskBlockHeader::new(block_idx, block_offset, data.len(), etag, s3_key, data_checksum);

        Ok(DiskBlock { data, header })
    }

    /// Return the block's data after validating the header against the
    /// expected object identity, block index, and offset.
    fn data(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
        let data_checksum = self.header.validate(
            cache_key.key(),
            cache_key.etag().as_str(),
            block_idx,
            block_offset,
            self.data.len(),
        )?;
        let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
        Ok(bytes)
    }

    /// Deserialize a block (header, then data) from `reader`, using a buffer
    /// from `pool` for the data.
    ///
    /// # Errors
    ///
    /// Fails with [DiskBlockReadWriteError::InvalidBlockLength] if the header
    /// claims a length larger than `block_size`, guarding against oversized
    /// allocations from corrupted headers.
    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
        let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;

        if header.block_len > block_size {
            return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
        }

        let size = header.block_len as usize;
        let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
        buffer.fill_from_reader(reader)?;
        let data = buffer.into_bytes();

        Ok(Self { header, data })
    }

    /// Serialize the block (header, then raw data) into `writer`, returning
    /// the total number of bytes written.
    fn write(&self, writer: &mut impl Write) -> Result<usize, DiskBlockReadWriteError> {
        let header_length = bincode::encode_into_std_write(&self.header, writer, BINCODE_CONFIG)?;
        writer.write_all(&self.data)?;
        Ok(header_length + self.data.len())
    }
}
283
284impl From<DecodeError> for DiskBlockReadWriteError {
285 fn from(value: DecodeError) -> Self {
286 match value {
287 DecodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
288 value => DiskBlockReadWriteError::DecodeError(value),
289 }
290 }
291}
292
293impl From<EncodeError> for DiskBlockReadWriteError {
294 fn from(value: EncodeError) -> Self {
295 match value {
296 EncodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
297 value => DiskBlockReadWriteError::EncodeError(value),
298 }
299 }
300}
301
302impl From<std::io::Error> for DataCacheError {
303 fn from(e: std::io::Error) -> Self {
304 DataCacheError::IoFailure(e.into())
305 }
306}
307
308impl From<DiskBlockReadWriteError> for DataCacheError {
309 fn from(value: DiskBlockReadWriteError) -> Self {
310 match value {
311 DiskBlockReadWriteError::IOError(e) => DataCacheError::IoFailure(e.into()),
312 _ => DataCacheError::InvalidBlockContent,
313 }
314 }
315}
316
impl DiskDataCache {
    /// Create a new [DiskDataCache] rooted at `config.cache_directory`.
    /// LRU usage tracking is only maintained when the cache has a limit.
    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
        let usage = match &config.limit {
            CacheLimit::Unbounded => None,
            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
        };
        DiskDataCache { config, pool, usage }
    }

    /// Path where the block for `block_key` is (or would be) stored:
    /// `<cache_directory>/<CACHE_VERSION>/<hashed-key dirs>/<block index>`.
    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
        let mut path = self.config.cache_directory.join(CACHE_VERSION);
        block_key.append_to_path(&mut path);
        path
    }

    /// Read a block from `path` and validate it against the expected object,
    /// block index, and offset.
    ///
    /// Returns `Ok(None)` when no block file exists at `path` (cache miss).
    ///
    /// # Errors
    ///
    /// [DataCacheError::InvalidBlockContent] if the file has a stale format
    /// version, cannot be deserialized, or fails validation; I/O errors are
    /// propagated as [DataCacheError::IoFailure].
    fn read_block(
        &self,
        path: impl AsRef<Path>,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        trace!(
            key = ?cache_key.key(),
            offset = block_offset,
            path = ?path.as_ref(),
            "reading cache block",
        );
        let mut file = match fs::File::open(path.as_ref()) {
            Ok(file) => file,
            // A missing file is a cache miss, not an error.
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };

        // Every block file starts with the cache version string; reject blocks
        // written by a different format version.
        let mut block_version = [0; CACHE_VERSION.len()];
        file.read_exact(&mut block_version)?;
        if block_version != CACHE_VERSION.as_bytes() {
            warn!(
                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                "stale block format found during reading"
            );
            return Err(DataCacheError::InvalidBlockContent);
        }

        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
            .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
                DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                    DataCacheError::InvalidBlockContent
                }
            })?;

        Ok(Some(bytes))
    }

    /// Serialize `block` into a fresh temporary file in the same directory as
    /// `path` (so a later rename is on the same file system).
    ///
    /// Returns the unpersisted temp file and the number of serialized block
    /// bytes (excluding the version prefix). Directories are created with mode
    /// 0o700 and the file with 0o600 so the cache is private to its owner.
    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<(NamedTempFile, usize)> {
        let path = path.as_ref();
        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
        fs::DirBuilder::new()
            .mode(0o700)
            .recursive(true)
            .create(cache_path_for_key)?;

        let mut temp_file = tempfile::Builder::new()
            .permissions(fs::Permissions::from_mode(0o600))
            .tempfile_in(cache_path_for_key)?;
        trace!(
            key = block.header.s3_key,
            offset = block.header.block_offset,
            block_path = ?path,
            temp_path = ?temp_file.path(),
            "writing cache block",
        );
        temp_file.write_all(CACHE_VERSION.as_bytes())?;
        let bytes_written = block.write(&mut temp_file)?;
        Ok((temp_file, bytes_written))
    }

    /// Whether the configured limit is currently exceeded for `size` bytes of
    /// cached data. Also publishes `size` as the total-size gauge metric.
    fn is_limit_exceeded(&self, size: usize) -> bool {
        metrics::gauge!(CACHE_TOTAL_SIZE, ATTR_CACHE => CACHE_DISK).set(size as f64);
        match self.config.limit {
            CacheLimit::Unbounded => false,
            CacheLimit::TotalSize { max_size } => size > max_size,
            CacheLimit::AvailableSpace { min_ratio } => {
                let stats = match nix::sys::statvfs::statvfs(&self.config.cache_directory) {
                    // If we cannot measure available space, err on the side of
                    // not evicting rather than failing the put.
                    Ok(stats) if stats.blocks() == 0 => {
                        warn!("unable to determine available space (0 blocks reported)");
                        return false;
                    }
                    Ok(stats) => stats,
                    Err(error) => {
                        warn!(?error, "unable to determine available space");
                        return false;
                    }
                };
                (stats.blocks_available() as f64) < min_ratio * (stats.blocks() as f64)
            }
        }
    }

    /// Evict least-recently-used blocks until the cache limit is satisfied.
    /// No-op when usage is untracked (unbounded cache).
    ///
    /// # Errors
    ///
    /// [DataCacheError::EvictionFailure] if the limit is exceeded but there is
    /// nothing left to evict.
    fn evict_if_needed(&self) -> DataCacheResult<()> {
        let Some(usage) = &self.usage else {
            return Ok(());
        };

        loop {
            // Re-acquire the lock each iteration so other threads can make
            // progress between evictions.
            let mut usage = usage.lock().unwrap();
            if !self.is_limit_exceeded(usage.size) {
                break;
            }
            let Some(to_remove) = usage.evict_lru() else {
                warn!("cache limit exceeded but nothing to evict");
                return Err(DataCacheError::EvictionFailure);
            };
            let path_to_remove = self.get_path_for_block_key(&to_remove);
            trace!("evicting block at {}", path_to_remove.display());
            // A NotFound here just means the file was already gone; anything
            // else is logged but not fatal to the caller's put.
            if let Err(remove_err) = fs::remove_file(&path_to_remove)
                && remove_err.kind() != ErrorKind::NotFound
            {
                warn!("unable to evict block: {:?}", remove_err);
            }
        }
        Ok(())
    }
}
446
447fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
449 let s3_key = cache_key.key();
450 let etag = cache_key.etag();
451
452 let mut hasher = Sha256::new();
453 hasher.update(CACHE_VERSION);
454 hasher.update(s3_key);
455 hasher.update(etag.as_str());
456 hasher.finalize().into()
457}
458
#[async_trait]
impl DataCache for DiskDataCache {
    /// Look up a block in the cache.
    ///
    /// Returns `Ok(None)` on a cache miss. On a hit, marks the block as most
    /// recently used. Records get latency/size/error metrics in all cases.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        _object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        // The offset must be consistent with the index for this block size.
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let block_key = DiskBlockKey::new(cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        let result = match self.read_block(&path, cache_key, block_idx, block_offset) {
            Ok(None) => {
                Ok(None)
            }
            Ok(Some(bytes)) => {
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes.len() as f64);
                // Refresh LRU position so this block survives longer.
                if let Some(usage) = &self.usage {
                    usage.lock().unwrap().refresh(&block_key);
                }
                Ok(Some(bytes))
            }
            Err(err) => {
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        result
    }

    /// Write a block into the cache, evicting LRU blocks first if the limit is
    /// exceeded. The block is staged in a temp file and persisted atomically.
    /// Records put latency/size/error metrics.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        _object_size: usize,
    ) -> DataCacheResult<()> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let bytes_len = bytes.len();
        let block_key = DiskBlockKey::new(&cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        trace!(?cache_key, ?path, "new block will be created in disk cache");

        // Closure so that an error from any step falls through to the shared
        // metrics bookkeeping below.
        let put_result = (|| -> DataCacheResult<()> {
            let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
                DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
            })?;

            {
                let eviction_start = Instant::now();
                let result = self.evict_if_needed();
                metrics::histogram!(CACHE_EVICT_LATENCY, ATTR_CACHE => CACHE_DISK)
                    .record(eviction_start.elapsed().as_micros() as f64);
                result
            }?;

            let result = self.write_block(&path, block);
            let (temp_file, size) = result?;

            if let Some(usage) = &self.usage {
                let mut usage = usage.lock().unwrap();
                // Persist while holding the usage lock — NOTE(review):
                // presumably so the file on disk and the size accounting can't
                // diverge under a concurrent eviction pass; confirm against
                // evict_if_needed.
                _ = temp_file.persist(path).map_err(|e| e.error)?;
                usage.add(block_key, size);
            } else {
                _ = temp_file.persist(path).map_err(|e| e.error)?;
            }

            Ok(())
        })();

        if put_result.is_ok() {
            metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes_len as f64);
        } else {
            metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
        }
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        put_result
    }

    /// The configured block size for this cache, in bytes.
    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}
556
/// Key identifying a block within the disk cache: the hashed object identity
/// plus the block's index. Used both as the LRU map key and to derive the
/// block's on-disk path.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct DiskBlockKey {
    /// SHA-256 of cache version, S3 key, and ETag (see [hash_cache_key_raw]).
    hashed_key: [u8; 32],
    block_index: BlockIndex,
}
566
567impl DiskBlockKey {
568 fn new(cache_key: &ObjectId, block_index: BlockIndex) -> Self {
569 let hashed_key = hash_cache_key_raw(cache_key);
570 Self {
571 hashed_key,
572 block_index,
573 }
574 }
575
576 fn hex_key(&self) -> String {
577 hex::encode(self.hashed_key)
578 }
579
580 fn append_to_path(&self, path: &mut PathBuf) {
581 let hashed_cache_key = self.hex_key();
582
583 let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
585 path.push(first);
586 path.push(second);
587
588 path.push(format!("{:010}", self.block_index));
590 }
591}
592
/// LRU bookkeeping for cache entries: map order tracks recency of use, and
/// `size` is the running sum of all entry sizes.
struct UsageInfo<K> {
    /// Entries in least-recently-used-first order.
    entries: LinkedHashMap<K, usize>,
    /// Total size of all tracked entries, in bytes.
    size: usize,
}
598
599impl<K> UsageInfo<K>
600where
601 K: std::hash::Hash + Eq + std::fmt::Debug,
602{
603 fn new() -> Self {
604 Self {
605 entries: LinkedHashMap::new(),
606 size: 0,
607 }
608 }
609
610 fn refresh(&mut self, key: &K) -> bool {
613 self.entries.get_refresh(key).is_some()
614 }
615
616 fn add(&mut self, key: K, size: usize) {
618 if let Some(previous_size) = self.entries.insert(key, size) {
619 self.size = self.size.saturating_sub(previous_size);
620 }
621
622 self.size = self.size.saturating_add(size);
623 }
624
625 fn evict_lru(&mut self) -> Option<K> {
628 let (key, size) = self.entries.pop_front()?;
629 self.size = self.size.saturating_sub(size);
630 Some(key)
631 }
632}
633
#[cfg(test)]
mod tests {
    //! Unit tests for the disk cache: serialization format stability, on-disk
    //! path layout, put/get round-trips, LRU eviction, header validation,
    //! corruption handling, and concurrent access.

    use std::str::FromStr;
    use std::{ffi::OsString, io::Cursor};

    use super::*;

    use futures::StreamExt as _;
    use futures::executor::{ThreadPool, block_on};
    use futures::task::SpawnExt;
    use mountpoint_s3_client::types::ETag;
    use rand::rngs::SmallRng;
    use rand::{RngExt, SeedableRng};
    use test_case::test_case;

    use crate::sync::Arc;

    // Pins the exact serialized byte layout of a block; if this fails, the
    // on-disk format changed and CACHE_VERSION must be bumped.
    #[test]
    fn test_block_format_version_requires_update() {
        let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
        let data = ChecksummedBytes::new("Foo".into());
        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
        let expected_bytes: Vec<u8> = vec![
            100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116,
            101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114,
            108, 100, 9, 85, 128, 46, 13, 202, 106, 46, 70, 111, 111,
        ];
        let mut serialized_bytes = Vec::new();
        block.write(&mut serialized_bytes).unwrap();
        assert_eq!(
            expected_bytes, serialized_bytes,
            "serialized disk format appears to have changed, version bump required"
        );
    }

    // Pins the key-hash output so on-disk paths stay stable across releases.
    #[test]
    fn test_hash_cache_key_raw() {
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key, etag);
        let expected_hash = "1cfd611a26062b33e98d48a84e967ddcc2a42957479a8abd541e29cfa3258639";
        let actual_hash = hex::encode(hash_cache_key_raw(&key));
        assert_eq!(expected_hash, actual_hash);
    }

    // Checks the full path layout: cache dir / version / split hash / index.
    #[test]
    fn get_path_for_block_key() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 5);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "0000000005",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // Block indices wider than the 10-digit zero-padding are still valid path
    // components (just unpadded).
    #[test]
    fn get_path_for_block_key_huge_block_index() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 1000000000000000);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "1000000000000000",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // Put/get round-trips across multiple keys and block indices, for varying
    // relationships between block size and pool buffer size.
    #[test_case(8 * 1024 * 1024, 8 * 1024 * 1024; "matching block and pool buffer sizes")]
    #[test_case(1024 * 1024, 8 * 1024 * 1024; "block size smaller than pool buffer size")]
    #[test_case(8 * 1024 * 1024, 1024 * 1024; "block size larger than pool buffer size")]
    #[tokio::test]
    async fn test_put_get(block_size: u64, pool_buffer_size: usize) {
        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let data_3 = ChecksummedBytes::new("Baz".into());

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            "long-key_".repeat(100), ETag::for_tests(),
        );

        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    // A slice of a larger checksummed buffer must round-trip intact.
    #[tokio::test]
    async fn test_checksummed_bytes_slice() {
        let data = ChecksummedBytes::new("0123456789".into());
        let slice = data.slice(1..5);

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: 8 * 1024 * 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());

        cache
            .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key, 0, 0, slice.len())
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            slice.into_bytes().expect("original slice should be valid"),
            entry.into_bytes().expect("returned entry should be valid"),
            "cache entry returned should match original slice after put"
        );
    }

    // With a TotalSize limit sized to one large object, writing a large object
    // then a small one should evict large-object blocks (LRU) while keeping
    // all small-object blocks.
    #[tokio::test]
    async fn test_eviction() {
        const BLOCK_SIZE: usize = 100 * 1024;
        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;

        // Deterministic pseudo-random object contents.
        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
            let mut rng = SmallRng::seed_from_u64(seed);
            let mut body = vec![0u8; size];
            rng.fill(&mut body[..]);

            ChecksummedBytes::new(body.into())
        }

        // Returns whether the block is present; when present, also asserts
        // the retrieved bytes match the expected contents.
        async fn is_block_in_cache(
            cache: &DiskDataCache,
            cache_key: &ObjectId,
            block_idx: u64,
            expected_bytes: &ChecksummedBytes,
            object_size: usize,
        ) -> bool {
            if let Some(retrieved) = cache
                .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
                .await
                .expect("cache should be accessible")
            {
                assert_eq!(
                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
                    expected_bytes
                        .clone()
                        .into_bytes()
                        .expect("original bytes should be valid")
                );
                true
            } else {
                false
            }
        }

        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
        let large_object_blocks: Vec<_> = (0..large_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let large_object_key = ObjectId::new("large".into(), ETag::for_tests());

        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
        let small_object_blocks: Vec<_> = (0..small_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
            },
            pool,
        );

        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    large_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    LARGE_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    small_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    SMALL_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
            })
            .count()
            .await;
        assert_eq!(
            count_small_object_blocks_in_cache,
            small_object_blocks.len(),
            "All blocks for small object should still be in the cache"
        );

        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
            })
            .count()
            .await;
        assert!(
            count_large_object_blocks_in_cache < large_object_blocks.len(),
            "Some blocks for the large object should have been evicted"
        );
    }

    // DiskBlock::data must reject mismatched index, offset, key, or etag, and
    // return the original bytes when all fields match.
    #[test]
    fn data_block_extract_checks() {
        let data_1 = ChecksummedBytes::new("Foo".into());

        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
        let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());

        let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
        block
            .data(&cache_key_1, 1, 0)
            .expect_err("should fail due to incorrect block index");
        block
            .data(&cache_key_1, 0, 1024)
            .expect_err("should fail due to incorrect block offset");
        block
            .data(&cache_key_2, 0, 0)
            .expect_err("should fail due to incorrect s3 key in cache key");
        block
            .data(&cache_key_3, 0, 0)
            .expect_err("should fail due to incorrect etag in cache key");
        let unpacked_bytes = block
            .data(&cache_key_1, 0, 0)
            .expect("should be OK as all fields match");
        assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
    }

    // Header validation: each mismatched field is a FieldMismatchError; a
    // corrupted header checksum with matching fields is a ChecksumError.
    #[test]
    fn validate_block_header() {
        let block_idx = 0;
        let block_offset = 0;
        let block_size = 4;
        let etag = ETag::for_tests();
        let s3_key = String::from("s3/key");
        let data_checksum = Crc32c::new(42);
        let mut header = DiskBlockHeader::new(
            block_idx,
            block_offset,
            block_size,
            etag.as_str().to_owned(),
            s3_key.clone(),
            data_checksum,
        );

        let checksum = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect("should be OK with valid fields and checksum");
        assert_eq!(data_checksum, checksum);

        let err = header
            .validate("hello", etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid s3_key");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, "bad etag", block_idx, block_offset, block_size)
            .expect_err("should fail with invalid etag");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), 5, block_offset, block_size)
            .expect_err("should fail with invalid block idx");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, 1024, block_size)
            .expect_err("should fail with invalid block offset");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, 42)
            .expect_err("should fail with invalid block length");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));

        // Corrupt the stored header checksum while leaving all fields intact.
        header.header_checksum = 23;
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid checksum");
        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
    }

    // Corrupting a serialized length field must fail decoding safely:
    // oversized string lengths hit the bincode limit, an oversized data length
    // is rejected by the block-size guard.
    #[test_case("key")]
    #[test_case("etag")]
    #[test_case("data")]
    fn read_corrupted_block_should_fail(length_to_corrupt: &str) {
        const MAX_LENGTH: u64 = 1024;

        // Read a little-endian u64 at a byte offset of the serialized block.
        fn get_u64_at(slice: &[u8], offset: usize) -> u64 {
            u64::from_le_bytes(slice[offset..(offset + 8)].try_into().unwrap())
        }

        // Overwrite a little-endian u64 at a byte offset of the serialized block.
        fn replace_u64_at(slice: &mut [u8], offset: usize, new_value: u64) {
            slice[offset..(offset + 8)].copy_from_slice(&new_value.to_le_bytes());
        }

        let original_length = 42;
        let data = ChecksummedBytes::new(vec![0u8; original_length].into());
        let cache_key = ObjectId::new("k".into(), ETag::from_str("e").unwrap());
        let block = DiskBlock::new(cache_key.clone(), 0, 0, data).expect("should have no checksum err");

        let mut buf = Vec::new();
        block.write(&mut buf).unwrap();

        // Offsets are determined by the fixed-int little-endian layout pinned
        // in test_block_format_version_requires_update.
        let (offset, expected) = match length_to_corrupt {
            "key" => (24, cache_key.key().len()),
            "etag" => (32 + cache_key.key().len(), cache_key.etag().as_str().len()),
            "data" => (16, original_length),
            _ => panic!("invalid length: {length_to_corrupt}"),
        };

        assert_eq!(
            get_u64_at(&buf, offset) as usize,
            expected,
            "serialized length should match the expected value (have we changed the serialization format?)"
        );

        replace_u64_at(&mut buf, offset, u64::MAX);

        let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
        match length_to_corrupt {
            "key" | "etag" => assert!(matches!(
                err,
                DiskBlockReadWriteError::DecodeError(DecodeError::LimitExceeded)
            )),
            "data" => assert!(matches!(err, DiskBlockReadWriteError::InvalidBlockLength(_))),
            _ => panic!("invalid length: {length_to_corrupt}"),
        }
    }

    // Many tasks racing get-then-put on the same block must not error.
    #[test]
    fn test_concurrent_access() {
        let block_size = 1024 * 1024;
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([block_size]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: block_size as u64,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let data_cache = Arc::new(data_cache);

        let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());
        let block_idx = 0;
        let block_offset = 0;
        let object_size = 10 * block_size;

        let pool = ThreadPool::builder().pool_size(32).create().unwrap();

        let mut handles = Vec::new();
        for _ in 0..100 {
            let data_cache = data_cache.clone();
            let cache_key = cache_key.clone();
            let handle = pool
                .spawn_with_handle(async move {
                    let block = data_cache
                        .get_block(&cache_key, block_idx, block_offset, object_size)
                        .await
                        .expect("get_block should not return error");
                    if block.is_none() {
                        let bytes: Bytes = vec![0u8; block_size].into();
                        data_cache
                            .put_block(cache_key, block_idx, block_offset, bytes.into(), object_size)
                            .await
                            .expect("put_block should succeed");
                    }
                })
                .unwrap();
            handles.push(handle);
        }

        block_on(async move {
            for handle in handles {
                handle.await
            }
        });
    }
}