use crate::blocks::Tipset;
use crate::message::{MessageRead as _, SignedMessage};
use crate::message_pool::msgpool::utils;
use crate::message_pool::{
Error,
msg_pool::{StrictnessPolicy, TrustPolicy},
msgpool::{msg_pool::MessagePool, recover_sig},
provider::Provider,
};
use crate::shim::address::Address;
use crate::utils::ShallowClone as _;
use ahash::{HashMap, HashMapExt};
impl<T> MessagePool<T>
where
T: Provider + 'static,
{
pub(in crate::message_pool) async fn apply_head_change(
&self,
revert: Vec<Tipset>,
apply: Vec<Tipset>,
) -> Result<(), Error> {
let mut repub = false;
let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
for ts in revert {
let Ok(pts) = self.api.load_tipset(ts.parents()) else {
tracing::error!("error loading reverted tipset parent");
continue;
};
*self.cur_tipset.write() = pts;
let mut msgs: Vec<SignedMessage> = Vec::new();
for block in ts.block_headers() {
let Ok((umsg, smsgs)) = self.api.messages_for_block(block) else {
tracing::error!("error retrieving messages for reverted block");
continue;
};
msgs.extend(smsgs);
for msg in umsg {
let msg_cid = msg.cid();
let Ok(smsg) = recover_sig(&self.caches.bls_sig, msg) else {
tracing::debug!("could not recover signature for bls message {}", msg_cid);
continue;
};
msgs.push(smsg)
}
}
for msg in msgs {
utils::add_to_selected_msgs(msg, &mut rmsgs);
}
}
for ts in apply {
for b in ts.block_headers() {
let Ok((msgs, smsgs)) = self.api.messages_for_block(b) else {
tracing::error!("error retrieving messages for block");
continue;
};
for msg in smsgs {
self.remove_applied_from_pool(&msg.from(), msg.sequence(), &mut rmsgs, &ts)?;
if !repub && self.republish.was_republished(&msg.cid()) {
repub = true;
}
}
for msg in msgs {
self.remove_applied_from_pool(&msg.from, msg.sequence, &mut rmsgs, &ts)?;
if !repub && self.republish.was_republished(&msg.cid()) {
repub = true;
}
}
}
*self.cur_tipset.write() = ts;
}
if repub {
self.republish.trigger()?;
}
let cur_ts = self.cur_tipset.read().shallow_clone();
for (_, hm) in rmsgs {
for (_, msg) in hm {
if let Err(e) = self.add_to_pool_unchecked(
&cur_ts,
msg,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
) {
tracing::error!("Failed to read message from reorg to mpool: {}", e);
}
}
}
self.pending.shrink_to_fit();
Ok(())
}
fn remove_applied_from_pool(
&self,
from: &Address,
sequence: u64,
rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
ts: &Tipset,
) -> Result<(), Error> {
if rmsgs
.get_mut(from)
.and_then(|temp| temp.remove(&sequence))
.is_none()
&& let Ok(resolved) = self
.resolve_to_key(from, ts)
.inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
{
let _ = self.pending.remove(&resolved, sequence, true);
}
Ok(())
}
}