ckb_sync/filter/
mod.rs

1mod get_block_filter_check_points_process;
2mod get_block_filter_hashes_process;
3mod get_block_filters_process;
4
5use crate::{Status, types::SyncShared};
6use get_block_filter_check_points_process::GetBlockFilterCheckPointsProcess;
7use get_block_filter_hashes_process::GetBlockFilterHashesProcess;
8use get_block_filters_process::GetBlockFiltersProcess;
9
10use crate::utils::{MetricDirection, metric_ckb_message_bytes};
11use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
12use ckb_logger::{debug_target, error_target, info_target, warn_target};
13use ckb_network::{
14    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, async_trait, bytes::Bytes,
15};
16use ckb_types::{packed, prelude::*};
17use std::sync::Arc;
18use std::time::Instant;
19
/// Filter protocol handle
///
/// Implements [`CKBProtocolHandler`] for block filter messages: incoming
/// payloads are decoded as `packed::BlockFilterMessage` and dispatched to the
/// matching request processor.
///
/// The only field is an [`Arc`], so cloning the handler yields another handle
/// to the same [`SyncShared`] instance.
#[derive(Clone)]
pub struct BlockFilter {
    /// Sync shared state
    // NOTE(review): not read directly in this file; presumably consumed by the
    // `*_process` submodules — confirm against those modules.
    shared: Arc<SyncShared>,
}
26
27impl BlockFilter {
28    /// Create a new block filter protocol handler
29    pub fn new(shared: Arc<SyncShared>) -> Self {
30        Self { shared }
31    }
32
33    async fn try_process(
34        &mut self,
35        nc: Arc<dyn CKBProtocolContext + Sync>,
36        peer: PeerIndex,
37        message: packed::BlockFilterMessageUnionReader<'_>,
38    ) -> Status {
39        match message {
40            packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
41                GetBlockFiltersProcess::new(msg, self, nc, peer)
42                    .execute()
43                    .await
44            }
45            packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
46                GetBlockFilterHashesProcess::new(msg, self, nc, peer)
47                    .execute()
48                    .await
49            }
50            packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
51                GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer)
52                    .execute()
53                    .await
54            }
55            packed::BlockFilterMessageUnionReader::BlockFilters(_)
56            | packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
57            | packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(_) => {
58                // remote peer should not send block filter to us without asking
59                // TODO: ban remote peer
60                warn_target!(
61                    crate::LOG_TARGET_FILTER,
62                    "Received unexpected message from peer: {:?}",
63                    peer
64                );
65                Status::ignored()
66            }
67        }
68    }
69
70    async fn process(
71        &mut self,
72        nc: Arc<dyn CKBProtocolContext + Sync>,
73        peer: PeerIndex,
74        message: packed::BlockFilterMessageUnionReader<'_>,
75    ) {
76        let item_name = message.item_name();
77        let item_bytes = message.as_slice().len() as u64;
78        let status = self.try_process(Arc::clone(&nc), peer, message).await;
79
80        metric_ckb_message_bytes(
81            MetricDirection::In,
82            &SupportProtocols::Filter.name(),
83            message.item_name(),
84            Some(status.code()),
85            item_bytes,
86        );
87
88        if let Some(ban_time) = status.should_ban() {
89            error_target!(
90                crate::LOG_TARGET_RELAY,
91                "receive {} from {}, ban {:?} for {}",
92                item_name,
93                peer,
94                ban_time,
95                status
96            );
97            nc.ban_peer(peer, ban_time, status.to_string());
98        } else if status.should_warn() {
99            warn_target!(
100                crate::LOG_TARGET_RELAY,
101                "receive {} from {}, {}",
102                item_name,
103                peer,
104                status
105            );
106        } else if !status.is_ok() {
107            debug_target!(
108                crate::LOG_TARGET_RELAY,
109                "receive {} from {}, {}",
110                item_name,
111                peer,
112                status
113            );
114        }
115    }
116}
117
118#[async_trait]
119impl CKBProtocolHandler for BlockFilter {
120    async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
121
122    async fn received(
123        &mut self,
124        nc: Arc<dyn CKBProtocolContext + Sync>,
125        peer_index: PeerIndex,
126        data: Bytes,
127    ) {
128        let msg = match packed::BlockFilterMessageReader::from_compatible_slice(&data) {
129            Ok(msg) => msg.to_enum(),
130            _ => {
131                info_target!(
132                    crate::LOG_TARGET_FILTER,
133                    "Peer {} sends us a malformed message",
134                    peer_index
135                );
136                nc.ban_peer(
137                    peer_index,
138                    BAD_MESSAGE_BAN_TIME,
139                    String::from("send us a malformed message"),
140                );
141                return;
142            }
143        };
144
145        debug_target!(
146            crate::LOG_TARGET_FILTER,
147            "received msg {} from {}",
148            msg.item_name(),
149            peer_index
150        );
151        let start_time = Instant::now();
152        self.process(nc, peer_index, msg).await;
153        debug_target!(
154            crate::LOG_TARGET_FILTER,
155            "process message={}, peer={}, cost={:?}",
156            msg.item_name(),
157            peer_index,
158            Instant::now().saturating_duration_since(start_time),
159        );
160    }
161
162    async fn connected(
163        &mut self,
164        _nc: Arc<dyn CKBProtocolContext + Sync>,
165        peer_index: PeerIndex,
166        _version: &str,
167    ) {
168        info_target!(
169            crate::LOG_TARGET_FILTER,
170            "FilterProtocol.connected peer={}",
171            peer_index
172        );
173    }
174
175    async fn disconnected(
176        &mut self,
177        _nc: Arc<dyn CKBProtocolContext + Sync>,
178        peer_index: PeerIndex,
179    ) {
180        info_target!(
181            crate::LOG_TARGET_FILTER,
182            "FilterProtocol.disconnected peer={}",
183            peer_index
184        );
185    }
186}