Skip to main content

xet_client/chunk_cache/
disk.rs

1use std::collections::HashMap;
2use std::fs::{DirEntry, File};
3use std::io::{self, Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
4use std::mem::size_of;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use base64::Engine;
10use base64::engine::GeneralPurpose;
11use base64::engine::general_purpose::URL_SAFE;
12use tokio::sync::RwLock;
13use tracing::{debug, error};
14use xet_core_structures::merklehash::MerkleHash;
15use xet_runtime::core::xet_config;
16use xet_runtime::error_printer::ErrorPrinter;
17use xet_runtime::file_utils::SafeFileCreator;
18use xet_runtime::utils::output_bytes;
19
20use self::cache_file_header::CacheFileHeader;
21use self::cache_item::{CacheItem, VerificationCell};
22use super::error::ChunkCacheError;
23use super::{CacheConfig, CacheRange, ChunkCache};
24use crate::cas_types::{ChunkRange, Key};
25
26mod cache_file_header;
27mod cache_item;
28pub mod test_utils;
29
30// consistently use URL_SAFE (also file path safe) base64 codec
31pub(crate) const BASE64_ENGINE: GeneralPurpose = URL_SAFE;
32const PREFIX_DIR_NAME_LEN: usize = 2;
33
34type OptionResult<T, E> = Result<Option<T>, E>;
35
36#[derive(Debug, Clone)]
37struct CacheState {
38    inner: HashMap<Key, Vec<VerificationCell<CacheItem>>>,
39    num_items: usize,
40    total_bytes: u64,
41}
42
43impl CacheState {
44    fn new(state: HashMap<Key, Vec<VerificationCell<CacheItem>>>, num_items: usize, total_bytes: u64) -> Self {
45        Self {
46            inner: state,
47            num_items,
48            total_bytes,
49        }
50    }
51
52    fn find_match(&self, key: &Key, range: &ChunkRange) -> Option<VerificationCell<CacheItem>> {
53        let items = self.inner.get(key)?;
54
55        // attempt to find a matching range in the given key's items using
56        for item in items.iter() {
57            if item.range.start <= range.start && range.end <= item.range.end {
58                return Some(item.clone());
59            }
60        }
61        None
62    }
63
64    /// removed items from the cache (including deleting from file system)
65    /// until at least to_remove number of bytes have been removed
66    ///
67    /// removes data from in memory state and returns a list of file paths to delete
68    /// (so that deletion can occur after the locked state is dropped)
69    fn evict_to_capacity(
70        &mut self,
71        max_total_bytes: u64,
72    ) -> Result<Vec<(Key, VerificationCell<CacheItem>)>, ChunkCacheError> {
73        let original_total_bytes = self.total_bytes;
74        let mut ret = Vec::new();
75
76        while self.total_bytes > max_total_bytes {
77            let Some((key, idx)) = self.random_item() else {
78                error!("attempted to evict item, but no item could be found to be evicted");
79                break;
80            };
81            let items = self.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
82            let cache_item = items.swap_remove(idx);
83            let len = cache_item.len;
84
85            if items.is_empty() {
86                self.inner.remove(&key);
87            }
88
89            ret.push((key, cache_item));
90
91            self.total_bytes -= len;
92            self.num_items -= 1;
93        }
94        debug!(
95            "cache evicting {} items totaling {}",
96            ret.len(),
97            output_bytes(original_total_bytes - self.total_bytes)
98        );
99
100        Ok(ret)
101    }
102
103    /// returns the key and index within that key for a random item
104    fn random_item(&self) -> Option<(Key, usize)> {
105        debug_assert_eq!(
106            self.inner.values().map(|v| v.len()).sum::<usize>(),
107            self.num_items,
108            "real num items != stored num items"
109        );
110
111        if self.num_items == 0 {
112            error!("cache random_item for eviction: no items in cache");
113            return None;
114        }
115        let random_item = rand::random::<u32>() as usize % self.num_items;
116        let mut count = 0;
117        for (key, items) in self.inner.iter() {
118            if random_item < count + items.len() {
119                return Some((key.clone(), random_item - count));
120            }
121            count += items.len();
122        }
123        // should never occur
124        error!("cache random_item for eviction: tried to return random item error not enough items");
125        None
126    }
127}
128
129/// DiskCache is a ChunkCache implementor that saves data on the file system
130#[derive(Debug, Clone)]
131pub struct DiskCache {
132    cache_root: PathBuf,
133    capacity: u64,
134    state: Arc<RwLock<CacheState>>,
135}
136
// helper for analysis binary to print inner state
#[cfg(feature = "analysis")]
impl DiskCache {
    /// Dumps the in-memory index to stdout: whole-cache totals first, then each key followed
    /// by every item stored under it.
    pub async fn print(&self) {
        let state = self.state.read().await;

        println!(
            "total items: {}, total bytes {} for the whole cache",
            state.num_items,
            output_bytes(state.total_bytes)
        );

        for (key, items) in &state.inner {
            let key_total_bytes: u64 = items.iter().map(|item| item.len).sum();

            println!();
            println!("key: {key}");
            println!(
                "\ttotal items: {}, total bytes {} for key {key}",
                items.len(),
                output_bytes(key_total_bytes)
            );
            println!();

            for item in items {
                println!(
                    "\titem: chunk range [{}-{}) ; len({}); checksum({})",
                    item.range.start,
                    item.range.end,
                    output_bytes(item.len),
                    item.checksum,
                );
            }
        }
    }
}
170
171impl DiskCache {
172    pub async fn num_items(&self) -> usize {
173        self.state.read().await.num_items
174    }
175
176    pub async fn total_bytes(&self) -> u64 {
177        self.state.read().await.total_bytes
178    }
179
180    /// initialize will create a new DiskCache with the capacity and cache root based on the config
181    /// the cache file system layout is rooted at the provided config.cache_directory and initialize
182    /// will attempt to load any pre-existing cache state into memory.
183    ///
184    /// an configured size of 0 caused initialization to fail
185    ///
186    /// The cache layout is as follows:
187    ///
188    /// each key (cas hash) in the cache is a directory, containing "cache items" that each provide
189    /// some range of data.
190    ///
191    /// keys are grouped into subdirectories under the cache rootbased on the first 2 chacters of their
192    /// file name, which is base64 encoded, leading to at most 64 * 64 directories under the cache root.
193    ///
194    /// cache_root/
195    /// ├── [ab]/
196    /// │   ├── [key 1 (ab123...)]/
197    /// │   │   ├── [range 0-100, file_len, file_hash]
198    /// │   │   ├── [range 102-300, file_len, file_hash]
199    /// │   │   └── [range 900-1024, file_len, file_hash]
200    /// │   ├── [key 2 (ab456...)]/
201    /// │       └── [range 0-1020, file_len, file_hash]
202    /// ├── [cd]/
203    /// │   └── [key 3 (cd123...)]/
204    /// │       ├── [range 30-31, file_len, file_hash]
205    /// │       ├── [range 400-402, file_len, file_hash]
206    /// │       ├── [range 404-405, file_len, file_hash]
207    /// │       └── [range 679-700, file_len, file_hash]
208    pub fn initialize(config: &CacheConfig) -> Result<Self, ChunkCacheError> {
209        if config.cache_size == 0 {
210            return Err(ChunkCacheError::InvalidArguments);
211        }
212        let capacity = config.cache_size;
213        let cache_root = config.cache_directory.clone();
214
215        // May take a while; don't block the runtime for this.
216        let state = Self::initialize_state(&cache_root, capacity)?;
217
218        Ok(Self {
219            state: Arc::new(RwLock::new(state)),
220            cache_root: config.cache_directory.clone(),
221            capacity,
222        })
223    }
224
225    fn initialize_state(cache_root: &PathBuf, capacity: u64) -> Result<CacheState, ChunkCacheError> {
226        let mut state = HashMap::new();
227        let mut total_bytes = 0;
228        let mut num_items = 0;
229        let max_num_bytes = 2 * capacity;
230
231        let Some(cache_root_readdir) = read_dir(cache_root)? else {
232            return Ok(CacheState::new(state, 0, 0));
233        };
234
235        // loop through cache root directory, first level containing "prefix" directories
236        // each of which may contain key directories with cache items
237        for key_prefix_dir in cache_root_readdir {
238            let Some(key_prefix_dir) = is_ok_dir(key_prefix_dir)? else {
239                continue;
240            };
241
242            let key_prefix_dir_name = key_prefix_dir.file_name();
243            if key_prefix_dir_name.as_encoded_bytes().len() != PREFIX_DIR_NAME_LEN {
244                debug!("prefix dir name len != {PREFIX_DIR_NAME_LEN}");
245                continue;
246            }
247
248            let Some(key_prefix_readdir) = read_dir(key_prefix_dir.path())? else {
249                continue;
250            };
251
252            // loop through key directories inside prefix directory
253            for key_dir in key_prefix_readdir {
254                let key_dir = match is_ok_dir(key_dir) {
255                    Ok(Some(dirent)) => dirent,
256                    Ok(None) => continue,
257                    Err(e) => return Err(e),
258                };
259
260                let key_dir_name = key_dir.file_name();
261
262                // asserts that the prefix dir name is actually the prefix of this key dir
263                debug_assert_eq!(
264                    key_dir_name.as_encoded_bytes()[..PREFIX_DIR_NAME_LEN].to_ascii_uppercase(),
265                    key_prefix_dir_name.as_encoded_bytes().to_ascii_uppercase(),
266                    "{key_dir_name:?}",
267                );
268
269                let key = match try_parse_key(key_dir_name.as_encoded_bytes()) {
270                    Ok(key) => key,
271                    Err(e) => {
272                        debug!("failed to decoded a directory name as a key: {e}");
273                        continue;
274                    },
275                };
276
277                let mut items = Vec::new();
278
279                let key_readdir = match read_dir(key_dir.path()) {
280                    Ok(Some(krd)) => krd,
281                    Ok(None) => continue,
282                    Err(e) => return Err(e),
283                };
284
285                // loop through cache items inside key directory
286                for item in key_readdir {
287                    let cache_item = match try_parse_cache_file(item, capacity) {
288                        Ok(Some(ci)) => ci,
289                        Ok(None) => continue,
290                        Err(e) => return Err(e),
291                    };
292
293                    total_bytes += cache_item.len;
294                    num_items += 1;
295                    items.push(VerificationCell::new_unverified(cache_item));
296
297                    // if already filled capacity, stop iterating over cache items
298                    if total_bytes >= max_num_bytes {
299                        state.insert(key, items);
300                        return Ok(CacheState::new(state, num_items, total_bytes));
301                    }
302                }
303
304                if !items.is_empty() {
305                    state.insert(key, items);
306                }
307            }
308        }
309
310        Ok(CacheState::new(state, num_items, total_bytes))
311    }
312
313    async fn get_impl(&self, key: &Key, range: &ChunkRange) -> OptionResult<CacheRange, ChunkCacheError> {
314        if range.start >= range.end {
315            return Err(ChunkCacheError::InvalidArguments);
316        }
317
318        loop {
319            let Some(cache_item) = self.state.read().await.find_match(key, range) else {
320                return Ok(None);
321            };
322
323            let path = self.item_path(key, &cache_item)?;
324
325            let mut file = match File::open(&path) {
326                Ok(file) => file,
327                Err(e) => match e.kind() {
328                    ErrorKind::NotFound => {
329                        self.remove_item(key, &cache_item).await?;
330                        continue;
331                    },
332                    _ => return Err(e.into()),
333                },
334            };
335
336            if !cache_item.is_verified() {
337                let checksum = crc32_from_reader(&mut file)?;
338                if checksum == cache_item.checksum {
339                    cache_item.verify();
340                    file.rewind()?;
341                } else {
342                    debug!("computed checksum {checksum} mismatch on cache item {key}/{cache_item}");
343                    self.remove_item(key, &cache_item).await?;
344                    continue;
345                }
346            }
347
348            let mut file_reader = std::io::BufReader::new(file);
349
350            let Ok(header) = CacheFileHeader::deserialize(&mut file_reader)
351                .debug_error(format!("failed to deserialize cache file header on path: {path:?}"))
352            else {
353                self.remove_item(key, &cache_item).await?;
354                continue;
355            };
356
357            let start = cache_item.range.start;
358            let result_buf = get_range_from_cache_file(&header, &mut file_reader, range, start)?;
359            return Ok(Some(result_buf));
360        }
361    }
362
363    async fn put_impl(
364        &self,
365        key: &Key,
366        range: &ChunkRange,
367        chunk_byte_indices: &[u32],
368        data: &[u8],
369    ) -> Result<(), ChunkCacheError> {
370        if range.start >= range.end
371            || chunk_byte_indices.len() != (range.end - range.start + 1) as usize
372            // chunk_byte_indices is guaranteed to be more than 1 element at this point
373            || chunk_byte_indices[0] != 0
374            || *chunk_byte_indices.last().unwrap() as usize != data.len()
375            || !strictly_increasing(chunk_byte_indices)
376        {
377            return Err(ChunkCacheError::InvalidArguments);
378        }
379
380        // check if we already contain the range
381        while let Some(cache_item) = self.state.read().await.find_match(key, range) {
382            if self.validate_match(key, range, chunk_byte_indices, data, &cache_item).await? {
383                return Ok(());
384            }
385        }
386
387        let header = CacheFileHeader::new(chunk_byte_indices);
388        let mut header_buf = Vec::with_capacity(header.header_len());
389        header.serialize(&mut header_buf)?;
390        let len = (header_buf.len() + data.len()) as u64;
391        if len > self.capacity {
392            // refusing to add this item as it is too large for the cache with configured capacity
393            return Ok(());
394        }
395
396        let checksum = {
397            let mut hasher = crc32fast::Hasher::new();
398            hasher.update(&header_buf);
399            hasher.update(data);
400            hasher.finalize()
401        };
402
403        let cache_item = CacheItem {
404            range: *range,
405            len,
406            checksum,
407        };
408
409        // write cache item file
410        let path = self.item_path(key, &cache_item)?;
411        let mut fw = SafeFileCreator::new(path)?;
412        fw.write_all(&header_buf)?;
413        fw.write_all(data)?;
414
415        // evict items after ensuring the file write but before committing to cache state
416        // to avoid removing new item.
417        let mut state_write = self.state.write().await;
418
419        // acquiring lock to state before closing the file
420        // this will ensure that this thread is the only one writing to the final
421        // cache file but allowing other threads to modify the state while we write the file
422        // before committing it.
423        if state_write.find_match(key, range).is_some() {
424            // another thread already added this item or overlapping item while this thread
425            // was writing the file
426            fw.abort()?;
427            return Ok(());
428        }
429        fw.close()?;
430
431        // Evict entries to make sure we have enough room.
432        let evicted_paths = state_write.evict_to_capacity(self.capacity - cache_item.len)?;
433
434        // add the item info in-memory state after evictions are done
435        state_write.num_items += 1;
436        state_write.total_bytes += cache_item.len;
437        let item_set = state_write.inner.entry(key.clone()).or_default();
438        item_set.push(VerificationCell::new_verified(cache_item));
439
440        // release lock
441        drop(state_write);
442
443        // remove files after done with modifying in memory state and releasing lock
444        for (key, cache_item) in evicted_paths {
445            let path = self.item_path(&key, &cache_item)?;
446            remove_file(&path)?;
447            // check and try to remove key path if all items evicted for key
448            let dir_path = path.parent().ok_or(ChunkCacheError::Infallible)?;
449            check_remove_dir(dir_path)?;
450        }
451
452        Ok(())
453    }
454
455    // on a non-error case, returns true if the item is a good match and a new item should not be inserted
456    // returns false if not a good match and should be removed.
457    async fn validate_match(
458        &self,
459        key: &Key,
460        range: &ChunkRange,
461        chunk_byte_indices: &[u32],
462        data: &[u8],
463        cache_item: &VerificationCell<CacheItem>,
464    ) -> Result<bool, ChunkCacheError> {
465        // this is a redundant check
466        if range.start < cache_item.range.start || range.end > cache_item.range.end {
467            return Err(ChunkCacheError::BadRange);
468        }
469
470        // validate stored data
471        let path = self.item_path(key, cache_item)?;
472
473        let Ok(mut file) = File::open(path) else {
474            self.remove_item(key, cache_item).await?;
475            return Ok(false);
476        };
477        let md = file.metadata()?;
478        if md.len() != cache_item.len {
479            self.remove_item(key, cache_item).await?;
480            return Ok(false);
481        }
482        let mut buf = Vec::with_capacity(md.len() as usize);
483        file.read_to_end(&mut buf)?;
484        let checksum = crc32fast::hash(&buf);
485        if checksum != cache_item.checksum {
486            self.remove_item(key, cache_item).await?;
487            return Ok(false);
488        }
489        let mut reader = Cursor::new(buf);
490        let Ok(header) = CacheFileHeader::deserialize(&mut reader) else {
491            self.remove_item(key, cache_item).await?;
492            return Ok(false);
493        };
494
495        // validate the chunk_byte_indices and data input against stored data
496        // the chunk_byte_indices should match the chunk lengths, if the ranges
497        // don't start at the same chunk, values will be different, what's important
498        // to match is the chunk lengths, i.e. difference in the offsets.
499        let idx_start = (range.start - cache_item.range.start) as usize;
500        let idx_end = (range.end - cache_item.range.start + 1) as usize;
501        for i in idx_start..idx_end - 1 {
502            let stored_diff = header.chunk_byte_indices[i + 1] - header.chunk_byte_indices[i];
503            let given_diff = chunk_byte_indices[i + 1 - idx_start] - chunk_byte_indices[i - idx_start];
504            if stored_diff != given_diff {
505                debug!(
506                    "failed to match chunk lens for these chunk offsets {} {:?}\n{} {:?}",
507                    cache_item.range,
508                    &header.chunk_byte_indices[idx_start..idx_end],
509                    range,
510                    chunk_byte_indices
511                );
512                return Err(ChunkCacheError::InvalidArguments);
513            }
514        }
515
516        let stored = get_range_from_cache_file(&header, &mut reader, range, cache_item.range.start)?;
517        if data != stored.data {
518            return Err(ChunkCacheError::InvalidArguments);
519        }
520        Ok(true)
521    }
522
523    /// removes an item from both the in-memory state of the cache and the file system
524    async fn remove_item(&self, key: &Key, cache_item: &VerificationCell<CacheItem>) -> Result<(), ChunkCacheError> {
525        {
526            let mut state = self.state.write().await;
527            if let Some(items) = state.inner.get_mut(key) {
528                let idx = match index_of(items, cache_item) {
529                    Some(idx) => idx,
530                    // item is no longer in the state
531                    None => return Ok(()),
532                };
533
534                items.swap_remove(idx);
535                if items.is_empty() {
536                    state.inner.remove(key);
537                }
538                state.total_bytes -= cache_item.len;
539                state.num_items -= 1;
540            }
541        }
542
543        let path = self.item_path(key, cache_item)?;
544
545        if !path.exists() {
546            return Ok(());
547        }
548        remove_file(&path)?;
549        let dir_path = path.parent().ok_or(ChunkCacheError::Infallible)?;
550        check_remove_dir(dir_path)
551    }
552
553    fn item_path(&self, key: &Key, cache_item: &CacheItem) -> Result<PathBuf, ChunkCacheError> {
554        Ok(self.cache_root.join(key_dir(key)).join(cache_item.file_name()?))
555    }
556}
557
558fn crc32_from_reader(reader: &mut impl Read) -> Result<u32, ChunkCacheError> {
559    const CRC_BUFFER_SIZE: usize = 4096;
560    let mut buf = [0u8; CRC_BUFFER_SIZE];
561    let mut hasher = crc32fast::Hasher::new();
562    loop {
563        let num_read = reader.read(&mut buf)?;
564        if num_read == 0 {
565            break;
566        }
567        hasher.update(&buf[..num_read])
568    }
569    Ok(hasher.finalize())
570}
571
/// Position of the first element of `list` equal to `value`, or `None` if there is none.
#[inline]
fn index_of<T: PartialEq>(list: &[T], value: &T) -> Option<usize> {
    list.iter().position(|candidate| candidate == value)
}
581
/// True when every element is strictly greater than the one before it.
/// Empty and single-element slices count as strictly increasing.
fn strictly_increasing(chunk_byte_indices: &[u32]) -> bool {
    chunk_byte_indices.windows(2).all(|pair| pair[0] < pair[1])
}
590
591fn get_range_from_cache_file<R: Read + Seek>(
592    header: &CacheFileHeader,
593    file_contents: &mut R,
594    range: &ChunkRange,
595    start: u32,
596) -> Result<CacheRange, ChunkCacheError> {
597    let start_idx = (range.start - start) as usize;
598    let end_idx = (range.end - start) as usize;
599    let start_byte = header.chunk_byte_indices.get(start_idx).ok_or(ChunkCacheError::BadRange)?;
600    let end_byte = header.chunk_byte_indices.get(end_idx).ok_or(ChunkCacheError::BadRange)?;
601    file_contents.seek(SeekFrom::Start((*start_byte as usize + header.header_len()) as u64))?;
602    let mut data = vec![0; (end_byte - start_byte) as usize];
603    file_contents.read_exact(&mut data)?;
604    let offsets: Vec<u32> = header.chunk_byte_indices[start_idx..=end_idx]
605        .iter()
606        .map(|v| *v - header.chunk_byte_indices[start_idx])
607        .collect();
608
609    debug_assert_eq!(range.end - range.start, offsets.len() as u32 - 1);
610
611    Ok(CacheRange {
612        offsets,
613        data,
614        range: *range,
615    })
616}
617
618// wrapper over std::fs::read_dir
619// returns Ok(None) on a not found error
620fn read_dir(path: impl AsRef<Path>) -> OptionResult<std::fs::ReadDir, ChunkCacheError> {
621    match std::fs::read_dir(path) {
622        Ok(rd) => Ok(Some(rd)),
623        Err(e) => {
624            if e.kind() == ErrorKind::NotFound {
625                Ok(None)
626            } else {
627                Err(e.into())
628            }
629        },
630    }
631}
632
633// returns Ok(Some(_)) if result dirent is a directory, Ok(None) if was removed
634// also returns an Ok(None) if the dirent is not a directory, in which case we should
635//   not remove it in case the user put something inadvertantly or intentionally,
636//   but not attempt to parse it as a valid cache directory.
637// Err(_) if an unrecoverable error occurred
638fn is_ok_dir(dir_result: Result<DirEntry, io::Error>) -> OptionResult<DirEntry, ChunkCacheError> {
639    let dirent = match dir_result {
640        Ok(kd) => kd,
641        Err(e) => {
642            if e.kind() == ErrorKind::NotFound {
643                return Ok(None);
644            }
645            return Err(e.into());
646        },
647    };
648    let md = match dirent.metadata() {
649        Ok(md) => md,
650        Err(e) => {
651            if e.kind() == ErrorKind::NotFound {
652                return Ok(None);
653            }
654            return Err(e.into());
655        },
656    };
657    if !md.is_dir() {
658        debug!("CACHE: expected directory at {:?}, is not directory", dirent.path());
659        return Ok(None);
660    }
661    Ok(Some(dirent))
662}
663
664// given a result from readdir attempts to parse it as a cache file handle
665// i.e. validate its file name against the contents (excluding file-hash-validation)
666// validate that it is a file, correct len, and is not too large.
667fn try_parse_cache_file(file_result: io::Result<DirEntry>, capacity: u64) -> OptionResult<CacheItem, ChunkCacheError> {
668    let item = match file_result {
669        Ok(item) => item,
670        Err(e) => {
671            if e.kind() == ErrorKind::NotFound {
672                return Ok(None);
673            }
674            return Err(e.into());
675        },
676    };
677    let md = match item.metadata() {
678        Ok(md) => md,
679        Err(e) => {
680            if e.kind() == ErrorKind::NotFound {
681                return Ok(None);
682            }
683            return Err(e.into());
684        },
685    };
686
687    if !md.is_file() {
688        return Ok(None);
689    }
690    if md.len() > xet_config().chunk_cache.size_bytes {
691        return Err(ChunkCacheError::general(format!(
692            "Cache directory contains a file larger than {} GB, cache directory state is invalid",
693            (xet_config().chunk_cache.size_bytes as f64 / (1 << 30) as f64)
694        )));
695    }
696
697    // don't track an item that takes up the whole capacity
698    if md.len() > capacity {
699        return Ok(None);
700    }
701
702    let cache_item = match CacheItem::parse(item.file_name().as_encoded_bytes())
703        .debug_error("failed to decode a file name as a cache item")
704    {
705        Ok(i) => i,
706        Err(e) => {
707            debug!("not a valid cache file, removing: {:?} {e:?}", item.file_name());
708            remove_file(item.path())?;
709            return Ok(None);
710        },
711    };
712    if md.len() != cache_item.len {
713        // file is invalid, remove it
714        debug!(
715            "cache file len {} does not match expected length {}, removing path: {:?}",
716            md.len(),
717            cache_item.len,
718            item.path()
719        );
720        remove_file(item.path())?;
721        return Ok(None);
722    }
723    Ok(Some(cache_item))
724}
725
726/// removes a file but disregards a "NotFound" error if the file is already gone
727fn remove_file(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
728    if let Err(e) = std::fs::remove_file(path)
729        && e.kind() != ErrorKind::NotFound
730    {
731        return Err(e.into());
732    }
733    Ok(())
734}
735
736/// removes a directory but disregards a "NotFound" error if the directory is already gone
737fn remove_dir(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
738    if let Err(e) = std::fs::remove_dir(path)
739        && e.kind() != ErrorKind::NotFound
740    {
741        return Err(e.into());
742    }
743    Ok(())
744}
745
746// assumes dir_path is a path to a key directory i.e. cache_root/<prefix_dir>/<key_dir>
747// assumes a misformatted path is an error
748// checks if the directory is empty and removes it if so, then checks if the prefix dir is empty and removes it if so
749fn check_remove_dir(dir_path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
750    let readdir = match read_dir(&dir_path)? {
751        Some(rd) => rd,
752        None => return Ok(()),
753    };
754    if readdir.peekable().peek().is_some() {
755        return Ok(());
756    }
757    // directory empty, remove it
758    remove_dir(&dir_path)?;
759
760    // try to check and remove the prefix dir
761    let prefix_dir = dir_path.as_ref().parent().ok_or(ChunkCacheError::Infallible)?;
762
763    let prefix_readdir = match read_dir(prefix_dir)? {
764        Some(prd) => prd,
765        None => return Ok(()),
766    };
767    if prefix_readdir.peekable().peek().is_some() {
768        return Ok(());
769    }
770    // directory empty, remove it
771    remove_dir(prefix_dir)
772}
773
774/// tries to parse just a Key from a file name encoded by fn `key_dir`
775/// expects only the key portion of the file path, with the prefix not present.
776fn try_parse_key(file_name: &[u8]) -> Result<Key, ChunkCacheError> {
777    let buf = BASE64_ENGINE.decode(file_name)?;
778    let hash = MerkleHash::from_slice(&buf[..size_of::<MerkleHash>()])?;
779    let prefix = String::from(std::str::from_utf8(&buf[size_of::<MerkleHash>()..])?);
780    Ok(Key { prefix, hash })
781}
782
783/// key_dir returns a directory name string formed from the key
784/// the format is BASE64_encode([ key.hash[..], key.prefix.as_bytes()[..] ])
785fn key_dir(key: &Key) -> PathBuf {
786    let prefix_bytes = key.prefix.as_bytes();
787    let mut buf = vec![0u8; size_of::<MerkleHash>() + prefix_bytes.len()];
788    buf[..size_of::<MerkleHash>()].copy_from_slice(key.hash.as_bytes());
789    buf[size_of::<MerkleHash>()..].copy_from_slice(prefix_bytes);
790    let encoded = BASE64_ENGINE.encode(&buf);
791    let prefix_dir = &encoded[..PREFIX_DIR_NAME_LEN];
792    let dir_str = format!("{prefix_dir}/{encoded}");
793    PathBuf::from(dir_str)
794}
795
796#[async_trait]
797impl ChunkCache for DiskCache {
798    async fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
799        self.get_impl(key, range).await
800    }
801
802    async fn put(
803        &self,
804        key: &Key,
805        range: &ChunkRange,
806        chunk_byte_indices: &[u32],
807        data: &[u8],
808    ) -> Result<(), ChunkCacheError> {
809        self.put_impl(key, range, chunk_byte_indices, data).await
810    }
811}
812
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use tempfile::TempDir;
    use xet_runtime::utils::output_bytes;

    use super::super::{CacheConfig, ChunkCache};
    use super::test_utils::*;
    use super::{DiskCache, try_parse_key};
    use crate::cas_types::{ChunkRange, Key};

    // Parenthesized to make the shift-before-or evaluation explicit; the value is unchanged.
    const RANDOM_SEED: u64 = (9089 << 20) | 120043;

    const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000;

    /// A lookup against a freshly initialized, empty cache is a miss rather than an error.
    #[tokio::test]
    async fn test_get_cache_empty() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        assert!(
            cache
                .get(&random_key(&mut rng), &random_range(&mut rng))
                .await
                .unwrap()
                .is_none()
        );
    }

    /// A put followed by a get of the same key/range returns the stored data and offsets;
    /// a disjoint range for the same key is a miss.
    #[tokio::test]
    async fn test_put_get_simple() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let key = random_key(&mut rng);
        let range = ChunkRange::new(0, 4);
        let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
        let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
        assert!(put_result.is_ok(), "{put_result:?}");

        print_directory_contents(cache_root.as_ref());

        // hit
        let cache_result = cache.get(&key, &range).await.unwrap();
        assert!(cache_result.is_some());
        let cache_range = cache_result.unwrap();
        assert_eq!(cache_range.data, data);
        assert_eq!(cache_range.range, range);
        assert_eq!(cache_range.offsets, chunk_byte_indices);

        let miss_range = ChunkRange::new(100, 101);
        // miss
        assert!(cache.get(&key, &miss_range).await.unwrap().is_none());
    }

    /// Every contiguous sub-range of a stored range is served from the cache, with offsets
    /// rebased to start at 0 and the matching slice of the originally stored data.
    #[tokio::test]
    async fn test_put_get_subrange() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let key = random_key(&mut rng);
        // following parts of test assume overall inserted range includes chunk 0
        let range = ChunkRange::new(0, 4);
        let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
        let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
        assert!(put_result.is_ok(), "{put_result:?}");

        print_directory_contents(cache_root.as_ref());

        for start in range.start..range.end {
            for end in (start + 1)..=range.end {
                let sub_range = ChunkRange::new(start, end);
                let get_result = cache.get(&key, &sub_range).await.unwrap();
                assert!(get_result.is_some(), "range: [{start} {end})");
                let cache_range = get_result.unwrap();
                assert_eq!(cache_range.range, sub_range);
                // assert that offsets has 1 more item than the range len difference
                assert_eq!(cache_range.offsets.len() as u32, sub_range.end - sub_range.start + 1);

                for (expected, actual) in chunk_byte_indices[(start as usize)..=(end as usize)]
                    .iter()
                    .map(|v| *v - chunk_byte_indices[start as usize])
                    .zip(cache_range.offsets.iter())
                {
                    assert_eq!(*actual, expected);
                }

                let start_byte = chunk_byte_indices[sub_range.start as usize] as usize;
                let end_byte = chunk_byte_indices[sub_range.end as usize] as usize;
                let data_portion = &data[start_byte..end_byte];
                assert_eq!(data_portion, &cache_range.data);
            }
        }
    }

    /// Putting more entries than the configured capacity accommodates keeps succeeding, and the
    /// total size reported by the cache stays within its capacity.
    #[tokio::test]
    async fn test_puts_eviction() {
        const MIN_NUM_KEYS: u32 = 12;
        const CAP: u64 = (RANGE_LEN * (MIN_NUM_KEYS - 1)) as u64;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: CAP,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);

        // insert MIN_NUM_KEYS entries into a cache sized for MIN_NUM_KEYS - 1 ranges of
        // RANGE_LEN bytes, so the cache has to evict to stay within capacity
        for _ in 0..MIN_NUM_KEYS {
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
        }
        let total_bytes = cache.total_bytes().await;
        assert!(total_bytes <= CAP, "cache size: {} <= {}", output_bytes(total_bytes), output_bytes(CAP));

        let (key, range, offsets, data) = it.next().unwrap();
        let result = cache.put(&key, &range, &offsets, &data).await;
        assert!(result.is_ok());
    }

    /// Re-putting an identical entry is accepted (treated as a no-op).
    #[tokio::test]
    async fn test_same_puts_noop() {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(1000);
        let (key, range, offsets, data) = it.next().unwrap();
        assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
        assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
    }

    /// Re-putting an already cached key/range with inconsistent offsets or data is rejected.
    #[tokio::test]
    async fn test_overlap_range_data_mismatch_fail() {
        // Each scenario gets a fresh cache that already holds one valid entry.
        let setup = || async move {
            let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
            let cache_root = TempDir::new().unwrap();
            let config = CacheConfig {
                cache_directory: cache_root.path().to_path_buf(),
                cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
                ..Default::default()
            };
            let cache = DiskCache::initialize(&config).unwrap();
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
            (cache_root, cache, key, range, offsets, data)
        };

        // bad offsets
        // totally random, mismatch len from range
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets.remove(1);
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // start isn't 0
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[0] = 100;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // end isn't data.len()
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        *offsets.last_mut().unwrap() = data.len() as u32 + 1;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // not strictly increasing
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[2] = offsets[1];
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // not matching
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[1] += 1;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // bad data
        // size mismatch given offsets
        let (_cache_root, cache, key, range, offsets, data) = setup().await;
        assert!(cache.put(&key, &range, &offsets, &data[1..]).await.is_err());

        // data changed; wrapping so a random 255 byte cannot overflow-panic in debug builds
        let (_cache_root, cache, key, range, offsets, mut data) = setup().await;
        data[0] = data[0].wrapping_add(1);
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
    }

    /// A second cache initialized over an existing directory sees every previously stored entry.
    #[tokio::test]
    async fn test_initialize_non_empty() {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);

        let mut keys_and_ranges = Vec::new();

        for _ in 0..20 {
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
            keys_and_ranges.push((key, range));
        }

        let cache2 = DiskCache::initialize(&config).unwrap();
        for (i, (key, range)) in keys_and_ranges.iter().enumerate() {
            let get_result = cache2.get(key, range).await;
            assert!(get_result.is_ok(), "{i} {get_result:?}");
            assert!(get_result.unwrap().is_some(), "{i}");
        }

        let cache_keys = cache.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
        let cache2_keys = cache2.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
        assert_eq!(cache_keys, cache2_keys);
    }

    /// Re-initializing with a capacity smaller than an existing file loads nothing.
    #[tokio::test]
    async fn test_initialize_too_large_file() {
        const LARGE_FILE: u64 = 1000;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);

        let (key, range, offsets, data) = it.next().unwrap();
        cache.put(&key, &range, &offsets, &data).await.unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: LARGE_FILE - 1,
            ..Default::default()
        };
        let cache2 = DiskCache::initialize(&config).unwrap();

        assert_eq!(cache2.total_bytes().await, 0);
    }

    /// Re-initializing with a much smaller capacity does not load far more than that capacity.
    #[tokio::test]
    async fn test_initialize_stops_loading_early_with_too_many_files() {
        const LARGE_FILE: u64 = 1000;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: LARGE_FILE * 10,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);
        for _ in 0..10 {
            let (key, range, offsets, data) = it.next().unwrap();
            cache.put(&key, &range, &offsets, &data).await.unwrap();
        }

        let cap2 = LARGE_FILE * 2;
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: cap2,
            ..Default::default()
        };
        let cache2 = DiskCache::initialize(&config).unwrap();

        assert!(cache2.total_bytes().await < cap2 * 3, "{} < {}", cache2.total_bytes().await, cap2 * 3);
    }

    /// A known-good key directory name parses successfully.
    #[test]
    fn test_dir_name_to_key() {
        let s = "oL-Xqk1J00kVe1U4kCko-Kw4zaVv3-4U73i27w5DViBkZWZhdWx0";
        let key = try_parse_key(s.as_bytes());
        assert!(key.is_ok(), "{key:?}");
    }

    /// An entry evicted through one cache handle is reported as a miss through another handle
    /// that still has it in its in-memory state.
    #[tokio::test]
    async fn test_unknown_eviction() {
        let cache_root = TempDir::new().unwrap();
        let capacity = 12 * RANGE_LEN as u64;
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: capacity,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
        let (key, range, chunk_byte_indices, data) = it.next().unwrap();
        cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();

        let cache2 = DiskCache::initialize(&config).unwrap();
        let get_result = cache2.get(&key, &range).await;
        assert!(get_result.is_ok());
        assert!(get_result.unwrap().is_some());

        let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
        assert!(cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.is_ok());

        let mut get_result_1 = cache2.get(&key, &range).await.unwrap();
        let mut i = 0;
        while get_result_1.is_some() && i < 50 {
            i += 1;
            let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
            cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.unwrap();
            get_result_1 = cache2.get(&key, &range).await.unwrap();
        }
        if get_result_1.is_some() {
            // randomness didn't evict the record after 50 tries, don't test this case now
            return;
        }
        // we've evicted the original record from the cache
        // note using the original cache handle without updates!
        let get_result_post_eviction = cache.get(&key, &range).await;
        assert!(get_result_post_eviction.is_ok());
        assert!(get_result_post_eviction.unwrap().is_none());
    }

    /// Putting sub-ranges of an already cached range is accepted and stores no extra bytes.
    #[tokio::test]
    async fn put_subrange() {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let (key, range, chunk_byte_indices, data) = RandomEntryIterator::std_from_seed(RANDOM_SEED).next().unwrap();
        cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
        let total_bytes = cache.total_bytes().await;

        // left range
        let left_range = ChunkRange::new(range.start, range.end - 1);
        let left_chunk_byte_indices = &chunk_byte_indices[..chunk_byte_indices.len() - 1];
        let left_data = &data[..*left_chunk_byte_indices.last().unwrap() as usize];
        assert!(cache.put(&key, &left_range, left_chunk_byte_indices, left_data).await.is_ok());
        assert_eq!(total_bytes, cache.total_bytes().await);

        // right range
        let right_range = ChunkRange::new(range.start + 1, range.end);
        let right_chunk_byte_indices: Vec<u32> =
            chunk_byte_indices[1..].iter().map(|v| v - chunk_byte_indices[1]).collect();
        let right_data = &data[chunk_byte_indices[1] as usize..];
        assert!(
            cache
                .put(&key, &right_range, &right_chunk_byte_indices, right_data)
                .await
                .is_ok()
        );
        assert_eq!(total_bytes, cache.total_bytes().await);

        // middle range
        let middle_range = ChunkRange::new(range.start + 1, range.end - 1);
        let middle_chunk_byte_indices: Vec<u32> = chunk_byte_indices[1..(chunk_byte_indices.len() - 1)]
            .iter()
            .map(|v| v - chunk_byte_indices[1])
            .collect();
        let middle_data =
            &data[chunk_byte_indices[1] as usize..chunk_byte_indices[chunk_byte_indices.len() - 2] as usize];

        assert!(
            cache
                .put(&key, &middle_range, &middle_chunk_byte_indices, middle_data)
                .await
                .is_ok()
        );
        assert_eq!(total_bytes, cache.total_bytes().await);
    }

    /// Under eviction pressure, a key holding several ranges keeps at least one of them.
    #[tokio::test]
    async fn test_evictions_with_multiple_range_per_key() {
        const NUM: u32 = 12;
        let cache_root = TempDir::new().unwrap();
        let capacity = (NUM * RANGE_LEN) as u64;
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: capacity,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_one_chunk_ranges(true);
        let (key, _, _, _) = it.next().unwrap();
        let mut previously_put: Vec<(Key, ChunkRange)> = Vec::new();

        for _ in 0..(NUM / 2) {
            let (key2, mut range, chunk_byte_indices, data) = it.next().unwrap();
            // Avoid reusing a start chunk that was already inserted. Shift both ends so the range
            // keeps its length (and therefore still matches `chunk_byte_indices` / `data`)
            // instead of collapsing to an empty `start == end` range.
            while previously_put.iter().any(|(_, r)| r.start == range.start) {
                range.start += 1;
                range.end += 1;
            }
            cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
            previously_put.push((key.clone(), range.clone()));
            cache.put(&key2, &range, &chunk_byte_indices, &data).await.unwrap();
            previously_put.push((key2, range));
        }

        let mut num_hits = 0;
        for (key, range) in &previously_put {
            let result = cache.get(key, range).await;
            assert!(result.is_ok());
            let result = result.unwrap();
            if result.is_some() {
                num_hits += 1;
            }
        }
        // assert got some hits, exact number depends on item size
        assert_ne!(num_hits, 0);

        // assert that we haven't evicted all keys for key with multiple items
        assert!(cache.state.read().await.inner.contains_key(&key), "evicted key that should have remained in cache");
    }

    /// A zero-capacity cache is rejected at initialization.
    #[test]
    fn test_initialize_with_cache_size_0() {
        // Use a temporary directory rather than a hard-coded "/tmp" so the test is portable.
        let cache_root = TempDir::new().unwrap();
        assert!(
            DiskCache::initialize(&CacheConfig {
                cache_directory: cache_root.path().to_path_buf(),
                cache_size: 0,
            })
            .is_err()
        );
    }
}
1258
#[cfg(test)]
mod concurrency_tests {
    use tempfile::TempDir;

    use super::super::{CacheConfig, ChunkCache};
    use super::DiskCache;
    use super::test_utils::{RANGE_LEN, RandomEntryIterator};

    const NUM_ITEMS_PER_TASK: usize = 20;
    const RANDOM_SEED: u64 = 878987298749287;

    const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000;

    /// Creates a `DiskCache` with capacity `cache_size` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the cache; callers must keep it bound for the
    /// duration of the test so the directory is not removed early.
    fn new_cache(cache_size: u64) -> (TempDir, DiskCache) {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        (cache_root, cache)
    }

    /// Spawns 2..=15 tasks on `cache`. Each task puts the same seeded sequence of
    /// `NUM_ITEMS_PER_TASK` entries and then gets each one back, asserting that no put or get
    /// returns an error. Waits for every task and panics if any of them panicked.
    async fn run_put_then_get_tasks(cache: &DiskCache) {
        let task_count = 2 + rand::random::<u8>() % 14;

        let handles: Vec<_> = (0..task_count)
            .map(|_| {
                let cache = cache.clone();
                tokio::spawn(async move {
                    let mut entries = RandomEntryIterator::std_from_seed(RANDOM_SEED);
                    let mut inserted = Vec::with_capacity(NUM_ITEMS_PER_TASK);
                    for _ in 0..NUM_ITEMS_PER_TASK {
                        let (key, range, chunk_byte_indices, data) = entries.next().unwrap();
                        assert!(cache.put(&key, &range, &chunk_byte_indices, &data).await.is_ok());
                        inserted.push((key, range));
                    }
                    for (key, range) in inserted {
                        assert!(cache.get(&key, &range).await.is_ok());
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.await.expect("join should not error");
        }
    }

    /// Concurrent puts/gets with a capacity large enough that nothing needs evicting.
    #[tokio::test]
    async fn test_run_concurrently() {
        let (_cache_root, cache) = new_cache(DEFAULT_CHUNK_CACHE_CAPACITY);
        run_put_then_get_tasks(&cache).await;
    }

    /// Concurrent puts/gets with a capacity of one task's worth of `RANGE_LEN` entries.
    #[tokio::test]
    #[cfg_attr(feature = "smoke-test", ignore)]
    async fn test_run_concurrently_with_evictions() {
        let (_cache_root, cache) = new_cache(RANGE_LEN as u64 * NUM_ITEMS_PER_TASK as u64);
        run_put_then_get_tasks(&cache).await;
    }

    /// Many tasks putting the identical entry at once leave exactly one item for that range.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_run_concurrently_thundering_herd() {
        let (_cache_root, cache) = new_cache(RANGE_LEN as u64 * NUM_ITEMS_PER_TASK as u64);

        // every task inserts exactly the same entry
        let (key, range, chunk_byte_indices, data) = RandomEntryIterator::std_from_seed(RANDOM_SEED).next().unwrap();

        const NUM_TASKS: usize = 64;
        let handles: Vec<_> = (0..NUM_TASKS)
            .map(|_| {
                let cache = cache.clone();
                let key = key.clone();
                let range = range.clone();
                let chunk_byte_indices = chunk_byte_indices.clone();
                let data = data.clone();
                tokio::spawn(async move {
                    let res = cache.put(&key, &range, &chunk_byte_indices, &data).await;
                    assert!(res.is_ok(), "err: {res:?}");
                })
            })
            .collect();

        for handle in handles {
            handle.await.expect("join should not error");
        }

        // only a single cached item may cover this exact range
        let state = cache.state.read().await;
        let items = state.inner.get(&key).unwrap();
        let matching = items.iter().filter(|item| item.range == range).count();
        assert_eq!(matching, 1);
    }
}