apollo_router/cache/storage.rs

use std::fmt::Display;
use std::fmt::{self};
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;

use lru::LruCache;
use opentelemetry::KeyValue;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::ObservableGauge;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tower::BoxError;

use super::redis::*;
use crate::configuration::RedisCache;
use crate::metrics;
use crate::plugins::telemetry::config_new::instruments::METER_NAME;

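/// Bounds that a type must satisfy to be used as a cache key.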
pub(crate) trait KeyType:
    Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync
{
}
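/// Bounds that a type must satisfy to be stored as a cache value, plus an
/// optional size estimate used by the storage metrics below.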
pub(crate) trait ValueType:
    Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned
{
    /// Returns an estimated size of the cache entry in bytes.
    fn estimated_size(&self) -> Option<usize> {
        None
    }
}

// Blanket implementation which satisfies the compiler
impl<K> KeyType for K
where
    K: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync,
{
    // Nothing to implement: `KeyType` adds no methods of its own, so any type
    // with the bounds above already qualifies.
}

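/// Shared handle to the in-memory (LRU) tier of a cache.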
pub(crate) type InMemoryCache<K, V> = Arc<Mutex<LruCache<K, V>>>;

// Placeholder storage module.
//
// This will be replaced by a multi-level implementation (in-memory + Redis/memcached)
// once we find a suitable one.
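/// A two-tier cache: entries always live in a bounded in-memory LRU, and are also
/// written to and read from Redis when a [`RedisCache`] configuration is provided.
/// Entry counts and estimated sizes are exported as OpenTelemetry gauges once
/// [`CacheStorage::activate`] is called.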
#[derive(Clone)]
pub(crate) struct CacheStorage<K: KeyType, V: ValueType> {
    caller: &'static str,
    inner: Arc<Mutex<LruCache<K, V>>>,
    redis: Option<RedisCacheStorage>,
    cache_size: Arc<AtomicI64>,
    cache_estimated_storage: Arc<AtomicI64>,
    // It's OK for these to be mutexes as they are only initialized once
    cache_size_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
    cache_estimated_storage_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
}

impl<K, V> CacheStorage<K, V>
where
    K: KeyType,
    V: ValueType,
{
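    /// Builds a cache with an in-memory LRU tier and, if `config` is provided, a
    /// Redis tier. Fails only when Redis is configured with `required_to_start`
    /// and the connection cannot be established.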
    pub(crate) async fn new(
        max_capacity: NonZeroUsize,
        config: Option<RedisCache>,
        caller: &'static str,
    ) -> Result<Self, BoxError> {
        Ok(Self {
            cache_size_gauge: Default::default(),
            cache_estimated_storage_gauge: Default::default(),
            cache_size: Default::default(),
            cache_estimated_storage: Default::default(),
            caller,
            inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
            redis: if let Some(config) = config {
                let required_to_start = config.required_to_start;
                match RedisCacheStorage::new(config, caller).await {
                    Err(e) => {
                        tracing::error!(
                            cache = caller,
                            e,
                            "could not open connection to Redis for caching",
                        );
                        if required_to_start {
                            return Err(e);
                        }
                        None
                    }
                    Ok(storage) => Some(storage),
                }
            } else {
                None
            },
        })
    }

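    /// Builds a cache with only the in-memory LRU tier (no Redis).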
    pub(crate) fn new_in_memory(max_capacity: NonZeroUsize, caller: &'static str) -> Self {
        Self {
            cache_size_gauge: Default::default(),
            cache_estimated_storage_gauge: Default::default(),
            cache_size: Default::default(),
            cache_estimated_storage: Default::default(),
            caller,
            inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
            redis: None,
        }
    }

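    /// Creates the gauge reporting `apollo.router.cache.size`, the number of
    /// entries currently held in the in-memory tier.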
    fn create_cache_size_gauge(&self) -> ObservableGauge<i64> {
        let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
        let current_cache_size_for_gauge = self.cache_size.clone();
        let caller = self.caller;
        meter
            .i64_observable_gauge("apollo.router.cache.size")
            .with_description("Cache size")
            .with_callback(move |i| {
                i.observe(
                    current_cache_size_for_gauge.load(Ordering::SeqCst),
                    &[
                        KeyValue::new("kind", caller),
                        KeyValue::new("type", "memory"),
                    ],
                )
            })
            .init()
    }

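    /// Creates the gauge reporting `apollo.router.cache.storage.estimated_size`,
    /// the sum of the entries' `estimated_size()` values in bytes. Nothing is
    /// reported while the estimate is zero (for example when values do not
    /// provide an `estimated_size` implementation).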
    fn create_cache_estimated_storage_size_gauge(&self) -> ObservableGauge<i64> {
        let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
        let cache_estimated_storage_for_gauge = self.cache_estimated_storage.clone();
        let caller = self.caller;

        meter
            .i64_observable_gauge("apollo.router.cache.storage.estimated_size")
            .with_description("Estimated cache storage")
            .with_unit("bytes")
            .with_callback(move |i| {
                // If there's no storage then don't bother updating the gauge
                let value = cache_estimated_storage_for_gauge.load(Ordering::SeqCst);
                if value > 0 {
                    i.observe(
                        cache_estimated_storage_for_gauge.load(Ordering::SeqCst),
                        &[
                            KeyValue::new("kind", caller),
                            KeyValue::new("type", "memory"),
                        ],
                    )
                }
            })
            .init()
    }

    /// Looks up `key`, first in the in-memory tier and then in Redis (if configured).
    /// `init_from_redis` is called with values newly deserialized from the Redis cache;
    /// if it returns an error, the value is ignored and treated as a cache miss.
    pub(crate) async fn get(
        &self,
        key: &K,
        mut init_from_redis: impl FnMut(&mut V) -> Result<(), String>,
    ) -> Option<V> {
        let instant_memory = Instant::now();
        let res = self.inner.lock().await.get(key).cloned();

        match res {
            Some(v) => {
                let duration = instant_memory.elapsed();
                f64_histogram!(
                    "apollo.router.cache.hit.time",
                    "Time to get a value from the cache in seconds",
                    duration.as_secs_f64(),
                    kind = self.caller,
                    storage = CacheStorageName::Memory.to_string()
                );
                Some(v)
            }
            None => {
                let duration = instant_memory.elapsed();
                f64_histogram!(
                    "apollo.router.cache.miss.time",
                    "Time to check the cache for an uncached value in seconds",
                    duration.as_secs_f64(),
                    kind = self.caller,
                    storage = CacheStorageName::Memory.to_string()
                );

                let instant_redis = Instant::now();
                if let Some(redis) = self.redis.as_ref() {
                    let inner_key = RedisKey(key.clone());
                    let redis_value = redis.get(inner_key).await.ok().and_then(|mut v| {
                        match init_from_redis(&mut v.0) {
                            Ok(()) => Some(v),
                            Err(e) => {
                                tracing::error!("Invalid value from Redis cache: {e}");
                                None
                            }
                        }
                    });
                    match redis_value {
                        Some(v) => {
                            self.insert_in_memory(key.clone(), v.0.clone()).await;

                            let duration = instant_redis.elapsed();
                            f64_histogram!(
                                "apollo.router.cache.hit.time",
                                "Time to get a value from the cache in seconds",
                                duration.as_secs_f64(),
                                kind = self.caller,
                                storage = CacheStorageName::Redis.to_string()
                            );
                            Some(v.0)
                        }
                        None => {
                            let duration = instant_redis.elapsed();
                            f64_histogram!(
                                "apollo.router.cache.miss.time",
                                "Time to check the cache for an uncached value in seconds",
                                duration.as_secs_f64(),
                                kind = self.caller,
                                storage = CacheStorageName::Redis.to_string()
                            );
                            None
                        }
                    }
                } else {
                    None
                }
            }
        }
    }

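    /// Inserts `value` under `key` into every configured tier: Redis first (when
    /// available), then the in-memory LRU.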
    pub(crate) async fn insert(&self, key: K, value: V) {
        if let Some(redis) = self.redis.as_ref() {
            redis
                .insert(RedisKey(key.clone()), RedisValue(value.clone()), None)
                .await;
        }

        self.insert_in_memory(key, value).await;
    }

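    /// Inserts `value` into the in-memory LRU only, and updates the entry-count and
    /// estimated-storage counters backing the cache gauges.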
    pub(crate) async fn insert_in_memory(&self, key: K, value: V)
    where
        V: ValueType,
    {
        // Update the cache size and estimated storage size
        // This is cheaper than trying to estimate the cache storage size by iterating over the cache
        let new_value_size = value.estimated_size().unwrap_or(0) as i64;

        let (old_value, length) = {
            let mut in_memory = self.inner.lock().await;
            (in_memory.push(key, value), in_memory.len())
        };

        let size_delta = match old_value {
            Some((_, old_value)) => {
                let old_value_size = old_value.estimated_size().unwrap_or(0) as i64;
                new_value_size - old_value_size
            }
            None => new_value_size,
        };
        self.cache_estimated_storage
            .fetch_add(size_delta, Ordering::SeqCst);

        self.cache_size.store(length as i64, Ordering::SeqCst);
    }

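    /// Returns a shared handle to the in-memory tier of this cache.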
    pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
        self.inner.clone()
    }

    #[cfg(test)]
    pub(crate) async fn len(&self) -> usize {
        self.inner.lock().await.len()
    }

    pub(crate) fn activate(&self) {
        // Gauges MUST be created after the meter provider is initialized.
        // This means that on reload we need an infallible way to recreate the gauges, hence this function.
        *self.cache_size_gauge.lock() = Some(self.create_cache_size_gauge());
        *self.cache_estimated_storage_gauge.lock() =
            Some(self.create_cache_estimated_storage_size_gauge());
    }
}

enum CacheStorageName {
    Redis,
    Memory,
}

impl Display for CacheStorageName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CacheStorageName::Redis => write!(f, "redis"),
            CacheStorageName::Memory => write!(f, "memory"),
        }
    }
}

impl ValueType for String {
    fn estimated_size(&self) -> Option<usize> {
        Some(self.len())
    }
}

impl ValueType for crate::graphql::Response {
    fn estimated_size(&self) -> Option<usize> {
        None
    }
}

impl ValueType for usize {
    fn estimated_size(&self) -> Option<usize> {
        Some(std::mem::size_of::<usize>())
    }
}

#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use crate::cache::estimate_size;
    use crate::cache::storage::CacheStorage;
    use crate::cache::storage::ValueType;
    use crate::metrics::FutureMetricsExt;

    #[tokio::test]
    async fn test_metrics() {
        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
        struct Stuff {}
        impl ValueType for Stuff {
            fn estimated_size(&self) -> Option<usize> {
                Some(1)
            }
        }

        async {
            let cache: CacheStorage<String, Stuff> =
                CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
                    .await
                    .unwrap();
            cache.activate();

            cache.insert("test".to_string(), Stuff {}).await;
            assert_gauge!(
                "apollo.router.cache.storage.estimated_size",
                1,
                "kind" = "test",
                "type" = "memory"
            );
            assert_gauge!(
                "apollo.router.cache.size",
                1,
                "kind" = "test",
                "type" = "memory"
            );
        }
        .with_metrics()
        .await;
    }

    #[tokio::test]
    #[should_panic]
    async fn test_metrics_not_emitted_where_no_estimated_size() {
        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
        struct Stuff {}
        impl ValueType for Stuff {
            fn estimated_size(&self) -> Option<usize> {
                None
            }
        }

        async {
            let cache: CacheStorage<String, Stuff> =
                CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
                    .await
                    .unwrap();
            cache.activate();

            cache.insert("test".to_string(), Stuff {}).await;
            // This metric won't exist
            assert_gauge!(
                "apollo.router.cache.size",
                0,
                "kind" = "test",
                "type" = "memory"
            );
        }
        .with_metrics()
        .await;
    }

    #[tokio::test]
    async fn test_metrics_eviction() {
        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
        struct Stuff {
            test: String,
        }
        impl ValueType for Stuff {
            fn estimated_size(&self) -> Option<usize> {
                Some(estimate_size(self))
            }
        }

        async {
            // Note: the cache capacity is 1, so inserting under a new key
            // always evicts the previous entry.
            let cache: CacheStorage<String, Stuff> =
                CacheStorage::new(NonZeroUsize::new(1).unwrap(), None, "test")
                    .await
                    .unwrap();
            cache.activate();

            cache
                .insert(
                    "test".to_string(),
                    Stuff {
                        test: "test".to_string(),
                    },
                )
                .await;
            assert_gauge!(
                "apollo.router.cache.storage.estimated_size",
                28,
                "kind" = "test",
                "type" = "memory"
            );
            assert_gauge!(
                "apollo.router.cache.size",
                1,
                "kind" = "test",
                "type" = "memory"
            );

            // Insert something slightly larger
            cache
                .insert(
                    "test".to_string(),
                    Stuff {
                        test: "test_extended".to_string(),
                    },
                )
                .await;
            assert_gauge!(
                "apollo.router.cache.storage.estimated_size",
                37,
                "kind" = "test",
                "type" = "memory"
            );
            assert_gauge!(
                "apollo.router.cache.size",
                1,
                "kind" = "test",
                "type" = "memory"
            );

            // Even though this is a new cache entry, we should get back to where we initially were
            cache
                .insert(
                    "test2".to_string(),
                    Stuff {
                        test: "test".to_string(),
                    },
                )
                .await;
            assert_gauge!(
                "apollo.router.cache.storage.estimated_size",
                28,
                "kind" = "test",
                "type" = "memory"
            );
            assert_gauge!(
                "apollo.router.cache.size",
                1,
                "kind" = "test",
                "type" = "memory"
            );
        }
        .with_metrics()
        .await;
    }
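
    // A minimal additional sketch of the purely in-memory path: `new_in_memory`,
    // `insert`, and `get` without a Redis tier. The capacity, caller name, and
    // key/value types are arbitrary choices for illustration; `String` implements
    // `ValueType` above and satisfies the blanket `KeyType` impl. The block is
    // wrapped in `with_metrics()` like the tests above so the cache's histogram
    // calls run inside a metrics context.
    #[tokio::test]
    async fn test_in_memory_insert_and_get() {
        async {
            let cache: CacheStorage<String, String> =
                CacheStorage::new_in_memory(NonZeroUsize::new(2).unwrap(), "test");

            // A key that was never inserted is a miss (there is no Redis tier to consult).
            assert!(cache.get(&"missing".to_string(), |_| Ok(())).await.is_none());

            // After insertion the value is served from the in-memory LRU.
            cache.insert("hello".to_string(), "world".to_string()).await;
            assert_eq!(
                cache.get(&"hello".to_string(), |_| Ok(())).await,
                Some("world".to_string())
            );
            assert_eq!(cache.len().await, 1);
        }
        .with_metrics()
        .await;
    }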
}