ckb_sync/relayer/mod.rs

mod block_proposal_process;
mod block_transactions_process;
mod block_transactions_verifier;
mod block_uncles_verifier;
mod compact_block_process;
mod compact_block_verifier;
mod get_block_proposal_process;
mod get_block_transactions_process;
mod get_transactions_process;
#[cfg(test)]
pub(crate) mod tests;
mod transaction_hashes_process;
mod transactions_process;

use self::block_proposal_process::BlockProposalProcess;
use self::block_transactions_process::BlockTransactionsProcess;
pub(crate) use self::compact_block_process::CompactBlockProcess;
use self::get_block_proposal_process::GetBlockProposalProcess;
use self::get_block_transactions_process::GetBlockTransactionsProcess;
use self::get_transactions_process::GetTransactionsProcess;
use self::transaction_hashes_process::TransactionHashesProcess;
use self::transactions_process::TransactionsProcess;
use crate::types::{ActiveChain, SyncShared, post_sync_process};
use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
use crate::{Status, StatusCode};
use ckb_chain::VerifyResult;
use ckb_chain::{ChainController, RemoteBlock};
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
use ckb_error::is_internal_db_error;
use ckb_logger::{
    debug, debug_target, error, error_target, info_target, trace_target, warn_target,
};
use ckb_network::{
    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
    async_trait, bytes::Bytes, tokio,
};
use ckb_shared::Shared;
use ckb_shared::block_status::BlockStatus;
use ckb_systemtime::unix_time_as_millis;
use ckb_tx_pool::service::TxVerificationResult;
use ckb_types::BlockNumberAndHash;
use ckb_types::{
    core::{self, BlockView},
    packed::{self, Byte32, ProposalShortId},
    prelude::*,
};
use ckb_util::Mutex;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

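// Notify-timer tokens registered in `CKBProtocolHandler::init` below: proposal pruning and
// transaction requests are driven every 100ms, batched tx-hash announcements every 300ms.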
pub const TX_PROPOSAL_TOKEN: u64 = 0;
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
pub const TX_HASHES_TOKEN: u64 = 2;

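// Relay fan-out and batching limits: compact blocks are quick-broadcast to at most
// `MAX_RELAY_PEERS` peers, and outgoing transaction batches are capped both by count and by
// total serialized size.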
pub const MAX_RELAY_PEERS: usize = 128;
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;

type RateLimiter<T> = governor::RateLimiter<
    T,
    governor::state::keyed::DefaultKeyedStateStore<T>,
    governor::clock::DefaultClock,
>;

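// Outcome of `Relayer::reconstruct_block`:
// - `Block`: every transaction and uncle was available and the full block was rebuilt;
// - `Missing(tx_indexes, uncle_indexes)`: positions that still have to be fetched via
//   `GetBlockTransactions`;
// - `Collided`: the rebuilt transactions_root did not match, most likely because a short id
//   collided with an unrelated transaction in the tx-pool;
// - `Error`: reconstruction failed with the given status.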
#[derive(Debug, Eq, PartialEq)]
pub enum ReconstructionResult {
    Block(BlockView),
    Missing(Vec<usize>, Vec<usize>),
    Collided,
    Error(Status),
}

/// Relayer protocol handle
pub struct Relayer {
    chain: ChainController,
    pub(crate) shared: Arc<SyncShared>,
    rate_limiter: Arc<Mutex<RateLimiter<(PeerIndex, u32)>>>,
    v3: bool,
}

impl Relayer {
    /// Init the relay protocol handle
    ///
    /// This creates the runtime shared state of the relay protocol; all relay messages are
    /// processed and forwarded through it.
    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
        // Set up a rate limiter keyed by (peer, message type) that lets through 30 requests per second.
        // The current maximum expected rate is 10 rps (ASK_FOR_TXS_TOKEN / TX_PROPOSAL_TOKEN),
        // so 30 is a hard cap with some headroom.
        let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
        let rate_limiter = Arc::new(Mutex::new(RateLimiter::keyed(quota)));

        Relayer {
            chain,
            shared,
            rate_limiter,
            v3: false,
        }
    }

    /// Set relay to v3
    pub fn v3(mut self) -> Self {
        self.v3 = true;
        self
    }

    /// Get shared state
    pub fn shared(&self) -> &Arc<SyncShared> {
        &self.shared
    }

    fn try_process(
        &mut self,
        nc: Arc<dyn CKBProtocolContext + Sync>,
        peer: PeerIndex,
        message: packed::RelayMessageUnionReader<'_>,
    ) -> Status {
        // CompactBlock is verified by PoW, so it is OK to skip the rate-limit check for it.
        let should_check_rate =
            !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));

        if should_check_rate
            && self
                .rate_limiter
                .lock()
                .check_key(&(peer, message.item_id()))
                .is_err()
        {
            return StatusCode::TooManyRequests.with_context(message.item_name());
        }

        match message {
            packed::RelayMessageUnionReader::CompactBlock(reader) => {
                CompactBlockProcess::new(reader, self, nc, peer).execute()
            }
            packed::RelayMessageUnionReader::RelayTransactions(reader) => {
                // after ckb2023, v2 doesn't work with relay tx
                // before ckb2023, v3 doesn't work with relay tx
                match RelaySwitch::new(&nc, self.v3) {
                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
                        return Status::ignored();
                    }
                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
                }
                if reader.check_data() {
                    TransactionsProcess::new(reader, self, nc, peer).execute()
                } else {
                    StatusCode::ProtocolMessageIsMalformed
                        .with_context("RelayTransactions is invalid")
                }
            }
            packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
                // after ckb2023, v2 doesn't work with relay tx
                // before ckb2023, v3 doesn't work with relay tx
                match RelaySwitch::new(&nc, self.v3) {
                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
                        return Status::ignored();
                    }
                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
                }
                TransactionHashesProcess::new(reader, self, peer).execute()
            }
            packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
                // after ckb2023, v2 doesn't work with relay tx
                // before ckb2023, v3 doesn't work with relay tx
                match RelaySwitch::new(&nc, self.v3) {
                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
                        return Status::ignored();
                    }
                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
                }
                GetTransactionsProcess::new(reader, self, nc, peer).execute()
            }
            packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
                GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
            }
            packed::RelayMessageUnionReader::BlockTransactions(reader) => {
                if reader.check_data() {
                    BlockTransactionsProcess::new(reader, self, nc, peer).execute()
                } else {
                    StatusCode::ProtocolMessageIsMalformed
                        .with_context("BlockTransactions is invalid")
                }
            }
            packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
                GetBlockProposalProcess::new(reader, self, nc, peer).execute()
            }
            packed::RelayMessageUnionReader::BlockProposal(reader) => {
                BlockProposalProcess::new(reader, self).execute()
            }
        }
    }

    fn process(
        &mut self,
        nc: Arc<dyn CKBProtocolContext + Sync>,
        peer: PeerIndex,
        message: packed::RelayMessageUnionReader<'_>,
    ) {
        let item_name = message.item_name();
        let item_bytes = message.as_slice().len() as u64;
        let status = self.try_process(Arc::clone(&nc), peer, message);

        metric_ckb_message_bytes(
            MetricDirection::In,
            &SupportProtocols::RelayV2.name(),
            message.item_name(),
            Some(status.code()),
            item_bytes,
        );

        if let Some(ban_time) = status.should_ban() {
            error_target!(
                crate::LOG_TARGET_RELAY,
                "receive {} from {}, ban {:?} for {}",
                item_name,
                peer,
                ban_time,
                status
            );
            nc.ban_peer(peer, ban_time, status.to_string());
        } else if status.should_warn() {
            warn_target!(
                crate::LOG_TARGET_RELAY,
                "receive {} from {}, {}",
                item_name,
                peer,
                status
            );
        } else if !status.is_ok() {
            debug_target!(
                crate::LOG_TARGET_RELAY,
                "receive {} from {}, {}",
                item_name,
                peer,
                status
            );
        }
    }

    /// Request the transactions corresponding to the given proposal ids from the specified peer
    pub fn request_proposal_txs(
        &self,
        nc: &dyn CKBProtocolContext,
        peer: PeerIndex,
        block_hash_and_number: BlockNumberAndHash,
        proposals: Vec<packed::ProposalShortId>,
    ) {
        let tx_pool = self.shared.shared().tx_pool_controller();
        let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
        {
            Err(err) => {
                debug_target!(
                    crate::LOG_TARGET_RELAY,
                    "tx_pool fresh_proposals_filter error: {:?}",
                    err,
                );
                return;
            }
            Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
        };

        let to_ask_proposals: Vec<ProposalShortId> = self
            .shared()
            .state()
            .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
            .into_iter()
            .zip(fresh_proposals)
            .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
            .collect();
        if !to_ask_proposals.is_empty() {
            let content = packed::GetBlockProposal::new_builder()
                .block_hash(block_hash_and_number.hash)
                .proposals(to_ask_proposals.clone().pack())
                .build();
            let message = packed::RelayMessage::new_builder().set(content).build();
            if !send_message_to(nc, peer, &message).is_ok() {
                self.shared()
                    .state()
                    .remove_inflight_proposals(&to_ask_proposals);
            }
        }
    }

    /// Accept a new block from the network
    #[allow(clippy::needless_collect)]
    pub fn accept_block(
        &self,
        nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_id: PeerIndex,
        block: core::BlockView,
        msg_name: &str,
    ) {
        if self
            .shared()
            .active_chain()
            .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
        {
            return;
        }

        let block = Arc::new(block);

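        // The callback below is invoked by the chain service once asynchronous verification
        // of this remote block finishes: a freshly verified block is re-broadcast as a
        // compact block, while a verification failure (other than an internal DB error) is
        // reported against the sending peer via `post_sync_process`.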
        let verify_callback = {
            let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
            let block = Arc::clone(&block);
            let shared = Arc::clone(self.shared());
            let msg_name = msg_name.to_owned();
            Box::new(move |result: VerifyResult| match result {
                Ok(verified) => {
                    if !verified {
                        debug!(
                            "block {}-{} has verified already, won't build compact block and broadcast it",
                            block.number(),
                            block.hash()
                        );
                        return;
                    }

                    build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
                }
                Err(err) => {
                    error!(
                        "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
                        block.number(),
                        block.hash(),
                        err
                    );

                    let is_internal_db_error = is_internal_db_error(&err);
                    if is_internal_db_error {
                        return;
                    }

                    // punish the malicious peer
                    post_sync_process(
                        nc.as_ref(),
                        peer_id,
                        &msg_name,
                        StatusCode::BlockIsInvalid.with_context(format!(
                            "block {} is invalid, reason: {}",
                            block.hash(),
                            err
                        )),
                    );
                }
            })
        };

        let remote_block = RemoteBlock {
            block,
            verify_callback,
        };

        self.shared.accept_remote_block(&self.chain, remote_block);
    }

    /// Reconstruct the full block from the compact block plus the received transactions and uncles
    //
    // Nodes should attempt to reconstruct the full block by taking the prefilled transactions
    // from the original CompactBlock message and placing them in the marked positions.
    // Then, for each short transaction ID from the original CompactBlock message, in order,
    // find the corresponding transaction, either from the BlockTransactions message or from
    // other sources, and place it in the first available position in the block.
    // Once the block has been reconstructed, it shall be processed as normal, keeping in mind
    // that short ids are expected to occasionally collide and that nodes must not be penalized
    // for such collisions, wherever they appear.
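    //
    // Illustrative example (not from the upstream docs): with txs_len = 5, prefilled
    // transactions at indexes 0 and 3, and short ids [s1, s2, s3], the assembly order is
    //   [prefilled#0, s1, s2, prefilled#3, s3]
    // because each prefilled entry first drains `index - block_transactions.len()` short ids
    // into the gap before being pushed itself, and any leftover short ids are appended at the
    // end.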
    pub fn reconstruct_block(
        &self,
        active_chain: &ActiveChain,
        compact_block: &packed::CompactBlock,
        received_transactions: Vec<core::TransactionView>,
        uncles_index: &[u32],
        received_uncles: &[core::UncleBlockView],
    ) -> ReconstructionResult {
        let block_txs_len = received_transactions.len();
        let compact_block_hash = compact_block.calc_header_hash();
        debug_target!(
            crate::LOG_TARGET_RELAY,
            "start block reconstruction, block hash: {}, received transactions len: {}",
            compact_block_hash,
            block_txs_len,
        );

        let mut short_ids_set: HashSet<ProposalShortId> =
            compact_block.short_ids().into_iter().collect();

        let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
            .into_iter()
            .filter_map(|tx| {
                let short_id = tx.proposal_short_id();
                if short_ids_set.remove(&short_id) {
                    Some((short_id, tx))
                } else {
                    None
                }
            })
            .collect();

        if !short_ids_set.is_empty() {
            let tx_pool = self.shared.shared().tx_pool_controller();
            let fetch_txs = tx_pool.fetch_txs(short_ids_set);
            if let Err(e) = fetch_txs {
                return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
            }
            txs_map.extend(fetch_txs.unwrap());
        }

        let txs_len = compact_block.txs_len();
        let mut block_transactions: Vec<Option<core::TransactionView>> =
            Vec::with_capacity(txs_len);

        let short_ids_iter = &mut compact_block.short_ids().into_iter();
        // fill the gaps between prefilled transactions with short-id matches
        compact_block
            .prefilled_transactions()
            .into_iter()
            .for_each(|pt| {
                let index: usize = pt.index().unpack();
                let gap = index - block_transactions.len();
                if gap > 0 {
                    short_ids_iter
                        .take(gap)
                        .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
                }
                block_transactions.push(Some(pt.transaction().into_view()));
            });

        // append the remaining transactions
        short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));

        let missing = block_transactions.iter().any(Option::is_none);

        let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
        let mut uncles = Vec::with_capacity(compact_block.uncles().len());

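        // Uncles are resolved from three sources, tried in this order: the uncles carried in
        // the reply (positions listed in `uncles_index`), the local block store for blocks
        // already stored or verified, and the orphan pool for blocks received but not yet
        // verified; anything still unresolved is reported back as missing, and a known
        // invalid uncle aborts reconstruction.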
        let mut position = 0;
        for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
            if uncles_index.contains(&(i as u32)) {
                uncles.push(
                    received_uncles
                        .get(position)
                        .expect("have checked the indexes")
                        .clone()
                        .data(),
                );
                position += 1;
                continue;
            };
            let status = active_chain.get_block_status(&uncle_hash);
            match status {
                BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
                BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
                    if let Some(uncle) = active_chain.get_block(&uncle_hash) {
                        uncles.push(uncle.as_uncle().data());
                    } else {
                        debug_target!(
                            crate::LOG_TARGET_RELAY,
                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
                            status,
                            uncle_hash,
                        );
                        missing_uncles.push(i);
                    }
                }
                BlockStatus::BLOCK_RECEIVED => {
                    if let Some(uncle) = self
                        .chain
                        .get_orphan_block(self.shared().store(), &uncle_hash)
                    {
                        uncles.push(uncle.as_uncle().data());
                    } else {
                        debug_target!(
                            crate::LOG_TARGET_RELAY,
                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
                            status,
                            uncle_hash,
                        );
                        missing_uncles.push(i);
                    }
                }
                BlockStatus::BLOCK_INVALID => {
                    return ReconstructionResult::Error(
                        StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
                    );
                }
                _ => missing_uncles.push(i),
            }
        }

        if !missing && missing_uncles.is_empty() {
            let txs = block_transactions
                .into_iter()
                .collect::<Option<Vec<_>>>()
                .expect("missing checked, should not fail");
            let block = if let Some(extension) = compact_block.extension() {
                packed::BlockV1::new_builder()
                    .header(compact_block.header())
                    .uncles(uncles.pack())
                    .transactions(txs.into_iter().map(|tx| tx.data()).pack())
                    .proposals(compact_block.proposals())
                    .extension(extension)
                    .build()
                    .as_v0()
            } else {
                packed::Block::new_builder()
                    .header(compact_block.header())
                    .uncles(uncles.pack())
                    .transactions(txs.into_iter().map(|tx| tx.data()).pack())
                    .proposals(compact_block.proposals())
                    .build()
            }
            .into_view();

            debug_target!(
                crate::LOG_TARGET_RELAY,
                "finish block reconstruction, block hash: {}",
                compact_block.calc_header_hash(),
            );

            let compact_block_tx_root = compact_block.header().raw().transactions_root();
            let reconstruct_block_tx_root = block.transactions_root();
            if compact_block_tx_root != reconstruct_block_tx_root {
                if compact_block.short_ids().is_empty()
                    || compact_block.short_ids().len() == block_txs_len
                {
                    return ReconstructionResult::Error(
                        StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
                            .with_context(format!(
                                "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
                                compact_block.header().raw().transactions_root(),
                                block.transactions_root(),
                            )),
                    );
                } else {
                    if let Some(metrics) = ckb_metrics::handle() {
                        metrics.ckb_relay_transaction_short_id_collide.inc();
                    }
                    return ReconstructionResult::Collided;
                }
            }

            ReconstructionResult::Block(block)
        } else {
            let missing_indexes: Vec<usize> = block_transactions
                .iter()
                .enumerate()
                .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
                .collect();

            debug_target!(
                crate::LOG_TARGET_RELAY,
                "block reconstruction failed, block hash: {}, missing: {}, total: {}",
                compact_block.calc_header_hash(),
                missing_indexes.len(),
                compact_block.short_ids().len(),
            );

            ReconstructionResult::Missing(missing_indexes, missing_uncles)
        }
    }

    fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
        let get_block_proposals = self.shared().state().drain_get_block_proposals();
        let tx_pool = self.shared.shared().tx_pool_controller();

        let fetch_txs = tx_pool.fetch_txs(
            get_block_proposals
                .iter()
                .map(|kv_pair| kv_pair.key().clone())
                .collect(),
        );
        if let Err(err) = fetch_txs {
            debug_target!(
                crate::LOG_TARGET_RELAY,
                "relayer prune_tx_proposal_request internal error: {:?}",
                err,
            );
            return;
        }

        let txs = fetch_txs.unwrap();

        let mut peer_txs = HashMap::new();
        for (id, peer_indices) in get_block_proposals.into_iter() {
            if let Some(tx) = txs.get(&id) {
                for peer_index in peer_indices {
                    let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
                    tx_set.push(tx.clone());
                }
            }
        }

        let send_block_proposals =
            |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
                let content = packed::BlockProposal::new_builder()
                    .transactions(txs.into_iter().pack())
                    .build();
                let message = packed::RelayMessage::new_builder().set(content).build();
                let status = send_message_to(nc, peer_index, &message);
                if !status.is_ok() {
                    ckb_logger::error!(
                        "send RelayBlockProposal to {}, status: {:?}",
                        peer_index,
                        status
                    );
                }
            };

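        // Size-based batching: proposals are accumulated per peer until adding the next
        // transaction would exceed MAX_RELAY_TXS_BYTES_PER_BATCH; the batch is then flushed
        // and the overflowing transaction starts the next one. Whatever remains after a
        // peer's list is flushed as a final batch.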
        let mut relay_bytes = 0;
        let mut relay_proposals = Vec::new();
        for (peer_index, txs) in peer_txs {
            for tx in txs {
                let data = tx.data();
                let tx_size = data.total_size();
                if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
                    send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
                    relay_bytes = tx_size;
                } else {
                    relay_bytes += tx_size;
                }
                relay_proposals.push(data);
            }
            if !relay_proposals.is_empty() {
                send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
                relay_bytes = 0;
            }
        }
    }

    /// Ask for relay transactions by hash from all peers
    pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
        for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
            if !tx_hashes.is_empty() {
                debug_target!(
                    crate::LOG_TARGET_RELAY,
                    "Send get transaction ({} hashes) to {}",
                    tx_hashes.len(),
                    peer,
                );
                tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
                let content = packed::GetRelayTransactions::new_builder()
                    .tx_hashes(tx_hashes.pack())
                    .build();
                let message = packed::RelayMessage::new_builder().set(content).build();
                let status = send_message_to(nc, peer, &message);
                if !status.is_ok() {
                    ckb_logger::error!(
                        "interrupted request for transactions, status: {:?}",
                        status
                    );
                }
            }
        }
    }

    /// Send bulk of tx hashes to selected peers
    pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
        const BUFFER_SIZE: usize = 42;

        let connected_peers = nc.connected_peers();
        if connected_peers.is_empty() {
            return;
        }

        let ckb2023 = nc.ckb2023();
        let tx_verify_results = self
            .shared
            .state()
            .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
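        // Fan-out policy: a transaction received from a peer is announced to every other
        // connected peer; a transaction submitted via local RPC (no original peer) is
        // announced to all peers and marked as known; rejected transactions are dropped from
        // the known-tx set, and unknown parents are queued to be requested from the peer that
        // relayed the child.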
        let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
        {
            for tx_verify_result in tx_verify_results {
                match tx_verify_result {
                    TxVerificationResult::Ok {
                        original_peer,
                        with_vm_2023,
                        tx_hash,
                    } => {
                        // only relay txs whose verification fork state (with_vm_2023) matches the current network state (ckb2023)
                        if ckb2023 != with_vm_2023 {
                            continue;
                        }
                        for target in &connected_peers {
                            match original_peer {
                                Some(peer) => {
                                    // broadcast tx hash to all connected peers except original peer
                                    if peer != *target {
                                        let hashes = selected
                                            .entry(*target)
                                            .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
                                        hashes.push(tx_hash.clone());
                                    }
                                }
                                None => {
                                    // since this tx is submitted through local rpc, it is assumed to be a new tx for all connected peers
                                    let hashes = selected
                                        .entry(*target)
                                        .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
                                    hashes.push(tx_hash.clone());
                                    self.shared.state().mark_as_known_tx(tx_hash.clone());
                                }
                            }
                        }
                    }
                    TxVerificationResult::Reject { tx_hash } => {
                        self.shared.state().remove_from_known_txs(&tx_hash);
                    }
                    TxVerificationResult::UnknownParents { peer, parents } => {
                        let tx_hashes: Vec<_> = {
                            let mut tx_filter = self.shared.state().tx_filter();
                            tx_filter.remove_expired();
                            parents
                                .into_iter()
                                .filter(|tx_hash| !tx_filter.contains(tx_hash))
                                .collect()
                        };
                        self.shared.state().add_ask_for_txs(peer, tx_hashes);
                    }
                }
            }
        }
        for (peer, hashes) in selected {
            let content = packed::RelayTransactionHashes::new_builder()
                .tx_hashes(hashes.pack())
                .build();
            let message = packed::RelayMessage::new_builder().set(content).build();

            if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
                debug_target!(
                    crate::LOG_TARGET_RELAY,
                    "relayer send TransactionHashes error: {:?}",
                    err,
                );
            }
        }
    }
}

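// Once a relayed block passes verification, rebuild a `CompactBlock` from it and
// quick-broadcast it to up to `MAX_RELAY_PEERS` connected peers (excluding the peer the block
// came from); additionally, push a `SendLastState` message to every peer that has subscribed
// to the light-client protocol.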
fn build_and_broadcast_compact_block(
    nc: &dyn CKBProtocolContext,
    shared: &Shared,
    peer: PeerIndex,
    block: Arc<BlockView>,
) {
    debug_target!(
        crate::LOG_TARGET_RELAY,
        "[block_relay] relayer accept_block {} {}",
        block.header().hash(),
        unix_time_as_millis()
    );
    let block_hash = block.hash();
    shared.remove_header_view(&block_hash);
    let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
    let message = packed::RelayMessage::new_builder().set(cb).build();

    let selected_peers: Vec<PeerIndex> = nc
        .connected_peers()
        .into_iter()
        .filter(|target_peer| peer != *target_peer)
        .take(MAX_RELAY_PEERS)
        .collect();
    if let Err(err) = nc.quick_filter_broadcast(
        TargetSession::Multi(Box::new(selected_peers.into_iter())),
        message.as_bytes(),
    ) {
        debug_target!(
            crate::LOG_TARGET_RELAY,
            "relayer send block when accept block error: {:?}",
            err,
        );
    }

    if let Some(p2p_control) = nc.p2p_control() {
        let snapshot = shared.snapshot();
        let parent_chain_root = {
            let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
            match mmr.get_root() {
                Ok(root) => root,
                Err(err) => {
                    error_target!(
                        crate::LOG_TARGET_RELAY,
                        "Generate last state to light client failed: {:?}",
                        err
                    );
                    return;
                }
            }
        };

        let tip_header = packed::VerifiableHeader::new_builder()
            .header(block.header().data())
            .uncles_hash(block.calc_uncles_hash())
            .extension(Pack::pack(&block.extension()))
            .parent_chain_root(parent_chain_root)
            .build();
        let light_client_message = {
            let content = packed::SendLastState::new_builder()
                .last_header(tip_header)
                .build();
            packed::LightClientMessage::new_builder()
                .set(content)
                .build()
        };
        let light_client_peers: HashSet<PeerIndex> = nc
            .connected_peers()
            .into_iter()
            .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
            .filter(|(_id, peer)| peer.if_lightclient_subscribed)
            .map(|(id, _)| id)
            .collect();
        if let Err(err) = p2p_control.filter_broadcast(
            TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
            SupportProtocols::LightClient.protocol_id(),
            light_client_message.as_bytes(),
        ) {
            debug_target!(
                crate::LOG_TARGET_RELAY,
                "relayer send last state to light client when accept block, error: {:?}",
                err,
            );
        }
    }
}

#[async_trait]
impl CKBProtocolHandler for Relayer {
    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
        nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
            .await
            .expect("set_notify at init is ok");
        nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
            .await
            .expect("set_notify at init is ok");
        nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
            .await
            .expect("set_notify at init is ok");
    }

    async fn received(
        &mut self,
        nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_index: PeerIndex,
        data: Bytes,
    ) {
        // If self is in the IBD state, don't process any relayer message.
        if self.shared.active_chain().is_initial_block_download() {
            return;
        }

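        // Parsing is deliberately two-phase: `from_compatible_slice` tolerates unknown
        // trailing fields, so a CompactBlock may carry at most one extra field (the
        // `extension` added by the ckb2021 hard fork). Every other message kind is re-parsed
        // with the strict `from_slice`, and any surplus fields get the peer banned.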
        let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
            Ok(msg) => {
                let item = msg.to_enum();
                if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
                    if reader.count_extra_fields() > 1 {
                        info_target!(
                            crate::LOG_TARGET_RELAY,
                            "Peer {} sends us a malformed message: \
                             too many fields in CompactBlock",
                            peer_index
                        );
                        nc.ban_peer(
                            peer_index,
                            BAD_MESSAGE_BAN_TIME,
                            String::from(
                                "send us a malformed message: \
                                 too many fields in CompactBlock",
                            ),
                        );
                        return;
                    } else {
                        item
                    }
                } else {
                    match packed::RelayMessageReader::from_slice(&data) {
                        Ok(msg) => msg.to_enum(),
                        _ => {
                            info_target!(
                                crate::LOG_TARGET_RELAY,
                                "Peer {} sends us a malformed message: \
                                 too many fields",
                                peer_index
                            );
                            nc.ban_peer(
                                peer_index,
                                BAD_MESSAGE_BAN_TIME,
                                String::from(
                                    "send us a malformed message \
                                     too many fields",
                                ),
                            );
                            return;
                        }
                    }
                }
            }
            _ => {
                info_target!(
                    crate::LOG_TARGET_RELAY,
                    "Peer {} sends us a malformed message",
                    peer_index
                );
                nc.ban_peer(
                    peer_index,
                    BAD_MESSAGE_BAN_TIME,
                    String::from("send us a malformed message"),
                );
                return;
            }
        };

        debug_target!(
            crate::LOG_TARGET_RELAY,
            "received msg {} from {}",
            msg.item_name(),
            peer_index
        );
        #[cfg(feature = "with_sentry")]
        {
            let sentry_hub = sentry::Hub::current();
            let _scope_guard = sentry_hub.push_scope();
            sentry_hub.configure_scope(|scope| {
                scope.set_tag("p2p.protocol", "relayer");
                scope.set_tag("p2p.message", msg.item_name());
            });
        }

        let start_time = Instant::now();
        tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
        debug_target!(
            crate::LOG_TARGET_RELAY,
            "process message={}, peer={}, cost={:?}",
            msg.item_name(),
            peer_index,
            Instant::now().saturating_duration_since(start_time),
        );
    }

    async fn connected(
        &mut self,
        _nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_index: PeerIndex,
        version: &str,
    ) {
        self.shared().state().peers().relay_connected(peer_index);
        info_target!(
            crate::LOG_TARGET_RELAY,
            "RelayProtocol({}).connected peer={}",
            version,
            peer_index
        );
    }

    async fn disconnected(
        &mut self,
        _nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_index: PeerIndex,
    ) {
        info_target!(
            crate::LOG_TARGET_RELAY,
            "RelayProtocol.disconnected peer={}",
            peer_index
        );
        // Retains all keys in the rate limiter that were used recently enough.
        self.rate_limiter.lock().retain_recent();
    }

    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
        // If self is in the IBD state, don't trigger any relayer notify.
        if self.shared.active_chain().is_initial_block_download() {
            return;
        }

        match RelaySwitch::new(&nc, self.v3) {
            RelaySwitch::Ckb2021RelayV3 => return,
            RelaySwitch::Ckb2023RelayV2 => {
                if nc.remove_notify(TX_PROPOSAL_TOKEN).await.is_err() {
                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
                }
                if nc.remove_notify(ASK_FOR_TXS_TOKEN).await.is_err() {
                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
                }
                if nc.remove_notify(TX_HASHES_TOKEN).await.is_err() {
                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
                }
                for kv_pair in self.shared().state().peers().state.iter() {
                    let (peer, state) = kv_pair.pair();
                    if !state.peer_flags.is_2023edition {
                        let _ignore = nc.disconnect(*peer, "Evict low-version clients ");
                    }
                }
                return;
            }
            RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
        }

        let start_time = Instant::now();
        trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
        match token {
            TX_PROPOSAL_TOKEN => {
                tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
            }
            ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
            TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
            _ => unreachable!(),
        }
        trace_target!(
            crate::LOG_TARGET_RELAY,
            "finished notify token={} cost={:?}",
            token,
            Instant::now().saturating_duration_since(start_time)
        );
    }
}

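// Cross product of two flags: whether the ckb2023 hard fork has activated on the network side
// (`nc.ckb2023()`) and whether this handler was registered as relay v3 (`Relayer::v3`).
// Relay work only proceeds when the two agree (Ckb2021RelayV2 or Ckb2023RelayV3); the mixed
// combinations are either ignored or torn down in `notify` above.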
#[derive(Copy, Clone, Debug)]
enum RelaySwitch {
    Ckb2021RelayV2,
    Ckb2021RelayV3,
    Ckb2023RelayV2,
    Ckb2023RelayV3,
}

impl RelaySwitch {
    fn new(nc: &Arc<dyn CKBProtocolContext + Sync>, is_relay_v3: bool) -> Self {
        match (nc.ckb2023(), is_relay_v3) {
            (true, true) => Self::Ckb2023RelayV3,
            (true, false) => Self::Ckb2023RelayV2,
            (false, true) => Self::Ckb2021RelayV3,
            (false, false) => Self::Ckb2021RelayV2,
        }
    }
}