1use ahash::RandomState;
19use crossbeam_queue::SegQueue;
20use std::marker::PhantomData;
21use std::sync::atomic::AtomicUsize;
22use std::sync::atomic::{
23 AtomicBool, AtomicU8,
24 Ordering::{Acquire, Relaxed, SeqCst},
25};
26
27mod atomic;
28mod buckets;
29mod estimator;
30mod uno;
31
32use buckets::Buckets;
33use std::hash::Hash;
34use uno::{UnoLearner, UnoSketch};
35
36const SMALL: bool = false;
37const MAIN: bool = true;
38
39#[derive(Debug, Default)]
41struct Location(AtomicBool);
42
43impl Location {
44 fn new_small() -> Self {
45 Self(AtomicBool::new(SMALL))
46 }
47
48 fn value(&self) -> bool {
49 self.0.load(Relaxed)
50 }
51
52 fn is_main(&self) -> bool {
53 self.value()
54 }
55
56 fn move_to_main(&self) {
57 self.0.store(true, Relaxed);
58 }
59}
60
61const USES_CAP: u8 = 3;
64
65#[derive(Debug, Default)]
66struct Uses(AtomicU8);
67
68impl Uses {
69 pub fn inc_uses(&self) -> u8 {
70 loop {
71 let uses = self.uses();
72 if uses >= USES_CAP {
73 return uses;
74 }
75 if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) {
76 if new >= USES_CAP {
78 return new;
80 } } else {
82 return uses + 1;
83 }
84 }
85 }
86
87 pub fn decr_uses(&self) -> u8 {
89 loop {
90 let uses = self.uses();
91 if uses == 0 {
92 return 0;
93 }
94 if let Err(new) = self.0.compare_exchange(uses, uses - 1, Acquire, Relaxed) {
95 if new == 0 {
97 return 0;
98 } } else {
100 return uses;
101 }
102 }
103 }
104
105 pub fn uses(&self) -> u8 {
106 self.0.load(Relaxed)
107 }
108}
109
110type Key = u64;
111type Weight = u16;
112
113#[derive(Clone)]
115pub struct KV<T> {
116 pub key: Key,
119 pub data: T,
120 pub weight: Weight,
121}
122
123struct Bucket<T> {
125 uses: Uses,
126 queue: Location,
127 weight: Weight,
128 data: T,
129}
130
131const SMALL_QUEUE_PERCENTAGE: f32 = 0.1;
132
133struct FiFoQueues<T> {
134 total_weight_limit: usize,
135
136 small: SegQueue<Key>,
137 small_weight: AtomicUsize,
138
139 main: SegQueue<Key>,
140 main_weight: AtomicUsize,
141
142 estimator: UnoSketch,
144 learner: UnoLearner,
146
147 _t: PhantomData<T>,
148}
149
150impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
151 fn admit(
152 &self,
153 key: Key,
154 data: T,
155 weight: u16,
156 ignore_learner: bool,
157 buckets: &Buckets<T>,
158 ) -> Vec<KV<T>> {
159 assert!(weight > 0);
165 let new_bucket = {
166 let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| {
167 let old_weight = bucket.weight;
169 let uses = bucket.uses.uses();
170 fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
171 if old == new {
172 return;
173 }
174 if old > new {
175 weight.fetch_sub((old - new) as usize, SeqCst);
176 } else {
177 weight.fetch_add((new - old) as usize, SeqCst);
178 }
179 }
180 let queue = bucket.queue.is_main();
181 if queue == MAIN {
182 update_atomic(&self.main_weight, old_weight, weight);
183 } else {
184 update_atomic(&self.small_weight, old_weight, weight);
185 }
186 (uses, queue, weight)
187 }) else {
188 let mut evicted = self.evict_to_limit(weight, buckets);
189 let (key, data, weight) = if !ignore_learner && evicted.len() == 1 {
190 self.estimator.tick();
193 let evicted_first = &evicted[0];
194 let evicted_metrics = self.estimator.get(evicted_first.key);
195 let evicted_pred = self.learner.predict(evicted_metrics.0, evicted_metrics.1);
196 let new_metrics = self.estimator.get(key);
197 let new_pred = self.learner.predict(new_metrics.0, new_metrics.1);
198 if evicted_pred > new_pred {
199 let first = evicted.pop().expect("just check non-empty");
201 evicted.push(KV { key, data, weight });
203 if evicted_pred < 0.0 {
204 self.learner
205 .weight_update(evicted_metrics.0, evicted_metrics.1, true);
206 }
207 self.estimator.incr_freq_only(key);
208 (first.key, first.data, first.weight)
209 } else {
210 if evicted_pred > 0.0 {
211 self.learner
212 .weight_update(evicted_metrics.0, evicted_metrics.1, false);
213 }
214 self.estimator.incr_freq_only(key);
215 self.estimator.incr_reuse_only(key);
216 (key, data, weight)
217 }
218 } else {
219 self.estimator.incr(key);
220 (key, data, weight)
221 };
222
223 let metrics = self.estimator.get(key);
225 let check_main =
226 metrics.0 > 2 && (metrics.0 > 3 || metrics.1 < self.learner.scale_factor * 8);
227
228 let location = Location::new_small();
229 let count = Uses::default();
230 if check_main {
231 location.move_to_main();
232 }
233
234 let bucket = Bucket {
235 queue: location,
236 weight,
237 uses: count, data,
239 };
240 let old = buckets.insert(key, bucket);
241 if old.is_none() {
242 if check_main {
246 self.main.push(key);
247 self.main_weight.fetch_add(weight as usize, SeqCst);
248 } else {
249 self.small.push(key);
250 self.small_weight.fetch_add(weight as usize, SeqCst);
251 }
252 } return evicted;
254 };
255 Bucket {
256 queue: Location(queue.into()),
257 weight,
258 uses: Uses(uses.into()),
259 data,
260 }
261 };
262
263 buckets.insert(key, new_bucket);
265
266 self.evict_to_limit(0, buckets)
270 }
271
272 fn evict_to_limit(&self, extra_weight: Weight, buckets: &Buckets<T>) -> Vec<KV<T>> {
275 let mut evicted = if self.total_weight_limit
276 < self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
277 {
278 Vec::with_capacity(1)
279 } else {
280 vec![]
281 };
282 if self.total_weight_limit
283 < self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
284 {
285 if let Some(evicted_item) = self.evict_one(buckets) {
286 evicted.push(evicted_item);
287 }
288 }
289
290 evicted
291 }
292
293 fn evict_one(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
294 let evict_small = self.small_weight_limit() <= self.small_weight.load(SeqCst);
295
296 if evict_small {
297 let evicted = self.evict_one_from_small(buckets);
298 if evicted.is_some() {
301 return evicted;
302 }
303 }
304 self.evict_one_from_main(buckets)
305 }
306
307 fn small_weight_limit(&self) -> usize {
308 (self.total_weight_limit as f32 * SMALL_QUEUE_PERCENTAGE).floor() as usize + 1
309 }
310
311 fn evict_one_from_small(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
312 loop {
313 let Some(to_evict) = self.small.pop() else {
314 return None;
316 };
317
318 let v = buckets
319 .get_map(&to_evict, |bucket| {
320 let weight = bucket.weight;
321 self.small_weight.fetch_sub(weight as usize, SeqCst);
322
323 if bucket.uses.uses() > 1 {
324 bucket.queue.move_to_main();
326 self.main.push(to_evict);
327 self.main_weight.fetch_add(weight as usize, SeqCst);
328 None
330 } else {
331 let data = bucket.data.clone();
332 let weight = bucket.weight;
333 buckets.remove(&to_evict);
334 Some(KV {
335 key: to_evict,
336 data,
337 weight,
338 })
339 }
340 })
341 .flatten();
342 if v.is_some() {
343 return v;
345 }
346 }
347 }
348
349 fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
350 loop {
351 let to_evict = self.main.pop()?;
352
353 if let Some(v) = buckets
354 .get_map(&to_evict, |bucket| {
355 if bucket.uses.decr_uses() > 0 {
356 self.main.push(to_evict);
358 None
360 } else {
361 let weight = bucket.weight;
363 self.main_weight.fetch_sub(weight as usize, SeqCst);
364 let data = bucket.data.clone();
365 buckets.remove(&to_evict);
366 Some(KV {
367 key: to_evict,
368 data,
369 weight,
370 })
371 }
372 })
373 .flatten()
374 {
375 return Some(v);
377 }
378 }
379 }
380}
381
382pub struct Breve<K, T> {
384 queues: FiFoQueues<T>,
385 buckets: Buckets<T>,
386 random_status: RandomState,
387 _k: PhantomData<K>,
388}
389impl<K: Hash, T: Clone + Send + Sync + 'static> Breve<K, T> {
390 pub fn new(total_weight_limit: usize, estimated_size: usize, learning_rate: f32) -> Self {
393 let queues = FiFoQueues {
394 small: SegQueue::new(),
395 small_weight: 0.into(),
396 main: SegQueue::new(),
397 main_weight: 0.into(),
398 total_weight_limit,
399 estimator: UnoSketch::new(estimated_size),
400 learner: UnoLearner::new(learning_rate, estimated_size),
401 _t: PhantomData,
402 };
403 Breve {
404 queues,
405 buckets: Buckets::new_fast(estimated_size),
406 random_status: RandomState::new(),
407 _k: PhantomData,
408 }
409 }
410
411 pub fn new_compact(
416 total_weight_limit: usize,
417 estimated_size: usize,
418 learning_rate: f32,
419 ) -> Self {
420 let queues = FiFoQueues {
421 small: SegQueue::new(),
422 small_weight: 0.into(),
423 main: SegQueue::new(),
424 main_weight: 0.into(),
425 total_weight_limit,
426 estimator: UnoSketch::new_compact(
427 (estimated_size as f32 * SMALL_QUEUE_PERCENTAGE) as usize,
428 ),
429 learner: UnoLearner::new(
430 learning_rate,
431 (estimated_size as f32 * SMALL_QUEUE_PERCENTAGE) as usize,
432 ),
433 _t: PhantomData,
434 };
435 Breve {
436 queues,
437 buckets: Buckets::new_compact(estimated_size, 32),
438 random_status: RandomState::new(),
439 _k: PhantomData,
440 }
441 }
442
443 pub fn get(&self, key: &K) -> Option<T> {
449 let key = self.random_status.hash_one(key);
450 self.buckets.get_map(&key, |p| {
451 p.uses.inc_uses();
452 p.data.clone()
453 })
454 }
455
456 pub fn put(&self, key: K, data: T, weight: Weight) -> Vec<KV<T>> {
460 let key = self.random_status.hash_one(&key);
461 self.queues.admit(key, data, weight, false, &self.buckets)
462 }
463
464 pub fn remove(&self, key: K) {
465 let key = self.random_status.hash_one(key);
466 self.buckets.get_map(&key, |p| {
467 if p.queue.is_main() {
468 self.queues.main_weight.fetch_sub(p.weight as usize, SeqCst);
469 } else {
470 self.queues
471 .small_weight
472 .fetch_sub(p.weight as usize, SeqCst);
473 }
474 });
475
476 self.buckets.remove(&key);
478 }
479
480 pub fn force_put(&self, key: K, data: T, weight: Weight) -> Vec<KV<T>> {
495 let key = self.random_status.hash_one(&key);
496 self.queues.admit(key, data, weight, true, &self.buckets)
497 }
498}