use {
    crate::{
        cluster_info_vote_listener::VerifiedVoteReceiver,
        completed_data_sets_service::CompletedDataSetsSender,
        repair::{
            ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
            repair_service::{
                DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo,
                RepairService,
            },
        },
        result::{Error, Result},
    },
    assert_matches::debug_assert_matches,
    bytes::Bytes,
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_feature_set as feature_set,
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{self, ReedSolomonCache, Shred},
    },
    solana_measure::measure::Measure,
    solana_metrics::inc_new_counter_error,
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::bank_forks::BankForks,
    solana_sdk::{
        clock::{Slot, DEFAULT_MS_PER_SLOT},
        pubkey::Pubkey,
    },
    solana_turbine::cluster_nodes,
    std::{
        net::{SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    tokio::sync::mpsc::Sender as AsyncSender,
};

type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

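/// Per-loop metrics for the shred insert thread; reported and reset roughly every
/// two seconds alongside `BlockstoreInsertionMetrics`.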
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_repairs: AtomicUsize,
    num_shreds_received: usize,
    handle_packets_elapsed_us: u64,
    shred_receiver_elapsed_us: u64,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            (
                "handle_packets_elapsed_us",
                self.handle_packets_elapsed_us,
                i64
            ),
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
            ("num_shreds_received", self.num_shreds_received, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {}", err);
            }
            _ => self.num_errors_other += 1,
        }
    }
}

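/// Drains `shred_receiver` and, for each possible duplicate shred, stores a
/// duplicate-slot proof in the blockstore (when one is not already present),
/// pushes the proof to gossip, and notifies `duplicate_slots_sender`.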
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut last_updated = Instant::now();
    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
        if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
            // Refresh the root bank at most about once per slot to limit how
            // often the bank_forks lock is taken.
            last_updated = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }
        let shred_slot = shred.slot();
        let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            shred_slot,
            &root_bank,
        );
        let (shred1, shred2) = match shred {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                if chained_merkle_conflict_duplicate_proofs {
                    if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                        // A proof for this slot has already been stored.
                        return Ok(());
                    }
                    blockstore.store_duplicate_slot(
                        shred_slot,
                        conflict.clone(),
                        shred.clone().into_payload(),
                    )?;
                    (shred, conflict)
                } else {
                    // Feature is not active for this slot; ignore the conflict.
                    return Ok(());
                }
            }
            PossibleDuplicateShred::Exists(shred) => {
                if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                    // A proof for this slot has already been stored.
                    return Ok(());
                }
                let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
                    // Not a duplicate after all.
                    return Ok(());
                };
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_shred_payload))
            }
        };

        // Propagate the duplicate proof through gossip and notify replay of the duplicate slot.
        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        duplicate_slots_sender.send(shred_slot)?;

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

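/// Receives one batch of verified shred payloads (plus whatever else is queued),
/// deserializes them on the rayon thread pool, and inserts them into the
/// blockstore, forwarding completed data sets and possible duplicate shreds.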
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &Sender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
    accept_repairs_only: bool,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    shreds.extend(verified_receiver.try_iter().flatten());
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.run_insert_count += 1;
    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
        if accept_repairs_only && !repair {
            return None;
        }
        if repair {
            ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
            debug_assert_matches!(shred, shred::Payload::Unique(_));
        } else {
            debug_assert_matches!(shred, shred::Payload::Shared(_));
        }
        let shred = Shred::new_from_serialized_shred(shred).ok()?;
        Some((shred, repair))
    };
    let now = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        shreds
            .into_par_iter()
            .with_min_len(32)
            .filter_map(handle_shred)
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // is_trusted
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;

    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }

    Ok(())
}

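/// Drives shred ingestion: an insert thread writing verified shreds to the
/// blockstore, a duplicate-check thread producing duplicate-slot proofs, and
/// the repair service requesting missing shreds.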
pub(crate) struct WindowService {
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
        retransmit_sender: Sender<Vec<shred::Payload>>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_vote_receiver: VerifiedVoteReceiver,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
        dumped_slots_receiver: DumpedSlotsReceiver,
        popular_pruned_forks_sender: PopularPrunedForksSender,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();

        // During wen_restart, only repaired shreds are accepted; shreds arriving
        // from Turbine are dropped in run_insert().
        let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_request_quic_sender,
            ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            repair_info,
            verified_vote_receiver,
            outstanding_repair_requests.clone(),
            ancestor_hashes_replay_update_receiver,
            dumped_slots_receiver,
            popular_pruned_forks_sender,
        );

        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );

        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
            accept_repairs_only,
        );

        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

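    /// Spawns the "solWinCheckDup" thread, which runs `run_check_duplicate` in a
    /// loop until `exit` is set.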
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

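    /// Spawns the "solWinInsert" thread, which repeatedly calls `run_insert` and
    /// periodically reports insertion metrics.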
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: Sender<Vec<shred::Payload>>,
        accept_repairs_only: bool,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    // Register the spawned solWinInsert thread itself as one of
                    // the pool's worker threads.
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                        accept_repairs_only,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

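    /// A disconnected channel or a failed send tears the calling thread down;
    /// timeouts and other errors are counted and the loop continues.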
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_sdk::{
            hash::Hash,
            signature::{Keypair, Signer},
            timing::timestamp,
        },
        solana_streamer::socket::SocketAddrSpace,
    };

    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_shreds(
            keypair,
            entries,
            true, // is_last_in_slot
            Some(Hash::new_from_array(rand::thread_rng().gen())), // chained_merkle_root
            0,    // next_shred_index
            0,    // next_code_index
            true, // merkle_variant
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        // A duplicate proof should now be stored for the slot, and the slot
        // should be reported on the duplicate slots channel.
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Start the duplicate-check thread.
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Insert both the original and the duplicate shred in the same batch.
            let shreds = [original_shred.clone(), duplicate_shred.clone()]
                .into_iter()
                .map(|shred| (shred, false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // is_trusted
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            // The duplicate-check thread should report the slot ...
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );

            // ... and the correct duplicate proof should be stored in the blockstore.
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}