1mod get_block_filter_check_points_process;
2mod get_block_filter_hashes_process;
3mod get_block_filters_process;
4
5use crate::{Status, types::SyncShared};
6use get_block_filter_check_points_process::GetBlockFilterCheckPointsProcess;
7use get_block_filter_hashes_process::GetBlockFilterHashesProcess;
8use get_block_filters_process::GetBlockFiltersProcess;
9
10use crate::utils::{MetricDirection, metric_ckb_message_bytes};
11use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
12use ckb_logger::{debug_target, error_target, info_target, warn_target};
13use ckb_network::{
14 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, async_trait, bytes::Bytes,
15};
16use ckb_types::{packed, prelude::*};
17use std::sync::Arc;
18use std::time::Instant;
19
20#[derive(Clone)]
22pub struct BlockFilter {
23 shared: Arc<SyncShared>,
25}
26
27impl BlockFilter {
28 pub fn new(shared: Arc<SyncShared>) -> Self {
30 Self { shared }
31 }
32
33 async fn try_process(
34 &mut self,
35 nc: Arc<dyn CKBProtocolContext + Sync>,
36 peer: PeerIndex,
37 message: packed::BlockFilterMessageUnionReader<'_>,
38 ) -> Status {
39 match message {
40 packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
41 GetBlockFiltersProcess::new(msg, self, nc, peer)
42 .execute()
43 .await
44 }
45 packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
46 GetBlockFilterHashesProcess::new(msg, self, nc, peer)
47 .execute()
48 .await
49 }
50 packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
51 GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer)
52 .execute()
53 .await
54 }
55 packed::BlockFilterMessageUnionReader::BlockFilters(_)
56 | packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
57 | packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(_) => {
58 warn_target!(
61 crate::LOG_TARGET_FILTER,
62 "Received unexpected message from peer: {:?}",
63 peer
64 );
65 Status::ignored()
66 }
67 }
68 }
69
70 async fn process(
71 &mut self,
72 nc: Arc<dyn CKBProtocolContext + Sync>,
73 peer: PeerIndex,
74 message: packed::BlockFilterMessageUnionReader<'_>,
75 ) {
76 let item_name = message.item_name();
77 let item_bytes = message.as_slice().len() as u64;
78 let status = self.try_process(Arc::clone(&nc), peer, message).await;
79
80 metric_ckb_message_bytes(
81 MetricDirection::In,
82 &SupportProtocols::Filter.name(),
83 message.item_name(),
84 Some(status.code()),
85 item_bytes,
86 );
87
88 if let Some(ban_time) = status.should_ban() {
89 error_target!(
90 crate::LOG_TARGET_RELAY,
91 "receive {} from {}, ban {:?} for {}",
92 item_name,
93 peer,
94 ban_time,
95 status
96 );
97 nc.ban_peer(peer, ban_time, status.to_string());
98 } else if status.should_warn() {
99 warn_target!(
100 crate::LOG_TARGET_RELAY,
101 "receive {} from {}, {}",
102 item_name,
103 peer,
104 status
105 );
106 } else if !status.is_ok() {
107 debug_target!(
108 crate::LOG_TARGET_RELAY,
109 "receive {} from {}, {}",
110 item_name,
111 peer,
112 status
113 );
114 }
115 }
116}
117
118#[async_trait]
119impl CKBProtocolHandler for BlockFilter {
120 async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
121
122 async fn received(
123 &mut self,
124 nc: Arc<dyn CKBProtocolContext + Sync>,
125 peer_index: PeerIndex,
126 data: Bytes,
127 ) {
128 let msg = match packed::BlockFilterMessageReader::from_compatible_slice(&data) {
129 Ok(msg) => msg.to_enum(),
130 _ => {
131 info_target!(
132 crate::LOG_TARGET_FILTER,
133 "Peer {} sends us a malformed message",
134 peer_index
135 );
136 nc.ban_peer(
137 peer_index,
138 BAD_MESSAGE_BAN_TIME,
139 String::from("send us a malformed message"),
140 );
141 return;
142 }
143 };
144
145 debug_target!(
146 crate::LOG_TARGET_FILTER,
147 "received msg {} from {}",
148 msg.item_name(),
149 peer_index
150 );
151 let start_time = Instant::now();
152 self.process(nc, peer_index, msg).await;
153 debug_target!(
154 crate::LOG_TARGET_FILTER,
155 "process message={}, peer={}, cost={:?}",
156 msg.item_name(),
157 peer_index,
158 Instant::now().saturating_duration_since(start_time),
159 );
160 }
161
162 async fn connected(
163 &mut self,
164 _nc: Arc<dyn CKBProtocolContext + Sync>,
165 peer_index: PeerIndex,
166 _version: &str,
167 ) {
168 info_target!(
169 crate::LOG_TARGET_FILTER,
170 "FilterProtocol.connected peer={}",
171 peer_index
172 );
173 }
174
175 async fn disconnected(
176 &mut self,
177 _nc: Arc<dyn CKBProtocolContext + Sync>,
178 peer_index: PeerIndex,
179 ) {
180 info_target!(
181 crate::LOG_TARGET_FILTER,
182 "FilterProtocol.disconnected peer={}",
183 peer_index
184 );
185 }
186}