forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::{
7    KeyStore, KeyStoreConfig,
8    blocks::TipsetKey,
9    chain::ChainStore,
10    chain_sync::network_context::SyncNetworkContext,
11    daemon::db_util::load_all_forest_cars,
12    db::{
13        CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, IndicesStore, MemoryDB,
14        SettingsStore, SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
15    },
16    genesis::read_genesis_header,
17    libp2p::{NetworkMessage, PeerManager},
18    libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
19    message_pool::{MessagePool, MpoolRpcProvider},
20    networks::ChainConfig,
21    shim::address::CurrentNetwork,
22    state_manager::StateManager,
23};
24use api_compare_tests::TestDump;
25use fvm_shared4::address::Network;
26use openrpc_types::ParamStructure;
27use parking_lot::RwLock;
28use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
29use tokio::{sync::mpsc, task::JoinSet};
30
31pub async fn run_test_with_dump(
32    test_dump: &TestDump,
33    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
34    chain: &NetworkChain,
35    allow_response_mismatch: bool,
36    allow_failure: bool,
37) -> anyhow::Result<()> {
38    if chain.is_testnet() {
39        CurrentNetwork::set_global(Network::Testnet);
40    }
41    let mut run = false;
42    let chain_config = Arc::new(ChainConfig::from_chain(chain));
43    let (ctx, _, _) = ctx(db, chain_config).await?;
44    let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
45    macro_rules! run_test {
46        ($ty:ty) => {
47            if test_dump.request.method_name.as_ref() == <$ty>::NAME {
48                let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
49                match <$ty>::handle(ctx.clone(), params).await {
50                    Ok(result) => {
51                        anyhow::ensure!(
52                            allow_response_mismatch
53                                || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
54                            "Response mismatch between Forest and Lotus"
55                        );
56                    }
57                    Err(_) if allow_failure => {
58                        // If we allow failure, we do not check the error
59                    }
60                    Err(e) => {
61                        bail!("Error running test {}: {}", <$ty>::NAME, e);
62                    }
63                }
64                run = true;
65            }
66        };
67    }
68    crate::for_each_rpc_method!(run_test);
69    anyhow::ensure!(run, "RPC method not found");
70    Ok(())
71}
72
73pub fn load_db(db_root: &Path) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
74    let db_writer = open_db(db_root.into(), &Default::default())?;
75    let db = ManyCar::new(db_writer);
76    let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
77    load_all_forest_cars(&db, &forest_car_db_dir)?;
78    Ok(Arc::new(ReadOpsTrackingStore::new(db)))
79}
80
81pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
82    let mut index = Index::default();
83    let reader = db.tracker.eth_mappings_db.read();
84    for (k, v) in reader.iter() {
85        index
86            .eth_mappings
87            .get_or_insert_with(ahash::HashMap::default)
88            .insert(k.to_string(), Payload(v.clone()));
89    }
90    let reader = db.tracker.indices_db.read();
91    for (k, v) in reader.iter() {
92        index
93            .indices
94            .get_or_insert_with(ahash::HashMap::default)
95            .insert(k.to_string(), Payload(v.clone()));
96    }
97    if index == Index::default() {
98        None
99    } else {
100        Some(index)
101    }
102}
103
104async fn ctx(
105    db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
106    chain_config: Arc<ChainConfig>,
107) -> anyhow::Result<(
108    Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
109    flume::Receiver<NetworkMessage>,
110    tokio::sync::mpsc::Receiver<()>,
111)> {
112    let (network_send, network_rx) = flume::bounded(5);
113    let (tipset_send, _) = flume::bounded(5);
114    let genesis_header =
115        read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
116
117    let chain_store = Arc::new(
118        ChainStore::new(
119            db.clone(),
120            db.clone(),
121            db.clone(),
122            db,
123            chain_config,
124            genesis_header,
125        )
126        .unwrap(),
127    );
128
129    let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
130    let message_pool = MessagePool::new(
131        MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
132        network_send.clone(),
133        Default::default(),
134        state_manager.chain_config().clone(),
135        &mut JoinSet::new(),
136    )?;
137
138    let peer_manager = Arc::new(PeerManager::default());
139    let sync_network_context =
140        SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
141    let (shutdown, shutdown_recv) = mpsc::channel(1);
142    let rpc_state = Arc::new(RPCState {
143        state_manager,
144        keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
145        mpool: Arc::new(message_pool),
146        bad_blocks: Default::default(),
147        msgs_in_tipset: Default::default(),
148        sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
149        eth_event_handler: Arc::new(EthEventHandler::new()),
150        sync_network_context,
151        start_time: chrono::Utc::now(),
152        shutdown,
153        tipset_send,
154        snapshot_progress_tracker: Default::default(),
155    });
156    Ok((rpc_state, network_rx, shutdown_recv))
157}
158
159/// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`]
160pub struct ReadOpsTrackingStore<T> {
161    inner: T,
162    pub tracker: Arc<MemoryDB>,
163}
164
165impl<T> ReadOpsTrackingStore<T>
166where
167    T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
168{
169    fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
170        SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
171    }
172
173    pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
174        if !self.is_chain_head_tracked()? {
175            SettingsStoreExt::write_obj(
176                &self.tracker,
177                crate::db::setting_keys::HEAD_KEY,
178                &self.inner.heaviest_tipset_key()?,
179            )?;
180        }
181
182        Ok(())
183    }
184}
185
186impl<T> ReadOpsTrackingStore<T>
187where
188    T: Blockstore + SettingsStore,
189{
190    pub fn new(inner: T) -> Self {
191        Self {
192            inner,
193            tracker: Arc::new(Default::default()),
194        }
195    }
196
197    pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
198        &self,
199        writer: &mut W,
200    ) -> anyhow::Result<()> {
201        self.tracker.export_forest_car(writer).await
202    }
203}
204
205impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
206    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
207        self.inner.heaviest_tipset_key()
208    }
209
210    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
211        self.inner.set_heaviest_tipset_key(tsk)
212    }
213}
214
215impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
216    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
217        let result = self.inner.get(k)?;
218        if let Some(v) = &result {
219            self.tracker.put_keyed(k, v.as_slice())?;
220        }
221        Ok(result)
222    }
223
224    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
225        self.inner.put_keyed(k, block)
226    }
227}
228
229impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
230    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
231        let result = self.inner.read_bin(key)?;
232        if let Some(v) = &result {
233            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
234        }
235        Ok(result)
236    }
237
238    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
239        self.inner.write_bin(key, value)
240    }
241
242    fn exists(&self, key: &str) -> anyhow::Result<bool> {
243        let result = self.inner.read_bin(key)?;
244        if let Some(v) = &result {
245            SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
246        }
247        Ok(result.is_some())
248    }
249
250    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
251        self.inner.setting_keys()
252    }
253}
254
255impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
256    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
257        let result = self.inner.get(cid)?;
258        if let Some(v) = &result {
259            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
260        }
261        Ok(result.is_some())
262    }
263
264    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
265        let result = self.inner.get(cid)?;
266        if let Some(v) = &result {
267            Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
268        }
269        Ok(result)
270    }
271}
272
273impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
274    type Hashes = <T as BitswapStoreReadWrite>::Hashes;
275
276    fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
277        self.inner.insert(block)
278    }
279}
280
281impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
282    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
283        let result = self.inner.read_bin(key)?;
284        if let Some(v) = &result {
285            EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
286        }
287        self.inner.read_bin(key)
288    }
289
290    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
291        self.inner.write_bin(key, value)
292    }
293
294    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
295        self.inner.exists(key)
296    }
297
298    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
299        self.inner.get_message_cids()
300    }
301
302    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
303        self.inner.delete(keys)
304    }
305}
306
307impl<T: IndicesStore> IndicesStore for ReadOpsTrackingStore<T> {
308    fn read_bin(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
309        let result = self.inner.read_bin(key)?;
310        if let Some(v) = &result {
311            IndicesStore::write_bin(&self.tracker, key, v.as_slice())?;
312        }
313        self.inner.read_bin(key)
314    }
315
316    fn write_bin(&self, key: &Cid, value: &[u8]) -> anyhow::Result<()> {
317        self.inner.write_bin(key, value)
318    }
319
320    fn exists(&self, key: &Cid) -> anyhow::Result<bool> {
321        self.inner.exists(key)
322    }
323}