1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset};
12use crate::chain::{HeadChange, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, Message, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::shim::{
20 address::Address,
21 crypto::{Signature, SignatureType},
22 econ::TokenAmount,
23 gas::{Gas, price_list_by_network_version},
24};
25use crate::state_manager::utils::is_valid_for_sending;
26use crate::utils::cache::SizeTrackingLruCache;
27use crate::utils::get_size::CidWrapper;
28use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
29use anyhow::Context as _;
30use cid::Cid;
31use futures::StreamExt;
32use fvm_ipld_encoding::to_vec;
33use itertools::Itertools;
34use nonzero_ext::nonzero;
35use parking_lot::RwLock as SyncRwLock;
36use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
37use tracing::warn;
38
39use crate::message_pool::{
40 config::MpoolConfig,
41 errors::Error,
42 head_change, metrics,
43 msgpool::{
44 BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
45 republish_pending_messages,
46 },
47 provider::Provider,
48 utils::get_base_fee_lower_bound,
49};
50
/// Size handed to `SizeTrackingLruCache::new_with_metrics` for the `"bls_sig"` cache
/// created in `MessagePool::new`.
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Size handed to `SizeTrackingLruCache::new_with_metrics` for the `"sig_val"` cache
/// created in `MessagePool::new`.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);

/// Pending-message limit per actor for trusted sources.
/// NOTE(review): not referenced in this part of the file; presumably the value behind
/// `Provider::max_actor_pending_messages` - confirm against the provider implementation.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Pending-message limit per actor for untrusted sources.
/// NOTE(review): not referenced in this part of the file; presumably the value behind
/// `Provider::max_untrusted_actor_pending_messages` - confirm against the provider implementation.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Largest serialized message size, in bytes, accepted by `MessagePool::check_message`
/// (`64 << 10` = 64 KiB).
const MAX_MESSAGE_SIZE: usize = 64 << 10;

/// Pending messages of a single sender, indexed by message sequence, together with a
/// cursor holding the next sequence expected for that sender.
///
/// Instances are stored in `MessagePool::pending`, keyed by the sender address.
#[derive(Clone, Default, Debug)]
pub struct MsgSet {
    /// Pending messages keyed by their sequence.
    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
    /// Sequence cursor maintained by `add` and `rm`; reported by
    /// `MessagePool::get_sequence` when it is not behind the on-chain sequence.
    next_sequence: u64,
}
68
69impl MsgSet {
70 pub fn new(sequence: u64) -> Self {
73 MsgSet {
74 msgs: HashMap::new(),
75 next_sequence: sequence,
76 }
77 }
78
79 pub fn add_trusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
83 where
84 T: Provider,
85 {
86 self.add(api, m, true)
87 }
88
89 #[allow(dead_code)]
93 pub fn add_untrusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
94 where
95 T: Provider,
96 {
97 self.add(api, m, false)
98 }
99
100 fn add<T>(&mut self, api: &T, m: SignedMessage, trusted: bool) -> Result<(), Error>
101 where
102 T: Provider,
103 {
104 let max_actor_pending_messages = if trusted {
105 api.max_actor_pending_messages()
106 } else {
107 api.max_untrusted_actor_pending_messages()
108 };
109
110 if self.msgs.is_empty() || m.sequence() >= self.next_sequence {
111 self.next_sequence = m.sequence() + 1;
112 }
113
114 let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
115 if m.cid() != exms.cid() {
116 let premium = &exms.message().gas_premium;
117 let min_price = premium.clone()
118 + ((premium * RBF_NUM).div_floor(RBF_DENOM))
119 + TokenAmount::from_atto(1u8);
120 if m.message().gas_premium <= min_price {
121 return Err(Error::GasPriceTooLow);
122 }
123 } else {
124 return Err(Error::DuplicateSequence);
125 }
126 true
127 } else {
128 false
129 };
130
131 if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
133 return Err(Error::TooManyPendingMessages(
134 m.message.from().to_string(),
135 trusted,
136 ));
137 }
138 if self.msgs.insert(m.sequence(), m).is_none() {
139 metrics::MPOOL_MESSAGE_TOTAL.inc();
140 }
141 Ok(())
142 }
143
144 pub fn rm(&mut self, sequence: u64, applied: bool) {
147 if self.msgs.remove(&sequence).is_none() {
148 if applied && sequence >= self.next_sequence {
149 self.next_sequence = sequence + 1;
150 while self.msgs.contains_key(&self.next_sequence) {
151 self.next_sequence += 1;
152 }
153 }
154 return;
155 }
156 metrics::MPOOL_MESSAGE_TOTAL.dec();
157
158 if applied {
160 if sequence >= self.next_sequence {
163 self.next_sequence = sequence + 1;
164 }
165 return;
166 }
167 if sequence < self.next_sequence {
170 self.next_sequence = sequence;
171 }
172 }
173}
174
/// Pool of pending signed messages together with the shared state needed to validate,
/// publish and republish them.
///
/// Most fields are reference-counted because `MessagePool::new` hands clones of them to
/// the background tasks it spawns.
pub struct MessagePool<T> {
    /// Sender addresses recorded by `add_local` for messages submitted through `push`;
    /// passed to `republish_pending_messages`.
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// Pending messages grouped by sender address.
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    /// Tipset the pool validates against; initialised from `Provider::get_heaviest_tipset`
    /// and shared with the head-change task.
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// Provider used for state lookups, message storage and head-change subscription.
    pub api: Arc<T>,
    /// Channel on which `push` sends `NetworkMessage::PubsubMessage` values.
    pub network_sender: flume::Sender<NetworkMessage>,
    /// BLS signatures keyed by message CID; filled by `add_helper` and consulted by
    /// `recover_sig` in `messages_for_blocks`.
    pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
    /// CIDs of messages whose signature `verify_msg_sig` has already verified.
    pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
    /// Set of message CIDs shared with `head_change` and `republish_pending_messages`.
    /// NOTE(review): presumably the messages that were republished; it is maintained
    /// outside this part of the file - confirm.
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Sender half of the bounded channel whose receiving end wakes the republish task.
    pub repub_trigger: flume::Sender<()>,
    /// Messages recorded by `add_local`; re-added to the pool by `load_local`.
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Message pool configuration.
    pub config: MpoolConfig,
    /// Chain configuration (network name, network versions, Ethereum chain id, delays).
    pub chain_config: Arc<ChainConfig>,
}
204
205impl<T> MessagePool<T>
206where
207 T: Provider,
208{
209 pub fn current_tipset(&self) -> Tipset {
211 self.cur_tipset.read().clone()
212 }
213
214 fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
216 self.local_addrs.write().push(m.from());
217 self.local_msgs.write().insert(m);
218 Ok(())
219 }
220
221 pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
224 self.check_message(&msg)?;
225 let cid = msg.cid();
226 let cur_ts = self.current_tipset();
227 let publish = self.add_tipset(msg.clone(), &cur_ts, true)?;
228 let msg_ser = to_vec(&msg)?;
229 let network_name = self.chain_config.network.genesis_name();
230 self.add_local(msg)?;
231 if publish {
232 self.network_sender
233 .send_async(NetworkMessage::PubsubMessage {
234 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
235 message: msg_ser,
236 })
237 .await
238 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
239 }
240 Ok(cid)
241 }
242
243 fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
244 if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
245 return Err(Error::MessageTooBig);
246 }
247 valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
248 if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
249 return Err(Error::MessageValueTooHigh);
250 }
251 if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
252 return Err(Error::GasFeeCapTooLow);
253 }
254 self.verify_msg_sig(msg)
255 }
256
257 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
260 self.check_message(&msg)?;
261 let ts = self.current_tipset();
262 self.add_tipset(msg, &ts, false)?;
263 Ok(())
264 }
265
266 fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
270 let cid = msg.cid();
271
272 if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
273 return Ok(());
274 }
275
276 msg.verify(self.chain_config.eth_chain_id)
277 .map_err(|e| Error::Other(e.to_string()))?;
278
279 self.sig_val_cache.push(cid.into(), ());
280
281 Ok(())
282 }
283
284 fn add_tipset(&self, msg: SignedMessage, cur_ts: &Tipset, local: bool) -> Result<bool, Error> {
288 let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
289
290 if sequence > msg.message().sequence {
291 return Err(Error::SequenceTooLow);
292 }
293
294 let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
295
296 let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
298 let eth_chain_id = self.chain_config.eth_chain_id;
299 if msg.signature().signature_type() == SignatureType::Delegated
300 && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
301 {
302 return Err(Error::Other(
303 "Invalid Ethereum message for the current network version".to_owned(),
304 ));
305 }
306 if !is_valid_for_sending(nv, &sender_actor) {
307 return Err(Error::Other(
308 "Sender actor is not a valid top-level sender".to_owned(),
309 ));
310 }
311
312 let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
313
314 let balance = self.get_state_balance(&msg.from(), cur_ts)?;
315
316 let msg_balance = msg.required_funds();
317 if balance < msg_balance {
318 return Err(Error::NotEnoughFunds);
319 }
320 self.add_helper(msg)?;
321 Ok(publish)
322 }
323
324 fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> {
329 let from = msg.from();
330 let cur_ts = self.current_tipset();
331 add_helper(
332 self.api.as_ref(),
333 self.bls_sig_cache.as_ref(),
334 self.pending.as_ref(),
335 msg,
336 self.get_state_sequence(&from, &cur_ts)?,
337 )
338 }
339
340 pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
343 let cur_ts = self.current_tipset();
344
345 let sequence = self.get_state_sequence(addr, &cur_ts)?;
346
347 let pending = self.pending.read();
348
349 let msgset = pending.get(addr);
350 match msgset {
351 Some(mset) => {
352 if sequence > mset.next_sequence {
353 return Ok(sequence);
354 }
355 Ok(mset.next_sequence)
356 }
357 None => Ok(sequence),
358 }
359 }
360
361 fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
363 let actor = self.api.get_actor_after(addr, cur_ts)?;
364 Ok(actor.sequence)
365 }
366
367 fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
370 let actor = self.api.get_actor_after(addr, ts)?;
371 Ok(TokenAmount::from(&actor.balance))
372 }
373
374 pub fn pending(&self) -> Result<(Vec<SignedMessage>, Tipset), Error> {
377 let mut out: Vec<SignedMessage> = Vec::new();
378 let pending = self.pending.read().clone();
379
380 for (addr, _) in pending {
381 out.append(
382 self.pending_for(&addr)
383 .ok_or(Error::InvalidFromAddr)?
384 .as_mut(),
385 )
386 }
387
388 let cur_ts = self.current_tipset();
389
390 Ok((out, cur_ts))
391 }
392
393 pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
397 let pending = self.pending.read();
398 let mset = pending.get(a)?;
399 if mset.msgs.is_empty() {
400 return None;
401 }
402
403 Some(
404 mset.msgs
405 .values()
406 .cloned()
407 .sorted_by_key(|v| v.message().sequence)
408 .collect(),
409 )
410 }
411
412 pub fn messages_for_blocks<'a>(
414 &self,
415 blks: impl Iterator<Item = &'a CachingBlockHeader>,
416 ) -> Result<Vec<SignedMessage>, Error> {
417 let mut msg_vec: Vec<SignedMessage> = Vec::new();
418
419 for block in blks {
420 let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
421
422 msg_vec.append(smsgs.as_mut());
423 for msg in umsg {
424 let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
425 msg_vec.push(smsg)
426 }
427 }
428 Ok(msg_vec)
429 }
430
431 pub fn load_local(&mut self) -> Result<(), Error> {
433 let mut local_msgs = self.local_msgs.write();
434 for k in local_msgs.iter().cloned().collect_vec() {
435 self.add(k.clone()).unwrap_or_else(|err| {
436 if err == Error::SequenceTooLow {
437 warn!("error adding message: {:?}", err);
438 local_msgs.remove(&k);
439 }
440 })
441 }
442
443 Ok(())
444 }
445
    /// Returns a reference to the pool configuration. Compiled for tests only.
    #[cfg(test)]
    pub fn get_config(&self) -> &MpoolConfig {
        &self.config
    }
450
451 #[cfg(test)]
452 pub fn set_config<DB: SettingsStore>(
453 &mut self,
454 db: &DB,
455 cfg: MpoolConfig,
456 ) -> Result<(), Error> {
457 cfg.save_config(db)
458 .map_err(|e| Error::Other(e.to_string()))?;
459 self.config = cfg;
460 Ok(())
461 }
462}
463
464impl<T> MessagePool<T>
465where
466 T: Provider + Send + Sync + 'static,
467{
    /// Builds a message pool on top of `api` and spawns its two background tasks onto
    /// `services`:
    ///
    /// 1. a head-change task that forwards every `HeadChange::Apply` received from the
    ///    provider's subscription to `head_change`;
    /// 2. a republish task that calls `republish_pending_messages` whenever its timer
    ///    ticks or a value arrives on the republish trigger channel.
    ///
    /// # Errors
    /// Returns whatever `load_local` reports.
    pub fn new(
        api: T,
        network_sender: flume::Sender<NetworkMessage>,
        config: MpoolConfig,
        chain_config: Arc<ChainConfig>,
        services: &mut JoinSet<anyhow::Result<()>>,
    ) -> Result<MessagePool<T>, Error>
    where
        T: Provider,
    {
        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
        // The pool starts from the provider's heaviest tipset.
        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
        let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "bls_sig".into(),
            BLS_SIG_CACHE_SIZE,
        ));
        let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "sig_val".into(),
            SIG_VAL_CACHE_SIZE,
        ));
        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
        let block_delay = chain_config.block_delay_secs;

        // Bounded channel (capacity 4); the receiving end wakes the republish task below.
        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
        let mut mp = MessagePool {
            local_addrs,
            pending,
            cur_tipset: tipset,
            api: Arc::new(api),
            bls_sig_cache,
            sig_val_cache,
            local_msgs,
            republished,
            config,
            network_sender,
            repub_trigger,
            chain_config: Arc::clone(&chain_config),
        };

        // NOTE(review): `local_msgs` was created empty a few lines above, so this call
        // currently iterates over nothing.
        mp.load_local()?;

        let mut subscriber = mp.api.subscribe_head_changes();

        // Handles moved into the head-change task.
        let api = mp.api.clone();
        let bls_sig_cache = mp.bls_sig_cache.clone();
        let pending = mp.pending.clone();
        let republished = mp.republished.clone();

        let cur_tipset = mp.cur_tipset.clone();
        let repub_trigger = mp.repub_trigger.clone();

        // Head-change task: runs until the subscription is closed (returns `Ok`) or
        // `head_change` fails (returns that error with added context).
        services.spawn(async move {
            loop {
                match subscriber.recv().await {
                    Ok(ts) => {
                        // `Apply` is the only variant: nothing is reverted and the
                        // received tipset is the single one applied.
                        let (cur, rev, app) = match ts {
                            HeadChange::Apply(tipset) => {
                                (cur_tipset.clone(), Vec::new(), vec![tipset])
                            }
                        };
                        head_change(
                            api.as_ref(),
                            bls_sig_cache.as_ref(),
                            repub_trigger.clone(),
                            republished.as_ref(),
                            pending.as_ref(),
                            cur.as_ref(),
                            rev,
                            app,
                        )
                        .await
                        .context("Error changing head")?;
                    }
                    // Falling behind the broadcast channel is logged and tolerated.
                    Err(RecvError::Lagged(e)) => {
                        warn!("Head change subscriber lagged: skipping {} events", e);
                    }
                    Err(RecvError::Closed) => {
                        break Ok(());
                    }
                }
            }
        });

        // Handles moved into the republish task.
        let api = mp.api.clone();
        let pending = mp.pending.clone();
        let cur_tipset = mp.cur_tipset.clone();
        let republished = mp.republished.clone();
        let local_addrs = mp.local_addrs.clone();
        let network_sender = Arc::new(mp.network_sender.clone());
        // Timer period in seconds: ten block delays plus the propagation delay.
        let republish_interval = (10 * block_delay + chain_config.propagation_delay_secs) as u64;
        // Republish task: loops forever; a failed republish is logged and the loop goes on.
        services.spawn(async move {
            let mut repub_trigger_rx = repub_trigger_rx.stream();
            let mut interval = interval(Duration::from_secs(republish_interval));
            loop {
                // Wake on whichever comes first: the timer or an explicit trigger.
                tokio::select! {
                    _ = interval.tick() => (),
                    _ = repub_trigger_rx.next() => (),
                }
                if let Err(e) = republish_pending_messages(
                    api.as_ref(),
                    network_sender.as_ref(),
                    pending.as_ref(),
                    cur_tipset.as_ref(),
                    republished.as_ref(),
                    local_addrs.as_ref(),
                    &chain_config,
                )
                .await
                {
                    warn!("Failed to republish pending messages: {}", e.to_string());
                }
            }
        });
        Ok(mp)
    }
588}
589
590pub(in crate::message_pool) fn add_helper<T>(
597 api: &T,
598 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
599 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
600 msg: SignedMessage,
601 sequence: u64,
602) -> Result<(), Error>
603where
604 T: Provider,
605{
606 if msg.signature().signature_type() == SignatureType::Bls {
607 bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
608 }
609
610 api.put_message(&ChainMessage::Signed(msg.clone()))?;
611 api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?;
612
613 let mut pending = pending.write();
614 let msett = pending.get_mut(&msg.from());
615 match msett {
616 Some(mset) => mset.add_trusted(api, msg)?,
617 None => {
618 let mut mset = MsgSet::new(sequence);
619 let from = msg.from();
620 mset.add_trusted(api, msg)?;
621 pending.insert(from, mset);
622 }
623 }
624
625 Ok(())
626}
627
628fn verify_msg_before_add(
629 m: &SignedMessage,
630 cur_ts: &Tipset,
631 local: bool,
632 chain_config: &ChainConfig,
633) -> Result<bool, Error> {
634 let epoch = cur_ts.epoch();
635 let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
636 .on_chain_message(m.chain_length()?);
637 valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
638 if !cur_ts.block_headers().is_empty() {
639 let base_fee = &cur_ts.block_headers().first().parent_base_fee;
640 let base_fee_lower_bound =
641 get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
642 if m.gas_fee_cap() < base_fee_lower_bound {
643 if local {
644 warn!(
645 "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
646 m.gas_fee_cap(),
647 base_fee_lower_bound
648 );
649 return Ok(false);
650 }
651 return Err(Error::SoftValidationFailure(format!(
652 "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
653 m.gas_fee_cap(),
654 base_fee_lower_bound
655 )));
656 }
657 }
658 Ok(local)
659}
660
661pub fn remove(
663 from: &Address,
664 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
665 sequence: u64,
666 applied: bool,
667) -> Result<(), Error> {
668 let mut pending = pending.write();
669 let mset = if let Some(mset) = pending.get_mut(from) {
670 mset
671 } else {
672 return Ok(());
673 };
674
675 mset.rm(sequence, applied);
676
677 if mset.msgs.is_empty() {
678 pending.remove(from);
679 }
680
681 Ok(())
682}
683
684#[cfg(test)]
685mod tests {
686 use crate::message_pool::test_provider::TestApi;
687
688 use super::*;
689 use crate::shim::message::Message as ShimMessage;
690
691 #[test]
694 fn add_helper_message_gas_limit_test() {
695 let api = TestApi::default();
696 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
697 let pending = SyncRwLock::new(HashMap::new());
698 let message = ShimMessage {
699 gas_limit: 666_666_666,
700 ..ShimMessage::default()
701 };
702 let msg = SignedMessage::mock_bls_signed_message(message);
703 let sequence = msg.message().sequence;
704 let res = add_helper(&api, &bls_sig_cache, &pending, msg, sequence);
705 assert!(res.is_ok());
706 }
707
708 #[test]
711 fn test_rbf_at_capacity() {
712 use crate::shim::econ::TokenAmount;
713
714 let api = TestApi::with_max_actor_pending_messages(10);
715 let mut mset = MsgSet::new(0);
716
717 for i in 0..10 {
719 let message = ShimMessage {
720 sequence: i,
721 gas_premium: TokenAmount::from_atto(100u64),
722 ..ShimMessage::default()
723 };
724 let msg = SignedMessage::mock_bls_signed_message(message);
725 let res = mset.add_trusted(&api, msg);
726 assert!(res.is_ok(), "Failed to add message {}: {:?}", i, res);
727 }
728
729 let message_new = ShimMessage {
731 sequence: 10,
732 gas_premium: TokenAmount::from_atto(100u64),
733 ..ShimMessage::default()
734 };
735 let msg_new = SignedMessage::mock_bls_signed_message(message_new);
736 let res_new = mset.add_trusted(&api, msg_new);
737 assert!(matches!(res_new, Err(Error::TooManyPendingMessages(_, _))));
738
739 let message_rbf = ShimMessage {
742 sequence: 5,
743 gas_premium: TokenAmount::from_atto(200u64),
744 ..ShimMessage::default()
745 };
746 let msg_rbf = SignedMessage::mock_bls_signed_message(message_rbf);
747 let res_rbf = mset.add_trusted(&api, msg_rbf);
748 assert!(
749 res_rbf.is_ok(),
750 "RBF should be allowed at capacity: {:?}",
751 res_rbf
752 );
753 }
754}