// reddb_server/storage/cache/promotion_pool.rs

//! Async promotion pool — turbo module for off-CPU L1 cache promotion.
//!
//! # Why this module exists
//!
//! `BlobCache::get` is on the read hot path. Today, an L2 hit synchronously
//! promotes the blob to L1 (mutates `RwLock`s, runs admission policy, fires
//! eviction loops). That promotion work — bookkeeping, not the actual byte
//! transfer — adds tens of microseconds to every L2 hit.
//!
//! `AsyncPromotionPool` decouples the two: `get` decides "this should go to
//! L1" and hands the request to a bounded queue, then returns the bytes to
//! the caller immediately. A small worker pool drains the queue and performs
//! the promotion off the read path.
//!
//! Inspired by Postgres's `bgwriter` and Linux's `kswapd`: the slow,
//! housekeeping work belongs on a dedicated thread, not in the hot path.
//!
//! # Design
//!
//! - **Bounded queue** (`crossbeam_queue::ArrayQueue`) — back-pressure
//!   without unbounded memory growth.
//! - **Drop-oldest on saturation** — when the queue is full we evict the
//!   oldest pending request and admit the new one. Rationale: under load
//!   the freshest accesses are the most likely to be re-read soon, so a
//!   FIFO drop loses the least value.
//! - **Decoupled executor** — the closure that performs the actual
//!   promotion is injected at construction (`new_with_executor`). The pool
//!   knows nothing about `BlobCache` and is therefore unit-testable in
//!   isolation.
//! - **Atomic metrics** — counters are `Relaxed`-incremented by hot paths;
//!   `metrics()` returns a best-effort snapshot (see its docs for the
//!   consistency caveats).
//! - **Graceful shutdown** — `shutdown` flips a flag, lets workers drain a
//!   bounded number of remaining requests, then they exit.
//!
//! # Wiring (deferred)
//!
//! This file is purely additive. Wiring into `BlobCache::get` and the
//! `pub mod promotion_pool;` registration in `mod.rs` happen in a
//! follow-up change, after all three turbo modules from issue #193
//! have shipped.
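//!
//! # Example
//!
//! A minimal usage sketch (assumes a running tokio runtime; `promote_to_l1`
//! and `request` are hypothetical stand-ins for whatever the caller wires
//! in):
//!
//! ```ignore
//! let pool = AsyncPromotionPool::new_with_executor(
//!     PoolOpts::default(),
//!     Arc::new(|req| promote_to_l1(req)),
//! );
//! let outcome = pool.schedule(request); // never blocks
//! // ... later, during teardown:
//! pool.shutdown(); // workers drain a bounded backlog, then exit
//! ```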

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crossbeam_queue::ArrayQueue;

use super::blob::BlobCachePolicy;

// ---------------------------------------------------------------------------
// Public surface
// ---------------------------------------------------------------------------

/// Configuration for the promotion pool.
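///
/// A sketch of overriding a single field while keeping the other defaults
/// (plain struct-update syntax; no names beyond what this module defines):
///
/// ```ignore
/// let opts = PoolOpts { queue_capacity: 4096, ..PoolOpts::default() };
/// ```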
#[derive(Debug, Clone, Copy)]
pub struct PoolOpts {
    /// Maximum number of pending promotion requests. When full, the pool
    /// drops the oldest entry to admit the newest (see `ScheduleOutcome`).
    pub queue_capacity: usize,
    /// Number of tokio worker tasks draining the queue.
    pub worker_count: usize,
}

impl Default for PoolOpts {
    fn default() -> Self {
        Self {
            queue_capacity: 1024,
            worker_count: 2,
        }
    }
}

/// A single async promotion request handed to the pool by `BlobCache::get`
/// (or, in tests, by the test harness).
///
/// `bytes` is `Arc<[u8]>` so that the same buffer the caller is returning
/// to the user is shared zero-copy with the L1 promotion.
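///
/// A sketch of that sharing (the `buf`, `namespace`, and `key` bindings are
/// hypothetical; the point is that cloning the `Arc` copies a pointer, not
/// the payload):
///
/// ```ignore
/// let bytes: Arc<[u8]> = Arc::from(buf.into_boxed_slice());
/// let req = PromotionRequest {
///     namespace,
///     key,
///     bytes: Arc::clone(&bytes), // shares the buffer being returned
///     policy: BlobCachePolicy::default(),
/// };
/// ```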
#[derive(Debug, Clone)]
pub struct PromotionRequest {
    pub namespace: String,
    pub key: String,
    pub bytes: Arc<[u8]>,
    pub policy: BlobCachePolicy,
}

/// Result of `AsyncPromotionPool::schedule`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleOutcome {
    /// Request accepted into the queue.
    Queued,
    /// Queue was full. `evicted_oldest = true` means we dropped the oldest
    /// pending request to admit this one. `evicted_oldest = false` means
    /// *this* request was dropped instead: either the pool is shutting
    /// down, or the queue was so contended that the evict-and-readmit
    /// race was lost.
    DroppedQueueFull { evicted_oldest: bool },
}

/// Snapshot of the pool's atomic counters. Returned by `metrics()`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PromotionMetrics {
    pub queued_total: u64,
    pub dropped_total: u64,
    pub completed_total: u64,
    pub queue_depth: usize,
}

/// The closure that actually performs the L1 promotion. Injected at
/// construction so this module has no compile-time dependency on
/// `BlobCache`.
///
/// The wiring follow-up will pass a closure of the form:
///
/// ```ignore
/// let cache = self.clone();
/// Arc::new(move |req| cache.do_l1_promotion(req))
/// ```
pub type PromotionExecutor =
    Arc<dyn Fn(PromotionRequest) -> Result<(), String> + Send + Sync + 'static>;

// ---------------------------------------------------------------------------
// AsyncPromotionPool
// ---------------------------------------------------------------------------

/// Bounded, drop-oldest async promotion pool.
///
/// See module docs for design rationale.
pub struct AsyncPromotionPool {
    queue: Arc<ArrayQueue<PromotionRequest>>,
    executor: PromotionExecutor,
    shutdown: Arc<AtomicBool>,

    queued_total: AtomicU64,
    dropped_total: AtomicU64,
    completed_total: AtomicU64,

    /// Soft cap on how many remaining requests each worker will drain
    /// after `shutdown()` before exiting. Prevents a pathological flood
    /// of late requests from blocking shutdown indefinitely.
    drain_budget: usize,
}

impl AsyncPromotionPool {
    /// Construct a pool with a no-op executor. Useful only for tests / dry
    /// runs where you want metrics but no actual promotion side-effects.
    pub fn new(opts: PoolOpts) -> Arc<Self> {
        Self::new_with_executor(opts, Arc::new(|_| Ok(())))
    }

    /// Construct a pool with a caller-provided executor closure.
    ///
    /// Spawns `opts.worker_count` tokio tasks that drain the queue. Each
    /// task holds a strong `Arc<Self>`, so the pool stays alive until its
    /// workers exit; call `shutdown()` once the pool is no longer needed,
    /// or the workers will keep it alive for the lifetime of the runtime.
    pub fn new_with_executor(opts: PoolOpts, executor: PromotionExecutor) -> Arc<Self> {
        let capacity = opts.queue_capacity.max(1);
        let workers = opts.worker_count.max(1);

        let pool = Arc::new(Self {
            queue: Arc::new(ArrayQueue::new(capacity)),
            executor,
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            // Drain at most one queue-worth of late requests per worker.
            drain_budget: capacity,
        });

        for _ in 0..workers {
            let pool_for_worker = Arc::clone(&pool);
            tokio::spawn(async move {
                worker_loop(pool_for_worker).await;
            });
        }

        pool
    }

    /// Hand a promotion request to the pool.
    ///
    /// Never blocks. If the queue has capacity, the request is enqueued and
    /// `Queued` is returned. If the queue is full, the oldest request is
    /// popped (and dropped) to make room — the caller learns this via
    /// `DroppedQueueFull { evicted_oldest: true }`. In the rare case where
    /// the pool loses the race (the eviction `pop` fails, or another
    /// producer refills the freed slot before us), the *new* request is
    /// dropped: `DroppedQueueFull { evicted_oldest: false }`.
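    ///
    /// A sketch of a producer acting on the outcome (purely illustrative;
    /// the pool does its own drop accounting either way):
    ///
    /// ```ignore
    /// match pool.schedule(request) {
    ///     ScheduleOutcome::Queued => {}
    ///     ScheduleOutcome::DroppedQueueFull { evicted_oldest } => {
    ///         // Promotion is best-effort; the read already succeeded.
    ///         tracing::debug!(evicted_oldest, "promotion request dropped");
    ///     }
    /// }
    /// ```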
    pub fn schedule(&self, request: PromotionRequest) -> ScheduleOutcome {
        // After shutdown, refuse new work to keep the drain bounded.
        if self.shutdown.load(Ordering::Acquire) {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
            return ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: false,
            };
        }

        match self.queue.push(request) {
            Ok(()) => {
                self.queued_total.fetch_add(1, Ordering::Relaxed);
                ScheduleOutcome::Queued
            }
            Err(rejected) => {
                // Full. Try to evict oldest to admit the newest.
                let evicted_oldest = self.queue.pop().is_some();
                if evicted_oldest {
                    self.dropped_total.fetch_add(1, Ordering::Relaxed);
                }
                match self.queue.push(rejected) {
                    Ok(()) => {
                        self.queued_total.fetch_add(1, Ordering::Relaxed);
                        ScheduleOutcome::DroppedQueueFull { evicted_oldest }
                    }
                    Err(_) => {
                        // Lost the race — another producer refilled the
                        // slot before us. Drop the new request.
                        self.dropped_total.fetch_add(1, Ordering::Relaxed);
                        ScheduleOutcome::DroppedQueueFull {
                            evicted_oldest: false,
                        }
                    }
                }
            }
        }
    }

    /// Signal workers to drain remaining work and exit.
    ///
    /// Each worker will process at most `drain_budget` more requests after
    /// it observes the shutdown flag, then return. New `schedule` calls
    /// after shutdown are rejected (counted in `dropped_total`).
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Snapshot of the pool's atomic counters.
    ///
    /// Each counter is read with `Relaxed` ordering; the snapshot is not
    /// strictly atomic across counters (it can show, e.g., one more
    /// `queued_total` than the queue depth implies if a worker is mid-pop).
    /// This is acceptable for monitoring; consumers that need exact rates
    /// should sample twice and work with the deltas.
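    ///
    /// A sketch of that delta sampling (the interval is illustrative):
    ///
    /// ```ignore
    /// let before = pool.metrics();
    /// tokio::time::sleep(Duration::from_secs(10)).await;
    /// let after = pool.metrics();
    /// let completed_per_sec =
    ///     (after.completed_total - before.completed_total) as f64 / 10.0;
    /// ```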
    pub fn metrics(&self) -> PromotionMetrics {
        PromotionMetrics {
            queued_total: self.queued_total.load(Ordering::Relaxed),
            dropped_total: self.dropped_total.load(Ordering::Relaxed),
            completed_total: self.completed_total.load(Ordering::Relaxed),
            queue_depth: self.queue.len(),
        }
    }
}

// ---------------------------------------------------------------------------
// Worker loop
// ---------------------------------------------------------------------------

/// Idle backoff between empty polls. Short enough that latency stays low,
/// long enough that an idle pool doesn't spin a CPU.
const WORKER_IDLE_BACKOFF: Duration = Duration::from_millis(1);

async fn worker_loop(pool: Arc<AsyncPromotionPool>) {
    loop {
        match pool.queue.pop() {
            Some(req) => {
                // Run the executor. We swallow errors here because the
                // promotion is best-effort by definition — the read path
                // already handed bytes to the user. Errors are surfaced
                // via tracing for observability.
                if let Err(err) = (pool.executor)(req) {
                    tracing::warn!(error = %err, "async promotion executor failed");
                }
                pool.completed_total.fetch_add(1, Ordering::Relaxed);
            }
            None => {
                // Queue empty.
                if pool.shutdown.load(Ordering::Acquire) {
                    // Drain at most `drain_budget` more items (in case a
                    // late `schedule` slipped in before the shutdown flag
                    // was published) and exit.
                    let mut drained = 0;
                    while drained < pool.drain_budget {
                        match pool.queue.pop() {
                            Some(req) => {
                                let _ = (pool.executor)(req);
                                pool.completed_total.fetch_add(1, Ordering::Relaxed);
                                drained += 1;
                            }
                            None => break,
                        }
                    }
                    return;
                }
                tokio::time::sleep(WORKER_IDLE_BACKOFF).await;
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Mutex;
    use std::time::Instant;

    fn req(key: &str) -> PromotionRequest {
        PromotionRequest {
            namespace: "ns".to_string(),
            key: key.to_string(),
            bytes: Arc::from(vec![0u8; 8].into_boxed_slice()),
            policy: BlobCachePolicy::default(),
        }
    }

    /// Build a pool with no workers spawned; used for queue-semantics tests
    /// where concurrent worker drain would race.
    ///
    /// We construct the pool manually rather than going through
    /// `new_with_executor`, which always spawns workers. For pure
    /// queue-semantics tests we want zero concurrent draining.
    fn pool_no_workers(capacity: usize) -> Arc<AsyncPromotionPool> {
        Arc::new(AsyncPromotionPool {
            queue: Arc::new(ArrayQueue::new(capacity)),
            executor: Arc::new(|_| Ok(())),
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            drain_budget: capacity,
        })
    }

    #[test]
    fn schedule_returns_queued_when_capacity_available() {
        let pool = pool_no_workers(4);
        assert_eq!(pool.schedule(req("a")), ScheduleOutcome::Queued);
        assert_eq!(pool.schedule(req("b")), ScheduleOutcome::Queued);
        assert_eq!(pool.metrics().queued_total, 2);
        assert_eq!(pool.metrics().queue_depth, 2);
    }

    #[test]
    fn schedule_drops_oldest_when_saturated() {
        let pool = pool_no_workers(2);
        assert_eq!(pool.schedule(req("a")), ScheduleOutcome::Queued);
        assert_eq!(pool.schedule(req("b")), ScheduleOutcome::Queued);

        let outcome = pool.schedule(req("c"));
        assert_eq!(
            outcome,
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: true
            }
        );
        assert_eq!(pool.metrics().dropped_total, 1);
        assert_eq!(pool.metrics().queue_depth, 2);
    }

    #[test]
    fn drop_oldest_semantics_preserve_newest() {
        // Insert N+1 items into a capacity-N queue. The oldest must be
        // gone, the newest must survive.
        let cap = 3;
        let pool = pool_no_workers(cap);

        for k in ["a", "b", "c"] {
            assert_eq!(pool.schedule(req(k)), ScheduleOutcome::Queued);
        }
        // Saturating insert.
        assert_eq!(
            pool.schedule(req("d")),
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: true
            }
        );

        // Drain in FIFO order and check contents.
        let mut seen = Vec::new();
        while let Some(r) = pool.queue.pop() {
            seen.push(r.key);
        }
        assert_eq!(
            seen,
            vec!["b".to_string(), "c".to_string(), "d".to_string()]
        );
    }

    #[tokio::test]
    async fn worker_executes_injected_closure() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_exec = Arc::clone(&counter);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            counter_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 16,
                worker_count: 1,
            },
            executor,
        );

        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }

        // Wait for workers to drain.
        let deadline = Instant::now() + Duration::from_secs(2);
        while counter.load(Ordering::Relaxed) < 5 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        assert_eq!(counter.load(Ordering::Relaxed), 5);
        assert_eq!(pool.metrics().completed_total, 5);

        pool.shutdown();
    }

    #[tokio::test]
    async fn shutdown_drains_queue_within_budget() {
        let executed = Arc::new(AtomicUsize::new(0));
        let executed_for_exec = Arc::clone(&executed);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            executed_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 32,
                worker_count: 2,
            },
            executor,
        );

        for k in 0..20 {
            pool.schedule(req(&format!("k{k}")));
        }

        pool.shutdown();

        // Workers should drain everything queued before shutdown was set.
        let deadline = Instant::now() + Duration::from_secs(2);
        while executed.load(Ordering::Relaxed) < 20 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        assert_eq!(executed.load(Ordering::Relaxed), 20);

        // Post-shutdown schedule is rejected.
        let outcome = pool.schedule(req("late"));
        assert_eq!(
            outcome,
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: false
            }
        );
        assert!(pool.metrics().dropped_total >= 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_schedulers_no_deadlock_all_completions_counted() {
        let executed = Arc::new(AtomicUsize::new(0));
        let executed_for_exec = Arc::clone(&executed);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            executed_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 64,
                worker_count: 2,
            },
            executor,
        );

        let producers = 8;
        let per_producer = 200;
        // Producers track local outcomes so we can check accounting
        // without relying on any single internal counter formula. The
        // pool's `dropped_total` aggregates two distinct events
        // (evicted-oldest + outright-rejected), so we use the
        // `ScheduleOutcome` variants directly.
        let outright_drops = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::new();
        for p in 0..producers {
            let pool_p = Arc::clone(&pool);
            let drops_p = Arc::clone(&outright_drops);
            handles.push(tokio::spawn(async move {
                for i in 0..per_producer {
                    let r = PromotionRequest {
                        namespace: format!("ns{p}"),
                        key: format!("k{i}"),
                        bytes: Arc::from(vec![0u8; 4].into_boxed_slice()),
                        policy: BlobCachePolicy::default(),
                    };
                    if let ScheduleOutcome::DroppedQueueFull {
                        evicted_oldest: false,
                    } = pool_p.schedule(r)
                    {
                        drops_p.fetch_add(1, Ordering::Relaxed);
                    }
                    if i % 32 == 0 {
                        tokio::task::yield_now().await;
                    }
                }
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        // Invariants we check after producers finish AND workers catch up:
        //
        //   queued_total + outright_drops == submitted
        //
        // (every submission was either admitted or rejected outright; an
        // "evicted oldest" event still admits the new request)
        //
        // and once the queue drains:
        //
        //   completed_total == queued_total - dropped_via_eviction
        //                   == queued_total - (dropped_total - outright_drops)
        //
        // Equivalently: completed + dropped_total == queued + outright_drops.
        let submitted = (producers * per_producer) as u64;

        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let m = pool.metrics();
            let outright = outright_drops.load(Ordering::Relaxed) as u64;
            let admitted_invariant = m.queued_total + outright == submitted;
            let drained_invariant =
                m.completed_total + m.dropped_total == m.queued_total + outright;
            if admitted_invariant && drained_invariant && m.queue_depth == 0 {
                break;
            }
            if Instant::now() > deadline {
                panic!(
                    "did not converge: submitted={submitted} queued={} dropped={} completed={} depth={} outright={}",
                    m.queued_total, m.dropped_total, m.completed_total, m.queue_depth, outright
                );
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        pool.shutdown();
    }

    #[test]
    fn metrics_snapshot_is_consistent_under_simple_load() {
        let pool = pool_no_workers(8);
        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }
        let m = pool.metrics();
        assert_eq!(m.queued_total, 5);
        assert_eq!(m.dropped_total, 0);
        assert_eq!(m.completed_total, 0);
        assert_eq!(m.queue_depth, 5);
    }

    /// Sanity: the executor sees the same bytes/key/namespace the producer
    /// scheduled. Catches accidental Arc/Box mix-ups in the queue plumbing.
    #[tokio::test]
    async fn executor_receives_unmodified_request() {
        let captured: Arc<Mutex<Vec<(String, String, usize)>>> =
            Arc::new(Mutex::new(Vec::new()));
        let captured_for_exec = Arc::clone(&captured);
        let executor: PromotionExecutor = Arc::new(move |req| {
            captured_for_exec
                .lock()
                .unwrap()
                .push((req.namespace, req.key, req.bytes.len()));
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 4,
                worker_count: 1,
            },
            executor,
        );

        pool.schedule(PromotionRequest {
            namespace: "users".to_string(),
            key: "42".to_string(),
            bytes: Arc::from(vec![1u8, 2, 3, 4, 5].into_boxed_slice()),
            policy: BlobCachePolicy::default(),
        });

        let deadline = Instant::now() + Duration::from_secs(2);
        while captured.lock().unwrap().is_empty() && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        let seen = captured.lock().unwrap().clone();
        assert_eq!(seen, vec![("users".to_string(), "42".to_string(), 5)]);

        pool.shutdown();
    }
}