solana_core/window_service.rs

//! `window_service` handles the data plane's incoming shreds, storing them in
//! the blockstore and retransmitting them where required.
//!

use {
    crate::{
        completed_data_sets_service::CompletedDataSetsSender,
        repair::repair_service::{
            OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
        },
        result::{Error, Result},
    },
    agave_feature_set as feature_set,
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{self, ReedSolomonCache, Shred},
    },
    solana_measure::measure::Measure,
    solana_metrics::inc_new_counter_error,
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::bank_forks::BankForks,
    solana_streamer::evicting_sender::EvictingSender,
    solana_turbine::cluster_nodes,
    std::{
        borrow::Cow,
        net::UdpSocket,
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

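// Channel endpoints for notifying the duplicate-slot consensus state machine
// of newly detected duplicate slots.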
type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

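/// Counters and timings collected by the shred insert loop and reported
/// periodically as a `recv-window-insert-shreds` datapoint.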
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_repairs: AtomicUsize,
    num_shreds_received: usize,
    handle_packets_elapsed_us: u64,
    shred_receiver_elapsed_us: u64,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            (
                "handle_packets_elapsed_us",
                self.handle_packets_elapsed_us,
                i64
            ),
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
            ("num_shreds_received", self.num_shreds_received, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {err}");
            }
            _ => self.num_errors_other += 1,
        }
    }
}

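/// Drains `shred_receiver` of possible duplicate shreds, stores duplicate
/// proofs in the blockstore where required, propagates proofs through gossip,
/// and notifies the duplicate-slot consensus state machine via
/// `duplicate_slots_sender`.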
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut last_updated = Instant::now();
    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
        if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
            // Grabs the bank forks lock at most once per slot
            last_updated = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }
        let shred_slot = shred.slot();
        let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            shred_slot,
            &root_bank,
        );
        let (shred1, shred2) = match shred {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                if chained_merkle_conflict_duplicate_proofs {
                    // Although this proof could be stored immediately on detection, we wait
                    // until here in order to check the feature flag, since storing it in the
                    // blockstore can preclude the detection of other duplicate proofs in
                    // this slot
                    if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                        return Ok(());
                    }
                    blockstore.store_duplicate_slot(
                        shred_slot,
                        conflict.clone(),
                        shred.clone().into_payload(),
                    )?;
                    (shred, conflict)
                } else {
                    return Ok(());
                }
            }
            PossibleDuplicateShred::Exists(shred) => {
                // Unlike the other cases, we have to wait until here to decide whether to
                // handle the duplicate and store it in the blockstore. This is because the
                // duplicate could have been part of the same insert batch, so we wait until
                // the batch has been written.
                if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                    return Ok(()); // A duplicate is already recorded
                }
                let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
                    return Ok(()); // Not a duplicate
                };
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_shred_payload))
            }
        };

        // Propagate the duplicate proof through gossip
        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        // Notify the duplicate consensus state machine
        duplicate_slots_sender.send(shred_slot)?;

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

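/// Receives a batch of verified shreds, deserializes them in parallel on
/// `thread_pool`, inserts them into the blockstore (retransmitting where
/// required), and forwards completed data sets and possible duplicate shreds
/// to their respective handlers.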
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
    accept_repairs_only: bool,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    shreds.extend(verified_receiver.try_iter().flatten());
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.run_insert_count += 1;
    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
        if accept_repairs_only && !repair {
            return None;
        }
        if repair {
            ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
        }
        let shred = Shred::new_from_serialized_shred(shred).ok()?;
        Some((Cow::Owned(shred), repair))
    };
    let now = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        shreds
            .into_par_iter()
            .with_min_len(32)
            .filter_map(handle_shred)
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // is_trusted
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;

    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }

    Ok(())
}

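/// Channel endpoints the window service needs: the verified shred receiver
/// feeding the insert thread, plus senders for retransmits, completed data
/// sets, and duplicate slots, and the channels owned by the repair service.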
pub struct WindowServiceChannels {
    pub verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    pub retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
    pub duplicate_slots_sender: DuplicateSlotSender,
    pub repair_service_channels: RepairServiceChannels,
}

impl WindowServiceChannels {
    pub fn new(
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        repair_service_channels: RepairServiceChannels,
    ) -> Self {
        Self {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        }
    }
}

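/// Owns the shred insert thread, the duplicate-check thread, and the
/// associated `RepairService`; all three are joined via `WindowService::join`.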
pub(crate) struct WindowService {
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        window_service_channels: WindowServiceChannels,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();

        // In wen_restart, we discard all shreds from Turbine and keep only those from repair,
        // so that new shreds cannot OOM the validator before wen_restart is over.
        let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

        let WindowServiceChannels {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        } = window_service_channels;

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_info,
            outstanding_repair_requests.clone(),
            repair_service_channels,
        );

        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );

        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
            accept_repairs_only,
        );

        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

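    /// Spawns the `solWinCheckDup` thread, which runs `run_check_duplicate`
    /// in a loop until `exit` is set or the shred channel disconnects.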
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

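    /// Spawns the `solWinInsert` thread, which runs `run_insert` in a loop on
    /// a small rayon pool and reports blockstore and window-service metrics
    /// roughly every two seconds.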
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        accept_repairs_only: bool,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    // Use the current thread as one of the workers. This reduces overhead when the
                    // pool is used to process a small number of shreds, since they'll be processed
                    // directly on the current thread.
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                        accept_repairs_only,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

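    /// Decides whether a worker loop should terminate for the given error:
    /// channel disconnects and send failures are fatal, receive timeouts are
    /// not, and any other error is counted and logged but tolerated.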
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
        solana_time_utils::timestamp,
    };

    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_merkle_shreds_for_tests(
            keypair,
            entries,
            true, // is_last_in_slot
            // chained_merkle_root
            Some(Hash::new_from_array(rand::thread_rng().gen())),
            0, // next_shred_index
            0, // next_code_index
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        // Make sure the correct duplicate proof was stored
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        // Make sure a duplicate signal was sent
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Start duplicate thread receiving and inserting duplicates
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        let (dummy_retransmit_sender, _) = EvictingSender::new_bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Simulate storing both duplicate shreds in the same batch
            let shreds = [&original_shred, &duplicate_shred]
                .into_iter()
                .map(|shred| (Cow::Borrowed(shred), /*is_repaired:*/ false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // is_trusted
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            // Make sure a duplicate signal was sent
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );

            // Make sure the correct duplicate proof was stored
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}