forest/tool/subcommands/api_cmd/
stateful_tests.rs1use crate::eth::EVMMethod;
4use crate::rpc::eth::EthUint64;
5use crate::rpc::eth::types::*;
6use crate::rpc::types::ApiTipsetKey;
7use crate::rpc::{self, RpcMethod, prelude::*};
8use crate::shim::{address::Address, message::Message};
9
10use anyhow::{Context, ensure};
11use cbor4ii::core::Value;
12use cid::Cid;
13use futures::{SinkExt, StreamExt};
14use serde_json::json;
15use tokio::time::Duration;
16use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
17
18use std::io::{self, Write};
19use std::pin::Pin;
20use std::sync::Arc;
21
22type TestRunner = Arc<
23 dyn Fn(Arc<rpc::Client>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
24 + Send
25 + Sync,
26>;
27
28#[derive(Clone)]
29pub struct TestTransaction {
30 pub to: Address,
31 pub from: Address,
32 pub payload: Vec<u8>,
33 pub topic: EthHash,
34}
35
36#[derive(Clone)]
37pub struct RpcTestScenario {
38 pub run: TestRunner,
39 pub name: Option<&'static str>,
40 pub should_fail_with: Option<&'static str>,
41 pub used_methods: Vec<&'static str>,
42 pub ignore: Option<&'static str>,
43}
44
45impl RpcTestScenario {
46 pub fn basic<F, Fut>(run_fn: F) -> Self
48 where
49 F: Fn(Arc<rpc::Client>) -> Fut + Send + Sync + 'static,
50 Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
51 {
52 let run = Arc::new(move |client: Arc<rpc::Client>| {
53 Box::pin(run_fn(client)) as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
54 });
55 Self {
56 run,
57 name: Default::default(),
58 should_fail_with: Default::default(),
59 used_methods: Default::default(),
60 ignore: None,
61 }
62 }
63
64 fn name(mut self, name: &'static str) -> Self {
65 self.name = Some(name);
66 self
67 }
68
69 pub fn should_fail_with(mut self, msg: &'static str) -> Self {
70 self.should_fail_with = Some(msg);
71 self
72 }
73
74 fn using<const ARITY: usize, M>(mut self) -> Self
75 where
76 M: RpcMethod<ARITY>,
77 {
78 self.used_methods.push(M::NAME);
79 if let Some(alias) = M::NAME_ALIAS {
80 self.used_methods.push(alias);
81 }
82 self
83 }
84
85 fn ignore(mut self, msg: &'static str) -> Self {
86 self.ignore = Some(msg);
87 self
88 }
89}
90
91pub(super) async fn run_tests(
92 tests: impl IntoIterator<Item = RpcTestScenario> + Clone,
93 client: impl Into<Arc<rpc::Client>>,
94 filter: String,
95) -> anyhow::Result<()> {
96 let client: Arc<rpc::Client> = client.into();
97 let mut passed = 0;
98 let mut failed = 0;
99 let mut ignored = 0;
100 let mut filtered = 0;
101
102 println!("running {} tests", tests.clone().into_iter().count());
103
104 for (i, test) in tests.into_iter().enumerate() {
105 if !filter.is_empty() && !test.used_methods.iter().any(|m| m.starts_with(&filter)) {
106 filtered += 1;
107 continue;
108 }
109 if test.ignore.is_some() {
110 ignored += 1;
111 println!(
112 "test {} ... ignored",
113 if let Some(name) = test.name {
114 name.to_string()
115 } else {
116 format!("#{i}")
117 },
118 );
119 continue;
120 }
121
122 print!(
123 "test {} ... ",
124 if let Some(name) = test.name {
125 name.to_string()
126 } else {
127 format!("#{i}")
128 }
129 );
130
131 io::stdout().flush()?;
132
133 let result = (test.run)(client.clone()).await;
134
135 match result {
136 Ok(_) => {
137 if let Some(expected_msg) = test.should_fail_with {
138 println!("FAILED (expected failure containing '{expected_msg}')");
139 failed += 1;
140 } else {
141 println!("ok");
142 passed += 1;
143 }
144 }
145 Err(e) => {
146 if let Some(expected_msg) = test.should_fail_with {
147 let err_str = format!("{e:#}");
148 if err_str
149 .to_lowercase()
150 .contains(&expected_msg.to_lowercase())
151 {
152 println!("ok");
153 passed += 1;
154 } else {
155 println!("FAILED ({e:#})");
156 failed += 1;
157 }
158 } else {
159 println!("FAILED {e:#}");
160 failed += 1;
161 }
162 }
163 }
164 }
165 let status = if failed == 0 { "ok" } else { "FAILED" };
166 println!(
167 "test result: {status}. {passed} passed; {failed} failed; {ignored} ignored; {filtered} filtered out"
168 );
169 ensure!(failed == 0, "{failed} test(s) failed");
170 Ok(())
171}
172
173#[allow(unreachable_code)]
174async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> {
175 async fn close_channel(
176 stream: &mut tokio_tungstenite::WebSocketStream<
177 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
178 >,
179 id: &serde_json::Value,
180 ) -> anyhow::Result<()> {
181 let request = json!({
182 "jsonrpc": "2.0",
183 "id": 1,
184 "method": "xrpc.cancel",
185 "params": [id]
186 });
187
188 stream
189 .send(WsMessage::Text(request.to_string().into()))
190 .await
191 .context("failed to send close channel request")?;
192
193 Ok(())
194 }
195
196 let mut url = client.base_url().clone();
197 url.set_scheme("ws")
198 .map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
199 url.set_path("/rpc/v1");
200
201 let (mut ws_stream, _) = connect_async(url.as_str()).await?;
203
204 let request = json!({
205 "jsonrpc": "2.0",
206 "id": 1,
207 "method": "Filecoin.ChainNotify",
208 "params": []
209 });
210 ws_stream
211 .send(WsMessage::Text(request.to_string().into()))
212 .await?;
213
214 let mut channel_id: Option<serde_json::Value> = None;
215
216 loop {
221 let msg = match tokio::time::timeout(Duration::from_secs(180), ws_stream.next()).await {
222 Ok(Some(msg)) => msg,
223 Ok(None) => anyhow::bail!("WebSocket stream closed"),
224 Err(_) => {
225 if let Some(id) = channel_id.as_ref() {
226 let _ = close_channel(&mut ws_stream, id).await;
227 }
228 let _ = ws_stream.close(None).await;
229 anyhow::bail!("timeout waiting for tipset");
230 }
231 };
232 match msg {
233 Ok(WsMessage::Text(text)) => {
234 let json: serde_json::Value = serde_json::from_str(&text)?;
235
236 if let Some(id) = json.get("result") {
237 channel_id = Some(id.clone());
238 } else {
239 let method = json!("xrpc.ch.val");
240 anyhow::ensure!(json.get("method") == Some(&method));
241
242 if let Some(params) = json.get("params").and_then(|v| v.as_array()) {
243 if let Some(id) = params.first() {
244 anyhow::ensure!(Some(id) == channel_id.as_ref());
245 } else {
246 anyhow::bail!("expecting an open channel");
247 }
248 if let Some(changes) = params.get(1).and_then(|v| v.as_array()) {
249 for change in changes {
250 if let Some(type_) = change.get("Type").and_then(|v| v.as_str()) {
251 if type_ == "apply" {
252 let id = channel_id.as_ref().ok_or_else(|| {
253 anyhow::anyhow!("subscription not opened")
254 })?;
255 close_channel(&mut ws_stream, id).await?;
256 ws_stream.close(None).await?;
257 return Ok(());
258 } else if type_ == "revert" {
259 let id = channel_id.as_ref().ok_or_else(|| {
260 anyhow::anyhow!("subscription not opened")
261 })?;
262 close_channel(&mut ws_stream, id).await?;
263 ws_stream.close(None).await?;
264 anyhow::bail!("revert");
265 }
266 }
267 }
268 }
269 } else {
270 let id = channel_id
271 .as_ref()
272 .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
273 close_channel(&mut ws_stream, id).await?;
274 ws_stream.close(None).await?;
275 anyhow::bail!("expecting params");
276 }
277 }
278 }
279 Err(..) | Ok(WsMessage::Close(..)) => {
280 let id = channel_id
281 .as_ref()
282 .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
283 close_channel(&mut ws_stream, id).await?;
284 ws_stream.close(None).await?;
285 anyhow::bail!("unexpected error or close message");
286 }
287 _ => {
288 }
290 }
291 }
292
293 unreachable!("loop always returns within the branches above")
294}
295
296async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> {
297 let mut retries = 100;
298 loop {
299 let pending = client
300 .call(MpoolPending::request((ApiTipsetKey(None),))?)
301 .await?;
302
303 if pending.0.iter().any(|msg| msg.cid() == message_cid) {
304 break Ok(());
305 }
306 ensure!(retries != 0, "Message not found in mpool");
307 retries -= 1;
308
309 tokio::time::sleep(Duration::from_millis(10)).await;
310 }
311}
312
313fn create_eth_new_filter_test() -> RpcTestScenario {
314 RpcTestScenario::basic(|client| async move {
315 const BLOCK_RANGE: u64 = 200;
316
317 let last_block = client.call(EthBlockNumber::request(())?).await?;
318
319 let filter_spec = EthFilterSpec {
320 from_block: Some(EthUint64(last_block.0.saturating_sub(BLOCK_RANGE)).to_hex_string()),
321 to_block: Some(last_block.to_hex_string()),
322 ..Default::default()
323 };
324
325 let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
326
327 let removed = client
328 .call(EthUninstallFilter::request((filter_id.clone(),))?)
329 .await?;
330 anyhow::ensure!(removed);
331
332 let removed = client
333 .call(EthUninstallFilter::request((filter_id,))?)
334 .await?;
335 anyhow::ensure!(!removed);
336
337 Ok(())
338 })
339}
340
341fn create_eth_new_filter_limit_test(count: usize) -> RpcTestScenario {
342 RpcTestScenario::basic(move |client| async move {
343 const BLOCK_RANGE: u64 = 200;
344
345 let last_block = client.call(EthBlockNumber::request(())?).await?;
346
347 let filter_spec = EthFilterSpec {
348 from_block: Some(format!("0x{:x}", last_block.0.saturating_sub(BLOCK_RANGE))),
349 to_block: Some(last_block.to_hex_string()),
350 ..Default::default()
351 };
352
353 let mut ids = vec![];
354
355 for _ in 0..count {
356 let result = client
357 .call(EthNewFilter::request((filter_spec.clone(),))?)
358 .await;
359
360 match result {
361 Ok(filter_id) => ids.push(filter_id),
362 Err(e) => {
363 for id in ids {
365 let removed = client.call(EthUninstallFilter::request((id,))?).await?;
366 anyhow::ensure!(removed);
367 }
368 anyhow::bail!(e)
369 }
370 }
371 }
372
373 for id in ids {
374 let removed = client.call(EthUninstallFilter::request((id,))?).await?;
375 anyhow::ensure!(removed);
376 }
377
378 Ok(())
379 })
380}
381
382fn eth_new_block_filter() -> RpcTestScenario {
383 RpcTestScenario::basic(move |client| async move {
384 async fn process_filter(client: &rpc::Client, filter_id: &FilterID) -> anyhow::Result<()> {
385 let filter_result = client
386 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
387 .await?;
388
389 if let EthFilterResult::Hashes(prev_hashes) = filter_result {
390 let verify_hashes = async |hashes: &[EthHash]| -> anyhow::Result<()> {
391 for hash in hashes {
392 let _block = client
393 .call(EthGetBlockByHash::request((hash.clone(), false))?)
394 .await?;
395 }
396 Ok(())
397 };
398 verify_hashes(&prev_hashes).await?;
399
400 next_tipset(client).await?;
402
403 let filter_result = client
404 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
405 .await?;
406
407 if let EthFilterResult::Hashes(hashes) = filter_result {
408 verify_hashes(&hashes).await?;
409 anyhow::ensure!(
410 (prev_hashes.is_empty() && hashes.is_empty()) || prev_hashes != hashes,
411 );
412
413 Ok(())
414 } else {
415 Err(anyhow::anyhow!("expecting blocks"))
416 }
417 } else {
418 Err(anyhow::anyhow!("expecting blocks"))
419 }
420 }
421
422 let mut retries = 5;
423 loop {
424 let filter_id = client.call(EthNewBlockFilter::request(())?).await?;
426
427 let result = match process_filter(&client, &filter_id).await {
428 Ok(()) => Ok(()),
429 Err(e) if retries != 0 && e.to_string().contains("revert") => {
430 let removed = client
432 .call(EthUninstallFilter::request((filter_id,))?)
433 .await?;
434 anyhow::ensure!(removed);
435
436 retries -= 1;
437 continue;
438 }
439 Err(e) => Err(e),
440 };
441
442 let removed = client
444 .call(EthUninstallFilter::request((filter_id,))?)
445 .await?;
446 anyhow::ensure!(removed);
447
448 break result;
449 }
450 })
451}
452
453fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario {
454 RpcTestScenario::basic(move |client| {
455 let tx = tx.clone();
456 async move {
457 let filter_id = client
458 .call(EthNewPendingTransactionFilter::request(())?)
459 .await?;
460
461 let filter_result = client
462 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
463 .await?;
464
465 let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result {
466 let encoded = cbor4ii::serde::to_vec(
467 Vec::with_capacity(tx.payload.len()),
468 &Value::Bytes(tx.payload.clone()),
469 )
470 .context("failed to encode params")?;
471
472 let message = Message {
473 to: tx.to,
474 from: tx.from,
475 method_num: EVMMethod::InvokeContract as u64,
476 params: encoded.into(),
477 ..Default::default()
478 };
479
480 let smsg = client
481 .call(MpoolPushMessage::request((message, None))?)
482 .await?;
483
484 wait_pending_message(&client, smsg.cid()).await?;
485
486 let filter_result = client
487 .call(EthGetFilterChanges::request((filter_id.clone(),))?)
488 .await?;
489
490 if let EthFilterResult::Hashes(hashes) = filter_result {
491 anyhow::ensure!(prev_hashes != hashes);
492
493 let mut cids = vec![];
494 for hash in hashes {
495 if let Some(cid) = client
496 .call(EthGetMessageCidByTransactionHash::request((hash,))?)
497 .await?
498 {
499 cids.push(cid);
500 }
501 }
502
503 anyhow::ensure!(cids.contains(&smsg.cid()));
504
505 Ok(())
506 } else {
507 Err(anyhow::anyhow!("expecting hashes"))
508 }
509 } else {
510 Err(anyhow::anyhow!("expecting transactions"))
511 };
512
513 let removed = client
514 .call(EthUninstallFilter::request((filter_id,))?)
515 .await?;
516 anyhow::ensure!(removed);
517
518 result
519 }
520 })
521}
522
523fn as_logs(input: EthFilterResult) -> EthFilterResult {
524 match input {
525 EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()),
526 other => other,
527 }
528}
529
530fn eth_get_filter_logs(tx: TestTransaction) -> RpcTestScenario {
531 RpcTestScenario::basic(move |client| {
532 let tx = tx.clone();
533 async move {
534 const BLOCK_RANGE: u64 = 1;
535
536 let tipset = client.call(ChainHead::request(())?).await?;
537
538 let encoded = cbor4ii::serde::to_vec(
539 Vec::with_capacity(tx.payload.len()),
540 &Value::Bytes(tx.payload.clone()),
541 )
542 .context("failed to encode params")?;
543
544 let message = Message {
545 to: tx.to,
546 from: tx.from,
547 method_num: EVMMethod::InvokeContract as u64,
548 params: encoded.into(),
549 ..Default::default()
550 };
551
552 let smsg = client
553 .call(MpoolPushMessage::request((message, None))?)
554 .await?;
555
556 let lookup = client
557 .call(
558 StateWaitMsg::request((smsg.cid(), 0, tipset.epoch(), false))?
559 .with_timeout(Duration::from_secs(300)),
560 )
561 .await?;
562
563 let block_num = EthUint64(lookup.height as u64);
564
565 let topics = EthTopicSpec(vec![EthHashList::Single(Some(tx.topic))]);
566
567 let filter_spec = EthFilterSpec {
568 from_block: Some(format!("0x{:x}", block_num.0.saturating_sub(BLOCK_RANGE))),
569 to_block: Some(block_num.to_hex_string()),
570 topics: Some(topics),
571 ..Default::default()
572 };
573
574 let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
575
576 let filter_result = as_logs(
577 client
578 .call(EthGetFilterLogs::request((filter_id.clone(),))?)
579 .await?,
580 );
581
582 let result = if let EthFilterResult::Logs(logs) = filter_result {
583 anyhow::ensure!(!logs.is_empty());
584 Ok(())
585 } else {
586 Err(anyhow::anyhow!("expecting logs"))
587 };
588
589 let removed = client
590 .call(EthUninstallFilter::request((filter_id,))?)
591 .await?;
592 anyhow::ensure!(removed);
593
594 result
595 }
596 })
597}
598
/// Number of filters the scenarios expect a node to accept at the same time:
/// installing this many must succeed and installing one more must fail.
/// Presumably mirrors the node-side setting of the same name - confirm
/// against the node configuration.
const LOTUS_EVENTS_MAXFILTERS: usize = 100;
600
/// Registers one or more RPC method types on a scenario builder.
///
/// Expands to `$builder` followed by one `.using::<N, M>()` call per listed
/// method type, where `N` is the method's `N_REQUIRED_PARAMS`.
macro_rules! with_methods {
    ( $builder:expr, $( $method:ty ),+ ) => {
        $builder
            $( .using::<{ <$method>::N_REQUIRED_PARAMS }, $method>() )+
    };
}
610
611pub(super) async fn create_tests(tx: TestTransaction) -> Vec<RpcTestScenario> {
612 vec![
613 with_methods!(
614 create_eth_new_filter_test().name("eth_newFilter install/uninstall"),
615 EthNewFilter,
616 EthUninstallFilter
617 ),
618 with_methods!(
619 create_eth_new_filter_limit_test(20).name("eth_newFilter under limit"),
620 EthNewFilter,
621 EthUninstallFilter
622 ),
623 with_methods!(
624 create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS)
625 .name("eth_newFilter just under limit"),
626 EthNewFilter,
627 EthUninstallFilter
628 ),
629 with_methods!(
630 create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS + 1)
631 .name("eth_newFilter over limit")
632 .should_fail_with("maximum number of filters registered"),
633 EthNewFilter,
634 EthUninstallFilter
635 ),
636 with_methods!(
637 eth_new_block_filter().name("eth_newBlockFilter works"),
638 EthNewBlockFilter,
639 EthGetFilterChanges,
640 EthUninstallFilter
641 ),
642 with_methods!(
643 eth_new_pending_transaction_filter(tx.clone())
644 .name("eth_newPendingTransactionFilter works")
645 .ignore("https://github.com/ChainSafe/forest/issues/5916"),
646 EthNewPendingTransactionFilter,
647 EthGetFilterChanges,
648 EthUninstallFilter
649 ),
650 with_methods!(
651 eth_get_filter_logs(tx.clone())
652 .name("eth_getFilterLogs works")
653 .ignore("https://github.com/ChainSafe/forest/issues/5917"),
654 EthNewFilter,
655 EthGetFilterLogs,
656 EthUninstallFilter
657 ),
658 ]
659}