ckb_sync/relayer/
mod.rs

1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_chain::VerifyResult;
27use ckb_chain::{ChainController, RemoteBlock};
28use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
29use ckb_error::is_internal_db_error;
30use ckb_logger::{
31    debug, debug_target, error, error_target, info_target, trace_target, warn_target,
32};
33use ckb_network::{
34    CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
35    async_trait, bytes::Bytes, tokio,
36};
37use ckb_shared::Shared;
38use ckb_shared::block_status::BlockStatus;
39use ckb_systemtime::unix_time_as_millis;
40use ckb_tx_pool::service::TxVerificationResult;
41use ckb_types::BlockNumberAndHash;
42use ckb_types::{
43    core::{self, BlockView},
44    packed::{self, Byte32, ProposalShortId},
45    prelude::*,
46};
47use itertools::Itertools;
48use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
/// Token value `0`.
// NOTE(review): presumably the notify/timer token for proposal handling; its
// registration is not visible in this part of the file — confirm.
52pub const TX_PROPOSAL_TOKEN: u64 = 0;
/// Token value `1`.
// NOTE(review): presumably the notify/timer token that drives `ask_for_txs` — confirm.
53pub const ASK_FOR_TXS_TOKEN: u64 = 1;
/// Token value `2`.
// NOTE(review): presumably the notify/timer token that drives `send_bulk_of_tx_hashes` — confirm.
54pub const TX_HASHES_TOKEN: u64 = 2;
55
/// Upper bound on the number of peers a freshly accepted block is broadcast to
/// as a compact block.
56pub const MAX_RELAY_PEERS: usize = 128;
/// Maximum number of transaction hashes put into one `GetRelayTransactions`
/// request, and the maximum number of tx verification results taken per
/// `send_bulk_of_tx_hashes` round.
57pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
/// Byte budget (1 MiB) for the transactions packed into one `BlockProposal`
/// message.
58pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
59
/// Keyed `governor` rate limiter: per-key state lives in a `HashMap` state
/// store and time comes from governor's default clock. `T` is the key type.
60type RateLimiter<T> = governor::RateLimiter<
61    T,
62    governor::state::keyed::HashMapStateStore<T>,
63    governor::clock::DefaultClock,
64>;
65
/// Outcome of `Relayer::reconstruct_block`.
66#[derive(Debug, Eq, PartialEq)]
67pub enum ReconstructionResult {
    /// The full block was rebuilt and its transactions root matches the
    /// compact block header.
68    Block(BlockView),
    /// Reconstruction is incomplete. The first vector holds the positions of
    /// the transactions that are still missing from the block, the second the
    /// positions of the missing uncles.
69    Missing(Vec<usize>, Vec<usize>),
    /// Everything was found but the transactions root does not match, and the
    /// mismatch is attributed to a short id collision.
70    Collided,
    /// Reconstruction failed with the given status (tx-pool failure, invalid
    /// uncle, or a transactions root mismatch that cannot be a collision).
71    Error(Status),
72}
73
74/// Relayer protocol handle
///
/// Bundles what the relay message processors need: the chain controller, the
/// shared sync state, a per-peer rate limiter and the protocol version flag.
75pub struct Relayer {
    /// Chain controller; blocks accepted from the network are handed over
    /// together with it, and it is queried for orphan blocks when resolving uncles.
76    chain: ChainController,
    /// Shared sync state, also reachable through `Relayer::shared`.
77    pub(crate) shared: Arc<SyncShared>,
    /// Rate limiter keyed by `(peer, message item id)`; consulted for every
    /// relay message except `CompactBlock`.
78    rate_limiter: RateLimiter<(PeerIndex, u32)>,
    /// `true` once `Relayer::v3` has been called; fed to `RelaySwitch::new`
    /// to decide whether transaction relay messages are handled.
79    v3: bool,
80}
81
82impl Relayer {
83    /// Init relay protocol handle
84    ///
85    /// This is a runtime relay protocol shared state, and any relay messages will be processed and forwarded by it
86    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
87        // setup a rate limiter keyed by peer and message type that lets through 30 requests per second
88        // current max rps is 10 (ASK_FOR_TXS_TOKEN / TX_PROPOSAL_TOKEN), 30 is a flexible hard cap with buffer
89        let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
90        let rate_limiter = RateLimiter::hashmap(quota);
91
92        Relayer {
93            chain,
94            shared,
95            rate_limiter,
96            v3: false,
97        }
98    }
99
100    /// Set relay to v3
101    pub fn v3(mut self) -> Self {
102        self.v3 = true;
103        self
104    }
105
106    /// Get shared state
107    pub fn shared(&self) -> &Arc<SyncShared> {
108        &self.shared
109    }
110
111    fn try_process(
112        &mut self,
113        nc: Arc<dyn CKBProtocolContext + Sync>,
114        peer: PeerIndex,
115        message: packed::RelayMessageUnionReader<'_>,
116    ) -> Status {
117        // CompactBlock will be verified by POW, it's OK to skip rate limit checking.
118        let should_check_rate =
119            !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
120
121        if should_check_rate
122            && self
123                .rate_limiter
124                .check_key(&(peer, message.item_id()))
125                .is_err()
126        {
127            return StatusCode::TooManyRequests.with_context(message.item_name());
128        }
129
130        match message {
131            packed::RelayMessageUnionReader::CompactBlock(reader) => {
132                CompactBlockProcess::new(reader, self, nc, peer).execute()
133            }
134            packed::RelayMessageUnionReader::RelayTransactions(reader) => {
135                // after ckb2023, v2 doesn't work with relay tx
136                // before ckb2023, v3 doesn't work with relay tx
137                match RelaySwitch::new(&nc, self.v3) {
138                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
139                        return Status::ignored();
140                    }
141                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
142                }
143                if reader.check_data() {
144                    TransactionsProcess::new(reader, self, nc, peer).execute()
145                } else {
146                    StatusCode::ProtocolMessageIsMalformed
147                        .with_context("RelayTransactions is invalid")
148                }
149            }
150            packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
151                // after ckb2023, v2 doesn't work with relay tx
152                // before ckb2023, v3 doesn't work with relay tx
153                match RelaySwitch::new(&nc, self.v3) {
154                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
155                        return Status::ignored();
156                    }
157                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
158                }
159                TransactionHashesProcess::new(reader, self, peer).execute()
160            }
161            packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
162                // after ckb2023, v2 doesn't work with relay tx
163                // before ckb2023, v3 doesn't work with relay tx
164                match RelaySwitch::new(&nc, self.v3) {
165                    RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
166                        return Status::ignored();
167                    }
168                    RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
169                }
170                GetTransactionsProcess::new(reader, self, nc, peer).execute()
171            }
172            packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
173                GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
174            }
175            packed::RelayMessageUnionReader::BlockTransactions(reader) => {
176                if reader.check_data() {
177                    BlockTransactionsProcess::new(reader, self, nc, peer).execute()
178                } else {
179                    StatusCode::ProtocolMessageIsMalformed
180                        .with_context("BlockTransactions is invalid")
181                }
182            }
183            packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
184                GetBlockProposalProcess::new(reader, self, nc, peer).execute()
185            }
186            packed::RelayMessageUnionReader::BlockProposal(reader) => {
187                BlockProposalProcess::new(reader, self).execute()
188            }
189        }
190    }
191
192    fn process(
193        &mut self,
194        nc: Arc<dyn CKBProtocolContext + Sync>,
195        peer: PeerIndex,
196        message: packed::RelayMessageUnionReader<'_>,
197    ) {
198        let item_name = message.item_name();
199        let item_bytes = message.as_slice().len() as u64;
200        let status = self.try_process(Arc::clone(&nc), peer, message);
201
202        metric_ckb_message_bytes(
203            MetricDirection::In,
204            &SupportProtocols::RelayV2.name(),
205            message.item_name(),
206            Some(status.code()),
207            item_bytes,
208        );
209
210        if let Some(ban_time) = status.should_ban() {
211            error_target!(
212                crate::LOG_TARGET_RELAY,
213                "receive {} from {}, ban {:?} for {}",
214                item_name,
215                peer,
216                ban_time,
217                status
218            );
219            nc.ban_peer(peer, ban_time, status.to_string());
220        } else if status.should_warn() {
221            warn_target!(
222                crate::LOG_TARGET_RELAY,
223                "receive {} from {}, {}",
224                item_name,
225                peer,
226                status
227            );
228        } else if !status.is_ok() {
229            debug_target!(
230                crate::LOG_TARGET_RELAY,
231                "receive {} from {}, {}",
232                item_name,
233                peer,
234                status
235            );
236        }
237    }
238
239    /// Request the transaction corresponding to the proposal id from the specified node
240    pub fn request_proposal_txs(
241        &self,
242        nc: &dyn CKBProtocolContext,
243        peer: PeerIndex,
244        block_hash_and_number: BlockNumberAndHash,
245        proposals: Vec<packed::ProposalShortId>,
246    ) {
247        let tx_pool = self.shared.shared().tx_pool_controller();
248        let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
249        {
250            Err(err) => {
251                debug_target!(
252                    crate::LOG_TARGET_RELAY,
253                    "tx_pool fresh_proposals_filter error: {:?}",
254                    err,
255                );
256                return;
257            }
258            Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
259        };
260
261        let to_ask_proposals: Vec<ProposalShortId> = self
262            .shared()
263            .state()
264            .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
265            .into_iter()
266            .zip(fresh_proposals)
267            .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
268            .collect();
269        if !to_ask_proposals.is_empty() {
270            let content = packed::GetBlockProposal::new_builder()
271                .block_hash(block_hash_and_number.hash)
272                .proposals(to_ask_proposals.clone().pack())
273                .build();
274            let message = packed::RelayMessage::new_builder().set(content).build();
275            if !send_message_to(nc, peer, &message).is_ok() {
276                self.shared()
277                    .state()
278                    .remove_inflight_proposals(&to_ask_proposals);
279            }
280        }
281    }
282
283    /// Accept a new block from network
284    #[allow(clippy::needless_collect)]
285    pub fn accept_block(
286        &self,
287        nc: Arc<dyn CKBProtocolContext + Sync>,
288        peer_id: PeerIndex,
289        block: core::BlockView,
290        msg_name: &str,
291    ) {
292        if self
293            .shared()
294            .active_chain()
295            .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
296        {
297            return;
298        }
299
300        let block = Arc::new(block);
301
302        let verify_callback = {
303            let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
304            let block = Arc::clone(&block);
305            let shared = Arc::clone(self.shared());
306            let msg_name = msg_name.to_owned();
307            Box::new(move |result: VerifyResult| match result {
308                Ok(verified) => {
309                    if !verified {
310                        debug!(
311                            "block {}-{} has verified already, won't build compact block and broadcast it",
312                            block.number(),
313                            block.hash()
314                        );
315                        return;
316                    }
317
318                    build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
319                }
320                Err(err) => {
321                    error!(
322                        "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
323                        block.number(),
324                        block.hash(),
325                        err
326                    );
327
328                    let is_internal_db_error = is_internal_db_error(&err);
329                    if is_internal_db_error {
330                        return;
331                    }
332
333                    // punish the malicious peer
334                    post_sync_process(
335                        nc.as_ref(),
336                        peer_id,
337                        &msg_name,
338                        StatusCode::BlockIsInvalid.with_context(format!(
339                            "block {} is invalid, reason: {}",
340                            block.hash(),
341                            err
342                        )),
343                    );
344                }
345            })
346        };
347
348        let remote_block = RemoteBlock {
349            block,
350            verify_callback,
351        };
352
353        self.shared.accept_remote_block(&self.chain, remote_block);
354    }
355
356    /// Reorganize the full block according to the compact block/txs/uncles
357    // nodes should attempt to reconstruct the full block by taking the prefilledtxn transactions
358    // from the original CompactBlock message and placing them in the marked positions,
359    // then for each short transaction ID from the original compact_block message, in order,
360    // find the corresponding transaction either from the BlockTransactions message or
361    // from other sources and place it in the first available position in the block
362    // then once the block has been reconstructed, it shall be processed as normal,
363    // keeping in mind that short_ids are expected to occasionally collide,
364    // and that nodes must not be penalized for such collisions, wherever they appear.
365    pub fn reconstruct_block(
366        &self,
367        active_chain: &ActiveChain,
368        compact_block: &packed::CompactBlock,
369        received_transactions: Vec<core::TransactionView>,
370        uncles_index: &[u32],
371        received_uncles: &[core::UncleBlockView],
372    ) -> ReconstructionResult {
373        let block_txs_len = received_transactions.len();
374        let compact_block_hash = compact_block.calc_header_hash();
375        debug_target!(
376            crate::LOG_TARGET_RELAY,
377            "start block reconstruction, block hash: {}, received transactions len: {}",
378            compact_block_hash,
379            block_txs_len,
380        );
381
382        let mut short_ids_set: HashSet<ProposalShortId> =
383            compact_block.short_ids().into_iter().collect();
384
385        let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
386            .into_iter()
387            .filter_map(|tx| {
388                let short_id = tx.proposal_short_id();
389                if short_ids_set.remove(&short_id) {
390                    Some((short_id, tx))
391                } else {
392                    None
393                }
394            })
395            .collect();
396
397        if !short_ids_set.is_empty() {
398            let tx_pool = self.shared.shared().tx_pool_controller();
399            let fetch_txs = tx_pool.fetch_txs(short_ids_set);
400            if let Err(e) = fetch_txs {
401                return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
402            }
403            txs_map.extend(fetch_txs.unwrap());
404        }
405
406        let txs_len = compact_block.txs_len();
407        let mut block_transactions: Vec<Option<core::TransactionView>> =
408            Vec::with_capacity(txs_len);
409
410        let short_ids_iter = &mut compact_block.short_ids().into_iter();
411        // fill transactions gap
412        compact_block
413            .prefilled_transactions()
414            .into_iter()
415            .for_each(|pt| {
416                let index: usize = pt.index().unpack();
417                let gap = index - block_transactions.len();
418                if gap > 0 {
419                    short_ids_iter
420                        .take(gap)
421                        .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
422                }
423                block_transactions.push(Some(pt.transaction().into_view()));
424            });
425
426        // append remain transactions
427        short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
428
429        let missing = block_transactions.iter().any(Option::is_none);
430
431        let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
432        let mut uncles = Vec::with_capacity(compact_block.uncles().len());
433
434        let mut position = 0;
435        for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
436            if uncles_index.contains(&(i as u32)) {
437                uncles.push(
438                    received_uncles
439                        .get(position)
440                        .expect("have checked the indexes")
441                        .clone()
442                        .data(),
443                );
444                position += 1;
445                continue;
446            };
447            let status = active_chain.get_block_status(&uncle_hash);
448            match status {
449                BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
450                BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
451                    if let Some(uncle) = active_chain.get_block(&uncle_hash) {
452                        uncles.push(uncle.as_uncle().data());
453                    } else {
454                        debug_target!(
455                            crate::LOG_TARGET_RELAY,
456                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
457                            status,
458                            uncle_hash,
459                        );
460                        missing_uncles.push(i);
461                    }
462                }
463                BlockStatus::BLOCK_RECEIVED => {
464                    if let Some(uncle) = self
465                        .chain
466                        .get_orphan_block(self.shared().store(), &uncle_hash)
467                    {
468                        uncles.push(uncle.as_uncle().data());
469                    } else {
470                        debug_target!(
471                            crate::LOG_TARGET_RELAY,
472                            "reconstruct_block could not find {:#?} uncle block: {:#?}",
473                            status,
474                            uncle_hash,
475                        );
476                        missing_uncles.push(i);
477                    }
478                }
479                BlockStatus::BLOCK_INVALID => {
480                    return ReconstructionResult::Error(
481                        StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
482                    );
483                }
484                _ => missing_uncles.push(i),
485            }
486        }
487
488        if !missing && missing_uncles.is_empty() {
489            let txs = block_transactions
490                .into_iter()
491                .collect::<Option<Vec<_>>>()
492                .expect("missing checked, should not fail");
493            let block = if let Some(extension) = compact_block.extension() {
494                packed::BlockV1::new_builder()
495                    .header(compact_block.header())
496                    .uncles(uncles.pack())
497                    .transactions(txs.into_iter().map(|tx| tx.data()).pack())
498                    .proposals(compact_block.proposals())
499                    .extension(extension)
500                    .build()
501                    .as_v0()
502            } else {
503                packed::Block::new_builder()
504                    .header(compact_block.header())
505                    .uncles(uncles.pack())
506                    .transactions(txs.into_iter().map(|tx| tx.data()).pack())
507                    .proposals(compact_block.proposals())
508                    .build()
509            }
510            .into_view();
511
512            debug_target!(
513                crate::LOG_TARGET_RELAY,
514                "finish block reconstruction, block hash: {}",
515                compact_block.calc_header_hash(),
516            );
517
518            let compact_block_tx_root = compact_block.header().raw().transactions_root();
519            let reconstruct_block_tx_root = block.transactions_root();
520            if compact_block_tx_root != reconstruct_block_tx_root {
521                if compact_block.short_ids().is_empty()
522                    || compact_block.short_ids().len() == block_txs_len
523                {
524                    return ReconstructionResult::Error(
525                        StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
526                            .with_context(format!(
527                                "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
528                                compact_block.header().raw().transactions_root(),
529                                block.transactions_root(),
530                            )),
531                    );
532                } else {
533                    if let Some(metrics) = ckb_metrics::handle() {
534                        metrics.ckb_relay_transaction_short_id_collide.inc();
535                    }
536                    return ReconstructionResult::Collided;
537                }
538            }
539
540            ReconstructionResult::Block(block)
541        } else {
542            let missing_indexes: Vec<usize> = block_transactions
543                .iter()
544                .enumerate()
545                .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
546                .collect();
547
548            debug_target!(
549                crate::LOG_TARGET_RELAY,
550                "block reconstruction failed, block hash: {}, missing: {}, total: {}",
551                compact_block.calc_header_hash(),
552                missing_indexes.len(),
553                compact_block.short_ids().len(),
554            );
555
556            ReconstructionResult::Missing(missing_indexes, missing_uncles)
557        }
558    }
559
560    fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
561        let get_block_proposals = self.shared().state().drain_get_block_proposals();
562        let tx_pool = self.shared.shared().tx_pool_controller();
563
564        let fetch_txs = tx_pool.fetch_txs(
565            get_block_proposals
566                .iter()
567                .map(|kv_pair| kv_pair.key().clone())
568                .collect(),
569        );
570        if let Err(err) = fetch_txs {
571            debug_target!(
572                crate::LOG_TARGET_RELAY,
573                "relayer prune_tx_proposal_request internal error: {:?}",
574                err,
575            );
576            return;
577        }
578
579        let txs = fetch_txs.unwrap();
580
581        let mut peer_txs = HashMap::new();
582        for (id, peer_indices) in get_block_proposals.into_iter() {
583            if let Some(tx) = txs.get(&id) {
584                for peer_index in peer_indices {
585                    let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
586                    tx_set.push(tx.clone());
587                }
588            }
589        }
590
591        let send_block_proposals =
592            |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
593                let content = packed::BlockProposal::new_builder()
594                    .transactions(txs.into_iter().pack())
595                    .build();
596                let message = packed::RelayMessage::new_builder().set(content).build();
597                let status = send_message_to(nc, peer_index, &message);
598                if !status.is_ok() {
599                    ckb_logger::error!(
600                        "send RelayBlockProposal to {}, status: {:?}",
601                        peer_index,
602                        status
603                    );
604                }
605            };
606
607        let mut relay_bytes = 0;
608        let mut relay_proposals = Vec::new();
609        for (peer_index, txs) in peer_txs {
610            for tx in txs {
611                let data = tx.data();
612                let tx_size = data.total_size();
613                if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
614                    send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
615                    relay_bytes = tx_size;
616                } else {
617                    relay_bytes += tx_size;
618                }
619                relay_proposals.push(data);
620            }
621            if !relay_proposals.is_empty() {
622                send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
623                relay_bytes = 0;
624            }
625        }
626    }
627
628    /// Ask for relay transaction by hash from all peers
629    pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
630        for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
631            if !tx_hashes.is_empty() {
632                debug_target!(
633                    crate::LOG_TARGET_RELAY,
634                    "Send get transaction ({} hashes) to {}",
635                    tx_hashes.len(),
636                    peer,
637                );
638                tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
639                let content = packed::GetRelayTransactions::new_builder()
640                    .tx_hashes(tx_hashes.pack())
641                    .build();
642                let message = packed::RelayMessage::new_builder().set(content).build();
643                let status = send_message_to(nc, peer, &message);
644                if !status.is_ok() {
645                    ckb_logger::error!(
646                        "interrupted request for transactions, status: {:?}",
647                        status
648                    );
649                }
650            }
651        }
652    }
653
654    /// Send bulk of tx hashes to selected peers
655    pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
656        const BUFFER_SIZE: usize = 42;
657
658        let connected_peers = nc.connected_peers();
659        if connected_peers.is_empty() {
660            return;
661        }
662
663        let ckb2023 = nc.ckb2023();
664        let tx_verify_results = self
665            .shared
666            .state()
667            .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
668        let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
669        {
670            for tx_verify_result in tx_verify_results {
671                match tx_verify_result {
672                    TxVerificationResult::Ok {
673                        original_peer,
674                        with_vm_2023,
675                        tx_hash,
676                    } => {
677                        // must all fork or all no-fork
678                        if ckb2023 != with_vm_2023 {
679                            continue;
680                        }
681                        for target in &connected_peers {
682                            match original_peer {
683                                Some(peer) => {
684                                    // broadcast tx hash to all connected peers except original peer
685                                    if peer != *target {
686                                        let hashes = selected
687                                            .entry(*target)
688                                            .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
689                                        hashes.push(tx_hash.clone());
690                                    }
691                                }
692                                None => {
693                                    // since this tx is submitted through local rpc, it is assumed to be a new tx for all connected peers
694                                    let hashes = selected
695                                        .entry(*target)
696                                        .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
697                                    hashes.push(tx_hash.clone());
698                                    self.shared.state().mark_as_known_tx(tx_hash.clone());
699                                }
700                            }
701                        }
702                    }
703                    TxVerificationResult::Reject { tx_hash } => {
704                        self.shared.state().remove_from_known_txs(&tx_hash);
705                    }
706                    TxVerificationResult::UnknownParents { peer, parents } => {
707                        let tx_hashes: Vec<_> = {
708                            let mut tx_filter = self.shared.state().tx_filter();
709                            tx_filter.remove_expired();
710                            parents
711                                .into_iter()
712                                .filter(|tx_hash| !tx_filter.contains(tx_hash))
713                                .collect()
714                        };
715                        self.shared.state().add_ask_for_txs(peer, tx_hashes);
716                    }
717                }
718            }
719        }
720        for (peer, hashes) in selected {
721            let content = packed::RelayTransactionHashes::new_builder()
722                .tx_hashes(hashes.pack())
723                .build();
724            let message = packed::RelayMessage::new_builder().set(content).build();
725
726            if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
727                debug_target!(
728                    crate::LOG_TARGET_RELAY,
729                    "relayer send TransactionHashes error: {:?}",
730                    err,
731                );
732            }
733        }
734    }
735}
736
737fn build_and_broadcast_compact_block(
738    nc: &dyn CKBProtocolContext,
739    shared: &Shared,
740    peer: PeerIndex,
741    block: Arc<BlockView>,
742) {
743    debug_target!(
744        crate::LOG_TARGET_RELAY,
745        "[block_relay] relayer accept_block {} {}",
746        block.header().hash(),
747        unix_time_as_millis()
748    );
749    let block_hash = block.hash();
750    shared.remove_header_view(&block_hash);
751    let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
752    let message = packed::RelayMessage::new_builder().set(cb).build();
753
754    let selected_peers: Vec<PeerIndex> = nc
755        .connected_peers()
756        .into_iter()
757        .filter(|target_peer| peer != *target_peer)
758        .take(MAX_RELAY_PEERS)
759        .collect();
760    if let Err(err) = nc.quick_filter_broadcast(
761        TargetSession::Multi(Box::new(selected_peers.into_iter())),
762        message.as_bytes(),
763    ) {
764        debug_target!(
765            crate::LOG_TARGET_RELAY,
766            "relayer send block when accept block error: {:?}",
767            err,
768        );
769    }
770
771    if let Some(p2p_control) = nc.p2p_control() {
772        let snapshot = shared.snapshot();
773        let parent_chain_root = {
774            let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
775            match mmr.get_root() {
776                Ok(root) => root,
777                Err(err) => {
778                    error_target!(
779                        crate::LOG_TARGET_RELAY,
780                        "Generate last state to light client failed: {:?}",
781                        err
782                    );
783                    return;
784                }
785            }
786        };
787
788        let tip_header = packed::VerifiableHeader::new_builder()
789            .header(block.header().data())
790            .uncles_hash(block.calc_uncles_hash())
791            .extension(Pack::pack(&block.extension()))
792            .parent_chain_root(parent_chain_root)
793            .build();
794        let light_client_message = {
795            let content = packed::SendLastState::new_builder()
796                .last_header(tip_header)
797                .build();
798            packed::LightClientMessage::new_builder()
799                .set(content)
800                .build()
801        };
802        let light_client_peers: HashSet<PeerIndex> = nc
803            .connected_peers()
804            .into_iter()
805            .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
806            .filter(|(_id, peer)| peer.if_lightclient_subscribed)
807            .map(|(id, _)| id)
808            .collect();
809        if let Err(err) = p2p_control.filter_broadcast(
810            TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
811            SupportProtocols::LightClient.protocol_id(),
812            light_client_message.as_bytes(),
813        ) {
814            debug_target!(
815                crate::LOG_TARGET_RELAY,
816                "relayer send last state to light client when accept block, error: {:?}",
817                err,
818            );
819        }
820    }
821}
822
823#[async_trait]
824impl CKBProtocolHandler for Relayer {
825    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
826        nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
827            .await
828            .expect("set_notify at init is ok");
829        nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
830            .await
831            .expect("set_notify at init is ok");
832        nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
833            .await
834            .expect("set_notify at init is ok");
835    }
836
837    async fn received(
838        &mut self,
839        nc: Arc<dyn CKBProtocolContext + Sync>,
840        peer_index: PeerIndex,
841        data: Bytes,
842    ) {
843        // If self is in the IBD state, don't process any relayer message.
844        if self.shared.active_chain().is_initial_block_download() {
845            return;
846        }
847
848        let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
849            Ok(msg) => {
850                let item = msg.to_enum();
851                if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
852                    if reader.count_extra_fields() > 1 {
853                        info_target!(
854                            crate::LOG_TARGET_RELAY,
855                            "Peer {} sends us a malformed message: \
856                             too many fields in CompactBlock",
857                            peer_index
858                        );
859                        nc.ban_peer(
860                            peer_index,
861                            BAD_MESSAGE_BAN_TIME,
862                            String::from(
863                                "send us a malformed message: \
864                                 too many fields in CompactBlock",
865                            ),
866                        );
867                        return;
868                    } else {
869                        item
870                    }
871                } else {
872                    match packed::RelayMessageReader::from_slice(&data) {
873                        Ok(msg) => msg.to_enum(),
874                        _ => {
875                            info_target!(
876                                crate::LOG_TARGET_RELAY,
877                                "Peer {} sends us a malformed message: \
878                                 too many fields",
879                                peer_index
880                            );
881                            nc.ban_peer(
882                                peer_index,
883                                BAD_MESSAGE_BAN_TIME,
884                                String::from(
885                                    "send us a malformed message \
886                                     too many fields",
887                                ),
888                            );
889                            return;
890                        }
891                    }
892                }
893            }
894            _ => {
895                info_target!(
896                    crate::LOG_TARGET_RELAY,
897                    "Peer {} sends us a malformed message",
898                    peer_index
899                );
900                nc.ban_peer(
901                    peer_index,
902                    BAD_MESSAGE_BAN_TIME,
903                    String::from("send us a malformed message"),
904                );
905                return;
906            }
907        };
908
909        debug_target!(
910            crate::LOG_TARGET_RELAY,
911            "received msg {} from {}",
912            msg.item_name(),
913            peer_index
914        );
915        #[cfg(feature = "with_sentry")]
916        {
917            let sentry_hub = sentry::Hub::current();
918            let _scope_guard = sentry_hub.push_scope();
919            sentry_hub.configure_scope(|scope| {
920                scope.set_tag("p2p.protocol", "relayer");
921                scope.set_tag("p2p.message", msg.item_name());
922            });
923        }
924
925        let start_time = Instant::now();
926        tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
927        debug_target!(
928            crate::LOG_TARGET_RELAY,
929            "process message={}, peer={}, cost={:?}",
930            msg.item_name(),
931            peer_index,
932            Instant::now().saturating_duration_since(start_time),
933        );
934    }
935
936    async fn connected(
937        &mut self,
938        _nc: Arc<dyn CKBProtocolContext + Sync>,
939        peer_index: PeerIndex,
940        version: &str,
941    ) {
942        self.shared().state().peers().relay_connected(peer_index);
943        info_target!(
944            crate::LOG_TARGET_RELAY,
945            "RelayProtocol({}).connected peer={}",
946            version,
947            peer_index
948        );
949    }
950
951    async fn disconnected(
952        &mut self,
953        _nc: Arc<dyn CKBProtocolContext + Sync>,
954        peer_index: PeerIndex,
955    ) {
956        info_target!(
957            crate::LOG_TARGET_RELAY,
958            "RelayProtocol.disconnected peer={}",
959            peer_index
960        );
961        // Retains all keys in the rate limiter that were used recently enough.
962        self.rate_limiter.retain_recent();
963    }
964
965    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
966        // If self is in the IBD state, don't trigger any relayer notify.
967        if self.shared.active_chain().is_initial_block_download() {
968            return;
969        }
970
971        match RelaySwitch::new(&nc, self.v3) {
972            RelaySwitch::Ckb2021RelayV3 => return,
973            RelaySwitch::Ckb2023RelayV2 => {
974                if nc.remove_notify(TX_PROPOSAL_TOKEN).await.is_err() {
975                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
976                }
977                if nc.remove_notify(ASK_FOR_TXS_TOKEN).await.is_err() {
978                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
979                }
980                if nc.remove_notify(TX_HASHES_TOKEN).await.is_err() {
981                    trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
982                }
983                for kv_pair in self.shared().state().peers().state.iter() {
984                    let (peer, state) = kv_pair.pair();
985                    if !state.peer_flags.is_2023edition {
986                        let _ignore = nc.disconnect(*peer, "Evict low-version clients ");
987                    }
988                }
989                return;
990            }
991            RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
992        }
993
994        let start_time = Instant::now();
995        trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
996        match token {
997            TX_PROPOSAL_TOKEN => {
998                tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
999            }
1000            ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
1001            TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
1002            _ => unreachable!(),
1003        }
1004        trace_target!(
1005            crate::LOG_TARGET_RELAY,
1006            "finished notify token={} cost={:?}",
1007            token,
1008            Instant::now().saturating_duration_since(start_time)
1009        );
1010    }
1011}
1012
/// The four combinations of the two flags inspected by `RelaySwitch::new`:
/// whether the protocol context reports `ckb2023()`, and whether this relay
/// handler is the v3 one.
#[derive(Clone, Copy, Debug)]
enum RelaySwitch {
    /// `ckb2023()` is false and the handler is not relay v3.
    Ckb2021RelayV2,
    /// `ckb2023()` is false and the handler is relay v3.
    Ckb2021RelayV3,
    /// `ckb2023()` is true and the handler is not relay v3.
    Ckb2023RelayV2,
    /// `ckb2023()` is true and the handler is relay v3.
    Ckb2023RelayV3,
}
1020
1021impl RelaySwitch {
1022    fn new(nc: &Arc<dyn CKBProtocolContext + Sync>, is_relay_v3: bool) -> Self {
1023        match (nc.ckb2023(), is_relay_v3) {
1024            (true, true) => Self::Ckb2023RelayV3,
1025            (true, false) => Self::Ckb2023RelayV2,
1026            (false, true) => Self::Ckb2021RelayV3,
1027            (false, false) => Self::Ckb2021RelayV2,
1028        }
1029    }
1030}