forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs1use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::daemon::bundle::load_actor_bundles;
7use crate::{
8 KeyStore, KeyStoreConfig,
9 blocks::TipsetKey,
10 chain::ChainStore,
11 chain_sync::network_context::SyncNetworkContext,
12 daemon::db_util::load_all_forest_cars,
13 db::{
14 CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, MemoryDB, SettingsStore,
15 SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
16 },
17 genesis::read_genesis_header,
18 libp2p::{NetworkMessage, PeerManager},
19 libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
20 message_pool::{MessagePool, MpoolRpcProvider},
21 networks::ChainConfig,
22 shim::address::CurrentNetwork,
23 state_manager::StateManager,
24};
25use api_compare_tests::TestDump;
26use fvm_shared4::address::Network;
27use openrpc_types::ParamStructure;
28use parking_lot::RwLock;
29use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
30use std::sync::atomic::{AtomicBool, Ordering};
31use tokio::{sync::mpsc, task::JoinSet};
32
33pub async fn run_test_with_dump(
34 test_dump: &TestDump,
35 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
36 chain: &NetworkChain,
37 allow_response_mismatch: bool,
38 allow_failure: bool,
39) -> anyhow::Result<()> {
40 if chain.is_testnet() {
41 CurrentNetwork::set_global(Network::Testnet);
42 }
43 let mut run = false;
44 let chain_config = Arc::new(ChainConfig::from_chain(chain));
45 let (ctx, _, _) = ctx(db, chain_config).await?;
46 let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
47 let mut ext = http::Extensions::new();
48 ext.insert(test_dump.path);
49 macro_rules! run_test {
50 ($ty:ty) => {
51 if test_dump.request.method_name.as_ref() == <$ty>::NAME
52 && <$ty>::API_PATHS.contains(test_dump.path)
53 {
54 let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
55 match <$ty>::handle(ctx.clone(), params, &ext).await {
56 Ok(result) => {
57 anyhow::ensure!(
58 allow_response_mismatch
59 || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
60 "Response mismatch between Forest and Lotus"
61 );
62 }
63 Err(_) if allow_failure => {
64 }
66 Err(e) => {
67 bail!("Error running test {}: {}", <$ty>::NAME, e);
68 }
69 }
70 run = true;
71 }
72 };
73 }
74 crate::for_each_rpc_method!(run_test);
75 anyhow::ensure!(run, "RPC method not found");
76 Ok(())
77}
78
79pub async fn load_db(
80 db_root: &Path,
81 chain: Option<&NetworkChain>,
82) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
83 let db_writer = open_db(db_root.into(), &Default::default())?;
84 let db = ManyCar::new(db_writer);
85 let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
86 load_all_forest_cars(&db, &forest_car_db_dir)?;
87 if let Some(chain) = chain {
88 load_actor_bundles(&db, chain).await?;
89 }
90 Ok(Arc::new(ReadOpsTrackingStore::new(db)))
91}
92
93pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
94 let mut index = Index::default();
95 let reader = db.tracker.eth_mappings_db.read();
96 for (k, v) in reader.iter() {
97 index
98 .eth_mappings
99 .get_or_insert_with(ahash::HashMap::default)
100 .insert(k.to_string(), Payload(v.clone()));
101 }
102 if index == Index::default() {
103 None
104 } else {
105 Some(index)
106 }
107}
108
109async fn ctx(
110 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
111 chain_config: Arc<ChainConfig>,
112) -> anyhow::Result<(
113 Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
114 flume::Receiver<NetworkMessage>,
115 tokio::sync::mpsc::Receiver<()>,
116)> {
117 let (network_send, network_rx) = flume::bounded(5);
118 let (tipset_send, _) = flume::bounded(5);
119 let genesis_header =
120 read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
121
122 let chain_store = Arc::new(
123 ChainStore::new(
124 db.clone(),
125 db.clone(),
126 db.clone(),
127 chain_config,
128 genesis_header,
129 )
130 .unwrap(),
131 );
132
133 let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
134 let message_pool = MessagePool::new(
135 MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
136 network_send.clone(),
137 Default::default(),
138 state_manager.chain_config().clone(),
139 &mut JoinSet::new(),
140 )?;
141
142 let peer_manager = Arc::new(PeerManager::default());
143 let sync_network_context =
144 SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
145 let (shutdown, shutdown_recv) = mpsc::channel(1);
146 let rpc_state = Arc::new(RPCState {
147 state_manager,
148 keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
149 mpool: Arc::new(message_pool),
150 bad_blocks: Default::default(),
151 msgs_in_tipset: Default::default(),
152 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
153 eth_event_handler: Arc::new(EthEventHandler::new()),
154 sync_network_context,
155 start_time: chrono::Utc::now(),
156 shutdown,
157 tipset_send,
158 snapshot_progress_tracker: Default::default(),
159 });
160 Ok((rpc_state, network_rx, shutdown_recv))
161}
162
/// Store wrapper that forwards operations to `inner` and, while tracking is enabled, copies
/// the values returned by read operations into the in-memory `tracker`.
pub struct ReadOpsTrackingStore<T> {
    /// The wrapped store that serves every operation.
    inner: T,
    /// In-memory database collecting the entries read through this wrapper.
    pub tracker: Arc<MemoryDB>,
    /// When `false`, reads are served without being recorded in `tracker`.
    tracking: AtomicBool,
}
169
170impl<T> ReadOpsTrackingStore<T> {
171 pub fn resume_tracking(&self) {
172 self.tracking.store(true, Ordering::Relaxed);
173 }
174
175 pub fn pause_tracking(&self) {
176 self.tracking.store(false, Ordering::Relaxed);
177 }
178
179 fn tracking(&self) -> bool {
180 self.tracking.load(Ordering::Relaxed)
181 }
182}
183
184impl<T> ReadOpsTrackingStore<T>
185where
186 T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
187{
188 fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
189 SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
190 }
191
192 pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
193 if !self.is_chain_head_tracked()? {
194 SettingsStoreExt::write_obj(
195 &self.tracker,
196 crate::db::setting_keys::HEAD_KEY,
197 &self.inner.heaviest_tipset_key()?,
198 )?;
199 }
200
201 Ok(())
202 }
203}
204
205impl<T> ReadOpsTrackingStore<T>
206where
207 T: Blockstore + SettingsStore,
208{
209 pub fn new(inner: T) -> Self {
210 Self {
211 inner,
212 tracker: Arc::new(Default::default()),
213 tracking: AtomicBool::new(true),
214 }
215 }
216
217 pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
218 &self,
219 writer: &mut W,
220 ) -> anyhow::Result<()> {
221 self.tracker.export_forest_car(writer).await
222 }
223}
224
225impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
226 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
227 self.inner.heaviest_tipset_key()
228 }
229
230 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
231 self.inner.set_heaviest_tipset_key(tsk)
232 }
233}
234
235impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
236 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
237 let result = self.inner.get(k)?;
238 if self.tracking()
239 && let Some(v) = &result
240 {
241 self.tracker.put_keyed(k, v.as_slice())?;
242 }
243 Ok(result)
244 }
245
246 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
247 self.inner.put_keyed(k, block)
248 }
249}
250
251impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
252 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
253 let result = self.inner.read_bin(key)?;
254 if self.tracking()
255 && let Some(v) = &result
256 {
257 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
258 }
259 Ok(result)
260 }
261
262 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
263 self.inner.write_bin(key, value)
264 }
265
266 fn exists(&self, key: &str) -> anyhow::Result<bool> {
267 let result = self.inner.read_bin(key)?;
268 if self.tracking()
269 && let Some(v) = &result
270 {
271 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
272 }
273 Ok(result.is_some())
274 }
275
276 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
277 self.inner.setting_keys()
278 }
279}
280
281impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
282 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
283 let result = self.inner.get(cid)?;
284 if self.tracking()
285 && let Some(v) = &result
286 {
287 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
288 }
289 Ok(result.is_some())
290 }
291
292 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
293 let result = self.inner.get(cid)?;
294 if self.tracking()
295 && let Some(v) = &result
296 {
297 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
298 }
299 Ok(result)
300 }
301}
302
303impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
304 type Hashes = <T as BitswapStoreReadWrite>::Hashes;
305
306 fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
307 self.inner.insert(block)
308 }
309}
310
311impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
312 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
313 let result = self.inner.read_bin(key)?;
314 if self.tracking()
315 && let Some(v) = &result
316 {
317 EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
318 }
319 Ok(result)
320 }
321
322 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
323 self.inner.write_bin(key, value)
324 }
325
326 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
327 self.inner.exists(key)
328 }
329
330 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
331 self.inner.get_message_cids()
332 }
333
334 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
335 self.inner.delete(keys)
336 }
337}