1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
12use crate::chain::{HeadChanges, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::rpc::eth::types::EthAddress;
20use crate::shim::{
21 address::{Address, Protocol},
22 crypto::{Signature, SignatureType},
23 econ::TokenAmount,
24 gas::{Gas, price_list_by_network_version},
25};
26use crate::state_manager::IdToAddressCache;
27use crate::state_manager::utils::is_valid_for_sending;
28use crate::utils::ShallowClone as _;
29use crate::utils::cache::SizeTrackingLruCache;
30use crate::utils::get_size::CidWrapper;
31use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
32use anyhow::Context as _;
33use cid::Cid;
34use futures::StreamExt;
35use fvm_ipld_encoding::to_vec;
36use get_size2::GetSize;
37use itertools::Itertools;
38use nonzero_ext::nonzero;
39use parking_lot::RwLock as SyncRwLock;
40use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
41use tracing::warn;
42
43use crate::message_pool::{
44 config::MpoolConfig,
45 errors::Error,
46 head_change, metrics,
47 msgpool::{
48 BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
49 republish_pending_messages,
50 },
51 provider::Provider,
52 utils::get_base_fee_lower_bound,
53};
54
/// Capacity handed to the `bls_sig` LRU cache created in `MessagePool::new`.
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Capacity handed to the `sig_val` LRU cache created in `MessagePool::new`.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);
/// Capacity handed to the `mpool_key` address cache created in `MessagePool::new`.
const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize);
/// Capacity handed to the `state_nonce` LRU cache created in `MessagePool::new`.
const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize);
60
/// Key of the state-nonce cache: an address together with the key of the
/// tipset the nonce was computed against (built in `get_state_sequence`).
#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)]
pub(crate) struct StateNonceCacheKey {
    /// Key of the tipset the cached nonce belongs to.
    tipset_key: TipsetKey,
    /// Address whose next nonce is cached (stored exactly as passed by the caller).
    addr: Address,
}
66
/// NOTE(review): presumably the default per-actor pending-message limit for
/// trusted sources. The limit enforced in `MsgSet::add` is read from
/// `Provider::max_actor_pending_messages` — confirm how this constant feeds it.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// NOTE(review): presumably the default per-actor pending-message limit for
/// untrusted sources. The limit enforced in `MsgSet::add` is read from
/// `Provider::max_untrusted_actor_pending_messages` — confirm.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Largest distance above the expected nonce that `MsgSet::add` tolerates for
/// a trusted message under `StrictnessPolicy::Strict` (untrusted messages get 0).
const MAX_NONCE_GAP: u64 = 4;
/// Maximum length (`64 << 10` = 65 536) of a message's `to_vec` encoding;
/// `check_message` rejects anything longer with `Error::MessageTooBig`.
const MAX_MESSAGE_SIZE: usize = 64 << 10;

/// Whether a message comes from a trusted source.
///
/// The policy selects between `MsgSet::add_trusted` and
/// `MsgSet::add_untrusted`, which differ in the tolerated nonce gap and in the
/// per-actor pending-message limit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TrustPolicy {
    /// Uses `MAX_NONCE_GAP` and the provider's trusted pending-message limit.
    Trusted,
    /// Uses a nonce gap of 0 and the provider's untrusted pending-message limit.
    Untrusted,
}
81
/// Controls whether `MsgSet::add` enforces nonce-gap rules.
///
/// `add_tipset` selects `Relaxed` for local messages and `Strict` otherwise.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StrictnessPolicy {
    /// Reject messages whose nonce is too far ahead of the expected one, and
    /// reject replace-by-fee while a nonce gap exists.
    Strict,
    /// Skip the nonce-gap checks.
    Relaxed,
}
88
/// Pending messages of a single sender, indexed by sequence (nonce), together
/// with the next sequence the set expects.
#[derive(Clone, Default, Debug)]
pub struct MsgSet {
    /// Pending messages keyed by their sequence.
    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
    /// Next expected sequence; maintained by `add` and `rm`.
    next_sequence: u64,
}
96
97impl MsgSet {
98 pub fn new(sequence: u64) -> Self {
101 MsgSet {
102 msgs: HashMap::new(),
103 next_sequence: sequence,
104 }
105 }
106
107 pub fn add_trusted<T>(
111 &mut self,
112 api: &T,
113 m: SignedMessage,
114 strictness: StrictnessPolicy,
115 ) -> Result<(), Error>
116 where
117 T: Provider,
118 {
119 self.add(api, m, strictness, true)
120 }
121
122 pub fn add_untrusted<T>(
126 &mut self,
127 api: &T,
128 m: SignedMessage,
129 strictness: StrictnessPolicy,
130 ) -> Result<(), Error>
131 where
132 T: Provider,
133 {
134 self.add(api, m, strictness, false)
135 }
136
137 pub(in crate::message_pool) fn add<T>(
150 &mut self,
151 api: &T,
152 m: SignedMessage,
153 strictness: StrictnessPolicy,
154 trusted: bool,
155 ) -> Result<(), Error>
156 where
157 T: Provider,
158 {
159 let strict = matches!(strictness, StrictnessPolicy::Strict);
160 let max_nonce_gap: u64 = if trusted { MAX_NONCE_GAP } else { 0 };
161 let max_actor_pending_messages = if trusted {
162 api.max_actor_pending_messages()
163 } else {
164 api.max_untrusted_actor_pending_messages()
165 };
166
167 let mut next_nonce = self.next_sequence;
168 let nonce_gap = if m.sequence() == next_nonce {
169 next_nonce += 1;
170 while self.msgs.contains_key(&next_nonce) {
171 next_nonce += 1;
172 }
173 false
174 } else if strict && m.sequence() > next_nonce + max_nonce_gap {
175 tracing::debug!(
176 nonce = m.sequence(),
177 next_nonce,
178 "message nonce has too big a gap from expected nonce"
179 );
180 return Err(Error::NonceGap);
181 } else {
182 m.sequence() > next_nonce
183 };
184
185 let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
186 if strict && nonce_gap {
187 tracing::debug!(
188 nonce = m.sequence(),
189 next_nonce,
190 "rejecting replace by fee because of nonce gap"
191 );
192 return Err(Error::NonceGap);
193 }
194 if m.cid() != exms.cid() {
195 let premium = &exms.message().gas_premium;
196 let min_price = premium.clone()
197 + ((premium * RBF_NUM).div_floor(RBF_DENOM))
198 + TokenAmount::from_atto(1u8);
199 if m.message().gas_premium <= min_price {
200 return Err(Error::GasPriceTooLow);
201 }
202 } else {
203 return Err(Error::DuplicateSequence);
204 }
205 true
206 } else {
207 false
208 };
209
210 if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
212 return Err(Error::TooManyPendingMessages(
213 m.message.from().to_string(),
214 trusted,
215 ));
216 }
217
218 if strict && nonce_gap {
219 tracing::debug!(
220 from = %m.from(),
221 nonce = m.sequence(),
222 next_nonce,
223 "adding nonce-gapped message"
224 );
225 }
226
227 self.next_sequence = next_nonce;
228 if self.msgs.insert(m.sequence(), m).is_none() {
229 metrics::MPOOL_MESSAGE_TOTAL.inc();
230 }
231 Ok(())
232 }
233
234 pub fn rm(&mut self, sequence: u64, applied: bool) {
242 if self.msgs.remove(&sequence).is_none() {
243 if applied && sequence >= self.next_sequence {
244 self.next_sequence = sequence + 1;
245 while self.msgs.contains_key(&self.next_sequence) {
246 self.next_sequence += 1;
247 }
248 }
249 return;
250 }
251 metrics::MPOOL_MESSAGE_TOTAL.dec();
252
253 if applied {
255 if sequence >= self.next_sequence {
258 self.next_sequence = sequence + 1;
259 }
260 return;
261 }
262 if sequence < self.next_sequence {
265 self.next_sequence = sequence;
266 }
267 }
268}
269
/// Pool of pending messages, plus the caches and channels used to validate,
/// publish and republish them.
pub struct MessagePool<T> {
    /// Resolved sender addresses of messages pushed through `push_internal`.
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// Pending message sets, keyed by the resolved sender address.
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    /// Tipset the pool currently treats as its head.
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// Provider used for state lookups, message storage and head-change subscription.
    pub api: Arc<T>,
    /// Channel on which pubsub messages are handed to the network layer.
    pub network_sender: flume::Sender<NetworkMessage>,
    /// BLS signatures keyed by message CID (filled in `add_helper`).
    pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
    /// CIDs of messages whose signature has already been verified.
    pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
    /// Cache from actor ID to the resolved address (see `resolve_to_key`).
    pub key_cache: IdToAddressCache,
    /// Cache of next nonces per (tipset key, address) pair.
    pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
    /// Set of message CIDs shared with `head_change` and
    /// `republish_pending_messages`.
    /// NOTE(review): presumably the CIDs republished in the last round — it is
    /// maintained outside this chunk, confirm there.
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Sender half of the channel that wakes up the republish task.
    pub repub_trigger: flume::Sender<()>,
    /// Messages pushed locally through `push_internal`.
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Message pool configuration.
    pub config: MpoolConfig,
    /// Chain configuration (network name, network versions, chain id, delays).
    pub chain_config: Arc<ChainConfig>,
}
303
304pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
307 api: &T,
308 key_cache: &IdToAddressCache,
309 addr: &Address,
310 cur_ts: &Tipset,
311) -> Result<Address, Error> {
312 let id = addr.id().ok();
313 if let Some(id) = &id
314 && let Some(resolved) = key_cache.get_cloned(id)
315 {
316 return Ok(resolved);
317 }
318 let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
319 if let Some(id) = id {
320 key_cache.push(id, resolved);
321 }
322 Ok(resolved)
323}
324
325pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
327 api: &T,
328 key_cache: &IdToAddressCache,
329 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
330 addr: &Address,
331 cur_ts: &Tipset,
332) -> Result<u64, Error> {
333 let nk = StateNonceCacheKey {
334 tipset_key: cur_ts.key().clone(),
335 addr: *addr,
336 };
337
338 if let Some(cached) = state_nonce_cache.get_cloned(&nk) {
339 return Ok(cached);
340 }
341
342 let actor = api.get_actor_after(addr, cur_ts)?;
343 let mut next_nonce = actor.sequence;
344
345 if let (Ok(resolved), Ok(messages)) = (
346 resolve_to_key(api, key_cache, addr, cur_ts)
347 .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")),
348 api.messages_for_tipset(cur_ts)
349 .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")),
350 ) {
351 for msg in messages.iter() {
352 if let Ok(from) = resolve_to_key(api, key_cache, &msg.from(), cur_ts).inspect_err(
353 |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"),
354 ) && from == resolved
355 {
356 let n = msg.sequence() + 1;
357 if n > next_nonce {
358 next_nonce = n;
359 }
360 }
361 }
362 }
363
364 state_nonce_cache.push(nk, next_nonce);
365 Ok(next_nonce)
366}
367
368impl<T> MessagePool<T>
369where
370 T: Provider,
371{
372 pub fn current_tipset(&self) -> Tipset {
374 self.cur_tipset.read().clone()
375 }
376
377 pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
378 resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
379 }
380
381 fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
383 let cur_ts = self.current_tipset();
384 let resolved = self.resolve_to_key(&m.from(), &cur_ts)?;
385 self.local_addrs.write().push(resolved);
386 self.local_msgs.write().insert(m);
387 Ok(())
388 }
389
390 pub async fn push_internal(
393 &self,
394 msg: SignedMessage,
395 trust_policy: TrustPolicy,
396 ) -> Result<Cid, Error> {
397 self.check_message(&msg)?;
398 let cid = msg.cid();
399 let cur_ts = self.current_tipset();
400 let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
401 let msg_ser = to_vec(&msg)?;
402 let network_name = self.chain_config.network.genesis_name();
403 self.add_local(msg)?;
404 if publish {
405 self.network_sender
406 .send_async(NetworkMessage::PubsubMessage {
407 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
408 message: msg_ser,
409 })
410 .await
411 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
412 }
413 Ok(cid)
414 }
415
416 pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
418 self.push_internal(msg, TrustPolicy::Trusted).await
419 }
420
421 pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
423 self.push_internal(msg, TrustPolicy::Untrusted).await
424 }
425
426 fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
427 if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
428 return Err(Error::MessageTooBig);
429 }
430 let to = msg.message().to();
431 if to.protocol() == Protocol::Delegated {
432 EthAddress::from_filecoin_address(&to).context(format!(
433 "message recipient {to} is a delegated address but not a valid Eth Address"
434 ))?;
435 }
436 valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
437 if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
438 return Err(Error::MessageValueTooHigh);
439 }
440 if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
441 return Err(Error::GasFeeCapTooLow);
442 }
443 self.verify_msg_sig(msg)
444 }
445
446 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
449 self.check_message(&msg)?;
450 let ts = self.current_tipset();
451 self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
452 Ok(())
453 }
454
455 fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
459 let cid = msg.cid();
460
461 if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
462 return Ok(());
463 }
464
465 msg.verify(self.chain_config.eth_chain_id)
466 .map_err(|e| Error::Other(e.to_string()))?;
467
468 self.sig_val_cache.push(cid.into(), ());
469
470 Ok(())
471 }
472
473 fn add_tipset(
477 &self,
478 msg: SignedMessage,
479 cur_ts: &Tipset,
480 local: bool,
481 trust_policy: TrustPolicy,
482 ) -> Result<bool, Error> {
483 let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
484
485 if sequence > msg.message().sequence {
486 return Err(Error::SequenceTooLow);
487 }
488
489 let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
490
491 let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
493 let eth_chain_id = self.chain_config.eth_chain_id;
494 if msg.signature().signature_type() == SignatureType::Delegated
495 && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
496 {
497 return Err(Error::Other(
498 "Invalid Ethereum message for the current network version".to_owned(),
499 ));
500 }
501 if !is_valid_for_sending(nv, &sender_actor) {
502 return Err(Error::Other(
503 "Sender actor is not a valid top-level sender".to_owned(),
504 ));
505 }
506
507 let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
508
509 let balance = self.get_state_balance(&msg.from(), cur_ts)?;
510
511 let msg_balance = msg.required_funds();
512 if balance < msg_balance {
513 return Err(Error::NotEnoughFunds);
514 }
515 let strictness = if local {
516 StrictnessPolicy::Relaxed
517 } else {
518 StrictnessPolicy::Strict
519 };
520 self.add_helper(msg, trust_policy, strictness)?;
521 Ok(publish)
522 }
523
524 fn add_helper(
529 &self,
530 msg: SignedMessage,
531 trust_policy: TrustPolicy,
532 strictness: StrictnessPolicy,
533 ) -> Result<(), Error> {
534 let from = msg.from();
535 let cur_ts = self.current_tipset();
536 add_helper(
537 self.api.as_ref(),
538 &self.bls_sig_cache,
539 self.pending.as_ref(),
540 &self.key_cache,
541 &cur_ts,
542 msg,
543 self.get_state_sequence(&from, &cur_ts)?,
544 trust_policy,
545 strictness,
546 )
547 }
548
549 pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
552 let cur_ts = self.current_tipset();
553
554 let sequence = self.get_state_sequence(addr, &cur_ts)?;
555
556 let pending = self.pending.read();
557 let msgset = self
558 .resolve_to_key(addr, &cur_ts)
559 .ok()
560 .and_then(|resolved| pending.get(&resolved))
561 .or_else(|| pending.get(addr));
562 match msgset {
563 Some(mset) => {
564 if sequence > mset.next_sequence {
565 return Ok(sequence);
566 }
567 Ok(mset.next_sequence)
568 }
569 None => Ok(sequence),
570 }
571 }
572
573 fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
575 get_state_sequence(
576 self.api.as_ref(),
577 &self.key_cache,
578 &self.state_nonce_cache,
579 addr,
580 cur_ts,
581 )
582 }
583
584 fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
587 let actor = self.api.get_actor_after(addr, ts)?;
588 Ok(TokenAmount::from(&actor.balance))
589 }
590
591 pub fn pending(&self) -> (Vec<SignedMessage>, Tipset) {
594 let pending = self.pending.read().clone();
595 let len = pending.values().map(|mset| mset.msgs.len()).sum();
596 let mut out = Vec::with_capacity(len);
597
598 for mset in pending.into_values() {
599 out.extend(
600 mset.msgs
601 .into_values()
602 .sorted_unstable_by_key(|m| m.message().sequence),
603 );
604 }
605
606 let cur_ts = self.current_tipset();
607
608 (out, cur_ts)
609 }
610
611 pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
615 let cur_ts = self.current_tipset();
616 let resolved = self
617 .resolve_to_key(a, &cur_ts)
618 .inspect_err(|e| tracing::debug!(%a, "pending_for: failed to resolve address: {e:#}"))
619 .ok()?;
620 let pending = self.pending.read();
621 let mset = pending.get(&resolved)?;
622 if mset.msgs.is_empty() {
623 return None;
624 }
625
626 Some(
627 mset.msgs
628 .values()
629 .cloned()
630 .sorted_by_key(|v| v.message().sequence)
631 .collect(),
632 )
633 }
634
635 pub fn messages_for_blocks<'a>(
637 &self,
638 blks: impl Iterator<Item = &'a CachingBlockHeader>,
639 ) -> Result<Vec<SignedMessage>, Error> {
640 let mut msg_vec: Vec<SignedMessage> = Vec::new();
641
642 for block in blks {
643 let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
644
645 msg_vec.append(smsgs.as_mut());
646 for msg in umsg {
647 let smsg = recover_sig(&self.bls_sig_cache, msg)?;
648 msg_vec.push(smsg)
649 }
650 }
651 Ok(msg_vec)
652 }
653
654 pub fn load_local(&mut self) -> Result<(), Error> {
656 let mut local_msgs = self.local_msgs.write();
657 for k in local_msgs.iter().cloned().collect_vec() {
658 self.add(k.clone()).unwrap_or_else(|err| {
659 if err == Error::SequenceTooLow {
660 warn!("error adding message: {:?}", err);
661 local_msgs.remove(&k);
662 }
663 })
664 }
665
666 Ok(())
667 }
668
669 #[cfg(test)]
670 pub fn get_config(&self) -> &MpoolConfig {
671 &self.config
672 }
673
674 #[cfg(test)]
675 pub fn set_config<DB: SettingsStore>(
676 &mut self,
677 db: &DB,
678 cfg: MpoolConfig,
679 ) -> Result<(), Error> {
680 cfg.save_config(db)
681 .map_err(|e| Error::Other(e.to_string()))?;
682 self.config = cfg;
683 Ok(())
684 }
685
686 #[cfg(test)]
687 pub async fn apply_head_change(
688 &self,
689 revert: Vec<crate::blocks::Tipset>,
690 apply: Vec<crate::blocks::Tipset>,
691 ) -> Result<(), Error>
692 where
693 T: 'static,
694 {
695 head_change(
696 self.api.as_ref(),
697 &self.bls_sig_cache,
698 self.repub_trigger.clone(),
699 self.republished.as_ref(),
700 self.pending.as_ref(),
701 self.cur_tipset.as_ref(),
702 &self.key_cache,
703 &self.state_nonce_cache,
704 revert,
705 apply,
706 )
707 .await
708 }
709}
710
711impl<T> MessagePool<T>
712where
713 T: Provider + Send + Sync + 'static,
714{
715 pub fn new(
717 api: T,
718 network_sender: flume::Sender<NetworkMessage>,
719 config: MpoolConfig,
720 chain_config: Arc<ChainConfig>,
721 services: &mut JoinSet<anyhow::Result<()>>,
722 ) -> Result<MessagePool<T>, Error>
723 where
724 T: Provider,
725 {
726 let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
727 let pending = Arc::new(SyncRwLock::new(HashMap::new()));
728 let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
729 let bls_sig_cache =
730 SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
731 let sig_val_cache =
732 SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
733 let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
734 let state_nonce_cache =
735 SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
736 let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
737 let republished = Arc::new(SyncRwLock::new(HashSet::new()));
738 let block_delay = chain_config.block_delay_secs;
739
740 let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
741 let mut mp = MessagePool {
742 local_addrs,
743 pending,
744 cur_tipset: tipset,
745 api: Arc::new(api),
746 bls_sig_cache,
747 sig_val_cache,
748 key_cache,
749 state_nonce_cache,
750 local_msgs,
751 republished,
752 config,
753 network_sender,
754 repub_trigger,
755 chain_config: Arc::clone(&chain_config),
756 };
757
758 mp.load_local()?;
759
760 let mut head_changes_rx = mp.api.subscribe_head_changes();
761
762 let api = mp.api.clone();
763 let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
764 let pending = mp.pending.clone();
765 let republished = mp.republished.clone();
766 let key_cache = mp.key_cache.shallow_clone();
767 let state_nonce_cache = mp.state_nonce_cache.shallow_clone();
768
769 let current_ts = mp.cur_tipset.clone();
770 let repub_trigger = mp.repub_trigger.clone();
771
772 services.spawn(async move {
774 loop {
775 match head_changes_rx.recv().await {
776 Ok(HeadChanges { reverts, applies }) => {
777 if let Err(e) = head_change(
778 api.as_ref(),
779 &bls_sig_cache,
780 repub_trigger.clone(),
781 republished.as_ref(),
782 pending.as_ref(),
783 ¤t_ts,
784 &key_cache,
785 &state_nonce_cache,
786 reverts,
787 applies,
788 )
789 .await
790 {
791 tracing::warn!("Error changing head: {e}");
792 }
793 }
794 Err(RecvError::Lagged(e)) => {
795 warn!("Head change subscriber lagged: skipping {e} events");
796 }
797 Err(RecvError::Closed) => {
798 break Ok(());
799 }
800 }
801 }
802 });
803
804 let api = mp.api.clone();
805 let pending = mp.pending.clone();
806 let cur_tipset = mp.cur_tipset.clone();
807 let republished = mp.republished.clone();
808 let local_addrs = mp.local_addrs.clone();
809 let key_cache = mp.key_cache.shallow_clone();
810 let network_sender = Arc::new(mp.network_sender.clone());
811 let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
812 services.spawn(async move {
814 let mut repub_trigger_rx = repub_trigger_rx.stream();
815 let mut interval = interval(Duration::from_secs(republish_interval));
816 loop {
817 tokio::select! {
818 _ = interval.tick() => (),
819 _ = repub_trigger_rx.next() => (),
820 }
821 if let Err(e) = republish_pending_messages(
822 api.as_ref(),
823 network_sender.as_ref(),
824 pending.as_ref(),
825 cur_tipset.as_ref(),
826 republished.as_ref(),
827 local_addrs.as_ref(),
828 &key_cache,
829 &chain_config,
830 )
831 .await
832 {
833 warn!("Failed to republish pending messages: {}", e.to_string());
834 }
835 }
836 });
837 Ok(mp)
838 }
839}
840
841#[allow(clippy::too_many_arguments)]
848pub(in crate::message_pool) fn add_helper<T>(
849 api: &T,
850 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
851 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
852 key_cache: &IdToAddressCache,
853 cur_ts: &Tipset,
854 msg: SignedMessage,
855 sequence: u64,
856 trust_policy: TrustPolicy,
857 strictness: StrictnessPolicy,
858) -> Result<(), Error>
859where
860 T: Provider,
861{
862 if msg.signature().signature_type() == SignatureType::Bls {
863 bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
864 }
865
866 api.put_message(&ChainMessage::Signed(msg.clone().into()))?;
867 api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?;
868
869 let resolved_from = resolve_to_key(api, key_cache, &msg.from(), cur_ts)?;
870 let mut pending = pending.write();
871 let mset = pending
872 .entry(resolved_from)
873 .or_insert_with(|| MsgSet::new(sequence));
874 match trust_policy {
875 TrustPolicy::Trusted => mset.add_trusted(api, msg, strictness)?,
876 TrustPolicy::Untrusted => mset.add_untrusted(api, msg, strictness)?,
877 }
878
879 Ok(())
880}
881
882fn verify_msg_before_add(
883 m: &SignedMessage,
884 cur_ts: &Tipset,
885 local: bool,
886 chain_config: &ChainConfig,
887) -> Result<bool, Error> {
888 let epoch = cur_ts.epoch();
889 let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
890 .on_chain_message(m.chain_length()?);
891 valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
892 if !cur_ts.block_headers().is_empty() {
893 let base_fee = &cur_ts.block_headers().first().parent_base_fee;
894 let base_fee_lower_bound =
895 get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
896 if m.gas_fee_cap() < base_fee_lower_bound {
897 if local {
898 warn!(
899 "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
900 m.gas_fee_cap(),
901 base_fee_lower_bound
902 );
903 return Ok(false);
904 }
905 return Err(Error::SoftValidationFailure(format!(
906 "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
907 m.gas_fee_cap(),
908 base_fee_lower_bound
909 )));
910 }
911 }
912 Ok(local)
913}
914
915pub fn remove(
918 from: &Address,
919 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
920 sequence: u64,
921 applied: bool,
922) -> Result<(), Error> {
923 let mut pending = pending.write();
924 let mset = if let Some(mset) = pending.get_mut(from) {
925 mset
926 } else {
927 return Ok(());
928 };
929
930 mset.rm(sequence, applied);
931
932 if mset.msgs.is_empty() {
933 pending.remove(from);
934 }
935
936 Ok(())
937}
938
939#[cfg(test)]
940mod tests {
941 use crate::blocks::RawBlockHeader;
942 use crate::chain::ChainStore;
943 use crate::db::MemoryDB;
944 use crate::message_pool::provider::Provider;
945 use crate::message_pool::test_provider::TestApi;
946 use crate::networks::ChainConfig;
947 use crate::shim::econ::TokenAmount;
948 use crate::shim::state_tree::{ActorState, StateTree, StateTreeVersion};
949 use crate::utils::db::CborStoreExt as _;
950
951 use super::*;
952 use crate::shim::message::Message as ShimMessage;
953
954 fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
955 SignedMessage::mock_bls_signed_message(ShimMessage {
956 from,
957 sequence: seq,
958 gas_premium: TokenAmount::from_atto(premium),
959 gas_limit: 1_000_000,
960 ..ShimMessage::default()
961 })
962 }
963
964 #[test]
967 fn add_helper_message_gas_limit_test() {
968 let api = TestApi::default();
969 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
970 let key_cache = SizeTrackingLruCache::new_mocked();
971 let pending = SyncRwLock::new(HashMap::new());
972 let cur_ts = api.get_heaviest_tipset();
973 let message = ShimMessage {
974 gas_limit: 666_666_666,
975 ..ShimMessage::default()
976 };
977 let msg = SignedMessage::mock_bls_signed_message(message);
978 let sequence = msg.message().sequence;
979 let res = add_helper(
980 &api,
981 &bls_sig_cache,
982 &pending,
983 &key_cache,
984 &cur_ts,
985 msg,
986 sequence,
987 TrustPolicy::Trusted,
988 StrictnessPolicy::Relaxed,
989 );
990 assert!(res.is_ok());
991 }
992
993 #[test]
996 fn test_rbf_at_capacity() {
997 let api = TestApi::with_max_actor_pending_messages(10);
998 let mut mset = MsgSet::new(0);
999
1000 for i in 0..10 {
1002 let res = mset.add_trusted(
1003 &api,
1004 make_smsg(Address::default(), i, 100),
1005 StrictnessPolicy::Relaxed,
1006 );
1007 assert!(res.is_ok(), "Failed to add message {i}");
1008 }
1009
1010 let res = mset.add_trusted(
1012 &api,
1013 make_smsg(Address::default(), 10, 100),
1014 StrictnessPolicy::Relaxed,
1015 );
1016 assert!(matches!(res, Err(Error::TooManyPendingMessages(_, _))));
1017
1018 let res = mset.add_trusted(
1021 &api,
1022 make_smsg(Address::default(), 5, 200),
1023 StrictnessPolicy::Relaxed,
1024 );
1025 assert!(res.is_ok(), "RBF should be allowed at capacity");
1026 }
1027
1028 #[test]
1029 fn test_resolve_to_key_returns_non_id_unchanged() {
1030 let api = TestApi::default();
1031 let key_cache = SizeTrackingLruCache::new_mocked();
1032 let ts = api.get_heaviest_tipset();
1033
1034 let bls_addr = Address::new_bls(&[1u8; 48]).unwrap();
1035 let result = resolve_to_key(&api, &key_cache, &bls_addr, &ts).unwrap();
1036 assert_eq!(result, bls_addr);
1037 assert_eq!(
1038 key_cache.len(),
1039 0,
1040 "cache should not be populated for non-ID addresses"
1041 );
1042 }
1043
1044 #[test]
1045 fn test_resolve_to_key_resolves_id_and_caches() {
1046 let api = TestApi::default();
1047 let key_cache = SizeTrackingLruCache::new_mocked();
1048 let ts = api.get_heaviest_tipset();
1049
1050 let id_addr = Address::new_id(100);
1051 let key_addr = Address::new_bls(&[5u8; 48]).unwrap();
1052 api.set_key_address_mapping(&id_addr, &key_addr);
1053
1054 let result = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
1055 assert_eq!(result, key_addr);
1056 assert_eq!(
1057 key_cache.len(),
1058 1,
1059 "cache should have one entry after resolution"
1060 );
1061
1062 let result2 = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
1064 assert_eq!(result2, key_addr);
1065 }
1066
1067 #[test]
1068 fn test_add_helper_keys_pending_by_resolved_address() {
1069 let api = TestApi::default();
1070 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
1071 let key_cache = SizeTrackingLruCache::new_mocked();
1072 let pending = SyncRwLock::new(HashMap::new());
1073 let cur_ts = api.get_heaviest_tipset();
1074
1075 let id_addr = Address::new_id(200);
1076 let key_addr = Address::new_bls(&[7u8; 48]).unwrap();
1077 api.set_key_address_mapping(&id_addr, &key_addr);
1078 api.set_state_sequence(&key_addr, 0);
1079
1080 let message = ShimMessage {
1081 from: id_addr,
1082 gas_limit: 1_000_000,
1083 ..ShimMessage::default()
1084 };
1085 let msg = SignedMessage::mock_bls_signed_message(message);
1086
1087 add_helper(
1088 &api,
1089 &bls_sig_cache,
1090 &pending,
1091 &key_cache,
1092 &cur_ts,
1093 msg,
1094 0,
1095 TrustPolicy::Trusted,
1096 StrictnessPolicy::Relaxed,
1097 )
1098 .unwrap();
1099
1100 let pending_read = pending.read();
1101 assert!(
1102 pending_read.get(&key_addr).is_some(),
1103 "pending should be keyed by the resolved key address"
1104 );
1105 assert!(
1106 pending_read.get(&id_addr).is_none(),
1107 "pending should NOT have an entry under the raw ID address"
1108 );
1109 }
1110
1111 #[test]
1112 fn test_get_sequence_works_with_both_address_forms() {
1113 let api = TestApi::default();
1114 let bls_sig_cache = SizeTrackingLruCache::new_mocked();
1115 let key_cache = SizeTrackingLruCache::new_mocked();
1116 let pending = SyncRwLock::new(HashMap::new());
1117 let cur_ts = api.get_heaviest_tipset();
1118
1119 let id_addr = Address::new_id(300);
1120 let key_addr = Address::new_bls(&[9u8; 48]).unwrap();
1121 api.set_key_address_mapping(&id_addr, &key_addr);
1122 api.set_state_sequence(&key_addr, 0);
1123
1124 for seq in 0..2 {
1126 let message = ShimMessage {
1127 from: id_addr,
1128 sequence: seq,
1129 gas_limit: 1_000_000,
1130 ..ShimMessage::default()
1131 };
1132 let msg = SignedMessage::mock_bls_signed_message(message);
1133 add_helper(
1134 &api,
1135 &bls_sig_cache,
1136 &pending,
1137 &key_cache,
1138 &cur_ts,
1139 msg,
1140 0,
1141 TrustPolicy::Trusted,
1142 StrictnessPolicy::Relaxed,
1143 )
1144 .unwrap();
1145 }
1146
1147 let state_seq = api.get_actor_after(&id_addr, &cur_ts).unwrap().sequence;
1148 let resolved_for_id = resolve_to_key(&api, &key_cache, &id_addr, &cur_ts).unwrap();
1149 let resolved_for_key = resolve_to_key(&api, &key_cache, &key_addr, &cur_ts).unwrap();
1150 assert_eq!(resolved_for_id, resolved_for_key);
1151
1152 let mset = pending.read();
1153 let next_seq = mset.get(&resolved_for_id).unwrap().next_sequence;
1154 let expected = std::cmp::max(state_seq, next_seq);
1155 assert_eq!(expected, 2, "should reflect both pending messages");
1156 }
1157
1158 #[test]
1159 fn test_gap_filling_advances_next_sequence() {
1160 let api = TestApi::default();
1161 let mut mset = MsgSet::new(0);
1162
1163 mset.add_trusted(
1164 &api,
1165 make_smsg(Address::default(), 0, 100),
1166 StrictnessPolicy::Relaxed,
1167 )
1168 .unwrap();
1169 assert_eq!(mset.next_sequence, 1);
1170
1171 mset.add_trusted(
1172 &api,
1173 make_smsg(Address::default(), 2, 100),
1174 StrictnessPolicy::Relaxed,
1175 )
1176 .unwrap();
1177 assert_eq!(mset.next_sequence, 1, "gap at 1, so next_sequence stays");
1178
1179 mset.add_trusted(
1180 &api,
1181 make_smsg(Address::default(), 1, 100),
1182 StrictnessPolicy::Relaxed,
1183 )
1184 .unwrap();
1185 assert_eq!(
1186 mset.next_sequence, 3,
1187 "filling the gap should advance past all consecutive messages"
1188 );
1189 }
1190
1191 #[test]
1192 fn test_trusted_allows_any_nonce_gap() {
1193 let api = TestApi::default();
1194 let mut mset = MsgSet::new(0);
1195
1196 mset.add_trusted(
1197 &api,
1198 make_smsg(Address::default(), 0, 100),
1199 StrictnessPolicy::Relaxed,
1200 )
1201 .unwrap();
1202 let res = mset.add_trusted(
1203 &api,
1204 make_smsg(Address::default(), 10, 100),
1205 StrictnessPolicy::Relaxed,
1206 );
1207 assert!(
1208 res.is_ok(),
1209 "trusted adds skip nonce gap enforcement (StrictnessPolicy::Relaxed)"
1210 );
1211 }
1212
1213 #[test]
1214 fn test_strict_allows_small_nonce_gap() {
1215 let api = TestApi::default();
1216 let mut mset = MsgSet::new(0);
1217
1218 mset.add(
1220 &api,
1221 make_smsg(Address::default(), 0, 100),
1222 StrictnessPolicy::Strict,
1223 true,
1224 )
1225 .unwrap();
1226 let res = mset.add(
1227 &api,
1228 make_smsg(Address::default(), 3, 100),
1229 StrictnessPolicy::Strict,
1230 true,
1231 );
1232 assert!(
1233 res.is_ok(),
1234 "strict+trusted: gap of 2 (within MAX_NONCE_GAP=4) should succeed"
1235 );
1236 }
1237
1238 #[test]
1239 fn test_strict_rejects_large_nonce_gap() {
1240 let api = TestApi::default();
1241 let mut mset = MsgSet::new(0);
1242
1243 mset.add(
1245 &api,
1246 make_smsg(Address::default(), 0, 100),
1247 StrictnessPolicy::Strict,
1248 true,
1249 )
1250 .unwrap();
1251 let res = mset.add(
1252 &api,
1253 make_smsg(Address::default(), 6, 100),
1254 StrictnessPolicy::Strict,
1255 true,
1256 );
1257 assert_eq!(
1258 res,
1259 Err(Error::NonceGap),
1260 "strict+trusted: gap of 5 (exceeds MAX_NONCE_GAP=4) should be rejected"
1261 );
1262 }
1263
/// With the strict policy and the trusted flag cleared, even a single
/// missing nonce must be refused with `Error::NonceGap`.
#[test]
fn test_strict_untrusted_rejects_any_gap() {
    let provider = TestApi::default();
    let mut pending = MsgSet::new(0);
    let trusted = false;

    let first = make_smsg(Address::default(), 0, 100);
    pending
        .add(&provider, first, StrictnessPolicy::Strict, trusted)
        .unwrap();

    // Only nonce 1 is absent when nonce 2 arrives.
    let skipping_one = make_smsg(Address::default(), 2, 100);
    let outcome = pending.add(&provider, skipping_one, StrictnessPolicy::Strict, trusted);
    assert_eq!(
        outcome,
        Err(Error::NonceGap),
        "strict+untrusted: any gap (maxNonceGap=0) is rejected"
    );
}
1289
/// An untrusted insertion under the relaxed policy must accept a message
/// even though several nonces before it are missing.
#[test]
fn test_non_strict_untrusted_skips_gap_check() {
    let provider = TestApi::default();
    let mut pending = MsgSet::new(0);

    let first = make_smsg(Address::default(), 0, 100);
    pending
        .add_untrusted(&provider, first, StrictnessPolicy::Relaxed)
        .unwrap();

    // Nonces 1 through 4 are absent when nonce 5 arrives.
    let ahead = make_smsg(Address::default(), 5, 100);
    let outcome = pending.add_untrusted(&provider, ahead, StrictnessPolicy::Relaxed);
    assert!(
        outcome.is_ok(),
        "non-strict untrusted (PushUntrusted) skips gap enforcement"
    );
}
1312
/// Replacing an already-held message under the strict policy must fail with
/// `Error::NonceGap` while the set still has a hole in its nonce sequence.
#[test]
fn test_strict_rbf_during_gap_rejected() {
    let provider = TestApi::default();
    let mut pending = MsgSet::new(0);

    // Relaxed inserts at nonces 0 and 2 leave nonce 1 unfilled.
    for nonce in [0, 2] {
        let msg = make_smsg(Address::default(), nonce, 100);
        pending
            .add_trusted(&provider, msg, StrictnessPolicy::Relaxed)
            .unwrap();
    }

    // Same nonce as an existing entry, but with a larger third `make_smsg` argument.
    let replacement = make_smsg(Address::default(), 2, 200);
    let outcome = pending.add(&provider, replacement, StrictnessPolicy::Strict, true);
    assert_eq!(
        outcome,
        Err(Error::NonceGap),
        "strict RBF should be rejected when nonce gap exists"
    );
}
1345
/// Replacing an already-held message must succeed when the set holds a
/// contiguous run of nonces.
#[test]
fn test_rbf_without_gap_still_works() {
    let provider = TestApi::default();
    let mut pending = MsgSet::new(0);

    // Fill nonces 0, 1 and 2 in order so no hole exists.
    for nonce in 0..=2 {
        let msg = make_smsg(Address::default(), nonce, 100);
        pending
            .add_trusted(&provider, msg, StrictnessPolicy::Relaxed)
            .unwrap();
    }

    // Same nonce as the middle entry, but with a larger third `make_smsg` argument.
    let replacement = make_smsg(Address::default(), 1, 200);
    let outcome = pending.add_trusted(&provider, replacement, StrictnessPolicy::Relaxed);
    assert!(outcome.is_ok(), "RBF without a nonce gap should succeed");
}
1377
/// `get_state_sequence` must return a nonce beyond the highest nonce the
/// sender used in the tipset's messages, even when those nonces skip a value.
#[test]
fn test_get_state_sequence_accounts_for_tipset_messages() {
    use crate::message_pool::test_provider::mock_block;

    let provider = TestApi::default();
    let keys = SizeTrackingLruCache::new_mocked();
    let nonces = SizeTrackingLruCache::new_mocked();

    let sender = Address::new_bls(&[3u8; 48]).unwrap();
    provider.set_state_sequence(&sender, 5);

    // The block carries the sender's nonces 5 and 7; nonce 6 is deliberately absent.
    let block = mock_block(1, 1);
    let in_block = vec![make_smsg(sender, 5, 100), make_smsg(sender, 7, 100)];
    provider.inner.lock().set_block_messages(&block, in_block);
    let tipset = Tipset::from(block);

    let next = get_state_sequence(&provider, &keys, &nonces, &sender, &tipset).unwrap();
    assert_eq!(
        next, 8,
        "should account for non-consecutive tipset message at nonce 7"
    );
}
1402
/// Messages sent by one address in a tipset must not influence the nonce
/// reported for a different address.
#[test]
fn test_get_state_sequence_ignores_other_addresses() {
    use crate::message_pool::test_provider::mock_block;

    let provider = TestApi::default();
    let keys = SizeTrackingLruCache::new_mocked();
    let nonces = SizeTrackingLruCache::new_mocked();

    let addr_a = Address::new_bls(&[4u8; 48]).unwrap();
    let addr_b = Address::new_bls(&[5u8; 48]).unwrap();
    for addr in [&addr_a, &addr_b] {
        provider.set_state_sequence(addr, 0);
    }

    // Only `addr_b` has messages in the block: nonces 0, 1 and 2.
    let block = mock_block(1, 1);
    let from_b = vec![
        make_smsg(addr_b, 0, 100),
        make_smsg(addr_b, 1, 100),
        make_smsg(addr_b, 2, 100),
    ];
    provider.inner.lock().set_block_messages(&block, from_b);
    let tipset = Tipset::from(block);

    let nonce_a = get_state_sequence(&provider, &keys, &nonces, &addr_a, &tipset).unwrap();
    assert_eq!(
        nonce_a, 0,
        "addr_a nonce should be unaffected by addr_b's messages"
    );

    let nonce_b = get_state_sequence(&provider, &keys, &nonces, &addr_b, &tipset).unwrap();
    assert_eq!(
        nonce_b, 3,
        "addr_b nonce should reflect its tipset messages"
    );
}
1441
/// A second `get_state_sequence` call for the same sender and tipset must
/// return the previously computed value even after the provider's state
/// sequence has been changed.
#[test]
fn test_get_state_sequence_cache_hit() {
    use crate::message_pool::test_provider::mock_block;

    let provider = TestApi::default();
    let keys = SizeTrackingLruCache::new_mocked();
    let nonces = SizeTrackingLruCache::<StateNonceCacheKey, u64>::new_mocked();

    let sender = Address::new_bls(&[6u8; 48]).unwrap();
    provider.set_state_sequence(&sender, 5);

    let block = mock_block(1, 1);
    let in_block = vec![make_smsg(sender, 5, 100)];
    provider.inner.lock().set_block_messages(&block, in_block);
    let tipset = Tipset::from(block);

    let first_lookup = get_state_sequence(&provider, &keys, &nonces, &sender, &tipset).unwrap();
    assert_eq!(first_lookup, 6);

    // Change the provider's answer; a repeated lookup must not observe it.
    provider.set_state_sequence(&sender, 99);
    let second_lookup = get_state_sequence(&provider, &keys, &nonces, &sender, &tipset).unwrap();
    assert_eq!(
        second_lookup, 6,
        "second call should return the cached value, not re-read state"
    );
}
1473
/// A lookup against a different tipset must not reuse the value computed for
/// an earlier tipset; it has to reflect the provider's current state sequence.
#[test]
fn test_get_state_sequence_cache_miss_on_different_tipset() {
    use crate::message_pool::test_provider::mock_block;

    let provider = TestApi::default();
    let keys = SizeTrackingLruCache::new_mocked();
    let nonces = SizeTrackingLruCache::<StateNonceCacheKey, u64>::new_mocked();

    let sender = Address::new_bls(&[7u8; 48]).unwrap();

    // First tipset: the provider reports sequence 10.
    provider.set_state_sequence(&sender, 10);
    let block_a = mock_block(1, 1);
    let ts_a = Tipset::from(&block_a);
    let nonce_a = get_state_sequence(&provider, &keys, &nonces, &sender, &ts_a).unwrap();
    assert_eq!(nonce_a, 10);

    // Second tipset, built from a different mock block: the provider now reports 20.
    provider.set_state_sequence(&sender, 20);
    let block_b = mock_block(2, 2);
    let ts_b = Tipset::from(&block_b);
    let nonce_b = get_state_sequence(&provider, &keys, &nonces, &sender, &ts_b).unwrap();
    assert_eq!(
        nonce_b, 20,
        "different tipset should miss the cache and read fresh state"
    );
}
1505
/// `resolve_to_deterministic_address_at_finality` must resolve an ID address
/// that exists in the state below the head, and must fail for an ID address
/// that exists only in the head's state.
#[test]
fn resolve_to_key_uses_finality_lookback() {
    let db = Arc::new(MemoryDB::default());

    let chain_config = {
        let mut cfg = ChainConfig::default();
        cfg.policy.chain_finality = 1;
        Arc::new(cfg)
    };

    let bls_a = Address::new_bls(&[8u8; 48]).unwrap();
    let bls_b = Address::new_bls(&[9u8; 48]).unwrap();

    // Builds a fresh state tree holding a single actor at `actor_id` whose
    // second `ActorState::new_empty` argument is `key`, and returns the flushed root.
    let state_root_with = |actor_id: u64, key: Address| {
        let mut tree = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
        tree.set_actor(
            &Address::new_id(actor_id),
            ActorState::new_empty(Cid::default(), Some(key)),
        )
        .unwrap();
        tree.flush().unwrap()
    };
    let root_a = state_root_with(300, bls_a);
    let root_b = state_root_with(400, bls_b);

    // Chain layout: genesis and epoch 1 both use `root_a`; only the head
    // (epoch 2) uses `root_b`.
    let genesis_header = RawBlockHeader {
        state_root: root_a,
        ..Default::default()
    };
    let genesis = Tipset::from(CachingBlockHeader::new(genesis_header));
    db.put_cbor_default(genesis.block_headers().first())
        .unwrap();

    let ts1_header = RawBlockHeader {
        parents: genesis.key().clone(),
        epoch: 1,
        state_root: root_a,
        timestamp: 1,
        ..Default::default()
    };
    let ts1 = Tipset::from(CachingBlockHeader::new(ts1_header));
    db.put_cbor_default(ts1.block_headers().first()).unwrap();

    let head_header = RawBlockHeader {
        parents: ts1.key().clone(),
        epoch: 2,
        state_root: root_b,
        timestamp: 2,
        ..Default::default()
    };
    let head = Tipset::from(CachingBlockHeader::new(head_header));
    db.put_cbor_default(head.block_headers().first()).unwrap();

    let cs = ChainStore::new(
        db.clone(),
        db.clone(),
        db,
        chain_config,
        genesis.block_headers().first().clone(),
    )
    .unwrap();

    // ID 300 is present in `root_a`, so it must resolve to `bls_a`.
    let id_in_old_state = Address::new_id(300);
    let resolved =
        Provider::resolve_to_deterministic_address_at_finality(&cs, &id_in_old_state, &head)
            .unwrap();
    assert_eq!(resolved, bls_a);

    // ID 400 is present only in `root_b` (the head's state), so resolution must fail.
    let id_in_head_only = Address::new_id(400);
    Provider::resolve_to_deterministic_address_at_finality(&cs, &id_in_head_only, &head)
        .expect_err("actor only in head state must not resolve via finality lookback");
}
1582}