1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
//! Crds Gossip Pull overlay
//! This module implements the anti-entropy protocol for the network.
//!
//! The basic strategy is as follows:
//! 1. Construct a bloom filter of the local data set
//! 2. Randomly ask a node on the network for data that is not contained in the bloom filter.
//!
//! Bloom filters have a false positive rate.  Each request uses a different bloom filter
//! with random hash functions.  So each subsequent request will have a different distribution
//! of false positives.

use crate::contact_info::ContactInfo;
use crate::crds::Crds;
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use rand;
use rand::distributions::{Distribution, WeightedIndex};
use rand::Rng;
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;

/// Default value of `CrdsGossipPull::crds_timeout`, in milliseconds (15 seconds).
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses
// (default value of `CrdsGossipPull::msg_timeout`, in milliseconds).
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
/// False-positive rate used by `CrdsFilter` when sizing and creating its bloom filters.
pub const FALSE_RATE: f64 = 0.1f64;
/// Number of bloom keys `CrdsFilter` assumes when computing `max_items`.
pub const KEYS: f64 = 8f64;

/// A bloom filter that only tracks one partition of the hash space.
///
/// A hash belongs to the partition when the high `mask_bits` bits of its `hash_as_u64`
/// value equal the high `mask_bits` bits of `mask` (see `test_mask`).
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct CrdsFilter {
    /// Bloom filter holding the hashes added to this partition.
    pub filter: Bloom<Hash>,
    /// Partition selector: the seed in the high `mask_bits` bits, every lower bit set to 1.
    mask: u64,
    /// Number of high-order bits of `mask` that select the partition.
    mask_bits: u32,
}

impl CrdsFilter {
    pub fn new_rand(num_items: usize, max_bytes: usize) -> Self {
        let max_bits = (max_bytes * 8) as f64;
        let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
        let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
        let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
        let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits));
        let mask = Self::compute_mask(seed, mask_bits);
        CrdsFilter {
            filter,
            mask,
            mask_bits,
        }
    }
    // generates a vec of filters that together hold a complete set of Hashes
    pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec<Self> {
        let max_bits = (max_bytes * 8) as f64;
        let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
        let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
        // for each possible mask combination, generate a new filter.
        let mut filters = vec![];
        for seed in 0..2u64.pow(mask_bits) {
            let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
            let mask = Self::compute_mask(seed, mask_bits);
            let filter = CrdsFilter {
                filter,
                mask,
                mask_bits,
            };
            filters.push(filter)
        }
        filters
    }
    /// Builds the partition mask for `seed`: the seed occupies the high `mask_bits` bits and
    /// every lower bit is set to 1.
    ///
    /// # Panics
    /// Panics when `seed` does not fit in `mask_bits` bits (`seed >= 2^mask_bits`).  Such a
    /// seed would lose its high bits in the shift and silently alias a smaller seed.
    fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
        // `checked_shl` yields `None` for `mask_bits == 64`, where every u64 seed fits; this
        // also avoids the overflow of `2u64.pow(64)`.
        let seed_limit = 1u64.checked_shl(mask_bits);
        assert!(seed_limit.map_or(true, |limit| seed < limit));
        // With `mask_bits == 0` the shift amount is 64, which `checked_shl` rejects: no seed bits.
        let seed_bits: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
        let low_ones: u64 = (!0u64).checked_shr(mask_bits).unwrap_or(!0x0);
        seed_bits | low_ones
    }
    /// Returns how many items a bloom filter of `max_bits` bits can hold while keeping the
    /// false-positive rate at `false_rate` with `num_keys` keys, rounded up to a whole number.
    pub fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
        let per_key_rate = (false_rate.ln() / num_keys).exp();
        let bits_per_item = -num_keys / (1f64 - per_key_rate).ln();
        (max_bits / bits_per_item).ceil()
    }
    /// Returns the number of mask bits needed so that `2^bits` filters of `max_items`
    /// capacity each can cover `num_items` items.
    fn mask_bits(num_items: f64, max_items: f64) -> u32 {
        let needed = (num_items / max_items).log2().ceil();
        // a ratio below one gives a negative logarithm; no partitioning is needed then
        if needed > 0.0 {
            needed as u32
        } else {
            0
        }
    }
    /// Packs the first eight bytes of `item` into a `u64`; byte `i` lands at bit offset `8 * i`.
    fn hash_as_u64(item: &Hash) -> u64 {
        item.as_ref()
            .iter()
            .enumerate()
            .take(8)
            .fold(0u64, |packed, (i, byte)| {
                packed | (u64::from(*byte) << (i * 8) as u64)
            })
    }
    /// Returns true when `item` falls inside this filter's partition.
    ///
    /// Only the highest `mask_bits` bits of the hash are compared; all lower bits are forced
    /// to 1 on the hash side, matching the layout produced by `compute_mask`.
    pub fn test_mask(&self, item: &Hash) -> bool {
        let low_ones = match (!0u64).checked_shr(self.mask_bits) {
            Some(ones) => ones,
            None => !0u64,
        };
        (Self::hash_as_u64(item) | low_ones) == self.mask
    }
    /// Adds `item` to the bloom filter, but only when it belongs to this filter's partition.
    pub fn add(&mut self, item: &Hash) {
        if !self.test_mask(item) {
            // not our partition: some other filter of the set is responsible for it
            return;
        }
        self.filter.add(item);
    }
    /// Reports whether `item` should be treated as already known by this filter.
    ///
    /// Hashes outside this filter's partition always count as contained; hashes inside it
    /// are looked up in the bloom filter.
    pub fn contains(&self, item: &Hash) -> bool {
        !self.test_mask(item) || self.filter.contains(item)
    }
}

/// State kept by the pull side of the gossip protocol.
#[derive(Clone)]
pub struct CrdsGossipPull {
    /// timestamp of last request
    /// (keyed by peer; written by `mark_pull_request_creation_time`, read by `pull_options`)
    pub pull_request_time: HashMap<Pubkey, u64>,
    /// hash and insert time
    /// of values that were replaced or purged; their hashes are still added to outgoing filters
    purged_values: VecDeque<(Hash, u64)>,
    /// Fallback timeout that `make_timeouts` stores under `Pubkey::default()`.
    pub crds_timeout: u64,
    /// Allowed distance between a pull-response value's wallclock and `now` before it is
    /// considered timed out (see `process_pull_response`).
    pub msg_timeout: u64,
}

impl Default for CrdsGossipPull {
    fn default() -> Self {
        Self {
            purged_values: VecDeque::new(),
            pull_request_time: HashMap::new(),
            crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
            msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
        }
    }
}
impl CrdsGossipPull {
    /// Generates a pull request aimed at a randomly selected peer.
    ///
    /// The peer is drawn from `pull_options` with probability proportional to its weight.
    /// Returns the peer's id, the filters describing the local table, and this node's own
    /// `ContactInfo` value, or `CrdsGossipError::NoPeers` when there is nobody to ask.
    ///
    /// # Panics
    /// Panics when `crds` holds no `ContactInfo` for `self_id`, or when `WeightedIndex::new`
    /// rejects the peer weights.
    pub fn new_pull_request(
        &self,
        crds: &Crds,
        self_id: &Pubkey,
        now: u64,
        stakes: &HashMap<Pubkey, u64>,
        bloom_size: usize,
    ) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
        let candidates = self.pull_options(crds, self_id, now, stakes);
        if candidates.is_empty() {
            return Err(CrdsGossipError::NoPeers);
        }
        let filters = self.build_crds_filters(crds, bloom_size);
        let weights = candidates.iter().map(|(weight, _)| *weight);
        let chosen = WeightedIndex::new(weights)
            .unwrap()
            .sample(&mut rand::thread_rng());
        let self_info = match crds.lookup(&CrdsValueLabel::ContactInfo(*self_id)) {
            Some(info) => info,
            None => panic!("self_id invalid {}", self_id),
        };
        let (_, peer) = candidates[chosen];
        Ok((peer.id, filters, self_info.clone()))
    }

    fn pull_options<'a>(
        &self,
        crds: &'a Crds,
        self_id: &Pubkey,
        now: u64,
        stakes: &HashMap<Pubkey, u64>,
    ) -> Vec<(f32, &'a ContactInfo)> {
        crds.table
            .values()
            .filter_map(|v| v.value.contact_info())
            .filter(|v| v.id != *self_id && ContactInfo::is_valid_address(&v.gossip))
            .map(|item| {
                let max_weight = f32::from(u16::max_value()) - 1.0;
                let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
                let since = ((now - req_time) / 1024) as u32;
                let stake = get_stake(&item.id, stakes);
                let weight = get_weight(max_weight, since, stake);
                (weight, item)
            })
            .collect()
    }

    /// Records `now` as the time a pull request to `from` was created.
    ///
    /// `pull_options` reads this value when weighting peers for `new_pull_request`.
    /// The local request creation time is used on purpose: weighting by the time a response
    /// was received would let failed nodes keep increasing their weight.
    pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
        let peer = *from;
        self.pull_request_time.insert(peer, now);
    }

    /// Appends `hash`, tagged with `timestamp`, to the queue of purged values.
    pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) {
        let entry = (hash, timestamp);
        self.purged_values.push_back(entry)
    }

    /// Processes a batch of pull requests and builds the responses.
    ///
    /// The responses are computed first, against the table as it was before any caller is
    /// inserted.  Then each caller value is inserted into `crds` at `now`; a value it
    /// replaces is remembered in `purged_values`, and the caller's record timestamp is
    /// refreshed whether or not the insert succeeded.  Returns one response per request,
    /// in request order.
    pub fn process_pull_requests(
        &mut self,
        crds: &mut Crds,
        requests: Vec<(CrdsValue, CrdsFilter)>,
        now: u64,
    ) -> Vec<Vec<CrdsValue>> {
        let responses = self.filter_crds_values(crds, &requests);
        for (caller, _filter) in requests {
            let key = caller.label().pubkey();
            if let Ok(Some(replaced)) = crds.insert(caller, now) {
                self.purged_values
                    .push_back((replaced.value_hash, replaced.local_timestamp));
            }
            crds.update_record_timestamp(&key, now);
        }
        responses
    }
    /// Processes a pull response received from `from` and returns how many values failed.
    ///
    /// A value whose wallclock is more than `msg_timeout` away from `now` (in either
    /// direction) is handled specially:
    /// * a `ContactInfo` is re-checked against its owner's entry in `timeouts` (falling back
    ///   to the `Pubkey::default()` entry) and rejected only if it is stale by that measure
    ///   too; otherwise it is inserted like a fresh value;
    /// * any other value is rejected unless a `ContactInfo` for its owner is already in
    ///   `crds`, in which case it is inserted without refreshing the owner's record.
    ///
    /// Fresh values are inserted at `now`; on success the owner's record timestamp is
    /// refreshed and any replaced value is remembered in `purged_values`.  The record of
    /// `from` is always refreshed at the end.
    ///
    /// # Panics
    /// Panics when a stale `ContactInfo` is seen and `timeouts` has neither an entry for
    /// its owner nor one for `Pubkey::default()`.
    pub fn process_pull_response(
        &mut self,
        crds: &mut Crds,
        from: &Pubkey,
        timeouts: &HashMap<Pubkey, u64>,
        response: Vec<CrdsValue>,
        now: u64,
    ) -> usize {
        // A value is stale when its wallclock is more than `timeout` away from `now`.
        // Saturating arithmetic keeps very large timeouts (u64::MAX means "never expires")
        // from overflowing and from being misread as "already expired".
        let is_stale = |wallclock: u64, timeout: u64| {
            now > wallclock.saturating_add(timeout) || now.saturating_add(timeout) < wallclock
        };
        let mut failed = 0;
        for r in response {
            let owner = r.label().pubkey();
            // Check if the crds value is older than the msg_timeout
            if is_stale(r.wallclock(), self.msg_timeout) {
                match &r.label() {
                    CrdsValueLabel::ContactInfo(_) => {
                        // Check if this ContactInfo is actually too old, it's possible that it has
                        // stake and so might have a longer effective timeout
                        let timeout = *timeouts
                            .get(&owner)
                            .unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap());
                        if is_stale(r.wallclock(), timeout) {
                            inc_new_counter_warn!(
                                "cluster_info-gossip_pull_response_value_timeout",
                                1
                            );
                            failed += 1;
                            continue;
                        }
                    }
                    _ => {
                        // Before discarding this value, check if a ContactInfo for the owner
                        // exists in the table. If it doesn't, that implies that this value can be discarded
                        if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
                            inc_new_counter_warn!(
                                "cluster_info-gossip_pull_response_value_timeout",
                                1
                            );
                            failed += 1;
                        } else {
                            // Silently insert this old value without bumping record timestamps
                            failed += crds.insert(r, now).is_err() as usize;
                        }
                        continue;
                    }
                }
            }
            match crds.insert(r, now) {
                Ok(replaced) => {
                    crds.update_record_timestamp(&owner, now);
                    if let Some(val) = replaced {
                        self.purged_values
                            .push_back((val.value_hash, val.local_timestamp));
                    }
                }
                Err(_) => failed += 1,
            }
        }
        crds.update_record_timestamp(from, now);
        failed
    }
    /// Builds the complete set of filters describing what this node already knows.
    ///
    /// The set is sized for the larger of `CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS` and the number
    /// of table entries plus purged values; `bloom_size` is the byte budget of each filter.
    /// Every hash in the table and in `purged_values` is offered to every filter, and each
    /// filter keeps only the hashes of its own partition.
    pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec<CrdsFilter> {
        let known_count = crds.table.values().count() + self.purged_values.len();
        let num = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, known_count);
        let mut filters = CrdsFilter::new_complete_set(num, bloom_size);
        for v in crds.table.values() {
            for filter in filters.iter_mut() {
                filter.add(&v.value_hash);
            }
        }
        for (value_hash, _insert_timestamp) in &self.purged_values {
            for filter in filters.iter_mut() {
                filter.add(value_hash);
            }
        }
        filters
    }
    /// Collects, for each request filter, the table values that the filter does not contain.
    ///
    /// The result has one vector per entry of `filters`, in the same order.
    fn filter_crds_values(
        &self,
        crds: &Crds,
        filters: &[(CrdsValue, CrdsFilter)],
    ) -> Vec<Vec<CrdsValue>> {
        let mut ret: Vec<Vec<CrdsValue>> = vec![Vec::new(); filters.len()];
        for v in crds.table.values() {
            for (missing, (_, filter)) in ret.iter_mut().zip(filters.iter()) {
                if !filter.contains(&v.value_hash) {
                    missing.push(v.value.clone());
                }
            }
        }
        ret
    }
    /// Builds the per-node timeout table.
    ///
    /// Every key of `stakes` gets `epoch_ms`, `self_id` gets `u64::MAX`, and the
    /// `Pubkey::default()` entry carries `min_ts`.  The later inserts win when keys collide.
    pub fn make_timeouts_def(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
        min_ts: u64,
    ) -> HashMap<Pubkey, u64> {
        let mut timeouts: HashMap<Pubkey, u64> = HashMap::new();
        timeouts.extend(stakes.keys().map(|staked| (*staked, epoch_ms)));
        timeouts.insert(*self_id, std::u64::MAX);
        timeouts.insert(Pubkey::default(), min_ts);
        timeouts
    }

    /// Builds the timeout table via `make_timeouts_def`, using `self.crds_timeout` as the
    /// `min_ts` value.
    pub fn make_timeouts(
        &self,
        self_id: &Pubkey,
        stakes: &HashMap<Pubkey, u64>,
        epoch_ms: u64,
    ) -> HashMap<Pubkey, u64> {
        let default_timeout = self.crds_timeout;
        self.make_timeouts_def(self_id, stakes, epoch_ms, default_timeout)
    }

    /// Purge values from the crds that are older then `active_timeout`
    /// The value_hash of an active item is put into self.purged_values queue
    pub fn purge_active(
        &mut self,
        crds: &mut Crds,
        now: u64,
        timeouts: &HashMap<Pubkey, u64>,
    ) -> usize {
        let old = crds.find_old_labels(now, timeouts);
        let mut purged: VecDeque<_> = old
            .iter()
            .filter_map(|label| {
                let rv = crds
                    .lookup_versioned(label)
                    .map(|val| (val.value_hash, val.local_timestamp));
                crds.remove(label);
                rv
            })
            .collect();
        let ret = purged.len();
        self.purged_values.append(&mut purged);
        ret
    }
    /// Drops entries from the front of the `self.purged_values` queue while their timestamp
    /// is older than `min_ts`; stops at the first entry that is not.
    pub fn purge_purged(&mut self, min_ts: u64) {
        while let Some(&(_, timestamp)) = self.purged_values.front() {
            if timestamp >= min_ts {
                break;
            }
            self.purged_values.pop_front();
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::contact_info::ContactInfo;
    use crate::crds_value::{CrdsData, Vote};
    use itertools::Itertools;
    use solana_perf::test_tx::test_tx;
    use solana_sdk::hash::hash;
    use solana_sdk::packet::PACKET_DATA_SIZE;

    /// With 30 staked peers, the peer holding the largest stake gets the largest pull weight.
    #[test]
    fn test_new_pull_with_stakes() {
        let new_contact = || {
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
                &Pubkey::new_rand(),
                0,
            )))
        };
        let mut crds = Crds::default();
        let mut stakes = HashMap::new();
        let node = CrdsGossipPull::default();
        let me = new_contact();
        crds.insert(me.clone(), 0).unwrap();
        // stakes run from 100 up to 3000 in steps of 100
        for multiplier in 1..=30 {
            let peer = new_contact();
            let peer_id = peer.label().pubkey();
            crds.insert(peer, 0).unwrap();
            stakes.insert(peer_id, multiplier * 100);
        }
        let now = 1024;
        let mut options = node.pull_options(&crds, &me.label().pubkey(), now, &stakes);
        assert!(!options.is_empty());
        // heaviest weight first
        options.sort_by(|(left, _), (right, _)| right.partial_cmp(left).unwrap());
        // check that the highest stake holder is also the heaviest weighted.
        let heaviest_peer = options[0].1.id;
        assert_eq!(stakes[&heaviest_peer], 3000_u64);
    }

    /// A pull request needs at least one peer besides the node itself.
    #[test]
    fn test_new_pull_request() {
        let new_contact = || {
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
                &Pubkey::new_rand(),
                0,
            )))
        };
        let no_stakes = HashMap::new();
        let mut crds = Crds::default();
        let entry = new_contact();
        let id = entry.label().pubkey();
        let node = CrdsGossipPull::default();

        // empty table: nobody to ask
        assert_eq!(
            node.new_pull_request(&crds, &id, 0, &no_stakes, PACKET_DATA_SIZE),
            Err(CrdsGossipError::NoPeers)
        );

        // only the node itself is known: still nobody to ask
        crds.insert(entry.clone(), 0).unwrap();
        assert_eq!(
            node.new_pull_request(&crds, &id, 0, &no_stakes, PACKET_DATA_SIZE),
            Err(CrdsGossipError::NoPeers)
        );

        // a second node makes the request possible, and it must be the target
        let new = new_contact();
        crds.insert(new.clone(), 0).unwrap();
        let (to, _, self_info) = node
            .new_pull_request(&crds, &id, 0, &no_stakes, PACKET_DATA_SIZE)
            .unwrap();
        assert_eq!(to, new.label().pubkey());
        assert_eq!(self_info, entry);
    }

    /// A peer that was asked "just now" must lose the weighted draw against one never asked.
    #[test]
    fn test_new_mark_creation_time() {
        let new_contact = || {
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
                &Pubkey::new_rand(),
                0,
            )))
        };
        let mut crds = Crds::default();
        let entry = new_contact();
        let node_pubkey = entry.label().pubkey();
        let mut node = CrdsGossipPull::default();
        crds.insert(entry.clone(), 0).unwrap();
        let old = new_contact();
        crds.insert(old.clone(), 0).unwrap();
        let new = new_contact();
        crds.insert(new.clone(), 0).unwrap();

        // set request creation time to max_value
        let latest = u64::max_value();
        node.mark_pull_request_creation_time(&new.label().pubkey(), latest);

        // odds of getting the other request should be 1 in u64::max_value()
        let no_stakes = HashMap::new();
        for _ in 0..10 {
            let (to, _, self_info) = node
                .new_pull_request(&crds, &node_pubkey, latest, &no_stakes, PACKET_DATA_SIZE)
                .unwrap();
            assert_eq!(to, old.label().pubkey());
            assert_eq!(self_info, entry);
        }
    }

    /// Processing a request against an empty table yields empty responses and stores the
    /// caller with the processing time as both insert and local timestamp.
    #[test]
    fn test_process_pull_request() {
        let new_contact = || {
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
                &Pubkey::new_rand(),
                0,
            )))
        };
        let mut node_crds = Crds::default();
        let entry = new_contact();
        let node_pubkey = entry.label().pubkey();
        let node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();
        let new = new_contact();
        node_crds.insert(new, 0).unwrap();
        let (_, filters, caller) = node
            .new_pull_request(
                &node_crds,
                &node_pubkey,
                0,
                &HashMap::new(),
                PACKET_DATA_SIZE,
            )
            .unwrap();

        let mut dest_crds = Crds::default();
        let mut dest = CrdsGossipPull::default();
        let requests = filters.into_iter().map(|f| (caller.clone(), f)).collect();
        let rsp = dest.process_pull_requests(&mut dest_crds, requests, 1);
        assert!(rsp.iter().all(|rsp| rsp.is_empty()));
        assert!(dest_crds.lookup(&caller.label()).is_some());

        let stored = dest_crds.lookup_versioned(&caller.label()).unwrap();
        assert_eq!(stored.insert_timestamp, 1);
        assert_eq!(stored.local_timestamp, 1);
    }
    /// Full round trip: `node` requests, `dest` answers with a newer value for a key that
    /// `node` already holds, and `node` applies the response.
    #[test]
    fn test_process_pull_request_response() {
        let mut node_crds = Crds::default();
        let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )));
        let node_pubkey = entry.label().pubkey();
        let mut node = CrdsGossipPull::default();
        node_crds.insert(entry.clone(), 0).unwrap();

        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &Pubkey::new_rand(),
            0,
        )));
        node_crds.insert(new.clone(), 0).unwrap();

        let mut dest = CrdsGossipPull::default();
        let mut dest_crds = Crds::default();
        let new_id = Pubkey::new_rand();
        let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &new_id, 1,
        )));
        dest_crds.insert(new.clone(), 0).unwrap();

        // node contains a key from the dest node, but at an older local timestamp
        let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
            &new_id, 0,
        )));
        assert_eq!(same_key.label(), new.label());
        assert!(same_key.wallclock() < new.wallclock());
        node_crds.insert(same_key.clone(), 0).unwrap();
        assert_eq!(
            node_crds
                .lookup_versioned(&same_key.label())
                .unwrap()
                .local_timestamp,
            0
        );
        let mut done = false;
        for _ in 0..30 {
            // there is a chance of a false positive with bloom filters
            let req = node.new_pull_request(
                &node_crds,
                &node_pubkey,
                0,
                &HashMap::new(),
                PACKET_DATA_SIZE,
            );
            let (_, filters, caller) = req.unwrap();
            let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
            let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0);
            // `rsp` holds one vector per filter, so it is never empty itself; on a false
            // positive the per-filter vectors are empty instead.
            // prob should be around 0.1 per iteration
            if rsp.iter().all(|values| values.is_empty()) {
                continue;
            }
            assert_eq!(rsp.len(), 1);
            let failed = node.process_pull_response(
                &mut node_crds,
                &node_pubkey,
                &node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
                rsp.pop().unwrap(),
                1,
            );
            assert_eq!(failed, 0);
            assert_eq!(
                node_crds
                    .lookup_versioned(&new.label())
                    .unwrap()
                    .local_timestamp,
                1
            );
            // verify that the whole record was updated for dest since this is a response from dest
            assert_eq!(
                node_crds
                    .lookup_versioned(&same_key.label())
                    .unwrap()
                    .local_timestamp,
                1
            );
            done = true;
            break;
        }
        assert!(done);
    }
    /// Expired values leave the table, stay visible to the filters through `purged_values`,
    /// and are dropped from that queue by `purge_purged`.
    #[test]
    fn test_gossip_purge() {
        let new_contact = || {
            CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
                &Pubkey::new_rand(),
                0,
            )))
        };
        let mut node_crds = Crds::default();
        let entry = new_contact();
        let node_label = entry.label();
        let node_pubkey = node_label.pubkey();
        let mut node = CrdsGossipPull::default();
        node_crds.insert(entry, 0).unwrap();
        let old = new_contact();
        let old_label = old.label();
        node_crds.insert(old, 0).unwrap();
        let value_hash = node_crds.lookup_versioned(&old_label).unwrap().value_hash;

        //verify self is valid
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

        // purge
        let timeouts = node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1);
        node.purge_active(&mut node_crds, 2, &timeouts);

        //verify self is still valid after purge
        assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label);

        assert!(node_crds.lookup_versioned(&old_label).is_none());
        assert_eq!(node.purged_values.len(), 1);
        // the filters are rebuilt on every round; the purged value must be in the set each time
        for _ in 0..30 {
            let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE);
            let still_known = filters.iter().any(|filter| filter.contains(&value_hash));
            assert!(still_known);
        }

        // purge the value
        node.purge_purged(1);
        assert!(node.purged_values.is_empty());
    }
    /// Checks mask sizing: no mask bits for a tiny item count, several for an oversubscribed one.
    #[test]
    fn test_crds_filter_mask() {
        // one item in 128 bytes needs no partitioning, so every mask bit is set
        let unmasked = CrdsFilter::new_rand(1, 128);
        assert_eq!(unmasked.mask, !0x0);

        assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64);
        //1000/9 = 111, so 7 bits are needed to mask it
        assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32);

        // whatever seed was drawn, the low bits of the mask stay set
        let masked = CrdsFilter::new_rand(1000, 10);
        let low_bits = 0x00ffffffff;
        assert_eq!(masked.mask & low_bits, low_bits);
    }
    /// Without mask bits a filter behaves like a plain bloom filter for added hashes.
    #[test]
    fn test_crds_filter_add_no_mask() {
        let mut filter = CrdsFilter::new_rand(1, 128);
        let first: Hash = hash(Hash::default().as_ref());
        assert!(!filter.contains(&first));
        filter.add(&first);
        assert!(filter.contains(&first));
        // a different hash that was never added
        let second: Hash = hash(first.as_ref());
        assert!(!filter.contains(&second));
    }
    /// A hash inside the filter's partition is absent until added and present afterwards.
    #[test]
    fn test_crds_filter_add_mask() {
        let mut filter = CrdsFilter::new_rand(1000, 10);
        // rehash until a value lands in this filter's partition
        let mut h: Hash = Hash::default();
        loop {
            if filter.test_mask(&h) {
                break;
            }
            h = hash(h.as_ref());
        }
        assert!(filter.test_mask(&h));
        //if the mask succeeds, we want the guaranteed negative
        assert!(!filter.contains(&h));
        filter.add(&h);
        assert!(filter.contains(&h));
    }
    /// In a complete set, the filter whose partition matches a hash tracks that hash.
    #[test]
    fn test_crds_filter_complete_set_add_mask() {
        let mut filters = CrdsFilter::new_complete_set(1000, 10);
        assert!(filters.iter().all(|f| f.mask_bits > 0));
        let mut h: Hash = Hash::default();
        // rev to make the hash::default() miss on the first few test_masks
        loop {
            if filters.iter().rev().any(|f| f.test_mask(&h)) {
                break;
            }
            h = hash(h.as_ref());
        }
        let owner = filters.iter_mut().find(|f| f.test_mask(&h)).unwrap();
        assert!(owner.test_mask(&h));
        //if the mask succeeds, we want the guaranteed negative
        assert!(!owner.contains(&h));
        owner.add(&h);
        assert!(owner.contains(&h));
    }
    /// A hash outside the filter's partition is always reported as contained.
    #[test]
    fn test_crds_filter_contains_mask() {
        let filter = CrdsFilter::new_rand(1000, 10);
        assert!(filter.mask_bits > 0);
        // rehash until a value falls outside this filter's partition
        let mut h: Hash = Hash::default();
        loop {
            if !filter.test_mask(&h) {
                break;
            }
            h = hash(h.as_ref());
        }
        assert!(!filter.test_mask(&h));
        //if the mask fails, the hash is contained in the set, and can be treated as a false
        //positive
        assert!(filter.contains(&h));
    }
    /// Runs the mask uniqueness check for every mask width from 0 to 15 bits.
    #[test]
    fn test_mask() {
        (0..16).for_each(run_test_mask);
    }
    /// Asserts that the `2^mask_bits` seeds yield that many masks once consecutive
    /// duplicates are collapsed.
    fn run_test_mask(mask_bits: u32) {
        let seed_count = 2u64.pow(mask_bits);
        let masks: Vec<_> = (0..seed_count)
            .map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
            .dedup()
            .collect();
        assert_eq!(masks.len(), seed_count as usize)
    }

    /// Exercises the timeout rules of `process_pull_response`.
    ///
    /// Every value in this test has wallclock 0, so `now` alone decides whether it is past
    /// `msg_timeout` or past the per-peer entry in `timeouts` (`msg_timeout + 1` for the peer).
    #[test]
    fn test_process_pull_response() {
        let mut node_crds = Crds::default();
        let mut node = CrdsGossipPull::default();

        let peer_pubkey = Pubkey::new_rand();
        let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
            ContactInfo::new_localhost(&peer_pubkey, 0),
        ));
        let mut timeouts = HashMap::new();
        timeouts.insert(Pubkey::default(), node.crds_timeout);
        timeouts.insert(peer_pubkey, node.msg_timeout + 1);
        // inserting a fresh value should be fine.
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry.clone()],
                1,
            ),
            0
        );

        // start again from an empty table
        let mut node_crds = Crds::default();
        let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
            ContactInfo::new_localhost(&peer_pubkey, 0),
        ));
        // check that old contact infos fail if they are too old, regardless of "timeouts"
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry.clone(), unstaked_peer_entry],
                node.msg_timeout + 100,
            ),
            2
        );

        // start again from an empty table
        let mut node_crds = Crds::default();
        // check that old contact infos can still land as long as they have a "timeouts" entry
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_entry.clone()],
                node.msg_timeout + 1,
            ),
            0
        );

        // construct something that's not a contact info
        let peer_vote =
            CrdsValue::new_unsigned(CrdsData::Vote(0, Vote::new(&peer_pubkey, test_tx(), 0)));
        // check that older CrdsValues (non-ContactInfos) infos pass even if are too old,
        // but a recent contact info (inserted above) exists
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_vote.clone()],
                node.msg_timeout + 1,
            ),
            0
        );

        // start again from an empty table, so no ContactInfo for the peer is known
        let mut node_crds = Crds::default();
        // without a contact info, inserting an old value should fail
        assert_eq!(
            node.process_pull_response(
                &mut node_crds,
                &peer_pubkey,
                &timeouts,
                vec![peer_vote.clone()],
                node.msg_timeout + 1,
            ),
            1
        );
    }
}