cl_tds/lib.rs
1// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2// Copyright 2026 Deendayal Kumawat <deendayal_kumawat@outlook.com>
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13// implied. See the License for the specific language governing
14// permissions and limitations under the License.
15// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
16
17//! # CL-TDS
18//!
19//! **Cache-Locked Temporal Decay Sketch** — a fixed-memory, lock-free streaming
20//! algorithm for detecting heavy hitters in real-time data streams.
21//!
22//! CL-TDS answers one question: *"What items are appearing most frequently
23//! in this stream, right now?"* — using only **1 MB** of memory, with old
24//! data fading automatically.
25//!
26//! # Getting Started
27//!
28//! Add to your `Cargo.toml`:
29//! ```toml
30//! [dependencies]
31//! cl-tds = { git = "https://github.com/ddsha441981/cl-tds" }
32//! ```
33//!
34//! Basic usage:
35//! ```rust
36//! use cl_tds::ClTds;
37//!
38//! let sketch = ClTds::new();
39//!
40//! // Record items (accepts any u64 hash)
41//! sketch.increment(0xDEAD_BEEF);
42//! sketch.increment(0xDEAD_BEEF);
43//! sketch.increment(0xCAFE_BABE);
44//!
45//! // Query frequency estimates
46//! assert!(sketch.query(0xDEAD_BEEF) >= 2);
47//! assert!(sketch.query(0xCAFE_BABE) >= 1);
48//! ```
49//!
50//! # How To Use
51//!
52//! CL-TDS is a **library**. You integrate it into your application and feed it
53//! your data. The algorithm does not care where data comes from — network packets,
54//! log files, database queries, Kafka streams, or anything else.
55//!
56//! ### Step 1: Hash your data into `u64`
57//!
58//! CL-TDS operates on `u64` identifiers. Hash your domain objects:
59//!
60//! ```rust
61//! use std::hash::{Hash, Hasher};
62//! use std::collections::hash_map::DefaultHasher;
63//!
64//! fn to_id<T: Hash>(item: &T) -> u64 {
65//! let mut h = DefaultHasher::new();
66//! item.hash(&mut h);
67//! h.finish()
68//! }
69//!
70//! // Works with any type:
71//! // to_id(&"192.168.1.1") → for IP addresses
72//! // to_id(&"POST /api/pay") → for API endpoints
73//! // to_id(&"#trending") → for hashtags
74//! // to_id(&12345u64) → for user IDs
75//! ```
76//!
77//! ### Step 2: Choose your mode
78//!
79//! ```rust
80//! use cl_tds::ClTds;
81//!
82//! // Manual mode — you control when data decays (good for batch processing)
83//! let sketch = ClTds::new();
84//! sketch.increment(42);
85//! sketch.tick_epoch(); // data halves each tick
86//!
87//! // Auto mode — data decays based on wall clock (good for real-time)
88//! let sketch = ClTds::with_epoch_interval(1000); // decay every 1 second
89//! sketch.increment(42);
90//! // count automatically halves every second — no manual ticking needed
91//! ```
92//!
93//! ### Step 3: Feed your stream and query
94//!
95//! ```rust
96//! use cl_tds::ClTds;
97//!
98//! let sketch = ClTds::with_epoch_interval(5000); // decay every 5s
99//!
100//! // Your data loop (network, logs, events — anything)
101//! # let packets: Vec<u64> = vec![1, 2, 1, 1, 3];
102//! for packet in packets {
103//! sketch.increment(packet);
104//! }
105//!
106//! // Check if something is a heavy hitter
107//! let threshold = 1000;
108//! # let suspect: u64 = 1;
109//! if sketch.query(suspect) > threshold {
110//! // alert! this item appeared too frequently
111//! }
112//! ```
113//!
114//! # Real-World Integration Patterns
115//!
116//! All patterns use the same core:
117//! `sketch.increment(hash)` to record, `sketch.query(hash)` to check.
118//! Only the **data source** and **threshold** change per domain.
119//!
120//! ### Network Security — DDoS Detection
121//! ```rust,no_run
122//! # use cl_tds::ClTds; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher};
123//! # fn hash_str(s: &str) -> u64 { let mut h = DefaultHasher::new(); s.hash(&mut h); h.finish() }
124//! let sketch = ClTds::with_epoch_interval(1000); // 1s window
125//! // Feed source IPs from packet capture
126//! sketch.increment(hash_str("1.2.3.4"));
127//! if sketch.query(hash_str("1.2.3.4")) > 10_000 { /* DDoS alert */ }
128//! ```
129//!
130//! ### API Rate Limiting
131//! ```rust,no_run
132//! # use cl_tds::ClTds;
133//! # let user_id: u64 = 1;
134//! let limiter = ClTds::with_epoch_interval(60_000); // 1-min window
135//! // Feed user IDs from request handler
136//! limiter.increment(user_id);
137//! if limiter.query(user_id) > 100 { /* throttle: >100 req/min */ }
138//! ```
139//!
140//! ### Log Analysis — Error Surge Detection
141//! ```rust,no_run
142//! # use cl_tds::ClTds; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher};
143//! # fn hash_str(s: &str) -> u64 { let mut h = DefaultHasher::new(); s.hash(&mut h); h.finish() }
144//! let monitor = ClTds::with_epoch_interval(10_000); // 10s window
145//! // Feed endpoint+status from log pipeline
146//! monitor.increment(hash_str("POST /api/checkout 500"));
147//! if monitor.query(hash_str("POST /api/checkout 500")) > 50 { /* alert */ }
148//! ```
149//!
150//! ### Gaming — Anti-Cheat
151//! ```rust,no_run
152//! # use cl_tds::ClTds;
153//! # let player_id: u64 = 1;
154//! let detector = ClTds::with_epoch_interval(5_000); // 5s window
155//! // Feed player action events
156//! detector.increment(player_id);
157//! if detector.query(player_id) > 500 { /* abnormal speed — possible cheat */ }
158//! ```
159//!
160//! ### Telecom — SIM-Box Fraud
161//! ```rust,no_run
162//! # use cl_tds::ClTds; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher};
163//! # fn hash_str(s: &str) -> u64 { let mut h = DefaultHasher::new(); s.hash(&mut h); h.finish() }
164//! let cdr = ClTds::with_epoch_interval(3600_000); // 1-hour window
165//! // Feed caller numbers from CDR stream
166//! cdr.increment(hash_str("+91-9999000001"));
167//! if cdr.query(hash_str("+91-9999000001")) > 1_000 { /* fraud alert */ }
168//! ```
169//!
170//! ### IoT — Faulty Sensor Detection
171//! ```rust,no_run
172//! # use cl_tds::ClTds;
173//! # let sensor_id: u64 = 42;
174//! let sensors = ClTds::with_epoch_interval(60_000); // 1-min window
175//! // Feed sensor alert events
176//! sensors.increment(sensor_id);
177//! if sensors.query(sensor_id) > 100 { /* sensor #42 firing too often */ }
178//! ```
179//!
180//! ### Social Media — Trending Hashtags
181//! ```rust,no_run
182//! # use cl_tds::ClTds; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher};
183//! # fn hash_str(s: &str) -> u64 { let mut h = DefaultHasher::new(); s.hash(&mut h); h.finish() }
184//! let trends = ClTds::with_epoch_interval(30_000); // 30s window
185//! // Feed hashtags from post stream
186//! trends.increment(hash_str("#BreakingNews"));
187//! if trends.query(hash_str("#BreakingNews")) > 5_000 { /* trending! */ }
188//! ```
189//!
190//! ### Fintech — Card Fraud
191//! ```rust,no_run
192//! # use cl_tds::ClTds; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher};
193//! # fn hash_str(s: &str) -> u64 { let mut h = DefaultHasher::new(); s.hash(&mut h); h.finish() }
194//! let fraud = ClTds::with_epoch_interval(3600_000); // 1-hour window
195//! // Feed card numbers from transaction stream
196//! fraud.increment(hash_str("card-XXXX-4567"));
197//! if fraud.query(hash_str("card-XXXX-4567")) > 50 { /* suspicious activity */ }
198//! ```
199//!
//! **The same pattern works for all 17 benchmarked domains** — DNS monitoring, CDN caching,
//! click fraud, spam detection, search query tracking, smart grid monitoring, and more.
//! See the [Applicable Domains](#applicable-domains) section for the full list.
203//!
204//! # Key Properties
205//!
206//! | Property | Value |
207//! |----------|-------|
208//! | Memory | Fixed 1 MB (4 rows × 65536 cols × 4 bytes) |
209//! | Operations | `increment()` O(1), `query()` O(1) |
210//! | Thread safety | Lock-free atomic CAS — safe from any number of threads |
211//! | Decay | Lazy — O(1) per touch, no background thread |
212//! | Dependencies | Zero — pure `std::sync::atomic` |
213//! | Error bound (ε) | ≈ 0.0000414 (overcount per stream item) |
214//! | Failure probability (δ) | ≤ 1.8% (4 independent rows) |
215//! | Persistence | [`ClTds::to_bytes`] / [`ClTds::from_bytes`] |
216//!
217//! # Multi-Instance Scaling
218//!
//! Each sketch's counter matrix is exactly **1 MB**. A typical CPU L3 cache is
//! **6–12 MB**, so you can run **5 independent sketches** simultaneously — each
//! monitoring a different stream — and their combined 5 MB still fits in L3 cache.
222//!
223//! ```rust
224//! use cl_tds::ClTds;
225//! use std::sync::Arc;
226//! use std::thread;
227//!
228//! // 5 sketches × 1 MB = 5 MB — fits in any modern L3 cache
229//! let sketches: Vec<Arc<ClTds>> = (0..5)
230//! .map(|_| Arc::new(ClTds::with_epoch_interval(1000)))
231//! .collect();
232//!
233//! // Each thread monitors a different domain — zero contention
234//! # let sketches_clone: Vec<Arc<ClTds>> = sketches.iter().map(|s| Arc::clone(s)).collect();
235//! # let handles: Vec<_> = sketches_clone.into_iter().enumerate().map(|(i, sketch)| {
236//! // thread::spawn(move || {
237//! // // Thread 0: Network DDoS monitoring
238//! // // Thread 1: API rate limiting
239//! // // Thread 2: Log error tracking
240//! // // Thread 3: Click fraud detection
241//! // // Thread 4: DNS query monitoring
242//! // sketch.increment(i as u64);
243//! // });
244//! # thread::spawn(move || { sketch.increment(i as u64); })
245//! # }).collect();
246//! # for h in handles { h.join().unwrap(); }
247//! ```
248//!
249//! **Benchmarked:** 5 threads running concurrently, each processing its own
250//! domain (Network, Crypto, Analytics, Gaming, IP Flood) — all maintaining
251//! full throughput with zero speed loss.
252//!
253//! # Bring Your Own Data
254//!
255//! CL-TDS accepts any `u64` hash, so you can feed it data from **any source** —
256//! CSV files, JSON logs, database streams, Kafka topics, or raw sockets.
257//!
258//! Read from a file:
259//!
260//! ```rust,no_run
261//! use cl_tds::ClTds;
262//! use std::io::{BufRead, BufReader};
263//! use std::fs::File;
264//! use std::collections::hash_map::DefaultHasher;
265//! use std::hash::{Hash, Hasher};
266//!
267//! fn hash_line(s: &str) -> u64 {
268//! let mut h = DefaultHasher::new();
269//! s.hash(&mut h);
270//! h.finish()
271//! }
272//!
273//! let sketch = ClTds::with_epoch_interval(1000);
274//! let file = File::open("traffic.csv").unwrap();
275//!
276//! for line in BufReader::new(file).lines() {
277//! if let Ok(item) = line {
278//! sketch.increment(hash_line(&item));
279//! }
280//! }
281//!
282//! // Now query any item's frequency
283//! println!("Frequency: {}", sketch.query(hash_line("suspicious_item")));
284//! ```
285//!
286//! # Applicable Domains
287//!
288//! CL-TDS works for any domain that needs: **heavy hitter detection** in a
289//! **continuous stream** with **bounded memory** and **temporal decay**.
290//!
291//! **17 domains benchmarked and proven:** Network security (DDoS), financial
292//! trading (HFT), web analytics, gaming anti-cheat, DNS monitoring, API rate
293//! limiting, click fraud, CDN caching, telecom fraud, IoT sensors, log analysis,
294//! spam detection, social trending, search tracking, smart grid, fintech fraud,
295//! and IP flood detection.
296//!
297//! **Not suitable for:** Exact counting (ledgers/voting), listing unique items,
298//! item deletion (GDPR), small datasets (< 100K — use `HashMap`),
299//! relationship or sequence detection.
300//!
301//! # CL-TDS vs HashMap
302//!
303//! | Unique Items | HashMap | CL-TDS | Savings |
304//! |---|---|---|---|
305//! | 100K | 1.9 MB | 1 MB | 2x |
306//! | 1M | 29.8 MB | 1 MB | **30x** |
307//! | 5M | 119 MB | 1 MB | **119x** |
308//! | 10M | 238 MB | 1 MB | **238x** |
309//!
310//! CL-TDS is also **1.8x faster on insert** and **4.8x faster on query**
311//! because it fits entirely in L3 cache. Plus HashMap can't do temporal
312//! decay — stale data stays forever.
313//!
314//! # Testing
315//!
316//! ```bash
317//! cargo test # 40 unit tests + 14 doc tests
318//! cargo run --example basic # Insert/query demo
319//! cargo run --example decay # Temporal decay visualization
320//! cargo run --example multithread # Concurrent stress test
321//! cargo run --example persistence # Save/restore to disk
322//! ```
323use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
324use std::time::Instant;
325
/// Number of columns in every row of the matrix: 2^16 = 65536.
pub const WIDTH: usize = 1 << 16;

/// Number of rows in the matrix, one per hash function.
/// With `DEPTH = 4`, δ = e^{-DEPTH} ≈ 1.8%.
pub const DEPTH: usize = 4;

/// Width in bits of the timestamp kept in the upper part of a cell.
const TS_BITS: u32 = 8;

/// Width in bits of the frequency counter kept in the lower part of a cell.
const COUNT_BITS: u32 = 24;

/// Mask selecting the counter field of a cell (its low `COUNT_BITS` bits).
const COUNT_MASK: u32 = u32::MAX >> (32 - COUNT_BITS); // 0x00FF_FFFF

/// Largest value the counter field can hold.
pub const MAX_COUNT: u32 = COUNT_MASK; // 16,777,215

/// Number of halvings after which any `COUNT_BITS`-wide counter is zero.
const MAX_DECAY: u32 = COUNT_BITS;

/// Mask selecting the low `TS_BITS` bits of an epoch number.
const TS_MASK: u32 = u32::MAX >> (32 - TS_BITS); // 0xFF
350
351/// Packs a timestamp (upper 8 bits) and a counter (lower 24 bits) into one `u32` cell.
352#[inline(always)]
353pub fn pack(timestamp: u32, count: u32) -> u32 {
354 ((timestamp & TS_MASK) << COUNT_BITS) | (count & COUNT_MASK)
355}
356
357/// Extracts the timestamp and counter from a packed `u32` cell.
358#[inline(always)]
359pub fn unpack(cell: u32) -> (u32, u32) {
360 let timestamp = cell >> COUNT_BITS;
361 let count = cell & COUNT_MASK;
362 (timestamp, count)
363}
364
365/// Calculates how many epochs have passed since a cell was last touched.
366/// Handles 8-bit timestamp wraparound. Result is capped at 24
367/// because shifting a 24-bit counter more than 24 times always gives zero.
368#[inline(always)]
369pub fn decay_steps(cell_ts: u32, current_epoch: u64) -> u32 {
370 let epoch_low = (current_epoch & TS_MASK as u64) as u32;
371 let diff = epoch_low.wrapping_sub(cell_ts) & TS_MASK;
372 diff.min(MAX_DECAY)
373}
374
375/// Halves the count value `steps` times (right-shift by `steps`).
376#[inline(always)]
377fn apply_decay(count: u32, steps: u32) -> u32 {
378 if steps >= MAX_DECAY {
379 0
380 } else {
381 count >> steps
382 }
383}
384
385/// Fixed hash constants used only in deterministic/testing mode.
386const DEFAULT_HASH_A: [u64; DEPTH] = [
387 0x9e3779b97f4a7c15, // golden ratio derivative
388 0x517cc1b727220a95, // FxHash constant
389 0x6c62272e07bb0142, // splitmix64 constant
390 0xbf58476d1ce4e5b9, // splitmix64 constant 2
391];
392
393const DEFAULT_HASH_B: [u64; DEPTH] = [
394 0xd2a98b26625eee7b,
395 0x94d049bb133111eb,
396 0xc4ceb9fe1a85ec53,
397 0xe7037ed1a0b428db,
398];
399
400/// Generates unique random hash seeds using system entropy.
401/// Each sketch instance gets different seeds, making it resistant to targeted attacks.
402fn random_hash_params() -> ([u64; DEPTH], [u64; DEPTH]) {
403 use std::collections::hash_map::RandomState;
404 use std::hash::{BuildHasher, Hasher};
405
406 let mut a = [0u64; DEPTH];
407 let mut b = [0u64; DEPTH];
408
409 for i in 0..DEPTH {
410 let state = RandomState::new();
411 let mut hasher = state.build_hasher();
412 hasher.write_usize(i);
413 a[i] = hasher.finish() | 1; // ensure odd (better distribution)
414
415 let state2 = RandomState::new();
416 let mut hasher2 = state2.build_hasher();
417 hasher2.write_usize(i + DEPTH);
418 b[i] = hasher2.finish();
419 }
420
421 (a, b)
422}
423
424/// Maps an item ID to a column index (0..65535) for the given row.
425/// Uses the universal hash formula: `h(x) = (A·x + B) >> 48`.
426#[inline(always)]
427fn compute_hash(id: u64, row: usize, hash_a: &[u64; DEPTH], hash_b: &[u64; DEPTH]) -> usize {
428 let h = hash_a[row].wrapping_mul(id).wrapping_add(hash_b[row]);
429 (h >> 48) as usize // top 16 bits
430}
431
/// One row of the sketch matrix: `WIDTH` (65536) packed cells, each stored
/// as an `AtomicU32`.
///
/// `#[repr(align(64))]` aligns the start of every row to a 64-byte boundary.
/// The inner array is private to this module.
#[repr(align(64))]
pub struct Row([AtomicU32; WIDTH]);
435
/// The main sketch data structure. Uses 1 MB of memory to track item
/// frequencies in a stream, with old data fading out automatically.
///
/// The matrix is only ever read and written through atomics, so a sketch can
/// be shared between threads without any locking.
///
/// # Guarantees
///
/// - **Error bound:** estimated count can overcount by at most ε·N (ε ≈ 0.00004)
/// - **False positive rate:** at most 1.8% chance of misidentifying a non-heavy-hitter
/// - **Lazy decay:** produces the exact same result as decaying every cell every
///   epoch, as long as the cell is touched again within 255 epochs. Cells keep
///   only the low 8 bits of the epoch (see [`decay_steps`]), so a longer idle
///   gap is measured modulo 256 and an old count can be under-decayed.
///
/// # Mathematical Formulation
///
/// ```text
/// Theorem 1 (Error): E[query(x)] ≤ f_t(x) + ε · N_effective(t), ε = e/65536
/// Theorem 2 (FP): P[query(x) > φ·N_eff] ≤ δ = e^{-4} ≈ 1.8%
/// Theorem 3 (Decay): V_lazy(C) = V_full(C) (exact equality at touch time)
/// ```
pub struct ClTds {
    /// 4 independent hash rows, each 256 KB = total 1 MB. Heap-allocated.
    rows: Box<[Row; DEPTH]>,
    /// Global epoch counter advanced by `tick_epoch()`. Only consulted in
    /// manual mode; auto mode derives the epoch from `created_at` instead.
    epoch: AtomicU64,
    /// Creation timestamp (auto mode). None = manual mode.
    created_at: Option<Instant>,
    /// Epoch interval in milliseconds (auto mode). Set to 0 in manual mode,
    /// where it is never read.
    epoch_interval_ms: u64,
    /// Per-instance hash multiplicative constants (adversarial resistance).
    hash_a: [u64; DEPTH],
    /// Per-instance hash additive constants (adversarial resistance).
    hash_b: [u64; DEPTH],
}
468
469impl Default for ClTds {
470 fn default() -> Self {
471 Self::new()
472 }
473}
474
475impl ClTds {
476 /// Creates a new sketch in manual mode. You control decay by calling
477 /// [`tick_epoch()`](Self::tick_epoch) yourself. Hash seeds are randomized.
478 pub fn new() -> Self {
479 let (a, b) = random_hash_params();
480 Self::alloc(None, 0, a, b)
481 }
482
483 /// Creates a sketch with fixed hash seeds. Two sketches created this
484 /// way will produce identical results for identical inputs.
485 ///
486 /// Only use this in tests — it is NOT safe against targeted attacks.
487 pub fn new_deterministic() -> Self {
488 Self::alloc(None, 0, DEFAULT_HASH_A, DEFAULT_HASH_B)
489 }
490
491 /// Creates a sketch in auto mode. The decay clock ticks automatically
492 /// based on real time — no background thread needed.
493 ///
494 /// `interval_ms` sets how often data halves. For example, `1000` means
495 /// every second, all counts are halved. After 24 seconds, old data is gone.
496 pub fn with_epoch_interval(interval_ms: u64) -> Self {
497 assert!(interval_ms > 0, "epoch interval must be > 0");
498 let (a, b) = random_hash_params();
499 Self::alloc(Some(Instant::now()), interval_ms, a, b)
500 }
501
502 /// Allocates the 1 MB matrix on the heap, zeroed.
503 fn alloc(
504 created_at: Option<Instant>,
505 epoch_interval_ms: u64,
506 hash_a: [u64; DEPTH],
507 hash_b: [u64; DEPTH],
508 ) -> Self {
509 // SAFETY: AtomicU32 with zeroed bytes = AtomicU32::new(0).
510 // Guaranteed by Rust's atomic type representation.
511 let rows = unsafe {
512 let layout = std::alloc::Layout::new::<[Row; DEPTH]>();
513 let ptr = std::alloc::alloc_zeroed(layout) as *mut [Row; DEPTH];
514 if ptr.is_null() {
515 std::alloc::handle_alloc_error(layout);
516 }
517 Box::from_raw(ptr)
518 };
519 ClTds {
520 rows,
521 epoch: AtomicU64::new(0),
522 created_at,
523 epoch_interval_ms,
524 hash_a,
525 hash_b,
526 }
527 }
528
529 /// Returns the current epoch number. In manual mode, this reflects how
530 /// many times `tick_epoch()` was called. In auto mode, it's derived from
531 /// the wall clock.
532 #[inline(always)]
533 fn current_epoch(&self) -> u64 {
534 match self.created_at {
535 Some(t) => t.elapsed().as_millis() as u64 / self.epoch_interval_ms,
536 None => self.epoch.load(Ordering::Relaxed),
537 }
538 }
539
540 /// Hashes `id` to a column index for the given `row`.
541 #[inline(always)]
542 fn hash(&self, id: u64, row: usize) -> usize {
543 compute_hash(id, row, &self.hash_a, &self.hash_b)
544 }
545
546 /// Records one occurrence of `id` in the stream.
547 ///
548 /// This is the main write operation. It hashes `id` to 4 cells (one per row),
549 /// decays any stale values, and increments the counter — all atomically.
550 ///
551 /// Safe to call from multiple threads without any locking.
552 ///
553 /// Decay is lazy and exact: `(x >> a) >> b = x >> (a + b)` — no approximation.
554 pub fn increment(&self, id: u64) {
555 let epoch = self.current_epoch();
556 let epoch_low = (epoch & TS_MASK as u64) as u32;
557
558 for row_idx in 0..DEPTH {
559 let col = self.hash(id, row_idx);
560 let cell = &self.rows[row_idx].0[col];
561
562 loop {
563 let old = cell.load(Ordering::Relaxed);
564 let (old_ts, old_count) = unpack(old);
565 let steps = decay_steps(old_ts, epoch);
566 let decayed = apply_decay(old_count, steps);
567 let new_count = (decayed + 1).min(MAX_COUNT);
568 let new_val = pack(epoch_low, new_count);
569 match cell.compare_exchange_weak(old, new_val, Ordering::Relaxed, Ordering::Relaxed)
570 {
571 Ok(_) => break,
572 Err(_) => continue,
573 }
574 }
575 }
576 }
577
578 /// Returns the estimated frequency of `id` in the current time window.
579 ///
580 /// Looks up `id` in all 4 rows, decays stale values, and returns the
581 /// minimum count. The minimum filters out noise from hash collisions,
582 /// giving you the tightest possible estimate.
583 ///
584 /// ```text
585 /// Guarantee: E[query(x)] ≤ f_t(x) + ε · N_effective(t)
586 /// where ε = e / 65536 ≈ 0.0000414
587 /// False positive: P ≤ e^{-4} ≈ 1.8%
588 /// ```
589 pub fn query(&self, id: u64) -> u32 {
590 let epoch = self.current_epoch();
591 let mut min_count = u32::MAX;
592
593 for row_idx in 0..DEPTH {
594 let col = self.hash(id, row_idx);
595 let cell = &self.rows[row_idx].0[col];
596 let val = cell.load(Ordering::Relaxed);
597 let (ts, count) = unpack(val);
598 let steps = decay_steps(ts, epoch);
599 let decayed = apply_decay(count, steps);
600
601 min_count = min_count.min(decayed);
602 }
603
604 min_count
605 }
606
607 /// Advances the decay clock by one step (manual mode only).
608 ///
609 /// Each tick halves all counters the next time they're touched.
610 /// Call this on a timer — for example, once per second.
611 pub fn tick_epoch(&self) {
612 self.epoch.fetch_add(1, Ordering::Relaxed);
613 }
614
615 /// Returns the current epoch number.
616 pub fn epoch(&self) -> u64 {
617 self.current_epoch()
618 }
619
620 /// Returns `true` if the sketch was created with [`with_epoch_interval`](Self::with_epoch_interval).
621 pub fn is_auto_epoch(&self) -> bool {
622 self.created_at.is_some()
623 }
624
625 /// Returns the total memory used by the matrix, in bytes. Always `1,048,576` (1 MB).
626 pub fn memory_bytes(&self) -> usize {
627 DEPTH * WIDTH * std::mem::size_of::<AtomicU32>()
628 }
629
630 /// Returns the algorithm's tuning parameters: `(ε, δ, width, depth)`.
631 ///
632 /// - `ε` — max overcount per stream item (~0.00004)
633 /// - `δ` — probability of a false positive (~1.8%)
634 /// - `width` — columns per row (65536)
635 /// - `depth` — number of rows (4)
636 pub fn algorithm_parameters() -> (f64, f64, usize, usize) {
637 let epsilon = std::f64::consts::E / WIDTH as f64;
638 let delta = (-(DEPTH as f64)).exp();
639 (epsilon, delta, WIDTH, DEPTH)
640 }
641
642 /// Tells you the worst-case overcount for a stream of `n_effective` items.
643 ///
644 /// For example, if you've seen 1 million items, the max error is about 41.
645 pub fn error_bound(n_effective: u64) -> f64 {
646 let epsilon = std::f64::consts::E / WIDTH as f64;
647 epsilon * n_effective as f64
648 }
649
650 /// Saves the entire sketch state to a byte vector.
651 ///
652 /// Write this to disk for crash recovery. The output is about 1 MB
653 /// (matrix data + a small header with epoch and hash seeds).
654 pub fn to_bytes(&self) -> Vec<u8> {
655 let epoch = self.current_epoch();
656 let matrix_size = DEPTH * WIDTH * 4;
657 let header_size = 8 + (DEPTH * 8 * 2); // epoch + hash params
658 let mut buf = Vec::with_capacity(header_size + matrix_size);
659
660 buf.extend_from_slice(&epoch.to_le_bytes());
661 for i in 0..DEPTH {
662 buf.extend_from_slice(&self.hash_a[i].to_le_bytes());
663 }
664 for i in 0..DEPTH {
665 buf.extend_from_slice(&self.hash_b[i].to_le_bytes());
666 }
667 for row in 0..DEPTH {
668 for col in 0..WIDTH {
669 let val = self.rows[row].0[col].load(Ordering::Relaxed);
670 buf.extend_from_slice(&val.to_le_bytes());
671 }
672 }
673 buf
674 }
675
676 /// Restores a sketch from a byte slice previously created by [`to_bytes()`](Self::to_bytes).
677 ///
678 /// Returns `None` if the data is corrupted or the wrong size.
679 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
680 let matrix_size = DEPTH * WIDTH * 4;
681 let header_size = 8 + (DEPTH * 8 * 2);
682 if bytes.len() != header_size + matrix_size {
683 return None;
684 }
685
686 let mut pos = 0;
687 let epoch = u64::from_le_bytes(bytes[pos..pos + 8].try_into().ok()?);
688 pos += 8;
689
690 let mut hash_a = [0u64; DEPTH];
691 for item in hash_a.iter_mut() {
692 *item = u64::from_le_bytes(bytes[pos..pos + 8].try_into().ok()?);
693 pos += 8;
694 }
695 let mut hash_b = [0u64; DEPTH];
696 for item in hash_b.iter_mut() {
697 *item = u64::from_le_bytes(bytes[pos..pos + 8].try_into().ok()?);
698 pos += 8;
699 }
700
701 let sketch = Self::alloc(None, 0, hash_a, hash_b);
702 sketch.epoch.store(epoch, Ordering::Relaxed);
703
704 for row in 0..DEPTH {
705 for col in 0..WIDTH {
706 let val = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?);
707 sketch.rows[row].0[col].store(val, Ordering::Relaxed);
708 pos += 4;
709 }
710 }
711 Some(sketch)
712 }
713}
714
715#[cfg(test)]
716mod tests {
717 use super::*;
718
719 // === Bit Packing Tests ===
720
721 #[test]
722 fn pack_unpack_roundtrip() {
723 for ts in [0, 1, 127, 255] {
724 for count in [0, 1, 1000, MAX_COUNT] {
725 let packed = pack(ts, count);
726 let (got_ts, got_count) = unpack(packed);
727 assert_eq!(got_ts, ts, "ts mismatch");
728 assert_eq!(got_count, count, "count mismatch");
729 }
730 }
731 }
732
733 #[test]
734 fn pack_truncates_overflow() {
735 // Timestamp overflow (>255) should be masked
736 let (ts, _) = unpack(pack(256, 0));
737 assert_eq!(ts, 0); // 256 & 0xFF = 0
738
739 // Count overflow should be masked
740 let (_, count) = unpack(pack(0, MAX_COUNT + 1));
741 assert_eq!(count, 0); // (MAX_COUNT+1) & COUNT_MASK wraps
742 }
743
744 #[test]
745 fn decay_steps_same_epoch() {
746 assert_eq!(decay_steps(5, 5), 0);
747 assert_eq!(decay_steps(0, 0), 0);
748 assert_eq!(decay_steps(255, 255), 0);
749 }
750
751 #[test]
752 fn decay_steps_simple_gap() {
753 assert_eq!(decay_steps(0, 1), 1);
754 assert_eq!(decay_steps(0, 5), 5);
755 assert_eq!(decay_steps(10, 15), 5);
756 }
757
758 #[test]
759 fn decay_steps_wraparound() {
760 // 8-bit wrap: epoch=2, cell_ts=250 → gap = (2-250) & 0xFF = 8
761 assert_eq!(decay_steps(250, 2), 8);
762 }
763
764 #[test]
765 fn decay_steps_capped_at_count_bits() {
766 // Gap of 100 should be capped at 24
767 assert_eq!(decay_steps(0, 100), 24);
768 }
769
770 // === Hash Tests ===
771
772 #[test]
773 fn hash_produces_valid_indices() {
774 for row in 0..DEPTH {
775 for id in [0, 1, u64::MAX, 0xDEADBEEF, 42] {
776 let idx = compute_hash(id, row, &DEFAULT_HASH_A, &DEFAULT_HASH_B);
777 assert!(idx < WIDTH, "hash out of range: {}", idx);
778 }
779 }
780 }
781
782 #[test]
783 fn hash_different_rows_differ() {
784 // Same ID should map to different columns in different rows
785 // (with very high probability for most IDs)
786 let id = 0xDEADBEEF_u64;
787 let indices: Vec<usize> = (0..DEPTH)
788 .map(|r| compute_hash(id, r, &DEFAULT_HASH_A, &DEFAULT_HASH_B))
789 .collect();
790 // At least 2 of 4 should differ (probabilistic but near-certain)
791 let unique: std::collections::HashSet<_> = indices.iter().collect();
792 assert!(unique.len() >= 2, "hash functions not independent enough");
793 }
794
795 #[test]
796 fn hash_distribution_uniformity() {
797 // Insert 100K random-ish IDs, check bucket distribution
798 let mut buckets = vec![0u32; WIDTH];
799 for id in 0..100_000u64 {
800 let idx = compute_hash(
801 id.wrapping_mul(0x12345),
802 0,
803 &DEFAULT_HASH_A,
804 &DEFAULT_HASH_B,
805 );
806 buckets[idx] += 1;
807 }
808 let max = *buckets.iter().max().unwrap();
809 let expected = 100_000.0 / WIDTH as f64;
810 // Max bucket should be < 10x expected (very loose bound)
811 assert!(
812 max as f64 <= expected * 10.0,
813 "distribution too skewed: max={}, expected={:.1}",
814 max,
815 expected
816 );
817 }
818
819 // === Core Algorithm Tests ===
820
821 #[test]
822 fn basic_increment_and_query() {
823 let sketch = ClTds::new_deterministic();
824 let id = 42u64;
825
826 // Insert 100 times
827 for _ in 0..100 {
828 sketch.increment(id);
829 }
830
831 // Query should return exactly 100 (no collisions expected for single item)
832 let count = sketch.query(id);
833 assert_eq!(count, 100, "expected 100, got {}", count);
834 }
835
836 #[test]
837 fn query_unseen_item_returns_zero() {
838 let sketch = ClTds::new_deterministic();
839 sketch.increment(1);
840 sketch.increment(2);
841
842 // Item 999 was never inserted
843 let count = sketch.query(999);
844 assert_eq!(count, 0, "unseen item should have count 0, got {}", count);
845 }
846
847 #[test]
848 fn multiple_items_independent() {
849 let sketch = ClTds::new_deterministic();
850
851 sketch.increment(100);
852 sketch.increment(100);
853 sketch.increment(100);
854
855 sketch.increment(200);
856
857 assert_eq!(sketch.query(100), 3);
858 assert_eq!(sketch.query(200), 1);
859 }
860
861 #[test]
862 fn counter_saturation() {
863 let sketch = ClTds::new_deterministic();
864 let id = 1u64;
865
866 // Manually set a cell near MAX_COUNT to test saturation
867 let col = sketch.hash(id, 0);
868 let near_max = pack(0, MAX_COUNT - 1);
869 sketch.rows[0].0[col].store(near_max, Ordering::Relaxed);
870
871 // Increment should saturate at MAX_COUNT, not overflow
872 sketch.increment(id);
873 let val = sketch.rows[0].0[col].load(Ordering::Relaxed);
874 let (_, count) = unpack(val);
875 assert_eq!(count, MAX_COUNT);
876 }
877
878 // === Decay Tests ===
879
880 #[test]
881 fn decay_halves_count() {
882 let sketch = ClTds::new_deterministic();
883 let id = 1u64;
884
885 // Insert 1000 times at epoch 0
886 for _ in 0..1000 {
887 sketch.increment(id);
888 }
889 assert_eq!(sketch.query(id), 1000);
890
891 // Advance 1 epoch → count should halve
892 sketch.tick_epoch();
893 assert_eq!(sketch.query(id), 500); // 1000 >> 1
894
895 // Advance 1 more → halve again
896 sketch.tick_epoch();
897 assert_eq!(sketch.query(id), 250); // 1000 >> 2
898 }
899
900 #[test]
901 fn decay_multiple_epochs() {
902 let sketch = ClTds::new_deterministic();
903 let id = 7u64;
904
905 for _ in 0..1024 {
906 sketch.increment(id);
907 }
908
909 // Advance 5 epochs at once
910 for _ in 0..5 {
911 sketch.tick_epoch();
912 }
913
914 // 1024 >> 5 = 32
915 assert_eq!(sketch.query(id), 32);
916 }
917
918 #[test]
919 fn decay_to_zero() {
920 let sketch = ClTds::new_deterministic();
921 let id = 1u64;
922
923 sketch.increment(id);
924 assert_eq!(sketch.query(id), 1);
925
926 // Advance 1 epoch → 1 >> 1 = 0
927 sketch.tick_epoch();
928 assert_eq!(sketch.query(id), 0);
929 }
930
#[test]
fn decay_with_interleaved_inserts() {
    let sketch = ClTds::new_deterministic();
    let key = 1u64;

    // Records `hits` increments of `key` in whatever epoch is current.
    let record = |hits: u32| {
        for _ in 0..hits {
            sketch.increment(key);
        }
    };

    // 100 hits in epoch 0, then 50 more after advancing to epoch 1.
    record(100);
    sketch.tick_epoch();
    record(50);

    // The first batch has decayed by one epoch: (100 >> 1) + 50 = 100.
    assert_eq!(sketch.query(key), 100);
}
951
952 // === Claim 1 Verification: Error Bound ===
953
#[test]
fn claim1_no_undercounting() {
    // Count-min guarantee under test: the estimate for an item is never
    // below the number of times it was actually inserted.
    let sketch = ClTds::new_deterministic();
    let target = 42u64;

    (0..500).for_each(|_| {
        sketch.increment(target);
    });

    // One hit each for 1000 unrelated IDs, acting as background noise.
    (1000..2000u64).for_each(|noise_id| {
        sketch.increment(noise_id);
    });

    let est = sketch.query(target);
    let no_undercount = est >= 500;
    assert!(
        no_undercount,
        "Claim 1 violation: undercounting! got {} < 500",
        est
    );
}
975
#[test]
fn claim1_overcount_bounded() {
    // Bound under test: E[query(x)] ≤ f(x) + ε·N, with ε = e / WIDTH.
    let sketch = ClTds::new_deterministic();

    let heavy_hitter = 42u64;
    let true_count = 1000u32;
    let n_noise = 100_000u64;

    // The item being measured.
    (0..true_count).for_each(|_| {
        sketch.increment(heavy_hitter);
    });

    // One hit each for `n_noise` other IDs, offset by 10_000 so they do not
    // overlap the heavy hitter.
    for noise_id in 10_000..10_000 + n_noise {
        sketch.increment(noise_id);
    }

    // ε·N, where N counts every insertion made above.
    let n_total = true_count as f64 + n_noise as f64;
    let epsilon = std::f64::consts::E / WIDTH as f64;
    let max_overcount = epsilon * n_total;

    let est = sketch.query(heavy_hitter);
    let overcount = est as f64 - true_count as f64;

    // The bound is on the expectation, so 10x slack is allowed here.
    let allowed = max_overcount * 10.0;
    assert!(
        overcount <= allowed,
        "Claim 1 error too large: overcount={:.0}, bound={:.0}",
        overcount,
        allowed
    );
}
1009
1010 // === Claim 2 Verification: False Positive Bound ===
1011
#[test]
fn claim2_false_positive_rate() {
    // Load the sketch with heavy hitters only, then probe IDs that were
    // never inserted. The share of probes reported above the threshold
    // should stay near the δ ≤ e^{-4} ≈ 1.8% bound; 5% is allowed as margin.
    let sketch = ClTds::new_deterministic();

    // 100 heavy hitters × 1000 hits each (100,000 insertions in total).
    for id in 0..100u64 {
        for _ in 0..1000 {
            sketch.increment(id);
        }
    }

    // An estimate strictly above this counts as a "heavy hitter" report.
    let threshold = 50u32;

    // Probe 10,000 IDs that are disjoint from the inserted range.
    let n_queries = 10_000u64;
    let first_probe = 1_000_000u64;
    let false_positives = (first_probe..first_probe + n_queries)
        .filter(|&id| sketch.query(id) > threshold)
        .count();

    let fp_rate = false_positives as f64 / n_queries as f64;

    assert!(
        fp_rate <= 0.05,
        "Claim 2 violation: false positive rate {:.2}% > 5%",
        fp_rate * 100.0
    );
}
1047
1048 // === Claim 3 Verification: Lazy = Full Decay ===
1049
#[test]
fn claim3_lazy_equals_full_decay() {
    // Core identity behind lazy decay: halving one step at a time gives the
    // same result as a single combined shift, i.e. 2^{-a} · 2^{-b} = 2^{-(a+b)}.
    for initial in [1, 7, 100, 1000, 65535, MAX_COUNT] {
        for total_steps in 0..=24u32 {
            // Eager reference: apply `total_steps` individual 1-bit shifts.
            let full = (0..total_steps).fold(initial, |acc, _| acc >> 1);

            // Lazy path: one call covering all the steps.
            let lazy = apply_decay(initial, total_steps);

            assert_eq!(
                full, lazy,
                "Claim 3 violation! initial={}, steps={}: full={}, lazy={}",
                initial, total_steps, full, lazy
            );
        }
    }
}
1073
#[test]
fn claim3_lazy_decay_in_sketch() {
    // End-to-end version of claim 3: the estimate read after several epoch
    // ticks must equal the value obtained by shifting the raw count by hand.
    let sketch_lazy = ClTds::new_deterministic();
    let key = 99u64;

    // 1024 hits in epoch 0.
    (0..1024).for_each(|_| {
        sketch_lazy.increment(key);
    });

    // Five ticks with no intervening access to the key.
    (0..5).for_each(|_| {
        sketch_lazy.tick_epoch();
    });

    // Reference value computed directly: 1024 >> 5 = 32.
    let full_result = 1024u32 >> 5;
    let lazy_result = sketch_lazy.query(key);

    assert_eq!(
        lazy_result, full_result,
        "Claim 3 in-sketch: lazy={}, full={}",
        lazy_result, full_result
    );
}
1100
1101 // === Thread Safety Test ===
1102
#[test]
fn concurrent_increments() {
    // Four threads hammer the same key; no increment may be lost.
    //
    // `std::thread::scope` lets the workers borrow `sketch` directly, so no
    // `Arc` (or per-thread clone) is needed: every spawned thread is joined
    // before `scope` returns.
    let sketch = ClTds::new_deterministic();
    let id = 42u64;
    let threads = 4u32;
    let per_thread = 10_000u32;

    std::thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    sketch.increment(id);
                }
            });
        }
    });

    // All workers have finished here, so the total must be exact.
    let total = sketch.query(id);
    let expected = threads * per_thread;

    assert_eq!(
        total, expected,
        "Thread safety: got {}, expected {}",
        total, expected
    );
}
1132
1133 // === Memory Size Verification ===
1134
#[test]
fn matrix_size_is_1mb() {
    // The reported footprint must be exactly 1 MiB (1024 * 1024 bytes).
    let size = ClTds::new_deterministic().memory_bytes();
    assert_eq!(
        size,
        1024 * 1024,
        "Matrix should be exactly 1 MB, got {} bytes",
        size
    );
}
1145
#[test]
fn epoch_advances() {
    let sketch = ClTds::new_deterministic();

    // A fresh sketch starts at epoch 0.
    assert_eq!(sketch.epoch(), 0);

    // One tick moves it to epoch 1.
    sketch.tick_epoch();
    assert_eq!(sketch.epoch(), 1);

    // Two more ticks bring it to epoch 3.
    (0..2).for_each(|_| {
        sketch.tick_epoch();
    });
    assert_eq!(sketch.epoch(), 3);
}
1156
1157 // === Claim 2: Zipf Real-World Stress Test ===
1158
#[test]
fn claim2_zipf_stress_test() {
    // Claim 2 under a skewed (Zipf-like) workload: a handful of sources
    // produce roughly 80% of the traffic, the rest is a long tail.
    let sketch = ClTds::new_deterministic();

    let n_heavy = 10u64; // heavy hitters (top ~1% of sources)
    let per_heavy = 8_000u64; // 10 × 8,000 = 80,000 hits
    let n_mice = 990u64; // ordinary sources
    let per_mouse = 20u64; // 990 × 20 = 19,800 hits

    // Heavy hitters use IDs 1..=n_heavy.
    for heavy_id in 1..=n_heavy {
        (0..per_heavy).for_each(|_| {
            sketch.increment(heavy_id);
        });
    }

    // Ordinary traffic uses IDs starting at 100_000.
    for mouse_id in 100_000..100_000 + n_mice {
        (0..per_mouse).for_each(|_| {
            sketch.increment(mouse_id);
        });
    }

    // Every heavy hitter must be reported with at least its true count.
    for id in 1..=n_heavy {
        let count = sketch.query(id);
        assert!(
            count >= per_heavy as u32,
            "Zipf: heavy hitter {} not detected: count={} < {}",
            id,
            count,
            per_heavy
        );
    }

    // Probe 50,000 never-inserted IDs; anything above the threshold is a
    // false positive.
    let threshold = 1000u32;
    let n_test = 50_000u64;
    let fp = (1_000_000..1_000_000 + n_test)
        .filter(|&id| sketch.query(id) > threshold)
        .count();

    let fp_rate = fp as f64 / n_test as f64;
    assert!(
        fp_rate <= 0.02,
        "Zipf Claim 2: false positive rate {:.3}% exceeds 2%",
        fp_rate * 100.0
    );
}
1214
1215 // === Auto Epoch Tests ===
1216
#[test]
fn manual_mode_by_default() {
    // A deterministic sketch reports auto-epoch as off and starts at epoch 0.
    let sketch = ClTds::new_deterministic();
    let auto = sketch.is_auto_epoch();
    assert!(!auto);
    assert_eq!(sketch.epoch(), 0);
}
1223
#[test]
fn auto_mode_via_constructor() {
    // `with_epoch_interval` turns auto-epoch on.
    let sketch = ClTds::with_epoch_interval(1000);
    let auto = sketch.is_auto_epoch();
    assert!(auto);

    // Read immediately after construction, the epoch is still 0.
    assert_eq!(sketch.epoch(), 0);
}
1231
#[test]
fn auto_epoch_advances_with_time() {
    use std::{thread, time::Duration};

    // With a 1 ms interval, a short sleep should span many epochs.
    let sketch = ClTds::with_epoch_interval(1);

    // Put some data in the sketch before waiting.
    (0..100).for_each(|_| {
        sketch.increment(42);
    });

    // After sleeping 15 ms, at least 10 epochs are expected to have elapsed.
    thread::sleep(Duration::from_millis(15));
    let epoch = sketch.epoch();
    assert!(
        epoch >= 10,
        "auto epoch should advance with time, got {}",
        epoch
    );
}
1251
#[test]
fn auto_epoch_decay_works() {
    use std::{thread, time::Duration};

    // 10 ms per epoch.
    let sketch = ClTds::with_epoch_interval(10);

    (0..1024).for_each(|_| {
        sketch.increment(7);
    });
    assert_eq!(sketch.query(7), 1024);

    // Sleeping 55 ms should cover about 5 epochs: 1024 >> 5 = 32.
    // NOTE(review): this depends on wall-clock timing; a heavily loaded
    // machine could oversleep past the accepted window.
    thread::sleep(Duration::from_millis(55));
    let count = sketch.query(7);

    // Accept one epoch either side of the nominal value (16..=64).
    assert!(
        (16..=64).contains(&count),
        "auto decay after ~5 epochs: expected ~32, got {}",
        count
    );
}
1272
#[test]
#[should_panic(expected = "epoch interval must be > 0")]
fn auto_epoch_zero_interval_panics() {
    // A zero interval is rejected by the constructor with a panic.
    let _ = ClTds::with_epoch_interval(0);
}
1278
1279 // === Adversarial Hash Resistance Tests ===
1280
#[test]
fn two_sketches_different_hashes() {
    // Two sketches built with `new()` are expected to map the same ID to
    // different columns.
    let s1 = ClTds::new();
    let s2 = ClTds::new();
    let id = 42u64;

    // Column index chosen for `id` in each of the DEPTH rows.
    let columns_of = |sketch: &ClTds| -> Vec<usize> {
        (0..DEPTH).map(|row| sketch.hash(id, row)).collect()
    };
    let idx1 = columns_of(&s1);
    let idx2 = columns_of(&s2);

    // With random hashing, identical columns in every row is extremely unlikely.
    assert_ne!(
        idx1, idx2,
        "Two sketches should have different hash mappings"
    );
}
1297
#[test]
fn random_hashes_produce_valid_results() {
    // Counting must still be correct when the sketch is built with `new()`
    // (randomly seeded hashes).
    let sketch = ClTds::new();

    (0..100).for_each(|_| {
        sketch.increment(42);
    });

    let count = sketch.query(42);
    assert_eq!(count, 100, "random hashes: expected 100, got {}", count);

    // An ID that was never inserted reads back as zero.
    let unseen = sketch.query(999999);
    assert_eq!(unseen, 0, "unseen item should be 0");
}
1310
#[test]
fn deterministic_hashes_reproducible() {
    // Two deterministic sketches given the same input must agree.
    let s1 = ClTds::new_deterministic();
    let s2 = ClTds::new_deterministic();

    // Feed both sketches the same 500 hits, alternating between them.
    (0..500).for_each(|_| {
        s1.increment(42);
        s2.increment(42);
    });

    let first = s1.query(42);
    let second = s2.query(42);
    assert_eq!(first, second);
}
1324
1325 // === Algorithm Parameters Tests ===
1326
#[test]
fn algorithm_parameters_correct() {
    let (epsilon, delta, w, d) = ClTds::algorithm_parameters();

    // Sketch dimensions: width 65536, depth 4.
    assert_eq!(w, 65536);
    assert_eq!(d, 4);

    // ε = e / 65536 and δ = e^{-4}, compared with a small float tolerance.
    let expected_epsilon = std::f64::consts::E / 65536.0;
    let expected_delta = (-4.0_f64).exp();
    assert!((epsilon - expected_epsilon).abs() < 1e-10);
    assert!((delta - expected_delta).abs() < 1e-10);

    // Coarse sanity checks on the magnitudes.
    assert!(epsilon < 0.0001, "ε should be tiny");
    assert!(delta < 0.02, "δ should be < 2%");
}
1340
#[test]
fn error_bound_scales_with_stream() {
    let err_1m = ClTds::error_bound(1_000_000);
    let err_10m = ClTds::error_bound(10_000_000);

    // The bound is linear in N: a 10x longer stream gives a 10x larger bound.
    let ratio = err_10m / err_1m;
    assert!((ratio - 10.0).abs() < 0.001);

    // For a 1M-item stream: e / 65536 * 1_000_000 ≈ 41.5, comfortably below 50.
    assert!(err_1m < 50.0, "error bound for 1M stream should be < 50");
}
1352
1353 // === Phase 4: Complete Algorithm Validation ===
1354
#[test]
fn adversarial_collision_min_filter_holds() {
    // Scenario: an attacker floods many IDs to pollute buckets. Taking the
    // minimum across rows should still keep estimates for unrelated,
    // never-inserted IDs low.
    let sketch = ClTds::new_deterministic();

    // Flood: 1000 distinct IDs, 1000 hits each.
    for attacker_id in 0..1000u64 {
        (0..1000).for_each(|_| {
            sketch.increment(attacker_id);
        });
    }

    // Probe 10,000 IDs that were never inserted and count how many come
    // back with an estimate above 100.
    let high_false = (500_000..510_000u64)
        .filter(|&probe| sketch.query(probe) > 100)
        .count();

    let fp_rate = high_false as f64 / 10_000.0;
    assert!(
        fp_rate < 0.02,
        "Adversarial: false positive rate {:.2}% exceeds 2%",
        fp_rate * 100.0
    );
}
1386
#[test]
fn decay_accuracy_over_many_epochs() {
    // The estimate must equal `initial >> epoch` exactly at each of the
    // first 20 epochs.
    let sketch = ClTds::new_deterministic();
    let key = 1u64;
    let initial = 1_000_000u32;

    (0..initial).for_each(|_| {
        sketch.increment(key);
    });

    // Track the expected value by halving it once per tick; repeated 1-bit
    // shifts equal a single shift by the epoch number.
    let mut expected = initial;
    for epoch in 1..=20u32 {
        sketch.tick_epoch();
        expected >>= 1;

        let actual = sketch.query(key);
        assert_eq!(
            actual, expected,
            "Epoch {}: expected {}, got {}",
            epoch, expected, actual
        );
    }
}
1410
#[test]
fn concurrent_increment_with_decay() {
    // Writers increment one key while another thread advances the epoch.
    //
    // `std::thread::scope` lets every thread borrow `sketch` directly, so no
    // `Arc` is needed: all spawned threads are joined before `scope` returns.
    let sketch = ClTds::new_deterministic();
    let id = 99u64;
    let threads = 4u32;
    let per_thread = 5_000u32;
    let pause = std::time::Duration::from_millis(1);

    std::thread::scope(|s| {
        // Writer threads.
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    sketch.increment(id);
                }
            });
        }

        // Ticker thread: three epoch ticks, 1 ms apart.
        s.spawn(|| {
            for _ in 0..3 {
                std::thread::sleep(pause);
                sketch.tick_epoch();
            }
        });
    });

    // How the ticks interleave with the inserts is not deterministic, so only
    // a range is asserted: something must survive the decay, and the estimate
    // cannot exceed the total number of inserts.
    let count = sketch.query(id);
    assert!(
        count > 0 && count <= threads * per_thread,
        "Concurrent+decay: count {} out of reasonable range",
        count
    );
}
1453
#[test]
fn full_integration_test() {
    // End-to-end scenario on a `new()` sketch (random hashes): skewed
    // traffic, detection, decay driven by `tick_epoch()`, then fresh traffic.
    let sketch = ClTds::new();

    // Phase 1: five heavy hitters, 10,000 hits each.
    for heavy_id in 1..=5u64 {
        (0..10_000).for_each(|_| {
            sketch.increment(heavy_id);
        });
    }

    // Phase 2: 1000 ordinary IDs, 10 hits each.
    for normal_id in 100..1100u64 {
        (0..10).for_each(|_| {
            sketch.increment(normal_id);
        });
    }

    // Phase 3: every heavy hitter is reported with at least its true count.
    for id in 1..=5u64 {
        let detected = sketch.query(id) >= 10_000;
        assert!(detected, "Integration: heavy hitter {} not detected", id);
    }

    // Phase 4: after ten epochs the heavy hitters have faded.
    // 10000 >> 10 = 9, so anything up to 15 is accepted.
    (0..10).for_each(|_| {
        sketch.tick_epoch();
    });
    for id in 1..=5u64 {
        let decayed = sketch.query(id);
        assert!(
            decayed <= 15,
            "Integration: heavy hitter {} not forgotten after decay: {}",
            id,
            decayed
        );
    }

    // Phase 5: traffic recorded after the decay is counted from scratch.
    (0..500).for_each(|_| {
        sketch.increment(42);
    });
    let fresh = sketch.query(42);
    assert_eq!(fresh, 500, "New traffic after decay should be exact");

    // Phase 6: the published parameters are positive and match the constants.
    let (eps, delta, w, d) = ClTds::algorithm_parameters();
    assert!(eps > 0.0 && delta > 0.0 && w == WIDTH && d == DEPTH);
}
1511}