Skip to main content

dualcache_ff/
daemon.rs

1#[cfg(not(feature = "std"))]
2use alloc::{boxed::Box, vec::Vec};
3
4use crate::sync::{Arc, ArcSlice, new_arc_slice};
5use crate::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU64, Ordering};
6use core::hash::{Hash, BuildHasher};
7
8use crate::arena::Arena;
9use crate::storage::{Cache, Node};
10use crate::filters::{T1, T2};
11use crate::lossy_queue::{LossyQueue, OneshotAck};
12use crate::{WorkerState, GLOBAL_EPOCH};
13
/// Maximum rank (Revolution Shield value).
///
/// `handle_insert_with_hash` assigns this rank to every node it stores and
/// `maintenance` refills it on every recorded hit. During an `evict_batch`
/// sweep a slot whose rank is at or below `max(average rank, 1)` is evicted,
/// while any other slot only loses one rank point. A fresh item therefore
/// outlasts at most `MAX_RANK - 1` visits of the Pendulum cursor, and fewer
/// when the average rank is high.
const MAX_RANK: u8 = 3;
18
19// ── Command ───────────────────────────────────────────────────────────────
20
/// Messages consumed by [`Daemon::run`] from its `cmd_rx` queue.
///
/// The `u64` carried by the insert/remove variants is the key's hash; the
/// Daemon code in this file uses it as supplied and never rehashes the key.
pub enum Command<K, V> {
    /// Single `(key, value, hash)` insert. Routed through
    /// `handle_admission_insert`, so it may be dropped by the admission gate.
    Insert(K, V, u64),
    /// Batch of `(K, V, hash)` triples; each element is handled exactly like
    /// an individual `Insert`, in order.
    BatchInsert(Vec<(K, V, u64)>),
    /// Remove by key+hash. The lookup is done by hash only; the key itself is
    /// not inspected by `handle_remove`.
    Remove(K, u64),
    /// Clear everything, then signal the ack. The sender is expected to block
    /// on the `OneshotAck` until the Daemon signals it.
    Clear(Arc<OneshotAck>),
    /// Run one `maintenance` pass immediately, then signal the ack. The sender
    /// is expected to block on the `OneshotAck` until the Daemon signals it.
    Sync(Arc<OneshotAck>),
    /// Signal Daemon to exit its run loop. Commands still queued behind this
    /// one are not processed.
    Shutdown,
}
35
36// ── Daemon ────────────────────────────────────────────────────────────────
37
/// Background maintenance worker for the cache.
///
/// It drains `cmd_rx`, applies inserts/removals/clears, collects hit batches
/// from `hit_rx`, evicts slots when the free list runs low, and frees retired
/// nodes once no worker epoch can still reference them (see `maintenance`).
pub struct Daemon<K, V, S> {
    /// Hash builder handed to `new`. Not consulted by the code in this file;
    /// every command already carries its hash.
    pub hasher: S,
    /// Per-slot bookkeeping: free list, rank, stored hash and sweep cursor.
    pub arena: Arena,
    /// T1 filter; `maintenance` stores the node pointers of hit slots into it.
    pub t1: Arc<T1<K, V>>,
    /// T2 filter; this file only ever clears entries in it.
    pub t2: Arc<T2<K, V>>,
    /// Primary storage: the per-slot node pointers (`nodes`) and the hash index.
    pub cache: Arc<Cache<K, V>>,
    /// Incoming command queue drained by `run`.
    pub cmd_rx: Arc<LossyQueue<Command<K, V>>>,
    /// Incoming batches of 64 hit slot indices, drained by `maintenance`.
    pub hit_rx: Arc<LossyQueue<[usize; 64]>>,
    /// Expiry clock: advanced by `run`, read when stamping `Node::expire_at`.
    pub epoch: Arc<AtomicU32>,
    /// Configurable poll interval in microseconds (1 000–10 000 µs).
    /// Controls the trade-off between CPU idle cost and hit-signal latency.
    /// Only the std build of `run` sleeps for this long, and only after an
    /// iteration that processed no commands.
    pub poll_us: u64,
    /// Ghost-set admission filter: consulted by `handle_admission_insert`,
    /// fed by `evict_batch`.
    pub admission: Arc<AdmissionFilter>,
    /// Pre-allocated accumulator for deferred-sort hit processing
    /// (`new` reserves room for 8192 indices).
    pub hit_accumulator: Vec<usize>,
    /// Initialised to 0 by `new`; not read or written elsewhere in this file.
    pub last_decay_epoch: u32,
    /// Retired nodes awaiting reclamation, each paired with the `GLOBAL_EPOCH`
    /// value observed when it was unlinked from `cache.nodes`.
    pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
    /// Per-worker state; `maintenance` reads every `local_epoch` to find the
    /// oldest epoch still in use (a value of 0 is skipped — presumably it
    /// marks a worker that is not currently reading; confirm).
    pub worker_states: ArcSlice<WorkerState>,
    /// Monotonically increasing tick counter — incremented on every poll loop.
    /// Workers read this (Relaxed) to decide whether to time-flush their TLS
    /// buffers without needing a hardware clock in no_std mode.
    pub daemon_tick: Arc<AtomicU64>,
    /// Cold-start flag shared with DualCacheFF. The Daemon stores `true` while
    /// more than 1/20 of the arena's slots are free, and after a clear.
    pub is_cold_start: Arc<AtomicBool>,
}
63
// SAFETY: `Daemon` cannot be `Send` automatically, at least because
// `garbage_queue` holds raw `*mut Node<K, V>` pointers. In this file each of
// those pointers was produced by `Box::into_raw` and unlinked from
// `cache.nodes` before being queued, and only Daemon code pushes to, frees
// from, or drops that queue, so moving the whole Daemon to another thread
// moves sole responsibility for freeing them along with it.
// NOTE(review): this impl also vouches for every other field (`Arena`, and the
// `Arc`-shared `T1`/`T2`/`Cache`/queue types) being usable from the Daemon's
// thread — confirm against their definitions.
unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}
65
66impl<K, V, S> Daemon<K, V, S>
67where
68    K: Hash + Eq + Send + Sync + Clone + 'static,
69    V: Send + Sync + Clone + 'static,
70    S: BuildHasher + Clone + Send + 'static,
71{
72    #[allow(clippy::too_many_arguments)]
73    pub fn new(
74        hasher: S,
75        capacity: usize,
76        t1: Arc<T1<K, V>>,
77        t2: Arc<T2<K, V>>,
78        cache: Arc<Cache<K, V>>,
79        cmd_rx: Arc<LossyQueue<Command<K, V>>>,
80        hit_rx: Arc<LossyQueue<[usize; 64]>>,
81        epoch: Arc<AtomicU32>,
82        duration: u32,
83        poll_us: u64,
84        worker_states: ArcSlice<WorkerState>,
85        daemon_tick: Arc<AtomicU64>,
86        is_cold_start: Arc<AtomicBool>,
87    ) -> Self {
88        let _ = duration; // duration is stored in the epoch tick rate; kept for API compat
89        Self {
90            hasher,
91            arena: Arena::new(capacity),
92            t1,
93            t2,
94            cache,
95            cmd_rx,
96            hit_rx,
97            epoch,
98            poll_us,
99            admission: Arc::new(AdmissionFilter::new(capacity)),
100            hit_accumulator: Vec::with_capacity(8192),
101            last_decay_epoch: 0,
102            garbage_queue: Vec::new(),
103            worker_states,
104            daemon_tick,
105            is_cold_start,
106        }
107    }
108
109    /// Main Daemon event loop.
110    ///
111    /// # std mode
112    /// Called from a dedicated `std::thread::spawn` inside `DualCacheFF::new`.
113    /// Sleeps `poll_us` microseconds when the command queue is empty.
114    ///
115    /// # no_std mode
116    /// The caller (e.g. RTOS task) must invoke `daemon.run()` on a dedicated
117    /// task. The loop uses `core::hint::spin_loop()` between iterations;
118    /// the RTOS scheduler handles preemption and CPU sharing.
119    pub fn run(mut self) {
120        #[cfg(feature = "std")]
121        let mut last_epoch_tick = std::time::Instant::now();
122
123        loop {
124            // ── Drain command queue (up to 8192 commands per poll) ────────
125            let mut processed = 0u32;
126            loop {
127                match self.cmd_rx.try_recv() {
128                    Some(Command::Shutdown) => return,
129                    Some(cmd) => {
130                        self.process_cmd(cmd);
131                        processed += 1;
132                        if processed >= 8192 {
133                            break;
134                        }
135                    }
136                    None => break,
137                }
138            }
139
140            // ── Epoch tick ────────────────────────────────────────────────
141            // In std mode: wall-clock driven, every ~100 ms.
142            // In no_std mode: daemon_tick driven, every 100 poll iterations.
143            #[cfg(feature = "std")]
144            {
145                let now = std::time::Instant::now();
146                if now.duration_since(last_epoch_tick)
147                    >= std::time::Duration::from_millis(100)
148                {
149                    self.epoch.fetch_add(1, Ordering::Relaxed);
150                    last_epoch_tick = now;
151                }
152            }
153            #[cfg(not(feature = "std"))]
154            {
155                let tick = self.daemon_tick.load(Ordering::Relaxed);
156                if tick % 100 == 0 {
157                    self.epoch.fetch_add(1, Ordering::Relaxed);
158                }
159            }
160
161            // ── Maintenance (GC + hit processing + eviction) ──────────────
162            self.maintenance();
163
164            // ── Advance daemon_tick ───────────────────────────────────────
165            self.daemon_tick.fetch_add(1, Ordering::Relaxed);
166
167            // ── Idle sleep / spin ─────────────────────────────────────────
168            if processed == 0 {
169                #[cfg(any(feature = "loom", loom))]
170                loom::thread::yield_now();
171                #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
172                std::thread::sleep(std::time::Duration::from_micros(self.poll_us));
173                #[cfg(not(feature = "std"))]
174                core::hint::spin_loop();
175            }
176        }
177    }
178
179    #[inline(always)]
180    fn process_cmd(&mut self, cmd: Command<K, V>) {
181        match cmd {
182            Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
183            Command::BatchInsert(batch) => {
184                for (k, v, hash) in batch {
185                    self.handle_admission_insert(k, v, hash);
186                }
187            }
188            Command::Remove(k, hash) => self.handle_remove(k, hash),
189            Command::Clear(ack) => {
190                self.handle_clear();
191                ack.signal();
192            }
193            Command::Sync(ack) => {
194                self.maintenance();
195                ack.signal();
196            }
197            Command::Shutdown => unreachable!("handled in run()"),
198        }
199    }
200
201    /// Binary Valve Admission:
202    /// 1. Cold Start Mode (free slots > 5%): accept all.
203    /// 2. Steady State Mode: only accept if Ghost Set recognises the item.
204    fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
205        let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
206        self.is_cold_start.store(cold_start, Ordering::Relaxed);
207        if cold_start || self.admission.check_ghost(hash) {
208            self.handle_insert_with_hash(k, v, hash);
209        }
210    }
211
212    fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
213        let tag = (hash >> 48) as u16;
214
215        // 1. Check if it's an update of an existing entry
216        let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
217            existing_idx
218        } else {
219            // 2. New insert: need a free slot
220            if self.arena.free_list_empty() {
221                self.evict_batch();
222            }
223            if let Some(new_idx) = self.arena.pop_free_slot() {
224                new_idx
225            } else {
226                return; // Still no slots after eviction — drop
227            }
228        };
229
230        let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);
231
232        let node_ptr = Box::into_raw(Box::new(Node {
233            key: k,
234            value: v,
235            expire_at: self.epoch.load(Ordering::Relaxed) + self.get_duration(),
236            g_idx: global_idx as u32,
237        }));
238
239        let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
240        if !old_ptr.is_null() {
241            let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
242            self.garbage_queue.push((old_ptr, epoch));
243        }
244
245        self.cache.index_store(hash, tag, entry);
246        self.arena.set_hash(global_idx, hash);
247        // Revolution Shield: new items start with MAX_RANK protection
248        self.arena.set_rank(global_idx, MAX_RANK);
249    }
250
251    fn get_duration(&self) -> u32 {
252        // Default: 10 epoch ticks ≈ 1 second (epoch ticks every 100 ms)
253        // This preserves the original API's `duration` field semantics.
254        10
255    }
256
257    fn handle_remove(&mut self, _k: K, hash: u64) {
258        let tag = (hash >> 48) as u16;
259        if let Some(g_idx) = self.cache.index_probe(hash, tag) {
260            let old_ptr =
261                self.cache.nodes[g_idx].swap(core::ptr::null_mut(), Ordering::Release);
262            if !old_ptr.is_null() {
263                let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
264                self.garbage_queue.push((old_ptr, epoch));
265                self.t1.clear_if_matches(hash, old_ptr);
266                self.t2.clear_if_matches(hash, old_ptr);
267            }
268            self.cache.index_remove(hash, tag, g_idx);
269            self.arena.set_rank(g_idx, 0); // Fast eviction next cycle
270        }
271    }
272
273    fn handle_clear(&mut self) {
274        self.cache.clear();
275        for i in 0..self.t1.len() {
276            self.t1.clear_at(i);
277        }
278        for i in 0..self.t2.len() {
279            self.t2.clear_at(i);
280        }
281        self.admission.clear();
282        self.arena.clear();
283        self.is_cold_start.store(true, Ordering::Relaxed);
284    }
285
286    fn maintenance(&mut self) {
287        // ── Phase 0: QSBR Garbage Collection ─────────────────────────────
288        let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
289        GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);
290
291        let mut min_active_epoch = current_global + 1;
292        for state in self.worker_states.iter() {
293            let local = state.local_epoch.load(Ordering::Acquire);
294            if local != 0 && local < min_active_epoch {
295                min_active_epoch = local;
296            }
297        }
298
299        self.garbage_queue.retain(|&(ptr, epoch)| {
300            if epoch < min_active_epoch {
301                unsafe { drop(Box::from_raw(ptr)) };
302                false
303            } else {
304                true
305            }
306        });
307
308        // ── Phase 1: Collect hit indices into accumulator ─────────────────
309        while let Some(batch) = self.hit_rx.try_recv() {
310            for &g_idx in batch.iter() {
311                if g_idx < self.arena.capacity {
312                    self.hit_accumulator.push(g_idx);
313                }
314            }
315            if self.hit_accumulator.len() >= 8192 {
316                break;
317            }
318        }
319
320        // ── Phase 2: Sort + Revolution Shield hit processing ──────────────
321        if !self.hit_accumulator.is_empty() {
322            self.hit_accumulator.sort_unstable();
323
324            for &g_idx in &self.hit_accumulator {
325                // Revolution Shield: refill to MAX_RANK on every hit
326                self.arena.set_rank(g_idx, MAX_RANK);
327
328                let hash = self.arena.get_hash(g_idx);
329
330                // Promotion: hot items migrate to T1
331                let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
332                if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
333                    self.t1.store_slot(hash, ptr);
334                }
335            }
336
337            self.hit_accumulator.clear();
338        }
339
340        if self.arena.free_list_len() < self.arena.capacity / 10 {
341            self.evict_batch();
342        }
343
344        let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
345        self.is_cold_start.store(cold_start, Ordering::Relaxed);
346    }
347
348    /// Avg-rank eviction: scan the Pendulum cursor, compare each slot's rank
349    /// with the running average. Guaranteed O(1) amortised candidate search.
350    fn evict_batch(&mut self) {
351        let count = 128;
352        let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
353        let threshold = avg.max(1);
354
355        for _ in 0..count {
356            if self.arena.free_list_len() > self.arena.capacity / 10 {
357                break;
358            }
359
360            let idx = self.arena.cursor();
361            let r = self.arena.get_rank(idx);
362
363            if r <= threshold {
364                // Evict
365                let hash = self.arena.get_hash(idx);
366                let tag = (hash >> 48) as u16;
367
368                let old_ptr =
369                    self.cache.nodes[idx].swap(core::ptr::null_mut(), Ordering::Release);
370                if !old_ptr.is_null() {
371                    let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
372                    self.garbage_queue.push((old_ptr, epoch));
373                    self.t1.clear_if_matches(hash, old_ptr);
374                    self.t2.clear_if_matches(hash, old_ptr);
375                }
376
377                self.cache.index_remove(hash, tag, idx);
378
379                // Task 5 — Ghost Set dynamically scaled to capacity:
380                // record_death writes to ghost_set[hash & ghost_mask],
381                // where ghost_mask = capacity - 1 (already aligned).
382                self.admission.record_death(hash);
383                self.arena.push_free_slot(idx);
384                self.arena.set_rank(idx, 0);
385            } else {
386                // Decay — decrement rank by 1
387                self.arena.decrement_rank(idx);
388            }
389            self.arena.advance_cursor();
390        }
391    }
392}
393
394impl<K, V, S> Drop for Daemon<K, V, S> {
395    fn drop(&mut self) {
396        for &(ptr, _) in self.garbage_queue.iter() {
397            if !ptr.is_null() {
398                unsafe {
399                    let _ = Box::from_raw(ptr);
400                }
401            }
402        }
403    }
404}
405
406// ── AdmissionFilter (Ghost Set) ───────────────────────────────────────────
407
/// Ghost Set — direct-mapped fingerprint array.
///
/// Each slot remembers a 16-bit fingerprint, taken from the top bits of the
/// hash, of the most recently evicted item that mapped to it; the slot is
/// chosen by masking the low bits of the hash with `ghost_mask`. The Daemon
/// records a fingerprint on every eviction and, outside cold start, only
/// admits an insert whose fingerprint is found here.
///
/// Being direct-mapped, the set is lossy: a later eviction that maps to the
/// same slot overwrites the earlier fingerprint, and two hashes that share
/// slot and fingerprint are indistinguishable.
///
/// # Task 5 — Capacity Align
/// The table is sized by `AdmissionFilter::new` from the cache capacity
/// (never fewer than 256 entries), at 2 bytes per entry — e.g. 4 KB for a
/// capacity of 2048.
pub struct AdmissionFilter {
    /// Bit mask applied to the hash to pick a slot; `ghost_set.len() - 1`.
    pub ghost_mask: usize,
    /// One 16-bit fingerprint per slot; all slots start at 0.
    pub ghost_set: ArcSlice<AtomicU16>,
}
422
423impl AdmissionFilter {
424    /// `capacity` must be a power of two (enforced by `Config`).
425    /// Ghost Set is exactly `capacity` entries (2 bytes each).
426    pub fn new(capacity: usize) -> Self {
427        // Capacity is already power-of-two — no extra `.next_power_of_two()`.
428        // Minimum 256 entries to keep the false-positive rate reasonable.
429        let ghost_size = capacity.max(256);
430
431        let mut ghost_vec = Vec::with_capacity(ghost_size);
432        for _ in 0..ghost_size {
433            ghost_vec.push(AtomicU16::new(0));
434        }
435
436        Self {
437            ghost_mask: ghost_size - 1,
438            ghost_set: new_arc_slice(ghost_vec),
439        }
440    }
441
442    /// Called by Daemon on eviction: record this item's 16-bit fingerprint.
443    #[inline(always)]
444    pub fn record_death(&self, hash: u64) {
445        let fp = (hash >> 48) as u16;
446        let idx = (hash as usize) & self.ghost_mask;
447        self.ghost_set[idx].store(fp, Ordering::Relaxed);
448    }
449
450    /// Called by Worker on insert: `true` if the fingerprint matches a
451    /// previously evicted item → bypass TLS probation.
452    #[inline(always)]
453    pub fn check_ghost(&self, hash: u64) -> bool {
454        let fp = (hash >> 48) as u16;
455        let ghost_idx = (hash as usize) & self.ghost_mask;
456        self.ghost_set[ghost_idx].load(Ordering::Relaxed) == fp
457    }
458
459    pub fn clear(&self) {
460        for val in self.ghost_set.iter() {
461            val.store(0, Ordering::Relaxed);
462        }
463    }
464}