sov_first_read_last_write_cache/
cache.rs

1use std::collections::hash_map::Entry;
2use std::collections::HashMap;
3
4use thiserror::Error;
5
6use crate::access::{Access, MergeError};
7use crate::{CacheKey, CacheValue};
8
/// Error returned by `CacheLog::add_read` when a read disagrees with the value the cache
/// already tracks for the same key.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum ReadError {
    /// The value supplied for a read differs from the last value recorded for the key.
    #[error("inconsistent read, expected: {expected:?}, found: {found:?}")]
    InconsistentRead {
        /// Last value the cache holds for the key.
        expected: Option<CacheValue>,
        /// Value supplied by the caller for the read.
        found: Option<CacheValue>,
    },
}
17
18/// Cache entry can be in three states:
19/// - Does not exists, a given key was never inserted in the cache:
20///     ValueExists::No
21/// - Exists but the value is empty.
22///      ValueExists::Yes(None)
23/// - Exists and contains a value:
24///     ValueExists::Yes(Some(value))
25pub enum ValueExists {
26    Yes(Option<CacheValue>),
27    No,
28}
29
/// CacheLog keeps track of the original and current values of each key accessed.
/// By tracking original values, we can detect and eliminate write patterns where a key is
/// changed temporarily and then reset to its original value.
#[derive(Default)]
pub struct CacheLog {
    /// One `Access` record per key touched through this log.
    log: HashMap<CacheKey, Access>,
}
37
38impl CacheLog {
39    pub fn with_capacity(capacity: usize) -> Self {
40        Self {
41            log: HashMap::with_capacity(capacity),
42        }
43    }
44}
45
46impl CacheLog {
47    pub fn take_writes(self) -> Vec<(CacheKey, Option<CacheValue>)> {
48        self.log
49            .into_iter()
50            .filter_map(|(k, v)| filter_writes(k, v))
51            .collect()
52    }
53
54    /// Returns a value corresponding to the key.
55    pub fn get_value(&self, key: &CacheKey) -> ValueExists {
56        match self.log.get(key) {
57            Some(value) => ValueExists::Yes(value.last_value().clone()),
58            None => ValueExists::No,
59        }
60    }
61
62    /// The first read for a given key is inserted in the cache. For an existing cache entry
63    /// checks if reads are consistent with previous reads/writes.
64    pub fn add_read(&mut self, key: CacheKey, value: Option<CacheValue>) -> Result<(), ReadError> {
65        match self.log.entry(key) {
66            Entry::Occupied(existing) => {
67                let last_value = existing.get().last_value().clone();
68
69                if last_value != value {
70                    return Err(ReadError::InconsistentRead {
71                        expected: last_value,
72                        found: value,
73                    });
74                }
75                Ok(())
76            }
77            Entry::Vacant(vacancy) => {
78                vacancy.insert(Access::Read(value));
79                Ok(())
80            }
81        }
82    }
83
84    /// Adds a write entry to the cache.
85    pub fn add_write(&mut self, key: CacheKey, value: Option<CacheValue>) {
86        match self.log.entry(key) {
87            Entry::Occupied(mut existing) => {
88                existing.get_mut().write_value(value);
89            }
90            Entry::Vacant(vacancy) => {
91                vacancy.insert(Access::Write(value));
92            }
93        }
94    }
95
    /// Merges two cache logs in a way that preserves the first read (from `self`) and the last
    /// write (from `rhs`) for the same key in both caches.
    /// The merge succeeds if the first read in the right cache for a key 'k' is consistent with
    /// the last read/write in the `self` cache.
    ///
    /// Example:
    ///
    /// ```text
    /// Cache1:        Cache2:
    ///     k1 => v1       k1 => v1'
    ///     k2 => v2       k3 => v3
    ///
    /// Merged Cache:
    ///     k1 => v1.merge(v1') <- preserves the first read and the last write for 'k1'
    ///     k2 => v2
    ///     k3 => v3
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the `MergeError` reported by `Access::merge` for the first conflicting key
    /// encountered; entries merged before that point remain in `self`.
    pub fn merge_left(&mut self, rhs: Self) -> Result<(), MergeError> {
        self.merge_left_with_filter_map(rhs, Some)
    }
114
115    pub fn merge_writes_left(&mut self, rhs: Self) -> Result<(), MergeError> {
116        self.merge_left_with_filter_map(rhs, |(key, access)| match access {
117            Access::Read(_) => None,
118            Access::ReadThenWrite { modified, .. } => Some((key, Access::Write(modified))),
119            Access::Write(w) => Some((key, Access::Write(w))),
120        })
121    }
122
123    pub fn merge_reads_left(&mut self, rhs: Self) -> Result<(), MergeError> {
124        self.merge_left_with_filter_map(rhs, |(key, access)| match access {
125            Access::Read(read) => Some((key, Access::Read(read))),
126            Access::ReadThenWrite { original, .. } => Some((key, Access::Read(original))),
127            Access::Write(_) => None,
128        })
129    }
130
131    fn merge_left_with_filter_map<F: FnMut((CacheKey, Access)) -> Option<(CacheKey, Access)>>(
132        &mut self,
133        rhs: Self,
134        filter: F,
135    ) -> Result<(), MergeError> {
136        for (rhs_key, rhs_access) in rhs.log.into_iter().filter_map(filter) {
137            match self.log.get_mut(&rhs_key) {
138                Some(self_access) => self_access.merge(rhs_access)?,
139                None => {
140                    self.log.insert(rhs_key, rhs_access);
141                }
142            };
143        }
144        Ok(())
145    }
146
    /// Returns the number of distinct keys tracked by the log.
    pub fn len(&self) -> usize {
        self.log.len()
    }
150
    /// Returns `true` when no key has been read or written through the log.
    pub fn is_empty(&self) -> bool {
        self.log.is_empty()
    }
154}
155
156fn filter_writes(k: CacheKey, access: Access) -> Option<(CacheKey, Option<CacheValue>)> {
157    match access {
158        Access::Read(_) => None,
159        Access::ReadThenWrite { modified, .. } => Some((k, modified)),
160        Access::Write(write) => Some((k, write)),
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use proptest::prelude::*;
167
168    use super::*;
169    use crate::utils::test_util::{create_key, create_value};
170
171    impl ValueExists {
172        fn get(self) -> Option<CacheValue> {
173            match self {
174                ValueExists::Yes(value) => value,
175                ValueExists::No => unreachable!(),
176            }
177        }
178    }
179
180    #[test]
181    fn test_cache_read_write() {
182        let mut cache_log = CacheLog::default();
183        let key = create_key(1);
184
185        {
186            let value = create_value(2);
187
188            cache_log.add_read(key.clone(), value.clone()).unwrap();
189            let value_from_cache = cache_log.get_value(&key).get();
190            assert_eq!(value_from_cache, value);
191        }
192
193        {
194            let value = create_value(3);
195
196            cache_log.add_write(key.clone(), value.clone());
197
198            let value_from_cache = cache_log.get_value(&key).get();
199            assert_eq!(value_from_cache, value);
200
201            cache_log.add_read(key.clone(), value.clone()).unwrap();
202
203            let value_from_cache = cache_log.get_value(&key).get();
204            assert_eq!(value_from_cache, value);
205        }
206    }
207
    /// A key together with the (possibly empty) value a test reads or writes for it.
    #[derive(PartialEq, Eq, Clone, Debug)]
    pub(crate) struct CacheEntry {
        /// Key the access targets.
        key: CacheKey,
        /// Value read from or written to the key.
        value: Option<CacheValue>,
    }
213
214    impl CacheEntry {
215        fn new(key: CacheKey, value: Option<CacheValue>) -> Self {
216            Self { key, value }
217        }
218    }
219
220    fn new_cache_entry(key: u8, value: u8) -> CacheEntry {
221        CacheEntry::new(create_key(key), create_value(value))
222    }
223
    /// A single access a test applies to a cache: either a read or a write of an entry.
    #[derive(Clone)]
    enum ReadWrite {
        /// Read of the entry's key that observed the entry's value.
        Read(CacheEntry),
        /// Write of the entry's value to the entry's key.
        Write(CacheEntry),
    }
229
230    impl ReadWrite {
231        fn get_value(self) -> CacheEntry {
232            match self {
233                ReadWrite::Read(r) => r,
234                ReadWrite::Write(w) => w,
235            }
236        }
237
238        fn check_cache_consistency(self, rhs: Self, merged: &CacheLog) {
239            match (self, rhs) {
240                (ReadWrite::Read(left_read), ReadWrite::Read(right_read)) => {
241                    assert_eq!(left_read, right_read);
242                    let value = merged.get_value(&left_read.key).get();
243                    assert_eq!(left_read.value, value)
244                }
245                (ReadWrite::Read(_), ReadWrite::Write(right_write)) => {
246                    let value = merged.get_value(&right_write.key).get();
247                    assert_eq!(right_write.value, value)
248                }
249                (ReadWrite::Write(left_write), ReadWrite::Read(right_write)) => {
250                    assert_eq!(left_write, right_write);
251                    let value = merged.get_value(&left_write.key).get();
252                    assert_eq!(left_write.value, value)
253                }
254                (ReadWrite::Write(_), ReadWrite::Write(right_write)) => {
255                    let value = merged.get_value(&right_write.key).get();
256                    assert_eq!(right_write.value, value)
257                }
258            }
259        }
260    }
261
262    impl CacheLog {
263        fn add_to_cache(&mut self, rw: ReadWrite) -> Result<(), ReadError> {
264            match rw {
265                ReadWrite::Read(r) => self.add_read(r.key, r.value),
266                ReadWrite::Write(w) => {
267                    self.add_write(w.key, w.value);
268                    Ok(())
269                }
270            }
271        }
272    }
273
    /// One merge scenario: the access (if any) applied to the left cache and the access (if any)
    /// applied to the right cache before the two are merged.
    #[derive(Clone)]
    struct TestCase {
        /// Access added to the left cache, the merge target.
        left: Option<ReadWrite>,
        /// Access added to the right cache, which is merged into the left one.
        right: Option<ReadWrite>,
    }
279
280    #[test]
281    fn test_add_read() {
282        let mut cache = CacheLog::default();
283
284        let entry = new_cache_entry(1, 1);
285
286        let res = cache.add_read(entry.key, entry.value);
287        assert!(res.is_ok());
288
289        let entry = new_cache_entry(2, 1);
290        let res = cache.add_read(entry.key, entry.value);
291        assert!(res.is_ok());
292
293        let entry = new_cache_entry(1, 2);
294        let res = cache.add_read(entry.key, entry.value);
295
296        assert_eq!(
297            res,
298            Err(ReadError::InconsistentRead {
299                expected: create_value(1),
300                found: create_value(2)
301            })
302        )
303    }
304
305    #[test]
306    fn test_merge_ok() {
307        let test_cases = vec![
308            TestCase {
309                left: Some(ReadWrite::Read(new_cache_entry(1, 11))),
310                right: Some(ReadWrite::Read(new_cache_entry(1, 11))),
311            },
312            TestCase {
313                left: Some(ReadWrite::Read(new_cache_entry(2, 12))),
314                right: Some(ReadWrite::Write(new_cache_entry(2, 22))),
315            },
316            TestCase {
317                left: Some(ReadWrite::Write(new_cache_entry(3, 13))),
318                right: Some(ReadWrite::Write(new_cache_entry(3, 23))),
319            },
320            TestCase {
321                left: Some(ReadWrite::Write(new_cache_entry(4, 14))),
322                right: None,
323            },
324            TestCase {
325                left: None,
326                right: Some(ReadWrite::Read(new_cache_entry(5, 25))),
327            },
328            TestCase {
329                left: None,
330                right: Some(ReadWrite::Write(new_cache_entry(6, 25))),
331            },
332            TestCase {
333                left: Some(ReadWrite::Write(new_cache_entry(7, 17))),
334                right: Some(ReadWrite::Read(new_cache_entry(7, 17))),
335            },
336        ];
337
338        test_merge_ok_helper(test_cases);
339    }
340
341    #[test]
342    fn test_merge_fail() {
343        let test_cases = vec![
344            TestCase {
345                left: Some(ReadWrite::Read(new_cache_entry(1, 11))),
346                // The read is inconsistent with the previous read.
347                right: Some(ReadWrite::Read(new_cache_entry(1, 12))),
348            },
349            TestCase {
350                left: Some(ReadWrite::Write(new_cache_entry(2, 12))),
351                // The read is inconsistent with the previous write.
352                right: Some(ReadWrite::Read(new_cache_entry(2, 22))),
353            },
354        ];
355
356        let result = test_merge_helper(test_cases);
357        assert!(result.is_err());
358    }
359
360    proptest! {
361        #[test]
362        fn test_merge_fuzz(s: u8) {
363            let num_cases = 15;
364            let mut testvec = Vec::with_capacity(num_cases);
365
366            for i in 0..num_cases {
367                testvec.push( s.wrapping_add(i as u8));
368            }
369
370            let test_cases = vec![
371                TestCase {
372                    left: Some(ReadWrite::Read(new_cache_entry(testvec[0], testvec[1]))),
373                    right: Some(ReadWrite::Read(new_cache_entry(testvec[0], testvec[1]))),
374                },
375                TestCase {
376                    left: Some(ReadWrite::Read(new_cache_entry(testvec[2], testvec[3]))),
377                    right: Some(ReadWrite::Write(new_cache_entry(testvec[2], testvec[4]))),
378                },
379                TestCase {
380                    left: Some(ReadWrite::Write(new_cache_entry(testvec[5], testvec[6]))),
381                    right: Some(ReadWrite::Write(new_cache_entry(testvec[5], testvec[7]))),
382                },
383                TestCase {
384                    left: Some(ReadWrite::Write(new_cache_entry(testvec[8], testvec[9]))),
385                    right: None,
386                },
387                TestCase {
388                    left: None,
389                    right: Some(ReadWrite::Read(new_cache_entry(testvec[10], testvec[11]))),
390                },
391                TestCase {
392                    left: None,
393                    right: Some(ReadWrite::Write(new_cache_entry(testvec[12], testvec[11]))),
394                },
395                TestCase {
396                    left: Some(ReadWrite::Write(new_cache_entry(testvec[13], testvec[14]))),
397                    right: Some(ReadWrite::Read(new_cache_entry(testvec[13], testvec[14]))),
398                },
399            ];
400
401            test_merge_ok_helper(test_cases);
402        }
403    }
404
405    fn test_merge_ok_helper(test_cases: Vec<TestCase>) {
406        let result = test_merge_helper(test_cases.clone());
407        assert!(result.is_ok());
408
409        let merged = result.unwrap();
410        assert_eq!(merged.log.len(), test_cases.len());
411
412        for TestCase { left, right } in test_cases {
413            match (left, right) {
414                (None, None) => unreachable!(),
415                (None, Some(rw)) => {
416                    let entry = rw.get_value();
417                    let value = merged.get_value(&entry.key).get();
418                    assert_eq!(entry.value, value)
419                }
420                (Some(rw), None) => {
421                    let entry = rw.get_value();
422                    let value = merged.get_value(&entry.key).get();
423                    assert_eq!(entry.value, value)
424                }
425                (Some(left_rw), Some(right_rw)) => {
426                    left_rw.check_cache_consistency(right_rw, &merged);
427                }
428            }
429        }
430    }
431
432    fn test_merge_helper(test_cases: Vec<TestCase>) -> Result<CacheLog, MergeError> {
433        let mut left_cache = CacheLog::default();
434        let mut right_cache = CacheLog::default();
435
436        for TestCase { left, right } in test_cases {
437            match (left, right) {
438                (None, None) => {}
439                (None, Some(rw)) => right_cache.add_to_cache(rw).unwrap(),
440                (Some(rw), None) => left_cache.add_to_cache(rw).unwrap(),
441                (Some(left_rw), Some(right_rw)) => {
442                    left_cache.add_to_cache(left_rw).unwrap();
443                    right_cache.add_to_cache(right_rw).unwrap();
444                }
445            }
446        }
447
448        left_cache.merge_left(right_cache)?;
449        Ok(left_cache)
450    }
451}