1use {
4 crate::packet::{Packet, PacketBatch},
5 ahash::RandomState,
6 rand::Rng,
7 std::{
8 hash::{BuildHasher, Hash, Hasher},
9 iter::repeat_with,
10 marker::PhantomData,
11 sync::atomic::{AtomicU64, Ordering},
12 time::{Duration, Instant},
13 },
14};
15
16pub struct Deduper<const K: usize, T: ?Sized> {
17 num_bits: u64,
18 bits: Vec<AtomicU64>,
19 state: [RandomState; K],
20 clock: Instant,
21 popcount: AtomicU64, _phantom: PhantomData<T>,
23}
24
25impl<const K: usize, T: ?Sized + Hash> Deduper<K, T> {
26 pub fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
27 let size = num_bits.checked_add(63).unwrap() / 64;
28 let size = usize::try_from(size).unwrap();
29 Self {
30 num_bits,
31 state: std::array::from_fn(|_| new_random_state(rng)),
32 clock: Instant::now(),
33 bits: repeat_with(AtomicU64::default).take(size).collect(),
34 popcount: AtomicU64::default(),
35 _phantom: PhantomData::<T>,
36 }
37 }
38
39 fn false_positive_rate(&self) -> f64 {
40 let popcount = self.popcount.load(Ordering::Relaxed);
41 let ones_ratio = popcount.min(self.num_bits) as f64 / self.num_bits as f64;
42 ones_ratio.powi(K as i32)
43 }
44
45 pub fn maybe_reset<R: Rng>(
49 &mut self,
50 rng: &mut R,
51 false_positive_rate: f64,
52 reset_cycle: Duration,
53 ) -> bool {
54 assert!(0.0 < false_positive_rate && false_positive_rate < 1.0);
55 let saturated = self.false_positive_rate() >= false_positive_rate;
56 if saturated || self.clock.elapsed() >= reset_cycle {
57 self.state = std::array::from_fn(|_| new_random_state(rng));
58 self.clock = Instant::now();
59 self.bits.fill_with(AtomicU64::default);
60 self.popcount = AtomicU64::default();
61 }
62 saturated
63 }
64
65 #[must_use]
67 #[allow(clippy::arithmetic_side_effects)]
68 pub fn dedup(&self, data: &T) -> bool {
69 let mut out = true;
70 let hashers = self.state.iter().map(RandomState::build_hasher);
71 for mut hasher in hashers {
72 data.hash(&mut hasher);
73 let hash: u64 = hasher.finish() % self.num_bits;
74 let index = (hash >> 6) as usize;
75 let mask: u64 = 1u64 << (hash & 63);
76 let old = self.bits[index].fetch_or(mask, Ordering::Relaxed);
77 if old & mask == 0u64 {
78 self.popcount.fetch_add(1, Ordering::Relaxed);
79 out = false;
80 }
81 }
82 out
83 }
84}
85
86fn new_random_state<R: Rng>(rng: &mut R) -> RandomState {
87 RandomState::with_seeds(rng.gen(), rng.gen(), rng.gen(), rng.gen())
88}
89
90pub fn dedup_packets_and_count_discards<const K: usize>(
91 deduper: &Deduper<K, [u8]>,
92 batches: &mut [PacketBatch],
93 mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
94) -> u64 {
95 batches
96 .iter_mut()
97 .flat_map(PacketBatch::iter_mut)
98 .map(|packet| {
99 if packet.meta().discard() {
100 process_received_packet(packet, true, false);
101 } else if packet
102 .data(..)
103 .map(|data| deduper.dedup(data))
104 .unwrap_or(true)
105 {
106 packet.meta_mut().set_discard(true);
107 process_received_packet(packet, false, true);
108 } else {
109 process_received_packet(packet, false, false);
110 }
111 u64::from(packet.meta().discard())
112 })
113 .sum()
114}
115
#[cfg(test)]
#[allow(clippy::arithmetic_side_effects)]
mod tests {
    use {
        super::*,
        crate::{packet::to_packet_batches, sigverify, test_tx::test_tx},
        rand::SeedableRng,
        rand_chacha::ChaChaRng,
        solana_sdk::packet::{Meta, PACKET_DATA_SIZE},
        test_case::test_case,
    };

    /// Feeding 1024 copies of one transaction discards all but one packet.
    #[test]
    fn test_dedup_same() {
        let tx = test_tx();
        let txs: Vec<_> = std::iter::repeat(tx).take(1024).collect();
        let mut batches = to_packet_batches(&txs, 128);
        let packet_count = sigverify::count_packets_in_batches(&batches);
        let mut rng = rand::thread_rng();
        let filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
        // Counts how many times the per-packet callback fires.
        let mut num_callbacks = 0;
        let discard = dedup_packets_and_count_discards(
            &filter,
            &mut batches,
            |_packet, _removed_before_sigverify_stage, _is_dup| {
                num_callbacks += 1;
            },
        ) as usize;
        assert_eq!(num_callbacks, discard + 1);
        assert_eq!(packet_count, discard + 1);
    }

    /// 1024 separately generated transactions produce no discards, and a
    /// reset with a zero reset cycle clears every word of the bit array.
    #[test]
    fn test_dedup_diff() {
        let mut rng = rand::thread_rng();
        let mut filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
        let txs: Vec<_> = (0..1024).map(|_| test_tx()).collect();
        let mut batches = to_packet_batches(&txs, 128);
        let discard =
            dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
        assert_eq!(discard, 0);
        // A zero reset cycle always triggers the reset; the return value only
        // reports saturation, which is not expected here.
        let saturated = filter.maybe_reset(
            &mut rng,
            0.001,                    // false_positive_rate
            Duration::from_millis(0), // reset_cycle
        );
        assert!(!saturated);
        for word in &filter.bits {
            assert_eq!(word.load(Ordering::Relaxed), 0);
        }
    }

    /// Largest one-bit count for which a `K`-probe filter of `num_bits` bits
    /// stays below `false_positive_rate`, truncated to an integer.
    fn get_capacity<const K: usize>(num_bits: u64, false_positive_rate: f64) -> u64 {
        let max_ones_ratio = false_positive_rate.powf(1f64 / K as f64);
        (num_bits as f64 * max_ones_ratio) as u64
    }

    /// Fills the filter until its one-bit count exceeds the capacity, then
    /// checks that it reports itself as saturated.
    #[test]
    #[ignore]
    fn test_dedup_saturated() {
        const NUM_BITS: u64 = 63_999_979;
        const FALSE_POSITIVE_RATE: f64 = 0.001;
        let mut rng = rand::thread_rng();
        let mut filter = Deduper::<2, [u8]>::new(&mut rng, NUM_BITS);
        let capacity = get_capacity::<2>(NUM_BITS, FALSE_POSITIVE_RATE);
        let mut discard = 0;
        assert!(filter.popcount.load(Ordering::Relaxed) < capacity);
        for i in 0..1000 {
            let txs: Vec<_> = (0..1000).map(|_| test_tx()).collect();
            let mut batches = to_packet_batches(&txs, 128);
            discard +=
                dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
            trace!("{} {}", i, discard);
            if filter.popcount.load(Ordering::Relaxed) > capacity {
                break;
            }
        }
        assert!(filter.popcount.load(Ordering::Relaxed) > capacity);
        assert!(filter.false_positive_rate() >= FALSE_POSITIVE_RATE);
        let saturated = filter.maybe_reset(
            &mut rng,
            FALSE_POSITIVE_RATE,
            Duration::from_millis(0), // reset_cycle
        );
        assert!(saturated);
    }

    /// Ten rounds of 1024 freshly generated transactions yield fewer than
    /// two discards in total.
    #[test]
    fn test_dedup_false_positive() {
        let mut rng = rand::thread_rng();
        let filter = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 63_999_979);
        let mut discard = 0;
        for i in 0..10 {
            let txs: Vec<_> = (0..1024).map(|_| test_tx()).collect();
            let mut batches = to_packet_batches(&txs, 128);
            discard +=
                dedup_packets_and_count_discards(&filter, &mut batches, |_, _, _| ()) as usize;
            debug!("false positive rate: {}/{}", discard, i * 1024);
        }
        assert!(discard < 2);
    }

    /// Checks the capacity formula against pinned values and verifies that
    /// the saturation threshold sits exactly between `capacity` and
    /// `capacity + 1` one bits.
    #[test_case(63_999_979, 0.001, 2_023_857)]
    #[test_case(622_401_961, 0.001, 19_682_078)]
    #[test_case(622_401_979, 0.001, 19_682_078)]
    #[test_case(629_145_593, 0.001, 19_895_330)]
    #[test_case(632_455_543, 0.001, 20_000_000)]
    #[test_case(637_534_199, 0.001, 20_160_601)]
    #[test_case(622_401_961, 0.0001, 6_224_019)]
    #[test_case(622_401_979, 0.0001, 6_224_019)]
    #[test_case(629_145_593, 0.0001, 6_291_455)]
    #[test_case(632_455_543, 0.0001, 6_324_555)]
    #[test_case(637_534_199, 0.0001, 6_375_341)]
    fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) {
        let mut rng = rand::thread_rng();
        assert_eq!(get_capacity::<2>(num_bits, false_positive_rate), capacity);
        let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
        assert_eq!(deduper.false_positive_rate(), 0.0);
        // Force the counter to just at, then just past, the capacity.
        deduper.popcount.store(capacity, Ordering::Relaxed);
        assert!(deduper.false_positive_rate() < false_positive_rate);
        deduper.popcount.store(capacity + 1, Ordering::Relaxed);
        assert!(deduper.false_positive_rate() >= false_positive_rate);
        let saturated = deduper.maybe_reset(
            &mut rng,
            false_positive_rate,
            Duration::from_millis(0), // reset_cycle
        );
        assert!(saturated);
    }

    /// Replays a seeded stream of random packets and checks the pinned
    /// duplicate count and one-bit count. The expected values depend on the
    /// exact order in which values are drawn from the seeded generator.
    #[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 77, 101_083)]
    #[test_case([0xdc; 32], 3_200_003, 101_192, 51_414, 64, 101_097)]
    #[test_case([0xa5; 32], 6_399_971, 202_384, 102_828, 117, 202_257)]
    #[test_case([0xdb; 32], 6_400_013, 202_386, 102_828, 135, 202_254)]
    #[test_case([0xcd; 32], 12_799_987, 404_771, 205_655, 273, 404_521)]
    #[test_case([0xc3; 32], 12_800_009, 404_771, 205_656, 283, 404_365)]
    fn test_dedup_seeded(
        seed: [u8; 32],
        num_bits: u64,
        capacity: u64,
        num_packets: usize,
        num_dups: usize,
        popcount: u64,
    ) {
        const FALSE_POSITIVE_RATE: f64 = 0.001;
        let mut rng = ChaChaRng::from_seed(seed);
        let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
        assert_eq!(get_capacity::<2>(num_bits, FALSE_POSITIVE_RATE), capacity);
        let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default());
        let mut dup_count = 0usize;
        for _ in 0..num_packets {
            // Draw the size first, then the payload, to keep the stream stable.
            let size = rng.gen_range(0..PACKET_DATA_SIZE);
            packet.meta_mut().size = size;
            rng.fill(&mut packet.buffer_mut()[..size]);
            let payload = packet.data(..).unwrap();
            let reported_dup_on_first_sight = deduper.dedup(payload);
            dup_count += usize::from(reported_dup_on_first_sight);
            // A second pass over the same payload must always be a duplicate.
            assert!(deduper.dedup(payload));
        }
        assert_eq!(dup_count, num_dups);
        assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount);
        assert!(deduper.false_positive_rate() < FALSE_POSITIVE_RATE);
        let saturated = deduper.maybe_reset(
            &mut rng,
            FALSE_POSITIVE_RATE,
            Duration::from_millis(0), // reset_cycle
        );
        assert!(!saturated);
    }
}