Skip to main content

forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::daemon::bundle::load_actor_bundles;
7use crate::{
8    KeyStore, KeyStoreConfig,
9    blocks::TipsetKey,
10    chain::ChainStore,
11    chain_sync::network_context::SyncNetworkContext,
12    daemon::db_util::load_all_forest_cars,
13    db::{
14        CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
15        SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
16    },
17    genesis::read_genesis_header,
18    libp2p::{NetworkMessage, PeerManager},
19    libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
20    message_pool::{MessagePool, MpoolLocker, NonceTracker},
21    networks::ChainConfig,
22    shim::address::CurrentNetwork,
23    state_manager::StateManager,
24};
25use api_compare_tests::TestDump;
26use fvm_shared4::address::Network;
27use openrpc_types::ParamStructure;
28use parking_lot::RwLock;
29use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
30use std::sync::atomic::{AtomicBool, Ordering};
31use tokio::{sync::mpsc, task::JoinSet};
32
33pub async fn run_test_with_dump(
34    test_dump: &TestDump,
35    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
36    chain: &NetworkChain,
37    allow_response_mismatch: bool,
38    allow_failure: bool,
39) -> anyhow::Result<()> {
40    if chain.is_testnet() {
41        CurrentNetwork::set_global(Network::Testnet);
42    }
43    let mut run = false;
44    let chain_config = Arc::new(ChainConfig::from_chain(chain));
45    let (ctx, _, _) = ctx(db, chain_config).await?;
46    let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
47    let mut ext = http::Extensions::new();
48    ext.insert(test_dump.path);
49    macro_rules! run_test {
50        ($ty:ty) => {
51            if test_dump.request.method_name.as_ref() == <$ty>::NAME
52                && <$ty>::API_PATHS.contains(test_dump.path)
53            {
54                let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
55                match <$ty>::handle(ctx.clone(), params, &ext).await {
56                    Ok(result) => {
57                        anyhow::ensure!(
58                            allow_response_mismatch
59                                || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
60                            "Response mismatch between Forest and Lotus"
61                        );
62                    }
63                    Err(_) if allow_failure => {
64                        // If we allow failure, we do not check the error
65                    }
66                    Err(e) => {
67                        bail!("Error running test {}: {}", <$ty>::NAME, e);
68                    }
69                }
70                run = true;
71            }
72        };
73    }
74    crate::for_each_rpc_method!(run_test);
75    anyhow::ensure!(run, "RPC method not found");
76    Ok(())
77}
78
79pub async fn load_db(
80    db_root: &Path,
81    chain: Option<&NetworkChain>,
82) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
83    let db_writer = open_db(db_root.into(), &Default::default())?;
84    let db = ManyCar::new(db_writer);
85    let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
86    load_all_forest_cars(&db, &forest_car_db_dir)?;
87    if let Some(chain) = chain {
88        load_actor_bundles(&db, chain).await?;
89    }
90    Ok(Arc::new(ReadOpsTrackingStore::new(db)))
91}
92
93pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
94    let mut index = Index::default();
95    let reader = db.tracker.eth_mappings_db.read();
96    for (k, v) in reader.iter() {
97        index
98            .eth_mappings
99            .get_or_insert_with(ahash::HashMap::default)
100            .insert(k.to_string(), Payload(v.clone()));
101    }
102    if index == Index::default() {
103        None
104    } else {
105        Some(index)
106    }
107}
108
109async fn ctx(
110    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
111    chain_config: Arc<ChainConfig>,
112) -> anyhow::Result<(
113    Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
114    flume::Receiver<NetworkMessage>,
115    tokio::sync::mpsc::Receiver<()>,
116)> {
117    let (network_send, network_rx) = flume::bounded(5);
118    let (tipset_send, _) = flume::bounded(5);
119    let genesis_header =
120        read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
121
122    let chain_store = Arc::new(
123        ChainStore::new(
124            db.clone(),
125            db.clone(),
126            db.clone(),
127            chain_config,
128            genesis_header,
129        )
130        .unwrap(),
131    );
132
133    let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
134    let message_pool = MessagePool::new(
135        chain_store.clone(),
136        network_send.clone(),
137        Default::default(),
138        state_manager.chain_config().clone(),
139        &mut JoinSet::new(),
140    )?;
141
142    let peer_manager = Arc::new(PeerManager::default());
143    let sync_network_context =
144        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
145    let (shutdown, shutdown_recv) = mpsc::channel(1);
146    let nonce_tracker = NonceTracker::new();
147    let rpc_state = Arc::new(RPCState {
148        state_manager,
149        keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
150        mpool: Arc::new(message_pool),
151        bad_blocks: Default::default(),
152        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
153        eth_event_handler: Arc::new(EthEventHandler::new()),
154        sync_network_context,
155        start_time: chrono::Utc::now(),
156        shutdown,
157        tipset_send,
158        snapshot_progress_tracker: Default::default(),
159        mpool_locker: MpoolLocker::new(),
160        nonce_tracker,
161        temp_dir: Arc::new(std::env::temp_dir()),
162    });
163    Ok((rpc_state, network_rx, shutdown_recv))
164}
165
166/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`]
167pub struct ReadOpsTrackingStore<T> {
168    inner: T,
169    pub tracker: Arc<MemoryDB>,
170    tracking: AtomicBool,
171}
172
173impl<T> ReadOpsTrackingStore<T> {
174    pub fn resume_tracking(&self) {
175        self.tracking.store(true, Ordering::Relaxed);
176    }
177
178    pub fn pause_tracking(&self) {
179        self.tracking.store(false, Ordering::Relaxed);
180    }
181
182    fn tracking(&self) -> bool {
183        self.tracking.load(Ordering::Relaxed)
184    }
185}
186
187impl<T> ReadOpsTrackingStore<T>
188where
189    T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
190{
191    fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
192        SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
193    }
194
195    pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
196        if !self.is_chain_head_tracked()? {
197            SettingsStoreExt::write_obj(
198                &self.tracker,
199                crate::db::setting_keys::HEAD_KEY,
200                &self
201                    .inner
202                    .heaviest_tipset_key()?
203                    .context("heaviest tipset key not found")?,
204            )?;
205        }
206
207        Ok(())
208    }
209}
210
211impl<T> ReadOpsTrackingStore<T> {
212    pub fn new(inner: T) -> Self {
213        Self {
214            inner,
215            tracker: Arc::new(Default::default()),
216            tracking: AtomicBool::new(true),
217        }
218    }
219
220    pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
221        &self,
222        writer: &mut W,
223    ) -> anyhow::Result<()> {
224        self.tracker.export_forest_car(writer).await
225    }
226}
227
228impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
229    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
230        self.inner.heaviest_tipset_key()
231    }
232
233    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
234        self.inner.set_heaviest_tipset_key(tsk)
235    }
236}
237
238impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
239    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
240        let result = self.inner.get(k)?;
241        if self.tracking()
242            && let Some(v) = &result
243        {
244            self.tracker.put_keyed(k, v.as_slice())?;
245        }
246        Ok(result)
247    }
248
249    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
250        self.inner.put_keyed(k, block)
251    }
252}
253
254impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
255    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
256        let result = self.inner.read_bin(key)?;
257        if self.tracking()
258            && let Some(v) = &result
259        {
260            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
261        }
262        Ok(result)
263    }
264
265    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
266        self.inner.write_bin(key, value)
267    }
268
269    fn exists(&self, key: &str) -> anyhow::Result<bool> {
270        let result = self.inner.read_bin(key)?;
271        if self.tracking()
272            && let Some(v) = &result
273        {
274            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
275        }
276        Ok(result.is_some())
277    }
278
279    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
280        self.inner.setting_keys()
281    }
282}
283
284impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
285    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
286        let result = self.inner.get(cid)?;
287        if self.tracking()
288            && let Some(v) = &result
289        {
290            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
291        }
292        Ok(result.is_some())
293    }
294
295    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
296        let result = self.inner.get(cid)?;
297        if self.tracking()
298            && let Some(v) = &result
299        {
300            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
301        }
302        Ok(result)
303    }
304}
305
306impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
307    type Hashes = <T as BitswapStoreReadWrite>::Hashes;
308
309    fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
310        self.inner.insert(block)
311    }
312}
313
314impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
315    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
316        let result = self.inner.read_bin(key)?;
317        if self.tracking()
318            && let Some(v) = &result
319        {
320            EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
321        }
322        Ok(result)
323    }
324
325    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
326        self.inner.write_bin(key, value)
327    }
328
329    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
330        self.inner.exists(key)
331    }
332
333    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
334        self.inner.get_message_cids()
335    }
336
337    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
338        self.inner.delete(keys)
339    }
340}