1use crate::blocks::Tipset;
5use crate::lotus_json::{HasLotusJson as _, NotNullVec};
6use crate::message::SignedMessage;
7use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
8use crate::shim::address::StrictAddress;
9use crate::shim::message::Message;
10use crate::shim::{address::Address, econ::TokenAmount};
11
12use ahash::{HashMap, HashSet};
13use clap::Subcommand;
14use num::BigInt;
15
16#[derive(Debug, Subcommand)]
17pub enum MpoolCommands {
18 Pending {
20 #[arg(long)]
22 local: bool,
23 #[arg(long)]
25 cids: bool,
26 #[arg(long)]
28 to: Option<StrictAddress>,
29 #[arg(long)]
31 from: Option<StrictAddress>,
32 },
33 Nonce {
35 address: StrictAddress,
37 },
38 Stat {
40 #[arg(long, default_value = "60")]
42 basefee_lookback: u32,
43 #[arg(long)]
45 local: bool,
46 },
47}
48
49fn filter_messages(
50 messages: Vec<SignedMessage>,
51 local_addrs: Option<HashSet<Address>>,
52 to: &Option<StrictAddress>,
53 from: &Option<StrictAddress>,
54) -> anyhow::Result<Vec<SignedMessage>> {
55 use crate::message::Message;
56
57 let filtered = messages
58 .into_iter()
59 .filter(|msg| {
60 local_addrs
61 .as_ref()
62 .map(|addrs| addrs.contains(&msg.from()))
63 .unwrap_or(true)
64 && to.map(|addr| msg.to() == addr.into()).unwrap_or(true)
65 && from.map(|addr| msg.from() == addr.into()).unwrap_or(true)
66 })
67 .collect();
68
69 Ok(filtered)
70}
71
72async fn get_actor_sequence(
73 message: &Message,
74 tipset: &Tipset,
75 client: &rpc::Client,
76) -> Option<u64> {
77 let address = message.from;
78 let get_actor_result = StateGetActor::call(client, (address, tipset.key().into())).await;
79 let actor_state = match get_actor_result {
80 Ok(maybe_actor) => {
81 if let Some(state) = maybe_actor {
82 state
83 } else {
84 println!("{address}, actor state not found");
85 return None;
86 }
87 }
88 Err(err) => {
89 println!("{address}, err: {err}");
90 return None;
91 }
92 };
93
94 Some(actor_state.sequence)
95}
96
97type StatBucket = HashMap<u64, Message>;
98
99#[derive(Debug, Default, Eq, PartialEq)]
100struct MpStat {
101 address: String,
102 past: u64,
103 current: u64,
104 future: u64,
105 below_current: u64,
106 below_past: u64,
107 gas_limit: BigInt,
108}
109
110fn compute_stats(
111 messages: &[Message],
112 actor_sequences: HashMap<Address, u64>,
113 curr_base_fee: TokenAmount,
114 min_base_fee: TokenAmount,
115) -> Vec<MpStat> {
116 let mut buckets = HashMap::<Address, StatBucket>::default();
117 for msg in messages {
118 buckets
119 .entry(msg.from)
120 .or_insert(StatBucket::default())
121 .insert(msg.sequence, msg.to_owned());
122 }
123
124 let mut stats: Vec<MpStat> = Vec::with_capacity(buckets.len());
125
126 for (address, bucket) in buckets {
127 let actor_sequence = *actor_sequences.get(&address).expect("get must succeed");
128
129 let mut curr_sequence = actor_sequence;
130 while bucket.contains_key(&curr_sequence) {
131 curr_sequence += 1;
132 }
133
134 let mut stat = MpStat {
135 address: address.to_string(),
136 ..Default::default()
137 };
138
139 for (_, msg) in bucket {
140 if msg.sequence < actor_sequence {
141 stat.past += 1;
142 } else if msg.sequence > curr_sequence {
143 stat.future += 1;
144 } else {
145 stat.current += 1;
146 }
147
148 if msg.gas_fee_cap < curr_base_fee {
149 stat.below_current += 1;
150 }
151 if msg.gas_fee_cap < min_base_fee {
152 stat.below_past += 1;
153 }
154
155 stat.gas_limit += msg.gas_limit;
156 }
157
158 stats.push(stat);
159 }
160
161 stats.sort_by(|m1, m2| m1.address.cmp(&m2.address));
162 stats
163}
164
165fn print_stats(stats: &[MpStat], basefee_lookback: u32) {
166 let mut total = MpStat::default();
167
168 for stat in stats {
169 total.past += stat.past;
170 total.current += stat.current;
171 total.future += stat.future;
172 total.below_current += stat.below_current;
173 total.below_past += stat.below_past;
174 total.gas_limit += &stat.gas_limit;
175
176 println!(
177 "{}: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
178 stat.address,
179 stat.past,
180 stat.current,
181 stat.future,
182 stat.below_current,
183 basefee_lookback,
184 stat.below_past,
185 stat.gas_limit
186 );
187 }
188
189 println!("-----");
190 println!(
191 "total: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
192 total.past,
193 total.current,
194 total.future,
195 total.below_current,
196 basefee_lookback,
197 total.below_past,
198 total.gas_limit
199 );
200}
201
202impl MpoolCommands {
203 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
204 match self {
205 Self::Pending {
206 local,
207 cids,
208 to,
209 from,
210 } => {
211 let NotNullVec(messages) =
212 MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
213
214 let local_addrs = if local {
215 let response = WalletList::call(&client, ()).await?;
216 Some(HashSet::from_iter(response))
217 } else {
218 None
219 };
220
221 let filtered_messages = filter_messages(messages, local_addrs, &to, &from)?;
222
223 for msg in filtered_messages {
224 if cids {
225 println!("{}", msg.cid());
226 } else {
227 println!("{}", msg.into_lotus_json_string_pretty()?);
228 }
229 }
230
231 Ok(())
232 }
233 Self::Stat {
234 basefee_lookback,
235 local,
236 } => {
237 let tipset = ChainHead::call(&client, ()).await?;
238 let curr_base_fee = tipset.block_headers().first().parent_base_fee.to_owned();
239
240 let atto_str = ChainGetMinBaseFee::call(&client, (basefee_lookback,)).await?;
241 let min_base_fee = TokenAmount::from_atto(atto_str.parse::<BigInt>()?);
242
243 let NotNullVec(messages) =
244 MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
245
246 let local_addrs = if local {
247 let response = WalletList::call(&client, ()).await?;
248 Some(HashSet::from_iter(response))
249 } else {
250 None
251 };
252
253 let messages: Vec<Message> = filter_messages(messages, local_addrs, &None, &None)?
254 .into_iter()
255 .map(|it| it.message)
256 .collect();
257
258 let mut actor_sequences: HashMap<Address, u64> = HashMap::default();
259 for msg in messages.iter() {
260 if let Some(sequence) = get_actor_sequence(msg, &tipset, &client).await {
261 actor_sequences.insert(msg.from, sequence);
262 }
263 }
264
265 let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);
266
267 print_stats(&stats, basefee_lookback);
268
269 Ok(())
270 }
271 Self::Nonce { address } => {
272 let nonce = MpoolGetNonce::call(&client, (address.into(),)).await?;
273 println!("{nonce}");
274
275 Ok(())
276 }
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
    use crate::message::{Message, SignedMessage};
    use crate::message_pool::tests::create_smsg;
    use crate::shim::crypto::SignatureType;

    /// Creates an empty wallet backed by an in-memory key store.
    fn memory_wallet() -> Wallet {
        Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())
    }

    /// Builds four signed messages from `from` to `to` with sequences 0 to 3.
    fn four_messages(wallet: &mut Wallet, to: &Address, from: &Address) -> Vec<SignedMessage> {
        (0..4)
            .map(|sequence| create_smsg(to, from, wallet, sequence, 1000000, 1))
            .collect()
    }

    /// Without any filter, every message is returned in its original order.
    #[test]
    fn message_filtering_none() {
        let mut wallet = memory_wallet();
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let smsg_vec = four_messages(&mut wallet, &target, &sender);

        let smsg_filtered = filter_messages(smsg_vec.clone(), None, &None, &None).unwrap();

        assert_eq!(smsg_vec, smsg_filtered);
    }

    /// The local filter keeps only messages sent from the given wallet addresses.
    #[test]
    fn message_filtering_local() {
        let mut wallet = memory_wallet();
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = four_messages(&mut wallet, &target, &sender);

        // A fifth message whose sender lives in a different wallet.
        let mut ext_wallet = memory_wallet();
        let ext_sender = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let ext_target = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        smsg_vec.push(create_smsg(
            &ext_target,
            &ext_sender,
            &mut ext_wallet,
            4,
            1000000,
            1,
        ));

        let local_addrs = HashSet::from_iter(wallet.list_addrs().unwrap());

        let smsg_filtered =
            filter_messages(smsg_vec.clone(), Some(local_addrs), &None, &None).unwrap();

        // Compare against the exact expected messages so that an empty or
        // truncated result cannot pass unnoticed.
        assert_eq!(smsg_filtered.as_slice(), &smsg_vec[..4]);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender);
        }
    }

    /// The `from` filter keeps only messages sent from the given address.
    #[test]
    fn message_filtering_from() {
        let mut wallet = memory_wallet();
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = four_messages(&mut wallet, &target, &sender);

        // A fifth message from a second sender; only this one should survive.
        let sender2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        smsg_vec.push(create_smsg(&target, &sender2, &mut wallet, 4, 1000000, 1));

        let smsg_filtered =
            filter_messages(smsg_vec.clone(), None, &None, &Some(sender2.into())).unwrap();

        assert_eq!(smsg_filtered.as_slice(), &smsg_vec[4..]);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender2);
        }
    }

    /// The `to` filter keeps only messages sent to the given address.
    #[test]
    fn message_filtering_to() {
        let mut wallet = memory_wallet();
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = four_messages(&mut wallet, &target, &sender);

        // A fifth message to a second recipient; only this one should survive.
        let target2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        smsg_vec.push(create_smsg(&target2, &sender, &mut wallet, 4, 1000000, 1));

        let smsg_filtered =
            filter_messages(smsg_vec.clone(), None, &Some(target2.into()), &None).unwrap();

        assert_eq!(smsg_filtered.as_slice(), &smsg_vec[4..]);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.to(), target2);
        }
    }

    /// Checks classification and gas-limit totals for two senders: one whose
    /// message sits exactly at the actor sequence, one whose messages are all
    /// below it.
    #[test]
    fn compute_statistics() {
        use crate::shim::message::Message;
        use fvm_ipld_encoding::RawBytes;
        use std::str::FromStr;

        let addr0 = Address::from_str("t3urxivigpzih5f6ih3oq3lr2jlunw3m5oehbe5efts4ub5wy2oi4fbo5cw7333a4rrffo5535tjdq24wkc2aa").unwrap();
        let addr1 = Address::from_str("t410fot3vkzzorqg4alowvghvxx4mhofhtazixbm6z2i").unwrap();
        let messages = [
            Message {
                version: 0,
                from: addr0,
                to: Address::default(),
                sequence: 1210,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 25201703,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 190,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 21148671,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 191,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 112795625,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
        ];
        let actor_sequences = HashMap::from_iter([(addr0, 1210), (addr1, 195)]);
        let curr_base_fee = TokenAmount::from_atto(100);
        let min_base_fee = TokenAmount::from_atto(100);

        let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);

        let expected = vec![
            MpStat {
                address: addr0.to_string(),
                past: 0,
                current: 1,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 25201703.into(),
            },
            MpStat {
                address: addr1.to_string(),
                past: 2,
                current: 0,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 133944296.into(),
            },
        ];

        assert_eq!(stats, expected);
    }
}