Skip to main content

forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::daemon::bundle::load_actor_bundles;
7use crate::{
8    KeyStore, KeyStoreConfig,
9    blocks::TipsetKey,
10    chain::ChainStore,
11    chain_sync::network_context::SyncNetworkContext,
12    daemon::db_util::load_all_forest_cars,
13    db::{
14        CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
15        SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
16    },
17    genesis::read_genesis_header,
18    libp2p::{NetworkMessage, PeerManager},
19    libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
20    message_pool::{MessagePool, MpoolRpcProvider},
21    networks::ChainConfig,
22    shim::address::CurrentNetwork,
23    state_manager::StateManager,
24};
25use api_compare_tests::TestDump;
26use fvm_shared4::address::Network;
27use openrpc_types::ParamStructure;
28use parking_lot::RwLock;
29use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
30use std::sync::atomic::{AtomicBool, Ordering};
31use tokio::{sync::mpsc, task::JoinSet};
32
33pub async fn run_test_with_dump(
34    test_dump: &TestDump,
35    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
36    chain: &NetworkChain,
37    allow_response_mismatch: bool,
38    allow_failure: bool,
39) -> anyhow::Result<()> {
40    if chain.is_testnet() {
41        CurrentNetwork::set_global(Network::Testnet);
42    }
43    let mut run = false;
44    let chain_config = Arc::new(ChainConfig::from_chain(chain));
45    let (ctx, _, _) = ctx(db, chain_config).await?;
46    let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
47    macro_rules! run_test {
48        ($ty:ty) => {
49            if test_dump.request.method_name.as_ref() == <$ty>::NAME
50                && <$ty>::API_PATHS.contains(test_dump.path)
51            {
52                let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
53                match <$ty>::handle(ctx.clone(), params).await {
54                    Ok(result) => {
55                        anyhow::ensure!(
56                            allow_response_mismatch
57                                || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
58                            "Response mismatch between Forest and Lotus"
59                        );
60                    }
61                    Err(_) if allow_failure => {
62                        // If we allow failure, we do not check the error
63                    }
64                    Err(e) => {
65                        bail!("Error running test {}: {}", <$ty>::NAME, e);
66                    }
67                }
68                run = true;
69            }
70        };
71    }
72    crate::for_each_rpc_method!(run_test);
73    anyhow::ensure!(run, "RPC method not found");
74    Ok(())
75}
76
77pub async fn load_db(
78    db_root: &Path,
79    chain: Option<&NetworkChain>,
80) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
81    let db_writer = open_db(db_root.into(), &Default::default())?;
82    let db = ManyCar::new(db_writer);
83    let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
84    load_all_forest_cars(&db, &forest_car_db_dir)?;
85    if let Some(chain) = chain {
86        load_actor_bundles(&db, chain).await?;
87    }
88    Ok(Arc::new(ReadOpsTrackingStore::new(db)))
89}
90
91pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
92    let mut index = Index::default();
93    let reader = db.tracker.eth_mappings_db.read();
94    for (k, v) in reader.iter() {
95        index
96            .eth_mappings
97            .get_or_insert_with(ahash::HashMap::default)
98            .insert(k.to_string(), Payload(v.clone()));
99    }
100    if index == Index::default() {
101        None
102    } else {
103        Some(index)
104    }
105}
106
107async fn ctx(
108    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
109    chain_config: Arc<ChainConfig>,
110) -> anyhow::Result<(
111    Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
112    flume::Receiver<NetworkMessage>,
113    tokio::sync::mpsc::Receiver<()>,
114)> {
115    let (network_send, network_rx) = flume::bounded(5);
116    let (tipset_send, _) = flume::bounded(5);
117    let genesis_header =
118        read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
119
120    let chain_store = Arc::new(
121        ChainStore::new(
122            db.clone(),
123            db.clone(),
124            db.clone(),
125            chain_config,
126            genesis_header,
127        )
128        .unwrap(),
129    );
130
131    let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
132    let message_pool = MessagePool::new(
133        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
134        network_send.clone(),
135        Default::default(),
136        state_manager.chain_config().clone(),
137        &mut JoinSet::new(),
138    )?;
139
140    let peer_manager = Arc::new(PeerManager::default());
141    let sync_network_context =
142        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
143    let (shutdown, shutdown_recv) = mpsc::channel(1);
144    let rpc_state = Arc::new(RPCState {
145        state_manager,
146        keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
147        mpool: Arc::new(message_pool),
148        bad_blocks: Default::default(),
149        msgs_in_tipset: Default::default(),
150        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
151        eth_event_handler: Arc::new(EthEventHandler::new()),
152        sync_network_context,
153        start_time: chrono::Utc::now(),
154        shutdown,
155        tipset_send,
156        snapshot_progress_tracker: Default::default(),
157    });
158    Ok((rpc_state, network_rx, shutdown_recv))
159}
160
161/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`]
162pub struct ReadOpsTrackingStore<T> {
163    inner: T,
164    pub tracker: Arc<MemoryDB>,
165    tracking: AtomicBool,
166}
167
168impl<T> ReadOpsTrackingStore<T> {
169    pub fn resume_tracking(&self) {
170        self.tracking.store(true, Ordering::Relaxed);
171    }
172
173    pub fn pause_tracking(&self) {
174        self.tracking.store(false, Ordering::Relaxed);
175    }
176
177    fn tracking(&self) -> bool {
178        self.tracking.load(Ordering::Relaxed)
179    }
180}
181
182impl<T> ReadOpsTrackingStore<T>
183where
184    T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
185{
186    fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
187        SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
188    }
189
190    pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
191        if !self.is_chain_head_tracked()? {
192            SettingsStoreExt::write_obj(
193                &self.tracker,
194                crate::db::setting_keys::HEAD_KEY,
195                &self.inner.heaviest_tipset_key()?,
196            )?;
197        }
198
199        Ok(())
200    }
201}
202
203impl<T> ReadOpsTrackingStore<T>
204where
205    T: Blockstore + SettingsStore,
206{
207    pub fn new(inner: T) -> Self {
208        Self {
209            inner,
210            tracker: Arc::new(Default::default()),
211            tracking: AtomicBool::new(true),
212        }
213    }
214
215    pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
216        &self,
217        writer: &mut W,
218    ) -> anyhow::Result<()> {
219        self.tracker.export_forest_car(writer).await
220    }
221}
222
223impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
224    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
225        self.inner.heaviest_tipset_key()
226    }
227
228    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
229        self.inner.set_heaviest_tipset_key(tsk)
230    }
231}
232
233impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
234    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
235        let result = self.inner.get(k)?;
236        if self.tracking()
237            && let Some(v) = &result
238        {
239            self.tracker.put_keyed(k, v.as_slice())?;
240        }
241        Ok(result)
242    }
243
244    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
245        self.inner.put_keyed(k, block)
246    }
247}
248
249impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
250    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
251        let result = self.inner.read_bin(key)?;
252        if self.tracking()
253            && let Some(v) = &result
254        {
255            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
256        }
257        Ok(result)
258    }
259
260    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
261        self.inner.write_bin(key, value)
262    }
263
264    fn exists(&self, key: &str) -> anyhow::Result<bool> {
265        let result = self.inner.read_bin(key)?;
266        if self.tracking()
267            && let Some(v) = &result
268        {
269            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
270        }
271        Ok(result.is_some())
272    }
273
274    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
275        self.inner.setting_keys()
276    }
277}
278
279impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
280    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
281        let result = self.inner.get(cid)?;
282        if self.tracking()
283            && let Some(v) = &result
284        {
285            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
286        }
287        Ok(result.is_some())
288    }
289
290    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
291        let result = self.inner.get(cid)?;
292        if self.tracking()
293            && let Some(v) = &result
294        {
295            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
296        }
297        Ok(result)
298    }
299}
300
301impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
302    type Hashes = <T as BitswapStoreReadWrite>::Hashes;
303
304    fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
305        self.inner.insert(block)
306    }
307}
308
309impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
310    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
311        let result = self.inner.read_bin(key)?;
312        if self.tracking()
313            && let Some(v) = &result
314        {
315            EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
316        }
317        Ok(result)
318    }
319
320    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
321        self.inner.write_bin(key, value)
322    }
323
324    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
325        self.inner.exists(key)
326    }
327
328    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
329        self.inner.get_message_cids()
330    }
331
332    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
333        self.inner.delete(keys)
334    }
335}