use crate::blocks::TipsetKey;
use crate::shim::executor::Receipt;
use crate::state_manager::{DEFAULT_TIPSET_CACHE_SIZE, StateEvents};
use crate::utils::cache::{LruValueConstraints, SizeTrackingLruCache};
use nonzero_ext::nonzero;
use parking_lot::Mutex as SyncMutex;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

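/// Mutable state behind a [`TipsetStateCache`]: the size-tracked LRU map of
/// computed values plus the per-key locks for computations that are currently
/// in flight.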
struct TipsetStateCacheInner<V: LruValueConstraints> {
    values: SizeTrackingLruCache<TipsetKey, V>,
    pending: Vec<(TipsetKey, Arc<TokioMutex<()>>)>,
}

impl<V: LruValueConstraints> TipsetStateCacheInner<V> {
    pub fn with_size(cache_identifier: &str, cache_size: NonZeroUsize) -> Self {
        Self {
            values: SizeTrackingLruCache::new_with_metrics(
                Self::cache_name(cache_identifier).into(),
                cache_size,
            ),
            pending: Vec::with_capacity(8),
        }
    }

    fn cache_name(cache_identifier: &str) -> String {
        format!("tipset_state_{cache_identifier}")
    }
}

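/// Per-tipset LRU cache that deduplicates concurrent computations: when several
/// tasks request the same missing key, only one runs the compute future while
/// the others wait for its result.
///
/// A minimal usage sketch (identifiers are illustrative; `tipset_key` is assumed
/// to be a `TipsetKey` already in scope):
///
/// ```ignore
/// let cache: TipsetStateCache<String> = TipsetStateCache::new("example");
/// let value = cache
///     .get_or_else(&tipset_key, || async { Ok("expensive result".to_string()) })
///     .await?;
/// ```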
pub(crate) struct TipsetStateCache<V: LruValueConstraints> {
    cache: Arc<SyncMutex<TipsetStateCacheInner<V>>>,
}

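/// Outcome of the synchronous lookup phase in [`TipsetStateCache::get_or_else`]:
/// either the value is already cached (`Exist`), or the caller receives the
/// per-key lock guarding the in-flight computation (`Empty`).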
enum CacheLookupStatus<V> {
    Exist(V),
    Empty(Arc<TokioMutex<()>>),
}

impl<V: LruValueConstraints> TipsetStateCache<V> {
    pub fn new(cache_identifier: &str) -> Self {
        Self::with_size(cache_identifier, DEFAULT_TIPSET_CACHE_SIZE)
    }

    pub fn with_size(cache_identifier: &str, cache_size: NonZeroUsize) -> Self {
        Self {
            cache: Arc::new(SyncMutex::new(TipsetStateCacheInner::with_size(
                cache_identifier,
                cache_size,
            ))),
        }
    }

    fn with_inner<F, T>(&self, func: F) -> T
    where
        F: FnOnce(&mut TipsetStateCacheInner<V>) -> T,
    {
        let mut lock = self.cache.lock();
        func(&mut lock)
    }

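    /// Returns the cached value for `key`, or runs `compute` to produce, cache,
    /// and return it.
    ///
    /// Lookup happens in two phases: the LRU map is checked first under the
    /// synchronous lock; on a miss, the caller registers (or joins) a per-key
    /// `tokio` mutex in `pending` and acquires it, so concurrent callers for the
    /// same key share a single computation. The cache is checked again after the
    /// lock is acquired, since another caller may have filled it in the meantime.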
    pub async fn get_or_else<F, Fut>(&self, key: &TipsetKey, compute: F) -> anyhow::Result<V>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = anyhow::Result<V>> + Send,
        V: Send + Sync + 'static,
    {
        let status = self.with_inner(|inner| match inner.values.get_cloned(key) {
            Some(v) => CacheLookupStatus::Exist(v),
            None => {
                let option = inner
                    .pending
                    .iter()
                    .find(|(k, _)| k == key)
                    .map(|(_, mutex)| mutex);
                match option {
                    Some(mutex) => CacheLookupStatus::Empty(mutex.clone()),
                    None => {
                        let mutex = Arc::new(TokioMutex::new(()));
                        inner.pending.push((key.clone(), mutex.clone()));
                        CacheLookupStatus::Empty(mutex)
                    }
                }
            }
        });
        match status {
            CacheLookupStatus::Exist(x) => {
                crate::metrics::LRU_CACHE_HIT
                    .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
                    .inc();
                Ok(x)
            }
            CacheLookupStatus::Empty(mtx) => {
                let _guard = mtx.lock().await;
                match self.get(key) {
                    Some(v) => {
                        // Another task computed the value while we were waiting on
                        // the per-key lock.
                        crate::metrics::LRU_CACHE_HIT
                            .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
                            .inc();

                        Ok(v)
                    }
                    None => {
                        crate::metrics::LRU_CACHE_MISS
                            .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET)
                            .inc();

                        // Run the computation while holding the per-key lock so that
                        // concurrent callers for the same key wait instead of recomputing.
                        let value = compute().await?;

                        // Store the value and drop the `pending` entry for this key.
                        self.insert(key.clone(), value.clone());
                        Ok(value)
                    }
                }
            }
        }
    }

    pub fn get(&self, key: &TipsetKey) -> Option<V> {
        self.with_inner(|inner| inner.values.get_cloned(key))
    }

    pub fn insert(&self, key: TipsetKey, value: V) {
        self.with_inner(|inner| {
            inner.pending.retain(|(k, _)| k != &key);
            inner.values.push(key, value);
        });
    }
}

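// Type-erased async compute callbacks accepted by `TipsetReceiptEventCacheHandler`,
// boxed so the trait remains object-safe.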
type ComputeReceiptFn =
    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send>> + Send>;

type ComputeEventsFn =
    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send>> + Send>;

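/// Cache interface for per-tipset receipts and events, implemented by
/// [`EnabledTipsetDataCache`] (LRU-backed) and [`DisabledTipsetDataCache`]
/// (stores nothing and returns empty results without computing).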
pub trait TipsetReceiptEventCacheHandler: Send + Sync + 'static {
    fn insert_receipt(&self, key: &TipsetKey, receipt: Vec<Receipt>);
    fn insert_events(&self, key: &TipsetKey, events: StateEvents);
    #[allow(dead_code)]
    fn get_events(&self, key: &TipsetKey) -> Option<StateEvents>;
    #[allow(dead_code)]
    fn get_receipts(&self, key: &TipsetKey) -> Option<Vec<Receipt>>;
    fn get_receipt_or_else(
        &self,
        key: &TipsetKey,
        compute: ComputeReceiptFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>>;
    fn get_events_or_else(
        &self,
        key: &TipsetKey,
        compute: ComputeEventsFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>>;
}

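/// LRU-backed caches for per-tipset events and receipts.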
pub struct EnabledTipsetDataCache {
    events_cache: TipsetStateCache<StateEvents>,
    receipt_cache: TipsetStateCache<Vec<Receipt>>,
}

impl EnabledTipsetDataCache {
    pub fn new() -> Self {
        const DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE: NonZeroUsize = nonzero!(4096usize);

        Self {
            events_cache: TipsetStateCache::with_size(
                "events",
                DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE,
            ),
            receipt_cache: TipsetStateCache::with_size(
                "receipts",
                DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE,
            ),
        }
    }
}

impl TipsetReceiptEventCacheHandler for EnabledTipsetDataCache {
    fn insert_receipt(&self, key: &TipsetKey, mut receipts: Vec<Receipt>) {
        // Empty receipt lists are not cached.
        if !receipts.is_empty() {
            receipts.shrink_to_fit();
            self.receipt_cache.insert(key.clone(), receipts);
        }
    }

    fn insert_events(&self, key: &TipsetKey, mut events_data: StateEvents) {
        // Empty event sets are not cached.
        if !events_data.events.is_empty() {
            events_data.events.shrink_to_fit();
            events_data.roots.shrink_to_fit();
            self.events_cache.insert(key.clone(), events_data);
        }
    }

    fn get_events(&self, key: &TipsetKey) -> Option<StateEvents> {
        self.events_cache.get(key)
    }

    fn get_receipts(&self, key: &TipsetKey) -> Option<Vec<Receipt>> {
        self.receipt_cache.get(key)
    }

    fn get_receipt_or_else(
        &self,
        key: &TipsetKey,
        compute: ComputeReceiptFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>> {
        let key = key.clone();
        let receipt_cache = &self.receipt_cache;

        Box::pin(async move {
            receipt_cache
                .get_or_else(&key, || async move { compute().await })
                .await
        })
    }

    fn get_events_or_else(
        &self,
        key: &TipsetKey,
        compute: ComputeEventsFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>> {
        let key = key.clone();
        let events_cache = &self.events_cache;

        Box::pin(async move {
            events_cache
                .get_or_else(&key, || async move { compute().await })
                .await
        })
    }
}

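/// No-op cache handler: inserts are discarded and the `get_*_or_else` methods
/// return empty results without invoking the compute callback.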
pub struct DisabledTipsetDataCache;

impl DisabledTipsetDataCache {
    pub fn new() -> Self {
        Self {}
    }
}

impl TipsetReceiptEventCacheHandler for DisabledTipsetDataCache {
    fn insert_receipt(&self, _key: &TipsetKey, _receipts: Vec<Receipt>) {
        // No-op: caching is disabled.
    }

    fn insert_events(&self, _key: &TipsetKey, _events_data: StateEvents) {
        // No-op: caching is disabled.
    }

    fn get_events(&self, _key: &TipsetKey) -> Option<StateEvents> {
        None
    }

    fn get_receipts(&self, _key: &TipsetKey) -> Option<Vec<Receipt>> {
        None
    }

    fn get_receipt_or_else(
        &self,
        _key: &TipsetKey,
        _compute: ComputeReceiptFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Receipt>>> + Send + '_>> {
        Box::pin(async move { Ok(vec![]) })
    }

    fn get_events_or_else(
        &self,
        _key: &TipsetKey,
        _compute: ComputeEventsFn,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<StateEvents>> + Send + '_>> {
        Box::pin(async move {
            Ok(StateEvents {
                events: vec![],
                roots: vec![],
            })
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::TipsetKey;
    use crate::shim::executor::Receipt;
    use cid::Cid;
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_derive::MultihashDigest;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU8, AtomicU32, Ordering};
    use std::time::Duration;

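    // Builds a deterministic single-CID tipset key derived from `i`.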
    fn create_test_tipset_key(i: u64) -> TipsetKey {
        let bytes = i.to_le_bytes().to_vec();
        let cid = Cid::new_v1(
            DAG_CBOR,
            crate::utils::multihash::MultihashCode::Blake2b256.digest(&bytes),
        );
        TipsetKey::from(nunny::vec![cid])
    }

313
314 fn create_test_receipt(i: u64) -> Vec<Receipt> {
315 vec![Receipt::V4(fvm_shared4::receipt::Receipt {
316 exit_code: fvm_shared4::error::ExitCode::new(0),
317 return_data: fvm_ipld_encoding::RawBytes::default(),
318 gas_used: i * 100,
319 events_root: None,
320 })]
321 }
322
    #[tokio::test]
    async fn test_tipset_cache_basic_functionality() {
        let cache: TipsetStateCache<String> = TipsetStateCache::new("test");
        let key = create_test_tipset_key(1);

        // First call: the value is computed and cached.
        let result = cache
            .get_or_else(&key, || async { Ok("computed_value".to_string()) })
            .await
            .unwrap();
        assert_eq!(result, "computed_value");

        // Second call: the cached value is returned and the closure is not run.
        let result = cache
            .get_or_else(&key, || async { Ok("should_not_compute".to_string()) })
            .await
            .unwrap();
        assert_eq!(result, "computed_value");
    }

    #[tokio::test]
    async fn test_concurrent_same_key_computation() {
        let cache: Arc<TipsetStateCache<String>> = Arc::new(TipsetStateCache::new("test"));
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU8::new(0));

        // Spawn 10 tasks that all race to compute the same key.
        let mut handles = vec![];
        for i in 0..10 {
            let cache_clone = Arc::clone(&cache);
            let key_clone = key.clone();
            let count_clone = Arc::clone(&computation_count);

            let handle = tokio::spawn(async move {
                cache_clone
                    .get_or_else(&key_clone, || {
                        let count = Arc::clone(&count_clone);
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            // Simulate an expensive computation.
                            tokio::time::sleep(Duration::from_millis(10)).await;
                            Ok(format!("computed_value_{i}"))
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        let results: Vec<_> = futures::future::join_all(handles)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // Only one task should have run the computation.
        assert_eq!(computation_count.load(Ordering::SeqCst), 1);

        // Every task should observe the value produced by that single computation.
        let first_result = results[0].as_ref().unwrap();
        for result in &results {
            assert_eq!(result.as_ref().unwrap(), first_result);
        }
    }

    #[tokio::test]
    async fn test_concurrent_different_keys() {
        let cache: Arc<TipsetStateCache<String>> = Arc::new(TipsetStateCache::new("test"));
        let computation_count = Arc::new(AtomicU8::new(0));

        // Spawn 10 tasks, each computing a value for its own key.
        let mut handles = vec![];
        for i in 0..10 {
            let cache_clone = Arc::clone(&cache);
            let key = create_test_tipset_key(i);
            let count_clone = Arc::clone(&computation_count);

            let handle = tokio::spawn(async move {
                cache_clone
                    .get_or_else(&key, || {
                        let count = Arc::clone(&count_clone);
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            tokio::time::sleep(Duration::from_millis(5)).await;
                            Ok(format!("value_{i}"))
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        let results: Vec<_> = futures::future::join_all(handles)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // Distinct keys do not share computations: each task runs its own.
        assert_eq!(computation_count.load(Ordering::SeqCst), 10);

        for (i, result) in results.iter().enumerate() {
            assert_eq!(result.as_ref().unwrap(), &format!("value_{i}"));
        }
    }

    #[tokio::test]
    async fn test_enabled_cache_concurrent_access() {
        let cache = Arc::new(EnabledTipsetDataCache::new());
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU32::new(0));

        let mut handles = vec![];
        for i in 0..5 {
            let cache_clone = Arc::clone(&cache);
            let key_clone = key.clone();
            let count_clone = Arc::clone(&computation_count);

            let handle = tokio::spawn(async move {
                cache_clone
                    .get_receipt_or_else(
                        &key_clone,
                        Box::new(move || {
                            let count = Arc::clone(&count_clone);
                            Box::pin(async move {
                                count.fetch_add(1, Ordering::SeqCst);
                                tokio::time::sleep(Duration::from_millis(10)).await;
                                Ok(create_test_receipt(i))
                            })
                        }),
                    )
                    .await
            });
            handles.push(handle);
        }

        let results: Vec<_> = futures::future::join_all(handles)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // Only one compute callback should have run for the shared key.
        assert_eq!(computation_count.load(Ordering::SeqCst), 1);

        // All tasks should see the same cached receipts.
        let first_result = results[0].as_ref().unwrap();
        for result in &results {
            let receipts = result.as_ref().unwrap();
            assert_eq!(receipts.len(), first_result.len());
        }
    }

    #[tokio::test]
    async fn test_disabled_cache_behavior() {
        let cache = Arc::new(DisabledTipsetDataCache::new());
        let key = create_test_tipset_key(1);
        let computation_count = Arc::new(AtomicU32::new(0));

        let mut handles = vec![];
        for i in 0..3 {
            let cache_clone = Arc::clone(&cache);
            let key_clone = key.clone();
            let count_clone = Arc::clone(&computation_count);

            let handle = tokio::spawn(async move {
                cache_clone
                    .get_receipt_or_else(
                        &key_clone,
                        Box::new(move || {
                            let count = Arc::clone(&count_clone);
                            Box::pin(async move {
                                count.fetch_add(1, Ordering::SeqCst);
                                Ok(create_test_receipt(i))
                            })
                        }),
                    )
                    .await
            });
            handles.push(handle);
        }

        let results: Vec<_> = futures::future::join_all(handles)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        // The disabled cache never invokes the compute callback...
        assert_eq!(computation_count.load(Ordering::SeqCst), 0);

        // ...and always answers with empty receipts.
        for result in &results {
            let receipts = result.as_ref().unwrap();
            assert!(receipts.is_empty());
        }
    }
}