ckb_sync/relayer/
mod.rs

1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{
25    MetricDirection, async_quick_send_message_to, async_send_message_to, metric_ckb_message_bytes,
26    send_block_proposals,
27};
28use crate::{Status, StatusCode};
29use ckb_chain::VerifyResult;
30use ckb_chain::{ChainController, RemoteBlock};
31use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
32use ckb_error::is_internal_db_error;
33use ckb_logger::{
34    debug, debug_target, error, error_target, info_target, trace_target, warn_target,
35};
36use ckb_network::{
37    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
38    async_trait, bytes::Bytes,
39};
40use ckb_shared::Shared;
41use ckb_shared::block_status::BlockStatus;
42use ckb_systemtime::unix_time_as_millis;
43use ckb_tx_pool::service::TxVerificationResult;
44use ckb_types::BlockNumberAndHash;
45use ckb_types::{
46    core::{self, BlockView},
47    packed::{self, Byte32, ProposalShortId},
48    prelude::*,
49};
50use itertools::Itertools;
51use std::collections::{HashMap, HashSet};
52use std::sync::Arc;
53use std::time::{Duration, Instant};
54
55pub const TX_PROPOSAL_TOKEN: u64 = 0;
56pub const ASK_FOR_TXS_TOKEN: u64 = 1;
57pub const TX_HASHES_TOKEN: u64 = 2;
58
59pub const MAX_RELAY_PEERS: usize = 128;
60pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
61pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
62
63type RateLimiter<T> = governor::RateLimiter<
64    T,
65    governor::state::keyed::HashMapStateStore<T>,
66    governor::clock::DefaultClock,
67>;
68
69#[derive(Debug, Eq, PartialEq)]
70pub enum ReconstructionResult {
71    Block(BlockView),
72    Missing(Vec<usize>, Vec<usize>),
73    Collided,
74    Error(Status),
75}
76
77/// Relayer protocol handle
78pub struct Relayer {
79    chain: ChainController,
80    pub(crate) shared: Arc<SyncShared>,
81    rate_limiter: RateLimiter<(PeerIndex, u32)>,
82}
83
84impl Relayer {
85    /// Init relay protocol handle
86    ///
87    /// This is a runtime relay protocol shared state, and any relay messages will be processed and forwarded by it
88    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
89        // setup a rate limiter keyed by peer and message type that lets through 30 requests per second
90        // current max rps is 10 (ASK_FOR_TXS_TOKEN / TX_PROPOSAL_TOKEN), 30 is a flexible hard cap with buffer
91        let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
92        let rate_limiter = RateLimiter::hashmap(quota);
93
94        Relayer {
95            chain,
96            shared,
97            rate_limiter,
98        }
99    }
100
101    /// Get shared state
102    pub fn shared(&self) -> &Arc<SyncShared> {
103        &self.shared
104    }
105
106    async fn try_process(
107        &mut self,
108        nc: Arc<dyn CKBProtocolContext + Sync>,
109        peer: PeerIndex,
110        message: packed::RelayMessageUnionReader<'_>,
111    ) -> Status {
112        // CompactBlock will be verified by POW, it's OK to skip rate limit checking.
113        let should_check_rate =
114            !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
115
116        if should_check_rate
117            && self
118                .rate_limiter
119                .check_key(&(peer, message.item_id()))
120                .is_err()
121        {
122            return StatusCode::TooManyRequests.with_context(message.item_name());
123        }
124
125        match message {
126            packed::RelayMessageUnionReader::CompactBlock(reader) => {
127                CompactBlockProcess::new(reader, self, nc, peer)
128                    .execute()
129                    .await
130            }
131            packed::RelayMessageUnionReader::RelayTransactions(reader) => {
132                if reader.check_data() {
133                    TransactionsProcess::new(reader, self, nc, peer).execute()
134                } else {
135                    StatusCode::ProtocolMessageIsMalformed
136                        .with_context("RelayTransactions is invalid")
137                }
138            }
139            packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
140                TransactionHashesProcess::new(reader, self, peer).execute()
141            }
142            packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
143                GetTransactionsProcess::new(reader, self, nc, peer)
144                    .execute()
145                    .await
146            }
147            packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
148                GetBlockTransactionsProcess::new(reader, self, nc, peer)
149                    .execute()
150                    .await
151            }
152            packed::RelayMessageUnionReader::BlockTransactions(reader) => {
153                if reader.check_data() {
154                    BlockTransactionsProcess::new(reader, self, nc, peer)
155                        .execute()
156                        .await
157                } else {
158                    StatusCode::ProtocolMessageIsMalformed
159                        .with_context("BlockTransactions is invalid")
160                }
161            }
162            packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
163                GetBlockProposalProcess::new(reader, self, nc, peer)
164                    .execute()
165                    .await
166            }
167            packed::RelayMessageUnionReader::BlockProposal(reader) => {
168                BlockProposalProcess::new(reader, self).execute().await
169            }
170        }
171    }
172
173    async fn process(
174        &mut self,
175        nc: Arc<dyn CKBProtocolContext + Sync>,
176        peer: PeerIndex,
177        message: packed::RelayMessageUnionReader<'_>,
178    ) {
179        let item_name = message.item_name();
180        let item_bytes = message.as_slice().len() as u64;
181        let status = self.try_process(Arc::clone(&nc), peer, message).await;
182
183        metric_ckb_message_bytes(
184            MetricDirection::In,
185            &SupportProtocols::RelayV3.name(),
186            message.item_name(),
187            Some(status.code()),
188            item_bytes,
189        );
190
191        if let Some(ban_time) = status.should_ban() {
192            error_target!(
193                crate::LOG_TARGET_RELAY,
194                "receive {} from {}, ban {:?} for {}",
195                item_name,
196                peer,
197                ban_time,
198                status
199            );
200            nc.ban_peer(peer, ban_time, status.to_string());
201        } else if status.should_warn() {
202            warn_target!(
203                crate::LOG_TARGET_RELAY,
204                "receive {} from {}, {}",
205                item_name,
206                peer,
207                status
208            );
209        } else if !status.is_ok() {
210            debug_target!(
211                crate::LOG_TARGET_RELAY,
212                "receive {} from {}, {}",
213                item_name,
214                peer,
215                status
216            );
217        }
218    }
219
220    /// Request the transaction corresponding to the proposal id from the specified node
221    pub fn request_proposal_txs(
222        &self,
223        nc: &Arc<dyn CKBProtocolContext + Sync>,
224        peer: PeerIndex,
225        block_hash_and_number: BlockNumberAndHash,
226        proposals: Vec<packed::ProposalShortId>,
227    ) {
228        let tx_pool = self.shared.shared().tx_pool_controller().clone();
229        let shared = Arc::clone(&self.shared);
230        let nc = Arc::clone(nc);
231        self.shared().shared().async_handle().spawn(async move {
232            let fresh_proposals: Vec<ProposalShortId> =
233                match tx_pool.fresh_proposals_filter(proposals).await {
234                    Err(err) => {
235                        debug_target!(
236                            crate::LOG_TARGET_RELAY,
237                            "tx_pool fresh_proposals_filter error: {:?}",
238                            err,
239                        );
240                        return;
241                    }
242                    Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
243                };
244
245            let to_ask_proposals: Vec<ProposalShortId> = shared
246                .state()
247                .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
248                .into_iter()
249                .zip(fresh_proposals)
250                .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
251                .collect();
252            if !to_ask_proposals.is_empty() {
253                let content = packed::GetBlockProposal::new_builder()
254                    .block_hash(block_hash_and_number.hash)
255                    .proposals(to_ask_proposals.clone())
256                    .build();
257                let message = packed::RelayMessage::new_builder().set(content).build();
258                if !async_quick_send_message_to(&nc, peer, &message)
259                    .await
260                    .is_ok()
261                {
262                    shared.state().remove_inflight_proposals(&to_ask_proposals);
263                }
264            }
265        });
266    }
267
268    /// Accept a new block from network
269    #[allow(clippy::needless_collect)]
270    pub fn accept_block(
271        &self,
272        nc: Arc<dyn CKBProtocolContext + Sync>,
273        peer_id: PeerIndex,
274        block: core::BlockView,
275        msg_name: &str,
276    ) {
277        if self
278            .shared()
279            .active_chain()
280            .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
281        {
282            return;
283        }
284
285        let block = Arc::new(block);
286
287        let verify_callback = {
288            let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
289            let block = Arc::clone(&block);
290            let shared = Arc::clone(self.shared());
291            let msg_name = msg_name.to_owned();
292            Box::new(move |result: VerifyResult| match result {
293                Ok(verified) => {
294                    if !verified {
295                        debug!(
296                            "block {}-{} has verified already, won't build compact block and broadcast it",
297                            block.number(),
298                            block.hash()
299                        );
300                        return;
301                    }
302
303                    build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
304                }
305                Err(err) => {
306                    error!(
307                        "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
308                        block.number(),
309                        block.hash(),
310                        err
311                    );
312
313                    let is_internal_db_error = is_internal_db_error(&err);
314                    if is_internal_db_error {
315                        return;
316                    }
317
318                    // punish the malicious peer
319                    post_sync_process(
320                        nc.as_ref(),
321                        peer_id,
322                        &msg_name,
323                        StatusCode::BlockIsInvalid.with_context(format!(
324                            "block {} is invalid, reason: {}",
325                            block.hash(),
326                            err
327                        )),
328                    );
329                }
330            })
331        };
332
333        let remote_block = RemoteBlock {
334            block,
335            verify_callback,
336        };
337
338        self.shared.accept_remote_block(&self.chain, remote_block);
339    }
340
341    /// Reorganize the full block according to the compact block/txs/uncles
342    // nodes should attempt to reconstruct the full block by taking the prefilledtxn transactions
343    // from the original CompactBlock message and placing them in the marked positions,
344    // then for each short transaction ID from the original compact_block message, in order,
345    // find the corresponding transaction either from the BlockTransactions message or
346    // from other sources and place it in the first available position in the block
347    // then once the block has been reconstructed, it shall be processed as normal,
348    // keeping in mind that short_ids are expected to occasionally collide,
349    // and that nodes must not be penalized for such collisions, wherever they appear.
350    pub async fn reconstruct_block(
351        &self,
352        active_chain: &ActiveChain,
353        compact_block: &packed::CompactBlock,
354        received_transactions: Vec<core::TransactionView>,
355        uncles_index: &[u32],
356        received_uncles: &[core::UncleBlockView],
357    ) -> ReconstructionResult {
358        let block_txs_len = received_transactions.len();
359        let compact_block_hash = compact_block.calc_header_hash();
360        debug_target!(
361            crate::LOG_TARGET_RELAY,
362            "start block reconstruction, block hash: {}, received transactions len: {}",
363            compact_block_hash,
364            block_txs_len,
365        );
366
367        let mut short_ids_set: HashSet<ProposalShortId> =
368            compact_block.short_ids().into_iter().collect();
369
370        let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
371            .into_iter()
372            .filter_map(|tx| {
373                let short_id = tx.proposal_short_id();
374                if short_ids_set.remove(&short_id) {
375                    Some((short_id, tx))
376                } else {
377                    None
378                }
379            })
380            .collect();
381
382        if !short_ids_set.is_empty() {
383            let tx_pool = self.shared.shared().tx_pool_controller();
384            let fetch_txs = tx_pool.fetch_txs(short_ids_set).await;
385            if let Err(e) = fetch_txs {
386                return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
387            }
388            txs_map.extend(fetch_txs.unwrap());
389        }
390
391        let txs_len = compact_block.txs_len();
392        let mut block_transactions: Vec<Option<core::TransactionView>> =
393            Vec::with_capacity(txs_len);
394
395        let short_ids_iter = &mut compact_block.short_ids().into_iter();
396        // fill transactions gap
397        compact_block
398            .prefilled_transactions()
399            .into_iter()
400            .for_each(|pt| {
401                let index: usize = pt.index().into();
402                let gap = index - block_transactions.len();
403                if gap > 0 {
404                    short_ids_iter
405                        .take(gap)
406                        .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
407                }
408                block_transactions.push(Some(pt.transaction().into_view()));
409            });
410
411        // append remain transactions
412        short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
413
414        let missing = block_transactions.iter().any(Option::is_none);
415
416        let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
417        let mut uncles = Vec::with_capacity(compact_block.uncles().len());
418
419        let mut position = 0;
420        for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
421            if uncles_index.contains(&(i as u32)) {
422                uncles.push(
423                    received_uncles
424                        .get(position)
425                        .expect("have checked the indexes")
426                        .clone()
427                        .data(),
428                );
429                position += 1;
430                continue;
431            };
432            let status = active_chain.get_block_status(&uncle_hash);
433            match status {
434                BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
435                BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
436                    if let Some(uncle) = active_chain.get_block(&uncle_hash) {
437                        uncles.push(uncle.as_uncle().data());
438                    } else {
439                        debug_target!(
440                            crate::LOG_TARGET_RELAY,
441                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
442                            status,
443                            uncle_hash,
444                        );
445                        missing_uncles.push(i);
446                    }
447                }
448                BlockStatus::BLOCK_RECEIVED => {
449                    if let Some(uncle) = self
450                        .chain
451                        .get_orphan_block(self.shared().store(), &uncle_hash)
452                    {
453                        uncles.push(uncle.as_uncle().data());
454                    } else {
455                        debug_target!(
456                            crate::LOG_TARGET_RELAY,
457                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
458                            status,
459                            uncle_hash,
460                        );
461                        missing_uncles.push(i);
462                    }
463                }
464                BlockStatus::BLOCK_INVALID => {
465                    return ReconstructionResult::Error(
466                        StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
467                    );
468                }
469                _ => missing_uncles.push(i),
470            }
471        }
472
473        if !missing && missing_uncles.is_empty() {
474            let txs = block_transactions
475                .into_iter()
476                .collect::<Option<Vec<_>>>()
477                .expect("missing checked, should not fail");
478            let block = if let Some(extension) = compact_block.extension() {
479                packed::BlockV1::new_builder()
480                    .header(compact_block.header())
481                    .uncles(uncles)
482                    .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
483                    .proposals(compact_block.proposals())
484                    .extension(extension)
485                    .build()
486                    .as_v0()
487            } else {
488                packed::Block::new_builder()
489                    .header(compact_block.header())
490                    .uncles(uncles)
491                    .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
492                    .proposals(compact_block.proposals())
493                    .build()
494            }
495            .into_view();
496
497            debug_target!(
498                crate::LOG_TARGET_RELAY,
499                "finish block reconstruction, block hash: {}",
500                compact_block.calc_header_hash(),
501            );
502
503            let compact_block_tx_root = compact_block.header().raw().transactions_root();
504            let reconstruct_block_tx_root = block.transactions_root();
505            if compact_block_tx_root != reconstruct_block_tx_root {
506                if compact_block.short_ids().is_empty()
507                    || compact_block.short_ids().len() == block_txs_len
508                {
509                    return ReconstructionResult::Error(
510                        StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
511                            .with_context(format!(
512                                "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
513                                compact_block.header().raw().transactions_root(),
514                                block.transactions_root(),
515                            )),
516                    );
517                } else {
518                    if let Some(metrics) = ckb_metrics::handle() {
519                        metrics.ckb_relay_transaction_short_id_collide.inc();
520                    }
521                    return ReconstructionResult::Collided;
522                }
523            }
524
525            ReconstructionResult::Block(block)
526        } else {
527            let missing_indexes: Vec<usize> = block_transactions
528                .iter()
529                .enumerate()
530                .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
531                .collect();
532
533            debug_target!(
534                crate::LOG_TARGET_RELAY,
535                "block reconstruction failed, block hash: {}, missing: {}, total: {}",
536                compact_block.calc_header_hash(),
537                missing_indexes.len(),
538                compact_block.short_ids().len(),
539            );
540
541            ReconstructionResult::Missing(missing_indexes, missing_uncles)
542        }
543    }
544
545    async fn prune_tx_proposal_request(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
546        let get_block_proposals = self.shared().state().drain_get_block_proposals();
547        let tx_pool = self.shared.shared().tx_pool_controller();
548
549        let fetch_txs = tx_pool
550            .fetch_txs(
551                get_block_proposals
552                    .iter()
553                    .map(|kv_pair| kv_pair.key().clone())
554                    .collect(),
555            )
556            .await;
557        if let Err(err) = fetch_txs {
558            debug_target!(
559                crate::LOG_TARGET_RELAY,
560                "relayer prune_tx_proposal_request internal error: {:?}",
561                err,
562            );
563            return;
564        }
565
566        let txs = fetch_txs.unwrap();
567
568        let mut peer_txs = HashMap::new();
569        for (id, peer_indices) in get_block_proposals.into_iter() {
570            if let Some(tx) = txs.get(&id) {
571                for peer_index in peer_indices {
572                    let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
573                    tx_set.push(tx.clone());
574                }
575            }
576        }
577
578        let mut relay_bytes = 0;
579        let mut relay_proposals = Vec::new();
580        for (peer_index, txs) in peer_txs {
581            for tx in txs {
582                let data = tx.data();
583                let tx_size = data.total_size();
584                if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
585                    send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals))
586                        .await;
587                    relay_bytes = tx_size;
588                } else {
589                    relay_bytes += tx_size;
590                }
591                relay_proposals.push(data);
592            }
593            if !relay_proposals.is_empty() {
594                send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals)).await;
595                relay_bytes = 0;
596            }
597        }
598    }
599
600    /// Ask for relay transaction by hash from all peers
601    pub async fn ask_for_txs(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
602        for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
603            if !tx_hashes.is_empty() {
604                debug_target!(
605                    crate::LOG_TARGET_RELAY,
606                    "Send get transaction ({} hashes) to {}",
607                    tx_hashes.len(),
608                    peer,
609                );
610                tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
611                let content = packed::GetRelayTransactions::new_builder()
612                    .tx_hashes(tx_hashes)
613                    .build();
614                let message = packed::RelayMessage::new_builder().set(content).build();
615                let status = async_send_message_to(nc, peer, &message).await;
616                if !status.is_ok() {
617                    ckb_logger::error!(
618                        "interrupted request for transactions, status: {:?}",
619                        status
620                    );
621                }
622            }
623        }
624    }
625
626    /// Send bulk of tx hashes to selected peers
627    pub async fn send_bulk_of_tx_hashes(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
628        const BUFFER_SIZE: usize = 42;
629
630        let connected_peers = nc.full_relay_connected_peers();
631        if connected_peers.is_empty() {
632            return;
633        }
634
635        let tx_verify_results = self
636            .shared
637            .state()
638            .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
639        let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
640        {
641            for tx_verify_result in tx_verify_results {
642                match tx_verify_result {
643                    TxVerificationResult::Ok {
644                        original_peer,
645                        tx_hash,
646                    } => {
647                        for target in &connected_peers {
648                            match original_peer {
649                                Some(peer) => {
650                                    // broadcast tx hash to all connected peers except original peer
651                                    if peer != *target {
652                                        let hashes = selected
653                                            .entry(*target)
654                                            .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
655                                        hashes.push(tx_hash.clone());
656                                    }
657                                }
658                                None => {
659                                    // since this tx is submitted through local rpc, it is assumed to be a new tx for all connected peers
660                                    let hashes = selected
661                                        .entry(*target)
662                                        .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
663                                    hashes.push(tx_hash.clone());
664                                    self.shared.state().mark_as_known_tx(tx_hash.clone());
665                                }
666                            }
667                        }
668                    }
669                    TxVerificationResult::Reject { tx_hash } => {
670                        self.shared.state().remove_from_known_txs(&tx_hash);
671                    }
672                    TxVerificationResult::UnknownParents { peer, parents } => {
673                        let tx_hashes: Vec<_> = {
674                            let mut tx_filter = self.shared.state().tx_filter();
675                            tx_filter.remove_expired();
676                            parents
677                                .into_iter()
678                                .filter(|tx_hash| !tx_filter.contains(tx_hash))
679                                .collect()
680                        };
681                        self.shared.state().add_ask_for_txs(peer, tx_hashes);
682                    }
683                }
684            }
685        }
686        for (peer, hashes) in selected {
687            let content = packed::RelayTransactionHashes::new_builder()
688                .tx_hashes(hashes)
689                .build();
690            let message = packed::RelayMessage::new_builder().set(content).build();
691
692            if let Err(err) = nc
693                .async_filter_broadcast(TargetSession::Single(peer), message.as_bytes())
694                .await
695            {
696                debug_target!(
697                    crate::LOG_TARGET_RELAY,
698                    "relayer send TransactionHashes error: {:?}",
699                    err,
700                );
701            }
702        }
703    }
704}
705
706fn build_and_broadcast_compact_block(
707    nc: &dyn CKBProtocolContext,
708    shared: &Shared,
709    peer: PeerIndex,
710    block: Arc<BlockView>,
711) {
712    debug_target!(
713        crate::LOG_TARGET_RELAY,
714        "[block_relay] relayer accept_block {} {}",
715        block.header().hash(),
716        unix_time_as_millis()
717    );
718    let block_hash = block.hash();
719    shared.remove_header_view(&block_hash);
720    let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
721    let message = packed::RelayMessage::new_builder().set(cb).build();
722
723    let selected_peers: Vec<PeerIndex> = nc
724        .connected_peers()
725        .into_iter()
726        .filter(|target_peer| peer != *target_peer)
727        .take(MAX_RELAY_PEERS)
728        .collect();
729    let handle = shared.async_handle();
730    if let Err(err) = handle.block_on(nc.async_quick_filter_broadcast(
731        TargetSession::Multi(Box::new(selected_peers.into_iter())),
732        message.as_bytes(),
733    )) {
734        debug_target!(
735            crate::LOG_TARGET_RELAY,
736            "relayer send block when accept block error: {:?}",
737            err,
738        );
739    }
740
741    let snapshot = shared.snapshot();
742    let parent_chain_root = {
743        let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
744        match mmr.get_root() {
745            Ok(root) => root,
746            Err(err) => {
747                error_target!(
748                    crate::LOG_TARGET_RELAY,
749                    "Generate last state to light client failed: {:?}",
750                    err
751                );
752                return;
753            }
754        }
755    };
756
757    let tip_header = packed::VerifiableHeader::new_builder()
758        .header(block.header().data())
759        .uncles_hash(block.calc_uncles_hash())
760        .extension(Pack::pack(&block.extension()))
761        .parent_chain_root(parent_chain_root)
762        .build();
763    let light_client_message = {
764        let content = packed::SendLastState::new_builder()
765            .last_header(tip_header)
766            .build();
767        packed::LightClientMessage::new_builder()
768            .set(content)
769            .build()
770    };
771    let light_client_peers: HashSet<PeerIndex> = nc
772        .connected_peers()
773        .into_iter()
774        .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
775        .filter(|(_id, peer)| peer.if_lightclient_subscribed)
776        .map(|(id, _)| id)
777        .collect();
778    if let Err(err) = handle.block_on(nc.async_filter_broadcast_with_proto(
779        SupportProtocols::LightClient.protocol_id(),
780        TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
781        light_client_message.as_bytes(),
782    )) {
783        debug_target!(
784            crate::LOG_TARGET_RELAY,
785            "relayer send last state to light client when accept block, error: {:?}",
786            err,
787        );
788    }
789}
790
791#[async_trait]
792impl CKBProtocolHandler for Relayer {
793    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
794        nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
795            .await
796            .expect("set_notify at init is ok");
797        nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
798            .await
799            .expect("set_notify at init is ok");
800        nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
801            .await
802            .expect("set_notify at init is ok");
803    }
804
805    async fn received(
806        &mut self,
807        nc: Arc<dyn CKBProtocolContext + Sync>,
808        peer_index: PeerIndex,
809        data: Bytes,
810    ) {
811        // If self is in the IBD state, don't process any relayer message.
812        if self.shared.active_chain().is_initial_block_download() {
813            return;
814        }
815
816        let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
817            Ok(msg) => {
818                let item = msg.to_enum();
819                if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
820                    if reader.count_extra_fields() > 1 {
821                        info_target!(
822                            crate::LOG_TARGET_RELAY,
823                            "Peer {} sends us a malformed message: \
824                             too many fields in CompactBlock",
825                            peer_index
826                        );
827                        nc.ban_peer(
828                            peer_index,
829                            BAD_MESSAGE_BAN_TIME,
830                            String::from(
831                                "send us a malformed message: \
832                                 too many fields in CompactBlock",
833                            ),
834                        );
835                        return;
836                    } else {
837                        item
838                    }
839                } else {
840                    match packed::RelayMessageReader::from_slice(&data) {
841                        Ok(msg) => msg.to_enum(),
842                        _ => {
843                            info_target!(
844                                crate::LOG_TARGET_RELAY,
845                                "Peer {} sends us a malformed message: \
846                                 too many fields",
847                                peer_index
848                            );
849                            nc.ban_peer(
850                                peer_index,
851                                BAD_MESSAGE_BAN_TIME,
852                                String::from(
853                                    "send us a malformed message \
854                                     too many fields",
855                                ),
856                            );
857                            return;
858                        }
859                    }
860                }
861            }
862            _ => {
863                info_target!(
864                    crate::LOG_TARGET_RELAY,
865                    "Peer {} sends us a malformed message",
866                    peer_index
867                );
868                nc.ban_peer(
869                    peer_index,
870                    BAD_MESSAGE_BAN_TIME,
871                    String::from("send us a malformed message"),
872                );
873                return;
874            }
875        };
876
877        debug_target!(
878            crate::LOG_TARGET_RELAY,
879            "received msg {} from {}",
880            msg.item_name(),
881            peer_index
882        );
883        #[cfg(feature = "with_sentry")]
884        {
885            let sentry_hub = sentry::Hub::current();
886            let _scope_guard = sentry_hub.push_scope();
887            sentry_hub.configure_scope(|scope| {
888                scope.set_tag("p2p.protocol", "relayer");
889                scope.set_tag("p2p.message", msg.item_name());
890            });
891        }
892
893        let start_time = Instant::now();
894        self.process(nc, peer_index, msg).await;
895        debug_target!(
896            crate::LOG_TARGET_RELAY,
897            "process message={}, peer={}, cost={:?}",
898            msg.item_name(),
899            peer_index,
900            Instant::now().saturating_duration_since(start_time),
901        );
902    }
903
904    async fn connected(
905        &mut self,
906        _nc: Arc<dyn CKBProtocolContext + Sync>,
907        peer_index: PeerIndex,
908        version: &str,
909    ) {
910        self.shared().state().peers().relay_connected(peer_index);
911        info_target!(
912            crate::LOG_TARGET_RELAY,
913            "RelayProtocol({}).connected peer={}",
914            version,
915            peer_index
916        );
917    }
918
919    async fn disconnected(
920        &mut self,
921        _nc: Arc<dyn CKBProtocolContext + Sync>,
922        peer_index: PeerIndex,
923    ) {
924        info_target!(
925            crate::LOG_TARGET_RELAY,
926            "RelayProtocol.disconnected peer={}",
927            peer_index
928        );
929        // Retains all keys in the rate limiter that were used recently enough.
930        self.rate_limiter.retain_recent();
931    }
932
933    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
934        // If self is in the IBD state, don't trigger any relayer notify.
935        if self.shared.active_chain().is_initial_block_download() {
936            return;
937        }
938
939        let start_time = Instant::now();
940        trace_target!(
941            crate::LOG_TARGET_RELAY,
942            "start notifas_ref()y token={}",
943            token
944        );
945        match token {
946            TX_PROPOSAL_TOKEN => self.prune_tx_proposal_request(&nc).await,
947            ASK_FOR_TXS_TOKEN => self.ask_for_txs(&nc).await,
948            TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(&nc).await,
949            _ => unreachable!(),
950        }
951        trace_target!(
952            crate::LOG_TARGET_RELAY,
953            "finished notify token={} cost={:?}",
954            token,
955            Instant::now().saturating_duration_since(start_time)
956        );
957    }
958}