use crate::relayer::block_transactions_verifier::BlockTransactionsVerifier;
use crate::relayer::block_uncles_verifier::BlockUnclesVerifier;
use crate::relayer::{ReconstructionResult, Relayer};
use crate::utils::send_message_to;
use crate::{attempt, Status, StatusCode};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_types::{core, packed, prelude::*};
use std::collections::hash_map::Entry;
use std::mem;
use std::sync::Arc;
/// One-shot handler for a `BlockTransactions` relay message received from a peer.
///
/// Build it with [`BlockTransactionsProcess::new`] and consume it with
/// [`BlockTransactionsProcess::execute`].
pub struct BlockTransactionsProcess<'a> {
/// Borrowed reader over the received message; converted to an owned entity
/// when the message is processed.
message: packed::BlockTransactionsReader<'a>,
/// Relayer used to reach shared state, reconstruct the block, request
/// proposal transactions and accept the reconstructed block.
relayer: &'a Relayer,
/// Protocol context handed to the relayer helpers and used to send the
/// follow-up request back to the peer.
nc: Arc<dyn CKBProtocolContext>,
/// Peer the message is attributed to; also the key used to look up which
/// indexes were requested and the target of any follow-up request.
peer: PeerIndex,
}
impl<'a> BlockTransactionsProcess<'a> {
pub fn new(
message: packed::BlockTransactionsReader<'a>,
relayer: &'a Relayer,
nc: Arc<dyn CKBProtocolContext>,
peer: PeerIndex,
) -> Self {
BlockTransactionsProcess {
message,
relayer,
nc,
peer,
}
}
pub fn execute(self) -> Status {
let shared = self.relayer.shared();
let active_chain = shared.active_chain();
let block_transactions = self.message.to_entity();
let block_hash = block_transactions.block_hash();
let received_transactions: Vec<core::TransactionView> = block_transactions
.transactions()
.into_iter()
.map(|tx| tx.into_view())
.collect();
let received_uncles: Vec<core::UncleBlockView> = block_transactions
.uncles()
.into_iter()
.map(|uncle| uncle.into_view())
.collect();
let mut missing_transactions: Vec<u32>;
let mut missing_uncles: Vec<u32>;
let mut collision = false;
if let Entry::Occupied(mut pending) = shared
.state()
.pending_compact_blocks()
.entry(block_hash.clone())
{
let (compact_block, peers_map, _) = pending.get_mut();
if let Entry::Occupied(mut value) = peers_map.entry(self.peer) {
let (expected_transaction_indexes, expected_uncle_indexes) = value.get_mut();
ckb_logger::info!(
"relayer receive BLOCKTXN of {}, peer: {}",
block_hash,
self.peer
);
attempt!(BlockTransactionsVerifier::verify(
compact_block,
expected_transaction_indexes,
&received_transactions,
));
attempt!(BlockUnclesVerifier::verify(
compact_block,
expected_uncle_indexes,
&received_uncles,
));
let ret = self.relayer.reconstruct_block(
&active_chain,
compact_block,
received_transactions,
expected_uncle_indexes,
&received_uncles,
);
{
let proposals: Vec<_> = received_uncles
.into_iter()
.flat_map(|u| u.data().proposals().into_iter())
.collect();
self.relayer.request_proposal_txs(
self.nc.as_ref(),
self.peer,
(
compact_block.header().into_view().number(),
block_hash.clone(),
)
.into(),
proposals,
);
}
match ret {
ReconstructionResult::Block(block) => {
pending.remove();
let status = self
.relayer
.accept_block(self.nc.as_ref(), self.peer, block);
return status;
}
ReconstructionResult::Missing(transactions, uncles) => {
missing_transactions = transactions
.into_iter()
.map(|i| i as u32)
.chain(expected_transaction_indexes.iter().copied())
.collect();
missing_uncles = uncles
.into_iter()
.map(|i| i as u32)
.chain(expected_uncle_indexes.iter().copied())
.collect();
missing_transactions.sort_unstable();
missing_uncles.sort_unstable();
}
ReconstructionResult::Collided => {
missing_transactions = compact_block
.short_id_indexes()
.into_iter()
.map(|i| i as u32)
.collect();
collision = true;
missing_uncles = vec![];
}
ReconstructionResult::Error(status) => {
return status;
}
}
assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty());
let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message));
let _ignore_prev_value =
mem::replace(expected_transaction_indexes, missing_transactions);
let _ignore_prev_value = mem::replace(expected_uncle_indexes, missing_uncles);
if collision {
return StatusCode::CompactBlockMeetsShortIdsCollision.with_context(block_hash);
} else {
return StatusCode::CompactBlockRequiresFreshTransactions
.with_context(block_hash);
}
}
}
Status::ignored()
}
}