use crate::ports::P2pDb;
use fuel_core_metrics::p2p_metrics::{
increment_p2p_req_res_cache_hits,
increment_p2p_req_res_cache_misses,
};
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::SealedBlockHeader,
services::p2p::Transactions,
};
use quick_cache::sync::Cache;
use std::ops::Range;
type BlockHeight = u32;
pub(super) struct CachedView {
sealed_block_headers: Cache<BlockHeight, SealedBlockHeader>,
transactions_on_blocks: Cache<BlockHeight, Transactions>,
metrics: bool,
}
impl CachedView {
pub fn new(capacity: usize, metrics: bool) -> Self {
Self {
sealed_block_headers: Cache::new(capacity),
transactions_on_blocks: Cache::new(capacity),
metrics,
}
}
fn update_metrics<U>(&self, update_fn: U)
where
U: FnOnce(),
{
if self.metrics {
update_fn()
}
}
fn get_from_cache_or_db<V, T, F>(
&self,
cache: &Cache<u32, T>,
view: &V,
range: Range<u32>,
fetch_fn: F,
) -> StorageResult<Option<Vec<T>>>
where
V: P2pDb,
T: Clone,
F: Fn(&V, Range<u32>) -> StorageResult<Option<Vec<T>>>,
{
let mut items = Vec::new();
let mut missing_start = None;
for height in range.clone() {
if let Some(item) = cache.get(&height) {
items.push(item);
} else {
missing_start = Some(height);
break;
}
}
let Some(missing_start) = missing_start else {
self.update_metrics(increment_p2p_req_res_cache_hits);
return Ok(Some(items));
};
let missing_range = missing_start..range.end;
self.update_metrics(increment_p2p_req_res_cache_misses);
if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? {
for (height, item) in missing_range.zip(fetched_items.into_iter()) {
cache.insert(height, item.clone());
items.push(item);
}
return Ok(Some(items));
}
Ok(None)
}
pub(crate) fn get_sealed_headers<V>(
&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<SealedBlockHeader>>>
where
V: P2pDb,
{
self.get_from_cache_or_db(
&self.sealed_block_headers,
view,
block_height_range,
V::get_sealed_headers,
)
}
pub(crate) fn get_transactions<V>(
&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>
where
V: P2pDb,
{
self.get_from_cache_or_db(
&self.transactions_on_blocks,
view,
block_height_range,
V::get_transactions,
)
}
}
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
use super::*;
use fuel_core_types::blockchain::consensus::Genesis;
use std::sync::Arc;
use tokio::sync::Notify;
struct FakeDb {
sender: Arc<Notify>,
values: bool,
}
#[inline]
fn default_sealed_headers(range: Range<u32>) -> Vec<SealedBlockHeader> {
vec![SealedBlockHeader::default(); range.len()]
}
#[inline]
fn default_transactions(range: Range<u32>) -> Vec<Transactions> {
vec![Transactions::default(); range.len()]
}
impl P2pDb for FakeDb {
fn get_sealed_headers(
&self,
range: Range<u32>,
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
self.sender.notify_waiters();
if !self.values {
return Ok(None);
}
let headers = default_sealed_headers(range);
Ok(Some(headers))
}
fn get_transactions(
&self,
range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
self.sender.notify_waiters();
if !self.values {
return Ok(None);
}
let transactions = default_transactions(range);
Ok(Some(transactions))
}
fn get_genesis(&self) -> StorageResult<Genesis> {
self.sender.notify_waiters();
Ok(Genesis::default())
}
}
#[tokio::test]
async fn cached_view__get_sealed_headers__cache_hit() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(10, false);
let block_height_range = 0..100;
let sealed_headers = default_sealed_headers(block_height_range.clone());
for (block_height, header) in
block_height_range.clone().zip(sealed_headers.iter())
{
cached_view
.sealed_block_headers
.insert(block_height, header.clone());
}
let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();
assert_eq!(result, Some(sealed_headers));
}
#[tokio::test]
async fn cached_view__get_sealed_headers__cache_miss() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(10, false);
let notified = sender.notified();
let block_height_range = 0..100;
let sealed_headers = default_sealed_headers(block_height_range.clone());
let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();
notified.await;
assert_eq!(result, Some(sealed_headers));
}
#[tokio::test]
async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: false,
};
let cached_view = CachedView::new(10, false);
let notified = sender.notified();
let block_height_range = 0..100;
let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();
notified.await;
assert!(result.is_none());
}
#[tokio::test]
async fn cached_view__get_transactions__cache_hit() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(10, false);
let block_height_range = 0..100;
let transactions = default_transactions(block_height_range.clone());
for (block_height, transactions) in
block_height_range.clone().zip(transactions.iter())
{
cached_view
.transactions_on_blocks
.insert(block_height, transactions.clone());
}
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
assert_eq!(expected.0, actual.0);
}
}
#[tokio::test]
async fn cached_view__get_transactions__cache_miss() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(10, false);
let notified = sender.notified();
let block_height_range = 0..100;
let transactions = default_transactions(block_height_range.clone());
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
notified.await;
for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
assert_eq!(expected.0, actual.0);
}
}
#[tokio::test]
async fn cached_view__when_response_is_none__get_transactions__cache_miss() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: false,
};
let cached_view = CachedView::new(10, false);
let notified = sender.notified();
let block_height_range = 0..100;
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
notified.await;
assert!(result.is_none());
}
#[tokio::test]
async fn cached_view__when_lru_is_full_it_makes_call_to_db() {
let cache_capacity = 10;
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(cache_capacity, false);
let block_height_range = 0..u32::try_from(cache_capacity).unwrap() + 1;
let _ = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
let notified = sender.notified();
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
notified.await;
assert!(result.is_some());
}
#[tokio::test]
async fn cached_view__when_lru_is_partially_full_it_does_not_make_call_to_db() {
let cache_capacity = 100;
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(cache_capacity, false);
let block_height_range = 0..10;
let _ = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
let notified = sender.notified();
let _ = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();
assert!(
tokio::time::timeout(std::time::Duration::from_millis(50), notified)
.await
.is_err()
)
}
}