Skip to main content

reddb_server/storage/primitives/
hyperloglog.rs

1//! HyperLogLog — Probabilistic Cardinality Estimation
2//!
3//! Estimates the number of distinct elements in a set using ~12KB of memory
4//! with ~0.81% standard error. Based on the HyperLogLog algorithm by Flajolet
5//! et al. with bias correction.
6//!
7//! # Example
8//! ```ignore
9//! let mut hll = HyperLogLog::new();
10//! hll.add(b"user1");
11//! hll.add(b"user2");
12//! hll.add(b"user1"); // duplicate
13//! assert!(hll.count() >= 2); // approximately 2
14//! ```
15
16/// Number of registers (2^14 = 16384) — standard HLL precision
17const NUM_REGISTERS: usize = 16384;
18/// Bits used for bucket index (14 bits → 16384 buckets)
19const P: u32 = 14;
20/// Alpha constant for bias correction with m=16384
21const ALPHA: f64 = 0.7213 / (1.0 + 1.079 / NUM_REGISTERS as f64);
22
23/// HyperLogLog cardinality estimator
24#[derive(Debug, Clone)]
25pub struct HyperLogLog {
26    /// Registers storing max leading zeros + 1
27    registers: Vec<u8>,
28}
29
30impl HyperLogLog {
31    /// Create a new HLL with 16384 registers (~16KB memory)
32    pub fn new() -> Self {
33        Self {
34            registers: vec![0u8; NUM_REGISTERS],
35        }
36    }
37
38    /// Add an element to the HLL
39    pub fn add(&mut self, data: &[u8]) {
40        let hash = Self::hash(data);
41        let index = (hash >> (64 - P)) as usize;
42        let remaining = (hash << P) | (1 << (P - 1)); // ensure non-zero
43        let rho = remaining.leading_zeros() as u8 + 1;
44        if rho > self.registers[index] {
45            self.registers[index] = rho;
46        }
47    }
48
49    /// Estimate the cardinality (number of distinct elements)
50    pub fn count(&self) -> u64 {
51        let m = NUM_REGISTERS as f64;
52
53        // Harmonic mean of 2^(-register[i])
54        let mut sum = 0.0f64;
55        let mut zeros = 0u32;
56        for &reg in &self.registers {
57            sum += 2.0f64.powi(-(reg as i32));
58            if reg == 0 {
59                zeros += 1;
60            }
61        }
62
63        let raw_estimate = ALPHA * m * m / sum;
64
65        // Small range correction (linear counting)
66        if raw_estimate <= 2.5 * m && zeros > 0 {
67            let lc = m * (m / zeros as f64).ln();
68            return lc as u64;
69        }
70
71        // Large range correction (for 64-bit hashes this is rarely needed)
72        let two_pow_64 = 2.0f64.powi(64);
73        if raw_estimate > two_pow_64 / 30.0 {
74            let corrected = -two_pow_64 * (1.0 - raw_estimate / two_pow_64).ln();
75            return corrected as u64;
76        }
77
78        raw_estimate as u64
79    }
80
81    /// Merge another HLL into this one (union)
82    pub fn merge(&mut self, other: &HyperLogLog) {
83        for (i, &other_reg) in other.registers.iter().enumerate() {
84            if other_reg > self.registers[i] {
85                self.registers[i] = other_reg;
86            }
87        }
88    }
89
90    /// Create a merged HLL from two HLLs without modifying either
91    pub fn merged(a: &HyperLogLog, b: &HyperLogLog) -> HyperLogLog {
92        let mut result = HyperLogLog::new();
93        for i in 0..NUM_REGISTERS {
94            result.registers[i] = a.registers[i].max(b.registers[i]);
95        }
96        result
97    }
98
99    /// Clear the HLL
100    pub fn clear(&mut self) {
101        for reg in &mut self.registers {
102            *reg = 0;
103        }
104    }
105
106    /// Memory usage in bytes
107    pub fn memory_bytes(&self) -> usize {
108        std::mem::size_of::<Self>() + self.registers.len()
109    }
110
111    /// Serialize to bytes
112    pub fn as_bytes(&self) -> &[u8] {
113        &self.registers
114    }
115
116    /// Deserialize from bytes
117    pub fn from_bytes(bytes: Vec<u8>) -> Option<Self> {
118        if bytes.len() != NUM_REGISTERS {
119            return None;
120        }
121        Some(Self { registers: bytes })
122    }
123
124    /// FNV-1a hash producing a 64-bit value
125    fn hash(data: &[u8]) -> u64 {
126        let mut hash = 0xcbf29ce484222325u64;
127        for &byte in data {
128            hash ^= byte as u64;
129            hash = hash.wrapping_mul(0x100000001b3);
130        }
131        // Finalizer: mix bits for better distribution
132        hash ^= hash >> 33;
133        hash = hash.wrapping_mul(0xff51afd7ed558ccd);
134        hash ^= hash >> 33;
135        hash = hash.wrapping_mul(0xc4ceb9fe1a85ec53);
136        hash ^= hash >> 33;
137        hash
138    }
139}
140
141impl Default for HyperLogLog {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn test_hll_empty() {
153        let hll = HyperLogLog::new();
154        assert_eq!(hll.count(), 0);
155    }
156
157    #[test]
158    fn test_hll_single() {
159        let mut hll = HyperLogLog::new();
160        hll.add(b"hello");
161        assert!(hll.count() >= 1);
162        assert!(hll.count() <= 3); // some margin for probabilistic nature
163    }
164
165    #[test]
166    fn test_hll_duplicates() {
167        let mut hll = HyperLogLog::new();
168        for _ in 0..1000 {
169            hll.add(b"same_value");
170        }
171        // Should still be ~1
172        assert!(hll.count() <= 3);
173    }
174
175    #[test]
176    fn test_hll_accuracy_1k() {
177        let mut hll = HyperLogLog::new();
178        let n = 1000;
179        for i in 0..n {
180            let key = format!("element_{}", i);
181            hll.add(key.as_bytes());
182        }
183        let estimate = hll.count();
184        let error = (estimate as f64 - n as f64).abs() / n as f64;
185        println!(
186            "HLL 1K: actual={n}, estimate={estimate}, error={:.2}%",
187            error * 100.0
188        );
189        assert!(error < 0.10, "Error {error:.4} exceeds 10% for 1K elements");
190    }
191
192    #[test]
193    fn test_hll_accuracy_100k() {
194        let mut hll = HyperLogLog::new();
195        let n = 100_000;
196        for i in 0..n {
197            let key = format!("user_{}", i);
198            hll.add(key.as_bytes());
199        }
200        let estimate = hll.count();
201        let error = (estimate as f64 - n as f64).abs() / n as f64;
202        println!(
203            "HLL 100K: actual={n}, estimate={estimate}, error={:.2}%",
204            error * 100.0
205        );
206        assert!(
207            error < 0.05,
208            "Error {error:.4} exceeds 5% for 100K elements"
209        );
210    }
211
212    #[test]
213    fn test_hll_merge() {
214        let mut hll1 = HyperLogLog::new();
215        let mut hll2 = HyperLogLog::new();
216
217        for i in 0..500 {
218            hll1.add(format!("a_{}", i).as_bytes());
219        }
220        for i in 0..500 {
221            hll2.add(format!("b_{}", i).as_bytes());
222        }
223
224        let count1 = hll1.count();
225        let count2 = hll2.count();
226
227        hll1.merge(&hll2);
228        let merged_count = hll1.count();
229
230        // Merged count should be roughly count1 + count2 (no overlap)
231        assert!(merged_count > count1);
232        assert!(merged_count > count2);
233        let error = (merged_count as f64 - 1000.0).abs() / 1000.0;
234        assert!(error < 0.10);
235    }
236
237    #[test]
238    fn test_hll_serialization() {
239        let mut hll = HyperLogLog::new();
240        hll.add(b"test1");
241        hll.add(b"test2");
242
243        let bytes = hll.as_bytes().to_vec();
244        let restored = HyperLogLog::from_bytes(bytes).unwrap();
245        assert_eq!(hll.count(), restored.count());
246    }
247
248    #[test]
249    fn test_hll_memory() {
250        let hll = HyperLogLog::new();
251        let mem = hll.memory_bytes();
252        // Should be around 16KB + struct overhead
253        assert!(mem >= 16384);
254        assert!(mem < 20000);
255    }
256}