Skip to main content

dualcache_ff/
lib.rs

1// ── no_std gate ───────────────────────────────────────────────────────────
2// When the "std" feature is disabled we enter no_std mode.
3// `extern crate alloc` provides Vec, Box, Arc etc. from the allocator crate.
4// The RTOS / bare-metal environment must supply a global allocator.
5#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9// ── Public sub-modules ────────────────────────────────────────────────────
10pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
19// ── Internal sync abstraction ─────────────────────────────────────────────
/// Type-routing module: re-exports the synchronisation primitives (`Arc`,
/// atomics, `UnsafeCell`) that match the active feature flags, so the rest of
/// the crate imports them from one place.
///
/// | Configuration               | `Arc` source         |
/// |-----------------------------|----------------------|
/// | `loom` (feature or cfg)     | `loom::sync::Arc`    |
/// | `std` without `loom`        | `std::sync::Arc`     |
/// | neither                     | `alloc::sync::Arc`   |
pub(crate) mod sync {
    // `Vec` is not part of the `core` prelude and the crate-root import is
    // not visible inside this child module, so it must be imported here for
    // `no_std` builds (`new_arc_slice` and the loom `ArcSlice` alias use it).
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    /// Shared, immutable sequence of `T`.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Shared, immutable sequence of `T` (loom variant wraps a `Vec`).
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Moves `vec` into an [`ArcSlice`].
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        // `Arc<[T]>: From<Vec<T>>` builds the shared slice directly, without
        // first shrinking the vector into a `Box<[T]>`.
        Arc::from(vec)
    }

    /// Moves `vec` into an [`ArcSlice`].
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Atomic types and `Ordering`, taken from `core` or from loom.
    pub mod atomic {
        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with closure-based accessors (`with` / `with_mut`), so the
    /// same call sites compile against the wrapper below or loom's cell.
    pub mod cell {
        /// Thin wrapper around `core::cell::UnsafeCell`.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data` in a new cell.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Returns a raw mutable pointer to the wrapped value.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Calls `f` with a raw const pointer to the wrapped value and
            /// returns its result. Dereferencing the pointer is the caller's
            /// responsibility.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                f(self.0.get() as *const T)
            }

            /// Calls `f` with a raw mutable pointer to the wrapped value and
            /// returns its result. Dereferencing the pointer is the caller's
            /// responsibility.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                f(self.0.get())
            }
        }

        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;
    }
}
105
106// ── Imports ───────────────────────────────────────────────────────────────
107#[cfg(not(feature = "std"))]
108use alloc::vec::Vec;
109
110use crate::cache_padded::CachePadded;
111use crate::daemon::{Command, Daemon};
112use crate::lossy_queue::{LossyQueue, OneshotAck};
113use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
114use ahash::RandomState;
115use core::hash::{BuildHasher, Hash};
116use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
117use sync::{Arc, ArcSlice, new_arc_slice};
118
119// ── Config ────────────────────────────────────────────────────────────────
120
/// Sizing and timing parameters used to build a `DualCacheFF`.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Capacity handed to the main cache; `new_expert` requires a power of two.
    pub capacity: usize,
    /// Slot count of the T1 table; `new_expert` requires a power of two ≤ 4096.
    pub t1_slots: usize,
    /// Slot count of the T2 table; `new_expert` requires a power of two.
    pub t2_slots: usize,
    /// TTL duration in epoch ticks (one tick ≈ 100 ms in std mode).
    pub duration: u32,
    /// Number of per-worker slots (QSBR states and batch buffers) to allocate.
    pub threads: usize,
    /// Daemon poll interval in **microseconds** (1 000–10 000 µs = 1–10 ms).
    /// Controls the latency vs. idle-CPU trade-off.
    /// Lower = faster hit-signal delivery, higher CPU when idle.
    /// Higher = more efficient idle, but hotter TLS buffers may stall longer.
    pub poll_us: u64,
    /// TLS flush threshold in **daemon ticks**.
    /// A Worker forces a TLS buffer flush when it detects that the Daemon
    /// tick counter has advanced by at least this many ticks since the last
    /// flush, regardless of buffer fill level.
    ///
    /// Rule of thumb: `flush_tick_threshold ≈ 1ms / poll_us`, never below 1.
    /// For poll_us = 1 000 µs (1 ms): threshold = 1.
    /// For poll_us = 5 000 µs (5 ms): threshold = 1 (flush every 5 ms).
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Budget-based constructor: specify RAM and TTL, the engine picks sizes.
    ///
    /// `ram_mb` is the memory budget in MiB and `duration` the TTL in epoch
    /// ticks. The capacity is the budget divided by an assumed 128 bytes per
    /// item, rounded up to a power of two and never below 256. Oversized
    /// budgets saturate instead of overflowing.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // Assume total overhead per item is ~128 bytes.
        // Saturating multiplication: a plain `ram_mb * 1024 * 1024` panics in
        // debug builds and silently wraps to a tiny capacity in release
        // builds once the byte count no longer fits in `usize`.
        let budget_bytes = ram_mb.saturating_mul(1024).saturating_mul(1024);
        let raw_capacity = budget_bytes / 128;
        // After the division the value is at most `usize::MAX / 128`, so
        // rounding up to a power of two cannot overflow.
        let capacity = raw_capacity.next_power_of_two().max(256);

        Self {
            capacity,
            // T1 fits in L1 cache: max 2048 × 8-byte pointers = 16 KB
            t1_slots: 2048,
            // T2 intercepts warm data: 20% of capacity (80/20 rule)
            t2_slots: (capacity / 5).next_power_of_two().max(4096),
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Expert constructor with explicit physical-law assertions.
    ///
    /// `poll_us` and `flush_tick_threshold` start at their defaults
    /// (1 000 µs and 1 tick); use the builder methods to change them.
    ///
    /// # Panics
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        // Physical Law 1: Bitmask routing requires powers of two
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        // Physical Law 2: T1 absolutely cannot exceed L1 cache
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builder: set Daemon poll interval; values are clamped to
    /// 1 000–10 000 µs.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Builder: set TLS flush threshold in daemon ticks; `0` is raised to 1.
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
212
213// ── QSBR global epoch ─────────────────────────────────────────────────────
214
// Global QSBR epoch, loom build. Same role as the non-loom static below; loom
// atomics are created lazily, hence the `lazy_static!` wrapper.
// The counter starts at 1, leaving 0 free as the per-worker "checked out"
// value stored by `get()` / `insert()` on exit.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
}

/// Global QSBR epoch. Daemon increments this every maintenance cycle.
/// Workers store their local epoch on `get()` entry and reset to 0 on exit,
/// allowing Daemon to safely reclaim stale pointers.
///
/// The counter starts at 1, leaving 0 free as the per-worker "checked out"
/// value.
#[cfg(not(any(feature = "loom", loom)))]
pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
225
226/// Per-worker QSBR state — cache-line padded to prevent false sharing
227/// between workers checking in/out simultaneously.
228pub struct WorkerState {
229    pub local_epoch: CachePadded<AtomicUsize>,
230}
231
232impl WorkerState {
233    pub fn new() -> Self {
234        Self {
235            local_epoch: CachePadded::new(AtomicUsize::new(0)),
236        }
237    }
238}
239
240// ── Thread-local state (std only) ────────────────────────────────────────
241// In no_std / RTOS mode, TLS is not available. Worker state must be
242// managed by the application (e.g. passed as function arguments or stored
243// in RTOS task-local storage). The cache's `get` / `insert` / `remove`
244// methods fall back to safe, lock-free direct-send paths in no_std mode.
245
246#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
247use std::sync::Mutex;
248
/// Process-wide allocator of small integer worker ids (std, non-loom builds).
///
/// Ids released by finished threads are kept in `free_list` and handed out
/// again before `next_id` is advanced, which keeps ids dense.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    /// Ids returned by `ThreadIdGuard::drop`, available for reuse.
    free_list: Mutex<Vec<usize>>,
    /// Next never-used id; incremented when the free list is empty.
    next_id: sync::atomic::AtomicUsize,
}

/// The single allocator instance backing the `WORKER_ID` thread-local.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    free_list: Mutex::new(Vec::new()),
    next_id: sync::atomic::AtomicUsize::new(0),
};
260
/// Drop guard that hands a worker id back to `ALLOCATOR` once the owning
/// thread's thread-local storage is torn down.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    /// The worker id to recycle.
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    /// Pushes the id onto the allocator's free list. If the free-list mutex
    /// is poisoned the id is simply not recycled.
    fn drop(&mut self) {
        let Ok(mut recycled_ids) = ALLOCATOR.free_list.lock() else {
            return;
        };
        recycled_ids.push(self.id);
    }
}
274
275#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
276use core::cell::{Cell, RefCell};
277
// Per-thread worker state for std (non-loom) builds.
//
// These slots are per *thread*, not per `DualCacheFF` instance: every cache
// handle used on the same thread shares them.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// Small integer id of the current thread, used to index the per-worker
    /// arrays of a cache. A recycled id from the allocator's free list is
    /// preferred; otherwise (empty list or poisoned lock) a fresh id is taken
    /// from the counter. The id is registered in `GUARD` so it is returned to
    /// the free list when the thread's TLS is destroyed.
    static WORKER_ID: usize = {
        // The lock guard is released before the fallback counter is touched.
        let recycled = ALLOCATOR
            .free_list
            .lock()
            .ok()
            .and_then(|mut list| list.pop());
        let id = recycled
            .unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed));

        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    /// Holds the guard that recycles this thread's `WORKER_ID` on thread exit.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// to Daemon via the hit queue. The second tuple field is the number of
    /// entries currently filled.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch that decays periodically. The second tuple field
    /// counts filter lookups since the last decay.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    /// Task 6 — last daemon_tick observed at TLS flush time.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// Worker force-drains its TLS buffer even if it is not full.
    // Const-initialised like the other slots, so access skips the lazy-init
    // check.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
308
// Loom build: monotonically increasing source of worker ids. Unlike the std
// allocator there is no free list here, so ids are never recycled.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}
313
314#[cfg(any(feature = "loom", loom))]
315use core::cell::{Cell, RefCell};
316
// Per-thread worker state for loom builds. Mirrors the std `thread_local!`
// block, except that ids come from `NEXT_THREAD_ID` and the probation filter
// lives on the heap.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    /// Worker id of the current (loom) thread; taken once from
    /// `NEXT_THREAD_ID` on first access.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// to Daemon via the hit queue. The second tuple field is the number of
    /// entries currently filled.
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch that decays periodically.
    /// Heap-allocated under Loom via `vec!` to prevent virtual coroutine stack overflow.
    /// The second tuple field counts filter lookups since the last decay.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));

    /// Task 6 — last daemon_tick observed at TLS flush time.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// Worker force-drains its TLS buffer even if it is not full.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
335
336// ── DualCacheFF ───────────────────────────────────────────────────────────
337
/// Handle to a dual-tier cache (T1 → T2 → main cache) whose mutations are
/// applied by a background `Daemon`.
///
/// Cloning the handle is cheap: everything except the hasher and the flush
/// threshold is shared through `Arc`s. Reads probe the tables directly;
/// inserts, removals and hit signals are sent to the Daemon through the
/// command and hit queues.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Hash builder used for every key lookup and insert.
    pub hasher: S,
    /// First-tier table, probed first by `get`.
    pub t1: Arc<T1<K, V>>,
    /// Second-tier table, probed when T1 misses.
    pub t2: Arc<T2<K, V>>,
    /// Main cache, probed last (index probe + full node check).
    pub cache: Arc<Cache<K, V>>,
    /// Command queue to the Daemon (inserts, removals, sync, clear, shutdown).
    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
    /// Hit-signal queue to the Daemon; each message is a batch of 64 indices.
    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
    /// Current TTL epoch; `get` compares it with each node's `expire_at`.
    pub epoch: Arc<AtomicU32>,
    /// QSBR registry: one entry per thread slot.
    pub worker_states: ArcSlice<WorkerState>,
    /// Per-worker zero-lock batch buffers, indexed by WORKER_ID.
    pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
    /// Daemon tick counter — shared with the Daemon thread.
    /// Workers read this (Relaxed) to implement time-based TLS flush.
    pub daemon_tick: Arc<AtomicU64>,
    /// Copied from `Config::flush_tick_threshold`: `insert` force-flushes the
    /// worker buffer once `daemon_tick` has advanced by at least this many
    /// ticks since the thread's last flush.
    pub flush_tick_threshold: u64,
    /// Cold-start flag: Daemon sets this to false when capacity is reached.
    /// While it is true, `insert` bypasses the probation filter.
    pub is_cold_start: Arc<sync::atomic::AtomicBool>,
}
358
359impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
360    fn clone(&self) -> Self {
361        Self {
362            hasher: self.hasher.clone(),
363            t1: self.t1.clone(),
364            t2: self.t2.clone(),
365            cache: self.cache.clone(),
366            cmd_tx: self.cmd_tx.clone(),
367            hit_tx: self.hit_tx.clone(),
368            epoch: self.epoch.clone(),
369            worker_states: self.worker_states.clone(),
370            miss_buffers: self.miss_buffers.clone(),
371            daemon_tick: self.daemon_tick.clone(),
372            flush_tick_threshold: self.flush_tick_threshold,
373            is_cold_start: self.is_cold_start.clone(),
374        }
375    }
376}
377
378// ── Constructor (std mode — auto-spawns Daemon thread) ────────────────────
379
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its Daemon on a detached
    /// background thread.
    ///
    /// This is the constructor for `std` environments (servers, desktops).
    /// Under loom no thread is spawned and the Daemon is discarded.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        {
            // The join handle is dropped, which detaches the thread.
            let _detached = std::thread::spawn(move || daemon.run());
        }
        #[cfg(any(feature = "loom", loom))]
        {
            let _ = daemon;
        }

        handle
    }
}
400
401// ── Constructor (universal — returns Daemon for manual scheduling) ─────────
402
403impl<K, V> DualCacheFF<K, V, RandomState>
404where
405    K: Hash + Eq + Send + Sync + Clone + 'static,
406    V: Send + Sync + Clone + 'static,
407{
408    /// Create a `DualCacheFF` and its `Daemon` without spawning any thread.
409    ///
410    /// # std mode
411    /// Prefer `DualCacheFF::new()` which spawns the daemon automatically.
412    ///
413    /// # no_std / RTOS mode
414    /// Use `new_headless()` to obtain both the cache handle and the daemon.
415    /// Schedule `daemon.run()` on a dedicated RTOS task:
416    /// ```ignore
417    /// let (cache, daemon) = DualCacheFF::new_headless(config);
418    /// rtos::spawn_task(|| daemon.run()); // RTOS-specific API
419    /// ```
420    pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
421        let hasher = RandomState::new();
422        let t1 = Arc::new(T1::new(config.t1_slots));
423        let t2 = Arc::new(T2::new(config.t2_slots));
424        let cache = Arc::new(Cache::new(config.capacity));
425        let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
426        let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
427        let epoch = Arc::new(AtomicU32::new(0));
428        let daemon_tick = Arc::new(AtomicU64::new(0));
429        let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
430
431        let mut buffers = Vec::with_capacity(config.threads);
432        let mut states = Vec::with_capacity(config.threads);
433        for _ in 0..config.threads {
434            buffers.push(WorkerSlot::new());
435            states.push(WorkerState::new());
436        }
437        let miss_buffers = new_arc_slice(buffers);
438        let worker_states = new_arc_slice(states);
439
440        let daemon = Daemon::new(
441            hasher.clone(),
442            config.capacity,
443            t1.clone(),
444            t2.clone(),
445            cache.clone(),
446            cmd_q.clone(),
447            hit_q.clone(),
448            epoch.clone(),
449            config.duration,
450            config.poll_us,
451            worker_states.clone(),
452            daemon_tick.clone(),
453            is_cold_start.clone(),
454        );
455
456        let this = Self {
457            hasher,
458            t1,
459            t2,
460            cache,
461            cmd_tx: cmd_q,
462            hit_tx: hit_q,
463            epoch,
464            worker_states,
465            miss_buffers,
466            daemon_tick,
467            flush_tick_threshold: config.flush_tick_threshold,
468            is_cold_start,
469        };
470
471        (this, daemon)
472    }
473}
474
475// ── Public API (std + no_std) ─────────────────────────────────────────────
476
477impl<K, V, S> DualCacheFF<K, V, S>
478where
479    K: Hash + Eq + Send + Sync + Clone + 'static,
480    V: Send + Sync + Clone + 'static,
481    S: BuildHasher + Clone + Send + 'static,
482{
483    /// Flush all pending TLS buffers and wait for the Daemon to process them.
484    ///
485    /// Blocks via `OneshotAck::wait()` (spin-wait, safe in both std and no_std).
486    pub fn sync(&self) {
487        // ── std: flush TLS hit buffer ─────────────────────────────────────
488        #[cfg(feature = "std")]
489        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
490            let mut state = buf.borrow_mut();
491            if state.1 > 0_usize {
492                let _ = self.hit_tx.try_send(state.0);
493                state.1 = 0;
494            }
495        });
496
497        // ── std: flush all worker slots ───────────────────────────────────
498        #[cfg(feature = "std")]
499        for slot in self.miss_buffers.iter() {
500            let buf = unsafe { slot.get_mut_unchecked() };
501            if buf.len() > 0 {
502                let batch = buf.drain_to_vec();
503                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
504            }
505        }
506
507        // Send a Sync command and spin-wait for acknowledgment
508        let ack = OneshotAck::new();
509        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
510        ack.wait();
511    }
512
    /// Look up a key.
    ///
    /// Hot-path order: T1 (L1 direct-map) → T2 (L2 direct-map) → Cache (L3).
    /// Records a hit signal into the TLS buffer for Daemon processing.
    ///
    /// Returns a clone of the stored value, or `None` when no tier holds a
    /// matching, unexpired entry. A node counts as unexpired when its
    /// `expire_at` is 0 or not older than the current TTL epoch.
    ///
    /// In std builds the lookup is skipped entirely, and `None` is returned,
    /// when the calling thread's `WORKER_ID` is not below
    /// `worker_states.len()`: such a thread has no QSBR slot to check in with.
    pub fn get(&self, key: &K) -> Option<V> {
        let hash = self.hash(key);
        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);

        // ── QSBR Check-in (std only — requires TLS) ───────────────────────
        // Publish the current global epoch in this worker's slot before any
        // node pointer is read.
        // NOTE(review): the publication is a Relaxed store followed by
        // pointer loads; whether that ordering is sufficient depends on the
        // Daemon's reclamation protocol, which is not visible here — confirm.
        #[cfg(feature = "std")]
        let mut id_opt = None;
        #[cfg(feature = "std")]
        {
            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
            WORKER_ID.with(|&id| {
                if id < self.worker_states.len() {
                    self.worker_states[id]
                        .local_epoch
                        .store(global_epoch, Ordering::Relaxed);
                    id_opt = Some(id);
                }
            });
        }

        // std: only probe when the check-in succeeded.
        // no_std: there is no TLS check-in, so the probe always runs.
        #[cfg(feature = "std")]
        let has_epoch = id_opt.is_some();
        #[cfg(not(feature = "std"))]
        let has_epoch = true;

        let mut res: Option<V> = None;
        // Global index of the node that satisfied the lookup, if any.
        let mut hit_g_idx: Option<u32> = None;

        if has_epoch {
            // ── T1 check ──────────────────────────────────────────────────────
            let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
            if !ptr_t1.is_null() {
                // SAFETY (assumed): the node is presumed to stay alive while
                // this worker is checked in — TODO confirm against the Daemon.
                let node = unsafe { &*ptr_t1 };
                if node.key == *key
                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                {
                    res = Some(node.value.clone());
                    hit_g_idx = Some(node.g_idx);
                }
            }

            // ── T2 check ──────────────────────────────────────────────────────
            if res.is_none() {
                let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
                if !ptr_t2.is_null() {
                    // SAFETY (assumed): same liveness assumption as for T1.
                    let node = unsafe { &*ptr_t2 };
                    if node.key == *key
                        && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
                    {
                        res = Some(node.value.clone());
                        hit_g_idx = Some(node.g_idx);
                    }
                }
            }

            // ── Cache (L3) check ──────────────────────────────────────────────
            if res.is_none() {
                // The top 16 bits of the hash serve as the probe tag.
                let tag = (hash >> 48) as u16;
                if let Some(global_idx) = self.cache.index_probe(hash, tag) {
                    if let Some(v) = self
                        .cache
                        .node_get_full(global_idx, key, current_epoch_cache)
                    {
                        res = Some(v);
                        hit_g_idx = Some(global_idx as u32);
                    }
                }
            }
        }

        // ── QSBR Check-out (std only) ─────────────────────────────────────
        // 0 marks the worker as being outside any read section.
        #[cfg(feature = "std")]
        if let Some(id) = id_opt {
            self.worker_states[id]
                .local_epoch
                .store(0, Ordering::Relaxed);
        }

        // Report the hit only after check-out, so the (possibly queue-sending)
        // bookkeeping happens outside the read section.
        if let Some(g_idx) = hit_g_idx {
            self.record_hit(g_idx as usize);
        }

        res
    }
601
602    /// Insert a key-value pair.
603    ///
604    /// # L1 Probation Filter (std only)
605    /// Items that appear only once in a TLS epoch are silently dropped.
606    /// This prevents cache pollution from scan traffic.
607    /// In no_std mode the filter is skipped and all items are forwarded.
608    ///
609    /// # Task 6 — Time-based TLS Flush (std only)
610    /// The TLS batch buffer normally flushes when it reaches 32 items.
611    /// Additionally, if the Daemon tick counter has advanced by at least
612    /// `flush_tick_threshold` since the last flush, the buffer is force-drained
613    /// even if nearly empty. This prevents hot items from being invisible to
614    /// the Daemon for too long (the "split-brain eviction" bug).
615    pub fn insert(&self, key: K, value: V) {
616        let hash = self.hash(&key);
617
618        // ── std path: L1 Probation Filter + TLS batch ─────────────────────
619        #[cfg(feature = "std")]
620        {
621            let is_cold = self.is_cold_start.load(Ordering::Relaxed);
622            let mut bypass = is_cold;
623
624            if !bypass {
625                // Perform thread-safe fast lookup to see if key exists
626                // ── QSBR Check-in ───────────────────────
627                let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
628                let mut id_opt = None;
629                WORKER_ID.with(|&id| {
630                    if id < self.worker_states.len() {
631                        self.worker_states[id]
632                            .local_epoch
633                            .store(global_epoch, Ordering::Relaxed);
634                        id_opt = Some(id);
635                    }
636                });
637
638                if id_opt.is_some() {
639                    // T1 check
640                    let ptr_t1 = self.t1.load_slot(hash);
641                    if !ptr_t1.is_null() {
642                        let node = unsafe { &*ptr_t1 };
643                        if node.key == key {
644                            bypass = true;
645                        }
646                    }
647
648                    // T2 check
649                    if !bypass {
650                        let ptr_t2 = self.t2.load_slot(hash);
651                        if !ptr_t2.is_null() {
652                            let node = unsafe { &*ptr_t2 };
653                            if node.key == key {
654                                bypass = true;
655                            }
656                        }
657                    }
658
659                    // Cache (L3) check
660                    if !bypass {
661                        let tag = (hash >> 48) as u16;
662                        if let Some(global_idx) = self.cache.index_probe(hash, tag) {
663                            let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
664                            if !ptr.is_null() {
665                                let node = unsafe { &*ptr };
666                                if node.key == key {
667                                    bypass = true;
668                                }
669                            }
670                        }
671                    }
672                }
673
674                // ── QSBR Check-out ─────────────────────────────────────
675                if let Some(id) = id_opt {
676                    self.worker_states[id]
677                        .local_epoch
678                        .store(0, Ordering::Relaxed);
679                }
680            }
681
682            let pass = if bypass {
683                true
684            } else {
685                // L1 Probation Filter
686                #[cfg(any(feature = "loom", loom))]
687                {
688                    L1_FILTER.with(|f| {
689                        let mut state = f.borrow_mut();
690                        let idx = (hash as usize) & 4095_usize;
691                        let val = state.0[idx];
692
693                        state.1 += 1;
694                        if state.1 >= 4096_usize {
695                            for x in state.0.iter_mut() {
696                                *x >>= 1;
697                            }
698                            state.1 = 0;
699                        }
700
701                        if val < 1_u8 {
702                            state.0[idx] = 1;
703                            false
704                        } else {
705                            if val < 2_u8 {
706                                state.0[idx] = 2;
707                            }
708                            true
709                        }
710                    })
711                }
712
713                #[cfg(not(any(feature = "loom", loom)))]
714                {
715                    L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
716                        let mut state = f.borrow_mut();
717                        let idx = (hash as usize) & 4095_usize;
718                        let val = state.0[idx];
719
720                        state.1 += 1;
721                        if state.1 >= 4096_usize {
722                            for x in state.0.iter_mut() {
723                                *x >>= 1;
724                            }
725                            state.1 = 0;
726                        }
727
728                        if val < 1_u8 {
729                            state.0[idx] = 1;
730                            false
731                        } else {
732                            if val < 2_u8 {
733                                state.0[idx] = 2;
734                            }
735                            true
736                        }
737                    })
738                }
739            };
740
741            if !pass {
742                return;
743            }
744
745            // Task 6: Time-based flush detection
746            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
747            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
748                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
749            });
750
751            // Worker TLS batch buffer
752            WORKER_ID.with(|&id| {
753                if id >= self.miss_buffers.len() {
754                    // Worker overflow: gracefully degrade to direct send
755                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
756                    return;
757                }
758
759                // Safety: WORKER_ID is unique per thread → exclusive slot access
760                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
761                let capacity_flush = buf.push((key, value, hash));
762
763                if capacity_flush || (should_time_flush && !buf.is_empty()) {
764                    let batch = buf.drain_to_vec();
765                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
766                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
767                }
768            });
769        }
770
771        // ── no_std path: direct send (no TLS available) ───────────────────
772        #[cfg(not(feature = "std"))]
773        {
774            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
775        }
776    }
777
778    /// Remove a key from the cache.
779    pub fn remove(&self, key: &K) {
780        let hash = self.hash(key);
781
782        // ── std: flush this thread's buffer first for causal ordering ─────
783        #[cfg(feature = "std")]
784        WORKER_ID.with(|&id| {
785            if id < self.miss_buffers.len() {
786                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
787                if buf.len() > 0 {
788                    let batch = buf.drain_to_vec();
789                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
790                    let tick = self.daemon_tick.load(Ordering::Relaxed);
791                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
792                }
793            }
794        });
795
796        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
797    }
798
799    /// Clear all cached data.
800    pub fn clear(&self) {
801        let ack = OneshotAck::new();
802        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
803        ack.wait();
804    }
805
806    // ── Internals ─────────────────────────────────────────────────────────
807
808    #[inline(always)]
809    fn hash(&self, key: &K) -> u64 {
810        self.hasher.hash_one(key)
811    }
812
813    /// Buffer a Cache-hit global index for Daemon processing.
814    ///
815    /// std: fills the 64-element TLS array and ships it to `hit_tx` when full.
816    /// no_std: sends directly (no TLS batch buffering available).
817    #[inline(always)]
818    fn record_hit(&self, global_idx: usize) {
819        #[cfg(feature = "std")]
820        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
821            let mut state = buf.borrow_mut();
822            let idx = state.1;
823            state.0[idx] = global_idx;
824            state.1 += 1;
825            if state.1 == 64_usize {
826                let _ = self.hit_tx.try_send(state.0);
827                state.1 = 0;
828            }
829        });
830
831        // no_std: no TLS; hit signals are not batched.
832        // Daemon still processes hits via hit_rx if sent individually.
833        #[cfg(not(feature = "std"))]
834        {
835            let mut batch = [0usize; 64];
836            batch[0] = global_idx;
837            let _ = self.hit_tx.try_send(batch);
838        }
839    }
840}
841
842impl<K, V, S> Drop for DualCacheFF<K, V, S> {
843    fn drop(&mut self) {
844        if Arc::strong_count(&self.cmd_tx) <= 2 {
845            let _ = self.cmd_tx.try_send(Command::Shutdown);
846        }
847    }
848}
849