drasi_core/index_cache/
cached_element_index.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use async_stream::stream;
18use async_trait::async_trait;
19use caches::{lru::CacheError, Cache, DefaultHashBuilder, LRUCache};
20use tokio::sync::RwLock;
21use tokio_stream::StreamExt;
22
23use crate::{
24    interface::{ElementIndex, ElementStream, IndexError},
25    models::{Element, ElementReference, QueryJoin},
26    path_solver::match_path::MatchPath,
27};
28
29pub struct CachedElementIndex {
30    element_index: Arc<dyn ElementIndex>,
31    element_cache: Arc<RwLock<LRUCache<ElementReference, Arc<Element>, DefaultHashBuilder>>>,
32    slot_cache: Arc<RwLock<LRUCache<ElementReference, HashMap<usize, bool>, DefaultHashBuilder>>>,
33    inbound_cache:
34        Arc<RwLock<LRUCache<(ElementReference, usize), Vec<ElementReference>, DefaultHashBuilder>>>,
35    outbound_cache:
36        Arc<RwLock<LRUCache<(ElementReference, usize), Vec<ElementReference>, DefaultHashBuilder>>>,
37}
38
39impl CachedElementIndex {
40    pub fn new(
41        element_index: Arc<dyn ElementIndex>,
42        cache_size: usize,
43    ) -> Result<Self, CacheError> {
44        log::info!("using cached element index with size {cache_size}");
45
46        let element_cache = LRUCache::new(cache_size)?;
47        let element_cache = Arc::new(RwLock::new(element_cache));
48
49        let slot_cache = LRUCache::new(cache_size)?;
50        let slot_cache = Arc::new(RwLock::new(slot_cache));
51
52        let inbound_cache = LRUCache::new(cache_size)?;
53        let inbound_cache = Arc::new(RwLock::new(inbound_cache));
54
55        let outbound_cache = LRUCache::new(cache_size)?;
56        let outbound_cache = Arc::new(RwLock::new(outbound_cache));
57
58        Ok(Self {
59            element_index,
60            element_cache,
61            slot_cache,
62            inbound_cache,
63            outbound_cache,
64        })
65    }
66}
67
68#[async_trait]
69impl ElementIndex for CachedElementIndex {
70    async fn get_element(
71        &self,
72        element_ref: &ElementReference,
73    ) -> Result<Option<Arc<Element>>, IndexError> {
74        let element_index = self.element_index.clone();
75        let cache = self.element_cache.clone();
76
77        get_element_internal(element_index, cache, element_ref).await
78    }
79
80    async fn set_element(
81        &self,
82        element: &Element,
83        slot_affinity: &Vec<usize>,
84    ) -> Result<(), IndexError> {
85        self.element_index
86            .set_element(element, slot_affinity)
87            .await?;
88
89        let mut element_cache = self.element_cache.write().await;
90        element_cache.put(element.get_reference().clone(), Arc::new(element.clone()));
91
92        let mut slot_cache = self.slot_cache.write().await;
93        slot_cache.remove(element.get_reference());
94
95        let mut inbound_cache = self.inbound_cache.write().await;
96        inbound_cache.purge();
97
98        let mut outbound_cache = self.outbound_cache.write().await;
99        outbound_cache.purge();
100
101        Ok(())
102    }
103
104    async fn delete_element(&self, element_ref: &ElementReference) -> Result<(), IndexError> {
105        self.element_index.delete_element(element_ref).await?;
106
107        let mut element_cache = self.element_cache.write().await;
108        element_cache.remove(element_ref);
109
110        let mut slot_cache = self.slot_cache.write().await;
111        slot_cache.remove(element_ref);
112
113        let mut inbound_cache = self.inbound_cache.write().await;
114        inbound_cache.purge();
115
116        let mut outbound_cache = self.outbound_cache.write().await;
117        outbound_cache.purge();
118
119        Ok(())
120    }
121
122    async fn get_slot_element_by_ref(
123        &self,
124        slot: usize,
125        element_ref: &ElementReference,
126    ) -> Result<Option<Arc<Element>>, IndexError> {
127        let mut slot_cache = self.slot_cache.write().await;
128        let slot_elements = slot_cache.get_mut(element_ref);
129
130        match slot_elements {
131            Some(slot_elements) => {
132                let slot_element = slot_elements.get(&slot);
133                match slot_element {
134                    Some(slot_element) => {
135                        if *slot_element {
136                            self.get_element(element_ref).await
137                        } else {
138                            Ok(None)
139                        }
140                    }
141                    None => {
142                        let result = self
143                            .element_index
144                            .get_slot_element_by_ref(slot, element_ref)
145                            .await?;
146                        slot_elements.insert(slot, result.is_some());
147
148                        Ok(result)
149                    }
150                }
151            }
152            None => {
153                drop(slot_cache);
154                let result = self
155                    .element_index
156                    .get_slot_element_by_ref(slot, element_ref)
157                    .await?;
158
159                let mut slot_set = HashMap::new();
160                slot_set.insert(slot, result.is_some());
161
162                let mut slot_cache = self.slot_cache.write().await;
163                slot_cache.put(element_ref.clone(), slot_set);
164                drop(slot_cache);
165
166                Ok(result)
167            }
168        }
169    }
170
171    async fn get_slot_elements_by_inbound(
172        &self,
173        slot: usize,
174        inbound_ref: &ElementReference,
175    ) -> Result<ElementStream, IndexError> {
176        let mut inbound_cache = self.inbound_cache.write().await;
177        let key = (inbound_ref.clone(), slot);
178        match inbound_cache.get(&key) {
179            Some(elements) => {
180                let elements = elements.clone();
181                drop(inbound_cache);
182                let element_cache = self.element_cache.clone();
183                let element_index = self.element_index.clone();
184                let stream = stream! {
185                    for element_ref in elements {
186                        match get_element_internal(element_index.clone(), element_cache.clone(), &element_ref).await? {
187                            Some(element) => yield Ok(element),
188                            None => continue,
189                        }
190                    }
191                };
192                Ok(Box::pin(stream))
193            }
194            None => {
195                drop(inbound_cache);
196                let cache_source = self.inbound_cache.clone();
197                let element_index = self.element_index.clone();
198                let inbound_ref = inbound_ref.clone();
199                let stream = stream! {
200                    let mut element_stream = element_index.get_slot_elements_by_inbound(slot, &inbound_ref).await?;
201                    let mut elements = Vec::new();
202                    while let Some(element) = element_stream.next().await {
203                        match element {
204                            Ok(element) => {
205                                elements.push(element.get_reference().clone());
206                                yield Ok(element);
207                            },
208                            Err(err) => {
209                                yield Err(err);
210                            }
211                        };
212                    }
213
214                    let mut inbound_cache = cache_source.write().await;
215                    inbound_cache.put((inbound_ref, slot), elements);
216                    drop(inbound_cache);
217                };
218                Ok(Box::pin(stream))
219            }
220        }
221    }
222
223    async fn get_slot_elements_by_outbound(
224        &self,
225        slot: usize,
226        outbound_ref: &ElementReference,
227    ) -> Result<ElementStream, IndexError> {
228        let mut outbound_cache = self.outbound_cache.write().await;
229        let key = (outbound_ref.clone(), slot);
230        match outbound_cache.get(&key) {
231            Some(elements) => {
232                let elements = elements.clone();
233                drop(outbound_cache);
234                let element_cache = self.element_cache.clone();
235                let element_index = self.element_index.clone();
236                let stream = stream! {
237                    for element_ref in elements {
238                        match get_element_internal(element_index.clone(), element_cache.clone(), &element_ref).await? {
239                            Some(element) => yield Ok(element),
240                            None => continue,
241                        }
242                    }
243                };
244                Ok(Box::pin(stream))
245            }
246            None => {
247                drop(outbound_cache);
248                let cache_source = self.outbound_cache.clone();
249                let element_index = self.element_index.clone();
250                let outbound_ref = outbound_ref.clone();
251                let stream = stream! {
252                    let mut element_stream = element_index.get_slot_elements_by_outbound(slot, &outbound_ref).await?;
253                    let mut elements = Vec::new();
254                    while let Some(element) = element_stream.next().await {
255                        match element {
256                            Ok(element) => {
257                                elements.push(element.get_reference().clone());
258                                yield Ok(element);
259                            },
260                            Err(err) => {
261                                yield Err(err);
262                            }
263                        };
264                    }
265
266                    let mut outbound_cache = cache_source.write().await;
267                    outbound_cache.put((outbound_ref, slot), elements);
268                    drop(outbound_cache);
269                };
270                Ok(Box::pin(stream))
271            }
272        }
273    }
274
275    async fn clear(&self) -> Result<(), IndexError> {
276        self.element_index.clear().await?;
277
278        let mut element_cache = self.element_cache.write().await;
279        element_cache.purge();
280
281        let mut slot_cache = self.slot_cache.write().await;
282        slot_cache.purge();
283
284        let mut inbound_cache = self.inbound_cache.write().await;
285        inbound_cache.purge();
286
287        let mut outbound_cache = self.outbound_cache.write().await;
288        outbound_cache.purge();
289
290        Ok(())
291    }
292
293    async fn set_joins(&self, match_path: &MatchPath, joins: &Vec<Arc<QueryJoin>>) {
294        self.element_index.set_joins(match_path, joins).await;
295    }
296}
297
298async fn get_element_internal(
299    element_index: Arc<dyn ElementIndex>,
300    cache: Arc<RwLock<LRUCache<ElementReference, Arc<Element>, DefaultHashBuilder>>>,
301    element_ref: &ElementReference,
302) -> Result<Option<Arc<Element>>, IndexError> {
303    let mut element_cache = cache.write().await;
304    let element = element_cache.get(element_ref);
305    match element {
306        Some(element) => Ok(Some(element.clone())),
307        None => {
308            drop(element_cache);
309            let element = element_index.get_element(element_ref).await?;
310            match element {
311                Some(element) => {
312                    let mut element_cache = cache.write().await;
313                    element_cache.put(element_ref.clone(), element.clone());
314                    Ok(Some(element))
315                }
316                None => Ok(None),
317            }
318        }
319    }
320}