1mod block_proposal_process;
2mod block_transactions_process;
3mod block_transactions_verifier;
4mod block_uncles_verifier;
5mod compact_block_process;
6mod compact_block_verifier;
7mod get_block_proposal_process;
8mod get_block_transactions_process;
9mod get_transactions_process;
10#[cfg(test)]
11pub(crate) mod tests;
12mod transaction_hashes_process;
13mod transactions_process;
14
15use self::block_proposal_process::BlockProposalProcess;
16use self::block_transactions_process::BlockTransactionsProcess;
17pub(crate) use self::compact_block_process::CompactBlockProcess;
18use self::get_block_proposal_process::GetBlockProposalProcess;
19use self::get_block_transactions_process::GetBlockTransactionsProcess;
20use self::get_transactions_process::GetTransactionsProcess;
21use self::transaction_hashes_process::TransactionHashesProcess;
22use self::transactions_process::TransactionsProcess;
23use crate::types::{ActiveChain, SyncShared, post_sync_process};
24use crate::utils::{
25 MetricDirection, async_quick_send_message_to, async_send_message_to, metric_ckb_message_bytes,
26 send_block_proposals,
27};
28use crate::{Status, StatusCode};
29use ckb_chain::VerifyResult;
30use ckb_chain::{ChainController, RemoteBlock};
31use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
32use ckb_error::is_internal_db_error;
33use ckb_logger::{
34 debug, debug_target, error, error_target, info_target, trace_target, warn_target,
35};
36use ckb_network::{
37 CKBProtocolContext, CKBProtocolHandler, PeerIndex, SupportProtocols, TargetSession,
38 async_trait, bytes::Bytes,
39};
40use ckb_shared::Shared;
41use ckb_shared::block_status::BlockStatus;
42use ckb_systemtime::unix_time_as_millis;
43use ckb_tx_pool::service::TxVerificationResult;
44use ckb_types::BlockNumberAndHash;
45use ckb_types::{
46 core::{self, BlockView},
47 packed::{self, Byte32, ProposalShortId},
48 prelude::*,
49};
50use itertools::Itertools;
51use std::collections::{HashMap, HashSet};
52use std::sync::Arc;
53use std::time::{Duration, Instant};
54
55pub const TX_PROPOSAL_TOKEN: u64 = 0;
56pub const ASK_FOR_TXS_TOKEN: u64 = 1;
57pub const TX_HASHES_TOKEN: u64 = 2;
58
59pub const MAX_RELAY_PEERS: usize = 128;
60pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
61pub const MAX_RELAY_TXS_BYTES_PER_BATCH: usize = 1024 * 1024;
62
63type RateLimiter<T> = governor::RateLimiter<
64 T,
65 governor::state::keyed::HashMapStateStore<T>,
66 governor::clock::DefaultClock,
67>;
68
69#[derive(Debug, Eq, PartialEq)]
70pub enum ReconstructionResult {
71 Block(BlockView),
72 Missing(Vec<usize>, Vec<usize>),
73 Collided,
74 Error(Status),
75}
76
77pub struct Relayer {
79 chain: ChainController,
80 pub(crate) shared: Arc<SyncShared>,
81 rate_limiter: RateLimiter<(PeerIndex, u32)>,
82}
83
84impl Relayer {
85 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Self {
89 let quota = governor::Quota::per_second(std::num::NonZeroU32::new(30).unwrap());
92 let rate_limiter = RateLimiter::hashmap(quota);
93
94 Relayer {
95 chain,
96 shared,
97 rate_limiter,
98 }
99 }
100
101 pub fn shared(&self) -> &Arc<SyncShared> {
103 &self.shared
104 }
105
106 async fn try_process(
107 &mut self,
108 nc: Arc<dyn CKBProtocolContext + Sync>,
109 peer: PeerIndex,
110 message: packed::RelayMessageUnionReader<'_>,
111 ) -> Status {
112 let should_check_rate =
114 !matches!(message, packed::RelayMessageUnionReader::CompactBlock(_));
115
116 if should_check_rate
117 && self
118 .rate_limiter
119 .check_key(&(peer, message.item_id()))
120 .is_err()
121 {
122 return StatusCode::TooManyRequests.with_context(message.item_name());
123 }
124
125 match message {
126 packed::RelayMessageUnionReader::CompactBlock(reader) => {
127 CompactBlockProcess::new(reader, self, nc, peer)
128 .execute()
129 .await
130 }
131 packed::RelayMessageUnionReader::RelayTransactions(reader) => {
132 if reader.check_data() {
133 TransactionsProcess::new(reader, self, nc, peer).execute()
134 } else {
135 StatusCode::ProtocolMessageIsMalformed
136 .with_context("RelayTransactions is invalid")
137 }
138 }
139 packed::RelayMessageUnionReader::RelayTransactionHashes(reader) => {
140 TransactionHashesProcess::new(reader, self, peer).execute()
141 }
142 packed::RelayMessageUnionReader::GetRelayTransactions(reader) => {
143 GetTransactionsProcess::new(reader, self, nc, peer)
144 .execute()
145 .await
146 }
147 packed::RelayMessageUnionReader::GetBlockTransactions(reader) => {
148 GetBlockTransactionsProcess::new(reader, self, nc, peer)
149 .execute()
150 .await
151 }
152 packed::RelayMessageUnionReader::BlockTransactions(reader) => {
153 if reader.check_data() {
154 BlockTransactionsProcess::new(reader, self, nc, peer)
155 .execute()
156 .await
157 } else {
158 StatusCode::ProtocolMessageIsMalformed
159 .with_context("BlockTransactions is invalid")
160 }
161 }
162 packed::RelayMessageUnionReader::GetBlockProposal(reader) => {
163 GetBlockProposalProcess::new(reader, self, nc, peer)
164 .execute()
165 .await
166 }
167 packed::RelayMessageUnionReader::BlockProposal(reader) => {
168 BlockProposalProcess::new(reader, self).execute().await
169 }
170 }
171 }
172
173 async fn process(
174 &mut self,
175 nc: Arc<dyn CKBProtocolContext + Sync>,
176 peer: PeerIndex,
177 message: packed::RelayMessageUnionReader<'_>,
178 ) {
179 let item_name = message.item_name();
180 let item_bytes = message.as_slice().len() as u64;
181 let status = self.try_process(Arc::clone(&nc), peer, message).await;
182
183 metric_ckb_message_bytes(
184 MetricDirection::In,
185 &SupportProtocols::RelayV3.name(),
186 message.item_name(),
187 Some(status.code()),
188 item_bytes,
189 );
190
191 if let Some(ban_time) = status.should_ban() {
192 error_target!(
193 crate::LOG_TARGET_RELAY,
194 "receive {} from {}, ban {:?} for {}",
195 item_name,
196 peer,
197 ban_time,
198 status
199 );
200 nc.ban_peer(peer, ban_time, status.to_string());
201 } else if status.should_warn() {
202 warn_target!(
203 crate::LOG_TARGET_RELAY,
204 "receive {} from {}, {}",
205 item_name,
206 peer,
207 status
208 );
209 } else if !status.is_ok() {
210 debug_target!(
211 crate::LOG_TARGET_RELAY,
212 "receive {} from {}, {}",
213 item_name,
214 peer,
215 status
216 );
217 }
218 }
219
220 pub fn request_proposal_txs(
222 &self,
223 nc: &Arc<dyn CKBProtocolContext + Sync>,
224 peer: PeerIndex,
225 block_hash_and_number: BlockNumberAndHash,
226 proposals: Vec<packed::ProposalShortId>,
227 ) {
228 let tx_pool = self.shared.shared().tx_pool_controller().clone();
229 let shared = Arc::clone(&self.shared);
230 let nc = Arc::clone(nc);
231 self.shared().shared().async_handle().spawn(async move {
232 let fresh_proposals: Vec<ProposalShortId> =
233 match tx_pool.fresh_proposals_filter(proposals).await {
234 Err(err) => {
235 debug_target!(
236 crate::LOG_TARGET_RELAY,
237 "tx_pool fresh_proposals_filter error: {:?}",
238 err,
239 );
240 return;
241 }
242 Ok(fresh_proposals) => fresh_proposals.into_iter().unique().collect(),
243 };
244
245 let to_ask_proposals: Vec<ProposalShortId> = shared
246 .state()
247 .insert_inflight_proposals(fresh_proposals.clone(), block_hash_and_number.number)
248 .into_iter()
249 .zip(fresh_proposals)
250 .filter_map(|(firstly_in, id)| if firstly_in { Some(id) } else { None })
251 .collect();
252 if !to_ask_proposals.is_empty() {
253 let content = packed::GetBlockProposal::new_builder()
254 .block_hash(block_hash_and_number.hash)
255 .proposals(to_ask_proposals.clone())
256 .build();
257 let message = packed::RelayMessage::new_builder().set(content).build();
258 if !async_quick_send_message_to(&nc, peer, &message)
259 .await
260 .is_ok()
261 {
262 shared.state().remove_inflight_proposals(&to_ask_proposals);
263 }
264 }
265 });
266 }
267
268 #[allow(clippy::needless_collect)]
270 pub fn accept_block(
271 &self,
272 nc: Arc<dyn CKBProtocolContext + Sync>,
273 peer_id: PeerIndex,
274 block: core::BlockView,
275 msg_name: &str,
276 ) {
277 if self
278 .shared()
279 .active_chain()
280 .contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
281 {
282 return;
283 }
284
285 let block = Arc::new(block);
286
287 let verify_callback = {
288 let nc: Arc<dyn CKBProtocolContext + Sync> = Arc::clone(&nc);
289 let block = Arc::clone(&block);
290 let shared = Arc::clone(self.shared());
291 let msg_name = msg_name.to_owned();
292 Box::new(move |result: VerifyResult| match result {
293 Ok(verified) => {
294 if !verified {
295 debug!(
296 "block {}-{} has verified already, won't build compact block and broadcast it",
297 block.number(),
298 block.hash()
299 );
300 return;
301 }
302
303 build_and_broadcast_compact_block(nc.as_ref(), shared.shared(), peer_id, block);
304 }
305 Err(err) => {
306 error!(
307 "verify block {}-{} failed: {:?}, won't build compact block and broadcast it",
308 block.number(),
309 block.hash(),
310 err
311 );
312
313 let is_internal_db_error = is_internal_db_error(&err);
314 if is_internal_db_error {
315 return;
316 }
317
318 post_sync_process(
320 nc.as_ref(),
321 peer_id,
322 &msg_name,
323 StatusCode::BlockIsInvalid.with_context(format!(
324 "block {} is invalid, reason: {}",
325 block.hash(),
326 err
327 )),
328 );
329 }
330 })
331 };
332
333 let remote_block = RemoteBlock {
334 block,
335 verify_callback,
336 };
337
338 self.shared.accept_remote_block(&self.chain, remote_block);
339 }
340
341 pub async fn reconstruct_block(
351 &self,
352 active_chain: &ActiveChain,
353 compact_block: &packed::CompactBlock,
354 received_transactions: Vec<core::TransactionView>,
355 uncles_index: &[u32],
356 received_uncles: &[core::UncleBlockView],
357 ) -> ReconstructionResult {
358 let block_txs_len = received_transactions.len();
359 let compact_block_hash = compact_block.calc_header_hash();
360 debug_target!(
361 crate::LOG_TARGET_RELAY,
362 "start block reconstruction, block hash: {}, received transactions len: {}",
363 compact_block_hash,
364 block_txs_len,
365 );
366
367 let mut short_ids_set: HashSet<ProposalShortId> =
368 compact_block.short_ids().into_iter().collect();
369
370 let mut txs_map: HashMap<ProposalShortId, core::TransactionView> = received_transactions
371 .into_iter()
372 .filter_map(|tx| {
373 let short_id = tx.proposal_short_id();
374 if short_ids_set.remove(&short_id) {
375 Some((short_id, tx))
376 } else {
377 None
378 }
379 })
380 .collect();
381
382 if !short_ids_set.is_empty() {
383 let tx_pool = self.shared.shared().tx_pool_controller();
384 let fetch_txs = tx_pool.fetch_txs(short_ids_set).await;
385 if let Err(e) = fetch_txs {
386 return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
387 }
388 txs_map.extend(fetch_txs.unwrap());
389 }
390
391 let txs_len = compact_block.txs_len();
392 let mut block_transactions: Vec<Option<core::TransactionView>> =
393 Vec::with_capacity(txs_len);
394
395 let short_ids_iter = &mut compact_block.short_ids().into_iter();
396 compact_block
398 .prefilled_transactions()
399 .into_iter()
400 .for_each(|pt| {
401 let index: usize = pt.index().into();
402 let gap = index - block_transactions.len();
403 if gap > 0 {
404 short_ids_iter
405 .take(gap)
406 .for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
407 }
408 block_transactions.push(Some(pt.transaction().into_view()));
409 });
410
411 short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(&short_id)));
413
414 let missing = block_transactions.iter().any(Option::is_none);
415
416 let mut missing_uncles = Vec::with_capacity(compact_block.uncles().len());
417 let mut uncles = Vec::with_capacity(compact_block.uncles().len());
418
419 let mut position = 0;
420 for (i, uncle_hash) in compact_block.uncles().into_iter().enumerate() {
421 if uncles_index.contains(&(i as u32)) {
422 uncles.push(
423 received_uncles
424 .get(position)
425 .expect("have checked the indexes")
426 .clone()
427 .data(),
428 );
429 position += 1;
430 continue;
431 };
432 let status = active_chain.get_block_status(&uncle_hash);
433 match status {
434 BlockStatus::UNKNOWN | BlockStatus::HEADER_VALID => missing_uncles.push(i),
435 BlockStatus::BLOCK_STORED | BlockStatus::BLOCK_VALID => {
436 if let Some(uncle) = active_chain.get_block(&uncle_hash) {
437 uncles.push(uncle.as_uncle().data());
438 } else {
439 debug_target!(
440 crate::LOG_TARGET_RELAY,
441 "reconstruct_block could not find {:#?} uncle block: {:#?}",
442 status,
443 uncle_hash,
444 );
445 missing_uncles.push(i);
446 }
447 }
448 BlockStatus::BLOCK_RECEIVED => {
449 if let Some(uncle) = self
450 .chain
451 .get_orphan_block(self.shared().store(), &uncle_hash)
452 {
453 uncles.push(uncle.as_uncle().data());
454 } else {
455 debug_target!(
456 crate::LOG_TARGET_RELAY,
457 "reconstruct_block could not find {:#?} uncle block: {:#?}",
458 status,
459 uncle_hash,
460 );
461 missing_uncles.push(i);
462 }
463 }
464 BlockStatus::BLOCK_INVALID => {
465 return ReconstructionResult::Error(
466 StatusCode::CompactBlockHasInvalidUncle.with_context(uncle_hash),
467 );
468 }
469 _ => missing_uncles.push(i),
470 }
471 }
472
473 if !missing && missing_uncles.is_empty() {
474 let txs = block_transactions
475 .into_iter()
476 .collect::<Option<Vec<_>>>()
477 .expect("missing checked, should not fail");
478 let block = if let Some(extension) = compact_block.extension() {
479 packed::BlockV1::new_builder()
480 .header(compact_block.header())
481 .uncles(uncles)
482 .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
483 .proposals(compact_block.proposals())
484 .extension(extension)
485 .build()
486 .as_v0()
487 } else {
488 packed::Block::new_builder()
489 .header(compact_block.header())
490 .uncles(uncles)
491 .transactions(txs.into_iter().map(|tx| tx.data()).collect::<Vec<_>>())
492 .proposals(compact_block.proposals())
493 .build()
494 }
495 .into_view();
496
497 debug_target!(
498 crate::LOG_TARGET_RELAY,
499 "finish block reconstruction, block hash: {}",
500 compact_block.calc_header_hash(),
501 );
502
503 let compact_block_tx_root = compact_block.header().raw().transactions_root();
504 let reconstruct_block_tx_root = block.transactions_root();
505 if compact_block_tx_root != reconstruct_block_tx_root {
506 if compact_block.short_ids().is_empty()
507 || compact_block.short_ids().len() == block_txs_len
508 {
509 return ReconstructionResult::Error(
510 StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock
511 .with_context(format!(
512 "Compact_block_tx_root({}) != reconstruct_block_tx_root({})",
513 compact_block.header().raw().transactions_root(),
514 block.transactions_root(),
515 )),
516 );
517 } else {
518 if let Some(metrics) = ckb_metrics::handle() {
519 metrics.ckb_relay_transaction_short_id_collide.inc();
520 }
521 return ReconstructionResult::Collided;
522 }
523 }
524
525 ReconstructionResult::Block(block)
526 } else {
527 let missing_indexes: Vec<usize> = block_transactions
528 .iter()
529 .enumerate()
530 .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None })
531 .collect();
532
533 debug_target!(
534 crate::LOG_TARGET_RELAY,
535 "block reconstruction failed, block hash: {}, missing: {}, total: {}",
536 compact_block.calc_header_hash(),
537 missing_indexes.len(),
538 compact_block.short_ids().len(),
539 );
540
541 ReconstructionResult::Missing(missing_indexes, missing_uncles)
542 }
543 }
544
545 async fn prune_tx_proposal_request(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
546 let get_block_proposals = self.shared().state().drain_get_block_proposals();
547 let tx_pool = self.shared.shared().tx_pool_controller();
548
549 let fetch_txs = tx_pool
550 .fetch_txs(
551 get_block_proposals
552 .iter()
553 .map(|kv_pair| kv_pair.key().clone())
554 .collect(),
555 )
556 .await;
557 if let Err(err) = fetch_txs {
558 debug_target!(
559 crate::LOG_TARGET_RELAY,
560 "relayer prune_tx_proposal_request internal error: {:?}",
561 err,
562 );
563 return;
564 }
565
566 let txs = fetch_txs.unwrap();
567
568 let mut peer_txs = HashMap::new();
569 for (id, peer_indices) in get_block_proposals.into_iter() {
570 if let Some(tx) = txs.get(&id) {
571 for peer_index in peer_indices {
572 let tx_set = peer_txs.entry(peer_index).or_insert_with(Vec::new);
573 tx_set.push(tx.clone());
574 }
575 }
576 }
577
578 let mut relay_bytes = 0;
579 let mut relay_proposals = Vec::new();
580 for (peer_index, txs) in peer_txs {
581 for tx in txs {
582 let data = tx.data();
583 let tx_size = data.total_size();
584 if relay_bytes + tx_size > MAX_RELAY_TXS_BYTES_PER_BATCH {
585 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals))
586 .await;
587 relay_bytes = tx_size;
588 } else {
589 relay_bytes += tx_size;
590 }
591 relay_proposals.push(data);
592 }
593 if !relay_proposals.is_empty() {
594 send_block_proposals(nc, peer_index, std::mem::take(&mut relay_proposals)).await;
595 relay_bytes = 0;
596 }
597 }
598 }
599
600 pub async fn ask_for_txs(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
602 for (peer, mut tx_hashes) in self.shared().state().pop_ask_for_txs() {
603 if !tx_hashes.is_empty() {
604 debug_target!(
605 crate::LOG_TARGET_RELAY,
606 "Send get transaction ({} hashes) to {}",
607 tx_hashes.len(),
608 peer,
609 );
610 tx_hashes.truncate(MAX_RELAY_TXS_NUM_PER_BATCH);
611 let content = packed::GetRelayTransactions::new_builder()
612 .tx_hashes(tx_hashes)
613 .build();
614 let message = packed::RelayMessage::new_builder().set(content).build();
615 let status = async_send_message_to(nc, peer, &message).await;
616 if !status.is_ok() {
617 ckb_logger::error!(
618 "interrupted request for transactions, status: {:?}",
619 status
620 );
621 }
622 }
623 }
624 }
625
626 pub async fn send_bulk_of_tx_hashes(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
628 const BUFFER_SIZE: usize = 42;
629
630 let connected_peers = nc.full_relay_connected_peers();
631 if connected_peers.is_empty() {
632 return;
633 }
634
635 let tx_verify_results = self
636 .shared
637 .state()
638 .take_relay_tx_verify_results(MAX_RELAY_TXS_NUM_PER_BATCH);
639 let mut selected: HashMap<PeerIndex, Vec<Byte32>> = HashMap::default();
640 {
641 for tx_verify_result in tx_verify_results {
642 match tx_verify_result {
643 TxVerificationResult::Ok {
644 original_peer,
645 tx_hash,
646 } => {
647 for target in &connected_peers {
648 match original_peer {
649 Some(peer) => {
650 if peer != *target {
652 let hashes = selected
653 .entry(*target)
654 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
655 hashes.push(tx_hash.clone());
656 }
657 }
658 None => {
659 let hashes = selected
661 .entry(*target)
662 .or_insert_with(|| Vec::with_capacity(BUFFER_SIZE));
663 hashes.push(tx_hash.clone());
664 self.shared.state().mark_as_known_tx(tx_hash.clone());
665 }
666 }
667 }
668 }
669 TxVerificationResult::Reject { tx_hash } => {
670 self.shared.state().remove_from_known_txs(&tx_hash);
671 }
672 TxVerificationResult::UnknownParents { peer, parents } => {
673 let tx_hashes: Vec<_> = {
674 let mut tx_filter = self.shared.state().tx_filter();
675 tx_filter.remove_expired();
676 parents
677 .into_iter()
678 .filter(|tx_hash| !tx_filter.contains(tx_hash))
679 .collect()
680 };
681 self.shared.state().add_ask_for_txs(peer, tx_hashes);
682 }
683 }
684 }
685 }
686 for (peer, hashes) in selected {
687 let content = packed::RelayTransactionHashes::new_builder()
688 .tx_hashes(hashes)
689 .build();
690 let message = packed::RelayMessage::new_builder().set(content).build();
691
692 if let Err(err) = nc
693 .async_filter_broadcast(TargetSession::Single(peer), message.as_bytes())
694 .await
695 {
696 debug_target!(
697 crate::LOG_TARGET_RELAY,
698 "relayer send TransactionHashes error: {:?}",
699 err,
700 );
701 }
702 }
703 }
704}
705
706fn build_and_broadcast_compact_block(
707 nc: &dyn CKBProtocolContext,
708 shared: &Shared,
709 peer: PeerIndex,
710 block: Arc<BlockView>,
711) {
712 debug_target!(
713 crate::LOG_TARGET_RELAY,
714 "[block_relay] relayer accept_block {} {}",
715 block.header().hash(),
716 unix_time_as_millis()
717 );
718 let block_hash = block.hash();
719 shared.remove_header_view(&block_hash);
720 let cb = packed::CompactBlock::build_from_block(&block, &HashSet::new());
721 let message = packed::RelayMessage::new_builder().set(cb).build();
722
723 let selected_peers: Vec<PeerIndex> = nc
724 .connected_peers()
725 .into_iter()
726 .filter(|target_peer| peer != *target_peer)
727 .take(MAX_RELAY_PEERS)
728 .collect();
729 let handle = shared.async_handle();
730 if let Err(err) = handle.block_on(nc.async_quick_filter_broadcast(
731 TargetSession::Multi(Box::new(selected_peers.into_iter())),
732 message.as_bytes(),
733 )) {
734 debug_target!(
735 crate::LOG_TARGET_RELAY,
736 "relayer send block when accept block error: {:?}",
737 err,
738 );
739 }
740
741 let snapshot = shared.snapshot();
742 let parent_chain_root = {
743 let mmr = snapshot.chain_root_mmr(block.header().number() - 1);
744 match mmr.get_root() {
745 Ok(root) => root,
746 Err(err) => {
747 error_target!(
748 crate::LOG_TARGET_RELAY,
749 "Generate last state to light client failed: {:?}",
750 err
751 );
752 return;
753 }
754 }
755 };
756
757 let tip_header = packed::VerifiableHeader::new_builder()
758 .header(block.header().data())
759 .uncles_hash(block.calc_uncles_hash())
760 .extension(Pack::pack(&block.extension()))
761 .parent_chain_root(parent_chain_root)
762 .build();
763 let light_client_message = {
764 let content = packed::SendLastState::new_builder()
765 .last_header(tip_header)
766 .build();
767 packed::LightClientMessage::new_builder()
768 .set(content)
769 .build()
770 };
771 let light_client_peers: HashSet<PeerIndex> = nc
772 .connected_peers()
773 .into_iter()
774 .filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
775 .filter(|(_id, peer)| peer.if_lightclient_subscribed)
776 .map(|(id, _)| id)
777 .collect();
778 if let Err(err) = handle.block_on(nc.async_filter_broadcast_with_proto(
779 SupportProtocols::LightClient.protocol_id(),
780 TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
781 light_client_message.as_bytes(),
782 )) {
783 debug_target!(
784 crate::LOG_TARGET_RELAY,
785 "relayer send last state to light client when accept block, error: {:?}",
786 err,
787 );
788 }
789}
790
791#[async_trait]
792impl CKBProtocolHandler for Relayer {
793 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
794 nc.set_notify(Duration::from_millis(100), TX_PROPOSAL_TOKEN)
795 .await
796 .expect("set_notify at init is ok");
797 nc.set_notify(Duration::from_millis(100), ASK_FOR_TXS_TOKEN)
798 .await
799 .expect("set_notify at init is ok");
800 nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
801 .await
802 .expect("set_notify at init is ok");
803 }
804
805 async fn received(
806 &mut self,
807 nc: Arc<dyn CKBProtocolContext + Sync>,
808 peer_index: PeerIndex,
809 data: Bytes,
810 ) {
811 if self.shared.active_chain().is_initial_block_download() {
813 return;
814 }
815
816 let msg = match packed::RelayMessageReader::from_compatible_slice(&data) {
817 Ok(msg) => {
818 let item = msg.to_enum();
819 if let packed::RelayMessageUnionReader::CompactBlock(ref reader) = item {
820 if reader.count_extra_fields() > 1 {
821 info_target!(
822 crate::LOG_TARGET_RELAY,
823 "Peer {} sends us a malformed message: \
824 too many fields in CompactBlock",
825 peer_index
826 );
827 nc.ban_peer(
828 peer_index,
829 BAD_MESSAGE_BAN_TIME,
830 String::from(
831 "send us a malformed message: \
832 too many fields in CompactBlock",
833 ),
834 );
835 return;
836 } else {
837 item
838 }
839 } else {
840 match packed::RelayMessageReader::from_slice(&data) {
841 Ok(msg) => msg.to_enum(),
842 _ => {
843 info_target!(
844 crate::LOG_TARGET_RELAY,
845 "Peer {} sends us a malformed message: \
846 too many fields",
847 peer_index
848 );
849 nc.ban_peer(
850 peer_index,
851 BAD_MESSAGE_BAN_TIME,
852 String::from(
853 "send us a malformed message \
854 too many fields",
855 ),
856 );
857 return;
858 }
859 }
860 }
861 }
862 _ => {
863 info_target!(
864 crate::LOG_TARGET_RELAY,
865 "Peer {} sends us a malformed message",
866 peer_index
867 );
868 nc.ban_peer(
869 peer_index,
870 BAD_MESSAGE_BAN_TIME,
871 String::from("send us a malformed message"),
872 );
873 return;
874 }
875 };
876
877 debug_target!(
878 crate::LOG_TARGET_RELAY,
879 "received msg {} from {}",
880 msg.item_name(),
881 peer_index
882 );
883 #[cfg(feature = "with_sentry")]
884 {
885 let sentry_hub = sentry::Hub::current();
886 let _scope_guard = sentry_hub.push_scope();
887 sentry_hub.configure_scope(|scope| {
888 scope.set_tag("p2p.protocol", "relayer");
889 scope.set_tag("p2p.message", msg.item_name());
890 });
891 }
892
893 let start_time = Instant::now();
894 self.process(nc, peer_index, msg).await;
895 debug_target!(
896 crate::LOG_TARGET_RELAY,
897 "process message={}, peer={}, cost={:?}",
898 msg.item_name(),
899 peer_index,
900 Instant::now().saturating_duration_since(start_time),
901 );
902 }
903
904 async fn connected(
905 &mut self,
906 _nc: Arc<dyn CKBProtocolContext + Sync>,
907 peer_index: PeerIndex,
908 version: &str,
909 ) {
910 self.shared().state().peers().relay_connected(peer_index);
911 info_target!(
912 crate::LOG_TARGET_RELAY,
913 "RelayProtocol({}).connected peer={}",
914 version,
915 peer_index
916 );
917 }
918
919 async fn disconnected(
920 &mut self,
921 _nc: Arc<dyn CKBProtocolContext + Sync>,
922 peer_index: PeerIndex,
923 ) {
924 info_target!(
925 crate::LOG_TARGET_RELAY,
926 "RelayProtocol.disconnected peer={}",
927 peer_index
928 );
929 self.rate_limiter.retain_recent();
931 }
932
933 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
934 if self.shared.active_chain().is_initial_block_download() {
936 return;
937 }
938
939 let start_time = Instant::now();
940 trace_target!(
941 crate::LOG_TARGET_RELAY,
942 "start notifas_ref()y token={}",
943 token
944 );
945 match token {
946 TX_PROPOSAL_TOKEN => self.prune_tx_proposal_request(&nc).await,
947 ASK_FOR_TXS_TOKEN => self.ask_for_txs(&nc).await,
948 TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(&nc).await,
949 _ => unreachable!(),
950 }
951 trace_target!(
952 crate::LOG_TARGET_RELAY,
953 "finished notify token={} cost={:?}",
954 token,
955 Instant::now().saturating_duration_since(start_time)
956 );
957 }
958}