Skip to main content

forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::{
7    KeyStore, KeyStoreConfig,
8    blocks::{Tipset, TipsetKey},
9    chain::ChainStore,
10    chain_sync::network_context::SyncNetworkContext,
11    daemon::{bundle::load_actor_bundles, db_util::load_all_forest_cars},
12    db::{
13        CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
14        SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
15    },
16    genesis::read_genesis_header,
17    libp2p::{NetworkMessage, PeerManager},
18    libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
19    message_pool::{MessagePool, MpoolLocker, NonceTracker},
20    networks::ChainConfig,
21    shim::address::CurrentNetwork,
22    state_manager::StateManager,
23};
24use api_compare_tests::TestDump;
25use fvm_shared4::address::Network;
26use openrpc_types::ParamStructure;
27use parking_lot::RwLock;
28use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
29use std::sync::atomic::{AtomicBool, Ordering};
30use tokio::{sync::mpsc, task::JoinSet};
31
32pub async fn run_test_with_dump(
33    test_dump: &TestDump,
34    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
35    chain: &NetworkChain,
36    allow_response_mismatch: bool,
37    allow_failure: bool,
38) -> anyhow::Result<()> {
39    if chain.is_testnet() {
40        CurrentNetwork::set_global(Network::Testnet);
41    }
42    let mut run = false;
43    let chain_config = Arc::new(ChainConfig::from_chain(chain));
44    let (ctx, _, _) = ctx(db, chain_config).await?;
45    let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
46    let mut ext = http::Extensions::new();
47    ext.insert(test_dump.path);
48    macro_rules! run_test {
49        ($ty:ty) => {
50            if test_dump.request.method_name.as_ref() == <$ty>::NAME
51                && <$ty>::API_PATHS.contains(test_dump.path)
52            {
53                let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
54                match <$ty>::handle(ctx.clone(), params, &ext).await {
55                    Ok(result) => {
56                        anyhow::ensure!(
57                            allow_response_mismatch
58                                || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
59                            "Response mismatch between Forest and Lotus"
60                        );
61                    }
62                    Err(_) if allow_failure => {
63                        // If we allow failure, we do not check the error
64                    }
65                    Err(e) => {
66                        bail!("Error running test {}: {}", <$ty>::NAME, e);
67                    }
68                }
69                run = true;
70            }
71        };
72    }
73    crate::for_each_rpc_method!(run_test);
74    anyhow::ensure!(run, "RPC method not found");
75    Ok(())
76}
77
78pub async fn load_db(
79    db_root: &Path,
80    chain: Option<&NetworkChain>,
81) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
82    let db_writer = open_db(db_root.into(), &Default::default())?;
83    let db = ManyCar::new(db_writer);
84    let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
85    load_all_forest_cars(&db, &forest_car_db_dir)?;
86    if let Some(chain) = chain {
87        load_actor_bundles(&db, chain).await?;
88    }
89    Ok(Arc::new(ReadOpsTrackingStore::new(db)))
90}
91
92pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
93    let mut index = Index::default();
94    let reader = db.tracker.eth_mappings_db.read();
95    for (k, v) in reader.iter() {
96        index
97            .eth_mappings
98            .get_or_insert_with(ahash::HashMap::default)
99            .insert(k.to_string(), Payload(v.clone()));
100    }
101    if index == Index::default() {
102        None
103    } else {
104        Some(index)
105    }
106}
107
108async fn ctx(
109    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
110    chain_config: Arc<ChainConfig>,
111) -> anyhow::Result<(
112    Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
113    flume::Receiver<NetworkMessage>,
114    tokio::sync::mpsc::Receiver<()>,
115)> {
116    let (network_send, network_rx) = flume::bounded(5);
117    let (tipset_send, _) = flume::bounded(5);
118    let genesis_header =
119        read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
120
121    let chain_store = Arc::new(
122        ChainStore::new(
123            db.clone(),
124            db.clone(),
125            db.clone(),
126            chain_config,
127            genesis_header,
128        )
129        .unwrap(),
130    );
131
132    let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
133    let message_pool = MessagePool::new(
134        chain_store.clone(),
135        network_send.clone(),
136        Default::default(),
137        state_manager.chain_config().clone(),
138        &mut JoinSet::new(),
139    )?;
140
141    let peer_manager = Arc::new(PeerManager::default());
142    let sync_network_context =
143        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
144    let (shutdown, shutdown_recv) = mpsc::channel(1);
145    let nonce_tracker = NonceTracker::new();
146    let rpc_state = Arc::new(RPCState {
147        state_manager,
148        keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
149        mpool: Arc::new(message_pool),
150        bad_blocks: Default::default(),
151        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
152        eth_event_handler: Arc::new(EthEventHandler::new()),
153        sync_network_context,
154        start_time: chrono::Utc::now(),
155        shutdown,
156        tipset_send,
157        snapshot_progress_tracker: Default::default(),
158        mpool_locker: MpoolLocker::new(),
159        nonce_tracker,
160        temp_dir: Arc::new(std::env::temp_dir()),
161    });
162    Ok((rpc_state, network_rx, shutdown_recv))
163}
164
165/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`]
166pub struct ReadOpsTrackingStore<T> {
167    inner: T,
168    pub tracker: Arc<MemoryDB>,
169    tracking: AtomicBool,
170}
171
172impl<T> ReadOpsTrackingStore<T> {
173    pub fn resume_tracking(&self) {
174        self.tracking.store(true, Ordering::Relaxed);
175    }
176
177    pub fn pause_tracking(&self) {
178        self.tracking.store(false, Ordering::Relaxed);
179    }
180
181    fn tracking(&self) -> bool {
182        self.tracking.load(Ordering::Relaxed)
183    }
184}
185
186impl<T> ReadOpsTrackingStore<T>
187where
188    T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
189{
190    fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
191        SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
192    }
193
194    pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
195        if !self.is_chain_head_tracked()? {
196            SettingsStoreExt::write_obj(
197                &self.tracker,
198                crate::db::setting_keys::HEAD_KEY,
199                &self
200                    .inner
201                    .heaviest_tipset_key()?
202                    .context("heaviest tipset key not found")?,
203            )?;
204        }
205
206        Ok(())
207    }
208}
209
210impl<T> ReadOpsTrackingStore<T> {
211    pub fn new(inner: T) -> Self {
212        Self {
213            inner,
214            tracker: Arc::new(Default::default()),
215            tracking: AtomicBool::new(true),
216        }
217    }
218
219    pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
220        &self,
221        writer: &mut W,
222    ) -> anyhow::Result<()> {
223        self.tracker.export_forest_car(writer).await
224    }
225}
226
227impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
228    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
229        self.inner.heaviest_tipset_key()
230    }
231
232    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
233        self.inner.set_heaviest_tipset_key(tsk)
234    }
235}
236
237impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
238    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
239        let result = self.inner.get(k)?;
240        if self.tracking()
241            && let Some(v) = &result
242        {
243            self.tracker.put_keyed(k, v.as_slice())?;
244        }
245        Ok(result)
246    }
247
248    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
249        self.inner.put_keyed(k, block)
250    }
251}
252
253impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
254    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
255        let result = self.inner.read_bin(key)?;
256        if self.tracking()
257            && let Some(v) = &result
258        {
259            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
260        }
261        Ok(result)
262    }
263
264    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
265        self.inner.write_bin(key, value)
266    }
267
268    fn exists(&self, key: &str) -> anyhow::Result<bool> {
269        let result = self.inner.read_bin(key)?;
270        if self.tracking()
271            && let Some(v) = &result
272        {
273            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
274        }
275        Ok(result.is_some())
276    }
277
278    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
279        self.inner.setting_keys()
280    }
281}
282
283impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
284    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
285        let result = self.inner.get(cid)?;
286        if self.tracking()
287            && let Some(v) = &result
288        {
289            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
290        }
291        Ok(result.is_some())
292    }
293
294    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
295        let result = self.inner.get(cid)?;
296        if self.tracking()
297            && let Some(v) = &result
298        {
299            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
300        }
301        Ok(result)
302    }
303}
304
305impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
306    type Hashes = <T as BitswapStoreReadWrite>::Hashes;
307
308    fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
309        self.inner.insert(block)
310    }
311}
312
313impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
314    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
315        let result = self.inner.read_bin(key)?;
316        if self.tracking()
317            && let Some(v) = &result
318        {
319            EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
320        }
321        Ok(result)
322    }
323
324    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
325        self.inner.write_bin(key, value)
326    }
327
328    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
329        self.inner.exists(key)
330    }
331
332    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
333        self.inner.get_message_cids()
334    }
335
336    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
337        self.inner.delete(keys)
338    }
339
340    fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
341        self.inner.tipset_key_by_epoch(epoch)
342    }
343
344    fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
345        self.inner.set_tipset_key_at_epoch(ts)
346    }
347}