1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset};
12use crate::chain::{HeadChange, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, Message, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::shim::{
20 address::Address,
21 crypto::{Signature, SignatureType},
22 econ::TokenAmount,
23 gas::{Gas, price_list_by_network_version},
24};
25use crate::state_manager::utils::is_valid_for_sending;
26use crate::utils::cache::SizeTrackingLruCache;
27use crate::utils::get_size::CidWrapper;
28use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
29use anyhow::Context as _;
30use cid::Cid;
31use futures::StreamExt;
32use fvm_ipld_encoding::to_vec;
33use itertools::Itertools;
34use nonzero_ext::nonzero;
35use parking_lot::RwLock as SyncRwLock;
36use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
37use tracing::warn;
38
39use crate::message_pool::{
40 config::MpoolConfig,
41 errors::Error,
42 head_change, metrics,
43 msgpool::{
44 BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
45 republish_pending_messages,
46 },
47 provider::Provider,
48 utils::get_base_fee_lower_bound,
49};
50
/// Size handed to the `bls_sig` cache when a [`MessagePool`] is constructed.
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Size handed to the `sig_val` cache when a [`MessagePool`] is constructed.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);

// NOTE(review): the two limits below are not read in this file; `MsgSet::add`
// asks the `Provider` for its limits instead. Presumably these are the
// defaults a provider returns -- confirm against the `Provider` implementations.
/// Pending-message limit per actor for trusted sources.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Pending-message limit per actor for untrusted sources.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Largest encoded message, in bytes, that `check_message` accepts
/// (`64 << 10` = 65,536 bytes).
const MAX_MESSAGE_SIZE: usize = 64 << 10;

/// Selects which per-actor pending-message limit applies when a message is
/// added to the pool.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TrustPolicy {
    /// Routed to `MsgSet::add_trusted`, which uses
    /// `Provider::max_actor_pending_messages`.
    Trusted,
    /// Routed to `MsgSet::add_untrusted`, which uses
    /// `Provider::max_untrusted_actor_pending_messages`.
    Untrusted,
}
68
69#[derive(Clone, Default, Debug)]
72pub struct MsgSet {
73 pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
74 next_sequence: u64,
75}
76
77impl MsgSet {
78 pub fn new(sequence: u64) -> Self {
81 MsgSet {
82 msgs: HashMap::new(),
83 next_sequence: sequence,
84 }
85 }
86
87 pub fn add_trusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
91 where
92 T: Provider,
93 {
94 self.add(api, m, true)
95 }
96
97 pub fn add_untrusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
101 where
102 T: Provider,
103 {
104 self.add(api, m, false)
105 }
106
107 fn add<T>(&mut self, api: &T, m: SignedMessage, trusted: bool) -> Result<(), Error>
108 where
109 T: Provider,
110 {
111 let max_actor_pending_messages = if trusted {
112 api.max_actor_pending_messages()
113 } else {
114 api.max_untrusted_actor_pending_messages()
115 };
116
117 if self.msgs.is_empty() || m.sequence() >= self.next_sequence {
118 self.next_sequence = m.sequence() + 1;
119 }
120
121 let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
122 if m.cid() != exms.cid() {
123 let premium = &exms.message().gas_premium;
124 let min_price = premium.clone()
125 + ((premium * RBF_NUM).div_floor(RBF_DENOM))
126 + TokenAmount::from_atto(1u8);
127 if m.message().gas_premium <= min_price {
128 return Err(Error::GasPriceTooLow);
129 }
130 } else {
131 return Err(Error::DuplicateSequence);
132 }
133 true
134 } else {
135 false
136 };
137
138 if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
140 return Err(Error::TooManyPendingMessages(
141 m.message.from().to_string(),
142 trusted,
143 ));
144 }
145 if self.msgs.insert(m.sequence(), m).is_none() {
146 metrics::MPOOL_MESSAGE_TOTAL.inc();
147 }
148 Ok(())
149 }
150
151 pub fn rm(&mut self, sequence: u64, applied: bool) {
154 if self.msgs.remove(&sequence).is_none() {
155 if applied && sequence >= self.next_sequence {
156 self.next_sequence = sequence + 1;
157 while self.msgs.contains_key(&self.next_sequence) {
158 self.next_sequence += 1;
159 }
160 }
161 return;
162 }
163 metrics::MPOOL_MESSAGE_TOTAL.dec();
164
165 if applied {
167 if sequence >= self.next_sequence {
170 self.next_sequence = sequence + 1;
171 }
172 return;
173 }
174 if sequence < self.next_sequence {
177 self.next_sequence = sequence;
178 }
179 }
180}
181
/// Pool of pending messages, shared with the background tasks spawned by
/// `MessagePool::new` through the `Arc`-wrapped fields.
pub struct MessagePool<T> {
    /// Sender addresses of messages pushed through this node (see `add_local`).
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// Pending messages grouped by sender address.
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    /// Tipset the pool currently treats as head; initialised from
    /// `Provider::get_heaviest_tipset`.
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// Provider used for actor lookups, message storage and head-change
    /// subscriptions.
    pub api: Arc<T>,
    /// Channel used to hand messages to the network layer.
    pub network_sender: flume::Sender<NetworkMessage>,
    /// BLS signatures keyed by message CID; filled by `add_helper` and passed
    /// to `recover_sig`.
    pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
    /// CIDs of messages whose signature has already been verified.
    pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
    /// Set of CIDs shared with `head_change` and `republish_pending_messages`.
    // NOTE(review): presumably the messages republished in the current cycle -- confirm.
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Sending on this channel wakes the republish task spawned in `new`.
    pub repub_trigger: flume::Sender<()>,
    /// Messages pushed through this node (see `add_local`).
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Message pool configuration.
    pub config: MpoolConfig,
    /// Chain configuration (network versions, block delay, Ethereum chain id, ...).
    pub chain_config: Arc<ChainConfig>,
}
211
212impl<T> MessagePool<T>
213where
214 T: Provider,
215{
216 pub fn current_tipset(&self) -> Tipset {
218 self.cur_tipset.read().clone()
219 }
220
221 fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
223 self.local_addrs.write().push(m.from());
224 self.local_msgs.write().insert(m);
225 Ok(())
226 }
227
228 pub async fn push_internal(
231 &self,
232 msg: SignedMessage,
233 trust_policy: TrustPolicy,
234 ) -> Result<Cid, Error> {
235 self.check_message(&msg)?;
236 let cid = msg.cid();
237 let cur_ts = self.current_tipset();
238 let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
239 let msg_ser = to_vec(&msg)?;
240 let network_name = self.chain_config.network.genesis_name();
241 self.add_local(msg)?;
242 if publish {
243 self.network_sender
244 .send_async(NetworkMessage::PubsubMessage {
245 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
246 message: msg_ser,
247 })
248 .await
249 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
250 }
251 Ok(cid)
252 }
253
254 pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
256 self.push_internal(msg, TrustPolicy::Trusted).await
257 }
258
259 pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
261 self.push_internal(msg, TrustPolicy::Untrusted).await
262 }
263
264 fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
265 if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
266 return Err(Error::MessageTooBig);
267 }
268 valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
269 if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
270 return Err(Error::MessageValueTooHigh);
271 }
272 if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
273 return Err(Error::GasFeeCapTooLow);
274 }
275 self.verify_msg_sig(msg)
276 }
277
278 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
281 self.check_message(&msg)?;
282 let ts = self.current_tipset();
283 self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
284 Ok(())
285 }
286
287 fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
291 let cid = msg.cid();
292
293 if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
294 return Ok(());
295 }
296
297 msg.verify(self.chain_config.eth_chain_id)
298 .map_err(|e| Error::Other(e.to_string()))?;
299
300 self.sig_val_cache.push(cid.into(), ());
301
302 Ok(())
303 }
304
305 fn add_tipset(
309 &self,
310 msg: SignedMessage,
311 cur_ts: &Tipset,
312 local: bool,
313 trust_policy: TrustPolicy,
314 ) -> Result<bool, Error> {
315 let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
316
317 if sequence > msg.message().sequence {
318 return Err(Error::SequenceTooLow);
319 }
320
321 let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
322
323 let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
325 let eth_chain_id = self.chain_config.eth_chain_id;
326 if msg.signature().signature_type() == SignatureType::Delegated
327 && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
328 {
329 return Err(Error::Other(
330 "Invalid Ethereum message for the current network version".to_owned(),
331 ));
332 }
333 if !is_valid_for_sending(nv, &sender_actor) {
334 return Err(Error::Other(
335 "Sender actor is not a valid top-level sender".to_owned(),
336 ));
337 }
338
339 let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
340
341 let balance = self.get_state_balance(&msg.from(), cur_ts)?;
342
343 let msg_balance = msg.required_funds();
344 if balance < msg_balance {
345 return Err(Error::NotEnoughFunds);
346 }
347 self.add_helper(msg, trust_policy)?;
348 Ok(publish)
349 }
350
351 fn add_helper(&self, msg: SignedMessage, trust_policy: TrustPolicy) -> Result<(), Error> {
356 let from = msg.from();
357 let cur_ts = self.current_tipset();
358 add_helper(
359 self.api.as_ref(),
360 self.bls_sig_cache.as_ref(),
361 self.pending.as_ref(),
362 msg,
363 self.get_state_sequence(&from, &cur_ts)?,
364 trust_policy,
365 )
366 }
367
368 pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
371 let cur_ts = self.current_tipset();
372
373 let sequence = self.get_state_sequence(addr, &cur_ts)?;
374
375 let pending = self.pending.read();
376
377 let msgset = pending.get(addr);
378 match msgset {
379 Some(mset) => {
380 if sequence > mset.next_sequence {
381 return Ok(sequence);
382 }
383 Ok(mset.next_sequence)
384 }
385 None => Ok(sequence),
386 }
387 }
388
389 fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
391 let actor = self.api.get_actor_after(addr, cur_ts)?;
392 Ok(actor.sequence)
393 }
394
395 fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
398 let actor = self.api.get_actor_after(addr, ts)?;
399 Ok(TokenAmount::from(&actor.balance))
400 }
401
402 pub fn pending(&self) -> Result<(Vec<SignedMessage>, Tipset), Error> {
405 let mut out: Vec<SignedMessage> = Vec::new();
406 let pending = self.pending.read().clone();
407
408 for (addr, _) in pending {
409 out.append(
410 self.pending_for(&addr)
411 .ok_or(Error::InvalidFromAddr)?
412 .as_mut(),
413 )
414 }
415
416 let cur_ts = self.current_tipset();
417
418 Ok((out, cur_ts))
419 }
420
421 pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
425 let pending = self.pending.read();
426 let mset = pending.get(a)?;
427 if mset.msgs.is_empty() {
428 return None;
429 }
430
431 Some(
432 mset.msgs
433 .values()
434 .cloned()
435 .sorted_by_key(|v| v.message().sequence)
436 .collect(),
437 )
438 }
439
440 pub fn messages_for_blocks<'a>(
442 &self,
443 blks: impl Iterator<Item = &'a CachingBlockHeader>,
444 ) -> Result<Vec<SignedMessage>, Error> {
445 let mut msg_vec: Vec<SignedMessage> = Vec::new();
446
447 for block in blks {
448 let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
449
450 msg_vec.append(smsgs.as_mut());
451 for msg in umsg {
452 let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
453 msg_vec.push(smsg)
454 }
455 }
456 Ok(msg_vec)
457 }
458
459 pub fn load_local(&mut self) -> Result<(), Error> {
461 let mut local_msgs = self.local_msgs.write();
462 for k in local_msgs.iter().cloned().collect_vec() {
463 self.add(k.clone()).unwrap_or_else(|err| {
464 if err == Error::SequenceTooLow {
465 warn!("error adding message: {:?}", err);
466 local_msgs.remove(&k);
467 }
468 })
469 }
470
471 Ok(())
472 }
473
    /// Returns the pool's current configuration (test builds only).
    #[cfg(test)]
    pub fn get_config(&self) -> &MpoolConfig {
        &self.config
    }
478
479 #[cfg(test)]
480 pub fn set_config<DB: SettingsStore>(
481 &mut self,
482 db: &DB,
483 cfg: MpoolConfig,
484 ) -> Result<(), Error> {
485 cfg.save_config(db)
486 .map_err(|e| Error::Other(e.to_string()))?;
487 self.config = cfg;
488 Ok(())
489 }
490}
491
492impl<T> MessagePool<T>
493where
494 T: Provider + Send + Sync + 'static,
495{
    /// Builds a message pool on top of `api` and spawns its two background
    /// tasks onto `services`:
    ///
    /// 1. a head-change listener that forwards every `HeadChange::Apply`
    ///    event to `head_change`;
    /// 2. a republish loop that calls `republish_pending_messages` on a
    ///    fixed interval and whenever `repub_trigger` fires.
    ///
    /// # Errors
    /// Propagates any error returned by `load_local`.
    pub fn new(
        api: T,
        network_sender: flume::Sender<NetworkMessage>,
        config: MpoolConfig,
        chain_config: Arc<ChainConfig>,
        services: &mut JoinSet<anyhow::Result<()>>,
    ) -> Result<MessagePool<T>, Error>
    where
        T: Provider,
    {
        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
        // The pool starts from the provider's heaviest tipset.
        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
        let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "bls_sig".into(),
            BLS_SIG_CACHE_SIZE,
        ));
        let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "sig_val".into(),
            SIG_VAL_CACHE_SIZE,
        ));
        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
        let block_delay = chain_config.block_delay_secs;

        // Bounded channel (capacity 4) used to request an early republish.
        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
        let mut mp = MessagePool {
            local_addrs,
            pending,
            cur_tipset: tipset,
            api: Arc::new(api),
            bls_sig_cache,
            sig_val_cache,
            local_msgs,
            republished,
            config,
            network_sender,
            repub_trigger,
            chain_config: Arc::clone(&chain_config),
        };

        // `local_msgs` was created empty just above, so as written this call
        // has nothing to re-add.
        mp.load_local()?;

        let mut subscriber = mp.api.subscribe_head_changes();

        // Handles moved into the head-change task.
        let api = mp.api.clone();
        let bls_sig_cache = mp.bls_sig_cache.clone();
        let pending = mp.pending.clone();
        let republished = mp.republished.clone();

        let cur_tipset = mp.cur_tipset.clone();
        let repub_trigger = mp.repub_trigger.clone();

        // Task 1: react to head changes until the subscription is closed.
        services.spawn(async move {
            loop {
                match subscriber.recv().await {
                    Ok(ts) => {
                        // An applied tipset is passed on with an empty revert list.
                        let (cur, rev, app) = match ts {
                            HeadChange::Apply(tipset) => {
                                (cur_tipset.clone(), Vec::new(), vec![tipset])
                            }
                        };
                        // A failing `head_change` ends this task with an error.
                        head_change(
                            api.as_ref(),
                            bls_sig_cache.as_ref(),
                            repub_trigger.clone(),
                            republished.as_ref(),
                            pending.as_ref(),
                            cur.as_ref(),
                            rev,
                            app,
                        )
                        .await
                        .context("Error changing head")?;
                    }
                    // Missed events are only logged; the loop keeps going.
                    Err(RecvError::Lagged(e)) => {
                        warn!("Head change subscriber lagged: skipping {} events", e);
                    }
                    // The sender side is gone: finish the task successfully.
                    Err(RecvError::Closed) => {
                        break Ok(());
                    }
                }
            }
        });

        // Handles moved into the republish task.
        let api = mp.api.clone();
        let pending = mp.pending.clone();
        let cur_tipset = mp.cur_tipset.clone();
        let republished = mp.republished.clone();
        let local_addrs = mp.local_addrs.clone();
        let network_sender = Arc::new(mp.network_sender.clone());
        // Republish period in seconds: ten block delays plus the propagation delay.
        let republish_interval = (10 * block_delay + chain_config.propagation_delay_secs) as u64;
        // Task 2: republish pending messages on every tick or trigger.
        services.spawn(async move {
            let mut repub_trigger_rx = repub_trigger_rx.stream();
            let mut interval = interval(Duration::from_secs(republish_interval));
            loop {
                // Wake up on whichever comes first: the timer or a trigger.
                tokio::select! {
                    _ = interval.tick() => (),
                    _ = repub_trigger_rx.next() => (),
                }
                // Failures are logged and the loop continues.
                if let Err(e) = republish_pending_messages(
                    api.as_ref(),
                    network_sender.as_ref(),
                    pending.as_ref(),
                    cur_tipset.as_ref(),
                    republished.as_ref(),
                    local_addrs.as_ref(),
                    &chain_config,
                )
                .await
                {
                    warn!("Failed to republish pending messages: {}", e.to_string());
                }
            }
        });
        Ok(mp)
    }
616}
617
618pub(in crate::message_pool) fn add_helper<T>(
625 api: &T,
626 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
627 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
628 msg: SignedMessage,
629 sequence: u64,
630 trust_policy: TrustPolicy,
631) -> Result<(), Error>
632where
633 T: Provider,
634{
635 if msg.signature().signature_type() == SignatureType::Bls {
636 bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
637 }
638
639 api.put_message(&ChainMessage::Signed(msg.clone()))?;
640 api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?;
641
642 let mut pending = pending.write();
643 let from = msg.from();
644 let mset = pending.entry(from).or_insert_with(|| MsgSet::new(sequence));
645 match trust_policy {
646 TrustPolicy::Untrusted => mset.add_untrusted(api, msg)?,
647 TrustPolicy::Trusted => mset.add_trusted(api, msg)?,
648 }
649
650 Ok(())
651}
652
653fn verify_msg_before_add(
654 m: &SignedMessage,
655 cur_ts: &Tipset,
656 local: bool,
657 chain_config: &ChainConfig,
658) -> Result<bool, Error> {
659 let epoch = cur_ts.epoch();
660 let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
661 .on_chain_message(m.chain_length()?);
662 valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
663 if !cur_ts.block_headers().is_empty() {
664 let base_fee = &cur_ts.block_headers().first().parent_base_fee;
665 let base_fee_lower_bound =
666 get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
667 if m.gas_fee_cap() < base_fee_lower_bound {
668 if local {
669 warn!(
670 "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
671 m.gas_fee_cap(),
672 base_fee_lower_bound
673 );
674 return Ok(false);
675 }
676 return Err(Error::SoftValidationFailure(format!(
677 "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
678 m.gas_fee_cap(),
679 base_fee_lower_bound
680 )));
681 }
682 }
683 Ok(local)
684}
685
686pub fn remove(
688 from: &Address,
689 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
690 sequence: u64,
691 applied: bool,
692) -> Result<(), Error> {
693 let mut pending = pending.write();
694 let mset = if let Some(mset) = pending.get_mut(from) {
695 mset
696 } else {
697 return Ok(());
698 };
699
700 mset.rm(sequence, applied);
701
702 if mset.msgs.is_empty() {
703 pending.remove(from);
704 }
705
706 Ok(())
707}
708
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message_pool::test_provider::TestApi;
    use crate::shim::econ::TokenAmount;
    use crate::shim::message::Message as ShimMessage;

    /// Builds a mock BLS-signed message with the given sequence and gas
    /// premium (in atto); every other field keeps its default.
    fn bls_message(sequence: u64, gas_premium: u64) -> SignedMessage {
        SignedMessage::mock_bls_signed_message(ShimMessage {
            sequence,
            gas_premium: TokenAmount::from_atto(gas_premium),
            ..ShimMessage::default()
        })
    }

    /// `add_helper` accepts a message with a gas limit of 666_666_666.
    #[test]
    fn add_helper_message_gas_limit_test() {
        let api = TestApi::default();
        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
        let pending = SyncRwLock::new(HashMap::new());
        let msg = SignedMessage::mock_bls_signed_message(ShimMessage {
            gas_limit: 666_666_666,
            ..ShimMessage::default()
        });
        let sequence = msg.message().sequence;

        let res = add_helper(
            &api,
            &bls_sig_cache,
            &pending,
            msg,
            sequence,
            TrustPolicy::Trusted,
        );

        assert!(res.is_ok());
    }

    /// A full `MsgSet` rejects a message at a new sequence but still accepts
    /// a replacement with a higher premium at an existing sequence.
    #[test]
    fn test_rbf_at_capacity() {
        let api = TestApi::with_max_actor_pending_messages(10);
        let mut mset = MsgSet::new(0);

        // Fill the set up to the limit of 10 messages.
        for i in 0..10u64 {
            let res = mset.add_trusted(&api, bls_message(i, 100));
            assert!(res.is_ok(), "Failed to add message {}: {:?}", i, res);
        }

        // A new sequence does not fit any more.
        let res_new = mset.add_trusted(&api, bls_message(10, 100));
        assert!(matches!(res_new, Err(Error::TooManyPendingMessages(_, _))));

        // Replacing sequence 5 with a higher premium must still work.
        let res_rbf = mset.add_trusted(&api, bls_message(5, 200));
        assert!(
            res_rbf.is_ok(),
            "RBF should be allowed at capacity: {:?}",
            res_rbf
        );
    }
}