solana_core/window_service.rs

//! `window_service` handles incoming shreds from the data plane, storing them
//! in blockstore and retransmitting where required.
//!

use {
    crate::{
        completed_data_sets_service::CompletedDataSetsSender,
        repair::repair_service::{
            OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels,
        },
        result::{Error, Result},
    },
    agave_feature_set as feature_set,
    assert_matches::debug_assert_matches,
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_clock::{Slot, DEFAULT_MS_PER_SLOT},
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{self, ReedSolomonCache, Shred},
    },
    solana_measure::measure::Measure,
    solana_metrics::inc_new_counter_error,
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::bank_forks::BankForks,
    solana_streamer::evicting_sender::EvictingSender,
    solana_turbine::cluster_nodes,
    std::{
        borrow::Cow,
        net::UdpSocket,
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

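// Channel used to signal slots for which a duplicate-shred proof was detected.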
type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

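/// Counters and timings accumulated by the shred-insertion loop and reported
/// periodically via `report_metrics`.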
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_repairs: AtomicUsize,
    num_shreds_received: usize,
    handle_packets_elapsed_us: u64,
    shred_receiver_elapsed_us: u64,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            (
                "handle_packets_elapsed_us",
                self.handle_packets_elapsed_us,
                i64
            ),
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
            ("num_shreds_received", self.num_shreds_received, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {}", err);
            }
            _ => self.num_errors_other += 1,
        }
    }
}

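/// Drains `shred_receiver` and, for each confirmed duplicate shred, stores the
/// duplicate proof in blockstore, propagates it through gossip, and notifies
/// the duplicate-slots channel.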
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut last_updated = Instant::now();
    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
        if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
            // Grab the bank_forks lock and refresh the root bank at most once per slot
            last_updated = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }
        let shred_slot = shred.slot();
        let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            shred_slot,
            &root_bank,
        );
        let (shred1, shred2) = match shred {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                if chained_merkle_conflict_duplicate_proofs {
                    // Although this proof can be immediately stored on detection, we wait until
                    // here in order to check the feature flag, as storage in blockstore can
                    // preclude the detection of other duplicate proofs in this slot
                    if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                        return Ok(());
                    }
                    blockstore.store_duplicate_slot(
                        shred_slot,
                        conflict.clone(),
                        shred.clone().into_payload(),
                    )?;
                    (shred, conflict)
                } else {
                    return Ok(());
                }
            }
            PossibleDuplicateShred::Exists(shred) => {
                // Unlike the other cases we have to wait until here to decide to handle the duplicate and store
                // in blockstore. This is because the duplicate could have been part of the same insert batch,
                // so we wait until the batch has been written.
                if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                    return Ok(()); // A duplicate is already recorded
                }
                let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
                    return Ok(()); // Not a duplicate
                };
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_shred_payload))
            }
        };

        // Propagate duplicate proof through gossip
        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        // Notify duplicate consensus state machine
        duplicate_slots_sender.send(shred_slot)?;

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

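/// Receives batches of verified shred payloads, deserializes them in parallel,
/// and inserts them into blockstore, forwarding completed data sets and
/// possible duplicate shreds to their respective handlers.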
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &EvictingSender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
    accept_repairs_only: bool,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    shreds.extend(verified_receiver.try_iter().flatten());
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.run_insert_count += 1;
    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
        if accept_repairs_only && !repair {
            return None;
        }
        if repair {
            ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
            debug_assert_matches!(shred, shred::Payload::Unique(_));
        } else {
            debug_assert_matches!(shred, shred::Payload::Shared(_));
        }
        let shred = Shred::new_from_serialized_shred(shred).ok()?;
        Some((Cow::Owned(shred), repair))
    };
    let now = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        shreds
            .into_par_iter()
            .with_min_len(32)
            .filter_map(handle_shred)
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // is_trusted
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;

    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }

    Ok(())
}

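/// Channel endpoints that `WindowService` uses to communicate with the rest of
/// the validator.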
pub struct WindowServiceChannels {
    pub verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    pub retransmit_sender: EvictingSender<Vec<shred::Payload>>,
    pub completed_data_sets_sender: Option<CompletedDataSetsSender>,
    pub duplicate_slots_sender: DuplicateSlotSender,
    pub repair_service_channels: RepairServiceChannels,
}

impl WindowServiceChannels {
    pub fn new(
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        repair_service_channels: RepairServiceChannels,
    ) -> Self {
        Self {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        }
    }
}

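/// Drives shred ingestion: one thread inserts verified shreds into blockstore,
/// one thread checks for duplicate shreds, and the repair service requests any
/// shreds that are still missing.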
pub(crate) struct WindowService {
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        window_service_channels: WindowServiceChannels,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();

        // In wen_restart, discard all shreds from Turbine and keep only those from repair so
        // that new shreds cannot OOM the validator before wen_restart is over.
        let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

        let WindowServiceChannels {
            verified_receiver,
            retransmit_sender,
            completed_data_sets_sender,
            duplicate_slots_sender,
            repair_service_channels,
        } = window_service_channels;

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_info,
            outstanding_repair_requests.clone(),
            repair_service_channels,
        );

        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );

        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
            accept_repairs_only,
        );

        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

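    /// Spawns the thread that runs `run_check_duplicate` until `exit` is set
    /// or the duplicate-shred channel disconnects.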
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

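    /// Spawns the thread that repeatedly calls `run_insert`, reporting
    /// insertion metrics roughly every two seconds.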
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: EvictingSender<Vec<shred::Payload>>,
        accept_repairs_only: bool,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    // Use the current thread as one of the workers. This reduces overhead when the
                    // pool is used to process a small number of shreds, since they'll be processed
                    // directly on the current thread.
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                        accept_repairs_only,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

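    /// Decides whether the calling thread should shut down: channel
    /// disconnect/send errors are fatal, timeouts are benign, and any other
    /// error is counted and logged before continuing.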
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_hash::Hash,
        solana_keypair::Keypair,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
        solana_time_utils::timestamp,
    };

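    // Test helper: shreds the given entries into data shreds for `slot`.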
    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_shreds(
            keypair,
            entries,
            true, // is_last_in_slot
            // chained_merkle_root
            Some(Hash::new_from_array(rand::thread_rng().gen())),
            0,    // next_shred_index
            0,    // next_code_index
            true, // merkle_variant
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        // Make sure the correct duplicate proof was stored
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        // Make sure a duplicate signal was sent
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Start duplicate thread receiving and inserting duplicates
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        let (dummy_retransmit_sender, _) = EvictingSender::new_bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Simulate storing both duplicate shreds in the same batch
            let shreds = [&original_shred, &duplicate_shred]
                .into_iter()
                .map(|shred| (Cow::Borrowed(shred), /*is_repaired:*/ false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // is_trusted
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            // Make sure a duplicate signal was sent
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );

            // Make sure the correct duplicate proof was stored
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}