Skip to main content

dualcache_ff/
lib.rs

1// ── no_std gate ───────────────────────────────────────────────────────────
2// When the "std" feature is disabled we enter no_std mode.
3// `extern crate alloc` provides Vec, Box, Arc etc. from the allocator crate.
4// The RTOS / bare-metal environment must supply a global allocator.
5#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9// ── Public sub-modules ────────────────────────────────────────────────────
10pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18pub mod static_cache;
19
20
21// ── Internal sync abstraction ─────────────────────────────────────────────
/// Feature-routed synchronisation primitives.
///
/// The rest of the crate imports `Arc`, the atomics and `UnsafeCell` from
/// this module, so the backing implementation is purely a `cfg` decision.
/// Loom takes precedence over `std` when both are active.
///
/// | Active cfg            | `Arc`               | `ArcSlice<T>` | atomics                | `UnsafeCell`        |
/// |-----------------------|---------------------|---------------|------------------------|---------------------|
/// | `loom` (feature/cfg)  | `loom::sync::Arc`   | `Arc<Vec<T>>` | `loom::sync::atomic`   | `loom::cell`        |
/// | `std`, no loom        | `std::sync::Arc`    | `Arc<[T]>`    | `core::sync::atomic`   | local wrapper       |
/// | neither               | `alloc::sync::Arc`  | `Arc<[T]>`    | `core::sync::atomic`   | local wrapper       |
pub(crate) mod sync {
    #[cfg(feature = "std")]
    use std::vec::Vec;
    #[cfg(not(feature = "std"))]
    use alloc::vec::Vec;

    // ── Arc ───────────────────────────────────────────────────────────────
    #[cfg(any(feature = "loom", loom))]
    pub use loom::sync::Arc;

    #[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
    pub use std::sync::Arc;

    #[cfg(all(not(feature = "std"), not(any(feature = "loom", loom))))]
    pub use alloc::sync::Arc;

    // ── Shared, immutable-length slice ────────────────────────────────────
    /// Reference-counted slice (loom builds keep the `Vec` inside the `Arc`).
    #[cfg(any(feature = "loom", loom))]
    pub type ArcSlice<T> = Arc<Vec<T>>;

    /// Reference-counted slice.
    #[cfg(not(any(feature = "loom", loom)))]
    pub type ArcSlice<T> = Arc<[T]>;

    /// Wraps `vec` in an [`ArcSlice`] (loom flavour: the `Vec` is stored as-is).
    #[cfg(any(feature = "loom", loom))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        Arc::new(vec)
    }

    /// Converts `vec` into an [`ArcSlice`] by way of a boxed slice.
    #[cfg(not(any(feature = "loom", loom)))]
    #[inline(always)]
    pub fn new_arc_slice<T>(vec: Vec<T>) -> ArcSlice<T> {
        let boxed = vec.into_boxed_slice();
        Arc::from(boxed)
    }

    /// Atomic types and `Ordering`, taken from `core` or from loom.
    pub mod atomic {
        #[cfg(any(feature = "loom", loom))]
        pub use loom::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };

        #[cfg(not(any(feature = "loom", loom)))]
        pub use core::sync::atomic::{
            AtomicBool, AtomicPtr, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicU8, Ordering,
        };
    }

    /// `UnsafeCell` with closure-based accessors (`with` / `with_mut`).
    ///
    /// Loom builds re-export `loom::cell::UnsafeCell`; every other build gets
    /// a thin wrapper around `core::cell::UnsafeCell` exposing `new`, `get`,
    /// `with` and `with_mut`.
    pub mod cell {
        #[cfg(any(feature = "loom", loom))]
        pub use loom::cell::UnsafeCell;

        /// Newtype over `core::cell::UnsafeCell` used outside loom builds.
        #[cfg(not(any(feature = "loom", loom)))]
        pub struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

        #[cfg(not(any(feature = "loom", loom)))]
        impl<T> UnsafeCell<T> {
            /// Wraps `data`.
            #[inline(always)]
            pub const fn new(data: T) -> Self {
                Self(core::cell::UnsafeCell::new(data))
            }

            /// Raw mutable pointer to the wrapped value.
            #[inline(always)]
            pub fn get(&self) -> *mut T {
                self.0.get()
            }

            /// Runs `f` with a const pointer to the wrapped value and returns its result.
            #[inline(always)]
            pub fn with<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*const T) -> R,
            {
                let shared = self.0.get().cast_const();
                f(shared)
            }

            /// Runs `f` with a mutable pointer to the wrapped value and returns its result.
            #[inline(always)]
            pub fn with_mut<F, R>(&self, f: F) -> R
            where
                F: FnOnce(*mut T) -> R,
            {
                let exclusive = self.0.get();
                f(exclusive)
            }
        }
    }
}
112
113// ── Imports ───────────────────────────────────────────────────────────────
114#[cfg(not(feature = "std"))]
115use alloc::vec::Vec;
116
117use crate::cache_padded::CachePadded;
118use crate::daemon::{Command, Daemon};
119use crate::lossy_queue::{LossyQueue, OneshotAck};
120use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
121use ahash::RandomState;
122use core::hash::{BuildHasher, Hash};
123use sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
124use sync::{Arc, ArcSlice, new_arc_slice};
125
126// ── Config ────────────────────────────────────────────────────────────────
127
/// Sizing and timing parameters for a `DualCacheFF` instance.
///
/// Build one with [`Config::with_memory_budget`] (sizes derived from a RAM
/// budget) or [`Config::new_expert`] (explicit sizes, validated), then
/// optionally refine it with the `with_*` builder methods.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Slot count of the main cache; `new_expert` requires a power of two.
    pub capacity: usize,
    /// Slot count of the T1 table; `new_expert` requires a power of two ≤ 4096.
    pub t1_slots: usize,
    /// Slot count of the T2 table; `new_expert` requires a power of two.
    pub t2_slots: usize,
    /// TTL duration in epoch ticks (one tick ≈ 100 ms in std mode).
    pub duration: u32,
    /// Number of per-thread worker slots (miss buffers / QSBR states) to allocate.
    pub threads: usize,
    /// Daemon poll interval in **microseconds** (1 000–10 000 µs = 1–10 ms).
    /// Controls the latency vs. idle-CPU trade-off.
    /// Lower = faster hit-signal delivery, higher CPU when idle.
    /// Higher = more efficient idle, but hotter TLS buffers may stall longer.
    pub poll_us: u64,
    /// TLS flush threshold in **daemon ticks**.
    /// A Worker forces a TLS buffer flush when it detects that the Daemon
    /// tick counter has advanced by at least this many ticks since the last
    /// flush, regardless of buffer fill level.
    ///
    /// Rule of thumb: `flush_tick_threshold ≈ 1 ms / poll interval`, never
    /// below 1 (the builder clamps it).
    /// For poll_us = 1 000 µs (1 ms): threshold = 1.
    /// For poll_us = 5 000 µs (5 ms): the ratio is below 1, so the threshold
    /// stays at 1 and the buffer is flushed at most once per 5 ms tick.
    pub flush_tick_threshold: u64,
}

impl Config {
    /// Budget-based constructor: specify RAM and TTL, the engine picks sizes.
    ///
    /// * `ram_mb` — memory budget in MiB; each item is assumed to cost about
    ///   128 bytes, and the resulting item count is rounded up to a power of
    ///   two with a floor of 256.
    /// * `duration` — TTL in epoch ticks, stored unchanged.
    ///
    /// The arithmetic saturates instead of overflowing, so an absurdly large
    /// budget (or any budget ≥ 4096 MiB on a 32-bit target) yields the
    /// largest power of two representable in `usize` rather than a panic or a
    /// wrapped value.
    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
        // Assume total overhead per item is ~128 bytes.
        const BYTES_PER_ITEM: usize = 128;
        // (ram_mb * 1 MiB) / 128 == ram_mb * 8192; multiplying by the
        // pre-divided factor keeps the intermediate value 128× smaller.
        const ITEMS_PER_MB: usize = (1024 * 1024) / BYTES_PER_ITEM;
        // Largest power of two that fits in a usize.
        const MAX_POW2: usize = 1 << (usize::BITS - 1);

        let raw_capacity = ram_mb.saturating_mul(ITEMS_PER_MB);
        let capacity = raw_capacity
            .checked_next_power_of_two()
            .unwrap_or(MAX_POW2)
            .max(256);

        // T2 intercepts warm data: 20% of capacity (80/20 rule).
        // `capacity / 5` is far below MAX_POW2, but stay on the checked path
        // so this can never become a panic site.
        let t2_slots = (capacity / 5)
            .checked_next_power_of_two()
            .unwrap_or(MAX_POW2)
            .max(4096);

        Self {
            capacity,
            // T1 fits in L1 cache: max 2048 × 8-byte pointers = 16 KB
            t1_slots: 2048,
            t2_slots,
            duration,
            #[cfg(feature = "std")]
            threads: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(16),
            #[cfg(not(feature = "std"))]
            threads: 8,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Expert constructor with explicit physical-law assertions.
    ///
    /// `poll_us` and `flush_tick_threshold` start at their defaults
    /// (1 000 µs and 1 tick).
    ///
    /// # Panics
    ///
    /// Panics if `capacity`, `t1_slots` or `t2_slots` is not a power of two,
    /// or if `t1_slots` exceeds 4096.
    pub fn new_expert(
        capacity: usize,
        t1_slots: usize,
        t2_slots: usize,
        duration: u32,
        threads: usize,
    ) -> Self {
        // Physical Law 1: Bitmask routing requires powers of two
        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");

        // Physical Law 2: T1 absolutely cannot exceed L1 cache
        assert!(
            t1_slots <= 4096,
            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
        );

        Self {
            capacity,
            t1_slots,
            t2_slots,
            duration,
            threads,
            poll_us: 1_000,
            flush_tick_threshold: 1,
        }
    }

    /// Builder: set Daemon poll interval; values are clamped to 1 000–10 000 µs.
    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
        self.poll_us = poll_us.clamp(1_000, 10_000);
        self
    }

    /// Builder: set TLS flush threshold in daemon ticks (0 is raised to 1).
    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
        self.flush_tick_threshold = ticks.max(1);
        self
    }
}
219
220// ── QSBR global epoch ─────────────────────────────────────────────────────
221
/// Global QSBR epoch. Daemon increments this every maintenance cycle.
/// Workers store their local epoch on `get()` entry and reset to 0 on exit,
/// allowing Daemon to safely reclaim stale pointers.
// Loom flavour: loom atomics have no `const` constructor, so the static is
// created lazily through `loom::lazy_static!`.
// The counter starts at 1; 0 is the value workers store in `local_epoch`
// when they are outside a read section — presumably the reason 0 is never
// used as an epoch (confirm against the Daemon).
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    pub static ref GLOBAL_EPOCH: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(1);
}

/// Global QSBR epoch (non-loom builds), initialised to 1.
///
/// Workers copy this value into their `WorkerState::local_epoch` on entry
/// to a read section and reset that field to 0 on exit.
#[cfg(not(any(feature = "loom", loom)))]
pub static GLOBAL_EPOCH: sync::atomic::AtomicUsize = sync::atomic::AtomicUsize::new(1);
232
233/// Per-worker QSBR state — cache-line padded to prevent false sharing
234/// between workers checking in/out simultaneously.
235pub struct WorkerState {
236    pub local_epoch: CachePadded<AtomicUsize>,
237}
238
239impl WorkerState {
240    pub fn new() -> Self {
241        Self {
242            local_epoch: CachePadded::new(AtomicUsize::new(0)),
243        }
244    }
245}
246
247// ── Thread-local state (std only) ────────────────────────────────────────
248// In no_std / RTOS mode, TLS is not available. Worker state must be
249// managed by the application (e.g. passed as function arguments or stored
250// in RTOS task-local storage). The cache's `get` / `insert` / `remove`
251// methods fall back to safe, lock-free direct-send paths in no_std mode.
252
253#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
254use std::sync::Mutex;
255
/// Process-wide source of small integer worker ids (std, non-loom builds).
///
/// Ids index into the per-cache `worker_states` / `miss_buffers` slices, so
/// they are recycled when a thread exits instead of growing without bound.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct IdAllocator {
    // Ids handed back by exited threads (pushed by `ThreadIdGuard::drop`),
    // reused before a fresh id is minted.
    free_list: Mutex<Vec<usize>>,
    // Next never-used id; only consulted when `free_list` is empty or its
    // lock is poisoned.
    next_id: sync::atomic::AtomicUsize,
}

/// The single allocator instance: empty free list, ids start at 0.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
static ALLOCATOR: IdAllocator = IdAllocator {
    free_list: Mutex::new(Vec::new()),
    next_id: sync::atomic::AtomicUsize::new(0),
};
267
/// Owns one worker id for the lifetime of a thread.
///
/// The guard is parked in thread-local storage; when the thread's TLS is
/// torn down the guard is dropped and its id goes back to the allocator.
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
struct ThreadIdGuard {
    id: usize,
}

#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
impl Drop for ThreadIdGuard {
    /// Returns the id to `ALLOCATOR.free_list`.
    ///
    /// If the free-list mutex is poisoned the id is simply not recycled.
    fn drop(&mut self) {
        let Ok(mut recycled) = ALLOCATOR.free_list.lock() else {
            return;
        };
        recycled.push(self.id);
    }
}
281
282#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
283use core::cell::{Cell, RefCell};
284
#[cfg(all(feature = "std", not(any(feature = "loom", loom))))]
thread_local! {
    /// Small integer id of the current thread, assigned on first access.
    ///
    /// Used as an index into each cache's `worker_states` and `miss_buffers`.
    static WORKER_ID: usize = {
        // Prefer an id released by an exited thread; mint a fresh one when
        // the free list is empty or its lock is poisoned.
        let id = if let Ok(mut list) = ALLOCATOR.free_list.lock() {
            list.pop().unwrap_or_else(|| ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed))
        } else {
            ALLOCATOR.next_id.fetch_add(1, Ordering::Relaxed)
        };

        // Park a guard in TLS so the id is returned when this thread exits.
        GUARD.with(|g| {
            *g.borrow_mut() = Some(ThreadIdGuard { id });
        });
        id
    };

    /// Holds the guard that recycles `WORKER_ID` on thread exit; `None`
    /// until `WORKER_ID` has been initialised on this thread.
    static GUARD: RefCell<Option<ThreadIdGuard>> = const { RefCell::new(None) };

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// to Daemon via the hit queue.
    ///
    /// Tuple layout: `(indices, number of entries filled so far)`.
    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch that decays periodically.
    ///
    /// Tuple layout: `(counters, filter probes since the last decay)`.
    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };

    /// Task 6 — last daemon_tick observed at TLS flush time.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// Worker force-drains its TLS buffer even if it is not full.
    // Const-initialised like the other constant keys above, which lets the
    // macro skip the lazy-initialisation check on every access.
    static LAST_FLUSH_TICK: Cell<u64> = const { Cell::new(0) };
}
315
// Loom builds: monotonically increasing source of worker ids, starting at 0.
// Unlike the std allocator there is no free list here — ids are handed out
// by `fetch_add` in the loom `WORKER_ID` initialiser and never recycled.
#[cfg(any(feature = "loom", loom))]
loom::lazy_static! {
    static ref NEXT_THREAD_ID: loom::sync::atomic::AtomicUsize = loom::sync::atomic::AtomicUsize::new(0);
}
320
321#[cfg(any(feature = "loom", loom))]
322use core::cell::{Cell, RefCell};
323
// Loom counterparts of the std thread-local state. Same names and the same
// tuple layouts `(storage, counter)`, but declared through
// `loom::thread_local!` and initialised without `const { … }` blocks.
#[cfg(any(feature = "loom", loom))]
loom::thread_local! {
    // Worker id of the current (loom) thread: the next value of
    // NEXT_THREAD_ID, taken on first access.
    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);

    /// Hit index buffer: batches 64 Cache-hit global indices before sending
    /// to Daemon via the hit queue.
    static HIT_BUF: RefCell<([usize; 64], usize)> = RefCell::new(([0; 64], 0));

    /// TLS probation filter: prevents single-hit items from reaching the
    /// Arena. A 4 KB sketch that decays periodically.
    /// Heap-allocated under Loom via `vec!` to prevent virtual coroutine stack overflow.
    static L1_FILTER: RefCell<(Box<[u8]>, usize)> = RefCell::new((vec![0u8; 4096].into_boxed_slice(), 0));

    /// Task 6 — last daemon_tick observed at TLS flush time.
    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
    /// Worker force-drains its TLS buffer even if it is not full.
    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
}
342
343// ── DualCacheFF ───────────────────────────────────────────────────────────
344
/// Handle to a dual-tier cache whose maintenance runs in a separate `Daemon`.
///
/// The handle is cheap to clone: every shared component sits behind an
/// `Arc`, so clones refer to the same tables, queues and counters.
/// Lookups probe `t1`, then `t2`, then `cache`; inserts and removals are
/// forwarded to the Daemon as `Command`s through `cmd_tx`.
pub struct DualCacheFF<K, V, S = RandomState> {
    /// Hash builder used to hash every key passed to the public API.
    pub hasher: S,
    /// First table probed by `get` (sized by `Config::t1_slots`).
    pub t1: Arc<T1<K, V>>,
    /// Second table probed by `get` (sized by `Config::t2_slots`).
    pub t2: Arc<T2<K, V>>,
    /// Main cache, probed last (sized by `Config::capacity`).
    pub cache: Arc<Cache<K, V>>,
    /// Command queue towards the Daemon (insert / remove / sync / clear / shutdown).
    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
    /// Queue of 64-entry batches of hit indices towards the Daemon.
    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
    /// Current TTL epoch; `get` compares node expiry values against it.
    pub epoch: Arc<AtomicU32>,
    /// QSBR registry: one entry per thread slot.
    pub worker_states: ArcSlice<WorkerState>,
    /// Per-worker zero-lock batch buffers, indexed by WORKER_ID.
    pub miss_buffers: ArcSlice<WorkerSlot<K, V>>,
    /// Daemon tick counter — shared with the Daemon thread.
    /// Workers read this (Relaxed) to implement time-based TLS flush.
    pub daemon_tick: Arc<AtomicU64>,
    /// Number of daemon_tick advances that correspond to ≈1 ms of real time.
    /// Copied from `Config::flush_tick_threshold` at construction.
    pub flush_tick_threshold: u64,
    /// Cold-start flag: Daemon sets this to false when capacity is reached.
    /// While it is true, `insert` skips the probation filter.
    pub is_cold_start: Arc<sync::atomic::AtomicBool>,
}
365
366impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
367    fn clone(&self) -> Self {
368        Self {
369            hasher: self.hasher.clone(),
370            t1: self.t1.clone(),
371            t2: self.t2.clone(),
372            cache: self.cache.clone(),
373            cmd_tx: self.cmd_tx.clone(),
374            hit_tx: self.hit_tx.clone(),
375            epoch: self.epoch.clone(),
376            worker_states: self.worker_states.clone(),
377            miss_buffers: self.miss_buffers.clone(),
378            daemon_tick: self.daemon_tick.clone(),
379            flush_tick_threshold: self.flush_tick_threshold,
380            is_cold_start: self.is_cold_start.clone(),
381        }
382    }
383}
384
385// ── Constructor (std mode — auto-spawns Daemon thread) ────────────────────
386
#[cfg(feature = "std")]
impl<K, V> DualCacheFF<K, V, RandomState>
where
    K: Hash + Eq + Send + Sync + Clone + 'static,
    V: Send + Sync + Clone + 'static,
{
    /// Builds a cache from `config` and starts its Daemon on a new OS thread.
    ///
    /// This is the convenience entry point for `std` environments (servers,
    /// desktops). The thread's `JoinHandle` is discarded, so the Daemon runs
    /// detached. In loom builds no thread is spawned and the Daemon returned
    /// by `new_headless` is left unused.
    pub fn new(config: Config) -> Self {
        let (handle, daemon) = Self::new_headless(config);

        #[cfg(not(any(feature = "loom", loom)))]
        {
            std::thread::spawn(move || daemon.run());
        }

        #[cfg(any(feature = "loom", loom))]
        {
            let _ = daemon;
        }

        handle
    }
}
407
408// ── Constructor (universal — returns Daemon for manual scheduling) ─────────
409
410impl<K, V> DualCacheFF<K, V, RandomState>
411where
412    K: Hash + Eq + Send + Sync + Clone + 'static,
413    V: Send + Sync + Clone + 'static,
414{
415    /// Create a `DualCacheFF` and its `Daemon` without spawning any thread.
416    ///
417    /// # std mode
418    /// Prefer `DualCacheFF::new()` which spawns the daemon automatically.
419    ///
420    /// # no_std / RTOS mode
421    /// Use `new_headless()` to obtain both the cache handle and the daemon.
422    /// Schedule `daemon.run()` on a dedicated RTOS task:
423    /// ```ignore
424    /// let (cache, daemon) = DualCacheFF::new_headless(config);
425    /// rtos::spawn_task(|| daemon.run()); // RTOS-specific API
426    /// ```
427    pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
428        let hasher = RandomState::new();
429        let t1 = Arc::new(T1::new(config.t1_slots));
430        let t2 = Arc::new(T2::new(config.t2_slots));
431        let cache = Arc::new(Cache::new(config.capacity));
432        let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
433        let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
434        let epoch = Arc::new(AtomicU32::new(0));
435        let daemon_tick = Arc::new(AtomicU64::new(0));
436        let is_cold_start = Arc::new(sync::atomic::AtomicBool::new(true));
437
438        let mut buffers = Vec::with_capacity(config.threads);
439        let mut states = Vec::with_capacity(config.threads);
440        for _ in 0..config.threads {
441            buffers.push(WorkerSlot::new());
442            states.push(WorkerState::new());
443        }
444        let miss_buffers = new_arc_slice(buffers);
445        let worker_states = new_arc_slice(states);
446
447        let daemon = Daemon::new(
448            hasher.clone(),
449            config.capacity,
450            t1.clone(),
451            t2.clone(),
452            cache.clone(),
453            cmd_q.clone(),
454            hit_q.clone(),
455            epoch.clone(),
456            config.duration,
457            config.poll_us,
458            worker_states.clone(),
459            daemon_tick.clone(),
460            is_cold_start.clone(),
461        );
462
463        let this = Self {
464            hasher,
465            t1,
466            t2,
467            cache,
468            cmd_tx: cmd_q,
469            hit_tx: hit_q,
470            epoch,
471            worker_states,
472            miss_buffers,
473            daemon_tick,
474            flush_tick_threshold: config.flush_tick_threshold,
475            is_cold_start,
476        };
477
478        (this, daemon)
479    }
480}
481
482// ── Public API (std + no_std) ─────────────────────────────────────────────
483
484impl<K, V, S> DualCacheFF<K, V, S>
485where
486    K: Hash + Eq + Send + Sync + Clone + 'static,
487    V: Send + Sync + Clone + 'static,
488    S: BuildHasher + Clone + Send + 'static,
489{
490    /// Flush all pending TLS buffers and wait for the Daemon to process them.
491    ///
492    /// Blocks via `OneshotAck::wait()` (spin-wait, safe in both std and no_std).
493    pub fn sync(&self) {
494        // ── std: flush TLS hit buffer ─────────────────────────────────────
495        #[cfg(feature = "std")]
496        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
497            let mut state = buf.borrow_mut();
498            if state.1 > 0_usize {
499                let _ = self.hit_tx.try_send(state.0);
500                state.1 = 0;
501            }
502        });
503
504        // ── std: flush all worker slots ───────────────────────────────────
505        #[cfg(feature = "std")]
506        for slot in self.miss_buffers.iter() {
507            let buf = unsafe { slot.get_mut_unchecked() };
508            if buf.len() > 0 {
509                let batch = buf.drain_to_vec();
510                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
511            }
512        }
513
514        // Send a Sync command and spin-wait for acknowledgment
515        let ack = OneshotAck::new();
516        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
517        ack.wait();
518    }
519
520    /// Look up a key.
521    ///
522    /// Hot-path order: T1 (L1 direct-map) → T2 (L2 direct-map) → Cache (L3).
523    /// Records a hit signal into the TLS buffer for Daemon processing.
524    pub fn get(&self, key: &K) -> Option<V> {
525        let hash = self.hash(key);
526        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
527
528        // ── QSBR Check-in (std only — requires TLS) ───────────────────────
529        #[cfg(feature = "std")]
530        let mut id_opt = None;
531        #[cfg(feature = "std")]
532        {
533            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
534            WORKER_ID.with(|&id| {
535                if id < self.worker_states.len() {
536                    self.worker_states[id]
537                        .local_epoch
538                        .store(global_epoch, Ordering::Relaxed);
539                    id_opt = Some(id);
540                }
541            });
542        }
543
544        #[cfg(feature = "std")]
545        let has_epoch = id_opt.is_some();
546        #[cfg(not(feature = "std"))]
547        let has_epoch = true;
548
549        let mut res: Option<V> = None;
550        let mut hit_g_idx: Option<u32> = None;
551
552        if has_epoch {
553            // ── T1 check ──────────────────────────────────────────────────────
554            let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
555            if !ptr_t1.is_null() {
556                let node = unsafe { &*ptr_t1 };
557                if node.key == *key
558                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
559                {
560                    res = Some(node.value.clone());
561                    hit_g_idx = Some(node.g_idx);
562                }
563            }
564
565            // ── T2 check ──────────────────────────────────────────────────────
566            if res.is_none() {
567                let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
568                if !ptr_t2.is_null() {
569                    let node = unsafe { &*ptr_t2 };
570                    if node.key == *key
571                        && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
572                    {
573                        res = Some(node.value.clone());
574                        hit_g_idx = Some(node.g_idx);
575                    }
576                }
577            }
578
579            // ── Cache (L3) check ──────────────────────────────────────────────
580            if res.is_none() {
581                let tag = (hash >> 48) as u16;
582                if let Some(global_idx) = self.cache.index_probe(hash, tag) {
583                    if let Some(v) = self
584                        .cache
585                        .node_get_full(global_idx, key, current_epoch_cache)
586                    {
587                        res = Some(v);
588                        hit_g_idx = Some(global_idx as u32);
589                    }
590                }
591            }
592        }
593
594        // ── QSBR Check-out (std only) ─────────────────────────────────────
595        #[cfg(feature = "std")]
596        if let Some(id) = id_opt {
597            self.worker_states[id]
598                .local_epoch
599                .store(0, Ordering::Relaxed);
600        }
601
602        if let Some(g_idx) = hit_g_idx {
603            self.record_hit(g_idx as usize);
604        }
605
606        res
607    }
608
609    /// Insert a key-value pair.
610    ///
611    /// # L1 Probation Filter (std only)
612    /// Items that appear only once in a TLS epoch are silently dropped.
613    /// This prevents cache pollution from scan traffic.
614    /// In no_std mode the filter is skipped and all items are forwarded.
615    ///
616    /// # Task 6 — Time-based TLS Flush (std only)
617    /// The TLS batch buffer normally flushes when it reaches 32 items.
618    /// Additionally, if the Daemon tick counter has advanced by at least
619    /// `flush_tick_threshold` since the last flush, the buffer is force-drained
620    /// even if nearly empty. This prevents hot items from being invisible to
621    /// the Daemon for too long (the "split-brain eviction" bug).
622    pub fn insert(&self, key: K, value: V) {
623        let hash = self.hash(&key);
624
625        // ── std path: L1 Probation Filter + TLS batch ─────────────────────
626        #[cfg(feature = "std")]
627        {
628            let is_cold = self.is_cold_start.load(Ordering::Relaxed);
629            let mut bypass = is_cold;
630
631            if !bypass {
632                // Perform thread-safe fast lookup to see if key exists
633                // ── QSBR Check-in ───────────────────────
634                let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
635                let mut id_opt = None;
636                WORKER_ID.with(|&id| {
637                    if id < self.worker_states.len() {
638                        self.worker_states[id]
639                            .local_epoch
640                            .store(global_epoch, Ordering::Relaxed);
641                        id_opt = Some(id);
642                    }
643                });
644
645                if id_opt.is_some() {
646                    // T1 check
647                    let ptr_t1 = self.t1.load_slot(hash);
648                    if !ptr_t1.is_null() {
649                        let node = unsafe { &*ptr_t1 };
650                        if node.key == key {
651                            bypass = true;
652                        }
653                    }
654
655                    // T2 check
656                    if !bypass {
657                        let ptr_t2 = self.t2.load_slot(hash);
658                        if !ptr_t2.is_null() {
659                            let node = unsafe { &*ptr_t2 };
660                            if node.key == key {
661                                bypass = true;
662                            }
663                        }
664                    }
665
666                    // Cache (L3) check
667                    if !bypass {
668                        let tag = (hash >> 48) as u16;
669                        if let Some(global_idx) = self.cache.index_probe(hash, tag) {
670                            let ptr = self.cache.nodes[global_idx].load(Ordering::Acquire);
671                            if !ptr.is_null() {
672                                let node = unsafe { &*ptr };
673                                if node.key == key {
674                                    bypass = true;
675                                }
676                            }
677                        }
678                    }
679                }
680
681                // ── QSBR Check-out ─────────────────────────────────────
682                if let Some(id) = id_opt {
683                    self.worker_states[id]
684                        .local_epoch
685                        .store(0, Ordering::Relaxed);
686                }
687            }
688
689            let pass = if bypass {
690                true
691            } else {
692                // L1 Probation Filter
693                #[cfg(any(feature = "loom", loom))]
694                {
695                    L1_FILTER.with(|f| {
696                        let mut state = f.borrow_mut();
697                        let idx = (hash as usize) & 4095_usize;
698                        let val = state.0[idx];
699
700                        state.1 += 1;
701                        if state.1 >= 4096_usize {
702                            for x in state.0.iter_mut() {
703                                *x >>= 1;
704                            }
705                            state.1 = 0;
706                        }
707
708                        if val < 1_u8 {
709                            state.0[idx] = 1;
710                            false
711                        } else {
712                            if val < 2_u8 {
713                                state.0[idx] = 2;
714                            }
715                            true
716                        }
717                    })
718                }
719
720                #[cfg(not(any(feature = "loom", loom)))]
721                {
722                    L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
723                        let mut state = f.borrow_mut();
724                        let idx = (hash as usize) & 4095_usize;
725                        let val = state.0[idx];
726
727                        state.1 += 1;
728                        if state.1 >= 4096_usize {
729                            for x in state.0.iter_mut() {
730                                *x >>= 1;
731                            }
732                            state.1 = 0;
733                        }
734
735                        if val < 1_u8 {
736                            state.0[idx] = 1;
737                            false
738                        } else {
739                            if val < 2_u8 {
740                                state.0[idx] = 2;
741                            }
742                            true
743                        }
744                    })
745                }
746            };
747
748            if !pass {
749                return;
750            }
751
752            // Task 6: Time-based flush detection
753            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
754            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
755                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
756            });
757
758            // Worker TLS batch buffer
759            WORKER_ID.with(|&id| {
760                if id >= self.miss_buffers.len() {
761                    // Worker overflow: gracefully degrade to direct send
762                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
763                    return;
764                }
765
766                // Safety: WORKER_ID is unique per thread → exclusive slot access
767                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
768                let capacity_flush = buf.push((key, value, hash));
769
770                if capacity_flush || (should_time_flush && !buf.is_empty()) {
771                    let batch = buf.drain_to_vec();
772                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
773                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
774                }
775            });
776        }
777
778        // ── no_std path: direct send (no TLS available) ───────────────────
779        #[cfg(not(feature = "std"))]
780        {
781            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
782        }
783    }
784
785    /// Remove a key from the cache.
786    pub fn remove(&self, key: &K) {
787        let hash = self.hash(key);
788
789        // ── std: flush this thread's buffer first for causal ordering ─────
790        #[cfg(feature = "std")]
791        WORKER_ID.with(|&id| {
792            if id < self.miss_buffers.len() {
793                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
794                if buf.len() > 0 {
795                    let batch = buf.drain_to_vec();
796                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
797                    let tick = self.daemon_tick.load(Ordering::Relaxed);
798                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
799                }
800            }
801        });
802
803        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
804    }
805
806    /// Clear all cached data.
807    pub fn clear(&self) {
808        let ack = OneshotAck::new();
809        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
810        ack.wait();
811    }
812
813    // ── Internals ─────────────────────────────────────────────────────────
814
815    #[inline(always)]
816    fn hash(&self, key: &K) -> u64 {
817        self.hasher.hash_one(key)
818    }
819
820    /// Buffer a Cache-hit global index for Daemon processing.
821    ///
822    /// std: fills the 64-element TLS array and ships it to `hit_tx` when full.
823    /// no_std: sends directly (no TLS batch buffering available).
824    #[inline(always)]
825    fn record_hit(&self, global_idx: usize) {
826        #[cfg(feature = "std")]
827        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
828            let mut state = buf.borrow_mut();
829            let idx = state.1;
830            state.0[idx] = global_idx;
831            state.1 += 1;
832            if state.1 == 64_usize {
833                let _ = self.hit_tx.try_send(state.0);
834                state.1 = 0;
835            }
836        });
837
838        // no_std: no TLS; hit signals are not batched.
839        // Daemon still processes hits via hit_rx if sent individually.
840        #[cfg(not(feature = "std"))]
841        {
842            let mut batch = [0usize; 64];
843            batch[0] = global_idx;
844            let _ = self.hit_tx.try_send(batch);
845        }
846    }
847}
848
849impl<K, V, S> Drop for DualCacheFF<K, V, S> {
850    fn drop(&mut self) {
851        if Arc::strong_count(&self.cmd_tx) <= 2 {
852            let _ = self.cmd_tx.try_send(Command::Shutdown);
853        }
854    }
855}
856