drasi_core/index_cache/
cached_result_index.rs1use std::{
16 hash::{Hash, Hasher},
17 sync::Arc,
18};
19
20use async_trait::async_trait;
21use caches::{lru::CacheError, Cache, DefaultHashBuilder, LRUCache};
22use hashers::builtin::DefaultHasher;
23use ordered_float::OrderedFloat;
24use tokio::sync::RwLock;
25
26use crate::{
27 evaluation::functions::aggregation::ValueAccumulator,
28 interface::{
29 AccumulatorIndex, IndexError, LazySortedSetStore, ResultIndex, ResultKey, ResultOwner,
30 ResultSequence, ResultSequenceCounter,
31 },
32};
33
34pub struct CachedResultIndex {
35 inner: Arc<dyn ResultIndex>,
36
37 value_cache: Arc<RwLock<LRUCache<u64, ValueAccumulator, DefaultHashBuilder>>>,
38 set_count_cache: Arc<RwLock<LRUCache<(u64, OrderedFloat<f64>), isize, DefaultHashBuilder>>>,
39}
40
41impl CachedResultIndex {
42 pub fn new(inner: Arc<dyn ResultIndex>, cache_size: usize) -> Result<Self, CacheError> {
43 log::info!("using cached result index with cache size {cache_size}");
44
45 let value_cache = LRUCache::new(cache_size)?;
46 let set_count_cache = LRUCache::new(cache_size)?;
47
48 Ok(CachedResultIndex {
49 inner,
50 value_cache: Arc::new(RwLock::new(value_cache)),
51 set_count_cache: Arc::new(RwLock::new(set_count_cache)),
52 })
53 }
54}
55
56#[async_trait]
57impl AccumulatorIndex for CachedResultIndex {
58 async fn clear(&self) -> Result<(), IndexError> {
59 self.inner.clear().await?;
60
61 let mut value_cache = self.value_cache.write().await;
62 value_cache.purge();
63
64 let mut set_count_cache = self.set_count_cache.write().await;
65 set_count_cache.purge();
66
67 Ok(())
68 }
69
70 async fn get(
71 &self,
72 key: &ResultKey,
73 owner: &ResultOwner,
74 ) -> Result<Option<ValueAccumulator>, IndexError> {
75 let cache_key = get_hash_key(owner, key);
76
77 let mut cache = self.value_cache.write().await;
78 match cache.get(&cache_key) {
79 None => {
80 let value = self.inner.get(key, owner).await?;
81 match value {
82 None => Ok(None),
83 Some(v) => {
84 _ = cache.put(cache_key, v.clone());
85 Ok(Some(v))
86 }
87 }
88 }
89 Some(v) => Ok(Some(v.clone())),
90 }
91 }
92
93 async fn set(
94 &self,
95 key: ResultKey,
96 owner: ResultOwner,
97 value: Option<ValueAccumulator>,
98 ) -> Result<(), IndexError> {
99 let cache_key = get_hash_key(&owner, &key);
100
101 self.inner.set(key, owner, value.clone()).await?;
102
103 let mut cache = self.value_cache.write().await;
104 match value {
105 None => _ = cache.remove(&cache_key),
106 Some(v) => _ = cache.put(cache_key, v),
107 };
108
109 Ok(())
110 }
111}
112
113#[async_trait]
114impl LazySortedSetStore for CachedResultIndex {
115 async fn get_next(
116 &self,
117 set_id: u64,
118 value: Option<OrderedFloat<f64>>,
119 ) -> Result<Option<(OrderedFloat<f64>, isize)>, IndexError> {
120 self.inner.get_next(set_id, value).await
121 }
122
123 async fn get_value_count(
124 &self,
125 set_id: u64,
126 value: OrderedFloat<f64>,
127 ) -> Result<isize, IndexError> {
128 let cache_key = (set_id, value);
129
130 let mut cache = self.set_count_cache.write().await;
131 match cache.get(&cache_key) {
132 None => {
133 let value = self.inner.get_value_count(set_id, value).await?;
134 _ = cache.put(cache_key, value);
135 Ok(value)
136 }
137 Some(v) => Ok(*v),
138 }
139 }
140
141 async fn increment_value_count(
142 &self,
143 set_id: u64,
144 value: OrderedFloat<f64>,
145 delta: isize,
146 ) -> Result<(), IndexError> {
147 self.inner
148 .increment_value_count(set_id, value, delta)
149 .await?;
150
151 let cache_key = (set_id, value);
152 let mut cache = self.set_count_cache.write().await;
153
154 match cache.get_mut(&cache_key) {
155 None => _ = cache.put(cache_key, delta),
156 Some(v) => *v += delta,
157 }
158
159 Ok(())
160 }
161}
162
163#[async_trait]
164impl ResultSequenceCounter for CachedResultIndex {
165 async fn apply_sequence(
166 &self,
167 sequence: u64,
168 source_change_id: &str,
169 ) -> Result<(), IndexError> {
170 self.inner.apply_sequence(sequence, source_change_id).await
171 }
172
173 async fn get_sequence(&self) -> Result<ResultSequence, IndexError> {
174 self.inner.get_sequence().await
175 }
176}
177
178impl ResultIndex for CachedResultIndex {}
179
180fn get_hash_key(owner: &ResultOwner, key: &ResultKey) -> u64 {
181 let mut hasher = DefaultHasher::new();
182 owner.hash(&mut hasher);
183 key.hash(&mut hasher);
184 hasher.finish()
185}