Skip to main content

dualcache_ff/
lib.rs

1// ── no_std gate ───────────────────────────────────────────────────────────
2// When the "std" feature is disabled we enter no_std mode.
3// `extern crate alloc` provides Vec, Box, Arc etc. from the allocator crate.
4// The RTOS / bare-metal environment must supply a global allocator.
5#![cfg_attr(not(feature = "std"), no_std)]
6#[cfg(not(feature = "std"))]
7extern crate alloc;
8
9// ── Public sub-modules ────────────────────────────────────────────────────
10pub mod arena;
11pub mod cache_padded;
12pub mod daemon;
13pub mod filters;
14pub mod lossy_queue;
15pub mod storage;
16pub mod unsafe_core;
17pub mod workers;
18
19// ── Internal sync abstraction ─────────────────────────────────────────────
20/// Type-routing module: selects the correct `Arc` implementation based on
21/// the active feature flags.
22///
23/// | Feature    | Source               |
24/// |------------|----------------------|
25/// | `std`      | `std::sync::Arc`     |
26/// | `loom`     | `loom::sync::Arc`    |
27/// | _(neither)_| `alloc::sync::Arc`   |
28pub(crate) mod sync {
29    #[cfg(all(feature = "std", not(feature = "loom")))]
30    pub use std::sync::Arc;
31
32    #[cfg(feature = "loom")]
33    pub use loom::sync::Arc;
34
35    #[cfg(not(any(feature = "std", feature = "loom")))]
36    pub use alloc::sync::Arc;
37}
38
39// ── Imports ───────────────────────────────────────────────────────────────
40#[cfg(not(feature = "std"))]
41use alloc::vec::Vec;
42
43use crate::cache_padded::CachePadded;
44use crate::daemon::{Command, Daemon};
45use crate::lossy_queue::{LossyQueue, OneshotAck};
46use crate::unsafe_core::{Cache, T1, T2, WorkerSlot};
47use ahash::RandomState;
48use core::hash::{BuildHasher, Hash};
49use core::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
50use sync::Arc;
51
52// ── Config ────────────────────────────────────────────────────────────────
53
54pub struct Config {
55    pub capacity: usize,
56    pub t1_slots: usize,
57    pub t2_slots: usize,
58    /// TTL duration in epoch ticks (one tick ≈ 100 ms in std mode).
59    pub duration: u32,
60    pub threads: usize,
61    /// Daemon poll interval in **microseconds** (1 000–10 000 µs = 1–10 ms).
62    /// Controls the latency vs. idle-CPU trade-off.
63    /// Lower = faster hit-signal delivery, higher CPU when idle.
64    /// Higher = more efficient idle, but hotter TLS buffers may stall longer.
65    pub poll_us: u64,
66    /// TLS flush threshold in **daemon ticks**.
67    /// A Worker forces a TLS buffer flush when it detects that the Daemon
68    /// tick counter has advanced by at least this many ticks since the last
69    /// flush, regardless of buffer fill level.
70    ///
71    /// Rule of thumb: `flush_tick_threshold ≈ 1ms / poll_us`.
72    /// For poll_us = 1 000 µs (1 ms): threshold = 1.
73    /// For poll_us = 5 000 µs (5 ms): threshold = 1 (flush every 5 ms).
74    pub flush_tick_threshold: u64,
75}
76
77impl Config {
78    /// Budget-based constructor: specify RAM and TTL, the engine picks sizes.
79    pub fn with_memory_budget(ram_mb: usize, duration: u32) -> Self {
80        // Assume total overhead per item is ~128 bytes
81        let raw_capacity = (ram_mb * 1024 * 1024) / 128;
82        let capacity = raw_capacity.next_power_of_two().max(256);
83
84        Self {
85            capacity,
86            // T1 fits in L1 cache: max 2048 × 8-byte pointers = 16 KB
87            t1_slots: 2048,
88            // T2 intercepts warm data: 20% of capacity (80/20 rule)
89            t2_slots: (capacity / 5).next_power_of_two().max(4096),
90            duration,
91            #[cfg(feature = "std")]
92            threads: std::thread::available_parallelism()
93                .map(|p| p.get())
94                .unwrap_or(16),
95            #[cfg(not(feature = "std"))]
96            threads: 8,
97            poll_us: 1_000,
98            flush_tick_threshold: 1,
99        }
100    }
101
102    /// Expert constructor with explicit physical-law assertions.
103    pub fn new_expert(
104        capacity: usize,
105        t1_slots: usize,
106        t2_slots: usize,
107        duration: u32,
108        threads: usize,
109    ) -> Self {
110        // Physical Law 1: Bitmask routing requires powers of two
111        assert!(capacity.is_power_of_two(), "Capacity MUST be a power of two");
112        assert!(t1_slots.is_power_of_two(), "T1 slots MUST be a power of two");
113        assert!(t2_slots.is_power_of_two(), "T2 slots MUST be a power of two");
114
115        // Physical Law 2: T1 absolutely cannot exceed L1 cache
116        assert!(
117            t1_slots <= 4096,
118            "T1 size exceeds L1 Cache physical limits! Max slots: 4096"
119        );
120
121        Self {
122            capacity,
123            t1_slots,
124            t2_slots,
125            duration,
126            threads,
127            poll_us: 1_000,
128            flush_tick_threshold: 1,
129        }
130    }
131
132    /// Builder: set Daemon poll interval (1 000–10 000 µs).
133    pub fn with_poll_us(mut self, poll_us: u64) -> Self {
134        self.poll_us = poll_us.clamp(1_000, 10_000);
135        self
136    }
137
138    /// Builder: set TLS flush threshold in daemon ticks.
139    pub fn with_flush_tick_threshold(mut self, ticks: u64) -> Self {
140        self.flush_tick_threshold = ticks.max(1);
141        self
142    }
143}
144
145// ── QSBR global epoch ─────────────────────────────────────────────────────
146
147/// Global QSBR epoch. Daemon increments this every maintenance cycle.
148/// Workers store their local epoch on `get()` entry and reset to 0 on exit,
149/// allowing Daemon to safely reclaim stale pointers.
150pub static GLOBAL_EPOCH: AtomicUsize = AtomicUsize::new(1);
151
152/// Per-worker QSBR state — cache-line padded to prevent false sharing
153/// between workers checking in/out simultaneously.
154pub struct WorkerState {
155    pub local_epoch: CachePadded<AtomicUsize>,
156}
157
158impl WorkerState {
159    pub fn new() -> Self {
160        Self {
161            local_epoch: CachePadded::new(AtomicUsize::new(0)),
162        }
163    }
164}
165
166// ── Thread-local state (std only) ────────────────────────────────────────
167// In no_std / RTOS mode, TLS is not available. Worker state must be
168// managed by the application (e.g. passed as function arguments or stored
169// in RTOS task-local storage). The cache's `get` / `insert` / `remove`
170// methods fall back to safe, lock-free direct-send paths in no_std mode.
171
172#[cfg(feature = "std")]
173/// Unique monotonic worker ID assigned once per thread on first use.
174static NEXT_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
175
176#[cfg(feature = "std")]
177use core::cell::{Cell, RefCell};
178
179#[cfg(feature = "std")]
180thread_local! {
181    static WORKER_ID: usize = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
182
183    /// Hit index buffer: batches 64 Cache-hit global indices before sending
184    /// to Daemon via the hit queue.
185    static HIT_BUF: RefCell<([usize; 64], usize)> = const { RefCell::new(([0; 64], 0)) };
186
187    /// TLS probation filter: prevents single-hit items from reaching the
188    /// Arena. A 4 KB sketch that decays periodically.
189    static L1_FILTER: RefCell<([u8; 4096], usize)> = const { RefCell::new(([0; 4096], 0)) };
190
191    /// Task 6 — last daemon_tick observed at TLS flush time.
192    /// When `daemon_tick - LAST_FLUSH_TICK >= flush_tick_threshold`, the
193    /// Worker force-drains its TLS buffer even if it is not full.
194    static LAST_FLUSH_TICK: Cell<u64> = Cell::new(0);
195}
196
197// ── DualCacheFF ───────────────────────────────────────────────────────────
198
199pub struct DualCacheFF<K, V, S = RandomState> {
200    pub hasher: S,
201    pub t1: Arc<T1<K, V>>,
202    pub t2: Arc<T2<K, V>>,
203    pub cache: Arc<Cache<K, V>>,
204    pub cmd_tx: Arc<LossyQueue<Command<K, V>>>,
205    pub hit_tx: Arc<LossyQueue<[usize; 64]>>,
206    pub epoch: Arc<AtomicU32>,
207    /// QSBR registry: one entry per thread slot.
208    pub worker_states: Arc<[WorkerState]>,
209    /// Per-worker zero-lock batch buffers, indexed by WORKER_ID.
210    pub miss_buffers: Arc<[WorkerSlot<K, V>]>,
211    /// Daemon tick counter — shared with the Daemon thread.
212    /// Workers read this (Relaxed) to implement time-based TLS flush.
213    pub daemon_tick: Arc<AtomicU64>,
214    /// Number of daemon_tick advances that correspond to ≈1 ms of real time.
215    pub flush_tick_threshold: u64,
216}
217
218impl<K, V, S: Clone> Clone for DualCacheFF<K, V, S> {
219    fn clone(&self) -> Self {
220        Self {
221            hasher: self.hasher.clone(),
222            t1: self.t1.clone(),
223            t2: self.t2.clone(),
224            cache: self.cache.clone(),
225            cmd_tx: self.cmd_tx.clone(),
226            hit_tx: self.hit_tx.clone(),
227            epoch: self.epoch.clone(),
228            worker_states: self.worker_states.clone(),
229            miss_buffers: self.miss_buffers.clone(),
230            daemon_tick: self.daemon_tick.clone(),
231            flush_tick_threshold: self.flush_tick_threshold,
232        }
233    }
234}
235
236// ── Constructor (std mode — auto-spawns Daemon thread) ────────────────────
237
238#[cfg(feature = "std")]
239impl<K, V> DualCacheFF<K, V, RandomState>
240where
241    K: Hash + Eq + Send + Sync + Clone + 'static,
242    V: Send + Sync + Clone + 'static,
243{
244    /// Create a new `DualCacheFF` and automatically spawn the background Daemon.
245    ///
246    /// Use this in `std` environments (servers, desktops).
247    pub fn new(config: Config) -> Self {
248        let (cache, daemon) = Self::new_headless(config);
249        std::thread::spawn(move || daemon.run());
250        cache
251    }
252}
253
254// ── Constructor (universal — returns Daemon for manual scheduling) ─────────
255
256impl<K, V> DualCacheFF<K, V, RandomState>
257where
258    K: Hash + Eq + Send + Sync + Clone + 'static,
259    V: Send + Sync + Clone + 'static,
260{
261    /// Create a `DualCacheFF` and its `Daemon` without spawning any thread.
262    ///
263    /// # std mode
264    /// Prefer `DualCacheFF::new()` which spawns the daemon automatically.
265    ///
266    /// # no_std / RTOS mode
267    /// Use `new_headless()` to obtain both the cache handle and the daemon.
268    /// Schedule `daemon.run()` on a dedicated RTOS task:
269    /// ```ignore
270    /// let (cache, daemon) = DualCacheFF::new_headless(config);
271    /// rtos::spawn_task(|| daemon.run()); // RTOS-specific API
272    /// ```
273    pub fn new_headless(config: Config) -> (Self, Daemon<K, V, RandomState>) {
274        let hasher = RandomState::new();
275        let t1 = Arc::new(T1::new(config.t1_slots));
276        let t2 = Arc::new(T2::new(config.t2_slots));
277        let cache = Arc::new(Cache::new(config.capacity));
278        let cmd_q: Arc<LossyQueue<Command<K, V>>> = Arc::new(LossyQueue::new(8192));
279        let hit_q: Arc<LossyQueue<[usize; 64]>> = Arc::new(LossyQueue::new(1024));
280        let epoch = Arc::new(AtomicU32::new(0));
281        let daemon_tick = Arc::new(AtomicU64::new(0));
282
283        let mut buffers = Vec::with_capacity(config.threads);
284        let mut states = Vec::with_capacity(config.threads);
285        for _ in 0..config.threads {
286            buffers.push(WorkerSlot::new());
287            states.push(WorkerState::new());
288        }
289        let miss_buffers: Arc<[_]> = buffers.into_boxed_slice().into();
290        let worker_states: Arc<[_]> = states.into_boxed_slice().into();
291
292        let daemon = Daemon::new(
293            hasher.clone(),
294            config.capacity,
295            t1.clone(),
296            t2.clone(),
297            cache.clone(),
298            cmd_q.clone(),
299            hit_q.clone(),
300            epoch.clone(),
301            config.duration,
302            config.poll_us,
303            worker_states.clone(),
304            daemon_tick.clone(),
305        );
306
307        let this = Self {
308            hasher,
309            t1,
310            t2,
311            cache,
312            cmd_tx: cmd_q,
313            hit_tx: hit_q,
314            epoch,
315            worker_states,
316            miss_buffers,
317            daemon_tick,
318            flush_tick_threshold: config.flush_tick_threshold,
319        };
320
321        (this, daemon)
322    }
323}
324
325// ── Public API (std + no_std) ─────────────────────────────────────────────
326
327impl<K, V, S> DualCacheFF<K, V, S>
328where
329    K: Hash + Eq + Send + Sync + Clone + 'static,
330    V: Send + Sync + Clone + 'static,
331    S: BuildHasher + Clone + Send + 'static,
332{
333    /// Flush all pending TLS buffers and wait for the Daemon to process them.
334    ///
335    /// Blocks via `OneshotAck::wait()` (spin-wait, safe in both std and no_std).
336    pub fn sync(&self) {
337        // ── std: flush TLS hit buffer ─────────────────────────────────────
338        #[cfg(feature = "std")]
339        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
340            let mut state = buf.borrow_mut();
341            if state.1 > 0_usize {
342                let _ = self.hit_tx.try_send(state.0);
343                state.1 = 0;
344            }
345        });
346
347        // ── std: flush all worker slots ───────────────────────────────────
348        #[cfg(feature = "std")]
349        for slot in self.miss_buffers.iter() {
350            let buf = unsafe { slot.get_mut_unchecked() };
351            if buf.len() > 0 {
352                let batch = buf.drain_to_vec();
353                let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
354            }
355        }
356
357        // Send a Sync command and spin-wait for acknowledgment
358        let ack = OneshotAck::new();
359        self.cmd_tx.send_blocking(Command::Sync(ack.clone()));
360        ack.wait();
361    }
362
363    /// Look up a key.
364    ///
365    /// Hot-path order: T1 (L1 direct-map) → T2 (L2 direct-map) → Cache (L3).
366    /// Records a hit signal into the TLS buffer for Daemon processing.
367    pub fn get(&self, key: &K) -> Option<V> {
368        let hash = self.hash(key);
369        let current_epoch_cache = self.epoch.load(Ordering::Relaxed);
370
371        // ── QSBR Check-in (std only — requires TLS) ───────────────────────
372        #[cfg(feature = "std")]
373        {
374            let global_epoch = GLOBAL_EPOCH.load(Ordering::Relaxed);
375            WORKER_ID.with(|&id| {
376                if id < self.worker_states.len() {
377                    self.worker_states[id]
378                        .local_epoch
379                        .store(global_epoch, Ordering::Relaxed);
380                }
381            });
382        }
383
384        let mut res: Option<V> = None;
385        let mut hit_g_idx: Option<u32> = None;
386
387        // ── T1 check ──────────────────────────────────────────────────────
388        let ptr_t1: *mut crate::storage::Node<K, V> = self.t1.load_slot(hash);
389        if !ptr_t1.is_null() {
390            let node = unsafe { &*ptr_t1 };
391            if node.key == *key
392                && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
393            {
394                res = Some(node.value.clone());
395                hit_g_idx = Some(node.g_idx);
396            }
397        }
398
399        // ── T2 check ──────────────────────────────────────────────────────
400        if res.is_none() {
401            let ptr_t2: *mut crate::storage::Node<K, V> = self.t2.load_slot(hash);
402            if !ptr_t2.is_null() {
403                let node = unsafe { &*ptr_t2 };
404                if node.key == *key
405                    && (node.expire_at == 0 || node.expire_at >= current_epoch_cache)
406                {
407                    res = Some(node.value.clone());
408                    hit_g_idx = Some(node.g_idx);
409                }
410            }
411        }
412
413        // ── Cache (L3) check ──────────────────────────────────────────────
414        if res.is_none() {
415            let tag = (hash >> 48) as u16;
416            if let Some(global_idx) = self.cache.index_probe(hash, tag) {
417                if let Some(v) = self
418                    .cache
419                    .node_get_full(global_idx, key, current_epoch_cache)
420                {
421                    res = Some(v);
422                    hit_g_idx = Some(global_idx as u32);
423                }
424            }
425        }
426
427        // ── QSBR Check-out (std only) ─────────────────────────────────────
428        #[cfg(feature = "std")]
429        WORKER_ID.with(|&id| {
430            if id < self.worker_states.len() {
431                self.worker_states[id]
432                    .local_epoch
433                    .store(0, Ordering::Relaxed);
434            }
435        });
436
437        if let Some(g_idx) = hit_g_idx {
438            self.record_hit(g_idx as usize);
439        }
440
441        res
442    }
443
444    /// Insert a key-value pair.
445    ///
446    /// # L1 Probation Filter (std only)
447    /// Items that appear only once in a TLS epoch are silently dropped.
448    /// This prevents cache pollution from scan traffic.
449    /// In no_std mode the filter is skipped and all items are forwarded.
450    ///
451    /// # Task 6 — Time-based TLS Flush (std only)
452    /// The TLS batch buffer normally flushes when it reaches 32 items.
453    /// Additionally, if the Daemon tick counter has advanced by at least
454    /// `flush_tick_threshold` since the last flush, the buffer is force-drained
455    /// even if nearly empty. This prevents hot items from being invisible to
456    /// the Daemon for too long (the "split-brain eviction" bug).
457    pub fn insert(&self, key: K, value: V) {
458        let hash = self.hash(&key);
459
460        // ── std path: L1 Probation Filter + TLS batch ─────────────────────
461        #[cfg(feature = "std")]
462        {
463            // L1 Probation Filter
464            let pass = L1_FILTER.with(|f: &RefCell<([u8; 4096], usize)>| {
465                let mut state = f.borrow_mut();
466                let idx = (hash as usize) & 4095_usize;
467                let val = state.0[idx];
468
469                state.1 += 1;
470                if state.1 >= 4096_usize {
471                    for x in state.0.iter_mut() {
472                        *x >>= 1;
473                    }
474                    state.1 = 0;
475                }
476
477                if val < 1_u8 {
478                    state.0[idx] = 1;
479                    false
480                } else {
481                    if val < 2_u8 {
482                        state.0[idx] = 2;
483                    }
484                    true
485                }
486            });
487
488            if !pass {
489                return;
490            }
491
492            // Task 6: Time-based flush detection
493            let current_tick = self.daemon_tick.load(Ordering::Relaxed);
494            let should_time_flush = LAST_FLUSH_TICK.with(|c: &Cell<u64>| {
495                current_tick.wrapping_sub(c.get()) >= self.flush_tick_threshold
496            });
497
498            // Worker TLS batch buffer
499            WORKER_ID.with(|&id| {
500                if id >= self.miss_buffers.len() {
501                    // Worker overflow: gracefully degrade to direct send
502                    let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
503                    return;
504                }
505
506                // Safety: WORKER_ID is unique per thread → exclusive slot access
507                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
508                let capacity_flush = buf.push((key, value, hash));
509
510                if capacity_flush || (should_time_flush && !buf.is_empty()) {
511                    let batch = buf.drain_to_vec();
512                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
513                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(current_tick));
514                }
515            });
516        }
517
518        // ── no_std path: direct send (no TLS available) ───────────────────
519        #[cfg(not(feature = "std"))]
520        {
521            let _ = self.cmd_tx.try_send(Command::Insert(key, value, hash));
522        }
523    }
524
525    /// Remove a key from the cache.
526    pub fn remove(&self, key: &K) {
527        let hash = self.hash(key);
528
529        // ── std: flush this thread's buffer first for causal ordering ─────
530        #[cfg(feature = "std")]
531        WORKER_ID.with(|&id| {
532            if id < self.miss_buffers.len() {
533                let buf = unsafe { self.miss_buffers[id].get_mut_unchecked() };
534                if buf.len() > 0 {
535                    let batch = buf.drain_to_vec();
536                    let _ = self.cmd_tx.try_send(Command::BatchInsert(batch));
537                    let tick = self.daemon_tick.load(Ordering::Relaxed);
538                    LAST_FLUSH_TICK.with(|c: &Cell<u64>| c.set(tick));
539                }
540            }
541        });
542
543        self.cmd_tx.send_blocking(Command::Remove(key.clone(), hash));
544    }
545
546    /// Clear all cached data.
547    pub fn clear(&self) {
548        let ack = OneshotAck::new();
549        self.cmd_tx.send_blocking(Command::Clear(ack.clone()));
550        ack.wait();
551    }
552
553    // ── Internals ─────────────────────────────────────────────────────────
554
555    #[inline(always)]
556    fn hash(&self, key: &K) -> u64 {
557        self.hasher.hash_one(key)
558    }
559
560    /// Buffer a Cache-hit global index for Daemon processing.
561    ///
562    /// std: fills the 64-element TLS array and ships it to `hit_tx` when full.
563    /// no_std: sends directly (no TLS batch buffering available).
564    #[inline(always)]
565    fn record_hit(&self, global_idx: usize) {
566        #[cfg(feature = "std")]
567        HIT_BUF.with(|buf: &RefCell<([usize; 64], usize)>| {
568            let mut state = buf.borrow_mut();
569            let idx = state.1;
570            state.0[idx] = global_idx;
571            state.1 += 1;
572            if state.1 == 64_usize {
573                let _ = self.hit_tx.try_send(state.0);
574                state.1 = 0;
575            }
576        });
577
578        // no_std: no TLS; hit signals are not batched.
579        // Daemon still processes hits via hit_rx if sent individually.
580        #[cfg(not(feature = "std"))]
581        {
582            let mut batch = [0usize; 64];
583            batch[0] = global_idx;
584            let _ = self.hit_tx.try_send(batch);
585        }
586    }
587}