mod get_block_filter_check_points_process;
mod get_block_filter_hashes_process;
mod get_block_filters_process;
use crate::{types::SyncShared, Status};
use get_block_filter_check_points_process::GetBlockFilterCheckPointsProcess;
use get_block_filter_hashes_process::GetBlockFilterHashesProcess;
use get_block_filters_process::GetBlockFiltersProcess;
use crate::utils::{metric_ckb_message_bytes, MetricDirection};
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
use ckb_logger::{debug_target, error_target, info_target, warn_target};
use ckb_network::{
async_trait, bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols,
};
use ckb_types::{packed, prelude::*};
use std::sync::Arc;
use std::time::Instant;
#[derive(Clone)]
pub struct BlockFilter {
shared: Arc<SyncShared>,
}
impl BlockFilter {
pub fn new(shared: Arc<SyncShared>) -> Self {
Self { shared }
}
fn try_process(
&mut self,
nc: Arc<dyn CKBProtocolContext + Sync>,
peer: PeerIndex,
message: packed::BlockFilterMessageUnionReader<'_>,
) -> Status {
match message {
packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
GetBlockFiltersProcess::new(msg, self, nc, peer).execute()
}
packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
GetBlockFilterHashesProcess::new(msg, self, nc, peer).execute()
}
packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer).execute()
}
packed::BlockFilterMessageUnionReader::BlockFilters(_)
| packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
| packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(_) => {
warn_target!(
crate::LOG_TARGET_FILTER,
"Received unexpected message from peer: {:?}",
peer
);
Status::ignored()
}
}
}
fn process(
&mut self,
nc: Arc<dyn CKBProtocolContext + Sync>,
peer: PeerIndex,
message: packed::BlockFilterMessageUnionReader<'_>,
) {
let item_name = message.item_name();
let item_bytes = message.as_slice().len() as u64;
let status = self.try_process(Arc::clone(&nc), peer, message);
metric_ckb_message_bytes(
MetricDirection::In,
&SupportProtocols::Filter.name(),
message.item_name(),
Some(status.code()),
item_bytes,
);
if let Some(ban_time) = status.should_ban() {
error_target!(
crate::LOG_TARGET_RELAY,
"receive {} from {}, ban {:?} for {}",
item_name,
peer,
ban_time,
status
);
nc.ban_peer(peer, ban_time, status.to_string());
} else if status.should_warn() {
warn_target!(
crate::LOG_TARGET_RELAY,
"receive {} from {}, {}",
item_name,
peer,
status
);
} else if !status.is_ok() {
debug_target!(
crate::LOG_TARGET_RELAY,
"receive {} from {}, {}",
item_name,
peer,
status
);
}
}
}
#[async_trait]
impl CKBProtocolHandler for BlockFilter {
async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
async fn received(
&mut self,
nc: Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
data: Bytes,
) {
let msg = match packed::BlockFilterMessageReader::from_compatible_slice(&data) {
Ok(msg) => msg.to_enum(),
_ => {
info_target!(
crate::LOG_TARGET_FILTER,
"Peer {} sends us a malformed message",
peer_index
);
nc.ban_peer(
peer_index,
BAD_MESSAGE_BAN_TIME,
String::from("send us a malformed message"),
);
return;
}
};
debug_target!(
crate::LOG_TARGET_FILTER,
"received msg {} from {}",
msg.item_name(),
peer_index
);
let start_time = Instant::now();
self.process(nc, peer_index, msg);
debug_target!(
crate::LOG_TARGET_FILTER,
"process message={}, peer={}, cost={:?}",
msg.item_name(),
peer_index,
Instant::now().saturating_duration_since(start_time),
);
}
async fn connected(
&mut self,
_nc: Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
_version: &str,
) {
info_target!(
crate::LOG_TARGET_FILTER,
"FilterProtocol.connected peer={}",
peer_index
);
}
async fn disconnected(
&mut self,
_nc: Arc<dyn CKBProtocolContext + Sync>,
peer_index: PeerIndex,
) {
info_target!(
crate::LOG_TARGET_FILTER,
"FilterProtocol.disconnected peer={}",
peer_index
);
}
}