use super::{
Error,
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::message::{ChainMessage, SignedMessage};
use crate::networks::{ChainConfig, Height};
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::shim::clock::ChainEpoch;
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::utils::ShallowClone;
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use crate::{
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
db::HeaviestTipsetKeyProvider,
};
use crate::{
db::{EthMappingsStore, EthMappingsStoreExt},
rpc::chain::PathChange,
};
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
use crate::{
interpreter::{BlockMessages, VMTrace},
rpc::chain::PathChanges,
};
use crate::{
libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite},
state_manager::ExecutedTipset,
};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::broadcast;
use tracing::{debug, error, trace, warn};
/// Capacity of the head-change broadcast channel; sends beyond this lag slow receivers.
const SINK_CAP: usize = 200;
/// A difference between two [`ChainEpoch`]s.
pub type ChainEpochDelta = ChainEpoch;
/// A single apply/revert step published when the head moves.
pub type HeadChange = PathChange<Tipset>;
/// The full apply/revert set published on a head change.
pub type HeadChanges = PathChanges<Tipset>;
/// Wraps a blockstore and tracks the chain head, tipset indexing, Ethereum
/// transaction-hash mappings, and head-change notifications.
pub struct ChainStore<DB> {
    /// Publishes apply/revert path changes whenever the heaviest tipset changes.
    head_changes_tx: broadcast::Sender<HeadChanges>,
    /// The underlying IPLD blockstore.
    db: Arc<DB>,
    /// Persists the heaviest tipset key so the head survives restarts.
    heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,
    /// In-memory copy of the current heaviest (head) tipset.
    heaviest_tipset: Arc<RwLock<Tipset>>,
    /// The most recent tipset finalized by F3 (fast finality), if any was recorded.
    f3_finalized_tipset: Arc<RwLock<Option<Tipset>>>,
    /// Index used for tipset lookups (by key and by height).
    chain_index: ChainIndex<DB>,
    /// Tracks same-epoch block headers so tipsets can be expanded.
    tipset_tracker: TipsetTracker<DB>,
    /// The genesis block header this store was created with.
    genesis_block_header: CachingBlockHeader,
    /// CIDs of blocks that already passed validation (see `mark_block_as_validated`).
    pub(crate) validated_blocks: Mutex<HashSet<Cid>>,
    /// Storage backing the Ethereum tx-hash -> message CID mappings.
    eth_mappings: Arc<dyn EthMappingsStore + Sync + Send>,
    /// Network/chain configuration.
    chain_config: Arc<ChainConfig>,
    /// Cache of the flattened message list of recently queried tipsets.
    messages_in_tipset_cache: MessagesInTipsetCache,
}
/// Read-side bitswap support: queries are forwarded verbatim to the
/// underlying database.
impl<DB> BitswapStoreRead for ChainStore<DB>
where
    DB: BitswapStoreRead,
{
    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
        self.db.contains(cid)
    }
    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        self.db.get(cid)
    }
}
/// Write-side bitswap support: inserts are forwarded verbatim to the
/// underlying database, reusing its multihash code table.
impl<DB> BitswapStoreReadWrite for ChainStore<DB>
where
    DB: BitswapStoreReadWrite,
{
    type Hashes = <DB as BitswapStoreReadWrite>::Hashes;
    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
        self.db.insert(block)
    }
}
impl<DB> ChainStore<DB>
where
    DB: Blockstore,
{
    /// Constructs a `ChainStore`, restoring the heaviest tipset from
    /// `heaviest_tipset_key_provider` when one was persisted, or falling back
    /// to the genesis tipset otherwise.
    pub fn new(
        db: Arc<DB>,
        heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,
        eth_mappings: Arc<dyn EthMappingsStore + Sync + Send>,
        chain_config: Arc<ChainConfig>,
        genesis_block_header: CachingBlockHeader,
    ) -> anyhow::Result<Self> {
        let (publisher, _) = broadcast::channel(SINK_CAP);
        let validated_blocks = Mutex::new(HashSet::default());
        // Restore the persisted head if present; a fresh database starts at genesis.
        let head = if let Some(head_tsk) = heaviest_tipset_key_provider
            .heaviest_tipset_key()
            .context("failed to load head tipset key")?
        {
            Tipset::load_required(&db, &head_tsk)
                .with_context(|| format!("failed to load head tipset with key {head_tsk}"))?
        } else {
            Tipset::from(&genesis_block_header)
        };
        let heaviest_tipset = Arc::new(RwLock::new(head));
        let f3_finalized_tipset: Arc<RwLock<Option<Tipset>>> = Default::default();
        // A tipset counts as finalized when its epoch is at or below the later
        // of: the last F3-finalized epoch (0 when none is known yet) and
        // `head_epoch - chain_finality`.
        let chain_index = ChainIndex::new(db.clone()).with_is_tipset_finalized(Arc::new({
            let chain_finality = chain_config.policy.chain_finality;
            let heaviest_tipset = heaviest_tipset.clone();
            let f3_finalized_tipset = f3_finalized_tipset.clone();
            move |ts| {
                let finalized = f3_finalized_tipset
                    .read()
                    .as_ref()
                    .map(|ts| ts.epoch())
                    .unwrap_or_default()
                    .max(heaviest_tipset.read().epoch() - chain_finality);
                ts.epoch() <= finalized
            }
        }));
        let cs = Self {
            head_changes_tx: publisher,
            chain_index,
            tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
            db,
            heaviest_tipset_key_provider,
            heaviest_tipset,
            f3_finalized_tipset,
            genesis_block_header,
            validated_blocks,
            eth_mappings,
            chain_config,
            messages_in_tipset_cache: Default::default(),
        };
        Ok(cs)
    }
    /// Records the latest tipset finalized by F3, replacing any previous one.
    pub fn set_f3_finalized_tipset(&self, ts: Tipset) {
        self.f3_finalized_tipset.write().replace(ts);
    }
    /// Returns the latest F3-finalized tipset, if one has been recorded.
    pub fn f3_finalized_tipset(&self) -> Option<Tipset> {
        self.f3_finalized_tipset.read().clone()
    }
    /// Accessor for the per-tipset message cache.
    pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
        &self.messages_in_tipset_cache
    }
    /// Sets `head` as the new heaviest tipset: persists its key, swaps the
    /// in-memory head, and — when anyone is subscribed — publishes the chain
    /// path changes from the old head to the new one.
    pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
        head.key().save(self.blockstore())?;
        self.heaviest_tipset_key_provider
            .set_heaviest_tipset_key(head.key())?;
        let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.clone());
        // Computing path changes is only worth doing when there is a listener.
        if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) {
            let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key())
            {
                Ok(changes) => changes,
                Err(e) => {
                    // The error log is suppressed when the old head is at
                    // epoch 0 (presumably a fresh genesis head — confirm).
                    if old_head.epoch() > 0 {
                        error!("failed to get chain path changes: {e:#}");
                    }
                    // Fall back to reporting the new head as a plain apply.
                    PathChanges {
                        applies: vec![head],
                        reverts: vec![],
                    }
                }
            };
            if self.head_changes_tx.send(changes).is_err() {
                debug!("did not publish changes, no active receivers");
            }
        }
        Ok(())
    }
    /// Registers a block header with the tipset tracker so it can later be
    /// grouped with other blocks from the same epoch.
    pub fn add_to_tipset_tracker(&self, header: &CachingBlockHeader) {
        self.tipset_tracker.add(header);
    }
    /// Persists the tipset's headers, expands it with tracked same-epoch
    /// blocks, and promotes the result to head if its weight wins.
    pub fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> {
        persist_objects(self.blockstore(), ts.block_headers().iter())?;
        let expanded = self.expand_tipset(ts.min_ticket_block().clone())?;
        self.update_heaviest(expanded)?;
        Ok(())
    }
    /// Loads the tipset key stored under the CID form of the given Ethereum
    /// block hash, erroring if absent.
    pub fn get_required_tipset_key(&self, hash: &EthHash) -> Result<TipsetKey, Error> {
        Ok(TipsetKey::load(self.blockstore(), &hash.to_cid())?)
    }
    /// Stores an Ethereum tx-hash -> message CID mapping together with its timestamp.
    pub fn put_mapping(&self, k: EthHash, v: Cid, timestamp: u64) -> Result<(), Error> {
        self.eth_mappings.write_obj(&k, &(v, timestamp))?;
        Ok(())
    }
    /// Looks up the message CID previously stored for an Ethereum tx hash.
    /// The stored timestamp is discarded.
    pub fn get_mapping(&self, hash: &EthHash) -> Result<Option<Cid>, Error> {
        Ok(self
            .eth_mappings
            .read_obj::<(Cid, u64)>(hash)?
            .map(|(cid, _)| cid))
    }
    /// Expands `header` into a tipset together with any other tracked blocks
    /// at the same epoch.
    fn expand_tipset(&self, header: CachingBlockHeader) -> Result<Tipset, Error> {
        self.tipset_tracker.expand(header)
    }
    /// Returns the genesis block header this store was created with.
    pub fn genesis_block_header(&self) -> &CachingBlockHeader {
        &self.genesis_block_header
    }
    /// Returns a clone of the current heaviest (head) tipset.
    pub fn heaviest_tipset(&self) -> Tipset {
        self.heaviest_tipset.read().clone()
    }
    /// Returns the genesis tipset, built from the genesis block header.
    pub fn genesis_tipset(&self) -> Tipset {
        Tipset::from(self.genesis_block_header())
    }
    /// Subscribes to head-change notifications published by
    /// [`Self::set_heaviest_tipset`].
    pub fn subscribe_head_changes(&self) -> broadcast::Receiver<HeadChanges> {
        self.head_changes_tx.subscribe()
    }
    /// Returns the underlying blockstore.
    pub fn blockstore(&self) -> &Arc<DB> {
        &self.db
    }
    /// Returns the chain index used for tipset lookups.
    pub fn chain_index(&self) -> &ChainIndex<DB> {
        &self.chain_index
    }
    /// Returns the chain configuration.
    pub fn chain_config(&self) -> &Arc<ChainConfig> {
        &self.chain_config
    }
    /// Loads the tipset for `maybe_key`, or the current heaviest tipset when
    /// no key is given.
    #[tracing::instrument(skip_all)]
    pub fn load_required_tipset_or_heaviest<'a>(
        &self,
        maybe_key: impl Into<Option<&'a TipsetKey>>,
    ) -> Result<Tipset, Error> {
        match maybe_key.into() {
            Some(key) => self.chain_index.load_required_tipset(key),
            None => Ok(self.heaviest_tipset()),
        }
    }
    /// Returns the direct child of `ts` on the current heaviest chain, i.e.
    /// the tipset whose parents are exactly `ts.key()`. Errors with
    /// `NotFound` when `ts` is not an ancestor of the head.
    pub fn load_child_tipset(&self, ts: &Tipset) -> Result<Tipset, Error> {
        let head = self.heaviest_tipset();
        if head.parents() == ts.key() {
            Ok(head)
        } else if head.epoch() > ts.epoch() {
            // Look one epoch above `ts`; null rounds resolve to the next
            // newer tipset.
            let maybe_child = self.chain_index().tipset_by_height(
                ts.epoch() + 1,
                head,
                ResolveNullTipset::TakeNewer,
            )?;
            if maybe_child.parents() == ts.key() {
                Ok(maybe_child)
            } else {
                // `ts` is not on the chain leading to the current head.
                Err(Error::NotFound(
                    format!("child of tipset@{}", ts.epoch()).into(),
                ))
            }
        } else {
            Err(Error::NotFound(
                format!("child of tipset@{}", ts.epoch()).into(),
            ))
        }
    }
    /// Promotes `ts` to the heaviest tipset if its consensus weight strictly
    /// exceeds that of the current head.
    fn update_heaviest(&self, ts: Tipset) -> Result<(), Error> {
        let heaviest_weight = fil_cns::weight(self.blockstore(), &self.heaviest_tipset())?;
        let new_weight = fil_cns::weight(self.blockstore(), &ts)?;
        let curr_weight = heaviest_weight;
        if new_weight > curr_weight {
            self.set_heaviest_tipset(ts)?;
        }
        Ok(())
    }
    /// Returns whether `cid` was previously marked as validated.
    pub fn is_block_validated(&self, cid: &Cid) -> bool {
        let validated = self.validated_blocks.lock().contains(cid);
        if validated {
            trace!("Block {cid} was previously validated");
        }
        validated
    }
    /// Records `cid` in the validated-blocks set.
    pub fn mark_block_as_validated(&self, cid: &Cid) {
        let mut file = self.validated_blocks.lock();
        file.insert(*cid);
    }
    /// Removes `cid` from the validated-blocks set; a no-op if it was absent.
    pub fn unmark_block_as_validated(&self, cid: &Cid) {
        let mut file = self.validated_blocks.lock();
        let _did_work = file.remove(cid);
    }
    /// Returns all messages in `ts`, flattened across its blocks, served from
    /// the in-memory cache when available.
    pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error> {
        Ok(self
            .messages_in_tipset_cache()
            .get_or_insert_with(ts.key(), || {
                let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
                Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect_vec())
            })?)
    }
    /// Computes the lookback tipset (and a state root) for `round`: the
    /// tipset `lb` epochs behind, where `lb` is 10 up to network version V3
    /// and the chain-finality delay afterwards.
    pub fn get_lookback_tipset_for_round(
        chain_index: &ChainIndex<DB>,
        chain_config: &Arc<ChainConfig>,
        heaviest_tipset: &Tipset,
        round: ChainEpoch,
    ) -> Result<(Tipset, Cid), Error>
    where
        DB: Send + Sync + 'static,
    {
        let version = chain_config.network_version(round);
        let lb = if version <= NetworkVersion::V3 {
            ChainEpoch::from(10)
        } else {
            chain_config.policy.chain_finality
        };
        // Clamp to genesis: the lookback round can never be negative.
        let lbr = (round - lb).max(0);
        if lbr >= heaviest_tipset.epoch() {
            // The lookback round is not behind the head: evaluate the head
            // tipset's own state and return it directly.
            let genesis_timestamp = heaviest_tipset.genesis(chain_index.db())?.timestamp;
            let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp));
            let ExecutedTipset { state_root, .. } = crate::state_manager::apply_block_messages(
                genesis_timestamp,
                chain_index.shallow_clone(),
                chain_config.shallow_clone(),
                beacon,
                &crate::shim::machine::GLOBAL_MULTI_ENGINE,
                heaviest_tipset.clone(),
                crate::state_manager::NO_CALLBACK,
                VMTrace::NotTraced,
            )
            .map_err(|e| Error::Other(e.to_string()))?;
            return Ok((heaviest_tipset.clone(), state_root));
        }
        // Find the tipset right above the lookback round, then step back to
        // its parent so null rounds at `lbr` are handled.
        let next_ts = chain_index
            .tipset_by_height(
                lbr + 1,
                heaviest_tipset.clone(),
                ResolveNullTipset::TakeNewer,
            )
            .map_err(|e| Error::Other(format!("Could not get tipset by height {e:?}")))?;
        if lbr > next_ts.epoch() {
            return Err(Error::Other(format!(
                "failed to find non-null tipset {:?} {} which is known to exist, found {:?} {}",
                heaviest_tipset.key(),
                heaviest_tipset.epoch(),
                next_ts.key(),
                next_ts.epoch()
            )));
        }
        let lbts = chain_index
            .load_required_tipset(next_ts.parents())
            .map_err(|e| Error::Other(format!("Could not get tipset from keys {e:?}")))?;
        Ok((lbts, *next_ts.parent_state()))
    }
    /// Derives Ethereum transaction hashes for the given signed messages and
    /// persists hash -> message-CID mappings. Messages that do not convert to
    /// Ethereum transactions (or whose hash cannot be computed) are skipped.
    pub fn process_signed_messages(&self, messages: &[(SignedMessage, u64)]) -> anyhow::Result<()>
    where
        DB: fvm_ipld_blockstore::Blockstore,
    {
        // Keep the original position of each message so duplicate hashes can
        // be resolved in favor of the earliest occurrence.
        let eth_txs: Vec<(EthHash, Cid, u64, usize)> = messages
            .iter()
            .enumerate()
            .filter_map(|(i, (smsg, timestamp))| {
                if let Ok((_, tx)) =
                    eth_tx_from_signed_eth_message(smsg, self.chain_config.eth_chain_id)
                {
                    if let Ok(hash) = tx.eth_hash() {
                        Some((hash.into(), smsg.cid(), *timestamp, i))
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .collect();
        let filtered = filter_lowest_index(eth_txs);
        let num_entries = filtered.len();
        for (k, v, timestamp) in filtered.into_iter() {
            tracing::trace!("Insert mapping {} => {}", k, v);
            self.put_mapping(k, v, timestamp)?;
        }
        tracing::trace!("Wrote {} entries in Ethereum mapping", num_entries);
        Ok(())
    }
    /// Collects all delegated signed messages from the given headers, each
    /// paired with its header's timestamp. Headers below the Hygge height are
    /// skipped (presumably because delegated messages cannot occur before
    /// that upgrade — confirm).
    pub fn headers_delegated_messages<'a>(
        &self,
        headers: impl Iterator<Item = &'a CachingBlockHeader>,
    ) -> anyhow::Result<Vec<(SignedMessage, u64)>>
    where
        DB: fvm_ipld_blockstore::Blockstore,
    {
        let mut delegated_messages = vec![];
        let filtered_headers =
            headers.filter(|bh| bh.epoch >= self.chain_config.epoch(Height::Hygge));
        for bh in filtered_headers {
            // Best effort: a block whose messages cannot be read is skipped
            // silently. NOTE(review): `secp_cids` actually holds decoded
            // signed messages, not CIDs.
            if let Ok((_, secp_cids)) = block_messages(self.blockstore(), bh) {
                let mut messages: Vec<_> = secp_cids
                    .into_iter()
                    .filter(|msg| msg.is_delegated())
                    .map(|m| (m, bh.timestamp))
                    .collect();
                delegated_messages.append(&mut messages);
            }
        }
        Ok(delegated_messages)
    }
}
/// Collapses duplicate Ethereum transaction hashes, keeping for each hash the
/// `(cid, timestamp)` of the entry with the lowest message index (i.e. the
/// first occurrence in block order). The index is dropped from the result.
fn filter_lowest_index(values: Vec<(EthHash, Cid, u64, usize)>) -> Vec<(EthHash, Cid, u64)> {
    let map: HashMap<EthHash, (Cid, u64, usize)> = values.into_iter().fold(
        HashMap::default(),
        |mut acc, (hash, cid, timestamp, index)| {
            acc.entry(hash)
                // Replace the whole entry when a lower index wins. The
                // previous code updated only the index field, silently keeping
                // a stale (cid, timestamp) pair whenever a lower-indexed
                // duplicate arrived after a higher-indexed one.
                .and_modify(|entry| {
                    if index < entry.2 {
                        *entry = (cid, timestamp, index);
                    }
                })
                .or_insert((cid, timestamp, index));
            acc
        },
    );
    map.into_iter()
        .map(|(hash, (cid, timestamp, _))| (hash, cid, timestamp))
        .collect()
}
/// Loads both message lists referenced by a block header: the unsigned (BLS)
/// messages and the signed (SECP) messages.
pub fn block_messages<DB>(
    db: &DB,
    bh: &CachingBlockHeader,
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
    DB: Blockstore,
{
    let (bls_roots, secp_roots) = read_msg_cids(db, bh)?;
    let unsigned: Vec<Message> = messages_from_cids(db, &bls_roots)?;
    let signed: Vec<SignedMessage> = messages_from_cids(db, &secp_roots)?;
    Ok((unsigned, signed))
}
/// Loads unsigned (BLS) and signed (SECP) messages given their explicit CID
/// lists, in that order.
pub fn block_messages_from_cids<DB>(
    db: &DB,
    bls_cids: &[Cid],
    secp_cids: &[Cid],
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
    DB: Blockstore,
{
    Ok((
        messages_from_cids(db, bls_cids)?,
        messages_from_cids(db, secp_cids)?,
    ))
}
pub fn read_msg_cids<DB>(
db: &DB,
block_header: &CachingBlockHeader,
) -> Result<(Vec<Cid>, Vec<Cid>), Error>
where
DB: Blockstore,
{
let msg_cid = &block_header.messages;
if let Some(roots) = db.get_cbor::<TxMeta>(msg_cid)? {
let bls_cids = read_amt_cids(db, &roots.bls_message_root)?;
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey(format!(
"no msg root with cid {msg_cid} at epoch {} in block {}",
block_header.epoch,
block_header.cid(),
)))
}
}
/// Serializes the given objects and writes them to the blockstore in batches
/// of 256, using the store's default multihash code.
pub fn persist_objects<'a, DB, C>(
    db: &DB,
    headers: impl Iterator<Item = &'a C>,
) -> Result<(), Error>
where
    DB: Blockstore,
    C: 'a + Serialize,
{
    // Batching bounds the size of each bulk write while amortizing per-call
    // overhead.
    let batches = headers.chunks(256);
    for batch in &batches {
        db.bulk_put(batch, DB::default_code())?;
    }
    Ok(())
}
/// Flattens a v0 AMT of CIDs rooted at `root` into a `Vec`, preserving order.
fn read_amt_cids<DB>(db: &DB, root: &Cid) -> Result<Vec<Cid>, Error>
where
    DB: Blockstore,
{
    let array = Amt::<Cid, _>::load(root, db)?;
    // Preallocate from the AMT's element count to avoid regrowth.
    let mut collected = Vec::with_capacity(array.count() as usize);
    array.for_each_cacheless(|_, entry| {
        collected.push(*entry);
        Ok(())
    })?;
    Ok(collected)
}
pub fn get_chain_message<DB>(db: &DB, key: &Cid) -> Result<ChainMessage, Error>
where
DB: Blockstore,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// LRU cache mapping a tipset key to the flattened list of messages in that
/// tipset, shared behind an `Arc` so cache hits avoid re-reading the store.
pub struct MessagesInTipsetCache {
    // Size-tracking LRU so memory usage is bounded and observable via metrics.
    cache: SizeTrackingLruCache<TipsetKey, Arc<Vec<ChainMessage>>>,
}
impl MessagesInTipsetCache {
    /// Creates a cache bounded to `capacity` tipset entries, registering
    /// metrics under the `msg_in_tipset` label.
    pub fn new(capacity: NonZeroUsize) -> Self {
        let cache = SizeTrackingLruCache::new_with_metrics("msg_in_tipset".into(), capacity);
        Self { cache }
    }
    /// Returns the cached message list for `key`, if present.
    pub fn get(&self, key: &TipsetKey) -> Option<Arc<Vec<ChainMessage>>> {
        self.cache.get_cloned(key)
    }
    /// Returns the cached entry for `key`, computing and inserting it via `f`
    /// on a miss. `f` runs only when the key is absent.
    pub fn get_or_insert_with<F>(
        &self,
        key: &TipsetKey,
        f: F,
    ) -> anyhow::Result<Arc<Vec<ChainMessage>>>
    where
        F: FnOnce() -> anyhow::Result<Vec<ChainMessage>>,
    {
        match self.get(key) {
            Some(hit) => Ok(hit),
            None => Ok(self.insert(key.clone(), f()?)),
        }
    }
    /// Inserts `value` under `key` and returns the shared handle to it.
    pub fn insert(&self, key: TipsetKey, mut value: Vec<ChainMessage>) -> Arc<Vec<ChainMessage>> {
        // Trim spare capacity before the long-lived allocation is shared.
        value.shrink_to_fit();
        let shared = Arc::new(value);
        self.cache.push(key, Arc::clone(&shared));
        shared
    }
    /// Reads the cache capacity from the environment, falling back to a
    /// built-in default when unset or unparsable.
    fn read_cache_size() -> NonZeroUsize {
        const DEFAULT: NonZeroUsize = nonzero!(1024usize);
        match std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE") {
            Ok(raw) => raw.parse().unwrap_or(DEFAULT),
            Err(_) => DEFAULT,
        }
    }
}
/// Builds a cache sized from the `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE`
/// environment variable, with a built-in fallback.
impl Default for MessagesInTipsetCache {
    fn default() -> Self {
        Self::new(Self::read_cache_size())
    }
}
/// Decodes a list of CBOR-encoded objects from the store, preserving order
/// and failing on the first missing or undecodable entry.
pub fn messages_from_cids<DB, T>(db: &DB, keys: &[Cid]) -> Result<Vec<T>, Error>
where
    DB: Blockstore,
    T: DeserializeOwned,
{
    let mut decoded = Vec::with_capacity(keys.len());
    for key in keys {
        decoded.push(message_from_cid(db, key)?);
    }
    Ok(decoded)
}
pub fn message_from_cid<DB, T>(db: &DB, key: &Cid) -> Result<T, Error>
where
DB: Blockstore,
T: DeserializeOwned,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// Fetches the `i`-th receipt from a block header's receipts structure,
/// returning `None` when no receipt exists at that position.
pub fn get_parent_receipt(
    db: &impl Blockstore,
    block_header: &CachingBlockHeader,
    i: usize,
) -> Result<Option<Receipt>, Error> {
    let receipt = Receipt::get_receipt(db, &block_header.message_receipts, i as u64)?;
    Ok(receipt)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::multihash::prelude::*;
    use crate::{blocks::RawBlockHeader, shim::address::Address};
    use cid::Cid;
    use fvm_ipld_encoding::DAG_CBOR;
    // The store should adopt the supplied genesis header when the backing
    // database holds no persisted heaviest-tipset key.
    #[test]
    fn genesis_test() {
        let db = Arc::new(crate::db::MemoryDB::default());
        let chain_config = Arc::new(ChainConfig::default());
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            state_root: Cid::new_v1(DAG_CBOR, MultihashCode::Identity.digest(&[])),
            epoch: 1,
            weight: 2u32.into(),
            messages: Cid::new_v1(DAG_CBOR, MultihashCode::Identity.digest(&[])),
            message_receipts: Cid::new_v1(DAG_CBOR, MultihashCode::Identity.digest(&[])),
            ..Default::default()
        });
        // `MemoryDB` acts as blockstore, head-key provider, and eth-mapping
        // store all at once.
        let cs =
            ChainStore::new(db.clone(), db.clone(), db, chain_config, gen_block.clone()).unwrap();
        assert_eq!(cs.genesis_block_header(), &gen_block);
    }
    // Marking a block as validated must be observable via `is_block_validated`.
    #[test]
    fn block_validation_cache_basic() {
        let db = Arc::new(crate::db::MemoryDB::default());
        let chain_config = Arc::new(ChainConfig::default());
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            ..Default::default()
        });
        let cs = ChainStore::new(db.clone(), db.clone(), db, chain_config, gen_block).unwrap();
        let cid = Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&[1, 2, 3]));
        assert!(!cs.is_block_validated(&cid));
        cs.mark_block_as_validated(&cid);
        assert!(cs.is_block_validated(&cid));
    }
    // Exercises cache miss/hit, and verifies `get_or_insert_with` only runs
    // the factory closure on a miss.
    #[test]
    fn test_messages_in_tipset_cache() {
        let cache = MessagesInTipsetCache::new(nonzero!(2_usize));
        let key1 = TipsetKey::from(nunny::vec![Cid::new_v1(
            DAG_CBOR,
            MultihashCode::Blake2b256.digest(&[1])
        )]);
        assert!(cache.get(&key1).is_none());
        let msgs = vec![Message::default().into()];
        cache.insert(key1.clone(), msgs.clone());
        assert_eq!(&msgs, &*cache.get(&key1).unwrap());
        // Flag recording whether the factory closure was invoked.
        let inserter_executed: std::sync::atomic::AtomicBool =
            std::sync::atomic::AtomicBool::new(false);
        let key_inserter = || {
            inserter_executed.store(true, std::sync::atomic::Ordering::Relaxed);
            Ok(msgs.clone())
        };
        // Hit: the closure must not run.
        assert_eq!(
            &msgs,
            &*cache.get_or_insert_with(&key1, key_inserter).unwrap()
        );
        assert!(!inserter_executed.load(std::sync::atomic::Ordering::Relaxed));
        let key2 = TipsetKey::from(nunny::vec![Cid::new_v1(
            DAG_CBOR,
            MultihashCode::Blake2b256.digest(&[2])
        )]);
        assert!(cache.get(&key2).is_none());
        // Miss: the closure runs and its result is cached and returned.
        assert_eq!(
            &msgs,
            &*cache.get_or_insert_with(&key2, key_inserter).unwrap()
        );
        assert!(inserter_executed.load(std::sync::atomic::Ordering::Relaxed));
    }
}