forest/chain/store/
tipset_tracker.rs1use std::{collections::BTreeMap, sync::Arc};
5
6use super::Error;
7use crate::blocks::{CachingBlockHeader, Tipset};
8use crate::networks::ChainConfig;
9use crate::shim::clock::ChainEpoch;
10use cid::Cid;
11use fvm_ipld_blockstore::Blockstore;
12use nunny::vec as nonempty;
13use parking_lot::Mutex;
14use tracing::{debug, warn};
15
16#[derive(Default)]
18pub(in crate::chain) struct TipsetTracker<DB> {
19 entries: Mutex<BTreeMap<ChainEpoch, Vec<Cid>>>,
20 db: Arc<DB>,
21 chain_config: Arc<ChainConfig>,
22}
23
24impl<DB: Blockstore> TipsetTracker<DB> {
25 pub fn new(db: Arc<DB>, chain_config: Arc<ChainConfig>) -> Self {
26 Self {
27 entries: Default::default(),
28 db,
29 chain_config,
30 }
31 }
32
33 pub fn add(&self, header: &CachingBlockHeader) {
35 let mut map_lock = self.entries.lock();
36 let cids = map_lock.entry(header.epoch).or_default();
37 if cids.contains(header.cid()) {
38 debug!("tried to add block to tipset tracker that was already there");
39 return;
40 }
41 let cids_to_verify = cids.to_owned();
42 cids.push(*header.cid());
43 drop(map_lock);
44
45 self.check_multiple_blocks_from_same_miner(&cids_to_verify, header);
46 self.prune_entries(header.epoch);
47 }
48
49 fn check_multiple_blocks_from_same_miner(&self, cids: &[Cid], header: &CachingBlockHeader) {
56 for cid in cids.iter() {
57 if let Ok(Some(block)) = CachingBlockHeader::load(&self.db, *cid)
58 && header.miner_address == block.miner_address
59 {
60 warn!(
61 "Have multiple blocks from miner {} at height {} in our tipset cache {}-{}",
62 header.miner_address,
63 header.epoch,
64 header.cid(),
65 cid
66 );
67 }
68 }
69 }
70
71 fn prune_entries(&self, header_epoch: ChainEpoch) {
74 let cut_off_epoch = header_epoch - self.chain_config.policy.chain_finality;
75 let mut entries = self.entries.lock();
76 let mut finality_entries = entries.split_off(&cut_off_epoch);
77 debug!(
78 "Cleared {} entries, cut off at {}",
79 entries.len(),
80 cut_off_epoch,
81 );
82 std::mem::swap(&mut finality_entries, &mut entries);
83 }
84
85 pub fn expand(&self, header: CachingBlockHeader) -> Result<Tipset, Error> {
88 let epoch = header.epoch;
89 let mut headers = nonempty![header];
90
91 if let Some(entries) = self.entries.lock().get(&epoch).cloned() {
92 for cid in entries {
93 if &cid == headers.first().cid() {
94 continue;
95 }
96
97 let h = CachingBlockHeader::load(&self.db, cid)
98 .ok()
99 .flatten()
100 .ok_or_else(|| {
101 Error::Other(format!("failed to load block ({cid}) for tipset expansion"))
102 })?;
103
104 if h.parents == headers.first().parents {
105 headers.push(h);
106 }
107 }
108 }
109
110 let ts = Tipset::new(headers)?;
111 Ok(ts)
112 }
113}
114
115#[cfg(test)]
116mod test {
117 use crate::db::MemoryDB;
118
119 use super::*;
120
121 #[test]
122 fn ensure_tipset_is_bounded() {
123 let db = MemoryDB::default();
124 let chain_config = Arc::new(ChainConfig::default());
125
126 let head_epoch = 2023;
127
128 let entries = BTreeMap::from([
129 (head_epoch - chain_config.policy.chain_finality - 3, vec![]),
130 (head_epoch - chain_config.policy.chain_finality - 1, vec![]),
131 (head_epoch - chain_config.policy.chain_finality, vec![]),
132 (head_epoch - chain_config.policy.chain_finality + 1, vec![]),
133 (head_epoch - chain_config.policy.chain_finality + 3, vec![]),
134 ]);
135 let tipset_tracker = TipsetTracker {
136 db: Arc::new(db),
137 chain_config: chain_config.clone(),
138 entries: Mutex::new(entries),
139 };
140
141 tipset_tracker.prune_entries(head_epoch);
142
143 let keys = tipset_tracker
144 .entries
145 .lock()
146 .keys()
147 .cloned()
148 .collect::<Vec<_>>();
149
150 assert_eq!(
151 keys,
152 vec![
153 head_epoch - chain_config.policy.chain_finality,
154 head_epoch - chain_config.policy.chain_finality + 1,
155 head_epoch - chain_config.policy.chain_finality + 3,
156 ]
157 );
158 }
159}