forest/state_manager/
cache.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3use crate::blocks::TipsetKey;
4use crate::shim::executor::Receipt;
5use crate::state_manager::{DEFAULT_TIPSET_CACHE_SIZE, StateEvents};
6use crate::utils::cache::{LruValueConstraints, SizeTrackingLruCache};
7use nonzero_ext::nonzero;
8use parking_lot::Mutex as SyncMutex;
9use std::future::Future;
10use std::num::NonZeroUsize;
11use std::pin::Pin;
12use std::sync::Arc;
13use tokio::sync::Mutex as TokioMutex;
14
15struct TipsetStateCacheInner<V: LruValueConstraints> {
16    values: SizeTrackingLruCache<TipsetKey, V>,
17    pending: Vec<(TipsetKey, Arc<TokioMutex<()>>)>,
18}
19
20impl<V: LruValueConstraints> TipsetStateCacheInner<V> {
21    pub fn with_size(cache_identifier: &str, cache_size: NonZeroUsize) -> Self {
22        Self {
23            values: SizeTrackingLruCache::new_with_metrics(
24                Self::cache_name(cache_identifier).into(),
25                cache_size,
26            ),
27            pending: Vec::with_capacity(8),
28        }
29    }
30
31    fn cache_name(cache_identifier: &str) -> String {
32        format!("tipset_state_{cache_identifier}")
33    }
34}
35
36/// A generic cache that handles concurrent access and computation for tipset-related data.
37pub(crate) struct TipsetStateCache<V: LruValueConstraints> {
38    cache: Arc<SyncMutex<TipsetStateCacheInner<V>>>,
39}
40
41enum CacheLookupStatus<V> {
42    Exist(V),
43    Empty(Arc<TokioMutex<()>>),
44}
45
46impl<V: LruValueConstraints> TipsetStateCache<V> {
47    pub fn new(cache_identifier: &str) -> Self {
48        Self::with_size(cache_identifier, DEFAULT_TIPSET_CACHE_SIZE)
49    }
50
51    pub fn with_size(cache_identifier: &str, cache_size: NonZeroUsize) -> Self {
52        Self {
53            cache: Arc::new(SyncMutex::new(TipsetStateCacheInner::with_size(
54                cache_identifier,
55                cache_size,
56            ))),
57        }
58    }
59
60    fn with_inner<F, T>(&self, func: F) -> T
61    where
62        F: FnOnce(&mut TipsetStateCacheInner<V>) -> T,
63    {
64        let mut lock = self.cache.lock();
65        func(&mut lock)
66    }
67
68    pub async fn get_or_else<F, Fut>(&self, key: &TipsetKey, compute: F) -> anyhow::Result<V>
69    where
70        F: FnOnce() -> Fut,
71        Fut: Future<Output = anyhow::Result<V>> + Send,
72        V: Send + Sync + 'static,
73    {
74        let status = self.with_inner(|inner| match inner.values.get_cloned(key) {
75            Some(v) => CacheLookupStatus::Exist(v),
76            None => {
77                let option = inner
78                    .pending
79                    .iter()
80                    .find(|(k, _)| k == key)
81                    .map(|(_, mutex)| mutex);
82                match option {
83                    Some(mutex) => CacheLookupStatus::Empty(mutex.clone()),
84                    None => {
85                        let mutex = Arc::new(TokioMutex::new(()));
86                        inner.pending.push((key.clone(), mutex.clone()));
87                        CacheLookupStatus::Empty(mutex)
88                    }
89                }
90            }
91        });
92        match status {
93            CacheLookupStatus::Exist(x) => {
94                crate::metrics::LRU_CACHE_HIT
95                    .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
96                    .inc();
97                Ok(x)
98            }
99            CacheLookupStatus::Empty(mtx) => {
100                let _guard = mtx.lock().await;
101                match self.get(key) {
102                    Some(v) => {
103                        // While locking someone else computed the pending task
104                        crate::metrics::LRU_CACHE_HIT
105                            .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
106                            .inc();
107
108                        Ok(v)
109                    }
110                    None => {
111                        // Entry does not have state computed yet, compute value and fill the cache
112                        crate::metrics::LRU_CACHE_MISS
113                            .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
114                            .inc();
115
116                        let value = compute().await?;
117
118                        // Write back to cache, release lock and return value
119                        self.insert(key.clone(), value.clone());
120                        Ok(value)
121                    }
122                }
123            }
124        }
125    }
126
127    pub fn get(&self, key: &TipsetKey) -> Option<V> {
128        self.with_inner(|inner| inner.values.get_cloned(key))
129    }
130
131    pub fn insert(&self, key: TipsetKey, value: V) {
132        self.with_inner(|inner| {
133            inner.pending.retain(|(k, _)| k != &key);
134            inner.values.push(key, value);
135        });
136    }
137}
138
139// Type alias for the compute function for receipts
140type ComputeReceiptFn =
141    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send>> + Send>;
142
143// Type alias for the compute function for state events
144type ComputeEventsFn =
145    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send>> + Send>;
146
147/// Defines the interface for caching and retrieving tipset-specific events and receipts.
148pub trait TipsetReceiptEventCacheHandler: Send + Sync + 'static {
149    fn insert_receipt(&self, key: &TipsetKey, receipt: Vec<Receipt>);
150    fn insert_events(&self, key: &TipsetKey, events: StateEvents);
151    #[allow(dead_code)]
152    fn get_events(&self, key: &TipsetKey) -> Option<StateEvents>;
153    #[allow(dead_code)]
154    fn get_receipts(&self, key: &TipsetKey) -> Option<Vec<Receipt>>;
155    fn get_receipt_or_else(
156        &self,
157        key: &TipsetKey,
158        compute: ComputeReceiptFn,
159    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>>;
160    fn get_events_or_else(
161        &self,
162        key: &TipsetKey,
163        compute: ComputeEventsFn,
164    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>>;
165}
166
167/// Cache for tipset-related events and receipts.
168pub struct EnabledTipsetDataCache {
169    events_cache: TipsetStateCache<StateEvents>,
170    receipt_cache: TipsetStateCache<Vec<Receipt>>,
171}
172
173impl EnabledTipsetDataCache {
174    pub fn new() -> Self {
175        const DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE: NonZeroUsize = nonzero!(4096usize);
176
177        Self {
178            events_cache: TipsetStateCache::with_size(
179                "events",
180                DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE,
181            ),
182            receipt_cache: TipsetStateCache::with_size(
183                "receipts",
184                DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE,
185            ),
186        }
187    }
188}
189
190impl TipsetReceiptEventCacheHandler for EnabledTipsetDataCache {
191    fn insert_receipt(&self, key: &TipsetKey, mut receipts: Vec<Receipt>) {
192        if !receipts.is_empty() {
193            receipts.shrink_to_fit();
194            self.receipt_cache.insert(key.clone(), receipts);
195        }
196    }
197
198    fn insert_events(&self, key: &TipsetKey, mut events_data: StateEvents) {
199        if !events_data.events.is_empty() {
200            events_data.events.shrink_to_fit();
201            events_data.roots.shrink_to_fit();
202            self.events_cache.insert(key.clone(), events_data);
203        }
204    }
205
206    fn get_events(&self, key: &TipsetKey) -> Option<StateEvents> {
207        self.events_cache.get(key)
208    }
209
210    fn get_receipts(&self, key: &TipsetKey) -> Option<Vec<Receipt>> {
211        self.receipt_cache.get(key)
212    }
213
214    fn get_receipt_or_else(
215        &self,
216        key: &TipsetKey,
217        compute: ComputeReceiptFn,
218    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>> {
219        let key = key.clone();
220        let receipt_cache = &self.receipt_cache;
221
222        Box::pin(async move {
223            receipt_cache
224                .get_or_else(&key, || async move { compute().await })
225                .await
226        })
227    }
228
229    fn get_events_or_else(
230        &self,
231        key: &TipsetKey,
232        compute: ComputeEventsFn,
233    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>> {
234        let key = key.clone();
235        let events_cache = &self.events_cache;
236
237        Box::pin(async move {
238            events_cache
239                .get_or_else(&key, || async move { compute().await })
240                .await
241        })
242    }
243}
244
/// Stand-in cache for tipset-related events and receipts that holds no data.
pub struct DisabledTipsetDataCache;

impl DisabledTipsetDataCache {
    /// Creates the (stateless) disabled cache.
    pub fn new() -> Self {
        Self
    }
}
253
254impl TipsetReceiptEventCacheHandler for DisabledTipsetDataCache {
255    fn insert_receipt(&self, _key: &TipsetKey, _receipts: Vec<Receipt>) {
256        // No-op
257    }
258
259    fn insert_events(&self, _key: &TipsetKey, _events_data: StateEvents) {
260        // No-op
261    }
262
263    fn get_events(&self, _key: &TipsetKey) -> Option<StateEvents> {
264        None
265    }
266
267    fn get_receipts(&self, _key: &TipsetKey) -> Option<Vec<Receipt>> {
268        None
269    }
270
271    fn get_receipt_or_else(
272        &self,
273        _key: &TipsetKey,
274        _compute: ComputeReceiptFn,
275    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>> {
276        Box::pin(async move { Ok(vec![]) })
277    }
278
279    fn get_events_or_else(
280        &self,
281        _key: &TipsetKey,
282        _compute: ComputeEventsFn,
283    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>> {
284        Box::pin(async move {
285            Ok(StateEvents {
286                events: vec![],
287                roots: vec![],
288            })
289        })
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::TipsetKey;
    use crate::shim::executor::Receipt;
    use cid::Cid;
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_derive::MultihashDigest;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU8, AtomicU32, Ordering};
    use std::time::Duration;

    /// Builds a single-CID tipset key from the Blake2b-256 digest of `i`
    /// (little-endian bytes).
    fn create_test_tipset_key(i: u64) -> TipsetKey {
        let digest = crate::utils::multihash::MultihashCode::Blake2b256.digest(&i.to_le_bytes());
        TipsetKey::from(nunny::vec![Cid::new_v1(DAG_CBOR, digest)])
    }

    /// Builds a one-element receipt list whose `gas_used` is `i * 100`.
    fn create_test_receipt(i: u64) -> Vec<Receipt> {
        let receipt = fvm_shared4::receipt::Receipt {
            exit_code: fvm_shared4::error::ExitCode::new(0),
            return_data: fvm_ipld_encoding::RawBytes::default(),
            gas_used: i * 100,
            events_root: None,
        };
        vec![Receipt::V4(receipt)]
    }

    /// Awaits every spawned task and returns their outputs in spawn order,
    /// panicking if any task failed to join.
    async fn join_tasks<T>(handles: Vec<tokio::task::JoinHandle<T>>) -> Vec<T> {
        futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|joined| joined.unwrap())
            .collect()
    }

    #[tokio::test]
    async fn test_tipset_cache_basic_functionality() {
        let cache: TipsetStateCache<String> = TipsetStateCache::new("test");
        let key = create_test_tipset_key(1);

        // First lookup misses and runs the computation.
        let computed = cache
            .get_or_else(&key, || async { Ok(String::from("computed_value")) })
            .await
            .unwrap();
        assert_eq!(computed, "computed_value");

        // Second lookup hits the cache; the new closure must not be used.
        let cached = cache
            .get_or_else(&key, || async { Ok(String::from("should_not_compute")) })
            .await
            .unwrap();
        assert_eq!(cached, "computed_value");
    }

    #[tokio::test]
    async fn test_concurrent_same_key_computation() {
        let cache: Arc<TipsetStateCache<String>> = Arc::new(TipsetStateCache::new("test"));
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU8::new(0));

        // Ten tasks race to compute the same key.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let cache = Arc::clone(&cache);
                let key = key.clone();
                let counter = Arc::clone(&computation_count);
                tokio::spawn(async move {
                    cache
                        .get_or_else(&key, || async move {
                            // Record that a computation actually ran.
                            counter.fetch_add(1, Ordering::SeqCst);
                            // Simulate some computation time.
                            tokio::time::sleep(Duration::from_millis(10)).await;
                            Ok(format!("computed_value_{i}"))
                        })
                        .await
                })
            })
            .collect();

        let results = join_tasks(handles).await;

        // The computation must have run exactly once.
        assert_eq!(computation_count.load(Ordering::SeqCst), 1);

        // Every task observes the single computed value.
        let expected = results[0].as_ref().unwrap();
        for outcome in &results {
            assert_eq!(outcome.as_ref().unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_concurrent_different_keys() {
        let cache: Arc<TipsetStateCache<String>> = Arc::new(TipsetStateCache::new("test"));
        let computation_count = Arc::new(AtomicU8::new(0));

        // Ten tasks, each computing its own key.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let cache = Arc::clone(&cache);
                let key = create_test_tipset_key(i);
                let counter = Arc::clone(&computation_count);
                tokio::spawn(async move {
                    cache
                        .get_or_else(&key, || async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            tokio::time::sleep(Duration::from_millis(5)).await;
                            Ok(format!("value_{i}"))
                        })
                        .await
                })
            })
            .collect();

        let results = join_tasks(handles).await;

        // One computation per key.
        assert_eq!(computation_count.load(Ordering::SeqCst), 10);

        // Each task gets the value computed for its own key.
        for (i, outcome) in results.iter().enumerate() {
            assert_eq!(outcome.as_ref().unwrap(), &format!("value_{i}"));
        }
    }

    #[tokio::test]
    async fn test_enabled_cache_concurrent_access() {
        let cache = Arc::new(EnabledTipsetDataCache::new());
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU32::new(0));

        let handles: Vec<_> = (0..5)
            .map(|i| {
                let cache = Arc::clone(&cache);
                let key = key.clone();
                let counter = Arc::clone(&computation_count);
                tokio::spawn(async move {
                    cache
                        .get_receipt_or_else(
                            &key,
                            Box::new(move || {
                                Box::pin(async move {
                                    counter.fetch_add(1, Ordering::SeqCst);
                                    tokio::time::sleep(Duration::from_millis(10)).await;
                                    Ok(create_test_receipt(i))
                                })
                            }),
                        )
                        .await
                })
            })
            .collect();

        let results = join_tasks(handles).await;

        // The computation must have run exactly once.
        assert_eq!(computation_count.load(Ordering::SeqCst), 1);

        // Every task sees a receipt list of the same length as the first one.
        let expected_len = results[0].as_ref().unwrap().len();
        for outcome in &results {
            assert_eq!(outcome.as_ref().unwrap().len(), expected_len);
        }
    }

    #[tokio::test]
    async fn test_disabled_cache_behavior() {
        let cache = Arc::new(DisabledTipsetDataCache::new());
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU32::new(0));

        // The disabled cache must return empty results without computing.
        let handles: Vec<_> = (0..3)
            .map(|i| {
                let cache = Arc::clone(&cache);
                let key = key.clone();
                let counter = Arc::clone(&computation_count);
                tokio::spawn(async move {
                    cache
                        .get_receipt_or_else(
                            &key,
                            Box::new(move || {
                                Box::pin(async move {
                                    counter.fetch_add(1, Ordering::SeqCst);
                                    Ok(create_test_receipt(i))
                                })
                            }),
                        )
                        .await
                })
            })
            .collect();

        let results = join_tasks(handles).await;

        // No computation may have run.
        assert_eq!(computation_count.load(Ordering::SeqCst), 0);

        // Every result is an empty receipt list.
        for outcome in &results {
            assert!(outcome.as_ref().unwrap().is_empty());
        }
    }
}