laminar_core/sink/
dedup.rs1#![allow(clippy::cast_possible_truncation)]
4
5use std::collections::VecDeque;
6use std::hash::{Hash, Hasher};
7
8use fxhash::FxHashSet;
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub struct RecordId {
15 bytes: Vec<u8>,
17}
18
19impl RecordId {
20 #[must_use]
22 pub fn from_bytes(bytes: &[u8]) -> Self {
23 Self {
24 bytes: bytes.to_vec(),
25 }
26 }
27
28 #[must_use]
30 pub fn from_string(s: &str) -> Self {
31 Self {
32 bytes: s.as_bytes().to_vec(),
33 }
34 }
35
36 #[must_use]
38 pub fn from_u64(value: u64) -> Self {
39 Self {
40 bytes: value.to_le_bytes().to_vec(),
41 }
42 }
43
44 #[must_use]
48 pub fn from_hash(data: &[u8]) -> Self {
49 use fxhash::FxHasher;
50 let mut hasher = FxHasher::default();
51 data.hash(&mut hasher);
52 let hash = hasher.finish();
53 Self::from_u64(hash)
54 }
55
56 #[must_use]
58 pub fn composite(parts: &[&[u8]]) -> Self {
59 let mut bytes = Vec::new();
60 for part in parts {
61 bytes.extend_from_slice(&(part.len() as u32).to_le_bytes());
62 bytes.extend_from_slice(part);
63 }
64 Self::from_hash(&bytes)
65 }
66
67 #[must_use]
69 pub fn as_bytes(&self) -> &[u8] {
70 &self.bytes
71 }
72}
73
74pub trait DeduplicationStore: Send {
76 fn is_new(&self, id: &RecordId) -> bool;
78
79 fn mark_seen(&mut self, id: RecordId);
81
82 fn mark_seen_batch(&mut self, ids: impl IntoIterator<Item = RecordId>) {
84 for id in ids {
85 self.mark_seen(id);
86 }
87 }
88
89 fn filter_new<'a>(&self, ids: impl IntoIterator<Item = &'a RecordId>) -> Vec<&'a RecordId> {
91 ids.into_iter().filter(|id| self.is_new(id)).collect()
92 }
93
94 fn len(&self) -> usize;
96
97 fn is_empty(&self) -> bool {
99 self.len() == 0
100 }
101
102 fn clear(&mut self);
104
105 fn to_bytes(&self) -> Vec<u8>;
107
108 fn restore(&mut self, bytes: &[u8]) -> Result<(), String>;
114}
115
116pub struct InMemoryDedup {
124 seen: FxHashSet<RecordId>,
126
127 order: VecDeque<RecordId>,
129
130 capacity: usize,
132}
133
134impl InMemoryDedup {
135 #[must_use]
137 pub fn new(capacity: usize) -> Self {
138 Self {
139 seen: FxHashSet::default(),
140 order: VecDeque::with_capacity(capacity.min(10000)),
141 capacity,
142 }
143 }
144
145 #[must_use]
147 pub fn capacity(&self) -> usize {
148 self.capacity
149 }
150
151 fn evict_if_needed(&mut self) {
153 while self.order.len() >= self.capacity {
154 if let Some(old_id) = self.order.pop_front() {
155 self.seen.remove(&old_id);
156 }
157 }
158 }
159}
160
161impl DeduplicationStore for InMemoryDedup {
162 fn is_new(&self, id: &RecordId) -> bool {
163 !self.seen.contains(id)
164 }
165
166 fn mark_seen(&mut self, id: RecordId) {
167 if self.seen.insert(id.clone()) {
168 self.order.push_back(id);
170 self.evict_if_needed();
171 }
172 }
173
174 fn len(&self) -> usize {
175 self.seen.len()
176 }
177
178 fn clear(&mut self) {
179 self.seen.clear();
180 self.order.clear();
181 }
182
183 fn to_bytes(&self) -> Vec<u8> {
184 let mut bytes = Vec::new();
185
186 bytes.push(1u8);
188
189 bytes.extend_from_slice(&(self.capacity as u64).to_le_bytes());
191
192 bytes.extend_from_slice(&(self.order.len() as u32).to_le_bytes());
194
195 for id in &self.order {
197 let id_bytes = id.as_bytes();
198 bytes.extend_from_slice(&(id_bytes.len() as u32).to_le_bytes());
199 bytes.extend_from_slice(id_bytes);
200 }
201
202 bytes
203 }
204
205 fn restore(&mut self, bytes: &[u8]) -> Result<(), String> {
206 if bytes.is_empty() {
207 return Err("Empty dedup data".to_string());
208 }
209
210 let mut pos = 0;
211
212 let version = bytes[pos];
214 pos += 1;
215 if version != 1 {
216 return Err(format!("Unsupported dedup version: {version}"));
217 }
218
219 if pos + 8 > bytes.len() {
221 return Err("Unexpected end of data (capacity)".to_string());
222 }
223 let capacity = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
224 pos += 8;
225 self.capacity = capacity;
226
227 if pos + 4 > bytes.len() {
229 return Err("Unexpected end of data (count)".to_string());
230 }
231 let num_entries = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
232 pos += 4;
233
234 self.clear();
236
237 for _ in 0..num_entries {
238 if pos + 4 > bytes.len() {
239 return Err("Unexpected end of data (entry length)".to_string());
240 }
241 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
242 pos += 4;
243
244 if pos + len > bytes.len() {
245 return Err("Unexpected end of data (entry data)".to_string());
246 }
247 let id = RecordId::from_bytes(&bytes[pos..pos + len]);
248 pos += len;
249
250 self.seen.insert(id.clone());
251 self.order.push_back(id);
252 }
253
254 Ok(())
255 }
256}
257
258#[allow(dead_code)] pub struct BloomFilterDedup {
265 bits: Vec<u64>,
267
268 num_hashes: usize,
270
271 num_bits: usize,
273
274 count: usize,
276}
277
278#[allow(dead_code)] impl BloomFilterDedup {
280 #[must_use]
287 #[allow(
288 clippy::cast_sign_loss,
289 clippy::cast_precision_loss,
290 clippy::cast_possible_truncation
291 )]
292 pub fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
293 let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
296 let num_bits =
297 (-(expected_elements as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;
298 let num_bits = num_bits.max(64); let num_hashes =
303 ((num_bits as f64 / expected_elements as f64) * std::f64::consts::LN_2).ceil() as usize;
304 let num_hashes = num_hashes.clamp(1, 16);
305
306 let num_words = num_bits.div_ceil(64);
307
308 Self {
309 bits: vec![0u64; num_words],
310 num_hashes,
311 num_bits,
312 count: 0,
313 }
314 }
315
316 fn get_indices(&self, id: &RecordId) -> Vec<usize> {
318 use fxhash::FxHasher;
319
320 let mut indices = Vec::with_capacity(self.num_hashes);
321
322 let mut hasher1 = FxHasher::default();
324 id.bytes.hash(&mut hasher1);
325 let h1 = hasher1.finish() as usize;
326
327 let mut hasher2 = FxHasher::default();
328 hasher1.finish().hash(&mut hasher2);
329 let h2 = hasher2.finish() as usize;
330
331 for i in 0..self.num_hashes {
332 let index = (h1.wrapping_add(i.wrapping_mul(h2))) % self.num_bits;
333 indices.push(index);
334 }
335
336 indices
337 }
338
339 fn might_contain(&self, id: &RecordId) -> bool {
341 let indices = self.get_indices(id);
342 for idx in indices {
343 let word = idx / 64;
344 let bit = idx % 64;
345 if self.bits[word] & (1u64 << bit) == 0 {
346 return false;
347 }
348 }
349 true
350 }
351
352 fn add(&mut self, id: &RecordId) {
354 let indices = self.get_indices(id);
355 for idx in indices {
356 let word = idx / 64;
357 let bit = idx % 64;
358 self.bits[word] |= 1u64 << bit;
359 }
360 self.count += 1;
361 }
362
363 #[must_use]
365 #[allow(clippy::cast_precision_loss)]
366 pub fn false_positive_rate(&self) -> f64 {
367 let k = self.num_hashes as f64;
369 let n = self.count as f64;
370 let m = self.num_bits as f64;
371 (1.0 - (-k * n / m).exp()).powf(k)
372 }
373}
374
375impl DeduplicationStore for BloomFilterDedup {
376 fn is_new(&self, id: &RecordId) -> bool {
377 !self.might_contain(id)
378 }
379
380 fn mark_seen(&mut self, id: RecordId) {
381 self.add(&id);
382 }
383
384 fn len(&self) -> usize {
385 self.count
386 }
387
388 fn clear(&mut self) {
389 self.bits.fill(0);
390 self.count = 0;
391 }
392
393 fn to_bytes(&self) -> Vec<u8> {
394 let mut bytes = Vec::new();
395
396 bytes.push(2u8); bytes.extend_from_slice(&(self.num_bits as u64).to_le_bytes());
401 bytes.extend_from_slice(&(self.num_hashes as u32).to_le_bytes());
402 bytes.extend_from_slice(&(self.count as u64).to_le_bytes());
403
404 bytes.extend_from_slice(&(self.bits.len() as u32).to_le_bytes());
406 for word in &self.bits {
407 bytes.extend_from_slice(&word.to_le_bytes());
408 }
409
410 bytes
411 }
412
413 fn restore(&mut self, bytes: &[u8]) -> Result<(), String> {
414 if bytes.is_empty() {
415 return Err("Empty bloom filter data".to_string());
416 }
417
418 let mut pos = 0;
419
420 let version = bytes[pos];
422 pos += 1;
423 if version != 2 {
424 return Err(format!("Unsupported bloom filter version: {version}"));
425 }
426
427 if pos + 20 > bytes.len() {
429 return Err("Unexpected end of data (parameters)".to_string());
430 }
431 self.num_bits = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
432 pos += 8;
433 self.num_hashes = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
434 pos += 4;
435 self.count = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap()) as usize;
436 pos += 8;
437
438 if pos + 4 > bytes.len() {
440 return Err("Unexpected end of data (bits length)".to_string());
441 }
442 let num_words = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
443 pos += 4;
444
445 if pos + num_words * 8 > bytes.len() {
446 return Err("Unexpected end of data (bits)".to_string());
447 }
448
449 self.bits = Vec::with_capacity(num_words);
450 for _ in 0..num_words {
451 let word = u64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
452 self.bits.push(word);
453 pos += 8;
454 }
455
456 Ok(())
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// `from_bytes` keeps the bytes verbatim.
    #[test]
    fn test_record_id_from_bytes() {
        let raw = b"test-id";
        let id = RecordId::from_bytes(raw);
        assert_eq!(id.as_bytes(), raw);
    }

    /// `from_u64` always produces eight bytes.
    #[test]
    fn test_record_id_from_u64() {
        let width = RecordId::from_u64(12345).as_bytes().len();
        assert_eq!(width, 8);
    }

    /// Hashing is deterministic and separates different inputs.
    #[test]
    fn test_record_id_from_hash() {
        let first = RecordId::from_hash(b"some data");
        let repeat = RecordId::from_hash(b"some data");
        let different = RecordId::from_hash(b"other data");

        assert_eq!(first, repeat);
        assert_ne!(first, different);
    }

    /// Composite identifiers are deterministic and depend on every part.
    #[test]
    fn test_record_id_composite() {
        let first = RecordId::composite(&[b"table", b"key1"]);
        let repeat = RecordId::composite(&[b"table", b"key1"]);
        let other_key = RecordId::composite(&[b"table", b"key2"]);

        assert_eq!(first, repeat);
        assert_ne!(first, other_key);
    }

    /// An identifier is new until it has been marked as seen.
    #[test]
    fn test_in_memory_dedup_basic() {
        let mut store = InMemoryDedup::new(1000);
        let probe = RecordId::from_string("test");

        assert!(store.is_new(&probe));
        store.mark_seen(probe.clone());
        assert!(!store.is_new(&probe));

        assert_eq!(store.len(), 1);
    }

    /// The store never grows beyond its capacity and keeps the newest identifier.
    #[test]
    fn test_in_memory_dedup_capacity() {
        let mut store = InMemoryDedup::new(3);

        (0..5)
            .map(RecordId::from_u64)
            .for_each(|id| store.mark_seen(id));

        assert!(store.len() <= 3);

        let newest = RecordId::from_u64(4);
        assert!(!store.is_new(&newest));
    }

    /// A snapshot restores both the capacity and the remembered identifiers.
    #[test]
    fn test_in_memory_dedup_serialization() {
        let mut original = InMemoryDedup::new(100);
        for name in ["id1", "id2"] {
            original.mark_seen(RecordId::from_string(name));
        }

        let snapshot = original.to_bytes();

        // Start from a different capacity to prove it is overwritten.
        let mut restored = InMemoryDedup::new(1);
        restored.restore(&snapshot).unwrap();

        assert_eq!(restored.capacity(), 100);
        assert_eq!(restored.len(), 2);
        assert!(!restored.is_new(&RecordId::from_string("id1")));
        assert!(!restored.is_new(&RecordId::from_string("id2")));
    }

    /// `filter_new` returns only the identifiers that were not marked.
    #[test]
    fn test_in_memory_dedup_filter_new() {
        let mut store = InMemoryDedup::new(100);
        for seen in [1, 3] {
            store.mark_seen(RecordId::from_u64(seen));
        }

        let candidates = [
            RecordId::from_u64(1),
            RecordId::from_u64(2),
            RecordId::from_u64(3),
            RecordId::from_u64(4),
        ];

        let fresh: Vec<RecordId> = store
            .filter_new(candidates.iter())
            .into_iter()
            .cloned()
            .collect();

        assert_eq!(fresh.len(), 2);
        assert!(fresh.contains(&RecordId::from_u64(2)));
        assert!(fresh.contains(&RecordId::from_u64(4)));
    }

    /// An identifier is new until it has been added to the filter.
    #[test]
    fn test_bloom_filter_basic() {
        let mut filter = BloomFilterDedup::new(1000, 0.01);
        let probe = RecordId::from_string("test");

        assert!(filter.is_new(&probe));
        filter.mark_seen(probe.clone());
        assert!(!filter.is_new(&probe));
    }

    /// Every identifier that was added must be reported as seen.
    #[test]
    fn test_bloom_filter_no_false_negatives() {
        let mut filter = BloomFilterDedup::new(100, 0.01);

        (0..100)
            .map(RecordId::from_u64)
            .for_each(|id| filter.mark_seen(id));

        for i in 0..100 {
            let seen = !filter.is_new(&RecordId::from_u64(i));
            assert!(seen, "False negative for id {i}");
        }
    }

    /// A snapshot restored into a differently sized filter still knows its identifiers.
    #[test]
    fn test_bloom_filter_serialization() {
        let mut original = BloomFilterDedup::new(100, 0.01);
        for name in ["id1", "id2"] {
            original.mark_seen(RecordId::from_string(name));
        }

        let snapshot = original.to_bytes();

        let mut restored = BloomFilterDedup::new(10, 0.1);
        restored.restore(&snapshot).unwrap();

        assert!(!restored.is_new(&RecordId::from_string("id1")));
        assert!(!restored.is_new(&RecordId::from_string("id2")));
    }
}