use super::{
Error,
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::networks::{ChainConfig, Height};
use crate::prelude::*;
use crate::rpc::{
chain::ChainGetTipSetFinalityStatus,
eth::{eth_tx_from_signed_eth_message, types::EthHash},
};
use crate::shim::clock::ChainEpoch;
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::state_manager::ExecutedTipset;
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use crate::{
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
db::{DbImpl, HeaviestTipsetKeyProvider},
message::{ChainMessage, SignedMessage},
};
use crate::{db::EthMappingsStoreExt, rpc::chain::PathChange};
use crate::{fil_cns, utils::cache::SizeTrackingCache};
use crate::{
interpreter::{BlockMessages, VMTrace},
rpc::chain::PathChanges,
};
use ahash::HashMap;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_encoding::CborStore;
use nonzero_ext::nonzero;
use parking_lot::RwLock;
use serde::{Serialize, de::DeserializeOwned};
use std::{
num::NonZeroUsize,
sync::atomic::{self, AtomicI64},
};
use tokio::sync::broadcast;
use tracing::{debug, error, trace, warn};
/// Capacity handed to `broadcast::channel` when the head-change channel is created.
const SINK_CAP: usize = 200;
/// Capacity handed to the `validated_blocks` cache when a [`ChainStore`] is created.
const VALIDATED_BLOCKS_CACHE_SIZE: NonZeroUsize = nonzero!(14400usize);
/// Alias of [`ChainEpoch`]; presumably denotes a difference between two epochs (verify against callers).
pub type ChainEpochDelta = ChainEpoch;
/// A single path change carrying a [`Tipset`].
pub type HeadChange = PathChange<Tipset>;
/// Path changes carrying [`Tipset`]s; this is the message type of the head-change broadcast channel.
pub type HeadChanges = PathChanges<Tipset>;
/// Keeps track of the chain head, finality markers and several caches on top of a [`DbImpl`].
///
/// All shared state lives behind `Arc`s or shallow-clonable handles, see the
/// `ShallowClone` implementation of this type.
pub struct ChainStore {
/// Sender side of the head-change broadcast channel; `set_heaviest_tipset` publishes on it.
head_changes_tx: broadcast::Sender<HeadChanges>,
/// The current head tipset; replaced by `set_heaviest_tipset`.
heaviest_tipset: Arc<RwLock<Tipset>>,
/// Tipset set via `set_f3_finalized_tipset`; `None` until that is called.
f3_finalized_tipset: Arc<RwLock<Option<Tipset>>>,
/// Epoch returned by `ChainGetTipSetFinalityStatus::get_ec_finality_epoch` for the current head.
ec_calculator_finalized_epoch: Arc<AtomicI64>,
/// Chain index built over the database; also the owner of the database handle returned by `db()`.
chain_index: ChainIndex,
/// Tracker fed by `add_to_tipset_tracker` and used by `expand_tipset`.
tipset_tracker: TipsetTracker<DbImpl>,
/// The genesis block header this store was created with.
genesis_block_header: Arc<CachingBlockHeader>,
/// Cache whose keys are the CIDs of blocks marked as validated.
pub(crate) validated_blocks: SizeTrackingCache<CidWrapper, ()>,
/// Network configuration this store was created with.
chain_config: Arc<ChainConfig>,
/// Cache of the flattened messages of a tipset, keyed by tipset key.
messages_in_tipset_cache: MessagesInTipsetCache,
}
impl ShallowClone for ChainStore {
    /// Builds a new handle by cloning every field individually.
    ///
    /// The exhaustive destructuring makes the compiler reject this function
    /// whenever a field is added to [`ChainStore`] without being handled here.
    fn shallow_clone(&self) -> Self {
        let Self {
            head_changes_tx,
            heaviest_tipset,
            f3_finalized_tipset,
            ec_calculator_finalized_epoch,
            chain_index,
            tipset_tracker,
            genesis_block_header,
            validated_blocks,
            chain_config,
            messages_in_tipset_cache,
        } = self;
        Self {
            head_changes_tx: head_changes_tx.clone(),
            heaviest_tipset: heaviest_tipset.shallow_clone(),
            f3_finalized_tipset: f3_finalized_tipset.shallow_clone(),
            ec_calculator_finalized_epoch: ec_calculator_finalized_epoch.shallow_clone(),
            chain_index: chain_index.shallow_clone(),
            tipset_tracker: tipset_tracker.shallow_clone(),
            genesis_block_header: genesis_block_header.shallow_clone(),
            validated_blocks: validated_blocks.shallow_clone(),
            chain_config: chain_config.shallow_clone(),
            messages_in_tipset_cache: messages_in_tipset_cache.shallow_clone(),
        }
    }
}
impl ChainStore {
/// Creates a chain store on top of `db`.
///
/// The head is the tipset referenced by the heaviest tipset key stored in
/// the database, or a tipset built from `genesis_block_header` when no key
/// is stored.
///
/// # Errors
///
/// Fails when the stored head key cannot be read or the tipset it refers to
/// cannot be loaded.
pub fn new(
    db: impl Into<DbImpl>,
    chain_config: Arc<ChainConfig>,
    genesis_block_header: CachingBlockHeader,
) -> anyhow::Result<Self> {
    let db = db.into();
    // The initial receiver is dropped right away; subscribers are created on demand.
    let (head_changes_tx, _) = broadcast::channel(SINK_CAP);

    let stored_head_key = db
        .heaviest_tipset_key()
        .context("failed to load head tipset key")?;
    let head = match stored_head_key {
        Some(head_tsk) => Tipset::load_required(&db, &head_tsk)
            .with_context(|| format!("failed to load head tipset with key {head_tsk}"))?,
        None => Tipset::from(&genesis_block_header),
    };

    let heaviest_tipset = Arc::new(RwLock::new(head.shallow_clone()));
    let f3_finalized_tipset: Arc<RwLock<Option<Tipset>>> = Default::default();

    let chain_index = ChainIndex::new(db.shallow_clone());
    let initial_ec_epoch = ChainGetTipSetFinalityStatus::get_ec_finality_epoch(
        &chain_index,
        &chain_config,
        &head,
    );
    let ec_calculator_finalized_epoch = Arc::new(AtomicI64::new(initial_ec_epoch));

    // A tipset counts as finalized when its epoch is not above the larger of
    // the F3 finalized epoch (default when unset) and the EC finality epoch.
    let chain_index = chain_index.with_is_tipset_finalized(Arc::new({
        let f3_finalized_tipset = f3_finalized_tipset.shallow_clone();
        let ec_calculator_finalized_epoch = ec_calculator_finalized_epoch.shallow_clone();
        move |ts| {
            let f3_epoch = f3_finalized_tipset
                .read()
                .as_ref()
                .map(|f3_ts| f3_ts.epoch())
                .unwrap_or_default();
            let ec_epoch = ec_calculator_finalized_epoch.load(atomic::Ordering::Acquire);
            ts.epoch() <= f3_epoch.max(ec_epoch)
        }
    }));

    let tipset_tracker = TipsetTracker::new(db, chain_config.clone());
    let validated_blocks =
        SizeTrackingCache::new_with_metrics("validated_blocks", VALIDATED_BLOCKS_CACHE_SIZE);

    Ok(Self {
        head_changes_tx,
        chain_index,
        tipset_tracker,
        heaviest_tipset,
        f3_finalized_tipset,
        ec_calculator_finalized_epoch,
        genesis_block_header: genesis_block_header.into(),
        validated_blocks,
        chain_config,
        messages_in_tipset_cache: Default::default(),
    })
}
/// Stores `ts` as the F3 finalized tipset, discarding any previous value.
pub fn set_f3_finalized_tipset(&self, ts: Tipset) {
    let mut finalized = self.f3_finalized_tipset.write();
    *finalized = Some(ts);
}
/// Returns a clone of the F3 finalized tipset, or `None` when none has been set.
pub fn f3_finalized_tipset(&self) -> Option<Tipset> {
    let finalized = self.f3_finalized_tipset.read();
    finalized.as_ref().cloned()
}
/// Returns the EC finality epoch last stored for the head (atomic load with `Acquire` ordering).
pub fn ec_calculator_finalized_epoch(&self) -> ChainEpoch {
self.ec_calculator_finalized_epoch
.load(atomic::Ordering::Acquire)
}
/// Returns the cache of per-tipset messages owned by this store.
pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
&self.messages_in_tipset_cache
}
/// Makes `head` the heaviest tipset.
///
/// The tipset key is saved, recorded in the database as the heaviest tipset
/// key, and the in-memory head and EC finality epoch are updated. When the
/// head-change channel has subscribers, the path from the old head to the
/// new one is published; if that path cannot be computed, a single apply of
/// `head` is published instead.
///
/// # Errors
///
/// Fails when either database write fails.
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
    head.key().save(self.db())?;
    self.db().set_heaviest_tipset_key(head.key())?;

    let old_head = {
        let mut current_head = self.heaviest_tipset.write();
        std::mem::replace(&mut *current_head, head.shallow_clone())
    };

    let ec_finality_epoch = ChainGetTipSetFinalityStatus::get_ec_finality_epoch(
        self.chain_index(),
        self.chain_config(),
        &head,
    );
    self.ec_calculator_finalized_epoch
        .store(ec_finality_epoch, atomic::Ordering::Release);

    // Computing the path is only worth it when somebody is listening.
    if !crate::utils::broadcast::has_subscribers(&self.head_changes_tx) {
        return Ok(());
    }

    let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) {
        Ok(path) => path,
        Err(e) => {
            // The failure is not reported when the previous head is at epoch 0 or below.
            if old_head.epoch() > 0 {
                error!("failed to get chain path changes: {e:#}");
            }
            PathChanges {
                applies: vec![head],
                reverts: vec![],
            }
        }
    };
    if self.head_changes_tx.send(changes).is_err() {
        debug!("did not publish changes, no active receivers");
    }
    Ok(())
}
/// Registers `header` with the tipset tracker.
pub fn add_to_tipset_tracker(&self, header: &CachingBlockHeader) {
self.tipset_tracker.add(header);
}
/// Persists the block headers of `ts`, expands the tipset through the
/// tipset tracker and makes the result the new head if it is heavier than
/// the current one.
pub fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> {
    persist_objects(self.db(), ts.block_headers().iter())?;
    let seed_header = ts.min_ticket_block().clone();
    let expanded = self.expand_tipset(seed_header)?;
    self.update_heaviest(expanded)
}
/// Loads the tipset key stored under the CID derived from `hash`.
///
/// # Errors
///
/// Propagates the error of `TipsetKey::load`.
pub fn get_required_tipset_key(&self, hash: &EthHash) -> Result<TipsetKey, Error> {
    let cid = hash.to_cid();
    let tipset_key = TipsetKey::load(self.db(), &cid)?;
    Ok(tipset_key)
}
/// Writes the mapping `k => (v, timestamp)` to the database.
pub fn put_mapping(&self, k: EthHash, v: Cid, timestamp: u64) -> Result<(), Error> {
    let entry = (v, timestamp);
    self.db().write_obj(&k, &entry)?;
    Ok(())
}
/// Reads the CID stored for `hash`, dropping the timestamp stored alongside it.
///
/// Returns `Ok(None)` when the database has no entry for `hash`.
pub fn get_mapping(&self, hash: &EthHash) -> Result<Option<Cid>, Error> {
    let stored = self.db().read_obj::<(Cid, u64)>(hash)?;
    Ok(stored.map(|(cid, _timestamp)| cid))
}
/// Asks the tipset tracker to expand `header` into a tipset.
fn expand_tipset(&self, header: CachingBlockHeader) -> Result<Tipset, Error> {
self.tipset_tracker.expand(header)
}
/// Returns the genesis block header this store was created with.
pub fn genesis_block_header(&self) -> &CachingBlockHeader {
&self.genesis_block_header
}
pub fn heaviest_tipset(&self) -> Tipset {
self.heaviest_tipset.read().clone()
}
/// Builds a tipset from the genesis block header.
pub fn genesis_tipset(&self) -> Tipset {
Tipset::from(self.genesis_block_header())
}
/// Creates a new receiver on the head-change broadcast channel.
pub fn subscribe_head_changes(&self) -> broadcast::Receiver<HeadChanges> {
self.head_changes_tx.subscribe()
}
/// Returns a reference to the database held by the chain index.
pub fn db(&self) -> &DbImpl {
self.chain_index().db()
}
/// Returns an owned handle to the database held by the chain index.
pub fn db_owned(&self) -> DbImpl {
self.chain_index().db_owned()
}
/// Returns the chain index owned by this store.
pub fn chain_index(&self) -> &ChainIndex {
&self.chain_index
}
/// Returns the chain configuration this store was created with.
pub fn chain_config(&self) -> &Arc<ChainConfig> {
&self.chain_config
}
/// Loads the tipset for the given key through the chain index, or returns
/// the current head when no key is given.
#[tracing::instrument(skip_all)]
pub fn load_required_tipset_or_heaviest<'a>(
    &self,
    maybe_key: impl Into<Option<&'a TipsetKey>>,
) -> Result<Tipset, Error> {
    let requested: Option<&TipsetKey> = maybe_key.into();
    if let Some(key) = requested {
        return self.chain_index.load_required_tipset(key);
    }
    Ok(self.heaviest_tipset())
}
/// Looks for the tipset on the current chain whose parents are `ts`.
///
/// Returns `Ok(None)` when the head is not above `ts`, or when the tipset
/// found by height above `ts` does not have `ts` as its parents.
pub fn load_child_tipset(&self, ts: &Tipset) -> Result<Option<Tipset>, Error> {
    let head = self.heaviest_tipset();
    if head.parents() == ts.key() {
        return Ok(Some(head));
    }
    if head.epoch() <= ts.epoch() {
        return Ok(None);
    }

    let candidate = self.chain_index().tipset_by_height(
        ts.epoch() + 1,
        head,
        ResolveNullTipset::TakeNewer,
    )?;
    // The tipset found by height is only a child if it actually points back at `ts`.
    Ok(candidate.filter(|child| child.parents() == ts.key()))
}
/// Makes `ts` the head when its weight is strictly greater than the weight
/// of the current head; otherwise does nothing.
fn update_heaviest(&self, ts: Tipset) -> Result<(), Error> {
    // The head is cloned out of the lock before any weight is computed.
    let current_weight = fil_cns::weight(self.db(), &self.heaviest_tipset())?;
    let candidate_weight = fil_cns::weight(self.db(), &ts)?;

    if candidate_weight > current_weight {
        self.set_heaviest_tipset(ts)
    } else {
        Ok(())
    }
}
/// Reports whether `cid` is present in the validated-blocks cache.
pub fn is_block_validated(&self, cid: &Cid) -> bool {
    if self.validated_blocks.get(cid).is_none() {
        return false;
    }
    trace!("Block {cid} was previously validated");
    true
}
pub fn mark_block_as_validated(&self, cid: &Cid) {
self.validated_blocks.insert((*cid).into(), ());
}
/// Removes `cid` from the validated-blocks cache.
pub fn unmark_block_as_validated(&self, cid: &Cid) {
self.validated_blocks.remove(cid);
}
/// Returns the messages of all blocks of `ts`, flattened into one list.
///
/// The result is looked up in the per-tipset message cache by tipset key;
/// the loader closure reads the block messages from the database.
pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error> {
    let cache = self.messages_in_tipset_cache();
    let messages = cache.get_or_insert_with(ts.key(), || {
        let mut flattened = Vec::new();
        for block_messages in BlockMessages::for_tipset(self.db(), ts)? {
            flattened.extend(block_messages.messages);
        }
        anyhow::Ok(Arc::new(flattened))
    })?;
    Ok(messages)
}
/// Finds the lookback tipset for `round` together with a state root.
///
/// The lookback distance is 10 epochs up to network version 3 and the
/// policy's chain finality afterwards; the lookback round is clamped at 0.
///
/// * When the lookback round is not below the epoch of `heaviest_tipset`,
///   the messages of `heaviest_tipset` are applied and the tipset is
///   returned with the resulting state root.
/// * Otherwise the tipset at height `lookback round + 1` (resolving null
///   rounds towards newer tipsets) is loaded, and its parent tipset is
///   returned together with its parent state root.
///
/// # Errors
///
/// Fails when a tipset cannot be loaded, when message application fails, or
/// when the tipset found by height lies below the lookback round.
pub fn get_lookback_tipset_for_round(
    chain_index: &ChainIndex,
    chain_config: &Arc<ChainConfig>,
    heaviest_tipset: &Tipset,
    round: ChainEpoch,
) -> Result<(Tipset, Cid), Error> {
    let lookback = match chain_config.network_version(round) {
        version if version <= NetworkVersion::V3 => ChainEpoch::from(10),
        _ => chain_config.policy.chain_finality,
    };
    let lookback_round = (round - lookback).max(0);

    if lookback_round >= heaviest_tipset.epoch() {
        let genesis_timestamp = heaviest_tipset.genesis(chain_index.db())?.timestamp;
        let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp));
        let ExecutedTipset { state_root, .. } = crate::state_manager::apply_block_messages(
            genesis_timestamp,
            chain_index.shallow_clone(),
            chain_config.shallow_clone(),
            beacon,
            &crate::shim::machine::GLOBAL_MULTI_ENGINE,
            heaviest_tipset.clone(),
            crate::state_manager::NO_CALLBACK,
            VMTrace::NotTraced,
        )
        .map_err(|e| Error::Other(e.to_string()))?;
        return Ok((heaviest_tipset.clone(), state_root));
    }

    let successor = chain_index
        .load_required_tipset_by_height(
            lookback_round + 1,
            heaviest_tipset.clone(),
            ResolveNullTipset::TakeNewer,
        )
        .map_err(|e| Error::Other(format!("Could not get tipset by height {e:?}")))?;

    if lookback_round > successor.epoch() {
        return Err(Error::Other(format!(
            "failed to find non-null tipset {:?} {} which is known to exist, found {:?} {}",
            heaviest_tipset.key(),
            heaviest_tipset.epoch(),
            successor.key(),
            successor.epoch()
        )));
    }

    let lookback_tipset = chain_index
        .load_required_tipset(successor.parents())
        .map_err(|e| Error::Other(format!("Could not get tipset from keys {e:?}")))?;
    let parent_state = *successor.parent_state();
    Ok((lookback_tipset, parent_state))
}
/// Writes an Ethereum-hash-to-CID mapping for every message in `messages`
/// that converts to an Ethereum transaction.
///
/// Messages for which the conversion or the hash computation fails are
/// skipped. Each element of `messages` pairs a message with the timestamp
/// stored alongside its mapping.
///
/// # Errors
///
/// Fails when a mapping cannot be written to the database.
pub fn process_signed_messages(&self, messages: &[(SignedMessage, u64)]) -> anyhow::Result<()> {
    let eth_chain_id = self.chain_config.eth_chain_id;
    let eth_txs: Vec<(EthHash, Cid, u64, usize)> = messages
        .iter()
        .enumerate()
        .filter_map(|(i, (smsg, timestamp))| {
            let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id).ok()?;
            let hash = tx.eth_hash().ok()?;
            Some((hash.into(), smsg.cid(), *timestamp, i))
        })
        .collect();

    let unique = filter_lowest_index(eth_txs);
    let num_entries = unique.len();
    for (hash, cid, timestamp) in unique {
        tracing::trace!("Insert mapping {} => {}", hash, cid);
        self.put_mapping(hash, cid, timestamp)?;
    }
    tracing::trace!("Wrote {} entries in Ethereum mapping", num_entries);
    Ok(())
}
/// Collects the delegated signed messages of `headers`, each paired with
/// the timestamp of the header it came from.
///
/// Headers below the `Hygge` epoch are ignored. Headers whose messages
/// cannot be loaded are skipped without reporting an error.
pub fn headers_delegated_messages<'a>(
    &self,
    headers: impl Iterator<Item = &'a CachingBlockHeader>,
) -> anyhow::Result<Vec<(SignedMessage, u64)>> {
    let mut delegated_messages = Vec::new();
    for bh in headers {
        if bh.epoch < self.chain_config.epoch(Height::Hygge) {
            continue;
        }
        // Best effort: a header whose messages are unavailable contributes nothing.
        let Ok((_, secp_msgs)) = block_messages(self.db(), bh) else {
            continue;
        };
        delegated_messages.extend(
            secp_msgs
                .into_iter()
                .filter(|msg| msg.is_delegated())
                .map(|msg| (msg, bh.timestamp)),
        );
    }
    Ok(delegated_messages)
}
}
/// Deduplicates `values` by hash, keeping for every hash the entry with the
/// lowest index.
///
/// Each input element is `(hash, cid, timestamp, index)`; the output drops
/// the index. The order of the returned entries is unspecified because they
/// are drained from a hash map.
fn filter_lowest_index(values: Vec<(EthHash, Cid, u64, usize)>) -> Vec<(EthHash, Cid, u64)> {
    let mut lowest: HashMap<EthHash, (Cid, u64, usize)> = HashMap::default();
    for (hash, cid, timestamp, index) in values {
        lowest
            .entry(hash)
            .and_modify(|current| {
                // Replace the whole entry, not just the index, so that the CID
                // and timestamp always belong to the lowest index seen so far.
                if index < current.2 {
                    *current = (cid, timestamp, index);
                }
            })
            .or_insert((cid, timestamp, index));
    }
    lowest
        .into_iter()
        .map(|(hash, (cid, timestamp, _))| (hash, cid, timestamp))
        .collect()
}
pub fn block_messages<DB>(
db: &DB,
bh: &CachingBlockHeader,
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
DB: Blockstore,
{
let (bls_cids, secpk_cids) = read_msg_cids(db, bh)?;
let bls_msgs: Vec<Message> = messages_from_cids(db, &bls_cids)?;
let secp_msgs: Vec<SignedMessage> = messages_from_cids(db, &secpk_cids)?;
Ok((bls_msgs, secp_msgs))
}
/// Loads the BLS messages for `bls_cids` and the signed messages for
/// `secp_cids` from `db`, BLS messages first.
pub fn block_messages_from_cids<DB>(
    db: &DB,
    bls_cids: &[Cid],
    secp_cids: &[Cid],
) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>
where
    DB: Blockstore,
{
    Ok((
        messages_from_cids(db, bls_cids)?,
        messages_from_cids(db, secp_cids)?,
    ))
}
pub fn read_msg_cids<DB>(
db: &DB,
block_header: &CachingBlockHeader,
) -> Result<(Vec<Cid>, Vec<Cid>), Error>
where
DB: Blockstore,
{
let msg_cid = &block_header.messages;
if let Some(roots) = db.get_cbor::<TxMeta>(msg_cid)? {
let bls_cids = read_amt_cids(db, &roots.bls_message_root)?;
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey(format!(
"no msg root with cid {msg_cid} at epoch {} in block {}",
block_header.epoch,
block_header.cid(),
)))
}
}
/// Writes all items yielded by `headers` to `db`, in batches of 256 items.
///
/// # Errors
///
/// Stops at and returns the first failing batch write.
pub fn persist_objects<'a, DB, C>(
    db: &DB,
    headers: impl Iterator<Item = &'a C>,
) -> Result<(), Error>
where
    DB: Blockstore,
    C: 'a + Serialize,
{
    const BATCH_SIZE: usize = 256;
    let batches = headers.chunks(BATCH_SIZE);
    for batch in &batches {
        db.bulk_put(batch, DB::default_code())?;
    }
    Ok(())
}
/// Loads the AMT rooted at `root` and returns the CIDs it contains, in the
/// order `for_each_cacheless` visits them.
fn read_amt_cids<DB>(db: &DB, root: &Cid) -> Result<Vec<Cid>, Error>
where
    DB: Blockstore,
{
    let amt = Amt::<Cid, _>::load(root, db)?;
    // The element count reported by the AMT is used only to size the buffer.
    let expected_len = amt.count() as usize;
    let mut collected = Vec::with_capacity(expected_len);
    amt.for_each_cacheless(|_index, cid| {
        collected.push(*cid);
        Ok(())
    })?;
    Ok(collected)
}
pub fn get_chain_message<DB>(db: &DB, key: &Cid) -> Result<ChainMessage, Error>
where
DB: Blockstore,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// Cache from a tipset key to the shared list of messages of that tipset.
///
/// Dereferences to the wrapped [`SizeTrackingCache`].
#[derive(derive_more::Deref)]
pub struct MessagesInTipsetCache(SizeTrackingCache<TipsetKey, Arc<Vec<ChainMessage>>>);
impl MessagesInTipsetCache {
    /// Creates a cache with the given capacity, registered for metrics under
    /// the name `msg_in_tipset`.
    pub fn new(capacity: NonZeroUsize) -> Self {
        let inner = SizeTrackingCache::new_with_metrics("msg_in_tipset", capacity);
        Self(inner)
    }

    /// Reads the cache capacity from `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE`.
    ///
    /// Falls back to 1024 when the variable is unset or does not parse as a
    /// non-zero `usize`.
    fn read_cache_size() -> NonZeroUsize {
        const DEFAULT: NonZeroUsize = nonzero!(1024usize);
        match std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE") {
            Ok(raw) => raw.parse().unwrap_or(DEFAULT),
            Err(_) => DEFAULT,
        }
    }
}
impl Default for MessagesInTipsetCache {
/// Creates a cache whose capacity is taken from `read_cache_size`.
fn default() -> Self {
Self::new(Self::read_cache_size())
}
}
impl ShallowClone for MessagesInTipsetCache {
    /// Wraps a shallow clone of the inner cache.
    fn shallow_clone(&self) -> Self {
        let Self(inner) = self;
        Self(inner.shallow_clone())
    }
}
/// Loads one value of type `T` per key, in the order of `keys`.
///
/// # Errors
///
/// Stops at and returns the first key that fails to load.
pub fn messages_from_cids<DB, T>(db: &DB, keys: &[Cid]) -> Result<Vec<T>, Error>
where
    DB: Blockstore,
    T: DeserializeOwned,
{
    let mut messages = Vec::with_capacity(keys.len());
    for key in keys {
        messages.push(message_from_cid(db, key)?);
    }
    Ok(messages)
}
pub fn message_from_cid<DB, T>(db: &DB, key: &Cid) -> Result<T, Error>
where
DB: Blockstore,
T: DeserializeOwned,
{
db.get_cbor(key)?
.ok_or_else(|| Error::UndefinedKey(key.to_string()))
}
/// Looks up the receipt at position `i` under the `message_receipts` root
/// of `block_header`.
///
/// Returns whatever `Receipt::get_receipt` yields for that position.
pub fn get_parent_receipt(
    db: &impl Blockstore,
    block_header: &CachingBlockHeader,
    i: usize,
) -> Result<Option<Receipt>, Error> {
    let receipts_root = &block_header.message_receipts;
    let receipt = Receipt::get_receipt(db, receipts_root, i as u64)?;
    Ok(receipt)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::multihash::prelude::*;
    use crate::{blocks::RawBlockHeader, shim::address::Address};
    use fvm_ipld_encoding::DAG_CBOR;

    /// A store created from a genesis header hands that same header back.
    #[test]
    fn genesis_test() {
        let db = Arc::new(crate::db::MemoryDB::default());
        let chain_config = Arc::new(ChainConfig::default());
        let empty_cid = Cid::new_v1(DAG_CBOR, MultihashCode::Identity.digest(&[]));
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            state_root: empty_cid,
            epoch: 1,
            weight: 2u32.into(),
            messages: empty_cid,
            message_receipts: empty_cid,
            ..Default::default()
        });

        let cs = ChainStore::new(db, chain_config, gen_block.clone()).unwrap();

        assert_eq!(cs.genesis_block_header(), &gen_block);
    }

    /// A block is reported as validated only after it has been marked.
    #[test]
    fn block_validation_cache_basic() {
        let db = DbImpl::from(Arc::new(crate::db::MemoryDB::default()));
        let chain_config = Arc::new(ChainConfig::default());
        let gen_block = CachingBlockHeader::new(RawBlockHeader {
            miner_address: Address::new_id(0),
            ..Default::default()
        });
        let cs = ChainStore::new(db, chain_config, gen_block).unwrap();
        let cid = Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&[1, 2, 3]));

        assert!(!cs.is_block_validated(&cid));
        cs.mark_block_as_validated(&cid);
        assert!(cs.is_block_validated(&cid));
    }

    /// The loader passed to `get_or_insert_with` runs only on a cache miss.
    #[test]
    fn test_messages_in_tipset_cache() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let key_for = |seed: u8| {
            TipsetKey::from(nunny::vec![Cid::new_v1(
                DAG_CBOR,
                MultihashCode::Blake2b256.digest(&[seed])
            )])
        };
        let cache = MessagesInTipsetCache::new(nonzero!(2_usize));

        // Plain insert followed by a lookup.
        let key1 = key_for(1);
        assert!(cache.get(&key1).is_none());
        let msgs = Arc::new(vec![Message::default().into()]);
        cache.insert(key1.clone(), msgs.clone());
        assert_eq!(&msgs, &cache.get(&key1).unwrap());

        let inserter_executed = AtomicBool::new(false);
        let key_inserter = || {
            inserter_executed.store(true, Ordering::Relaxed);
            anyhow::Ok(msgs.clone())
        };

        // Hit: the loader must not run.
        assert_eq!(
            &msgs,
            &cache.get_or_insert_with(&key1, key_inserter).unwrap()
        );
        assert!(!inserter_executed.load(Ordering::Relaxed));

        // Miss: the loader runs and its value is returned.
        let key2 = key_for(2);
        assert!(cache.get(&key2).is_none());
        assert_eq!(
            &msgs,
            &cache.get_or_insert_with(&key2, key_inserter).unwrap()
        );
        assert!(inserter_executed.load(Ordering::Relaxed));
    }
}