forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs1use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::daemon::bundle::load_actor_bundles;
7use crate::{
8 KeyStore, KeyStoreConfig,
9 blocks::TipsetKey,
10 chain::ChainStore,
11 chain_sync::network_context::SyncNetworkContext,
12 daemon::db_util::load_all_forest_cars,
13 db::{
14 CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
15 SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
16 },
17 genesis::read_genesis_header,
18 libp2p::{NetworkMessage, PeerManager},
19 libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
20 message_pool::{MessagePool, MpoolLocker, NonceTracker},
21 networks::ChainConfig,
22 shim::address::CurrentNetwork,
23 state_manager::StateManager,
24};
25use api_compare_tests::TestDump;
26use fvm_shared4::address::Network;
27use openrpc_types::ParamStructure;
28use parking_lot::RwLock;
29use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
30use std::sync::atomic::{AtomicBool, Ordering};
31use tokio::{sync::mpsc, task::JoinSet};
32
33pub async fn run_test_with_dump(
34 test_dump: &TestDump,
35 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
36 chain: &NetworkChain,
37 allow_response_mismatch: bool,
38 allow_failure: bool,
39) -> anyhow::Result<()> {
40 if chain.is_testnet() {
41 CurrentNetwork::set_global(Network::Testnet);
42 }
43 let mut run = false;
44 let chain_config = Arc::new(ChainConfig::from_chain(chain));
45 let (ctx, _, _) = ctx(db, chain_config).await?;
46 let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
47 let mut ext = http::Extensions::new();
48 ext.insert(test_dump.path);
49 macro_rules! run_test {
50 ($ty:ty) => {
51 if test_dump.request.method_name.as_ref() == <$ty>::NAME
52 && <$ty>::API_PATHS.contains(test_dump.path)
53 {
54 let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
55 match <$ty>::handle(ctx.clone(), params, &ext).await {
56 Ok(result) => {
57 anyhow::ensure!(
58 allow_response_mismatch
59 || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
60 "Response mismatch between Forest and Lotus"
61 );
62 }
63 Err(_) if allow_failure => {
64 }
66 Err(e) => {
67 bail!("Error running test {}: {}", <$ty>::NAME, e);
68 }
69 }
70 run = true;
71 }
72 };
73 }
74 crate::for_each_rpc_method!(run_test);
75 anyhow::ensure!(run, "RPC method not found");
76 Ok(())
77}
78
79pub async fn load_db(
80 db_root: &Path,
81 chain: Option<&NetworkChain>,
82) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
83 let db_writer = open_db(db_root.into(), &Default::default())?;
84 let db = ManyCar::new(db_writer);
85 let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
86 load_all_forest_cars(&db, &forest_car_db_dir)?;
87 if let Some(chain) = chain {
88 load_actor_bundles(&db, chain).await?;
89 }
90 Ok(Arc::new(ReadOpsTrackingStore::new(db)))
91}
92
93pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
94 let mut index = Index::default();
95 let reader = db.tracker.eth_mappings_db.read();
96 for (k, v) in reader.iter() {
97 index
98 .eth_mappings
99 .get_or_insert_with(ahash::HashMap::default)
100 .insert(k.to_string(), Payload(v.clone()));
101 }
102 if index == Index::default() {
103 None
104 } else {
105 Some(index)
106 }
107}
108
109async fn ctx(
110 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
111 chain_config: Arc<ChainConfig>,
112) -> anyhow::Result<(
113 Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
114 flume::Receiver<NetworkMessage>,
115 tokio::sync::mpsc::Receiver<()>,
116)> {
117 let (network_send, network_rx) = flume::bounded(5);
118 let (tipset_send, _) = flume::bounded(5);
119 let genesis_header =
120 read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
121
122 let chain_store = Arc::new(
123 ChainStore::new(
124 db.clone(),
125 db.clone(),
126 db.clone(),
127 chain_config,
128 genesis_header,
129 )
130 .unwrap(),
131 );
132
133 let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
134 let message_pool = MessagePool::new(
135 chain_store.clone(),
136 network_send.clone(),
137 Default::default(),
138 state_manager.chain_config().clone(),
139 &mut JoinSet::new(),
140 )?;
141
142 let peer_manager = Arc::new(PeerManager::default());
143 let sync_network_context =
144 SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
145 let (shutdown, shutdown_recv) = mpsc::channel(1);
146 let nonce_tracker = NonceTracker::new();
147 let rpc_state = Arc::new(RPCState {
148 state_manager,
149 keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
150 mpool: Arc::new(message_pool),
151 bad_blocks: Default::default(),
152 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
153 eth_event_handler: Arc::new(EthEventHandler::new()),
154 sync_network_context,
155 start_time: chrono::Utc::now(),
156 shutdown,
157 tipset_send,
158 snapshot_progress_tracker: Default::default(),
159 mpool_locker: MpoolLocker::new(),
160 nonce_tracker,
161 temp_dir: Arc::new(std::env::temp_dir()),
162 });
163 Ok((rpc_state, network_rx, shutdown_recv))
164}
165
166pub struct ReadOpsTrackingStore<T> {
168 inner: T,
169 pub tracker: Arc<MemoryDB>,
170 tracking: AtomicBool,
171}
172
173impl<T> ReadOpsTrackingStore<T> {
174 pub fn resume_tracking(&self) {
175 self.tracking.store(true, Ordering::Relaxed);
176 }
177
178 pub fn pause_tracking(&self) {
179 self.tracking.store(false, Ordering::Relaxed);
180 }
181
182 fn tracking(&self) -> bool {
183 self.tracking.load(Ordering::Relaxed)
184 }
185}
186
187impl<T> ReadOpsTrackingStore<T>
188where
189 T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
190{
191 fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
192 SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
193 }
194
195 pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
196 if !self.is_chain_head_tracked()? {
197 SettingsStoreExt::write_obj(
198 &self.tracker,
199 crate::db::setting_keys::HEAD_KEY,
200 &self
201 .inner
202 .heaviest_tipset_key()?
203 .context("heaviest tipset key not found")?,
204 )?;
205 }
206
207 Ok(())
208 }
209}
210
211impl<T> ReadOpsTrackingStore<T> {
212 pub fn new(inner: T) -> Self {
213 Self {
214 inner,
215 tracker: Arc::new(Default::default()),
216 tracking: AtomicBool::new(true),
217 }
218 }
219
220 pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
221 &self,
222 writer: &mut W,
223 ) -> anyhow::Result<()> {
224 self.tracker.export_forest_car(writer).await
225 }
226}
227
228impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
229 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
230 self.inner.heaviest_tipset_key()
231 }
232
233 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
234 self.inner.set_heaviest_tipset_key(tsk)
235 }
236}
237
238impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
239 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
240 let result = self.inner.get(k)?;
241 if self.tracking()
242 && let Some(v) = &result
243 {
244 self.tracker.put_keyed(k, v.as_slice())?;
245 }
246 Ok(result)
247 }
248
249 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
250 self.inner.put_keyed(k, block)
251 }
252}
253
254impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
255 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
256 let result = self.inner.read_bin(key)?;
257 if self.tracking()
258 && let Some(v) = &result
259 {
260 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
261 }
262 Ok(result)
263 }
264
265 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
266 self.inner.write_bin(key, value)
267 }
268
269 fn exists(&self, key: &str) -> anyhow::Result<bool> {
270 let result = self.inner.read_bin(key)?;
271 if self.tracking()
272 && let Some(v) = &result
273 {
274 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
275 }
276 Ok(result.is_some())
277 }
278
279 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
280 self.inner.setting_keys()
281 }
282}
283
284impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
285 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
286 let result = self.inner.get(cid)?;
287 if self.tracking()
288 && let Some(v) = &result
289 {
290 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
291 }
292 Ok(result.is_some())
293 }
294
295 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
296 let result = self.inner.get(cid)?;
297 if self.tracking()
298 && let Some(v) = &result
299 {
300 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
301 }
302 Ok(result)
303 }
304}
305
306impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
307 type Hashes = <T as BitswapStoreReadWrite>::Hashes;
308
309 fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
310 self.inner.insert(block)
311 }
312}
313
314impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
315 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
316 let result = self.inner.read_bin(key)?;
317 if self.tracking()
318 && let Some(v) = &result
319 {
320 EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
321 }
322 Ok(result)
323 }
324
325 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
326 self.inner.write_bin(key, value)
327 }
328
329 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
330 self.inner.exists(key)
331 }
332
333 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
334 self.inner.get_message_cids()
335 }
336
337 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
338 self.inner.delete(keys)
339 }
340}