mountpoint_s3_fs/data_cache/
disk_data_cache.rs

1//! Module for the on-disk data cache implementation.
2
3use std::fs;
4use std::io::{ErrorKind, Read, Write};
5use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
6use std::path::{Path, PathBuf};
7use std::time::Instant;
8
9use async_trait::async_trait;
10use bincode::config::{Configuration, Fixint, Limit, LittleEndian};
11use bincode::error::{DecodeError, EncodeError};
12use bincode::{Decode, Encode};
13use bytes::Bytes;
14use linked_hash_map::LinkedHashMap;
15use mountpoint_s3_client::checksums::crc32c::{self, Crc32c};
16use sha2::{Digest, Sha256};
17use tempfile::NamedTempFile;
18use thiserror::Error;
19use tracing::{trace, warn};
20
21use crate::checksums::IntegrityError;
22use crate::data_cache::DataCacheError;
23use crate::memory::{BufferKind, PagedPool};
24use crate::metrics::defs::{
25    ATTR_CACHE, CACHE_DISK, CACHE_EVICT_LATENCY, CACHE_GET_ERRORS, CACHE_GET_IO_SIZE, CACHE_GET_LATENCY,
26    CACHE_PUT_ERRORS, CACHE_PUT_IO_SIZE, CACHE_PUT_LATENCY, CACHE_TOTAL_SIZE,
27};
28use crate::object::ObjectId;
29use crate::sync::Mutex;
30
31use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
32
/// Disk and file-layout versioning.
///
/// Bump this when the on-disk block format changes: the version string is written
/// at the start of every block file, mixed into the cache-key hash, and used as a
/// directory name, so a bump effectively invalidates all previously written entries.
const CACHE_VERSION: &str = "V2";

/// Index where hashed directory names for the cache are split to avoid FS-specific limits.
const HASHED_DIR_SPLIT_INDEX: usize = 2;
38
/// On-disk implementation of [DataCache].
pub struct DiskDataCache {
    /// Cache configuration: directory, block size, and size limit.
    config: DiskDataCacheConfig,
    /// Buffer pool used to allocate read buffers for blocks loaded from disk.
    pool: PagedPool,
    /// Tracks blocks usage. `None` when no cache limit was set.
    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
}
46
/// Configuration for a [DiskDataCache].
#[derive(Debug)]
pub struct DiskDataCacheConfig {
    /// Root directory for cached blocks; entries live under a [CACHE_VERSION] subdirectory.
    pub cache_directory: PathBuf,
    /// Size of data blocks.
    pub block_size: u64,
    /// How to limit the cache size.
    pub limit: CacheLimit,
}
56
/// Limit the cache size.
#[derive(Debug)]
pub enum CacheLimit {
    /// No limit: the cache may grow without bound and nothing is ever evicted.
    Unbounded,
    /// Evict when the total size of cached blocks exceeds `max_size` bytes.
    TotalSize { max_size: usize },
    /// Evict when the fraction of free blocks on the cache filesystem
    /// (as reported by `statvfs`) drops below `min_ratio`.
    AvailableSpace { min_ratio: f64 },
}
64
65impl Default for CacheLimit {
66    fn default() -> Self {
67        CacheLimit::AvailableSpace { min_ratio: 0.05 } // Preserve 5% available space
68    }
69}
70
/// Describes additional information about the data stored in the block.
///
/// It should be written alongside the block's data
/// and used to verify it contains the correct contents to avoid blocks being mixed up.
#[derive(Encode, Decode, Debug)]
struct DiskBlockHeader {
    /// Index of this block within the object.
    block_idx: BlockIndex,
    /// Byte offset of this block within the object.
    block_offset: u64,
    /// Length of the block's data in bytes.
    block_len: u64,
    /// ETag of the object this block belongs to.
    etag: String,
    /// Full S3 key of the object this block belongs to.
    s3_key: String,
    /// CRC32C checksum over the block's data.
    data_checksum: u32,
    /// CRC32C checksum over the other header fields, guarding against header corruption.
    header_checksum: u32,
}
85
/// Max size of header after encoding.
/// 10000 should easily accommodate any block header:
/// - S3 key should always be less than or equal to 1024
/// - Allow 1024 for ETag, even though it's always much smaller
/// - Integers should be up to 8 bytes each
const BINCODE_HEADER_MAX_SIZE: usize = 10000;

/// Binary encoding configuration for the block header.
///
/// Fixed-width little-endian integers keep the encoding stable, and the size
/// limit caps how much may be read while decoding untrusted on-disk data.
const BINCODE_CONFIG: Configuration<LittleEndian, Fixint, Limit<BINCODE_HEADER_MAX_SIZE>> = bincode::config::standard()
    .with_fixed_int_encoding()
    .with_limit::<BINCODE_HEADER_MAX_SIZE>();
97
/// Error during creation of a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockCreationError {
    /// Data corruption detected when unpacking bytes and checksum
    /// (the checksummed buffer failed its own integrity check).
    #[error(transparent)]
    IntegrityError(#[from] IntegrityError),
}
105
/// Error during access to a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockAccessError {
    /// The header's self-checksum did not match its fields (header corruption).
    #[error("checksum over the block's fields did not match the field content")]
    ChecksumError,
    /// The header's fields did not match the expected key/index/offset (block mix-up).
    #[error("one or more of the fields in this block were incorrect")]
    FieldMismatchError,
}
114
/// Error when reading or writing a [DiskBlock]
#[derive(Debug, Error)]
enum DiskBlockReadWriteError {
    /// The header declared a block length larger than the configured block size.
    #[error("Invalid block length: {0}")]
    InvalidBlockLength(u64),
    /// Non-IO failure while decoding the header (IO failures map to `IOError`).
    #[error("Error decoding the block: {0}")]
    DecodeError(DecodeError),
    /// Non-IO failure while encoding the header (IO failures map to `IOError`).
    #[error("Error encoding the block: {0}")]
    EncodeError(EncodeError),
    /// Underlying IO failure while reading or writing.
    #[error("IO error: {0}")]
    IOError(#[from] std::io::Error),
}
127
128impl DiskBlockHeader {
129    /// Creates a new [DiskBlockHeader]
130    pub fn new(
131        block_idx: BlockIndex,
132        block_offset: u64,
133        block_len: usize,
134        etag: String,
135        s3_key: String,
136        data_checksum: Crc32c,
137    ) -> Self {
138        let data_checksum = data_checksum.value();
139        let header_checksum =
140            Self::compute_checksum(block_idx, block_offset, block_len, &etag, &s3_key, data_checksum).value();
141        DiskBlockHeader {
142            block_idx,
143            block_offset,
144            block_len: block_len as u64,
145            etag,
146            s3_key,
147            data_checksum,
148            header_checksum,
149        }
150    }
151
152    fn compute_checksum(
153        block_idx: BlockIndex,
154        block_offset: u64,
155        block_len: usize,
156        etag: &str,
157        s3_key: &str,
158        data_checksum: u32,
159    ) -> Crc32c {
160        let mut hasher = crc32c::Hasher::new();
161        hasher.update(&block_idx.to_be_bytes());
162        hasher.update(&block_offset.to_be_bytes());
163        hasher.update(&block_len.to_be_bytes());
164        hasher.update(etag.as_bytes());
165        hasher.update(s3_key.as_bytes());
166        hasher.update(&data_checksum.to_be_bytes());
167        hasher.finalize()
168    }
169
170    /// Validate the integrity of the contained data and return the stored data checksum.
171    ///
172    /// Execute this method before acting on the data contained within.
173    pub fn validate(
174        &self,
175        s3_key: &str,
176        etag: &str,
177        block_idx: BlockIndex,
178        block_offset: u64,
179        block_len: usize,
180    ) -> Result<Crc32c, DiskBlockAccessError> {
181        let s3_key_match = s3_key == self.s3_key;
182        let etag_match = etag == self.etag;
183        let block_idx_match = block_idx == self.block_idx;
184        let block_offset_match = block_offset == self.block_offset;
185        let block_size_match = block_len == self.block_len as usize;
186
187        let data_checksum = self.data_checksum;
188        if s3_key_match && etag_match && block_idx_match && block_offset_match && block_size_match {
189            if Self::compute_checksum(block_idx, block_offset, block_len, etag, s3_key, data_checksum).value()
190                != self.header_checksum
191            {
192                Err(DiskBlockAccessError::ChecksumError)
193            } else {
194                Ok(Crc32c::new(data_checksum))
195            }
196        } else {
197            warn!(
198                s3_key_match,
199                etag_match, block_idx_match, block_size_match, "block data did not match expected values",
200            );
201            Err(DiskBlockAccessError::FieldMismatchError)
202        }
203    }
204}
205
/// Represents a fixed-size chunk of data that can be serialized.
#[derive(Debug)]
struct DiskBlock {
    /// Information describing the content of `data`, to be used to verify correctness
    header: DiskBlockHeader,
    /// Cached bytes
    data: Bytes,
}
214
impl DiskBlock {
    /// Create a new [DiskBlock].
    ///
    /// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
    /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
    fn new(
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
    ) -> Result<Self, DiskBlockCreationError> {
        let s3_key = cache_key.key().to_owned();
        let etag = cache_key.etag().as_str().to_owned();
        // `into_inner` may surface an IntegrityError if the buffer fails its checksum.
        let (data, data_checksum) = bytes.into_inner()?;
        let header = DiskBlockHeader::new(block_idx, block_offset, data.len(), etag, s3_key, data_checksum);

        Ok(DiskBlock { data, header })
    }

    /// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
    ///
    /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
    fn data(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
        let data_checksum = self.header.validate(
            cache_key.key(),
            cache_key.etag().as_str(),
            block_idx,
            block_offset,
            self.data.len(),
        )?;
        // Re-attach the stored checksum; `Bytes::clone` is a cheap reference-count bump.
        let bytes = ChecksummedBytes::new_from_inner_data(self.data.clone(), data_checksum);
        Ok(bytes)
    }

    /// Deserialize an instance from `reader`.
    ///
    /// The header is decoded first; its `block_len` is validated against `block_size`
    /// before any data buffer is allocated, so a corrupt header cannot trigger an
    /// oversized allocation from the pool.
    fn read(reader: &mut impl Read, block_size: u64, pool: &PagedPool) -> Result<Self, DiskBlockReadWriteError> {
        let header: DiskBlockHeader = bincode::decode_from_std_read(reader, BINCODE_CONFIG)?;

        if header.block_len > block_size {
            return Err(DiskBlockReadWriteError::InvalidBlockLength(header.block_len));
        }

        let size = header.block_len as usize;
        let mut buffer = pool.get_buffer_mut(size, BufferKind::DiskCache);
        // NOTE(review): assumes `fill_from_reader` reads exactly `size` bytes from
        // `reader` into the buffer — confirm against PagedPool's contract.
        buffer.fill_from_reader(reader)?;
        let data = buffer.into_bytes();

        Ok(Self { header, data })
    }

    /// Serialize this instance to `writer` and return the number of bytes written on success.
    ///
    /// Layout: bincode-encoded header immediately followed by the raw data bytes.
    /// This byte layout is pinned by `test_block_format_version_requires_update`;
    /// any change requires a [CACHE_VERSION] bump.
    fn write(&self, writer: &mut impl Write) -> Result<usize, DiskBlockReadWriteError> {
        let header_length = bincode::encode_into_std_write(&self.header, writer, BINCODE_CONFIG)?;
        writer.write_all(&self.data)?;
        Ok(header_length + self.data.len())
    }
}
277
278impl From<DecodeError> for DiskBlockReadWriteError {
279    fn from(value: DecodeError) -> Self {
280        match value {
281            DecodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
282            value => DiskBlockReadWriteError::DecodeError(value),
283        }
284    }
285}
286
287impl From<EncodeError> for DiskBlockReadWriteError {
288    fn from(value: EncodeError) -> Self {
289        match value {
290            EncodeError::Io { inner, .. } => DiskBlockReadWriteError::IOError(inner),
291            value => DiskBlockReadWriteError::EncodeError(value),
292        }
293    }
294}
295
296impl From<std::io::Error> for DataCacheError {
297    fn from(e: std::io::Error) -> Self {
298        DataCacheError::IoFailure(e.into())
299    }
300}
301
302impl From<DiskBlockReadWriteError> for DataCacheError {
303    fn from(value: DiskBlockReadWriteError) -> Self {
304        match value {
305            DiskBlockReadWriteError::IOError(e) => DataCacheError::IoFailure(e.into()),
306            _ => DataCacheError::InvalidBlockContent,
307        }
308    }
309}
310
impl DiskDataCache {
    /// Create a new instance of an [DiskDataCache] with the specified configuration.
    pub fn new(config: DiskDataCacheConfig, pool: PagedPool) -> Self {
        // Usage tracking is only needed when a limit can trigger eviction.
        let usage = match &config.limit {
            CacheLimit::Unbounded => None,
            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
        };
        DiskDataCache { config, pool, usage }
    }

    /// Get the on-disk path for the given block:
    /// `<cache_directory>/<CACHE_VERSION>/<hash prefix>/<hash rest>/<block index>`.
    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
        let mut path = self.config.cache_directory.join(CACHE_VERSION);
        block_key.append_to_path(&mut path);
        path
    }

    /// Read and verify the block stored at `path`.
    ///
    /// Returns `Ok(None)` when the file does not exist (cache miss).
    /// Returns [DataCacheError::InvalidBlockContent] when the version prefix is stale,
    /// the block fails to deserialize, or its header does not match the expected
    /// key/index/offset or fails its checksums.
    fn read_block(
        &self,
        path: impl AsRef<Path>,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        trace!(
            key = ?cache_key.key(),
            offset = block_offset,
            path = ?path.as_ref(),
            "reading cache block",
        );
        let mut file = match fs::File::open(path.as_ref()) {
            Ok(file) => file,
            // A missing file is an ordinary cache miss, not an error.
            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };

        // Every block file begins with the cache version bytes; refuse other versions.
        let mut block_version = [0; CACHE_VERSION.len()];
        file.read_exact(&mut block_version)?;
        if block_version != CACHE_VERSION.as_bytes() {
            warn!(
                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                "stale block format found during reading"
            );
            return Err(DataCacheError::InvalidBlockContent);
        }

        let block = DiskBlock::read(&mut file, self.block_size(), &self.pool)
            .inspect_err(|e| warn!(path = ?path.as_ref(), "block could not be deserialized: {:?}", e))?;
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
                DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                    DataCacheError::InvalidBlockContent
                }
            })?;

        Ok(Some(bytes))
    }

    /// Write `block` (prefixed with the [CACHE_VERSION] bytes) to a temporary file
    /// in the same directory as `path`, creating owner-only (0700 dirs / 0600 file)
    /// cache directories as needed.
    ///
    /// Returns the temp file plus the number of block bytes written (header + data;
    /// the version prefix is not counted). The caller is responsible for persisting
    /// the temp file to its final path.
    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<(NamedTempFile, usize)> {
        let path = path.as_ref();
        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
        fs::DirBuilder::new()
            .mode(0o700)
            .recursive(true)
            .create(cache_path_for_key)?;

        let mut temp_file = tempfile::Builder::new()
            .permissions(fs::Permissions::from_mode(0o600))
            .tempfile_in(cache_path_for_key)?;
        trace!(
            key = block.header.s3_key,
            offset = block.header.block_offset,
            block_path = ?path,
            temp_path = ?temp_file.path(),
            "writing cache block",
        );
        temp_file.write_all(CACHE_VERSION.as_bytes())?;
        let bytes_written = block.write(&mut temp_file)?;
        Ok((temp_file, bytes_written))
    }

    /// Check whether `size` (total bytes of tracked blocks) exceeds the configured limit.
    ///
    /// Side effect: records the current cache size gauge metric on every call.
    /// For [CacheLimit::AvailableSpace], a failed or degenerate `statvfs` result is
    /// treated as "not exceeded" so transient stat failures don't block cache writes.
    fn is_limit_exceeded(&self, size: usize) -> bool {
        metrics::gauge!(CACHE_TOTAL_SIZE, ATTR_CACHE => CACHE_DISK).set(size as f64);
        match self.config.limit {
            CacheLimit::Unbounded => false,
            CacheLimit::TotalSize { max_size } => size > max_size,
            CacheLimit::AvailableSpace { min_ratio } => {
                let stats = match nix::sys::statvfs::statvfs(&self.config.cache_directory) {
                    Ok(stats) if stats.blocks() == 0 => {
                        warn!("unable to determine available space (0 blocks reported)");
                        return false;
                    }
                    Ok(stats) => stats,
                    Err(error) => {
                        warn!(?error, "unable to determine available space");
                        return false;
                    }
                };
                (stats.blocks_free() as f64) < min_ratio * (stats.blocks() as f64)
            }
        }
    }

    /// Evict least-recently-used blocks until the cache is back under its limit.
    ///
    /// No-op when the cache is unbounded (no usage tracking). The usage lock is
    /// released and re-acquired on each loop iteration, so other cache operations
    /// can interleave with a long eviction run.
    fn evict_if_needed(&self) -> DataCacheResult<()> {
        let Some(usage) = &self.usage else {
            return Ok(());
        };

        loop {
            let mut usage = usage.lock().unwrap();
            if !self.is_limit_exceeded(usage.size) {
                break;
            }
            let Some(to_remove) = usage.evict_lru() else {
                warn!("cache limit exceeded but nothing to evict");
                return Err(DataCacheError::EvictionFailure);
            };
            let path_to_remove = self.get_path_for_block_key(&to_remove);
            trace!("evicting block at {}", path_to_remove.display());
            // A missing file just means the block is already gone. Other removal
            // failures are logged but do not abort eviction — the usage accounting
            // has already dropped the entry.
            if let Err(remove_err) = fs::remove_file(&path_to_remove)
                && remove_err.kind() != ErrorKind::NotFound
            {
                warn!("unable to evict block: {:?}", remove_err);
            }
        }
        Ok(())
    }
}
440
441/// Hash the cache key using its fields as well as the [CACHE_VERSION].
442fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
443    let s3_key = cache_key.key();
444    let etag = cache_key.etag();
445
446    let mut hasher = Sha256::new();
447    hasher.update(CACHE_VERSION);
448    hasher.update(s3_key);
449    hasher.update(etag.as_str());
450    hasher.finalize().into()
451}
452
#[async_trait]
impl DataCache for DiskDataCache {
    /// Look up a block on disk, returning `Ok(None)` on a cache miss.
    ///
    /// Rejects calls where `block_offset` is inconsistent with `block_idx` and the
    /// configured block size. On a hit, the block is refreshed as most recently
    /// used. Get latency/size/error metrics are recorded on every call.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        _object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let block_key = DiskBlockKey::new(cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        let result = match self.read_block(&path, cache_key, block_idx, block_offset) {
            Ok(None) => {
                // Cache miss.
                Ok(None)
            }
            Ok(Some(bytes)) => {
                // Cache hit.
                metrics::histogram!(CACHE_GET_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes.len() as f64);
                if let Some(usage) = &self.usage {
                    usage.lock().unwrap().refresh(&block_key);
                }
                Ok(Some(bytes))
            }
            Err(err) => {
                // Invalid block.
                metrics::counter!(CACHE_GET_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
                Err(err)
            }
        };
        metrics::histogram!(CACHE_GET_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        result
    }

    /// Write a block to the cache, evicting older blocks first if a limit is exceeded.
    ///
    /// The block is written to a temp file and then persisted (renamed) to its final
    /// path. Put latency/size/error metrics are recorded on every call.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        _object_size: usize,
    ) -> DataCacheResult<()> {
        if block_offset != block_idx * self.config.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        let start = Instant::now();
        let bytes_len = bytes.len();
        let block_key = DiskBlockKey::new(&cache_key, block_idx);
        let path = self.get_path_for_block_key(&block_key);
        trace!(?cache_key, ?path, "new block will be created in disk cache");

        // Capture the put operation result separately from metrics recording
        // to ensure we can record both success and error metrics consistently
        let put_result = (|| -> DataCacheResult<()> {
            let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
                DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
            })?;

            // Make room before writing; eviction latency is tracked separately.
            {
                let eviction_start = Instant::now();
                let result = self.evict_if_needed();
                metrics::histogram!(CACHE_EVICT_LATENCY, ATTR_CACHE => CACHE_DISK)
                    .record(eviction_start.elapsed().as_micros() as f64);
                result
            }?;

            let result = self.write_block(&path, block);
            let (temp_file, size) = result?;

            if let Some(usage) = &self.usage {
                let mut usage = usage.lock().unwrap();
                // Persist while holding the usage lock so the file on disk and its
                // accounting entry appear together — NOTE(review): presumably to avoid
                // racing with concurrent eviction; confirm the intended ordering.
                _ = temp_file.persist(path).map_err(|e| e.error)?;
                usage.add(block_key, size);
            } else {
                _ = temp_file.persist(path).map_err(|e| e.error)?;
            }

            Ok(())
        })();

        if put_result.is_ok() {
            metrics::histogram!(CACHE_PUT_IO_SIZE, ATTR_CACHE => CACHE_DISK).record(bytes_len as f64);
        } else {
            metrics::counter!(CACHE_PUT_ERRORS, ATTR_CACHE => CACHE_DISK).increment(1);
        }
        metrics::histogram!(CACHE_PUT_LATENCY, ATTR_CACHE => CACHE_DISK).record(start.elapsed().as_micros() as f64);
        put_result
    }

    /// Size in bytes of the fixed blocks this cache stores.
    fn block_size(&self) -> u64 {
        self.config.block_size
    }
}
550
/// Key to identify a block in the disk cache, composed of a hash of the S3 key and Etag, and the block index.
/// An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length.
/// Instead, this key contains a hash of the S3 key and ETag to avoid the limit when used in paths.
/// The risk of collisions is mitigated as we ignore blocks read that contain the wrong S3 key, etc..
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct DiskBlockKey {
    /// SHA-256 of (CACHE_VERSION, S3 key, ETag) — see [hash_cache_key_raw].
    hashed_key: [u8; 32],
    /// Index of the block within the object.
    block_index: BlockIndex,
}
560
561impl DiskBlockKey {
562    fn new(cache_key: &ObjectId, block_index: BlockIndex) -> Self {
563        let hashed_key = hash_cache_key_raw(cache_key);
564        Self {
565            hashed_key,
566            block_index,
567        }
568    }
569
570    fn hex_key(&self) -> String {
571        hex::encode(self.hashed_key)
572    }
573
574    fn append_to_path(&self, path: &mut PathBuf) {
575        let hashed_cache_key = self.hex_key();
576
577        // Split directories by taking the first few chars of hash to avoid hitting any FS-specific maximum number of directory entries.
578        let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
579        path.push(first);
580        path.push(second);
581
582        // Append the block index.
583        path.push(format!("{:010}", self.block_index));
584    }
585}
586
/// Keeps track of entries usage and total size.
struct UsageInfo<K> {
    /// Entries in least-recently-used (front) to most-recently-used (back) order,
    /// mapping each key to the size it was recorded with.
    entries: LinkedHashMap<K, usize>,
    /// Sum of the sizes of all tracked entries, in bytes.
    size: usize,
}
592
593impl<K> UsageInfo<K>
594where
595    K: std::hash::Hash + Eq + std::fmt::Debug,
596{
597    fn new() -> Self {
598        Self {
599            entries: LinkedHashMap::new(),
600            size: 0,
601        }
602    }
603
604    /// Refresh the given key if present, marking it as the most recently used.
605    /// Returns `false` if the key is not in the cache.
606    fn refresh(&mut self, key: &K) -> bool {
607        self.entries.get_refresh(key).is_some()
608    }
609
610    /// Add or replace a key and update the total size.
611    fn add(&mut self, key: K, size: usize) {
612        if let Some(previous_size) = self.entries.insert(key, size) {
613            self.size = self.size.saturating_sub(previous_size);
614        }
615
616        self.size = self.size.saturating_add(size);
617    }
618
619    /// Remove the least recently used key and update the total size.
620    /// Return `None` if empty.
621    fn evict_lru(&mut self) -> Option<K> {
622        let (key, size) = self.entries.pop_front()?;
623        self.size = self.size.saturating_sub(size);
624        Some(key)
625    }
626}
627
628#[cfg(test)]
629mod tests {
630    use std::str::FromStr;
631    use std::{ffi::OsString, io::Cursor};
632
633    use super::*;
634
635    use futures::StreamExt as _;
636    use futures::executor::{ThreadPool, block_on};
637    use futures::task::SpawnExt;
638    use mountpoint_s3_client::types::ETag;
639    use rand::rngs::SmallRng;
640    use rand::{Rng, SeedableRng};
641    use test_case::test_case;
642
643    use crate::sync::Arc;
644
    /// Golden test pinning the exact serialized byte layout of a [DiskBlock].
    /// If this fails, the on-disk format has changed and [CACHE_VERSION] must be bumped.
    #[test]
    fn test_block_format_version_requires_update() {
        let cache_key = ObjectId::new("hello-world".to_string(), ETag::for_tests());
        let data = ChecksummedBytes::new("Foo".into());
        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
        // Fixed-int little-endian header fields, then the raw data bytes ("Foo").
        let expected_bytes: Vec<u8> = vec![
            100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116,
            101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114,
            108, 100, 9, 85, 128, 46, 13, 202, 106, 46, 70, 111, 111,
        ];
        let mut serialized_bytes = Vec::new();
        block.write(&mut serialized_bytes).unwrap();
        assert_eq!(
            expected_bytes, serialized_bytes,
            "serialized disk format appears to have changed, version bump required"
        );
    }
662
    /// Pins the cache-key hash value so the on-disk directory layout stays stable;
    /// a change here also implies a [CACHE_VERSION] bump.
    #[test]
    fn test_hash_cache_key_raw() {
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key, etag);
        let expected_hash = "1cfd611a26062b33e98d48a84e967ddcc2a42957479a8abd541e29cfa3258639";
        let actual_hash = hex::encode(hash_cache_key_raw(&key));
        assert_eq!(expected_hash, actual_hash);
    }
672
    /// Verifies the block path layout: cache dir, version, split hash dirs,
    /// then the zero-padded block index.
    #[test]
    fn get_path_for_block_key() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        // Key longer than the UNIX filename limit, to show hashing avoids it.
        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 5);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "0000000005",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }
704
    /// Block indices wider than the 10-digit zero-padding are rendered in full,
    /// not truncated.
    #[test]
    fn get_path_for_block_key_huge_block_index() {
        let cache_dir = PathBuf::from("mountpoint-cache/");
        let pool = PagedPool::new_with_candidate_sizes([1024]);
        let data_cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_dir,
                block_size: 1024,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );

        let s3_key = "a".repeat(266);
        let etag = ETag::for_tests();
        let key = ObjectId::new(s3_key.to_owned(), etag);

        let block_key = DiskBlockKey::new(&key, 1000000000000000);
        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
        let expected = vec![
            "mountpoint-cache",
            CACHE_VERSION,
            split_hashed_key.0,
            split_hashed_key.1,
            "1000000000000000",
        ];
        let path = data_cache.get_path_for_block_key(&block_key);
        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
        assert_eq!(expected, results);
    }
736
    /// Basic put/get round-trips: miss before put, hits for two different objects
    /// (including a key longer than the UNIX filename limit) and for multiple
    /// blocks of the same object, across varying block/pool buffer size ratios.
    #[test_case(8 * 1024 * 1024, 8 * 1024 * 1024; "matching block and pool buffer sizes")]
    #[test_case(1024 * 1024, 8 * 1024 * 1024; "block size smaller than pool buffer size")]
    #[test_case(8 * 1024 * 1024, 1024 * 1024; "block size larger than pool buffer size")]
    #[tokio::test]
    async fn test_put_get(block_size: u64, pool_buffer_size: usize) {
        let data_1 = ChecksummedBytes::new("Foo".into());
        let data_2 = ChecksummedBytes::new("Bar".into());
        let data_3 = ChecksummedBytes::new("Baz".into());

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size,
                limit: CacheLimit::Unbounded,
            },
            pool,
        );
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new(
            "long-key_".repeat(100), // at least 900 chars, exceeding easily 255 chars (UNIX filename limit)
            ETag::for_tests(),
        );

        // GET before any PUT must be a miss, not an error.
        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        // PUT and GET, OK?
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second file, OK?
        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second block in a cache entry, OK?
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache should be accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // Entry 1's first block still intact
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache should be accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }
830
831    #[tokio::test]
832    async fn test_checksummed_bytes_slice() {
833        let data = ChecksummedBytes::new("0123456789".into());
834        let slice = data.slice(1..5);
835
836        let cache_directory = tempfile::tempdir().unwrap();
837        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
838        let cache = DiskDataCache::new(
839            DiskDataCacheConfig {
840                cache_directory: cache_directory.path().to_path_buf(),
841                block_size: 8 * 1024 * 1024,
842                limit: CacheLimit::Unbounded,
843            },
844            pool,
845        );
846        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
847
848        cache
849            .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len())
850            .await
851            .expect("cache should be accessible");
852        let entry = cache
853            .get_block(&cache_key, 0, 0, slice.len())
854            .await
855            .expect("cache should be accessible")
856            .expect("cache entry should be returned");
857        assert_eq!(
858            slice.into_bytes().expect("original slice should be valid"),
859            entry.into_bytes().expect("returned entry should be valid"),
860            "cache entry returned should match original slice after put"
861        );
862    }
863
    #[tokio::test]
    async fn test_eviction() {
        // The cache limit equals exactly one large object; after also writing the
        // small object (1.5x the limit in total), eviction must have removed some
        // blocks, and the LRU policy should have preserved the more recently
        // written small object.
        const BLOCK_SIZE: usize = 100 * 1024;
        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;

        /// Build a deterministic pseudo-random payload of `size` bytes from `seed`.
        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
            let mut rng = SmallRng::seed_from_u64(seed);
            let mut body = vec![0u8; size];
            rng.fill(&mut body[..]);

            ChecksummedBytes::new(body.into())
        }

        /// Returns whether the block is still present in the cache.
        /// If present, also asserts its bytes match `expected_bytes`.
        async fn is_block_in_cache(
            cache: &DiskDataCache,
            cache_key: &ObjectId,
            block_idx: u64,
            expected_bytes: &ChecksummedBytes,
            object_size: usize,
        ) -> bool {
            if let Some(retrieved) = cache
                .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size)
                .await
                .expect("cache should be accessible")
            {
                assert_eq!(
                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
                    expected_bytes
                        .clone()
                        .into_bytes()
                        .expect("original bytes should be valid")
                );
                true
            } else {
                false
            }
        }

        // Split each object into BLOCK_SIZE-sized slices (the last one may be shorter).
        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
        let large_object_blocks: Vec<_> = (0..large_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let large_object_key = ObjectId::new("large".into(), ETag::for_tests());

        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
        let small_object_blocks: Vec<_> = (0..small_object.len())
            .step_by(BLOCK_SIZE)
            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
            .collect();
        let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

        let cache_directory = tempfile::tempdir().unwrap();
        let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
        let cache = DiskDataCache::new(
            DiskDataCacheConfig {
                cache_directory: cache_directory.path().to_path_buf(),
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
            },
            pool,
        );

        // Put all of large_object
        for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    large_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    LARGE_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        // Put all of small_object
        for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
            cache
                .put_block(
                    small_object_key.clone(),
                    block_idx as u64,
                    (block_idx * BLOCK_SIZE) as u64,
                    bytes.clone(),
                    SMALL_OBJECT_SIZE,
                )
                .await
                .unwrap();
        }

        // The small object was written last, so none of its blocks should have
        // been evicted yet.
        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE)
            })
            .count()
            .await;
        assert_eq!(
            count_small_object_blocks_in_cache,
            small_object_blocks.len(),
            "All blocks for small object should still be in the cache"
        );

        // The large object was written first; at least one of its blocks must
        // have been evicted to make room for the small object.
        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
            .filter(|&(block_idx, bytes)| {
                is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE)
            })
            .count()
            .await;
        assert!(
            count_large_object_blocks_in_cache < large_object_blocks.len(),
            "Some blocks for the large object should have been evicted"
        );
    }
980
981    #[test]
982    fn data_block_extract_checks() {
983        let data_1 = ChecksummedBytes::new("Foo".into());
984
985        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
986        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());
987        let cache_key_3 = ObjectId::new("a".into(), ETag::from_str("badetag").unwrap());
988
989        let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
990        block
991            .data(&cache_key_1, 1, 0)
992            .expect_err("should fail due to incorrect block index");
993        block
994            .data(&cache_key_1, 0, 1024)
995            .expect_err("should fail due to incorrect block offset");
996        block
997            .data(&cache_key_2, 0, 0)
998            .expect_err("should fail due to incorrect s3 key in cache key");
999        block
1000            .data(&cache_key_3, 0, 0)
1001            .expect_err("should fail due to incorrect etag in cache key");
1002        let unpacked_bytes = block
1003            .data(&cache_key_1, 0, 0)
1004            .expect("should be OK as all fields match");
1005        assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
1006    }
1007
1008    #[test]
1009    fn validate_block_header() {
1010        let block_idx = 0;
1011        let block_offset = 0;
1012        let block_size = 4;
1013        let etag = ETag::for_tests();
1014        let s3_key = String::from("s3/key");
1015        let data_checksum = Crc32c::new(42);
1016        let mut header = DiskBlockHeader::new(
1017            block_idx,
1018            block_offset,
1019            block_size,
1020            etag.as_str().to_owned(),
1021            s3_key.clone(),
1022            data_checksum,
1023        );
1024
1025        let checksum = header
1026            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
1027            .expect("should be OK with valid fields and checksum");
1028        assert_eq!(data_checksum, checksum);
1029
1030        // Bad fields
1031        let err = header
1032            .validate("hello", etag.as_str(), block_idx, block_offset, block_size)
1033            .expect_err("should fail with invalid s3_key");
1034        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1035        let err = header
1036            .validate(&s3_key, "bad etag", block_idx, block_offset, block_size)
1037            .expect_err("should fail with invalid etag");
1038        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1039        let err = header
1040            .validate(&s3_key, etag.as_str(), 5, block_offset, block_size)
1041            .expect_err("should fail with invalid block idx");
1042        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1043        let err = header
1044            .validate(&s3_key, etag.as_str(), block_idx, 1024, block_size)
1045            .expect_err("should fail with invalid block offset");
1046        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1047        let err = header
1048            .validate(&s3_key, etag.as_str(), block_idx, block_offset, 42)
1049            .expect_err("should fail with invalid block length");
1050        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
1051
1052        // Bad checksum
1053        header.header_checksum = 23;
1054        let err = header
1055            .validate(&s3_key, etag.as_str(), block_idx, block_offset, block_size)
1056            .expect_err("should fail with invalid checksum");
1057        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
1058    }
1059
    #[test_case("key")]
    #[test_case("etag")]
    #[test_case("data")]
    fn read_corrupted_block_should_fail(length_to_corrupt: &str) {
        // Upper bound handed to `DiskBlock::read`. The corrupted length fields are
        // set to `u64::MAX`, far above this limit, so decoding must fail instead
        // of attempting a huge allocation.
        const MAX_LENGTH: u64 = 1024;

        /// Read the `u64` value at the given offset.
        fn get_u64_at(slice: &[u8], offset: usize) -> u64 {
            u64::from_le_bytes(slice[offset..(offset + 8)].try_into().unwrap())
        }

        /// Replace the `u64` value at `offset` with `new_value`.
        fn replace_u64_at(slice: &mut [u8], offset: usize, new_value: u64) {
            slice[offset..(offset + 8)].copy_from_slice(&new_value.to_le_bytes());
        }

        // Serialize a valid block so we can corrupt one length field in place.
        let original_length = 42;
        let data = ChecksummedBytes::new(vec![0u8; original_length].into());
        let cache_key = ObjectId::new("k".into(), ETag::from_str("e").unwrap());
        let block = DiskBlock::new(cache_key.clone(), 0, 0, data).expect("should have no checksum err");

        let mut buf = Vec::new();
        block.write(&mut buf).unwrap();

        // Determine the offset and expected value for the length field under test.
        // These values depend on the serialization format for `DiskBlock` and `DiskBlockHeader`.
        let (offset, expected) = match length_to_corrupt {
            "key" => (24, cache_key.key().len()),
            "etag" => (32 + cache_key.key().len(), cache_key.etag().as_str().len()),
            "data" => (16, original_length),
            _ => panic!("invalid length: {length_to_corrupt}"),
        };

        // Sanity-check we located the right field before corrupting it.
        assert_eq!(
            get_u64_at(&buf, offset) as usize,
            expected,
            "serialized length should match the expected value (have we changed the serialization format?)"
        );

        // "Corrupt" the serialized value with an invalid length.
        replace_u64_at(&mut buf, offset, u64::MAX);

        // Corrupted string lengths trip bincode's size limit; a corrupted data
        // length is caught by the block's own length validation.
        let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
        let err = DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool).expect_err("deserialization should fail");
        match length_to_corrupt {
            "key" | "etag" => assert!(matches!(
                err,
                DiskBlockReadWriteError::DecodeError(DecodeError::LimitExceeded)
            )),
            "data" => assert!(matches!(err, DiskBlockReadWriteError::InvalidBlockLength(_))),
            _ => panic!("invalid length: {length_to_corrupt}"),
        }
    }
1113
1114    #[test]
1115    fn test_concurrent_access() {
1116        let block_size = 1024 * 1024;
1117        let cache_directory = tempfile::tempdir().unwrap();
1118        let pool = PagedPool::new_with_candidate_sizes([block_size]);
1119        let data_cache = DiskDataCache::new(
1120            DiskDataCacheConfig {
1121                cache_directory: cache_directory.path().to_path_buf(),
1122                block_size: block_size as u64,
1123                limit: CacheLimit::Unbounded,
1124            },
1125            pool,
1126        );
1127        let data_cache = Arc::new(data_cache);
1128
1129        let cache_key = ObjectId::new("foo".to_owned(), ETag::for_tests());
1130        let block_idx = 0;
1131        let block_offset = 0;
1132        let object_size = 10 * block_size;
1133
1134        let pool = ThreadPool::builder().pool_size(32).create().unwrap();
1135
1136        // Run concurrent tasks getting the same block (and writing on cache miss)
1137        let mut handles = Vec::new();
1138        for _ in 0..100 {
1139            let data_cache = data_cache.clone();
1140            let cache_key = cache_key.clone();
1141            let handle = pool
1142                .spawn_with_handle(async move {
1143                    let block = data_cache
1144                        .get_block(&cache_key, block_idx, block_offset, object_size)
1145                        .await
1146                        .expect("get_block should not return error");
1147                    if block.is_none() {
1148                        let bytes: Bytes = vec![0u8; block_size].into();
1149                        data_cache
1150                            .put_block(cache_key, block_idx, block_offset, bytes.into(), object_size)
1151                            .await
1152                            .expect("put_block should succeed");
1153                    }
1154                })
1155                .unwrap();
1156            handles.push(handle);
1157        }
1158
1159        block_on(async move {
1160            for handle in handles {
1161                handle.await
1162            }
1163        });
1164    }
1165}