ballista_cache/metrics/
loading_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::loading_cache::CacheGetStatus;
19use std::fmt::Debug;
20use std::hash::Hash;
21use std::marker::PhantomData;
22
23use crate::listener::cache_policy::CachePolicyListener;
24use crate::listener::loading_cache::LoadingCacheListener;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27
/// Struct containing all the metrics
///
/// The counters are updated by the [`LoadingCacheListener`] and
/// [`CachePolicyListener`] implementations on this type and are read through
/// the accessor methods of the same name.
///
/// `K` and `V` appear only at the type level (they become the associated
/// `K` / `V` types of the listener implementations); no key or value is ever
/// stored in this struct.
#[derive(Debug)]
pub struct Metrics<K, V>
where
    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
    V: Clone + Debug + Send + 'static,
{
    /// Incremented when `listen_on_get_if_present` receives `Some(_)` or
    /// `listen_on_get` reports `CacheGetStatus::Hit`.
    get_hit_count: U64Counter,
    /// Incremented when `listen_on_get_if_present` receives `None` or
    /// `listen_on_get` reports `CacheGetStatus::Miss`.
    get_miss_count: U64Counter,
    /// Incremented when `listen_on_get` reports
    /// `CacheGetStatus::MissAlreadyLoading`.
    get_miss_already_loading_count: U64Counter,
    /// Incremented by `listen_on_get_cancelling`.
    get_cancelled_count: U64Counter,
    /// Incremented by the cache policy's `listen_on_put` notification.
    put_count: U64Counter,
    /// Incremented by the cache policy's `listen_on_remove` and
    /// `listen_on_pop` notifications.
    eviction_count: U64Counter,
    /// Zero-sized marker tying the struct to the key type `K`.
    _key_marker: PhantomData<K>,
    /// Zero-sized marker tying the struct to the value type `V`.
    _value_marker: PhantomData<V>,
}
44
45impl<K, V> Default for Metrics<K, V>
46where
47    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
48    V: Clone + Debug + Send + 'static,
49{
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl<K, V> Metrics<K, V>
56where
57    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
58    V: Clone + Debug + Send + 'static,
59{
60    pub fn new() -> Self {
61        Self {
62            get_hit_count: Default::default(),
63            get_miss_count: Default::default(),
64            get_miss_already_loading_count: Default::default(),
65            get_cancelled_count: Default::default(),
66            put_count: Default::default(),
67            eviction_count: Default::default(),
68            _key_marker: Default::default(),
69            _value_marker: Default::default(),
70        }
71    }
72
73    pub fn get_hit_count(&self) -> u64 {
74        self.get_hit_count.fetch()
75    }
76
77    pub fn get_miss_count(&self) -> u64 {
78        self.get_miss_count.fetch()
79    }
80
81    pub fn get_miss_already_loading_count(&self) -> u64 {
82        self.get_miss_already_loading_count.fetch()
83    }
84
85    pub fn get_cancelled_count(&self) -> u64 {
86        self.get_cancelled_count.fetch()
87    }
88
89    pub fn put_count(&self) -> u64 {
90        self.put_count.fetch()
91    }
92
93    pub fn eviction_count(&self) -> u64 {
94        self.eviction_count.fetch()
95    }
96}
97
// SAFETY: the only fields of `Metrics` that hold data are `U64Counter`s,
// each of which wraps an `Arc<AtomicU64>` and is therefore safe to share
// between threads. `K` and `V` occur solely inside `PhantomData` markers, so
// no value of either type is stored in, or reachable through, a shared
// `&Metrics<K, V>`. The manual impl is needed because `PhantomData<K>` /
// `PhantomData<V>` would otherwise make `Metrics` `Sync` only when `K` and `V`
// are `Sync`, which the bounds below do not require.
unsafe impl<K, V> Sync for Metrics<K, V>
where
    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
    V: Clone + Debug + Send + 'static,
{
}
105
106impl<K, V> LoadingCacheListener for Metrics<K, V>
107where
108    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
109    V: Clone + Debug + Send + 'static,
110{
111    type K = K;
112    type V = V;
113
114    fn listen_on_get_if_present(&self, _k: Self::K, v: Option<Self::V>) {
115        if v.is_some() {
116            &self.get_hit_count
117        } else {
118            &self.get_miss_count
119        }
120        .inc(1);
121    }
122
123    fn listen_on_get(&self, _k: Self::K, _v: Self::V, status: CacheGetStatus) {
124        match status {
125            CacheGetStatus::Hit => &self.get_hit_count,
126
127            CacheGetStatus::Miss => &self.get_miss_count,
128
129            CacheGetStatus::MissAlreadyLoading => &self.get_miss_already_loading_count,
130        }
131        .inc(1);
132    }
133
134    fn listen_on_put(&self, _k: Self::K, _v: Self::V) {
135        // Do nothing
136    }
137
138    fn listen_on_invalidate(&self, _k: Self::K) {
139        // Do nothing
140    }
141
142    fn listen_on_get_cancelling(&self, _k: Self::K) {
143        self.get_cancelled_count.inc(1);
144    }
145}
146
147impl<K, V> CachePolicyListener for Metrics<K, V>
148where
149    K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
150    V: Clone + Debug + Send + 'static,
151{
152    type K = K;
153    type V = V;
154
155    fn listen_on_get(&self, _k: Self::K, _v: Option<Self::V>) {
156        // Do nothing
157    }
158
159    fn listen_on_peek(&self, _k: Self::K, _v: Option<Self::V>) {
160        // Do nothing
161    }
162
163    fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option<Self::V>) {
164        self.put_count.inc(1);
165    }
166
167    fn listen_on_remove(&self, _k: Self::K, _v: Option<Self::V>) {
168        self.eviction_count.inc(1);
169    }
170
171    fn listen_on_pop(&self, _entry: (Self::K, Self::V)) {
172        self.eviction_count.inc(1);
173    }
174}
175
/// A monotonic counter
///
/// The value lives in an `Arc<AtomicU64>`, so cloning a `U64Counter` yields a
/// handle to the *same* underlying value rather than an independent copy.
/// All accesses use `Ordering::Relaxed`.
#[derive(Debug, Clone, Default)]
pub struct U64Counter {
    counter: Arc<AtomicU64>,
}

impl U64Counter {
    /// Adds `count` to the counter (wrapping on overflow, as
    /// `AtomicU64::fetch_add` does).
    pub fn inc(&self, count: u64) {
        let cell: &AtomicU64 = &self.counter;
        cell.fetch_add(count, Ordering::Relaxed);
    }

    /// Returns the current value of the counter.
    pub fn fetch(&self) -> u64 {
        let cell: &AtomicU64 = &self.counter;
        cell.load(Ordering::Relaxed)
    }
}
191
// End-to-end check of the counters exposed by `Metrics`: a loading cache is
// built together with its metrics handle, a fixed sequence of operations is
// run against it, and the counters are asserted after every step. The
// assertions are cumulative, so the order of the steps matters.
#[cfg(test)]
mod tests {
    use crate::backend::policy::lru::lru_cache::LruCache;
    use crate::backend::policy::lru::DefaultResourceCounter;
    use crate::create_loading_cache_with_metrics;
    use crate::loading_cache::loader::CacheLoader;
    use crate::loading_cache::LoadingCache;
    use async_trait::async_trait;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_metrics() {
        // LRU policy built with a resource counter of 3; the assertions below
        // show the fourth distinct key triggering the first eviction.
        let cache_policy =
            LruCache::with_resource_counter(DefaultResourceCounter::new(3));
        let loader = TestStringCacheLoader {
            prefix: "file".to_string(),
        };
        let (loading_cache, metrics) =
            create_loading_cache_with_metrics(cache_policy, Arc::new(loader));

        // Three first-time gets: each value comes from the loader and each
        // get is counted as a miss.
        assert_eq!(
            "file1".to_string(),
            loading_cache.get("1".to_string(), ()).await
        );
        assert_eq!(
            "file2".to_string(),
            loading_cache.get("2".to_string(), ()).await
        );
        assert_eq!(
            "file3".to_string(),
            loading_cache.get("3".to_string(), ()).await
        );
        assert_eq!(3, metrics.get_miss_count());

        // A fourth key: one more miss, four puts in total and the first
        // eviction.
        assert_eq!(
            "file4".to_string(),
            loading_cache.get("4".to_string(), ()).await
        );
        assert_eq!(0, metrics.get_hit_count());
        assert_eq!(4, metrics.get_miss_count());
        assert_eq!(4, metrics.put_count());
        assert_eq!(1, metrics.eviction_count());

        // Key "1" is no longer present (presumably the entry evicted above,
        // being the least recently used); the failed lookup adds a miss and
        // nothing else.
        assert!(loading_cache.get_if_present("1".to_string()).is_none());
        assert_eq!(0, metrics.get_hit_count());
        assert_eq!(5, metrics.get_miss_count());
        assert_eq!(4, metrics.put_count());
        assert_eq!(1, metrics.eviction_count());

        // An explicit put only bumps `put_count`; no eviction is recorded
        // (presumably because key "2" is still cached and is overwritten).
        loading_cache
            .put("2".to_string(), "file2-bak".to_string())
            .await;
        assert_eq!(0, metrics.get_hit_count());
        assert_eq!(5, metrics.get_miss_count());
        assert_eq!(5, metrics.put_count());
        assert_eq!(1, metrics.eviction_count());

        // Loading a new key "5" adds a miss and a put and causes the second
        // eviction.
        assert_eq!(
            "file5".to_string(),
            loading_cache.get("5".to_string(), ()).await
        );
        assert_eq!(0, metrics.get_hit_count());
        assert_eq!(6, metrics.get_miss_count());
        assert_eq!(6, metrics.put_count());
        assert_eq!(2, metrics.eviction_count());

        // Key "3" is gone now (presumably the second evicted entry); the
        // lookup is another miss.
        assert!(loading_cache.get_if_present("3".to_string()).is_none());
        assert_eq!(0, metrics.get_hit_count());
        assert_eq!(7, metrics.get_miss_count());
        assert_eq!(6, metrics.put_count());
        assert_eq!(2, metrics.eviction_count());

        // Key "2" is still cached: this is the first recorded hit.
        assert!(loading_cache.get_if_present("2".to_string()).is_some());
        assert_eq!(1, metrics.get_hit_count());
        assert_eq!(7, metrics.get_miss_count());
        assert_eq!(6, metrics.put_count());
        assert_eq!(2, metrics.eviction_count());

        // Invalidating a cached key is reported as an eviction; the other
        // counters stay untouched.
        loading_cache.invalidate("2".to_string());
        assert_eq!(1, metrics.get_hit_count());
        assert_eq!(7, metrics.get_miss_count());
        assert_eq!(6, metrics.put_count());
        assert_eq!(3, metrics.eviction_count());
    }

    /// Test loader whose value for a key is `prefix` followed by the key.
    #[derive(Debug)]
    struct TestStringCacheLoader {
        prefix: String,
    }

    #[async_trait]
    impl CacheLoader for TestStringCacheLoader {
        type K = String;
        type V = String;
        type Extra = ();

        // Builds the value by prepending `prefix` to the key; `_extra` is
        // unused.
        async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V {
            format!("{}{k}", self.prefix)
        }
    }
}