1mod get_block_filter_check_points_process;
2mod get_block_filter_hashes_process;
3mod get_block_filters_process;
4
5use crate::{Status, types::SyncShared};
6use get_block_filter_check_points_process::GetBlockFilterCheckPointsProcess;
7use get_block_filter_hashes_process::GetBlockFilterHashesProcess;
8use get_block_filters_process::GetBlockFiltersProcess;
9
10use crate::utils::{MetricDirection, metric_ckb_message_bytes};
11use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
12use ckb_logger::{debug_target, error_target, info_target, warn_target};
13use ckb_network::{
14 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, async_trait, bytes::Bytes,
15};
16use ckb_types::{packed, prelude::*};
17use std::sync::Arc;
18use std::time::Instant;
19
20#[derive(Clone)]
22pub struct BlockFilter {
23 shared: Arc<SyncShared>,
25}
26
27impl BlockFilter {
28 pub fn new(shared: Arc<SyncShared>) -> Self {
30 Self { shared }
31 }
32
33 fn try_process(
34 &mut self,
35 nc: Arc<dyn CKBProtocolContext + Sync>,
36 peer: PeerIndex,
37 message: packed::BlockFilterMessageUnionReader<'_>,
38 ) -> Status {
39 match message {
40 packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
41 GetBlockFiltersProcess::new(msg, self, nc, peer).execute()
42 }
43 packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
44 GetBlockFilterHashesProcess::new(msg, self, nc, peer).execute()
45 }
46 packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
47 GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer).execute()
48 }
49 packed::BlockFilterMessageUnionReader::BlockFilters(_)
50 | packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
51 | packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(_) => {
52 warn_target!(
55 crate::LOG_TARGET_FILTER,
56 "Received unexpected message from peer: {:?}",
57 peer
58 );
59 Status::ignored()
60 }
61 }
62 }
63
64 fn process(
65 &mut self,
66 nc: Arc<dyn CKBProtocolContext + Sync>,
67 peer: PeerIndex,
68 message: packed::BlockFilterMessageUnionReader<'_>,
69 ) {
70 let item_name = message.item_name();
71 let item_bytes = message.as_slice().len() as u64;
72 let status = self.try_process(Arc::clone(&nc), peer, message);
73
74 metric_ckb_message_bytes(
75 MetricDirection::In,
76 &SupportProtocols::Filter.name(),
77 message.item_name(),
78 Some(status.code()),
79 item_bytes,
80 );
81
82 if let Some(ban_time) = status.should_ban() {
83 error_target!(
84 crate::LOG_TARGET_RELAY,
85 "receive {} from {}, ban {:?} for {}",
86 item_name,
87 peer,
88 ban_time,
89 status
90 );
91 nc.ban_peer(peer, ban_time, status.to_string());
92 } else if status.should_warn() {
93 warn_target!(
94 crate::LOG_TARGET_RELAY,
95 "receive {} from {}, {}",
96 item_name,
97 peer,
98 status
99 );
100 } else if !status.is_ok() {
101 debug_target!(
102 crate::LOG_TARGET_RELAY,
103 "receive {} from {}, {}",
104 item_name,
105 peer,
106 status
107 );
108 }
109 }
110}
111
112#[async_trait]
113impl CKBProtocolHandler for BlockFilter {
114 async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
115
116 async fn received(
117 &mut self,
118 nc: Arc<dyn CKBProtocolContext + Sync>,
119 peer_index: PeerIndex,
120 data: Bytes,
121 ) {
122 let msg = match packed::BlockFilterMessageReader::from_compatible_slice(&data) {
123 Ok(msg) => msg.to_enum(),
124 _ => {
125 info_target!(
126 crate::LOG_TARGET_FILTER,
127 "Peer {} sends us a malformed message",
128 peer_index
129 );
130 nc.ban_peer(
131 peer_index,
132 BAD_MESSAGE_BAN_TIME,
133 String::from("send us a malformed message"),
134 );
135 return;
136 }
137 };
138
139 debug_target!(
140 crate::LOG_TARGET_FILTER,
141 "received msg {} from {}",
142 msg.item_name(),
143 peer_index
144 );
145 let start_time = Instant::now();
146 self.process(nc, peer_index, msg);
147 debug_target!(
148 crate::LOG_TARGET_FILTER,
149 "process message={}, peer={}, cost={:?}",
150 msg.item_name(),
151 peer_index,
152 Instant::now().saturating_duration_since(start_time),
153 );
154 }
155
156 async fn connected(
157 &mut self,
158 _nc: Arc<dyn CKBProtocolContext + Sync>,
159 peer_index: PeerIndex,
160 _version: &str,
161 ) {
162 info_target!(
163 crate::LOG_TARGET_FILTER,
164 "FilterProtocol.connected peer={}",
165 peer_index
166 );
167 }
168
169 async fn disconnected(
170 &mut self,
171 _nc: Arc<dyn CKBProtocolContext + Sync>,
172 peer_index: PeerIndex,
173 ) {
174 info_target!(
175 crate::LOG_TARGET_FILTER,
176 "FilterProtocol.disconnected peer={}",
177 peer_index
178 );
179 }
180}