1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
12use crate::chain::{HeadChanges, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::rpc::eth::types::EthAddress;
20use crate::shim::{
21 address::{Address, Protocol},
22 crypto::{Signature, SignatureType},
23 econ::TokenAmount,
24 gas::{Gas, price_list_by_network_version},
25};
26use crate::state_manager::IdToAddressCache;
27use crate::state_manager::utils::is_valid_for_sending;
28use crate::utils::ShallowClone as _;
29use crate::utils::cache::SizeTrackingLruCache;
30use crate::utils::get_size::CidWrapper;
31use ahash::{HashSet, HashSetExt};
32use anyhow::Context as _;
33use cid::Cid;
34use futures::StreamExt;
35use fvm_ipld_encoding::to_vec;
36use get_size2::GetSize;
37use itertools::Itertools;
38use nonzero_ext::nonzero;
39use parking_lot::RwLock as SyncRwLock;
40use tokio::{
41 sync::broadcast::{self, error::RecvError},
42 task::JoinSet,
43 time::interval,
44};
45use tracing::warn;
46
47use crate::message_pool::{
48 config::MpoolConfig,
49 errors::Error,
50 head_change,
51 msgpool::{
52 BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore,
53 recover_sig, republish_pending_messages,
54 },
55 provider::Provider,
56 utils::get_base_fee_lower_bound,
57};
58
59const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
61const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);
62const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize);
63const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize);
64
65#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)]
66pub(crate) struct StateNonceCacheKey {
67 tipset_key: TipsetKey,
68 addr: Address,
69}
70
/// Per-actor pending message limit.
// NOTE(review): presumably the default applied to trusted senders — confirm
// against the `Provider` implementations.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1_000;
/// Per-actor pending message limit.
// NOTE(review): presumably the default applied to untrusted senders — confirm
// against the `Provider` implementations.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Largest serialized size, in bytes, of a message accepted by the pool (64 KiB).
const MAX_MESSAGE_SIZE: usize = 64 * 1024;

/// Trust level attached to a message when it is pushed into the pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrustPolicy {
    /// Used by [`MessagePool::push`] and [`MessagePool::add`].
    Trusted,
    /// Used by [`MessagePool::push_untrusted`].
    Untrusted,
}
84
85pub use super::msg_set::{MsgSetLimits, StrictnessPolicy};
86
/// Pool of pending messages together with the caches and channels needed to
/// validate, store and (re)publish them.
pub struct MessagePool<T> {
    /// Resolved sender addresses of messages pushed through this node.
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// Store of the pending messages, keyed by resolved sender address.
    pub(in crate::message_pool) pending_store: PendingStore,
    /// Tipset the pool currently validates messages against.
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// Provider used for chain and state access.
    pub api: Arc<T>,
    /// Channel used to hand messages to the network layer for publishing.
    pub network_sender: flume::Sender<NetworkMessage>,
    /// BLS signatures of pool messages, keyed by message CID.
    pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
    /// CIDs of messages whose signature has already been verified.
    pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
    /// Cache of actor ID to resolved address.
    pub key_cache: IdToAddressCache,
    /// Cache of state nonces per (tipset, address).
    pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
    /// Set of message CIDs shared with the head-change and republish tasks.
    // NOTE(review): presumably the CIDs of messages that were republished — confirm.
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Sender side of the channel that wakes up the republish task.
    pub repub_trigger: flume::Sender<()>,
    /// Messages pushed through this node.
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Message pool configuration.
    pub config: MpoolConfig,
    /// Configuration of the chain this pool operates on.
    pub chain_config: Arc<ChainConfig>,
}
121
122pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
125 api: &T,
126 key_cache: &IdToAddressCache,
127 addr: &Address,
128 cur_ts: &Tipset,
129) -> Result<Address, Error> {
130 let id = addr.id().ok();
131 if let Some(id) = &id
132 && let Some(resolved) = key_cache.get_cloned(id)
133 {
134 return Ok(resolved);
135 }
136 let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
137 if let Some(id) = id {
138 key_cache.push(id, resolved);
139 }
140 Ok(resolved)
141}
142
143pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
145 api: &T,
146 key_cache: &IdToAddressCache,
147 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
148 addr: &Address,
149 cur_ts: &Tipset,
150) -> Result<u64, Error> {
151 let nk = StateNonceCacheKey {
152 tipset_key: cur_ts.key().clone(),
153 addr: *addr,
154 };
155
156 if let Some(cached) = state_nonce_cache.get_cloned(&nk) {
157 return Ok(cached);
158 }
159
160 let actor = api.get_actor_after(addr, cur_ts)?;
161 let mut next_nonce = actor.sequence;
162
163 if let (Ok(resolved), Ok(messages)) = (
164 resolve_to_key(api, key_cache, addr, cur_ts)
165 .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")),
166 api.messages_for_tipset(cur_ts)
167 .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")),
168 ) {
169 for msg in messages.iter() {
170 if let Ok(from) = resolve_to_key(api, key_cache, &msg.from(), cur_ts).inspect_err(
171 |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"),
172 ) && from == resolved
173 {
174 let n = msg.sequence() + 1;
175 if n > next_nonce {
176 next_nonce = n;
177 }
178 }
179 }
180 }
181
182 state_nonce_cache.push(nk, next_nonce);
183 Ok(next_nonce)
184}
185
186impl<T> MessagePool<T>
187where
188 T: Provider,
189{
190 pub fn current_tipset(&self) -> Tipset {
192 self.cur_tipset.read().clone()
193 }
194
195 pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
196 resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
197 }
198
199 fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
201 let cur_ts = self.current_tipset();
202 let resolved = self.resolve_to_key(&m.from(), &cur_ts)?;
203 self.local_addrs.write().push(resolved);
204 self.local_msgs.write().insert(m);
205 Ok(())
206 }
207
208 pub async fn push_internal(
211 &self,
212 msg: SignedMessage,
213 trust_policy: TrustPolicy,
214 ) -> Result<Cid, Error> {
215 self.check_message(&msg)?;
216 let cid = msg.cid();
217 let cur_ts = self.current_tipset();
218 let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
219 let msg_ser = to_vec(&msg)?;
220 let network_name = self.chain_config.network.genesis_name();
221 self.add_local(msg)?;
222 if publish {
223 self.network_sender
224 .send_async(NetworkMessage::PubsubMessage {
225 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
226 message: msg_ser,
227 })
228 .await
229 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
230 }
231 Ok(cid)
232 }
233
234 pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
236 self.push_internal(msg, TrustPolicy::Trusted).await
237 }
238
239 pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
241 self.push_internal(msg, TrustPolicy::Untrusted).await
242 }
243
244 fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
245 if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
246 return Err(Error::MessageTooBig);
247 }
248 let to = msg.message().to();
249 if to.protocol() == Protocol::Delegated {
250 EthAddress::from_filecoin_address(&to).context(format!(
251 "message recipient {to} is a delegated address but not a valid Eth Address"
252 ))?;
253 }
254 valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
255 if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
256 return Err(Error::MessageValueTooHigh);
257 }
258 if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
259 return Err(Error::GasFeeCapTooLow);
260 }
261 self.verify_msg_sig(msg)
262 }
263
264 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
267 self.check_message(&msg)?;
268 let ts = self.current_tipset();
269 self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
270 Ok(())
271 }
272
273 fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
277 let cid = msg.cid();
278
279 if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
280 return Ok(());
281 }
282
283 msg.verify(self.chain_config.eth_chain_id)
284 .map_err(|e| Error::Other(e.to_string()))?;
285
286 self.sig_val_cache.push(cid.into(), ());
287
288 Ok(())
289 }
290
291 fn add_tipset(
295 &self,
296 msg: SignedMessage,
297 cur_ts: &Tipset,
298 local: bool,
299 trust_policy: TrustPolicy,
300 ) -> Result<bool, Error> {
301 let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
302
303 if sequence > msg.message().sequence {
304 return Err(Error::SequenceTooLow);
305 }
306
307 let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
308
309 let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
311 let eth_chain_id = self.chain_config.eth_chain_id;
312 if msg.signature().signature_type() == SignatureType::Delegated
313 && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
314 {
315 return Err(Error::Other(
316 "Invalid Ethereum message for the current network version".to_owned(),
317 ));
318 }
319 if !is_valid_for_sending(nv, &sender_actor) {
320 return Err(Error::Other(
321 "Sender actor is not a valid top-level sender".to_owned(),
322 ));
323 }
324
325 let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
326
327 let balance = self.get_state_balance(&msg.from(), cur_ts)?;
328
329 let msg_balance = msg.required_funds();
330 if balance < msg_balance {
331 return Err(Error::NotEnoughFunds);
332 }
333 let strictness = if local {
334 StrictnessPolicy::Relaxed
335 } else {
336 StrictnessPolicy::Strict
337 };
338 self.add_helper(msg, trust_policy, strictness)?;
339 Ok(publish)
340 }
341
342 fn add_helper(
347 &self,
348 msg: SignedMessage,
349 trust_policy: TrustPolicy,
350 strictness: StrictnessPolicy,
351 ) -> Result<(), Error> {
352 let from = msg.from();
353 let cur_ts = self.current_tipset();
354 add_helper(
355 self.api.as_ref(),
356 &self.bls_sig_cache,
357 &self.pending_store,
358 &self.key_cache,
359 &cur_ts,
360 msg,
361 self.get_state_sequence(&from, &cur_ts)?,
362 trust_policy,
363 strictness,
364 )
365 }
366
367 pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
370 let cur_ts = self.current_tipset();
371
372 let sequence = self.get_state_sequence(addr, &cur_ts)?;
373
374 let resolved = self.resolve_to_key(addr, &cur_ts).ok();
375 let mset = resolved
376 .and_then(|r| self.pending_store.snapshot_for(&r))
377 .or_else(|| self.pending_store.snapshot_for(addr));
378 match mset {
379 Some(mset) => {
380 if sequence > mset.next_sequence {
381 return Ok(sequence);
382 }
383 Ok(mset.next_sequence)
384 }
385 None => Ok(sequence),
386 }
387 }
388
389 fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
391 get_state_sequence(
392 self.api.as_ref(),
393 &self.key_cache,
394 &self.state_nonce_cache,
395 addr,
396 cur_ts,
397 )
398 }
399
400 fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
403 let actor = self.api.get_actor_after(addr, ts)?;
404 Ok(TokenAmount::from(&actor.balance))
405 }
406
407 pub fn pending(&self) -> (Vec<SignedMessage>, Tipset) {
410 let pending = self.pending_store.snapshot();
411 let len = pending.values().map(|mset| mset.msgs.len()).sum();
412 let mut out = Vec::with_capacity(len);
413
414 for mset in pending.into_values() {
415 out.extend(
416 mset.msgs
417 .into_values()
418 .sorted_unstable_by_key(|m| m.message().sequence),
419 );
420 }
421
422 let cur_ts = self.current_tipset();
423
424 (out, cur_ts)
425 }
426
427 pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
431 let cur_ts = self.current_tipset();
432 let resolved = self
433 .resolve_to_key(a, &cur_ts)
434 .inspect_err(|e| tracing::debug!(%a, "pending_for: failed to resolve address: {e:#}"))
435 .ok()?;
436 let mset = self.pending_store.snapshot_for(&resolved)?;
437 if mset.msgs.is_empty() {
438 return None;
439 }
440
441 Some(
442 mset.msgs
443 .into_values()
444 .sorted_by_key(|v| v.message().sequence)
445 .collect(),
446 )
447 }
448
449 #[allow(dead_code)] pub fn subscribe_to_updates(&self) -> broadcast::Receiver<MpoolUpdate> {
453 self.pending_store.subscribe()
454 }
455
456 pub fn messages_for_blocks<'a>(
458 &self,
459 blks: impl Iterator<Item = &'a CachingBlockHeader>,
460 ) -> Result<Vec<SignedMessage>, Error> {
461 let mut msg_vec: Vec<SignedMessage> = Vec::new();
462
463 for block in blks {
464 let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
465
466 msg_vec.append(smsgs.as_mut());
467 for msg in umsg {
468 let smsg = recover_sig(&self.bls_sig_cache, msg)?;
469 msg_vec.push(smsg)
470 }
471 }
472 Ok(msg_vec)
473 }
474
475 pub fn load_local(&mut self) -> Result<(), Error> {
477 let mut local_msgs = self.local_msgs.write();
478 for k in local_msgs.iter().cloned().collect_vec() {
479 self.add(k.clone()).unwrap_or_else(|err| {
480 if err == Error::SequenceTooLow {
481 warn!("error adding message: {:?}", err);
482 local_msgs.remove(&k);
483 }
484 })
485 }
486
487 Ok(())
488 }
489
490 #[cfg(test)]
491 pub fn get_config(&self) -> &MpoolConfig {
492 &self.config
493 }
494
495 #[cfg(test)]
496 pub fn set_config<DB: SettingsStore>(
497 &mut self,
498 db: &DB,
499 cfg: MpoolConfig,
500 ) -> Result<(), Error> {
501 cfg.save_config(db)
502 .map_err(|e| Error::Other(e.to_string()))?;
503 self.config = cfg;
504 Ok(())
505 }
506
507 #[cfg(test)]
508 pub async fn apply_head_change(
509 &self,
510 revert: Vec<crate::blocks::Tipset>,
511 apply: Vec<crate::blocks::Tipset>,
512 ) -> Result<(), Error>
513 where
514 T: 'static,
515 {
516 head_change(
517 self.api.as_ref(),
518 &self.bls_sig_cache,
519 self.repub_trigger.clone(),
520 self.republished.as_ref(),
521 &self.pending_store,
522 self.cur_tipset.as_ref(),
523 &self.key_cache,
524 &self.state_nonce_cache,
525 revert,
526 apply,
527 )
528 .await
529 }
530}
531
532impl<T> MessagePool<T>
533where
534 T: Provider + Send + Sync + 'static,
535{
536 pub fn new(
538 api: T,
539 network_sender: flume::Sender<NetworkMessage>,
540 config: MpoolConfig,
541 chain_config: Arc<ChainConfig>,
542 services: &mut JoinSet<anyhow::Result<()>>,
543 ) -> Result<MessagePool<T>, Error>
544 where
545 T: Provider,
546 {
547 let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
548 let pending_store = PendingStore::new(MsgSetLimits::new(
551 api.max_actor_pending_messages(),
552 api.max_untrusted_actor_pending_messages(),
553 ));
554 let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
555 let bls_sig_cache =
556 SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
557 let sig_val_cache =
558 SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
559 let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
560 let state_nonce_cache =
561 SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
562 let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
563 let republished = Arc::new(SyncRwLock::new(HashSet::new()));
564 let block_delay = chain_config.block_delay_secs;
565
566 let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
567 let mut mp = MessagePool {
568 local_addrs,
569 pending_store,
570 cur_tipset: tipset,
571 api: Arc::new(api),
572 bls_sig_cache,
573 sig_val_cache,
574 key_cache,
575 state_nonce_cache,
576 local_msgs,
577 republished,
578 config,
579 network_sender,
580 repub_trigger,
581 chain_config: Arc::clone(&chain_config),
582 };
583
584 mp.load_local()?;
585
586 let mut head_changes_rx = mp.api.subscribe_head_changes();
587
588 let api = mp.api.clone();
589 let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
590 let pending_store = mp.pending_store.shallow_clone();
591 let republished = mp.republished.clone();
592 let key_cache = mp.key_cache.shallow_clone();
593 let state_nonce_cache = mp.state_nonce_cache.shallow_clone();
594
595 let current_ts = mp.cur_tipset.clone();
596 let repub_trigger = mp.repub_trigger.clone();
597
598 services.spawn(async move {
600 loop {
601 match head_changes_rx.recv().await {
602 Ok(HeadChanges { reverts, applies }) => {
603 if let Err(e) = head_change(
604 api.as_ref(),
605 &bls_sig_cache,
606 repub_trigger.clone(),
607 republished.as_ref(),
608 &pending_store,
609 ¤t_ts,
610 &key_cache,
611 &state_nonce_cache,
612 reverts,
613 applies,
614 )
615 .await
616 {
617 tracing::warn!("Error changing head: {e}");
618 }
619 }
620 Err(RecvError::Lagged(n)) => {
621 warn!("Head change subscriber lagged: skipping {n} events");
622 }
623 Err(RecvError::Closed) => {
624 break Ok(());
625 }
626 }
627 }
628 });
629
630 let api = mp.api.clone();
631 let pending_store = mp.pending_store.shallow_clone();
632 let cur_tipset = mp.cur_tipset.clone();
633 let republished = mp.republished.clone();
634 let local_addrs = mp.local_addrs.clone();
635 let key_cache = mp.key_cache.shallow_clone();
636 let network_sender = Arc::new(mp.network_sender.clone());
637 let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
638 services.spawn(async move {
640 let mut repub_trigger_rx = repub_trigger_rx.stream();
641 let mut interval = interval(Duration::from_secs(republish_interval));
642 loop {
643 tokio::select! {
644 _ = interval.tick() => (),
645 _ = repub_trigger_rx.next() => (),
646 }
647 if let Err(e) = republish_pending_messages(
648 api.as_ref(),
649 network_sender.as_ref(),
650 &pending_store,
651 cur_tipset.as_ref(),
652 republished.as_ref(),
653 local_addrs.as_ref(),
654 &key_cache,
655 &chain_config,
656 )
657 .await
658 {
659 warn!("Failed to republish pending messages: {}", e.to_string());
660 }
661 }
662 });
663 Ok(mp)
664 }
665}
666
667#[allow(clippy::too_many_arguments)]
674pub(in crate::message_pool) fn add_helper<T>(
675 api: &T,
676 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
677 pending_store: &PendingStore,
678 key_cache: &IdToAddressCache,
679 cur_ts: &Tipset,
680 msg: SignedMessage,
681 sequence: u64,
682 trust_policy: TrustPolicy,
683 strictness: StrictnessPolicy,
684) -> Result<(), Error>
685where
686 T: Provider,
687{
688 if msg.signature().signature_type() == SignatureType::Bls {
689 bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
690 }
691
692 api.put_message(&ChainMessage::Signed(msg.clone().into()))?;
693 api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?;
694
695 let resolved_from = resolve_to_key(api, key_cache, &msg.from(), cur_ts)?;
696 pending_store.insert(resolved_from, msg, sequence, trust_policy, strictness)
697}
698
699fn verify_msg_before_add(
700 m: &SignedMessage,
701 cur_ts: &Tipset,
702 local: bool,
703 chain_config: &ChainConfig,
704) -> Result<bool, Error> {
705 let epoch = cur_ts.epoch();
706 let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
707 .on_chain_message(m.chain_length()?);
708 valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
709 if !cur_ts.block_headers().is_empty() {
710 let base_fee = &cur_ts.block_headers().first().parent_base_fee;
711 let base_fee_lower_bound =
712 get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
713 if m.gas_fee_cap() < base_fee_lower_bound {
714 if local {
715 warn!(
716 "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
717 m.gas_fee_cap(),
718 base_fee_lower_bound
719 );
720 return Ok(false);
721 }
722 return Err(Error::SoftValidationFailure(format!(
723 "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
724 m.gas_fee_cap(),
725 base_fee_lower_bound
726 )));
727 }
728 }
729 Ok(local)
730}
731
732#[cfg(test)]
733mod tests {
734 use crate::blocks::RawBlockHeader;
735 use crate::chain::ChainStore;
736 use crate::db::MemoryDB;
737 use crate::message_pool::provider::Provider;
738 use crate::message_pool::test_provider::TestApi;
739 use crate::networks::ChainConfig;
740 use crate::shim::econ::TokenAmount;
741 use crate::shim::state_tree::{ActorState, StateTree, StateTreeVersion};
742 use crate::utils::db::CborStoreExt as _;
743
744 use super::*;
745 use crate::shim::message::Message as ShimMessage;
746
747 fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
748 SignedMessage::mock_bls_signed_message(ShimMessage {
749 from,
750 sequence: seq,
751 gas_premium: TokenAmount::from_atto(premium),
752 gas_limit: 1_000_000,
753 ..ShimMessage::default()
754 })
755 }
756
757 fn test_pending_store(api: &TestApi) -> PendingStore {
759 PendingStore::new(MsgSetLimits::new(
760 api.max_actor_pending_messages(),
761 api.max_untrusted_actor_pending_messages(),
762 ))
763 }
764
765 #[test]
768 fn add_helper_message_gas_limit_test() {
769 let api = TestApi::default();
770 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
771 let key_cache = SizeTrackingLruCache::new_mocked();
772 let pending_store = test_pending_store(&api);
773 let cur_ts = api.get_heaviest_tipset();
774 let message = ShimMessage {
775 gas_limit: 666_666_666,
776 ..ShimMessage::default()
777 };
778 let msg = SignedMessage::mock_bls_signed_message(message);
779 let sequence = msg.message().sequence;
780 let res = add_helper(
781 &api,
782 &bls_sig_cache,
783 &pending_store,
784 &key_cache,
785 &cur_ts,
786 msg,
787 sequence,
788 TrustPolicy::Trusted,
789 StrictnessPolicy::Relaxed,
790 );
791 assert!(res.is_ok());
792 }
793
794 #[test]
795 fn test_resolve_to_key_returns_non_id_unchanged() {
796 let api = TestApi::default();
797 let key_cache = SizeTrackingLruCache::new_mocked();
798 let ts = api.get_heaviest_tipset();
799
800 let bls_addr = Address::new_bls(&[1u8; 48]).unwrap();
801 let result = resolve_to_key(&api, &key_cache, &bls_addr, &ts).unwrap();
802 assert_eq!(result, bls_addr);
803 assert_eq!(
804 key_cache.len(),
805 0,
806 "cache should not be populated for non-ID addresses"
807 );
808 }
809
810 #[test]
811 fn test_resolve_to_key_resolves_id_and_caches() {
812 let api = TestApi::default();
813 let key_cache = SizeTrackingLruCache::new_mocked();
814 let ts = api.get_heaviest_tipset();
815
816 let id_addr = Address::new_id(100);
817 let key_addr = Address::new_bls(&[5u8; 48]).unwrap();
818 api.set_key_address_mapping(&id_addr, &key_addr);
819
820 let result = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
821 assert_eq!(result, key_addr);
822 assert_eq!(
823 key_cache.len(),
824 1,
825 "cache should have one entry after resolution"
826 );
827
828 let result2 = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
830 assert_eq!(result2, key_addr);
831 }
832
833 #[test]
834 fn test_add_helper_keys_pending_by_resolved_address() {
835 let api = TestApi::default();
836 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
837 let key_cache = SizeTrackingLruCache::new_mocked();
838 let pending_store = test_pending_store(&api);
839 let cur_ts = api.get_heaviest_tipset();
840
841 let id_addr = Address::new_id(200);
842 let key_addr = Address::new_bls(&[7u8; 48]).unwrap();
843 api.set_key_address_mapping(&id_addr, &key_addr);
844 api.set_state_sequence(&key_addr, 0);
845
846 let message = ShimMessage {
847 from: id_addr,
848 gas_limit: 1_000_000,
849 ..ShimMessage::default()
850 };
851 let msg = SignedMessage::mock_bls_signed_message(message);
852
853 add_helper(
854 &api,
855 &bls_sig_cache,
856 &pending_store,
857 &key_cache,
858 &cur_ts,
859 msg,
860 0,
861 TrustPolicy::Trusted,
862 StrictnessPolicy::Relaxed,
863 )
864 .unwrap();
865
866 assert!(
867 pending_store.snapshot_for(&key_addr).is_some(),
868 "pending should be keyed by the resolved key address"
869 );
870 assert!(
871 pending_store.snapshot_for(&id_addr).is_none(),
872 "pending should NOT have an entry under the raw ID address"
873 );
874 }
875
876 #[test]
877 fn test_get_sequence_works_with_both_address_forms() {
878 let api = TestApi::default();
879 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
880 let key_cache = SizeTrackingLruCache::new_mocked();
881 let pending_store = test_pending_store(&api);
882 let cur_ts = api.get_heaviest_tipset();
883
884 let id_addr = Address::new_id(300);
885 let key_addr = Address::new_bls(&[9u8; 48]).unwrap();
886 api.set_key_address_mapping(&id_addr, &key_addr);
887 api.set_state_sequence(&key_addr, 0);
888
889 for seq in 0..2 {
891 let message = ShimMessage {
892 from: id_addr,
893 sequence: seq,
894 gas_limit: 1_000_000,
895 ..ShimMessage::default()
896 };
897 let msg = SignedMessage::mock_bls_signed_message(message);
898 add_helper(
899 &api,
900 &bls_sig_cache,
901 &pending_store,
902 &key_cache,
903 &cur_ts,
904 msg,
905 0,
906 TrustPolicy::Trusted,
907 StrictnessPolicy::Relaxed,
908 )
909 .unwrap();
910 }
911
912 let state_seq = api.get_actor_after(&id_addr, &cur_ts).unwrap().sequence;
913 let resolved_for_id = resolve_to_key(&api, &key_cache, &id_addr, &cur_ts).unwrap();
914 let resolved_for_key = resolve_to_key(&api, &key_cache, &key_addr, &cur_ts).unwrap();
915 assert_eq!(resolved_for_id, resolved_for_key);
916
917 let next_seq = pending_store
918 .snapshot_for(&resolved_for_id)
919 .unwrap()
920 .next_sequence;
921 let expected = std::cmp::max(state_seq, next_seq);
922 assert_eq!(expected, 2, "should reflect both pending messages");
923 }
924
925 #[test]
926 fn test_get_state_sequence_accounts_for_tipset_messages() {
927 use crate::message_pool::test_provider::mock_block;
928
929 let api = TestApi::default();
930 let key_cache = SizeTrackingLruCache::new_mocked();
931 let state_nonce_cache = SizeTrackingLruCache::new_mocked();
932
933 let sender = Address::new_bls(&[3u8; 48]).unwrap();
934 api.set_state_sequence(&sender, 5);
935
936 let block = mock_block(1, 1);
937 api.inner.lock().set_block_messages(
938 &block,
939 vec![make_smsg(sender, 5, 100), make_smsg(sender, 7, 100)],
940 );
941 let ts = Tipset::from(block);
942
943 let nonce = get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
944 assert_eq!(
945 nonce, 8,
946 "should account for non-consecutive tipset message at nonce 7"
947 );
948 }
949
950 #[test]
951 fn test_get_state_sequence_ignores_other_addresses() {
952 use crate::message_pool::test_provider::mock_block;
953
954 let api = TestApi::default();
955 let key_cache = SizeTrackingLruCache::new_mocked();
956 let state_nonce_cache = SizeTrackingLruCache::new_mocked();
957
958 let addr_a = Address::new_bls(&[4u8; 48]).unwrap();
959 let addr_b = Address::new_bls(&[5u8; 48]).unwrap();
960 api.set_state_sequence(&addr_a, 0);
961 api.set_state_sequence(&addr_b, 0);
962
963 let block = mock_block(1, 1);
964 api.inner.lock().set_block_messages(
965 &block,
966 vec![
967 make_smsg(addr_b, 0, 100),
968 make_smsg(addr_b, 1, 100),
969 make_smsg(addr_b, 2, 100),
970 ],
971 );
972 let ts = Tipset::from(block);
973
974 let nonce_a =
975 get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_a, &ts).unwrap();
976 assert_eq!(
977 nonce_a, 0,
978 "addr_a nonce should be unaffected by addr_b's messages"
979 );
980
981 let nonce_b =
982 get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_b, &ts).unwrap();
983 assert_eq!(
984 nonce_b, 3,
985 "addr_b nonce should reflect its tipset messages"
986 );
987 }
988
989 #[test]
990 fn test_get_state_sequence_cache_hit() {
991 use crate::message_pool::test_provider::mock_block;
992
993 let api = TestApi::default();
994 let key_cache = SizeTrackingLruCache::new_mocked();
995 let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
996 SizeTrackingLruCache::new_mocked();
997
998 let sender = Address::new_bls(&[6u8; 48]).unwrap();
999 api.set_state_sequence(&sender, 5);
1000
1001 let block = mock_block(1, 1);
1002 api.inner
1003 .lock()
1004 .set_block_messages(&block, vec![make_smsg(sender, 5, 100)]);
1005 let ts = Tipset::from(block);
1006
1007 let nonce1 =
1008 get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1009 assert_eq!(nonce1, 6);
1010
1011 api.set_state_sequence(&sender, 99);
1013 let nonce2 =
1014 get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1015 assert_eq!(
1016 nonce2, 6,
1017 "second call should return the cached value, not re-read state"
1018 );
1019 }
1020
1021 #[test]
1022 fn test_get_state_sequence_cache_miss_on_different_tipset() {
1023 use crate::message_pool::test_provider::mock_block;
1024
1025 let api = TestApi::default();
1026 let key_cache = SizeTrackingLruCache::new_mocked();
1027 let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
1028 SizeTrackingLruCache::new_mocked();
1029
1030 let sender = Address::new_bls(&[7u8; 48]).unwrap();
1031 api.set_state_sequence(&sender, 10);
1032
1033 let block_a = mock_block(1, 1);
1034 let ts_a = Tipset::from(&block_a);
1035
1036 let nonce_a =
1037 get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_a).unwrap();
1038 assert_eq!(nonce_a, 10);
1039
1040 api.set_state_sequence(&sender, 20);
1042 let block_b = mock_block(2, 2);
1043 let ts_b = Tipset::from(&block_b);
1044
1045 let nonce_b =
1046 get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_b).unwrap();
1047 assert_eq!(
1048 nonce_b, 20,
1049 "different tipset should miss the cache and read fresh state"
1050 );
1051 }
1052
1053 #[test]
1054 fn resolve_to_key_uses_finality_lookback() {
1055 let db = Arc::new(MemoryDB::default());
1056
1057 let mut cfg = ChainConfig::default();
1058 cfg.policy.chain_finality = 1;
1059 let cfg = Arc::new(cfg);
1060
1061 let bls_a = Address::new_bls(&[8u8; 48]).unwrap();
1062 let bls_b = Address::new_bls(&[9u8; 48]).unwrap();
1063
1064 let mut st_a = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
1066 st_a.set_actor(
1067 &Address::new_id(300),
1068 ActorState::new_empty(Cid::default(), Some(bls_a)),
1069 )
1070 .unwrap();
1071 let root_a = st_a.flush().unwrap();
1072
1073 let mut st_b = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
1075 st_b.set_actor(
1076 &Address::new_id(400),
1077 ActorState::new_empty(Cid::default(), Some(bls_b)),
1078 )
1079 .unwrap();
1080 let root_b = st_b.flush().unwrap();
1081
1082 let genesis = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1083 state_root: root_a,
1084 ..Default::default()
1085 }));
1086 db.put_cbor_default(genesis.block_headers().first())
1087 .unwrap();
1088
1089 let ts1 = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1090 parents: genesis.key().clone(),
1091 epoch: 1,
1092 state_root: root_a,
1093 timestamp: 1,
1094 ..Default::default()
1095 }));
1096 db.put_cbor_default(ts1.block_headers().first()).unwrap();
1097
1098 let head = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1099 parents: ts1.key().clone(),
1100 epoch: 2,
1101 state_root: root_b,
1102 timestamp: 2,
1103 ..Default::default()
1104 }));
1105 db.put_cbor_default(head.block_headers().first()).unwrap();
1106
1107 let cs = ChainStore::new(
1108 db.clone(),
1109 db.clone(),
1110 db,
1111 cfg,
1112 genesis.block_headers().first().clone(),
1113 )
1114 .unwrap();
1115
1116 let result = Provider::resolve_to_deterministic_address_at_finality(
1118 &cs,
1119 &Address::new_id(300),
1120 &head,
1121 )
1122 .unwrap();
1123 assert_eq!(result, bls_a);
1124
1125 Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head)
1127 .expect_err("actor only in head state must not resolve via finality lookback");
1128 }
1129}