reddb_server/storage/primitives/
hyperloglog.rs1const NUM_REGISTERS: usize = 16384;
18const P: u32 = 14;
20const ALPHA: f64 = 0.7213 / (1.0 + 1.079 / NUM_REGISTERS as f64);
22
23#[derive(Debug, Clone)]
25pub struct HyperLogLog {
26 registers: Vec<u8>,
28}
29
30impl HyperLogLog {
31 pub fn new() -> Self {
33 Self {
34 registers: vec![0u8; NUM_REGISTERS],
35 }
36 }
37
38 pub fn add(&mut self, data: &[u8]) {
40 let hash = Self::hash(data);
41 let index = (hash >> (64 - P)) as usize;
42 let remaining = (hash << P) | (1 << (P - 1)); let rho = remaining.leading_zeros() as u8 + 1;
44 if rho > self.registers[index] {
45 self.registers[index] = rho;
46 }
47 }
48
49 pub fn count(&self) -> u64 {
51 let m = NUM_REGISTERS as f64;
52
53 let mut sum = 0.0f64;
55 let mut zeros = 0u32;
56 for ® in &self.registers {
57 sum += 2.0f64.powi(-(reg as i32));
58 if reg == 0 {
59 zeros += 1;
60 }
61 }
62
63 let raw_estimate = ALPHA * m * m / sum;
64
65 if raw_estimate <= 2.5 * m && zeros > 0 {
67 let lc = m * (m / zeros as f64).ln();
68 return lc as u64;
69 }
70
71 let two_pow_64 = 2.0f64.powi(64);
73 if raw_estimate > two_pow_64 / 30.0 {
74 let corrected = -two_pow_64 * (1.0 - raw_estimate / two_pow_64).ln();
75 return corrected as u64;
76 }
77
78 raw_estimate as u64
79 }
80
81 pub fn merge(&mut self, other: &HyperLogLog) {
83 for (i, &other_reg) in other.registers.iter().enumerate() {
84 if other_reg > self.registers[i] {
85 self.registers[i] = other_reg;
86 }
87 }
88 }
89
90 pub fn merged(a: &HyperLogLog, b: &HyperLogLog) -> HyperLogLog {
92 let mut result = HyperLogLog::new();
93 for i in 0..NUM_REGISTERS {
94 result.registers[i] = a.registers[i].max(b.registers[i]);
95 }
96 result
97 }
98
99 pub fn clear(&mut self) {
101 for reg in &mut self.registers {
102 *reg = 0;
103 }
104 }
105
106 pub fn memory_bytes(&self) -> usize {
108 std::mem::size_of::<Self>() + self.registers.len()
109 }
110
111 pub fn as_bytes(&self) -> &[u8] {
113 &self.registers
114 }
115
116 pub fn from_bytes(bytes: Vec<u8>) -> Option<Self> {
118 if bytes.len() != NUM_REGISTERS {
119 return None;
120 }
121 Some(Self { registers: bytes })
122 }
123
124 fn hash(data: &[u8]) -> u64 {
126 let mut hash = 0xcbf29ce484222325u64;
127 for &byte in data {
128 hash ^= byte as u64;
129 hash = hash.wrapping_mul(0x100000001b3);
130 }
131 hash ^= hash >> 33;
133 hash = hash.wrapping_mul(0xff51afd7ed558ccd);
134 hash ^= hash >> 33;
135 hash = hash.wrapping_mul(0xc4ceb9fe1a85ec53);
136 hash ^= hash >> 33;
137 hash
138 }
139}
140
141impl Default for HyperLogLog {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150
151 #[test]
152 fn test_hll_empty() {
153 let hll = HyperLogLog::new();
154 assert_eq!(hll.count(), 0);
155 }
156
157 #[test]
158 fn test_hll_single() {
159 let mut hll = HyperLogLog::new();
160 hll.add(b"hello");
161 assert!(hll.count() >= 1);
162 assert!(hll.count() <= 3); }
164
165 #[test]
166 fn test_hll_duplicates() {
167 let mut hll = HyperLogLog::new();
168 for _ in 0..1000 {
169 hll.add(b"same_value");
170 }
171 assert!(hll.count() <= 3);
173 }
174
175 #[test]
176 fn test_hll_accuracy_1k() {
177 let mut hll = HyperLogLog::new();
178 let n = 1000;
179 for i in 0..n {
180 let key = format!("element_{}", i);
181 hll.add(key.as_bytes());
182 }
183 let estimate = hll.count();
184 let error = (estimate as f64 - n as f64).abs() / n as f64;
185 println!(
186 "HLL 1K: actual={n}, estimate={estimate}, error={:.2}%",
187 error * 100.0
188 );
189 assert!(error < 0.10, "Error {error:.4} exceeds 10% for 1K elements");
190 }
191
192 #[test]
193 fn test_hll_accuracy_100k() {
194 let mut hll = HyperLogLog::new();
195 let n = 100_000;
196 for i in 0..n {
197 let key = format!("user_{}", i);
198 hll.add(key.as_bytes());
199 }
200 let estimate = hll.count();
201 let error = (estimate as f64 - n as f64).abs() / n as f64;
202 println!(
203 "HLL 100K: actual={n}, estimate={estimate}, error={:.2}%",
204 error * 100.0
205 );
206 assert!(
207 error < 0.05,
208 "Error {error:.4} exceeds 5% for 100K elements"
209 );
210 }
211
212 #[test]
213 fn test_hll_merge() {
214 let mut hll1 = HyperLogLog::new();
215 let mut hll2 = HyperLogLog::new();
216
217 for i in 0..500 {
218 hll1.add(format!("a_{}", i).as_bytes());
219 }
220 for i in 0..500 {
221 hll2.add(format!("b_{}", i).as_bytes());
222 }
223
224 let count1 = hll1.count();
225 let count2 = hll2.count();
226
227 hll1.merge(&hll2);
228 let merged_count = hll1.count();
229
230 assert!(merged_count > count1);
232 assert!(merged_count > count2);
233 let error = (merged_count as f64 - 1000.0).abs() / 1000.0;
234 assert!(error < 0.10);
235 }
236
237 #[test]
238 fn test_hll_serialization() {
239 let mut hll = HyperLogLog::new();
240 hll.add(b"test1");
241 hll.add(b"test2");
242
243 let bytes = hll.as_bytes().to_vec();
244 let restored = HyperLogLog::from_bytes(bytes).unwrap();
245 assert_eq!(hll.count(), restored.count());
246 }
247
248 #[test]
249 fn test_hll_memory() {
250 let hll = HyperLogLog::new();
251 let mem = hll.memory_bytes();
252 assert!(mem >= 16384);
254 assert!(mem < 20000);
255 }
256}