forest/tool/subcommands/api_cmd/
stateful_tests.rs1use crate::eth::EVMMethod;
4use crate::message::SignedMessage;
5use crate::networks::calibnet::ETH_CHAIN_ID;
6use crate::rpc::eth::EthUint64;
7use crate::rpc::eth::types::*;
8use crate::rpc::types::ApiTipsetKey;
9use crate::rpc::{self, RpcMethod, prelude::*};
10use crate::shim::{address::Address, message::Message};
11
12use anyhow::{Context, ensure};
13use cbor4ii::core::Value;
14use cid::Cid;
15use futures::{SinkExt, StreamExt};
16use serde_json::json;
17use tokio::time::Duration;
18use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
19
20use std::io::{self, Write};
21use std::pin::Pin;
22use std::sync::Arc;
23
24type TestRunner = Arc<
25 dyn Fn(Arc<rpc::Client>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
26 + Send
27 + Sync,
28>;
29
30#[derive(Clone)]
31pub struct TestTransaction {
32 pub to: Address,
33 pub from: Address,
34 pub payload: Vec<u8>,
35 pub topic: EthHash,
36}
37
38#[derive(Clone)]
39pub struct RpcTestScenario {
40 pub run: TestRunner,
41 pub name: Option<&'static str>,
42 pub should_fail_with: Option<&'static str>,
43 pub used_methods: Vec<&'static str>,
44 pub ignore: Option<&'static str>,
45}
46
47impl RpcTestScenario {
48 pub fn basic<F, Fut>(run_fn: F) -> Self
50 where
51 F: Fn(Arc<rpc::Client>) -> Fut + Send + Sync + 'static,
52 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
53 {
54 let run = Arc::new(move |client: Arc<rpc::Client>| {
55 Box::pin(run_fn(client)) as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
56 });
57 Self {
58 run,
59 name: Default::default(),
60 should_fail_with: Default::default(),
61 used_methods: Default::default(),
62 ignore: None,
63 }
64 }
65
66 fn name(mut self, name: &'static str) -> Self {
67 self.name = Some(name);
68 self
69 }
70
71 pub fn should_fail_with(mut self, msg: &'static str) -> Self {
72 self.should_fail_with = Some(msg);
73 self
74 }
75
76 fn using<const ARITY: usize, M>(mut self) -> Self
77 where
78 M: RpcMethod<ARITY>,
79 {
80 self.used_methods.push(M::NAME);
81 if let Some(alias) = M::NAME_ALIAS {
82 self.used_methods.push(alias);
83 }
84 self
85 }
86
87 fn _ignore(mut self, msg: &'static str) -> Self {
88 self.ignore = Some(msg);
89 self
90 }
91}
92
93pub(super) async fn run_tests(
94 tests: impl IntoIterator<Item = RpcTestScenario> + Clone,
95 client: impl Into<Arc<rpc::Client>>,
96 filter: String,
97) -> anyhow::Result<()> {
98 let client: Arc<rpc::Client> = client.into();
99 let mut passed = 0;
100 let mut failed = 0;
101 let mut ignored = 0;
102 let mut filtered = 0;
103
104 println!("running {} tests", tests.clone().into_iter().count());
105
106 for (i, test) in tests.into_iter().enumerate() {
107 if !filter.is_empty() && !test.used_methods.iter().any(|m| m.starts_with(&filter)) {
108 filtered += 1;
109 continue;
110 }
111 if test.ignore.is_some() {
112 ignored += 1;
113 println!(
114 "test {} ... ignored",
115 if let Some(name) = test.name {
116 name.to_string()
117 } else {
118 format!("#{i}")
119 },
120 );
121 continue;
122 }
123
124 print!(
125 "test {} ... ",
126 if let Some(name) = test.name {
127 name.to_string()
128 } else {
129 format!("#{i}")
130 }
131 );
132
133 io::stdout().flush()?;
134
135 let result = (test.run)(client.clone()).await;
136
137 match result {
138 Ok(_) => {
139 if let Some(expected_msg) = test.should_fail_with {
140 println!("FAILED (expected failure containing '{expected_msg}')");
141 failed += 1;
142 } else {
143 println!("ok");
144 passed += 1;
145 }
146 }
147 Err(e) => {
148 if let Some(expected_msg) = test.should_fail_with {
149 let err_str = format!("{e:#}");
150 if err_str
151 .to_lowercase()
152 .contains(&expected_msg.to_lowercase())
153 {
154 println!("ok");
155 passed += 1;
156 } else {
157 println!("FAILED ({e:#})");
158 failed += 1;
159 }
160 } else {
161 println!("FAILED {e:#}");
162 failed += 1;
163 }
164 }
165 }
166 }
167 let status = if failed == 0 { "ok" } else { "FAILED" };
168 println!(
169 "test result: {status}. {passed} passed; {failed} failed; {ignored} ignored; {filtered} filtered out"
170 );
171 ensure!(failed == 0, "{failed} test(s) failed");
172 Ok(())
173}
174
175#[allow(unreachable_code)]
176async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> {
177 async fn close_channel(
178 stream: &mut tokio_tungstenite::WebSocketStream<
179 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
180 >,
181 id: &serde_json::Value,
182 ) -> anyhow::Result<()> {
183 let request = json!({
184 "jsonrpc": "2.0",
185 "id": 1,
186 "method": "xrpc.cancel",
187 "params": [id]
188 });
189
190 stream
191 .send(WsMessage::Text(request.to_string().into()))
192 .await
193 .context("failed to send close channel request")?;
194
195 Ok(())
196 }
197
198 let mut url = client.base_url().clone();
199 url.set_scheme("ws")
200 .map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
201 url.set_path("/rpc/v1");
202
203 let (mut ws_stream, _) = connect_async(url.as_str()).await?;
205
206 let request = json!({
207 "jsonrpc": "2.0",
208 "id": 1,
209 "method": "Filecoin.ChainNotify",
210 "params": []
211 });
212 ws_stream
213 .send(WsMessage::Text(request.to_string().into()))
214 .await?;
215
216 let mut channel_id: Option<serde_json::Value> = None;
217
218 loop {
223 let msg = match tokio::time::timeout(Duration::from_secs(180), ws_stream.next()).await {
224 Ok(Some(msg)) => msg,
225 Ok(None) => anyhow::bail!("WebSocket stream closed"),
226 Err(_) => {
227 if let Some(id) = channel_id.as_ref() {
228 let _ = close_channel(&mut ws_stream, id).await;
229 }
230 let _ = ws_stream.close(None).await;
231 anyhow::bail!("timeout waiting for tipset");
232 }
233 };
234 match msg {
235 Ok(WsMessage::Text(text)) => {
236 let json: serde_json::Value = serde_json::from_str(&text)?;
237
238 if let Some(id) = json.get("result") {
239 channel_id = Some(id.clone());
240 } else {
241 let method = json!("xrpc.ch.val");
242 anyhow::ensure!(json.get("method") == Some(&method));
243
244 if let Some(params) = json.get("params").and_then(|v| v.as_array()) {
245 if let Some(id) = params.first() {
246 anyhow::ensure!(Some(id) == channel_id.as_ref());
247 } else {
248 anyhow::bail!("expecting an open channel");
249 }
250 if let Some(changes) = params.get(1).and_then(|v| v.as_array()) {
251 for change in changes {
252 if let Some(type_) = change.get("Type").and_then(|v| v.as_str()) {
253 if type_ == "apply" {
254 let id = channel_id.as_ref().ok_or_else(|| {
255 anyhow::anyhow!("subscription not opened")
256 })?;
257 close_channel(&mut ws_stream, id).await?;
258 ws_stream.close(None).await?;
259 return Ok(());
260 } else if type_ == "revert" {
261 let id = channel_id.as_ref().ok_or_else(|| {
262 anyhow::anyhow!("subscription not opened")
263 })?;
264 close_channel(&mut ws_stream, id).await?;
265 ws_stream.close(None).await?;
266 anyhow::bail!("revert");
267 }
268 }
269 }
270 }
271 } else {
272 let id = channel_id
273 .as_ref()
274 .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
275 close_channel(&mut ws_stream, id).await?;
276 ws_stream.close(None).await?;
277 anyhow::bail!("expecting params");
278 }
279 }
280 }
281 Err(..) | Ok(WsMessage::Close(..)) => {
282 let id = channel_id
283 .as_ref()
284 .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
285 close_channel(&mut ws_stream, id).await?;
286 ws_stream.close(None).await?;
287 anyhow::bail!("unexpected error or close message");
288 }
289 _ => {
290 }
292 }
293 }
294
295 unreachable!("loop always returns within the branches above")
296}
297
298async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> {
299 let tipset = client.call(ChainHead::request(())?).await?;
300 let mut retries = 100;
301 loop {
302 let pending = client
303 .call(MpoolPending::request((ApiTipsetKey(None),))?)
304 .await?;
305
306 if pending.0.iter().any(|msg| msg.cid() == message_cid) {
307 client
308 .call(
309 StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))?
310 .with_timeout(Duration::from_secs(300)),
311 )
312 .await?;
313 break Ok(());
314 }
315 ensure!(retries != 0, "Message not found in mpool");
316 retries -= 1;
317
318 tokio::time::sleep(Duration::from_millis(10)).await;
319 }
320}
321
322async fn invoke_contract(client: &rpc::Client, tx: &TestTransaction) -> anyhow::Result<Cid> {
323 let encoded_params = cbor4ii::serde::to_vec(
324 Vec::with_capacity(tx.payload.len()),
325 &Value::Bytes(tx.payload.clone()),
326 )
327 .context("failed to encode params")?;
328 let nonce = client.call(MpoolGetNonce::request((tx.from,))?).await?;
329 let message = Message {
330 to: tx.to,
331 from: tx.from,
332 sequence: nonce,
333 method_num: EVMMethod::InvokeContract as u64,
334 params: encoded_params.into(),
335 ..Default::default()
336 };
337 let unsigned_msg = client
338 .call(GasEstimateMessageGas::request((
339 message,
340 None,
341 ApiTipsetKey(None),
342 ))?)
343 .await?;
344
345 let eth_tx_args = crate::eth::EthEip1559TxArgsBuilder::default()
346 .chain_id(ETH_CHAIN_ID)
347 .unsigned_message(&unsigned_msg.message)?
348 .build()
349 .map_err(|e| anyhow::anyhow!("Failed to build EIP-1559 transaction: {}", e))?;
350 let eth_tx = crate::eth::EthTx::Eip1559(Box::new(eth_tx_args));
351 let data = eth_tx.rlp_unsigned_message(ETH_CHAIN_ID)?;
352
353 let sig = client.call(WalletSign::request((tx.from, data))?).await?;
354 let smsg = SignedMessage::new_unchecked(unsigned_msg.message, sig);
355 let cid = smsg.cid();
356
357 client.call(MpoolPush::request((smsg,))?).await?;
358
359 Ok(cid)
360}
361
362fn create_eth_new_filter_test() -> RpcTestScenario {
363 RpcTestScenario::basic(|client| async move {
364 const BLOCK_RANGE: u64 = 200;
365
366 let last_block = client.call(EthBlockNumber::request(())?).await?;
367
368 let filter_spec = EthFilterSpec {
369 from_block: Some(EthUint64(last_block.0.saturating_sub(BLOCK_RANGE)).to_hex_string()),
370 to_block: Some(last_block.to_hex_string()),
371 ..Default::default()
372 };
373
374 let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
375
376 let removed = client
377 .call(EthUninstallFilter::request((filter_id.clone(),))?)
378 .await?;
379 anyhow::ensure!(removed);
380
381 let removed = client
382 .call(EthUninstallFilter::request((filter_id,))?)
383 .await?;
384 anyhow::ensure!(!removed);
385
386 Ok(())
387 })
388}
389
390fn create_eth_new_filter_limit_test(count: usize) -> RpcTestScenario {
391 RpcTestScenario::basic(move |client| async move {
392 const BLOCK_RANGE: u64 = 200;
393
394 let last_block = client.call(EthBlockNumber::request(())?).await?;
395
396 let filter_spec = EthFilterSpec {
397 from_block: Some(format!("0x{:x}", last_block.0.saturating_sub(BLOCK_RANGE))),
398 to_block: Some(last_block.to_hex_string()),
399 ..Default::default()
400 };
401
402 let mut ids = vec![];
403
404 for _ in 0..count {
405 let result = client
406 .call(EthNewFilter::request((filter_spec.clone(),))?)
407 .await;
408
409 match result {
410 Ok(filter_id) => ids.push(filter_id),
411 Err(e) => {
412 for id in ids {
414 let removed = client.call(EthUninstallFilter::request((id,))?).await?;
415 anyhow::ensure!(removed);
416 }
417 anyhow::bail!(e)
418 }
419 }
420 }
421
422 for id in ids {
423 let removed = client.call(EthUninstallFilter::request((id,))?).await?;
424 anyhow::ensure!(removed);
425 }
426
427 Ok(())
428 })
429}
430
431fn eth_new_block_filter() -> RpcTestScenario {
432 RpcTestScenario::basic(move |client| async move {
433 async fn process_filter(client: &rpc::Client, filter_id: &FilterID) -> anyhow::Result<()> {
434 let filter_result = client
435 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
436 .await?;
437
438 if let EthFilterResult::Hashes(prev_hashes) = filter_result {
439 let verify_hashes = async |hashes: &[EthHash]| -> anyhow::Result<()> {
440 for hash in hashes {
441 let _block = client
442 .call(EthGetBlockByHash::request((hash.clone(), false))?)
443 .await?;
444 }
445 Ok(())
446 };
447 verify_hashes(&prev_hashes).await?;
448
449 next_tipset(client).await?;
451
452 let filter_result = client
453 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
454 .await?;
455
456 if let EthFilterResult::Hashes(hashes) = filter_result {
457 verify_hashes(&hashes).await?;
458 anyhow::ensure!(
459 (prev_hashes.is_empty() && hashes.is_empty()) || prev_hashes != hashes,
460 );
461
462 Ok(())
463 } else {
464 Err(anyhow::anyhow!("expecting blocks"))
465 }
466 } else {
467 Err(anyhow::anyhow!("expecting blocks"))
468 }
469 }
470
471 let mut retries = 5;
472 loop {
473 let filter_id = client.call(EthNewBlockFilter::request(())?).await?;
475
476 let result = match process_filter(&client, &filter_id).await {
477 Ok(()) => Ok(()),
478 Err(e) if retries != 0 && e.to_string().contains("revert") => {
479 let removed = client
481 .call(EthUninstallFilter::request((filter_id,))?)
482 .await?;
483 anyhow::ensure!(removed);
484
485 retries -= 1;
486 continue;
487 }
488 Err(e) => Err(e),
489 };
490
491 let removed = client
493 .call(EthUninstallFilter::request((filter_id,))?)
494 .await?;
495 anyhow::ensure!(removed);
496
497 break result;
498 }
499 })
500}
501
502fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario {
503 RpcTestScenario::basic(move |client| {
504 let tx = tx.clone();
505 async move {
506 let filter_id = client
507 .call(EthNewPendingTransactionFilter::request(())?)
508 .await?;
509
510 let filter_result = client
511 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
512 .await?;
513
514 let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result {
515 let cid = invoke_contract(&client, &tx).await?;
516
517 wait_pending_message(&client, cid).await?;
518
519 let filter_result = client
520 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
521 .await?;
522
523 if let EthFilterResult::Hashes(hashes) = filter_result {
524 anyhow::ensure!(
525 prev_hashes != hashes,
526 "prev_hashes={prev_hashes:?} hashes={hashes:?}"
527 );
528
529 let mut cids = vec![];
530 for hash in &hashes {
531 if let Some(cid) = client
532 .call(EthGetMessageCidByTransactionHash::request((hash.clone(),))?)
533 .await?
534 {
535 cids.push(cid);
536 }
537 }
538
539 anyhow::ensure!(
540 cids.contains(&cid),
541 "CID missing from filter results: cid={cid:?} cids={cids:?} hashes={hashes:?}"
542 );
543
544 Ok(())
545 } else {
546 Err(anyhow::anyhow!("expecting hashes"))
547 }
548 } else {
549 Err(anyhow::anyhow!("expecting transactions"))
550 };
551
552 let removed = client
553 .call(EthUninstallFilter::request((filter_id,))?)
554 .await?;
555 anyhow::ensure!(removed);
556
557 result
558 }
559 })
560}
561
562fn as_logs(input: EthFilterResult) -> EthFilterResult {
563 match input {
564 EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()),
565 other => other,
566 }
567}
568
569fn eth_get_filter_logs(tx: TestTransaction) -> RpcTestScenario {
570 RpcTestScenario::basic(move |client| {
571 let tx = tx.clone();
572 async move {
573 const BLOCK_RANGE: u64 = 1;
574
575 let tipset = client.call(ChainHead::request(())?).await?;
576 let cid = invoke_contract(&client, &tx).await?;
577 let lookup = client
578 .call(
579 StateWaitMsg::request((cid, 1, tipset.epoch(), true))?
580 .with_timeout(Duration::from_secs(300)),
581 )
582 .await?;
583 let block_num = EthUint64(lookup.height as u64);
584
585 let topics = EthTopicSpec(vec![EthHashList::Single(Some(tx.topic))]);
586 let filter_spec = EthFilterSpec {
587 from_block: Some(format!("0x{:x}", block_num.0.saturating_sub(BLOCK_RANGE))),
588 topics: Some(topics),
589 ..Default::default()
590 };
591
592 let filter_id = client
593 .call(EthNewFilter::request((filter_spec.clone(),))?)
594 .await?;
595 let filter_result = as_logs(
596 client
597 .call(EthGetFilterLogs::request((filter_id.clone(),))?)
598 .await?,
599 );
600 let result = if let EthFilterResult::Logs(logs) = filter_result {
601 anyhow::ensure!(
602 !logs.is_empty(),
603 "Empty logs: filter_spec={filter_spec:?} cid={cid:?}",
604 );
605 Ok(())
606 } else {
607 Err(anyhow::anyhow!("expecting logs"))
608 };
609
610 let removed = client
611 .call(EthUninstallFilter::request((filter_id,))?)
612 .await?;
613 anyhow::ensure!(removed);
614
615 result
616 }
617 })
618}
619
620const LOTUS_EVENTS_MAXFILTERS: usize = 100;
621
622macro_rules! with_methods {
623 ( $builder:expr, $( $method:ty ),+ ) => {{
624 let mut b = $builder;
625 $(
626 b = b.using::<{ <$method>::N_REQUIRED_PARAMS }, $method>();
627 )+
628 b
629 }};
630}
631
632pub(super) async fn create_tests(tx: TestTransaction) -> Vec<RpcTestScenario> {
633 vec![
634 with_methods!(
635 create_eth_new_filter_test().name("eth_newFilter install/uninstall"),
636 EthNewFilter,
637 EthUninstallFilter
638 ),
639 with_methods!(
640 create_eth_new_filter_limit_test(20).name("eth_newFilter under limit"),
641 EthNewFilter,
642 EthUninstallFilter
643 ),
644 with_methods!(
645 create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS)
646 .name("eth_newFilter just under limit"),
647 EthNewFilter,
648 EthUninstallFilter
649 ),
650 with_methods!(
651 create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS + 1)
652 .name("eth_newFilter over limit")
653 .should_fail_with("maximum number of filters registered"),
654 EthNewFilter,
655 EthUninstallFilter
656 ),
657 with_methods!(
658 eth_new_block_filter().name("eth_newBlockFilter works"),
659 EthNewBlockFilter,
660 EthGetFilterChanges,
661 EthUninstallFilter
662 ),
663 with_methods!(
664 eth_new_pending_transaction_filter(tx.clone())
665 .name("eth_newPendingTransactionFilter works"),
666 EthNewPendingTransactionFilter,
667 EthGetFilterChanges,
668 EthUninstallFilter
669 ),
670 with_methods!(
671 eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"),
672 EthNewFilter,
673 EthGetFilterLogs,
674 EthUninstallFilter
675 ),
676 ]
677}