1use std::collections::HashMap;
2use std::fs::{DirEntry, File};
3use std::io::{self, Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
4use std::mem::size_of;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use base64::Engine;
10use base64::engine::GeneralPurpose;
11use base64::engine::general_purpose::URL_SAFE;
12use tokio::sync::RwLock;
13use tracing::{debug, error};
14use xet_core_structures::merklehash::MerkleHash;
15use xet_runtime::core::xet_config;
16use xet_runtime::error_printer::ErrorPrinter;
17use xet_runtime::file_utils::SafeFileCreator;
18use xet_runtime::utils::output_bytes;
19
20use self::cache_file_header::CacheFileHeader;
21use self::cache_item::{CacheItem, VerificationCell};
22use super::error::ChunkCacheError;
23use super::{CacheConfig, CacheRange, ChunkCache};
24use crate::cas_types::{ChunkRange, Key};
25
26mod cache_file_header;
27mod cache_item;
28pub mod test_utils;
29
/// Base64 engine (URL-safe alphabet) used to turn cache keys into directory names and back.
pub(crate) const BASE64_ENGINE: GeneralPurpose = URL_SAFE;
/// Length of a prefix directory name: key directories are grouped under the first
/// `PREFIX_DIR_NAME_LEN` characters of their encoded name.
const PREFIX_DIR_NAME_LEN: usize = 2;

/// Shorthand for a fallible lookup: `Ok(None)` means "nothing there", `Err` means the lookup failed.
type OptionResult<T, E> = Result<Option<T>, E>;
35
36#[derive(Debug, Clone)]
37struct CacheState {
38 inner: HashMap<Key, Vec<VerificationCell<CacheItem>>>,
39 num_items: usize,
40 total_bytes: u64,
41}
42
43impl CacheState {
44 fn new(state: HashMap<Key, Vec<VerificationCell<CacheItem>>>, num_items: usize, total_bytes: u64) -> Self {
45 Self {
46 inner: state,
47 num_items,
48 total_bytes,
49 }
50 }
51
52 fn find_match(&self, key: &Key, range: &ChunkRange) -> Option<VerificationCell<CacheItem>> {
53 let items = self.inner.get(key)?;
54
55 for item in items.iter() {
57 if item.range.start <= range.start && range.end <= item.range.end {
58 return Some(item.clone());
59 }
60 }
61 None
62 }
63
64 fn evict_to_capacity(
70 &mut self,
71 max_total_bytes: u64,
72 ) -> Result<Vec<(Key, VerificationCell<CacheItem>)>, ChunkCacheError> {
73 let original_total_bytes = self.total_bytes;
74 let mut ret = Vec::new();
75
76 while self.total_bytes > max_total_bytes {
77 let Some((key, idx)) = self.random_item() else {
78 error!("attempted to evict item, but no item could be found to be evicted");
79 break;
80 };
81 let items = self.inner.get_mut(&key).ok_or(ChunkCacheError::Infallible)?;
82 let cache_item = items.swap_remove(idx);
83 let len = cache_item.len;
84
85 if items.is_empty() {
86 self.inner.remove(&key);
87 }
88
89 ret.push((key, cache_item));
90
91 self.total_bytes -= len;
92 self.num_items -= 1;
93 }
94 debug!(
95 "cache evicting {} items totaling {}",
96 ret.len(),
97 output_bytes(original_total_bytes - self.total_bytes)
98 );
99
100 Ok(ret)
101 }
102
103 fn random_item(&self) -> Option<(Key, usize)> {
105 debug_assert_eq!(
106 self.inner.values().map(|v| v.len()).sum::<usize>(),
107 self.num_items,
108 "real num items != stored num items"
109 );
110
111 if self.num_items == 0 {
112 error!("cache random_item for eviction: no items in cache");
113 return None;
114 }
115 let random_item = rand::random::<u32>() as usize % self.num_items;
116 let mut count = 0;
117 for (key, items) in self.inner.iter() {
118 if random_item < count + items.len() {
119 return Some((key.clone(), random_item - count));
120 }
121 count += items.len();
122 }
123 error!("cache random_item for eviction: tried to return random item error not enough items");
125 None
126 }
127}
128
/// Chunk cache that stores each cached range as a file under `cache_root`.
#[derive(Debug, Clone)]
pub struct DiskCache {
    /// Directory that holds the prefix / key directories and their cache files.
    cache_root: PathBuf,
    /// Byte budget; `put` evicts items so the tracked total stays within it.
    capacity: u64,
    /// Shared in-memory index of the items currently tracked.
    state: Arc<RwLock<CacheState>>,
}
136
#[cfg(feature = "analysis")]
impl DiskCache {
    /// Prints a summary of the whole cache to stdout, followed by a per-key listing of its items.
    pub async fn print(&self) {
        let state = self.state.read().await;

        println!(
            "total items: {}, total bytes {} for the whole cache",
            state.num_items,
            output_bytes(state.total_bytes)
        );

        for (key, items) in &state.inner {
            println!();
            let key_bytes = items.iter().map(|item| item.len as usize).sum::<usize>();
            println!("key: {key}");
            println!("\ttotal items: {}, total bytes {} for key {key}", items.len(), output_bytes(key_bytes as u64));
            println!();
            for item in items {
                println!(
                    "\titem: chunk range [{}-{}) ; len({}); checksum({})",
                    item.range.start,
                    item.range.end,
                    output_bytes(item.len),
                    item.checksum,
                );
            }
        }
    }
}
170
171impl DiskCache {
172 pub async fn num_items(&self) -> usize {
173 self.state.read().await.num_items
174 }
175
176 pub async fn total_bytes(&self) -> u64 {
177 self.state.read().await.total_bytes
178 }
179
180 pub fn initialize(config: &CacheConfig) -> Result<Self, ChunkCacheError> {
209 if config.cache_size == 0 {
210 return Err(ChunkCacheError::InvalidArguments);
211 }
212 let capacity = config.cache_size;
213 let cache_root = config.cache_directory.clone();
214
215 let state = Self::initialize_state(&cache_root, capacity)?;
217
218 Ok(Self {
219 state: Arc::new(RwLock::new(state)),
220 cache_root: config.cache_directory.clone(),
221 capacity,
222 })
223 }
224
225 fn initialize_state(cache_root: &PathBuf, capacity: u64) -> Result<CacheState, ChunkCacheError> {
226 let mut state = HashMap::new();
227 let mut total_bytes = 0;
228 let mut num_items = 0;
229 let max_num_bytes = 2 * capacity;
230
231 let Some(cache_root_readdir) = read_dir(cache_root)? else {
232 return Ok(CacheState::new(state, 0, 0));
233 };
234
235 for key_prefix_dir in cache_root_readdir {
238 let Some(key_prefix_dir) = is_ok_dir(key_prefix_dir)? else {
239 continue;
240 };
241
242 let key_prefix_dir_name = key_prefix_dir.file_name();
243 if key_prefix_dir_name.as_encoded_bytes().len() != PREFIX_DIR_NAME_LEN {
244 debug!("prefix dir name len != {PREFIX_DIR_NAME_LEN}");
245 continue;
246 }
247
248 let Some(key_prefix_readdir) = read_dir(key_prefix_dir.path())? else {
249 continue;
250 };
251
252 for key_dir in key_prefix_readdir {
254 let key_dir = match is_ok_dir(key_dir) {
255 Ok(Some(dirent)) => dirent,
256 Ok(None) => continue,
257 Err(e) => return Err(e),
258 };
259
260 let key_dir_name = key_dir.file_name();
261
262 debug_assert_eq!(
264 key_dir_name.as_encoded_bytes()[..PREFIX_DIR_NAME_LEN].to_ascii_uppercase(),
265 key_prefix_dir_name.as_encoded_bytes().to_ascii_uppercase(),
266 "{key_dir_name:?}",
267 );
268
269 let key = match try_parse_key(key_dir_name.as_encoded_bytes()) {
270 Ok(key) => key,
271 Err(e) => {
272 debug!("failed to decoded a directory name as a key: {e}");
273 continue;
274 },
275 };
276
277 let mut items = Vec::new();
278
279 let key_readdir = match read_dir(key_dir.path()) {
280 Ok(Some(krd)) => krd,
281 Ok(None) => continue,
282 Err(e) => return Err(e),
283 };
284
285 for item in key_readdir {
287 let cache_item = match try_parse_cache_file(item, capacity) {
288 Ok(Some(ci)) => ci,
289 Ok(None) => continue,
290 Err(e) => return Err(e),
291 };
292
293 total_bytes += cache_item.len;
294 num_items += 1;
295 items.push(VerificationCell::new_unverified(cache_item));
296
297 if total_bytes >= max_num_bytes {
299 state.insert(key, items);
300 return Ok(CacheState::new(state, num_items, total_bytes));
301 }
302 }
303
304 if !items.is_empty() {
305 state.insert(key, items);
306 }
307 }
308 }
309
310 Ok(CacheState::new(state, num_items, total_bytes))
311 }
312
313 async fn get_impl(&self, key: &Key, range: &ChunkRange) -> OptionResult<CacheRange, ChunkCacheError> {
314 if range.start >= range.end {
315 return Err(ChunkCacheError::InvalidArguments);
316 }
317
318 loop {
319 let Some(cache_item) = self.state.read().await.find_match(key, range) else {
320 return Ok(None);
321 };
322
323 let path = self.item_path(key, &cache_item)?;
324
325 let mut file = match File::open(&path) {
326 Ok(file) => file,
327 Err(e) => match e.kind() {
328 ErrorKind::NotFound => {
329 self.remove_item(key, &cache_item).await?;
330 continue;
331 },
332 _ => return Err(e.into()),
333 },
334 };
335
336 if !cache_item.is_verified() {
337 let checksum = crc32_from_reader(&mut file)?;
338 if checksum == cache_item.checksum {
339 cache_item.verify();
340 file.rewind()?;
341 } else {
342 debug!("computed checksum {checksum} mismatch on cache item {key}/{cache_item}");
343 self.remove_item(key, &cache_item).await?;
344 continue;
345 }
346 }
347
348 let mut file_reader = std::io::BufReader::new(file);
349
350 let Ok(header) = CacheFileHeader::deserialize(&mut file_reader)
351 .debug_error(format!("failed to deserialize cache file header on path: {path:?}"))
352 else {
353 self.remove_item(key, &cache_item).await?;
354 continue;
355 };
356
357 let start = cache_item.range.start;
358 let result_buf = get_range_from_cache_file(&header, &mut file_reader, range, start)?;
359 return Ok(Some(result_buf));
360 }
361 }
362
363 async fn put_impl(
364 &self,
365 key: &Key,
366 range: &ChunkRange,
367 chunk_byte_indices: &[u32],
368 data: &[u8],
369 ) -> Result<(), ChunkCacheError> {
370 if range.start >= range.end
371 || chunk_byte_indices.len() != (range.end - range.start + 1) as usize
372 || chunk_byte_indices[0] != 0
374 || *chunk_byte_indices.last().unwrap() as usize != data.len()
375 || !strictly_increasing(chunk_byte_indices)
376 {
377 return Err(ChunkCacheError::InvalidArguments);
378 }
379
380 while let Some(cache_item) = self.state.read().await.find_match(key, range) {
382 if self.validate_match(key, range, chunk_byte_indices, data, &cache_item).await? {
383 return Ok(());
384 }
385 }
386
387 let header = CacheFileHeader::new(chunk_byte_indices);
388 let mut header_buf = Vec::with_capacity(header.header_len());
389 header.serialize(&mut header_buf)?;
390 let len = (header_buf.len() + data.len()) as u64;
391 if len > self.capacity {
392 return Ok(());
394 }
395
396 let checksum = {
397 let mut hasher = crc32fast::Hasher::new();
398 hasher.update(&header_buf);
399 hasher.update(data);
400 hasher.finalize()
401 };
402
403 let cache_item = CacheItem {
404 range: *range,
405 len,
406 checksum,
407 };
408
409 let path = self.item_path(key, &cache_item)?;
411 let mut fw = SafeFileCreator::new(path)?;
412 fw.write_all(&header_buf)?;
413 fw.write_all(data)?;
414
415 let mut state_write = self.state.write().await;
418
419 if state_write.find_match(key, range).is_some() {
424 fw.abort()?;
427 return Ok(());
428 }
429 fw.close()?;
430
431 let evicted_paths = state_write.evict_to_capacity(self.capacity - cache_item.len)?;
433
434 state_write.num_items += 1;
436 state_write.total_bytes += cache_item.len;
437 let item_set = state_write.inner.entry(key.clone()).or_default();
438 item_set.push(VerificationCell::new_verified(cache_item));
439
440 drop(state_write);
442
443 for (key, cache_item) in evicted_paths {
445 let path = self.item_path(&key, &cache_item)?;
446 remove_file(&path)?;
447 let dir_path = path.parent().ok_or(ChunkCacheError::Infallible)?;
449 check_remove_dir(dir_path)?;
450 }
451
452 Ok(())
453 }
454
455 async fn validate_match(
458 &self,
459 key: &Key,
460 range: &ChunkRange,
461 chunk_byte_indices: &[u32],
462 data: &[u8],
463 cache_item: &VerificationCell<CacheItem>,
464 ) -> Result<bool, ChunkCacheError> {
465 if range.start < cache_item.range.start || range.end > cache_item.range.end {
467 return Err(ChunkCacheError::BadRange);
468 }
469
470 let path = self.item_path(key, cache_item)?;
472
473 let Ok(mut file) = File::open(path) else {
474 self.remove_item(key, cache_item).await?;
475 return Ok(false);
476 };
477 let md = file.metadata()?;
478 if md.len() != cache_item.len {
479 self.remove_item(key, cache_item).await?;
480 return Ok(false);
481 }
482 let mut buf = Vec::with_capacity(md.len() as usize);
483 file.read_to_end(&mut buf)?;
484 let checksum = crc32fast::hash(&buf);
485 if checksum != cache_item.checksum {
486 self.remove_item(key, cache_item).await?;
487 return Ok(false);
488 }
489 let mut reader = Cursor::new(buf);
490 let Ok(header) = CacheFileHeader::deserialize(&mut reader) else {
491 self.remove_item(key, cache_item).await?;
492 return Ok(false);
493 };
494
495 let idx_start = (range.start - cache_item.range.start) as usize;
500 let idx_end = (range.end - cache_item.range.start + 1) as usize;
501 for i in idx_start..idx_end - 1 {
502 let stored_diff = header.chunk_byte_indices[i + 1] - header.chunk_byte_indices[i];
503 let given_diff = chunk_byte_indices[i + 1 - idx_start] - chunk_byte_indices[i - idx_start];
504 if stored_diff != given_diff {
505 debug!(
506 "failed to match chunk lens for these chunk offsets {} {:?}\n{} {:?}",
507 cache_item.range,
508 &header.chunk_byte_indices[idx_start..idx_end],
509 range,
510 chunk_byte_indices
511 );
512 return Err(ChunkCacheError::InvalidArguments);
513 }
514 }
515
516 let stored = get_range_from_cache_file(&header, &mut reader, range, cache_item.range.start)?;
517 if data != stored.data {
518 return Err(ChunkCacheError::InvalidArguments);
519 }
520 Ok(true)
521 }
522
523 async fn remove_item(&self, key: &Key, cache_item: &VerificationCell<CacheItem>) -> Result<(), ChunkCacheError> {
525 {
526 let mut state = self.state.write().await;
527 if let Some(items) = state.inner.get_mut(key) {
528 let idx = match index_of(items, cache_item) {
529 Some(idx) => idx,
530 None => return Ok(()),
532 };
533
534 items.swap_remove(idx);
535 if items.is_empty() {
536 state.inner.remove(key);
537 }
538 state.total_bytes -= cache_item.len;
539 state.num_items -= 1;
540 }
541 }
542
543 let path = self.item_path(key, cache_item)?;
544
545 if !path.exists() {
546 return Ok(());
547 }
548 remove_file(&path)?;
549 let dir_path = path.parent().ok_or(ChunkCacheError::Infallible)?;
550 check_remove_dir(dir_path)
551 }
552
553 fn item_path(&self, key: &Key, cache_item: &CacheItem) -> Result<PathBuf, ChunkCacheError> {
554 Ok(self.cache_root.join(key_dir(key)).join(cache_item.file_name()?))
555 }
556}
557
558fn crc32_from_reader(reader: &mut impl Read) -> Result<u32, ChunkCacheError> {
559 const CRC_BUFFER_SIZE: usize = 4096;
560 let mut buf = [0u8; CRC_BUFFER_SIZE];
561 let mut hasher = crc32fast::Hasher::new();
562 loop {
563 let num_read = reader.read(&mut buf)?;
564 if num_read == 0 {
565 break;
566 }
567 hasher.update(&buf[..num_read])
568 }
569 Ok(hasher.finalize())
570}
571
/// Returns the position of the first element of `list` equal to `value`, if any.
#[inline]
fn index_of<T: PartialEq>(list: &[T], value: &T) -> Option<usize> {
    list.iter().position(|candidate| candidate == value)
}
581
/// Returns `true` when every element is greater than the one before it.
///
/// Empty and single-element slices count as strictly increasing.
fn strictly_increasing(chunk_byte_indices: &[u32]) -> bool {
    chunk_byte_indices.windows(2).all(|pair| pair[0] < pair[1])
}
590
591fn get_range_from_cache_file<R: Read + Seek>(
592 header: &CacheFileHeader,
593 file_contents: &mut R,
594 range: &ChunkRange,
595 start: u32,
596) -> Result<CacheRange, ChunkCacheError> {
597 let start_idx = (range.start - start) as usize;
598 let end_idx = (range.end - start) as usize;
599 let start_byte = header.chunk_byte_indices.get(start_idx).ok_or(ChunkCacheError::BadRange)?;
600 let end_byte = header.chunk_byte_indices.get(end_idx).ok_or(ChunkCacheError::BadRange)?;
601 file_contents.seek(SeekFrom::Start((*start_byte as usize + header.header_len()) as u64))?;
602 let mut data = vec![0; (end_byte - start_byte) as usize];
603 file_contents.read_exact(&mut data)?;
604 let offsets: Vec<u32> = header.chunk_byte_indices[start_idx..=end_idx]
605 .iter()
606 .map(|v| *v - header.chunk_byte_indices[start_idx])
607 .collect();
608
609 debug_assert_eq!(range.end - range.start, offsets.len() as u32 - 1);
610
611 Ok(CacheRange {
612 offsets,
613 data,
614 range: *range,
615 })
616}
617
618fn read_dir(path: impl AsRef<Path>) -> OptionResult<std::fs::ReadDir, ChunkCacheError> {
621 match std::fs::read_dir(path) {
622 Ok(rd) => Ok(Some(rd)),
623 Err(e) => {
624 if e.kind() == ErrorKind::NotFound {
625 Ok(None)
626 } else {
627 Err(e.into())
628 }
629 },
630 }
631}
632
633fn is_ok_dir(dir_result: Result<DirEntry, io::Error>) -> OptionResult<DirEntry, ChunkCacheError> {
639 let dirent = match dir_result {
640 Ok(kd) => kd,
641 Err(e) => {
642 if e.kind() == ErrorKind::NotFound {
643 return Ok(None);
644 }
645 return Err(e.into());
646 },
647 };
648 let md = match dirent.metadata() {
649 Ok(md) => md,
650 Err(e) => {
651 if e.kind() == ErrorKind::NotFound {
652 return Ok(None);
653 }
654 return Err(e.into());
655 },
656 };
657 if !md.is_dir() {
658 debug!("CACHE: expected directory at {:?}, is not directory", dirent.path());
659 return Ok(None);
660 }
661 Ok(Some(dirent))
662}
663
664fn try_parse_cache_file(file_result: io::Result<DirEntry>, capacity: u64) -> OptionResult<CacheItem, ChunkCacheError> {
668 let item = match file_result {
669 Ok(item) => item,
670 Err(e) => {
671 if e.kind() == ErrorKind::NotFound {
672 return Ok(None);
673 }
674 return Err(e.into());
675 },
676 };
677 let md = match item.metadata() {
678 Ok(md) => md,
679 Err(e) => {
680 if e.kind() == ErrorKind::NotFound {
681 return Ok(None);
682 }
683 return Err(e.into());
684 },
685 };
686
687 if !md.is_file() {
688 return Ok(None);
689 }
690 if md.len() > xet_config().chunk_cache.size_bytes {
691 return Err(ChunkCacheError::general(format!(
692 "Cache directory contains a file larger than {} GB, cache directory state is invalid",
693 (xet_config().chunk_cache.size_bytes as f64 / (1 << 30) as f64)
694 )));
695 }
696
697 if md.len() > capacity {
699 return Ok(None);
700 }
701
702 let cache_item = match CacheItem::parse(item.file_name().as_encoded_bytes())
703 .debug_error("failed to decode a file name as a cache item")
704 {
705 Ok(i) => i,
706 Err(e) => {
707 debug!("not a valid cache file, removing: {:?} {e:?}", item.file_name());
708 remove_file(item.path())?;
709 return Ok(None);
710 },
711 };
712 if md.len() != cache_item.len {
713 debug!(
715 "cache file len {} does not match expected length {}, removing path: {:?}",
716 md.len(),
717 cache_item.len,
718 item.path()
719 );
720 remove_file(item.path())?;
721 return Ok(None);
722 }
723 Ok(Some(cache_item))
724}
725
726fn remove_file(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
728 if let Err(e) = std::fs::remove_file(path)
729 && e.kind() != ErrorKind::NotFound
730 {
731 return Err(e.into());
732 }
733 Ok(())
734}
735
736fn remove_dir(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
738 if let Err(e) = std::fs::remove_dir(path)
739 && e.kind() != ErrorKind::NotFound
740 {
741 return Err(e.into());
742 }
743 Ok(())
744}
745
746fn check_remove_dir(dir_path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
750 let readdir = match read_dir(&dir_path)? {
751 Some(rd) => rd,
752 None => return Ok(()),
753 };
754 if readdir.peekable().peek().is_some() {
755 return Ok(());
756 }
757 remove_dir(&dir_path)?;
759
760 let prefix_dir = dir_path.as_ref().parent().ok_or(ChunkCacheError::Infallible)?;
762
763 let prefix_readdir = match read_dir(prefix_dir)? {
764 Some(prd) => prd,
765 None => return Ok(()),
766 };
767 if prefix_readdir.peekable().peek().is_some() {
768 return Ok(());
769 }
770 remove_dir(prefix_dir)
772}
773
774fn try_parse_key(file_name: &[u8]) -> Result<Key, ChunkCacheError> {
777 let buf = BASE64_ENGINE.decode(file_name)?;
778 let hash = MerkleHash::from_slice(&buf[..size_of::<MerkleHash>()])?;
779 let prefix = String::from(std::str::from_utf8(&buf[size_of::<MerkleHash>()..])?);
780 Ok(Key { prefix, hash })
781}
782
783fn key_dir(key: &Key) -> PathBuf {
786 let prefix_bytes = key.prefix.as_bytes();
787 let mut buf = vec![0u8; size_of::<MerkleHash>() + prefix_bytes.len()];
788 buf[..size_of::<MerkleHash>()].copy_from_slice(key.hash.as_bytes());
789 buf[size_of::<MerkleHash>()..].copy_from_slice(prefix_bytes);
790 let encoded = BASE64_ENGINE.encode(&buf);
791 let prefix_dir = &encoded[..PREFIX_DIR_NAME_LEN];
792 let dir_str = format!("{prefix_dir}/{encoded}");
793 PathBuf::from(dir_str)
794}
795
/// `ChunkCache` implementation for the disk cache; both methods delegate to the inherent
/// `get_impl` / `put_impl`.
#[async_trait]
impl ChunkCache for DiskCache {
    /// Returns the cached data for `range` under `key`, or `Ok(None)` on a miss.
    async fn get(&self, key: &Key, range: &ChunkRange) -> Result<Option<CacheRange>, ChunkCacheError> {
        self.get_impl(key, range).await
    }

    /// Stores `data` (with its chunk boundary offsets) for `range` under `key`.
    async fn put(
        &self,
        key: &Key,
        range: &ChunkRange,
        chunk_byte_indices: &[u32],
        data: &[u8],
    ) -> Result<(), ChunkCacheError> {
        self.put_impl(key, range, chunk_byte_indices, data).await
    }
}
812
813#[cfg(test)]
814mod tests {
815 use std::collections::BTreeSet;
816
817 use rand::SeedableRng;
818 use rand::rngs::StdRng;
819 use tempfile::TempDir;
820 use xet_runtime::utils::output_bytes;
821
822 use super::super::{CacheConfig, ChunkCache};
823 use super::test_utils::*;
824 use super::{DiskCache, try_parse_key};
825 use crate::cas_types::{ChunkRange, Key};
826
827 const RANDOM_SEED: u64 = 9089 << 20 | 120043;
828
829 const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000;
830
    /// A lookup on a freshly initialized cache over an empty directory is a miss, not an error.
    #[tokio::test]
    async fn test_get_cache_empty() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        assert!(
            cache
                .get(&random_key(&mut rng), &random_range(&mut rng))
                .await
                .unwrap()
                .is_none()
        );
    }
849
    /// A put followed by a get of the same range returns the same data, range and offsets;
    /// a range that was never stored is a miss.
    #[tokio::test]
    async fn test_put_get_simple() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let key = random_key(&mut rng);
        let range = ChunkRange::new(0, 4);
        let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
        let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
        assert!(put_result.is_ok(), "{put_result:?}");

        print_directory_contents(cache_root.as_ref());

        // Hit: exactly what was stored comes back.
        let cache_result = cache.get(&key, &range).await.unwrap();
        assert!(cache_result.is_some());
        let cache_range = cache_result.unwrap();
        assert_eq!(cache_range.data, data);
        assert_eq!(cache_range.range, range);
        assert_eq!(cache_range.offsets, chunk_byte_indices);

        // Miss: same key, but a range outside the stored one.
        let miss_range = ChunkRange::new(100, 101);
        assert!(cache.get(&key, &miss_range).await.unwrap().is_none());
    }
881
    /// After storing one range, every non-empty sub-range can be fetched and comes back with
    /// rebased offsets and the matching slice of the data.
    #[tokio::test]
    async fn test_put_get_subrange() {
        let mut rng = StdRng::seed_from_u64(RANDOM_SEED);
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let key = random_key(&mut rng);
        let range = ChunkRange::new(0, 4);
        let (chunk_byte_indices, data) = random_bytes(&mut rng, &range, RANGE_LEN);
        let put_result = cache.put(&key, &range, &chunk_byte_indices, data.as_slice()).await;
        assert!(put_result.is_ok(), "{put_result:?}");

        print_directory_contents(cache_root.as_ref());

        // Enumerate every [start, end) with start < end inside the stored range.
        for start in range.start..range.end {
            for end in (start + 1)..=range.end {
                let sub_range = ChunkRange::new(start, end);
                let get_result = cache.get(&key, &sub_range).await.unwrap();
                assert!(get_result.is_some(), "range: [{start} {end})");
                let cache_range = get_result.unwrap();
                assert_eq!(cache_range.range, sub_range);
                // One offset per chunk boundary: number of chunks + 1.
                assert_eq!(cache_range.offsets.len() as u32, sub_range.end - sub_range.start + 1);

                // Returned offsets are the original ones shifted so the first is 0.
                for (expected, actual) in chunk_byte_indices[(start as usize)..=(end as usize)]
                    .iter()
                    .map(|v| *v - chunk_byte_indices[start as usize])
                    .zip(cache_range.offsets.iter())
                {
                    assert_eq!(*actual, expected);
                }

                let start_byte = chunk_byte_indices[sub_range.start as usize] as usize;
                let end_byte = chunk_byte_indices[sub_range.end as usize] as usize;
                let data_portion = &data[start_byte..end_byte];
                assert_eq!(data_portion, &cache_range.data);
            }
        }
    }
927
    /// Putting more entries than the capacity allows keeps the tracked size within the
    /// capacity, and further puts still succeed.
    #[tokio::test]
    async fn test_puts_eviction() {
        const MIN_NUM_KEYS: u32 = 12;
        // NOTE(review): presumably sized so that MIN_NUM_KEYS default entries cannot all fit;
        // the entry size comes from test_utils — confirm there.
        const CAP: u64 = (RANGE_LEN * (MIN_NUM_KEYS - 1)) as u64;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: CAP,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);

        for _ in 0..MIN_NUM_KEYS {
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
        }
        let total_bytes = cache.total_bytes().await;
        assert!(total_bytes <= CAP, "cache size: {} <= {}", output_bytes(total_bytes), output_bytes(CAP));

        // One more put on the filled cache must still succeed.
        let (key, range, offsets, data) = it.next().unwrap();
        let result = cache.put(&key, &range, &offsets, &data).await;
        assert!(result.is_ok());
    }
953
    /// Putting the exact same entry twice succeeds both times.
    #[tokio::test]
    async fn test_same_puts_noop() {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(1000);
        let (key, range, offsets, data) = it.next().unwrap();
        assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
        assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
    }
968
    /// Re-putting an already cached range with malformed or conflicting arguments is an error.
    #[tokio::test]
    async fn test_overlap_range_data_mismatch_fail() {
        // Each case starts from a fresh cache holding one valid entry (same seed every time).
        let setup = || async move {
            let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
            let cache_root = TempDir::new().unwrap();
            let config = CacheConfig {
                cache_directory: cache_root.path().to_path_buf(),
                cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
                ..Default::default()
            };
            let cache = DiskCache::initialize(&config).unwrap();
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
            (cache_root, cache, key, range, offsets, data)
        };

        // Wrong number of offsets for the range.
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets.remove(1);
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // First offset is not 0.
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[0] = 100;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // Last offset does not equal the data length.
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        *offsets.last_mut().unwrap() = data.len() as u32 + 1;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // Offsets are not strictly increasing.
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[2] = offsets[1];
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // A chunk boundary is shifted, so it no longer agrees with the stored entry.
        let (_cache_root, cache, key, range, mut offsets, data) = setup().await;
        offsets[1] += 1;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());

        // Data is shorter than the offsets claim.
        let (_cache_root, cache, key, range, offsets, data) = setup().await;
        assert!(cache.put(&key, &range, &offsets, &data[1..]).await.is_err());

        // Same shape, but the bytes differ from what is already cached.
        let (_cache_root, cache, key, range, offsets, mut data) = setup().await;
        data[0] += 1;
        assert!(cache.put(&key, &range, &offsets, &data).await.is_err());
    }
1021
    /// A second cache initialized over a populated directory sees every entry the first one
    /// stored and ends up tracking the same set of keys.
    #[tokio::test]
    async fn test_initialize_non_empty() {
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);

        let mut keys_and_ranges = Vec::new();

        for _ in 0..20 {
            let (key, range, offsets, data) = it.next().unwrap();
            assert!(cache.put(&key, &range, &offsets, &data).await.is_ok());
            keys_and_ranges.push((key, range));
        }

        // Re-open the same directory and read everything back through the new instance.
        let cache2 = DiskCache::initialize(&config).unwrap();
        for (i, (key, range)) in keys_and_ranges.iter().enumerate() {
            let get_result = cache2.get(&key, &range).await;
            assert!(get_result.is_ok(), "{i} {get_result:?}");
            assert!(get_result.unwrap().is_some(), "{i}");
        }

        let cache_keys = cache.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
        let cache2_keys = cache2.state.read().await.inner.keys().cloned().collect::<BTreeSet<_>>();
        assert_eq!(cache_keys, cache2_keys);
    }
1053
    /// A file on disk that is larger than the new cache's capacity is not indexed at startup.
    #[tokio::test]
    async fn test_initialize_too_large_file() {
        const LARGE_FILE: u64 = 1000;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);

        let (key, range, offsets, data) = it.next().unwrap();
        cache.put(&key, &range, &offsets, &data).await.unwrap();
        // Re-open with a capacity smaller than LARGE_FILE; the test expects the single stored
        // file to exceed it and therefore be left out of the index.
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: LARGE_FILE - 1,
            ..Default::default()
        };
        let cache2 = DiskCache::initialize(&config).unwrap();

        assert_eq!(cache2.total_bytes().await, 0);
    }
1077
    /// When the directory holds far more data than the new capacity, initialization stops
    /// indexing early instead of loading everything.
    #[tokio::test]
    async fn test_initialize_stops_loading_early_with_too_many_files() {
        const LARGE_FILE: u64 = 1000;
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: LARGE_FILE * 10,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_range_len(LARGE_FILE as u32);
        for _ in 0..10 {
            let (key, range, offsets, data) = it.next().unwrap();
            cache.put(&key, &range, &offsets, &data).await.unwrap();
        }

        // Re-open the populated directory with a much smaller capacity.
        let cap2 = LARGE_FILE * 2;
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: cap2,
            ..Default::default()
        };
        let cache2 = DiskCache::initialize(&config).unwrap();

        // Loading stops at about twice the capacity, so three times the capacity is a safe bound.
        assert!(cache2.total_bytes().await < cap2 * 3, "{} < {}", cache2.total_bytes().await, cap2 * 3);
    }
1104
    /// A well-formed encoded directory name parses into a key.
    #[test]
    fn test_dir_name_to_key() {
        let s = "oL-Xqk1J00kVe1U4kCko-Kw4zaVv3-4U73i27w5DViBkZWZhdWx0";
        let key = try_parse_key(s.as_bytes());
        assert!(key.is_ok(), "{key:?}")
    }
1111
    /// Two cache instances share one directory: when the second evicts an entry, the first
    /// (which still tracks it in memory) must report a miss rather than an error.
    #[tokio::test]
    async fn test_unknown_eviction() {
        let cache_root = TempDir::new().unwrap();
        let capacity = 12 * RANGE_LEN as u64;
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: capacity,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();
        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
        let (key, range, chunk_byte_indices, data) = it.next().unwrap();
        cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();

        // The second instance picks the entry up from disk.
        let cache2 = DiskCache::initialize(&config).unwrap();
        let get_result = cache2.get(&key, &range).await;
        assert!(get_result.is_ok());
        assert!(get_result.unwrap().is_some());

        let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
        assert!(cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.is_ok());

        // Keep inserting through cache2 until the first entry is evicted (eviction is random,
        // so give up after 50 attempts).
        let mut get_result_1 = cache2.get(&key, &range).await.unwrap();
        let mut i = 0;
        while get_result_1.is_some() && i < 50 {
            i += 1;
            let (key2, range2, chunk_byte_indices2, data2) = it.next().unwrap();
            cache2.put(&key2, &range2, &chunk_byte_indices2, &data2).await.unwrap();
            get_result_1 = cache2.get(&key, &range).await.unwrap();
        }
        if get_result_1.is_some() {
            // The entry was never evicted; there is nothing to check on this run.
            return;
        }
        // The first instance was not told about the eviction; its lookup must be a clean miss.
        let get_result_post_eviction = cache.get(&key, &range).await;
        assert!(get_result_post_eviction.is_ok());
        assert!(get_result_post_eviction.unwrap().is_none());
    }
1152
1153 #[tokio::test]
1154 async fn put_subrange() {
1155 let cache_root = TempDir::new().unwrap();
1156 let config = CacheConfig {
1157 cache_directory: cache_root.path().to_path_buf(),
1158 cache_size: DEFAULT_CHUNK_CACHE_CAPACITY,
1159 ..Default::default()
1160 };
1161 let cache = DiskCache::initialize(&config).unwrap();
1162
1163 let (key, range, chunk_byte_indices, data) = RandomEntryIterator::std_from_seed(RANDOM_SEED).next().unwrap();
1164 cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
1165 let total_bytes = cache.total_bytes().await;
1166
1167 let left_range = ChunkRange::new(range.start, range.end - 1);
1169 let left_chunk_byte_indices = &chunk_byte_indices[..chunk_byte_indices.len() - 1];
1170 let left_data = &data[..*left_chunk_byte_indices.last().unwrap() as usize];
1171 assert!(cache.put(&key, &left_range, left_chunk_byte_indices, left_data).await.is_ok());
1172 assert_eq!(total_bytes, cache.total_bytes().await);
1173
1174 let right_range = ChunkRange::new(range.start + 1, range.end);
1176 let right_chunk_byte_indices: Vec<u32> =
1177 (&chunk_byte_indices[1..]).iter().map(|v| v - chunk_byte_indices[1]).collect();
1178 let right_data = &data[chunk_byte_indices[1] as usize..];
1179 assert!(
1180 cache
1181 .put(&key, &right_range, &right_chunk_byte_indices, right_data)
1182 .await
1183 .is_ok()
1184 );
1185 assert_eq!(total_bytes, cache.total_bytes().await);
1186
1187 let middle_range = ChunkRange::new(range.start + 1, range.end - 1);
1189 let middle_chunk_byte_indices: Vec<u32> = (&chunk_byte_indices[1..(chunk_byte_indices.len() - 1)])
1190 .iter()
1191 .map(|v| v - chunk_byte_indices[1])
1192 .collect();
1193 let middle_data =
1194 &data[chunk_byte_indices[1] as usize..chunk_byte_indices[chunk_byte_indices.len() - 2] as usize];
1195
1196 assert!(
1197 cache
1198 .put(&key, &middle_range, &middle_chunk_byte_indices, middle_data)
1199 .await
1200 .is_ok()
1201 );
1202 assert_eq!(total_bytes, cache.total_bytes().await);
1203 }
1204
1205 #[tokio::test]
1206 async fn test_evictions_with_multiple_range_per_key() {
1207 const NUM: u32 = 12;
1208 let cache_root = TempDir::new().unwrap();
1209 let capacity = (NUM * RANGE_LEN) as u64;
1210 let config = CacheConfig {
1211 cache_directory: cache_root.path().to_path_buf(),
1212 cache_size: capacity,
1213 ..Default::default()
1214 };
1215 let cache = DiskCache::initialize(&config).unwrap();
1216 let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED).with_one_chunk_ranges(true);
1217 let (key, _, _, _) = it.next().unwrap();
1218 let mut previously_put: Vec<(Key, ChunkRange)> = Vec::new();
1219
1220 for _ in 0..(NUM / 2) {
1221 let (key2, mut range, chunk_byte_indices, data) = it.next().unwrap();
1222 while previously_put.iter().any(|(_, r)| r.start == range.start) {
1223 range.start += 1 % 1000;
1224 }
1225 cache.put(&key, &range, &chunk_byte_indices, &data).await.unwrap();
1226 previously_put.push((key.clone(), range.clone()));
1227 cache.put(&key2, &range, &chunk_byte_indices, &data).await.unwrap();
1228 previously_put.push((key2, range));
1229 }
1230
1231 let mut num_hits = 0;
1232 for (key, range) in &previously_put {
1233 let result = cache.get(key, range).await;
1234 assert!(result.is_ok());
1235 let result = result.unwrap();
1236 if result.is_some() {
1237 num_hits += 1;
1238 }
1239 }
1240 assert_ne!(num_hits, 0);
1242
1243 assert!(cache.state.read().await.inner.contains_key(&key), "evicted key that should have remained in cache");
1245 }
1246
/// `DiskCache::initialize` must reject a configuration whose `cache_size` is zero.
#[test]
fn test_initialize_with_cache_size_0() {
    // Use a private temporary directory rather than the shared, platform-specific
    // "/tmp" so the test is portable and cannot touch unrelated files.
    let cache_root = TempDir::new().unwrap();
    let config = CacheConfig {
        cache_directory: cache_root.path().to_path_buf(),
        cache_size: 0,
        ..Default::default()
    };
    assert!(DiskCache::initialize(&config).is_err());
}
1257}
1258
/// Tests that drive a single `DiskCache` from many tokio tasks at once.
#[cfg(test)]
mod concurrency_tests {
    use tempfile::TempDir;

    use super::super::{CacheConfig, ChunkCache};
    use super::DiskCache;
    use super::test_utils::{RANGE_LEN, RandomEntryIterator};

    const NUM_ITEMS_PER_TASK: usize = 20;
    const RANDOM_SEED: u64 = 878987298749287;

    const DEFAULT_CHUNK_CACHE_CAPACITY: u64 = 10_000_000_000;

    /// Spawns between 2 and 15 tasks that each put `NUM_ITEMS_PER_TASK` entries into one
    /// shared cache of `cache_size` bytes and then read every one of them back.
    ///
    /// Every task builds its iterator from the same `RANDOM_SEED`. Each `put` and `get`
    /// must return `Ok`; whether a `get` is a hit or a miss is not checked, so the helper
    /// is usable both with and without eviction pressure.
    async fn put_then_get_from_many_tasks(cache_size: u64) {
        // Bound to a local so the directory outlives every spawned task.
        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let num_tasks = 2 + rand::random::<u8>() % 14;

        let mut handles = Vec::with_capacity(usize::from(num_tasks));
        for _ in 0..num_tasks {
            let cache_clone = cache.clone();
            handles.push(tokio::spawn(async move {
                let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
                let mut kr = Vec::with_capacity(NUM_ITEMS_PER_TASK);
                for _ in 0..NUM_ITEMS_PER_TASK {
                    let (key, range, chunk_byte_indices, data) = it.next().unwrap();
                    assert!(cache_clone.put(&key, &range, &chunk_byte_indices, &data).await.is_ok());
                    kr.push((key, range));
                }
                for (key, range) in kr {
                    assert!(cache_clone.get(&key, &range).await.is_ok());
                }
            }));
        }

        // A panic (failed assertion) inside a task surfaces here as a join error.
        for handle in handles {
            handle.await.expect("join should not error");
        }
    }

    /// Concurrent puts and gets against a cache configured with a very large capacity.
    #[tokio::test]
    async fn test_run_concurrently() {
        put_then_get_from_many_tasks(DEFAULT_CHUNK_CACHE_CAPACITY).await;
    }

    /// Concurrent puts and gets against a cache sized to `RANGE_LEN * NUM_ITEMS_PER_TASK`
    /// bytes, a far smaller capacity than the one used by `test_run_concurrently`.
    #[tokio::test]
    #[cfg_attr(feature = "smoke-test", ignore)]
    async fn test_run_concurrently_with_evictions() {
        put_then_get_from_many_tasks(RANGE_LEN as u64 * NUM_ITEMS_PER_TASK as u64).await;
    }

    /// Many tasks put the exact same entry at once; the cache must end up tracking that
    /// range exactly once for the key.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_run_concurrently_thundering_herd() {
        const NUM_TASKS: usize = 64;

        let cache_root = TempDir::new().unwrap();
        let config = CacheConfig {
            cache_directory: cache_root.path().to_path_buf(),
            cache_size: RANGE_LEN as u64 * NUM_ITEMS_PER_TASK as u64,
            ..Default::default()
        };
        let cache = DiskCache::initialize(&config).unwrap();

        let mut it = RandomEntryIterator::std_from_seed(RANDOM_SEED);
        // A single entry that every task will try to insert.
        let (key, range, chunk_byte_indices, data) = it.next().unwrap();

        let mut handles = Vec::with_capacity(NUM_TASKS);
        for _ in 0..NUM_TASKS {
            let cache_clone = cache.clone();
            let key = key.clone();
            let range = range.clone();
            let chunk_byte_indices = chunk_byte_indices.clone();
            let data_clone = data.clone();
            handles.push(tokio::spawn(async move {
                let res = cache_clone.put(&key, &range, &chunk_byte_indices, &data_clone).await;
                assert!(res.is_ok(), "err: {res:?}");
            }));
        }

        for handle in handles {
            handle.await.expect("join should not error");
        }

        let state = cache.state.read().await;
        let items = state.inner.get(&key).unwrap();

        // Duplicate puts of the same range must not create duplicate entries.
        let num = items.iter().filter(|item| item.range == range).count();
        assert_eq!(num, 1);
    }
}