forest/cli/subcommands/
mpool_cmd.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::lotus_json::{HasLotusJson as _, NotNullVec};
6use crate::message::SignedMessage;
7use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
8use crate::shim::address::StrictAddress;
9use crate::shim::message::Message;
10use crate::shim::{address::Address, econ::TokenAmount};
11
12use ahash::{HashMap, HashSet};
13use clap::Subcommand;
14use num::BigInt;
15
// Command-line surface of the message pool (`mpool`) subcommands.
//
// The argument parser is derived by `clap`, which renders the `///` comments
// below as help text, so they are user-facing strings rather than internal
// notes. Each variant is executed by `MpoolCommands::run`.
#[derive(Debug, Subcommand)]
pub enum MpoolCommands {
    /// Get pending messages
    Pending {
        // The three filters below are combined with logical AND in
        // `filter_messages`; a filter that is not supplied accepts every message.
        /// Print pending messages for addresses in local wallet only
        #[arg(long)]
        local: bool,
        // Switches the output from the pretty-printed message JSON to one CID per line.
        /// Only print `CIDs` of messages in output
        #[arg(long)]
        cids: bool,
        /// Return messages to a given address
        #[arg(long)]
        to: Option<StrictAddress>,
        /// Return messages from a given address
        #[arg(long)]
        from: Option<StrictAddress>,
    },
    /// Get the current nonce for an address
    Nonce {
        // Positional argument (no `#[arg(long)]`), forwarded to `MpoolGetNonce`.
        /// Address to check nonce for
        address: StrictAddress,
    },
    /// Print mempool stats
    Stat {
        // Forwarded to `ChainGetMinBaseFee` and echoed in the `min-<N>` column
        // label of the printed report.
        /// Number of blocks to look back for minimum `basefee`
        #[arg(long, default_value = "60")]
        basefee_lookback: u32,
        /// Print stats for addresses in local wallet only
        #[arg(long)]
        local: bool,
    },
}
48
49fn filter_messages(
50    messages: Vec<SignedMessage>,
51    local_addrs: Option<HashSet<Address>>,
52    to: &Option<StrictAddress>,
53    from: &Option<StrictAddress>,
54) -> anyhow::Result<Vec<SignedMessage>> {
55    use crate::message::Message;
56
57    let filtered = messages
58        .into_iter()
59        .filter(|msg| {
60            local_addrs
61                .as_ref()
62                .map(|addrs| addrs.contains(&msg.from()))
63                .unwrap_or(true)
64                && to.map(|addr| msg.to() == addr.into()).unwrap_or(true)
65                && from.map(|addr| msg.from() == addr.into()).unwrap_or(true)
66        })
67        .collect();
68
69    Ok(filtered)
70}
71
72async fn get_actor_sequence(
73    message: &Message,
74    tipset: &Tipset,
75    client: &rpc::Client,
76) -> Option<u64> {
77    let address = message.from;
78    let get_actor_result = StateGetActor::call(client, (address, tipset.key().into())).await;
79    let actor_state = match get_actor_result {
80        Ok(maybe_actor) => {
81            if let Some(state) = maybe_actor {
82                state
83            } else {
84                println!("{address}, actor state not found");
85                return None;
86            }
87        }
88        Err(err) => {
89            println!("{address}, err: {err}");
90            return None;
91        }
92    };
93
94    Some(actor_state.sequence)
95}
96
/// Pending messages of a single sender, keyed by message sequence.
/// `compute_stats` builds one bucket per sender address.
type StatBucket = HashMap<u64, Message>;
98
/// Summary of the pending messages of one sender, produced by `compute_stats`
/// and rendered by `print_stats`. `print_stats` also uses a default-initialised
/// value as the accumulator for its `total` line.
#[derive(Debug, Default, Eq, PartialEq)]
struct MpStat {
    /// Sender address rendered as a string; `compute_stats` sorts its output by it.
    address: String,
    /// Messages whose sequence is lower than the sender's actor sequence.
    past: u64,
    /// Messages that are neither `past` nor `future`.
    current: u64,
    /// Messages whose sequence is greater than the first sequence, counting up
    /// from the actor sequence, that has no pending message.
    future: u64,
    /// Messages whose gas fee cap is lower than the current base fee.
    below_current: u64,
    /// Messages whose gas fee cap is lower than the minimum base fee passed to
    /// `compute_stats`.
    below_past: u64,
    /// Sum of the gas limits of all messages counted for this sender.
    gas_limit: BigInt,
}
109
110fn compute_stats(
111    messages: &[Message],
112    actor_sequences: HashMap<Address, u64>,
113    curr_base_fee: TokenAmount,
114    min_base_fee: TokenAmount,
115) -> Vec<MpStat> {
116    let mut buckets = HashMap::<Address, StatBucket>::default();
117    for msg in messages {
118        buckets
119            .entry(msg.from)
120            .or_insert(StatBucket::default())
121            .insert(msg.sequence, msg.to_owned());
122    }
123
124    let mut stats: Vec<MpStat> = Vec::with_capacity(buckets.len());
125
126    for (address, bucket) in buckets {
127        let actor_sequence = *actor_sequences.get(&address).expect("get must succeed");
128
129        let mut curr_sequence = actor_sequence;
130        while bucket.contains_key(&curr_sequence) {
131            curr_sequence += 1;
132        }
133
134        let mut stat = MpStat {
135            address: address.to_string(),
136            ..Default::default()
137        };
138
139        for (_, msg) in bucket {
140            if msg.sequence < actor_sequence {
141                stat.past += 1;
142            } else if msg.sequence > curr_sequence {
143                stat.future += 1;
144            } else {
145                stat.current += 1;
146            }
147
148            if msg.gas_fee_cap < curr_base_fee {
149                stat.below_current += 1;
150            }
151            if msg.gas_fee_cap < min_base_fee {
152                stat.below_past += 1;
153            }
154
155            stat.gas_limit += msg.gas_limit;
156        }
157
158        stats.push(stat);
159    }
160
161    stats.sort_by(|m1, m2| m1.address.cmp(&m2.address));
162    stats
163}
164
165fn print_stats(stats: &[MpStat], basefee_lookback: u32) {
166    let mut total = MpStat::default();
167
168    for stat in stats {
169        total.past += stat.past;
170        total.current += stat.current;
171        total.future += stat.future;
172        total.below_current += stat.below_current;
173        total.below_past += stat.below_past;
174        total.gas_limit += &stat.gas_limit;
175
176        println!(
177            "{}: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
178            stat.address,
179            stat.past,
180            stat.current,
181            stat.future,
182            stat.below_current,
183            basefee_lookback,
184            stat.below_past,
185            stat.gas_limit
186        );
187    }
188
189    println!("-----");
190    println!(
191        "total: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
192        total.past,
193        total.current,
194        total.future,
195        total.below_current,
196        basefee_lookback,
197        total.below_past,
198        total.gas_limit
199    );
200}
201
202impl MpoolCommands {
203    pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
204        match self {
205            Self::Pending {
206                local,
207                cids,
208                to,
209                from,
210            } => {
211                let NotNullVec(messages) =
212                    MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
213
214                let local_addrs = if local {
215                    let response = WalletList::call(&client, ()).await?;
216                    Some(HashSet::from_iter(response))
217                } else {
218                    None
219                };
220
221                let filtered_messages = filter_messages(messages, local_addrs, &to, &from)?;
222
223                for msg in filtered_messages {
224                    if cids {
225                        println!("{}", msg.cid());
226                    } else {
227                        println!("{}", msg.into_lotus_json_string_pretty()?);
228                    }
229                }
230
231                Ok(())
232            }
233            Self::Stat {
234                basefee_lookback,
235                local,
236            } => {
237                let tipset = ChainHead::call(&client, ()).await?;
238                let curr_base_fee = tipset.block_headers().first().parent_base_fee.to_owned();
239
240                let atto_str = ChainGetMinBaseFee::call(&client, (basefee_lookback,)).await?;
241                let min_base_fee = TokenAmount::from_atto(atto_str.parse::<BigInt>()?);
242
243                let NotNullVec(messages) =
244                    MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
245
246                let local_addrs = if local {
247                    let response = WalletList::call(&client, ()).await?;
248                    Some(HashSet::from_iter(response))
249                } else {
250                    None
251                };
252
253                let messages: Vec<Message> = filter_messages(messages, local_addrs, &None, &None)?
254                    .into_iter()
255                    .map(|it| it.message)
256                    .collect();
257
258                let mut actor_sequences: HashMap<Address, u64> = HashMap::default();
259                for msg in messages.iter() {
260                    if let Some(sequence) = get_actor_sequence(msg, &tipset, &client).await {
261                        actor_sequences.insert(msg.from, sequence);
262                    }
263                }
264
265                let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);
266
267                print_stats(&stats, basefee_lookback);
268
269                Ok(())
270            }
271            Self::Nonce { address } => {
272                let nonce = MpoolGetNonce::call(&client, (address.into(),)).await?;
273                println!("{nonce}");
274
275                Ok(())
276            }
277        }
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
    use crate::message::{Message, SignedMessage};
    use crate::message_pool::tests::create_smsg;
    use crate::shim::crypto::SignatureType;
    use std::borrow::BorrowMut;

    /// With no filter configured every message must come back, in order.
    #[test]
    fn message_filtering_none() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for i in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
            smsg_vec.push(msg);
        }

        // No filtering is set up
        let smsg_filtered = filter_messages(smsg_vec.clone(), None, &None, &None).unwrap();

        assert_eq!(smsg_vec, smsg_filtered);
    }

    /// The local-wallet filter keeps messages sent from wallet addresses and
    /// drops the one sent from an address the wallet does not own.
    #[test]
    fn message_filtering_local() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for i in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
            smsg_vec.push(msg);
        }

        // Create a message with addresses from an external wallet
        let ext_keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut ext_wallet = Wallet::new(ext_keystore);
        let ext_sender = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let ext_target = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(
            &ext_target,
            &ext_sender,
            ext_wallet.borrow_mut(),
            4,
            1000000,
            1,
        );
        smsg_vec.push(msg);

        let local_addrs = HashSet::from_iter(wallet.list_addrs().unwrap());

        // Filter local addresses
        let smsg_filtered =
            filter_messages(smsg_vec.clone(), Some(local_addrs), &None, &None).unwrap();

        // Checking the count keeps the loop below from passing vacuously on
        // an empty result: only the four local messages may survive.
        assert_eq!(smsg_filtered.len(), 4);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender);
        }
    }

    /// The `from` filter keeps exactly the messages sent by the given address.
    #[test]
    fn message_filtering_from() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for i in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
            smsg_vec.push(msg);
        }

        // Create a message from a second sender
        let sender2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(&target, &sender2, wallet.borrow_mut(), 4, 1000000, 1);
        smsg_vec.push(msg);

        // Filtering messages from sender2
        let smsg_filtered =
            filter_messages(smsg_vec.clone(), None, &None, &Some(sender2.into())).unwrap();

        // Exactly one message was sent by sender2.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender2);
        }
    }

    /// The `to` filter keeps exactly the messages addressed to the given address.
    #[test]
    fn message_filtering_to() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for i in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
            smsg_vec.push(msg);
        }

        // Create a message to a second target
        let target2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(&target2, &sender, wallet.borrow_mut(), 4, 1000000, 1);
        smsg_vec.push(msg);

        // Filtering messages to target2
        let smsg_filtered =
            filter_messages(smsg_vec.clone(), None, &Some(target2.into()), &None).unwrap();

        // Exactly one message was addressed to target2.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.to(), target2);
        }
    }

    /// Classifies one message at the actor sequence (`current`) and two
    /// messages below it (`past`), and checks the summed gas limits.
    #[test]
    fn compute_statistics() {
        use crate::shim::message::Message;
        use fvm_ipld_encoding::RawBytes;
        use std::str::FromStr;

        let addr0 = Address::from_str("t3urxivigpzih5f6ih3oq3lr2jlunw3m5oehbe5efts4ub5wy2oi4fbo5cw7333a4rrffo5535tjdq24wkc2aa").unwrap();
        let addr1 = Address::from_str("t410fot3vkzzorqg4alowvghvxx4mhofhtazixbm6z2i").unwrap();
        let messages = [
            Message {
                version: 0,
                from: addr0,
                to: Address::default(),
                sequence: 1210,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 25201703,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 190,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 21148671,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 191,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 112795625,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
        ];
        // addr0's actor sequence equals its only pending sequence (current);
        // addr1's actor sequence is above both of its pending sequences (past).
        let actor_sequences = HashMap::from_iter([(addr0, 1210), (addr1, 195)]);
        let curr_base_fee = TokenAmount::from_atto(100);
        let min_base_fee = TokenAmount::from_atto(100);

        let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);

        let expected = vec![
            MpStat {
                address: addr0.to_string(),
                past: 0,
                current: 1,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 25201703.into(),
            },
            MpStat {
                address: addr1.to_string(),
                past: 2,
                current: 0,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 133944296.into(),
            },
        ];

        assert_eq!(stats, expected);
    }
}
499}