mountpoint_s3_fs/data_cache/in_memory_data_cache.rs

1//! Module for the in-memory data cache implementation used for testing.
2
3use std::collections::HashMap;
4use std::default::Default;
5
6use async_trait::async_trait;
7
8use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
9use crate::object::ObjectId;
10use crate::sync::RwLock;
11
12/// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only.
13pub struct InMemoryDataCache {
14    data: RwLock<HashMap<ObjectId, HashMap<BlockIndex, ChecksummedBytes>>>,
15    block_size: u64,
16}
17
18impl InMemoryDataCache {
19    /// Create a new instance of an [InMemoryDataCache] with the specified `block_size`.
20    pub fn new(block_size: u64) -> Self {
21        InMemoryDataCache {
22            data: Default::default(),
23            block_size,
24        }
25    }
26
27    /// Get number of caching blocks for the given cache key.
28    pub fn block_count(&self, cache_key: &ObjectId) -> usize {
29        let data = self.data.read().unwrap();
30        data.get(cache_key).map_or(0, |cache| cache.len())
31    }
32}
33
34#[async_trait]
35impl DataCache for InMemoryDataCache {
36    async fn get_block(
37        &self,
38        cache_key: &ObjectId,
39        block_idx: BlockIndex,
40        block_offset: u64,
41        _object_size: usize,
42    ) -> DataCacheResult<Option<ChecksummedBytes>> {
43        if block_offset != block_idx * self.block_size {
44            return Err(DataCacheError::InvalidBlockOffset);
45        }
46        let data = self.data.read().unwrap();
47        let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned();
48        Ok(block_data)
49    }
50
51    async fn put_block(
52        &self,
53        cache_key: ObjectId,
54        block_idx: BlockIndex,
55        block_offset: u64,
56        bytes: ChecksummedBytes,
57        _object_size: usize,
58    ) -> DataCacheResult<()> {
59        if block_offset != block_idx * self.block_size {
60            return Err(DataCacheError::InvalidBlockOffset);
61        }
62        let mut data = self.data.write().unwrap();
63        let blocks = data.entry(cache_key).or_default();
64        blocks.insert(block_idx, bytes);
65        Ok(())
66    }
67
68    fn block_size(&self) -> u64 {
69        self.block_size
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    use bytes::Bytes;
    use mountpoint_s3_client::types::ETag;

    #[tokio::test]
    async fn test_put_get() {
        let data_1 = ChecksummedBytes::new(Bytes::from_static(b"Hello world"));
        let data_2 = ChecksummedBytes::new(Bytes::from_static(b"Foo bar"));
        let data_3 = ChecksummedBytes::new(Bytes::from_static(b"Baz"));

        let object_1_size = data_1.len() + data_3.len();
        let object_2_size = data_2.len();

        let block_size = 8 * 1024 * 1024;
        let cache = InMemoryDataCache::new(block_size);
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

        let block = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache is accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {block:?}",
        );

        // PUT and GET, OK?
        cache
            .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size)
            .await
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second file, OK?
        cache
            .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size)
            .await
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_2, 0, 0, object_2_size)
            .await
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second block in a cache entry, OK?
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size)
            .await
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_1, 1, block_size, object_1_size)
            .await
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // Entry 1's first block still intact
        let entry = cache
            .get_block(&cache_key_1, 0, 0, object_1_size)
            .await
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    /// Offsets that are not `block_idx * block_size` must be rejected on both put and get.
    #[tokio::test]
    async fn test_invalid_block_offset() {
        let data = ChecksummedBytes::new(Bytes::from_static(b"Hello world"));
        let object_size = data.len();

        let block_size = 8 * 1024 * 1024;
        let cache = InMemoryDataCache::new(block_size);
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());

        // Block 1 starts at `block_size`, so offset 0 is misaligned.
        let result = cache
            .put_block(cache_key.clone(), 1, 0, data.clone(), object_size)
            .await;
        assert!(
            matches!(result, Err(DataCacheError::InvalidBlockOffset)),
            "put with a misaligned offset should be rejected but got {result:?}",
        );
        assert_eq!(
            cache.block_count(&cache_key),
            0,
            "a rejected put must not store anything"
        );

        cache
            .put_block(cache_key.clone(), 1, block_size, data, object_size)
            .await
            .expect("cache is accessible");
        let result = cache.get_block(&cache_key, 1, 0, object_size).await;
        assert!(
            matches!(result, Err(DataCacheError::InvalidBlockOffset)),
            "get with a misaligned offset should be rejected but got {result:?}",
        );
    }

    /// `block_count` reports the number of distinct blocks stored per object.
    #[tokio::test]
    async fn test_block_count() {
        let data = ChecksummedBytes::new(Bytes::from_static(b"Foo bar"));
        let object_size = data.len();

        let block_size = 8 * 1024 * 1024;
        let cache = InMemoryDataCache::new(block_size);
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
        let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

        assert_eq!(cache.block_count(&cache_key_1), 0);

        cache
            .put_block(cache_key_1.clone(), 0, 0, data.clone(), object_size)
            .await
            .expect("cache is accessible");
        cache
            .put_block(cache_key_1.clone(), 1, block_size, data.clone(), object_size)
            .await
            .expect("cache is accessible");
        // Overwriting an existing block must not increase the count.
        cache
            .put_block(cache_key_1.clone(), 0, 0, data.clone(), object_size)
            .await
            .expect("cache is accessible");
        cache
            .put_block(cache_key_2.clone(), 0, 0, data, object_size)
            .await
            .expect("cache is accessible");

        assert_eq!(cache.block_count(&cache_key_1), 2);
        assert_eq!(cache.block_count(&cache_key_2), 1);
    }
}