Skip to main content

dualcache_ff/
lib.rs

1// ── no_std gate ───────────────────────────────────────────────────────────
2// When the "std" feature is disabled we enter no_std mode.
3// `extern crate alloc` provides Vec, Box, Arc etc. from the allocator crate.
4// The RTOS / bare-metal environment must supply a global allocator.
5#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9// ── Public sub-modules ────────────────────────────────────────────────────
10pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
// ── Internal sync abstraction ─────────────────────────────────────────────
/// Type-routing module: selects the correct synchronization primitives based
/// on the active feature flags.
///
/// | Feature    | `Arc` source         |
/// |------------|----------------------|
/// | `std`      | `std::sync::Arc`     |
/// | `loom`     | `loom::sync::Arc`    |
/// | _(neither)_| `alloc::sync::Arc`   |
///
/// `loom` (either the `loom` feature or `--cfg loom`) takes precedence over
/// `std`. The `atomic` and `cell` sub-modules are routed the same way.
pub(crate) mod sync {
    // `Vec` is not part of the core prelude, and the crate root's
    // `use alloc::vec::Vec` is not visible by bare name inside this module,
    // so no_std builds must import it here for `new_arc_slice` / `ArcSlice`.
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    /// Shared, immutable slice: a thin `Arc<[T]>` outside loom.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Shared, immutable slice: under loom the elements are kept in a `Vec<T>`
    /// and wrapped with `Arc::new`.
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Converts an owned vector into an [`ArcSlice`] without copying elements
    /// one by one (via a boxed slice).
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        vec.into_boxed_slice().into()
    }

    /// Converts an owned vector into an [`ArcSlice`] (loom variant).
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Atomic types and `Ordering`, from `core` or from loom.
    pub mod atomic {
        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with loom's closure-based access API (`with` / `with_mut`).
    pub mod cell {
        /// Thin wrapper around `core::cell::UnsafeCell` exposing the same
        /// `with` / `with_mut` API as `loom::cell::UnsafeCell`, so callers can
        /// be written once for both builds.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data`.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Raw pointer to the wrapped value.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a const pointer to the wrapped value.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                f(self.0.get() as *const T)
            }

            /// Calls `f` with a mutable pointer to the wrapped value.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                f(self.0.get())
            }
        }

        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;
    }
}
105
106// ── Imports ───────────────────────────────────────────────────────────────
107#[cfg(not(feature = "std"))]
108use alloc::vec::Vec;
109
110use crate::cache_padded::CachePadded;
111use crate::daemon::{Command, Daemon};
112use crate::lossy_queue::{LossyQueue, OneshotAck};
113use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
114use ahash::RandomState;
115use core::hash::{BuildHasher, Hash};
116use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
117use sync::{Arc, ArcSlice, new_arc_slice};
118
// ── Config ────────────────────────────────────────────────────────────────

/// Sizing and timing parameters for a [`DualCacheFF`].
///
/// Build one with [`Config::with_memory_budget`] (automatic sizing) or
/// [`Config::new_expert`] (explicit sizing), then refine it with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct Config {
    /// Number of entries in the main cache (L3). Expected to be a power of
    /// two, since slots are selected by bitmask.
    pub capacity: usize,
    /// Number of T1 (direct-mapped) slots. Expected to be a power of two.
    pub t1_slots: usize,
    /// Number of T2 (direct-mapped) slots. Expected to be a power of two.
    pub t2_slots: usize,
    /// TTL duration in epoch ticks (one tick ≈ 100 ms in std mode).
    pub duration: u32,
    /// Number of worker slots (QSBR state + TLS batch buffer) to allocate.
    /// Threads whose worker ID is `>= threads` get no slot and fall back to
    /// unbuffered paths.
    pub threads: usize,
    /// Daemon poll interval in **microseconds** (1 000–10 000 µs = 1–10 ms
    /// when set through [`Config::with_poll_us`]).
    /// Controls the latency vs. idle-CPU trade-off:
    /// lower = faster hit-signal delivery, but more CPU when idle;
    /// higher = more efficient idle, but hot TLS buffers may stall longer.
    pub poll_us: u64,
    /// TLS flush threshold in **daemon ticks** (at least 1 when set through
    /// [`Config::with_flush_tick_threshold`]).
    ///
    /// A worker forces a TLS buffer flush when it detects that the Daemon
    /// tick counter has advanced by at least this many ticks since its last
    /// flush, regardless of buffer fill level.
    ///
    /// Rule of thumb: `flush_tick_threshold ≈ 1 ms / poll_us`, rounded up to
    /// at least 1.
    /// For poll_us = 1 000 µs (1 ms): threshold = 1.
    /// For poll_us = 5 000 µs (5 ms): threshold = 1 (flush every 5 ms).
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Budget-based constructor: specify RAM (in MiB) and TTL, the engine
    /// picks sizes.
    ///
    /// Assumes ~128 bytes of total overhead per item, so `capacity` is
    /// `ram_mb * 8192` rounded up to a power of two (minimum 256).
    ///
    /// The multiplication saturates instead of overflowing. This matters on
    /// 32-bit no_std targets, where `ram_mb * 1024 * 1024` overflows from
    /// 4 GiB upward. If the rounded value does not fit in `usize`, the
    /// largest representable power of two is used.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        const BYTES_PER_ITEM: usize = 128;
        const ITEMS_PER_MB: usize = (1024 * 1024) / BYTES_PER_ITEM;
        const LARGEST_POW2: usize = 1 << (usize::BITS - 1);

        let raw_capacity = ram_mb.saturating_mul(ITEMS_PER_MB);
        let capacity = raw_capacity
            .checked_next_power_of_two()
            .unwrap_or(LARGEST_POW2)
            .max(256);

        Self {
            capacity,
            // T1 fits in L1 cache: max 2048 × 8-byte pointers = 16 KB
            t1_slots: 2048,
            // T2 intercepts warm data: 20% of capacity (80/20 rule).
            // `capacity / 5` is far below usize::MAX, so this cannot overflow.
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Expert constructor with explicit physical-law assertions.
    ///
    /// `poll_us` defaults to 1 000 µs and `flush_tick_threshold` to 1.
    ///
    /// # Panics
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        // Physical Law 1: Bitmask routing requires powers of two
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        // Physical Law 2: T1 absolutely cannot exceed L1 cache
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builder: set the Daemon poll interval, clamped to 1 000–10 000 µs.
    #[must_use]
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Builder: set the TLS flush threshold in daemon ticks (minimum 1).
    #[must_use]
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
211
212// ── QSBR global epoch ─────────────────────────────────────────────────────
213
214/// Global QSBR epoch. Daemon increments this every maintenance cycle.
215/// Workers store their local epoch on `get()` entry and reset to 0 on exit,
216/// allowing Daemon to safely reclaim stale pointers.
217#[cfg(any(feature = "loom", loom))]
218loom::lazy_static! {
219    pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
220}
221
222#[cfg(not(any(feature = "loom", loom)))]
223pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
224
225/// Per-worker QSBR state — cache-line padded to prevent false sharing
226/// between workers checking in/out simultaneously.
227pub struct WorkerState {
228    pub local_epoch: CachePadded<AtomicUsize>,
229}
230
231impl WorkerState {
232    pub fn new() -> Self {
233        Self {
234            local_epoch: CachePadded::new(AtomicUsize::new(0)),
235        }
236    }
237}
238
// ── Thread-local state (std only) ────────────────────────────────────────
// In no_std / RTOS mode, TLS is not available. Worker state must be
// managed by the application (e.g. passed as function arguments or stored
// in RTOS task-local storage). Without TLS the cache performs no per-thread
// batching: `insert` and hit recording send directly to the Daemon queues,
// and `get` does no QSBR check-in.

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
use std::sync::Mutex;

/// Process-wide allocator of worker IDs. IDs of exited threads are reused
/// before new ones are minted, which keeps IDs dense (and so more likely to
/// fall inside a cache's `config.threads` slots).
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    /// IDs handed back by exited threads.
    free_list: Mutex<Vec<usize>>,
    /// Next never-issued ID.
    next_id: sync::atomic::AtomicUsize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    free_list: Mutex::new(Vec::new()),
    next_id: sync::atomic::AtomicUsize::new(0),
};

/// Hands the owning thread's worker ID back to `ALLOCATOR` when dropped,
/// i.e. when the thread's `GUARD` slot is destroyed at thread exit.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    fn drop(&mut self) {
        // A poisoned lock only means some thread panicked while holding it;
        // the `Vec<usize>` is still a valid free list. Recover it instead of
        // leaking the ID. Leaked IDs make `next_id` grow, pushing later
        // threads past `config.threads` onto the unbuffered fallback paths.
        ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(self.id);
    }
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
use core::cell::{Cell, RefCell};

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// This thread's worker ID, used to index `worker_states` and
    /// `miss_buffers`. Assigned lazily on first use (recycled IDs first) and
    /// returned through `GUARD` when the thread exits.
    ///
    /// NOTE(review): if another TLS destructor uses the cache after `GUARD`
    /// has been destroyed, this thread keeps using an ID that may already
    /// have been handed to a new thread.
    static WORKER_ID: usize = {
        let id = ALLOCATOR
            .free_list
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .pop()
            .unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed));

        // `try_with`: if this thread is already tearing down its TLS and
        // `GUARD` is gone, skip registration (the ID is then never recycled)
        // rather than panicking inside a TLS destructor.
        let _ = GUARD.try_with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    /// Owns this thread's `ThreadIdGuard`; dropping it at thread exit returns
    /// the worker ID to `ALLOCATOR`.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// them to the Daemon via the hit queue. `.1` is the fill count.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch (`.0`) plus a probe counter (`.1`) that drives
    /// its periodic decay.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    /// Daemon tick observed at this thread's last TLS flush.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// worker force-drains its TLS buffer even if it is not full.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
307
// Loom build: worker IDs come from a plain monotonic counter and are never
// recycled (no free list or guard, unlike the std build).
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}

#[cfg(any(feature = "loom", loom))]
use core::cell::{Cell, RefCell};

// Loom-modelled counterparts of the std thread-locals, with the same names
// and roles so the cache methods compile unchanged under loom.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    // This model thread's worker ID (index into `worker_states` / `miss_buffers`).
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// them to the Daemon via the hit queue. `.1` is the fill count.
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch (`.0`) plus a probe counter (`.1`) driving its
    /// periodic decay.
    /// Heap-allocated under Loom via `vec!` to prevent virtual coroutine stack overflow.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));

    /// Daemon tick observed at this thread's last TLS flush.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// Worker force-drains its TLS buffer even if it is not full.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
334
335// ── DualCacheFF ───────────────────────────────────────────────────────────
336
337pub struct DualCacheFF<K, V, S = RandomState> {
338    pub hasher: S,
339    pub t1: Arc<T1<K, V>>,
340    pub t2: Arc<T2<K, V>>,
341    pub cache: Arc<Cache<K, V>>,
342    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
343    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
344    pub epoch: Arc<AtomicU32>,
345    /// QSBR registry: one entry per thread slot.
346    pub worker_states: ArcSlice<WorkerState>,
347    /// Per-worker zero-lock batch buffers, indexed by WORKER_ID.
348    pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
349    /// Daemon tick counter — shared with the Daemon thread.
350    /// Workers read this (Relaxed) to implement time-based TLS flush.
351    pub daemon_tick: Arc<AtomicU64>,
352    /// Number of daemon_tick advances that correspond to ≈1 ms of real time.
353    pub flush_tick_threshold: u64,
354    /// Cold-start flag: Daemon sets this to false when capacity is reached.
355    pub is_cold_start: Arc<sync::atomic::AtomicBool>,
356}
357
358impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
359    fn clone(&self) -> Self {
360        Self {
361            hasher: self.hasher.clone(),
362            t1: self.t1.clone(),
363            t2: self.t2.clone(),
364            cache: self.cache.clone(),
365            cmd_tx: self.cmd_tx.clone(),
366            hit_tx: self.hit_tx.clone(),
367            epoch: self.epoch.clone(),
368            worker_states: self.worker_states.clone(),
369            miss_buffers: self.miss_buffers.clone(),
370            daemon_tick: self.daemon_tick.clone(),
371            flush_tick_threshold: self.flush_tick_threshold,
372            is_cold_start: self.is_cold_start.clone(),
373        }
374    }
375}
376
// ── Constructor (std mode — auto-spawns Daemon thread) ────────────────────

#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache and immediately starts its background Daemon on a new
    /// OS thread (`std::thread::spawn`).
    ///
    /// This is the convenient entry point for `std` environments (servers,
    /// desktops). Under loom no thread is spawned and the Daemon is dropped.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        std::thread::spawn(move || daemon.run());

        #[cfg(any(feature = "loom", loom))]
        drop(daemon);

        handle
    }
}
399
400// ── Constructor (universal — returns Daemon for manual scheduling) ─────────
401
402impl<K, V> DualCacheFF<K, V, RandomState>
403where
404    K: Hash + Eq + Send + Sync + Clone + 'static,
405    V: Send + Sync + Clone + 'static,
406{
407    /// Create a `DualCacheFF` and its `Daemon` without spawning any thread.
408    ///
409    /// # std mode
410    /// Prefer `DualCacheFF::new()` which spawns the daemon automatically.
411    ///
412    /// # no_std / RTOS mode
413    /// Use `new_headless()` to obtain both the cache handle and the daemon.
414    /// Schedule `daemon.run()` on a dedicated RTOS task:
415    /// ```ignore
416    /// let (cache, daemon) = DualCacheFF::new_headless(config);
417    /// rtos::spawn_task(|| daemon.run()); // RTOS-specific API
418    /// ```
419    pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
420        let hasher = RandomState::new();
421        let t1 = Arc::new(T1::new(config.t1_slots));
422        let t2 = Arc::new(T2::new(config.t2_slots));
423        let cache = Arc::new(Cache::new(config.capacity));
424        let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
425        let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
426        let epoch = Arc::new(AtomicU32::new(0));
427        let daemon_tick = Arc::new(AtomicU64::new(0));
428        let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
429
430        let mut buffers = Vec::with_capacity(config.threads);
431        let mut states = Vec::with_capacity(config.threads);
432        for _ in 0..config.threads {
433            buffers.push(WorkerSlot::new());
434            states.push(WorkerState::new());
435        }
436        let miss_buffers = new_arc_slice(buffers);
437        let worker_states = new_arc_slice(states);
438
439        let daemon = Daemon::new(
440            hasher.clone(),
441            config.capacity,
442            t1.clone(),
443            t2.clone(),
444            cache.clone(),
445            cmd_q.clone(),
446            hit_q.clone(),
447            epoch.clone(),
448            config.duration,
449            config.poll_us,
450            worker_states.clone(),
451            daemon_tick.clone(),
452            is_cold_start.clone(),
453        );
454
455        let this = Self {
456            hasher,
457            t1,
458            t2,
459            cache,
460            cmd_tx: cmd_q,
461            hit_tx: hit_q,
462            epoch,
463            worker_states,
464            miss_buffers,
465            daemon_tick,
466            flush_tick_threshold: config.flush_tick_threshold,
467            is_cold_start,
468        };
469
470        (this, daemon)
471    }
472}
473
474// ── Public API (std + no_std) ─────────────────────────────────────────────
475
476impl<K, V, S> DualCacheFF<K, V, S>
477where
478    K: Hash + Eq + Send + Sync + Clone + 'static,
479    V: Send + Sync + Clone + 'static,
480    S: BuildHasher + Clone + Send + 'static,
481{
482    /// Flush all pending TLS buffers and wait for the Daemon to process them.
483    ///
484    /// Blocks via `OneshotAck::wait()` (spin-wait, safe in both std and no_std).
485    pub fn sync(&self) {
486        // ── std: flush TLS hit buffer ─────────────────────────────────────
487        #[cfg(feature = "std")]
488        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
489            let mut state = buf.borrow_mut();
490            if state.1 > 0_usize {
491                let _ = self.hit_tx.try_send(state.0);
492                state.1 = 0;
493            }
494        });
495
496        // ── std: flush all worker slots ───────────────────────────────────
497        #[cfg(feature = "std")]
498        for slot in self.miss_buffers.iter() {
499            let buf = unsafe { slot.get_mut_unchecked() };
500            if buf.len() > 0 {
501                let batch = buf.drain_to_vec();
502                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
503            }
504        }
505
506        // Send a Sync command and spin-wait for acknowledgment
507        let ack = OneshotAck::new();
508        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
509        ack.wait();
510    }
511
512    /// Look up a key.
513    ///
514    /// Hot-path order: T1 (L1 direct-map) → T2 (L2 direct-map) → Cache (L3).
515    /// Records a hit signal into the TLS buffer for Daemon processing.
516    pub fn get(&self, key: &K) -> Option<V> {
517        let hash = self.hash(key);
518        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
519
520        // ── QSBR Check-in (std only — requires TLS) ───────────────────────
521        #[cfg(feature = "std")]
522        let mut id_opt = None;
523        #[cfg(feature = "std")]
524        {
525            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
526            WORKER_ID.with(|&id| {
527                if id < self.worker_states.len() {
528                    self.worker_states[id]
529                        .local_epoch
530                        .store(global_epoch, Ordering::Relaxed);
531                    id_opt = Some(id);
532                }
533            });
534        }
535
536        #[cfg(feature = "std")]
537        let has_epoch = id_opt.is_some();
538        #[cfg(not(feature = "std"))]
539        let has_epoch = true;
540
541        let mut res: Option<V> = None;
542        let mut hit_g_idx: Option<u32> = None;
543
544        if has_epoch {
545            // ── T1 check ──────────────────────────────────────────────────────
546            let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
547            if !ptr_t1.is_null() {
548                let node = unsafe { &*ptr_t1 };
549                if node.key == *key
550                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
551                {
552                    res = Some(node.value.clone());
553                    hit_g_idx = Some(node.g_idx);
554                }
555            }
556
557            // ── T2 check ──────────────────────────────────────────────────────
558            if res.is_none() {
559                let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
560                if !ptr_t2.is_null() {
561                    let node = unsafe { &*ptr_t2 };
562                    if node.key == *key
563                        && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
564                    {
565                        res = Some(node.value.clone());
566                        hit_g_idx = Some(node.g_idx);
567                    }
568                }
569            }
570
571            // ── Cache (L3) check ──────────────────────────────────────────────
572            if res.is_none() {
573                let tag = (hash >> 48) as u16;
574                if let Some(global_idx) = self.cache.index_probe(hash, tag) {
575                    if let Some(v) = self
576                        .cache
577                        .node_get_full(global_idx, key, current_epoch_cache)
578                    {
579                        res = Some(v);
580                        hit_g_idx = Some(global_idx as u32);
581                    }
582                }
583            }
584        }
585
586        // ── QSBR Check-out (std only) ─────────────────────────────────────
587        #[cfg(feature = "std")]
588        if let Some(id) = id_opt {
589            self.worker_states[id]
590                .local_epoch
591                .store(0, Ordering::Relaxed);
592        }
593
594        if let Some(g_idx) = hit_g_idx {
595            self.record_hit(g_idx as usize);
596        }
597
598        res
599    }
600
601    /// Insert a key-value pair.
602    ///
603    /// # L1 Probation Filter (std only)
604    /// Items that appear only once in a TLS epoch are silently dropped.
605    /// This prevents cache pollution from scan traffic.
606    /// In no_std mode the filter is skipped and all items are forwarded.
607    ///
608    /// # Task 6 — Time-based TLS Flush (std only)
609    /// The TLS batch buffer normally flushes when it reaches 32 items.
610    /// Additionally, if the Daemon tick counter has advanced by at least
611    /// `flush_tick_threshold` since the last flush, the buffer is force-drained
612    /// even if nearly empty. This prevents hot items from being invisible to
613    /// the Daemon for too long (the "split-brain eviction" bug).
614    pub fn insert(&self, key: K, value: V) {
615        let hash = self.hash(&key);
616
617        // ── std path: L1 Probation Filter + TLS batch ─────────────────────
618        #[cfg(feature = "std")]
619        {
620            let is_cold = self.is_cold_start.load(Ordering::Relaxed);
621            let mut bypass = is_cold;
622
623            if !bypass {
624                // Perform thread-safe fast lookup to see if key exists
625                // ── QSBR Check-in ───────────────────────
626                let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
627                let mut id_opt = None;
628                WORKER_ID.with(|&id| {
629                    if id < self.worker_states.len() {
630                        self.worker_states[id]
631                            .local_epoch
632                            .store(global_epoch, Ordering::Relaxed);
633                        id_opt = Some(id);
634                    }
635                });
636
637                if id_opt.is_some() {
638                    // T1 check
639                    let ptr_t1 = self.t1.load_slot(hash);
640                    if !ptr_t1.is_null() {
641                        let node = unsafe { &*ptr_t1 };
642                        if node.key == key {
643                            bypass = true;
644                        }
645                    }
646
647                    // T2 check
648                    if !bypass {
649                        let ptr_t2 = self.t2.load_slot(hash);
650                        if !ptr_t2.is_null() {
651                            let node = unsafe { &*ptr_t2 };
652                            if node.key == key {
653                                bypass = true;
654                            }
655                        }
656                    }
657
658                    // Cache (L3) check
659                    if !bypass {
660                        let tag = (hash >> 48) as u16;
661                        if let Some(global_idx) = self.cache.index_probe(hash, tag) {
662                            let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
663                            if !ptr.is_null() {
664                                let node = unsafe { &*ptr };
665                                if node.key == key {
666                                    bypass = true;
667                                }
668                            }
669                        }
670                    }
671                }
672
673                // ── QSBR Check-out ─────────────────────────────────────
674                if let Some(id) = id_opt {
675                    self.worker_states[id]
676                        .local_epoch
677                        .store(0, Ordering::Relaxed);
678                }
679            }
680
681            let pass = if bypass {
682                true
683            } else {
684                // L1 Probation Filter
685                #[cfg(any(feature = "loom", loom))]
686                {
687                    L1_FILTER.with(|f| {
688                        let mut state = f.borrow_mut();
689                        let idx = (hash as usize) & 4095_usize;
690                        let val = state.0[idx];
691
692                        state.1 += 1;
693                        if state.1 >= 4096_usize {
694                            for x in state.0.iter_mut() {
695                                *x >>= 1;
696                            }
697                            state.1 = 0;
698                        }
699
700                        if val < 1_u8 {
701                            state.0[idx] = 1;
702                            false
703                        } else {
704                            if val < 2_u8 {
705                                state.0[idx] = 2;
706                            }
707                            true
708                        }
709                    })
710                }
711
712                #[cfg(not(any(feature = "loom", loom)))]
713                {
714                    L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
715                        let mut state = f.borrow_mut();
716                        let idx = (hash as usize) & 4095_usize;
717                        let val = state.0[idx];
718
719                        state.1 += 1;
720                        if state.1 >= 4096_usize {
721                            for x in state.0.iter_mut() {
722                                *x >>= 1;
723                            }
724                            state.1 = 0;
725                        }
726
727                        if val < 1_u8 {
728                            state.0[idx] = 1;
729                            false
730                        } else {
731                            if val < 2_u8 {
732                                state.0[idx] = 2;
733                            }
734                            true
735                        }
736                    })
737                }
738            };
739
740            if !pass {
741                return;
742            }
743
744            // Task 6: Time-based flush detection
745            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
746            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
747                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
748            });
749
750            // Worker TLS batch buffer
751            WORKER_ID.with(|&id| {
752                if id >= self.miss_buffers.len() {
753                    // Worker overflow: gracefully degrade to direct send
754                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
755                    return;
756                }
757
758                // Safety: WORKER_ID is unique per thread → exclusive slot access
759                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
760                let capacity_flush = buf.push((key, value, hash));
761
762                if capacity_flush || (should_time_flush && !buf.is_empty()) {
763                    let batch = buf.drain_to_vec();
764                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
765                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
766                }
767            });
768        }
769
770        // ── no_std path: direct send (no TLS available) ───────────────────
771        #[cfg(not(feature = "std"))]
772        {
773            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
774        }
775    }
776
777    /// Remove a key from the cache.
778    pub fn remove(&self, key: &K) {
779        let hash = self.hash(key);
780
781        // ── std: flush this thread's buffer first for causal ordering ─────
782        #[cfg(feature = "std")]
783        WORKER_ID.with(|&id| {
784            if id < self.miss_buffers.len() {
785                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
786                if buf.len() > 0 {
787                    let batch = buf.drain_to_vec();
788                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
789                    let tick = self.daemon_tick.load(Ordering::Relaxed);
790                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
791                }
792            }
793        });
794
795        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
796    }
797
798    /// Clear all cached data.
799    pub fn clear(&self) {
800        let ack = OneshotAck::new();
801        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
802        ack.wait();
803    }
804
805    // ── Internals ─────────────────────────────────────────────────────────
806
807    #[inline(always)]
808    fn hash(&self, key: &K) -> u64 {
809        self.hasher.hash_one(key)
810    }
811
812    /// Buffer a Cache-hit global index for Daemon processing.
813    ///
814    /// std: fills the 64-element TLS array and ships it to `hit_tx` when full.
815    /// no_std: sends directly (no TLS batch buffering available).
816    #[inline(always)]
817    fn record_hit(&self, global_idx: usize) {
818        #[cfg(feature = "std")]
819        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
820            let mut state = buf.borrow_mut();
821            let idx = state.1;
822            state.0[idx] = global_idx;
823            state.1 += 1;
824            if state.1 == 64_usize {
825                let _ = self.hit_tx.try_send(state.0);
826                state.1 = 0;
827            }
828        });
829
830        // no_std: no TLS; hit signals are not batched.
831        // Daemon still processes hits via hit_rx if sent individually.
832        #[cfg(not(feature = "std"))]
833        {
834            let mut batch = [0usize; 64];
835            batch[0] = global_idx;
836            let _ = self.hit_tx.try_send(batch);
837        }
838    }
839}
840
841impl<K, V, S> Drop for DualCacheFF<K, V, S> {
842    fn drop(&mut self) {
843        if Arc::strong_count(&self.cmd_tx) <= 2 {
844            let _ = self.cmd_tx.try_send(Command::Shutdown);
845        }
846    }
847}
848