1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_chain::VerifyResult;
27use ckb_chain::{ChainController, RemoteBlock};
28use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
29use ckb_error::is_internal_db_error;
30use ckb_logger::{
31 debug, debug_target, error, error_target, info_target, trace_target, warn_target,
32};
33use ckb_network::{
34 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
35 async_trait, bytes::Bytes, tokio,
36};
37use ckb_shared::Shared;
38use ckb_shared::block_status::BlockStatus;
39use ckb_systemtime::unix_time_as_millis;
40use ckb_tx_pool::service::TxVerificationResult;
41use ckb_types::BlockNumberAndHash;
42use ckb_types::{
43 core::{self, BlockView},
44 packed::{self, Byte32, ProposalShortId},
45 prelude::*,
46};
47use itertools::Itertools;
48use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
/// Notify token registered in `init` with a 100 ms interval; `notify` answers it by running
/// `prune_tx_proposal_request`.
pub const TX_PROPOSAL_TOKEN: u64 = 0;
/// Notify token registered in `init` with a 100 ms interval; `notify` answers it by running
/// `ask_for_txs`.
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
/// Notify token registered in `init` with a 300 ms interval; `notify` answers it by running
/// `send_bulk_of_tx_hashes`.
pub const TX_HASHES_TOKEN: u64 = 2;

/// Upper bound on the number of peers a freshly accepted compact block is broadcast to.
pub const MAX_RELAY_PEERS: usize = 128;
/// Upper bound on the number of transaction hashes handled per batch (used when requesting
/// transactions and when taking pending verification results).
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
/// Byte budget (1 MiB) for the transactions packed into a single `BlockProposal` message.
pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
59
/// Keyed `governor` rate limiter that stores per-key state in a hash map and uses the
/// crate's default clock.
type RateLimiter<T> = governor::RateLimiter<
    T,
    governor::state::keyed::HashMapStateStore<T>,
    governor::clock::DefaultClock,
>;
65
/// Outcome of `Relayer::reconstruct_block`.
#[derive(Debug, Eq, PartialEq)]
pub enum ReconstructionResult {
    /// The block was fully rebuilt and its transactions root matches the compact block header.
    Block(BlockView),
    /// Reconstruction is incomplete: the first vector holds the indexes of missing
    /// transactions, the second the indexes of missing uncles.
    Missing(Vec<usize>, Vec<usize>),
    /// Every piece was found but the rebuilt transactions root differs from the header while
    /// some short ids were not covered by the received transactions.
    Collided,
    /// Reconstruction failed with the given status.
    Error(Status),
}
73
/// Handler for the relay protocol messages (`packed::RelayMessage`).
pub struct Relayer {
    /// Chain controller used to submit remote blocks and to look up orphan blocks.
    chain: ChainController,
    /// Shared sync state.
    pub(crate) shared: Arc<SyncShared>,
    /// Rate limiter keyed by `(peer, message item id)`.
    rate_limiter: RateLimiter<(PeerIndex, u32)>,
}
80
81impl Relayer {
82 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
86 let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
89 let rate_limiter = RateLimiter::hashmap(quota);
90
91 Relayer {
92 chain,
93 shared,
94 rate_limiter,
95 }
96 }
97
    /// Returns a reference to the `Arc<SyncShared>` held by this relayer.
    pub fn shared(&self) -> &Arc<SyncShared> {
        &self.shared
    }
102
103 fn try_process(
104 &mut self,
105 nc: Arc<dyn CKBProtocolContext + Sync>,
106 peer: PeerIndex,
107 message: packed::RelayMessageUnionReader<'_>,
108 ) -> Status {
109 let should_check_rate =
111 !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
112
113 if should_check_rate
114 && self
115 .rate_limiter
116 .check_key(&(peer, message.item_id()))
117 .is_err()
118 {
119 return StatusCode::TooManyRequests.with_context(message.item_name());
120 }
121
122 match message {
123 packed::RelayMessageUnionReader::CompactBlock(reader) => {
124 CompactBlockProcess::new(reader, self, nc, peer).execute()
125 }
126 packed::RelayMessageUnionReader::RelayTransactions(reader) => {
127 if reader.check_data() {
128 TransactionsProcess::new(reader, self, nc, peer).execute()
129 } else {
130 StatusCode::ProtocolMessageIsMalformed
131 .with_context("RelayTransactions is invalid")
132 }
133 }
134 packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
135 TransactionHashesProcess::new(reader, self, peer).execute()
136 }
137 packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
138 GetTransactionsProcess::new(reader, self, nc, peer).execute()
139 }
140 packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
141 GetBlockTransactionsProcess::new(reader, self, nc, peer).execute()
142 }
143 packed::RelayMessageUnionReader::BlockTransactions(reader) => {
144 if reader.check_data() {
145 BlockTransactionsProcess::new(reader, self, nc, peer).execute()
146 } else {
147 StatusCode::ProtocolMessageIsMalformed
148 .with_context("BlockTransactions is invalid")
149 }
150 }
151 packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
152 GetBlockProposalProcess::new(reader, self, nc, peer).execute()
153 }
154 packed::RelayMessageUnionReader::BlockProposal(reader) => {
155 BlockProposalProcess::new(reader, self).execute()
156 }
157 }
158 }
159
160 fn process(
161 &mut self,
162 nc: Arc<dyn CKBProtocolContext + Sync>,
163 peer: PeerIndex,
164 message: packed::RelayMessageUnionReader<'_>,
165 ) {
166 let item_name = message.item_name();
167 let item_bytes = message.as_slice().len() as u64;
168 let status = self.try_process(Arc::clone(&nc), peer, message);
169
170 metric_ckb_message_bytes(
171 MetricDirection::In,
172 &SupportProtocols::RelayV3.name(),
173 message.item_name(),
174 Some(status.code()),
175 item_bytes,
176 );
177
178 if let Some(ban_time) = status.should_ban() {
179 error_target!(
180 crate::LOG_TARGET_RELAY,
181 "receive {} from {}, ban {:?} for {}",
182 item_name,
183 peer,
184 ban_time,
185 status
186 );
187 nc.ban_peer(peer, ban_time, status.to_string());
188 } else if status.should_warn() {
189 warn_target!(
190 crate::LOG_TARGET_RELAY,
191 "receive {} from {}, {}",
192 item_name,
193 peer,
194 status
195 );
196 } else if !status.is_ok() {
197 debug_target!(
198 crate::LOG_TARGET_RELAY,
199 "receive {} from {}, {}",
200 item_name,
201 peer,
202 status
203 );
204 }
205 }
206
207 pub fn request_proposal_txs(
209 &self,
210 nc: &dyn CKBProtocolContext,
211 peer: PeerIndex,
212 block_hash_and_number: BlockNumberAndHash,
213 proposals: Vec<packed::ProposalShortId>,
214 ) {
215 let tx_pool = self.shared.shared().tx_pool_controller();
216 let fresh_proposals: Vec<ProposalShortId> = match tx_pool.fresh_proposals_filter(proposals)
217 {
218 Err(err) => {
219 debug_target!(
220 crate::LOG_TARGET_RELAY,
221 "tx_pool fresh_proposals_filter error: {:?}",
222 err,
223 );
224 return;
225 }
226 Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
227 };
228
229 let to_ask_proposals: Vec<ProposalShortId> = self
230 .shared()
231 .state()
232 .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
233 .into_iter()
234 .zip(fresh_proposals)
235 .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
236 .collect();
237 if !to_ask_proposals.is_empty() {
238 let content = packed::GetBlockProposal::new_builder()
239 .block_hash(block_hash_and_number.hash)
240 .proposals(to_ask_proposals.clone())
241 .build();
242 let message = packed::RelayMessage::new_builder().set(content).build();
243 if !send_message_to(nc, peer, &message).is_ok() {
244 self.shared()
245 .state()
246 .remove_inflight_proposals(&to_ask_proposals);
247 }
248 }
249 }
250
251 #[allow(clippy::needless_collect)]
253 pub fn accept_block(
254 &self,
255 nc: Arc<dyn CKBProtocolContext + Sync>,
256 peer_id: PeerIndex,
257 block: core::BlockView,
258 msg_name: &str,
259 ) {
260 if self
261 .shared()
262 .active_chain()
263 .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
264 {
265 return;
266 }
267
268 let block = Arc::new(block);
269
270 let verify_callback = {
271 let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
272 let block = Arc::clone(&block);
273 let shared = Arc::clone(self.shared());
274 let msg_name = msg_name.to_owned();
275 Box::new(move |result: VerifyResult| match result {
276 Ok(verified) => {
277 if !verified {
278 debug!(
279 "block {}-{} has verified already, won't build compact block and broadcast it",
280 block.number(),
281 block.hash()
282 );
283 return;
284 }
285
286 build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
287 }
288 Err(err) => {
289 error!(
290 "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
291 block.number(),
292 block.hash(),
293 err
294 );
295
296 let is_internal_db_error = is_internal_db_error(&err);
297 if is_internal_db_error {
298 return;
299 }
300
301 post_sync_process(
303 nc.as_ref(),
304 peer_id,
305 &msg_name,
306 StatusCode::BlockIsInvalid.with_context(format!(
307 "block {} is invalid, reason: {}",
308 block.hash(),
309 err
310 )),
311 );
312 }
313 })
314 };
315
316 let remote_block = RemoteBlock {
317 block,
318 verify_callback,
319 };
320
321 self.shared.accept_remote_block(&self.chain, remote_block);
322 }
323
324 pub fn reconstruct_block(
334 &self,
335 active_chain: &ActiveChain,
336 compact_block: &packed::CompactBlock,
337 received_transactions: Vec<core::TransactionView>,
338 uncles_index: &[u32],
339 received_uncles: &[core::UncleBlockView],
340 ) -> ReconstructionResult {
341 let block_txs_len = received_transactions.len();
342 let compact_block_hash = compact_block.calc_header_hash();
343 debug_target!(
344 crate::LOG_TARGET_RELAY,
345 "start block reconstruction, block hash: {}, received transactions len: {}",
346 compact_block_hash,
347 block_txs_len,
348 );
349
350 let mut short_ids_set: HashSet<ProposalShortId> =
351 compact_block.short_ids().into_iter().collect();
352
353 let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
354 .into_iter()
355 .filter_map(|tx| {
356 let short_id = tx.proposal_short_id();
357 if short_ids_set.remove(&short_id) {
358 Some((short_id, tx))
359 } else {
360 None
361 }
362 })
363 .collect();
364
365 if !short_ids_set.is_empty() {
366 let tx_pool = self.shared.shared().tx_pool_controller();
367 let fetch_txs = tx_pool.fetch_txs(short_ids_set);
368 if let Err(e) = fetch_txs {
369 return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
370 }
371 txs_map.extend(fetch_txs.unwrap());
372 }
373
374 let txs_len = compact_block.txs_len();
375 let mut block_transactions: Vec<Option<core::TransactionView>> =
376 Vec::with_capacity(txs_len);
377
378 let short_ids_iter = &mut compact_block.short_ids().into_iter();
379 compact_block
381 .prefilled_transactions()
382 .into_iter()
383 .for_each(|pt| {
384 let index: usize = pt.index().into();
385 let gap = index - block_transactions.len();
386 if gap > 0 {
387 short_ids_iter
388 .take(gap)
389 .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
390 }
391 block_transactions.push(Some(pt.transaction().into_view()));
392 });
393
394 short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
396
397 let missing = block_transactions.iter().any(Option::is_none);
398
399 let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
400 let mut uncles = Vec::with_capacity(compact_block.uncles().len());
401
402 let mut position = 0;
403 for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
404 if uncles_index.contains(&(i as u32)) {
405 uncles.push(
406 received_uncles
407 .get(position)
408 .expect("have checked the indexes")
409 .clone()
410 .data(),
411 );
412 position += 1;
413 continue;
414 };
415 let status = active_chain.get_block_status(&uncle_hash);
416 match status {
417 BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
418 BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
419 if let Some(uncle) = active_chain.get_block(&uncle_hash) {
420 uncles.push(uncle.as_uncle().data());
421 } else {
422 debug_target!(
423 crate::LOG_TARGET_RELAY,
424 "reconstruct_block could not find {:#?} uncle block: {:#?}",
425 status,
426 uncle_hash,
427 );
428 missing_uncles.push(i);
429 }
430 }
431 BlockStatus::BLOCK_RECEIVED => {
432 if let Some(uncle) = self
433 .chain
434 .get_orphan_block(self.shared().store(), &uncle_hash)
435 {
436 uncles.push(uncle.as_uncle().data());
437 } else {
438 debug_target!(
439 crate::LOG_TARGET_RELAY,
440 "reconstruct_block could not find {:#?} uncle block: {:#?}",
441 status,
442 uncle_hash,
443 );
444 missing_uncles.push(i);
445 }
446 }
447 BlockStatus::BLOCK_INVALID => {
448 return ReconstructionResult::Error(
449 StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
450 );
451 }
452 _ => missing_uncles.push(i),
453 }
454 }
455
456 if !missing && missing_uncles.is_empty() {
457 let txs = block_transactions
458 .into_iter()
459 .collect::<Option<Vec<_>>>()
460 .expect("missing checked, should not fail");
461 let block = if let Some(extension) = compact_block.extension() {
462 packed::BlockV1::new_builder()
463 .header(compact_block.header())
464 .uncles(uncles)
465 .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
466 .proposals(compact_block.proposals())
467 .extension(extension)
468 .build()
469 .as_v0()
470 } else {
471 packed::Block::new_builder()
472 .header(compact_block.header())
473 .uncles(uncles)
474 .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
475 .proposals(compact_block.proposals())
476 .build()
477 }
478 .into_view();
479
480 debug_target!(
481 crate::LOG_TARGET_RELAY,
482 "finish block reconstruction, block hash: {}",
483 compact_block.calc_header_hash(),
484 );
485
486 let compact_block_tx_root = compact_block.header().raw().transactions_root();
487 let reconstruct_block_tx_root = block.transactions_root();
488 if compact_block_tx_root != reconstruct_block_tx_root {
489 if compact_block.short_ids().is_empty()
490 || compact_block.short_ids().len() == block_txs_len
491 {
492 return ReconstructionResult::Error(
493 StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
494 .with_context(format!(
495 "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
496 compact_block.header().raw().transactions_root(),
497 block.transactions_root(),
498 )),
499 );
500 } else {
501 if let Some(metrics) = ckb_metrics::handle() {
502 metrics.ckb_relay_transaction_short_id_collide.inc();
503 }
504 return ReconstructionResult::Collided;
505 }
506 }
507
508 ReconstructionResult::Block(block)
509 } else {
510 let missing_indexes: Vec<usize> = block_transactions
511 .iter()
512 .enumerate()
513 .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
514 .collect();
515
516 debug_target!(
517 crate::LOG_TARGET_RELAY,
518 "block reconstruction failed, block hash: {}, missing: {}, total: {}",
519 compact_block.calc_header_hash(),
520 missing_indexes.len(),
521 compact_block.short_ids().len(),
522 );
523
524 ReconstructionResult::Missing(missing_indexes, missing_uncles)
525 }
526 }
527
528 fn prune_tx_proposal_request(&self, nc: &dyn CKBProtocolContext) {
529 let get_block_proposals = self.shared().state().drain_get_block_proposals();
530 let tx_pool = self.shared.shared().tx_pool_controller();
531
532 let fetch_txs = tx_pool.fetch_txs(
533 get_block_proposals
534 .iter()
535 .map(|kv_pair| kv_pair.key().clone())
536 .collect(),
537 );
538 if let Err(err) = fetch_txs {
539 debug_target!(
540 crate::LOG_TARGET_RELAY,
541 "relayer prune_tx_proposal_request internal error: {:?}",
542 err,
543 );
544 return;
545 }
546
547 let txs = fetch_txs.unwrap();
548
549 let mut peer_txs = HashMap::new();
550 for (id, peer_indices) in get_block_proposals.into_iter() {
551 if let Some(tx) = txs.get(&id) {
552 for peer_index in peer_indices {
553 let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
554 tx_set.push(tx.clone());
555 }
556 }
557 }
558
559 let send_block_proposals =
560 |nc: &dyn CKBProtocolContext, peer_index: PeerIndex, txs: Vec<packed::Transaction>| {
561 let content = packed::BlockProposal::new_builder()
562 .transactions(txs)
563 .build();
564 let message = packed::RelayMessage::new_builder().set(content).build();
565 let status = send_message_to(nc, peer_index, &message);
566 if !status.is_ok() {
567 ckb_logger::error!(
568 "send RelayBlockProposal to {}, status: {:?}",
569 peer_index,
570 status
571 );
572 }
573 };
574
575 let mut relay_bytes = 0;
576 let mut relay_proposals = Vec::new();
577 for (peer_index, txs) in peer_txs {
578 for tx in txs {
579 let data = tx.data();
580 let tx_size = data.total_size();
581 if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
582 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
583 relay_bytes = tx_size;
584 } else {
585 relay_bytes += tx_size;
586 }
587 relay_proposals.push(data);
588 }
589 if !relay_proposals.is_empty() {
590 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals));
591 relay_bytes = 0;
592 }
593 }
594 }
595
596 pub fn ask_for_txs(&self, nc: &dyn CKBProtocolContext) {
598 for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
599 if !tx_hashes.is_empty() {
600 debug_target!(
601 crate::LOG_TARGET_RELAY,
602 "Send get transaction ({} hashes) to {}",
603 tx_hashes.len(),
604 peer,
605 );
606 tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
607 let content = packed::GetRelayTransactions::new_builder()
608 .tx_hashes(tx_hashes)
609 .build();
610 let message = packed::RelayMessage::new_builder().set(content).build();
611 let status = send_message_to(nc, peer, &message);
612 if !status.is_ok() {
613 ckb_logger::error!(
614 "interrupted request for transactions, status: {:?}",
615 status
616 );
617 }
618 }
619 }
620 }
621
622 pub fn send_bulk_of_tx_hashes(&self, nc: &dyn CKBProtocolContext) {
624 const BUFFER_SIZE: usize = 42;
625
626 let connected_peers = nc.full_relay_connected_peers();
627 if connected_peers.is_empty() {
628 return;
629 }
630
631 let tx_verify_results = self
632 .shared
633 .state()
634 .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
635 let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
636 {
637 for tx_verify_result in tx_verify_results {
638 match tx_verify_result {
639 TxVerificationResult::Ok {
640 original_peer,
641 tx_hash,
642 } => {
643 for target in &connected_peers {
644 match original_peer {
645 Some(peer) => {
646 if peer != *target {
648 let hashes = selected
649 .entry(*target)
650 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
651 hashes.push(tx_hash.clone());
652 }
653 }
654 None => {
655 let hashes = selected
657 .entry(*target)
658 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
659 hashes.push(tx_hash.clone());
660 self.shared.state().mark_as_known_tx(tx_hash.clone());
661 }
662 }
663 }
664 }
665 TxVerificationResult::Reject { tx_hash } => {
666 self.shared.state().remove_from_known_txs(&tx_hash);
667 }
668 TxVerificationResult::UnknownParents { peer, parents } => {
669 let tx_hashes: Vec<_> = {
670 let mut tx_filter = self.shared.state().tx_filter();
671 tx_filter.remove_expired();
672 parents
673 .into_iter()
674 .filter(|tx_hash| !tx_filter.contains(tx_hash))
675 .collect()
676 };
677 self.shared.state().add_ask_for_txs(peer, tx_hashes);
678 }
679 }
680 }
681 }
682 for (peer, hashes) in selected {
683 let content = packed::RelayTransactionHashes::new_builder()
684 .tx_hashes(hashes)
685 .build();
686 let message = packed::RelayMessage::new_builder().set(content).build();
687
688 if let Err(err) = nc.filter_broadcast(TargetSession::Single(peer), message.as_bytes()) {
689 debug_target!(
690 crate::LOG_TARGET_RELAY,
691 "relayer send TransactionHashes error: {:?}",
692 err,
693 );
694 }
695 }
696 }
697}
698
699fn build_and_broadcast_compact_block(
700 nc: &dyn CKBProtocolContext,
701 shared: &Shared,
702 peer: PeerIndex,
703 block: Arc<BlockView>,
704) {
705 debug_target!(
706 crate::LOG_TARGET_RELAY,
707 "[block_relay] relayer accept_block {} {}",
708 block.header().hash(),
709 unix_time_as_millis()
710 );
711 let block_hash = block.hash();
712 shared.remove_header_view(&block_hash);
713 let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
714 let message = packed::RelayMessage::new_builder().set(cb).build();
715
716 let selected_peers: Vec<PeerIndex> = nc
717 .connected_peers()
718 .into_iter()
719 .filter(|target_peer| peer != *target_peer)
720 .take(MAX_RELAY_PEERS)
721 .collect();
722 if let Err(err) = nc.quick_filter_broadcast(
723 TargetSession::Multi(Box::new(selected_peers.into_iter())),
724 message.as_bytes(),
725 ) {
726 debug_target!(
727 crate::LOG_TARGET_RELAY,
728 "relayer send block when accept block error: {:?}",
729 err,
730 );
731 }
732
733 if let Some(p2p_control) = nc.p2p_control() {
734 let snapshot = shared.snapshot();
735 let parent_chain_root = {
736 let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
737 match mmr.get_root() {
738 Ok(root) => root,
739 Err(err) => {
740 error_target!(
741 crate::LOG_TARGET_RELAY,
742 "Generate last state to light client failed: {:?}",
743 err
744 );
745 return;
746 }
747 }
748 };
749
750 let tip_header = packed::VerifiableHeader::new_builder()
751 .header(block.header().data())
752 .uncles_hash(block.calc_uncles_hash())
753 .extension(Pack::pack(&block.extension()))
754 .parent_chain_root(parent_chain_root)
755 .build();
756 let light_client_message = {
757 let content = packed::SendLastState::new_builder()
758 .last_header(tip_header)
759 .build();
760 packed::LightClientMessage::new_builder()
761 .set(content)
762 .build()
763 };
764 let light_client_peers: HashSet<PeerIndex> = nc
765 .connected_peers()
766 .into_iter()
767 .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
768 .filter(|(_id, peer)| peer.if_lightclient_subscribed)
769 .map(|(id, _)| id)
770 .collect();
771 if let Err(err) = p2p_control.filter_broadcast(
772 TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
773 SupportProtocols::LightClient.protocol_id(),
774 light_client_message.as_bytes(),
775 ) {
776 debug_target!(
777 crate::LOG_TARGET_RELAY,
778 "relayer send last state to light client when accept block, error: {:?}",
779 err,
780 );
781 }
782 }
783}
784
785#[async_trait]
786impl CKBProtocolHandler for Relayer {
787 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
788 nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
789 .await
790 .expect("set_notify at init is ok");
791 nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
792 .await
793 .expect("set_notify at init is ok");
794 nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
795 .await
796 .expect("set_notify at init is ok");
797 }
798
799 async fn received(
800 &mut self,
801 nc: Arc<dyn CKBProtocolContext + Sync>,
802 peer_index: PeerIndex,
803 data: Bytes,
804 ) {
805 if self.shared.active_chain().is_initial_block_download() {
807 return;
808 }
809
810 let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
811 Ok(msg) => {
812 let item = msg.to_enum();
813 if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
814 if reader.count_extra_fields() > 1 {
815 info_target!(
816 crate::LOG_TARGET_RELAY,
817 "Peer {} sends us a malformed message: \
818 too many fields in CompactBlock",
819 peer_index
820 );
821 nc.ban_peer(
822 peer_index,
823 BAD_MESSAGE_BAN_TIME,
824 String::from(
825 "send us a malformed message: \
826 too many fields in CompactBlock",
827 ),
828 );
829 return;
830 } else {
831 item
832 }
833 } else {
834 match packed::RelayMessageReader::from_slice(&data) {
835 Ok(msg) => msg.to_enum(),
836 _ => {
837 info_target!(
838 crate::LOG_TARGET_RELAY,
839 "Peer {} sends us a malformed message: \
840 too many fields",
841 peer_index
842 );
843 nc.ban_peer(
844 peer_index,
845 BAD_MESSAGE_BAN_TIME,
846 String::from(
847 "send us a malformed message \
848 too many fields",
849 ),
850 );
851 return;
852 }
853 }
854 }
855 }
856 _ => {
857 info_target!(
858 crate::LOG_TARGET_RELAY,
859 "Peer {} sends us a malformed message",
860 peer_index
861 );
862 nc.ban_peer(
863 peer_index,
864 BAD_MESSAGE_BAN_TIME,
865 String::from("send us a malformed message"),
866 );
867 return;
868 }
869 };
870
871 debug_target!(
872 crate::LOG_TARGET_RELAY,
873 "received msg {} from {}",
874 msg.item_name(),
875 peer_index
876 );
877 #[cfg(feature = "with_sentry")]
878 {
879 let sentry_hub = sentry::Hub::current();
880 let _scope_guard = sentry_hub.push_scope();
881 sentry_hub.configure_scope(|scope| {
882 scope.set_tag("p2p.protocol", "relayer");
883 scope.set_tag("p2p.message", msg.item_name());
884 });
885 }
886
887 let start_time = Instant::now();
888 tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
889 debug_target!(
890 crate::LOG_TARGET_RELAY,
891 "process message={}, peer={}, cost={:?}",
892 msg.item_name(),
893 peer_index,
894 Instant::now().saturating_duration_since(start_time),
895 );
896 }
897
    /// Called when a peer connects on the relay protocol: records the peer as relay-connected
    /// in the shared peer state and logs the protocol `version` together with the peer index.
    async fn connected(
        &mut self,
        _nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_index: PeerIndex,
        version: &str,
    ) {
        self.shared().state().peers().relay_connected(peer_index);
        info_target!(
            crate::LOG_TARGET_RELAY,
            "RelayProtocol({}).connected peer={}",
            version,
            peer_index
        );
    }
912
    /// Called when a peer disconnects from the relay protocol: logs the event and lets the
    /// rate limiter discard the state of keys that were not used recently.
    async fn disconnected(
        &mut self,
        _nc: Arc<dyn CKBProtocolContext + Sync>,
        peer_index: PeerIndex,
    ) {
        info_target!(
            crate::LOG_TARGET_RELAY,
            "RelayProtocol.disconnected peer={}",
            peer_index
        );
        // Housekeeping on the keyed limiter; not specific to the disconnecting peer.
        self.rate_limiter.retain_recent();
    }
926
927 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
928 if self.shared.active_chain().is_initial_block_download() {
930 return;
931 }
932
933 let start_time = Instant::now();
934 trace_target!(crate::LOG_TARGET_RELAY, "start notify token={}", token);
935 match token {
936 TX_PROPOSAL_TOKEN => {
937 tokio::task::block_in_place(|| self.prune_tx_proposal_request(nc.as_ref()))
938 }
939 ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
940 TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
941 _ => unreachable!(),
942 }
943 trace_target!(
944 crate::LOG_TARGET_RELAY,
945 "finished notify token={} cost={:?}",
946 token,
947 Instant::now().saturating_duration_since(start_time)
948 );
949 }
950}