#[cfg(test)]
use {
crate::repair::standard_repair_handler::StandardRepairHandler,
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
solana_runtime::bank_forks::BankForks,
};
use {
crate::{
cluster_slots_service::cluster_slots::ClusterSlots,
repair::{
duplicate_repair_status::get_ancestor_hash_repair_sample_size,
quic_endpoint::RemoteRequest,
repair_handler::RepairHandler,
repair_service::{OutstandingShredRepairs, REPAIR_MS, RepairStats},
request_response::RequestResponse,
result::{Error, RepairVerifyError, Result},
},
},
agave_votor_messages::migration::MigrationStatus,
bincode::{Options, serialize},
bytes::Bytes,
crossbeam_channel::{Receiver, RecvTimeoutError},
lru::LruCache,
rand::{
Rng,
distr::{
Distribution,
weighted::{Error as WeightedError, WeightedIndex},
},
},
serde::{Deserialize, Serialize},
solana_clock::Slot,
solana_cluster_type::ClusterType,
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::{ContactInfo, Protocol},
ping_pong::{self, Pong},
weighted_shuffle::WeightedShuffle,
},
solana_hash::{HASH_BYTES, Hash},
solana_keypair::{Keypair, signable::Signable},
solana_ledger::shred::{self, Nonce, SIZE_OF_NONCE, ShredFetchStats},
solana_net_utils::SocketAddrSpace,
solana_packet::PACKET_DATA_SIZE,
solana_perf::{
data_budget::DataBudget,
packet::{Packet, PacketBatch, PacketBatchRecycler, RecycledPacketBatch},
},
solana_poh::poh_recorder::SharedLeaderState,
solana_pubkey::{PUBKEY_BYTES, Pubkey},
solana_runtime::bank_forks::SharableBanks,
solana_signature::{SIGNATURE_BYTES, Signature},
solana_signer::Signer,
solana_streamer::{
sendmmsg::{SendPktsError, batch_send},
streamer::PacketBatchSender,
},
solana_time_utils::timestamp,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
net::{SocketAddr, UdpSocket},
sync::{
Arc, RwLock,
atomic::{AtomicBool, Ordering},
},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::sync::mpsc::Sender as AsyncSender,
};
/// Maximum number of shred packets returned for a single orphan request.
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 11;
/// Capacity of the per-slot LRU cache of sampled repair peers.
pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
/// How long a cached repair-peer sample remains valid before being refreshed.
const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
#[cfg(test)]
static_assertions::const_assert_eq!(MAX_ANCESTOR_BYTES_IN_PACKET, 1220);
/// Bytes available for ancestor-hash payload in one packet.
/// NOTE(review): the two `4`s presumably account for serialization overhead
/// (enum discriminant and Vec length prefix) — confirm against the wire format.
pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
    PACKET_DATA_SIZE -
    SIZE_OF_NONCE -
    4 -
    4 ;
/// Maximum number of `(Slot, Hash)` pairs that fit in one ancestor response.
pub const MAX_ANCESTOR_RESPONSES: usize =
    MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>();
/// Size of the token carried in repair ping/pong messages.
const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
/// Capacity of the ping cache used to vet UDP repair requesters.
pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536;
/// Time-to-live of a verified entry in the ping cache.
pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
/// Minimum delay between pings sent to the same remote node.
const REPAIR_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(2);
/// Serialized size of a `RepairResponse::Ping`:
/// 4-byte discriminant + sender pubkey + token + signature.
pub(crate) const REPAIR_RESPONSE_SERIALIZED_PING_BYTES: usize =
    4 + PUBKEY_BYTES + REPAIR_PING_TOKEN_SIZE + SIGNATURE_BYTES;
/// Maximum accepted skew between a signed request's timestamp and local time.
const SIGNED_REPAIR_TIME_WINDOW: Duration = Duration::from_secs(60 * 10);
#[cfg(test)]
static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30);
/// The kinds of shred repair a validator can request from a peer.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum ShredRepairType {
    /// Request shreds for the given slot or any earlier slot
    /// (see `verify_response`, which accepts `shred_slot <= slot`).
    Orphan(Slot),
    /// Request a shred of the given slot at index >= the given index.
    HighestShred(Slot, u64),
    /// Request the shred of the given slot at exactly the given index.
    Shred(Slot, u64),
}
impl ShredRepairType {
    /// Slot this repair request refers to.
    pub fn slot(&self) -> Slot {
        match *self {
            ShredRepairType::Orphan(slot) => slot,
            ShredRepairType::HighestShred(slot, _) => slot,
            ShredRepairType::Shred(slot, _) => slot,
        }
    }
}
impl RequestResponse for ShredRepairType {
    type Response = [u8];

    /// Upper bound on how many response packets a request may produce.
    fn num_expected_responses(&self) -> u32 {
        match self {
            Self::Orphan(_) => MAX_ORPHAN_REPAIR_RESPONSES as u32,
            Self::Shred(..) | Self::HighestShred(..) => 1,
        }
    }

    /// Checks that a received shred payload plausibly answers this request
    /// by comparing its slot (and, where applicable, index) to the request.
    fn verify_response(&self, shred: &Self::Response) -> bool {
        let shred_index = || shred::layout::get_index(shred).map(u64::from);
        let Some(shred_slot) = shred::layout::get_slot(shred) else {
            return false;
        };
        match *self {
            Self::Orphan(slot) => shred_slot <= slot,
            Self::HighestShred(slot, index) => {
                shred_slot == slot && shred_index() >= Some(index)
            }
            Self::Shred(slot, index) => shred_slot == slot && shred_index() == Some(index),
        }
    }
}
/// An ancestor-hashes repair request for the wrapped slot.
#[derive(Copy, Clone)]
pub struct AncestorHashesRepairType(pub Slot);
impl AncestorHashesRepairType {
    /// Slot whose ancestor hashes are being requested.
    pub fn slot(&self) -> Slot {
        self.0
    }
}
/// Response to an ancestor-hashes request: either the requested
/// `(slot, hash)` pairs, or a ping challenging the requester to prove
/// ownership of its address before being served.
#[derive(Debug, Deserialize, Serialize)]
pub enum AncestorHashesResponse {
    Hashes(Vec<(Slot, Hash)>),
    Ping(Ping),
}
impl RequestResponse for AncestorHashesRepairType {
    type Response = AncestorHashesResponse;

    /// Exactly one response packet is expected per request.
    fn num_expected_responses(&self) -> u32 {
        1
    }

    /// A hashes response is acceptable when it fits in one packet; a ping
    /// must carry a valid signature.
    fn verify_response(&self, response: &AncestorHashesResponse) -> bool {
        match response {
            AncestorHashesResponse::Ping(ping) => ping.verify(),
            AncestorHashesResponse::Hashes(hashes) => hashes.len() <= MAX_ANCESTOR_RESPONSES,
        }
    }
}
/// Counters accumulated while serving repair requests; periodically flushed
/// to metrics and reset by `report_reset_stats`.
#[derive(Default)]
struct ServeRepairStats {
    // Request intake and load shedding.
    total_requests: usize,
    dropped_requests_outbound_bandwidth: usize,
    dropped_requests_load_shed: usize,
    dropped_requests_load_shed_sigverify: usize,
    dropped_requests_low_stake: usize,
    whitelisted_requests: usize,
    // Response accounting.
    total_dropped_response_packets: usize,
    total_response_packets: usize,
    total_response_bytes_staked: usize,
    total_response_bytes_unstaked: usize,
    // Requests processed, broken down by request type.
    processed: usize,
    window_index: usize,
    highest_window_index: usize,
    orphan: usize,
    pong: usize,
    ancestor_hashes: usize,
    // Window-index requests that found no shred in the blockstore.
    window_index_misses: usize,
    // Ping-cache activity for UDP requesters.
    ping_cache_check_failed: usize,
    pings_sent: usize,
    // Timing.
    decode_time_us: u64,
    handle_requests_time_us: u64,
    // Decoded requests broken down by requester stake.
    handle_requests_staked: usize,
    handle_requests_unstaked: usize,
    // Decode/verification failures, by cause.
    err_self_repair: usize,
    err_time_skew: usize,
    err_malformed: usize,
    err_sig_verify: usize,
    err_unsigned: usize,
    err_id_mismatch: usize,
}
/// Common header of signed repair requests: who sent it, who it is for,
/// when it was sent, and the nonce echoed back in the response.
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Debug, Deserialize, Serialize)]
pub struct RepairRequestHeader {
    signature: Signature,
    sender: Pubkey,
    recipient: Pubkey,
    timestamp: u64,
    nonce: Nonce,
}
impl RepairRequestHeader {
    /// Creates a header with a zeroed placeholder signature; the real
    /// signature is filled in when the serialized request is signed.
    pub fn new(sender: Pubkey, recipient: Pubkey, timestamp: u64, nonce: Nonce) -> Self {
        let signature = Signature::default();
        Self {
            signature,
            sender,
            recipient,
            timestamp,
            nonce,
        }
    }
}
// Ping/pong messages and cache specialized to the repair token size.
type Ping = ping_pong::Ping<REPAIR_PING_TOKEN_SIZE>;
type PingCache = ping_pong::PingCache<REPAIR_PING_TOKEN_SIZE>;
/// Wire protocol for repair requests.
///
/// The `Legacy*` variants are retired unsigned formats: they are kept so the
/// enum discriminants on the wire stay stable, and are rejected during
/// signature verification (`verify_signed_packet` returns `Unsigned`).
#[cfg_attr(
    feature = "frozen-abi",
    derive(AbiEnumVisitor, AbiExample),
    frozen_abi(digest = "HbUQDATKfpN8pjyyarSGa8uN4SuLNTvMf7T5b66ajnNZ")
)]
#[derive(Debug, Deserialize, Serialize)]
pub enum RepairProtocol {
    LegacyWindowIndex,
    LegacyHighestWindowIndex,
    LegacyOrphan,
    LegacyWindowIndexWithNonce,
    LegacyHighestWindowIndexWithNonce,
    LegacyOrphanWithNonce,
    LegacyAncestorHashes,
    /// Reply to a ping challenge, proving ownership of the source address.
    Pong(ping_pong::Pong),
    /// Request a specific shred: (slot, shred_index).
    WindowIndex {
        header: RepairRequestHeader,
        slot: Slot,
        shred_index: u64,
    },
    /// Request the highest shred of `slot` at index >= `shred_index`.
    HighestWindowIndex {
        header: RepairRequestHeader,
        slot: Slot,
        shred_index: u64,
    },
    /// Request highest shreds for ancestors of `slot`.
    Orphan {
        header: RepairRequestHeader,
        slot: Slot,
    },
    /// Request `(slot, hash)` pairs for ancestors of `slot`.
    AncestorHashes {
        header: RepairRequestHeader,
        slot: Slot,
    },
}
/// Serialized size of a `RepairProtocol::Pong` payload (from + token hash + signature).
const REPAIR_REQUEST_PONG_SERIALIZED_BYTES: usize = PUBKEY_BYTES + HASH_BYTES + SIGNATURE_BYTES;
/// Smallest well-formed repair request; shorter packets are discarded as malformed.
const REPAIR_REQUEST_MIN_BYTES: usize = REPAIR_REQUEST_PONG_SERIALIZED_BYTES;
/// Drops requests too short to be a valid repair message, counting each
/// dropped one as malformed. Returns how many requests remain.
fn discard_malformed_repair_requests(
    requests: &mut Vec<RemoteRequest>,
    stats: &mut ServeRepairStats,
) -> usize {
    let before = requests.len();
    requests.retain(|request| request.bytes.len() >= REPAIR_REQUEST_MIN_BYTES);
    let after = requests.len();
    stats.err_malformed += before - after;
    after
}
/// Response sent in place of shred data when the requester must first
/// answer a ping challenge.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) enum RepairResponse {
    Ping(Ping),
}
impl RepairProtocol {
    /// Pubkey of the requesting node, if the variant carries one.
    fn sender(&self) -> Option<&Pubkey> {
        match self {
            Self::Pong(pong) => Some(pong.from()),
            Self::WindowIndex { header, .. }
            | Self::HighestWindowIndex { header, .. }
            | Self::Orphan { header, .. }
            | Self::AncestorHashes { header, .. } => Some(&header.sender),
            Self::LegacyWindowIndex
            | Self::LegacyHighestWindowIndex
            | Self::LegacyOrphan
            | Self::LegacyWindowIndexWithNonce
            | Self::LegacyHighestWindowIndexWithNonce
            | Self::LegacyOrphanWithNonce
            | Self::LegacyAncestorHashes => None,
        }
    }

    /// True for every non-legacy variant; legacy formats are unsigned.
    fn supports_signature(&self) -> bool {
        !matches!(
            self,
            Self::LegacyWindowIndex
                | Self::LegacyHighestWindowIndex
                | Self::LegacyOrphan
                | Self::LegacyWindowIndexWithNonce
                | Self::LegacyHighestWindowIndexWithNonce
                | Self::LegacyOrphanWithNonce
                | Self::LegacyAncestorHashes
        )
    }

    /// Upper bound on response packets, used to budget outbound bandwidth.
    fn max_response_packets(&self) -> usize {
        match self {
            Self::WindowIndex { .. }
            | Self::HighestWindowIndex { .. }
            | Self::AncestorHashes { .. } => 1,
            Self::Orphan { .. } => MAX_ORPHAN_REPAIR_RESPONSES,
            // Pongs and (rejected) legacy requests produce no response.
            Self::Pong(_) => 0,
            Self::LegacyWindowIndex
            | Self::LegacyHighestWindowIndex
            | Self::LegacyOrphan
            | Self::LegacyWindowIndexWithNonce
            | Self::LegacyHighestWindowIndexWithNonce
            | Self::LegacyOrphanWithNonce
            | Self::LegacyAncestorHashes => 0,
        }
    }

    /// Upper bound on response bytes, assuming full-size packets.
    fn max_response_bytes(&self) -> usize {
        self.max_response_packets() * PACKET_DATA_SIZE
    }
}
/// Serves repair requests from other validators and originates this node's
/// own outgoing repair requests.
pub struct ServeRepair {
    cluster_info: Arc<ClusterInfo>,
    sharable_banks: SharableBanks,
    // Senders in this set get priority when requests exceed the per-iteration cap.
    repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    repair_handler: Box<dyn RepairHandler + Send + Sync>,
    // When present and holding a working bank, responses are budgeted at a
    // higher byte cost (leader multiplier in `run_listen`).
    leader_state: Option<SharedLeaderState>,
    migration_status: Arc<MigrationStatus>,
}
/// A cached, stake-weighted sample population of repair peers for one slot.
pub(crate) struct RepairPeers {
    // When this snapshot was taken; entries older than
    // REPAIR_PEERS_CACHE_TTL are refreshed.
    asof: Instant,
    peers: Vec<Node>,
    // Weighted distribution over `peers`, parallel to it by index.
    weighted_index: WeightedIndex<u64>,
}
/// A repair peer with its serve-repair socket addresses for both transports.
struct Node {
    pubkey: Pubkey,
    serve_repair: SocketAddr,
    serve_repair_quic: SocketAddr,
}
impl RepairPeers {
    /// Builds a weighted peer sample, keeping only peers that advertise both
    /// UDP and QUIC serve-repair addresses.
    ///
    /// Errors when `peers` and `weights` differ in length, when no usable
    /// peer remains, or when the weight distribution is invalid.
    fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
        if peers.len() != weights.len() {
            return Err(Error::from(WeightedError::InvalidWeight));
        }
        let mut nodes = Vec::with_capacity(peers.len());
        let mut node_weights = Vec::with_capacity(weights.len());
        for (peer, &weight) in peers.iter().zip(weights) {
            let (Some(serve_repair), Some(serve_repair_quic)) = (
                peer.serve_repair(Protocol::UDP),
                peer.serve_repair(Protocol::QUIC),
            ) else {
                // Peers missing either address cannot be sampled.
                continue;
            };
            nodes.push(Node {
                pubkey: *peer.pubkey(),
                serve_repair,
                serve_repair_quic,
            });
            node_weights.push(weight);
        }
        if nodes.is_empty() {
            return Err(Error::from(ClusterInfoError::NoPeers));
        }
        Ok(Self {
            asof,
            peers: nodes,
            weighted_index: WeightedIndex::new(node_weights)?,
        })
    }

    /// Samples one peer, with probability proportional to its weight.
    fn sample<R: Rng>(&self, rng: &mut R) -> &Node {
        &self.peers[self.weighted_index.sample(rng)]
    }
}
/// A decoded, verified repair request annotated with requester metadata
/// used for prioritization and budgeting.
struct RepairRequestWithMeta {
    request: RepairProtocol,
    from_addr: SocketAddr,
    // Transport the request arrived on; responses use the same transport.
    protocol: Protocol,
    // Sender's epoch stake (0 when unknown or unstaked).
    stake: u64,
    whitelisted: bool,
}
impl ServeRepair {
/// Builds a `ServeRepair` with no leader state; such an instance never
/// applies the leader byte-cost multiplier when budgeting responses.
pub fn new(
    cluster_info: Arc<ClusterInfo>,
    sharable_banks: SharableBanks,
    repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    repair_handler: Box<dyn RepairHandler + Send + Sync>,
    migration_status: Arc<MigrationStatus>,
) -> Self {
    let leader_state = None;
    Self {
        cluster_info,
        sharable_banks,
        repair_whitelist,
        repair_handler,
        leader_state,
        migration_status,
    }
}
/// Builds a `ServeRepair` that tracks leader state so responses can be
/// budgeted at a higher byte cost while this node is leader.
pub fn new_with_leader_state(
    cluster_info: Arc<ClusterInfo>,
    sharable_banks: SharableBanks,
    repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    repair_handler: Box<dyn RepairHandler + Send + Sync>,
    leader_state: SharedLeaderState,
    migration_status: Arc<MigrationStatus>,
) -> Self {
    let leader_state = Some(leader_state);
    Self {
        cluster_info,
        sharable_banks,
        repair_whitelist,
        repair_handler,
        leader_state,
        migration_status,
    }
}
/// Test-only constructor backed by a fresh temporary blockstore and a
/// standard repair handler.
#[cfg(test)]
pub fn new_for_test(
    cluster_info: Arc<ClusterInfo>,
    bank_forks: Arc<RwLock<BankForks>>,
    repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
) -> Self {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let repair_handler = Box::new(StandardRepairHandler::new(blockstore));
    let bank_forks_r = bank_forks.read().unwrap();
    Self::new(
        cluster_info,
        bank_forks_r.sharable_banks(),
        repair_whitelist,
        repair_handler,
        bank_forks_r.migration_status(),
    )
}
/// This node's identity pubkey (test helper).
#[cfg(test)]
pub(crate) fn my_id(&self) -> Pubkey {
    self.cluster_info.id()
}
/// Dispatches one verified repair request to the repair handler and returns
/// the response packets, if any.
///
/// `Pong`s only update the ping cache; ancestor-hashes requests are served
/// only when `migration_status` allows it for the slot; legacy variants are
/// unreachable here (filtered during signature verification) and only
/// trip a debug assertion. Also logs any request taking > 5 ms.
fn handle_repair(
    &self,
    recycler: &PacketBatchRecycler,
    from_addr: &SocketAddr,
    request: RepairProtocol,
    stats: &mut ServeRepairStats,
    ping_cache: &mut PingCache,
) -> Option<PacketBatch> {
    let now = Instant::now();
    let (res, label) = {
        match &request {
            RepairProtocol::WindowIndex {
                header: RepairRequestHeader { nonce, .. },
                slot,
                shred_index,
            } => {
                stats.window_index += 1;
                let batch = self.repair_handler.run_window_request(
                    recycler,
                    from_addr,
                    *slot,
                    *shred_index,
                    *nonce,
                );
                // A miss means the requested shred is not in our blockstore.
                if batch.is_none() {
                    stats.window_index_misses += 1;
                }
                (batch, "WindowIndexWithNonce")
            }
            RepairProtocol::HighestWindowIndex {
                header: RepairRequestHeader { nonce, .. },
                slot,
                shred_index: highest_index,
            } => {
                stats.highest_window_index += 1;
                (
                    self.repair_handler.run_highest_window_request(
                        recycler,
                        from_addr,
                        *slot,
                        *highest_index,
                        *nonce,
                    ),
                    "HighestWindowIndexWithNonce",
                )
            }
            RepairProtocol::Orphan {
                header: RepairRequestHeader { nonce, .. },
                slot,
            } => {
                stats.orphan += 1;
                (
                    self.repair_handler.run_orphan(
                        recycler,
                        from_addr,
                        *slot,
                        MAX_ORPHAN_REPAIR_RESPONSES,
                        *nonce,
                    ),
                    "OrphanWithNonce",
                )
            }
            RepairProtocol::AncestorHashes {
                header: RepairRequestHeader { nonce, .. },
                slot,
            } => {
                stats.ancestor_hashes += 1;
                if self
                    .migration_status
                    .should_respond_to_ancestor_hashes_requests(*slot)
                {
                    (
                        self.repair_handler
                            .run_ancestor_hashes(recycler, from_addr, *slot, *nonce),
                        "AncestorHashes",
                    )
                } else {
                    // Migration status forbids answering for this slot.
                    (None, "AncestorHashes")
                }
            }
            RepairProtocol::Pong(pong) => {
                stats.pong += 1;
                // Record the verified pong so subsequent requests from this
                // (pubkey, addr) pair pass the ping-cache check.
                ping_cache.add(pong, *from_addr, Instant::now());
                (None, "Pong")
            }
            RepairProtocol::LegacyWindowIndex
            | RepairProtocol::LegacyWindowIndexWithNonce
            | RepairProtocol::LegacyHighestWindowIndex
            | RepairProtocol::LegacyHighestWindowIndexWithNonce
            | RepairProtocol::LegacyOrphan
            | RepairProtocol::LegacyOrphanWithNonce
            | RepairProtocol::LegacyAncestorHashes => {
                error!("Unexpected legacy request: {request:?}");
                debug_assert!(
                    false,
                    "Legacy requests should have been filtered out during signature \
                     verification. {request:?}"
                );
                (None, "Legacy")
            }
        }
    };
    Self::report_time_spent(label, &now.elapsed(), "");
    res
}
/// Logs how long a request took when it exceeds 5 ms.
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
    let elapsed_ms = time.as_millis();
    if elapsed_ms > 5 {
        info!("{label} took: {elapsed_ms} ms {extra}");
    }
}
/// Deserializes and validates a single remote repair request.
///
/// Checks, in order: bincode deserialization, a usable source address,
/// signature verification, and that the request is not from this node.
/// On success the request is annotated with the sender's epoch stake
/// (0 when unknown) and whether the sender is whitelisted.
fn decode_request(
    remote_request: RemoteRequest,
    epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
    whitelist: &HashSet<Pubkey>,
    my_id: &Pubkey,
    socket_addr_space: &SocketAddrSpace,
) -> Result<RepairRequestWithMeta> {
    let Ok(request) = deserialize_request::<RepairProtocol>(&remote_request) else {
        return Err(Error::from(RepairVerifyError::Malformed));
    };
    let from_addr = remote_request.remote_address;
    if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) {
        return Err(Error::from(RepairVerifyError::Malformed));
    }
    Self::verify_signed_packet(my_id, &remote_request.bytes, &request)?;
    // NOTE(review): a mismatch between the transport-level pubkey and the
    // request's claimed sender is only logged, not rejected — confirm this
    // leniency is intentional.
    if let Some(remote_pubkey) = remote_request.remote_pubkey {
        if Some(&remote_pubkey) != request.sender() {
            error!(
                "remote pubkey {remote_pubkey} != request sender {:?}",
                request.sender()
            );
        }
    }
    if request.sender() == Some(my_id) {
        error!("self repair: from_addr={from_addr} my_id={my_id} request={request:?}");
        return Err(Error::from(RepairVerifyError::SelfRepair));
    }
    let stake = *epoch_staked_nodes
        .as_ref()
        .and_then(|stakes| stakes.get(request.sender()?))
        .unwrap_or(&0);
    let whitelisted = request
        .sender()
        .map(|pubkey| whitelist.contains(pubkey))
        .unwrap_or_default();
    Ok(RepairRequestWithMeta {
        request,
        from_addr,
        protocol: remote_request.protocol(),
        stake,
        whitelisted,
    })
}
fn record_request_decode_error(error: &Error, stats: &mut ServeRepairStats) {
match error {
Error::RepairVerify(RepairVerifyError::IdMismatch) => {
stats.err_id_mismatch += 1;
}
Error::RepairVerify(RepairVerifyError::Malformed) => {
stats.err_malformed += 1;
}
Error::RepairVerify(RepairVerifyError::SelfRepair) => {
stats.err_self_repair += 1;
}
Error::RepairVerify(RepairVerifyError::SigVerify) => {
stats.err_sig_verify += 1;
}
Error::RepairVerify(RepairVerifyError::TimeSkew) => {
stats.err_time_skew += 1;
}
Error::RepairVerify(RepairVerifyError::Unsigned) => {
stats.err_unsigned += 1;
}
_ => {
debug_assert!(false, "unhandled error {error:?}");
}
}
}
/// Decodes a batch of remote requests, dropping the remainder once the
/// estimated outbound budget is exhausted.
///
/// The budget is decremented by a conservative per-request response size
/// for every successfully decoded request; decode failures are tallied
/// into `stats` and filtered out.
fn decode_requests(
    requests: Vec<RemoteRequest>,
    epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
    whitelist: &HashSet<Pubkey>,
    my_id: &Pubkey,
    socket_addr_space: &SocketAddrSpace,
    mut remaining_budget_estimate: usize,
    stats: &mut ServeRepairStats,
) -> Vec<RepairRequestWithMeta> {
    // Conservative estimate of one response: a full packet plus its nonce.
    const MIN_RESPONSE_SIZE: usize = PACKET_DATA_SIZE + SIZE_OF_NONCE;
    let decode_request = |request| {
        if remaining_budget_estimate < MIN_RESPONSE_SIZE {
            stats.dropped_requests_load_shed_sigverify += 1;
            return None;
        }
        let result = Self::decode_request(
            request,
            epoch_staked_nodes,
            whitelist,
            my_id,
            socket_addr_space,
        );
        match &result {
            Ok(req) => {
                if req.stake == 0 {
                    stats.handle_requests_unstaked += 1;
                } else {
                    stats.handle_requests_staked += 1;
                }
                // Budget is only consumed by requests we will actually handle.
                remaining_budget_estimate -= MIN_RESPONSE_SIZE;
            }
            Err(e) => {
                Self::record_request_decode_error(e, stats);
            }
        }
        result.ok()
    };
    requests.into_iter().filter_map(decode_request).collect()
}
/// One iteration of the serve-repair loop: drain pending requests, shed
/// load, decode/verify, prioritize by (whitelist, stake), and handle.
///
/// Blocks up to one second for the first request. While this node is
/// leader, responses are charged at `LEADER_BYTE_COST_MULTIPLIER`x against
/// the data budget to protect block production bandwidth.
///
/// Returns `Err` only when the request channel times out or disconnects.
fn run_listen(
    &mut self,
    ping_cache: &mut PingCache,
    recycler: &PacketBatchRecycler,
    requests_receiver: &Receiver<RemoteRequest>,
    response_sender: &PacketBatchSender,
    repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
    stats: &mut ServeRepairStats,
    data_budget: &DataBudget,
) -> std::result::Result<(), RecvTimeoutError> {
    const LEADER_BYTE_COST_MULTIPLIER: usize = 10;
    const TIMEOUT: Duration = Duration::from_secs(1);
    let mut requests = vec![requests_receiver.recv_timeout(TIMEOUT)?];
    const MAX_REQUESTS_PER_ITERATION: usize = 1024;
    let mut total_requests = requests.len();
    let socket_addr_space = *self.cluster_info.socket_addr_space();
    let root_bank = self.sharable_banks.root();
    let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
    let identity_keypair = self.cluster_info.keypair();
    let my_id = identity_keypair.pubkey();
    // Buffer more when a whitelist is configured, so whitelisted requests
    // are less likely to be shed before prioritization.
    let max_buffered_packets = if !self.repair_whitelist.read().unwrap().is_empty() {
        4 * MAX_REQUESTS_PER_ITERATION
    } else {
        2 * MAX_REQUESTS_PER_ITERATION
    };
    let mut dropped_requests = 0;
    let mut well_formed_requests = discard_malformed_repair_requests(&mut requests, stats);
    // Drain everything queued on the channel, shedding once the buffer cap
    // is exceeded.
    loop {
        let mut more: Vec<_> = requests_receiver.try_iter().collect();
        if more.is_empty() {
            break;
        }
        total_requests += more.len();
        if well_formed_requests > max_buffered_packets {
            // Already at capacity: drop this batch but keep draining the
            // channel so it does not back up.
            dropped_requests += more.len();
            continue;
        }
        let retained = discard_malformed_repair_requests(&mut more, stats);
        well_formed_requests += retained;
        if retained > 0 && well_formed_requests <= max_buffered_packets {
            requests.extend(more);
        } else {
            dropped_requests += more.len();
        }
    }
    stats.dropped_requests_load_shed += dropped_requests;
    stats.total_requests += total_requests;
    let is_leader = self
        .leader_state
        .as_ref()
        .map(|ls| ls.load().working_bank().is_some())
        .unwrap_or(false);
    let byte_cost_multiplier = if is_leader {
        LEADER_BYTE_COST_MULTIPLIER
    } else {
        1
    };
    let decode_start = Instant::now();
    let mut decoded_requests = {
        // Allow decoding up to ~2x the current budget; the precise budget
        // is enforced per-request in handle_requests.
        let effective_data_budget_estimate =
            data_budget.get().saturating_mul(2) / byte_cost_multiplier;
        let whitelist = self.repair_whitelist.read().unwrap();
        Self::decode_requests(
            requests,
            &epoch_staked_nodes,
            &whitelist,
            &my_id,
            &socket_addr_space,
            effective_data_budget_estimate,
            stats,
        )
    };
    let whitelisted_request_count = decoded_requests.iter().filter(|r| r.whitelisted).count();
    stats.decode_time_us += decode_start.elapsed().as_micros() as u64;
    stats.whitelisted_requests += whitelisted_request_count.min(MAX_REQUESTS_PER_ITERATION);
    if decoded_requests.len() > MAX_REQUESTS_PER_ITERATION {
        // Keep the highest-priority requests: whitelisted first, then by stake.
        stats.dropped_requests_low_stake += decoded_requests.len() - MAX_REQUESTS_PER_ITERATION;
        decoded_requests.sort_unstable_by_key(|r| Reverse((r.whitelisted, r.stake)));
        decoded_requests.truncate(MAX_REQUESTS_PER_ITERATION);
    }
    let handle_requests_start = Instant::now();
    self.handle_requests(
        ping_cache,
        recycler,
        decoded_requests,
        response_sender,
        repair_response_quic_sender,
        stats,
        data_budget,
        byte_cost_multiplier,
    );
    stats.handle_requests_time_us += handle_requests_start.elapsed().as_micros() as u64;
    Ok(())
}
/// Emits the accumulated counters as a `serve_repair-requests_received`
/// datapoint, warns if any self-repair requests were seen, and resets
/// `stats` to defaults.
fn report_reset_stats(&self, stats: &mut ServeRepairStats) {
    if stats.err_self_repair > 0 {
        let my_id = self.cluster_info.id();
        warn!(
            "{}: Ignored received repair requests from ME: {}",
            my_id, stats.err_self_repair,
        );
    }
    datapoint_info!(
        "serve_repair-requests_received",
        ("total_requests", stats.total_requests, i64),
        (
            "dropped_requests_outbound_bandwidth",
            stats.dropped_requests_outbound_bandwidth,
            i64
        ),
        (
            "dropped_requests_load_shed",
            stats.dropped_requests_load_shed,
            i64
        ),
        (
            "dropped_requests_load_shed_sigverify",
            stats.dropped_requests_load_shed_sigverify,
            i64
        ),
        (
            "dropped_requests_low_stake",
            stats.dropped_requests_low_stake,
            i64
        ),
        ("whitelisted_requests", stats.whitelisted_requests, i64),
        (
            "total_dropped_response_packets",
            stats.total_dropped_response_packets,
            i64
        ),
        ("handle_requests_staked", stats.handle_requests_staked, i64),
        (
            "handle_requests_unstaked",
            stats.handle_requests_unstaked,
            i64
        ),
        ("processed", stats.processed, i64),
        ("total_response_packets", stats.total_response_packets, i64),
        (
            "total_response_bytes_staked",
            stats.total_response_bytes_staked,
            i64
        ),
        (
            "total_response_bytes_unstaked",
            stats.total_response_bytes_unstaked,
            i64
        ),
        ("self_repair", stats.err_self_repair, i64),
        ("window_index", stats.window_index, i64),
        (
            "request-highest-window-index",
            stats.highest_window_index,
            i64
        ),
        ("orphan", stats.orphan, i64),
        (
            "serve_repair-request-ancestor-hashes",
            stats.ancestor_hashes,
            i64
        ),
        ("pong", stats.pong, i64),
        ("window_index_misses", stats.window_index_misses, i64),
        (
            "ping_cache_check_failed",
            stats.ping_cache_check_failed,
            i64
        ),
        ("pings_sent", stats.pings_sent, i64),
        ("decode_time_us", stats.decode_time_us, i64),
        (
            "handle_requests_time_us",
            stats.handle_requests_time_us,
            i64
        ),
        ("err_time_skew", stats.err_time_skew, i64),
        ("err_malformed", stats.err_malformed, i64),
        ("err_sig_verify", stats.err_sig_verify, i64),
        ("err_unsigned", stats.err_unsigned, i64),
        ("err_id_mismatch", stats.err_id_mismatch, i64),
    );
    *stats = ServeRepairStats::default();
}
/// Spawns the `solRepairListen` thread that serves repair requests until
/// `exit` is set or the request channel disconnects.
///
/// Outbound responses are rate-limited by a `DataBudget` refilled to
/// `MAX_BYTES_PER_INTERVAL` once per second; stats are flushed roughly
/// every 2 seconds.
pub(crate) fn listen(
    mut self,
    requests_receiver: Receiver<RemoteRequest>,
    response_sender: PacketBatchSender,
    repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
    exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
    const INTERVAL_MS: u64 = 1000;
    const MAX_BYTES_PER_SECOND: usize = 12_000_000;
    const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
    // The ping rate limit must exceed the repair tick so repeated requests
    // from the same peer do not each trigger a fresh ping.
    assert!(REPAIR_PING_CACHE_RATE_LIMIT_DELAY > Duration::from_millis(REPAIR_MS));
    let mut ping_cache = PingCache::new(
        &mut rand::rng(),
        Instant::now(),
        REPAIR_PING_CACHE_TTL,
        REPAIR_PING_CACHE_RATE_LIMIT_DELAY,
        REPAIR_PING_CACHE_CAPACITY,
    );
    let recycler = PacketBatchRecycler::default();
    Builder::new()
        .name("solRepairListen".to_string())
        .spawn(move || {
            let mut last_print = Instant::now();
            let mut stats = ServeRepairStats::default();
            let data_budget = DataBudget::new(MAX_BYTES_PER_INTERVAL);
            while !exit.load(Ordering::Relaxed) {
                let result = self.run_listen(
                    &mut ping_cache,
                    &recycler,
                    &requests_receiver,
                    &response_sender,
                    &repair_response_quic_sender,
                    &mut stats,
                    &data_budget,
                );
                match result {
                    Ok(_) | Err(RecvTimeoutError::Timeout) => {}
                    Err(RecvTimeoutError::Disconnected) => {
                        info!("repair listener disconnected");
                        return;
                    }
                };
                if last_print.elapsed().as_secs() > 2 {
                    self.report_reset_stats(&mut stats);
                    last_print = Instant::now();
                }
                data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL);
            }
        })
        .unwrap()
}
/// Validates a repair request against its embedded signature.
///
/// Signed requests serialize as: 4-byte enum discriminant, then the header
/// starting with its 64-byte signature, then the remaining payload. The
/// signed data is everything except the signature bytes themselves.
///
/// Rejects legacy (unsigned) variants, invalid pong signatures, requests
/// addressed to a different recipient, timestamps outside
/// `SIGNED_REPAIR_TIME_WINDOW`, and signature mismatches.
fn verify_signed_packet(my_id: &Pubkey, bytes: &[u8], request: &RepairProtocol) -> Result<()> {
    match request {
        RepairProtocol::LegacyWindowIndex
        | RepairProtocol::LegacyHighestWindowIndex
        | RepairProtocol::LegacyOrphan
        | RepairProtocol::LegacyWindowIndexWithNonce
        | RepairProtocol::LegacyHighestWindowIndexWithNonce
        | RepairProtocol::LegacyOrphanWithNonce
        | RepairProtocol::LegacyAncestorHashes => {
            return Err(Error::from(RepairVerifyError::Unsigned));
        }
        RepairProtocol::Pong(pong) => {
            if !pong.verify() {
                return Err(Error::from(RepairVerifyError::SigVerify));
            }
        }
        RepairProtocol::WindowIndex { header, .. }
        | RepairProtocol::HighestWindowIndex { header, .. }
        | RepairProtocol::Orphan { header, .. }
        | RepairProtocol::AncestorHashes { header, .. } => {
            if &header.recipient != my_id {
                return Err(Error::from(RepairVerifyError::IdMismatch));
            }
            let time_diff_ms = timestamp().abs_diff(header.timestamp);
            if u128::from(time_diff_ms) > SIGNED_REPAIR_TIME_WINDOW.as_millis() {
                return Err(Error::from(RepairVerifyError::TimeSkew));
            }
            // Discriminant bytes, preceding the signature.
            let Some(leading_buf) = bytes.get(..4) else {
                debug_assert!(
                    false,
                    "request should have failed deserialization: {request:?}",
                );
                return Err(Error::from(RepairVerifyError::Malformed));
            };
            // Everything after the signature.
            let Some(trailing_buf) = bytes.get(4 + SIGNATURE_BYTES..) else {
                debug_assert!(
                    false,
                    "request should have failed deserialization: {request:?}",
                );
                return Err(Error::from(RepairVerifyError::Malformed));
            };
            let Some(from_id) = request.sender() else {
                return Err(Error::from(RepairVerifyError::SigVerify));
            };
            let signed_data = [leading_buf, trailing_buf].concat();
            if !header.signature.verify(from_id.as_ref(), &signed_data) {
                return Err(Error::from(RepairVerifyError::SigVerify));
            }
        }
    }
    Ok(())
}
/// Consults the ping cache for the requester's (pubkey, address) pair.
///
/// Returns whether the pair has a valid (recent pong) entry, plus an
/// optional ping packet to send when the cache wants the peer challenged
/// (the ping's wire format depends on the request type, so the requester's
/// response parser can decode it).
fn check_ping_cache(
    ping_cache: &mut PingCache,
    request: &RepairProtocol,
    from_addr: &SocketAddr,
    identity_keypair: &Keypair,
) -> (bool, Option<Packet>) {
    let mut rng = rand::rng();
    let (check, ping) = request
        .sender()
        .map(|&sender| {
            ping_cache.check(
                &mut rng,
                identity_keypair,
                Instant::now(),
                (sender, *from_addr),
            )
        })
        .unwrap_or_default();
    let ping_pkt = if let Some(ping) = ping {
        match request {
            // Shred requests expect a RepairResponse-framed ping.
            RepairProtocol::WindowIndex { .. }
            | RepairProtocol::HighestWindowIndex { .. }
            | RepairProtocol::Orphan { .. } => {
                let ping = RepairResponse::Ping(ping);
                Packet::from_data(Some(from_addr), ping).ok()
            }
            // Ancestor-hashes requests expect an AncestorHashesResponse-framed ping.
            RepairProtocol::AncestorHashes { .. } => {
                let ping = AncestorHashesResponse::Ping(ping);
                Packet::from_data(Some(from_addr), ping).ok()
            }
            RepairProtocol::Pong(_) => None,
            RepairProtocol::LegacyWindowIndex
            | RepairProtocol::LegacyHighestWindowIndex
            | RepairProtocol::LegacyOrphan
            | RepairProtocol::LegacyWindowIndexWithNonce
            | RepairProtocol::LegacyHighestWindowIndexWithNonce
            | RepairProtocol::LegacyOrphanWithNonce
            | RepairProtocol::LegacyAncestorHashes => {
                error!("Unexpected legacy request: {request:?}");
                debug_assert!(
                    false,
                    "Legacy requests should have been filtered out during signature \
                     verification. {request:?}"
                );
                None
            }
        }
    } else {
        None
    };
    (check, ping_pkt)
}
/// Handles each decoded request: charges the data budget up front by the
/// request's worst-case response size, gates UDP requesters through the
/// ping cache, generates the response, then refunds the unused budget.
///
/// Ping packets owed to unverified requesters are batched and sent once at
/// the end. QUIC requesters skip the ping check (the transport
/// authenticates the remote address).
fn handle_requests(
    &self,
    ping_cache: &mut PingCache,
    recycler: &PacketBatchRecycler,
    requests: Vec<RepairRequestWithMeta>,
    packet_batch_sender: &PacketBatchSender,
    repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
    stats: &mut ServeRepairStats,
    data_budget: &DataBudget,
    byte_cost_multiplier: usize,
) {
    let identity_keypair = self.cluster_info.keypair();
    let mut pending_pings = Vec::default();
    for RepairRequestWithMeta {
        request,
        from_addr,
        protocol,
        stake,
        whitelisted: _,
    } in requests.into_iter()
    {
        // Reserve the worst case up front; the unused portion is refunded
        // after the actual response size is known.
        let max_response_cost = request.max_response_bytes() * byte_cost_multiplier;
        if !data_budget.take(max_response_cost) {
            stats.dropped_requests_outbound_bandwidth += 1;
            continue;
        }
        if !matches!(&request, RepairProtocol::Pong(_)) && protocol == Protocol::UDP {
            let (check, ping_pkt) =
                Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
            if let Some(ping_pkt) = ping_pkt {
                pending_pings.push(ping_pkt);
            }
            if !check {
                // Requester has not proven address ownership yet; skip
                // serving (it was pinged above if the cache allowed it).
                stats.ping_cache_check_failed += 1;
                continue;
            }
        }
        stats.processed += 1;
        let Some(rsp) = self.handle_repair(recycler, &from_addr, request, stats, ping_cache)
        else {
            continue;
        };
        let num_response_packets = rsp.len();
        let num_response_bytes: usize = rsp.iter().map(|p| p.meta().size).sum();
        data_budget
            .put(max_response_cost.saturating_sub(num_response_bytes * byte_cost_multiplier));
        if send_response(
            rsp,
            protocol,
            packet_batch_sender,
            repair_response_quic_sender,
        ) {
            stats.total_response_packets += num_response_packets;
            match stake > 0 {
                true => stats.total_response_bytes_staked += num_response_bytes,
                false => stats.total_response_bytes_unstaked += num_response_bytes,
            }
        } else {
            stats.dropped_requests_outbound_bandwidth += 1;
            stats.total_dropped_response_packets += num_response_packets;
        }
    }
    if !pending_pings.is_empty() {
        stats.pings_sent += pending_pings.len();
        let batch = RecycledPacketBatch::new(pending_pings);
        let _ = packet_batch_sender.send(batch.into());
    }
}
pub fn ancestor_repair_request_bytes(
&self,
keypair: &Keypair,
repair_peer_id: &Pubkey,
request_slot: Slot,
nonce: Nonce,
) -> Result<Vec<u8>> {
let header = RepairRequestHeader {
signature: Signature::default(),
sender: keypair.pubkey(),
recipient: *repair_peer_id,
timestamp: timestamp(),
nonce,
};
let request = RepairProtocol::AncestorHashes {
header,
slot: request_slot,
};
Self::repair_proto_to_bytes(&request, keypair)
}
/// Builds and routes one outgoing shred-repair request.
///
/// Samples a stake-weighted peer for the slot (cached per slot for
/// `REPAIR_PEERS_CACHE_TTL`), registers the request in
/// `outstanding_requests` to obtain its nonce, and serializes/signs it.
///
/// Returns `Some((addr, bytes))` for the caller to send over UDP, or
/// `None` after handing the request to the QUIC sender.
#[allow(clippy::too_many_arguments)]
pub(crate) fn repair_request(
    &self,
    cluster_slots: &ClusterSlots,
    repair_request: ShredRepairType,
    peers_cache: &mut LruCache<Slot, RepairPeers>,
    repair_stats: &mut RepairStats,
    repair_validators: &Option<HashSet<Pubkey>>,
    outstanding_requests: &mut OutstandingShredRepairs,
    identity_keypair: &Keypair,
    repair_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
    repair_protocol: Protocol,
) -> Result<Option<(SocketAddr, Vec<u8>)>> {
    let slot = repair_request.slot();
    let repair_peers = match peers_cache.get(&slot) {
        Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
        _ => {
            // Stale or missing: rebuild the weighted peer sample for this slot.
            peers_cache.pop(&slot);
            let repair_peers =
                self.repair_peers(repair_validators, slot, &identity_keypair.pubkey());
            let weights = cluster_slots.compute_weights(slot, &repair_peers);
            let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
            peers_cache.put(slot, repair_peers);
            peers_cache.get(&slot).unwrap()
        }
    };
    let peer = repair_peers.sample(&mut rand::rng());
    let nonce = outstanding_requests.add_request(repair_request, timestamp());
    let out = self.map_repair_request(
        &repair_request,
        &peer.pubkey,
        repair_stats,
        nonce,
        identity_keypair,
    )?;
    debug!(
        "Sending repair request from {} to {} for {:#?}",
        identity_keypair.pubkey(),
        peer.pubkey,
        repair_request
    );
    match repair_protocol {
        Protocol::UDP => Ok(Some((peer.serve_repair, out))),
        Protocol::QUIC => {
            repair_request_quic_sender
                .blocking_send((peer.serve_repair_quic, Bytes::from(out)))
                .map_err(|_| Error::SendError)?;
            Ok(None)
        }
    }
}
/// Samples up to `get_ancestor_hash_repair_sample_size()` peers (weighted
/// shuffle over peers with the slot frozen) to ask for ancestor hashes.
///
/// Peers without a serve-repair address on the requested protocol are
/// skipped. Errors when no repair peers exist at all.
pub(crate) fn repair_request_ancestor_hashes_sample_peers(
    &self,
    slot: Slot,
    cluster_slots: &ClusterSlots,
    repair_validators: &Option<HashSet<Pubkey>>,
    repair_protocol: Protocol,
    my_pubkey: &Pubkey,
) -> Result<Vec<(Pubkey, SocketAddr)>> {
    let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot, my_pubkey);
    if repair_peers.is_empty() {
        return Err(ClusterInfoError::NoPeers.into());
    }
    let (weights, index) = cluster_slots.compute_weights_exclude_nonfrozen(slot, &repair_peers);
    let peers = WeightedShuffle::new("repair_request_ancestor_hashes", weights)
        .shuffle(&mut rand::rng())
        .map(|i| index[i])
        .filter_map(|i| {
            let addr = repair_peers[i].serve_repair(repair_protocol)?;
            Some((*repair_peers[i].pubkey(), addr))
        })
        .take(get_ancestor_hash_repair_sample_size())
        .collect();
    Ok(peers)
}
/// Test helper: samples one weighted peer (among those with the slot
/// frozen) and returns its pubkey and UDP serve-repair address, or `None`
/// when no suitable peer exists.
#[cfg(test)]
pub(crate) fn repair_request_duplicate_compute_best_peer(
    &self,
    slot: Slot,
    cluster_slots: &ClusterSlots,
    repair_validators: &Option<HashSet<Pubkey>>,
    my_pubkey: &Pubkey,
) -> Option<(Pubkey, SocketAddr)> {
    let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot, my_pubkey);
    if repair_peers.is_empty() {
        return None;
    }
    let (weights, index) = cluster_slots.compute_weights_exclude_nonfrozen(slot, &repair_peers);
    let k = WeightedIndex::new(weights).ok()?.sample(&mut rand::rng());
    let n = index[k];
    Some((
        *repair_peers[n].pubkey(),
        repair_peers[n].serve_repair(Protocol::UDP)?,
    ))
}
/// Converts a `ShredRepairType` into its signed, serialized wire form
/// addressed to `repair_peer_id`, updating the per-type repair stats.
pub(crate) fn map_repair_request(
    &self,
    repair_request: &ShredRepairType,
    repair_peer_id: &Pubkey,
    repair_stats: &mut RepairStats,
    nonce: Nonce,
    identity_keypair: &Keypair,
) -> Result<Vec<u8>> {
    // Placeholder signature; filled in by repair_proto_to_bytes.
    let header = RepairRequestHeader {
        signature: Signature::default(),
        sender: identity_keypair.pubkey(),
        recipient: *repair_peer_id,
        timestamp: timestamp(),
        nonce,
    };
    let request_proto = match repair_request {
        ShredRepairType::Shred(slot, shred_index) => {
            repair_stats
                .shred
                .update(repair_peer_id, *slot, *shred_index);
            RepairProtocol::WindowIndex {
                header,
                slot: *slot,
                shred_index: *shred_index,
            }
        }
        ShredRepairType::HighestShred(slot, shred_index) => {
            repair_stats
                .highest_shred
                .update(repair_peer_id, *slot, *shred_index);
            RepairProtocol::HighestWindowIndex {
                header,
                slot: *slot,
                shred_index: *shred_index,
            }
        }
        ShredRepairType::Orphan(slot) => {
            repair_stats.orphan.update(repair_peer_id, *slot, 0);
            RepairProtocol::Orphan {
                header,
                slot: *slot,
            }
        }
    };
    Self::repair_proto_to_bytes(&request_proto, identity_keypair)
}
pub(crate) fn handle_repair_response_pings(
repair_socket: &UdpSocket,
keypair: &Keypair,
packet_batch: &mut PacketBatch,
stats: &mut ShredFetchStats,
) {
let mut pending_pongs = Vec::default();
for mut packet in packet_batch.iter_mut() {
if packet.meta().size != REPAIR_RESPONSE_SERIALIZED_PING_BYTES {
continue;
}
if let Ok(RepairResponse::Ping(ping)) = packet.deserialize_slice(..) {
if !ping.verify() {
stats.ping_err_verify_count += 1;
continue;
}
packet.meta_mut().set_discard(true);
stats.ping_count += 1;
let pong = RepairProtocol::Pong(Pong::new(&ping, keypair));
if let Ok(pong) = bincode::serialize(&pong) {
let from_addr = packet.meta().socket_addr();
pending_pongs.push((pong, from_addr));
}
}
}
if !pending_pongs.is_empty() {
let num_pkts = pending_pongs.len();
let pending_pongs = pending_pongs.iter().map(|(bytes, addr)| (bytes, addr));
match batch_send(repair_socket, pending_pongs) {
Ok(()) => (),
Err(SendPktsError::IoError(err, num_failed)) => {
warn!(
"batch_send failed to send {num_failed}/{num_pkts} packets. First error: \
{err:?}"
);
}
}
}
}
/// Serializes a signable repair request and writes its signature in place.
///
/// Wire layout (as serialized here): a 4-byte enum discriminant, followed by
/// the signature field, followed by the rest of the request. The signature
/// covers the discriminant plus everything after the signature slot.
pub fn repair_proto_to_bytes(request: &RepairProtocol, keypair: &Keypair) -> Result<Vec<u8>> {
    debug_assert!(request.supports_signature());
    let mut payload = serialize(&request)?;
    // Sign everything except the (currently zeroed) signature bytes.
    let signable_data: Vec<u8> = payload[..4]
        .iter()
        .chain(&payload[4 + SIGNATURE_BYTES..])
        .copied()
        .collect();
    let signature = keypair.sign_message(&signable_data);
    payload[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
    Ok(payload)
}
/// Returns the contact infos eligible to serve repairs for `slot`.
///
/// When an allowlist is given, only listed validators (other than
/// `my_pubkey`) with known contact info are returned; otherwise the
/// cluster-wide repair peer set for the slot is used.
fn repair_peers(
    &self,
    repair_validators: &Option<HashSet<Pubkey>>,
    slot: Slot,
    my_pubkey: &Pubkey,
) -> Vec<ContactInfo> {
    match repair_validators {
        Some(validators) => validators
            .iter()
            .filter(|key| *key != my_pubkey)
            .filter_map(|key| self.cluster_info.lookup_contact_info(key, |ci| ci.clone()))
            .collect(),
        None => self.cluster_info.repair_peers(slot),
    }
}
}
#[inline]
/// Returns the transport protocol used for repair requests.
/// Currently always UDP regardless of cluster type; the `ClusterType`
/// argument presumably exists so call sites can switch per-cluster later —
/// confirm before relying on it.
pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
    Protocol::UDP
}
/// Strictly deserializes a `RemoteRequest` payload into `T`.
///
/// Uses fixed-int bincode, caps reads at the payload length, and rejects
/// trailing bytes so a request must consume its buffer exactly.
pub(crate) fn deserialize_request<T>(
    request: &RemoteRequest,
) -> std::result::Result<T, bincode::Error>
where
    T: serde::de::DeserializeOwned,
{
    let options = bincode::options()
        .with_limit(request.bytes.len() as u64)
        .with_fixint_encoding()
        .reject_trailing_bytes();
    options.deserialize(&request.bytes)
}
/// Sends repair response packets over the requested transport.
///
/// UDP forwards the whole batch through `packet_batch_sender`; QUIC sends
/// each packet's payload individually through the async channel, stopping at
/// the first failed send. Returns `false` if any send failed.
fn send_response(
    packets: PacketBatch,
    protocol: Protocol,
    packet_batch_sender: &PacketBatchSender,
    repair_response_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
) -> bool {
    match protocol {
        Protocol::UDP => packet_batch_sender.send(packets).is_ok(),
        Protocol::QUIC => {
            for packet in packets.iter() {
                // Packets without readable data are skipped, mirroring the
                // lossy per-packet conversion.
                let Some(data) = packet.data(..) else {
                    continue;
                };
                let entry = (packet.meta().socket_addr(), Bytes::from(data.to_vec()));
                if repair_response_quic_sender.blocking_send(entry).is_err() {
                    return false;
                }
            }
            true
        }
    }
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::repair::repair_response,
agave_feature_set::FeatureSet,
solana_gossip::{contact_info::ContactInfo, socketaddr, socketaddr_any},
solana_hash::Hash,
solana_keypair::Keypair,
solana_ledger::{
blockstore::{Blockstore, make_many_slot_entries},
blockstore_processor::fill_blockstore_slot_with_ticks,
genesis_utils::{GenesisConfigInfo, create_genesis_config},
get_tmp_ledger_path_auto_delete,
shred::{
ProcessShredsStats, ReedSolomonCache, Shred, Shredder, max_ticks_per_n_shreds,
},
},
solana_net_utils::SocketAddrSpace,
solana_perf::packet::{Packet, PacketFlags, PacketRef, deserialize_from_with_limit},
solana_pubkey::Pubkey,
solana_runtime::bank::Bank,
solana_time_utils::timestamp,
std::{io::Cursor, net::Ipv4Addr},
};
#[test]
fn test_serialized_ping_size() {
    // A serialized ping response must have exactly the size the repair
    // path uses to recognize pings on the wire.
    let keypair = Keypair::new();
    let token = rand::rng().random();
    let response = RepairResponse::Ping(Ping::new(token, &keypair));
    let packet = Packet::from_data(None, response).unwrap();
    assert_eq!(packet.meta().size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES);
}
#[test]
fn test_deserialize_shred_as_ping() {
    // A shred truncated to ping size must not masquerade as a valid ping:
    // either it fails to deserialize, or the resulting ping fails verify().
    let keypair = Keypair::new();
    let shred = Shredder::single_shred_for_tests(123, &keypair);
    let mut packet = Packet::default();
    shred.copy_to_packet(&mut packet);
    packet.meta_mut().size = REPAIR_RESPONSE_SERIALIZED_PING_BYTES;
    match packet.deserialize_slice::<RepairResponse, _>(..) {
        Ok(RepairResponse::Ping(ping)) => assert!(!ping.verify()),
        other => assert!(other.is_err()),
    }
}
// Builds a header with default identities and nonce, suitable for
// well-formedness tests that never verify the signature.
fn repair_request_header_for_tests() -> RepairRequestHeader {
    let now = timestamp();
    RepairRequestHeader {
        signature: Signature::default(),
        sender: Pubkey::default(),
        recipient: Pubkey::default(),
        timestamp: now,
        nonce: Nonce::default(),
    }
}
// Wraps a packet's payload in a RemoteRequest as if it arrived off the wire
// from an unidentified peer.
fn make_remote_request(packet: &Packet) -> RemoteRequest {
    let bytes = packet.data(..).unwrap().to_vec();
    RemoteRequest {
        remote_pubkey: None,
        remote_address: packet.meta().socket_addr(),
        bytes: Bytes::from(bytes),
    }
}
#[test]
fn test_check_well_formed_repair_request() {
    // For each request type: the untouched packet passes the
    // well-formedness filter, while a truncated copy is discarded and
    // counted as malformed. The original repeated this sequence inline
    // four times; it is factored into two helpers below.

    // Asserts that the packet survives discard_malformed_repair_requests.
    fn assert_well_formed(pkt: &Packet) {
        let mut batch = vec![make_remote_request(pkt)];
        let mut stats = ServeRepairStats::default();
        let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
        assert_eq!(num_well_formed, 1);
    }
    // Truncates the packet to `size` bytes and asserts it is rejected and
    // counted in err_malformed.
    fn assert_malformed_when_truncated(pkt: &mut Packet, size: usize) {
        pkt.meta_mut().size = size;
        let mut batch = vec![make_remote_request(pkt)];
        let mut stats = ServeRepairStats::default();
        let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats);
        assert_eq!(num_well_formed, 0);
        assert_eq!(stats.err_malformed, 1);
    }

    let mut rng = rand::rng();
    let keypair = Keypair::new();
    let ping = Ping::new(rng.random(), &keypair);
    let pong = Pong::new(&ping, &keypair);
    let mut pkt = Packet::from_data(None, RepairProtocol::Pong(pong)).unwrap();
    assert_well_formed(&pkt);
    assert_malformed_when_truncated(&mut pkt, 5);

    let request = RepairProtocol::WindowIndex {
        header: repair_request_header_for_tests(),
        slot: 123,
        shred_index: 456,
    };
    let mut pkt = Packet::from_data(None, request).unwrap();
    assert_well_formed(&pkt);
    assert_malformed_when_truncated(&mut pkt, 8);

    let request = RepairProtocol::AncestorHashes {
        header: repair_request_header_for_tests(),
        slot: 123,
    };
    let mut pkt = Packet::from_data(None, request).unwrap();
    assert_well_formed(&pkt);
    assert_malformed_when_truncated(&mut pkt, 1);

    let request = RepairProtocol::Orphan {
        header: repair_request_header_for_tests(),
        slot: 262_547_696,
    };
    let mut pkt = Packet::from_data(None, request).unwrap();
    assert_well_formed(&pkt);
    assert_malformed_when_truncated(&mut pkt, 3);
}
#[test]
fn test_serialize_deserialize_signed_request() {
    // Round-trips an Orphan repair request through map_repair_request and
    // checks the header fields plus that the signature verifies over the
    // discriminant and everything after the signature slot.
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let bank_forks = BankForks::new_rw_arc(bank);
    let cluster_info = Arc::new(new_test_cluster_info());
    let serve_repair = ServeRepair::new_for_test(
        cluster_info.clone(),
        bank_forks,
        Arc::new(RwLock::new(HashSet::default())),
    );
    let keypair = cluster_info.keypair();
    let repair_peer_id = solana_pubkey::new_rand();
    let repair_request = ShredRepairType::Orphan(123);
    let rsp = serve_repair
        .map_repair_request(
            &repair_request,
            &repair_peer_id,
            &mut RepairStats::default(),
            456,
            &keypair,
        )
        .unwrap();
    let mut cursor = Cursor::new(&rsp[..]);
    let deserialized_request: RepairProtocol = deserialize_from_with_limit(&mut cursor).unwrap();
    // The deserializer must consume the entire payload.
    assert_eq!(cursor.position(), rsp.len() as u64);
    if let RepairProtocol::Orphan { header, slot } = deserialized_request {
        assert_eq!(slot, 123);
        assert_eq!(header.nonce, 456);
        assert_eq!(&header.sender, &serve_repair.my_id());
        assert_eq!(&header.recipient, &repair_peer_id);
        // Signed data = 4-byte discriminant + bytes after the signature.
        let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
        assert!(
            header
                .signature
                .verify(keypair.pubkey().as_ref(), &signed_data)
        );
    } else {
        panic!("unexpected request type {:?}", &deserialized_request);
    }
}
#[test]
fn test_serialize_deserialize_ancestor_hashes_request() {
    // Round-trips an AncestorHashes request through
    // ancestor_repair_request_bytes and validates header fields and the
    // signature over the unsigned portion of the payload.
    let slot: Slot = 50;
    let nonce = 70;
    let cluster_info = Arc::new(new_test_cluster_info());
    let repair_peer_id = solana_pubkey::new_rand();
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let keypair = cluster_info.keypair();
    let mut bank = Bank::new_for_tests(&genesis_config);
    bank.feature_set = Arc::new(FeatureSet::all_enabled());
    let bank_forks = BankForks::new_rw_arc(bank);
    let serve_repair = ServeRepair::new_for_test(
        cluster_info,
        bank_forks,
        Arc::new(RwLock::new(HashSet::default())),
    );
    let request_bytes = serve_repair
        .ancestor_repair_request_bytes(&keypair, &repair_peer_id, slot, nonce)
        .unwrap();
    let mut cursor = Cursor::new(&request_bytes[..]);
    let deserialized_request: RepairProtocol = deserialize_from_with_limit(&mut cursor).unwrap();
    // The deserializer must consume the entire payload.
    assert_eq!(cursor.position(), request_bytes.len() as u64);
    if let RepairProtocol::AncestorHashes {
        header,
        slot: deserialized_slot,
    } = deserialized_request
    {
        assert_eq!(deserialized_slot, slot);
        assert_eq!(header.nonce, nonce);
        assert_eq!(&header.sender, &serve_repair.my_id());
        assert_eq!(&header.recipient, &repair_peer_id);
        // Signed data = 4-byte discriminant + bytes after the signature.
        let signed_data = [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
        assert!(
            header
                .signature
                .verify(keypair.pubkey().as_ref(), &signed_data)
        );
    } else {
        panic!("unexpected request type {:?}", &deserialized_request);
    }
}
#[test]
fn test_map_requests_signed() {
    // map_repair_request must produce signed WindowIndex and
    // HighestWindowIndex payloads that deserialize back to the original
    // request with a verifiable signature.
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let bank_forks = BankForks::new_rw_arc(bank);
    let cluster_info = Arc::new(new_test_cluster_info());
    let serve_repair = ServeRepair::new_for_test(
        cluster_info.clone(),
        bank_forks,
        Arc::new(RwLock::new(HashSet::default())),
    );
    let keypair = cluster_info.keypair();
    let repair_peer_id = solana_pubkey::new_rand();
    let slot = 50;
    let shred_index = 60;
    let nonce = 70;
    // Case 1: Shred repair -> RepairProtocol::WindowIndex.
    let request = ShredRepairType::Shred(slot, shred_index);
    let request_bytes = serve_repair
        .map_repair_request(
            &request,
            &repair_peer_id,
            &mut RepairStats::default(),
            nonce,
            &keypair,
        )
        .unwrap();
    let mut cursor = Cursor::new(&request_bytes[..]);
    let deserialized_request: RepairProtocol = deserialize_from_with_limit(&mut cursor).unwrap();
    assert_eq!(cursor.position(), request_bytes.len() as u64);
    if let RepairProtocol::WindowIndex {
        header,
        slot: deserialized_slot,
        shred_index: deserialized_shred_index,
    } = deserialized_request
    {
        assert_eq!(deserialized_slot, slot);
        assert_eq!(deserialized_shred_index, shred_index);
        assert_eq!(header.nonce, nonce);
        assert_eq!(&header.sender, &serve_repair.my_id());
        assert_eq!(&header.recipient, &repair_peer_id);
        // Signed data = 4-byte discriminant + bytes after the signature.
        let signed_data = [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
        assert!(
            header
                .signature
                .verify(keypair.pubkey().as_ref(), &signed_data)
        );
    } else {
        panic!("unexpected request type {:?}", &deserialized_request);
    }
    // Case 2: HighestShred repair -> RepairProtocol::HighestWindowIndex.
    let request = ShredRepairType::HighestShred(slot, shred_index);
    let request_bytes = serve_repair
        .map_repair_request(
            &request,
            &repair_peer_id,
            &mut RepairStats::default(),
            nonce,
            &keypair,
        )
        .unwrap();
    let mut cursor = Cursor::new(&request_bytes[..]);
    let deserialized_request: RepairProtocol = deserialize_from_with_limit(&mut cursor).unwrap();
    assert_eq!(cursor.position(), request_bytes.len() as u64);
    if let RepairProtocol::HighestWindowIndex {
        header,
        slot: deserialized_slot,
        shred_index: deserialized_shred_index,
    } = deserialized_request
    {
        assert_eq!(deserialized_slot, slot);
        assert_eq!(deserialized_shred_index, shred_index);
        assert_eq!(header.nonce, nonce);
        assert_eq!(&header.sender, &serve_repair.my_id());
        assert_eq!(&header.recipient, &repair_peer_id);
        let signed_data = [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
        assert!(
            header
                .signature
                .verify(keypair.pubkey().as_ref(), &signed_data)
        );
    } else {
        panic!("unexpected request type {:?}", &deserialized_request);
    }
}
#[test]
fn test_verify_signed_packet() {
    // Exercises ServeRepair::verify_signed_packet: a correctly signed
    // request addressed to us passes; a wrong recipient, a stale timestamp,
    // and a signature from the wrong key each fail with the corresponding
    // RepairVerifyError.
    let my_keypair = Keypair::new();
    let other_keypair = Keypair::new();
    // Signs the packet in place over discriminant + post-signature bytes,
    // mirroring repair_proto_to_bytes.
    fn sign_packet(packet: &mut Packet, keypair: &Keypair) {
        let signable_data = [
            packet.data(..4).unwrap(),
            packet.data(4 + SIGNATURE_BYTES..).unwrap(),
        ]
        .concat();
        let signature = keypair.sign_message(&signable_data[..]);
        packet.buffer_mut()[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
    }
    // Case 1: well-signed packet addressed to us -> Ok.
    let packet = {
        let header = RepairRequestHeader::new(
            my_keypair.pubkey(),
            other_keypair.pubkey(),
            timestamp(),
            678,
        );
        let slot = 239847;
        let request = RepairProtocol::Orphan { header, slot };
        let mut packet = Packet::from_data(None, request).unwrap();
        sign_packet(&mut packet, &my_keypair);
        packet
    };
    let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
    assert_matches!(
        ServeRepair::verify_signed_packet(
            &other_keypair.pubkey(),
            packet.data(..).unwrap(),
            &request
        ),
        Ok(())
    );
    // Case 2: recipient does not match our identity -> IdMismatch.
    let packet = {
        let header = RepairRequestHeader::new(
            my_keypair.pubkey(),
            other_keypair.pubkey(),
            timestamp(),
            678,
        );
        let slot = 239847;
        let request = RepairProtocol::Orphan { header, slot };
        let mut packet = Packet::from_data(None, request).unwrap();
        sign_packet(&mut packet, &my_keypair);
        packet
    };
    let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
    assert_matches!(
        ServeRepair::verify_signed_packet(
            &my_keypair.pubkey(),
            packet.data(..).unwrap(),
            &request
        ),
        Err(Error::RepairVerify(RepairVerifyError::IdMismatch))
    );
    // Case 3: timestamp older than the signed-repair window -> TimeSkew.
    let packet = {
        let time_diff_ms = u64::try_from(SIGNED_REPAIR_TIME_WINDOW.as_millis() * 2).unwrap();
        let old_timestamp = timestamp().saturating_sub(time_diff_ms);
        let header = RepairRequestHeader::new(
            my_keypair.pubkey(),
            other_keypair.pubkey(),
            old_timestamp,
            678,
        );
        let slot = 239847;
        let request = RepairProtocol::Orphan { header, slot };
        let mut packet = Packet::from_data(None, request).unwrap();
        sign_packet(&mut packet, &my_keypair);
        packet
    };
    let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
    assert_matches!(
        ServeRepair::verify_signed_packet(
            &other_keypair.pubkey(),
            packet.data(..).unwrap(),
            &request
        ),
        Err(Error::RepairVerify(RepairVerifyError::TimeSkew))
    );
    // Case 4: signed by a key other than the claimed sender -> SigVerify.
    let packet = {
        let header = RepairRequestHeader::new(
            my_keypair.pubkey(),
            other_keypair.pubkey(),
            timestamp(),
            678,
        );
        let slot = 239847;
        let request = RepairProtocol::Orphan { header, slot };
        let mut packet = Packet::from_data(None, request).unwrap();
        sign_packet(&mut packet, &other_keypair);
        packet
    };
    let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
    assert_matches!(
        ServeRepair::verify_signed_packet(
            &other_keypair.pubkey(),
            packet.data(..).unwrap(),
            &request
        ),
        Err(Error::RepairVerify(RepairVerifyError::SigVerify))
    );
}
#[test]
fn test_run_highest_window_request() {
    // slot=5, num_slots=3, nonce=9; see run_highest_window_request.
    run_highest_window_request(5, 3, 9);
}
// Shared body for highest-window-request tests: fills slots with ticks and
// exercises StandardRepairHandler::run_highest_window_request against an
// empty blockstore, a populated slot, and an out-of-range index.
pub fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
    let recycler = PacketBatchRecycler::default();
    agave_logger::setup();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let handler = StandardRepairHandler::new(blockstore.clone());
    // Empty blockstore: no response.
    let rv = handler.run_highest_window_request(&recycler, &socketaddr_any!(), 0, 0, nonce);
    assert!(rv.is_none());
    let _ = fill_blockstore_slot_with_ticks(
        &blockstore,
        max_ticks_per_n_shreds(1, None) + 1,
        slot,
        slot - num_slots + 1,
        Hash::default(),
    );
    let index = 1;
    let mut rv = handler
        .run_highest_window_request(&recycler, &socketaddr_any!(), slot, index, nonce)
        .expect("packets");
    let request = ShredRepairType::HighestShred(slot, index);
    verify_responses(&request, rv.iter());
    // Mark packets as repair responses and decode shred + nonce from each.
    let rv: Vec<Shred> = rv
        .iter_mut()
        .map(|mut packet| {
            packet.meta_mut().flags |= PacketFlags::REPAIR;
            let (shred, repair_nonce) =
                shred::layout::get_shred_and_repair_nonce(packet.as_ref()).unwrap();
            assert_eq!(repair_nonce.unwrap(), nonce);
            Shred::new_from_serialized_shred(shred.to_vec()).unwrap()
        })
        .collect();
    assert!(!rv.is_empty());
    // The response carries the highest shred received for the slot.
    let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
    assert_eq!(rv[0].index(), index as u32);
    assert_eq!(rv[0].slot(), slot);
    // Requesting past the highest index yields no response.
    let rv = handler.run_highest_window_request(
        &recycler,
        &socketaddr_any!(),
        slot,
        index + 1,
        nonce,
    );
    assert!(rv.is_none());
}
#[test]
fn test_run_window_request() {
    // run_window_request returns nothing for an empty blockstore and, once
    // a shred is inserted at `index`, returns that shred tagged with the
    // request nonce.
    let slot = 2;
    let nonce = 9;
    let recycler = PacketBatchRecycler::default();
    agave_logger::setup();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let handler = StandardRepairHandler::new(blockstore.clone());
    // Empty blockstore: no response.
    let rv = handler.run_window_request(&recycler, &socketaddr_any!(), slot, 0, nonce);
    assert!(rv.is_none());
    let shredder = Shredder::new(slot, slot - 1, 0, 2).unwrap();
    let keypair = Keypair::new();
    let reed_solomon_cache = ReedSolomonCache::default();
    let index = 1;
    let (mut shreds, _) = shredder.entries_to_merkle_shreds_for_tests(
        &keypair,
        &[],
        true,
        Hash::default(),
        index as u32,
        index as u32,
        &reed_solomon_cache,
        &mut ProcessShredsStats::default(),
    );
    // Keep only the first generated shred (at `index`).
    shreds.truncate(1);
    blockstore
        .insert_shreds(shreds, None, false)
        .expect("Expect successful ledger write");
    let mut rv = handler
        .run_window_request(&recycler, &socketaddr_any!(), slot, index, nonce)
        .expect("packets");
    let request = ShredRepairType::Shred(slot, index);
    verify_responses(&request, rv.iter());
    // Mark packets as repair responses and decode shred + nonce from each.
    let rv: Vec<Shred> = rv
        .iter_mut()
        .map(|mut packet| {
            packet.meta_mut().flags |= PacketFlags::REPAIR;
            let (shred, repair_nonce) =
                shred::layout::get_shred_and_repair_nonce(packet.as_ref()).unwrap();
            assert_eq!(repair_nonce.unwrap(), nonce);
            Shred::new_from_serialized_shred(shred.to_vec()).unwrap()
        })
        .collect();
    assert_eq!(rv[0].index(), 1);
    assert_eq!(rv[0].slot(), slot);
}
// Creates a localhost ClusterInfo with a fresh identity keypair.
fn new_test_cluster_info() -> ClusterInfo {
    let keypair = Arc::new(Keypair::new());
    let node = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
    ClusterInfo::new(node, keypair, SocketAddrSpace::Unspecified)
}
#[test]
fn window_index_request() {
use Protocol::{QUIC, UDP};
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank);
let cluster_slots = ClusterSlots::default_for_tests();
let cluster_info = Arc::new(new_test_cluster_info());
let serve_repair = ServeRepair::new_for_test(
cluster_info.clone(),
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let identity_keypair = cluster_info.keypair();
let mut outstanding_requests = OutstandingShredRepairs::default();
let (repair_request_quic_sender, _) = tokio::sync::mpsc::channel( 128);
let rv = serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, );
assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));
let serve_repair_addr = socketaddr!(Ipv4Addr::LOCALHOST, 1243);
let mut nxt = ContactInfo::new(
solana_pubkey::new_rand(),
timestamp(), 0u16, );
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(UDP, serve_repair_addr).unwrap();
nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt.clone());
let rv = serve_repair
.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, )
.unwrap()
.unwrap();
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap());
let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
let mut nxt = ContactInfo::new(
solana_pubkey::new_rand(),
timestamp(), 0u16, );
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(UDP, serve_repair_addr2).unwrap();
nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt);
let mut one = false;
let mut two = false;
while !one || !two {
let rv = serve_repair
.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, )
.unwrap()
.unwrap();
if rv.0 == serve_repair_addr {
one = true;
}
if rv.0 == serve_repair_addr2 {
two = true;
}
}
assert!(one && two);
}
#[test]
fn test_run_orphan() {
    // slot=2, num_slots=3, nonce=9; see run_orphan.
    run_orphan(2, 3, 9);
}
// Shared body for orphan-request tests: run_orphan should return nothing
// for unknown slots and, for a known slot, respond with the highest shred
// of that slot and of each ancestor slot, newest first.
pub fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
    agave_logger::setup();
    let recycler = PacketBatchRecycler::default();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let handler = StandardRepairHandler::new(blockstore.clone());
    // Empty blockstore: no response.
    let rv = handler.run_orphan(&recycler, &socketaddr_any!(), slot, 5, nonce);
    assert!(rv.is_none());
    let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
    blockstore
        .insert_shreds(shreds, None, false)
        .expect("Expect successful ledger write");
    // Slot beyond the inserted range: still no response.
    let rv = handler.run_orphan(&recycler, &socketaddr_any!(), slot + num_slots, 5, nonce);
    assert!(rv.is_none());
    let rv = handler
        .run_orphan(
            &recycler,
            &socketaddr_any!(),
            slot + num_slots - 1,
            5,
            nonce,
        )
        .expect("run_orphan packets");
    let request = ShredRepairType::Orphan(slot + num_slots - 1);
    verify_responses(&request, rv.iter());
    // Expected: the highest received shred of each slot, newest slot first.
    let expected: Vec<_> = (slot..slot + num_slots)
        .rev()
        .filter_map(|slot| {
            let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
            repair_response::repair_response_packet(
                &blockstore,
                slot,
                index,
                &socketaddr_any!(),
                nonce,
            )
        })
        .collect();
    let expected = PacketBatch::Pinned(RecycledPacketBatch::new(expected));
    assert_eq!(rv, expected);
}
#[test]
fn run_orphan_corrupted_shred_size() {
    // A slot whose shred data is missing (repair_response_packet cannot be
    // built for it) must be skipped by run_orphan rather than aborting the
    // response.
    agave_logger::setup();
    let recycler = PacketBatchRecycler::default();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let (mut shreds, _) = make_many_slot_entries(1, 2, 1);
    assert_eq!(shreds[0].slot(), 1);
    assert_eq!(shreds[0].index(), 0);
    // Drop slot 1's shreds so only slot 2 is backed by data.
    shreds.retain(|shred| shred.slot() != 1);
    blockstore
        .insert_shreds(shreds, None, false)
        .expect("Expect successful ledger write");
    let nonce = 42;
    // Sanity check: no repair packet can be built for the missing shred.
    assert!(
        repair_response::repair_response_packet(&blockstore, 1, 0, &socketaddr_any!(), nonce,)
            .is_none()
    );
    let handler = StandardRepairHandler::new(blockstore.clone());
    let rv = handler
        .run_orphan(&recycler, &socketaddr_any!(), 2, 5, nonce)
        .expect("run_orphan packets");
    // Only slot 2's highest shred (index 31) is returned.
    let expected = RecycledPacketBatch::new(vec![
        repair_response::repair_response_packet(
            &blockstore,
            2,
            31,
            &socketaddr_any!(),
            nonce,
        )
        .unwrap(),
    ])
    .into();
    assert_eq!(rv, expected);
}
#[test]
fn test_run_ancestor_hashes() {
    // run_ancestor_hashes returns an empty hash list until ancestor slots
    // are marked duplicate-confirmed, then returns (slot, frozen hash)
    // pairs for the confirmed ancestors.
    fn deserialize_ancestor_hashes_response(packet: PacketRef) -> AncestorHashesResponse {
        // The trailing nonce is not part of the serialized response.
        packet
            .deserialize_slice(..packet.meta().size - SIZE_OF_NONCE)
            .unwrap()
    }
    agave_logger::setup();
    let recycler = PacketBatchRecycler::default();
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let slot = 0;
    let num_slots = MAX_ANCESTOR_RESPONSES as u64;
    let nonce = 10;
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
    let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
    blockstore
        .insert_shreds(shreds, None, false)
        .expect("Expect successful ledger write");
    let handler = StandardRepairHandler::new(blockstore.clone());
    // Slot past the inserted range: response carries no hashes.
    let rv = handler
        .run_ancestor_hashes(&recycler, &socketaddr_any!(), slot + num_slots, nonce)
        .expect("run_ancestor_hashes packets");
    assert_eq!(rv.len(), 1);
    let packet = rv.first().unwrap();
    let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
    match ancestor_hashes_response {
        AncestorHashesResponse::Hashes(hashes) => {
            assert!(hashes.is_empty());
        }
        _ => {
            panic!("unexpected response: {:?}", &ancestor_hashes_response);
        }
    }
    // Known slot, but nothing duplicate-confirmed yet: still no hashes.
    let rv = handler
        .run_ancestor_hashes(&recycler, &socketaddr_any!(), slot + num_slots - 1, nonce)
        .expect("run_ancestor_hashes packets");
    assert_eq!(rv.len(), 1);
    let packet = rv.first().unwrap();
    let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
    match ancestor_hashes_response {
        AncestorHashesResponse::Hashes(hashes) => {
            assert!(hashes.is_empty());
        }
        _ => {
            panic!("unexpected response: {:?}", &ancestor_hashes_response);
        }
    }
    // Mark every slot duplicate-confirmed with a unique frozen hash; the
    // expected response lists ancestors newest-first.
    let mut expected_ancestors = Vec::with_capacity(num_slots as usize);
    expected_ancestors.resize(num_slots as usize, (0, Hash::default()));
    for (i, duplicate_confirmed_slot) in (slot..slot + num_slots).enumerate() {
        let frozen_hash = Hash::new_unique();
        expected_ancestors[num_slots as usize - i - 1] =
            (duplicate_confirmed_slot, frozen_hash);
        blockstore.insert_bank_hash(duplicate_confirmed_slot, frozen_hash, true);
    }
    let rv = handler
        .run_ancestor_hashes(&recycler, &socketaddr_any!(), slot + num_slots - 1, nonce)
        .expect("run_ancestor_hashes packets");
    assert_eq!(rv.len(), 1);
    let packet = rv.first().unwrap();
    let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
    match ancestor_hashes_response {
        AncestorHashesResponse::Hashes(hashes) => {
            assert_eq!(hashes, expected_ancestors);
        }
        _ => {
            panic!("unexpected response: {:?}", &ancestor_hashes_response);
        }
    }
}
#[test]
fn test_repair_with_repair_validators() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank);
let cluster_slots = ClusterSlots::default_for_tests();
let cluster_info = Arc::new(new_test_cluster_info());
let me = cluster_info.my_contact_info();
let (repair_request_quic_sender, _) = tokio::sync::mpsc::channel( 128);
let contact_info2 = ContactInfo::new_localhost(&solana_pubkey::new_rand(), timestamp());
let contact_info3 = ContactInfo::new_localhost(&solana_pubkey::new_rand(), timestamp());
cluster_info.insert_info(contact_info2.clone());
cluster_info.insert_info(contact_info3.clone());
let identity_keypair = cluster_info.keypair();
let serve_repair = ServeRepair::new_for_test(
cluster_info,
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
for pubkey in &[solana_pubkey::new_rand(), *me.pubkey()] {
let known_validators = Some(vec![*pubkey].into_iter().collect());
assert!(
serve_repair
.repair_peers(&known_validators, 1, &identity_keypair.pubkey())
.is_empty()
);
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, ),
Err(Error::ClusterInfo(ClusterInfoError::NoPeers))
);
}
let known_validators = Some(vec![*contact_info2.pubkey()].into_iter().collect());
let repair_peers =
serve_repair.repair_peers(&known_validators, 1, &identity_keypair.pubkey());
assert_eq!(repair_peers.len(), 1);
assert_eq!(repair_peers[0].pubkey(), contact_info2.pubkey());
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, ),
Ok(Some(_))
);
let repair_peers: HashSet<Pubkey> = serve_repair
.repair_peers(&None, 1, &identity_keypair.pubkey())
.into_iter()
.map(|node| *node.pubkey())
.collect();
assert_eq!(repair_peers.len(), 2);
assert!(repair_peers.contains(contact_info2.pubkey()));
assert!(repair_peers.contains(contact_info3.pubkey()));
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
&repair_request_quic_sender,
Protocol::UDP, ),
Ok(Some(_))
);
}
#[test]
fn test_verify_shred_response() {
    // verify_response must accept shreds that could satisfy the request and
    // reject shreds from the wrong slot or below the requested index.
    fn new_test_data_shred(slot: Slot, index: u32) -> Shred {
        let shredder = Shredder::new(slot, slot.saturating_sub(1), 0, 0).unwrap();
        let keypair = Keypair::new();
        let reed_solomon_cache = ReedSolomonCache::default();
        let (mut shreds, _) = shredder.entries_to_merkle_shreds_for_tests(
            &keypair,
            &[],
            true,
            Hash::default(),
            0,
            0,
            &reed_solomon_cache,
            &mut ProcessShredsStats::default(),
        );
        shreds.remove(index as usize)
    }
    let repair = ShredRepairType::Orphan(9);
    // Exhaustive match: fails to compile if ShredRepairType gains a variant
    // this test does not cover.
    match repair {
        ShredRepairType::Orphan(_)
        | ShredRepairType::HighestShred(_, _)
        | ShredRepairType::Shred(_, _) => (),
    };
    let slot = 9;
    let index = 5;
    // Orphan: shreds at or below the requested slot pass; above fails.
    let shred = new_test_data_shred(slot, 0);
    let request = ShredRepairType::Orphan(slot);
    assert!(request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot - 1, 0);
    assert!(request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot + 1, 0);
    assert!(!request.verify_response(shred.payload()));
    // HighestShred: same slot and index >= the requested index pass.
    let shred = new_test_data_shred(slot, index);
    let request = ShredRepairType::HighestShred(slot, index as u64);
    assert!(request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot, index + 1);
    assert!(request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot, index - 1);
    assert!(!request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot - 1, index);
    assert!(!request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot + 1, index);
    assert!(!request.verify_response(shred.payload()));
    // Shred: must match the exact slot and index.
    let shred = new_test_data_shred(slot, index);
    let request = ShredRepairType::Shred(slot, index as u64);
    assert!(request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot, index + 1);
    assert!(!request.verify_response(shred.payload()));
    let shred = new_test_data_shred(slot + 1, index);
    assert!(!request.verify_response(shred.payload()));
}
// Asserts that every response packet contains a shred satisfying `request`.
fn verify_responses<'a>(
    request: &ShredRepairType,
    packets: impl Iterator<Item = PacketRef<'a>>,
) {
    packets.for_each(|packet| {
        let shred = shred::layout::get_shred(packet).unwrap();
        assert!(request.verify_response(shred));
    });
}
#[test]
fn test_verify_ancestor_response() {
    let request_slot = MAX_ANCESTOR_RESPONSES as Slot;
    let repair = AncestorHashesRepairType(request_slot);
    // Exactly MAX_ANCESTOR_RESPONSES ancestors below the request slot: valid.
    let mut hashes: Vec<(Slot, Hash)> = (0..request_slot)
        .map(|s| (s, Hash::new_unique()))
        .collect();
    assert!(repair.verify_response(&AncestorHashesResponse::Hashes(hashes.clone())));
    // One extra entry (at the request slot itself): rejected.
    hashes.push((request_slot, Hash::new_unique()));
    assert!(!repair.verify_response(&AncestorHashesResponse::Hashes(hashes)));
}
}