use crate::{Status, StatusCode};
use ckb_logger::error;
use ckb_network::{CKBProtocolContext, PeerIndex, ProtocolId, SupportProtocols};
use ckb_types::packed::{self, RelayMessageReader, SyncMessageReader};
use ckb_types::prelude::*;
use std::{
fmt::{self, Formatter},
sync::Arc,
};
#[allow(dead_code)]
pub(crate) fn send_message<Message: Entity>(
protocol_id: ProtocolId,
nc: &dyn CKBProtocolContext,
peer_index: PeerIndex,
message: &Message,
) -> Status {
if let Err(err) = nc.send_message(protocol_id, peer_index, message.as_bytes()) {
let name = message_name(protocol_id, message);
let error_message = format!("nc.send_message {name}, error: {err:?}");
error!("{}", error_message);
return StatusCode::Network.with_context(error_message);
}
let bytes = message.as_bytes().len() as u64;
let item_name = item_name(protocol_id, message);
let protocol_name = protocol_name(protocol_id);
metric_ckb_message_bytes(
MetricDirection::Out,
&protocol_name,
&item_name,
None,
bytes,
);
Status::ok()
}
pub(crate) async fn quick_send_message_async<Message: Entity>(
protocol_id: ProtocolId,
nc: &Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
message: &Message,
) -> Status {
if let Err(err) = nc
.async_quick_send_message(protocol_id, peer_index, message.as_bytes())
.await
{
let name = message_name(protocol_id, message);
let error_message = format!("nc.quick_send_message {name}, error: {err:?}");
error!("{}", error_message);
return StatusCode::Network.with_context(error_message);
}
let bytes = message.as_bytes().len() as u64;
let item_name = item_name(protocol_id, message);
let protocol_name = protocol_name(protocol_id);
metric_ckb_message_bytes(
MetricDirection::Out,
&protocol_name,
&item_name,
None,
bytes,
);
Status::ok()
}
pub(crate) async fn async_send_message<Message: Entity>(
protocol_id: ProtocolId,
nc: &Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
message: &Message,
) -> Status {
if let Err(err) = nc
.async_send_message(protocol_id, peer_index, message.as_bytes())
.await
{
let name = message_name(protocol_id, message);
let error_message = format!("nc.send_message {name}, error: {err:?}");
ckb_logger::error!("{}", error_message);
return StatusCode::Network.with_context(error_message);
}
let bytes = message.as_bytes().len() as u64;
let item_name = item_name(protocol_id, message);
let protocol_name = protocol_name(protocol_id);
metric_ckb_message_bytes(
MetricDirection::Out,
&protocol_name,
&item_name,
None,
bytes,
);
Status::ok()
}
pub(crate) enum MetricDirection {
In,
Out,
}
impl fmt::Display for MetricDirection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
MetricDirection::In => write!(f, "in"),
MetricDirection::Out => write!(f, "out"),
}
}
}
pub(crate) fn metric_ckb_message_bytes(
direction: MetricDirection,
protocol_name: &str,
item_name: &str,
status_code: Option<StatusCode>,
bytes: u64,
) {
if let Some(metrics) = ckb_metrics::handle() {
metrics
.ckb_message_bytes
.with_label_values(&[
direction.to_string().as_str(),
protocol_name,
item_name,
&status_code.unwrap_or(StatusCode::Ignored).name(),
])
.observe(bytes as f64);
}
}
#[allow(dead_code)]
pub(crate) fn send_message_to<Message: Entity>(
nc: &dyn CKBProtocolContext,
peer_index: PeerIndex,
message: &Message,
) -> Status {
let protocol_id = nc.protocol_id();
send_message(protocol_id, nc, peer_index, message)
}
pub(crate) async fn async_send_message_to<Message: Entity>(
nc: &Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
message: &Message,
) -> Status {
let protocol_id = nc.protocol_id();
async_send_message(protocol_id, nc, peer_index, message).await
}
pub(crate) async fn async_quick_send_message_to<Message: Entity>(
nc: &Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
message: &Message,
) -> Status {
let protocol_id = nc.protocol_id();
quick_send_message_async(protocol_id, nc, peer_index, message).await
}
fn message_name<Message: Entity>(protocol_id: ProtocolId, message: &Message) -> String {
if protocol_id == SupportProtocols::Sync.protocol_id() {
SyncMessageReader::new_unchecked(message.as_slice())
.to_enum()
.item_name()
.to_owned()
} else if protocol_id == SupportProtocols::RelayV3.protocol_id() {
RelayMessageReader::new_unchecked(message.as_slice())
.to_enum()
.item_name()
.to_owned()
} else {
Message::NAME.to_owned()
}
}
fn item_name<Message: Entity>(protocol_id: ProtocolId, message: &Message) -> String {
if protocol_id == SupportProtocols::Sync.protocol_id() {
SyncMessageReader::verify(message.as_slice(), true)
.map(|_| {
SyncMessageReader::new_unchecked(message.as_slice())
.to_enum()
.item_name()
.to_owned()
})
.unwrap_or_else(|err| {
error!("SyncMessageReader::verify error: {:?}", err);
"none".to_owned()
})
} else if protocol_id == SupportProtocols::RelayV3.protocol_id() {
RelayMessageReader::verify(message.as_slice(), true)
.map(|_| {
RelayMessageReader::new_unchecked(message.as_slice())
.to_enum()
.item_name()
.to_owned()
})
.unwrap_or_else(|err| {
error!("RelayMessageReader::verify error: {:?}", err);
"none".to_owned()
})
} else {
"none".to_owned()
}
}
fn protocol_name(protocol_id: ProtocolId) -> String {
match protocol_id.value() {
0 => SupportProtocols::Ping.name(),
1 => SupportProtocols::Discovery.name(),
2 => SupportProtocols::Identify.name(),
3 => SupportProtocols::Feeler.name(),
4 => SupportProtocols::DisconnectMessage.name(),
100 => SupportProtocols::Sync.name(),
101 => SupportProtocols::RelayV3.name(),
102 => SupportProtocols::Time.name(),
110 => SupportProtocols::Alert.name(),
120 => SupportProtocols::LightClient.name(),
121 => SupportProtocols::Filter.name(),
_ => {
error!("send_message got an unknown protocol id: {}", protocol_id);
"Unknown".to_owned()
}
}
}
pub(crate) async fn send_block_proposals(
nc: &Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
txs: Vec<packed::Transaction>,
) {
let content = packed::BlockProposal::new_builder()
.transactions(txs)
.build();
let message = packed::RelayMessage::new_builder().set(content).build();
let status = async_quick_send_message_to(nc, peer_index, &message).await;
if !status.is_ok() {
ckb_logger::error!(
"send RelayBlockProposal to {}, status: {:?}",
peer_index,
status
);
}
}