1use std::sync::Arc;
2use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
3use crossbeam_channel::{Receiver, Sender};
4use crate::unsafe_core::{T1, T2, Cache, Node, Arena};
5use crate::{WorkerState, GLOBAL_EPOCH};
6use std::hash::{Hash, BuildHasher};
7
/// Rank written to a slot when it is inserted and again whenever a hit is
/// recorded for it; eviction sweeps count the rank down from this value.
const MAX_RANK: u8 = 3;
11
12pub enum Command<K, V> {
13 Insert(K, V, u64),
15 BatchInsert(Vec<(K, V, u64)>),
17 Remove(K, u64),
18 Clear(Sender<()>),
19 Sync(Sender<()>),
20}
21
22
23
24pub struct Daemon<K, V, S> {
25 pub hasher: S,
26 pub arena: Arena,
27 pub t1: Arc<T1<K, V>>,
28 pub t2: Arc<T2<K, V>>,
29 pub cache: Arc<Cache<K, V>>,
30 pub cmd_rx: Receiver<Command<K, V>>,
31 pub hit_rx: Receiver<[usize; 64]>,
32 pub epoch: Arc<AtomicU32>,
33 pub duration: u32,
34 pub admission: Arc<AdmissionFilter>,
35 pub hit_accumulator: Vec<usize>,
37 pub last_decay_epoch: u32,
38 pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
39 pub worker_states: Arc<[WorkerState]>,
40}
41
42unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}
43
44impl<K, V, S> Daemon<K, V, S>
45where K: Hash + Eq + Send + Sync + Clone + 'static,
46 V: Send + Sync + Clone + 'static,
47 S: BuildHasher + Clone + Send + 'static
48{
49 #[allow(clippy::too_many_arguments)]
50 pub fn new(
51 hasher: S,
52 capacity: usize,
53 t1: Arc<T1<K, V>>,
54 t2: Arc<T2<K, V>>,
55 cache: Arc<Cache<K, V>>,
56 cmd_rx: Receiver<Command<K, V>>,
57 hit_rx: Receiver<[usize; 64]>,
58 epoch: Arc<AtomicU32>,
59 duration: u32,
60 worker_states: Arc<[WorkerState]>,
61 ) -> Self {
62 Self {
63 hasher,
64 arena: Arena::new(capacity),
65 t1,
66 t2,
67 cache,
68 cmd_rx,
69 hit_rx,
70 epoch,
71 duration,
72 admission: Arc::new(AdmissionFilter::new(capacity)),
73 hit_accumulator: Vec::with_capacity(8192),
74 last_decay_epoch: 0,
75 garbage_queue: Vec::new(),
76 worker_states,
77 }
78 }
79
80 pub fn run(mut self) {
81 let mut last_tick = std::time::Instant::now();
82 loop {
83 let mut processed = 0;
84
85 match self.cmd_rx.recv_timeout(std::time::Duration::from_millis(5)) {
86 Ok(cmd) => {
87 self.process_cmd(cmd);
88 processed += 1;
89
90 while processed < 8192 {
91 match self.cmd_rx.try_recv() {
92 Ok(cmd) => {
93 self.process_cmd(cmd);
94 processed += 1;
95 }
96 Err(_) => break,
97 }
98 }
99 }
100 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
101 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
102 }
103
104 if last_tick.elapsed() >= std::time::Duration::from_millis(100) {
105 self.epoch.fetch_add(1, Ordering::Relaxed);
106 last_tick = std::time::Instant::now();
107 }
108
109 self.maintenance();
110 }
111 }
112
113 #[inline(always)]
114 fn process_cmd(&mut self, cmd: Command<K, V>) {
115 match cmd {
116 Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
117 Command::BatchInsert(batch) => {
118 for (k, v, hash) in batch {
119 self.handle_admission_insert(k, v, hash);
120 }
121 }
122 Command::Remove(k, hash) => self.handle_remove(k, hash),
123 Command::Clear(tx) => {
124 self.handle_clear();
125 let _ = tx.send(());
126 }
127 Command::Sync(tx) => {
128 self.maintenance();
129 let _ = tx.send(());
130 }
131 }
132 }
133
134 fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
138 let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
139 if cold_start || self.admission.check_ghost(hash) {
140 self.handle_insert_with_hash(k, v, hash);
141 }
142 }
143
144 fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
145 let tag = (hash >> 48) as u16;
146
147 let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
149 existing_idx
150 } else {
151 if self.arena.free_list_empty() {
153 self.evict_batch();
154 }
155 if let Some(new_idx) = self.arena.pop_free_slot() {
156 new_idx
157 } else {
158 return; }
160 };
161
162 let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);
163
164 let node_ptr = Box::into_raw(Box::new(Node {
165 key: k,
166 value: v,
167 expire_at: self.epoch.load(Ordering::Relaxed) + self.duration,
168 g_idx: global_idx as u32,
169 }));
170
171 let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
172 if !old_ptr.is_null() {
173 let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
174 self.garbage_queue.push((old_ptr, epoch));
175 }
176
177 self.cache.index_store(hash, tag, entry);
179
180 self.arena.set_hash(global_idx, hash);
181 self.arena.set_rank(global_idx, MAX_RANK);
183 }
184
185 fn handle_remove(&mut self, _k: K, hash: u64) {
186 let tag = (hash >> 48) as u16;
187 if let Some(g_idx) = self.cache.index_probe(hash, tag) {
188 let old_ptr = self.cache.nodes[g_idx].swap(std::ptr::null_mut(), Ordering::Release);
189 if !old_ptr.is_null() {
190 let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
191 self.garbage_queue.push((old_ptr, epoch));
192 self.t1.clear_if_matches(hash, old_ptr);
193 self.t2.clear_if_matches(hash, old_ptr);
194 }
195 self.cache.index_remove(hash, tag, g_idx);
196 self.arena.set_rank(g_idx, 0); }
198 }
199
200 fn handle_clear(&mut self) {
201 self.cache.clear();
202 for i in 0..self.t1.len() {
203 self.t1.clear_at(i);
204 }
205 for i in 0..self.t2.len() {
206 self.t2.clear_at(i);
207 }
208 self.admission.clear();
209 self.arena.clear();
210 }
211
212 fn maintenance(&mut self) {
213 let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
215 GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);
216
217 let mut min_active_epoch = current_global + 1;
218 for state in self.worker_states.iter() {
219 let local = state.local_epoch.load(Ordering::Acquire);
220 if local != 0 && local < min_active_epoch {
221 min_active_epoch = local;
222 }
223 }
224
225 self.garbage_queue.retain(|&(ptr, epoch)| {
226 if epoch < min_active_epoch {
227 unsafe { drop(Box::from_raw(ptr)); }
228 false
229 } else {
230 true
231 }
232 });
233
234 while let Ok(batch) = self.hit_rx.try_recv() {
236 for &g_idx in batch.iter() {
237 if g_idx < self.arena.capacity {
238 self.hit_accumulator.push(g_idx);
239 }
240 }
241 if self.hit_accumulator.len() >= 8192 {
242 break;
243 }
244 }
245
246 if !self.hit_accumulator.is_empty() {
248 self.hit_accumulator.sort_unstable();
249
250 for &g_idx in &self.hit_accumulator {
251 self.arena.set_rank(g_idx, MAX_RANK);
253
254 let hash = self.arena.get_hash(g_idx);
255
256 let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
258 if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
259 self.t1.store_slot(hash, ptr);
260 }
261 }
262
263 self.hit_accumulator.clear();
264 }
265
266 if self.arena.free_list_len() < self.arena.capacity / 10 {
267 self.evict_batch();
268 }
269 }
270
271 fn evict_batch(&mut self) {
274 let count = 128;
275 let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
276 let threshold = avg.max(1);
277
278 for _ in 0..count {
279 if self.arena.free_list_len() > self.arena.capacity / 10 {
280 break;
281 }
282
283 let idx = self.arena.cursor();
284 let r = self.arena.get_rank(idx);
285
286 if r <= threshold {
287 let hash = self.arena.get_hash(idx);
289 let tag = (hash >> 48) as u16;
290
291 let old_ptr = self.cache.nodes[idx].swap(std::ptr::null_mut(), Ordering::Release);
292 if !old_ptr.is_null() {
293 let epoch = crate::GLOBAL_EPOCH.load(Ordering::Relaxed);
294 self.garbage_queue.push((old_ptr, epoch));
295 self.t1.clear_if_matches(hash, old_ptr);
296 self.t2.clear_if_matches(hash, old_ptr);
297 }
298
299 self.cache.index_remove(hash, tag, idx);
300
301 self.admission.record_death(hash);
302 self.arena.push_free_slot(idx);
303 self.arena.set_rank(idx, 0); } else {
305 self.arena.decrement_rank(idx);
307 }
308 self.arena.advance_cursor();
309 }
310 }
311}
312
/// Fixed-size table of 16-bit fingerprints of recently evicted hashes.
///
/// Each hash maps to one slot (its low bits masked by `ghost_mask`) and is
/// remembered there by a fingerprint taken from its top 16 bits. A later
/// death that maps to the same slot overwrites the earlier one.
///
/// The stored value 0 is reserved to mean "empty": a hash whose top 16 bits
/// are 0 is stored and looked up with fingerprint 1 instead, so a fresh or
/// cleared filter never reports a ghost.
pub struct AdmissionFilter {
    /// `ghost_set.len() - 1`; the length is always a power of two.
    pub ghost_mask: usize,
    /// One fingerprint per slot; 0 marks a slot with nothing recorded.
    pub ghost_set: Arc<[AtomicU16]>,
}

impl AdmissionFilter {
    /// Stored value of a slot that holds no fingerprint.
    const EMPTY: u16 = 0;

    /// Creates a filter with `capacity` rounded up to a power of two slots
    /// (a `capacity` of 0 yields a single slot). All slots start empty.
    pub fn new(capacity: usize) -> Self {
        let ghost_size = capacity.next_power_of_two();

        let ghost_set: Arc<[AtomicU16]> = (0..ghost_size)
            .map(|_| AtomicU16::new(Self::EMPTY))
            .collect();

        Self {
            ghost_mask: ghost_size - 1,
            ghost_set,
        }
    }

    /// Fingerprint of `hash`: its top 16 bits, with 0 remapped to 1 so it
    /// can never be confused with an empty slot.
    #[inline(always)]
    fn fingerprint(hash: u64) -> u16 {
        let fp = (hash >> 48) as u16;
        if fp == Self::EMPTY {
            1
        } else {
            fp
        }
    }

    /// Slot index of `hash`: its low bits masked to the table size.
    #[inline(always)]
    fn slot(&self, hash: u64) -> usize {
        (hash as usize) & self.ghost_mask
    }

    /// Remembers that the entry with this `hash` was evicted, replacing
    /// whatever fingerprint its slot held before.
    #[inline(always)]
    pub fn record_death(&self, hash: u64) {
        self.ghost_set[self.slot(hash)].store(Self::fingerprint(hash), Ordering::Relaxed);
    }

    /// Returns `true` if the slot for `hash` currently holds its fingerprint.
    ///
    /// Different hashes can share both slot and fingerprint, so `true` may
    /// be a false positive; an empty slot always yields `false`.
    #[inline(always)]
    pub fn check_ghost(&self, hash: u64) -> bool {
        self.ghost_set[self.slot(hash)].load(Ordering::Relaxed) == Self::fingerprint(hash)
    }

    /// Marks every slot as empty.
    pub fn clear(&self) {
        for slot in self.ghost_set.iter() {
            slot.store(Self::EMPTY, Ordering::Relaxed);
        }
    }
}