// reddb_server/storage/primitives/hyperloglog.rs

/// Precision used by `HyperLogLog::new`; yields `1 << 14` (16384) registers.
const DEFAULT_PRECISION: u8 = 14;
/// Smallest precision accepted by `HyperLogLog::with_precision` and
/// `HyperLogLog::from_bytes`.
const MIN_PRECISION: u8 = 4;
/// Largest precision accepted by `HyperLogLog::with_precision` and
/// `HyperLogLog::from_bytes`.
const MAX_PRECISION: u8 = 18;
19
/// HyperLogLog cardinality sketch: a precision value plus one `u8` register
/// per bucket.
#[derive(Debug, Clone)]
pub struct HyperLogLog {
    /// Number of leading hash bits used to pick a register. The constructors
    /// only accept values in `MIN_PRECISION..=MAX_PRECISION`.
    precision: u8,
    /// Register array of length `1 << precision`. `add` keeps, per register,
    /// the largest "leading zeros + 1" value seen in the remaining hash bits.
    registers: Vec<u8>,
}
27
28impl HyperLogLog {
29 pub fn new() -> Self {
31 Self::with_precision(DEFAULT_PRECISION).expect("default HLL precision is valid")
32 }
33
34 pub fn with_precision(precision: u8) -> Option<Self> {
35 if !(MIN_PRECISION..=MAX_PRECISION).contains(&precision) {
36 return None;
37 }
38 Some(Self {
39 precision,
40 registers: vec![0u8; 1usize << precision],
41 })
42 }
43
44 pub fn precision(&self) -> u8 {
45 self.precision
46 }
47
48 pub fn add(&mut self, data: &[u8]) {
50 let hash = Self::hash(data);
51 let precision = self.precision as u32;
52 let index = (hash >> (64 - precision)) as usize;
53 let remaining = (hash << precision) | (1 << (precision - 1)); let rho = remaining.leading_zeros() as u8 + 1;
55 if rho > self.registers[index] {
56 self.registers[index] = rho;
57 }
58 }
59
60 pub fn count(&self) -> u64 {
62 let m = self.registers.len() as f64;
63 let alpha = 0.7213 / (1.0 + 1.079 / m);
64
65 let mut sum = 0.0f64;
67 let mut zeros = 0u32;
68 for ® in &self.registers {
69 sum += 2.0f64.powi(-(reg as i32));
70 if reg == 0 {
71 zeros += 1;
72 }
73 }
74
75 let raw_estimate = alpha * m * m / sum;
76
77 if raw_estimate <= 2.5 * m && zeros > 0 {
79 let lc = m * (m / zeros as f64).ln();
80 return lc as u64;
81 }
82
83 let two_pow_64 = 2.0f64.powi(64);
85 if raw_estimate > two_pow_64 / 30.0 {
86 let corrected = -two_pow_64 * (1.0 - raw_estimate / two_pow_64).ln();
87 return corrected as u64;
88 }
89
90 raw_estimate as u64
91 }
92
93 pub fn merge(&mut self, other: &HyperLogLog) {
95 if self.precision != other.precision {
96 return;
97 }
98 for (i, &other_reg) in other.registers.iter().enumerate() {
99 if other_reg > self.registers[i] {
100 self.registers[i] = other_reg;
101 }
102 }
103 }
104
105 pub fn merged(a: &HyperLogLog, b: &HyperLogLog) -> HyperLogLog {
107 let mut result = HyperLogLog::with_precision(a.precision).unwrap_or_default();
108 if a.precision != b.precision {
109 return result;
110 }
111 for i in 0..a.registers.len() {
112 result.registers[i] = a.registers[i].max(b.registers[i]);
113 }
114 result
115 }
116
117 pub fn clear(&mut self) {
119 for reg in &mut self.registers {
120 *reg = 0;
121 }
122 }
123
124 pub fn memory_bytes(&self) -> usize {
126 std::mem::size_of::<Self>() + self.registers.len()
127 }
128
129 pub fn as_bytes(&self) -> &[u8] {
131 &self.registers
132 }
133
134 pub fn from_bytes(bytes: Vec<u8>) -> Option<Self> {
136 let precision = bytes.len().checked_ilog2()? as u8;
137 if 1usize << precision != bytes.len()
138 || !(MIN_PRECISION..=MAX_PRECISION).contains(&precision)
139 {
140 return None;
141 }
142 Some(Self {
143 precision,
144 registers: bytes,
145 })
146 }
147
148 fn hash(data: &[u8]) -> u64 {
150 let mut hash = 0xcbf29ce484222325u64;
151 for &byte in data {
152 hash ^= byte as u64;
153 hash = hash.wrapping_mul(0x100000001b3);
154 }
155 hash ^= hash >> 33;
157 hash = hash.wrapping_mul(0xff51afd7ed558ccd);
158 hash ^= hash >> 33;
159 hash = hash.wrapping_mul(0xc4ceb9fe1a85ec53);
160 hash ^= hash >> 33;
161 hash
162 }
163}
164
165impl Default for HyperLogLog {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh sketch estimates zero elements.
    #[test]
    fn test_hll_empty() {
        assert_eq!(HyperLogLog::new().count(), 0);
    }

    /// One element yields an estimate between 1 and 3.
    #[test]
    fn test_hll_single() {
        let mut sketch = HyperLogLog::new();
        sketch.add(b"hello");
        let observed = sketch.count();
        assert!((1..=3).contains(&observed));
    }

    /// Re-adding the same value must not inflate the estimate.
    #[test]
    fn test_hll_duplicates() {
        let mut sketch = HyperLogLog::new();
        (0..1000).for_each(|_| sketch.add(b"same_value"));
        assert!(sketch.count() <= 3);
    }

    /// 1,000 distinct keys are estimated within 10%.
    #[test]
    fn test_hll_accuracy_1k() {
        let n = 1000;
        let mut hll = HyperLogLog::new();
        (0..n)
            .map(|i| format!("element_{}", i))
            .for_each(|key| hll.add(key.as_bytes()));

        let estimate = hll.count();
        let truth = n as f64;
        let error = (estimate as f64 - truth).abs() / truth;
        println!(
            "HLL 1K: actual={n}, estimate={estimate}, error={:.2}%",
            error * 100.0
        );
        assert!(error < 0.10, "Error {error:.4} exceeds 10% for 1K elements");
    }

    /// 100,000 distinct keys are estimated within 5%.
    #[test]
    fn test_hll_accuracy_100k() {
        let n = 100_000;
        let mut hll = HyperLogLog::new();
        (0..n)
            .map(|i| format!("user_{}", i))
            .for_each(|key| hll.add(key.as_bytes()));

        let estimate = hll.count();
        let truth = n as f64;
        let error = (estimate as f64 - truth).abs() / truth;
        println!(
            "HLL 100K: actual={n}, estimate={estimate}, error={:.2}%",
            error * 100.0
        );
        assert!(
            error < 0.05,
            "Error {error:.4} exceeds 5% for 100K elements"
        );
    }

    /// Merging two disjoint 500-key sketches estimates about 1,000.
    #[test]
    fn test_hll_merge() {
        let mut left = HyperLogLog::new();
        let mut right = HyperLogLog::new();
        // The two sketches are independent, so they can be filled in one pass.
        for i in 0..500 {
            left.add(format!("a_{}", i).as_bytes());
            right.add(format!("b_{}", i).as_bytes());
        }

        let (left_count, right_count) = (left.count(), right.count());

        left.merge(&right);
        let merged_count = left.count();

        assert!(merged_count > left_count);
        assert!(merged_count > right_count);
        let error = (merged_count as f64 - 1000.0).abs() / 1000.0;
        assert!(error < 0.10);
    }

    /// A sketch rebuilt from its raw registers gives the same estimate.
    #[test]
    fn test_hll_serialization() {
        let mut original = HyperLogLog::new();
        for item in [&b"test1"[..], &b"test2"[..]] {
            original.add(item);
        }

        let raw = original.as_bytes().to_vec();
        let restored = HyperLogLog::from_bytes(raw).unwrap();
        assert_eq!(original.count(), restored.count());
    }

    /// The default sketch reports at least 16384 and under 20000 bytes.
    #[test]
    fn test_hll_memory() {
        let mem = HyperLogLog::new().memory_bytes();
        assert!((16384..20000).contains(&mem));
    }
}