drasi_core/index_cache/
cached_result_index.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    hash::{Hash, Hasher},
17    sync::Arc,
18};
19
20use async_trait::async_trait;
21use caches::{lru::CacheError, Cache, DefaultHashBuilder, LRUCache};
22use hashers::builtin::DefaultHasher;
23use ordered_float::OrderedFloat;
24use tokio::sync::RwLock;
25
26use crate::{
27    evaluation::functions::aggregation::ValueAccumulator,
28    interface::{
29        AccumulatorIndex, IndexError, LazySortedSetStore, ResultIndex, ResultKey, ResultOwner,
30        ResultSequence, ResultSequenceCounter,
31    },
32};
33
34pub struct CachedResultIndex {
35    inner: Arc<dyn ResultIndex>,
36
37    value_cache: Arc<RwLock<LRUCache<u64, ValueAccumulator, DefaultHashBuilder>>>,
38    set_count_cache: Arc<RwLock<LRUCache<(u64, OrderedFloat<f64>), isize, DefaultHashBuilder>>>,
39}
40
41impl CachedResultIndex {
42    pub fn new(inner: Arc<dyn ResultIndex>, cache_size: usize) -> Result<Self, CacheError> {
43        log::info!("using cached result index with cache size {cache_size}");
44
45        let value_cache = LRUCache::new(cache_size)?;
46        let set_count_cache = LRUCache::new(cache_size)?;
47
48        Ok(CachedResultIndex {
49            inner,
50            value_cache: Arc::new(RwLock::new(value_cache)),
51            set_count_cache: Arc::new(RwLock::new(set_count_cache)),
52        })
53    }
54}
55
56#[async_trait]
57impl AccumulatorIndex for CachedResultIndex {
58    async fn clear(&self) -> Result<(), IndexError> {
59        self.inner.clear().await?;
60
61        let mut value_cache = self.value_cache.write().await;
62        value_cache.purge();
63
64        let mut set_count_cache = self.set_count_cache.write().await;
65        set_count_cache.purge();
66
67        Ok(())
68    }
69
70    async fn get(
71        &self,
72        key: &ResultKey,
73        owner: &ResultOwner,
74    ) -> Result<Option<ValueAccumulator>, IndexError> {
75        let cache_key = get_hash_key(owner, key);
76
77        let mut cache = self.value_cache.write().await;
78        match cache.get(&cache_key) {
79            None => {
80                let value = self.inner.get(key, owner).await?;
81                match value {
82                    None => Ok(None),
83                    Some(v) => {
84                        _ = cache.put(cache_key, v.clone());
85                        Ok(Some(v))
86                    }
87                }
88            }
89            Some(v) => Ok(Some(v.clone())),
90        }
91    }
92
93    async fn set(
94        &self,
95        key: ResultKey,
96        owner: ResultOwner,
97        value: Option<ValueAccumulator>,
98    ) -> Result<(), IndexError> {
99        let cache_key = get_hash_key(&owner, &key);
100
101        self.inner.set(key, owner, value.clone()).await?;
102
103        let mut cache = self.value_cache.write().await;
104        match value {
105            None => _ = cache.remove(&cache_key),
106            Some(v) => _ = cache.put(cache_key, v),
107        };
108
109        Ok(())
110    }
111}
112
113#[async_trait]
114impl LazySortedSetStore for CachedResultIndex {
115    async fn get_next(
116        &self,
117        set_id: u64,
118        value: Option<OrderedFloat<f64>>,
119    ) -> Result<Option<(OrderedFloat<f64>, isize)>, IndexError> {
120        self.inner.get_next(set_id, value).await
121    }
122
123    async fn get_value_count(
124        &self,
125        set_id: u64,
126        value: OrderedFloat<f64>,
127    ) -> Result<isize, IndexError> {
128        let cache_key = (set_id, value);
129
130        let mut cache = self.set_count_cache.write().await;
131        match cache.get(&cache_key) {
132            None => {
133                let value = self.inner.get_value_count(set_id, value).await?;
134                _ = cache.put(cache_key, value);
135                Ok(value)
136            }
137            Some(v) => Ok(*v),
138        }
139    }
140
141    async fn increment_value_count(
142        &self,
143        set_id: u64,
144        value: OrderedFloat<f64>,
145        delta: isize,
146    ) -> Result<(), IndexError> {
147        self.inner
148            .increment_value_count(set_id, value, delta)
149            .await?;
150
151        let cache_key = (set_id, value);
152        let mut cache = self.set_count_cache.write().await;
153
154        match cache.get_mut(&cache_key) {
155            None => _ = cache.put(cache_key, delta),
156            Some(v) => *v += delta,
157        }
158
159        Ok(())
160    }
161}
162
163#[async_trait]
164impl ResultSequenceCounter for CachedResultIndex {
165    async fn apply_sequence(
166        &self,
167        sequence: u64,
168        source_change_id: &str,
169    ) -> Result<(), IndexError> {
170        self.inner.apply_sequence(sequence, source_change_id).await
171    }
172
173    async fn get_sequence(&self) -> Result<ResultSequence, IndexError> {
174        self.inner.get_sequence().await
175    }
176}
177
178impl ResultIndex for CachedResultIndex {}
179
180fn get_hash_key(owner: &ResultOwner, key: &ResultKey) -> u64 {
181    let mut hasher = DefaultHasher::new();
182    owner.hash(&mut hasher);
183    key.hash(&mut hasher);
184    hasher.finish()
185}