1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{
    cluster_info::ClusterInfo,
    result::Result,
    serve_repair::{RepairType, ServeRepair},
};
use solana_ledger::{
    bank_forks::BankForks,
    blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
};
use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH;
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey};
use std::{
    collections::BTreeSet,
    net::UdpSocket,
    ops::Bound::{Included, Unbounded},
    sync::atomic::{AtomicBool, Ordering},
    sync::{Arc, RwLock},
    thread::sleep,
    thread::{self, Builder, JoinHandle},
    time::Duration,
};

pub const MAX_REPAIR_LENGTH: usize = 512;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;

const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256;
const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512;

pub enum RepairStrategy {
    RepairRange(RepairSlotRange),
    RepairAll {
        bank_forks: Arc<RwLock<BankForks>>,
        completed_slots_receiver: CompletedSlotsReceiver,
        epoch_schedule: EpochSchedule,
    },
}

pub struct RepairSlotRange {
    pub start: Slot,
    pub end: Slot,
}

impl Default for RepairSlotRange {
    fn default() -> Self {
        RepairSlotRange {
            start: 0,
            end: std::u64::MAX,
        }
    }
}

pub struct RepairService {
    t_repair: JoinHandle<()>,
}

impl RepairService {
    pub fn new(
        blockstore: Arc<Blockstore>,
        exit: Arc<AtomicBool>,
        repair_socket: Arc<UdpSocket>,
        cluster_info: Arc<RwLock<ClusterInfo>>,
        repair_strategy: RepairStrategy,
    ) -> Self {
        let t_repair = Builder::new()
            .name("solana-repair-service".to_string())
            .spawn(move || {
                Self::run(
                    &blockstore,
                    &exit,
                    &repair_socket,
                    &cluster_info,
                    repair_strategy,
                )
            })
            .unwrap();

        RepairService { t_repair }
    }

    fn run(
        blockstore: &Arc<Blockstore>,
        exit: &Arc<AtomicBool>,
        repair_socket: &Arc<UdpSocket>,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        repair_strategy: RepairStrategy,
    ) {
        let serve_repair = ServeRepair::new(cluster_info.clone());
        let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new();
        let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
        let id = cluster_info.read().unwrap().id();
        if let RepairStrategy::RepairAll {
            ref epoch_schedule, ..
        } = repair_strategy
        {
            let current_root = blockstore.last_root();
            Self::initialize_epoch_slots(
                id,
                blockstore,
                &mut epoch_slots,
                &old_incomplete_slots,
                current_root,
                epoch_schedule,
                cluster_info,
            );
        }
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }

            let repairs = {
                match repair_strategy {
                    RepairStrategy::RepairRange(ref repair_slot_range) => {
                        // Strategy used by archivers
                        Self::generate_repairs_in_range(
                            blockstore,
                            MAX_REPAIR_LENGTH,
                            repair_slot_range,
                        )
                    }

                    RepairStrategy::RepairAll {
                        ref completed_slots_receiver,
                        ..
                    } => {
                        let new_root = blockstore.last_root();
                        let lowest_slot = blockstore.lowest_slot();
                        Self::update_epoch_slots(
                            id,
                            new_root,
                            lowest_slot,
                            &mut epoch_slots,
                            &mut old_incomplete_slots,
                            &cluster_info,
                            completed_slots_receiver,
                        );
                        Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH)
                    }
                }
            };

            if let Ok(repairs) = repairs {
                let reqs: Vec<_> = repairs
                    .into_iter()
                    .filter_map(|repair_request| {
                        serve_repair
                            .repair_request(&repair_request)
                            .map(|result| (result, repair_request))
                            .ok()
                    })
                    .collect();

                for ((to, req), _) in reqs {
                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
                        info!("{} repair req send_to({}) error {:?}", id, to, e);
                        0
                    });
                }
            }
            sleep(Duration::from_millis(REPAIR_MS));
        }
    }

    // Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
    pub fn generate_repairs_in_range(
        blockstore: &Blockstore,
        max_repairs: usize,
        repair_range: &RepairSlotRange,
    ) -> Result<Vec<RepairType>> {
        // Slot height and shred indexes for shreds we want to repair
        let mut repairs: Vec<RepairType> = vec![];
        for slot in repair_range.start..=repair_range.end {
            if repairs.len() >= max_repairs {
                break;
            }

            let meta = blockstore
                .meta(slot)
                .expect("Unable to lookup slot meta")
                .unwrap_or(SlotMeta {
                    slot,
                    ..SlotMeta::default()
                });

            let new_repairs = Self::generate_repairs_for_slot(
                blockstore,
                slot,
                &meta,
                max_repairs - repairs.len(),
            );
            repairs.extend(new_repairs);
        }

        Ok(repairs)
    }

    fn generate_repairs(
        blockstore: &Blockstore,
        root: Slot,
        max_repairs: usize,
    ) -> Result<Vec<RepairType>> {
        // Slot height and shred indexes for shreds we want to repair
        let mut repairs: Vec<RepairType> = vec![];
        Self::generate_repairs_for_fork(blockstore, &mut repairs, max_repairs, root);

        // TODO: Incorporate gossip to determine priorities for repair?

        // Try to resolve orphans in blockstore
        let mut orphans = blockstore.get_orphans(Some(MAX_ORPHANS));
        orphans.retain(|x| *x > root);

        Self::generate_repairs_for_orphans(&orphans[..], &mut repairs);
        Ok(repairs)
    }

    fn generate_repairs_for_slot(
        blockstore: &Blockstore,
        slot: Slot,
        slot_meta: &SlotMeta,
        max_repairs: usize,
    ) -> Vec<RepairType> {
        if slot_meta.is_full() {
            vec![]
        } else if slot_meta.consumed == slot_meta.received {
            vec![RepairType::HighestShred(slot, slot_meta.received)]
        } else {
            let reqs = blockstore.find_missing_data_indexes(
                slot,
                slot_meta.first_shred_timestamp,
                slot_meta.consumed,
                slot_meta.received,
                max_repairs,
            );
            reqs.into_iter()
                .map(|i| RepairType::Shred(slot, i))
                .collect()
        }
    }

    fn generate_repairs_for_orphans(orphans: &[u64], repairs: &mut Vec<RepairType>) {
        repairs.extend(orphans.iter().map(|h| RepairType::Orphan(*h)));
    }

    /// Repairs any fork starting at the input slot
    fn generate_repairs_for_fork(
        blockstore: &Blockstore,
        repairs: &mut Vec<RepairType>,
        max_repairs: usize,
        slot: Slot,
    ) {
        let mut pending_slots = vec![slot];
        while repairs.len() < max_repairs && !pending_slots.is_empty() {
            let slot = pending_slots.pop().unwrap();
            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
                let new_repairs = Self::generate_repairs_for_slot(
                    blockstore,
                    slot,
                    &slot_meta,
                    max_repairs - repairs.len(),
                );
                repairs.extend(new_repairs);
                let next_slots = slot_meta.next_slots;
                pending_slots.extend(next_slots);
            } else {
                break;
            }
        }
    }

    fn get_completed_slots_past_root(
        blockstore: &Blockstore,
        slots_in_gossip: &mut BTreeSet<Slot>,
        root: Slot,
        epoch_schedule: &EpochSchedule,
    ) {
        let last_confirmed_epoch = epoch_schedule.get_leader_schedule_epoch(root);
        let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch);

        let meta_iter = blockstore
            .slot_meta_iterator(root + 1)
            .expect("Couldn't get db iterator");

        for (current_slot, meta) in meta_iter {
            if current_slot > last_epoch_slot {
                break;
            }
            if meta.is_full() {
                slots_in_gossip.insert(current_slot);
            }
        }
    }

    fn initialize_epoch_slots(
        id: Pubkey,
        blockstore: &Blockstore,
        slots_in_gossip: &mut BTreeSet<Slot>,
        old_incomplete_slots: &BTreeSet<Slot>,
        root: Slot,
        epoch_schedule: &EpochSchedule,
        cluster_info: &RwLock<ClusterInfo>,
    ) {
        Self::get_completed_slots_past_root(blockstore, slots_in_gossip, root, epoch_schedule);

        // Safe to set into gossip because by this time, the leader schedule cache should
        // also be updated with the latest root (done in blockstore_processor) and thus
        // will provide a schedule to window_service for any incoming shreds up to the
        // last_confirmed_epoch.
        cluster_info.write().unwrap().push_epoch_slots(
            id,
            root,
            blockstore.lowest_slot(),
            slots_in_gossip.clone(),
            old_incomplete_slots,
        );
    }

    // Update the gossiped structure used for the "Repairmen" repair protocol. See docs
    // for details.
    fn update_epoch_slots(
        id: Pubkey,
        latest_known_root: Slot,
        lowest_slot: Slot,
        completed_slot_cache: &mut BTreeSet<Slot>,
        incomplete_slot_stash: &mut BTreeSet<Slot>,
        cluster_info: &RwLock<ClusterInfo>,
        completed_slots_receiver: &CompletedSlotsReceiver,
    ) {
        let mut should_update = false;
        while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
            for slot in completed_slots {
                let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0);
                let removed_from_stash = incomplete_slot_stash.remove(&slot);
                // If the newly completed slot was not being tracked in stash, and is > last
                // slot being tracked in stash, add it to cache. Also, update gossip
                if !removed_from_stash && slot >= last_slot_in_stash {
                    should_update |= completed_slot_cache.insert(slot);
                }
                // If the slot was removed from stash, update gossip
                should_update |= removed_from_stash;
            }
        }

        if should_update {
            if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER {
                Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash);
                let lowest_completed_slot_in_cache =
                    *completed_slot_cache.iter().next().unwrap_or(&0);
                Self::prune_incomplete_slot_stash(
                    incomplete_slot_stash,
                    lowest_completed_slot_in_cache,
                );
            }

            cluster_info.write().unwrap().push_epoch_slots(
                id,
                latest_known_root,
                lowest_slot,
                completed_slot_cache.clone(),
                incomplete_slot_stash,
            );
        }
    }

    fn stash_old_incomplete_slots(cache: &mut BTreeSet<Slot>, stash: &mut BTreeSet<Slot>) {
        if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN {
            let mut prev = *cache.iter().next().expect("Expected to find some slot");
            cache.remove(&prev);
            while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN {
                let next = *cache.iter().next().expect("Expected to find some slot");
                cache.remove(&next);
                // Prev slot and next slot are not included in incomplete slot list.
                (prev + 1..next).for_each(|slot| {
                    stash.insert(slot);
                });
                prev = next;
            }
        }
    }

    fn prune_incomplete_slot_stash(
        stash: &mut BTreeSet<Slot>,
        lowest_completed_slot_in_cache: Slot,
    ) {
        if let Some(oldest_incomplete_slot) = stash.iter().next() {
            // Prune old slots
            // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs
            // earlier than the new root. But, we prune all the slots that are older than 1 epoch.
            // So slots in a batch of half epoch are getting pruned
            if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2
                < lowest_completed_slot_in_cache
            {
                let oldest_slot_to_retain =
                    lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH);
                *stash = stash
                    .range((Included(&oldest_slot_to_retain), Unbounded))
                    .cloned()
                    .collect();
            }
        }
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_repair.join()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::cluster_info::Node;
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};
    use solana_ledger::blockstore::{
        make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
    };
    use solana_ledger::shred::max_ticks_per_n_shreds;
    use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
    use std::thread::Builder;

    #[test]
    pub fn test_repair_orphan() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            // Create some orphan slots
            let (mut shreds, _) = make_slot_entries(1, 0, 1);
            let (shreds2, _) = make_slot_entries(5, 2, 1);
            shreds.extend(shreds2);
            blockstore.insert_shreds(shreds, None, false).unwrap();
            assert_eq!(
                RepairService::generate_repairs(&blockstore, 0, 2).unwrap(),
                vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
            );
        }

        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_repair_empty_slot() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let (shreds, _) = make_slot_entries(2, 0, 1);

            // Write this shred to slot 2, should chain to slot 0, which we haven't received
            // any shreds for
            blockstore.insert_shreds(shreds, None, false).unwrap();

            // Check that repair tries to patch the empty slot
            assert_eq!(
                RepairService::generate_repairs(&blockstore, 0, 2).unwrap(),
                vec![RepairType::HighestShred(0, 0)]
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_generate_repairs() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let nth = 3;
            let num_slots = 2;

            // Create some shreds
            let (mut shreds, _) = make_many_slot_entries(0, num_slots as u64, 150 as u64);
            let num_shreds = shreds.len() as u64;
            let num_shreds_per_slot = num_shreds / num_slots;

            // write every nth shred
            let mut shreds_to_write = vec![];
            let mut missing_indexes_per_slot = vec![];
            for i in (0..num_shreds).rev() {
                let index = i % num_shreds_per_slot;
                if index % nth == 0 {
                    shreds_to_write.insert(0, shreds.remove(i as usize));
                } else if i < num_shreds_per_slot {
                    missing_indexes_per_slot.insert(0, index);
                }
            }
            blockstore
                .insert_shreds(shreds_to_write, None, false)
                .unwrap();
            // sleep so that the holes are ready for repair
            sleep(Duration::from_secs(1));
            let expected: Vec<RepairType> = (0..num_slots)
                .flat_map(|slot| {
                    missing_indexes_per_slot
                        .iter()
                        .map(move |shred_index| RepairType::Shred(slot as u64, *shred_index))
                })
                .collect();

            assert_eq!(
                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(),
                expected
            );

            assert_eq!(
                RepairService::generate_repairs(&blockstore, 0, expected.len() - 2).unwrap()[..],
                expected[0..expected.len() - 2]
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_generate_highest_repair() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let num_entries_per_slot = 100;

            // Create some shreds
            let (mut shreds, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
            let num_shreds_per_slot = shreds.len() as u64;

            // Remove last shred (which is also last in slot) so that slot is not complete
            shreds.pop();

            blockstore.insert_shreds(shreds, None, false).unwrap();

            // We didn't get the last shred for this slot, so ask for the highest shred for that slot
            let expected: Vec<RepairType> =
                vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];

            assert_eq!(
                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(),
                expected
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_repair_range() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
            let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;

            let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
            for (mut slot_shreds, _) in shreds.into_iter() {
                slot_shreds.remove(0);
                blockstore.insert_shreds(slot_shreds, None, false).unwrap();
            }
            // sleep to make slot eligible for repair
            sleep(Duration::from_secs(1));
            // Iterate through all possible combinations of start..end (inclusive on both
            // sides of the range)
            for start in 0..slots.len() {
                for end in start..slots.len() {
                    let mut repair_slot_range = RepairSlotRange::default();
                    repair_slot_range.start = slots[start];
                    repair_slot_range.end = slots[end];
                    let expected: Vec<RepairType> = (repair_slot_range.start
                        ..=repair_slot_range.end)
                        .map(|slot_index| {
                            if slots.contains(&(slot_index as u64)) {
                                RepairType::Shred(slot_index as u64, 0)
                            } else {
                                RepairType::HighestShred(slot_index as u64, 0)
                            }
                        })
                        .collect();

                    assert_eq!(
                        RepairService::generate_repairs_in_range(
                            &blockstore,
                            std::usize::MAX,
                            &repair_slot_range
                        )
                        .unwrap(),
                        expected
                    );
                }
            }
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_repair_range_highest() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let num_entries_per_slot = 10;

            let num_slots = 1;
            let start = 5;

            // Create some shreds in slots 0..num_slots
            for i in start..start + num_slots {
                let parent = if i > 0 { i - 1 } else { 0 };
                let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);

                blockstore.insert_shreds(shreds, None, false).unwrap();
            }

            let end = 4;
            let expected: Vec<RepairType> = vec![
                RepairType::HighestShred(end - 2, 0),
                RepairType::HighestShred(end - 1, 0),
                RepairType::HighestShred(end, 0),
            ];

            let mut repair_slot_range = RepairSlotRange::default();
            repair_slot_range.start = 2;
            repair_slot_range.end = end;

            assert_eq!(
                RepairService::generate_repairs_in_range(
                    &blockstore,
                    std::usize::MAX,
                    &repair_slot_range
                )
                .unwrap(),
                expected
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_get_completed_slots_past_root() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();
            let num_entries_per_slot = 10;
            let root = 10;

            let fork1 = vec![5, 7, root, 15, 20, 21];
            let fork1_shreds: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
                .into_iter()
                .flat_map(|(shreds, _)| shreds)
                .collect();
            let fork2 = vec![8, 12];
            let fork2_shreds = make_chaining_slot_entries(&fork2, num_entries_per_slot);

            // Remove the last shred from each slot to make an incomplete slot
            let fork2_incomplete_shreds: Vec<_> = fork2_shreds
                .into_iter()
                .flat_map(|(mut shreds, _)| {
                    shreds.pop();
                    shreds
                })
                .collect();
            let mut full_slots = BTreeSet::new();

            blockstore.insert_shreds(fork1_shreds, None, false).unwrap();
            blockstore
                .insert_shreds(fork2_incomplete_shreds, None, false)
                .unwrap();

            // Test that only slots > root from fork1 were included
            let epoch_schedule = EpochSchedule::custom(32, 32, false);

            RepairService::get_completed_slots_past_root(
                &blockstore,
                &mut full_slots,
                root,
                &epoch_schedule,
            );

            let mut expected: BTreeSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
            assert_eq!(full_slots, expected);

            // Test that slots past the last confirmed epoch boundary don't get included
            let last_epoch = epoch_schedule.get_leader_schedule_epoch(root);
            let last_slot = epoch_schedule.get_last_slot_in_epoch(last_epoch);
            let fork3 = vec![last_slot, last_slot + 1];
            let fork3_shreds: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
                .into_iter()
                .flat_map(|(shreds, _)| shreds)
                .collect();
            blockstore.insert_shreds(fork3_shreds, None, false).unwrap();
            RepairService::get_completed_slots_past_root(
                &blockstore,
                &mut full_slots,
                root,
                &epoch_schedule,
            );
            expected.insert(last_slot);
            assert_eq!(full_slots, expected);
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_update_epoch_slots() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            // Create blockstore
            let (blockstore, _, completed_slots_receiver) =
                Blockstore::open_with_signal(&blockstore_path).unwrap();

            let blockstore = Arc::new(blockstore);

            let mut root = 0;
            let num_slots = 100;
            let entries_per_slot = 5;
            let blockstore_ = blockstore.clone();

            // Spin up thread to write to blockstore
            let writer = Builder::new()
                .name("writer".to_string())
                .spawn(move || {
                    let slots: Vec<_> = (1..num_slots + 1).collect();
                    let mut shreds: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
                        .into_iter()
                        .flat_map(|(shreds, _)| shreds)
                        .collect();
                    shreds.shuffle(&mut thread_rng());
                    let mut i = 0;
                    let max_step = entries_per_slot * 4;
                    let repair_interval_ms = 10;
                    let mut rng = rand::thread_rng();
                    let num_shreds = shreds.len();
                    while i < num_shreds {
                        let step = rng.gen_range(1, max_step + 1) as usize;
                        let step = std::cmp::min(step, num_shreds - i);
                        let shreds_to_insert = shreds.drain(..step).collect_vec();
                        blockstore_
                            .insert_shreds(shreds_to_insert, None, false)
                            .unwrap();
                        sleep(Duration::from_millis(repair_interval_ms));
                        i += step;
                    }
                })
                .unwrap();

            let mut completed_slots = BTreeSet::new();
            let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
            let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
                node_info.info.clone(),
            ));

            let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
            while completed_slots.len() < num_slots as usize {
                RepairService::update_epoch_slots(
                    Pubkey::default(),
                    root,
                    blockstore.lowest_slot(),
                    &mut completed_slots,
                    &mut old_incomplete_slots,
                    &cluster_info,
                    &completed_slots_receiver,
                );
            }

            let mut expected: BTreeSet<_> = (1..num_slots + 1).collect();
            assert_eq!(completed_slots, expected);

            // Update with new root, should filter out the slots <= root
            root = num_slots / 2;
            let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
            blockstore.insert_shreds(shreds, None, false).unwrap();
            RepairService::update_epoch_slots(
                Pubkey::default(),
                root,
                0,
                &mut completed_slots,
                &mut old_incomplete_slots,
                &cluster_info,
                &completed_slots_receiver,
            );
            expected.insert(num_slots + 2);
            assert_eq!(completed_slots, expected);
            writer.join().unwrap();
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    fn test_stash_old_incomplete_slots() {
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        let mut stash: BTreeSet<Slot> = BTreeSet::new();

        // When cache is empty.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);

        // Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1)
        cache.insert(101);
        cache.insert(102);
        cache.insert(104);
        cache.insert(105);

        // Not enough slots in cache. So stash should remain empty.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);
        assert_eq!(cache.len(), 4);

        // Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN)
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64)
            .into_iter()
            .for_each(|slot| {
                cache.insert(slot);
            });

        // Not enough slots in cache. So stash should remain empty.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN);

        // Insert 1 more to cross the threshold
        cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64);
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        // Stash is still empty, as no missing slots
        assert_eq!(stash.len(), 0);
        // It removed some entries from cache
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);

        // Insert more slots to create a missing slot
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        cache.insert(0);
        (2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2)
            .into_iter()
            .for_each(|slot| {
                cache.insert(slot);
            });
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);

        // Stash is not empty
        assert!(stash.contains(&1));
        // It removed some entries from cache
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);

        // Test multiple missing slots at dispersed locations
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2)
            .into_iter()
            .for_each(|slot| {
                cache.insert(slot);
            });

        cache.remove(&10);
        cache.remove(&11);

        cache.remove(&28);
        cache.remove(&29);

        cache.remove(&148);
        cache.remove(&149);
        cache.remove(&150);
        cache.remove(&151);

        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);

        // Stash is not empty
        assert!(stash.contains(&10));
        assert!(stash.contains(&11));
        assert!(stash.contains(&28));
        assert!(stash.contains(&29));
        assert!(stash.contains(&148));
        assert!(stash.contains(&149));
        assert!(stash.contains(&150));
        assert!(stash.contains(&151));

        assert!(!stash.contains(&147));
        assert!(!stash.contains(&152));
        // It removed some entries from cache
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
        (MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2)
            .into_iter()
            .for_each(|slot| {
                let slot: u64 = slot as u64;
                assert!(cache.contains(&slot));
            });
    }

    #[test]
    fn test_prune_incomplete_slot_stash() {
        // Prune empty stash
        let mut stash: BTreeSet<Slot> = BTreeSet::new();
        RepairService::prune_incomplete_slot_stash(&mut stash, 0);
        assert!(stash.is_empty());

        // Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH
        stash.insert(0);
        stash.insert(10);
        stash.insert(11);
        stash.insert(50);
        assert_eq!(stash.len(), 4);
        RepairService::prune_incomplete_slot_stash(&mut stash, 100);
        assert_eq!(stash.len(), 4);

        // Prune stash with slots > DEFAULT_SLOTS_PER_EPOCH, but < 1.5 * DEFAULT_SLOTS_PER_EPOCH
        stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50);
        assert_eq!(stash.len(), 5);
        RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100);
        assert_eq!(stash.len(), 5);

        // Prune stash with slots > 1.5 * DEFAULT_SLOTS_PER_EPOCH
        stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2);
        assert_eq!(stash.len(), 6);
        RepairService::prune_incomplete_slot_stash(
            &mut stash,
            DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1,
        );
        assert_eq!(stash.len(), 2);
    }
}