// File: dualcache_ff/daemon.rs

1#[cfg(not(feature = "std"))]
2use alloc::{boxed::Box, sync::Arc, vec::Vec};
3#[cfg(feature = "std")]
4use std::sync::Arc;
5
6use core::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, Ordering};
7use core::hash::{Hash, BuildHasher};
8
9use crate::arena::Arena;
10use crate::storage::{Cache, Node};
11use crate::filters::{T1, T2};
12use crate::lossy_queue::{LossyQueue, OneshotAck};
13use crate::{WorkerState, GLOBAL_EPOCH};
14
15/// Maximum rank (Revolution Shield value).
16/// A newly inserted or hit item gets rank = MAX_RANK, granting it
17/// MAX_RANK Pendulum sweeps of guaranteed survival.
18const MAX_RANK: u8 = 3;
19
20// ── Command ───────────────────────────────────────────────────────────────
21
/// Control messages consumed by the [`Daemon`] run loop.
///
/// Workers enqueue these on the shared `LossyQueue`; the daemon drains up to
/// 8192 per poll iteration. Insert/remove variants carry the precomputed
/// 64-bit key hash so the daemon never re-hashes.
pub enum Command<K, V> {
    /// Single insert from Worker (goes through probation gate).
    /// Payload: key, value, precomputed 64-bit hash of the key.
    Insert(K, V, u64),
    /// Batch of (K, V, hash) from sharded worker buffers.
    BatchInsert(Vec<(K, V, u64)>),
    /// Remove by key+hash. The key itself is currently unused by the
    /// handler (lookup is hash+tag based).
    Remove(K, u64),
    /// Blocking clear — caller spins on `OneshotAck::wait()`.
    Clear(Arc<OneshotAck>),
    /// Blocking maintenance flush — caller spins on `OneshotAck::wait()`.
    Sync(Arc<OneshotAck>),
    /// Signal Daemon to exit its run loop.
    Shutdown,
}
36
37// ── Daemon ────────────────────────────────────────────────────────────────
38
/// Single-threaded maintenance actor: owns eviction, admission, epoch
/// ticking, hit promotion, and QSBR garbage reclamation for the cache.
///
/// Exactly one `Daemon` runs per cache instance; all mutation of the Arena
/// and garbage queue happens on its thread/task.
pub struct Daemon<K, V, S> {
    /// Hasher builder shared with workers (kept for API symmetry).
    pub hasher: S,
    /// Slot allocator: free list, per-slot hash/rank, Pendulum cursor.
    pub arena: Arena,
    /// Hot-tier promotion filter.
    pub t1: Arc<T1<K, V>>,
    /// Secondary filter tier.
    pub t2: Arc<T2<K, V>>,
    /// Shared node table + hash index.
    pub cache: Arc<Cache<K, V>>,
    /// Command channel from workers (inserts, removes, clear/sync/shutdown).
    pub cmd_rx: Arc<LossyQueue<Command<K, V>>>,
    /// Hit-index batches from workers; 64 slot indices per batch.
    pub hit_rx: Arc<LossyQueue<[usize; 64]>>,
    /// Expiry epoch counter, advanced ~every 100 ms (std) / 100 polls (no_std).
    pub epoch: Arc<AtomicU32>,
    /// Configurable poll interval in microseconds (1 000–10 000 µs).
    /// Controls the trade-off between CPU idle cost and hit-signal latency.
    pub poll_us: u64,
    /// Ghost-set admission filter (Binary Valve).
    pub admission: Arc<AdmissionFilter>,
    /// Pre-allocated accumulator for deferred-sort hit processing.
    pub hit_accumulator: Vec<usize>,
    /// Last epoch at which rank decay ran (currently unreferenced here).
    pub last_decay_epoch: u32,
    /// Retired node pointers tagged with the GLOBAL_EPOCH at retirement;
    /// freed once all workers have advanced past that epoch (QSBR).
    pub garbage_queue: Vec<(*mut Node<K, V>, usize)>,
    /// Per-worker local-epoch slots read during garbage collection.
    pub worker_states: Arc<[WorkerState]>,
    /// Monotonically increasing tick counter — incremented on every poll loop.
    /// Workers read this (Relaxed) to decide whether to time-flush their TLS
    /// buffers without needing a hardware clock in no_std mode.
    pub daemon_tick: Arc<AtomicU64>,
}
62
63unsafe impl<K: Send, V: Send, S: Send> Send for Daemon<K, V, S> {}
64
impl<K, V, S> Daemon<K, V, S>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
    S: BuildHasher + Clone + Send + 'static,
{
    /// Builds a daemon with a fresh `Arena` and `AdmissionFilter` sized to
    /// `capacity`, wiring in the shared channels and state handles.
    ///
    /// `duration` is accepted for API compatibility but ignored — expiry is
    /// derived from the epoch tick rate (see `get_duration`).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        hasher: S,
        capacity: usize,
        t1: Arc<T1<K, V>>,
        t2: Arc<T2<K, V>>,
        cache: Arc<Cache<K, V>>,
        cmd_rx: Arc<LossyQueue<Command<K, V>>>,
        hit_rx: Arc<LossyQueue<[usize; 64]>>,
        epoch: Arc<AtomicU32>,
        duration: u32,
        poll_us: u64,
        worker_states: Arc<[WorkerState]>,
        daemon_tick: Arc<AtomicU64>,
    ) -> Self {
        let _ = duration; // duration is stored in the epoch tick rate; kept for API compat
        Self {
            hasher,
            arena: Arena::new(capacity),
            t1,
            t2,
            cache,
            cmd_rx,
            hit_rx,
            epoch,
            poll_us,
            admission: Arc::new(AdmissionFilter::new(capacity)),
            // 8192 matches the per-poll command/hit drain caps below.
            hit_accumulator: Vec::with_capacity(8192),
            last_decay_epoch: 0,
            garbage_queue: Vec::new(),
            worker_states,
            daemon_tick,
        }
    }

    /// Main Daemon event loop.
    ///
    /// # std mode
    /// Called from a dedicated `std::thread::spawn` inside `DualCacheFF::new`.
    /// Sleeps `poll_us` microseconds when the command queue is empty.
    ///
    /// # no_std mode
    /// The caller (e.g. RTOS task) must invoke `daemon.run()` on a dedicated
    /// task. The loop uses `core::hint::spin_loop()` between iterations;
    /// the RTOS scheduler handles preemption and CPU sharing.
    ///
    /// Returns only after receiving `Command::Shutdown`.
    pub fn run(mut self) {
        #[cfg(feature = "std")]
        let mut last_epoch_tick = std::time::Instant::now();

        loop {
            // ── Drain command queue (up to 8192 commands per poll) ────────
            let mut processed = 0u32;
            loop {
                match self.cmd_rx.try_recv() {
                    // Shutdown exits the whole loop, dropping `self` (and
                    // thereby any nodes still pending in garbage_queue, if
                    // Drop handles them — TODO confirm Drop impl exists).
                    Some(Command::Shutdown) => return,
                    Some(cmd) => {
                        self.process_cmd(cmd);
                        processed += 1;
                        if processed >= 8192 {
                            break;
                        }
                    }
                    None => break,
                }
            }

            // ── Epoch tick ────────────────────────────────────────────────
            // In std mode: wall-clock driven, every ~100 ms.
            // In no_std mode: daemon_tick driven, every 100 poll iterations.
            #[cfg(feature = "std")]
            {
                let now = std::time::Instant::now();
                if now.duration_since(last_epoch_tick)
                    >= std::time::Duration::from_millis(100)
                {
                    self.epoch.fetch_add(1, Ordering::Relaxed);
                    last_epoch_tick = now;
                }
            }
            #[cfg(not(feature = "std"))]
            {
                // NOTE(review): tick starts at whatever daemon_tick held at
                // spawn (typically 0), so `0 % 100 == 0` ticks the epoch on
                // the very first iteration — harmless but worth confirming.
                let tick = self.daemon_tick.load(Ordering::Relaxed);
                if tick % 100 == 0 {
                    self.epoch.fetch_add(1, Ordering::Relaxed);
                }
            }

            // ── Maintenance (GC + hit processing + eviction) ──────────────
            self.maintenance();

            // ── Advance daemon_tick ───────────────────────────────────────
            self.daemon_tick.fetch_add(1, Ordering::Relaxed);

            // ── Idle sleep / spin ─────────────────────────────────────────
            // Only sleep when the queue was empty this round; under load the
            // loop spins back immediately to keep command latency low.
            if processed == 0 {
                #[cfg(feature = "std")]
                std::thread::sleep(std::time::Duration::from_micros(self.poll_us));
                #[cfg(not(feature = "std"))]
                core::hint::spin_loop();
            }
        }
    }

    /// Dispatches one command. `Shutdown` is intercepted in `run()` before
    /// this point, hence the `unreachable!`.
    #[inline(always)]
    fn process_cmd(&mut self, cmd: Command<K, V>) {
        match cmd {
            Command::Insert(k, v, hash) => self.handle_admission_insert(k, v, hash),
            Command::BatchInsert(batch) => {
                for (k, v, hash) in batch {
                    self.handle_admission_insert(k, v, hash);
                }
            }
            Command::Remove(k, hash) => self.handle_remove(k, hash),
            Command::Clear(ack) => {
                self.handle_clear();
                // Release the caller spinning in OneshotAck::wait().
                ack.signal();
            }
            Command::Sync(ack) => {
                self.maintenance();
                ack.signal();
            }
            Command::Shutdown => unreachable!("handled in run()"),
        }
    }

    /// Binary Valve Admission:
    /// 1. Cold Start Mode (free slots > 5%): accept all.
    /// 2. Steady State Mode: only accept if Ghost Set recognises the item.
    ///
    /// Rejected items are silently dropped (lossy by design).
    fn handle_admission_insert(&mut self, k: K, v: V, hash: u64) {
        // capacity / 20 == 5% threshold between cold-start and steady state.
        let cold_start = self.arena.free_list_len() > self.arena.capacity / 20;
        if cold_start || self.admission.check_ghost(hash) {
            self.handle_insert_with_hash(k, v, hash);
        }
    }

    /// Inserts or updates the entry for `hash`, allocating (and if necessary
    /// evicting to obtain) an arena slot for new keys.
    fn handle_insert_with_hash(&mut self, k: K, v: V, hash: u64) {
        // 16-bit tag = top 16 bits of the hash; used by the index for
        // cheap candidate filtering.
        let tag = (hash >> 48) as u16;

        // 1. Check if it's an update of an existing entry
        let global_idx = if let Some(existing_idx) = self.cache.index_probe(hash, tag) {
            existing_idx
        } else {
            // 2. New insert: need a free slot
            if self.arena.free_list_empty() {
                self.evict_batch();
            }
            if let Some(new_idx) = self.arena.pop_free_slot() {
                new_idx
            } else {
                return; // Still no slots after eviction — drop
            }
        };

        // Pack tag (top 16 bits) + slot index (low 48 bits) into one index
        // entry word.
        let entry = (tag as u64) << 48 | (global_idx as u64 & 0x0000_FFFF_FFFF_FFFF);

        let node_ptr = Box::into_raw(Box::new(Node {
            key: k,
            value: v,
            // NOTE(review): plain `+` can overflow once the epoch counter
            // nears u32::MAX (~13 years at 100 ms ticks) — confirm
            // wrapping/saturating semantics are acceptable.
            expire_at: self.epoch.load(Ordering::Relaxed) + self.get_duration(),
            g_idx: global_idx as u32,
        }));

        // Publish the new node; Release pairs with readers' Acquire loads.
        let old_ptr = self.cache.nodes[global_idx].swap(node_ptr, Ordering::Release);
        if !old_ptr.is_null() {
            // Old node may still be referenced by in-flight readers — defer
            // the free via the QSBR garbage queue.
            let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            self.garbage_queue.push((old_ptr, epoch));
        }

        self.cache.index_store(hash, tag, entry);
        self.arena.set_hash(global_idx, hash);
        // Revolution Shield: new items start with MAX_RANK protection
        self.arena.set_rank(global_idx, MAX_RANK);
    }

    /// Expiry window in epoch ticks for newly inserted nodes.
    fn get_duration(&self) -> u32 {
        // Default: 10 epoch ticks ≈ 1 second (epoch ticks every 100 ms)
        // This preserves the original API's `duration` field semantics.
        10
    }

    /// Removes the entry for `hash` (key is unused — lookup is hash+tag
    /// based). The slot is NOT returned to the free list here; rank 0 lets
    /// the Pendulum reclaim it on the next eviction sweep.
    fn handle_remove(&mut self, _k: K, hash: u64) {
        let tag = (hash >> 48) as u16;
        if let Some(g_idx) = self.cache.index_probe(hash, tag) {
            let old_ptr =
                self.cache.nodes[g_idx].swap(core::ptr::null_mut(), Ordering::Release);
            if !old_ptr.is_null() {
                // Defer free (QSBR) and purge any filter-tier copies of the
                // same pointer so T1/T2 cannot resurrect it.
                let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                self.garbage_queue.push((old_ptr, epoch));
                self.t1.clear_if_matches(hash, old_ptr);
                self.t2.clear_if_matches(hash, old_ptr);
            }
            self.cache.index_remove(hash, tag, g_idx);
            self.arena.set_rank(g_idx, 0); // Fast eviction next cycle
        }
    }

    /// Blocking clear: wipes the cache table, both filter tiers, the ghost
    /// set, and the arena.
    /// NOTE(review): garbage_queue and hit_accumulator are not drained here;
    /// confirm pending garbage is still valid to free after `cache.clear()`.
    fn handle_clear(&mut self) {
        self.cache.clear();
        for i in 0..self.t1.len() {
            self.t1.clear_at(i);
        }
        for i in 0..self.t2.len() {
            self.t2.clear_at(i);
        }
        self.admission.clear();
        self.arena.clear();
    }

    /// One maintenance pass: QSBR reclamation, hit-signal processing with
    /// Revolution Shield refresh + T1 promotion, and low-watermark eviction.
    fn maintenance(&mut self) {
        // ── Phase 0: QSBR Garbage Collection ─────────────────────────────
        // Advance the global epoch. Plain store (not fetch_add) — the daemon
        // is assumed to be the only writer of GLOBAL_EPOCH.
        let current_global = GLOBAL_EPOCH.load(Ordering::Relaxed);
        GLOBAL_EPOCH.store(current_global + 1, Ordering::Release);

        // Find the oldest epoch any worker is still active in. local_epoch
        // of 0 means "quiescent / not in a read section" and is skipped.
        let mut min_active_epoch = current_global + 1;
        for state in self.worker_states.iter() {
            let local = state.local_epoch.load(Ordering::Acquire);
            if local != 0 && local < min_active_epoch {
                min_active_epoch = local;
            }
        }

        // Free every retired node whose retirement epoch precedes all
        // active workers — no reader can still hold it.
        self.garbage_queue.retain(|&(ptr, epoch)| {
            if epoch < min_active_epoch {
                // SAFETY: `ptr` came from Box::into_raw, was unlinked from
                // `cache.nodes` before being queued, and the epoch check
                // proves no worker can still be dereferencing it.
                unsafe { drop(Box::from_raw(ptr)) };
                false
            } else {
                true
            }
        });

        // ── Phase 1: Collect hit indices into accumulator ─────────────────
        while let Some(batch) = self.hit_rx.try_recv() {
            for &g_idx in batch.iter() {
                // Bounds-check defends against stale/garbage indices from
                // the lossy channel.
                if g_idx < self.arena.capacity {
                    self.hit_accumulator.push(g_idx);
                }
            }
            if self.hit_accumulator.len() >= 8192 {
                break;
            }
        }

        // ── Phase 2: Sort + Revolution Shield hit processing ──────────────
        // Sorting groups duplicate/nearby indices for cache-friendly
        // sequential arena access.
        if !self.hit_accumulator.is_empty() {
            self.hit_accumulator.sort_unstable();

            for &g_idx in &self.hit_accumulator {
                // Revolution Shield: refill to MAX_RANK on every hit
                self.arena.set_rank(g_idx, MAX_RANK);

                let hash = self.arena.get_hash(g_idx);

                // Promotion: hot items migrate to T1
                let ptr = self.cache.nodes[g_idx].load(Ordering::Acquire);
                if !ptr.is_null() && self.t1.load_slot(hash) != ptr {
                    self.t1.store_slot(hash, ptr);
                }
            }

            self.hit_accumulator.clear();
        }

        // Low-watermark: keep at least 10% of slots free so inserts rarely
        // have to evict inline.
        if self.arena.free_list_len() < self.arena.capacity / 10 {
            self.evict_batch();
        }
    }

    /// Avg-rank eviction: scan the Pendulum cursor, compare each slot's rank
    /// with the running average. Guaranteed O(1) amortised candidate search.
    ///
    /// Examines at most 128 slots per call; stops early once free slots
    /// exceed the 10% watermark. Slots above the threshold decay by one
    /// rank instead of being evicted.
    fn evict_batch(&mut self) {
        let count = 128;
        // Threshold floor of 1 so rank-0 slots are always evictable even
        // when the average rounds down to 0.
        let avg = (self.arena.count_sum() / self.arena.capacity as u64) as u8;
        let threshold = avg.max(1);

        for _ in 0..count {
            if self.arena.free_list_len() > self.arena.capacity / 10 {
                break;
            }

            let idx = self.arena.cursor();
            let r = self.arena.get_rank(idx);

            if r <= threshold {
                // Evict
                let hash = self.arena.get_hash(idx);
                let tag = (hash >> 48) as u16;

                let old_ptr =
                    self.cache.nodes[idx].swap(core::ptr::null_mut(), Ordering::Release);
                if !old_ptr.is_null() {
                    // Defer free (QSBR) and purge filter-tier copies.
                    let epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
                    self.garbage_queue.push((old_ptr, epoch));
                    self.t1.clear_if_matches(hash, old_ptr);
                    self.t2.clear_if_matches(hash, old_ptr);
                }

                self.cache.index_remove(hash, tag, idx);

                // Task 5 — Ghost Set dynamically scaled to capacity:
                // record_death writes to ghost_set[hash & ghost_mask],
                // where ghost_mask = capacity - 1 (already aligned).
                self.admission.record_death(hash);
                self.arena.push_free_slot(idx);
                self.arena.set_rank(idx, 0);
            } else {
                // Decay — decrement rank by 1
                self.arena.decrement_rank(idx);
            }
            self.arena.advance_cursor();
        }
    }
}
383
// ── AdmissionFilter (Ghost Set) ───────────────────────────────────────────

/// Ghost Set — direct-mapped fingerprint array.
///
/// Records the 16-bit fingerprint of evicted items so that previously-hot
/// items bypass TLS probation on re-insertion.
///
/// # Task 5 — Capacity Align
/// Ghost Set size is always equal to the Arena's `capacity` (already a power
/// of two). When the user sets a small capacity (e.g. 2000 items rounded to
/// 2048), the Ghost Set is also 2048 × 2 bytes = 4 KB — not the previous
/// bloated `capacity.next_power_of_two()` of a larger default.
pub struct AdmissionFilter {
    /// Direct-map index mask; always `ghost_set.len() - 1`.
    pub ghost_mask: usize,
    /// Fingerprint table. Slot value 0 doubles as "empty": a genuine
    /// fingerprint of 0 (hash >> 48 == 0, ~1/65536 of keys) is
    /// indistinguishable from an empty slot and yields a false positive.
    pub ghost_set: Arc<[AtomicU16]>,
}

impl AdmissionFilter {
    /// `capacity` must be a power of two (enforced by `Config`).
    /// Ghost Set is exactly `capacity` entries (2 bytes each), floored at
    /// 256 entries to keep the false-positive rate reasonable.
    ///
    /// Debug builds verify the power-of-two contract: a non-power-of-two
    /// size would make `ghost_mask` silently map hashes to wrong slots.
    pub fn new(capacity: usize) -> Self {
        // Capacity is already power-of-two — no extra `.next_power_of_two()`.
        // Minimum 256 entries to keep the false-positive rate reasonable.
        let ghost_size = capacity.max(256);
        debug_assert!(
            ghost_size.is_power_of_two(),
            "AdmissionFilter capacity must be a power of two, got {ghost_size}"
        );

        // Arc<[T]> collects directly from the iterator — no intermediate
        // Vec-push loop or boxed-slice conversion needed.
        let ghost_set: Arc<[AtomicU16]> =
            (0..ghost_size).map(|_| AtomicU16::new(0)).collect();

        Self {
            ghost_mask: ghost_size - 1,
            ghost_set,
        }
    }

    /// Called by Daemon on eviction: record this item's 16-bit fingerprint.
    #[inline(always)]
    pub fn record_death(&self, hash: u64) {
        let fp = (hash >> 48) as u16;
        let idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[idx].store(fp, Ordering::Relaxed);
    }

    /// Called by Worker on insert: `true` if the fingerprint matches a
    /// previously evicted item → bypass TLS probation.
    #[inline(always)]
    pub fn check_ghost(&self, hash: u64) -> bool {
        let fp = (hash >> 48) as u16;
        let ghost_idx = (hash as usize) & self.ghost_mask;
        self.ghost_set[ghost_idx].load(Ordering::Relaxed) == fp
    }

    /// Reset every slot to the "empty" fingerprint (0).
    pub fn clear(&self) {
        for val in self.ghost_set.iter() {
            val.store(0, Ordering::Relaxed);
        }
    }
}