1#[cfg(not(feature = "std"))]
2use alloc::{boxed::Box, vec::Vec};
3
4use crate::sync::{Arc, ArcSlice, new_arc_slice};
5use crate::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU64, Ordering};
6use core::hash::{Hash, BuildHasher};
7
8use crate::arena::Arena;
9use crate::storage::{Cache, Node};
10use crate::filters::{T1, T2};
11use crate::lossy_queue::{LossyQueue, OneshotAck};
12use crate::{WorkerState, GLOBAL_EPOCH};
13
/// Rank written to an arena slot (via `set_rank`) when an entry is inserted
/// into it or when a hit is recorded for it.
const MAX_RANK: u8 = 3;
18
/// Commands consumed by [`Daemon::run`] from its command queue.
pub enum Command<K, V> {
    /// Insert one `(key, value, hash)` entry, subject to the admission check.
    Insert(K, V, u64),
    /// Insert several `(key, value, hash)` entries; each one is handled
    /// exactly like a single `Insert`.
    BatchInsert(Vec<(K, V, u64)>),
    /// Remove the entry located through the given hash. The daemon's remove
    /// handler looks the entry up by hash only and does not inspect the key.
    Remove(K, u64),
    /// Clear the cache, filters, admission table and arena, then signal the ack.
    Clear(Arc<OneshotAck>),
    /// Run one maintenance pass, then signal the ack.
    Sync(Arc<OneshotAck>),
    /// Make [`Daemon::run`] return.
    Shutdown,
}
35
/// Background worker state: applies queued commands, folds hit batches into
/// slot ranks, evicts low-rank slots and frees retired nodes.
pub struct Daemon<K, V, S> {
    /// Hasher builder handed in at construction; not used by the methods
    /// visible in this section.
    pub hasher: S,
    /// Slot bookkeeping: free list, per-slot rank and hash, eviction cursor.
    pub arena: Arena,
    /// Shared filter table; receives the node pointer of slots that were hit
    /// and is cleared of a node's pointer when that node is retired.
    pub t1: Arc<T1<K, V>>,
    /// Second shared filter table; cleared of a node's pointer when that node
    /// is retired.
    pub t2: Arc<T2<K, V>>,
    /// Shared storage: the `nodes` array of atomic node pointers and the hash index.
    pub cache: Arc<Cache<K, V>>,
    /// Queue the daemon receives `Command`s from.
    pub cmd_rx: Arc<LossyQueue<Command<K, V>>>,
    /// Queue of hit batches, each holding 64 slot indices.
    pub hit_rx: Arc<LossyQueue<[usize; 64]>>,
    /// Coarse clock advanced by `run`; read when computing a node's `expire_at`.
    pub epoch: Arc<AtomicU32>,
    /// Sleep time in microseconds for an idle loop iteration (`std` builds).
    pub poll_us: u64,
    /// Ghost-fingerprint table consulted before admitting an insert.
    pub admission: Arc<AdmissionFilter>,
    /// Scratch buffer of slot indices collected from `hit_rx` during maintenance.
    pub hit_accumulator: Vec<usize>,
    /// Initialised to 0; not otherwise touched by the methods in this section.
    pub last_decay_epoch: u32,
    /// Retired node pointers paired with the `GLOBAL_EPOCH` value read when
    /// they were retired; freed during maintenance or when the daemon drops.
    pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
    /// Per-worker state; `local_epoch` is read to decide which garbage can be freed.
    pub worker_states: ArcSlice<WorkerState>,
    /// Incremented once per `run` loop iteration.
    pub daemon_tick: Arc<AtomicU64>,
    /// Set to `true` while the free list holds more than `capacity / 20` slots.
    pub is_cold_start: Arc<AtomicBool>,
}
63
// SAFETY: NOTE(review) - `Daemon` holds raw `*mut Node<K, V>` pointers in
// `garbage_queue`, and raw pointers are not `Send`, hence this manual impl.
// In this file those pointers are only pushed and freed through `&mut self`
// methods and `Drop`; this presumably means the daemon owns retired nodes
// exclusively - confirm that no other thread frees them.
unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}
65
66impl<K, V, S> Daemon<K, V, S>
67where
68 K: Hash + Eq + Send + Sync + Clone + 'static,
69 V: Send + Sync + Clone + 'static,
70 S: BuildHasher + Clone + Send + 'static,
71{
72 #[allow(clippy::too_many_arguments)]
73 pub fn new(
74 hasher: S,
75 capacity: usize,
76 t1: Arc<T1<K, V>>,
77 t2: Arc<T2<K, V>>,
78 cache: Arc<Cache<K, V>>,
79 cmd_rx: Arc<LossyQueue<Command<K, V>>>,
80 hit_rx: Arc<LossyQueue<[usize; 64]>>,
81 epoch: Arc<AtomicU32>,
82 duration: u32,
83 poll_us: u64,
84 worker_states: ArcSlice<WorkerState>,
85 daemon_tick: Arc<AtomicU64>,
86 is_cold_start: Arc<AtomicBool>,
87 ) -> Self {
88 let _ = duration; Self {
90 hasher,
91 arena: Arena::new(capacity),
92 t1,
93 t2,
94 cache,
95 cmd_rx,
96 hit_rx,
97 epoch,
98 poll_us,
99 admission: Arc::new(AdmissionFilter::new(capacity)),
100 hit_accumulator: Vec::with_capacity(8192),
101 last_decay_epoch: 0,
102 garbage_queue: Vec::new(),
103 worker_states,
104 daemon_tick,
105 is_cold_start,
106 }
107 }
108
109 pub fn run(mut self) {
120 #[cfg(feature = "std")]
121 let mut last_epoch_tick = std::time::Instant::now();
122
123 loop {
124 let mut processed = 0u32;
126 loop {
127 match self.cmd_rx.try_recv() {
128 Some(Command::Shutdown) => return,
129 Some(cmd) => {
130 self.process_cmd(cmd);
131 processed += 1;
132 if processed >= 8192 {
133 break;
134 }
135 }
136 None => break,
137 }
138 }
139
140 #[cfg(feature = "std")]
144 {
145 let now = std::time::Instant::now();
146 if now.duration_since(last_epoch_tick)
147 >= std::time::Duration::from_millis(100)
148 {
149 self.epoch.fetch_add(1, Ordering::Relaxed);
150 last_epoch_tick = now;
151 }
152 }
153 #[cfg(not(feature = "std"))]
154 {
155 let tick = self.daemon_tick.load(Ordering::Relaxed);
156 if tick % 100 == 0 {
157 self.epoch.fetch_add(1, Ordering::Relaxed);
158 }
159 }
160
161 self.maintenance();
163
164 self.daemon_tick.fetch_add(1, Ordering::Relaxed);
166
167 if processed == 0 {
169 #[cfg(any(feature = "loom", loom))]
170 loom::thread::yield_now();
171 #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
172 std::thread::sleep(std::time::Duration::from_micros(self.poll_us));
173 #[cfg(not(feature = "std"))]
174 core::hint::spin_loop();
175 }
176 }
177 }
178
179 #[inline(always)]
180 fn process_cmd(&mut self, cmd: Command<K, V>) {
181 match cmd {
182 Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
183 Command::BatchInsert(batch) => {
184 for (k, v, hash) in batch {
185 self.handle_admission_insert(k, v, hash);
186 }
187 }
188 Command::Remove(k, hash) => self.handle_remove(k, hash),
189 Command::Clear(ack) => {
190 self.handle_clear();
191 ack.signal();
192 }
193 Command::Sync(ack) => {
194 self.maintenance();
195 ack.signal();
196 }
197 Command::Shutdown => unreachable!("handled in run()"),
198 }
199 }
200
201 fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
205 let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
206 self.is_cold_start.store(cold_start, Ordering::Relaxed);
207 if cold_start || self.admission.check_ghost(hash) {
208 self.handle_insert_with_hash(k, v, hash);
209 }
210 }
211
212 fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
213 let tag = (hash >> 48) as u16;
214
215 let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
217 existing_idx
218 } else {
219 if self.arena.free_list_empty() {
221 self.evict_batch();
222 }
223 if let Some(new_idx) = self.arena.pop_free_slot() {
224 new_idx
225 } else {
226 return; }
228 };
229
230 let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);
231
232 let node_ptr = Box::into_raw(Box::new(Node {
233 key: k,
234 value: v,
235 expire_at: self.epoch.load(Ordering::Relaxed) + self.get_duration(),
236 g_idx: global_idx as u32,
237 }));
238
239 let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
240 if !old_ptr.is_null() {
241 let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
242 self.garbage_queue.push((old_ptr, epoch));
243 }
244
245 self.cache.index_store(hash, tag, entry);
246 self.arena.set_hash(global_idx, hash);
247 self.arena.set_rank(global_idx, MAX_RANK);
249 }
250
251 fn get_duration(&self) -> u32 {
252 10
255 }
256
257 fn handle_remove(&mut self, _k: K, hash: u64) {
258 let tag = (hash >> 48) as u16;
259 if let Some(g_idx) = self.cache.index_probe(hash, tag) {
260 let old_ptr =
261 self.cache.nodes[g_idx].swap(core::ptr::null_mut(), Ordering::Release);
262 if !old_ptr.is_null() {
263 let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
264 self.garbage_queue.push((old_ptr, epoch));
265 self.t1.clear_if_matches(hash, old_ptr);
266 self.t2.clear_if_matches(hash, old_ptr);
267 }
268 self.cache.index_remove(hash, tag, g_idx);
269 self.arena.set_rank(g_idx, 0); }
271 }
272
273 fn handle_clear(&mut self) {
274 self.cache.clear();
275 for i in 0..self.t1.len() {
276 self.t1.clear_at(i);
277 }
278 for i in 0..self.t2.len() {
279 self.t2.clear_at(i);
280 }
281 self.admission.clear();
282 self.arena.clear();
283 self.is_cold_start.store(true, Ordering::Relaxed);
284 }
285
286 fn maintenance(&mut self) {
287 let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
289 GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);
290
291 let mut min_active_epoch = current_global + 1;
292 for state in self.worker_states.iter() {
293 let local = state.local_epoch.load(Ordering::Acquire);
294 if local != 0 && local < min_active_epoch {
295 min_active_epoch = local;
296 }
297 }
298
299 self.garbage_queue.retain(|&(ptr, epoch)| {
300 if epoch < min_active_epoch {
301 unsafe { drop(Box::from_raw(ptr)) };
302 false
303 } else {
304 true
305 }
306 });
307
308 while let Some(batch) = self.hit_rx.try_recv() {
310 for &g_idx in batch.iter() {
311 if g_idx < self.arena.capacity {
312 self.hit_accumulator.push(g_idx);
313 }
314 }
315 if self.hit_accumulator.len() >= 8192 {
316 break;
317 }
318 }
319
320 if !self.hit_accumulator.is_empty() {
322 self.hit_accumulator.sort_unstable();
323
324 for &g_idx in &self.hit_accumulator {
325 self.arena.set_rank(g_idx, MAX_RANK);
327
328 let hash = self.arena.get_hash(g_idx);
329
330 let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
332 if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
333 self.t1.store_slot(hash, ptr);
334 }
335 }
336
337 self.hit_accumulator.clear();
338 }
339
340 if self.arena.free_list_len() < self.arena.capacity / 10 {
341 self.evict_batch();
342 }
343
344 let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
345 self.is_cold_start.store(cold_start, Ordering::Relaxed);
346 }
347
348 fn evict_batch(&mut self) {
351 let count = 128;
352 let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
353 let threshold = avg.max(1);
354
355 for _ in 0..count {
356 if self.arena.free_list_len() > self.arena.capacity / 10 {
357 break;
358 }
359
360 let idx = self.arena.cursor();
361 let r = self.arena.get_rank(idx);
362
363 if r <= threshold {
364 let hash = self.arena.get_hash(idx);
366 let tag = (hash >> 48) as u16;
367
368 let old_ptr =
369 self.cache.nodes[idx].swap(core::ptr::null_mut(), Ordering::Release);
370 if !old_ptr.is_null() {
371 let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
372 self.garbage_queue.push((old_ptr, epoch));
373 self.t1.clear_if_matches(hash, old_ptr);
374 self.t2.clear_if_matches(hash, old_ptr);
375 }
376
377 self.cache.index_remove(hash, tag, idx);
378
379 self.admission.record_death(hash);
383 self.arena.push_free_slot(idx);
384 self.arena.set_rank(idx, 0);
385 } else {
386 self.arena.decrement_rank(idx);
388 }
389 self.arena.advance_cursor();
390 }
391 }
392}
393
394impl<K, V, S> Drop for Daemon<K, V, S> {
395 fn drop(&mut self) {
396 for &(ptr, _) in self.garbage_queue.iter() {
397 if !ptr.is_null() {
398 unsafe {
399 let _ = Box::from_raw(ptr);
400 }
401 }
402 }
403 }
404}
405
406pub struct AdmissionFilter {
419 pub ghost_mask: usize,
420 pub ghost_set: ArcSlice<AtomicU16>,
421}
422
423impl AdmissionFilter {
424 pub fn new(capacity: usize) -> Self {
427 let ghost_size = capacity.max(256);
430
431 let mut ghost_vec = Vec::with_capacity(ghost_size);
432 for _ in 0..ghost_size {
433 ghost_vec.push(AtomicU16::new(0));
434 }
435
436 Self {
437 ghost_mask: ghost_size - 1,
438 ghost_set: new_arc_slice(ghost_vec),
439 }
440 }
441
442 #[inline(always)]
444 pub fn record_death(&self, hash: u64) {
445 let fp = (hash >> 48) as u16;
446 let idx = (hash as usize) & self.ghost_mask;
447 self.ghost_set[idx].store(fp, Ordering::Relaxed);
448 }
449
450 #[inline(always)]
453 pub fn check_ghost(&self, hash: u64) -> bool {
454 let fp = (hash >> 48) as u16;
455 let ghost_idx = (hash as usize) & self.ghost_mask;
456 self.ghost_set[ghost_idx].load(Ordering::Relaxed) == fp
457 }
458
459 pub fn clear(&self) {
460 for val in self.ghost_set.iter() {
461 val.store(0, Ordering::Relaxed);
462 }
463 }
464}