mountpoint_s3_fs/data_cache/
in_memory_data_cache.rs1use std::collections::HashMap;
4use std::default::Default;
5
6use async_trait::async_trait;
7
8use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
9use crate::object::ObjectId;
10use crate::sync::RwLock;
11
12pub struct InMemoryDataCache {
14 data: RwLock<HashMap<ObjectId, HashMap<BlockIndex, ChecksummedBytes>>>,
15 block_size: u64,
16}
17
18impl InMemoryDataCache {
19 pub fn new(block_size: u64) -> Self {
21 InMemoryDataCache {
22 data: Default::default(),
23 block_size,
24 }
25 }
26
27 pub fn block_count(&self, cache_key: &ObjectId) -> usize {
29 let data = self.data.read().unwrap();
30 data.get(cache_key).map_or(0, |cache| cache.len())
31 }
32}
33
34#[async_trait]
35impl DataCache for InMemoryDataCache {
36 async fn get_block(
37 &self,
38 cache_key: &ObjectId,
39 block_idx: BlockIndex,
40 block_offset: u64,
41 _object_size: usize,
42 ) -> DataCacheResult<Option<ChecksummedBytes>> {
43 if block_offset != block_idx * self.block_size {
44 return Err(DataCacheError::InvalidBlockOffset);
45 }
46 let data = self.data.read().unwrap();
47 let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned();
48 Ok(block_data)
49 }
50
51 async fn put_block(
52 &self,
53 cache_key: ObjectId,
54 block_idx: BlockIndex,
55 block_offset: u64,
56 bytes: ChecksummedBytes,
57 _object_size: usize,
58 ) -> DataCacheResult<()> {
59 if block_offset != block_idx * self.block_size {
60 return Err(DataCacheError::InvalidBlockOffset);
61 }
62 let mut data = self.data.write().unwrap();
63 let blocks = data.entry(cache_key).or_default();
64 blocks.insert(block_idx, bytes);
65 Ok(())
66 }
67
68 fn block_size(&self) -> u64 {
69 self.block_size
70 }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    use bytes::Bytes;
    use mountpoint_s3_client::types::ETag;

    /// Reads a block the test expects to be present, failing the test if the lookup errors or
    /// comes back empty.
    async fn get_existing_block(
        cache: &InMemoryDataCache,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> ChecksummedBytes {
        cache
            .get_block(cache_key, block_idx, block_offset, object_size)
            .await
            .expect("cache is accessible")
            .expect("cache entry should be returned")
    }

    #[tokio::test]
    async fn test_put_get() {
        let data_1 = ChecksummedBytes::new(Bytes::from_static(b"Hello world"));
        let data_2 = ChecksummedBytes::new(Bytes::from_static(b"Foo bar"));
        let data_3 = ChecksummedBytes::new(Bytes::from_static(b"Baz"));

        // Object 1 is made of two blocks (data_1 and data_3); object 2 of a single block.
        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let block_size = 8 * 1024 * 1024;
        let cache = InMemoryDataCache::new(block_size);
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

        // Nothing has been stored yet, so the first lookup must come back empty.
        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache is accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        // Store the first block of object 1 and read it back.
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache is accessible");
        let entry = get_existing_block(&cache, &cache_key_1, 0, 0, object_1_size).await;
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        // Store a block under a different key and read it back.
        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache is accessible");
        let entry = get_existing_block(&cache, &cache_key_2, 0, 0, object_2_size).await;
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        // Store a second block for object 1 and read it back.
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache is accessible");
        let entry = get_existing_block(&cache, &cache_key_1, 1, block_size, object_1_size).await;
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // The first block of object 1 must be unaffected by the later inserts.
        let entry = get_existing_block(&cache, &cache_key_1, 0, 0, object_1_size).await;
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }
}