forest/tool/subcommands/api_cmd/
stateful_tests.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3use crate::eth::EVMMethod;
4use crate::rpc::eth::EthUint64;
5use crate::rpc::eth::types::*;
6use crate::rpc::types::ApiTipsetKey;
7use crate::rpc::{self, RpcMethod, prelude::*};
8use crate::shim::{address::Address, message::Message};
9
10use anyhow::{Context, ensure};
11use cbor4ii::core::Value;
12use cid::Cid;
13use futures::{SinkExt, StreamExt};
14use serde_json::json;
15use tokio::time::Duration;
16use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
17
18use std::io::{self, Write};
19use std::pin::Pin;
20use std::sync::Arc;
21
/// Type-erased, shareable test body.
///
/// A callable that receives the shared RPC client and returns a boxed, `Send`
/// future resolving to the outcome of the test. Boxing the future lets scenarios
/// with different concrete future types be stored behind one type.
type TestRunner = Arc<
    dyn Fn(Arc<rpc::Client>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        + Send
        + Sync,
>;
27
/// Description of the message that the stateful scenarios push to the node.
#[derive(Clone)]
pub struct TestTransaction {
    /// Recipient of the message built by the scenarios.
    pub to: Address,
    /// Sender of the message built by the scenarios.
    pub from: Address,
    /// Raw call data; the scenarios CBOR-encode it as a byte string and use it
    /// as the parameters of an `EVMMethod::InvokeContract` message.
    pub payload: Vec<u8>,
    /// Topic used as the single entry of the topic filter in the
    /// `eth_getFilterLogs` scenario.
    pub topic: EthHash,
}
35
/// A single stateful RPC test together with the metadata the runner needs.
#[derive(Clone)]
pub struct RpcTestScenario {
    /// The test body; invoked with the shared RPC client.
    pub run: TestRunner,
    /// Human-readable name; when absent the runner prints `#<index>` instead.
    pub name: Option<&'static str>,
    /// When set, the scenario passes only if it fails with an error whose
    /// rendered chain contains this text (compared case-insensitively).
    pub should_fail_with: Option<&'static str>,
    /// Names (and aliases) of the RPC methods exercised; the runner's filter is
    /// matched against these as a prefix.
    pub used_methods: Vec<&'static str>,
    /// When set, the runner counts the scenario as ignored and does not run it.
    /// The text itself is not printed by the runner.
    pub ignore: Option<&'static str>,
}
44
45impl RpcTestScenario {
46    /// Create a basic scenario from a simple async closure.
47    pub fn basic<F, Fut>(run_fn: F) -> Self
48    where
49        F: Fn(Arc<rpc::Client>) -> Fut + Send + Sync + 'static,
50        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
51    {
52        let run = Arc::new(move |client: Arc<rpc::Client>| {
53            Box::pin(run_fn(client)) as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
54        });
55        Self {
56            run,
57            name: Default::default(),
58            should_fail_with: Default::default(),
59            used_methods: Default::default(),
60            ignore: None,
61        }
62    }
63
64    fn name(mut self, name: &'static str) -> Self {
65        self.name = Some(name);
66        self
67    }
68
69    pub fn should_fail_with(mut self, msg: &'static str) -> Self {
70        self.should_fail_with = Some(msg);
71        self
72    }
73
74    fn using<const ARITY: usize, M>(mut self) -> Self
75    where
76        M: RpcMethod<ARITY>,
77    {
78        self.used_methods.push(M::NAME);
79        if let Some(alias) = M::NAME_ALIAS {
80            self.used_methods.push(alias);
81        }
82        self
83    }
84
85    fn ignore(mut self, msg: &'static str) -> Self {
86        self.ignore = Some(msg);
87        self
88    }
89}
90
91pub(super) async fn run_tests(
92    tests: impl IntoIterator<Item = RpcTestScenario> + Clone,
93    client: impl Into<Arc<rpc::Client>>,
94    filter: String,
95) -> anyhow::Result<()> {
96    let client: Arc<rpc::Client> = client.into();
97    let mut passed = 0;
98    let mut failed = 0;
99    let mut ignored = 0;
100    let mut filtered = 0;
101
102    println!("running {} tests", tests.clone().into_iter().count());
103
104    for (i, test) in tests.into_iter().enumerate() {
105        if !filter.is_empty() && !test.used_methods.iter().any(|m| m.starts_with(&filter)) {
106            filtered += 1;
107            continue;
108        }
109        if test.ignore.is_some() {
110            ignored += 1;
111            println!(
112                "test {} ... ignored",
113                if let Some(name) = test.name {
114                    name.to_string()
115                } else {
116                    format!("#{i}")
117                },
118            );
119            continue;
120        }
121
122        print!(
123            "test {} ... ",
124            if let Some(name) = test.name {
125                name.to_string()
126            } else {
127                format!("#{i}")
128            }
129        );
130
131        io::stdout().flush()?;
132
133        let result = (test.run)(client.clone()).await;
134
135        match result {
136            Ok(_) => {
137                if let Some(expected_msg) = test.should_fail_with {
138                    println!("FAILED (expected failure containing '{expected_msg}')");
139                    failed += 1;
140                } else {
141                    println!("ok");
142                    passed += 1;
143                }
144            }
145            Err(e) => {
146                if let Some(expected_msg) = test.should_fail_with {
147                    let err_str = format!("{e:#}");
148                    if err_str
149                        .to_lowercase()
150                        .contains(&expected_msg.to_lowercase())
151                    {
152                        println!("ok");
153                        passed += 1;
154                    } else {
155                        println!("FAILED ({e:#})");
156                        failed += 1;
157                    }
158                } else {
159                    println!("FAILED {e:#}");
160                    failed += 1;
161                }
162            }
163        }
164    }
165    let status = if failed == 0 { "ok" } else { "FAILED" };
166    println!(
167        "test result: {status}. {passed} passed; {failed} failed; {ignored} ignored; {filtered} filtered out"
168    );
169    ensure!(failed == 0, "{failed} test(s) failed");
170    Ok(())
171}
172
173#[allow(unreachable_code)]
174async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> {
175    async fn close_channel(
176        stream: &mut tokio_tungstenite::WebSocketStream<
177            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
178        >,
179        id: &serde_json::Value,
180    ) -> anyhow::Result<()> {
181        let request = json!({
182            "jsonrpc": "2.0",
183            "id": 1,
184            "method": "xrpc.cancel",
185            "params": [id]
186        });
187
188        stream
189            .send(WsMessage::Text(request.to_string().into()))
190            .await
191            .context("failed to send close channel request")?;
192
193        Ok(())
194    }
195
196    let mut url = client.base_url().clone();
197    url.set_scheme("ws")
198        .map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
199    url.set_path("/rpc/v1");
200
201    // Note: The token is not required for the ChainNotify method.
202    let (mut ws_stream, _) = connect_async(url.as_str()).await?;
203
204    let request = json!({
205        "jsonrpc": "2.0",
206        "id": 1,
207        "method": "Filecoin.ChainNotify",
208        "params": []
209    });
210    ws_stream
211        .send(WsMessage::Text(request.to_string().into()))
212        .await?;
213
214    let mut channel_id: Option<serde_json::Value> = None;
215
216    // The goal of this loop is to wait for a new tipset to arrive without using a busy loop or sleep.
217    // It processes incoming WebSocket messages until it encounters an "apply" or "revert" change type.
218    // If an "apply" change is found, it closes the channel and exits. If a "revert" change is found,
219    // it closes the channel and raises an error. Any channel protocol or parameter validation issues result in an error.
220    loop {
221        let msg = match tokio::time::timeout(Duration::from_secs(180), ws_stream.next()).await {
222            Ok(Some(msg)) => msg,
223            Ok(None) => anyhow::bail!("WebSocket stream closed"),
224            Err(_) => {
225                if let Some(id) = channel_id.as_ref() {
226                    let _ = close_channel(&mut ws_stream, id).await;
227                }
228                let _ = ws_stream.close(None).await;
229                anyhow::bail!("timeout waiting for tipset");
230            }
231        };
232        match msg {
233            Ok(WsMessage::Text(text)) => {
234                let json: serde_json::Value = serde_json::from_str(&text)?;
235
236                if let Some(id) = json.get("result") {
237                    channel_id = Some(id.clone());
238                } else {
239                    let method = json!("xrpc.ch.val");
240                    anyhow::ensure!(json.get("method") == Some(&method));
241
242                    if let Some(params) = json.get("params").and_then(|v| v.as_array()) {
243                        if let Some(id) = params.first() {
244                            anyhow::ensure!(Some(id) == channel_id.as_ref());
245                        } else {
246                            anyhow::bail!("expecting an open channel");
247                        }
248                        if let Some(changes) = params.get(1).and_then(|v| v.as_array()) {
249                            for change in changes {
250                                if let Some(type_) = change.get("Type").and_then(|v| v.as_str()) {
251                                    if type_ == "apply" {
252                                        let id = channel_id.as_ref().ok_or_else(|| {
253                                            anyhow::anyhow!("subscription not opened")
254                                        })?;
255                                        close_channel(&mut ws_stream, id).await?;
256                                        ws_stream.close(None).await?;
257                                        return Ok(());
258                                    } else if type_ == "revert" {
259                                        let id = channel_id.as_ref().ok_or_else(|| {
260                                            anyhow::anyhow!("subscription not opened")
261                                        })?;
262                                        close_channel(&mut ws_stream, id).await?;
263                                        ws_stream.close(None).await?;
264                                        anyhow::bail!("revert");
265                                    }
266                                }
267                            }
268                        }
269                    } else {
270                        let id = channel_id
271                            .as_ref()
272                            .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
273                        close_channel(&mut ws_stream, id).await?;
274                        ws_stream.close(None).await?;
275                        anyhow::bail!("expecting params");
276                    }
277                }
278            }
279            Err(..) | Ok(WsMessage::Close(..)) => {
280                let id = channel_id
281                    .as_ref()
282                    .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
283                close_channel(&mut ws_stream, id).await?;
284                ws_stream.close(None).await?;
285                anyhow::bail!("unexpected error or close message");
286            }
287            _ => {
288                // Ignore other message types
289            }
290        }
291    }
292
293    unreachable!("loop always returns within the branches above")
294}
295
296async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> {
297    let mut retries = 100;
298    loop {
299        let pending = client
300            .call(MpoolPending::request((ApiTipsetKey(None),))?)
301            .await?;
302
303        if pending.0.iter().any(|msg| msg.cid() == message_cid) {
304            break Ok(());
305        }
306        ensure!(retries != 0, "Message not found in mpool");
307        retries -= 1;
308
309        tokio::time::sleep(Duration::from_millis(10)).await;
310    }
311}
312
313fn create_eth_new_filter_test() -> RpcTestScenario {
314    RpcTestScenario::basic(|client| async move {
315        const BLOCK_RANGE: u64 = 200;
316
317        let last_block = client.call(EthBlockNumber::request(())?).await?;
318
319        let filter_spec = EthFilterSpec {
320            from_block: Some(EthUint64(last_block.0.saturating_sub(BLOCK_RANGE)).to_hex_string()),
321            to_block: Some(last_block.to_hex_string()),
322            ..Default::default()
323        };
324
325        let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
326
327        let removed = client
328            .call(EthUninstallFilter::request((filter_id.clone(),))?)
329            .await?;
330        anyhow::ensure!(removed);
331
332        let removed = client
333            .call(EthUninstallFilter::request((filter_id,))?)
334            .await?;
335        anyhow::ensure!(!removed);
336
337        Ok(())
338    })
339}
340
341fn create_eth_new_filter_limit_test(count: usize) -> RpcTestScenario {
342    RpcTestScenario::basic(move |client| async move {
343        const BLOCK_RANGE: u64 = 200;
344
345        let last_block = client.call(EthBlockNumber::request(())?).await?;
346
347        let filter_spec = EthFilterSpec {
348            from_block: Some(format!("0x{:x}", last_block.0.saturating_sub(BLOCK_RANGE))),
349            to_block: Some(last_block.to_hex_string()),
350            ..Default::default()
351        };
352
353        let mut ids = vec![];
354
355        for _ in 0..count {
356            let result = client
357                .call(EthNewFilter::request((filter_spec.clone(),))?)
358                .await;
359
360            match result {
361                Ok(filter_id) => ids.push(filter_id),
362                Err(e) => {
363                    // Cleanup any filters created so far to leave a clean state
364                    for id in ids {
365                        let removed = client.call(EthUninstallFilter::request((id,))?).await?;
366                        anyhow::ensure!(removed);
367                    }
368                    anyhow::bail!(e)
369                }
370            }
371        }
372
373        for id in ids {
374            let removed = client.call(EthUninstallFilter::request((id,))?).await?;
375            anyhow::ensure!(removed);
376        }
377
378        Ok(())
379    })
380}
381
382fn eth_new_block_filter() -> RpcTestScenario {
383    RpcTestScenario::basic(move |client| async move {
384        async fn process_filter(client: &rpc::Client, filter_id: &FilterID) -> anyhow::Result<()> {
385            let filter_result = client
386                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
387                .await?;
388
389            if let EthFilterResult::Hashes(prev_hashes) = filter_result {
390                let verify_hashes = async |hashes: &[EthHash]| -> anyhow::Result<()> {
391                    for hash in hashes {
392                        let _block = client
393                            .call(EthGetBlockByHash::request((hash.clone(), false))?)
394                            .await?;
395                    }
396                    Ok(())
397                };
398                verify_hashes(&prev_hashes).await?;
399
400                // Wait for the next block to arrive
401                next_tipset(client).await?;
402
403                let filter_result = client
404                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
405                    .await?;
406
407                if let EthFilterResult::Hashes(hashes) = filter_result {
408                    verify_hashes(&hashes).await?;
409                    anyhow::ensure!(
410                        (prev_hashes.is_empty() && hashes.is_empty()) || prev_hashes != hashes,
411                    );
412
413                    Ok(())
414                } else {
415                    Err(anyhow::anyhow!("expecting blocks"))
416                }
417            } else {
418                Err(anyhow::anyhow!("expecting blocks"))
419            }
420        }
421
422        let mut retries = 5;
423        loop {
424            // Create the filter
425            let filter_id = client.call(EthNewBlockFilter::request(())?).await?;
426
427            let result = match process_filter(&client, &filter_id).await {
428                Ok(()) => Ok(()),
429                Err(e) if retries != 0 && e.to_string().contains("revert") => {
430                    // Cleanup
431                    let removed = client
432                        .call(EthUninstallFilter::request((filter_id,))?)
433                        .await?;
434                    anyhow::ensure!(removed);
435
436                    retries -= 1;
437                    continue;
438                }
439                Err(e) => Err(e),
440            };
441
442            // Cleanup
443            let removed = client
444                .call(EthUninstallFilter::request((filter_id,))?)
445                .await?;
446            anyhow::ensure!(removed);
447
448            break result;
449        }
450    })
451}
452
453fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario {
454    RpcTestScenario::basic(move |client| {
455        let tx = tx.clone();
456        async move {
457            let filter_id = client
458                .call(EthNewPendingTransactionFilter::request(())?)
459                .await?;
460
461            let filter_result = client
462                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
463                .await?;
464
465            let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result {
466                let encoded = cbor4ii::serde::to_vec(
467                    Vec::with_capacity(tx.payload.len()),
468                    &Value::Bytes(tx.payload.clone()),
469                )
470                .context("failed to encode params")?;
471
472                let message = Message {
473                    to: tx.to,
474                    from: tx.from,
475                    method_num: EVMMethod::InvokeContract as u64,
476                    params: encoded.into(),
477                    ..Default::default()
478                };
479
480                let smsg = client
481                    .call(MpoolPushMessage::request((message, None))?)
482                    .await?;
483
484                wait_pending_message(&client, smsg.cid()).await?;
485
486                let filter_result = client
487                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
488                    .await?;
489
490                if let EthFilterResult::Hashes(hashes) = filter_result {
491                    anyhow::ensure!(prev_hashes != hashes);
492
493                    let mut cids = vec![];
494                    for hash in hashes {
495                        if let Some(cid) = client
496                            .call(EthGetMessageCidByTransactionHash::request((hash,))?)
497                            .await?
498                        {
499                            cids.push(cid);
500                        }
501                    }
502
503                    anyhow::ensure!(cids.contains(&smsg.cid()));
504
505                    Ok(())
506                } else {
507                    Err(anyhow::anyhow!("expecting hashes"))
508                }
509            } else {
510                Err(anyhow::anyhow!("expecting transactions"))
511            };
512
513            let removed = client
514                .call(EthUninstallFilter::request((filter_id,))?)
515                .await?;
516            anyhow::ensure!(removed);
517
518            result
519        }
520    })
521}
522
523fn as_logs(input: EthFilterResult) -> EthFilterResult {
524    match input {
525        EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()),
526        other => other,
527    }
528}
529
530fn eth_get_filter_logs(tx: TestTransaction) -> RpcTestScenario {
531    RpcTestScenario::basic(move |client| {
532        let tx = tx.clone();
533        async move {
534            const BLOCK_RANGE: u64 = 1;
535
536            let tipset = client.call(ChainHead::request(())?).await?;
537
538            let encoded = cbor4ii::serde::to_vec(
539                Vec::with_capacity(tx.payload.len()),
540                &Value::Bytes(tx.payload.clone()),
541            )
542            .context("failed to encode params")?;
543
544            let message = Message {
545                to: tx.to,
546                from: tx.from,
547                method_num: EVMMethod::InvokeContract as u64,
548                params: encoded.into(),
549                ..Default::default()
550            };
551
552            let smsg = client
553                .call(MpoolPushMessage::request((message, None))?)
554                .await?;
555
556            let lookup = client
557                .call(
558                    StateWaitMsg::request((smsg.cid(), 0, tipset.epoch(), false))?
559                        .with_timeout(Duration::from_secs(300)),
560                )
561                .await?;
562
563            let block_num = EthUint64(lookup.height as u64);
564
565            let topics = EthTopicSpec(vec![EthHashList::Single(Some(tx.topic))]);
566
567            let filter_spec = EthFilterSpec {
568                from_block: Some(format!("0x{:x}", block_num.0.saturating_sub(BLOCK_RANGE))),
569                to_block: Some(block_num.to_hex_string()),
570                topics: Some(topics),
571                ..Default::default()
572            };
573
574            let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
575
576            let filter_result = as_logs(
577                client
578                    .call(EthGetFilterLogs::request((filter_id.clone(),))?)
579                    .await?,
580            );
581
582            let result = if let EthFilterResult::Logs(logs) = filter_result {
583                anyhow::ensure!(!logs.is_empty());
584                Ok(())
585            } else {
586                Err(anyhow::anyhow!("expecting logs"))
587            };
588
589            let removed = client
590                .call(EthUninstallFilter::request((filter_id,))?)
591                .await?;
592            anyhow::ensure!(removed);
593
594            result
595        }
596    })
597}
598
/// Filter count used as the boundary by the limit scenarios in `create_tests`:
/// installing exactly this many filters is expected to succeed, one more is
/// expected to be rejected.
// NOTE(review): presumably mirrors the default of Lotus' `Events.MaxFilters`
// setting — confirm against the node configuration used in CI.
const LOTUS_EVENTS_MAXFILTERS: usize = 100;
600
/// Registers every listed RPC method type on a scenario builder.
///
/// Expands to the builder expression followed by one
/// `using::<{ Method::N_REQUIRED_PARAMS }, Method>()` call per method, applied
/// in the order the methods are listed, and evaluates to the resulting builder.
macro_rules! with_methods {
    ( $builder:expr, $( $method:ty ),+ ) => {{
        $builder
            $( .using::<{ <$method>::N_REQUIRED_PARAMS }, $method>() )+
    }};
}
610
611pub(super) async fn create_tests(tx: TestTransaction) -> Vec<RpcTestScenario> {
612    vec![
613        with_methods!(
614            create_eth_new_filter_test().name("eth_newFilter install/uninstall"),
615            EthNewFilter,
616            EthUninstallFilter
617        ),
618        with_methods!(
619            create_eth_new_filter_limit_test(20).name("eth_newFilter under limit"),
620            EthNewFilter,
621            EthUninstallFilter
622        ),
623        with_methods!(
624            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS)
625                .name("eth_newFilter just under limit"),
626            EthNewFilter,
627            EthUninstallFilter
628        ),
629        with_methods!(
630            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS + 1)
631                .name("eth_newFilter over limit")
632                .should_fail_with("maximum number of filters registered"),
633            EthNewFilter,
634            EthUninstallFilter
635        ),
636        with_methods!(
637            eth_new_block_filter().name("eth_newBlockFilter works"),
638            EthNewBlockFilter,
639            EthGetFilterChanges,
640            EthUninstallFilter
641        ),
642        with_methods!(
643            eth_new_pending_transaction_filter(tx.clone())
644                .name("eth_newPendingTransactionFilter works")
645                .ignore("https://github.com/ChainSafe/forest/issues/5916"),
646            EthNewPendingTransactionFilter,
647            EthGetFilterChanges,
648            EthUninstallFilter
649        ),
650        with_methods!(
651            eth_get_filter_logs(tx.clone())
652                .name("eth_getFilterLogs works")
653                .ignore("https://github.com/ChainSafe/forest/issues/5917"),
654            EthNewFilter,
655            EthGetFilterLogs,
656            EthUninstallFilter
657        ),
658    ]
659}