eventlogs/caches/
fake.rs

1use crate::{
2    caches::{ReductionCache, ReductionCacheError},
3    ids::LogId,
4    Reduction,
5};
6use std::collections::HashMap;
7use tokio::sync::{mpsc::Sender, Mutex};
8
9#[derive(Debug, PartialEq)]
10pub enum FakeReductionCacheOp<A: Clone + Send + Sync> {
11    Get {
12        log_id: LogId,
13        response: Option<Reduction<A>>,
14    },
15    Put {
16        reduction: Reduction<A>,
17    },
18}
19
20#[derive(Debug)]
21struct DB<A: Clone + Sync + Send> {
22    table: HashMap<LogId, Reduction<A>>,
23    op_sender: Option<Sender<FakeReductionCacheOp<A>>>,
24}
25
26/// A fake implementation of [ReductionCache] that should only be used for testing.
27#[derive(Debug)]
28pub struct FakeReductionCache<A: Clone + Send + Sync> {
29    mx_db: Mutex<DB<A>>,
30}
31
32impl<A> FakeReductionCache<A>
33where
34    A: Clone + Sync + Send,
35{
36    pub fn new() -> Self {
37        Self {
38            mx_db: Mutex::new(DB {
39                table: HashMap::new(),
40                op_sender: None,
41            }),
42        }
43    }
44
45    pub fn with_notifications(op_sender: Sender<FakeReductionCacheOp<A>>) -> Self {
46        Self {
47            mx_db: Mutex::new(DB {
48                table: HashMap::new(),
49                op_sender: Some(op_sender),
50            }),
51        }
52    }
53
54    pub async fn evict(&self, log_id: &LogId) {
55        let mut db = self.mx_db.lock().await;
56        db.table.remove(log_id);
57    }
58}
59
60impl<A> Default for FakeReductionCache<A>
61where
62    A: Clone + Sync + Send,
63{
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl<A> ReductionCache<A> for FakeReductionCache<A>
70where
71    A: Clone + Sync + Send,
72{
73    async fn put(&self, reduction: &Reduction<A>) -> Result<(), ReductionCacheError> {
74        let mut db = self.mx_db.lock().await;
75
76        db.table
77            .insert(reduction.log_id().clone(), reduction.clone());
78
79        if let Some(sender) = &db.op_sender {
80            sender
81                .send(FakeReductionCacheOp::Put {
82                    reduction: reduction.clone(),
83                })
84                .await
85                .unwrap();
86        }
87
88        Ok(())
89    }
90
91    async fn get(&self, log_id: &LogId) -> Result<Option<Reduction<A>>, ReductionCacheError> {
92        let db = self.mx_db.lock().await;
93
94        let maybe_reduction = db.table.get(log_id).cloned();
95
96        if let Some(sender) = &db.op_sender {
97            sender
98                .send(FakeReductionCacheOp::Get {
99                    log_id: log_id.clone(),
100                    response: maybe_reduction.clone(),
101                })
102                .await
103                .unwrap();
104        }
105
106        Ok(maybe_reduction)
107    }
108}