1use std::{collections::HashMap, sync::Arc};
16
17use async_stream::stream;
18use async_trait::async_trait;
19use caches::{lru::CacheError, Cache, DefaultHashBuilder, LRUCache};
20use tokio::sync::RwLock;
21use tokio_stream::StreamExt;
22
23use crate::{
24 interface::{ElementIndex, ElementStream, IndexError},
25 models::{Element, ElementReference, QueryJoin},
26 path_solver::match_path::MatchPath,
27};
28
29pub struct CachedElementIndex {
30 element_index: Arc<dyn ElementIndex>,
31 element_cache: Arc<RwLock<LRUCache<ElementReference, Arc<Element>, DefaultHashBuilder>>>,
32 slot_cache: Arc<RwLock<LRUCache<ElementReference, HashMap<usize, bool>, DefaultHashBuilder>>>,
33 inbound_cache:
34 Arc<RwLock<LRUCache<(ElementReference, usize), Vec<ElementReference>, DefaultHashBuilder>>>,
35 outbound_cache:
36 Arc<RwLock<LRUCache<(ElementReference, usize), Vec<ElementReference>, DefaultHashBuilder>>>,
37}
38
39impl CachedElementIndex {
40 pub fn new(
41 element_index: Arc<dyn ElementIndex>,
42 cache_size: usize,
43 ) -> Result<Self, CacheError> {
44 log::info!("using cached element index with size {cache_size}");
45
46 let element_cache = LRUCache::new(cache_size)?;
47 let element_cache = Arc::new(RwLock::new(element_cache));
48
49 let slot_cache = LRUCache::new(cache_size)?;
50 let slot_cache = Arc::new(RwLock::new(slot_cache));
51
52 let inbound_cache = LRUCache::new(cache_size)?;
53 let inbound_cache = Arc::new(RwLock::new(inbound_cache));
54
55 let outbound_cache = LRUCache::new(cache_size)?;
56 let outbound_cache = Arc::new(RwLock::new(outbound_cache));
57
58 Ok(Self {
59 element_index,
60 element_cache,
61 slot_cache,
62 inbound_cache,
63 outbound_cache,
64 })
65 }
66}
67
68#[async_trait]
69impl ElementIndex for CachedElementIndex {
70 async fn get_element(
71 &self,
72 element_ref: &ElementReference,
73 ) -> Result<Option<Arc<Element>>, IndexError> {
74 let element_index = self.element_index.clone();
75 let cache = self.element_cache.clone();
76
77 get_element_internal(element_index, cache, element_ref).await
78 }
79
80 async fn set_element(
81 &self,
82 element: &Element,
83 slot_affinity: &Vec<usize>,
84 ) -> Result<(), IndexError> {
85 self.element_index
86 .set_element(element, slot_affinity)
87 .await?;
88
89 let mut element_cache = self.element_cache.write().await;
90 element_cache.put(element.get_reference().clone(), Arc::new(element.clone()));
91
92 let mut slot_cache = self.slot_cache.write().await;
93 slot_cache.remove(element.get_reference());
94
95 let mut inbound_cache = self.inbound_cache.write().await;
96 inbound_cache.purge();
97
98 let mut outbound_cache = self.outbound_cache.write().await;
99 outbound_cache.purge();
100
101 Ok(())
102 }
103
104 async fn delete_element(&self, element_ref: &ElementReference) -> Result<(), IndexError> {
105 self.element_index.delete_element(element_ref).await?;
106
107 let mut element_cache = self.element_cache.write().await;
108 element_cache.remove(element_ref);
109
110 let mut slot_cache = self.slot_cache.write().await;
111 slot_cache.remove(element_ref);
112
113 let mut inbound_cache = self.inbound_cache.write().await;
114 inbound_cache.purge();
115
116 let mut outbound_cache = self.outbound_cache.write().await;
117 outbound_cache.purge();
118
119 Ok(())
120 }
121
122 async fn get_slot_element_by_ref(
123 &self,
124 slot: usize,
125 element_ref: &ElementReference,
126 ) -> Result<Option<Arc<Element>>, IndexError> {
127 let mut slot_cache = self.slot_cache.write().await;
128 let slot_elements = slot_cache.get_mut(element_ref);
129
130 match slot_elements {
131 Some(slot_elements) => {
132 let slot_element = slot_elements.get(&slot);
133 match slot_element {
134 Some(slot_element) => {
135 if *slot_element {
136 self.get_element(element_ref).await
137 } else {
138 Ok(None)
139 }
140 }
141 None => {
142 let result = self
143 .element_index
144 .get_slot_element_by_ref(slot, element_ref)
145 .await?;
146 slot_elements.insert(slot, result.is_some());
147
148 Ok(result)
149 }
150 }
151 }
152 None => {
153 drop(slot_cache);
154 let result = self
155 .element_index
156 .get_slot_element_by_ref(slot, element_ref)
157 .await?;
158
159 let mut slot_set = HashMap::new();
160 slot_set.insert(slot, result.is_some());
161
162 let mut slot_cache = self.slot_cache.write().await;
163 slot_cache.put(element_ref.clone(), slot_set);
164 drop(slot_cache);
165
166 Ok(result)
167 }
168 }
169 }
170
171 async fn get_slot_elements_by_inbound(
172 &self,
173 slot: usize,
174 inbound_ref: &ElementReference,
175 ) -> Result<ElementStream, IndexError> {
176 let mut inbound_cache = self.inbound_cache.write().await;
177 let key = (inbound_ref.clone(), slot);
178 match inbound_cache.get(&key) {
179 Some(elements) => {
180 let elements = elements.clone();
181 drop(inbound_cache);
182 let element_cache = self.element_cache.clone();
183 let element_index = self.element_index.clone();
184 let stream = stream! {
185 for element_ref in elements {
186 match get_element_internal(element_index.clone(), element_cache.clone(), &element_ref).await? {
187 Some(element) => yield Ok(element),
188 None => continue,
189 }
190 }
191 };
192 Ok(Box::pin(stream))
193 }
194 None => {
195 drop(inbound_cache);
196 let cache_source = self.inbound_cache.clone();
197 let element_index = self.element_index.clone();
198 let inbound_ref = inbound_ref.clone();
199 let stream = stream! {
200 let mut element_stream = element_index.get_slot_elements_by_inbound(slot, &inbound_ref).await?;
201 let mut elements = Vec::new();
202 while let Some(element) = element_stream.next().await {
203 match element {
204 Ok(element) => {
205 elements.push(element.get_reference().clone());
206 yield Ok(element);
207 },
208 Err(err) => {
209 yield Err(err);
210 }
211 };
212 }
213
214 let mut inbound_cache = cache_source.write().await;
215 inbound_cache.put((inbound_ref, slot), elements);
216 drop(inbound_cache);
217 };
218 Ok(Box::pin(stream))
219 }
220 }
221 }
222
223 async fn get_slot_elements_by_outbound(
224 &self,
225 slot: usize,
226 outbound_ref: &ElementReference,
227 ) -> Result<ElementStream, IndexError> {
228 let mut outbound_cache = self.outbound_cache.write().await;
229 let key = (outbound_ref.clone(), slot);
230 match outbound_cache.get(&key) {
231 Some(elements) => {
232 let elements = elements.clone();
233 drop(outbound_cache);
234 let element_cache = self.element_cache.clone();
235 let element_index = self.element_index.clone();
236 let stream = stream! {
237 for element_ref in elements {
238 match get_element_internal(element_index.clone(), element_cache.clone(), &element_ref).await? {
239 Some(element) => yield Ok(element),
240 None => continue,
241 }
242 }
243 };
244 Ok(Box::pin(stream))
245 }
246 None => {
247 drop(outbound_cache);
248 let cache_source = self.outbound_cache.clone();
249 let element_index = self.element_index.clone();
250 let outbound_ref = outbound_ref.clone();
251 let stream = stream! {
252 let mut element_stream = element_index.get_slot_elements_by_outbound(slot, &outbound_ref).await?;
253 let mut elements = Vec::new();
254 while let Some(element) = element_stream.next().await {
255 match element {
256 Ok(element) => {
257 elements.push(element.get_reference().clone());
258 yield Ok(element);
259 },
260 Err(err) => {
261 yield Err(err);
262 }
263 };
264 }
265
266 let mut outbound_cache = cache_source.write().await;
267 outbound_cache.put((outbound_ref, slot), elements);
268 drop(outbound_cache);
269 };
270 Ok(Box::pin(stream))
271 }
272 }
273 }
274
275 async fn clear(&self) -> Result<(), IndexError> {
276 self.element_index.clear().await?;
277
278 let mut element_cache = self.element_cache.write().await;
279 element_cache.purge();
280
281 let mut slot_cache = self.slot_cache.write().await;
282 slot_cache.purge();
283
284 let mut inbound_cache = self.inbound_cache.write().await;
285 inbound_cache.purge();
286
287 let mut outbound_cache = self.outbound_cache.write().await;
288 outbound_cache.purge();
289
290 Ok(())
291 }
292
293 async fn set_joins(&self, match_path: &MatchPath, joins: &Vec<Arc<QueryJoin>>) {
294 self.element_index.set_joins(match_path, joins).await;
295 }
296}
297
298async fn get_element_internal(
299 element_index: Arc<dyn ElementIndex>,
300 cache: Arc<RwLock<LRUCache<ElementReference, Arc<Element>, DefaultHashBuilder>>>,
301 element_ref: &ElementReference,
302) -> Result<Option<Arc<Element>>, IndexError> {
303 let mut element_cache = cache.write().await;
304 let element = element_cache.get(element_ref);
305 match element {
306 Some(element) => Ok(Some(element.clone())),
307 None => {
308 drop(element_cache);
309 let element = element_index.get_element(element_ref).await?;
310 match element {
311 Some(element) => {
312 let mut element_cache = cache.write().await;
313 element_cache.put(element_ref.clone(), element.clone());
314 Ok(Some(element))
315 }
316 None => Ok(None),
317 }
318 }
319 }
320}