use std::fs;
use std::io::{ErrorKind, Read, Write};
use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::time::Instant;
use async_trait::async_trait;
use bincode::config::{Configuration, Fixint, Limit, LittleEndian};
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode};
use bytes::Bytes;
use linked_hash_map::LinkedHashMap;
use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
use sha2::{Digest, Sha256};
use tempfile::NamedTempFile;
use thiserror::Error;
use tracing::{trace, warn};
use crate::checksums::IntegrityError;
use crate::data_cache::DataCacheError;
use crate::memory::{BufferKind, PagedPool};
use crate::metrics::defs::{
ATTR_CACHE, CACHE_DISK, CACHE_EVICT_LATENCY, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY,
CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY, CACHE_TOTAL_SIZE,
};
use crate::object::ObjectId;
use crate::sync::Mutex;
use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
/// Version tag written at the start of every cache file; bump whenever the on-disk
/// serialization format changes so stale blocks are rejected rather than misread.
const CACHE_VERSION: &str = "V2";
/// Number of leading hex characters of the hashed key used as the first directory level,
/// fanning blocks out across subdirectories.
const HASHED_DIR_SPLIT_INDEX: usize = 2;
/// On-disk cache of object data, stored as one file per block under the cache directory.
pub struct DiskDataCache {
    // Directory, block size, and eviction-limit configuration.
    config: DiskDataCacheConfig,
    // Buffer pool used when reading block data back from disk.
    pool: PagedPool,
    // LRU usage accounting for eviction; `None` when the cache limit is `Unbounded`.
    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
}
/// Configuration for a [DiskDataCache].
#[derive(Debug)]
pub struct DiskDataCacheConfig {
    /// Root directory in which cache blocks are written.
    pub cache_directory: PathBuf,
    /// Maximum size of a single cached block, in bytes.
    pub block_size: u64,
    /// Policy bounding the total size of the cache.
    pub limit: CacheLimit,
}
/// Policy limiting how large the disk cache may grow.
#[derive(Debug)]
pub enum CacheLimit {
    /// No limit: the cache never evicts.
    Unbounded,
    /// Evict once the tracked cache size exceeds `max_size` bytes.
    TotalSize { max_size: usize },
    /// Evict once available filesystem blocks fall below `min_ratio` of total blocks.
    AvailableSpace { min_ratio: f64 },
}
/// Default minimum fraction of filesystem space to keep available (5%).
pub const DEFAULT_CACHE_MIN_AVAILABLE_RATIO: f64 = 0.05;
impl Default for CacheLimit {
fn default() -> Self {
CacheLimit::AvailableSpace {
min_ratio: DEFAULT_CACHE_MIN_AVAILABLE_RATIO,
}
}
}
/// Header prepended to every block on disk, identifying the block and carrying
/// checksums over both the data and the header fields themselves.
#[derive(Encode, Decode, Debug)]
struct DiskBlockHeader {
    // Index of the block within the object.
    block_idx: BlockIndex,
    // Byte offset of the block within the object.
    block_offset: u64,
    // Length of the block data in bytes (u64 so the format is usize-width independent).
    block_len: u64,
    // ETag of the object this block belongs to.
    etag: String,
    // Full S3 key of the object this block belongs to.
    s3_key: String,
    // CRC32C over the block data.
    data_checksum: u32,
    // CRC32C over all other header fields, guarding header integrity.
    header_checksum: u32,
}
/// Upper bound on the encoded header size accepted by bincode; guards against
/// corrupt headers declaring absurd string lengths.
const BINCODE_HEADER_MAX_SIZE: usize = 10000;
/// Bincode configuration for the header: little-endian, fixed-width integers,
/// with the size limit above. Fixed-int encoding keeps the layout stable.
const BINCODE_CONFIG: Configuration<LittleEndian, Fixint, Limit<BINCODE_HEADER_MAX_SIZE>> = bincode::config::standard()
    .with_fixed_int_encoding()
    .with_limit::<BINCODE_HEADER_MAX_SIZE>();
/// Errors that can occur while creating a [DiskBlock].
#[derive(Debug, Error)]
enum DiskBlockCreationError {
    /// The data's checksum was found invalid when extracting its bytes.
    #[error(transparent)]
    IntegrityError(#[from] IntegrityError),
}
/// Errors that can occur while accessing the data of a [DiskBlock].
#[derive(Debug, Error)]
enum DiskBlockAccessError {
    #[error("checksum over the block's fields did not match the field content")]
    ChecksumError,
    #[error("one or more of the fields in this block were incorrect")]
    FieldMismatchError,
}
/// Errors that can occur while serializing or deserializing a [DiskBlock].
#[derive(Debug, Error)]
enum DiskBlockReadWriteError {
    /// The header declared a block length larger than the configured block size.
    #[error("Invalid block length: {0}")]
    InvalidBlockLength(u64),
    #[error("Error decoding the block: {0}")]
    DecodeError(DecodeError),
    #[error("Error encoding the block: {0}")]
    EncodeError(EncodeError),
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
impl DiskBlockHeader {
    /// Creates a new header for a block, computing the header checksum over all
    /// of the other fields.
    pub fn new(
        block_idx: BlockIndex,
        block_offset: u64,
        block_len: usize,
        etag: String,
        s3_key: String,
        data_checksum: Crc32c,
    ) -> Self {
        let data_checksum = data_checksum.value();
        let header_checksum =
            Self::compute_checksum(block_idx, block_offset, block_len, &etag, &s3_key, data_checksum).value();
        DiskBlockHeader {
            block_idx,
            block_offset,
            // Stored as u64 so the on-disk layout is independent of usize width.
            block_len: block_len as u64,
            etag,
            s3_key,
            data_checksum,
            header_checksum,
        }
    }

    /// Computes a CRC32C checksum over all header fields (integral fields hashed
    /// in big-endian byte order), protecting the header's own integrity.
    fn compute_checksum(
        block_idx: BlockIndex,
        block_offset: u64,
        block_len: usize,
        etag: &str,
        s3_key: &str,
        data_checksum: u32,
    ) -> Crc32c {
        let mut hasher = crc32c::Hasher::new();
        hasher.update(&block_idx.to_be_bytes());
        hasher.update(&block_offset.to_be_bytes());
        hasher.update(&block_len.to_be_bytes());
        hasher.update(etag.as_bytes());
        hasher.update(s3_key.as_bytes());
        hasher.update(&data_checksum.to_be_bytes());
        hasher.finalize()
    }

    /// Validates the header against the expected field values and its own checksum.
    ///
    /// Returns the data checksum when every field matches and the header checksum
    /// is intact; otherwise returns the corresponding [DiskBlockAccessError].
    pub fn validate(
        &self,
        s3_key: &str,
        etag: &str,
        block_idx: BlockIndex,
        block_offset: u64,
        block_len: usize,
    ) -> Result<Crc32c, DiskBlockAccessError> {
        let s3_key_match = s3_key == self.s3_key;
        let etag_match = etag == self.etag;
        let block_idx_match = block_idx == self.block_idx;
        let block_offset_match = block_offset == self.block_offset;
        let block_size_match = block_len == self.block_len as usize;
        let data_checksum = self.data_checksum;
        if s3_key_match && etag_match && block_idx_match && block_offset_match && block_size_match {
            if Self::compute_checksum(block_idx, block_offset, block_len, etag, s3_key, data_checksum).value()
                != self.header_checksum
            {
                Err(DiskBlockAccessError::ChecksumError)
            } else {
                Ok(Crc32c::new(data_checksum))
            }
        } else {
            // Log every comparison result: block_offset_match was previously omitted,
            // so an offset-only mismatch logged all-true fields and hid the cause.
            warn!(
                s3_key_match,
                etag_match,
                block_idx_match,
                block_offset_match,
                block_size_match,
                "block data did not match expected values",
            );
            Err(DiskBlockAccessError::FieldMismatchError)
        }
    }
}
/// A single block of object data together with its on-disk header.
#[derive(Debug)]
struct DiskBlock {
    // Header describing and authenticating the block.
    header: DiskBlockHeader,
    // The block's data bytes.
    data: Bytes,
}
impl DiskBlock {
    /// Creates a block for the given object/index/offset from checksummed bytes.
    ///
    /// Fails with [DiskBlockCreationError::IntegrityError] if the bytes' checksum
    /// is found invalid when extracting the inner data.
    fn new(
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
    ) -> Result<Self, DiskBlockCreationError> {
        let s3_key = cache_key.key().to_owned();
        let etag = cache_key.etag().as_str().to_owned();
        // Splitting data from its checksum validates integrity before persisting.
        let (data, data_checksum) = bytes.into_inner()?;
        let header = DiskBlockHeader::new(block_idx, block_offset, data.len(), etag, s3_key, data_checksum);
        Ok(DiskBlock { data, header })
    }

    /// Returns the block's data as checksummed bytes, after validating that the
    /// header matches the expected object identity, index, offset, and length.
    fn data(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
        let data_checksum = self.header.validate(
            cache_key.key(),
            cache_key.etag().as_str(),
            block_idx,
            block_offset,
            self.data.len(),
        )?;
        // Re-attach the stored checksum so downstream consumers keep integrity checks.
        let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
        Ok(bytes)
    }

    /// Deserializes a block (header, then raw data) from `reader`, allocating the
    /// data buffer from `pool`.
    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
        let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;
        // Reject corrupt headers claiming more data than a block may hold, before
        // allocating a buffer of that size.
        if header.block_len > block_size {
            return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
        }
        let size = header.block_len as usize;
        let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
        buffer.fill_from_reader(reader)?;
        let data = buffer.into_bytes();
        Ok(Self { header, data })
    }

    /// Serializes the block (header, then raw data) into `writer`.
    ///
    /// Returns the total number of bytes written.
    fn write(&self, writer: &mut impl Write) -> Result<usize, DiskBlockReadWriteError> {
        let header_length = bincode::encode_into_std_write(&self.header, writer, BINCODE_CONFIG)?;
        writer.write_all(&self.data)?;
        Ok(header_length + self.data.len())
    }
}
impl From<DecodeError> for DiskBlockReadWriteError {
    /// Surfaces IO errors hidden inside bincode decode failures; anything else is
    /// treated as a decoding problem.
    fn from(err: DecodeError) -> Self {
        match err {
            DecodeError::Io { inner, .. } => Self::IOError(inner),
            other => Self::DecodeError(other),
        }
    }
}
impl From<EncodeError> for DiskBlockReadWriteError {
    /// Surfaces IO errors hidden inside bincode encode failures; anything else is
    /// treated as an encoding problem.
    fn from(err: EncodeError) -> Self {
        match err {
            EncodeError::Io { inner, .. } => Self::IOError(inner),
            other => Self::EncodeError(other),
        }
    }
}
impl From<std::io::Error> for DataCacheError {
fn from(e: std::io::Error) -> Self {
DataCacheError::IoFailure(e.into())
}
}
impl From<DiskBlockReadWriteError> for DataCacheError {
    /// Maps IO failures through to [DataCacheError::IoFailure]; all other
    /// serialization problems indicate an invalid block.
    fn from(err: DiskBlockReadWriteError) -> Self {
        match err {
            DiskBlockReadWriteError::IOError(io_err) => Self::IoFailure(io_err.into()),
            _ => Self::InvalidBlockContent,
        }
    }
}
impl DiskDataCache {
    /// Creates a new disk cache. Usage tracking is only allocated when a limit is
    /// configured, since an unbounded cache never evicts.
    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
        let usage = match &config.limit {
            CacheLimit::Unbounded => None,
            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
        };
        DiskDataCache { config, pool, usage }
    }

    /// Builds the filesystem path for a block key:
    /// `<cache_dir>/<CACHE_VERSION>/<key components>`.
    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
        let mut path = self.config.cache_directory.join(CACHE_VERSION);
        block_key.append_to_path(&mut path);
        path
    }

    /// Reads a block from `path`, verifying the version tag and that the block
    /// matches the expected object identity/index/offset.
    ///
    /// Returns `Ok(None)` when no block file exists at `path`.
    fn read_block(
        &self,
        path: impl AsRef<Path>,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        trace!(
            key = ?cache_key.key(),
            offset = block_offset,
            path = ?path.as_ref(),
            "reading cache block",
        );
        let mut file = match fs::File::open(path.as_ref()) {
            Ok(file) => file,
            // A missing file is a cache miss, not an error.
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };
        // The version tag is written before the serialized block; a mismatch means
        // the file was written by a different (incompatible) format.
        let mut block_version = [0; CACHE_VERSION.len()];
        file.read_exact(&mut block_version)?;
        if block_version != CACHE_VERSION.as_bytes() {
            warn!(
                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                "stale block format found during reading"
            );
            return Err(DataCacheError::InvalidBlockContent);
        }
        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
            .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
                DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                    DataCacheError::InvalidBlockContent
                }
            })?;
        Ok(Some(bytes))
    }

    /// Writes `block` to a temporary file next to `path` (version tag first), with
    /// restrictive permissions. The caller is responsible for persisting the
    /// returned [NamedTempFile] to its final location.
    ///
    /// Returns the temp file and the number of serialized block bytes written.
    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<(NamedTempFile, usize)> {
        let path = path.as_ref();
        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
        // Owner-only directory and file permissions: cached object data is private.
        fs::DirBuilder::new()
            .mode(0o700)
            .recursive(true)
            .create(cache_path_for_key)?;
        let mut temp_file = tempfile::Builder::new()
            .permissions(fs::Permissions::from_mode(0o600))
            .tempfile_in(cache_path_for_key)?;
        trace!(
            key = block.header.s3_key,
            offset = block.header.block_offset,
            block_path = ?path,
            temp_path = ?temp_file.path(),
            "writing cache block",
        );
        temp_file.write_all(CACHE_VERSION.as_bytes())?;
        let bytes_written = block.write(&mut temp_file)?;
        Ok((temp_file, bytes_written))
    }

    /// Returns whether the configured cache limit is exceeded for the given
    /// tracked `size`. Also publishes `size` as the cache-size gauge metric.
    fn is_limit_exceeded(&self, size: usize) -> bool {
        metrics::gauge!(CACHE_TOTAL_SIZE, ATTR_CACHE => CACHE_DISK).set(size as f64);
        match self.config.limit {
            CacheLimit::Unbounded => false,
            CacheLimit::TotalSize { max_size } => size > max_size,
            CacheLimit::AvailableSpace { min_ratio } => {
                let stats = match nix::sys::statvfs::statvfs(&self.config.cache_directory) {
                    // If we can't determine free space, err on the side of not evicting.
                    Ok(stats) if stats.blocks() == 0 => {
                        warn!("unable to determine available space (0 blocks reported)");
                        return false;
                    }
                    Ok(stats) => stats,
                    Err(error) => {
                        warn!(?error, "unable to determine available space");
                        return false;
                    }
                };
                (stats.blocks_available() as f64) < min_ratio * (stats.blocks() as f64)
            }
        }
    }

    /// Evicts least-recently-used blocks until the cache limit is satisfied.
    ///
    /// Fails with [DataCacheError::EvictionFailure] if the limit is exceeded but
    /// there is nothing left to evict.
    fn evict_if_needed(&self) -> DataCacheResult<()> {
        let Some(usage) = &self.usage else {
            // No usage tracking means the cache is unbounded: nothing to do.
            return Ok(());
        };
        loop {
            // Re-acquire the lock each iteration so readers can refresh entries
            // between individual evictions.
            let mut usage = usage.lock().unwrap();
            if !self.is_limit_exceeded(usage.size) {
                break;
            }
            let Some(to_remove) = usage.evict_lru() else {
                warn!("cache limit exceeded but nothing to evict");
                return Err(DataCacheError::EvictionFailure);
            };
            let path_to_remove = self.get_path_for_block_key(&to_remove);
            trace!("evicting block at {}", path_to_remove.display());
            // A missing file just means someone else already removed it; other
            // removal errors are logged but do not abort eviction.
            if let Err(remove_err) = fs::remove_file(&path_to_remove)
                && remove_err.kind() != ErrorKind::NotFound
            {
                warn!("unable to evict block: {:?}", remove_err);
            }
        }
        Ok(())
    }
}
/// Derives the stable SHA-256 digest used to address an object's blocks on disk.
///
/// The cache version is mixed into the hash so blocks written by a different
/// on-disk format can never collide with the current one.
fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update(CACHE_VERSION);
    hasher.update(cache_key.key());
    hasher.update(cache_key.etag().as_str());
    hasher.finalize().into()
}
#[async_trait]
impl DataCache for DiskDataCache {
    /// Looks up a block on disk, returning `Ok(None)` on a cache miss.
    ///
    /// Records get-latency/size/error metrics and refreshes the block's LRU
    /// position on a hit.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        _object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        // NOTE(review): `block_idx * block_size` is an unchecked u64 multiply;
        // presumably indices are far too small to overflow — confirm.
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let block_key = DiskBlockKey::new(cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        let result = match self.read_block(&path, cache_key, block_idx, block_offset) {
            Ok(None) => {
                Ok(None)
            }
            Ok(Some(bytes)) => {
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes.len() as f64);
                // Mark this block most-recently-used so eviction prefers colder blocks.
                if let Some(usage) = &self.usage {
                    usage.lock().unwrap().refresh(&block_key);
                }
                Ok(Some(bytes))
            }
            Err(err) => {
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        result
    }

    /// Writes a block to disk, evicting first if the cache limit is exceeded.
    ///
    /// The block is written to a temp file and atomically persisted to its final
    /// path. Records put-latency/size/error metrics.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        _object_size: usize,
    ) -> DataCacheResult<()> {
        // NOTE(review): same unchecked multiply as in `get_block` — confirm indices
        // cannot overflow u64 here.
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let bytes_len = bytes.len();
        let block_key = DiskBlockKey::new(&cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        trace!(?cache_key, ?path, "new block will be created in disk cache");
        let put_result = (|| -> DataCacheResult<()> {
            let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
                DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
            })?;
            // Make room before writing, and time the eviction pass separately.
            {
                let eviction_start = Instant::now();
                let result = self.evict_if_needed();
                metrics::histogram!(CACHE_EVICT_LATENCY, ATTR_CACHE => CACHE_DISK)
                    .record(eviction_start.elapsed().as_micros() as f64);
                result
            }?;
            let result = self.write_block(&path, block);
            let (temp_file, size) = result?;
            if let Some(usage) = &self.usage {
                // Persist and record the size under the same lock so the usage
                // accounting stays consistent with what is on disk.
                let mut usage = usage.lock().unwrap();
                _ = temp_file.persist(path).map_err(|e| e.error)?;
                usage.add(block_key, size);
            } else {
                _ = temp_file.persist(path).map_err(|e| e.error)?;
            }
            Ok(())
        })();
        if put_result.is_ok() {
            metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes_len as f64);
        } else {
            metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
        }
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        put_result
    }

    /// Returns the configured block size in bytes.
    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}
/// Key identifying a single cached block: the hashed object identity plus the
/// block index. Compact and `Copy`, so it is cheap to track in the LRU map.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct DiskBlockKey {
    // SHA-256 of (cache version, S3 key, etag); see `hash_cache_key_raw`.
    hashed_key: [u8; 32],
    // Index of the block within the object.
    block_index: BlockIndex,
}
impl DiskBlockKey {
    /// Builds a block key from the object's identity hash and a block index.
    fn new(cache_key: &ObjectId, block_index: BlockIndex) -> Self {
        Self {
            hashed_key: hash_cache_key_raw(cache_key),
            block_index,
        }
    }

    /// Lower-case hex rendering of the hashed object key.
    fn hex_key(&self) -> String {
        hex::encode(self.hashed_key)
    }

    /// Appends this key's path components to `path`: a short directory prefix,
    /// the remainder of the hex key, and the zero-padded (width 10) block index.
    fn append_to_path(&self, path: &mut PathBuf) {
        let hex_key = self.hex_key();
        let (prefix, remainder) = hex_key.split_at(HASHED_DIR_SPLIT_INDEX);
        path.push(prefix);
        path.push(remainder);
        path.push(format!("{:010}", self.block_index));
    }
}
/// LRU usage tracker: insertion-ordered map from key to entry size, plus a
/// running total of all tracked sizes.
struct UsageInfo<K> {
    // Keys in least- to most-recently-used order, each mapped to its size.
    entries: LinkedHashMap<K, usize>,
    // Sum of all entry sizes, maintained incrementally.
    size: usize,
}
impl<K> UsageInfo<K>
where
    K: std::hash::Hash + Eq + std::fmt::Debug,
{
    /// Creates an empty usage tracker.
    fn new() -> Self {
        Self {
            entries: LinkedHashMap::new(),
            size: 0,
        }
    }

    /// Moves `key` to the most-recently-used position.
    ///
    /// Returns whether the key was present.
    fn refresh(&mut self, key: &K) -> bool {
        self.entries.get_refresh(key).is_some()
    }

    /// Records an entry (replacing any previous one for the same key), keeping
    /// the running total consistent.
    fn add(&mut self, key: K, size: usize) {
        let replaced = self.entries.insert(key, size);
        if let Some(old_size) = replaced {
            self.size = self.size.saturating_sub(old_size);
        }
        self.size = self.size.saturating_add(size);
    }

    /// Removes and returns the least-recently-used key, shrinking the total.
    ///
    /// Returns `None` when the tracker is empty.
    fn evict_lru(&mut self) -> Option<K> {
        let (evicted_key, evicted_size) = self.entries.pop_front()?;
        self.size = self.size.saturating_sub(evicted_size);
        Some(evicted_key)
    }
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::{ffi::OsString, io::Cursor};
    use super::*;
    use futures::StreamExt as _;
    use futures::executor::{ThreadPool, block_on};
    use futures::task::SpawnExt;
    use mountpoint_s3_client::types::ETag;
    use rand::rngs::SmallRng;
    use rand::{RngExt, SeedableRng};
    use test_case::test_case;
    use crate::sync::Arc;

    // Guards the serialized on-disk format: if the expected bytes change,
    // CACHE_VERSION must be bumped.
    #[test]
    fn test_block_format_version_requires_update() {
        let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
        let data = ChecksummedBytes::new("Foo".into());
        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
        let expected_bytes: Vec<u8> = vec![
            100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116,
            101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114,
            108, 100, 9, 85, 128, 46, 13, 202, 106, 46, 70, 111, 111,
        ];
        let mut serialized_bytes = Vec::new();
        block.write(&mut serialized_bytes).unwrap();
        assert_eq!(
            expected_bytes, serialized_bytes,
            "serialized disk format appears to have changed, version bump required"
        );
    }

    // Pins the hashed cache key so on-disk paths stay stable across releases.
    #[test]
    fn test_hash_cache_key_raw() {
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key, etag);
        let expected_hash = "1cfd611a26062b33e98d48a84e967ddcc2a42957479a8abd541e29cfa3258639";
        let actual_hash = hex::encode(hash_cache_key_raw(&key));
        assert_eq!(expected_hash, actual_hash);
    }

    // Verifies path layout: cache dir / version / hash prefix / hash rest / padded index.
    #[test]
    fn get_path_for_block_key() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);
        let block_key = DiskBlockKey::new(&key, 5);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "0000000005",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // Indices wider than the zero-pad width are rendered unpadded, not truncated.
    #[test]
    fn get_path_for_block_key_huge_block_index() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);
        let block_key = DiskBlockKey::new(&key, 1000000000000000);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "1000000000000000",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }

    // Round-trip put/get across blocks and keys, for various block/pool size ratios.
    #[test_case(8 * 1024 * 1024, 8 * 1024 * 1024; "matching block and pool buffer sizes")]
    #[test_case(1024 * 1024, 8 * 1024 * 1024; "block size smaller than pool buffer size")]
    #[test_case(8 * 1024 * 1024, 1024 * 1024; "block size larger than pool buffer size")]
    #[tokio::test]
    async fn test_put_get(block_size: u64, pool_buffer_size: usize) {
        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let data_3 = ChecksummedBytes::new("Baz".into());
        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            "long-key_".repeat(100), ETag::for_tests(),
        );
        // Cold cache: the first read must be a miss.
        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );
        // A second block of the same object must not disturb the first.
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    // Slices of a checksummed buffer must round-trip through the cache intact.
    #[tokio::test]
    async fn test_checksummed_bytes_slice() {
        let data = ChecksummedBytes::new("0123456789".into());
        let slice = data.slice(1..5);
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: 8 * 1024 * 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
        cache
            .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key, 0, 0, slice.len())
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            slice.into_bytes().expect("original slice should be valid"),
            entry.into_bytes().expect("returned entry should be valid"),
            "cache entry returned should match original slice after put"
        );
    }

    // With a TotalSize limit, recently-written (small object) blocks survive while
    // older (large object) blocks get evicted.
    #[tokio::test]
    async fn test_eviction() {
        const BLOCK_SIZE: usize = 100 * 1024;
        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;
        // Deterministic pseudo-random payload so retrieved bytes can be compared.
        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
            let mut rng = SmallRng::seed_from_u64(seed);
            let mut body = vec![0u8; size];
            rng.fill(&mut body[..]);
            ChecksummedBytes::new(body.into())
        }
        // Returns whether the block is present; asserts content matches if it is.
        async fn is_block_in_cache(
            cache: &DiskDataCache,
            cache_key: &ObjectId,
            block_idx: u64,
            expected_bytes: &ChecksummedBytes,
            object_size: usize,
        ) -> bool {
            if let Some(retrieved) = cache
                .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
                .await
                .expect("cache should be accessible")
            {
                assert_eq!(
                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
                    expected_bytes
                        .clone()
                        .into_bytes()
                        .expect("original bytes should be valid")
                );
                true
            } else {
                false
            }
        }
        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
        let large_object_blocks: Vec<_> = (0..large_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let large_object_key = ObjectId::new("large".into(), ETag::for_tests());
        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
        let small_object_blocks: Vec<_> = (0..small_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let small_object_key = ObjectId::new("small".into(), ETag::for_tests());
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
            },
            pool,
        );
        // Write the large object first so its blocks are the LRU candidates.
        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    large_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    LARGE_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }
        for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    small_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    SMALL_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }
        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
            })
            .count()
            .await;
        assert_eq!(
            count_small_object_blocks_in_cache,
            small_object_blocks.len(),
            "All blocks for small object should still be in the cache"
        );
        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
            })
            .count()
            .await;
        assert!(
            count_large_object_blocks_in_cache < large_object_blocks.len(),
            "Some blocks for the large object should have been evicted"
        );
    }

    // DiskBlock::data must reject mismatched index/offset/key/etag and accept a
    // fully-matching request.
    #[test]
    fn data_block_extract_checks() {
        let data_1 = ChecksummedBytes::new("Foo".into());
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
        let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());
        let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
        block
            .data(&cache_key_1, 1, 0)
            .expect_err("should fail due to incorrect block index");
        block
            .data(&cache_key_1, 0, 1024)
            .expect_err("should fail due to incorrect block offset");
        block
            .data(&cache_key_2, 0, 0)
            .expect_err("should fail due to incorrect s3 key in cache key");
        block
            .data(&cache_key_3, 0, 0)
            .expect_err("should fail due to incorrect etag in cache key");
        let unpacked_bytes = block
            .data(&cache_key_1, 0, 0)
            .expect("should be OK as all fields match");
        assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
    }

    // Exercises every field-mismatch branch of DiskBlockHeader::validate plus the
    // header-checksum failure.
    #[test]
    fn validate_block_header() {
        let block_idx = 0;
        let block_offset = 0;
        let block_size = 4;
        let etag = ETag::for_tests();
        let s3_key = String::from("s3/key");
        let data_checksum = Crc32c::new(42);
        let mut header = DiskBlockHeader::new(
            block_idx,
            block_offset,
            block_size,
            etag.as_str().to_owned(),
            s3_key.clone(),
            data_checksum,
        );
        let checksum = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect("should be OK with valid fields and checksum");
        assert_eq!(data_checksum, checksum);
        let err = header
            .validate("hello", etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid s3_key");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, "bad etag", block_idx, block_offset, block_size)
            .expect_err("should fail with invalid etag");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), 5, block_offset, block_size)
            .expect_err("should fail with invalid block idx");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, 1024, block_size)
            .expect_err("should fail with invalid block offset");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, 42)
            .expect_err("should fail with invalid block length");
        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
        // Corrupt the header checksum itself: fields match, checksum does not.
        header.header_checksum = 23;
        let err = header
            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
            .expect_err("should fail with invalid checksum");
        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
    }

    // Corrupting a serialized length field must fail decode (via the bincode size
    // limit) or be rejected as an invalid block length.
    #[test_case("key")]
    #[test_case("etag")]
    #[test_case("data")]
    fn read_corrupted_block_should_fail(length_to_corrupt: &str) {
        const MAX_LENGTH: u64 = 1024;
        fn get_u64_at(slice: &[u8], offset: usize) -> u64 {
            u64::from_le_bytes(slice[offset..(offset + 8)].try_into().unwrap())
        }
        fn replace_u64_at(slice: &mut [u8], offset: usize, new_value: u64) {
            slice[offset..(offset + 8)].copy_from_slice(&new_value.to_le_bytes());
        }
        let original_length = 42;
        let data = ChecksummedBytes::new(vec![0u8; original_length].into());
        let cache_key = ObjectId::new("k".into(), ETag::from_str("e").unwrap());
        let block = DiskBlock::new(cache_key.clone(), 0, 0, data).expect("should have no checksum err");
        let mut buf = Vec::new();
        block.write(&mut buf).unwrap();
        // Byte offsets below depend on the fixed-int little-endian layout.
        let (offset, expected) = match length_to_corrupt {
            "key" => (24, cache_key.key().len()),
            "etag" => (32 + cache_key.key().len(), cache_key.etag().as_str().len()),
            "data" => (16, original_length),
            _ => panic!("invalid length: {length_to_corrupt}"),
        };
        assert_eq!(
            get_u64_at(&buf, offset) as usize,
            expected,
            "serialized length should match the expected value (have we changed the serialization format?)"
        );
        replace_u64_at(&mut buf, offset, u64::MAX);
        let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
        match length_to_corrupt {
            "key" | "etag" => assert!(matches!(
                err,
                DiskBlockReadWriteError::DecodeError(DecodeError::LimitExceeded)
            )),
            "data" => assert!(matches!(err, DiskBlockReadWriteError::InvalidBlockLength(_))),
            _ => panic!("invalid length: {length_to_corrupt}"),
        }
    }

    // Many tasks racing get/put on the same block must all succeed without error.
    #[test]
    fn test_concurrent_access() {
        let block_size = 1024 * 1024;
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([block_size]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: block_size as u64,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let data_cache = Arc::new(data_cache);
        let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());
        let block_idx = 0;
        let block_offset = 0;
        let object_size = 10 * block_size;
        let pool = ThreadPool::builder().pool_size(32).create().unwrap();
        let mut handles = Vec::new();
        for _ in 0..100 {
            let data_cache = data_cache.clone();
            let cache_key = cache_key.clone();
            let handle = pool
                .spawn_with_handle(async move {
                    let block = data_cache
                        .get_block(&cache_key, block_idx, block_offset, object_size)
                        .await
                        .expect("get_block should not return error");
                    if block.is_none() {
                        let bytes: Bytes = vec![0u8; block_size].into();
                        data_cache
                            .put_block(cache_key, block_idx, block_offset, bytes.into(), object_size)
                            .await
                            .expect("put_block should succeed");
                    }
                })
                .unwrap();
            handles.push(handle);
        }
        block_on(async move {
            for handle in handles {
                handle.await
            }
        });
    }
}