1use {
6 crate::{
7 completed_data_sets_service::CompletedDataSetsSender,
8 repair::repair_service::{
9 OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
10 },
11 result::{Error, Result},
12 },
13 agave_feature_set as feature_set,
14 crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
15 rayon::{prelude::*, ThreadPool},
16 solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
17 solana_gossip::cluster_info::ClusterInfo,
18 solana_ledger::{
19 blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
20 leader_schedule_cache::LeaderScheduleCache,
21 shred::{self, ReedSolomonCache, Shred},
22 },
23 solana_measure::measure::Measure,
24 solana_metrics::inc_new_counter_error,
25 solana_rayon_threadlimit::get_thread_count,
26 solana_runtime::bank_forks::BankForks,
27 solana_streamer::evicting_sender::EvictingSender,
28 solana_turbine::cluster_nodes,
29 std::{
30 borrow::Cow,
31 net::UdpSocket,
32 sync::{
33 atomic::{AtomicBool, AtomicUsize, Ordering},
34 Arc, RwLock,
35 },
36 thread::{self, Builder, JoinHandle},
37 time::{Duration, Instant},
38 },
39};
40
/// Sending half of the channel on which `run_check_duplicate` reports the slot of a duplicate
/// once the proof has been handed to `ClusterInfo::push_duplicate_shred`.
type DuplicateSlotSender = Sender<Slot>;
/// Receiving half matching [`DuplicateSlotSender`]; visible crate-wide.
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
43
/// Counters and timings accumulated by the window-insert loop between two metric reports.
///
/// `Default` produces an all-zero instance; the insert thread replaces its instance with a
/// fresh default after every report.
#[derive(Default)]
struct WindowServiceMetrics {
    // ---- work performed ----
    /// Number of `run_insert` calls that got past the initial blocking receive.
    run_insert_count: u64,
    /// Number of shreds left after filtering and deserialization in `run_insert`.
    num_shreds_received: usize,
    /// Number of received payloads flagged as repairs. Atomic because `run_insert` bumps it
    /// from inside a rayon parallel iterator through a shared reference.
    num_repairs: AtomicUsize,

    // ---- time spent ----
    /// Microseconds spent receiving and draining `verified_receiver`.
    shred_receiver_elapsed_us: u64,
    /// Microseconds spent in the parallel filter/deserialize step.
    handle_packets_elapsed_us: u64,

    // ---- errors, as classified by `record_error` ----
    /// Total number of recorded errors, regardless of kind.
    num_errors: u64,
    /// Errors matching `Error::Blockstore(_)`.
    num_errors_blockstore: u64,
    /// Errors matching `Error::TrySend`.
    num_errors_try_crossbeam_send: u64,
    /// Errors matching `Error::RecvTimeout(_)` (both timeout and disconnect).
    num_errors_cross_beam_recv_timeout: u64,
    /// Every other error kind.
    num_errors_other: u64,
}
57
58impl WindowServiceMetrics {
59 fn report_metrics(&self, metric_name: &'static str) {
60 datapoint_info!(
61 metric_name,
62 (
63 "handle_packets_elapsed_us",
64 self.handle_packets_elapsed_us,
65 i64
66 ),
67 ("run_insert_count", self.run_insert_count as i64, i64),
68 ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
69 ("num_shreds_received", self.num_shreds_received, i64),
70 (
71 "shred_receiver_elapsed_us",
72 self.shred_receiver_elapsed_us as i64,
73 i64
74 ),
75 ("num_errors", self.num_errors, i64),
76 ("num_errors_blockstore", self.num_errors_blockstore, i64),
77 ("num_errors_other", self.num_errors_other, i64),
78 (
79 "num_errors_try_crossbeam_send",
80 self.num_errors_try_crossbeam_send,
81 i64
82 ),
83 (
84 "num_errors_cross_beam_recv_timeout",
85 self.num_errors_cross_beam_recv_timeout,
86 i64
87 ),
88 );
89 }
90
91 fn record_error(&mut self, err: &Error) {
92 self.num_errors += 1;
93 match err {
94 Error::TrySend => self.num_errors_try_crossbeam_send += 1,
95 Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
96 Error::Blockstore(err) => {
97 self.num_errors_blockstore += 1;
98 error!("blockstore error: {err}");
99 }
100 _ => self.num_errors_other += 1,
101 }
102 }
103}
104
105fn run_check_duplicate(
106 cluster_info: &ClusterInfo,
107 blockstore: &Blockstore,
108 shred_receiver: &Receiver<PossibleDuplicateShred>,
109 duplicate_slots_sender: &DuplicateSlotSender,
110 bank_forks: &RwLock<BankForks>,
111) -> Result<()> {
112 let mut root_bank = bank_forks.read().unwrap().root_bank();
113 let mut last_updated = Instant::now();
114 let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
115 if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
116 last_updated = Instant::now();
118 root_bank = bank_forks.read().unwrap().root_bank();
119 }
120 let shred_slot = shred.slot();
121 let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
122 &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
123 shred_slot,
124 &root_bank,
125 );
126 let (shred1, shred2) = match shred {
127 PossibleDuplicateShred::LastIndexConflict(shred, conflict)
128 | PossibleDuplicateShred::ErasureConflict(shred, conflict)
129 | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
130 PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
131 if chained_merkle_conflict_duplicate_proofs {
132 if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
136 return Ok(());
137 }
138 blockstore.store_duplicate_slot(
139 shred_slot,
140 conflict.clone(),
141 shred.clone().into_payload(),
142 )?;
143 (shred, conflict)
144 } else {
145 return Ok(());
146 }
147 }
148 PossibleDuplicateShred::Exists(shred) => {
149 if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
153 return Ok(()); }
155 let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
156 return Ok(()); };
158 blockstore.store_duplicate_slot(
159 shred_slot,
160 existing_shred_payload.clone(),
161 shred.clone().into_payload(),
162 )?;
163 (shred, shred::Payload::from(existing_shred_payload))
164 }
165 };
166
167 cluster_info.push_duplicate_shred(&shred1, &shred2)?;
169 duplicate_slots_sender.send(shred_slot)?;
171
172 Ok(())
173 };
174 const RECV_TIMEOUT: Duration = Duration::from_millis(200);
175 std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
176 .chain(shred_receiver.try_iter())
177 .try_for_each(check_duplicate)
178}
179
180#[allow(clippy::too_many_arguments)]
181fn run_insert<F>(
182 thread_pool: &ThreadPool,
183 verified_receiver: &Receiver<Vec<(shred::Payload, bool)>>,
184 blockstore: &Blockstore,
185 leader_schedule_cache: &LeaderScheduleCache,
186 handle_duplicate: F,
187 metrics: &mut BlockstoreInsertionMetrics,
188 ws_metrics: &mut WindowServiceMetrics,
189 completed_data_sets_sender: Option<&CompletedDataSetsSender>,
190 retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
191 reed_solomon_cache: &ReedSolomonCache,
192 accept_repairs_only: bool,
193) -> Result<()>
194where
195 F: Fn(PossibleDuplicateShred),
196{
197 const RECV_TIMEOUT: Duration = Duration::from_millis(200);
198 let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
199 let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
200 shreds.extend(verified_receiver.try_iter().flatten());
201 shred_receiver_elapsed.stop();
202 ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
203 ws_metrics.run_insert_count += 1;
204 let handle_shred = |(shred, repair): (shred::Payload, bool)| {
205 if accept_repairs_only && !repair {
206 return None;
207 }
208 if repair {
209 ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
210 }
211 let shred = Shred::new_from_serialized_shred(shred).ok()?;
212 Some((Cow::Owned(shred), repair))
213 };
214 let now = Instant::now();
215 let shreds: Vec<_> = thread_pool.install(|| {
216 shreds
217 .into_par_iter()
218 .with_min_len(32)
219 .filter_map(handle_shred)
220 .collect()
221 });
222 ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
223 ws_metrics.num_shreds_received += shreds.len();
224 let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
225 shreds,
226 Some(leader_schedule_cache),
227 false, retransmit_sender,
229 &handle_duplicate,
230 reed_solomon_cache,
231 metrics,
232 )?;
233
234 if let Some(sender) = completed_data_sets_sender {
235 sender.try_send(completed_data_sets)?;
236 }
237
238 Ok(())
239}
240
/// Channel endpoints handed to `WindowService::new` as a single argument.
pub struct WindowServiceChannels {
    /// Source of batches of `(payload, repair)` pairs consumed by `run_insert`.
    pub verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
    /// Sender passed to the blockstore insert call in `run_insert`.
    pub retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    /// Optional sink for the completed data sets returned by the blockstore insert.
    pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
    /// Sink for slots in which `run_check_duplicate` confirmed a duplicate.
    pub duplicate_slots_sender: DuplicateSlotSender,
    /// Channels forwarded unchanged to `RepairService::new`.
    pub repair_service_channels: RepairServiceChannels,
}
248
249impl WindowServiceChannels {
250 pub fn new(
251 verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
252 retransmit_sender: EvictingSender<Vec<shred::Payload>>,
253 completed_data_sets_sender: Option<CompletedDataSetsSender>,
254 duplicate_slots_sender: DuplicateSlotSender,
255 repair_service_channels: RepairServiceChannels,
256 ) -> Self {
257 Self {
258 verified_receiver,
259 retransmit_sender,
260 completed_data_sets_sender,
261 duplicate_slots_sender,
262 repair_service_channels,
263 }
264 }
265}
266
/// Handles for everything `WindowService::new` starts; consumed by `join`.
pub(crate) struct WindowService {
    /// Thread named "solWinInsert" that repeatedly calls `run_insert`.
    t_insert: JoinHandle<()>,
    /// Thread named "solWinCheckDup" that repeatedly calls `run_check_duplicate`.
    t_check_duplicate: JoinHandle<()>,
    /// Repair service constructed alongside the two threads.
    repair_service: RepairService,
}
272
273impl WindowService {
274 pub(crate) fn new(
275 blockstore: Arc<Blockstore>,
276 repair_socket: Arc<UdpSocket>,
277 ancestor_hashes_socket: Arc<UdpSocket>,
278 exit: Arc<AtomicBool>,
279 repair_info: RepairInfo,
280 window_service_channels: WindowServiceChannels,
281 leader_schedule_cache: Arc<LeaderScheduleCache>,
282 outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
283 ) -> WindowService {
284 let cluster_info = repair_info.cluster_info.clone();
285 let bank_forks = repair_info.bank_forks.clone();
286
287 let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();
290
291 let WindowServiceChannels {
292 verified_receiver,
293 retransmit_sender,
294 completed_data_sets_sender,
295 duplicate_slots_sender,
296 repair_service_channels,
297 } = window_service_channels;
298
299 let repair_service = RepairService::new(
300 blockstore.clone(),
301 exit.clone(),
302 repair_socket,
303 ancestor_hashes_socket,
304 repair_info,
305 outstanding_repair_requests.clone(),
306 repair_service_channels,
307 );
308
309 let (duplicate_sender, duplicate_receiver) = unbounded();
310
311 let t_check_duplicate = Self::start_check_duplicate_thread(
312 cluster_info,
313 exit.clone(),
314 blockstore.clone(),
315 duplicate_receiver,
316 duplicate_slots_sender,
317 bank_forks,
318 );
319
320 let t_insert = Self::start_window_insert_thread(
321 exit,
322 blockstore,
323 leader_schedule_cache,
324 verified_receiver,
325 duplicate_sender,
326 completed_data_sets_sender,
327 retransmit_sender,
328 accept_repairs_only,
329 );
330
331 WindowService {
332 t_insert,
333 t_check_duplicate,
334 repair_service,
335 }
336 }
337
338 fn start_check_duplicate_thread(
339 cluster_info: Arc<ClusterInfo>,
340 exit: Arc<AtomicBool>,
341 blockstore: Arc<Blockstore>,
342 duplicate_receiver: Receiver<PossibleDuplicateShred>,
343 duplicate_slots_sender: DuplicateSlotSender,
344 bank_forks: Arc<RwLock<BankForks>>,
345 ) -> JoinHandle<()> {
346 let handle_error = || {
347 inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
348 };
349 Builder::new()
350 .name("solWinCheckDup".to_string())
351 .spawn(move || {
352 while !exit.load(Ordering::Relaxed) {
353 if let Err(e) = run_check_duplicate(
354 &cluster_info,
355 &blockstore,
356 &duplicate_receiver,
357 &duplicate_slots_sender,
358 &bank_forks,
359 ) {
360 if Self::should_exit_on_error(e, &handle_error) {
361 break;
362 }
363 }
364 }
365 })
366 .unwrap()
367 }
368
369 fn start_window_insert_thread(
370 exit: Arc<AtomicBool>,
371 blockstore: Arc<Blockstore>,
372 leader_schedule_cache: Arc<LeaderScheduleCache>,
373 verified_receiver: Receiver<Vec<(shred::Payload, bool)>>,
374 check_duplicate_sender: Sender<PossibleDuplicateShred>,
375 completed_data_sets_sender: Option<CompletedDataSetsSender>,
376 retransmit_sender: EvictingSender<Vec<shred::Payload>>,
377 accept_repairs_only: bool,
378 ) -> JoinHandle<()> {
379 let handle_error = || {
380 inc_new_counter_error!("solana-window-insert-error", 1, 1);
381 };
382 let reed_solomon_cache = ReedSolomonCache::default();
383 Builder::new()
384 .name("solWinInsert".to_string())
385 .spawn(move || {
386 let thread_pool = rayon::ThreadPoolBuilder::new()
387 .num_threads(get_thread_count().min(8))
388 .use_current_thread()
392 .thread_name(|i| format!("solWinInsert{i:02}"))
393 .build()
394 .unwrap();
395 let handle_duplicate = |possible_duplicate_shred| {
396 let _ = check_duplicate_sender.send(possible_duplicate_shred);
397 };
398 let mut metrics = BlockstoreInsertionMetrics::default();
399 let mut ws_metrics = WindowServiceMetrics::default();
400 let mut last_print = Instant::now();
401 while !exit.load(Ordering::Relaxed) {
402 if let Err(e) = run_insert(
403 &thread_pool,
404 &verified_receiver,
405 &blockstore,
406 &leader_schedule_cache,
407 handle_duplicate,
408 &mut metrics,
409 &mut ws_metrics,
410 completed_data_sets_sender.as_ref(),
411 &retransmit_sender,
412 &reed_solomon_cache,
413 accept_repairs_only,
414 ) {
415 ws_metrics.record_error(&e);
416 if Self::should_exit_on_error(e, &handle_error) {
417 break;
418 }
419 }
420
421 if last_print.elapsed().as_secs() > 2 {
422 metrics.report_metrics("blockstore-insert-shreds");
423 metrics = BlockstoreInsertionMetrics::default();
424 ws_metrics.report_metrics("recv-window-insert-shreds");
425 ws_metrics = WindowServiceMetrics::default();
426 last_print = Instant::now();
427 }
428 }
429 })
430 .unwrap()
431 }
432
433 fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
434 where
435 H: Fn(),
436 {
437 match e {
438 Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
439 Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
440 Error::Send => true,
441 _ => {
442 handle_error();
443 error!("thread {:?} error {:?}", thread::current().name(), e);
444 false
445 }
446 }
447 }
448
449 pub(crate) fn join(self) -> thread::Result<()> {
450 self.t_insert.join()?;
451 self.t_check_duplicate.join()?;
452 self.repair_service.join()
453 }
454}
455
#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
        solana_time_utils::timestamp,
    };

    /// Builds a `ClusterInfo` for a fresh keypair with a localhost contact info.
    fn new_localhost_cluster_info() -> ClusterInfo {
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        )
    }

    /// Builds bank forks around a test bank created from a fresh genesis config.
    fn new_test_bank_forks() -> Arc<RwLock<BankForks>> {
        let genesis_config = create_genesis_config(10_000).genesis_config;
        BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config))
    }

    /// Generates shreds for one slot starting at `slot` and returns the first of them.
    fn first_shred_of_slot(slot: Slot) -> Shred {
        let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
        shreds.swap_remove(0)
    }

    /// Shreds `entries` for `slot` with a random hash argument and returns the data shreds.
    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let random_hash = Some(Hash::new_from_array(rand::thread_rng().gen()));
        let shreds = shredder.entries_to_merkle_shreds_for_tests(
            keypair,
            entries,
            true,
            random_hash,
            0,
            0,
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        // The first element holds the data shreds; the second is not needed here.
        shreds.0
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let original_entries = create_ticks(10, 0, Hash::default());

        // Insert in reverse order; the entries must still read back in the original order.
        let reversed_shreds: Vec<Shred> =
            local_entries_to_shred(&original_entries, 0, 0, &Keypair::new())
                .into_iter()
                .rev()
                .collect();
        blockstore
            .insert_shreds(reversed_shreds, None, false)
            .expect("Expect successful processing of shred");

        let stored_entries = blockstore.get_slot_entries(0, 0).unwrap();
        assert_eq!(stored_entries, original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let bank_forks = new_test_bank_forks();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (possible_duplicate_sender, possible_duplicate_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();

        // Store the original shreds, starting at slot 5.
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let original_shred = &shreds[0];

        // A second, independently generated first shred for the same slot; the test relies
        // on its payload differing from the stored one.
        let duplicate_shred = first_shred_of_slot(5);
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();

        possible_duplicate_sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));

        let cluster_info = new_localhost_cluster_info();
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &possible_duplicate_receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        // The proof must be in blockstore with the stored shred first, the new one second.
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        // The slot must have been reported on the duplicate-slot channel.
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));

        // Run the real duplicate-check thread against this blockstore.
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            Arc::new(new_localhost_cluster_info()),
            Arc::clone(&exit),
            Arc::clone(&blockstore),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            new_test_bank_forks(),
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let (dummy_retransmit_sender, _) = EvictingSender::new_bounded(0);

        for slot in 0..100 {
            let original_shred = first_shred_of_slot(slot);
            let duplicate_shred = first_shred_of_slot(slot);
            assert_eq!(duplicate_shred.slot(), slot);

            // Both shreds go into blockstore in one insert call.
            let same_batch = [&original_shred, &duplicate_shred]
                .into_iter()
                .map(|shred| (Cow::Borrowed(shred), false));
            blockstore
                .insert_shreds_handle_duplicate(
                    same_batch,
                    None,
                    false,
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            // The background thread must report this slot within five seconds.
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_secs(5))
                    .unwrap(),
                slot
            );

            // The stored proof must hold the original first and the duplicate second.
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }

        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}