// mountpoint_s3_fs/data_cache/disk_data_cache.rs

//! Module for the on-disk data cache implementation.
2
3use std::fs;
4use std::io::{ErrorKind, Read, Write};
5use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use async_trait::async_trait;
10use bincode::config::{Configuration, Fixint, Limit, LittleEndian};
11use bincode::error::{DecodeError, EncodeError};
12use bincode::{Decode, Encode};
13use bytes::Bytes;
14use linked_hash_map::LinkedHashMap;
15use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
16use sha2::{Digest, Sha256};
17use tempfile::NamedTempFile;
18use thiserror::Error;
19use tracing::{trace, warn};
20
21use crate::checksums::IntegrityError;
22use crate::data_cache::DataCacheError;
23use crate::memory::{BufferKind, PagedPool};
24use crate::metrics::defs::{
25    ATTR_CACHE, CACHE_DISK, CACHE_EVICT_LATENCY, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY,
26    CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY, CACHE_TOTAL_SIZE,
27};
28use crate::object::ObjectId;
29use crate::sync::Mutex;
30
31use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
32
/// Disk and file-layout versioning.
/// This string is embedded both in the on-disk path and at the start of every
/// block file, so blocks written by an older format are rejected on read.
const CACHE_VERSION: &str = "V2";

/// Index where hashed directory names for the cache are split to avoid FS-specific limits.
/// E.g. a hex-encoded hash `abcd…` is stored as `ab/cd…` on disk.
const HASHED_DIR_SPLIT_INDEX: usize = 2;
38
/// On-disk implementation of [DataCache].
///
/// Blocks are stored as individual files under a versioned cache directory,
/// addressed by a hash of the S3 key and ETag plus the block index.
pub struct DiskDataCache {
    /// Cache directory, block size, and cache-limit settings.
    config: DiskDataCacheConfig,
    /// Buffer pool used to hold block data read back from disk.
    pool: PagedPool,
    /// Tracks blocks usage. `None` when no cache limit was set.
    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
}
46
/// Configuration for a [DiskDataCache].
#[derive(Debug)]
pub struct DiskDataCacheConfig {
    /// Directory under which versioned cache blocks are stored.
    pub cache_directory: PathBuf,
    /// Size of data blocks.
    pub block_size: u64,
    /// How to limit the cache size.
    pub limit: CacheLimit,
}
56
/// Limit the cache size.
#[derive(Debug)]
pub enum CacheLimit {
    /// Never evict; the cache may grow without bound.
    Unbounded,
    /// Evict when the tracked total size exceeds `max_size` bytes.
    TotalSize { max_size: usize },
    /// Evict when the filesystem's available-block ratio drops below `min_ratio`.
    AvailableSpace { min_ratio: f64 },
}

/// Default minimum ratio of available space to preserve when using AvailableSpace cache limit.
/// This preserves 5% of the filesystem's total space as available space.
pub const DEFAULT_CACHE_MIN_AVAILABLE_RATIO: f64 = 0.05;
68
69impl Default for CacheLimit {
70    fn default() -> Self {
71        CacheLimit::AvailableSpace {
72            min_ratio: DEFAULT_CACHE_MIN_AVAILABLE_RATIO,
73        }
74    }
75}
76
/// Describes additional information about the data stored in the block.
///
/// It should be written alongside the block's data
/// and used to verify it contains the correct contents to avoid blocks being mixed up.
#[derive(Encode, Decode, Debug)]
struct DiskBlockHeader {
    /// Index of the block within the object.
    block_idx: BlockIndex,
    /// Byte offset of the block within the object.
    block_offset: u64,
    /// Length in bytes of the block's data payload.
    block_len: u64,
    /// ETag of the object this block was cached from.
    etag: String,
    /// S3 key of the object this block was cached from.
    s3_key: String,
    /// CRC32C checksum over the block's data payload.
    data_checksum: u32,
    /// CRC32C checksum over all of the fields above.
    header_checksum: u32,
}
91
/// Max size of header after encoding.
/// 10000 should easily accommodate any block header:
/// - S3 key should always be less than or equal to 1024
/// - Allow 1024 for ETag, even though its always much smaller
/// - Integers should be up to 8 bytes each
const BINCODE_HEADER_MAX_SIZE: usize = 10000;

/// Binary encoding configuration for the block header.
/// Fixed-int encoding keeps the serialized layout stable across values, and the
/// size limit bounds how much data a corrupt header can cause us to read.
const BINCODE_CONFIG: Configuration<LittleEndian, Fixint, Limit<BINCODE_HEADER_MAX_SIZE>> = bincode::config::standard()
    .with_fixed_int_encoding()
    .with_limit::<BINCODE_HEADER_MAX_SIZE>();
103
/// Error during creation of a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockCreationError {
    /// Data corruption detected when unpacking bytes and checksum
    #[error(transparent)]
    IntegrityError(#[from] IntegrityError),
}

/// Error during access to a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockAccessError {
    /// The header checksum did not match the header's fields.
    #[error("checksum over the block's fields did not match the field content")]
    ChecksumError,
    /// One or more header fields did not match what the caller expected.
    #[error("one or more of the fields in this block were incorrect")]
    FieldMismatchError,
}

/// Error when reading or writing a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockReadWriteError {
    /// The header declared a block length larger than the configured block size.
    #[error("Invalid block length: {0}")]
    InvalidBlockLength(u64),
    /// Non-IO failure while decoding the header.
    #[error("Error decoding the block: {0}")]
    DecodeError(DecodeError),
    /// Non-IO failure while encoding the header.
    #[error("Error encoding the block: {0}")]
    EncodeError(EncodeError),
    /// Underlying IO failure while reading or writing.
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
133
134impl DiskBlockHeader {
135    /// Creates a new [DiskBlockHeader]
136    pub fn new(
137        block_idx: BlockIndex,
138        block_offset: u64,
139        block_len: usize,
140        etag: String,
141        s3_key: String,
142        data_checksum: Crc32c,
143    ) -> Self {
144        let data_checksum = data_checksum.value();
145        let header_checksum =
146            Self::compute_checksum(block_idx, block_offset, block_len, &etag, &s3_key, data_checksum).value();
147        DiskBlockHeader {
148            block_idx,
149            block_offset,
150            block_len: block_len as u64,
151            etag,
152            s3_key,
153            data_checksum,
154            header_checksum,
155        }
156    }
157
158    fn compute_checksum(
159        block_idx: BlockIndex,
160        block_offset: u64,
161        block_len: usize,
162        etag: &str,
163        s3_key: &str,
164        data_checksum: u32,
165    ) -> Crc32c {
166        let mut hasher = crc32c::Hasher::new();
167        hasher.update(&block_idx.to_be_bytes());
168        hasher.update(&block_offset.to_be_bytes());
169        hasher.update(&block_len.to_be_bytes());
170        hasher.update(etag.as_bytes());
171        hasher.update(s3_key.as_bytes());
172        hasher.update(&data_checksum.to_be_bytes());
173        hasher.finalize()
174    }
175
176    /// Validate the integrity of the contained data and return the stored data checksum.
177    ///
178    /// Execute this method before acting on the data contained within.
179    pub fn validate(
180        &self,
181        s3_key: &str,
182        etag: &str,
183        block_idx: BlockIndex,
184        block_offset: u64,
185        block_len: usize,
186    ) -> Result<Crc32c, DiskBlockAccessError> {
187        let s3_key_match = s3_key == self.s3_key;
188        let etag_match = etag == self.etag;
189        let block_idx_match = block_idx == self.block_idx;
190        let block_offset_match = block_offset == self.block_offset;
191        let block_size_match = block_len == self.block_len as usize;
192
193        let data_checksum = self.data_checksum;
194        if s3_key_match && etag_match && block_idx_match && block_offset_match && block_size_match {
195            if Self::compute_checksum(block_idx, block_offset, block_len, etag, s3_key, data_checksum).value()
196                != self.header_checksum
197            {
198                Err(DiskBlockAccessError::ChecksumError)
199            } else {
200                Ok(Crc32c::new(data_checksum))
201            }
202        } else {
203            warn!(
204                s3_key_match,
205                etag_match, block_idx_match, block_size_match, "block data did not match expected values",
206            );
207            Err(DiskBlockAccessError::FieldMismatchError)
208        }
209    }
210}
211
/// Represents a fixed-size chunk of data that can be serialized.
#[derive(Debug)]
struct DiskBlock {
    /// Information describing the content of `data`, to be used to verify correctness
    header: DiskBlockHeader,
    /// Cached bytes
    data: Bytes,
}
220
221impl DiskBlock {
222    /// Create a new [DiskBlock].
223    ///
224    /// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
225    /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
226    fn new(
227        cache_key: ObjectId,
228        block_idx: BlockIndex,
229        block_offset: u64,
230        bytes: ChecksummedBytes,
231    ) -> Result<Self, DiskBlockCreationError> {
232        let s3_key = cache_key.key().to_owned();
233        let etag = cache_key.etag().as_str().to_owned();
234        let (data, data_checksum) = bytes.into_inner()?;
235        let header = DiskBlockHeader::new(block_idx, block_offset, data.len(), etag, s3_key, data_checksum);
236
237        Ok(DiskBlock { data, header })
238    }
239
240    /// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
241    ///
242    /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
243    fn data(
244        &self,
245        cache_key: &ObjectId,
246        block_idx: BlockIndex,
247        block_offset: u64,
248    ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
249        let data_checksum = self.header.validate(
250            cache_key.key(),
251            cache_key.etag().as_str(),
252            block_idx,
253            block_offset,
254            self.data.len(),
255        )?;
256        let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
257        Ok(bytes)
258    }
259
260    /// Deserialize an instance from `reader`.
261    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
262        let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;
263
264        if header.block_len > block_size {
265            return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
266        }
267
268        let size = header.block_len as usize;
269        let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
270        buffer.fill_from_reader(reader)?;
271        let data = buffer.into_bytes();
272
273        Ok(Self { header, data })
274    }
275
276    /// Serialize this instance to `writer` and return the number of bytes written on success.
277    fn write(&self, writer: &mut impl Write) -> Result<usize, DiskBlockReadWriteError> {
278        let header_length = bincode::encode_into_std_write(&self.header, writer, BINCODE_CONFIG)?;
279        writer.write_all(&self.data)?;
280        Ok(header_length + self.data.len())
281    }
282}
283
284impl From<DecodeError> for DiskBlockReadWriteError {
285    fn from(value: DecodeError) -> Self {
286        match value {
287            DecodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
288            value => DiskBlockReadWriteError::DecodeError(value),
289        }
290    }
291}
292
293impl From<EncodeError> for DiskBlockReadWriteError {
294    fn from(value: EncodeError) -> Self {
295        match value {
296            EncodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
297            value => DiskBlockReadWriteError::EncodeError(value),
298        }
299    }
300}
301
302impl From<std::io::Error> for DataCacheError {
303    fn from(e: std::io::Error) -> Self {
304        DataCacheError::IoFailure(e.into())
305    }
306}
307
308impl From<DiskBlockReadWriteError> for DataCacheError {
309    fn from(value: DiskBlockReadWriteError) -> Self {
310        match value {
311            DiskBlockReadWriteError::IOError(e) => DataCacheError::IoFailure(e.into()),
312            _ => DataCacheError::InvalidBlockContent,
313        }
314    }
315}
316
impl DiskDataCache {
    /// Create a new instance of an [DiskDataCache] with the specified configuration.
    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
        // Usage tracking is only needed when a limit can trigger evictions.
        let usage = match &config.limit {
            CacheLimit::Unbounded => None,
            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
        };
        DiskDataCache { config, pool, usage }
    }

    /// Get the relative path for the given block.
    // Layout: <cache_directory>/<CACHE_VERSION>/<split hash dirs>/<block index>.
    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
        let mut path = self.config.cache_directory.join(CACHE_VERSION);
        block_key.append_to_path(&mut path);
        path
    }

    /// Read and verify the block stored at `path`.
    ///
    /// Returns `Ok(None)` when the file does not exist (cache miss). Returns an
    /// error when the file cannot be read, carries a stale format version,
    /// fails to deserialize, or its header does not match the expected
    /// key/index/offset.
    fn read_block(
        &self,
        path: impl AsRef<Path>,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        trace!(
            key = ?cache_key.key(),
            offset = block_offset,
            path = ?path.as_ref(),
            "reading cache block",
        );
        let mut file = match fs::File::open(path.as_ref()) {
            Ok(file) => file,
            // A missing file is a cache miss, not an error.
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };

        // Every block file begins with the cache version bytes; reject anything else.
        let mut block_version = [0; CACHE_VERSION.len()];
        file.read_exact(&mut block_version)?;
        if block_version != CACHE_VERSION.as_bytes() {
            warn!(
                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                "stale block format found during reading"
            );
            return Err(DataCacheError::InvalidBlockContent);
        }

        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
            .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
        // Validate the header against what the caller asked for before trusting the data.
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
                DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                    DataCacheError::InvalidBlockContent
                }
            })?;

        Ok(Some(bytes))
    }

    /// Write `block` to a temporary file in the same directory as `path`.
    ///
    /// The caller is responsible for persisting the returned [NamedTempFile] to
    /// its final location. Returns the temp file and the number of bytes written.
    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<(NamedTempFile, usize)> {
        let path = path.as_ref();
        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
        // Restrict cache directories (0700) and files (0600) to the current user.
        fs::DirBuilder::new()
            .mode(0o700)
            .recursive(true)
            .create(cache_path_for_key)?;

        let mut temp_file = tempfile::Builder::new()
            .permissions(fs::Permissions::from_mode(0o600))
            .tempfile_in(cache_path_for_key)?;
        trace!(
            key = block.header.s3_key,
            offset = block.header.block_offset,
            block_path = ?path,
            temp_path = ?temp_file.path(),
            "writing cache block",
        );
        temp_file.write_all(CACHE_VERSION.as_bytes())?;
        let bytes_written = block.write(&mut temp_file)?;
        Ok((temp_file, bytes_written))
    }

    /// Whether the configured cache limit is exceeded for the given tracked `size`.
    /// Also records the tracked size as a gauge metric.
    fn is_limit_exceeded(&self, size: usize) -> bool {
        metrics::gauge!(CACHE_TOTAL_SIZE, ATTR_CACHE => CACHE_DISK).set(size as f64);
        match self.config.limit {
            CacheLimit::Unbounded => false,
            CacheLimit::TotalSize { max_size } => size > max_size,
            CacheLimit::AvailableSpace { min_ratio } => {
                // If we can't determine the filesystem stats, treat the limit as
                // not exceeded rather than evicting blindly.
                let stats = match nix::sys::statvfs::statvfs(&self.config.cache_directory) {
                    Ok(stats) if stats.blocks() == 0 => {
                        warn!("unable to determine available space (0 blocks reported)");
                        return false;
                    }
                    Ok(stats) => stats,
                    Err(error) => {
                        warn!(?error, "unable to determine available space");
                        return false;
                    }
                };
                (stats.blocks_available() as f64) < min_ratio * (stats.blocks() as f64)
            }
        }
    }

    /// Evict least-recently-used blocks until the configured limit is satisfied.
    /// No-op when no usage tracking is configured (unbounded cache).
    fn evict_if_needed(&self) -> DataCacheResult<()> {
        let Some(usage) = &self.usage else {
            return Ok(());
        };

        loop {
            // Re-acquire the lock each iteration; the limit check and eviction
            // happen atomically with respect to other usage updates.
            let mut usage = usage.lock().unwrap();
            if !self.is_limit_exceeded(usage.size) {
                break;
            }
            let Some(to_remove) = usage.evict_lru() else {
                warn!("cache limit exceeded but nothing to evict");
                return Err(DataCacheError::EvictionFailure);
            };
            let path_to_remove = self.get_path_for_block_key(&to_remove);
            trace!("evicting block at {}", path_to_remove.display());
            // A NotFound here just means the file was already gone; other removal
            // failures are logged but do not abort eviction.
            if let Err(remove_err) = fs::remove_file(&path_to_remove)
                && remove_err.kind() != ErrorKind::NotFound
            {
                warn!("unable to evict block: {:?}", remove_err);
            }
        }
        Ok(())
    }
}
446
/// Hash the cache key using its fields as well as the [CACHE_VERSION].
///
/// The SHA-256 digest is used to build on-disk names, keeping paths short even
/// for long S3 keys; mixing in [CACHE_VERSION] separates blocks written by
/// different format versions.
fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
    let s3_key = cache_key.key();
    let etag = cache_key.etag();

    // Note: the update order (version, key, etag) is part of the on-disk layout.
    let mut hasher = Sha256::new();
    hasher.update(CACHE_VERSION);
    hasher.update(s3_key);
    hasher.update(etag.as_str());
    hasher.finalize().into()
}
458
#[async_trait]
impl DataCache for DiskDataCache {
    /// Look up a block, returning `Ok(None)` on a cache miss.
    ///
    /// Rejects inconsistent `block_offset`/`block_idx` pairs, records get
    /// latency/size/error metrics, and refreshes the block's LRU position on a hit.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        _object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let block_key = DiskBlockKey::new(cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        let result = match self.read_block(&path, cache_key, block_idx, block_offset) {
            Ok(None) => {
                // Cache miss.
                Ok(None)
            }
            Ok(Some(bytes)) => {
                // Cache hit.
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes.len() as f64);
                // Mark the block most-recently-used so eviction prefers colder blocks.
                if let Some(usage) = &self.usage {
                    usage.lock().unwrap().refresh(&block_key);
                }
                Ok(Some(bytes))
            }
            Err(err) => {
                // Invalid block.
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
                Err(err)
            }
        };
        // Latency is recorded for hits, misses, and errors alike.
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        result
    }

    /// Write a block to the cache, evicting LRU blocks first if a limit is exceeded.
    ///
    /// The block is written to a temp file and then persisted atomically to its
    /// final path. Records put latency/size/error metrics.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        _object_size: usize,
    ) -> DataCacheResult<()> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let bytes_len = bytes.len();
        let block_key = DiskBlockKey::new(&cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        trace!(?cache_key, ?path, "new block will be created in disk cache");

        // Capture the put operation result separately from metrics recording
        // to ensure we can record both success and error metrics consistently
        let put_result = (|| -> DataCacheResult<()> {
            let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
                DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
            })?;

            // Evict before writing, so the new block's bytes fit under the limit.
            {
                let eviction_start = Instant::now();
                let result = self.evict_if_needed();
                metrics::histogram!(CACHE_EVICT_LATENCY, ATTR_CACHE => CACHE_DISK)
                    .record(eviction_start.elapsed().as_micros() as f64);
                result
            }?;

            let result = self.write_block(&path, block);
            let (temp_file, size) = result?;

            // Persist while holding the usage lock so the file's presence and its
            // size accounting update together — NOTE(review): presumably to keep
            // eviction from observing one without the other; confirm intent.
            if let Some(usage) = &self.usage {
                let mut usage = usage.lock().unwrap();
                _ = temp_file.persist(path).map_err(|e| e.error)?;
                usage.add(block_key, size);
            } else {
                _ = temp_file.persist(path).map_err(|e| e.error)?;
            }

            Ok(())
        })();

        if put_result.is_ok() {
            metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes_len as f64);
        } else {
            metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
        }
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        put_result
    }

    /// Configured size of cache blocks in bytes.
    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}
556
/// Key to identify a block in the disk cache, composed of a hash of the S3 key and Etag, and the block index.
/// An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length.
/// Instead, this key contains a hash of the S3 key and ETag to avoid the limit when used in paths.
/// The risk of collisions is mitigated as we ignore blocks read that contain the wrong S3 key, etc..
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct DiskBlockKey {
    /// SHA-256 of cache version + S3 key + ETag (see [hash_cache_key_raw]).
    hashed_key: [u8; 32],
    /// Index of the block within the object.
    block_index: BlockIndex,
}
566
567impl DiskBlockKey {
568    fn new(cache_key: &ObjectId, block_index: BlockIndex) -> Self {
569        let hashed_key = hash_cache_key_raw(cache_key);
570        Self {
571            hashed_key,
572            block_index,
573        }
574    }
575
576    fn hex_key(&self) -> String {
577        hex::encode(self.hashed_key)
578    }
579
580    fn append_to_path(&self, path: &mut PathBuf) {
581        let hashed_cache_key = self.hex_key();
582
583        // Split directories by taking the first few chars of hash to avoid hitting any FS-specific maximum number of directory entries.
584        let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
585        path.push(first);
586        path.push(second);
587
588        // Append the block index.
589        path.push(format!("{:010}", self.block_index));
590    }
591}
592
/// Keeps track of entries usage and total size.
struct UsageInfo<K> {
    /// Entry sizes keyed by `K`; iteration order is least- to most-recently used.
    entries: LinkedHashMap<K, usize>,
    /// Sum of the sizes of all tracked entries.
    size: usize,
}
598
599impl<K> UsageInfo<K>
600where
601    K: std::hash::Hash + Eq + std::fmt::Debug,
602{
603    fn new() -> Self {
604        Self {
605            entries: LinkedHashMap::new(),
606            size: 0,
607        }
608    }
609
610    /// Refresh the given key if present, marking it as the most recently used.
611    /// Returns `false` if the key is not in the cache.
612    fn refresh(&mut self, key: &K) -> bool {
613        self.entries.get_refresh(key).is_some()
614    }
615
616    /// Add or replace a key and update the total size.
617    fn add(&mut self, key: K, size: usize) {
618        if let Some(previous_size) = self.entries.insert(key, size) {
619            self.size = self.size.saturating_sub(previous_size);
620        }
621
622        self.size = self.size.saturating_add(size);
623    }
624
625    /// Remove the least recently used key and update the total size.
626    /// Return `None` if empty.
627    fn evict_lru(&mut self) -> Option<K> {
628        let (key, size) = self.entries.pop_front()?;
629        self.size = self.size.saturating_sub(size);
630        Some(key)
631    }
632}
633
634#[cfg(test)]
635mod tests {
636    use std::str::FromStr;
637    use std::{ffi::OsString, io::Cursor};
638
639    use super::*;
640
641    use futures::StreamExt as _;
642    use futures::executor::{ThreadPool, block_on};
643    use futures::task::SpawnExt;
644    use mountpoint_s3_client::types::ETag;
645    use rand::rngs::SmallRng;
646    use rand::{RngExt, SeedableRng};
647    use test_case::test_case;
648
649    use crate::sync::Arc;
650
651    #[test]
652    fn test_block_format_version_requires_update() {
653        let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
654        let data = ChecksummedBytes::new("Foo".into());
655        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
656        let expected_bytes: Vec<u8> = vec![
657            100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116,
658            101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114,
659            108, 100, 9, 85, 128, 46, 13, 202, 106, 46, 70, 111, 111,
660        ];
661        let mut serialized_bytes = Vec::new();
662        block.write(&mut serialized_bytes).unwrap();
663        assert_eq!(
664            expected_bytes, serialized_bytes,
665            "serialized disk format appears to have changed, version bump required"
666        );
667    }
668
669    #[test]
670    fn test_hash_cache_key_raw() {
671        let s3_key = "a".repeat(266);
672        let etag = ETag::for_tests();
673        let key = ObjectId::new(s3_key, etag);
674        let expected_hash = "1cfd611a26062b33e98d48a84e967ddcc2a42957479a8abd541e29cfa3258639";
675        let actual_hash = hex::encode(hash_cache_key_raw(&key));
676        assert_eq!(expected_hash, actual_hash);
677    }
678
679    #[test]
680    fn get_path_for_block_key() {
681        let cache_dir = PathBuf::from("mountpoint-cache/");
682        let pool = PagedPool::new_with_candidate_sizes([1024]);
683        let data_cache = DiskDataCache::new(
684            DiskDataCacheConfig {
685                cache_directory: cache_dir,
686                block_size: 1024,
687                limit: CacheLimit::Unbounded,
688            },
689            pool,
690        );
691
692        let s3_key = "a".repeat(266);
693        let etag = ETag::for_tests();
694        let key = ObjectId::new(s3_key.to_owned(), etag);
695
696        let block_key = DiskBlockKey::new(&key, 5);
697        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
698        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
699        let expected = vec![
700            "mountpoint-cache",
701            CACHE_VERSION,
702            split_hashed_key.0,
703            split_hashed_key.1,
704            "0000000005",
705        ];
706        let path = data_cache.get_path_for_block_key(&block_key);
707        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
708        assert_eq!(expected, results);
709    }
710
711    #[test]
712    fn get_path_for_block_key_huge_block_index() {
713        let cache_dir = PathBuf::from("mountpoint-cache/");
714        let pool = PagedPool::new_with_candidate_sizes([1024]);
715        let data_cache = DiskDataCache::new(
716            DiskDataCacheConfig {
717                cache_directory: cache_dir,
718                block_size: 1024,
719                limit: CacheLimit::Unbounded,
720            },
721            pool,
722        );
723
724        let s3_key = "a".repeat(266);
725        let etag = ETag::for_tests();
726        let key = ObjectId::new(s3_key.to_owned(), etag);
727
728        let block_key = DiskBlockKey::new(&key, 1000000000000000);
729        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
730        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
731        let expected = vec![
732            "mountpoint-cache",
733            CACHE_VERSION,
734            split_hashed_key.0,
735            split_hashed_key.1,
736            "1000000000000000",
737        ];
738        let path = data_cache.get_path_for_block_key(&block_key);
739        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
740        assert_eq!(expected, results);
741    }
742
743    #[test_case(8 * 1024 * 1024, 8 * 1024 * 1024; "matching block and pool buffer sizes")]
744    #[test_case(1024 * 1024, 8 * 1024 * 1024; "block size smaller than pool buffer size")]
745    #[test_case(8 * 1024 * 1024, 1024 * 1024; "block size larger than pool buffer size")]
746    #[tokio::test]
747    async fn test_put_get(block_size: u64, pool_buffer_size: usize) {
748        let data_1 = ChecksummedBytes::new("Foo".into());
749        let data_2 = ChecksummedBytes::new("Bar".into());
750        let data_3 = ChecksummedBytes::new("Baz".into());
751
752        let object_1_size = data_1.len() + data_3.len();
753        let object_2_size = data_2.len();
754
755        let cache_directory = tempfile::tempdir().unwrap();
756        let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
757        let cache = DiskDataCache::new(
758            DiskDataCacheConfig {
759                cache_directory: cache_directory.path().to_path_buf(),
760                block_size,
761                limit: CacheLimit::Unbounded,
762            },
763            pool,
764        );
765        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
766        let cache_key_2 = ObjectId::new(
767            "long-key_".repeat(100), // at least 900 chars, exceeding easily 255 chars (UNIX filename limit)
768            ETag::for_tests(),
769        );
770
771        let block = cache
772            .get_block(&cache_key_1, 0, 0, object_1_size)
773            .await
774            .expect("cache should be accessible");
775        assert!(
776            block.is_none(),
777            "no entry should be available to return but got {block:?}",
778        );
779
780        // PUT and GET, OK?
781        cache
782            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
783            .await
784            .expect("cache should be accessible");
785        let entry = cache
786            .get_block(&cache_key_1, 0, 0, object_1_size)
787            .await
788            .expect("cache should be accessible")
789            .expect("cache entry should be returned");
790        assert_eq!(
791            data_1, entry,
792            "cache entry returned should match original bytes after put"
793        );
794
795        // PUT AND GET a second file, OK?
796        cache
797            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
798            .await
799            .expect("cache should be accessible");
800        let entry = cache
801            .get_block(&cache_key_2, 0, 0, object_2_size)
802            .await
803            .expect("cache should be accessible")
804            .expect("cache entry should be returned");
805        assert_eq!(
806            data_2, entry,
807            "cache entry returned should match original bytes after put"
808        );
809
810        // PUT AND GET a second block in a cache entry, OK?
811        cache
812            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
813            .await
814            .expect("cache should be accessible");
815        let entry = cache
816            .get_block(&cache_key_1, 1, block_size, object_1_size)
817            .await
818            .expect("cache should be accessible")
819            .expect("cache entry should be returned");
820        assert_eq!(
821            data_3, entry,
822            "cache entry returned should match original bytes after put"
823        );
824
825        // Entry 1's first block still intact
826        let entry = cache
827            .get_block(&cache_key_1, 0, 0, object_1_size)
828            .await
829            .expect("cache should be accessible")
830            .expect("cache entry should be returned");
831        assert_eq!(
832            data_1, entry,
833            "cache entry returned should match original bytes after put"
834        );
835    }
836
837    #[tokio::test]
838    async fn test_checksummed_bytes_slice() {
839        let data = ChecksummedBytes::new("0123456789".into());
840        let slice = data.slice(1..5);
841
842        let cache_directory = tempfile::tempdir().unwrap();
843        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
844        let cache = DiskDataCache::new(
845            DiskDataCacheConfig {
846                cache_directory: cache_directory.path().to_path_buf(),
847                block_size: 8 * 1024 * 1024,
848                limit: CacheLimit::Unbounded,
849            },
850            pool,
851        );
852        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
853
854        cache
855            .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
856            .await
857            .expect("cache should be accessible");
858        let entry = cache
859            .get_block(&cache_key, 0, 0, slice.len())
860            .await
861            .expect("cache should be accessible")
862            .expect("cache entry should be returned");
863        assert_eq!(
864            slice.into_bytes().expect("original slice should be valid"),
865            entry.into_bytes().expect("returned entry should be valid"),
866            "cache entry returned should match original slice after put"
867        );
868    }
869
    #[tokio::test]
    async fn test_eviction() {
        // The cache limit equals the large object's size, so after writing the
        // large object and then the small object, the total exceeds the limit:
        // the assertions below expect the more recently written small-object
        // blocks to survive while some large-object blocks are evicted.
        const BLOCK_SIZE: usize = 100 * 1024;
        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;

        /// Build `size` deterministic pseudo-random bytes so the test is reproducible.
        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
            let mut rng = SmallRng::seed_from_u64(seed);
            let mut body = vec![0u8; size];
            rng.fill(&mut body[..]);

            ChecksummedBytes::new(body.into())
        }

        /// Return whether block `block_idx` of the object is present in the cache.
        /// If it is present, also assert its contents match `expected_bytes`.
        async fn is_block_in_cache(
            cache: &DiskDataCache,
            cache_key: &ObjectId,
            block_idx: u64,
            expected_bytes: &ChecksummedBytes,
            object_size: usize,
        ) -> bool {
            if let Some(retrieved) = cache
                .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
                .await
                .expect("cache should be accessible")
            {
                assert_eq!(
                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
                    expected_bytes
                        .clone()
                        .into_bytes()
                        .expect("original bytes should be valid")
                );
                true
            } else {
                false
            }
        }

        // Split each object into BLOCK_SIZE-sized slices (the last slice may be shorter).
        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
        let large_object_blocks: Vec<_> = (0..large_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let large_object_key = ObjectId::new("large".into(), ETag::for_tests());

        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
        let small_object_blocks: Vec<_> = (0..small_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

        // Bounded cache: total size capped at exactly one large object.
        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
            },
            pool,
        );

        // Put all of large_object
        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    large_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    LARGE_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        // Put all of small_object
        for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    small_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    SMALL_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        // The small object was written last: every one of its blocks should survive.
        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
            })
            .count()
            .await;
        assert_eq!(
            count_small_object_blocks_in_cache,
            small_object_blocks.len(),
            "All blocks for small object should still be in the cache"
        );

        // Combined sizes exceed the limit, so the large object must have lost blocks.
        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
            })
            .count()
            .await;
        assert!(
            count_large_object_blocks_in_cache < large_object_blocks.len(),
            "Some blocks for the large object should have been evicted"
        );
    }
986
987    #[test]
988    fn data_block_extract_checks() {
989        let data_1 = ChecksummedBytes::new("Foo".into());
990
991        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
992        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
993        let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());
994
995        let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
996        block
997            .data(&cache_key_1, 1, 0)
998            .expect_err("should fail due to incorrect block index");
999        block
1000            .data(&cache_key_1, 0, 1024)
1001            .expect_err("should fail due to incorrect block offset");
1002        block
1003            .data(&cache_key_2, 0, 0)
1004            .expect_err("should fail due to incorrect s3 key in cache key");
1005        block
1006            .data(&cache_key_3, 0, 0)
1007            .expect_err("should fail due to incorrect etag in cache key");
1008        let unpacked_bytes = block
1009            .data(&cache_key_1, 0, 0)
1010            .expect("should be OK as all fields match");
1011        assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
1012    }
1013
1014    #[test]
1015    fn validate_block_header() {
1016        let block_idx = 0;
1017        let block_offset = 0;
1018        let block_size = 4;
1019        let etag = ETag::for_tests();
1020        let s3_key = String::from("s3/key");
1021        let data_checksum = Crc32c::new(42);
1022        let mut header = DiskBlockHeader::new(
1023            block_idx,
1024            block_offset,
1025            block_size,
1026            etag.as_str().to_owned(),
1027            s3_key.clone(),
1028            data_checksum,
1029        );
1030
1031        let checksum = header
1032            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
1033            .expect("should be OK with valid fields and checksum");
1034        assert_eq!(data_checksum, checksum);
1035
1036        // Bad fields
1037        let err = header
1038            .validate("hello", etag.as_str(), block_idx, block_offset, block_size)
1039            .expect_err("should fail with invalid s3_key");
1040        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1041        let err = header
1042            .validate(&s3_key, "bad etag", block_idx, block_offset, block_size)
1043            .expect_err("should fail with invalid etag");
1044        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1045        let err = header
1046            .validate(&s3_key, etag.as_str(), 5, block_offset, block_size)
1047            .expect_err("should fail with invalid block idx");
1048        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1049        let err = header
1050            .validate(&s3_key, etag.as_str(), block_idx, 1024, block_size)
1051            .expect_err("should fail with invalid block offset");
1052        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1053        let err = header
1054            .validate(&s3_key, etag.as_str(), block_idx, block_offset, 42)
1055            .expect_err("should fail with invalid block length");
1056        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1057
1058        // Bad checksum
1059        header.header_checksum = 23;
1060        let err = header
1061            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
1062            .expect_err("should fail with invalid checksum");
1063        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
1064    }
1065
    #[test_case("key")]
    #[test_case("etag")]
    #[test_case("data")]
    fn read_corrupted_block_should_fail(length_to_corrupt: &str) {
        // Serialize a block, overwrite one of its encoded length fields with
        // `u64::MAX`, and verify that deserialization fails with a controlled
        // error (decode limit or block-length validation) rather than trying
        // to honor the bogus length.
        const MAX_LENGTH: u64 = 1024;

        /// Read the `u64` value at the given offset.
        fn get_u64_at(slice: &[u8], offset: usize) -> u64 {
            u64::from_le_bytes(slice[offset..(offset + 8)].try_into().unwrap())
        }

        /// Replace the `u64` value at `offset` with `new_value`.
        fn replace_u64_at(slice: &mut [u8], offset: usize, new_value: u64) {
            slice[offset..(offset + 8)].copy_from_slice(&new_value.to_le_bytes());
        }

        let original_length = 42;
        let data = ChecksummedBytes::new(vec![0u8; original_length].into());
        let cache_key = ObjectId::new("k".into(), ETag::from_str("e").unwrap());
        let block = DiskBlock::new(cache_key.clone(), 0, 0, data).expect("should have no checksum err");

        let mut buf = Vec::new();
        block.write(&mut buf).unwrap();

        // Determine the offset and expected value for the length field under test.
        // These values depend on the serialization format for `DiskBlock` and `DiskBlockHeader`.
        let (offset, expected) = match length_to_corrupt {
            "key" => (24, cache_key.key().len()),
            "etag" => (32 + cache_key.key().len(), cache_key.etag().as_str().len()),
            "data" => (16, original_length),
            _ => panic!("invalid length: {length_to_corrupt}"),
        };

        // Sanity-check the offset before corrupting anything: if the on-disk
        // format changes, this assert fails first with a clearer message.
        assert_eq!(
            get_u64_at(&buf, offset) as usize,
            expected,
            "serialized length should match the expected value (have we changed the serialization format?)"
        );

        // "Corrupt" the serialized value with an invalid length.
        replace_u64_at(&mut buf, offset, u64::MAX);

        let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
        // Oversized string lengths trip bincode's decode limit; an oversized
        // data length is rejected by the block-length validation instead.
        match length_to_corrupt {
            "key" | "etag" => assert!(matches!(
                err,
                DiskBlockReadWriteError::DecodeError(DecodeError::LimitExceeded)
            )),
            "data" => assert!(matches!(err, DiskBlockReadWriteError::InvalidBlockLength(_))),
            _ => panic!("invalid length: {length_to_corrupt}"),
        }
    }
1119
1120    #[test]
1121    fn test_concurrent_access() {
1122        let block_size = 1024 * 1024;
1123        let cache_directory = tempfile::tempdir().unwrap();
1124        let pool = PagedPool::new_with_candidate_sizes([block_size]);
1125        let data_cache = DiskDataCache::new(
1126            DiskDataCacheConfig {
1127                cache_directory: cache_directory.path().to_path_buf(),
1128                block_size: block_size as u64,
1129                limit: CacheLimit::Unbounded,
1130            },
1131            pool,
1132        );
1133        let data_cache = Arc::new(data_cache);
1134
1135        let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());
1136        let block_idx = 0;
1137        let block_offset = 0;
1138        let object_size = 10 * block_size;
1139
1140        let pool = ThreadPool::builder().pool_size(32).create().unwrap();
1141
1142        // Run concurrent tasks getting the same block (and writing on cache miss)
1143        let mut handles = Vec::new();
1144        for _ in 0..100 {
1145            let data_cache = data_cache.clone();
1146            let cache_key = cache_key.clone();
1147            let handle = pool
1148                .spawn_with_handle(async move {
1149                    let block = data_cache
1150                        .get_block(&cache_key, block_idx, block_offset, object_size)
1151                        .await
1152                        .expect("get_block should not return error");
1153                    if block.is_none() {
1154                        let bytes: Bytes = vec![0u8; block_size].into();
1155                        data_cache
1156                            .put_block(cache_key, block_idx, block_offset, bytes.into(), object_size)
1157                            .await
1158                            .expect("put_block should succeed");
1159                    }
1160                })
1161                .unwrap();
1162            handles.push(handle);
1163        }
1164
1165        block_on(async move {
1166            for handle in handles {
1167                handle.await
1168            }
1169        });
1170    }
1171}