1use std::fmt::Display;
2use std::fmt::{self};
3use std::hash::Hash;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::sync::atomic::AtomicI64;
7use std::sync::atomic::Ordering;
8
9use lru::LruCache;
10use opentelemetry::KeyValue;
11use opentelemetry::metrics::MeterProvider;
12use opentelemetry::metrics::ObservableGauge;
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use tokio::sync::Mutex;
16use tokio::time::Instant;
17use tower::BoxError;
18
19use super::redis::*;
20use crate::configuration::RedisCache;
21use crate::metrics;
22use crate::plugins::telemetry::config_new::instruments::METER_NAME;
23
24pub(crate) trait KeyType:
25 Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync
26{
27}
28pub(crate) trait ValueType:
29 Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned
30{
31 fn estimated_size(&self) -> Option<usize> {
33 None
34 }
35}
36
37impl<K> KeyType for K
39where
40 K: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync,
41{
42 }
45
46pub(crate) type InMemoryCache<K, V> = Arc<Mutex<LruCache<K, V>>>;
47
48#[derive(Clone)]
53pub(crate) struct CacheStorage<K: KeyType, V: ValueType> {
54 caller: &'static str,
55 inner: Arc<Mutex<LruCache<K, V>>>,
56 redis: Option<RedisCacheStorage>,
57 cache_size: Arc<AtomicI64>,
58 cache_estimated_storage: Arc<AtomicI64>,
59 cache_size_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
61 cache_estimated_storage_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
62}
63
64impl<K, V> CacheStorage<K, V>
65where
66 K: KeyType,
67 V: ValueType,
68{
69 pub(crate) async fn new(
70 max_capacity: NonZeroUsize,
71 config: Option<RedisCache>,
72 caller: &'static str,
73 ) -> Result<Self, BoxError> {
74 Ok(Self {
75 cache_size_gauge: Default::default(),
76 cache_estimated_storage_gauge: Default::default(),
77 cache_size: Default::default(),
78 cache_estimated_storage: Default::default(),
79 caller,
80 inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
81 redis: if let Some(config) = config {
82 let required_to_start = config.required_to_start;
83 match RedisCacheStorage::new(config, caller).await {
84 Err(e) => {
85 tracing::error!(
86 cache = caller,
87 e,
88 "could not open connection to Redis for caching",
89 );
90 if required_to_start {
91 return Err(e);
92 }
93 None
94 }
95 Ok(storage) => Some(storage),
96 }
97 } else {
98 None
99 },
100 })
101 }
102
103 pub(crate) fn new_in_memory(max_capacity: NonZeroUsize, caller: &'static str) -> Self {
104 Self {
105 cache_size_gauge: Default::default(),
106 cache_estimated_storage_gauge: Default::default(),
107 cache_size: Default::default(),
108 cache_estimated_storage: Default::default(),
109 caller,
110 inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
111 redis: None,
112 }
113 }
114
115 fn create_cache_size_gauge(&self) -> ObservableGauge<i64> {
116 let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
117 let current_cache_size_for_gauge = self.cache_size.clone();
118 let caller = self.caller;
119 meter
120 .i64_observable_gauge("apollo.router.cache.size")
121 .with_description("Cache size")
122 .with_callback(move |i| {
123 i.observe(
124 current_cache_size_for_gauge.load(Ordering::SeqCst),
125 &[
126 KeyValue::new("kind", caller),
127 KeyValue::new("type", "memory"),
128 ],
129 )
130 })
131 .init()
132 }
133
134 fn create_cache_estimated_storage_size_gauge(&self) -> ObservableGauge<i64> {
135 let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
136 let cache_estimated_storage_for_gauge = self.cache_estimated_storage.clone();
137 let caller = self.caller;
138
139 meter
140 .i64_observable_gauge("apollo.router.cache.storage.estimated_size")
141 .with_description("Estimated cache storage")
142 .with_unit("bytes")
143 .with_callback(move |i| {
144 let value = cache_estimated_storage_for_gauge.load(Ordering::SeqCst);
146 if value > 0 {
147 i.observe(
148 cache_estimated_storage_for_gauge.load(Ordering::SeqCst),
149 &[
150 KeyValue::new("kind", caller),
151 KeyValue::new("type", "memory"),
152 ],
153 )
154 }
155 })
156 .init()
157 }
158
159 pub(crate) async fn get(
162 &self,
163 key: &K,
164 mut init_from_redis: impl FnMut(&mut V) -> Result<(), String>,
165 ) -> Option<V> {
166 let instant_memory = Instant::now();
167 let res = self.inner.lock().await.get(key).cloned();
168
169 match res {
170 Some(v) => {
171 let duration = instant_memory.elapsed();
172 f64_histogram!(
173 "apollo.router.cache.hit.time",
174 "Time to get a value from the cache in seconds",
175 duration.as_secs_f64(),
176 kind = self.caller,
177 storage = CacheStorageName::Memory.to_string()
178 );
179 Some(v)
180 }
181 None => {
182 let duration = instant_memory.elapsed();
183 f64_histogram!(
184 "apollo.router.cache.miss.time",
185 "Time to check the cache for an uncached value in seconds",
186 duration.as_secs_f64(),
187 kind = self.caller,
188 storage = CacheStorageName::Memory.to_string()
189 );
190
191 let instant_redis = Instant::now();
192 if let Some(redis) = self.redis.as_ref() {
193 let inner_key = RedisKey(key.clone());
194 let redis_value = redis.get(inner_key).await.ok().and_then(|mut v| {
195 match init_from_redis(&mut v.0) {
196 Ok(()) => Some(v),
197 Err(e) => {
198 tracing::error!("Invalid value from Redis cache: {e}");
199 None
200 }
201 }
202 });
203 match redis_value {
204 Some(v) => {
205 self.insert_in_memory(key.clone(), v.0.clone()).await;
206
207 let duration = instant_redis.elapsed();
208 f64_histogram!(
209 "apollo.router.cache.hit.time",
210 "Time to get a value from the cache in seconds",
211 duration.as_secs_f64(),
212 kind = self.caller,
213 storage = CacheStorageName::Redis.to_string()
214 );
215 Some(v.0)
216 }
217 None => {
218 let duration = instant_redis.elapsed();
219 f64_histogram!(
220 "apollo.router.cache.miss.time",
221 "Time to check the cache for an uncached value in seconds",
222 duration.as_secs_f64(),
223 kind = self.caller,
224 storage = CacheStorageName::Redis.to_string()
225 );
226 None
227 }
228 }
229 } else {
230 None
231 }
232 }
233 }
234 }
235
236 pub(crate) async fn insert(&self, key: K, value: V) {
237 if let Some(redis) = self.redis.as_ref() {
238 redis
239 .insert(RedisKey(key.clone()), RedisValue(value.clone()), None)
240 .await;
241 }
242
243 self.insert_in_memory(key, value).await;
244 }
245
246 pub(crate) async fn insert_in_memory(&self, key: K, value: V)
247 where
248 V: ValueType,
249 {
250 let new_value_size = value.estimated_size().unwrap_or(0) as i64;
253
254 let (old_value, length) = {
255 let mut in_memory = self.inner.lock().await;
256 (in_memory.push(key, value), in_memory.len())
257 };
258
259 let size_delta = match old_value {
260 Some((_, old_value)) => {
261 let old_value_size = old_value.estimated_size().unwrap_or(0) as i64;
262 new_value_size - old_value_size
263 }
264 None => new_value_size,
265 };
266 self.cache_estimated_storage
267 .fetch_add(size_delta, Ordering::SeqCst);
268
269 self.cache_size.store(length as i64, Ordering::SeqCst);
270 }
271
272 pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
273 self.inner.clone()
274 }
275
276 #[cfg(test)]
277 pub(crate) async fn len(&self) -> usize {
278 self.inner.lock().await.len()
279 }
280
281 pub(crate) fn activate(&self) {
282 *self.cache_size_gauge.lock() = Some(self.create_cache_size_gauge());
285 *self.cache_estimated_storage_gauge.lock() =
286 Some(self.create_cache_estimated_storage_size_gauge());
287 }
288}
289
290enum CacheStorageName {
291 Redis,
292 Memory,
293}
294
295impl Display for CacheStorageName {
296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297 match self {
298 CacheStorageName::Redis => write!(f, "redis"),
299 CacheStorageName::Memory => write!(f, "memory"),
300 }
301 }
302}
303
304impl ValueType for String {
305 fn estimated_size(&self) -> Option<usize> {
306 Some(self.len())
307 }
308}
309
310impl ValueType for crate::graphql::Response {
311 fn estimated_size(&self) -> Option<usize> {
312 None
313 }
314}
315
316impl ValueType for usize {
317 fn estimated_size(&self) -> Option<usize> {
318 Some(std::mem::size_of::<usize>())
319 }
320}
321
322#[cfg(test)]
323mod test {
324 use std::num::NonZeroUsize;
325
326 use crate::cache::estimate_size;
327 use crate::cache::storage::CacheStorage;
328 use crate::cache::storage::ValueType;
329 use crate::metrics::FutureMetricsExt;
330
331 #[tokio::test]
332 async fn test_metrics() {
333 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
334 struct Stuff {}
335 impl ValueType for Stuff {
336 fn estimated_size(&self) -> Option<usize> {
337 Some(1)
338 }
339 }
340
341 async {
342 let cache: CacheStorage<String, Stuff> =
343 CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
344 .await
345 .unwrap();
346 cache.activate();
347
348 cache.insert("test".to_string(), Stuff {}).await;
349 assert_gauge!(
350 "apollo.router.cache.storage.estimated_size",
351 1,
352 "kind" = "test",
353 "type" = "memory"
354 );
355 assert_gauge!(
356 "apollo.router.cache.size",
357 1,
358 "kind" = "test",
359 "type" = "memory"
360 );
361 }
362 .with_metrics()
363 .await;
364 }
365
366 #[tokio::test]
367 #[should_panic]
368 async fn test_metrics_not_emitted_where_no_estimated_size() {
369 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
370 struct Stuff {}
371 impl ValueType for Stuff {
372 fn estimated_size(&self) -> Option<usize> {
373 None
374 }
375 }
376
377 async {
378 let cache: CacheStorage<String, Stuff> =
379 CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
380 .await
381 .unwrap();
382 cache.activate();
383
384 cache.insert("test".to_string(), Stuff {}).await;
385 assert_gauge!(
387 "apollo.router.cache.size",
388 0,
389 "kind" = "test",
390 "type" = "memory"
391 );
392 }
393 .with_metrics()
394 .await;
395 }
396
397 #[tokio::test]
398 async fn test_metrics_eviction() {
399 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
400 struct Stuff {
401 test: String,
402 }
403 impl ValueType for Stuff {
404 fn estimated_size(&self) -> Option<usize> {
405 Some(estimate_size(self))
406 }
407 }
408
409 async {
410 let cache: CacheStorage<String, Stuff> =
413 CacheStorage::new(NonZeroUsize::new(1).unwrap(), None, "test")
414 .await
415 .unwrap();
416 cache.activate();
417
418 cache
419 .insert(
420 "test".to_string(),
421 Stuff {
422 test: "test".to_string(),
423 },
424 )
425 .await;
426 assert_gauge!(
427 "apollo.router.cache.storage.estimated_size",
428 28,
429 "kind" = "test",
430 "type" = "memory"
431 );
432 assert_gauge!(
433 "apollo.router.cache.size",
434 1,
435 "kind" = "test",
436 "type" = "memory"
437 );
438
439 cache
441 .insert(
442 "test".to_string(),
443 Stuff {
444 test: "test_extended".to_string(),
445 },
446 )
447 .await;
448 assert_gauge!(
449 "apollo.router.cache.storage.estimated_size",
450 37,
451 "kind" = "test",
452 "type" = "memory"
453 );
454 assert_gauge!(
455 "apollo.router.cache.size",
456 1,
457 "kind" = "test",
458 "type" = "memory"
459 );
460
461 cache
463 .insert(
464 "test2".to_string(),
465 Stuff {
466 test: "test".to_string(),
467 },
468 )
469 .await;
470 assert_gauge!(
471 "apollo.router.cache.storage.estimated_size",
472 28,
473 "kind" = "test",
474 "type" = "memory"
475 );
476 assert_gauge!(
477 "apollo.router.cache.size",
478 1,
479 "kind" = "test",
480 "type" = "memory"
481 );
482 }
483 .with_metrics()
484 .await;
485 }
486}