ckb_sync/filter/
mod.rs

1mod get_block_filter_check_points_process;
2mod get_block_filter_hashes_process;
3mod get_block_filters_process;
4
5use crate::{Status, types::SyncShared};
6use get_block_filter_check_points_process::GetBlockFilterCheckPointsProcess;
7use get_block_filter_hashes_process::GetBlockFilterHashesProcess;
8use get_block_filters_process::GetBlockFiltersProcess;
9
10use crate::utils::{MetricDirection, metric_ckb_message_bytes};
11use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
12use ckb_logger::{debug_target, error_target, info_target, warn_target};
13use ckb_network::{
14    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, async_trait, bytes::Bytes,
15};
16use ckb_types::{packed, prelude::*};
17use std::sync::Arc;
18use std::time::Instant;
19
20/// Filter protocol handle
21#[derive(Clone)]
22pub struct BlockFilter {
23    /// Sync shared state
24    shared: Arc<SyncShared>,
25}
26
27impl BlockFilter {
28    /// Create a new block filter protocol handler
29    pub fn new(shared: Arc<SyncShared>) -> Self {
30        Self { shared }
31    }
32
33    fn try_process(
34        &mut self,
35        nc: Arc<dyn CKBProtocolContext + Sync>,
36        peer: PeerIndex,
37        message: packed::BlockFilterMessageUnionReader<'_>,
38    ) -> Status {
39        match message {
40            packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
41                GetBlockFiltersProcess::new(msg, self, nc, peer).execute()
42            }
43            packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
44                GetBlockFilterHashesProcess::new(msg, self, nc, peer).execute()
45            }
46            packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
47                GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer).execute()
48            }
49            packed::BlockFilterMessageUnionReader::BlockFilters(_)
50            | packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
51            | packed::BlockFilterMessageUnionReader::BlockFilterCheckPoints(_) => {
52                // remote peer should not send block filter to us without asking
53                // TODO: ban remote peer
54                warn_target!(
55                    crate::LOG_TARGET_FILTER,
56                    "Received unexpected message from peer: {:?}",
57                    peer
58                );
59                Status::ignored()
60            }
61        }
62    }
63
64    fn process(
65        &mut self,
66        nc: Arc<dyn CKBProtocolContext + Sync>,
67        peer: PeerIndex,
68        message: packed::BlockFilterMessageUnionReader<'_>,
69    ) {
70        let item_name = message.item_name();
71        let item_bytes = message.as_slice().len() as u64;
72        let status = self.try_process(Arc::clone(&nc), peer, message);
73
74        metric_ckb_message_bytes(
75            MetricDirection::In,
76            &SupportProtocols::Filter.name(),
77            message.item_name(),
78            Some(status.code()),
79            item_bytes,
80        );
81
82        if let Some(ban_time) = status.should_ban() {
83            error_target!(
84                crate::LOG_TARGET_RELAY,
85                "receive {} from {}, ban {:?} for {}",
86                item_name,
87                peer,
88                ban_time,
89                status
90            );
91            nc.ban_peer(peer, ban_time, status.to_string());
92        } else if status.should_warn() {
93            warn_target!(
94                crate::LOG_TARGET_RELAY,
95                "receive {} from {}, {}",
96                item_name,
97                peer,
98                status
99            );
100        } else if !status.is_ok() {
101            debug_target!(
102                crate::LOG_TARGET_RELAY,
103                "receive {} from {}, {}",
104                item_name,
105                peer,
106                status
107            );
108        }
109    }
110}
111
112#[async_trait]
113impl CKBProtocolHandler for BlockFilter {
114    async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
115
116    async fn received(
117        &mut self,
118        nc: Arc<dyn CKBProtocolContext + Sync>,
119        peer_index: PeerIndex,
120        data: Bytes,
121    ) {
122        let msg = match packed::BlockFilterMessageReader::from_compatible_slice(&data) {
123            Ok(msg) => msg.to_enum(),
124            _ => {
125                info_target!(
126                    crate::LOG_TARGET_FILTER,
127                    "Peer {} sends us a malformed message",
128                    peer_index
129                );
130                nc.ban_peer(
131                    peer_index,
132                    BAD_MESSAGE_BAN_TIME,
133                    String::from("send us a malformed message"),
134                );
135                return;
136            }
137        };
138
139        debug_target!(
140            crate::LOG_TARGET_FILTER,
141            "received msg {} from {}",
142            msg.item_name(),
143            peer_index
144        );
145        let start_time = Instant::now();
146        self.process(nc, peer_index, msg);
147        debug_target!(
148            crate::LOG_TARGET_FILTER,
149            "process message={}, peer={}, cost={:?}",
150            msg.item_name(),
151            peer_index,
152            Instant::now().saturating_duration_since(start_time),
153        );
154    }
155
156    async fn connected(
157        &mut self,
158        _nc: Arc<dyn CKBProtocolContext + Sync>,
159        peer_index: PeerIndex,
160        _version: &str,
161    ) {
162        info_target!(
163            crate::LOG_TARGET_FILTER,
164            "FilterProtocol.connected peer={}",
165            peer_index
166        );
167    }
168
169    async fn disconnected(
170        &mut self,
171        _nc: Arc<dyn CKBProtocolContext + Sync>,
172        peer_index: PeerIndex,
173    ) {
174        info_target!(
175            crate::LOG_TARGET_FILTER,
176            "FilterProtocol.disconnected peer={}",
177            peer_index
178        );
179    }
180}