use crate::message::{MessageRead as _, SignedMessage};
use crate::message_pool::{
Error,
msg_chain::{Chains, create_message_chains},
msgpool::{MIN_GAS, msg_pool::MessagePool},
provider::Provider,
utils::get_base_fee_lower_bound,
};
use crate::prelude::*;
use crate::shim::address::Address;
use ahash::{HashMap, HashSet};
use parking_lot::RwLock as SyncRwLock;
/// Capacity of the bounded channel created in `RepublishState::new` to carry republish
/// trigger signals.
const REPUB_TRIGGER_CAPACITY: usize = 1;
/// Factor passed to `get_base_fee_lower_bound` when deriving the lowest gas fee cap a message
/// may carry and still be selected for republishing.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Maximum number of messages returned by a single republish selection; also passed to
/// `Chains::trim_msgs_at` when a chain has to be trimmed.
const REPUB_MSG_LIMIT: usize = 30;
/// Republish bookkeeping shared inside the message pool: the set of message CIDs recorded by
/// the latest republish pass, plus the sending half of the channel used to request a pass.
pub(in crate::message_pool) struct RepublishState {
/// CIDs stored by the most recent `replace_with` call, guarded by a read/write lock.
republished: SyncRwLock<HashSet<Cid>>,
/// Sender side of the bounded trigger channel; the matching receiver is returned by `new`.
trigger: flume::Sender<()>,
}
impl RepublishState {
pub(in crate::message_pool) fn new() -> (Self, flume::Receiver<()>) {
let (trigger, rx) = flume::bounded(REPUB_TRIGGER_CAPACITY);
(
Self {
republished: SyncRwLock::default(),
trigger,
},
rx,
)
}
pub(in crate::message_pool) fn was_republished(&self, cid: &Cid) -> bool {
self.republished.read().contains(cid)
}
pub(in crate::message_pool) fn trigger(&self) -> Result<(), Error> {
match self.trigger.try_send(()) {
Ok(()) | Err(flume::TrySendError::Full(_)) => Ok(()),
Err(flume::TrySendError::Disconnected(_)) => {
Err(Error::Other("republish receiver dropped".into()))
}
}
}
pub(in crate::message_pool) fn replace_with<I: IntoIterator<Item = Cid>>(&self, cids: I) {
let mut set = self.republished.write();
set.clear();
set.extend(cids);
}
}
impl<T: Provider> MessagePool<T> {
    /// Runs one republish pass over the locally tracked addresses.
    ///
    /// The pass snapshots the current tipset and the non-empty pending message sets of every
    /// address in `local_addrs`, asks `select_messages_to_republish` which messages to send,
    /// publishes them over pubsub in the selected order, and finally records their CIDs as the
    /// republished set.
    ///
    /// # Errors
    ///
    /// Propagates any error from message selection or from publishing. When publishing fails,
    /// the remaining messages are skipped and the republished set is not updated.
    pub(in crate::message_pool) async fn run_republish_cycle(&self) -> Result<(), Error> {
        let base = self.cur_tipset.read().shallow_clone();
        let tracked: Vec<Address> = self.local_addrs.read().iter().copied().collect();

        // Gather per-address pending messages, leaving out addresses with nothing pending.
        let mut by_actor: HashMap<Address, HashMap<u64, SignedMessage>> =
            HashMap::with_capacity(tracked.len());
        by_actor.extend(tracked.iter().filter_map(|addr| {
            let snapshot = self.pending.snapshot_for(addr)?;
            if snapshot.msgs.is_empty() {
                None
            } else {
                Some((*addr, snapshot.msgs))
            }
        }));

        let selected = select_messages_to_republish(
            self.api.as_ref(),
            &self.chain_config,
            &base,
            by_actor,
        )?;

        for msg in &selected {
            self.publish_pubsub(msg).await?;
        }

        // Only reached when every publish succeeded.
        self.republish
            .replace_with(selected.iter().map(|msg| msg.cid()));
        Ok(())
    }
}
/// Selects the pending messages to republish on top of `base`.
///
/// Message chains are built per actor from `pending`, sorted, and then consumed in order while
/// the remaining gas budget (starting at `BLOCK_GAS_LIMIT`) exceeds `MIN_GAS` and fewer than
/// `REPUB_MSG_LIMIT` messages have been picked. A chain that does not fit in the remaining gas
/// is trimmed and re-positioned instead of being skipped. Messages whose gas fee cap is below
/// the base-fee lower bound are never selected.
///
/// Returns at most `REPUB_MSG_LIMIT` messages, in selection order.
///
/// # Errors
///
/// Propagates failures from `Provider::chain_compute_base_fee` and `create_message_chains`.
fn select_messages_to_republish<T>(
    api: &T,
    chain_config: &crate::networks::ChainConfig,
    base: &crate::blocks::Tipset,
    pending: HashMap<Address, HashMap<u64, SignedMessage>>,
) -> Result<Vec<SignedMessage>, Error>
where
    T: Provider,
{
    let mut msgs: Vec<SignedMessage> = Vec::new();

    // The base fee is computed before the emptiness check, so a provider failure is reported
    // even when nothing is pending. The order is kept as-is to preserve that error behaviour.
    let base_fee = api.chain_compute_base_fee(base)?;
    let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);

    if pending.is_empty() {
        return Ok(msgs);
    }

    let mut chains = Chains::new();
    for (actor, mset) in &pending {
        create_message_chains(
            api,
            actor,
            mset,
            &base_fee_lower_bound,
            base,
            &mut chains,
            chain_config,
        )?;
    }
    if chains.is_empty() {
        return Ok(msgs);
    }

    chains.sort(false);

    let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
    let mut i = 0;
    'l: while let Some(chain) = chains.get_mut_at(i) {
        // Stop as soon as the cap is reached. A single chain can still push `msgs` past the
        // cap, which the final `truncate` handles; anything selected beyond the cap would be
        // thrown away, so there is no point in looking at further chains.
        if msgs.len() >= REPUB_MSG_LIMIT {
            break;
        }

        // Not enough gas left for another message.
        if gas_limit <= MIN_GAS {
            break;
        }

        // Skip chains that have been invalidated.
        if !chain.valid {
            i += 1;
            continue;
        }

        // The whole chain fits in the remaining gas budget.
        if chain.gas_limit <= gas_limit {
            for m in chain.msgs.iter() {
                if m.gas_fee_cap() < base_fee_lower_bound {
                    // Messages of this chain that were already pushed stay selected and their
                    // gas stays deducted; the chain is invalidated and the outer loop
                    // re-examines index `i`, where the `valid` check above then skips it.
                    let key = chains.get_key_at(i);
                    chains.invalidate(key);
                    continue 'l;
                }
                gas_limit = gas_limit.saturating_sub(m.gas_limit());
                msgs.push(m.clone());
            }
            // The whole chain was consumed; move on to the next one.
            i += 1;
            continue;
        }

        // The chain does not fit but there is gas to spare: trim it against the remaining
        // budget and re-position it, then re-examine index `i` without advancing.
        chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
        chains.bubble_down_after_trim(i);
    }

    // `truncate` is a no-op when the length is already within the limit.
    msgs.truncate(REPUB_MSG_LIMIT);
    Ok(msgs)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message_pool::msg_chain::MsgChainNode;
    use crate::shim::econ::TokenAmount;

    /// Builds a `Chains` with one node per entry of `perfs`, pushed in the given order.
    ///
    /// Node `i` gets `gas_perf = perfs[i]` and a gas reward of `i + 1` atto; every other field
    /// keeps its default value.
    fn chains_from_perfs(perfs: &[f64]) -> Chains {
        let mut keys = Vec::with_capacity(perfs.len());
        let mut built = Chains::new();
        for (idx, perf) in perfs.iter().copied().enumerate() {
            built.push_with(
                MsgChainNode {
                    gas_perf: perf,
                    gas_reward: TokenAmount::from_atto(idx as u64 + 1),
                    ..Default::default()
                },
                &mut keys,
            );
        }
        built.key_vec = keys;
        built
    }

    /// Bubbling down from index 1 moves the out-of-place node to the end of the order.
    #[test]
    fn bubble_down_after_trim_restores_compare_order() {
        let mut chains = chains_from_perfs(&[1.0, 5.0, 3.0, 4.0]);
        chains.bubble_down_after_trim(1);

        let observed: Vec<f64> = (0..chains.len())
            .map(|idx| chains[idx].gas_perf)
            .collect();
        let expected = vec![1.0, 3.0, 4.0, 5.0];
        assert_eq!(observed, expected);
    }

    /// `was_republished` tracks exactly what the latest `replace_with` stored.
    #[test]
    fn was_republished_reflects_replace_with() {
        let (state, _rx) = RepublishState::new();
        let probe = Cid::default();

        let initially_present = state.was_republished(&probe);
        assert!(
            !initially_present,
            "fresh state should not contain any CIDs",
        );

        state.replace_with([probe]);
        let present_after_insert = state.was_republished(&probe);
        assert!(present_after_insert, "replace_with should populate the set");

        state.replace_with(std::iter::empty());
        let present_after_reset = state.was_republished(&probe);
        assert!(
            !present_after_reset,
            "replace_with with empty iter should clear the set",
        );
    }

    /// A trigger sent while the receiver is alive can be read back from it.
    #[test]
    fn trigger_succeeds_when_receiver_is_alive() {
        let (state, rx) = RepublishState::new();

        let sent = state.trigger();
        sent.expect("send should succeed");

        let received = rx.try_recv();
        received.expect("trigger should be observable on the receiver");
    }

    /// A second trigger on an already-full channel is dropped without reporting an error.
    #[test]
    fn trigger_drops_silently_when_buffer_full() {
        // `_rx` is kept alive so the channel stays connected while it fills up.
        let (state, _rx) = RepublishState::new();

        let first = state.trigger();
        first.expect("first trigger should send");

        let overflow = state.trigger();
        overflow.expect("overflow trigger should be dropped silently");
    }

    /// Once the receiver is gone, triggering reports `Error::Other`.
    #[test]
    fn trigger_errors_when_receiver_disconnected() {
        let (state, rx) = RepublishState::new();
        drop(rx);

        let outcome = state.trigger();
        let err = outcome.expect_err("disconnected receiver should surface as an error");
        assert!(matches!(err, Error::Other(_)));
    }
}