Skip to main content

apollo_router/cache/
storage.rs

1use std::fmt::Display;
2use std::fmt::{self};
3use std::hash::Hash;
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::sync::atomic::AtomicI64;
7use std::sync::atomic::Ordering;
8
9use lru::LruCache;
10use opentelemetry::KeyValue;
11use opentelemetry::metrics::MeterProvider;
12use opentelemetry::metrics::ObservableGauge;
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use tokio::sync::Mutex;
16use tokio::time::Instant;
17use tower::BoxError;
18
19use super::redis::*;
20use crate::configuration::RedisCache;
21use crate::metrics;
22use crate::plugins::telemetry::config_new::instruments::METER_NAME;
23
/// Marker trait collecting every capability the cache requires from a key.
///
/// A key must be cloneable, hashable and comparable for equality, printable
/// through both `Debug` and `Display`, and shareable across threads.
pub(crate) trait KeyType:
    Send + Sync + Clone + Eq + Hash + fmt::Debug + fmt::Display
{
}
28pub(crate) trait ValueType:
29    Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned
30{
31    /// Returns an estimated size of the cache entry in bytes.
32    fn estimated_size(&self) -> Option<usize> {
33        None
34    }
35}
36
37// Blanket implementation which satisfies the compiler
38impl<K> KeyType for K
39where
40    K: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync,
41{
42    // Nothing to implement, since K already supports the other traits.
43    // It has the functions it needs already
44}
45
/// Shared handle to the in-memory LRU cache: an `LruCache` guarded by an
/// async (tokio) mutex and reference-counted so that it can be cloned cheaply.
pub(crate) type InMemoryCache<K, V> = Arc<Mutex<LruCache<K, V>>>;
47
48// placeholder storage module
49//
50// this will be replaced by the multi level (in memory + redis/memcached) once we find
51// a suitable implementation.
52#[derive(Clone)]
53pub(crate) struct CacheStorage<K: KeyType, V: ValueType> {
54    caller: &'static str,
55    inner: Arc<Mutex<LruCache<K, V>>>,
56    redis: Option<RedisCacheStorage>,
57    cache_size: Arc<AtomicI64>,
58    cache_estimated_storage: Arc<AtomicI64>,
59    // It's OK for these to be mutexes as they are only initialized once
60    cache_size_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
61    cache_estimated_storage_gauge: Arc<parking_lot::Mutex<Option<ObservableGauge<i64>>>>,
62}
63
64impl<K, V> CacheStorage<K, V>
65where
66    K: KeyType,
67    V: ValueType,
68{
69    pub(crate) async fn new(
70        max_capacity: NonZeroUsize,
71        config: Option<RedisCache>,
72        caller: &'static str,
73    ) -> Result<Self, BoxError> {
74        Ok(Self {
75            cache_size_gauge: Default::default(),
76            cache_estimated_storage_gauge: Default::default(),
77            cache_size: Default::default(),
78            cache_estimated_storage: Default::default(),
79            caller,
80            inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
81            redis: if let Some(config) = config {
82                let required_to_start = config.required_to_start;
83                match RedisCacheStorage::new(config, caller).await {
84                    Err(e) => {
85                        tracing::error!(
86                            cache = caller,
87                            e,
88                            "could not open connection to Redis for caching",
89                        );
90                        if required_to_start {
91                            return Err(e);
92                        }
93                        None
94                    }
95                    Ok(storage) => Some(storage),
96                }
97            } else {
98                None
99            },
100        })
101    }
102
103    pub(crate) fn new_in_memory(max_capacity: NonZeroUsize, caller: &'static str) -> Self {
104        Self {
105            cache_size_gauge: Default::default(),
106            cache_estimated_storage_gauge: Default::default(),
107            cache_size: Default::default(),
108            cache_estimated_storage: Default::default(),
109            caller,
110            inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
111            redis: None,
112        }
113    }
114
115    fn create_cache_size_gauge(&self) -> ObservableGauge<i64> {
116        let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
117        let current_cache_size_for_gauge = self.cache_size.clone();
118        let caller = self.caller;
119        meter
120            .i64_observable_gauge("apollo.router.cache.size")
121            .with_description("Cache size")
122            .with_callback(move |i| {
123                i.observe(
124                    current_cache_size_for_gauge.load(Ordering::SeqCst),
125                    &[
126                        KeyValue::new("kind", caller),
127                        KeyValue::new("type", "memory"),
128                    ],
129                )
130            })
131            .build()
132    }
133
134    fn create_cache_estimated_storage_size_gauge(&self) -> ObservableGauge<i64> {
135        let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME);
136        let cache_estimated_storage_for_gauge = self.cache_estimated_storage.clone();
137        let caller = self.caller;
138
139        meter
140            .i64_observable_gauge("apollo.router.cache.storage.estimated_size")
141            .with_description("Estimated cache storage")
142            .with_unit("bytes")
143            .with_callback(move |i| {
144                // If there's no storage then don't bother updating the gauge
145                let value = cache_estimated_storage_for_gauge.load(Ordering::SeqCst);
146                if value > 0 {
147                    i.observe(
148                        cache_estimated_storage_for_gauge.load(Ordering::SeqCst),
149                        &[
150                            KeyValue::new("kind", caller),
151                            KeyValue::new("type", "memory"),
152                        ],
153                    )
154                }
155            })
156            .build()
157    }
158
159    /// `init_from_redis` is called with values newly deserialized from Redis cache
160    /// if an error is returned, the value is ignored and considered a cache miss.
161    pub(crate) async fn get(
162        &self,
163        key: &K,
164        mut init_from_redis: impl FnMut(&mut V) -> Result<(), String>,
165    ) -> Option<V> {
166        let instant_memory = Instant::now();
167        let res = self.inner.lock().await.get(key).cloned();
168
169        match res {
170            Some(v) => {
171                let duration = instant_memory.elapsed();
172                f64_histogram!(
173                    "apollo.router.cache.hit.time",
174                    "Time to get a value from the cache in seconds",
175                    duration.as_secs_f64(),
176                    kind = self.caller,
177                    storage = CacheStorageName::Memory.to_string()
178                );
179                Some(v)
180            }
181            None => {
182                let duration = instant_memory.elapsed();
183                f64_histogram!(
184                    "apollo.router.cache.miss.time",
185                    "Time to check the cache for an uncached value in seconds",
186                    duration.as_secs_f64(),
187                    kind = self.caller,
188                    storage = CacheStorageName::Memory.to_string()
189                );
190
191                let instant_redis = Instant::now();
192                if let Some(redis) = self.redis.as_ref() {
193                    let inner_key = RedisKey(key.clone());
194                    let redis_value = redis.get(inner_key).await.ok().and_then(|mut v| {
195                        match init_from_redis(&mut v.0) {
196                            Ok(()) => Some(v),
197                            Err(e) => {
198                                tracing::error!("Invalid value from Redis cache: {e}");
199                                None
200                            }
201                        }
202                    });
203                    match redis_value {
204                        Some(v) => {
205                            self.insert_in_memory(key.clone(), v.0.clone()).await;
206
207                            let duration = instant_redis.elapsed();
208                            f64_histogram!(
209                                "apollo.router.cache.hit.time",
210                                "Time to get a value from the cache in seconds",
211                                duration.as_secs_f64(),
212                                kind = self.caller,
213                                storage = CacheStorageName::Redis.to_string()
214                            );
215                            Some(v.0)
216                        }
217                        None => {
218                            let duration = instant_redis.elapsed();
219                            f64_histogram!(
220                                "apollo.router.cache.miss.time",
221                                "Time to check the cache for an uncached value in seconds",
222                                duration.as_secs_f64(),
223                                kind = self.caller,
224                                storage = CacheStorageName::Redis.to_string()
225                            );
226                            None
227                        }
228                    }
229                } else {
230                    None
231                }
232            }
233        }
234    }
235
236    pub(crate) async fn insert(&self, key: K, value: V) {
237        if let Some(redis) = self.redis.as_ref() {
238            redis
239                .insert(RedisKey(key.clone()), RedisValue(value.clone()), None)
240                .await;
241        }
242
243        self.insert_in_memory(key, value).await;
244    }
245
246    pub(crate) async fn insert_in_memory(&self, key: K, value: V)
247    where
248        V: ValueType,
249    {
250        // Update the cache size and estimated storage size
251        // This is cheaper than trying to estimate the cache storage size by iterating over the cache
252        let new_value_size = value.estimated_size().unwrap_or(0) as i64;
253
254        let (old_value, length) = {
255            let mut in_memory = self.inner.lock().await;
256            (in_memory.push(key, value), in_memory.len())
257        };
258
259        let size_delta = match old_value {
260            Some((_, old_value)) => {
261                let old_value_size = old_value.estimated_size().unwrap_or(0) as i64;
262                new_value_size - old_value_size
263            }
264            None => new_value_size,
265        };
266        self.cache_estimated_storage
267            .fetch_add(size_delta, Ordering::SeqCst);
268
269        self.cache_size.store(length as i64, Ordering::SeqCst);
270    }
271
272    pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
273        self.inner.clone()
274    }
275
276    #[cfg(test)]
277    pub(crate) async fn len(&self) -> usize {
278        self.inner.lock().await.len()
279    }
280
281    pub(crate) fn activate(&self) {
282        // Gauges MUST be created after the meter provider is initialized.
283        // This means that on reload we need a non-fallible way to recreate the gauges.
284        *self.cache_size_gauge.lock() = Some(self.create_cache_size_gauge());
285        *self.cache_estimated_storage_gauge.lock() =
286            Some(self.create_cache_estimated_storage_size_gauge());
287
288        // Also activate Redis metrics if present
289        if let Some(redis) = &self.redis {
290            redis.activate();
291        }
292    }
293}
294
/// Storage layer a cache metric refers to; rendered through `Display` as the
/// value of the `storage` metric attribute.
enum CacheStorageName {
    Redis,
    Memory,
}

impl Display for CacheStorageName {
    /// Writes the lowercase layer name (`redis` or `memory`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Redis => "redis",
            Self::Memory => "memory",
        };
        f.write_str(name)
    }
}
308
impl ValueType for String {
    /// A string's estimate is its length in bytes (`String::len`).
    fn estimated_size(&self) -> Option<usize> {
        Some(self.len())
    }
}
314
impl ValueType for crate::graphql::Response {
    /// No size estimate is provided for GraphQL responses.
    fn estimated_size(&self) -> Option<usize> {
        None
    }
}
320
321impl ValueType for usize {
322    fn estimated_size(&self) -> Option<usize> {
323        Some(std::mem::size_of::<usize>())
324    }
325}
326
327#[cfg(test)]
328mod test {
329    use std::num::NonZeroUsize;
330
331    use crate::cache::estimate_size;
332    use crate::cache::storage::CacheStorage;
333    use crate::cache::storage::ValueType;
334    use crate::metrics::FutureMetricsExt;
335
336    #[tokio::test]
337    async fn test_metrics() {
338        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
339        struct Stuff {}
340        impl ValueType for Stuff {
341            fn estimated_size(&self) -> Option<usize> {
342                Some(1)
343            }
344        }
345
346        async {
347            let cache: CacheStorage<String, Stuff> =
348                CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
349                    .await
350                    .unwrap();
351            cache.activate();
352
353            cache.insert("test".to_string(), Stuff {}).await;
354            assert_gauge!(
355                "apollo.router.cache.storage.estimated_size",
356                1,
357                "kind" = "test",
358                "type" = "memory"
359            );
360            assert_gauge!(
361                "apollo.router.cache.size",
362                1,
363                "kind" = "test",
364                "type" = "memory"
365            );
366        }
367        .with_metrics()
368        .await;
369    }
370
371    #[tokio::test]
372    #[should_panic]
373    async fn test_metrics_not_emitted_where_no_estimated_size() {
374        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
375        struct Stuff {}
376        impl ValueType for Stuff {
377            fn estimated_size(&self) -> Option<usize> {
378                None
379            }
380        }
381
382        async {
383            let cache: CacheStorage<String, Stuff> =
384                CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test")
385                    .await
386                    .unwrap();
387            cache.activate();
388
389            cache.insert("test".to_string(), Stuff {}).await;
390            // This metric won't exist
391            assert_gauge!(
392                "apollo.router.cache.size",
393                0,
394                "kind" = "test",
395                "type" = "memory"
396            );
397        }
398        .with_metrics()
399        .await;
400    }
401
402    #[tokio::test]
403    async fn test_metrics_eviction() {
404        #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
405        struct Stuff {
406            test: String,
407        }
408        impl ValueType for Stuff {
409            fn estimated_size(&self) -> Option<usize> {
410                Some(estimate_size(self))
411            }
412        }
413
414        async {
415            // note that the cache size is 1
416            // so the second insert will always evict
417            let cache: CacheStorage<String, Stuff> =
418                CacheStorage::new(NonZeroUsize::new(1).unwrap(), None, "test")
419                    .await
420                    .unwrap();
421            cache.activate();
422
423            cache
424                .insert(
425                    "test".to_string(),
426                    Stuff {
427                        test: "test".to_string(),
428                    },
429                )
430                .await;
431            assert_gauge!(
432                "apollo.router.cache.storage.estimated_size",
433                28,
434                "kind" = "test",
435                "type" = "memory"
436            );
437            assert_gauge!(
438                "apollo.router.cache.size",
439                1,
440                "kind" = "test",
441                "type" = "memory"
442            );
443
444            // Insert something slightly larger
445            cache
446                .insert(
447                    "test".to_string(),
448                    Stuff {
449                        test: "test_extended".to_string(),
450                    },
451                )
452                .await;
453            assert_gauge!(
454                "apollo.router.cache.storage.estimated_size",
455                37,
456                "kind" = "test",
457                "type" = "memory"
458            );
459            assert_gauge!(
460                "apollo.router.cache.size",
461                1,
462                "kind" = "test",
463                "type" = "memory"
464            );
465
466            // Even though this is a new cache entry, we should get back to where we initially were
467            cache
468                .insert(
469                    "test2".to_string(),
470                    Stuff {
471                        test: "test".to_string(),
472                    },
473                )
474                .await;
475            assert_gauge!(
476                "apollo.router.cache.storage.estimated_size",
477                28,
478                "kind" = "test",
479                "type" = "memory"
480            );
481            assert_gauge!(
482                "apollo.router.cache.size",
483                1,
484                "kind" = "test",
485                "type" = "memory"
486            );
487        }
488        .with_metrics()
489        .await;
490    }
491}