//! The window service ingests verified shreds arriving from turbine and
//! repair, inserts them into the blockstore, and reports duplicate-shred
//! proofs to gossip and to the duplicate-slot channel.
use {
    crate::{
        completed_data_sets_service::CompletedDataSetsSender,
        repair::repair_service::{
            OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
        },
        result::{Error, Result},
    },
    agave_feature_set as feature_set,
    assert_matches::debug_assert_matches,
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{self, ReedSolomonCache, Shred},
    },
    solana_measure::measure::Measure,
    solana_metrics::inc_new_counter_error,
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::bank_forks::BankForks,
    solana_streamer::evicting_sender::EvictingSender,
    solana_turbine::cluster_nodes,
    std::{
        borrow::Cow,
        net::UdpSocket,
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

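/// Counters for the shred insertion loop; aggregated across iterations of
/// `run_insert` and reported as a datapoint roughly every two seconds.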
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_repairs: AtomicUsize,
    num_shreds_received: usize,
    handle_packets_elapsed_us: u64,
    shred_receiver_elapsed_us: u64,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            (
                "handle_packets_elapsed_us",
                self.handle_packets_elapsed_us,
                i64
            ),
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
            ("num_shreds_received", self.num_shreds_received, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {}", err);
            }
            _ => self.num_errors_other += 1,
        }
    }
}

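/// Drains `shred_receiver` and, for each possible duplicate shred, stores a
/// duplicate proof in the blockstore (if one is not already recorded), pushes
/// the proof to gossip, and notifies `duplicate_slots_sender`.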
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut last_updated = Instant::now();
    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
        if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
            // Grabs the bank forks lock roughly once per slot
            last_updated = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }
        let shred_slot = shred.slot();
        let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            shred_slot,
            &root_bank,
        );
        let (shred1, shred2) = match shred {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                if chained_merkle_conflict_duplicate_proofs {
                    // Only store the proof if no duplicate proof has already
                    // been recorded for this slot.
                    if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                        return Ok(());
                    }
                    blockstore.store_duplicate_slot(
                        shred_slot,
                        conflict.clone(),
                        shred.clone().into_payload(),
                    )?;
                    (shred, conflict)
                } else {
                    return Ok(());
                }
            }
            PossibleDuplicateShred::Exists(shred) => {
                // A shred already exists in the blockstore at this location;
                // skip if a duplicate proof was already stored for this slot.
                if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                    return Ok(());
                }
                let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
                    return Ok(());
                };
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_shred_payload))
            }
        };

        // Propagate the duplicate proof through gossip and notify the
        // duplicate-slot consumer.
        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        duplicate_slots_sender.send(shred_slot)?;

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

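/// Receives batches of verified shred payloads, deserializes them in parallel
/// on `thread_pool`, and inserts them into the blockstore. Completed data sets
/// are forwarded to `completed_data_sets_sender` when one is provided.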
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
    accept_repairs_only: bool,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    shreds.extend(verified_receiver.try_iter().flatten());
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.run_insert_count += 1;
    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
        if accept_repairs_only && !repair {
            return None;
        }
        if repair {
            ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
            debug_assert_matches!(shred, shred::Payload::Unique(_));
        } else {
            debug_assert_matches!(shred, shred::Payload::Shared(_));
        }
        let shred = Shred::new_from_serialized_shred(shred).ok()?;
        Some((Cow::Owned(shred), repair))
    };
    let now = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        shreds
            .into_par_iter()
            .with_min_len(32)
            .filter_map(handle_shred)
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // is_trusted
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;

    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }

    Ok(())
}

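/// Channel endpoints the window service uses to communicate with the rest of
/// the validator: shred ingest, retransmit, completed data sets, duplicate
/// slots, and repair.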
pub struct WindowServiceChannels {
    pub verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
    pub retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
    pub duplicate_slots_sender: DuplicateSlotSender,
    pub repair_service_channels: RepairServiceChannels,
}

impl WindowServiceChannels {
    pub fn new(
        verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        repair_service_channels: RepairServiceChannels,
    ) -> Self {
        Self {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        }
    }
}

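/// Join handles for the window service threads: shred insertion, duplicate
/// checking, and the repair service.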
pub(crate) struct WindowService {
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        window_service_channels: WindowServiceChannels,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();

        // During wen_restart, only repaired shreds are accepted; shreds
        // arriving from turbine are dropped.
        let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

        let WindowServiceChannels {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        } = window_service_channels;

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_info,
            outstanding_repair_requests.clone(),
            repair_service_channels,
        );

        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );

        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
            accept_repairs_only,
        );

        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

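    /// Spawns the thread that runs `run_check_duplicate` until `exit` is set
    /// or the duplicate shred channel disconnects.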
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

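    /// Spawns the thread that repeatedly runs `run_insert` and reports
    /// blockstore and window-service metrics roughly every two seconds.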
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        accept_repairs_only: bool,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    // Use the spawned thread itself as one of the pool workers.
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                        accept_repairs_only,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

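    /// Decides whether a worker thread should shut down: a disconnected or
    /// closed channel is fatal, a receive timeout is benign, and any other
    /// error is counted and logged but does not stop the loop.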
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
        solana_time_utils::timestamp,
    };

    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_shreds(
            keypair,
            entries,
            true, // is_last_in_slot
            Some(Hash::new_from_array(rand::thread_rng().gen())), // chained_merkle_root
            0,    // next_shred_index
            0,    // next_code_index
            true, // merkle_variant
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        let (dummy_retransmit_sender, _) = EvictingSender::new_bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Insert the original and the conflicting shred in the same batch.
            let shreds = [&original_shred, &duplicate_shred]
                .into_iter()
                .map(|shred| (Cow::Borrowed(shred), false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // is_trusted
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );

            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}
643}