// dualcache_ff/daemon.rs

use std::hash::{BuildHasher, Hash};
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};

use crossbeam_channel::{Receiver, Sender};

use crate::unsafe_core::{Arena, Cache, Node, T1, T2};
use crate::{WorkerState, GLOBAL_EPOCH};

/// Maximum rank (shield value). An item that is hit has its rank reset to
/// MAX_RANK, granting it MAX_RANK sweeps of guaranteed survival.
const MAX_RANK: u8 = 3;
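
/// Commands sent from frontend Workers to the Daemon thread.
///
/// A minimal producer-side sketch (hedged: `buffer` is an assumed
/// worker-local `Vec<(K, V, u64)>`; only the channel calls come from
/// crossbeam itself):
///
/// ```ignore
/// let (tx, rx) = crossbeam_channel::unbounded::<Command<K, V>>();
/// tx.send(Command::BatchInsert(buffer.drain(..).collect())).ok();
/// ```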
pub enum Command<K, V> {
    /// Insert request from a Worker. The Daemon applies the global probation
    /// gate before writing to the Cache.
    Insert(K, V, u64),
    /// Batch of (K, V, hash) triples from a sharded buffer. Each item goes
    /// through probation.
    BatchInsert(Vec<(K, V, u64)>),
    Remove(K, u64),
    Clear(Sender<()>),
    Sync(Sender<()>),
}

pub struct Daemon<K, V, S> {
    pub hasher: S,
    pub arena: Arena,
    pub t1: Arc<T1<K, V>>,
    pub t2: Arc<T2<K, V>>,
    pub cache: Arc<Cache<K, V>>,
    pub cmd_rx: Receiver<Command<K, V>>,
    pub hit_rx: Receiver<[usize; 64]>,
    pub epoch: Arc<AtomicU32>,
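    /// TTL measured in epoch ticks. `run()` advances the epoch roughly every
    /// 100 ms, so a duration of N means a lifetime of about N * 100 ms
    /// (see `expire_at` in `handle_insert_with_hash`).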
    pub duration: u32,
    pub admission: Arc<AdmissionFilter>,
    /// Pre-allocated accumulator for deferred-sort hit processing.
    pub hit_accumulator: Vec<usize>,
    pub last_decay_epoch: u32,
    pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
    pub worker_states: Arc<[WorkerState]>,
}
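
// SAFETY: `garbage_queue` holds raw `*mut Node<K, V>` pointers, which makes
// `Daemon` non-Send by default. Those pointers are created, accessed, and
// freed only on the daemon thread, so moving the struct to that thread once
// at startup is sound provided K, V, and S are themselves Send.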
unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}

impl<K, V, S> Daemon<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        hasher: S,
        capacity: usize,
        t1: Arc<T1<K, V>>,
        t2: Arc<T2<K, V>>,
        cache: Arc<Cache<K, V>>,
        cmd_rx: Receiver<Command<K, V>>,
        hit_rx: Receiver<[usize; 64]>,
        epoch: Arc<AtomicU32>,
        duration: u32,
        worker_states: Arc<[WorkerState]>,
    ) -> Self {
        Self {
            hasher,
            arena: Arena::new(capacity),
            t1,
            t2,
            cache,
            cmd_rx,
            hit_rx,
            epoch,
            duration,
            admission: Arc::new(AdmissionFilter::new(capacity)),
            hit_accumulator: Vec::with_capacity(8192),
            last_decay_epoch: 0,
            garbage_queue: Vec::new(),
            worker_states,
        }
    }

    pub fn run(mut self) {
        let mut last_tick = std::time::Instant::now();
        loop {
            let mut processed = 0;

            match self.cmd_rx.recv_timeout(std::time::Duration::from_millis(5)) {
                Ok(cmd) => {
                    self.process_cmd(cmd);
                    processed += 1;

                    while processed < 8192 {
                        match self.cmd_rx.try_recv() {
                            Ok(cmd) => {
                                self.process_cmd(cmd);
                                processed += 1;
                            }
                            Err(_) => break,
                        }
                    }
                }
                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
                Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
            }

            if last_tick.elapsed() >= std::time::Duration::from_millis(100) {
                self.epoch.fetch_add(1, Ordering::Relaxed);
                last_tick = std::time::Instant::now();
            }

            self.maintenance();
        }
    }


    #[inline(always)]
    fn process_cmd(&mut self, cmd: Command<K, V>) {
        match cmd {
            Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
            Command::BatchInsert(batch) => {
                for (k, v, hash) in batch {
                    self.handle_admission_insert(k, v, hash);
                }
            }
            Command::Remove(k, hash) => self.handle_remove(k, hash),
            Command::Clear(tx) => {
                self.handle_clear();
                let _ = tx.send(());
            }
            Command::Sync(tx) => {
                self.maintenance();
                let _ = tx.send(());
            }
        }
    }


    /// Binary Valve Admission:
    /// 1. Cold Start Mode (free slots > 5%): accept all.
    /// 2. Steady State Mode: only accept items found in the Ghost Set.
    fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
        let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
        if cold_start || self.admission.check_ghost(hash) {
            self.handle_insert_with_hash(k, v, hash);
        }
    }


    fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
        let tag = (hash >> 48) as u16;

        // 1. Check if it's an update
        let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
            existing_idx
        } else {
            // 2. New insert: need a free slot
            if self.arena.free_list_empty() {
                self.evict_batch();
            }
            if let Some(new_idx) = self.arena.pop_free_slot() {
                new_idx
            } else {
                return; // Still no slots
            }
        };
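
        // Packed index entry layout (matches the shift/mask below):
        //   bits 63..=48: 16-bit tag (top bits of the hash)
        //   bits 47..=0 : global arena slot index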
        let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);

        let node_ptr = Box::into_raw(Box::new(Node {
            key: k,
            value: v,
            expire_at: self.epoch.load(Ordering::Relaxed) + self.duration,
            g_idx: global_idx as u32,
        }));

        let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
        if !old_ptr.is_null() {
            let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            self.garbage_queue.push((old_ptr, epoch));
        }

        // Linear probing for the Cache index (overwrites if the same tag/idx exists).
        self.cache.index_store(hash, tag, entry);

        self.arena.set_hash(global_idx, hash);
        // Revolution Shield: new items start with the full MAX_RANK shield.
        self.arena.set_rank(global_idx, MAX_RANK);
    }

    fn handle_remove(&mut self, _k: K, hash: u64) {
        let tag = (hash >> 48) as u16;
        if let Some(g_idx) = self.cache.index_probe(hash, tag) {
            let old_ptr = self.cache.nodes[g_idx].swap(std::ptr::null_mut(), Ordering::Release);
            if !old_ptr.is_null() {
                let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                self.garbage_queue.push((old_ptr, epoch));
                self.t1.clear_if_matches(hash, old_ptr);
                self.t2.clear_if_matches(hash, old_ptr);
            }
            self.cache.index_remove(hash, tag, g_idx);
            self.arena.set_rank(g_idx, 0); // Fast eviction next cycle
        }
    }

    fn handle_clear(&mut self) {
        self.cache.clear();
        for i in 0..self.t1.len() {
            self.t1.clear_at(i);
        }
        for i in 0..self.t2.len() {
            self.t2.clear_at(i);
        }
        self.admission.clear();
        self.arena.clear();
    }

    fn maintenance(&mut self) {
        // --- Phase 0: QSBR Garbage Collection ---
        let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
        GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);

        let mut min_active_epoch = current_global + 1;
        for state in self.worker_states.iter() {
            let local = state.local_epoch.load(Ordering::Acquire);
            if local != 0 && local < min_active_epoch {
                min_active_epoch = local;
            }
        }
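
        // A node retired at epoch E may be freed only once every worker has
        // advanced past E (E < min_active_epoch). A local epoch of 0 is
        // treated as "not currently inside a read-side critical section"
        // and is skipped in the scan above.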
        self.garbage_queue.retain(|&(ptr, epoch)| {
            if epoch < min_active_epoch {
                unsafe { drop(Box::from_raw(ptr)); }
                false
            } else {
                true
            }
        });

        // --- Phase 1: Collect hit indices into accumulator ---
        while let Ok(batch) = self.hit_rx.try_recv() {
            for &g_idx in batch.iter() {
                if g_idx < self.arena.capacity {
                    self.hit_accumulator.push(g_idx);
                }
            }
            if self.hit_accumulator.len() >= 8192 {
                break;
            }
        }

        // --- Phase 2: Sort + Revolution Shield hit processing ---
        if !self.hit_accumulator.is_empty() {
            self.hit_accumulator.sort_unstable();
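
            // Sorting groups nearby arena indices together so the rank and
            // hash updates below walk the arena mostly sequentially, which
            // is the point of deferring hits into the accumulator.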
            for &g_idx in &self.hit_accumulator {
                // Revolution Shield: refill to MAX_RANK on every hit.
                self.arena.set_rank(g_idx, MAX_RANK);

                let hash = self.arena.get_hash(g_idx);

                // Promotion: hit items go to T1. Because this index was hit,
                // the node is still live in Cache.
                let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
                if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
                    self.t1.store_slot(hash, ptr);
                }
            }

            self.hit_accumulator.clear();
        }

        if self.arena.free_list_len() < self.arena.capacity / 10 {
            self.evict_batch();
        }
    }

    /// Average-rank eviction: advance a scan cursor and compare each slot's
    /// rank against the global average. The scan is bounded at 128 slots per
    /// call, so finding candidates is O(1) per batch.
    fn evict_batch(&mut self) {
        let count = 128;
        let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
        let threshold = avg.max(1);
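
        // Worked example: capacity = 1024 and count_sum = 512 gives avg = 0
        // (integer division), so threshold clamps to 1, and any slot with
        // rank <= 1 is reclaimed on this sweep.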
        for _ in 0..count {
            if self.arena.free_list_len() > self.arena.capacity / 10 {
                break;
            }

            let idx = self.arena.cursor();
            let r = self.arena.get_rank(idx);

            if r <= threshold {
                // Evict
                let hash = self.arena.get_hash(idx);
                let tag = (hash >> 48) as u16;

                let old_ptr = self.cache.nodes[idx].swap(std::ptr::null_mut(), Ordering::Release);
                if !old_ptr.is_null() {
                    let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                    self.garbage_queue.push((old_ptr, epoch));
                    self.t1.clear_if_matches(hash, old_ptr);
                    self.t2.clear_if_matches(hash, old_ptr);
                }

                self.cache.index_remove(hash, tag, idx);

                self.admission.record_death(hash);
                self.arena.push_free_slot(idx);
                self.arena.set_rank(idx, 0); // Zero the rank so count_sum stays accurate.
            } else {
                // Decay
                self.arena.decrement_rank(idx);
            }
            self.arena.advance_cursor();
        }
    }
}

/// Ghost Set: direct-mapped fingerprint array.
/// Records the 16-bit fingerprint of evicted items so that
/// previously-hot items can be resurrected immediately on re-insert,
/// bypassing TLS probation.
pub struct AdmissionFilter {
    pub ghost_mask: usize,
    pub ghost_set: Arc<[AtomicU16]>,
}

impl AdmissionFilter {
    pub fn new(capacity: usize) -> Self {
        let ghost_size = capacity.next_power_of_two();

        let mut ghost_vec = Vec::with_capacity(ghost_size);
        for _ in 0..ghost_size {
            ghost_vec.push(AtomicU16::new(0));
        }

        Self {
            ghost_mask: ghost_size - 1,
            ghost_set: ghost_vec.into_boxed_slice().into(),
        }
    }

    /// Called by the Daemon on eviction: record this item's fingerprint.
    #[inline(always)]
    pub fn record_death(&self, hash: u64) {
        let fp = (hash >> 48) as u16;
        let idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[idx].store(fp, Ordering::Relaxed);
    }

    /// Called by a frontend Worker on insert: true if this item was previously
    /// evicted. Fingerprint collisions can produce false positives, which only
    /// admit an extra item; they never reject a true ghost.
    /// Fast path: skips TLS probation and sends directly to the Daemon.
    #[inline(always)]
    pub fn check_ghost(&self, hash: u64) -> bool {
        let fp = (hash >> 48) as u16;
        let ghost_idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[ghost_idx].load(Ordering::Relaxed) == fp
    }

    pub fn clear(&self) {
        for val in self.ghost_set.iter() {
            val.store(0, Ordering::Relaxed);
        }
    }
}
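
// A minimal, self-contained usage sketch for AdmissionFilter. Hedged: this
// test exercises only the ghost-set round trip shown above; the hash
// constant is arbitrary.
#[cfg(test)]
mod admission_filter_tests {
    use super::AdmissionFilter;

    #[test]
    fn ghost_set_round_trip() {
        let filter = AdmissionFilter::new(1024);
        let hash: u64 = 0xDEAD_BEEF_CAFE_F00D;

        // Not recorded yet: rejected (modulo fingerprint collisions).
        assert!(!filter.check_ghost(hash));

        // After an eviction is recorded, the same hash is admitted.
        filter.record_death(hash);
        assert!(filter.check_ghost(hash));

        // clear() zeroes every fingerprint slot.
        filter.clear();
        assert!(!filter.check_ghost(hash));
    }
}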