use bp_messages::{
source_chain::OnMessagesDelivered,
target_chain::{DispatchMessage, MessageDispatch},
LaneId, MessageNonce,
};
use bp_runtime::messages::MessageDispatchResult;
pub use bp_xcm_bridge_hub::XcmAsPlainPayload;
use bp_xcm_bridge_hub_router::XcmChannelStatusProvider;
use codec::{Decode, Encode};
use frame_support::{traits::Get, weights::Weight, CloneNoBound, EqNoBound, PartialEqNoBound};
use pallet_bridge_messages::{
Config as MessagesConfig, OutboundLanesCongestedSignals, WeightInfoExt as MessagesPalletWeights,
};
use scale_info::TypeInfo;
use sp_runtime::SaturatedConversion;
use sp_std::{fmt::Debug, marker::PhantomData};
use xcm::prelude::*;
use xcm_builder::{DispatchBlob, DispatchBlobError};
#[derive(CloneNoBound, EqNoBound, PartialEqNoBound, Encode, Decode, Debug, TypeInfo)]
pub enum XcmBlobMessageDispatchResult {
InvalidPayload,
Dispatched,
NotDispatched(#[codec(skip)] Option<DispatchBlobError>),
}
pub struct XcmBlobMessageDispatch<DispatchBlob, Weights, Channel> {
_marker: sp_std::marker::PhantomData<(DispatchBlob, Weights, Channel)>,
}
impl<
BlobDispatcher: DispatchBlob,
Weights: MessagesPalletWeights,
Channel: XcmChannelStatusProvider,
> MessageDispatch for XcmBlobMessageDispatch<BlobDispatcher, Weights, Channel>
{
type DispatchPayload = XcmAsPlainPayload;
type DispatchLevelResult = XcmBlobMessageDispatchResult;
fn is_active() -> bool {
!Channel::is_congested()
}
fn dispatch_weight(message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
match message.data.payload {
Ok(ref payload) => {
let payload_size = payload.encoded_size().saturated_into();
Weights::message_dispatch_weight(payload_size)
},
Err(_) => Weight::zero(),
}
}
fn dispatch(
message: DispatchMessage<Self::DispatchPayload>,
) -> MessageDispatchResult<Self::DispatchLevelResult> {
let payload = match message.data.payload {
Ok(payload) => payload,
Err(e) => {
log::error!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"[XcmBlobMessageDispatch] payload error: {:?} - message_nonce: {:?}",
e,
message.key.nonce
);
return MessageDispatchResult {
unspent_weight: Weight::zero(),
dispatch_level_result: XcmBlobMessageDispatchResult::InvalidPayload,
}
},
};
let dispatch_level_result = match BlobDispatcher::dispatch_blob(payload) {
Ok(_) => {
log::debug!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"[XcmBlobMessageDispatch] DispatchBlob::dispatch_blob was ok - message_nonce: {:?}",
message.key.nonce
);
XcmBlobMessageDispatchResult::Dispatched
},
Err(e) => {
log::error!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"[XcmBlobMessageDispatch] DispatchBlob::dispatch_blob failed, error: {:?} - message_nonce: {:?}",
e, message.key.nonce
);
XcmBlobMessageDispatchResult::NotDispatched(Some(e))
},
};
MessageDispatchResult { unspent_weight: Weight::zero(), dispatch_level_result }
}
}
#[cfg_attr(feature = "std", derive(Debug, Eq, PartialEq))]
pub struct SenderAndLane {
pub location: Location,
pub lane: LaneId,
}
impl SenderAndLane {
pub fn new(location: Location, lane: LaneId) -> Self {
SenderAndLane { location, lane }
}
}
pub trait XcmBlobHauler {
type Runtime: MessagesConfig<Self::MessagesInstance>;
type MessagesInstance: 'static;
type ToSourceChainSender: SendXcm;
type CongestedMessage: Get<Option<Xcm<()>>>;
type UncongestedMessage: Get<Option<Xcm<()>>>;
fn supports_congestion_detection() -> bool {
Self::CongestedMessage::get().is_some() || Self::UncongestedMessage::get().is_some()
}
}
pub struct XcmBlobHaulerAdapter<XcmBlobHauler, Lanes>(
sp_std::marker::PhantomData<(XcmBlobHauler, Lanes)>,
);
impl<
H: XcmBlobHauler,
Lanes: Get<sp_std::vec::Vec<(SenderAndLane, (NetworkId, InteriorLocation))>>,
> OnMessagesDelivered for XcmBlobHaulerAdapter<H, Lanes>
{
fn on_messages_delivered(lane: LaneId, enqueued_messages: MessageNonce) {
if let Some(sender_and_lane) =
Lanes::get().iter().find(|link| link.0.lane == lane).map(|link| &link.0)
{
LocalXcmQueueManager::<H>::on_bridge_messages_delivered(
sender_and_lane,
enqueued_messages,
);
}
}
}
pub struct LocalXcmQueueManager<H>(PhantomData<H>);
const OUTBOUND_LANE_CONGESTED_THRESHOLD: MessageNonce = 8_192;
const OUTBOUND_LANE_UNCONGESTED_THRESHOLD: MessageNonce = 1_024;
impl<H: XcmBlobHauler> LocalXcmQueueManager<H> {
pub fn on_bridge_message_enqueued(
sender_and_lane: &SenderAndLane,
enqueued_messages: MessageNonce,
) {
if !H::supports_congestion_detection() {
return
}
if Self::is_congested_signal_sent(sender_and_lane.lane) {
return
}
let is_congested = enqueued_messages > OUTBOUND_LANE_CONGESTED_THRESHOLD;
if !is_congested {
return
}
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Sending 'congested' XCM message to {:?} to avoid overloading lane {:?}: there are\
{} messages queued at the bridge queue",
sender_and_lane.location,
sender_and_lane.lane,
enqueued_messages,
);
if let Err(e) = Self::send_congested_signal(sender_and_lane) {
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Failed to send the 'congested' XCM message to {:?}: {:?}",
sender_and_lane.location,
e,
);
}
}
pub fn on_bridge_messages_delivered(
sender_and_lane: &SenderAndLane,
enqueued_messages: MessageNonce,
) {
if !H::supports_congestion_detection() {
return
}
if !Self::is_congested_signal_sent(sender_and_lane.lane) {
return
}
let is_congested = enqueued_messages > OUTBOUND_LANE_UNCONGESTED_THRESHOLD;
if is_congested {
return
}
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Sending 'uncongested' XCM message to {:?}. Lane {:?}: there are\
{} messages queued at the bridge queue",
sender_and_lane.location,
sender_and_lane.lane,
enqueued_messages,
);
if let Err(e) = Self::send_uncongested_signal(sender_and_lane) {
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Failed to send the 'uncongested' XCM message to {:?}: {:?}",
sender_and_lane.location,
e,
);
}
}
fn is_congested_signal_sent(lane: LaneId) -> bool {
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::get(lane)
}
fn send_congested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
if let Some(msg) = H::CongestedMessage::get() {
send_xcm::<H::ToSourceChainSender>(sender_and_lane.location.clone(), msg)?;
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::insert(
sender_and_lane.lane,
true,
);
}
Ok(())
}
fn send_uncongested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
if let Some(msg) = H::UncongestedMessage::get() {
send_xcm::<H::ToSourceChainSender>(sender_and_lane.location.clone(), msg)?;
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::remove(
sender_and_lane.lane,
);
}
Ok(())
}
}
pub struct XcmVersionOfDestAndRemoteBridge<Version, RemoteBridge>(
sp_std::marker::PhantomData<(Version, RemoteBridge)>,
);
impl<Version: GetVersion, RemoteBridge: Get<Location>> GetVersion
for XcmVersionOfDestAndRemoteBridge<Version, RemoteBridge>
{
fn get_version_for(dest: &Location) -> Option<XcmVersion> {
let dest_version = Version::get_version_for(dest);
let bridge_hub_version = Version::get_version_for(&RemoteBridge::get());
match (dest_version, bridge_hub_version) {
(Some(dv), Some(bhv)) => Some(sp_std::cmp::min(dv, bhv)),
(Some(dv), None) => Some(dv),
(None, Some(bhv)) => Some(bhv),
(None, None) => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
use bp_messages::OutboundLaneData;
use frame_support::parameter_types;
use pallet_bridge_messages::OutboundLanes;
parameter_types! {
pub TestSenderAndLane: SenderAndLane = SenderAndLane {
location: Location::new(1, [Parachain(1000)]),
lane: TEST_LANE_ID,
};
pub TestLanes: sp_std::vec::Vec<(SenderAndLane, (NetworkId, InteriorLocation))> = sp_std::vec![
(TestSenderAndLane::get(), (NetworkId::ByGenesis([0; 32]), InteriorLocation::Here))
];
pub DummyXcmMessage: Xcm<()> = Xcm::new();
}
struct DummySendXcm;
impl DummySendXcm {
fn messages_sent() -> u32 {
frame_support::storage::unhashed::get(b"DummySendXcm").unwrap_or(0)
}
}
impl SendXcm for DummySendXcm {
type Ticket = ();
fn validate(
_destination: &mut Option<Location>,
_message: &mut Option<Xcm<()>>,
) -> SendResult<Self::Ticket> {
Ok(((), Default::default()))
}
fn deliver(_ticket: Self::Ticket) -> Result<XcmHash, SendError> {
let messages_sent: u32 = Self::messages_sent();
frame_support::storage::unhashed::put(b"DummySendXcm", &(messages_sent + 1));
Ok(XcmHash::default())
}
}
struct TestBlobHauler;
impl XcmBlobHauler for TestBlobHauler {
type Runtime = TestRuntime;
type MessagesInstance = ();
type ToSourceChainSender = DummySendXcm;
type CongestedMessage = DummyXcmMessage;
type UncongestedMessage = DummyXcmMessage;
}
type TestBlobHaulerAdapter = XcmBlobHaulerAdapter<TestBlobHauler, TestLanes>;
fn fill_up_lane_to_congestion() -> MessageNonce {
let latest_generated_nonce = OUTBOUND_LANE_CONGESTED_THRESHOLD;
OutboundLanes::<TestRuntime, ()>::insert(
TEST_LANE_ID,
OutboundLaneData {
oldest_unpruned_nonce: 0,
latest_received_nonce: 0,
latest_generated_nonce,
},
);
latest_generated_nonce
}
#[test]
fn congested_signal_is_not_sent_twice() {
run_test(|| {
let enqueued = fill_up_lane_to_congestion();
LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
&TestSenderAndLane::get(),
enqueued + 1,
);
assert_eq!(DummySendXcm::messages_sent(), 1);
LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
&TestSenderAndLane::get(),
enqueued,
);
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn congested_signal_is_not_sent_when_outbound_lane_is_not_congested() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
&TestSenderAndLane::get(),
1,
);
assert_eq!(DummySendXcm::messages_sent(), 0);
});
}
#[test]
fn congested_signal_is_sent_when_outbound_lane_is_congested() {
run_test(|| {
let enqueued = fill_up_lane_to_congestion();
LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
&TestSenderAndLane::get(),
enqueued + 1,
);
assert_eq!(DummySendXcm::messages_sent(), 1);
assert!(LocalXcmQueueManager::<TestBlobHauler>::is_congested_signal_sent(TEST_LANE_ID));
});
}
#[test]
fn uncongested_signal_is_not_sent_when_messages_are_delivered_at_other_lane() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
TestBlobHaulerAdapter::on_messages_delivered(LaneId([42, 42, 42, 42]), 0);
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn uncongested_signal_is_not_sent_when_we_havent_send_congested_signal_before() {
run_test(|| {
TestBlobHaulerAdapter::on_messages_delivered(TEST_LANE_ID, 0);
assert_eq!(DummySendXcm::messages_sent(), 0);
});
}
#[test]
fn uncongested_signal_is_not_sent_if_outbound_lane_is_still_congested() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
TestBlobHaulerAdapter::on_messages_delivered(
TEST_LANE_ID,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD + 1,
);
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn uncongested_signal_is_sent_if_outbound_lane_is_uncongested() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
TestBlobHaulerAdapter::on_messages_delivered(
TEST_LANE_ID,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
);
assert_eq!(DummySendXcm::messages_sent(), 2);
});
}
}