1use crate::blocks::Tipset;
5use crate::lotus_json::{HasLotusJson as _, NotNullVec};
6use crate::message::SignedMessage;
7use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
8use crate::shim::address::StrictAddress;
9use crate::shim::message::Message;
10use crate::shim::{address::Address, econ::TokenAmount};
11
12use ahash::{HashMap, HashSet};
13use clap::Subcommand;
14use num::BigInt;
15
16#[derive(Debug, Subcommand)]
17pub enum MpoolCommands {
18 Pending {
20 #[arg(long)]
22 local: bool,
23 #[arg(long)]
25 cids: bool,
26 #[arg(long)]
28 to: Option<StrictAddress>,
29 #[arg(long)]
31 from: Option<StrictAddress>,
32 },
33 Nonce {
35 address: StrictAddress,
37 },
38 Stat {
40 #[arg(long, default_value = "60")]
42 basefee_lookback: u32,
43 #[arg(long)]
45 local: bool,
46 },
47}
48
49fn filter_messages(
50 messages: Vec<SignedMessage>,
51 local_addrs: Option<HashSet<Address>>,
52 to: &Option<StrictAddress>,
53 from: &Option<StrictAddress>,
54) -> anyhow::Result<Vec<SignedMessage>> {
55 use crate::message::Message;
56
57 let filtered = messages
58 .into_iter()
59 .filter(|msg| {
60 local_addrs
61 .as_ref()
62 .map(|addrs| addrs.contains(&msg.from()))
63 .unwrap_or(true)
64 && to.map(|addr| msg.to() == addr.into()).unwrap_or(true)
65 && from.map(|addr| msg.from() == addr.into()).unwrap_or(true)
66 })
67 .collect();
68
69 Ok(filtered)
70}
71
72async fn get_actor_sequence(
73 message: &Message,
74 tipset: &Tipset,
75 client: &rpc::Client,
76) -> Option<u64> {
77 let address = message.from;
78 let get_actor_result = StateGetActor::call(client, (address, tipset.key().into())).await;
79 let actor_state = match get_actor_result {
80 Ok(maybe_actor) => {
81 if let Some(state) = maybe_actor {
82 state
83 } else {
84 println!("{address}, actor state not found");
85 return None;
86 }
87 }
88 Err(err) => {
89 println!("{address}, err: {err}");
90 return None;
91 }
92 };
93
94 Some(actor_state.sequence)
95}
96
/// Pending messages of a single sender, keyed by message sequence.
type StatBucket = HashMap<u64, Message>;
98
/// Per-sender summary of pending messages, as produced by `compute_stats`
/// and rendered by `print_stats`.
#[derive(Debug, Default, Eq, PartialEq)]
struct MpStat {
    /// String form of the sender address.
    address: String,
    /// Messages whose sequence is lower than the sender's actor sequence.
    past: u64,
    /// Messages that are neither `past` nor `future`.
    current: u64,
    /// Messages whose sequence lies beyond the first sequence missing from
    /// the sender's pending messages (counting up from the actor sequence).
    future: u64,
    /// Messages whose gas fee cap is below the current base fee.
    below_current: u64,
    /// Messages whose gas fee cap is below the minimum base fee.
    below_past: u64,
    /// Sum of the gas limits of all counted messages.
    gas_limit: BigInt,
}
109
110fn compute_stats(
111 messages: &[Message],
112 actor_sequences: HashMap<Address, u64>,
113 curr_base_fee: TokenAmount,
114 min_base_fee: TokenAmount,
115) -> Vec<MpStat> {
116 let mut buckets = HashMap::<Address, StatBucket>::default();
117 for msg in messages {
118 buckets
119 .entry(msg.from)
120 .or_insert(StatBucket::default())
121 .insert(msg.sequence, msg.to_owned());
122 }
123
124 let mut stats: Vec<MpStat> = Vec::with_capacity(buckets.len());
125
126 for (address, bucket) in buckets {
127 let actor_sequence = *actor_sequences.get(&address).expect("get must succeed");
128
129 let mut curr_sequence = actor_sequence;
130 while bucket.contains_key(&curr_sequence) {
131 curr_sequence += 1;
132 }
133
134 let mut stat = MpStat {
135 address: address.to_string(),
136 ..Default::default()
137 };
138
139 for (_, msg) in bucket {
140 if msg.sequence < actor_sequence {
141 stat.past += 1;
142 } else if msg.sequence > curr_sequence {
143 stat.future += 1;
144 } else {
145 stat.current += 1;
146 }
147
148 if msg.gas_fee_cap < curr_base_fee {
149 stat.below_current += 1;
150 }
151 if msg.gas_fee_cap < min_base_fee {
152 stat.below_past += 1;
153 }
154
155 stat.gas_limit += msg.gas_limit;
156 }
157
158 stats.push(stat);
159 }
160
161 stats.sort_by(|m1, m2| m1.address.cmp(&m2.address));
162 stats
163}
164
165fn print_stats(stats: &[MpStat], basefee_lookback: u32) {
166 let mut total = MpStat::default();
167
168 for stat in stats {
169 total.past += stat.past;
170 total.current += stat.current;
171 total.future += stat.future;
172 total.below_current += stat.below_current;
173 total.below_past += stat.below_past;
174 total.gas_limit += &stat.gas_limit;
175
176 println!(
177 "{}: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
178 stat.address,
179 stat.past,
180 stat.current,
181 stat.future,
182 stat.below_current,
183 basefee_lookback,
184 stat.below_past,
185 stat.gas_limit
186 );
187 }
188
189 println!("-----");
190 println!(
191 "total: Nonce past: {}, cur: {}, future: {}; FeeCap cur: {}, min-{}: {}, gasLimit: {}",
192 total.past,
193 total.current,
194 total.future,
195 total.below_current,
196 basefee_lookback,
197 total.below_past,
198 total.gas_limit
199 );
200}
201
202impl MpoolCommands {
203 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
204 match self {
205 Self::Pending {
206 local,
207 cids,
208 to,
209 from,
210 } => {
211 let NotNullVec(messages) =
212 MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
213
214 let local_addrs = if local {
215 let response = WalletList::call(&client, ()).await?;
216 Some(HashSet::from_iter(response))
217 } else {
218 None
219 };
220
221 let filtered_messages = filter_messages(messages, local_addrs, &to, &from)?;
222
223 for msg in filtered_messages {
224 if cids {
225 println!("{}", msg.cid());
226 } else {
227 println!("{}", msg.into_lotus_json_string_pretty()?);
228 }
229 }
230
231 Ok(())
232 }
233 Self::Stat {
234 basefee_lookback,
235 local,
236 } => {
237 let tipset = ChainHead::call(&client, ()).await?;
238 let curr_base_fee = tipset.block_headers().first().parent_base_fee.to_owned();
239
240 let atto_str = ChainGetMinBaseFee::call(&client, (basefee_lookback,)).await?;
241 let min_base_fee = TokenAmount::from_atto(atto_str.parse::<BigInt>()?);
242
243 let NotNullVec(messages) =
244 MpoolPending::call(&client, (ApiTipsetKey(None),)).await?;
245
246 let local_addrs = if local {
247 let response = WalletList::call(&client, ()).await?;
248 Some(HashSet::from_iter(response))
249 } else {
250 None
251 };
252
253 let messages: Vec<Message> = filter_messages(messages, local_addrs, &None, &None)?
254 .into_iter()
255 .map(|it| it.message)
256 .collect();
257
258 let mut actor_sequences: HashMap<Address, u64> = HashMap::default();
259 for msg in messages.iter() {
260 if let Some(sequence) = get_actor_sequence(msg, &tipset, &client).await {
261 actor_sequences.insert(msg.from, sequence);
262 }
263 }
264
265 let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);
266
267 print_stats(&stats, basefee_lookback);
268
269 Ok(())
270 }
271 Self::Nonce { address } => {
272 let nonce = MpoolGetNonce::call(&client, (address.into(),)).await?;
273 println!("{nonce}");
274
275 Ok(())
276 }
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
    use crate::message::{Message, SignedMessage};
    use crate::message_pool::tests::create_smsg;
    use crate::shim::crypto::SignatureType;
    use std::borrow::BorrowMut;

    /// With no filter set, every message is returned in its original order.
    #[test]
    fn message_filtering_none() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for sequence in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), sequence, 1000000, 1);
            smsg_vec.push(msg);
        }

        let smsg_filtered: Vec<SignedMessage> =
            filter_messages(smsg_vec.clone(), None, &None, &None).unwrap();

        assert_eq!(smsg_vec, smsg_filtered);
    }

    /// With a local address set, only messages sent from those addresses
    /// survive; the message signed by a foreign wallet is dropped.
    #[test]
    fn message_filtering_local() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for sequence in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), sequence, 1000000, 1);
            smsg_vec.push(msg);
        }

        // A second, unrelated wallet provides the non-local sender.
        let ext_keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut ext_wallet = Wallet::new(ext_keystore);
        let ext_sender = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let ext_target = ext_wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(
            &ext_target,
            &ext_sender,
            ext_wallet.borrow_mut(),
            4,
            1000000,
            1,
        );
        smsg_vec.push(msg);

        let local_addrs = HashSet::from_iter(wallet.list_addrs().unwrap());

        let smsg_filtered: Vec<SignedMessage> =
            filter_messages(smsg_vec, Some(local_addrs), &None, &None).unwrap();

        // Guard against a vacuous pass: the four local messages must remain.
        assert_eq!(smsg_filtered.len(), 4);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender);
        }
    }

    /// The `from` filter keeps only messages sent by the given address.
    #[test]
    fn message_filtering_from() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for sequence in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), sequence, 1000000, 1);
            smsg_vec.push(msg);
        }

        let sender2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(&target, &sender2, wallet.borrow_mut(), 4, 1000000, 1);
        smsg_vec.push(msg);

        let smsg_filtered: Vec<SignedMessage> =
            filter_messages(smsg_vec, None, &None, &Some(sender2.into())).unwrap();

        // Guard against a vacuous pass: exactly the one message from
        // `sender2` must remain.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.from(), sender2);
        }
    }

    /// The `to` filter keeps only messages addressed to the given address.
    #[test]
    fn message_filtering_to() {
        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
        let mut wallet = Wallet::new(keystore);
        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let mut smsg_vec = Vec::new();
        for sequence in 0..4u64 {
            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), sequence, 1000000, 1);
            smsg_vec.push(msg);
        }

        let target2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();

        let msg = create_smsg(&target2, &sender, wallet.borrow_mut(), 4, 1000000, 1);
        smsg_vec.push(msg);

        let smsg_filtered: Vec<SignedMessage> =
            filter_messages(smsg_vec, None, &Some(target2.into()), &None).unwrap();

        // Guard against a vacuous pass: exactly the one message to `target2`
        // must remain.
        assert_eq!(smsg_filtered.len(), 1);
        for smsg in smsg_filtered.iter() {
            assert_eq!(smsg.to(), target2);
        }
    }

    /// One sender with a message at its actor sequence (counted as current)
    /// and one sender whose messages are all below its actor sequence
    /// (counted as past); gas limits are summed per sender.
    #[test]
    fn compute_statistics() {
        use crate::shim::message::Message;
        use fvm_ipld_encoding::RawBytes;
        use std::str::FromStr;

        let addr0 = Address::from_str("t3urxivigpzih5f6ih3oq3lr2jlunw3m5oehbe5efts4ub5wy2oi4fbo5cw7333a4rrffo5535tjdq24wkc2aa").unwrap();
        let addr1 = Address::from_str("t410fot3vkzzorqg4alowvghvxx4mhofhtazixbm6z2i").unwrap();
        let messages = [
            Message {
                version: 0,
                from: addr0,
                to: Address::default(),
                sequence: 1210,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 25201703,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 190,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 21148671,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
            Message {
                version: 0,
                from: addr1,
                to: Address::default(),
                sequence: 191,
                value: TokenAmount::default(),
                method_num: 5,
                params: RawBytes::new(vec![]),
                gas_limit: 112795625,
                gas_fee_cap: TokenAmount::from_atto(101774),
                gas_premium: TokenAmount::from_atto(100720),
            },
        ];
        let actor_sequences = HashMap::from_iter([(addr0, 1210), (addr1, 195)]);
        let curr_base_fee = TokenAmount::from_atto(100);
        let min_base_fee = TokenAmount::from_atto(100);

        let stats = compute_stats(&messages, actor_sequences, curr_base_fee, min_base_fee);

        let expected = vec![
            MpStat {
                address: addr0.to_string(),
                past: 0,
                current: 1,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 25201703.into(),
            },
            MpStat {
                address: addr1.to_string(),
                past: 2,
                current: 0,
                future: 0,
                below_current: 0,
                below_past: 0,
                gas_limit: 133944296.into(),
            },
        ];

        assert_eq!(stats, expected);
    }
}