reddb_server/storage/cache/promotion_pool.rs
1//! Async promotion pool — turbo module for off-CPU L1 cache promotion.
2//!
3//! # Why this module exists
4//!
5//! `BlobCache::get` is on the read hot path. Today, an L2 hit synchronously
6//! promotes the blob to L1 (mutates `RwLock`s, runs admission policy, fires
7//! eviction loops). That promotion work — bookkeeping, not the actual byte
8//! transfer — adds tens of microseconds to every L2 hit.
9//!
10//! `AsyncPromotionPool` decouples the two: `get` decides "this should go to
11//! L1" and hands the request to a bounded queue, then returns the bytes to
12//! the caller immediately. A small worker pool drains the queue and performs
13//! the promotion off the read path.
14//!
15//! Inspired by Postgres's `bgwriter` and Linux's `kswapd`: the slow,
16//! housekeeping work belongs on a dedicated thread, not in the hot path.
17//!
18//! # Design
19//!
20//! - **Bounded queue** (`crossbeam::queue::ArrayQueue`) — back-pressure
21//! without unbounded memory growth.
22//! - **Drop-oldest on saturation** — when the queue is full we evict the
23//! oldest pending request and admit the new one. Rationale: under load
24//! the freshest accesses are the most likely to be re-read soon, so a
25//! FIFO drop loses the least value.
26//! - **Decoupled executor** — the closure that performs the actual
27//! promotion is injected at construction (`new_with_executor`). The pool
28//! knows nothing about `BlobCache` and is therefore unit-testable in
29//! isolation.
30//! - **Atomic metrics** — counters are `Relaxed`-incremented by hot paths;
31//! `metrics()` returns a consistent snapshot.
32//! - **Graceful shutdown** — `shutdown` flips a flag, lets workers drain a
33//! bounded number of remaining requests, then they exit.
34//!
35//! # Wiring (deferred)
36//!
37//! This file is purely additive. Wiring into `BlobCache::get` and the
38//! `pub mod promotion_pool;` registration in `mod.rs` happen in a
39//! sequential follow-up, after all three turbo modules from issue #193
40//! have shipped.
41
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::Duration;
45
46use crossbeam_queue::ArrayQueue;
47
48use super::blob::BlobCachePolicy;
49
50// ---------------------------------------------------------------------------
51// Public surface
52// ---------------------------------------------------------------------------
53
/// Configuration for the promotion pool.
///
/// Both fields are clamped to at least 1 by `new_with_executor`, so a
/// zeroed `PoolOpts` still yields a functional (if tiny) pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolOpts {
    /// Maximum number of pending promotion requests. When full, the pool
    /// drops the oldest entry to admit the newest (see `ScheduleOutcome`).
    /// Also used as the per-worker drain budget at shutdown.
    pub queue_capacity: usize,
    /// Number of tokio worker tasks draining the queue.
    pub worker_count: usize,
}
63
64impl Default for PoolOpts {
65 fn default() -> Self {
66 Self {
67 queue_capacity: 1024,
68 worker_count: 2,
69 }
70 }
71}
72
/// A single async promotion request handed to the pool by `BlobCache::get`
/// (or, in tests, by the test harness).
///
/// `bytes` is `Arc<[u8]>` so that the same buffer the caller is returning
/// to the user is shared zero-copy with the L1 promotion.
#[derive(Debug, Clone)]
pub struct PromotionRequest {
    /// Cache namespace the blob belongs to.
    pub namespace: String,
    /// Blob key within the namespace.
    pub key: String,
    /// Blob payload; `Arc` clone is a refcount bump, no byte copy.
    pub bytes: Arc<[u8]>,
    /// Cache policy forwarded untouched to the promotion executor
    /// (semantics are defined by `BlobCachePolicy`, not this module).
    pub policy: BlobCachePolicy,
}
85
/// Result of `AsyncPromotionPool::schedule`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleOutcome {
    /// Request accepted into the queue.
    Queued,
    /// Queue was full. `evicted_oldest = true` means we dropped the oldest
    /// pending request to admit this one. `evicted_oldest = false` means
    /// *this* request was dropped instead — either because the queue was
    /// so contended that even the eviction `pop` failed, or because
    /// another producer refilled the freed slot before our re-push (in
    /// that race an oldest entry was still evicted; see `schedule`).
    DroppedQueueFull { evicted_oldest: bool },
}
97
/// Snapshot of the pool's atomic counters. Returned by `metrics()`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PromotionMetrics {
    /// Total requests admitted to the queue (`Queued` outcomes plus
    /// full-queue admissions that evicted the oldest entry).
    pub queued_total: u64,
    /// Total requests dropped for any reason: evicted-oldest on
    /// saturation, rejected in the saturation race, or refused after
    /// shutdown.
    pub dropped_total: u64,
    /// Total requests handed to the executor by a worker (counted whether
    /// the executor succeeded or failed).
    pub completed_total: u64,
    /// Number of pending requests at snapshot time.
    pub queue_depth: usize,
}
106
/// The closure that actually performs the L1 promotion. Injected at
/// construction so this module has no compile-time dependency on
/// `BlobCache`.
///
/// Errors returned by the executor are never propagated to schedulers:
/// the worker loop logs them (or, during the shutdown drain, discards
/// them) and still counts the request in `completed_total` — promotion is
/// best-effort by design.
///
/// The wiring slice will pass a closure of the form:
///
/// ```ignore
/// let cache = self.clone();
/// Arc::new(move |req| cache.do_l1_promotion(req))
/// ```
pub type PromotionExecutor =
    Arc<dyn Fn(PromotionRequest) -> Result<(), String> + Send + Sync + 'static>;
119
120// ---------------------------------------------------------------------------
121// AsyncPromotionPool
122// ---------------------------------------------------------------------------
123
/// Bounded, drop-oldest async promotion pool.
///
/// See module docs for design rationale.
pub struct AsyncPromotionPool {
    /// Pending promotion requests (fixed-capacity MPMC queue).
    queue: Arc<ArrayQueue<PromotionRequest>>,
    /// Closure that performs the actual L1 promotion; see
    /// `PromotionExecutor` for the error-handling contract.
    executor: PromotionExecutor,
    /// Set once by `shutdown()`; observed by `schedule` (to refuse new
    /// work) and by the worker loops (to drain and exit).
    shutdown: Arc<AtomicBool>,

    /// Requests admitted to the queue.
    queued_total: AtomicU64,
    /// Requests dropped (eviction, saturation race, post-shutdown refusal).
    dropped_total: AtomicU64,
    /// Requests executed by workers, regardless of executor result.
    completed_total: AtomicU64,

    /// Soft cap on how many remaining requests workers will drain after
    /// `shutdown()` is called before exiting. Prevents a pathological
    /// flood of late requests from blocking shutdown indefinitely.
    drain_budget: usize,
}
141
142impl AsyncPromotionPool {
    /// Construct a pool with a no-op executor. Useful only for tests / dry
    /// runs where you want metrics but no actual promotion side-effects.
    ///
    /// Worker tasks are still spawned (see `new_with_executor`), so this
    /// must be called from within a tokio runtime.
    pub fn new(opts: PoolOpts) -> Arc<Self> {
        Self::new_with_executor(opts, Arc::new(|_| Ok(())))
    }
148
    /// Construct a pool with a caller-provided executor closure.
    ///
    /// Spawns `opts.worker_count` tokio tasks that drain the queue, so
    /// this must be called from within a tokio runtime. Both
    /// `queue_capacity` and `worker_count` are clamped to at least 1.
    ///
    /// NOTE(review): each worker task holds a strong `Arc<Self>` (see the
    /// `Arc::clone` below) — not a `Weak<Self>` as this doc previously
    /// claimed. The pool therefore stays alive until workers observe
    /// `shutdown()` and exit, even after the caller drops its own handle;
    /// call `shutdown()` to release it.
    pub fn new_with_executor(opts: PoolOpts, executor: PromotionExecutor) -> Arc<Self> {
        // Clamp so a zeroed PoolOpts still yields a working pool.
        let capacity = opts.queue_capacity.max(1);
        let workers = opts.worker_count.max(1);

        let pool = Arc::new(Self {
            queue: Arc::new(ArrayQueue::new(capacity)),
            executor,
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            // Drain at most one queue-worth of late requests per worker.
            drain_budget: capacity,
        });

        for _ in 0..workers {
            let pool_for_worker = Arc::clone(&pool);
            tokio::spawn(async move {
                worker_loop(pool_for_worker).await;
            });
        }

        pool
    }
178
179 /// Hand a promotion request to the pool.
180 ///
181 /// Never blocks. If the queue has capacity, the request is enqueued and
182 /// `Queued` is returned. If the queue is full, the oldest request is
183 /// popped (and dropped) to make room — the caller learns this via
184 /// `DroppedQueueFull { evicted_oldest: true }`. In the rare case where
185 /// the queue is so contended that even the `pop` fails, the *new*
186 /// request is dropped: `DroppedQueueFull { evicted_oldest: false }`.
187 pub fn schedule(&self, request: PromotionRequest) -> ScheduleOutcome {
188 // After shutdown, refuse new work to keep the drain bounded.
189 if self.shutdown.load(Ordering::Acquire) {
190 self.dropped_total.fetch_add(1, Ordering::Relaxed);
191 return ScheduleOutcome::DroppedQueueFull {
192 evicted_oldest: false,
193 };
194 }
195
196 match self.queue.push(request) {
197 Ok(()) => {
198 self.queued_total.fetch_add(1, Ordering::Relaxed);
199 ScheduleOutcome::Queued
200 }
201 Err(rejected) => {
202 // Full. Try to evict oldest to admit the newest.
203 let evicted_oldest = self.queue.pop().is_some();
204 if evicted_oldest {
205 self.dropped_total.fetch_add(1, Ordering::Relaxed);
206 }
207 match self.queue.push(rejected) {
208 Ok(()) => {
209 self.queued_total.fetch_add(1, Ordering::Relaxed);
210 ScheduleOutcome::DroppedQueueFull { evicted_oldest }
211 }
212 Err(_) => {
213 // Lost the race — another producer refilled the
214 // slot before us. Drop the new request.
215 self.dropped_total.fetch_add(1, Ordering::Relaxed);
216 ScheduleOutcome::DroppedQueueFull {
217 evicted_oldest: false,
218 }
219 }
220 }
221 }
222 }
223 }
224
225 /// Signal workers to drain remaining work and exit.
226 ///
227 /// Workers will process at most `drain_budget` more requests after the
228 /// shutdown flag is observed, then return. New `schedule` calls after
229 /// shutdown are rejected (counted in `dropped_total`).
230 pub fn shutdown(self: Arc<Self>) {
231 self.shutdown.store(true, Ordering::Release);
232 }
233
234 /// Snapshot of the pool's atomic counters.
235 ///
236 /// Each counter is read with `Relaxed` ordering; the snapshot is not
237 /// strictly atomic across counters (it can show, e.g., one more
238 /// `queued_total` than the queue depth implies if a worker is mid-pop).
239 /// This is acceptable for monitoring; consumers that need a strictly
240 /// consistent view should sample twice and take the difference.
241 pub fn metrics(&self) -> PromotionMetrics {
242 PromotionMetrics {
243 queued_total: self.queued_total.load(Ordering::Relaxed),
244 dropped_total: self.dropped_total.load(Ordering::Relaxed),
245 completed_total: self.completed_total.load(Ordering::Relaxed),
246 queue_depth: self.queue.len(),
247 }
248 }
249}
250
251// ---------------------------------------------------------------------------
252// Worker loop
253// ---------------------------------------------------------------------------
254
255/// Idle backoff between empty polls. Short enough that latency stays low,
256/// long enough that an idle pool doesn't spin a CPU.
257const WORKER_IDLE_BACKOFF: Duration = Duration::from_millis(1);
258
259async fn worker_loop(pool: Arc<AsyncPromotionPool>) {
260 loop {
261 match pool.queue.pop() {
262 Some(req) => {
263 // Run the executor. We swallow errors here because the
264 // promotion is best-effort by definition — the read path
265 // already handed bytes to the user. Errors are surfaced
266 // via tracing for observability.
267 if let Err(err) = (pool.executor)(req) {
268 tracing::warn!(error = %err, "async promotion executor failed");
269 }
270 pool.completed_total.fetch_add(1, Ordering::Relaxed);
271 }
272 None => {
273 // Queue empty.
274 if pool.shutdown.load(Ordering::Acquire) {
275 // Drain at most `drain_budget` more items (in case a
276 // late `schedule` slipped in before the shutdown flag
277 // was published) and exit.
278 let mut drained = 0;
279 while drained < pool.drain_budget {
280 match pool.queue.pop() {
281 Some(req) => {
282 let _ = (pool.executor)(req);
283 pool.completed_total.fetch_add(1, Ordering::Relaxed);
284 drained += 1;
285 }
286 None => break,
287 }
288 }
289 return;
290 }
291 tokio::time::sleep(WORKER_IDLE_BACKOFF).await;
292 }
293 }
294 }
295}
296
297// ---------------------------------------------------------------------------
298// Tests
299// ---------------------------------------------------------------------------
300
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Mutex;
    use std::time::Instant;

    // Minimal request fixture: 8-byte zero payload under namespace "ns".
    fn req(key: &str) -> PromotionRequest {
        PromotionRequest {
            namespace: "ns".to_string(),
            key: key.to_string(),
            bytes: Arc::from(vec![0u8; 8].into_boxed_slice()),
            policy: BlobCachePolicy::default(),
        }
    }

    /// Build a pool whose executor never runs (no workers spawned by us);
    /// used for queue-semantics tests where worker drain would race.
    ///
    /// We do this by constructing the pool manually rather than going
    /// through `new_with_executor`, which always spawns workers. For pure
    /// queue-semantics tests we want zero concurrent draining.
    fn pool_no_workers(capacity: usize) -> Arc<AsyncPromotionPool> {
        Arc::new(AsyncPromotionPool {
            queue: Arc::new(ArrayQueue::new(capacity)),
            executor: Arc::new(|_| Ok(())),
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            drain_budget: capacity,
        })
    }

    #[test]
    fn schedule_returns_queued_when_capacity_available() {
        let pool = pool_no_workers(4);
        assert_eq!(pool.schedule(req("a")), ScheduleOutcome::Queued);
        assert_eq!(pool.schedule(req("b")), ScheduleOutcome::Queued);
        assert_eq!(pool.metrics().queued_total, 2);
        assert_eq!(pool.metrics().queue_depth, 2);
    }

    #[test]
    fn schedule_drops_oldest_when_saturated() {
        let pool = pool_no_workers(2);
        assert_eq!(pool.schedule(req("a")), ScheduleOutcome::Queued);
        assert_eq!(pool.schedule(req("b")), ScheduleOutcome::Queued);

        let outcome = pool.schedule(req("c"));
        assert_eq!(
            outcome,
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: true
            }
        );
        assert_eq!(pool.metrics().dropped_total, 1);
        assert_eq!(pool.metrics().queue_depth, 2);
    }

    #[test]
    fn drop_oldest_semantics_preserve_newest() {
        // Insert N+1 items into a capacity-N queue. The oldest must be
        // gone, the newest must survive.
        let cap = 3;
        let pool = pool_no_workers(cap);

        for k in ["a", "b", "c"] {
            assert_eq!(pool.schedule(req(k)), ScheduleOutcome::Queued);
        }
        // Saturating insert.
        assert_eq!(
            pool.schedule(req("d")),
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: true
            }
        );

        // Drain in FIFO order and check contents.
        let mut seen = Vec::new();
        while let Some(r) = pool.queue.pop() {
            seen.push(r.key);
        }
        assert_eq!(
            seen,
            vec!["b".to_string(), "c".to_string(), "d".to_string()]
        );
    }

    #[tokio::test]
    async fn worker_executes_injected_closure() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_exec = Arc::clone(&counter);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            counter_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 16,
                worker_count: 1,
            },
            executor,
        );

        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }

        // Wait for workers to drain.
        let deadline = Instant::now() + Duration::from_secs(2);
        while counter.load(Ordering::Relaxed) < 5 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        assert_eq!(counter.load(Ordering::Relaxed), 5);
        assert_eq!(pool.metrics().completed_total, 5);

        // Release the workers so the pool can be dropped.
        Arc::clone(&pool).shutdown();
    }

    #[tokio::test]
    async fn shutdown_drains_queue_within_budget() {
        let executed = Arc::new(AtomicUsize::new(0));
        let executed_for_exec = Arc::clone(&executed);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            executed_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 32,
                worker_count: 2,
            },
            executor,
        );

        for k in 0..20 {
            pool.schedule(req(&format!("k{k}")));
        }

        Arc::clone(&pool).shutdown();

        // Workers should drain everything queued before shutdown was set.
        let deadline = Instant::now() + Duration::from_secs(2);
        while executed.load(Ordering::Relaxed) < 20 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        assert_eq!(executed.load(Ordering::Relaxed), 20);

        // Post-shutdown schedule is rejected.
        let outcome = pool.schedule(req("late"));
        assert_eq!(
            outcome,
            ScheduleOutcome::DroppedQueueFull {
                evicted_oldest: false
            }
        );
        assert!(pool.metrics().dropped_total >= 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_schedulers_no_deadlock_all_completions_counted() {
        let executed = Arc::new(AtomicUsize::new(0));
        let executed_for_exec = Arc::clone(&executed);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            executed_for_exec.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 64,
                worker_count: 2,
            },
            executor,
        );

        let producers = 8;
        let per_producer = 200;
        // Producers track local outcomes so we can check accounting
        // without relying on any single internal counter formula. The
        // pool's `dropped_total` aggregates two distinct events
        // (evicted-oldest + outright-rejected), so we use the
        // ScheduleOutcome variants directly.
        let outright_drops = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::new();
        for p in 0..producers {
            let pool_p = Arc::clone(&pool);
            let drops_p = Arc::clone(&outright_drops);
            handles.push(tokio::spawn(async move {
                for i in 0..per_producer {
                    let r = PromotionRequest {
                        namespace: format!("ns{p}"),
                        key: format!("k{i}"),
                        bytes: Arc::from(vec![0u8; 4].into_boxed_slice()),
                        policy: BlobCachePolicy::default(),
                    };
                    // `evicted_oldest: false` is exactly "this submission
                    // itself was dropped" — see ScheduleOutcome.
                    if let ScheduleOutcome::DroppedQueueFull {
                        evicted_oldest: false,
                    } = pool_p.schedule(r)
                    {
                        drops_p.fetch_add(1, Ordering::Relaxed);
                    }
                    if i % 32 == 0 {
                        tokio::task::yield_now().await;
                    }
                }
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        // Invariant we check after producers finish AND workers catch up:
        //
        //   queued_total + outright_drops == submitted
        //
        // (every submission was either admitted or rejected outright; an
        // "evicted oldest" event still admits the new request)
        //
        // and once the queue drains:
        //
        //   completed_total == queued_total - dropped_via_eviction
        //                   == queued_total - (dropped_total - outright_drops)
        //
        // Equivalently: completed + dropped_total == queued + outright_drops.
        let submitted = (producers * per_producer) as u64;

        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let m = pool.metrics();
            let outright = outright_drops.load(Ordering::Relaxed) as u64;
            let admitted_invariant = m.queued_total + outright == submitted;
            let drained_invariant =
                m.completed_total + m.dropped_total == m.queued_total + outright;
            if admitted_invariant && drained_invariant && m.queue_depth == 0 {
                break;
            }
            if Instant::now() > deadline {
                panic!(
                    "did not converge: submitted={submitted} queued={} dropped={} completed={} depth={} outright={}",
                    m.queued_total, m.dropped_total, m.completed_total, m.queue_depth, outright
                );
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        Arc::clone(&pool).shutdown();
    }

    #[test]
    fn metrics_snapshot_is_consistent_under_simple_load() {
        let pool = pool_no_workers(8);
        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }
        let m = pool.metrics();
        assert_eq!(m.queued_total, 5);
        assert_eq!(m.dropped_total, 0);
        assert_eq!(m.completed_total, 0);
        assert_eq!(m.queue_depth, 5);
    }

    /// Sanity: the executor sees the same bytes/key/namespace the producer
    /// scheduled. Catches accidental Arc/Box mix-ups in the queue plumbing.
    #[tokio::test]
    async fn executor_receives_unmodified_request() {
        let captured: Arc<Mutex<Vec<(String, String, usize)>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_for_exec = Arc::clone(&captured);
        let executor: PromotionExecutor = Arc::new(move |req| {
            captured_for_exec
                .lock()
                .unwrap()
                .push((req.namespace, req.key, req.bytes.len()));
            Ok(())
        });

        let pool = AsyncPromotionPool::new_with_executor(
            PoolOpts {
                queue_capacity: 4,
                worker_count: 1,
            },
            executor,
        );

        pool.schedule(PromotionRequest {
            namespace: "users".to_string(),
            key: "42".to_string(),
            bytes: Arc::from(vec![1u8, 2, 3, 4, 5].into_boxed_slice()),
            policy: BlobCachePolicy::default(),
        });

        let deadline = Instant::now() + Duration::from_secs(2);
        while captured.lock().unwrap().is_empty() && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        let seen = captured.lock().unwrap().clone();
        assert_eq!(seen, vec![("users".to_string(), "42".to_string(), 5)]);

        Arc::clone(&pool).shutdown();
    }
}
607}