1#[cfg(not(feature = "std"))]
2use alloc::{boxed::Box, sync::Arc, vec::Vec};
3#[cfg(feature = "std")]
4use std::sync::Arc;
5
6use core::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, Ordering};
7use core::hash::{Hash, BuildHasher};
8
9use crate::arena::Arena;
10use crate::storage::{Cache, Node};
11use crate::filters::{T1, T2};
12use crate::lossy_queue::{LossyQueue, OneshotAck};
13use crate::{WorkerState, GLOBAL_EPOCH};
14
/// Highest recency rank a slot can hold. Ranks are set to this on insert and
/// on hit, and decayed toward 0 by `Daemon::evict_batch`; slots at or below
/// the scan threshold become eviction candidates.
const MAX_RANK: u8 = 3;
19
/// Messages sent from front-end handles to the maintenance daemon.
pub enum Command<K, V> {
    /// Insert one entry; the `u64` is the precomputed hash of the key.
    Insert(K, V, u64),
    /// Insert a batch of `(key, value, hash)` entries in one message.
    BatchInsert(Vec<(K, V, u64)>),
    /// Remove the entry for the key; the `u64` is the precomputed key hash.
    Remove(K, u64),
    /// Drop all entries; the ack is signalled once the clear has completed.
    Clear(Arc<OneshotAck>),
    /// Run one maintenance pass; the ack is signalled when it finishes.
    Sync(Arc<OneshotAck>),
    /// Terminate the daemon's `run` loop.
    Shutdown,
}
36
/// Single background worker that owns all structural mutation of the cache:
/// admission, insertion, removal, eviction, hit-rank updates, and epoch-based
/// reclamation of retired nodes. Front-end threads only send commands and hit
/// batches through the lossy queues.
pub struct Daemon<K, V, S> {
    /// Hash builder (currently unused in this chunk — presumably shared with
    /// the front-end handles; confirm against the rest of the crate).
    pub hasher: S,
    /// Slot allocator: free list plus per-slot hash and rank metadata.
    pub arena: Arena,
    /// Fast-path filter updated on hits in `maintenance`.
    /// NOTE(review): exact T1/T2 semantics are defined in `crate::filters` —
    /// here they are only stored into and invalidated via `clear_if_matches`.
    pub t1: Arc<T1<K, V>>,
    /// Second filter tier; only cleared/invalidated from this file.
    pub t2: Arc<T2<K, V>>,
    /// Shared node table + hash index the daemon publishes into.
    pub cache: Arc<Cache<K, V>>,
    /// Command channel from front-end handles (lossy: messages may be dropped).
    pub cmd_rx: Arc<LossyQueue<Command<K, V>>>,
    /// Batches of global slot indices that were hit, for rank refresh.
    pub hit_rx: Arc<LossyQueue<[usize; 64]>>,
    /// Coarse expiry clock; bumped roughly every 100 ms (std) or every
    /// 100 daemon ticks (no_std) in `run`.
    pub epoch: Arc<AtomicU32>,
    /// Idle sleep duration in microseconds (std builds only).
    pub poll_us: u64,
    /// Ghost-set doorkeeper deciding which inserts are admitted.
    pub admission: Arc<AdmissionFilter>,
    /// Scratch buffer of hit indices drained from `hit_rx` each pass.
    pub hit_accumulator: Vec<usize>,
    /// NOTE(review): written only in `new` within this chunk — verify it is
    /// used elsewhere or dead.
    pub last_decay_epoch: u32,
    /// Retired node pointers paired with the GLOBAL_EPOCH at retirement;
    /// freed in `maintenance` once all workers have moved past that epoch.
    pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
    /// Per-worker epoch registrations used to compute the reclamation horizon.
    pub worker_states: Arc<[WorkerState]>,
    /// Monotonic loop counter; drives the no_std epoch tick.
    pub daemon_tick: Arc<AtomicU64>,
}
62
// SAFETY: `Daemon` is `!Send` by default only because of the raw
// `*mut Node<K, V>` pointers in `garbage_queue`. Those pointers are created,
// used, and freed exclusively by the daemon that owns them, so moving the
// whole `Daemon` to another thread is sound given `K: Send`, `V: Send`,
// `S: Send`. NOTE(review): soundness also relies on the `Arc`-shared
// structures (`T1`, `T2`, `Cache`, `LossyQueue`) being `Sync` — confirm
// their definitions uphold that.
unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}
64
impl<K, V, S> Daemon<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Builds a daemon over the shared cache structures.
    ///
    /// NOTE(review): the `duration` parameter is accepted but immediately
    /// discarded (`let _ = duration`); the TTL actually used is the constant
    /// in `get_duration`. Confirm whether this parameter should be wired in.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        hasher: S,
        capacity: usize,
        t1: Arc<T1<K, V>>,
        t2: Arc<T2<K, V>>,
        cache: Arc<Cache<K, V>>,
        cmd_rx: Arc<LossyQueue<Command<K, V>>>,
        hit_rx: Arc<LossyQueue<[usize; 64]>>,
        epoch: Arc<AtomicU32>,
        duration: u32,
        poll_us: u64,
        worker_states: Arc<[WorkerState]>,
        daemon_tick: Arc<AtomicU64>,
    ) -> Self {
        let _ = duration; Self {
            hasher,
            arena: Arena::new(capacity),
            t1,
            t2,
            cache,
            cmd_rx,
            hit_rx,
            epoch,
            poll_us,
            admission: Arc::new(AdmissionFilter::new(capacity)),
            hit_accumulator: Vec::with_capacity(8192),
            last_decay_epoch: 0,
            garbage_queue: Vec::new(),
            worker_states,
            daemon_tick,
        }
    }

    /// Daemon main loop. Each iteration:
    /// 1. drains up to 8192 commands (returning immediately on `Shutdown`),
    /// 2. advances the expiry `epoch` (~every 100 ms on std; every 100 loop
    ///    ticks on no_std),
    /// 3. runs one `maintenance` pass (reclamation, hit ranks, eviction),
    /// 4. bumps `daemon_tick`, and sleeps (std) or spin-hints (no_std) when
    ///    no command was processed.
    pub fn run(mut self) {
        #[cfg(feature = "std")]
        let mut last_epoch_tick = std::time::Instant::now();

        loop {
            let mut processed = 0u32;
            // Drain the command queue, capped per iteration so maintenance
            // still runs under sustained command pressure.
            loop {
                match self.cmd_rx.try_recv() {
                    Some(Command::Shutdown) => return,
                    Some(cmd) => {
                        self.process_cmd(cmd);
                        processed += 1;
                        if processed >= 8192 {
                            break;
                        }
                    }
                    None => break,
                }
            }

            #[cfg(feature = "std")]
            {
                // Wall-clock epoch tick: one epoch ~= 100 ms.
                let now = std::time::Instant::now();
                if now.duration_since(last_epoch_tick)
                    >= std::time::Duration::from_millis(100)
                {
                    self.epoch.fetch_add(1, Ordering::Relaxed);
                    last_epoch_tick = now;
                }
            }
            #[cfg(not(feature = "std"))]
            {
                // No clock available: approximate with one epoch per 100
                // loop iterations.
                let tick = self.daemon_tick.load(Ordering::Relaxed);
                if tick % 100 == 0 {
                    self.epoch.fetch_add(1, Ordering::Relaxed);
                }
            }

            self.maintenance();

            self.daemon_tick.fetch_add(1, Ordering::Relaxed);

            // Back off only when idle so bursts are serviced at full speed.
            if processed == 0 {
                #[cfg(feature = "std")]
                std::thread::sleep(std::time::Duration::from_micros(self.poll_us));
                #[cfg(not(feature = "std"))]
                core::hint::spin_loop();
            }
        }
    }

    /// Dispatches one command. `Shutdown` is intercepted in `run` and never
    /// reaches this method.
    #[inline(always)]
    fn process_cmd(&mut self, cmd: Command<K, V>) {
        match cmd {
            Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
            Command::BatchInsert(batch) => {
                for (k, v, hash) in batch {
                    self.handle_admission_insert(k, v, hash);
                }
            }
            Command::Remove(k, hash) => self.handle_remove(k, hash),
            Command::Clear(ack) => {
                self.handle_clear();
                ack.signal();
            }
            Command::Sync(ack) => {
                // A sync is just a maintenance pass the caller can wait on.
                self.maintenance();
                ack.signal();
            }
            Command::Shutdown => unreachable!("handled in run()"),
        }
    }

    /// Admission gate: while the arena is still mostly empty (more than 5% of
    /// capacity free) everything is admitted; once warm, a key must have a
    /// matching fingerprint in the ghost set (i.e. it was evicted recently
    /// and came back) to be admitted.
    fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
        let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
        if cold_start || self.admission.check_ghost(hash) {
            self.handle_insert_with_hash(k, v, hash);
        }
    }

    /// Inserts (or replaces) the entry for `hash`: finds an existing slot via
    /// the index, or takes a free slot (evicting first if none are free),
    /// then publishes a freshly boxed node and retires any displaced one.
    fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
        // 16-bit fingerprint taken from the top of the hash.
        let tag = (hash >> 48) as u16;

        let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
            existing_idx
        } else {
            if self.arena.free_list_empty() {
                self.evict_batch();
            }
            if let Some(new_idx) = self.arena.pop_free_slot() {
                new_idx
            } else {
                // Eviction freed nothing usable: drop the insert (k, v fall
                // out of scope here).
                return; }
        };

        // Packed index entry: fingerprint in the top 16 bits, slot index in
        // the low 48.
        let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);

        let node_ptr = Box::into_raw(Box::new(Node {
            key: k,
            value: v,
            // Expiry in epoch units; see the 100 ms epoch tick in `run`.
            expire_at: self.epoch.load(Ordering::Relaxed) + self.get_duration(),
            g_idx: global_idx as u32,
        }));

        // Publish the new node; a displaced old node cannot be freed yet
        // because readers may still hold it — retire it to the garbage queue
        // stamped with the current global epoch.
        let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
        if !old_ptr.is_null() {
            let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            self.garbage_queue.push((old_ptr, epoch));
        }

        self.cache.index_store(hash, tag, entry);
        self.arena.set_hash(global_idx, hash);
        // Fresh entries start at full rank so they survive a few scan passes.
        self.arena.set_rank(global_idx, MAX_RANK);
    }

    /// TTL in epochs for newly inserted entries (10 epochs ≈ 1 s with the
    /// 100 ms epoch tick in `run`).
    /// NOTE(review): hard-coded; the `duration` argument to `new` is ignored.
    fn get_duration(&self) -> u32 {
        10
    }

    /// Removes the entry for `hash` if present: unpublishes the node pointer,
    /// retires it for later reclamation, invalidates any T1/T2 slots that
    /// still point at it, and returns the arena slot to rank 0.
    fn handle_remove(&mut self, _k: K, hash: u64) {
        let tag = (hash >> 48) as u16;
        if let Some(g_idx) = self.cache.index_probe(hash, tag) {
            let old_ptr =
                self.cache.nodes[g_idx].swap(core::ptr::null_mut(), Ordering::Release);
            if !old_ptr.is_null() {
                let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                self.garbage_queue.push((old_ptr, epoch));
                // Drop stale filter references so readers can't resurrect
                // the retired node through T1/T2.
                self.t1.clear_if_matches(hash, old_ptr);
                self.t2.clear_if_matches(hash, old_ptr);
            }
            self.cache.index_remove(hash, tag, g_idx);
            self.arena.set_rank(g_idx, 0); }
    }

    /// Clears the whole cache: node table, both filter tiers, the admission
    /// ghost set, and the arena metadata.
    /// NOTE(review): nodes unpublished by `cache.clear()` are presumably
    /// retired inside `Cache::clear` — confirm they are not leaked.
    fn handle_clear(&mut self) {
        self.cache.clear();
        for i in 0..self.t1.len() {
            self.t1.clear_at(i);
        }
        for i in 0..self.t2.len() {
            self.t2.clear_at(i);
        }
        self.admission.clear();
        self.arena.clear();
    }

    /// One housekeeping pass: advance the reclamation epoch, free garbage
    /// that no worker can still observe, apply accumulated hits to ranks and
    /// the T1 filter, and evict if the free list is running low.
    fn maintenance(&mut self) {
        // Single-writer epoch bump: only this daemon stores to GLOBAL_EPOCH,
        // so a load+store pair is race-free here.
        // NOTE(review): would need fetch_add if a second writer ever exists.
        let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
        GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);

        // Reclamation horizon: the oldest epoch any active worker is pinned
        // at. A local epoch of 0 means the worker is quiescent.
        let mut min_active_epoch = current_global + 1;
        for state in self.worker_states.iter() {
            let local = state.local_epoch.load(Ordering::Acquire);
            if local != 0 && local < min_active_epoch {
                min_active_epoch = local;
            }
        }

        // Free retired nodes that predate every active worker; keep the rest.
        self.garbage_queue.retain(|&(ptr, epoch)| {
            if epoch < min_active_epoch {
                unsafe { drop(Box::from_raw(ptr)) };
                false
            } else {
                true
            }
        });

        // Drain hit batches (bounded) into the accumulator, skipping indices
        // outside the arena.
        while let Some(batch) = self.hit_rx.try_recv() {
            for &g_idx in batch.iter() {
                if g_idx < self.arena.capacity {
                    self.hit_accumulator.push(g_idx);
                }
            }
            if self.hit_accumulator.len() >= 8192 {
                break;
            }
        }

        if !self.hit_accumulator.is_empty() {
            // Sorted for cache-friendly sequential access to arena metadata.
            self.hit_accumulator.sort_unstable();

            for &g_idx in &self.hit_accumulator {
                // A hit restores full rank ...
                self.arena.set_rank(g_idx, MAX_RANK);

                let hash = self.arena.get_hash(g_idx);

                // ... and promotes the live node into the T1 fast path if it
                // isn't there already.
                let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
                if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
                    self.t1.store_slot(hash, ptr);
                }
            }

            self.hit_accumulator.clear();
        }

        // Keep at least ~10% of capacity free.
        if self.arena.free_list_len() < self.arena.capacity / 10 {
            self.evict_batch();
        }
    }

    /// CLOCK-style eviction sweep: examine up to 128 slots at the arena
    /// cursor; slots whose rank is at or below the (average-derived)
    /// threshold are evicted, others have their rank decremented. Stops
    /// early once the free list is back above 10% of capacity.
    fn evict_batch(&mut self) {
        let count = 128;
        // Adaptive threshold: the mean rank across the arena, floored at 1.
        let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
        let threshold = avg.max(1);

        for _ in 0..count {
            if self.arena.free_list_len() > self.arena.capacity / 10 {
                break;
            }

            let idx = self.arena.cursor();
            let r = self.arena.get_rank(idx);

            if r <= threshold {
                let hash = self.arena.get_hash(idx);
                let tag = (hash >> 48) as u16;

                // Same unpublish-and-retire protocol as handle_remove.
                let old_ptr =
                    self.cache.nodes[idx].swap(core::ptr::null_mut(), Ordering::Release);
                if !old_ptr.is_null() {
                    let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                    self.garbage_queue.push((old_ptr, epoch));
                    self.t1.clear_if_matches(hash, old_ptr);
                    self.t2.clear_if_matches(hash, old_ptr);
                }

                self.cache.index_remove(hash, tag, idx);

                // Remember the victim in the ghost set so a quick return of
                // the same key passes admission next time.
                self.admission.record_death(hash);
                self.arena.push_free_slot(idx);
                self.arena.set_rank(idx, 0);
            } else {
                // Survivor: age it one step.
                self.arena.decrement_rank(idx);
            }
            self.arena.advance_cursor();
        }
    }
}
383
/// Ghost-set admission filter (doorkeeper): remembers the 16-bit fingerprint
/// of recently evicted keys so that a key is only re-admitted after it has
/// been seen again post-eviction (cold-start inserts bypass this check — see
/// `Daemon::handle_admission_insert`).
///
/// A slot value of 0 means "empty"; a hash whose top 16 bits are all zero
/// therefore matches empty slots, yielding a false positive. False positives
/// merely over-admit and are acceptable for an admission heuristic.
pub struct AdmissionFilter {
    /// Index mask, equal to `ghost_set.len() - 1`; the length is always a
    /// power of two so `hash & ghost_mask` addresses every slot uniformly.
    pub ghost_mask: usize,
    /// One fingerprint per slot, written/read with relaxed atomics (lossy
    /// overwrites are fine for a heuristic).
    pub ghost_set: Arc<[AtomicU16]>,
}

impl AdmissionFilter {
    /// Builds a filter sized for a cache of `capacity` slots.
    ///
    /// The table length is `capacity.max(256)` rounded **up to a power of
    /// two**: indexing uses `hash & ghost_mask`, which only reaches every
    /// slot when the length is a power of two. (Previously a non-power-of-two
    /// capacity produced a mask with zero bits in the middle, leaving part of
    /// the table permanently unreachable and skewing the index distribution.)
    pub fn new(capacity: usize) -> Self {
        let ghost_size = capacity.max(256).next_power_of_two();

        let mut ghost_vec = Vec::with_capacity(ghost_size);
        for _ in 0..ghost_size {
            ghost_vec.push(AtomicU16::new(0));
        }

        Self {
            ghost_mask: ghost_size - 1,
            ghost_set: ghost_vec.into_boxed_slice().into(),
        }
    }

    /// Records the fingerprint of an evicted key so that a prompt re-insert
    /// of the same key passes `check_ghost`. Overwrites whatever fingerprint
    /// previously occupied the slot.
    #[inline(always)]
    pub fn record_death(&self, hash: u64) {
        let fp = (hash >> 48) as u16;
        let idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[idx].store(fp, Ordering::Relaxed);
    }

    /// Returns `true` when the slot for `hash` holds this hash's fingerprint,
    /// i.e. the key was (probably) evicted recently. Fingerprint collisions
    /// can produce false positives.
    #[inline(always)]
    pub fn check_ghost(&self, hash: u64) -> bool {
        let fp = (hash >> 48) as u16;
        let ghost_idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[ghost_idx].load(Ordering::Relaxed) == fp
    }

    /// Resets every slot to the empty fingerprint (0).
    pub fn clear(&self) {
        for val in self.ghost_set.iter() {
            val.store(0, Ordering::Relaxed);
        }
    }
}
442}