solana_core/window_service.rs

//! `window_service` handles the data plane's incoming shreds, storing them in
//! blockstore and retransmitting where required.
//!

use {
    crate::{
        cluster_info_vote_listener::VerifiedVoteReceiver,
        completed_data_sets_service::CompletedDataSetsSender,
        repair::{
            ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
            repair_service::{
                DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo,
                RepairService,
            },
        },
        result::{Error, Result},
    },
    assert_matches::debug_assert_matches,
    bytes::Bytes,
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    rayon::{prelude::*, ThreadPool},
    solana_feature_set as feature_set,
    solana_gossip::cluster_info::ClusterInfo,
    solana_ledger::{
        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
        leader_schedule_cache::LeaderScheduleCache,
        shred::{self, ReedSolomonCache, Shred},
    },
    solana_measure::measure::Measure,
    solana_metrics::inc_new_counter_error,
    solana_rayon_threadlimit::get_thread_count,
    solana_runtime::bank_forks::BankForks,
    solana_sdk::{
        clock::{Slot, DEFAULT_MS_PER_SLOT},
        pubkey::Pubkey,
    },
    solana_turbine::cluster_nodes,
    std::{
        net::{SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
    tokio::sync::mpsc::Sender as AsyncSender,
};

type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

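/// Counters accumulated by the window service insert loop and reported
/// periodically via `report_metrics`.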
#[derive(Default)]
struct WindowServiceMetrics {
    run_insert_count: u64,
    num_repairs: AtomicUsize,
    num_shreds_received: usize,
    handle_packets_elapsed_us: u64,
    shred_receiver_elapsed_us: u64,
    num_errors: u64,
    num_errors_blockstore: u64,
    num_errors_cross_beam_recv_timeout: u64,
    num_errors_other: u64,
    num_errors_try_crossbeam_send: u64,
}

impl WindowServiceMetrics {
    fn report_metrics(&self, metric_name: &'static str) {
        datapoint_info!(
            metric_name,
            (
                "handle_packets_elapsed_us",
                self.handle_packets_elapsed_us,
                i64
            ),
            ("run_insert_count", self.run_insert_count as i64, i64),
            ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
            ("num_shreds_received", self.num_shreds_received, i64),
            (
                "shred_receiver_elapsed_us",
                self.shred_receiver_elapsed_us as i64,
                i64
            ),
            ("num_errors", self.num_errors, i64),
            ("num_errors_blockstore", self.num_errors_blockstore, i64),
            ("num_errors_other", self.num_errors_other, i64),
            (
                "num_errors_try_crossbeam_send",
                self.num_errors_try_crossbeam_send,
                i64
            ),
            (
                "num_errors_cross_beam_recv_timeout",
                self.num_errors_cross_beam_recv_timeout,
                i64
            ),
        );
    }

    fn record_error(&mut self, err: &Error) {
        self.num_errors += 1;
        match err {
            Error::TrySend => self.num_errors_try_crossbeam_send += 1,
            Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1,
            Error::Blockstore(err) => {
                self.num_errors_blockstore += 1;
                error!("blockstore error: {}", err);
            }
            _ => self.num_errors_other += 1,
        }
    }
}

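/// Processes possible duplicate shreds detected during blockstore insertion:
/// stores a duplicate-slot proof in blockstore where needed, propagates the
/// proof through gossip, and notifies the duplicate-slot state machine.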
fn run_check_duplicate(
    cluster_info: &ClusterInfo,
    blockstore: &Blockstore,
    shred_receiver: &Receiver<PossibleDuplicateShred>,
    duplicate_slots_sender: &DuplicateSlotSender,
    bank_forks: &RwLock<BankForks>,
) -> Result<()> {
    let mut root_bank = bank_forks.read().unwrap().root_bank();
    let mut last_updated = Instant::now();
    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
        if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
            // Grabs the bank forks lock at most once per slot
            last_updated = Instant::now();
            root_bank = bank_forks.read().unwrap().root_bank();
        }
        let shred_slot = shred.slot();
        let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
            &feature_set::chained_merkle_conflict_duplicate_proofs::id(),
            shred_slot,
            &root_bank,
        );
        let (shred1, shred2) = match shred {
            PossibleDuplicateShred::LastIndexConflict(shred, conflict)
            | PossibleDuplicateShred::ErasureConflict(shred, conflict)
            | PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict),
            PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => {
                if chained_merkle_conflict_duplicate_proofs {
                    // Although this proof can be stored immediately on detection, we wait until
                    // here in order to check the feature flag, as storage in blockstore can
                    // preclude the detection of other duplicate proofs in this slot
                    if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                        return Ok(());
                    }
                    blockstore.store_duplicate_slot(
                        shred_slot,
                        conflict.clone(),
                        shred.clone().into_payload(),
                    )?;
                    (shred, conflict)
                } else {
                    return Ok(());
                }
            }
            PossibleDuplicateShred::Exists(shred) => {
                // Unlike the other cases, we have to wait until here to decide whether to handle
                // the duplicate and store it in blockstore. This is because the duplicate could
                // have been part of the same insert batch, so we wait until the batch has been
                // written.
                if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
                    return Ok(()); // A duplicate is already recorded
                }
                let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
                    return Ok(()); // Not a duplicate
                };
                blockstore.store_duplicate_slot(
                    shred_slot,
                    existing_shred_payload.clone(),
                    shred.clone().into_payload(),
                )?;
                (shred, shred::Payload::from(existing_shred_payload))
            }
        };

        // Propagate the duplicate proof through gossip
        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
        // Notify the duplicate consensus state machine
        duplicate_slots_sender.send(shred_slot)?;

        Ok(())
    };
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
        .chain(shred_receiver.try_iter())
        .try_for_each(check_duplicate)
}

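/// Drains a batch of verified shreds from `verified_receiver`, deserializes
/// them in parallel on `thread_pool`, and inserts them into blockstore,
/// forwarding completed data sets and possible duplicates to their handlers.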
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
    thread_pool: &ThreadPool,
    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
    blockstore: &Blockstore,
    leader_schedule_cache: &LeaderScheduleCache,
    handle_duplicate: F,
    metrics: &mut BlockstoreInsertionMetrics,
    ws_metrics: &mut WindowServiceMetrics,
    completed_data_sets_sender: Option<&CompletedDataSetsSender>,
    retransmit_sender: &Sender<Vec<shred::Payload>>,
    reed_solomon_cache: &ReedSolomonCache,
    accept_repairs_only: bool,
) -> Result<()>
where
    F: Fn(PossibleDuplicateShred),
{
    const RECV_TIMEOUT: Duration = Duration::from_millis(200);
    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
    shreds.extend(verified_receiver.try_iter().flatten());
    shred_receiver_elapsed.stop();
    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
    ws_metrics.run_insert_count += 1;
    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
        if accept_repairs_only && !repair {
            return None;
        }
        if repair {
            ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
            debug_assert_matches!(shred, shred::Payload::Unique(_));
        } else {
            debug_assert_matches!(shred, shred::Payload::Shared(_));
        }
        let shred = Shred::new_from_serialized_shred(shred).ok()?;
        Some((shred, repair))
    };
    let now = Instant::now();
    let shreds: Vec<_> = thread_pool.install(|| {
        shreds
            .into_par_iter()
            .with_min_len(32)
            .filter_map(handle_shred)
            .collect()
    });
    ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
    ws_metrics.num_shreds_received += shreds.len();
    let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
        shreds,
        Some(leader_schedule_cache),
        false, // is_trusted
        retransmit_sender,
        &handle_duplicate,
        reed_solomon_cache,
        metrics,
    )?;

    if let Some(sender) = completed_data_sets_sender {
        sender.try_send(completed_data_sets)?;
    }

    Ok(())
}

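/// Ties together the shred insert thread, the duplicate-shred check thread,
/// and the repair service that requests missing shreds.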
pub(crate) struct WindowService {
    t_insert: JoinHandle<()>,
    t_check_duplicate: JoinHandle<()>,
    repair_service: RepairService,
}

impl WindowService {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        blockstore: Arc<Blockstore>,
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        retransmit_sender: Sender<Vec<shred::Payload>>,
        repair_socket: Arc<UdpSocket>,
        ancestor_hashes_socket: Arc<UdpSocket>,
        repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
        ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
        exit: Arc<AtomicBool>,
        repair_info: RepairInfo,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_vote_receiver: VerifiedVoteReceiver,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        duplicate_slots_sender: DuplicateSlotSender,
        ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
        dumped_slots_receiver: DumpedSlotsReceiver,
        popular_pruned_forks_sender: PopularPrunedForksSender,
        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
    ) -> WindowService {
        let cluster_info = repair_info.cluster_info.clone();
        let bank_forks = repair_info.bank_forks.clone();

        // In wen_restart, we discard all shreds from Turbine and keep only those from repair to
        // avoid new shreds making the validator OOM before wen_restart is over.
        let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();

        let repair_service = RepairService::new(
            blockstore.clone(),
            exit.clone(),
            repair_socket,
            ancestor_hashes_socket,
            repair_request_quic_sender,
            ancestor_hashes_request_quic_sender,
            ancestor_hashes_response_quic_receiver,
            repair_info,
            verified_vote_receiver,
            outstanding_repair_requests.clone(),
            ancestor_hashes_replay_update_receiver,
            dumped_slots_receiver,
            popular_pruned_forks_sender,
        );

        let (duplicate_sender, duplicate_receiver) = unbounded();

        let t_check_duplicate = Self::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_receiver,
            duplicate_slots_sender,
            bank_forks,
        );

        let t_insert = Self::start_window_insert_thread(
            exit,
            blockstore,
            leader_schedule_cache,
            verified_receiver,
            duplicate_sender,
            completed_data_sets_sender,
            retransmit_sender,
            accept_repairs_only,
        );

        WindowService {
            t_insert,
            t_check_duplicate,
            repair_service,
        }
    }

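    /// Spawns the `solWinCheckDup` thread, which runs `run_check_duplicate`
    /// in a loop until `exit` is set or the duplicate-shred channel disconnects.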
    fn start_check_duplicate_thread(
        cluster_info: Arc<ClusterInfo>,
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        duplicate_receiver: Receiver<PossibleDuplicateShred>,
        duplicate_slots_sender: DuplicateSlotSender,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
        };
        Builder::new()
            .name("solWinCheckDup".to_string())
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_check_duplicate(
                        &cluster_info,
                        &blockstore,
                        &duplicate_receiver,
                        &duplicate_slots_sender,
                        &bank_forks,
                    ) {
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }
                }
            })
            .unwrap()
    }

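    /// Spawns the `solWinInsert` thread, which builds a bounded rayon pool for
    /// shred deserialization, runs `run_insert` in a loop, and reports metrics
    /// roughly every two seconds.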
    fn start_window_insert_thread(
        exit: Arc<AtomicBool>,
        blockstore: Arc<Blockstore>,
        leader_schedule_cache: Arc<LeaderScheduleCache>,
        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
        check_duplicate_sender: Sender<PossibleDuplicateShred>,
        completed_data_sets_sender: Option<CompletedDataSetsSender>,
        retransmit_sender: Sender<Vec<shred::Payload>>,
        accept_repairs_only: bool,
    ) -> JoinHandle<()> {
        let handle_error = || {
            inc_new_counter_error!("solana-window-insert-error", 1, 1);
        };
        let reed_solomon_cache = ReedSolomonCache::default();
        Builder::new()
            .name("solWinInsert".to_string())
            .spawn(move || {
                let thread_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count().min(8))
                    // Use the current thread as one of the workers. This reduces overhead when the
                    // pool is used to process a small number of shreds, since they'll be processed
                    // directly on the current thread.
                    .use_current_thread()
                    .thread_name(|i| format!("solWinInsert{i:02}"))
                    .build()
                    .unwrap();
                let handle_duplicate = |possible_duplicate_shred| {
                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                };
                let mut metrics = BlockstoreInsertionMetrics::default();
                let mut ws_metrics = WindowServiceMetrics::default();
                let mut last_print = Instant::now();
                while !exit.load(Ordering::Relaxed) {
                    if let Err(e) = run_insert(
                        &thread_pool,
                        &verified_receiver,
                        &blockstore,
                        &leader_schedule_cache,
                        handle_duplicate,
                        &mut metrics,
                        &mut ws_metrics,
                        completed_data_sets_sender.as_ref(),
                        &retransmit_sender,
                        &reed_solomon_cache,
                        accept_repairs_only,
                    ) {
                        ws_metrics.record_error(&e);
                        if Self::should_exit_on_error(e, &handle_error) {
                            break;
                        }
                    }

                    if last_print.elapsed().as_secs() > 2 {
                        metrics.report_metrics("blockstore-insert-shreds");
                        metrics = BlockstoreInsertionMetrics::default();
                        ws_metrics.report_metrics("recv-window-insert-shreds");
                        ws_metrics = WindowServiceMetrics::default();
                        last_print = Instant::now();
                    }
                }
            })
            .unwrap()
    }

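    /// Decides whether a service thread should shut down after an error:
    /// channel disconnects and send failures are fatal, receive timeouts are
    /// ignored, and all other errors are counted and logged before continuing.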
    fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
    where
        H: Fn(),
    {
        match e {
            Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
            Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
            Error::Send => true,
            _ => {
                handle_error();
                error!("thread {:?} error {:?}", thread::current().name(), e);
                false
            }
        }
    }

    pub(crate) fn join(self) -> thread::Result<()> {
        self.t_insert.join()?;
        self.t_check_duplicate.join()?;
        self.repair_service.join()
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        rand::Rng,
        solana_entry::entry::{create_ticks, Entry},
        solana_gossip::contact_info::ContactInfo,
        solana_ledger::{
            blockstore::{make_many_slot_entries, Blockstore},
            genesis_utils::create_genesis_config,
            get_tmp_ledger_path_auto_delete,
            shred::{ProcessShredsStats, Shredder},
        },
        solana_runtime::bank::Bank,
        solana_sdk::{
            hash::Hash,
            signature::{Keypair, Signer},
            timing::timestamp,
        },
        solana_streamer::socket::SocketAddrSpace,
    };

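    /// Shreds the given entries into data shreds for `slot`; coding shreds are
    /// discarded since these tests only exercise data shreds.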
    fn local_entries_to_shred(
        entries: &[Entry],
        slot: Slot,
        parent: Slot,
        keypair: &Keypair,
    ) -> Vec<Shred> {
        let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
        let (data_shreds, _) = shredder.entries_to_shreds(
            keypair,
            entries,
            true, // is_last_in_slot
            // chained_merkle_root
            Some(Hash::new_from_array(rand::thread_rng().gen())),
            0,    // next_shred_index
            0,    // next_code_index
            true, // merkle_variant
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        data_shreds
    }

    #[test]
    fn test_process_shred() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let num_entries = 10;
        let original_entries = create_ticks(num_entries, 0, Hash::default());
        let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
        shreds.reverse();
        blockstore
            .insert_shreds(shreds, None, false)
            .expect("Expect successful processing of shred");

        assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
    }

    #[test]
    fn test_run_check_duplicate() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (sender, receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let (shreds, _) = make_many_slot_entries(5, 5, 10);
        blockstore
            .insert_shreds(shreds.clone(), None, false)
            .unwrap();
        let duplicate_index = 0;
        let original_shred = shreds[duplicate_index].clone();
        let duplicate_shred = {
            let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
            shreds.swap_remove(duplicate_index)
        };
        assert_eq!(duplicate_shred.slot(), shreds[0].slot());
        let duplicate_shred_slot = duplicate_shred.slot();
        sender
            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
            .unwrap();
        assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        );
        run_check_duplicate(
            &cluster_info,
            &blockstore,
            &receiver,
            &duplicate_slot_sender,
            &bank_forks,
        )
        .unwrap();

        // Make sure the correct duplicate proof was stored
        let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
        assert_eq!(duplicate_proof.shred1, *original_shred.payload());
        assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

        // Make sure a duplicate signal was sent
        assert_eq!(
            duplicate_slot_receiver.try_recv().unwrap(),
            duplicate_shred_slot
        );
    }

    #[test]
    fn test_store_duplicate_shreds_same_batch() {
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let keypair = Keypair::new();
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
        let cluster_info = Arc::new(ClusterInfo::new(
            contact_info,
            Arc::new(keypair),
            SocketAddrSpace::Unspecified,
        ));
        let genesis_config = create_genesis_config(10_000).genesis_config;
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Start duplicate thread receiving and inserting duplicates
        let t_check_duplicate = WindowService::start_check_duplicate_thread(
            cluster_info,
            exit.clone(),
            blockstore.clone(),
            duplicate_shred_receiver,
            duplicate_slot_sender,
            bank_forks,
        );

        let handle_duplicate = |shred| {
            let _ = duplicate_shred_sender.send(shred);
        };
        let num_trials = 100;
        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
        for slot in 0..num_trials {
            let (shreds, _) = make_many_slot_entries(slot, 1, 10);
            let duplicate_index = 0;
            let original_shred = shreds[duplicate_index].clone();
            let duplicate_shred = {
                let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
                shreds.swap_remove(duplicate_index)
            };
            assert_eq!(duplicate_shred.slot(), slot);
            // Simulate storing both duplicate shreds in the same batch
            let shreds = [original_shred.clone(), duplicate_shred.clone()]
                .into_iter()
                .map(|shred| (shred, /*is_repaired:*/ false));
            blockstore
                .insert_shreds_handle_duplicate(
                    shreds,
                    None,
                    false, // is_trusted
                    &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
                )
                .unwrap();

            // Make sure a duplicate signal was sent
            assert_eq!(
                duplicate_slot_receiver
                    .recv_timeout(Duration::from_millis(5_000))
                    .unwrap(),
                slot
            );

            // Make sure the correct duplicate proof was stored
            let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
            assert_eq!(duplicate_proof.shred1, *original_shred.payload());
            assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
        }
        exit.store(true, Ordering::Relaxed);
        t_check_duplicate.join().unwrap();
    }
}