forest/tool/subcommands/api_cmd/
test_snapshot.rs1use crate::chain_sync::SyncStatusReport;
5use crate::{
6 KeyStore, KeyStoreConfig,
7 chain::ChainStore,
8 chain_sync::network_context::SyncNetworkContext,
9 db::{
10 MemoryDB,
11 car::{AnyCar, ManyCar},
12 },
13 genesis::read_genesis_header,
14 libp2p::{NetworkMessage, PeerManager},
15 lotus_json::HasLotusJson,
16 message_pool::{MessagePool, MpoolRpcProvider},
17 networks::{ChainConfig, NetworkChain},
18 rpc::{
19 ApiPaths, RPCState, RpcMethod, RpcMethodExt as _,
20 eth::{filter::EthEventHandler, types::EthHash},
21 },
22 shim::address::{CurrentNetwork, Network},
23 state_manager::StateManager,
24};
25use cid::Cid;
26use openrpc_types::ParamStructure;
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::{path::Path, str::FromStr, sync::Arc};
30use tokio::{sync::mpsc, task::JoinSet};
31
32#[derive(Default, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
33pub struct Payload(#[serde(with = "crate::lotus_json::base64_standard")] pub Vec<u8>);
34
35#[derive(Default, PartialEq, Debug, Clone, Serialize, Deserialize)]
36pub struct Index {
37 pub eth_mappings: Option<ahash::HashMap<String, Payload>>,
38 pub indices: Option<ahash::HashMap<String, Payload>>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct RpcTestSnapshot {
43 pub chain: NetworkChain,
44 pub name: String,
45 pub params: serde_json::Value,
46 pub response: Result<serde_json::Value, String>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub index: Option<Index>,
49 #[serde(with = "crate::lotus_json::base64_standard")]
50 pub db: Vec<u8>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub api_path: Option<ApiPaths>,
53}
54
55fn backfill_eth_mappings(db: &MemoryDB, index: Option<Index>) -> anyhow::Result<()> {
56 if let Some(index) = index {
57 if let Some(mut guard) = db.eth_mappings_db.try_write()
58 && let Some(eth_mappings) = index.eth_mappings
59 {
60 for (k, v) in eth_mappings.iter() {
61 guard.insert(EthHash::from_str(k)?, v.0.clone());
62 }
63 }
64 if let Some(mut guard) = db.indices_db.try_write()
65 && let Some(indices) = index.indices
66 {
67 for (k, v) in indices.iter() {
68 guard.insert(Cid::from_str(k)?, v.0.clone());
69 }
70 }
71 }
72 Ok(())
73}
74
75pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> {
76 let mut run = false;
77 let snapshot_bytes = std::fs::read(path)?;
78 let snapshot_bytes = if let Ok(bytes) = zstd::decode_all(snapshot_bytes.as_slice()) {
79 bytes
80 } else {
81 snapshot_bytes
82 };
83 let RpcTestSnapshot {
84 chain,
85 name: method_name,
86 params,
87 index,
88 db: db_bytes,
89 response: expected_response,
90 api_path,
91 } = serde_json::from_slice(snapshot_bytes.as_slice())?;
92 if chain.is_testnet() {
93 CurrentNetwork::set_global(Network::Testnet);
94 }
95 let api_path = api_path.unwrap_or(ApiPaths::V1);
96 let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(AnyCar::new(db_bytes)?)?);
97 backfill_eth_mappings(db.writer(), index)?;
99 let chain_config = Arc::new(ChainConfig::from_chain(&chain));
100 let (ctx, _, _) = ctx(db, chain_config).await?;
101 let params_raw = match serde_json::to_string(¶ms)? {
102 s if s.is_empty() => None,
103 s => Some(s),
104 };
105
106 macro_rules! run_test {
107 ($ty:ty) => {
108 if method_name.as_str() == <$ty>::NAME && <$ty>::API_PATHS.contains(api_path) {
109 let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
110 let result = <$ty>::handle(ctx.clone(), params)
111 .await
112 .map(|r| r.into_lotus_json())
113 .map_err(|e| e.inner().to_string());
114 let expected = match expected_response.clone() {
115 Ok(v) => serde_json::from_value(v).map_err(|e| e.to_string()),
116 Err(e) => Err(e),
117 };
118 assert_eq!(result, expected);
119 run = true;
120 }
121 };
122 }
123
124 crate::for_each_rpc_method!(run_test);
125
126 assert!(run, "RPC method {method_name} not found");
127
128 Ok(())
129}
130
131async fn ctx(
132 db: Arc<ManyCar<MemoryDB>>,
133 chain_config: Arc<ChainConfig>,
134) -> anyhow::Result<(
135 Arc<RPCState<ManyCar<MemoryDB>>>,
136 flume::Receiver<NetworkMessage>,
137 tokio::sync::mpsc::Receiver<()>,
138)> {
139 let (network_send, network_rx) = flume::bounded(5);
140 let (tipset_send, _) = flume::bounded(5);
141 let genesis_header =
142 read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;
143 let chain_store = Arc::new(
144 ChainStore::new(
145 db.clone(),
146 db.clone(),
147 db.clone(),
148 db,
149 chain_config,
150 genesis_header.clone(),
151 )
152 .unwrap(),
153 );
154 let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap());
155 let message_pool = MessagePool::new(
156 MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
157 network_send.clone(),
158 Default::default(),
159 state_manager.chain_config().clone(),
160 &mut JoinSet::new(),
161 )?;
162
163 let peer_manager = Arc::new(PeerManager::default());
164 let sync_network_context =
165 SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
166 let (shutdown, shutdown_recv) = mpsc::channel(1);
167 let rpc_state = Arc::new(RPCState {
168 state_manager,
169 keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
170 mpool: Arc::new(message_pool),
171 bad_blocks: Default::default(),
172 msgs_in_tipset: Default::default(),
173 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
174 eth_event_handler: Arc::new(EthEventHandler::new()),
175 sync_network_context,
176 start_time: chrono::Utc::now(),
177 shutdown,
178 tipset_send,
179 snapshot_progress_tracker: Default::default(),
180 });
181 Ok((rpc_state, network_rx, shutdown_recv))
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187 use crate::Config;
188 use crate::utils::net::{DownloadFileOption, download_file_with_cache};
189 use crate::utils::proofs_api::ensure_proof_params_downloaded;
190 use ahash::HashSet;
191 use anyhow::Context as _;
192 use directories::ProjectDirs;
193 use std::sync::LazyLock;
194 use std::time::{Duration, Instant};
195 use tokio::sync::Mutex;
196 use url::Url;
197
198 include!(concat!(env!("OUT_DIR"), "/__rpc_regression_tests_gen.rs"));
200
201 async fn rpc_regression_test_run(name: &str) {
202 if crate::utils::is_ci() && crate::utils::is_debug_build() {
204 return;
205 }
206
207 {
209 static PROOF_PARAMS_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
210 let _guard = PROOF_PARAMS_LOCK.lock().await;
211 crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(
212 &Config::default().client.data_dir,
213 );
214 ensure_proof_params_downloaded().await.unwrap();
215 }
216 let url: Url =
217 format!("https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/rpc_test/{name}")
218 .parse()
219 .with_context(|| format!("Failed to parse URL for test: {name}"))
220 .unwrap();
221 let project_dir = ProjectDirs::from("com", "ChainSafe", "Forest").unwrap();
222 let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots");
223 let path = crate::utils::retry(
224 crate::utils::RetryArgs {
225 timeout: Some(Duration::from_secs(if crate::utils::is_ci() {
226 20
227 } else {
228 120
229 })),
230 max_retries: Some(5),
231 ..Default::default()
232 },
233 || async {
234 download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable).await
235 },
236 )
237 .await
238 .unwrap()
239 .path;
240
241 unsafe { std::env::set_var(crate::utils::rand::FIXED_RNG_SEED_ENV, "4213666") };
246 print!("Testing {name} ...");
247 let start = Instant::now();
248 run_test_from_snapshot(&path).await.unwrap();
249 println!(
250 " succeeded, took {}.",
251 humantime::format_duration(start.elapsed())
252 );
253 }
254
255 #[test]
256 fn rpc_regression_tests_print_uncovered() {
257 let pattern = lazy_regex::regex!(r#"^(?P<name>filecoin_.+)_\d+\.rpcsnap\.json\.zst$"#);
258 let covered = HashSet::from_iter(
259 include_str!("test_snapshots.txt")
260 .trim()
261 .split("\n")
262 .map(|i| {
263 let captures = pattern.captures(i).expect("pattern capture failure");
264 captures
265 .name("name")
266 .expect("no named capture group")
267 .as_str()
268 .replace("_", ".")
269 .to_lowercase()
270 }),
271 );
272 let ignored = HashSet::from_iter(
273 include_str!("test_snapshots_ignored.txt")
274 .trim()
275 .split("\n")
276 .map(str::to_lowercase),
277 );
278
279 let mut uncovered = vec![];
280
281 macro_rules! print_uncovered {
282 ($ty:ty) => {
283 let name = <$ty>::NAME.to_lowercase();
284 if !covered.contains(&name) && !ignored.contains(&name) {
285 uncovered.push(<$ty>::NAME);
286 }
287 };
288 }
289
290 crate::for_each_rpc_method!(print_uncovered);
291
292 if !uncovered.is_empty() {
293 uncovered.sort();
294 println!("Uncovered RPC methods:");
295 for i in uncovered.iter() {
296 println!("{i}");
297 }
298 }
299
300 assert!(
301 uncovered.is_empty(),
302 "either ignore or upload test snapshots for uncovered RPC methods"
303 );
304 }
305}