// ckb_sync/relayer/mod.rs

1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_chain::VerifyResult;
27use ckb_chain::{ChainController, RemoteBlock};
28use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
29use ckb_error::is_internal_db_error;
30use ckb_logger::{
31    debug, debug_target, error, error_target, info_target, trace_target, warn_target,
32};
33use ckb_network::{
34    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
35    async_trait, bytes::Bytes, tokio,
36};
37use ckb_shared::Shared;
38use ckb_shared::block_status::BlockStatus;
39use ckb_systemtime::unix_time_as_millis;
40use ckb_tx_pool::service::TxVerificationResult;
41use ckb_types::BlockNumberAndHash;
42use ckb_types::{
43    core::{self, BlockView},
44    packed::{self, Byte32, ProposalShortId},
45    prelude::*,
46};
47use itertools::Itertools;
48use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
/// Notify token of the tx-proposal timer that `init` registers with a 100 ms interval.
pub const TX_PROPOSAL_TOKEN: u64 = 0;
/// Notify token of the ask-for-txs timer that `init` registers with a 100 ms interval.
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
/// Notify token of the tx-hashes timer that `init` registers with a 300 ms interval.
pub const TX_HASHES_TOKEN: u64 = 2;

/// Upper bound on the number of peers a freshly accepted compact block is broadcast to.
pub const MAX_RELAY_PEERS: usize = 128;
/// Cap on the number of transaction hashes handled per batch: `GetRelayTransactions`
/// requests are truncated to it and at most this many verification results are drained
/// per `send_bulk_of_tx_hashes` round.
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
/// Byte budget used when packing transactions into one `BlockProposal` message.
pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
59
60type RateLimiter<T> = governor::RateLimiter<
61    T,
62    governor::state::keyed::HashMapStateStore<T>,
63    governor::clock::DefaultClock,
64>;
65
66#[derive(Debug, Eq, PartialEq)]
67pub enum ReconstructionResult {
68    Block(BlockView),
69    Missing(Vec<usize>, Vec<usize>),
70    Collided,
71    Error(Status),
72}
73
74/// Relayer protocol handle
75pub struct Relayer {
76    chain: ChainController,
77    pub(crate) shared: Arc<SyncShared>,
78    rate_limiter: RateLimiter<(PeerIndex, u32)>,
79}
80
81impl Relayer {
82    /// Init relay protocol handle
83    ///
84    /// This is a runtime relay protocol shared state, and any relay messages will be processed and forwarded by it
85    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
86        // setup a rate limiter keyed by peer and message type that lets through 30 requests per second
87        // current max rps is 10 (ASK_FOR_TXS_TOKEN / TX_PROPOSAL_TOKEN), 30 is a flexible hard cap with buffer
88        let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
89        let rate_limiter = RateLimiter::hashmap(quota);
90
91        Relayer {
92            chain,
93            shared,
94            rate_limiter,
95        }
96    }
97
98    /// Get shared state
99    pub fn shared(&self) -> &Arc<SyncShared> {
100        &self.shared
101    }
102
103    fn try_process(
104        &mut self,
105        nc: Arc<dyn CKBProtocolContext + Sync>,
106        peer: PeerIndex,
107        message: packed::RelayMessageUnionReader<'_>,
108    ) -> Status {
109        // CompactBlock will be verified by POW, it's OK to skip rate limit checking.
110        let should_check_rate =
111            !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
112
113        if should_check_rate
114            && self
115                .rate_limiter
116                .check_key(&(peer, message.item_id()))
117                .is_err()
118        {
119            return StatusCode::TooManyRequests.with_context(message.item_name());
120        }
121
122        match message {
123            packed::RelayMessageUnionReader::CompactBlock(reader) => {
124                CompactBlockProcess::new(reader, self, nc, peer).execute()
125            }
126            packed::RelayMessageUnionReader::RelayTransactions(reader) => {
127                if reader.check_data() {
128                    TransactionsProcess::new(reader, self, nc, peer).execute()
129                } else {
130                    StatusCode::ProtocolMessageIsMalformed
131                        .with_context("RelayTransactions is invalid")
132                }
133            }
134            packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
135                TransactionHashesProcess::new(reader, self, peer).execute()
136            }
137            packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
138                GetTransactionsProcess::new(reader, self, nc, peer).execute()
139            }
140            packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
141                GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
142            }
143            packed::RelayMessageUnionReader::BlockTransactions(reader) => {
144                if reader.check_data() {
145                    BlockTransactionsProcess::new(reader, self, nc, peer).execute()
146                } else {
147                    StatusCode::ProtocolMessageIsMalformed
148                        .with_context("BlockTransactions is invalid")
149                }
150            }
151            packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
152                GetBlockProposalProcess::new(reader, self, nc, peer).execute()
153            }
154            packed::RelayMessageUnionReader::BlockProposal(reader) => {
155                BlockProposalProcess::new(reader, self).execute()
156            }
157        }
158    }
159
160    fn process(
161        &mut self,
162        nc: Arc<dyn CKBProtocolContext + Sync>,
163        peer: PeerIndex,
164        message: packed::RelayMessageUnionReader<'_>,
165    ) {
166        let item_name = message.item_name();
167        let item_bytes = message.as_slice().len() as u64;
168        let status = self.try_process(Arc::clone(&nc), peer, message);
169
170        metric_ckb_message_bytes(
171            MetricDirection::In,
172            &SupportProtocols::RelayV3.name(),
173            message.item_name(),
174            Some(status.code()),
175            item_bytes,
176        );
177
178        if let Some(ban_time) = status.should_ban() {
179            error_target!(
180                crate::LOG_TARGET_RELAY,
181                "receive {} from {}, ban {:?} for {}",
182                item_name,
183                peer,
184                ban_time,
185                status
186            );
187            nc.ban_peer(peer, ban_time, status.to_string());
188        } else if status.should_warn() {
189            warn_target!(
190                crate::LOG_TARGET_RELAY,
191                "receive {} from {}, {}",
192                item_name,
193                peer,
194                status
195            );
196        } else if !status.is_ok() {
197            debug_target!(
198                crate::LOG_TARGET_RELAY,
199                "receive {} from {}, {}",
200                item_name,
201                peer,
202                status
203            );
204        }
205    }
206
207    /// Request the transaction corresponding to the proposal id from the specified node
208    pub fn request_proposal_txs(
209        &self,
210        nc: &dyn CKBProtocolContext,
211        peer: PeerIndex,
212        block_hash_and_number: BlockNumberAndHash,
213        proposals: Vec<packed::ProposalShortId>,
214    ) {
215        let tx_pool = self.shared.shared().tx_pool_controller();
216        let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
217        {
218            Err(err) => {
219                debug_target!(
220                    crate::LOG_TARGET_RELAY,
221                    "tx_pool fresh_proposals_filter error: {:?}",
222                    err,
223                );
224                return;
225            }
226            Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
227        };
228
229        let to_ask_proposals: Vec<ProposalShortId> = self
230            .shared()
231            .state()
232            .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
233            .into_iter()
234            .zip(fresh_proposals)
235            .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
236            .collect();
237        if !to_ask_proposals.is_empty() {
238            let content = packed::GetBlockProposal::new_builder()
239                .block_hash(block_hash_and_number.hash)
240                .proposals(to_ask_proposals.clone())
241                .build();
242            let message = packed::RelayMessage::new_builder().set(content).build();
243            if !send_message_to(nc, peer, &message).is_ok() {
244                self.shared()
245                    .state()
246                    .remove_inflight_proposals(&to_ask_proposals);
247            }
248        }
249    }
250
251    /// Accept a new block from network
252    #[allow(clippy::needless_collect)]
253    pub fn accept_block(
254        &self,
255        nc: Arc<dyn CKBProtocolContext + Sync>,
256        peer_id: PeerIndex,
257        block: core::BlockView,
258        msg_name: &str,
259    ) {
260        if self
261            .shared()
262            .active_chain()
263            .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
264        {
265            return;
266        }
267
268        let block = Arc::new(block);
269
270        let verify_callback = {
271            let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
272            let block = Arc::clone(&block);
273            let shared = Arc::clone(self.shared());
274            let msg_name = msg_name.to_owned();
275            Box::new(move |result: VerifyResult| match result {
276                Ok(verified) => {
277                    if !verified {
278                        debug!(
279                            "block {}-{} has verified already, won't build compact block and broadcast it",
280                            block.number(),
281                            block.hash()
282                        );
283                        return;
284                    }
285
286                    build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
287                }
288                Err(err) => {
289                    error!(
290                        "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
291                        block.number(),
292                        block.hash(),
293                        err
294                    );
295
296                    let is_internal_db_error = is_internal_db_error(&err);
297                    if is_internal_db_error {
298                        return;
299                    }
300
301                    // punish the malicious peer
302                    post_sync_process(
303                        nc.as_ref(),
304                        peer_id,
305                        &msg_name,
306                        StatusCode::BlockIsInvalid.with_context(format!(
307                            "block {} is invalid, reason: {}",
308                            block.hash(),
309                            err
310                        )),
311                    );
312                }
313            })
314        };
315
316        let remote_block = RemoteBlock {
317            block,
318            verify_callback,
319        };
320
321        self.shared.accept_remote_block(&self.chain, remote_block);
322    }
323
324    /// Reorganize the full block according to the compact block/txs/uncles
325    // nodes should attempt to reconstruct the full block by taking the prefilledtxn transactions
326    // from the original CompactBlock message and placing them in the marked positions,
327    // then for each short transaction ID from the original compact_block message, in order,
328    // find the corresponding transaction either from the BlockTransactions message or
329    // from other sources and place it in the first available position in the block
330    // then once the block has been reconstructed, it shall be processed as normal,
331    // keeping in mind that short_ids are expected to occasionally collide,
332    // and that nodes must not be penalized for such collisions, wherever they appear.
333    pub fn reconstruct_block(
334        &self,
335        active_chain: &ActiveChain,
336        compact_block: &packed::CompactBlock,
337        received_transactions: Vec<core::TransactionView>,
338        uncles_index: &[u32],
339        received_uncles: &[core::UncleBlockView],
340    ) -> ReconstructionResult {
341        let block_txs_len = received_transactions.len();
342        let compact_block_hash = compact_block.calc_header_hash();
343        debug_target!(
344            crate::LOG_TARGET_RELAY,
345            "start block reconstruction, block hash: {}, received transactions len: {}",
346            compact_block_hash,
347            block_txs_len,
348        );
349
350        let mut short_ids_set: HashSet<ProposalShortId> =
351            compact_block.short_ids().into_iter().collect();
352
353        let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
354            .into_iter()
355            .filter_map(|tx| {
356                let short_id = tx.proposal_short_id();
357                if short_ids_set.remove(&short_id) {
358                    Some((short_id, tx))
359                } else {
360                    None
361                }
362            })
363            .collect();
364
365        if !short_ids_set.is_empty() {
366            let tx_pool = self.shared.shared().tx_pool_controller();
367            let fetch_txs = tx_pool.fetch_txs(short_ids_set);
368            if let Err(e) = fetch_txs {
369                return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
370            }
371            txs_map.extend(fetch_txs.unwrap());
372        }
373
374        let txs_len = compact_block.txs_len();
375        let mut block_transactions: Vec<Option<core::TransactionView>> =
376            Vec::with_capacity(txs_len);
377
378        let short_ids_iter = &mut compact_block.short_ids().into_iter();
379        // fill transactions gap
380        compact_block
381            .prefilled_transactions()
382            .into_iter()
383            .for_each(|pt| {
384                let index: usize = pt.index().into();
385                let gap = index - block_transactions.len();
386                if gap > 0 {
387                    short_ids_iter
388                        .take(gap)
389                        .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
390                }
391                block_transactions.push(Some(pt.transaction().into_view()));
392            });
393
394        // append remain transactions
395        short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
396
397        let missing = block_transactions.iter().any(Option::is_none);
398
399        let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
400        let mut uncles = Vec::with_capacity(compact_block.uncles().len());
401
402        let mut position = 0;
403        for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
404            if uncles_index.contains(&(i as u32)) {
405                uncles.push(
406                    received_uncles
407                        .get(position)
408                        .expect("have checked the indexes")
409                        .clone()
410                        .data(),
411                );
412                position += 1;
413                continue;
414            };
415            let status = active_chain.get_block_status(&uncle_hash);
416            match status {
417                BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
418                BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
419                    if let Some(uncle) = active_chain.get_block(&uncle_hash) {
420                        uncles.push(uncle.as_uncle().data());
421                    } else {
422                        debug_target!(
423                            crate::LOG_TARGET_RELAY,
424                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
425                            status,
426                            uncle_hash,
427                        );
428                        missing_uncles.push(i);
429                    }
430                }
431                BlockStatus::BLOCK_RECEIVED => {
432                    if let Some(uncle) = self
433                        .chain
434                        .get_orphan_block(self.shared().store(), &uncle_hash)
435                    {
436                        uncles.push(uncle.as_uncle().data());
437                    } else {
438                        debug_target!(
439                            crate::LOG_TARGET_RELAY,
440                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
441                            status,
442                            uncle_hash,
443                        );
444                        missing_uncles.push(i);
445                    }
446                }
447                BlockStatus::BLOCK_INVALID => {
448                    return ReconstructionResult::Error(
449                        StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
450                    );
451                }
452                _ => missing_uncles.push(i),
453            }
454        }
455
456        if !missing && missing_uncles.is_empty() {
457            let txs = block_transactions
458                .into_iter()
459                .collect::<Option<Vec<_>>>()
460                .expect("missing checked, should not fail");
461            let block = if let Some(extension) = compact_block.extension() {
462                packed::BlockV1::new_builder()
463                    .header(compact_block.header())
464                    .uncles(uncles)
465                    .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
466                    .proposals(compact_block.proposals())
467                    .extension(extension)
468                    .build()
469                    .as_v0()
470            } else {
471                packed::Block::new_builder()
472                    .header(compact_block.header())
473                    .uncles(uncles)
474                    .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
475                    .proposals(compact_block.proposals())
476                    .build()
477            }
478            .into_view();
479
480            debug_target!(
481                crate::LOG_TARGET_RELAY,
482                "finish block reconstruction, block hash: {}",
483                compact_block.calc_header_hash(),
484            );
485
486            let compact_block_tx_root = compact_block.header().raw().transactions_root();
487            let reconstruct_block_tx_root = block.transactions_root();
488            if compact_block_tx_root != reconstruct_block_tx_root {
489                if compact_block.short_ids().is_empty()
490                    || compact_block.short_ids().len() == block_txs_len
491                {
492                    return ReconstructionResult::Error(
493                        StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
494                            .with_context(format!(
495                                "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
496                                compact_block.header().raw().transactions_root(),
497                                block.transactions_root(),
498                            )),
499                    );
500                } else {
501                    if let Some(metrics) = ckb_metrics::handle() {
502                        metrics.ckb_relay_transaction_short_id_collide.inc();
503                    }
504                    return ReconstructionResult::Collided;
505                }
506            }
507
508            ReconstructionResult::Block(block)
509        } else {
510            let missing_indexes: Vec<usize> = block_transactions
511                .iter()
512                .enumerate()
513                .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
514                .collect();
515
516            debug_target!(
517                crate::LOG_TARGET_RELAY,
518                "block reconstruction failed, block hash: {}, missing: {}, total: {}",
519                compact_block.calc_header_hash(),
520                missing_indexes.len(),
521                compact_block.short_ids().len(),
522            );
523
524            ReconstructionResult::Missing(missing_indexes, missing_uncles)
525        }
526    }
527
528    fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
529        let get_block_proposals = self.shared().state().drain_get_block_proposals();
530        let tx_pool = self.shared.shared().tx_pool_controller();
531
532        let fetch_txs = tx_pool.fetch_txs(
533            get_block_proposals
534                .iter()
535                .map(|kv_pair| kv_pair.key().clone())
536                .collect(),
537        );
538        if let Err(err) = fetch_txs {
539            debug_target!(
540                crate::LOG_TARGET_RELAY,
541                "relayer prune_tx_proposal_request internal error: {:?}",
542                err,
543            );
544            return;
545        }
546
547        let txs = fetch_txs.unwrap();
548
549        let mut peer_txs = HashMap::new();
550        for (id, peer_indices) in get_block_proposals.into_iter() {
551            if let Some(tx) = txs.get(&id) {
552                for peer_index in peer_indices {
553                    let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
554                    tx_set.push(tx.clone());
555                }
556            }
557        }
558
559        let send_block_proposals =
560            |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
561                let content = packed::BlockProposal::new_builder()
562                    .transactions(txs)
563                    .build();
564                let message = packed::RelayMessage::new_builder().set(content).build();
565                let status = send_message_to(nc, peer_index, &message);
566                if !status.is_ok() {
567                    ckb_logger::error!(
568                        "send RelayBlockProposal to {}, status: {:?}",
569                        peer_index,
570                        status
571                    );
572                }
573            };
574
575        let mut relay_bytes = 0;
576        let mut relay_proposals = Vec::new();
577        for (peer_index, txs) in peer_txs {
578            for tx in txs {
579                let data = tx.data();
580                let tx_size = data.total_size();
581                if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
582                    send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
583                    relay_bytes = tx_size;
584                } else {
585                    relay_bytes += tx_size;
586                }
587                relay_proposals.push(data);
588            }
589            if !relay_proposals.is_empty() {
590                send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
591                relay_bytes = 0;
592            }
593        }
594    }
595
596    /// Ask for relay transaction by hash from all peers
597    pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
598        for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
599            if !tx_hashes.is_empty() {
600                debug_target!(
601                    crate::LOG_TARGET_RELAY,
602                    "Send get transaction ({} hashes) to {}",
603                    tx_hashes.len(),
604                    peer,
605                );
606                tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
607                let content = packed::GetRelayTransactions::new_builder()
608                    .tx_hashes(tx_hashes)
609                    .build();
610                let message = packed::RelayMessage::new_builder().set(content).build();
611                let status = send_message_to(nc, peer, &message);
612                if !status.is_ok() {
613                    ckb_logger::error!(
614                        "interrupted request for transactions, status: {:?}",
615                        status
616                    );
617                }
618            }
619        }
620    }
621
622    /// Send bulk of tx hashes to selected peers
623    pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
624        const BUFFER_SIZE: usize = 42;
625
626        let connected_peers = nc.full_relay_connected_peers();
627        if connected_peers.is_empty() {
628            return;
629        }
630
631        let tx_verify_results = self
632            .shared
633            .state()
634            .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
635        let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
636        {
637            for tx_verify_result in tx_verify_results {
638                match tx_verify_result {
639                    TxVerificationResult::Ok {
640                        original_peer,
641                        tx_hash,
642                    } => {
643                        for target in &connected_peers {
644                            match original_peer {
645                                Some(peer) => {
646                                    // broadcast tx hash to all connected peers except original peer
647                                    if peer != *target {
648                                        let hashes = selected
649                                            .entry(*target)
650                                            .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
651                                        hashes.push(tx_hash.clone());
652                                    }
653                                }
654                                None => {
655                                    // since this tx is submitted through local rpc, it is assumed to be a new tx for all connected peers
656                                    let hashes = selected
657                                        .entry(*target)
658                                        .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
659                                    hashes.push(tx_hash.clone());
660                                    self.shared.state().mark_as_known_tx(tx_hash.clone());
661                                }
662                            }
663                        }
664                    }
665                    TxVerificationResult::Reject { tx_hash } => {
666                        self.shared.state().remove_from_known_txs(&tx_hash);
667                    }
668                    TxVerificationResult::UnknownParents { peer, parents } => {
669                        let tx_hashes: Vec<_> = {
670                            let mut tx_filter = self.shared.state().tx_filter();
671                            tx_filter.remove_expired();
672                            parents
673                                .into_iter()
674                                .filter(|tx_hash| !tx_filter.contains(tx_hash))
675                                .collect()
676                        };
677                        self.shared.state().add_ask_for_txs(peer, tx_hashes);
678                    }
679                }
680            }
681        }
682        for (peer, hashes) in selected {
683            let content = packed::RelayTransactionHashes::new_builder()
684                .tx_hashes(hashes)
685                .build();
686            let message = packed::RelayMessage::new_builder().set(content).build();
687
688            if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
689                debug_target!(
690                    crate::LOG_TARGET_RELAY,
691                    "relayer send TransactionHashes error: {:?}",
692                    err,
693                );
694            }
695        }
696    }
697}
698
699fn build_and_broadcast_compact_block(
700    nc: &dyn CKBProtocolContext,
701    shared: &Shared,
702    peer: PeerIndex,
703    block: Arc<BlockView>,
704) {
705    debug_target!(
706        crate::LOG_TARGET_RELAY,
707        "[block_relay] relayer accept_block {} {}",
708        block.header().hash(),
709        unix_time_as_millis()
710    );
711    let block_hash = block.hash();
712    shared.remove_header_view(&block_hash);
713    let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
714    let message = packed::RelayMessage::new_builder().set(cb).build();
715
716    let selected_peers: Vec<PeerIndex> = nc
717        .connected_peers()
718        .into_iter()
719        .filter(|target_peer| peer != *target_peer)
720        .take(MAX_RELAY_PEERS)
721        .collect();
722    if let Err(err) = nc.quick_filter_broadcast(
723        TargetSession::Multi(Box::new(selected_peers.into_iter())),
724        message.as_bytes(),
725    ) {
726        debug_target!(
727            crate::LOG_TARGET_RELAY,
728            "relayer send block when accept block error: {:?}",
729            err,
730        );
731    }
732
733    if let Some(p2p_control) = nc.p2p_control() {
734        let snapshot = shared.snapshot();
735        let parent_chain_root = {
736            let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
737            match mmr.get_root() {
738                Ok(root) => root,
739                Err(err) => {
740                    error_target!(
741                        crate::LOG_TARGET_RELAY,
742                        "Generate last state to light client failed: {:?}",
743                        err
744                    );
745                    return;
746                }
747            }
748        };
749
750        let tip_header = packed::VerifiableHeader::new_builder()
751            .header(block.header().data())
752            .uncles_hash(block.calc_uncles_hash())
753            .extension(Pack::pack(&block.extension()))
754            .parent_chain_root(parent_chain_root)
755            .build();
756        let light_client_message = {
757            let content = packed::SendLastState::new_builder()
758                .last_header(tip_header)
759                .build();
760            packed::LightClientMessage::new_builder()
761                .set(content)
762                .build()
763        };
764        let light_client_peers: HashSet<PeerIndex> = nc
765            .connected_peers()
766            .into_iter()
767            .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
768            .filter(|(_id, peer)| peer.if_lightclient_subscribed)
769            .map(|(id, _)| id)
770            .collect();
771        if let Err(err) = p2p_control.filter_broadcast(
772            TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
773            SupportProtocols::LightClient.protocol_id(),
774            light_client_message.as_bytes(),
775        ) {
776            debug_target!(
777                crate::LOG_TARGET_RELAY,
778                "relayer send last state to light client when accept block, error: {:?}",
779                err,
780            );
781        }
782    }
783}
784
785#[async_trait]
786impl CKBProtocolHandler for Relayer {
787    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
788        nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
789            .await
790            .expect("set_notify at init is ok");
791        nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
792            .await
793            .expect("set_notify at init is ok");
794        nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
795            .await
796            .expect("set_notify at init is ok");
797    }
798
799    async fn received(
800        &mut self,
801        nc: Arc<dyn CKBProtocolContext + Sync>,
802        peer_index: PeerIndex,
803        data: Bytes,
804    ) {
805        // If self is in the IBD state, don't process any relayer message.
806        if self.shared.active_chain().is_initial_block_download() {
807            return;
808        }
809
810        let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
811            Ok(msg) => {
812                let item = msg.to_enum();
813                if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
814                    if reader.count_extra_fields() > 1 {
815                        info_target!(
816                            crate::LOG_TARGET_RELAY,
817                            "Peer {} sends us a malformed message: \
818                             too many fields in CompactBlock",
819                            peer_index
820                        );
821                        nc.ban_peer(
822                            peer_index,
823                            BAD_MESSAGE_BAN_TIME,
824                            String::from(
825                                "send us a malformed message: \
826                                 too many fields in CompactBlock",
827                            ),
828                        );
829                        return;
830                    } else {
831                        item
832                    }
833                } else {
834                    match packed::RelayMessageReader::from_slice(&data) {
835                        Ok(msg) => msg.to_enum(),
836                        _ => {
837                            info_target!(
838                                crate::LOG_TARGET_RELAY,
839                                "Peer {} sends us a malformed message: \
840                                 too many fields",
841                                peer_index
842                            );
843                            nc.ban_peer(
844                                peer_index,
845                                BAD_MESSAGE_BAN_TIME,
846                                String::from(
847                                    "send us a malformed message \
848                                     too many fields",
849                                ),
850                            );
851                            return;
852                        }
853                    }
854                }
855            }
856            _ => {
857                info_target!(
858                    crate::LOG_TARGET_RELAY,
859                    "Peer {} sends us a malformed message",
860                    peer_index
861                );
862                nc.ban_peer(
863                    peer_index,
864                    BAD_MESSAGE_BAN_TIME,
865                    String::from("send us a malformed message"),
866                );
867                return;
868            }
869        };
870
871        debug_target!(
872            crate::LOG_TARGET_RELAY,
873            "received msg {} from {}",
874            msg.item_name(),
875            peer_index
876        );
877        #[cfg(feature = "with_sentry")]
878        {
879            let sentry_hub = sentry::Hub::current();
880            let _scope_guard = sentry_hub.push_scope();
881            sentry_hub.configure_scope(|scope| {
882                scope.set_tag("p2p.protocol", "relayer");
883                scope.set_tag("p2p.message", msg.item_name());
884            });
885        }
886
887        let start_time = Instant::now();
888        tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
889        debug_target!(
890            crate::LOG_TARGET_RELAY,
891            "process message={}, peer={}, cost={:?}",
892            msg.item_name(),
893            peer_index,
894            Instant::now().saturating_duration_since(start_time),
895        );
896    }
897
898    async fn connected(
899        &mut self,
900        _nc: Arc<dyn CKBProtocolContext + Sync>,
901        peer_index: PeerIndex,
902        version: &str,
903    ) {
904        self.shared().state().peers().relay_connected(peer_index);
905        info_target!(
906            crate::LOG_TARGET_RELAY,
907            "RelayProtocol({}).connected peer={}",
908            version,
909            peer_index
910        );
911    }
912
913    async fn disconnected(
914        &mut self,
915        _nc: Arc<dyn CKBProtocolContext + Sync>,
916        peer_index: PeerIndex,
917    ) {
918        info_target!(
919            crate::LOG_TARGET_RELAY,
920            "RelayProtocol.disconnected peer={}",
921            peer_index
922        );
923        // Retains all keys in the rate limiter that were used recently enough.
924        self.rate_limiter.retain_recent();
925    }
926
927    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
928        // If self is in the IBD state, don't trigger any relayer notify.
929        if self.shared.active_chain().is_initial_block_download() {
930            return;
931        }
932
933        let start_time = Instant::now();
934        trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
935        match token {
936            TX_PROPOSAL_TOKEN => {
937                tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
938            }
939            ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
940            TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
941            _ => unreachable!(),
942        }
943        trace_target!(
944            crate::LOG_TARGET_RELAY,
945            "finished notify token={} cost={:?}",
946            token,
947            Instant::now().saturating_duration_since(start_time)
948        );
949    }
950}