1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_chain::VerifyResult;
27use ckb_chain::{ChainController, RemoteBlock};
28use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
29use ckb_error::is_internal_db_error;
30use ckb_logger::{
31 debug, debug_target, error, error_target, info_target, trace_target, warn_target,
32};
33use ckb_network::{
34 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
35 async_trait, bytes::Bytes, tokio,
36};
37use ckb_shared::Shared;
38use ckb_shared::block_status::BlockStatus;
39use ckb_systemtime::unix_time_as_millis;
40use ckb_tx_pool::service::TxVerificationResult;
41use ckb_types::BlockNumberAndHash;
42use ckb_types::{
43 core::{self, BlockView},
44 packed::{self, Byte32, ProposalShortId},
45 prelude::*,
46};
47use ckb_util::Mutex;
48use itertools::Itertools;
49use std::collections::{HashMap, HashSet};
50use std::sync::Arc;
51use std::time::{Duration, Instant};
52
/// Notify token of the timer that answers queued block-proposal requests
/// (handled by `Relayer::prune_tx_proposal_request`).
pub const TX_PROPOSAL_TOKEN: u64 = 0;
/// Notify token of the timer that requests transactions from peers
/// (handled by `Relayer::ask_for_txs`).
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
/// Notify token of the timer that announces transaction hashes to peers
/// (handled by `Relayer::send_bulk_of_tx_hashes`).
pub const TX_HASHES_TOKEN: u64 = 2;

/// Maximum number of peers a newly accepted block is relayed to as a compact block.
pub const MAX_RELAY_PEERS: usize = 128;
/// Maximum number of transaction hashes / verification results handled per batch.
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
/// Byte budget used when batching transactions into `BlockProposal` messages.
pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
60
/// Keyed `governor` rate limiter parameterized over the key type `T`, using
/// governor's default keyed state store and default clock.
type RateLimiter<T> = governor::RateLimiter<
    T,
    governor::state::keyed::DefaultKeyedStateStore<T>,
    governor::clock::DefaultClock,
>;
66
/// Outcome of `Relayer::reconstruct_block`.
#[derive(Debug, Eq, PartialEq)]
pub enum ReconstructionResult {
    /// The block was fully rebuilt and its transactions root matches the compact block header.
    Block(BlockView),
    /// Reconstruction is incomplete: the first vector holds the indexes of the missing
    /// transactions, the second the indexes of the missing uncles.
    Missing(Vec<usize>, Vec<usize>),
    /// Everything was found but the transactions root does not match while some short ids
    /// were resolved locally; treated as a short-id collision.
    Collided,
    /// Reconstruction failed with the given status.
    Error(Status),
}
74
/// Protocol handler for the relay protocol.
pub struct Relayer {
    /// Handle to the chain service; used to submit remote blocks and look up orphan blocks.
    chain: ChainController,
    /// Shared synchronization state.
    pub(crate) shared: Arc<SyncShared>,
    /// Rate limiter keyed by `(peer, message item id)`, consulted in `try_process`.
    rate_limiter: Arc<Mutex<RateLimiter<(PeerIndex, u32)>>>,
    /// `true` once `Relayer::v3` has been called; fed into `RelaySwitch::new`.
    v3: bool,
}
82
83impl Relayer {
84 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
88 let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
91 let rate_limiter = Arc::new(Mutex::new(RateLimiter::keyed(quota)));
92
93 Relayer {
94 chain,
95 shared,
96 rate_limiter,
97 v3: false,
98 }
99 }
100
101 pub fn v3(mut self) -> Self {
103 self.v3 = true;
104 self
105 }
106
107 pub fn shared(&self) -> &Arc<SyncShared> {
109 &self.shared
110 }
111
112 fn try_process(
113 &mut self,
114 nc: Arc<dyn CKBProtocolContext + Sync>,
115 peer: PeerIndex,
116 message: packed::RelayMessageUnionReader<'_>,
117 ) -> Status {
118 let should_check_rate =
120 !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
121
122 if should_check_rate
123 && self
124 .rate_limiter
125 .lock()
126 .check_key(&(peer, message.item_id()))
127 .is_err()
128 {
129 return StatusCode::TooManyRequests.with_context(message.item_name());
130 }
131
132 match message {
133 packed::RelayMessageUnionReader::CompactBlock(reader) => {
134 CompactBlockProcess::new(reader, self, nc, peer).execute()
135 }
136 packed::RelayMessageUnionReader::RelayTransactions(reader) => {
137 match RelaySwitch::new(&nc, self.v3) {
140 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
141 return Status::ignored();
142 }
143 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
144 }
145 if reader.check_data() {
146 TransactionsProcess::new(reader, self, nc, peer).execute()
147 } else {
148 StatusCode::ProtocolMessageIsMalformed
149 .with_context("RelayTransactions is invalid")
150 }
151 }
152 packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
153 match RelaySwitch::new(&nc, self.v3) {
156 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
157 return Status::ignored();
158 }
159 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
160 }
161 TransactionHashesProcess::new(reader, self, peer).execute()
162 }
163 packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
164 match RelaySwitch::new(&nc, self.v3) {
167 RelaySwitch::Ckb2023RelayV2 | RelaySwitch::Ckb2021RelayV3 => {
168 return Status::ignored();
169 }
170 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
171 }
172 GetTransactionsProcess::new(reader, self, nc, peer).execute()
173 }
174 packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
175 GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
176 }
177 packed::RelayMessageUnionReader::BlockTransactions(reader) => {
178 if reader.check_data() {
179 BlockTransactionsProcess::new(reader, self, nc, peer).execute()
180 } else {
181 StatusCode::ProtocolMessageIsMalformed
182 .with_context("BlockTransactions is invalid")
183 }
184 }
185 packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
186 GetBlockProposalProcess::new(reader, self, nc, peer).execute()
187 }
188 packed::RelayMessageUnionReader::BlockProposal(reader) => {
189 BlockProposalProcess::new(reader, self).execute()
190 }
191 }
192 }
193
194 fn process(
195 &mut self,
196 nc: Arc<dyn CKBProtocolContext + Sync>,
197 peer: PeerIndex,
198 message: packed::RelayMessageUnionReader<'_>,
199 ) {
200 let item_name = message.item_name();
201 let item_bytes = message.as_slice().len() as u64;
202 let status = self.try_process(Arc::clone(&nc), peer, message);
203
204 metric_ckb_message_bytes(
205 MetricDirection::In,
206 &SupportProtocols::RelayV2.name(),
207 message.item_name(),
208 Some(status.code()),
209 item_bytes,
210 );
211
212 if let Some(ban_time) = status.should_ban() {
213 error_target!(
214 crate::LOG_TARGET_RELAY,
215 "receive {} from {}, ban {:?} for {}",
216 item_name,
217 peer,
218 ban_time,
219 status
220 );
221 nc.ban_peer(peer, ban_time, status.to_string());
222 } else if status.should_warn() {
223 warn_target!(
224 crate::LOG_TARGET_RELAY,
225 "receive {} from {}, {}",
226 item_name,
227 peer,
228 status
229 );
230 } else if !status.is_ok() {
231 debug_target!(
232 crate::LOG_TARGET_RELAY,
233 "receive {} from {}, {}",
234 item_name,
235 peer,
236 status
237 );
238 }
239 }
240
241 pub fn request_proposal_txs(
243 &self,
244 nc: &dyn CKBProtocolContext,
245 peer: PeerIndex,
246 block_hash_and_number: BlockNumberAndHash,
247 proposals: Vec<packed::ProposalShortId>,
248 ) {
249 let tx_pool = self.shared.shared().tx_pool_controller();
250 let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
251 {
252 Err(err) => {
253 debug_target!(
254 crate::LOG_TARGET_RELAY,
255 "tx_pool fresh_proposals_filter error: {:?}",
256 err,
257 );
258 return;
259 }
260 Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
261 };
262
263 let to_ask_proposals: Vec<ProposalShortId> = self
264 .shared()
265 .state()
266 .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
267 .into_iter()
268 .zip(fresh_proposals)
269 .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
270 .collect();
271 if !to_ask_proposals.is_empty() {
272 let content = packed::GetBlockProposal::new_builder()
273 .block_hash(block_hash_and_number.hash)
274 .proposals(to_ask_proposals.clone().pack())
275 .build();
276 let message = packed::RelayMessage::new_builder().set(content).build();
277 if !send_message_to(nc, peer, &message).is_ok() {
278 self.shared()
279 .state()
280 .remove_inflight_proposals(&to_ask_proposals);
281 }
282 }
283 }
284
285 #[allow(clippy::needless_collect)]
287 pub fn accept_block(
288 &self,
289 nc: Arc<dyn CKBProtocolContext + Sync>,
290 peer_id: PeerIndex,
291 block: core::BlockView,
292 msg_name: &str,
293 ) {
294 if self
295 .shared()
296 .active_chain()
297 .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
298 {
299 return;
300 }
301
302 let block = Arc::new(block);
303
304 let verify_callback = {
305 let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
306 let block = Arc::clone(&block);
307 let shared = Arc::clone(self.shared());
308 let msg_name = msg_name.to_owned();
309 Box::new(move |result: VerifyResult| match result {
310 Ok(verified) => {
311 if !verified {
312 debug!(
313 "block {}-{} has verified already, won't build compact block and broadcast it",
314 block.number(),
315 block.hash()
316 );
317 return;
318 }
319
320 build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
321 }
322 Err(err) => {
323 error!(
324 "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
325 block.number(),
326 block.hash(),
327 err
328 );
329
330 let is_internal_db_error = is_internal_db_error(&err);
331 if is_internal_db_error {
332 return;
333 }
334
335 post_sync_process(
337 nc.as_ref(),
338 peer_id,
339 &msg_name,
340 StatusCode::BlockIsInvalid.with_context(format!(
341 "block {} is invalid, reason: {}",
342 block.hash(),
343 err
344 )),
345 );
346 }
347 })
348 };
349
350 let remote_block = RemoteBlock {
351 block,
352 verify_callback,
353 };
354
355 self.shared.accept_remote_block(&self.chain, remote_block);
356 }
357
358 pub fn reconstruct_block(
368 &self,
369 active_chain: &ActiveChain,
370 compact_block: &packed::CompactBlock,
371 received_transactions: Vec<core::TransactionView>,
372 uncles_index: &[u32],
373 received_uncles: &[core::UncleBlockView],
374 ) -> ReconstructionResult {
375 let block_txs_len = received_transactions.len();
376 let compact_block_hash = compact_block.calc_header_hash();
377 debug_target!(
378 crate::LOG_TARGET_RELAY,
379 "start block reconstruction, block hash: {}, received transactions len: {}",
380 compact_block_hash,
381 block_txs_len,
382 );
383
384 let mut short_ids_set: HashSet<ProposalShortId> =
385 compact_block.short_ids().into_iter().collect();
386
387 let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
388 .into_iter()
389 .filter_map(|tx| {
390 let short_id = tx.proposal_short_id();
391 if short_ids_set.remove(&short_id) {
392 Some((short_id, tx))
393 } else {
394 None
395 }
396 })
397 .collect();
398
399 if !short_ids_set.is_empty() {
400 let tx_pool = self.shared.shared().tx_pool_controller();
401 let fetch_txs = tx_pool.fetch_txs(short_ids_set);
402 if let Err(e) = fetch_txs {
403 return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
404 }
405 txs_map.extend(fetch_txs.unwrap());
406 }
407
408 let txs_len = compact_block.txs_len();
409 let mut block_transactions: Vec<Option<core::TransactionView>> =
410 Vec::with_capacity(txs_len);
411
412 let short_ids_iter = &mut compact_block.short_ids().into_iter();
413 compact_block
415 .prefilled_transactions()
416 .into_iter()
417 .for_each(|pt| {
418 let index: usize = pt.index().unpack();
419 let gap = index - block_transactions.len();
420 if gap > 0 {
421 short_ids_iter
422 .take(gap)
423 .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
424 }
425 block_transactions.push(Some(pt.transaction().into_view()));
426 });
427
428 short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
430
431 let missing = block_transactions.iter().any(Option::is_none);
432
433 let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
434 let mut uncles = Vec::with_capacity(compact_block.uncles().len());
435
436 let mut position = 0;
437 for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
438 if uncles_index.contains(&(i as u32)) {
439 uncles.push(
440 received_uncles
441 .get(position)
442 .expect("have checked the indexes")
443 .clone()
444 .data(),
445 );
446 position += 1;
447 continue;
448 };
449 let status = active_chain.get_block_status(&uncle_hash);
450 match status {
451 BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
452 BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
453 if let Some(uncle) = active_chain.get_block(&uncle_hash) {
454 uncles.push(uncle.as_uncle().data());
455 } else {
456 debug_target!(
457 crate::LOG_TARGET_RELAY,
458 "reconstruct_block could not find {:#?} uncle block: {:#?}",
459 status,
460 uncle_hash,
461 );
462 missing_uncles.push(i);
463 }
464 }
465 BlockStatus::BLOCK_RECEIVED => {
466 if let Some(uncle) = self
467 .chain
468 .get_orphan_block(self.shared().store(), &uncle_hash)
469 {
470 uncles.push(uncle.as_uncle().data());
471 } else {
472 debug_target!(
473 crate::LOG_TARGET_RELAY,
474 "reconstruct_block could not find {:#?} uncle block: {:#?}",
475 status,
476 uncle_hash,
477 );
478 missing_uncles.push(i);
479 }
480 }
481 BlockStatus::BLOCK_INVALID => {
482 return ReconstructionResult::Error(
483 StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
484 );
485 }
486 _ => missing_uncles.push(i),
487 }
488 }
489
490 if !missing && missing_uncles.is_empty() {
491 let txs = block_transactions
492 .into_iter()
493 .collect::<Option<Vec<_>>>()
494 .expect("missing checked, should not fail");
495 let block = if let Some(extension) = compact_block.extension() {
496 packed::BlockV1::new_builder()
497 .header(compact_block.header())
498 .uncles(uncles.pack())
499 .transactions(txs.into_iter().map(|tx| tx.data()).pack())
500 .proposals(compact_block.proposals())
501 .extension(extension)
502 .build()
503 .as_v0()
504 } else {
505 packed::Block::new_builder()
506 .header(compact_block.header())
507 .uncles(uncles.pack())
508 .transactions(txs.into_iter().map(|tx| tx.data()).pack())
509 .proposals(compact_block.proposals())
510 .build()
511 }
512 .into_view();
513
514 debug_target!(
515 crate::LOG_TARGET_RELAY,
516 "finish block reconstruction, block hash: {}",
517 compact_block.calc_header_hash(),
518 );
519
520 let compact_block_tx_root = compact_block.header().raw().transactions_root();
521 let reconstruct_block_tx_root = block.transactions_root();
522 if compact_block_tx_root != reconstruct_block_tx_root {
523 if compact_block.short_ids().is_empty()
524 || compact_block.short_ids().len() == block_txs_len
525 {
526 return ReconstructionResult::Error(
527 StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
528 .with_context(format!(
529 "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
530 compact_block.header().raw().transactions_root(),
531 block.transactions_root(),
532 )),
533 );
534 } else {
535 if let Some(metrics) = ckb_metrics::handle() {
536 metrics.ckb_relay_transaction_short_id_collide.inc();
537 }
538 return ReconstructionResult::Collided;
539 }
540 }
541
542 ReconstructionResult::Block(block)
543 } else {
544 let missing_indexes: Vec<usize> = block_transactions
545 .iter()
546 .enumerate()
547 .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
548 .collect();
549
550 debug_target!(
551 crate::LOG_TARGET_RELAY,
552 "block reconstruction failed, block hash: {}, missing: {}, total: {}",
553 compact_block.calc_header_hash(),
554 missing_indexes.len(),
555 compact_block.short_ids().len(),
556 );
557
558 ReconstructionResult::Missing(missing_indexes, missing_uncles)
559 }
560 }
561
562 fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
563 let get_block_proposals = self.shared().state().drain_get_block_proposals();
564 let tx_pool = self.shared.shared().tx_pool_controller();
565
566 let fetch_txs = tx_pool.fetch_txs(
567 get_block_proposals
568 .iter()
569 .map(|kv_pair| kv_pair.key().clone())
570 .collect(),
571 );
572 if let Err(err) = fetch_txs {
573 debug_target!(
574 crate::LOG_TARGET_RELAY,
575 "relayer prune_tx_proposal_request internal error: {:?}",
576 err,
577 );
578 return;
579 }
580
581 let txs = fetch_txs.unwrap();
582
583 let mut peer_txs = HashMap::new();
584 for (id, peer_indices) in get_block_proposals.into_iter() {
585 if let Some(tx) = txs.get(&id) {
586 for peer_index in peer_indices {
587 let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
588 tx_set.push(tx.clone());
589 }
590 }
591 }
592
593 let send_block_proposals =
594 |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
595 let content = packed::BlockProposal::new_builder()
596 .transactions(txs.into_iter().pack())
597 .build();
598 let message = packed::RelayMessage::new_builder().set(content).build();
599 let status = send_message_to(nc, peer_index, &message);
600 if !status.is_ok() {
601 ckb_logger::error!(
602 "send RelayBlockProposal to {}, status: {:?}",
603 peer_index,
604 status
605 );
606 }
607 };
608
609 let mut relay_bytes = 0;
610 let mut relay_proposals = Vec::new();
611 for (peer_index, txs) in peer_txs {
612 for tx in txs {
613 let data = tx.data();
614 let tx_size = data.total_size();
615 if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
616 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
617 relay_bytes = tx_size;
618 } else {
619 relay_bytes += tx_size;
620 }
621 relay_proposals.push(data);
622 }
623 if !relay_proposals.is_empty() {
624 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
625 relay_bytes = 0;
626 }
627 }
628 }
629
630 pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
632 for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
633 if !tx_hashes.is_empty() {
634 debug_target!(
635 crate::LOG_TARGET_RELAY,
636 "Send get transaction ({} hashes) to {}",
637 tx_hashes.len(),
638 peer,
639 );
640 tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
641 let content = packed::GetRelayTransactions::new_builder()
642 .tx_hashes(tx_hashes.pack())
643 .build();
644 let message = packed::RelayMessage::new_builder().set(content).build();
645 let status = send_message_to(nc, peer, &message);
646 if !status.is_ok() {
647 ckb_logger::error!(
648 "interrupted request for transactions, status: {:?}",
649 status
650 );
651 }
652 }
653 }
654 }
655
656 pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
658 const BUFFER_SIZE: usize = 42;
659
660 let connected_peers = nc.connected_peers();
661 if connected_peers.is_empty() {
662 return;
663 }
664
665 let ckb2023 = nc.ckb2023();
666 let tx_verify_results = self
667 .shared
668 .state()
669 .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
670 let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
671 {
672 for tx_verify_result in tx_verify_results {
673 match tx_verify_result {
674 TxVerificationResult::Ok {
675 original_peer,
676 with_vm_2023,
677 tx_hash,
678 } => {
679 if ckb2023 != with_vm_2023 {
681 continue;
682 }
683 for target in &connected_peers {
684 match original_peer {
685 Some(peer) => {
686 if peer != *target {
688 let hashes = selected
689 .entry(*target)
690 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
691 hashes.push(tx_hash.clone());
692 }
693 }
694 None => {
695 let hashes = selected
697 .entry(*target)
698 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
699 hashes.push(tx_hash.clone());
700 self.shared.state().mark_as_known_tx(tx_hash.clone());
701 }
702 }
703 }
704 }
705 TxVerificationResult::Reject { tx_hash } => {
706 self.shared.state().remove_from_known_txs(&tx_hash);
707 }
708 TxVerificationResult::UnknownParents { peer, parents } => {
709 let tx_hashes: Vec<_> = {
710 let mut tx_filter = self.shared.state().tx_filter();
711 tx_filter.remove_expired();
712 parents
713 .into_iter()
714 .filter(|tx_hash| !tx_filter.contains(tx_hash))
715 .collect()
716 };
717 self.shared.state().add_ask_for_txs(peer, tx_hashes);
718 }
719 }
720 }
721 }
722 for (peer, hashes) in selected {
723 let content = packed::RelayTransactionHashes::new_builder()
724 .tx_hashes(hashes.pack())
725 .build();
726 let message = packed::RelayMessage::new_builder().set(content).build();
727
728 if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
729 debug_target!(
730 crate::LOG_TARGET_RELAY,
731 "relayer send TransactionHashes error: {:?}",
732 err,
733 );
734 }
735 }
736 }
737}
738
739fn build_and_broadcast_compact_block(
740 nc: &dyn CKBProtocolContext,
741 shared: &Shared,
742 peer: PeerIndex,
743 block: Arc<BlockView>,
744) {
745 debug_target!(
746 crate::LOG_TARGET_RELAY,
747 "[block_relay] relayer accept_block {} {}",
748 block.header().hash(),
749 unix_time_as_millis()
750 );
751 let block_hash = block.hash();
752 shared.remove_header_view(&block_hash);
753 let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
754 let message = packed::RelayMessage::new_builder().set(cb).build();
755
756 let selected_peers: Vec<PeerIndex> = nc
757 .connected_peers()
758 .into_iter()
759 .filter(|target_peer| peer != *target_peer)
760 .take(MAX_RELAY_PEERS)
761 .collect();
762 if let Err(err) = nc.quick_filter_broadcast(
763 TargetSession::Multi(Box::new(selected_peers.into_iter())),
764 message.as_bytes(),
765 ) {
766 debug_target!(
767 crate::LOG_TARGET_RELAY,
768 "relayer send block when accept block error: {:?}",
769 err,
770 );
771 }
772
773 if let Some(p2p_control) = nc.p2p_control() {
774 let snapshot = shared.snapshot();
775 let parent_chain_root = {
776 let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
777 match mmr.get_root() {
778 Ok(root) => root,
779 Err(err) => {
780 error_target!(
781 crate::LOG_TARGET_RELAY,
782 "Generate last state to light client failed: {:?}",
783 err
784 );
785 return;
786 }
787 }
788 };
789
790 let tip_header = packed::VerifiableHeader::new_builder()
791 .header(block.header().data())
792 .uncles_hash(block.calc_uncles_hash())
793 .extension(Pack::pack(&block.extension()))
794 .parent_chain_root(parent_chain_root)
795 .build();
796 let light_client_message = {
797 let content = packed::SendLastState::new_builder()
798 .last_header(tip_header)
799 .build();
800 packed::LightClientMessage::new_builder()
801 .set(content)
802 .build()
803 };
804 let light_client_peers: HashSet<PeerIndex> = nc
805 .connected_peers()
806 .into_iter()
807 .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
808 .filter(|(_id, peer)| peer.if_lightclient_subscribed)
809 .map(|(id, _)| id)
810 .collect();
811 if let Err(err) = p2p_control.filter_broadcast(
812 TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
813 SupportProtocols::LightClient.protocol_id(),
814 light_client_message.as_bytes(),
815 ) {
816 debug_target!(
817 crate::LOG_TARGET_RELAY,
818 "relayer send last state to light client when accept block, error: {:?}",
819 err,
820 );
821 }
822 }
823}
824
825#[async_trait]
826impl CKBProtocolHandler for Relayer {
827 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
828 nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
829 .await
830 .expect("set_notify at init is ok");
831 nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
832 .await
833 .expect("set_notify at init is ok");
834 nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
835 .await
836 .expect("set_notify at init is ok");
837 }
838
839 async fn received(
840 &mut self,
841 nc: Arc<dyn CKBProtocolContext + Sync>,
842 peer_index: PeerIndex,
843 data: Bytes,
844 ) {
845 if self.shared.active_chain().is_initial_block_download() {
847 return;
848 }
849
850 let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
851 Ok(msg) => {
852 let item = msg.to_enum();
853 if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
854 if reader.count_extra_fields() > 1 {
855 info_target!(
856 crate::LOG_TARGET_RELAY,
857 "Peer {} sends us a malformed message: \
858 too many fields in CompactBlock",
859 peer_index
860 );
861 nc.ban_peer(
862 peer_index,
863 BAD_MESSAGE_BAN_TIME,
864 String::from(
865 "send us a malformed message: \
866 too many fields in CompactBlock",
867 ),
868 );
869 return;
870 } else {
871 item
872 }
873 } else {
874 match packed::RelayMessageReader::from_slice(&data) {
875 Ok(msg) => msg.to_enum(),
876 _ => {
877 info_target!(
878 crate::LOG_TARGET_RELAY,
879 "Peer {} sends us a malformed message: \
880 too many fields",
881 peer_index
882 );
883 nc.ban_peer(
884 peer_index,
885 BAD_MESSAGE_BAN_TIME,
886 String::from(
887 "send us a malformed message \
888 too many fields",
889 ),
890 );
891 return;
892 }
893 }
894 }
895 }
896 _ => {
897 info_target!(
898 crate::LOG_TARGET_RELAY,
899 "Peer {} sends us a malformed message",
900 peer_index
901 );
902 nc.ban_peer(
903 peer_index,
904 BAD_MESSAGE_BAN_TIME,
905 String::from("send us a malformed message"),
906 );
907 return;
908 }
909 };
910
911 debug_target!(
912 crate::LOG_TARGET_RELAY,
913 "received msg {} from {}",
914 msg.item_name(),
915 peer_index
916 );
917 #[cfg(feature = "with_sentry")]
918 {
919 let sentry_hub = sentry::Hub::current();
920 let _scope_guard = sentry_hub.push_scope();
921 sentry_hub.configure_scope(|scope| {
922 scope.set_tag("p2p.protocol", "relayer");
923 scope.set_tag("p2p.message", msg.item_name());
924 });
925 }
926
927 let start_time = Instant::now();
928 tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
929 debug_target!(
930 crate::LOG_TARGET_RELAY,
931 "process message={}, peer={}, cost={:?}",
932 msg.item_name(),
933 peer_index,
934 Instant::now().saturating_duration_since(start_time),
935 );
936 }
937
938 async fn connected(
939 &mut self,
940 _nc: Arc<dyn CKBProtocolContext + Sync>,
941 peer_index: PeerIndex,
942 version: &str,
943 ) {
944 self.shared().state().peers().relay_connected(peer_index);
945 info_target!(
946 crate::LOG_TARGET_RELAY,
947 "RelayProtocol({}).connected peer={}",
948 version,
949 peer_index
950 );
951 }
952
953 async fn disconnected(
954 &mut self,
955 _nc: Arc<dyn CKBProtocolContext + Sync>,
956 peer_index: PeerIndex,
957 ) {
958 info_target!(
959 crate::LOG_TARGET_RELAY,
960 "RelayProtocol.disconnected peer={}",
961 peer_index
962 );
963 self.rate_limiter.lock().retain_recent();
965 }
966
967 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
968 if self.shared.active_chain().is_initial_block_download() {
970 return;
971 }
972
973 match RelaySwitch::new(&nc, self.v3) {
974 RelaySwitch::Ckb2021RelayV3 => return,
975 RelaySwitch::Ckb2023RelayV2 => {
976 if nc.remove_notify(TX_PROPOSAL_TOKEN).await.is_err() {
977 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
978 }
979 if nc.remove_notify(ASK_FOR_TXS_TOKEN).await.is_err() {
980 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
981 }
982 if nc.remove_notify(TX_HASHES_TOKEN).await.is_err() {
983 trace_target!(crate::LOG_TARGET_RELAY, "remove v2 relay notify fail");
984 }
985 for kv_pair in self.shared().state().peers().state.iter() {
986 let (peer, state) = kv_pair.pair();
987 if !state.peer_flags.is_2023edition {
988 let _ignore = nc.disconnect(*peer, "Evict low-version clients ");
989 }
990 }
991 return;
992 }
993 RelaySwitch::Ckb2023RelayV3 | RelaySwitch::Ckb2021RelayV2 => (),
994 }
995
996 let start_time = Instant::now();
997 trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
998 match token {
999 TX_PROPOSAL_TOKEN => {
1000 tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
1001 }
1002 ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
1003 TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
1004 _ => unreachable!(),
1005 }
1006 trace_target!(
1007 crate::LOG_TARGET_RELAY,
1008 "finished notify token={} cost={:?}",
1009 token,
1010 Instant::now().saturating_duration_since(start_time)
1011 );
1012 }
1013}
1014
/// Combination of the `ckb2023()` flag reported by the protocol context and the relay
/// protocol version served by a `Relayer` (see `RelaySwitch::new`).
#[derive(Copy, Clone, Debug)]
enum RelaySwitch {
    /// `ckb2023()` is false and the relayer is not v3.
    Ckb2021RelayV2,
    /// `ckb2023()` is false and the relayer is v3.
    Ckb2021RelayV3,
    /// `ckb2023()` is true and the relayer is not v3.
    Ckb2023RelayV2,
    /// `ckb2023()` is true and the relayer is v3.
    Ckb2023RelayV3,
}
1022
1023impl RelaySwitch {
1024 fn new(nc: &Arc<dyn CKBProtocolContext + Sync>, is_relay_v3: bool) -> Self {
1025 match (nc.ckb2023(), is_relay_v3) {
1026 (true, true) => Self::Ckb2023RelayV3,
1027 (true, false) => Self::Ckb2023RelayV2,
1028 (false, true) => Self::Ckb2021RelayV3,
1029 (false, false) => Self::Ckb2021RelayV2,
1030 }
1031 }
1032}