1use std::fmt::Display;
2use std::fmt::{self};
3use std::hash::Hash;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::sync::atomic::AtomicI64;
7use std::sync::atomic::Ordering;
8
9use lru::LruCache;
10use opentelemetry::KeyValue;
11use opentelemetry::metrics::MeterProvider;
12use opentelemetry::metrics::ObservableGauge;
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use tokio::sync::Mutex;
16use tokio::time::Instant;
17use tower::BoxError;
18
19use super::redis::*;
20use crate::configuration::RedisCache;
21use crate::metrics;
22use crate::plugins::telemetry::config_new::instruments::METER_NAME;
23
/// Bundle of the bounds every cache key must satisfy.
///
/// Keys have to be cloneable, hashable, comparable for equality, printable through
/// both `Debug` and `Display`, and safe to share across threads. The trait declares
/// no methods of its own.
pub(crate) trait KeyType:
    Send + Sync + Clone + Eq + Hash + fmt::Debug + fmt::Display
{
}
28pub(crate) trait ValueType:
29 Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned
30{
31 fn estimated_size(&self) -> Option<usize> {
33 None
34 }
35}
36
37impl<K> KeyType for K
39where
40 K: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync,
41{
42 }
45
/// Shared handle to the in-memory LRU layer: an `LruCache` behind an async
/// (`tokio`) mutex, reference-counted so several owners can hold it.
pub(crate) type InMemoryCache<K, V> = Arc<Mutex<LruCache<K, V>>>;
47
48#[derive(Clone)]
53pub(crate) struct CacheStorage<K: KeyType, V: ValueType> {
54 caller: &'static str,
55 inner: Arc<Mutex<LruCache<K, V>>>,
56 redis: Option<RedisCacheStorage>,
57 cache_size: Arc<AtomicI64>,
58 cache_estimated_storage: Arc<AtomicI64>,
59 cache_size_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
61 cache_estimated_storage_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
62}
63
64impl<K, V> CacheStorage<K, V>
65where
66 K: KeyType,
67 V: ValueType,
68{
69 pub(crate) async fn new(
70 max_capacity: NonZeroUsize,
71 config: Option<RedisCache>,
72 caller: &'static str,
73 ) -> Result<Self, BoxError> {
74 Ok(Self {
75 cache_size_gauge: Default::default(),
76 cache_estimated_storage_gauge: Default::default(),
77 cache_size: Default::default(),
78 cache_estimated_storage: Default::default(),
79 caller,
80 inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
81 redis: if let Some(config) = config {
82 let required_to_start = config.required_to_start;
83 match RedisCacheStorage::new(config, caller).await {
84 Err(e) => {
85 tracing::error!(
86 cache = caller,
87 e,
88 "could not open connection to Redis for caching",
89 );
90 if required_to_start {
91 return Err(e);
92 }
93 None
94 }
95 Ok(storage) => Some(storage),
96 }
97 } else {
98 None
99 },
100 })
101 }
102
103 pub(crate) fn new_in_memory(max_capacity: NonZeroUsize, caller: &'static str) -> Self {
104 Self {
105 cache_size_gauge: Default::default(),
106 cache_estimated_storage_gauge: Default::default(),
107 cache_size: Default::default(),
108 cache_estimated_storage: Default::default(),
109 caller,
110 inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
111 redis: None,
112 }
113 }
114
115 fn create_cache_size_gauge(&self) -> ObservableGauge<i64> {
116 let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
117 let current_cache_size_for_gauge = self.cache_size.clone();
118 let caller = self.caller;
119 meter
120 .i64_observable_gauge("apollo.router.cache.size")
121 .with_description("Cache size")
122 .with_callback(move |i| {
123 i.observe(
124 current_cache_size_for_gauge.load(Ordering::SeqCst),
125 &[
126 KeyValue::new("kind", caller),
127 KeyValue::new("type", "memory"),
128 ],
129 )
130 })
131 .build()
132 }
133
134 fn create_cache_estimated_storage_size_gauge(&self) -> ObservableGauge<i64> {
135 let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
136 let cache_estimated_storage_for_gauge = self.cache_estimated_storage.clone();
137 let caller = self.caller;
138
139 meter
140 .i64_observable_gauge("apollo.router.cache.storage.estimated_size")
141 .with_description("Estimated cache storage")
142 .with_unit("bytes")
143 .with_callback(move |i| {
144 let value = cache_estimated_storage_for_gauge.load(Ordering::SeqCst);
146 if value > 0 {
147 i.observe(
148 cache_estimated_storage_for_gauge.load(Ordering::SeqCst),
149 &[
150 KeyValue::new("kind", caller),
151 KeyValue::new("type", "memory"),
152 ],
153 )
154 }
155 })
156 .build()
157 }
158
159 pub(crate) async fn get(
162 &self,
163 key: &K,
164 mut init_from_redis: impl FnMut(&mut V) -> Result<(), String>,
165 ) -> Option<V> {
166 let instant_memory = Instant::now();
167 let res = self.inner.lock().await.get(key).cloned();
168
169 match res {
170 Some(v) => {
171 let duration = instant_memory.elapsed();
172 f64_histogram!(
173 "apollo.router.cache.hit.time",
174 "Time to get a value from the cache in seconds",
175 duration.as_secs_f64(),
176 kind = self.caller,
177 storage = CacheStorageName::Memory.to_string()
178 );
179 Some(v)
180 }
181 None => {
182 let duration = instant_memory.elapsed();
183 f64_histogram!(
184 "apollo.router.cache.miss.time",
185 "Time to check the cache for an uncached value in seconds",
186 duration.as_secs_f64(),
187 kind = self.caller,
188 storage = CacheStorageName::Memory.to_string()
189 );
190
191 let instant_redis = Instant::now();
192 if let Some(redis) = self.redis.as_ref() {
193 let inner_key = RedisKey(key.clone());
194 let redis_value = redis.get(inner_key).await.ok().and_then(|mut v| {
195 match init_from_redis(&mut v.0) {
196 Ok(()) => Some(v),
197 Err(e) => {
198 tracing::error!("Invalid value from Redis cache: {e}");
199 None
200 }
201 }
202 });
203 match redis_value {
204 Some(v) => {
205 self.insert_in_memory(key.clone(), v.0.clone()).await;
206
207 let duration = instant_redis.elapsed();
208 f64_histogram!(
209 "apollo.router.cache.hit.time",
210 "Time to get a value from the cache in seconds",
211 duration.as_secs_f64(),
212 kind = self.caller,
213 storage = CacheStorageName::Redis.to_string()
214 );
215 Some(v.0)
216 }
217 None => {
218 let duration = instant_redis.elapsed();
219 f64_histogram!(
220 "apollo.router.cache.miss.time",
221 "Time to check the cache for an uncached value in seconds",
222 duration.as_secs_f64(),
223 kind = self.caller,
224 storage = CacheStorageName::Redis.to_string()
225 );
226 None
227 }
228 }
229 } else {
230 None
231 }
232 }
233 }
234 }
235
236 pub(crate) async fn insert(&self, key: K, value: V) {
237 if let Some(redis) = self.redis.as_ref() {
238 redis
239 .insert(RedisKey(key.clone()), RedisValue(value.clone()), None)
240 .await;
241 }
242
243 self.insert_in_memory(key, value).await;
244 }
245
246 pub(crate) async fn insert_in_memory(&self, key: K, value: V)
247 where
248 V: ValueType,
249 {
250 let new_value_size = value.estimated_size().unwrap_or(0) as i64;
253
254 let (old_value, length) = {
255 let mut in_memory = self.inner.lock().await;
256 (in_memory.push(key, value), in_memory.len())
257 };
258
259 let size_delta = match old_value {
260 Some((_, old_value)) => {
261 let old_value_size = old_value.estimated_size().unwrap_or(0) as i64;
262 new_value_size - old_value_size
263 }
264 None => new_value_size,
265 };
266 self.cache_estimated_storage
267 .fetch_add(size_delta, Ordering::SeqCst);
268
269 self.cache_size.store(length as i64, Ordering::SeqCst);
270 }
271
272 pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
273 self.inner.clone()
274 }
275
/// Number of entries currently held in the in-memory LRU (test builds only).
#[cfg(test)]
pub(crate) async fn len(&self) -> usize {
    let in_memory = self.inner.lock().await;
    in_memory.len()
}
280
281 pub(crate) fn activate(&self) {
282 *self.cache_size_gauge.lock() = Some(self.create_cache_size_gauge());
285 *self.cache_estimated_storage_gauge.lock() =
286 Some(self.create_cache_estimated_storage_size_gauge());
287
288 if let Some(redis) = &self.redis {
290 redis.activate();
291 }
292 }
293}
294
/// Identifies which cache layer a timing metric refers to; its `Display` output
/// is used as the value of the `storage` metric attribute.
enum CacheStorageName {
    /// The Redis layer.
    Redis,
    /// The in-memory LRU layer.
    Memory,
}
299
300impl Display for CacheStorageName {
301 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302 match self {
303 CacheStorageName::Redis => write!(f, "redis"),
304 CacheStorageName::Memory => write!(f, "memory"),
305 }
306 }
307}
308
309impl ValueType for String {
310 fn estimated_size(&self) -> Option<usize> {
311 Some(self.len())
312 }
313}
314
/// No size estimate is provided for GraphQL responses, so they contribute
/// nothing to the estimated-storage counter.
impl ValueType for crate::graphql::Response {
    fn estimated_size(&self) -> Option<usize> {
        None
    }
}
320
321impl ValueType for usize {
322 fn estimated_size(&self) -> Option<usize> {
323 Some(std::mem::size_of::<usize>())
324 }
325}
326
327#[cfg(test)]
328mod test {
329 use std::num::NonZeroUsize;
330
331 use crate::cache::estimate_size;
332 use crate::cache::storage::CacheStorage;
333 use crate::cache::storage::ValueType;
334 use crate::metrics::FutureMetricsExt;
335
336 #[tokio::test]
337 async fn test_metrics() {
338 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
339 struct Stuff {}
340 impl ValueType for Stuff {
341 fn estimated_size(&self) -> Option<usize> {
342 Some(1)
343 }
344 }
345
346 async {
347 let cache: CacheStorage<String, Stuff> =
348 CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
349 .await
350 .unwrap();
351 cache.activate();
352
353 cache.insert("test".to_string(), Stuff {}).await;
354 assert_gauge!(
355 "apollo.router.cache.storage.estimated_size",
356 1,
357 "kind" = "test",
358 "type" = "memory"
359 );
360 assert_gauge!(
361 "apollo.router.cache.size",
362 1,
363 "kind" = "test",
364 "type" = "memory"
365 );
366 }
367 .with_metrics()
368 .await;
369 }
370
371 #[tokio::test]
372 #[should_panic]
373 async fn test_metrics_not_emitted_where_no_estimated_size() {
374 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
375 struct Stuff {}
376 impl ValueType for Stuff {
377 fn estimated_size(&self) -> Option<usize> {
378 None
379 }
380 }
381
382 async {
383 let cache: CacheStorage<String, Stuff> =
384 CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
385 .await
386 .unwrap();
387 cache.activate();
388
389 cache.insert("test".to_string(), Stuff {}).await;
390 assert_gauge!(
392 "apollo.router.cache.size",
393 0,
394 "kind" = "test",
395 "type" = "memory"
396 );
397 }
398 .with_metrics()
399 .await;
400 }
401
402 #[tokio::test]
403 async fn test_metrics_eviction() {
404 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
405 struct Stuff {
406 test: String,
407 }
408 impl ValueType for Stuff {
409 fn estimated_size(&self) -> Option<usize> {
410 Some(estimate_size(self))
411 }
412 }
413
414 async {
415 let cache: CacheStorage<String, Stuff> =
418 CacheStorage::new(NonZeroUsize::new(1).unwrap(), None, "test")
419 .await
420 .unwrap();
421 cache.activate();
422
423 cache
424 .insert(
425 "test".to_string(),
426 Stuff {
427 test: "test".to_string(),
428 },
429 )
430 .await;
431 assert_gauge!(
432 "apollo.router.cache.storage.estimated_size",
433 28,
434 "kind" = "test",
435 "type" = "memory"
436 );
437 assert_gauge!(
438 "apollo.router.cache.size",
439 1,
440 "kind" = "test",
441 "type" = "memory"
442 );
443
444 cache
446 .insert(
447 "test".to_string(),
448 Stuff {
449 test: "test_extended".to_string(),
450 },
451 )
452 .await;
453 assert_gauge!(
454 "apollo.router.cache.storage.estimated_size",
455 37,
456 "kind" = "test",
457 "type" = "memory"
458 );
459 assert_gauge!(
460 "apollo.router.cache.size",
461 1,
462 "kind" = "test",
463 "type" = "memory"
464 );
465
466 cache
468 .insert(
469 "test2".to_string(),
470 Stuff {
471 test: "test".to_string(),
472 },
473 )
474 .await;
475 assert_gauge!(
476 "apollo.router.cache.storage.estimated_size",
477 28,
478 "kind" = "test",
479 "type" = "memory"
480 );
481 assert_gauge!(
482 "apollo.router.cache.size",
483 1,
484 "kind" = "test",
485 "type" = "memory"
486 );
487 }
488 .with_metrics()
489 .await;
490 }
491}