forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs1use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::daemon::bundle::load_actor_bundles;
7use crate::{
8 KeyStore, KeyStoreConfig,
9 blocks::TipsetKey,
10 chain::ChainStore,
11 chain_sync::network_context::SyncNetworkContext,
12 daemon::db_util::load_all_forest_cars,
13 db::{
14 CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
15 SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
16 },
17 genesis::read_genesis_header,
18 libp2p::{NetworkMessage, PeerManager},
19 libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
20 message_pool::{MessagePool, MpoolRpcProvider},
21 networks::ChainConfig,
22 shim::address::CurrentNetwork,
23 state_manager::StateManager,
24};
25use api_compare_tests::TestDump;
26use fvm_shared4::address::Network;
27use openrpc_types::ParamStructure;
28use parking_lot::RwLock;
29use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
30use std::sync::atomic::{AtomicBool, Ordering};
31use tokio::{sync::mpsc, task::JoinSet};
32
33pub async fn run_test_with_dump(
34 test_dump: &TestDump,
35 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
36 chain: &NetworkChain,
37 allow_response_mismatch: bool,
38 allow_failure: bool,
39) -> anyhow::Result<()> {
40 if chain.is_testnet() {
41 CurrentNetwork::set_global(Network::Testnet);
42 }
43 let mut run = false;
44 let chain_config = Arc::new(ChainConfig::from_chain(chain));
45 let (ctx, _, _) = ctx(db, chain_config).await?;
46 let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
47 macro_rules! run_test {
48 ($ty:ty) => {
49 if test_dump.request.method_name.as_ref() == <$ty>::NAME
50 && <$ty>::API_PATHS.contains(test_dump.path)
51 {
52 let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
53 match <$ty>::handle(ctx.clone(), params).await {
54 Ok(result) => {
55 anyhow::ensure!(
56 allow_response_mismatch
57 || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
58 "Response mismatch between Forest and Lotus"
59 );
60 }
61 Err(_) if allow_failure => {
62 }
64 Err(e) => {
65 bail!("Error running test {}: {}", <$ty>::NAME, e);
66 }
67 }
68 run = true;
69 }
70 };
71 }
72 crate::for_each_rpc_method!(run_test);
73 anyhow::ensure!(run, "RPC method not found");
74 Ok(())
75}
76
77pub async fn load_db(
78 db_root: &Path,
79 chain: Option<&NetworkChain>,
80) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
81 let db_writer = open_db(db_root.into(), &Default::default())?;
82 let db = ManyCar::new(db_writer);
83 let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
84 load_all_forest_cars(&db, &forest_car_db_dir)?;
85 if let Some(chain) = chain {
86 load_actor_bundles(&db, chain).await?;
87 }
88 Ok(Arc::new(ReadOpsTrackingStore::new(db)))
89}
90
91pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
92 let mut index = Index::default();
93 let reader = db.tracker.eth_mappings_db.read();
94 for (k, v) in reader.iter() {
95 index
96 .eth_mappings
97 .get_or_insert_with(ahash::HashMap::default)
98 .insert(k.to_string(), Payload(v.clone()));
99 }
100 if index == Index::default() {
101 None
102 } else {
103 Some(index)
104 }
105}
106
107async fn ctx(
108 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
109 chain_config: Arc<ChainConfig>,
110) -> anyhow::Result<(
111 Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
112 flume::Receiver<NetworkMessage>,
113 tokio::sync::mpsc::Receiver<()>,
114)> {
115 let (network_send, network_rx) = flume::bounded(5);
116 let (tipset_send, _) = flume::bounded(5);
117 let genesis_header =
118 read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
119
120 let chain_store = Arc::new(
121 ChainStore::new(
122 db.clone(),
123 db.clone(),
124 db.clone(),
125 chain_config,
126 genesis_header,
127 )
128 .unwrap(),
129 );
130
131 let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
132 let message_pool = MessagePool::new(
133 MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
134 network_send.clone(),
135 Default::default(),
136 state_manager.chain_config().clone(),
137 &mut JoinSet::new(),
138 )?;
139
140 let peer_manager = Arc::new(PeerManager::default());
141 let sync_network_context =
142 SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
143 let (shutdown, shutdown_recv) = mpsc::channel(1);
144 let rpc_state = Arc::new(RPCState {
145 state_manager,
146 keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
147 mpool: Arc::new(message_pool),
148 bad_blocks: Default::default(),
149 msgs_in_tipset: Default::default(),
150 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
151 eth_event_handler: Arc::new(EthEventHandler::new()),
152 sync_network_context,
153 start_time: chrono::Utc::now(),
154 shutdown,
155 tipset_send,
156 snapshot_progress_tracker: Default::default(),
157 });
158 Ok((rpc_state, network_rx, shutdown_recv))
159}
160
161pub struct ReadOpsTrackingStore<T> {
163 inner: T,
164 pub tracker: Arc<MemoryDB>,
165 tracking: AtomicBool,
166}
167
168impl<T> ReadOpsTrackingStore<T> {
169 pub fn resume_tracking(&self) {
170 self.tracking.store(true, Ordering::Relaxed);
171 }
172
173 pub fn pause_tracking(&self) {
174 self.tracking.store(false, Ordering::Relaxed);
175 }
176
177 fn tracking(&self) -> bool {
178 self.tracking.load(Ordering::Relaxed)
179 }
180}
181
182impl<T> ReadOpsTrackingStore<T>
183where
184 T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
185{
186 fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
187 SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
188 }
189
190 pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
191 if !self.is_chain_head_tracked()? {
192 SettingsStoreExt::write_obj(
193 &self.tracker,
194 crate::db::setting_keys::HEAD_KEY,
195 &self.inner.heaviest_tipset_key()?,
196 )?;
197 }
198
199 Ok(())
200 }
201}
202
203impl<T> ReadOpsTrackingStore<T>
204where
205 T: Blockstore + SettingsStore,
206{
207 pub fn new(inner: T) -> Self {
208 Self {
209 inner,
210 tracker: Arc::new(Default::default()),
211 tracking: AtomicBool::new(true),
212 }
213 }
214
215 pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
216 &self,
217 writer: &mut W,
218 ) -> anyhow::Result<()> {
219 self.tracker.export_forest_car(writer).await
220 }
221}
222
223impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
224 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
225 self.inner.heaviest_tipset_key()
226 }
227
228 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
229 self.inner.set_heaviest_tipset_key(tsk)
230 }
231}
232
233impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
234 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
235 let result = self.inner.get(k)?;
236 if self.tracking()
237 && let Some(v) = &result
238 {
239 self.tracker.put_keyed(k, v.as_slice())?;
240 }
241 Ok(result)
242 }
243
244 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
245 self.inner.put_keyed(k, block)
246 }
247}
248
249impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
250 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
251 let result = self.inner.read_bin(key)?;
252 if self.tracking()
253 && let Some(v) = &result
254 {
255 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
256 }
257 Ok(result)
258 }
259
260 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
261 self.inner.write_bin(key, value)
262 }
263
264 fn exists(&self, key: &str) -> anyhow::Result<bool> {
265 let result = self.inner.read_bin(key)?;
266 if self.tracking()
267 && let Some(v) = &result
268 {
269 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
270 }
271 Ok(result.is_some())
272 }
273
274 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
275 self.inner.setting_keys()
276 }
277}
278
279impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
280 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
281 let result = self.inner.get(cid)?;
282 if self.tracking()
283 && let Some(v) = &result
284 {
285 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
286 }
287 Ok(result.is_some())
288 }
289
290 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
291 let result = self.inner.get(cid)?;
292 if self.tracking()
293 && let Some(v) = &result
294 {
295 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
296 }
297 Ok(result)
298 }
299}
300
301impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
302 type Hashes = <T as BitswapStoreReadWrite>::Hashes;
303
304 fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
305 self.inner.insert(block)
306 }
307}
308
309impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
310 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
311 let result = self.inner.read_bin(key)?;
312 if self.tracking()
313 && let Some(v) = &result
314 {
315 EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
316 }
317 Ok(result)
318 }
319
320 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
321 self.inner.write_bin(key, value)
322 }
323
324 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
325 self.inner.exists(key)
326 }
327
328 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
329 self.inner.get_message_cids()
330 }
331
332 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
333 self.inner.delete(keys)
334 }
335}