Skip to main content

forest/tool/subcommands/api_cmd/
stateful_tests.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3use crate::eth::EVMMethod;
4use crate::message::SignedMessage;
5use crate::networks::calibnet::ETH_CHAIN_ID;
6use crate::rpc::eth::EthUint64;
7use crate::rpc::eth::types::*;
8use crate::rpc::types::ApiTipsetKey;
9use crate::rpc::{self, RpcMethod, prelude::*};
10use crate::shim::{address::Address, message::Message};
11
12use anyhow::{Context, ensure};
13use cbor4ii::core::Value;
14use cid::Cid;
15use futures::{SinkExt, StreamExt};
16use serde_json::json;
17use tokio::time::Duration;
18use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
19
20use std::io::{self, Write};
21use std::pin::Pin;
22use std::sync::Arc;
23
24type TestRunner = Arc<
25    dyn Fn(Arc<rpc::Client>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
26        + Send
27        + Sync,
28>;
29
/// Inputs for the scenarios in this file that submit a contract invocation.
#[derive(Clone)]
pub struct TestTransaction {
    /// Recipient of the message (`Message::to` in `invoke_contract`).
    pub to: Address,
    /// Sender of the message; also the address used for the nonce lookup and
    /// for `WalletSign` in `invoke_contract`.
    pub from: Address,
    /// Raw bytes that `invoke_contract` CBOR-encodes as the message params.
    pub payload: Vec<u8>,
    /// Topic placed in the `EthFilterSpec` built by `eth_get_filter_logs`.
    pub topic: EthHash,
}
37
38#[derive(Clone)]
39pub struct RpcTestScenario {
40    pub run: TestRunner,
41    pub name: Option<&'static str>,
42    pub should_fail_with: Option<&'static str>,
43    pub used_methods: Vec<&'static str>,
44    pub ignore: Option<&'static str>,
45}
46
47impl RpcTestScenario {
48    /// Create a basic scenario from a simple async closure.
49    pub fn basic<F, Fut>(run_fn: F) -> Self
50    where
51        F: Fn(Arc<rpc::Client>) -> Fut + Send + Sync + 'static,
52        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
53    {
54        let run = Arc::new(move |client: Arc<rpc::Client>| {
55            Box::pin(run_fn(client)) as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
56        });
57        Self {
58            run,
59            name: Default::default(),
60            should_fail_with: Default::default(),
61            used_methods: Default::default(),
62            ignore: None,
63        }
64    }
65
66    fn name(mut self, name: &'static str) -> Self {
67        self.name = Some(name);
68        self
69    }
70
71    pub fn should_fail_with(mut self, msg: &'static str) -> Self {
72        self.should_fail_with = Some(msg);
73        self
74    }
75
76    fn using<const ARITY: usize, M>(mut self) -> Self
77    where
78        M: RpcMethod<ARITY>,
79    {
80        self.used_methods.push(M::NAME);
81        if let Some(alias) = M::NAME_ALIAS {
82            self.used_methods.push(alias);
83        }
84        self
85    }
86
87    fn _ignore(mut self, msg: &'static str) -> Self {
88        self.ignore = Some(msg);
89        self
90    }
91}
92
93pub(super) async fn run_tests(
94    tests: impl IntoIterator<Item = RpcTestScenario> + Clone,
95    client: impl Into<Arc<rpc::Client>>,
96    filter: String,
97) -> anyhow::Result<()> {
98    let client: Arc<rpc::Client> = client.into();
99    let mut passed = 0;
100    let mut failed = 0;
101    let mut ignored = 0;
102    let mut filtered = 0;
103
104    println!("running {} tests", tests.clone().into_iter().count());
105
106    for (i, test) in tests.into_iter().enumerate() {
107        if !filter.is_empty() && !test.used_methods.iter().any(|m| m.starts_with(&filter)) {
108            filtered += 1;
109            continue;
110        }
111        if test.ignore.is_some() {
112            ignored += 1;
113            println!(
114                "test {} ... ignored",
115                if let Some(name) = test.name {
116                    name.to_string()
117                } else {
118                    format!("#{i}")
119                },
120            );
121            continue;
122        }
123
124        print!(
125            "test {} ... ",
126            if let Some(name) = test.name {
127                name.to_string()
128            } else {
129                format!("#{i}")
130            }
131        );
132
133        io::stdout().flush()?;
134
135        let result = (test.run)(client.clone()).await;
136
137        match result {
138            Ok(_) => {
139                if let Some(expected_msg) = test.should_fail_with {
140                    println!("FAILED (expected failure containing '{expected_msg}')");
141                    failed += 1;
142                } else {
143                    println!("ok");
144                    passed += 1;
145                }
146            }
147            Err(e) => {
148                if let Some(expected_msg) = test.should_fail_with {
149                    let err_str = format!("{e:#}");
150                    if err_str
151                        .to_lowercase()
152                        .contains(&expected_msg.to_lowercase())
153                    {
154                        println!("ok");
155                        passed += 1;
156                    } else {
157                        println!("FAILED ({e:#})");
158                        failed += 1;
159                    }
160                } else {
161                    println!("FAILED {e:#}");
162                    failed += 1;
163                }
164            }
165        }
166    }
167    let status = if failed == 0 { "ok" } else { "FAILED" };
168    println!(
169        "test result: {status}. {passed} passed; {failed} failed; {ignored} ignored; {filtered} filtered out"
170    );
171    ensure!(failed == 0, "{failed} test(s) failed");
172    Ok(())
173}
174
175#[allow(unreachable_code)]
176async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> {
177    async fn close_channel(
178        stream: &mut tokio_tungstenite::WebSocketStream<
179            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
180        >,
181        id: &serde_json::Value,
182    ) -> anyhow::Result<()> {
183        let request = json!({
184            "jsonrpc": "2.0",
185            "id": 1,
186            "method": "xrpc.cancel",
187            "params": [id]
188        });
189
190        stream
191            .send(WsMessage::Text(request.to_string().into()))
192            .await
193            .context("failed to send close channel request")?;
194
195        Ok(())
196    }
197
198    let mut url = client.base_url().clone();
199    url.set_scheme("ws")
200        .map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
201    url.set_path("/rpc/v1");
202
203    // Note: The token is not required for the ChainNotify method.
204    let (mut ws_stream, _) = connect_async(url.as_str()).await?;
205
206    let request = json!({
207        "jsonrpc": "2.0",
208        "id": 1,
209        "method": "Filecoin.ChainNotify",
210        "params": []
211    });
212    ws_stream
213        .send(WsMessage::Text(request.to_string().into()))
214        .await?;
215
216    let mut channel_id: Option<serde_json::Value> = None;
217
218    // The goal of this loop is to wait for a new tipset to arrive without using a busy loop or sleep.
219    // It processes incoming WebSocket messages until it encounters an "apply" or "revert" change type.
220    // If an "apply" change is found, it closes the channel and exits. If a "revert" change is found,
221    // it closes the channel and raises an error. Any channel protocol or parameter validation issues result in an error.
222    loop {
223        let msg = match tokio::time::timeout(Duration::from_secs(180), ws_stream.next()).await {
224            Ok(Some(msg)) => msg,
225            Ok(None) => anyhow::bail!("WebSocket stream closed"),
226            Err(_) => {
227                if let Some(id) = channel_id.as_ref() {
228                    let _ = close_channel(&mut ws_stream, id).await;
229                }
230                let _ = ws_stream.close(None).await;
231                anyhow::bail!("timeout waiting for tipset");
232            }
233        };
234        match msg {
235            Ok(WsMessage::Text(text)) => {
236                let json: serde_json::Value = serde_json::from_str(&text)?;
237
238                if let Some(id) = json.get("result") {
239                    channel_id = Some(id.clone());
240                } else {
241                    let method = json!("xrpc.ch.val");
242                    anyhow::ensure!(json.get("method") == Some(&method));
243
244                    if let Some(params) = json.get("params").and_then(|v| v.as_array()) {
245                        if let Some(id) = params.first() {
246                            anyhow::ensure!(Some(id) == channel_id.as_ref());
247                        } else {
248                            anyhow::bail!("expecting an open channel");
249                        }
250                        if let Some(changes) = params.get(1).and_then(|v| v.as_array()) {
251                            for change in changes {
252                                if let Some(type_) = change.get("Type").and_then(|v| v.as_str()) {
253                                    if type_ == "apply" {
254                                        let id = channel_id.as_ref().ok_or_else(|| {
255                                            anyhow::anyhow!("subscription not opened")
256                                        })?;
257                                        close_channel(&mut ws_stream, id).await?;
258                                        ws_stream.close(None).await?;
259                                        return Ok(());
260                                    } else if type_ == "revert" {
261                                        let id = channel_id.as_ref().ok_or_else(|| {
262                                            anyhow::anyhow!("subscription not opened")
263                                        })?;
264                                        close_channel(&mut ws_stream, id).await?;
265                                        ws_stream.close(None).await?;
266                                        anyhow::bail!("revert");
267                                    }
268                                }
269                            }
270                        }
271                    } else {
272                        let id = channel_id
273                            .as_ref()
274                            .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
275                        close_channel(&mut ws_stream, id).await?;
276                        ws_stream.close(None).await?;
277                        anyhow::bail!("expecting params");
278                    }
279                }
280            }
281            Err(..) | Ok(WsMessage::Close(..)) => {
282                let id = channel_id
283                    .as_ref()
284                    .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
285                close_channel(&mut ws_stream, id).await?;
286                ws_stream.close(None).await?;
287                anyhow::bail!("unexpected error or close message");
288            }
289            _ => {
290                // Ignore other message types
291            }
292        }
293    }
294
295    unreachable!("loop always returns within the branches above")
296}
297
298async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> {
299    let tipset = client.call(ChainHead::request(())?).await?;
300    let mut retries = 100;
301    loop {
302        let pending = client
303            .call(MpoolPending::request((ApiTipsetKey(None),))?)
304            .await?;
305
306        if pending.0.iter().any(|msg| msg.cid() == message_cid) {
307            client
308                .call(
309                    StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))?
310                        .with_timeout(Duration::from_secs(300)),
311                )
312                .await?;
313            break Ok(());
314        }
315        ensure!(retries != 0, "Message not found in mpool");
316        retries -= 1;
317
318        tokio::time::sleep(Duration::from_millis(10)).await;
319    }
320}
321
322async fn invoke_contract(client: &rpc::Client, tx: &TestTransaction) -> anyhow::Result<Cid> {
323    let encoded_params = cbor4ii::serde::to_vec(
324        Vec::with_capacity(tx.payload.len()),
325        &Value::Bytes(tx.payload.clone()),
326    )
327    .context("failed to encode params")?;
328    let nonce = client.call(MpoolGetNonce::request((tx.from,))?).await?;
329    let message = Message {
330        to: tx.to,
331        from: tx.from,
332        sequence: nonce,
333        method_num: EVMMethod::InvokeContract as u64,
334        params: encoded_params.into(),
335        ..Default::default()
336    };
337    let unsigned_msg = client
338        .call(GasEstimateMessageGas::request((
339            message,
340            None,
341            ApiTipsetKey(None),
342        ))?)
343        .await?;
344
345    let eth_tx_args = crate::eth::EthEip1559TxArgsBuilder::default()
346        .chain_id(ETH_CHAIN_ID)
347        .unsigned_message(&unsigned_msg.message)?
348        .build()
349        .map_err(|e| anyhow::anyhow!("Failed to build EIP-1559 transaction: {}", e))?;
350    let eth_tx = crate::eth::EthTx::Eip1559(Box::new(eth_tx_args));
351    let data = eth_tx.rlp_unsigned_message(ETH_CHAIN_ID)?;
352
353    let sig = client.call(WalletSign::request((tx.from, data))?).await?;
354    let smsg = SignedMessage::new_unchecked(unsigned_msg.message, sig);
355    let cid = smsg.cid();
356
357    client.call(MpoolPush::request((smsg,))?).await?;
358
359    Ok(cid)
360}
361
362fn create_eth_new_filter_test() -> RpcTestScenario {
363    RpcTestScenario::basic(|client| async move {
364        const BLOCK_RANGE: u64 = 200;
365
366        let last_block = client.call(EthBlockNumber::request(())?).await?;
367
368        let filter_spec = EthFilterSpec {
369            from_block: Some(EthUint64(last_block.0.saturating_sub(BLOCK_RANGE)).to_hex_string()),
370            to_block: Some(last_block.to_hex_string()),
371            ..Default::default()
372        };
373
374        let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
375
376        let removed = client
377            .call(EthUninstallFilter::request((filter_id.clone(),))?)
378            .await?;
379        anyhow::ensure!(removed);
380
381        let removed = client
382            .call(EthUninstallFilter::request((filter_id,))?)
383            .await?;
384        anyhow::ensure!(!removed);
385
386        Ok(())
387    })
388}
389
390fn create_eth_new_filter_limit_test(count: usize) -> RpcTestScenario {
391    RpcTestScenario::basic(move |client| async move {
392        const BLOCK_RANGE: u64 = 200;
393
394        let last_block = client.call(EthBlockNumber::request(())?).await?;
395
396        let filter_spec = EthFilterSpec {
397            from_block: Some(format!("0x{:x}", last_block.0.saturating_sub(BLOCK_RANGE))),
398            to_block: Some(last_block.to_hex_string()),
399            ..Default::default()
400        };
401
402        let mut ids = vec![];
403
404        for _ in 0..count {
405            let result = client
406                .call(EthNewFilter::request((filter_spec.clone(),))?)
407                .await;
408
409            match result {
410                Ok(filter_id) => ids.push(filter_id),
411                Err(e) => {
412                    // Cleanup any filters created so far to leave a clean state
413                    for id in ids {
414                        let removed = client.call(EthUninstallFilter::request((id,))?).await?;
415                        anyhow::ensure!(removed);
416                    }
417                    anyhow::bail!(e)
418                }
419            }
420        }
421
422        for id in ids {
423            let removed = client.call(EthUninstallFilter::request((id,))?).await?;
424            anyhow::ensure!(removed);
425        }
426
427        Ok(())
428    })
429}
430
431fn eth_new_block_filter() -> RpcTestScenario {
432    RpcTestScenario::basic(move |client| async move {
433        async fn process_filter(client: &rpc::Client, filter_id: &FilterID) -> anyhow::Result<()> {
434            let filter_result = client
435                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
436                .await?;
437
438            if let EthFilterResult::Hashes(prev_hashes) = filter_result {
439                let verify_hashes = async |hashes: &[EthHash]| -> anyhow::Result<()> {
440                    for hash in hashes {
441                        let _block = client
442                            .call(EthGetBlockByHash::request((hash.clone(), false))?)
443                            .await?;
444                    }
445                    Ok(())
446                };
447                verify_hashes(&prev_hashes).await?;
448
449                // Wait for the next block to arrive
450                next_tipset(client).await?;
451
452                let filter_result = client
453                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
454                    .await?;
455
456                if let EthFilterResult::Hashes(hashes) = filter_result {
457                    verify_hashes(&hashes).await?;
458                    anyhow::ensure!(
459                        (prev_hashes.is_empty() && hashes.is_empty()) || prev_hashes != hashes,
460                    );
461
462                    Ok(())
463                } else {
464                    Err(anyhow::anyhow!("expecting blocks"))
465                }
466            } else {
467                Err(anyhow::anyhow!("expecting blocks"))
468            }
469        }
470
471        let mut retries = 5;
472        loop {
473            // Create the filter
474            let filter_id = client.call(EthNewBlockFilter::request(())?).await?;
475
476            let result = match process_filter(&client, &filter_id).await {
477                Ok(()) => Ok(()),
478                Err(e) if retries != 0 && e.to_string().contains("revert") => {
479                    // Cleanup
480                    let removed = client
481                        .call(EthUninstallFilter::request((filter_id,))?)
482                        .await?;
483                    anyhow::ensure!(removed);
484
485                    retries -= 1;
486                    continue;
487                }
488                Err(e) => Err(e),
489            };
490
491            // Cleanup
492            let removed = client
493                .call(EthUninstallFilter::request((filter_id,))?)
494                .await?;
495            anyhow::ensure!(removed);
496
497            break result;
498        }
499    })
500}
501
502fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario {
503    RpcTestScenario::basic(move |client| {
504        let tx = tx.clone();
505        async move {
506            let filter_id = client
507                .call(EthNewPendingTransactionFilter::request(())?)
508                .await?;
509
510            let filter_result = client
511                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
512                .await?;
513
514            let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result {
515                let cid = invoke_contract(&client, &tx).await?;
516
517                wait_pending_message(&client, cid).await?;
518
519                let filter_result = client
520                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
521                    .await?;
522
523                if let EthFilterResult::Hashes(hashes) = filter_result {
524                    anyhow::ensure!(
525                        prev_hashes != hashes,
526                        "prev_hashes={prev_hashes:?} hashes={hashes:?}"
527                    );
528
529                    let mut cids = vec![];
530                    for hash in &hashes {
531                        if let Some(cid) = client
532                            .call(EthGetMessageCidByTransactionHash::request((hash.clone(),))?)
533                            .await?
534                        {
535                            cids.push(cid);
536                        }
537                    }
538
539                    anyhow::ensure!(
540                        cids.contains(&cid),
541                        "CID missing from filter results: cid={cid:?} cids={cids:?} hashes={hashes:?}"
542                    );
543
544                    Ok(())
545                } else {
546                    Err(anyhow::anyhow!("expecting hashes"))
547                }
548            } else {
549                Err(anyhow::anyhow!("expecting transactions"))
550            };
551
552            let removed = client
553                .call(EthUninstallFilter::request((filter_id,))?)
554                .await?;
555            anyhow::ensure!(removed);
556
557            result
558        }
559    })
560}
561
562fn as_logs(input: EthFilterResult) -> EthFilterResult {
563    match input {
564        EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()),
565        other => other,
566    }
567}
568
569fn eth_get_filter_logs(tx: TestTransaction) -> RpcTestScenario {
570    RpcTestScenario::basic(move |client| {
571        let tx = tx.clone();
572        async move {
573            const BLOCK_RANGE: u64 = 1;
574
575            let tipset = client.call(ChainHead::request(())?).await?;
576            let cid = invoke_contract(&client, &tx).await?;
577            let lookup = client
578                .call(
579                    StateWaitMsg::request((cid, 1, tipset.epoch(), true))?
580                        .with_timeout(Duration::from_secs(300)),
581                )
582                .await?;
583            let block_num = EthUint64(lookup.height as u64);
584
585            let topics = EthTopicSpec(vec![EthHashList::Single(Some(tx.topic))]);
586            let filter_spec = EthFilterSpec {
587                from_block: Some(format!("0x{:x}", block_num.0.saturating_sub(BLOCK_RANGE))),
588                topics: Some(topics),
589                ..Default::default()
590            };
591
592            let filter_id = client
593                .call(EthNewFilter::request((filter_spec.clone(),))?)
594                .await?;
595            let filter_result = as_logs(
596                client
597                    .call(EthGetFilterLogs::request((filter_id.clone(),))?)
598                    .await?,
599            );
600            let result = if let EthFilterResult::Logs(logs) = filter_result {
601                anyhow::ensure!(
602                    !logs.is_empty(),
603                    "Empty logs: filter_spec={filter_spec:?} cid={cid:?}",
604                );
605                Ok(())
606            } else {
607                Err(anyhow::anyhow!("expecting logs"))
608            };
609
610            let removed = client
611                .call(EthUninstallFilter::request((filter_id,))?)
612                .await?;
613            anyhow::ensure!(removed);
614
615            result
616        }
617    })
618}
619
/// Number of filters the limit scenarios in `create_tests` are sized around:
/// installing exactly this many is expected to succeed, and installing one
/// more is expected to fail with "maximum number of filters registered".
// NOTE(review): presumably mirrors the node's `LOTUS_EVENTS_MAXFILTERS` setting;
// confirm the node under test is configured with the same value.
const LOTUS_EVENTS_MAXFILTERS: usize = 100;
621
/// Registers one or more RPC method types on a scenario builder by chaining a
/// `using` call per method, supplying each method's `N_REQUIRED_PARAMS` as
/// the arity argument.
macro_rules! with_methods {
    ( $builder:expr, $( $method:ty ),+ ) => {
        $builder
            $( .using::<{ <$method>::N_REQUIRED_PARAMS }, $method>() )+
    };
}
631
632pub(super) async fn create_tests(tx: TestTransaction) -> Vec<RpcTestScenario> {
633    vec![
634        with_methods!(
635            create_eth_new_filter_test().name("eth_newFilter install/uninstall"),
636            EthNewFilter,
637            EthUninstallFilter
638        ),
639        with_methods!(
640            create_eth_new_filter_limit_test(20).name("eth_newFilter under limit"),
641            EthNewFilter,
642            EthUninstallFilter
643        ),
644        with_methods!(
645            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS)
646                .name("eth_newFilter just under limit"),
647            EthNewFilter,
648            EthUninstallFilter
649        ),
650        with_methods!(
651            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS + 1)
652                .name("eth_newFilter over limit")
653                .should_fail_with("maximum number of filters registered"),
654            EthNewFilter,
655            EthUninstallFilter
656        ),
657        with_methods!(
658            eth_new_block_filter().name("eth_newBlockFilter works"),
659            EthNewBlockFilter,
660            EthGetFilterChanges,
661            EthUninstallFilter
662        ),
663        with_methods!(
664            eth_new_pending_transaction_filter(tx.clone())
665                .name("eth_newPendingTransactionFilter works"),
666            EthNewPendingTransactionFilter,
667            EthGetFilterChanges,
668            EthUninstallFilter
669        ),
670        with_methods!(
671            eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"),
672            EthNewFilter,
673            EthGetFilterLogs,
674            EthUninstallFilter
675        ),
676    ]
677}