use {
crate::{
completed_data_sets_service::CompletedDataSetsSender,
repair::repair_service::{
OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
},
result::{Error, Result},
},
agave_feature_set as feature_set,
crossbeam_channel::{Receiver, RecvTimeoutError, Sender, unbounded},
rayon::{ThreadPool, prelude::*},
solana_clock::{DEFAULT_MS_PER_SLOT, Slot},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ReedSolomonCache, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_streamer::evicting_sender::EvictingSender,
solana_turbine::cluster_nodes,
std::{
borrow::Cow,
net::UdpSocket,
sync::{
Arc, RwLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
};
/// Sending half of the channel on which `run_check_duplicate` announces each
/// slot for which it handled a duplicate proof.
type DuplicateSlotSender = Sender<Slot>;
/// Receiving half of the duplicate-slot channel, exposed to the rest of the
/// crate (it is not consumed within this module).
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
#[derive(Default)]
struct WindowServiceMetrics {
run_insert_count: u64,
num_repairs: AtomicUsize,
num_shreds_received: usize,
handle_packets_elapsed_us: u64,
shred_receiver_elapsed_us: u64,
num_errors: u64,
num_errors_blockstore: u64,
num_errors_cross_beam_recv_timeout: u64,
num_errors_other: u64,
num_errors_try_crossbeam_send: u64,
}
impl WindowServiceMetrics {
fn report_metrics(&self, metric_name: &'static str) {
datapoint_info!(
metric_name,
(
"handle_packets_elapsed_us",
self.handle_packets_elapsed_us,
i64
),
("run_insert_count", self.run_insert_count as i64, i64),
("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
("num_shreds_received", self.num_shreds_received, i64),
(
"shred_receiver_elapsed_us",
self.shred_receiver_elapsed_us as i64,
i64
),
("num_errors", self.num_errors, i64),
("num_errors_blockstore", self.num_errors_blockstore, i64),
("num_errors_other", self.num_errors_other, i64),
(
"num_errors_try_crossbeam_send",
self.num_errors_try_crossbeam_send,
i64
),
(
"num_errors_cross_beam_recv_timeout",
self.num_errors_cross_beam_recv_timeout,
i64
),
);
}
fn record_error(&mut self, err: &Error) {
self.num_errors += 1;
match err {
Error::TrySend => self.num_errors_try_crossbeam_send += 1,
Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
Error::Blockstore(err) => {
self.num_errors_blockstore += 1;
error!("blockstore error: {err}");
}
_ => self.num_errors_other += 1,
}
}
}
/// Drains the possible-duplicate channel and handles each entry in order.
///
/// Waits up to 200ms for a first entry. It then also processes every entry
/// already queued, without blocking further. Handling proceeds as follows:
/// - `LastIndexConflict`, `ErasureConflict` and `MerkleRootConflict` are
///   forwarded directly.
/// - `ChainedMerkleRootConflict` is skipped unless the
///   `chained_merkle_conflict_duplicate_proofs` feature is active for the
///   shred's slot.
/// - `ChainedMerkleRootConflict` and `Exists` are skipped once the blockstore
///   already reports duplicate shreds for the slot. Otherwise they are stored
///   in the blockstore first. `Exists` is also skipped when
///   `is_shred_duplicate` finds no conflicting payload.
///
/// Each forwarded proof is pushed to gossip through `cluster_info`, and its
/// slot is then sent on `duplicate_slots_sender`.
///
/// # Errors
/// Returns the receive error when nothing arrives within the timeout or the
/// channel is disconnected. Otherwise it returns the first blockstore, gossip
/// or send error, leaving entries not yet pulled from the channel for the
/// next call.
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    // The root bank is only needed for the feature-activation check. It is
    // re-read at most about once per slot, so the bank forks lock is not taken
    // for every shred.
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut root_bank_read_at = Instant::now();

    let first = shred_receiver.recv_timeout(RECV_TIMEOUT)?;
    for possible_duplicate in std::iter::once(first).chain(shred_receiver.try_iter()) {
        let root_bank_is_stale =
            root_bank_read_at.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT;
        if root_bank_is_stale {
            root_bank_read_at = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }

        let slot = possible_duplicate.slot();
        let chained_proofs_enabled = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            slot,
            &root_bank,
        );

        let (shred1, shred2) = match possible_duplicate {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                // Skip when the feature is inactive, or when the blockstore
                // already reports duplicate shreds for this slot.
                if !chained_proofs_enabled || blockstore.has_duplicate_shreds_in_slot(slot) {
                    continue;
                }
                blockstore.store_duplicate_slot(
                    slot,
                    conflict.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, conflict)
            }
            PossibleDuplicateShred::Exists(shred) => {
                if blockstore.has_duplicate_shreds_in_slot(slot) {
                    continue;
                }
                let Some(existing_payload) = blockstore.is_shred_duplicate(&shred) else {
                    continue;
                };
                blockstore.store_duplicate_slot(
                    slot,
                    existing_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_payload))
            }
        };

        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        duplicate_slots_sender.send(slot)?;
    }
    Ok(())
}
/// Receives verified shred payloads, deserializes them on `thread_pool`, and
/// inserts the resulting shreds into `blockstore`.
///
/// Waits up to 200ms for a first batch, then also drains every batch already
/// queued. Payloads that fail to deserialize are dropped. `handle_duplicate`
/// is handed to `insert_shreds_handle_duplicate` as its possible-duplicate
/// callback. The completed data sets it returns are forwarded with `try_send`
/// when `completed_data_sets_sender` is provided.
///
/// # Errors
/// Returns any of the following:
/// - the receive error, if no batch arrives in time or the channel is
///   disconnected;
/// - an error from blockstore insertion;
/// - the `try_send` error from forwarding completed data sets.
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);

    // Phase 1: block for one batch, then greedily take whatever is already queued.
    let mut recv_timer = Measure::start("shred_receiver_elapsed");
    let mut payloads = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    for batch in verified_receiver.try_iter() {
        payloads.extend(batch);
    }
    recv_timer.stop();
    ws_metrics.shred_receiver_elapsed_us += recv_timer.as_us();
    ws_metrics.run_insert_count += 1;

    // Phase 2: deserialize in parallel, dropping payloads that do not parse.
    // Only the atomic repair counter is shared with the worker threads.
    let num_repairs = &ws_metrics.num_repairs;
    let deserialize_start = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        payloads
            .into_par_iter()
            .with_min_len(32)
            .filter_map(|(payload, is_repaired): (shred::Payload, bool)| {
                if is_repaired {
                    num_repairs.fetch_add(1, Ordering::Relaxed);
                }
                Shred::new_from_serialized_shred(payload)
                    .ok()
                    .map(|shred| (Cow::Owned(shred), is_repaired))
            })
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += deserialize_start.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();

    // Phase 3: insert, then forward completed data sets if anyone listens.
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // presumably `is_trusted` — confirm against Blockstore
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;
    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }
    Ok(())
}
/// Channel endpoints consumed by `WindowService::new`, bundled so the
/// constructor does not need one parameter per channel.
pub struct WindowServiceChannels {
    /// Batches of verified shred payloads, each tagged with whether it came
    /// from repair; read by the window-insert thread.
    pub verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    /// Handed to blockstore insertion as its retransmit sender.
    pub retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    /// Optional destination for completed data sets produced by insertion.
    pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
    /// Receives the slot of every duplicate proof handled by the
    /// duplicate-check thread.
    pub duplicate_slots_sender: DuplicateSlotSender,
    /// Forwarded unchanged to `RepairService::new`.
    pub repair_service_channels: RepairServiceChannels,
}

impl WindowServiceChannels {
    /// Bundles the given endpoints as-is; no validation is performed.
    pub fn new(
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        repair_service_channels: RepairServiceChannels,
    ) -> Self {
        Self {
            repair_service_channels,
            duplicate_slots_sender,
            completed_data_sets_sender,
            retransmit_sender,
            verified_receiver,
        }
    }
}
/// Owns the window-insert and duplicate-check threads together with the
/// `RepairService` started alongside them; see `join` for shutdown.
pub(crate) struct WindowService {
    // Thread named "solWinInsert" running the receive/deserialize/insert loop.
    t_insert: JoinHandle<()>,
    // Thread named "solWinCheckDup" processing possible-duplicate shreds.
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}
impl WindowService {
    /// Starts the repair service, the duplicate-check thread and the
    /// window-insert thread, and returns a handle owning all three.
    ///
    /// The insert thread sends possible duplicates over an internal unbounded
    /// channel to the duplicate-check thread. `exit` is shared by all of them.
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        window_service_channels: WindowServiceChannels,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        // Cloned up front because `repair_info` is moved into
        // `RepairService::new` below.
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();
        let WindowServiceChannels {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        } = window_service_channels;
        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_info,
            outstanding_repair_requests,
            repair_service_channels,
        );
        // Possible duplicates flow from the insert thread (sender) to the
        // duplicate-check thread (receiver).
        let (duplicate_sender, duplicate_receiver) = unbounded();
        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );
        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
        );
        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

    /// Spawns the "solWinCheckDup" thread. It calls `run_check_duplicate` in a
    /// loop until `exit` is set or `should_exit_on_error` returns `true`.
    ///
    /// # Panics
    /// Panics if the thread cannot be spawned.
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        // Invoked by `should_exit_on_error` for errors that do not stop the thread.
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

    /// Spawns the "solWinInsert" thread. It repeatedly calls `run_insert`
    /// until `exit` is set or `should_exit_on_error` returns `true`.
    ///
    /// Metrics are reported and reset whenever `last_print.elapsed().as_secs()`
    /// exceeds 2, i.e. roughly every 3 seconds.
    ///
    /// # Panics
    /// Panics if the thread or its rayon pool cannot be created.
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    ) -> JoinHandle<()> {
        // Invoked by `should_exit_on_error` for errors that do not stop the thread.
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                // Deserialization pool of at most 8 threads. With
                // `use_current_thread`, this insert thread is one of them.
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                // Forward possible duplicates to the duplicate-check thread.
                // Send failures are deliberately ignored.
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                    ) {
                        // Every error, including receive timeouts, is counted
                        // before deciding whether to stop.
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

    /// Decides whether a worker loop should stop after `e`.
    ///
    /// - Returns `true` when the receive channel is disconnected, or on
    ///   `Error::Send`.
    /// - Returns `false` on a receive timeout.
    /// - For any other error, calls `handle_error`, logs the error with the
    ///   current thread's name, and returns `false`.
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    /// Joins the insert thread, then the duplicate-check thread, then the
    /// repair service.
    ///
    /// Returns the first join error without joining the remaining components.
    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}
#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{Entry, create_ticks},
        solana_gossip::contact_info::ContactInfo,
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::{Blockstore, make_many_slot_entries},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_net_utils::SocketAddrSpace,
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_time_utils::timestamp,
    };

    /// Shreds `entries` into merkle data shreds for `slot` (child of `parent`),
    /// signed with `keypair`.
    ///
    /// The second vector returned by the shredder (presumably the coding
    /// shreds) is discarded.
    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_merkle_shreds_for_tests(
            keypair,
            entries,
            // NOTE(review): the four positional arguments below are presumably
            // is_last_in_slot, a random chained merkle root, next_shred_index
            // and next_code_index. Confirm against `Shredder`.
            true,
            Hash::new_from_array(rand::rng().random()),
            0,
            0,
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    /// Shreds inserted in reverse order must still reassemble into the
    /// original entries.
    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");
        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    /// A conflicting shred reported as `Exists` for an already-stored index
    /// must produce a stored proof, with the original as `shred1` and the new
    /// shred as `shred2`. The slot must also be announced on the
    /// duplicate-slot channel.
    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        // A second, independent generation for the same slot yields a shred
        // at the same index with a different payload.
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    /// An original shred and a conflicting one inserted in the same batch must
    /// still produce a stored proof via the duplicate-check thread. This is
    /// repeated for 100 slots.
    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );
        // Mirrors the insert thread: forward possible duplicates, ignoring send errors.
        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        // The receiving half is dropped immediately; retransmission is not
        // exercised by this test.
        let (dummy_retransmit_sender, _) = EvictingSender::new_bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Both shreds go into a single insertion batch.
            let shreds = [&original_shred, &duplicate_shred]
                .into_iter()
                .map(|shred| (Cow::Borrowed(shred), false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // presumably `is_trusted`; confirm against Blockstore
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}