1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_chain::VerifyResult;
27use ckb_chain::{ChainController, RemoteBlock};
28use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
29use ckb_error::is_internal_db_error;
30use ckb_logger::{
31 debug, debug_target, error, error_target, info_target, trace_target, warn_target,
32};
33use ckb_network::{
34 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
35 async_trait, bytes::Bytes, tokio,
36};
37use ckb_shared::Shared;
38use ckb_shared::block_status::BlockStatus;
39use ckb_systemtime::unix_time_as_millis;
40use ckb_tx_pool::service::TxVerificationResult;
41use ckb_types::BlockNumberAndHash;
42use ckb_types::{
43 core::{self, BlockView},
44 packed::{self, Byte32, ProposalShortId},
45 prelude::*,
46};
47use itertools::Itertools;
48use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
/// Timer token whose tick makes `notify` run `prune_tx_proposal_request`.
pub const TX_PROPOSAL_TOKEN: u64 = 0;
/// Timer token whose tick makes `notify` run `ask_for_txs`.
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
/// Timer token whose tick makes `notify` run `send_bulk_of_tx_hashes`.
pub const TX_HASHES_TOKEN: u64 = 2;

/// Maximum number of peers a freshly accepted block is relayed to as a compact block.
pub const MAX_RELAY_PEERS: usize = 128;
/// Maximum number of transaction hashes / verification results handled in one batch.
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
/// Byte budget (1 MiB) used to split proposed transactions into `BlockProposal` messages.
pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
59
/// Keyed `governor` rate limiter that stores per-key state in a hash map and
/// reads time from governor's default clock.
type RateLimiter<T> = governor::RateLimiter<
    T,
    governor::state::keyed::HashMapStateStore<T>,
    governor::clock::DefaultClock,
>;
65
/// Outcome of `Relayer::reconstruct_block`.
#[derive(Debug, Eq, PartialEq)]
pub enum ReconstructionResult {
    /// The block was rebuilt and its transactions root matches the compact block header.
    Block(BlockView),
    /// Reconstruction is incomplete: indexes of the missing transactions, then
    /// indexes of the missing uncles.
    Missing(Vec<usize>, Vec<usize>),
    /// The rebuilt block has a different transactions root although every slot
    /// was filled and some transactions did not come from the peer's message.
    Collided,
    /// Reconstruction failed with the enclosed status.
    Error(Status),
}
73
/// Handler for the relay protocol: processes relay messages from peers and
/// runs the periodic relay jobs.
pub struct Relayer {
    // Handle used to submit remote blocks and to look up orphan blocks.
    chain: ChainController,
    // Shared sync state (active chain, peers, tx-pool access, relay queues).
    pub(crate) shared: Arc<SyncShared>,
    // Rate limiter keyed by `(peer, message item id)`.
    rate_limiter: RateLimiter<(PeerIndex, u32)>,
    // `true` once `v3()` was called; fed into `RelaySwitch::new`.
    v3: bool,
}
81
82impl Relayer {
83 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
87 let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
90 let rate_limiter = RateLimiter::hashmap(quota);
91
92 Relayer {
93 chain,
94 shared,
95 rate_limiter,
96 v3: false,
97 }
98 }
99
100 pub fn v3(mut self) -> Self {
102 self.v3 = true;
103 self
104 }
105
106 pub fn shared(&self) -> &Arc<SyncShared> {
108 &self.shared
109 }
110
111 fn try_process(
112 &mut self,
113 nc: Arc<dyn CKBProtocolContext + Sync>,
114 peer: PeerIndex,
115 message: packed::RelayMessageUnionReader<'_>,
116 ) -> Status {
117 let should_check_rate =
119 !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
120
121 if should_check_rate
122 && self
123 .rate_limiter
124 .check_key(&(peer, message.item_id()))
125 .is_err()
126 {
127 return StatusCode::TooManyRequests.with_context(message.item_name());
128 }
129
130 match message {
131 packed::RelayMessageUnionReader::CompactBlock(reader) => {
132 CompactBlockProcess::new(reader, self, nc, peer).execute()
133 }
134 packed::RelayMessageUnionReader::RelayTransactions(reader) => {
135 match RelaySwitch::new(&nc, self.v3) {
138 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
139 return Status::ignored();
140 }
141 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
142 }
143 if reader.check_data() {
144 TransactionsProcess::new(reader, self, nc, peer).execute()
145 } else {
146 StatusCode::ProtocolMessageIsMalformed
147 .with_context("RelayTransactions is invalid")
148 }
149 }
150 packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
151 match RelaySwitch::new(&nc, self.v3) {
154 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
155 return Status::ignored();
156 }
157 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
158 }
159 TransactionHashesProcess::new(reader, self, peer).execute()
160 }
161 packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
162 match RelaySwitch::new(&nc, self.v3) {
165 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
166 return Status::ignored();
167 }
168 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
169 }
170 GetTransactionsProcess::new(reader, self, nc, peer).execute()
171 }
172 packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
173 GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
174 }
175 packed::RelayMessageUnionReader::BlockTransactions(reader) => {
176 if reader.check_data() {
177 BlockTransactionsProcess::new(reader, self, nc, peer).execute()
178 } else {
179 StatusCode::ProtocolMessageIsMalformed
180 .with_context("BlockTransactions is invalid")
181 }
182 }
183 packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
184 GetBlockProposalProcess::new(reader, self, nc, peer).execute()
185 }
186 packed::RelayMessageUnionReader::BlockProposal(reader) => {
187 BlockProposalProcess::new(reader, self).execute()
188 }
189 }
190 }
191
192 fn process(
193 &mut self,
194 nc: Arc<dyn CKBProtocolContext + Sync>,
195 peer: PeerIndex,
196 message: packed::RelayMessageUnionReader<'_>,
197 ) {
198 let item_name = message.item_name();
199 let item_bytes = message.as_slice().len() as u64;
200 let status = self.try_process(Arc::clone(&nc), peer, message);
201
202 metric_ckb_message_bytes(
203 MetricDirection::In,
204 &SupportProtocols::RelayV2.name(),
205 message.item_name(),
206 Some(status.code()),
207 item_bytes,
208 );
209
210 if let Some(ban_time) = status.should_ban() {
211 error_target!(
212 crate::LOG_TARGET_RELAY,
213 "receive {} from {}, ban {:?} for {}",
214 item_name,
215 peer,
216 ban_time,
217 status
218 );
219 nc.ban_peer(peer, ban_time, status.to_string());
220 } else if status.should_warn() {
221 warn_target!(
222 crate::LOG_TARGET_RELAY,
223 "receive {} from {}, {}",
224 item_name,
225 peer,
226 status
227 );
228 } else if !status.is_ok() {
229 debug_target!(
230 crate::LOG_TARGET_RELAY,
231 "receive {} from {}, {}",
232 item_name,
233 peer,
234 status
235 );
236 }
237 }
238
239 pub fn request_proposal_txs(
241 &self,
242 nc: &dyn CKBProtocolContext,
243 peer: PeerIndex,
244 block_hash_and_number: BlockNumberAndHash,
245 proposals: Vec<packed::ProposalShortId>,
246 ) {
247 let tx_pool = self.shared.shared().tx_pool_controller();
248 let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
249 {
250 Err(err) => {
251 debug_target!(
252 crate::LOG_TARGET_RELAY,
253 "tx_pool fresh_proposals_filter error: {:?}",
254 err,
255 );
256 return;
257 }
258 Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
259 };
260
261 let to_ask_proposals: Vec<ProposalShortId> = self
262 .shared()
263 .state()
264 .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
265 .into_iter()
266 .zip(fresh_proposals)
267 .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
268 .collect();
269 if !to_ask_proposals.is_empty() {
270 let content = packed::GetBlockProposal::new_builder()
271 .block_hash(block_hash_and_number.hash)
272 .proposals(to_ask_proposals.clone().pack())
273 .build();
274 let message = packed::RelayMessage::new_builder().set(content).build();
275 if !send_message_to(nc, peer, &message).is_ok() {
276 self.shared()
277 .state()
278 .remove_inflight_proposals(&to_ask_proposals);
279 }
280 }
281 }
282
283 #[allow(clippy::needless_collect)]
285 pub fn accept_block(
286 &self,
287 nc: Arc<dyn CKBProtocolContext + Sync>,
288 peer_id: PeerIndex,
289 block: core::BlockView,
290 msg_name: &str,
291 ) {
292 if self
293 .shared()
294 .active_chain()
295 .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
296 {
297 return;
298 }
299
300 let block = Arc::new(block);
301
302 let verify_callback = {
303 let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
304 let block = Arc::clone(&block);
305 let shared = Arc::clone(self.shared());
306 let msg_name = msg_name.to_owned();
307 Box::new(move |result: VerifyResult| match result {
308 Ok(verified) => {
309 if !verified {
310 debug!(
311 "block {}-{} has verified already, won't build compact block and broadcast it",
312 block.number(),
313 block.hash()
314 );
315 return;
316 }
317
318 build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
319 }
320 Err(err) => {
321 error!(
322 "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
323 block.number(),
324 block.hash(),
325 err
326 );
327
328 let is_internal_db_error = is_internal_db_error(&err);
329 if is_internal_db_error {
330 return;
331 }
332
333 post_sync_process(
335 nc.as_ref(),
336 peer_id,
337 &msg_name,
338 StatusCode::BlockIsInvalid.with_context(format!(
339 "block {} is invalid, reason: {}",
340 block.hash(),
341 err
342 )),
343 );
344 }
345 })
346 };
347
348 let remote_block = RemoteBlock {
349 block,
350 verify_callback,
351 };
352
353 self.shared.accept_remote_block(&self.chain, remote_block);
354 }
355
356 pub fn reconstruct_block(
366 &self,
367 active_chain: &ActiveChain,
368 compact_block: &packed::CompactBlock,
369 received_transactions: Vec<core::TransactionView>,
370 uncles_index: &[u32],
371 received_uncles: &[core::UncleBlockView],
372 ) -> ReconstructionResult {
373 let block_txs_len = received_transactions.len();
374 let compact_block_hash = compact_block.calc_header_hash();
375 debug_target!(
376 crate::LOG_TARGET_RELAY,
377 "start block reconstruction, block hash: {}, received transactions len: {}",
378 compact_block_hash,
379 block_txs_len,
380 );
381
382 let mut short_ids_set: HashSet<ProposalShortId> =
383 compact_block.short_ids().into_iter().collect();
384
385 let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
386 .into_iter()
387 .filter_map(|tx| {
388 let short_id = tx.proposal_short_id();
389 if short_ids_set.remove(&short_id) {
390 Some((short_id, tx))
391 } else {
392 None
393 }
394 })
395 .collect();
396
397 if !short_ids_set.is_empty() {
398 let tx_pool = self.shared.shared().tx_pool_controller();
399 let fetch_txs = tx_pool.fetch_txs(short_ids_set);
400 if let Err(e) = fetch_txs {
401 return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
402 }
403 txs_map.extend(fetch_txs.unwrap());
404 }
405
406 let txs_len = compact_block.txs_len();
407 let mut block_transactions: Vec<Option<core::TransactionView>> =
408 Vec::with_capacity(txs_len);
409
410 let short_ids_iter = &mut compact_block.short_ids().into_iter();
411 compact_block
413 .prefilled_transactions()
414 .into_iter()
415 .for_each(|pt| {
416 let index: usize = pt.index().unpack();
417 let gap = index - block_transactions.len();
418 if gap > 0 {
419 short_ids_iter
420 .take(gap)
421 .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
422 }
423 block_transactions.push(Some(pt.transaction().into_view()));
424 });
425
426 short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
428
429 let missing = block_transactions.iter().any(Option::is_none);
430
431 let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
432 let mut uncles = Vec::with_capacity(compact_block.uncles().len());
433
434 let mut position = 0;
435 for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
436 if uncles_index.contains(&(i as u32)) {
437 uncles.push(
438 received_uncles
439 .get(position)
440 .expect("have checked the indexes")
441 .clone()
442 .data(),
443 );
444 position += 1;
445 continue;
446 };
447 let status = active_chain.get_block_status(&uncle_hash);
448 match status {
449 BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
450 BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
451 if let Some(uncle) = active_chain.get_block(&uncle_hash) {
452 uncles.push(uncle.as_uncle().data());
453 } else {
454 debug_target!(
455 crate::LOG_TARGET_RELAY,
456 "reconstruct_block could not find {:#?} uncle block: {:#?}",
457 status,
458 uncle_hash,
459 );
460 missing_uncles.push(i);
461 }
462 }
463 BlockStatus::BLOCK_RECEIVED => {
464 if let Some(uncle) = self
465 .chain
466 .get_orphan_block(self.shared().store(), &uncle_hash)
467 {
468 uncles.push(uncle.as_uncle().data());
469 } else {
470 debug_target!(
471 crate::LOG_TARGET_RELAY,
472 "reconstruct_block could not find {:#?} uncle block: {:#?}",
473 status,
474 uncle_hash,
475 );
476 missing_uncles.push(i);
477 }
478 }
479 BlockStatus::BLOCK_INVALID => {
480 return ReconstructionResult::Error(
481 StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
482 );
483 }
484 _ => missing_uncles.push(i),
485 }
486 }
487
488 if !missing && missing_uncles.is_empty() {
489 let txs = block_transactions
490 .into_iter()
491 .collect::<Option<Vec<_>>>()
492 .expect("missing checked, should not fail");
493 let block = if let Some(extension) = compact_block.extension() {
494 packed::BlockV1::new_builder()
495 .header(compact_block.header())
496 .uncles(uncles.pack())
497 .transactions(txs.into_iter().map(|tx| tx.data()).pack())
498 .proposals(compact_block.proposals())
499 .extension(extension)
500 .build()
501 .as_v0()
502 } else {
503 packed::Block::new_builder()
504 .header(compact_block.header())
505 .uncles(uncles.pack())
506 .transactions(txs.into_iter().map(|tx| tx.data()).pack())
507 .proposals(compact_block.proposals())
508 .build()
509 }
510 .into_view();
511
512 debug_target!(
513 crate::LOG_TARGET_RELAY,
514 "finish block reconstruction, block hash: {}",
515 compact_block.calc_header_hash(),
516 );
517
518 let compact_block_tx_root = compact_block.header().raw().transactions_root();
519 let reconstruct_block_tx_root = block.transactions_root();
520 if compact_block_tx_root != reconstruct_block_tx_root {
521 if compact_block.short_ids().is_empty()
522 || compact_block.short_ids().len() == block_txs_len
523 {
524 return ReconstructionResult::Error(
525 StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
526 .with_context(format!(
527 "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
528 compact_block.header().raw().transactions_root(),
529 block.transactions_root(),
530 )),
531 );
532 } else {
533 if let Some(metrics) = ckb_metrics::handle() {
534 metrics.ckb_relay_transaction_short_id_collide.inc();
535 }
536 return ReconstructionResult::Collided;
537 }
538 }
539
540 ReconstructionResult::Block(block)
541 } else {
542 let missing_indexes: Vec<usize> = block_transactions
543 .iter()
544 .enumerate()
545 .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
546 .collect();
547
548 debug_target!(
549 crate::LOG_TARGET_RELAY,
550 "block reconstruction failed, block hash: {}, missing: {}, total: {}",
551 compact_block.calc_header_hash(),
552 missing_indexes.len(),
553 compact_block.short_ids().len(),
554 );
555
556 ReconstructionResult::Missing(missing_indexes, missing_uncles)
557 }
558 }
559
560 fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
561 let get_block_proposals = self.shared().state().drain_get_block_proposals();
562 let tx_pool = self.shared.shared().tx_pool_controller();
563
564 let fetch_txs = tx_pool.fetch_txs(
565 get_block_proposals
566 .iter()
567 .map(|kv_pair| kv_pair.key().clone())
568 .collect(),
569 );
570 if let Err(err) = fetch_txs {
571 debug_target!(
572 crate::LOG_TARGET_RELAY,
573 "relayer prune_tx_proposal_request internal error: {:?}",
574 err,
575 );
576 return;
577 }
578
579 let txs = fetch_txs.unwrap();
580
581 let mut peer_txs = HashMap::new();
582 for (id, peer_indices) in get_block_proposals.into_iter() {
583 if let Some(tx) = txs.get(&id) {
584 for peer_index in peer_indices {
585 let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
586 tx_set.push(tx.clone());
587 }
588 }
589 }
590
591 let send_block_proposals =
592 |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
593 let content = packed::BlockProposal::new_builder()
594 .transactions(txs.into_iter().pack())
595 .build();
596 let message = packed::RelayMessage::new_builder().set(content).build();
597 let status = send_message_to(nc, peer_index, &message);
598 if !status.is_ok() {
599 ckb_logger::error!(
600 "send RelayBlockProposal to {}, status: {:?}",
601 peer_index,
602 status
603 );
604 }
605 };
606
607 let mut relay_bytes = 0;
608 let mut relay_proposals = Vec::new();
609 for (peer_index, txs) in peer_txs {
610 for tx in txs {
611 let data = tx.data();
612 let tx_size = data.total_size();
613 if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
614 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
615 relay_bytes = tx_size;
616 } else {
617 relay_bytes += tx_size;
618 }
619 relay_proposals.push(data);
620 }
621 if !relay_proposals.is_empty() {
622 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
623 relay_bytes = 0;
624 }
625 }
626 }
627
628 pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
630 for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
631 if !tx_hashes.is_empty() {
632 debug_target!(
633 crate::LOG_TARGET_RELAY,
634 "Send get transaction ({} hashes) to {}",
635 tx_hashes.len(),
636 peer,
637 );
638 tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
639 let content = packed::GetRelayTransactions::new_builder()
640 .tx_hashes(tx_hashes.pack())
641 .build();
642 let message = packed::RelayMessage::new_builder().set(content).build();
643 let status = send_message_to(nc, peer, &message);
644 if !status.is_ok() {
645 ckb_logger::error!(
646 "interrupted request for transactions, status: {:?}",
647 status
648 );
649 }
650 }
651 }
652 }
653
654 pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
656 const BUFFER_SIZE: usize = 42;
657
658 let connected_peers = nc.connected_peers();
659 if connected_peers.is_empty() {
660 return;
661 }
662
663 let ckb2023 = nc.ckb2023();
664 let tx_verify_results = self
665 .shared
666 .state()
667 .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
668 let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
669 {
670 for tx_verify_result in tx_verify_results {
671 match tx_verify_result {
672 TxVerificationResult::Ok {
673 original_peer,
674 with_vm_2023,
675 tx_hash,
676 } => {
677 if ckb2023 != with_vm_2023 {
679 continue;
680 }
681 for target in &connected_peers {
682 match original_peer {
683 Some(peer) => {
684 if peer != *target {
686 let hashes = selected
687 .entry(*target)
688 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
689 hashes.push(tx_hash.clone());
690 }
691 }
692 None => {
693 let hashes = selected
695 .entry(*target)
696 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
697 hashes.push(tx_hash.clone());
698 self.shared.state().mark_as_known_tx(tx_hash.clone());
699 }
700 }
701 }
702 }
703 TxVerificationResult::Reject { tx_hash } => {
704 self.shared.state().remove_from_known_txs(&tx_hash);
705 }
706 TxVerificationResult::UnknownParents { peer, parents } => {
707 let tx_hashes: Vec<_> = {
708 let mut tx_filter = self.shared.state().tx_filter();
709 tx_filter.remove_expired();
710 parents
711 .into_iter()
712 .filter(|tx_hash| !tx_filter.contains(tx_hash))
713 .collect()
714 };
715 self.shared.state().add_ask_for_txs(peer, tx_hashes);
716 }
717 }
718 }
719 }
720 for (peer, hashes) in selected {
721 let content = packed::RelayTransactionHashes::new_builder()
722 .tx_hashes(hashes.pack())
723 .build();
724 let message = packed::RelayMessage::new_builder().set(content).build();
725
726 if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
727 debug_target!(
728 crate::LOG_TARGET_RELAY,
729 "relayer send TransactionHashes error: {:?}",
730 err,
731 );
732 }
733 }
734 }
735}
736
737fn build_and_broadcast_compact_block(
738 nc: &dyn CKBProtocolContext,
739 shared: &Shared,
740 peer: PeerIndex,
741 block: Arc<BlockView>,
742) {
743 debug_target!(
744 crate::LOG_TARGET_RELAY,
745 "[block_relay] relayer accept_block {} {}",
746 block.header().hash(),
747 unix_time_as_millis()
748 );
749 let block_hash = block.hash();
750 shared.remove_header_view(&block_hash);
751 let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
752 let message = packed::RelayMessage::new_builder().set(cb).build();
753
754 let selected_peers: Vec<PeerIndex> = nc
755 .connected_peers()
756 .into_iter()
757 .filter(|target_peer| peer != *target_peer)
758 .take(MAX_RELAY_PEERS)
759 .collect();
760 if let Err(err) = nc.quick_filter_broadcast(
761 TargetSession::Multi(Box::new(selected_peers.into_iter())),
762 message.as_bytes(),
763 ) {
764 debug_target!(
765 crate::LOG_TARGET_RELAY,
766 "relayer send block when accept block error: {:?}",
767 err,
768 );
769 }
770
771 if let Some(p2p_control) = nc.p2p_control() {
772 let snapshot = shared.snapshot();
773 let parent_chain_root = {
774 let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
775 match mmr.get_root() {
776 Ok(root) => root,
777 Err(err) => {
778 error_target!(
779 crate::LOG_TARGET_RELAY,
780 "Generate last state to light client failed: {:?}",
781 err
782 );
783 return;
784 }
785 }
786 };
787
788 let tip_header = packed::VerifiableHeader::new_builder()
789 .header(block.header().data())
790 .uncles_hash(block.calc_uncles_hash())
791 .extension(Pack::pack(&block.extension()))
792 .parent_chain_root(parent_chain_root)
793 .build();
794 let light_client_message = {
795 let content = packed::SendLastState::new_builder()
796 .last_header(tip_header)
797 .build();
798 packed::LightClientMessage::new_builder()
799 .set(content)
800 .build()
801 };
802 let light_client_peers: HashSet<PeerIndex> = nc
803 .connected_peers()
804 .into_iter()
805 .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
806 .filter(|(_id, peer)| peer.if_lightclient_subscribed)
807 .map(|(id, _)| id)
808 .collect();
809 if let Err(err) = p2p_control.filter_broadcast(
810 TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
811 SupportProtocols::LightClient.protocol_id(),
812 light_client_message.as_bytes(),
813 ) {
814 debug_target!(
815 crate::LOG_TARGET_RELAY,
816 "relayer send last state to light client when accept block, error: {:?}",
817 err,
818 );
819 }
820 }
821}
822
823#[async_trait]
824impl CKBProtocolHandler for Relayer {
825 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
826 nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
827 .await
828 .expect("set_notify at init is ok");
829 nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
830 .await
831 .expect("set_notify at init is ok");
832 nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
833 .await
834 .expect("set_notify at init is ok");
835 }
836
837 async fn received(
838 &mut self,
839 nc: Arc<dyn CKBProtocolContext + Sync>,
840 peer_index: PeerIndex,
841 data: Bytes,
842 ) {
843 if self.shared.active_chain().is_initial_block_download() {
845 return;
846 }
847
848 let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
849 Ok(msg) => {
850 let item = msg.to_enum();
851 if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
852 if reader.count_extra_fields() > 1 {
853 info_target!(
854 crate::LOG_TARGET_RELAY,
855 "Peer {} sends us a malformed message: \
856 too many fields in CompactBlock",
857 peer_index
858 );
859 nc.ban_peer(
860 peer_index,
861 BAD_MESSAGE_BAN_TIME,
862 String::from(
863 "send us a malformed message: \
864 too many fields in CompactBlock",
865 ),
866 );
867 return;
868 } else {
869 item
870 }
871 } else {
872 match packed::RelayMessageReader::from_slice(&data) {
873 Ok(msg) => msg.to_enum(),
874 _ => {
875 info_target!(
876 crate::LOG_TARGET_RELAY,
877 "Peer {} sends us a malformed message: \
878 too many fields",
879 peer_index
880 );
881 nc.ban_peer(
882 peer_index,
883 BAD_MESSAGE_BAN_TIME,
884 String::from(
885 "send us a malformed message \
886 too many fields",
887 ),
888 );
889 return;
890 }
891 }
892 }
893 }
894 _ => {
895 info_target!(
896 crate::LOG_TARGET_RELAY,
897 "Peer {} sends us a malformed message",
898 peer_index
899 );
900 nc.ban_peer(
901 peer_index,
902 BAD_MESSAGE_BAN_TIME,
903 String::from("send us a malformed message"),
904 );
905 return;
906 }
907 };
908
909 debug_target!(
910 crate::LOG_TARGET_RELAY,
911 "received msg {} from {}",
912 msg.item_name(),
913 peer_index
914 );
915 #[cfg(feature = "with_sentry")]
916 {
917 let sentry_hub = sentry::Hub::current();
918 let _scope_guard = sentry_hub.push_scope();
919 sentry_hub.configure_scope(|scope| {
920 scope.set_tag("p2p.protocol", "relayer");
921 scope.set_tag("p2p.message", msg.item_name());
922 });
923 }
924
925 let start_time = Instant::now();
926 tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
927 debug_target!(
928 crate::LOG_TARGET_RELAY,
929 "process message={}, peer={}, cost={:?}",
930 msg.item_name(),
931 peer_index,
932 Instant::now().saturating_duration_since(start_time),
933 );
934 }
935
936 async fn connected(
937 &mut self,
938 _nc: Arc<dyn CKBProtocolContext + Sync>,
939 peer_index: PeerIndex,
940 version: &str,
941 ) {
942 self.shared().state().peers().relay_connected(peer_index);
943 info_target!(
944 crate::LOG_TARGET_RELAY,
945 "RelayProtocol({}).connected peer={}",
946 version,
947 peer_index
948 );
949 }
950
951 async fn disconnected(
952 &mut self,
953 _nc: Arc<dyn CKBProtocolContext + Sync>,
954 peer_index: PeerIndex,
955 ) {
956 info_target!(
957 crate::LOG_TARGET_RELAY,
958 "RelayProtocol.disconnected peer={}",
959 peer_index
960 );
961 self.rate_limiter.retain_recent();
963 }
964
965 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
966 if self.shared.active_chain().is_initial_block_download() {
968 return;
969 }
970
971 match RelaySwitch::new(&nc, self.v3) {
972 RelaySwitch::Ckb2021RelayV3 => return,
973 RelaySwitch::Ckb2023RelayV2 => {
974 if nc.remove_notify(TX_PROPOSAL_TOKEN).await.is_err() {
975 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
976 }
977 if nc.remove_notify(ASK_FOR_TXS_TOKEN).await.is_err() {
978 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
979 }
980 if nc.remove_notify(TX_HASHES_TOKEN).await.is_err() {
981 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
982 }
983 for kv_pair in self.shared().state().peers().state.iter() {
984 let (peer, state) = kv_pair.pair();
985 if !state.peer_flags.is_2023edition {
986 let _ignore = nc.disconnect(*peer, "Evict low-version clients ");
987 }
988 }
989 return;
990 }
991 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
992 }
993
994 let start_time = Instant::now();
995 trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
996 match token {
997 TX_PROPOSAL_TOKEN => {
998 tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
999 }
1000 ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
1001 TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
1002 _ => unreachable!(),
1003 }
1004 trace_target!(
1005 crate::LOG_TARGET_RELAY,
1006 "finished notify token={} cost={:?}",
1007 token,
1008 Instant::now().saturating_duration_since(start_time)
1009 );
1010 }
1011}
1012
/// Combination of the edition flag reported by `nc.ckb2023()` and whether
/// the handler was built as relay v3.
#[derive(Copy, Clone, Debug)]
enum RelaySwitch {
    /// `ckb2023() == false`, handler is not v3.
    Ckb2021RelayV2,
    /// `ckb2023() == false`, handler is v3.
    Ckb2021RelayV3,
    /// `ckb2023() == true`, handler is not v3.
    Ckb2023RelayV2,
    /// `ckb2023() == true`, handler is v3.
    Ckb2023RelayV3,
}
1020
1021impl RelaySwitch {
1022 fn new(nc: &Arc<dyn CKBProtocolContext + Sync>, is_relay_v3: bool) -> Self {
1023 match (nc.ckb2023(), is_relay_v3) {
1024 (true, true) => Self::Ckb2023RelayV3,
1025 (true, false) => Self::Ckb2023RelayV2,
1026 (false, true) => Self::Ckb2021RelayV3,
1027 (false, false) => Self::Ckb2021RelayV2,
1028 }
1029 }
1030}