Skip to main content

forest/cli/subcommands/
mpool_cmd.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::lotus_json::{HasLotusJson as _, NotNullVec};
6use crate::message::SignedMessage;
7use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
8use crate::shim::address::StrictAddress;
9use crate::shim::message::Message;
10use crate::shim::{address::Address, econ::TokenAmount};
11
12use ahash::{HashMap, HashSet};
13use clap::Subcommand;
14use num::BigInt;
15
16#[derive(Debug, Subcommand)]
17pub enum MpoolCommands {
18    /// Get pending messages
19    Pending {
20        /// Print pending messages for addresses in local wallet only
21        #[arg(long)]
22        local: bool,
23        /// Only print `CIDs` of messages in output
24        #[arg(long)]
25        cids: bool,
26        /// Return messages to a given address
27        #[arg(long)]
28        to: Option<StrictAddress>,
29        /// Return messages from a given address
30        #[arg(long)]
31        from: Option<StrictAddress>,
32    },
33    /// Get the current nonce for an address
34    Nonce {
35        /// Address to check nonce for
36        address: StrictAddress,
37    },
38    /// Print mempool stats
39    Stat {
40        /// Number of blocks to look back for minimum `basefee`
41        #[arg(long, default_value = "60")]
42        basefee_lookback: u32,
43        /// Print stats for addresses in local wallet only
44        #[arg(long)]
45        local: bool,
46    },
47}
48
49fn filter_messages(
50    messages: Vec<SignedMessage>,
51    local_addrs: Option<HashSet<Address>>,
52    to: &Option<StrictAddress>,
53    from: &Option<StrictAddress>,
54) -> anyhow::Result<Vec<SignedMessage>> {
55    use crate::message::Message;
56
57    let filtered = messages
58        .into_iter()
59        .filter(|msg| {
60            local_addrs
61                .as_ref()
62                .map(|addrs| addrs.contains(&msg.from()))
63                .unwrap_or(true)
64                && to.map(|addr| msg.to() == addr.into()).unwrap_or(true)
65                && from.map(|addr| msg.from() == addr.into()).unwrap_or(true)
66        })
67        .collect();
68
69    Ok(filtered)
70}
71
72async fn get_actor_sequence(
73    message: &Message,
74    tipset: &Tipset,
75    client: &rpc::Client,
76) -> Option<u64> {
77    let address = message.from;
78    let get_actor_result = StateGetActor::call(client, (address, tipset.key().into())).await;
79    let actor_state = match get_actor_result {
80        Ok(maybe_actor) => {
81            if let Some(state) = maybe_actor {
82                state
83            } else {
84                println!("{address}, actor state not found");
85                return None;
86            }
87        }
88        Err(err) => {
89            println!("{address}, err: {err}");
90            return None;
91        }
92    };
93
94    Some(actor_state.sequence)
95}
96
97type StatBucket = HashMap<u64, Message>;
98
99#[derive(Debug, Default, Eq, PartialEq)]
100struct MpStat {
101    address: String,
102    past: u64,
103    current: u64,
104    future: u64,
105    below_current: u64,
106    below_past: u64,
107    gas_limit: BigInt,
108}
109
110fn compute_stats(
111    messages: &[Message],
112    actor_sequences: HashMap<Address, u64>,
113    curr_base_fee: TokenAmount,
114    min_base_fee: TokenAmount,
115) -> Vec<MpStat> {
116    let mut buckets = HashMap::<Address, StatBucket>::default();
117    for msg in messages {
118        buckets
119            .entry(msg.from)
120            .or_insert(StatBucket::default())
121            .insert(msg.sequence, msg.to_owned());
122    }
123
124    let mut stats: Vec<MpStat> = Vec::with_capacity(buckets.len());
125
126    for (address, bucket) in buckets {
127        let actor_sequence = *actor_sequences.get(&address).expect("get must succeed");
128
129        let mut curr_sequence = actor_sequence;
130        while bucket.contains_key(&curr_sequence) {
131            curr_sequence += 1;
132        }
133
134        let mut stat = MpStat {
135            address: address.to_string(),
136            ..Default::default()
137        };
138
139        for (_, msg) in bucket {
140            if msg.sequence < actor_sequence {
141                stat.past += 1;
142            } else if msg.sequence > curr_sequence {
143                stat.future += 1;
144            } else {
145                stat.current += 1;
146            }
147
148            if msg.gas_fee_cap < curr_base_fee {
149                stat.below_current += 1;
150            }
151            if msg.gas_fee_cap < min_base_fee {
152                stat.below_past += 1;
153            }
154
155            stat.gas_limit += msg.gas_limit;
156        }
157
158        stats.push(stat);
159    }
160
161    stats.sort_by(|m1, m2| m1.address.cmp(&m2.address));
162    stats
163}
164
165fn print_stats(stats: &[MpStat], basefee_lookback: u32) {
166    let mut total = MpStat::default();
167
168    for stat in stats {
169        total.past += stat.past;
170        total.current += stat.current;
171        total.future += stat.future;
172        total.below_current += stat.below_current;
173        total.below_past += stat.below_past;
174        total.gas_limit += &stat.gas_limit;
175
176        println!(
177            "{}: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
178            stat.address,
179            stat.past,
180            stat.current,
181            stat.future,
182            stat.below_current,
183            basefee_lookback,
184            stat.below_past,
185            stat.gas_limit
186        );
187    }
188
189    println!("-----");
190    println!(
191        "total: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
192        total.past,
193        total.current,
194        total.future,
195        total.below_current,
196        basefee_lookback,
197        total.below_past,
198        total.gas_limit
199    );
200}
201
202impl MpoolCommands {
203    pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
204        match self {
205            Self::Pending {
206                local,
207                cids,
208                to,
209                from,
210            } => {
211                let NotNullVec(messages) =
212                    MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
213
214                let local_addrs = if local {
215                    let response = WalletList::call(&client, ()).await?;
216                    Some(HashSet::from_iter(response))
217                } else {
218                    None
219                };
220
221                let filtered_messages = filter_messages(messages, local_addrs, &to, &from)?;
222
223                for msg in filtered_messages {
224                    if cids {
225                        println!("{}", msg.cid());
226                    } else {
227                        println!("{}", msg.into_lotus_json_string_pretty()?);
228                    }
229                }
230
231                Ok(())
232            }
233            Self::Stat {
234                basefee_lookback,
235                local,
236            } => {
237                let tipset = ChainHead::call(&client, ()).await?;
238                let curr_base_fee = tipset.block_headers().first().parent_base_fee.to_owned();
239
240                let atto_str = ChainGetMinBaseFee::call(&client, (basefee_lookback,)).await?;
241                let min_base_fee = TokenAmount::from_atto(atto_str.parse::<BigInt>()?);
242
243                let NotNullVec(messages) =
244                    MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
245
246                let local_addrs = if local {
247                    let response = WalletList::call(&client, ()).await?;
248                    Some(HashSet::from_iter(response))
249                } else {
250                    None
251                };
252
253                let messages: Vec<Message> = filter_messages(messages, local_addrs, &None, &None)?
254                    .into_iter()
255                    .map(|it| it.message)
256                    .collect();
257
258                let mut actor_sequences: HashMap<Address, u64> = HashMap::default();
259                for msg in messages.iter() {
260                    if let Some(sequence) = get_actor_sequence(msg, &tipset, &client).await {
261                        actor_sequences.insert(msg.from, sequence);
262                    }
263                }
264
265                let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);
266
267                print_stats(&stats, basefee_lookback);
268
269                Ok(())
270            }
271            Self::Nonce { address } => {
272                let nonce = MpoolGetNonce::call(&client, (address.into(),)).await?;
273                println!("{nonce}");
274
275                Ok(())
276            }
277        }
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
    use crate::message::{Message, SignedMessage};
    use crate::message_pool::tests::create_smsg;
    use crate::shim::crypto::SignatureType;

    /// Builds `count` signed messages from `from` to `to` with sequences
    /// `0..count`, signed with keys held by `wallet`.
    fn sequential_messages(
        wallet: &mut Wallet,
        to: &Address,
        from: &Address,
        count: u64,
    ) -> Vec<SignedMessage> {
        (0..count)
            .map(|sequence| create_smsg(to, from, wallet, sequence, 1000000, 1))
            .collect()
    }

    #[test]
    fn message_filtering_none() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let smsg_vec = sequential_messages(&mut wallet, &target, &sender, 4);

        // No filtering is set up
        let smsg_filtered = filter_messages(smsg_vec.clone(), None, &None, &None).unwrap();

        assert_eq!(smsg_vec, smsg_filtered);
    }

    #[test]
    fn message_filtering_local() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = sequential_messages(&mut wallet, &target, &sender, 4);

        // Create a message with addresses from an external wallet
        let ext_keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut ext_wallet = Wallet::new(ext_keystore);
        let ext_sender = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let ext_target = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        smsg_vec.push(create_smsg(
            &ext_target,
            &ext_sender,
            &mut ext_wallet,
            4,
            1000000,
            1,
        ));

        let local_addrs = HashSet::from_iter(wallet.list_addrs().unwrap());

        // Filter local addresses
        let smsg_filtered =
            filter_messages(smsg_vec, Some(local_addrs), &None, &None).unwrap();

        // Pin the count as well, so an over-eager filter that drops
        // everything cannot pass the per-message check vacuously.
        assert_eq!(smsg_filtered.len(), 4);
        for smsg in &smsg_filtered {
            assert_eq!(smsg.from(), sender);
        }
    }

    #[test]
    fn message_filtering_from() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = sequential_messages(&mut wallet, &target, &sender, 4);

        // Create a message from a second sender
        let sender2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        smsg_vec.push(create_smsg(&target, &sender2, &mut wallet, 4, 1000000, 1));

        // Filtering messages from sender2
        let smsg_filtered =
            filter_messages(smsg_vec, None, &None, &Some(sender2.into())).unwrap();

        // Exactly the one message sent by sender2 must survive.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in &smsg_filtered {
            assert_eq!(smsg.from(), sender2);
        }
    }

    #[test]
    fn message_filtering_to() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = sequential_messages(&mut wallet, &target, &sender, 4);

        // Create a message to a second target
        let target2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        smsg_vec.push(create_smsg(&target2, &sender, &mut wallet, 4, 1000000, 1));

        // Filtering messages to target2
        let smsg_filtered =
            filter_messages(smsg_vec, None, &Some(target2.into()), &None).unwrap();

        // Exactly the one message addressed to target2 must survive.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in &smsg_filtered {
            assert_eq!(smsg.to(), target2);
        }
    }

    #[test]
    fn compute_statistics() {
        use crate::shim::message::Message;
        use fvm_ipld_encoding::RawBytes;
        use std::str::FromStr;

        let addr0 = Address::from_str("t3urxivigpzih5f6ih3oq3lr2jlunw3m5oehbe5efts4ub5wy2oi4fbo5cw7333a4rrffo5535tjdq24wkc2aa").unwrap();
        let addr1 = Address::from_str("t410fot3vkzzorqg4alowvghvxx4mhofhtazixbm6z2i").unwrap();
        let messages = [
            Message {
                version: 0,
                from: addr0,
                to: Address::default(),
                sequence: 1210,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 25201703,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 190,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 21148671,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 191,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 112795625,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
        ];
        // addr0's only message sits exactly at its actor sequence ("current");
        // both of addr1's messages are below its actor sequence ("past").
        let actor_sequences = HashMap::from_iter([(addr0, 1210), (addr1, 195)]);
        let curr_base_fee = TokenAmount::from_atto(100);
        let min_base_fee = TokenAmount::from_atto(100);

        let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);

        let expected = vec![
            MpStat {
                address: addr0.to_string(),
                past: 0,
                current: 1,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 25201703.into(),
            },
            MpStat {
                address: addr1.to_string(),
                past: 2,
                current: 0,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 133944296.into(),
            },
        ];

        assert_eq!(stats, expected);
    }
}