Skip to main content

forest/tool/subcommands/api_cmd/
stateful_tests.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3use crate::eth::EVMMethod;
4use crate::message::SignedMessage;
5use crate::networks::calibnet::ETH_CHAIN_ID;
6use crate::rpc::eth::EthUint64;
7use crate::rpc::eth::types::*;
8use crate::rpc::types::ApiTipsetKey;
9use crate::rpc::{self, RpcMethod, prelude::*};
10use crate::shim::{address::Address, message::Message};
11
12use anyhow::{Context, ensure};
13use cbor4ii::core::Value;
14use cid::Cid;
15use futures::{SinkExt, StreamExt};
16use serde_json::json;
17use tokio::time::Duration;
18use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
19
20use std::io::{self, Write};
21use std::pin::Pin;
22use std::sync::Arc;
23
/// Type-erased, shareable test body: a callable that takes the shared RPC
/// client and returns a boxed, `Send` future resolving to the test outcome.
type TestRunner = Arc<
    dyn Fn(Arc<rpc::Client>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        + Send
        + Sync,
>;
29
/// Description of the contract invocation used by the transaction-based scenarios.
#[derive(Clone)]
pub struct TestTransaction {
    /// Recipient address; becomes `Message::to` in `invoke_contract`.
    pub to: Address,
    /// Sender address; also used for the nonce lookup and for signing.
    pub from: Address,
    /// Raw call data; CBOR-encoded as a byte string to form the message params.
    pub payload: Vec<u8>,
    /// Topic used as the single topic filter in `eth_get_filter_logs`.
    pub topic: EthHash,
}
37
/// A single stateful RPC test together with the metadata `run_tests` uses to
/// select, label and judge it.
#[derive(Clone)]
pub struct RpcTestScenario {
    /// The test body.
    pub run: TestRunner,
    /// Label printed by the runner; the scenario index is printed when unset.
    pub name: Option<&'static str>,
    /// When set, the scenario passes only if it fails with an error whose
    /// rendered chain contains this text (compared case-insensitively).
    pub should_fail_with: Option<&'static str>,
    /// RPC method names (and aliases) the scenario declares; the runner's
    /// filter is matched against these by prefix.
    pub used_methods: Vec<&'static str>,
    /// When set, the scenario is reported as ignored and not run. The text
    /// itself is not printed by the runner.
    pub ignore: Option<&'static str>,
}
46
47impl RpcTestScenario {
48    /// Create a basic scenario from a simple async closure.
49    pub fn basic<F, Fut>(run_fn: F) -> Self
50    where
51        F: Fn(Arc<rpc::Client>) -> Fut + Send + Sync + 'static,
52        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
53    {
54        let run = Arc::new(move |client: Arc<rpc::Client>| {
55            Box::pin(run_fn(client)) as Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
56        });
57        Self {
58            run,
59            name: Default::default(),
60            should_fail_with: Default::default(),
61            used_methods: Default::default(),
62            ignore: None,
63        }
64    }
65
66    fn name(mut self, name: &'static str) -> Self {
67        self.name = Some(name);
68        self
69    }
70
71    pub fn should_fail_with(mut self, msg: &'static str) -> Self {
72        self.should_fail_with = Some(msg);
73        self
74    }
75
76    fn using<const ARITY: usize, M>(mut self) -> Self
77    where
78        M: RpcMethod<ARITY>,
79    {
80        self.used_methods.push(M::NAME);
81        if let Some(alias) = M::NAME_ALIAS {
82            self.used_methods.push(alias);
83        }
84        self
85    }
86
87    fn _ignore(mut self, msg: &'static str) -> Self {
88        self.ignore = Some(msg);
89        self
90    }
91}
92
93pub(super) async fn run_tests(
94    tests: impl IntoIterator<Item = RpcTestScenario> + Clone,
95    client: impl Into<Arc<rpc::Client>>,
96    filter: String,
97) -> anyhow::Result<()> {
98    let client: Arc<rpc::Client> = client.into();
99    let mut passed = 0;
100    let mut failed = 0;
101    let mut ignored = 0;
102    let mut filtered = 0;
103
104    println!("running {} tests", tests.clone().into_iter().count());
105
106    for (i, test) in tests.into_iter().enumerate() {
107        if !filter.is_empty() && !test.used_methods.iter().any(|m| m.starts_with(&filter)) {
108            filtered += 1;
109            continue;
110        }
111        if test.ignore.is_some() {
112            ignored += 1;
113            println!(
114                "test {} ... ignored",
115                if let Some(name) = test.name {
116                    name.to_string()
117                } else {
118                    format!("#{i}")
119                },
120            );
121            continue;
122        }
123
124        print!(
125            "test {} ... ",
126            if let Some(name) = test.name {
127                name.to_string()
128            } else {
129                format!("#{i}")
130            }
131        );
132
133        io::stdout().flush()?;
134
135        let result = (test.run)(client.clone()).await;
136
137        match result {
138            Ok(_) => {
139                if let Some(expected_msg) = test.should_fail_with {
140                    println!("FAILED (expected failure containing '{expected_msg}')");
141                    failed += 1;
142                } else {
143                    println!("ok");
144                    passed += 1;
145                }
146            }
147            Err(e) => {
148                if let Some(expected_msg) = test.should_fail_with {
149                    let err_str = format!("{e:#}");
150                    if err_str
151                        .to_lowercase()
152                        .contains(&expected_msg.to_lowercase())
153                    {
154                        println!("ok");
155                        passed += 1;
156                    } else {
157                        println!("FAILED ({e:#})");
158                        failed += 1;
159                    }
160                } else {
161                    println!("FAILED {e:#}");
162                    failed += 1;
163                }
164            }
165        }
166    }
167    let status = if failed == 0 { "ok" } else { "FAILED" };
168    println!(
169        "test result: {status}. {passed} passed; {failed} failed; {ignored} ignored; {filtered} filtered out"
170    );
171    ensure!(failed == 0, "{failed} test(s) failed");
172    Ok(())
173}
174
175#[allow(unreachable_code)]
176async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> {
177    async fn close_channel(
178        stream: &mut tokio_tungstenite::WebSocketStream<
179            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
180        >,
181        id: &serde_json::Value,
182    ) -> anyhow::Result<()> {
183        let request = json!({
184            "jsonrpc": "2.0",
185            "id": 1,
186            "method": "xrpc.cancel",
187            "params": [id]
188        });
189
190        stream
191            .send(WsMessage::Text(request.to_string().into()))
192            .await
193            .context("failed to send close channel request")?;
194
195        Ok(())
196    }
197
198    let mut url = client.base_url().clone();
199    url.set_scheme("ws")
200        .map_err(|_| anyhow::anyhow!("failed to set scheme"))?;
201    url.set_path("/rpc/v1");
202
203    // Note: The token is not required for the ChainNotify method.
204    let (mut ws_stream, _) = connect_async(url.as_str()).await?;
205
206    let request = json!({
207        "jsonrpc": "2.0",
208        "id": 1,
209        "method": "Filecoin.ChainNotify",
210        "params": []
211    });
212    ws_stream
213        .send(WsMessage::Text(request.to_string().into()))
214        .await?;
215
216    let mut channel_id: Option<serde_json::Value> = None;
217
218    // The goal of this loop is to wait for a new tipset to arrive without using a busy loop or sleep.
219    // It processes incoming WebSocket messages until it encounters an "apply" or "revert" change type.
220    // If an "apply" change is found, it closes the channel and exits. If a "revert" change is found,
221    // it closes the channel and raises an error. Any channel protocol or parameter validation issues result in an error.
222    loop {
223        let msg = match tokio::time::timeout(Duration::from_secs(180), ws_stream.next()).await {
224            Ok(Some(msg)) => msg,
225            Ok(None) => anyhow::bail!("WebSocket stream closed"),
226            Err(_) => {
227                if let Some(id) = channel_id.as_ref() {
228                    let _ = close_channel(&mut ws_stream, id).await;
229                }
230                let _ = ws_stream.close(None).await;
231                anyhow::bail!("timeout waiting for tipset");
232            }
233        };
234        match msg {
235            Ok(WsMessage::Text(text)) => {
236                let json: serde_json::Value = serde_json::from_str(&text)?;
237
238                if let Some(id) = json.get("result") {
239                    channel_id = Some(id.clone());
240                } else {
241                    let method = json!("xrpc.ch.val");
242                    anyhow::ensure!(json.get("method") == Some(&method));
243
244                    if let Some(params) = json.get("params").and_then(|v| v.as_array()) {
245                        if let Some(id) = params.first() {
246                            anyhow::ensure!(Some(id) == channel_id.as_ref());
247                        } else {
248                            anyhow::bail!("expecting an open channel");
249                        }
250                        if let Some(changes) = params.get(1).and_then(|v| v.as_array()) {
251                            for change in changes {
252                                if let Some(type_) = change.get("Type").and_then(|v| v.as_str()) {
253                                    if type_ == "apply" {
254                                        let id = channel_id.as_ref().ok_or_else(|| {
255                                            anyhow::anyhow!("subscription not opened")
256                                        })?;
257                                        close_channel(&mut ws_stream, id).await?;
258                                        ws_stream.close(None).await?;
259                                        return Ok(());
260                                    } else if type_ == "revert" {
261                                        let id = channel_id.as_ref().ok_or_else(|| {
262                                            anyhow::anyhow!("subscription not opened")
263                                        })?;
264                                        close_channel(&mut ws_stream, id).await?;
265                                        ws_stream.close(None).await?;
266                                        anyhow::bail!("revert");
267                                    }
268                                }
269                            }
270                        }
271                    } else {
272                        let id = channel_id
273                            .as_ref()
274                            .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
275                        close_channel(&mut ws_stream, id).await?;
276                        ws_stream.close(None).await?;
277                        anyhow::bail!("expecting params");
278                    }
279                }
280            }
281            Err(..) | Ok(WsMessage::Close(..)) => {
282                let id = channel_id
283                    .as_ref()
284                    .ok_or_else(|| anyhow::anyhow!("subscription not opened"))?;
285                close_channel(&mut ws_stream, id).await?;
286                ws_stream.close(None).await?;
287                anyhow::bail!("unexpected error or close message");
288            }
289            _ => {
290                // Ignore other message types
291            }
292        }
293    }
294
295    unreachable!("loop always returns within the branches above")
296}
297
298async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> {
299    let tipset = client.call(ChainHead::request(())?).await?;
300    let mut retries = 100;
301    loop {
302        let pending = client
303            .call(MpoolPending::request((ApiTipsetKey(None),))?)
304            .await?;
305
306        if pending.0.iter().any(|msg| msg.cid() == message_cid) {
307            client
308                .call(
309                    StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))?
310                        .with_timeout(Duration::from_secs(300)),
311                )
312                .await?;
313            break Ok(());
314        }
315        ensure!(retries != 0, "Message not found in mpool");
316        retries -= 1;
317
318        tokio::time::sleep(Duration::from_millis(10)).await;
319    }
320}
321
322async fn invoke_contract(client: &rpc::Client, tx: &TestTransaction) -> anyhow::Result<Cid> {
323    let encoded_params = cbor4ii::serde::to_vec(
324        Vec::with_capacity(tx.payload.len()),
325        &Value::Bytes(tx.payload.clone()),
326    )
327    .context("failed to encode params")?;
328    let nonce = client.call(MpoolGetNonce::request((tx.from,))?).await?;
329    let message = Message {
330        to: tx.to,
331        from: tx.from,
332        sequence: nonce,
333        method_num: EVMMethod::InvokeContract as u64,
334        params: encoded_params.into(),
335        ..Default::default()
336    };
337    let unsigned_msg = client
338        .call(GasEstimateMessageGas::request((
339            message,
340            None,
341            ApiTipsetKey(None),
342        ))?)
343        .await?;
344
345    let eth_tx_args = crate::eth::EthEip1559TxArgsBuilder::default()
346        .chain_id(ETH_CHAIN_ID)
347        .unsigned_message(&unsigned_msg.message)?
348        .build()
349        .map_err(|e| anyhow::anyhow!("Failed to build EIP-1559 transaction: {}", e))?;
350    let eth_tx = crate::eth::EthTx::from(eth_tx_args);
351    let data = eth_tx.rlp_unsigned_message(ETH_CHAIN_ID)?;
352
353    let sig = client.call(WalletSign::request((tx.from, data))?).await?;
354    let smsg = SignedMessage::new_unchecked(unsigned_msg.message, sig);
355    let cid = smsg.cid();
356
357    client.call(MpoolPush::request((smsg,))?).await?;
358
359    Ok(cid)
360}
361
362fn create_eth_new_filter_test() -> RpcTestScenario {
363    RpcTestScenario::basic(|client| async move {
364        const BLOCK_RANGE: u64 = 200;
365
366        let last_block = client.call(EthBlockNumber::request(())?).await?;
367
368        let filter_spec = EthFilterSpec {
369            from_block: Some(EthUint64(last_block.0.saturating_sub(BLOCK_RANGE)).to_hex_string()),
370            to_block: Some(last_block.to_hex_string()),
371            ..Default::default()
372        };
373
374        let filter_id = client.call(EthNewFilter::request((filter_spec,))?).await?;
375
376        let removed = client
377            .call(EthUninstallFilter::request((filter_id.clone(),))?)
378            .await?;
379        anyhow::ensure!(removed);
380
381        let removed = client
382            .call(EthUninstallFilter::request((filter_id,))?)
383            .await?;
384        anyhow::ensure!(!removed);
385
386        Ok(())
387    })
388}
389
390fn create_eth_new_filter_limit_test(count: usize) -> RpcTestScenario {
391    RpcTestScenario::basic(move |client| async move {
392        const BLOCK_RANGE: u64 = 200;
393
394        let last_block = client.call(EthBlockNumber::request(())?).await?;
395
396        let filter_spec = EthFilterSpec {
397            from_block: Some(format!("0x{:x}", last_block.0.saturating_sub(BLOCK_RANGE))),
398            to_block: Some(last_block.to_hex_string()),
399            ..Default::default()
400        };
401
402        let mut ids = vec![];
403
404        for _ in 0..count {
405            let result = client
406                .call(EthNewFilter::request((filter_spec.clone(),))?)
407                .await;
408
409            match result {
410                Ok(filter_id) => ids.push(filter_id),
411                Err(e) => {
412                    // Cleanup any filters created so far to leave a clean state
413                    for id in ids {
414                        let removed = client.call(EthUninstallFilter::request((id,))?).await?;
415                        anyhow::ensure!(removed);
416                    }
417                    anyhow::bail!(e)
418                }
419            }
420        }
421
422        for id in ids {
423            let removed = client.call(EthUninstallFilter::request((id,))?).await?;
424            anyhow::ensure!(removed);
425        }
426
427        Ok(())
428    })
429}
430
431fn eth_new_block_filter() -> RpcTestScenario {
432    RpcTestScenario::basic(move |client| async move {
433        async fn process_filter(client: &rpc::Client, filter_id: &FilterID) -> anyhow::Result<()> {
434            let filter_result = client
435                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
436                .await?;
437
438            if let EthFilterResult::Hashes(prev_hashes) = filter_result {
439                let verify_hashes = async |hashes: &[EthHash]| -> anyhow::Result<()> {
440                    for hash in hashes {
441                        let _block = client
442                            .call(EthGetBlockByHash::request((*hash, false))?)
443                            .await?;
444                    }
445                    Ok(())
446                };
447                verify_hashes(&prev_hashes).await?;
448
449                // Wait for the next block to arrive
450                next_tipset(client).await?;
451
452                let filter_result = client
453                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
454                    .await?;
455
456                if let EthFilterResult::Hashes(hashes) = filter_result {
457                    verify_hashes(&hashes).await?;
458                    anyhow::ensure!(
459                        (prev_hashes.is_empty() && hashes.is_empty()) || prev_hashes != hashes,
460                    );
461
462                    Ok(())
463                } else {
464                    Err(anyhow::anyhow!("expecting blocks"))
465                }
466            } else {
467                Err(anyhow::anyhow!("expecting blocks"))
468            }
469        }
470
471        let mut retries = 5;
472        loop {
473            // Create the filter
474            let filter_id = client.call(EthNewBlockFilter::request(())?).await?;
475
476            let result = match process_filter(&client, &filter_id).await {
477                Ok(()) => Ok(()),
478                Err(e) if retries != 0 && e.to_string().contains("revert") => {
479                    // Cleanup
480                    let removed = client
481                        .call(EthUninstallFilter::request((filter_id,))?)
482                        .await?;
483                    anyhow::ensure!(removed);
484
485                    retries -= 1;
486                    continue;
487                }
488                Err(e) => Err(e),
489            };
490
491            // Cleanup
492            let removed = client
493                .call(EthUninstallFilter::request((filter_id,))?)
494                .await?;
495            anyhow::ensure!(removed);
496
497            break result;
498        }
499    })
500}
501
502fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario {
503    RpcTestScenario::basic(move |client| {
504        let tx = tx.clone();
505        async move {
506            let filter_id = client
507                .call(EthNewPendingTransactionFilter::request(())?)
508                .await?;
509
510            let filter_result = client
511                .call(EthGetFilterChanges::request((filter_id.clone(),))?)
512                .await?;
513
514            let result = if let EthFilterResult::Hashes(prev_hashes) = filter_result {
515                let cid = invoke_contract(&client, &tx).await?;
516
517                // Get the Eth transaction hash for our CID directly, rather than
518                // reverse-mapping every hash from the filter results back to CIDs
519                // (which is fragile — the mapping can return None for recent txns).
520                let tx_hash = client
521                    .call(EthGetTransactionHashByCid::request((cid,))?)
522                    .await?
523                    .context("no Eth transaction hash for CID")?;
524
525                wait_pending_message(&client, cid).await?;
526
527                let filter_result = client
528                    .call(EthGetFilterChanges::request((filter_id.clone(),))?)
529                    .await?;
530
531                if let EthFilterResult::Hashes(hashes) = filter_result {
532                    anyhow::ensure!(
533                        prev_hashes != hashes,
534                        "prev_hashes={prev_hashes:?} hashes={hashes:?}"
535                    );
536
537                    anyhow::ensure!(
538                        hashes.contains(&tx_hash),
539                        "transaction hash missing from filter results: tx_hash={tx_hash:?} cid={cid:?} hashes={hashes:?}"
540                    );
541
542                    Ok(())
543                } else {
544                    Err(anyhow::anyhow!("expecting hashes"))
545                }
546            } else {
547                Err(anyhow::anyhow!("expecting transactions"))
548            };
549
550            let removed = client
551                .call(EthUninstallFilter::request((filter_id,))?)
552                .await?;
553            anyhow::ensure!(removed);
554
555            result
556        }
557    })
558}
559
560fn as_logs(input: EthFilterResult) -> EthFilterResult {
561    match input {
562        EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()),
563        other => other,
564    }
565}
566
567fn eth_get_filter_logs(tx: TestTransaction) -> RpcTestScenario {
568    RpcTestScenario::basic(move |client| {
569        let tx = tx.clone();
570        async move {
571            const BLOCK_RANGE: u64 = 1;
572
573            let tipset = client.call(ChainHead::request(())?).await?;
574            let cid = invoke_contract(&client, &tx).await?;
575            let lookup = client
576                .call(
577                    StateWaitMsg::request((cid, 1, tipset.epoch(), true))?
578                        .with_timeout(Duration::from_secs(300)),
579                )
580                .await?;
581            let block_num = EthUint64(lookup.height as u64);
582
583            let topics = EthTopicSpec(vec![EthHashList::Single(Some(tx.topic))]);
584            let filter_spec = EthFilterSpec {
585                from_block: Some(format!("0x{:x}", block_num.0.saturating_sub(BLOCK_RANGE))),
586                topics: Some(topics),
587                ..Default::default()
588            };
589
590            let filter_id = client
591                .call(EthNewFilter::request((filter_spec.clone(),))?)
592                .await?;
593            let filter_result = as_logs(
594                client
595                    .call(EthGetFilterLogs::request((filter_id.clone(),))?)
596                    .await?,
597            );
598            let result = if let EthFilterResult::Logs(logs) = filter_result {
599                anyhow::ensure!(
600                    !logs.is_empty(),
601                    "Empty logs: filter_spec={filter_spec:?} cid={cid:?}",
602                );
603                Ok(())
604            } else {
605                Err(anyhow::anyhow!("expecting logs"))
606            };
607
608            let removed = client
609                .call(EthUninstallFilter::request((filter_id,))?)
610                .await?;
611            anyhow::ensure!(removed);
612
613            result
614        }
615    })
616}
617
/// Filter-count threshold used by the limit scenarios in `create_tests`:
/// installing this many filters is expected to succeed and one more is
/// expected to fail with "maximum number of filters registered".
// NOTE(review): presumably mirrors the node's max-filters setting — confirm.
const LOTUS_EVENTS_MAXFILTERS: usize = 100;
619
/// Apply `RpcTestScenario::using` once per listed RPC method type, taking the
/// arity from each method's `N_REQUIRED_PARAMS`, and yield the resulting
/// scenario.
macro_rules! with_methods {
    ( $builder:expr, $( $method:ty ),+ ) => {
        $builder
            $(
                .using::<{ <$method>::N_REQUIRED_PARAMS }, $method>()
            )+
    };
}
629
630pub(super) async fn create_tests(tx: TestTransaction) -> Vec<RpcTestScenario> {
631    vec![
632        with_methods!(
633            create_eth_new_filter_test().name("eth_newFilter install/uninstall"),
634            EthNewFilter,
635            EthUninstallFilter
636        ),
637        with_methods!(
638            create_eth_new_filter_limit_test(20).name("eth_newFilter under limit"),
639            EthNewFilter,
640            EthUninstallFilter
641        ),
642        with_methods!(
643            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS)
644                .name("eth_newFilter just under limit"),
645            EthNewFilter,
646            EthUninstallFilter
647        ),
648        with_methods!(
649            create_eth_new_filter_limit_test(LOTUS_EVENTS_MAXFILTERS + 1)
650                .name("eth_newFilter over limit")
651                .should_fail_with("maximum number of filters registered"),
652            EthNewFilter,
653            EthUninstallFilter
654        ),
655        with_methods!(
656            eth_new_block_filter().name("eth_newBlockFilter works"),
657            EthNewBlockFilter,
658            EthGetFilterChanges,
659            EthUninstallFilter
660        ),
661        with_methods!(
662            eth_new_pending_transaction_filter(tx.clone())
663                .name("eth_newPendingTransactionFilter works"),
664            EthNewPendingTransactionFilter,
665            EthGetFilterChanges,
666            EthGetTransactionHashByCid,
667            EthUninstallFilter
668        ),
669        with_methods!(
670            eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"),
671            EthNewFilter,
672            EthGetFilterLogs,
673            EthUninstallFilter
674        ),
675    ]
676}