forest/tool/subcommands/api_cmd/
generate_test_snapshot.rs1use super::*;
5use crate::chain_sync::SyncStatusReport;
6use crate::{
7 KeyStore, KeyStoreConfig,
8 blocks::TipsetKey,
9 chain::ChainStore,
10 chain_sync::network_context::SyncNetworkContext,
11 daemon::db_util::load_all_forest_cars,
12 db::{
13 CAR_DB_DIR_NAME, EthMappingsStore, HeaviestTipsetKeyProvider, IndicesStore, MemoryDB,
14 SettingsStore, SettingsStoreExt, db_engine::open_db, parity_db::ParityDb,
15 },
16 genesis::read_genesis_header,
17 libp2p::{NetworkMessage, PeerManager},
18 libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64},
19 message_pool::{MessagePool, MpoolRpcProvider},
20 networks::ChainConfig,
21 shim::address::CurrentNetwork,
22 state_manager::StateManager,
23};
24use api_compare_tests::TestDump;
25use fvm_shared4::address::Network;
26use openrpc_types::ParamStructure;
27use parking_lot::RwLock;
28use rpc::{RPCState, RpcMethod as _, eth::filter::EthEventHandler};
29use tokio::{sync::mpsc, task::JoinSet};
30
31pub async fn run_test_with_dump(
32 test_dump: &TestDump,
33 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
34 chain: &NetworkChain,
35 allow_response_mismatch: bool,
36 allow_failure: bool,
37) -> anyhow::Result<()> {
38 if chain.is_testnet() {
39 CurrentNetwork::set_global(Network::Testnet);
40 }
41 let mut run = false;
42 let chain_config = Arc::new(ChainConfig::from_chain(chain));
43 let (ctx, _, _) = ctx(db, chain_config).await?;
44 let params_raw = Some(serde_json::to_string(&test_dump.request.params)?);
45 macro_rules! run_test {
46 ($ty:ty) => {
47 if test_dump.request.method_name.as_ref() == <$ty>::NAME {
48 let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
49 match <$ty>::handle(ctx.clone(), params).await {
50 Ok(result) => {
51 anyhow::ensure!(
52 allow_response_mismatch
53 || test_dump.forest_response == Ok(result.into_lotus_json_value()?),
54 "Response mismatch between Forest and Lotus"
55 );
56 }
57 Err(_) if allow_failure => {
58 }
60 Err(e) => {
61 bail!("Error running test {}: {}", <$ty>::NAME, e);
62 }
63 }
64 run = true;
65 }
66 };
67 }
68 crate::for_each_rpc_method!(run_test);
69 anyhow::ensure!(run, "RPC method not found");
70 Ok(())
71}
72
73pub fn load_db(db_root: &Path) -> anyhow::Result<Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>> {
74 let db_writer = open_db(db_root.into(), &Default::default())?;
75 let db = ManyCar::new(db_writer);
76 let forest_car_db_dir = db_root.join(CAR_DB_DIR_NAME);
77 load_all_forest_cars(&db, &forest_car_db_dir)?;
78 Ok(Arc::new(ReadOpsTrackingStore::new(db)))
79}
80
81pub(super) fn build_index(db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>) -> Option<Index> {
82 let mut index = Index::default();
83 let reader = db.tracker.eth_mappings_db.read();
84 for (k, v) in reader.iter() {
85 index
86 .eth_mappings
87 .get_or_insert_with(ahash::HashMap::default)
88 .insert(k.to_string(), Payload(v.clone()));
89 }
90 let reader = db.tracker.indices_db.read();
91 for (k, v) in reader.iter() {
92 index
93 .indices
94 .get_or_insert_with(ahash::HashMap::default)
95 .insert(k.to_string(), Payload(v.clone()));
96 }
97 if index == Index::default() {
98 None
99 } else {
100 Some(index)
101 }
102}
103
104async fn ctx(
105 db: Arc<ReadOpsTrackingStore<ManyCar<ParityDb>>>,
106 chain_config: Arc<ChainConfig>,
107) -> anyhow::Result<(
108 Arc<RPCState<ReadOpsTrackingStore<ManyCar<ParityDb>>>>,
109 flume::Receiver<NetworkMessage>,
110 tokio::sync::mpsc::Receiver<()>,
111)> {
112 let (network_send, network_rx) = flume::bounded(5);
113 let (tipset_send, _) = flume::bounded(5);
114 let genesis_header =
115 read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
116
117 let chain_store = Arc::new(
118 ChainStore::new(
119 db.clone(),
120 db.clone(),
121 db.clone(),
122 db,
123 chain_config,
124 genesis_header,
125 )
126 .unwrap(),
127 );
128
129 let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
130 let message_pool = MessagePool::new(
131 MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
132 network_send.clone(),
133 Default::default(),
134 state_manager.chain_config().clone(),
135 &mut JoinSet::new(),
136 )?;
137
138 let peer_manager = Arc::new(PeerManager::default());
139 let sync_network_context =
140 SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
141 let (shutdown, shutdown_recv) = mpsc::channel(1);
142 let rpc_state = Arc::new(RPCState {
143 state_manager,
144 keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
145 mpool: Arc::new(message_pool),
146 bad_blocks: Default::default(),
147 msgs_in_tipset: Default::default(),
148 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
149 eth_event_handler: Arc::new(EthEventHandler::new()),
150 sync_network_context,
151 start_time: chrono::Utc::now(),
152 shutdown,
153 tipset_send,
154 snapshot_progress_tracker: Default::default(),
155 });
156 Ok((rpc_state, network_rx, shutdown_recv))
157}
158
159pub struct ReadOpsTrackingStore<T> {
161 inner: T,
162 pub tracker: Arc<MemoryDB>,
163}
164
165impl<T> ReadOpsTrackingStore<T>
166where
167 T: Blockstore + SettingsStore + HeaviestTipsetKeyProvider,
168{
169 fn is_chain_head_tracked(&self) -> anyhow::Result<bool> {
170 SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY)
171 }
172
173 pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> {
174 if !self.is_chain_head_tracked()? {
175 SettingsStoreExt::write_obj(
176 &self.tracker,
177 crate::db::setting_keys::HEAD_KEY,
178 &self.inner.heaviest_tipset_key()?,
179 )?;
180 }
181
182 Ok(())
183 }
184}
185
186impl<T> ReadOpsTrackingStore<T>
187where
188 T: Blockstore + SettingsStore,
189{
190 pub fn new(inner: T) -> Self {
191 Self {
192 inner,
193 tracker: Arc::new(Default::default()),
194 }
195 }
196
197 pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
198 &self,
199 writer: &mut W,
200 ) -> anyhow::Result<()> {
201 self.tracker.export_forest_car(writer).await
202 }
203}
204
205impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for ReadOpsTrackingStore<T> {
206 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
207 self.inner.heaviest_tipset_key()
208 }
209
210 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
211 self.inner.set_heaviest_tipset_key(tsk)
212 }
213}
214
215impl<T: Blockstore> Blockstore for ReadOpsTrackingStore<T> {
216 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
217 let result = self.inner.get(k)?;
218 if let Some(v) = &result {
219 self.tracker.put_keyed(k, v.as_slice())?;
220 }
221 Ok(result)
222 }
223
224 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
225 self.inner.put_keyed(k, block)
226 }
227}
228
229impl<T: SettingsStore> SettingsStore for ReadOpsTrackingStore<T> {
230 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
231 let result = self.inner.read_bin(key)?;
232 if let Some(v) = &result {
233 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
234 }
235 Ok(result)
236 }
237
238 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
239 self.inner.write_bin(key, value)
240 }
241
242 fn exists(&self, key: &str) -> anyhow::Result<bool> {
243 let result = self.inner.read_bin(key)?;
244 if let Some(v) = &result {
245 SettingsStore::write_bin(&self.tracker, key, v.as_slice())?;
246 }
247 Ok(result.is_some())
248 }
249
250 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
251 self.inner.setting_keys()
252 }
253}
254
255impl<T: BitswapStoreRead> BitswapStoreRead for ReadOpsTrackingStore<T> {
256 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
257 let result = self.inner.get(cid)?;
258 if let Some(v) = &result {
259 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
260 }
261 Ok(result.is_some())
262 }
263
264 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
265 let result = self.inner.get(cid)?;
266 if let Some(v) = &result {
267 Blockstore::put_keyed(&self.tracker, cid, v.as_slice())?;
268 }
269 Ok(result)
270 }
271}
272
273impl<T: BitswapStoreReadWrite> BitswapStoreReadWrite for ReadOpsTrackingStore<T> {
274 type Hashes = <T as BitswapStoreReadWrite>::Hashes;
275
276 fn insert(&self, block: &Block64<Self::Hashes>) -> anyhow::Result<()> {
277 self.inner.insert(block)
278 }
279}
280
281impl<T: EthMappingsStore> EthMappingsStore for ReadOpsTrackingStore<T> {
282 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
283 let result = self.inner.read_bin(key)?;
284 if let Some(v) = &result {
285 EthMappingsStore::write_bin(&self.tracker, key, v.as_slice())?;
286 }
287 self.inner.read_bin(key)
288 }
289
290 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
291 self.inner.write_bin(key, value)
292 }
293
294 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
295 self.inner.exists(key)
296 }
297
298 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
299 self.inner.get_message_cids()
300 }
301
302 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
303 self.inner.delete(keys)
304 }
305}
306
307impl<T: IndicesStore> IndicesStore for ReadOpsTrackingStore<T> {
308 fn read_bin(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
309 let result = self.inner.read_bin(key)?;
310 if let Some(v) = &result {
311 IndicesStore::write_bin(&self.tracker, key, v.as_slice())?;
312 }
313 self.inner.read_bin(key)
314 }
315
316 fn write_bin(&self, key: &Cid, value: &[u8]) -> anyhow::Result<()> {
317 self.inner.write_bin(key, value)
318 }
319
320 fn exists(&self, key: &Cid) -> anyhow::Result<bool> {
321 self.inner.exists(key)
322 }
323}