use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crossbeam_queue::ArrayQueue;
use super::blob::BlobCachePolicy;
/// Sizing options for an [`AsyncPromotionPool`].
#[derive(Debug, Clone, Copy)]
pub struct PoolOpts {
    /// Requested number of slots in the bounded request queue.
    ///
    /// The pool constructor raises values below 1 to 1.
    pub queue_capacity: usize,
    /// Requested number of worker tasks.
    ///
    /// The pool constructor raises values below 1 to 1.
    pub worker_count: usize,
}

impl Default for PoolOpts {
    /// Returns the default sizing: 1024 queue slots served by 2 workers.
    fn default() -> Self {
        const DEFAULT_QUEUE_CAPACITY: usize = 1024;
        const DEFAULT_WORKER_COUNT: usize = 2;

        Self {
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
            worker_count: DEFAULT_WORKER_COUNT,
        }
    }
}
/// One unit of work submitted to the pool and later handed, unmodified, to the executor.
///
/// The pool never inspects these fields itself; their meaning is defined by the
/// executor that consumes the request.
#[derive(Debug, Clone)]
pub struct PromotionRequest {
/// Namespace component of the entry (presumably scopes `key`; confirm against the executor).
pub namespace: String,
/// Key component of the entry.
pub key: String,
/// Payload bytes; reference-counted, so cloning a request does not copy the payload.
pub bytes: Arc<[u8]>,
/// Cache policy carried along with the request for the executor to apply.
pub policy: BlobCachePolicy,
}
/// Result of submitting a request through `AsyncPromotionPool::schedule`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleOutcome {
/// The queue had a free slot and the request was enqueued.
Queued,
/// The request could not simply be enqueued.
///
/// * `evicted_oldest: true` — the queue was full; the oldest entry was discarded
///   and the new request was enqueued in its place.
/// * `evicted_oldest: false` — the new request itself was discarded, either because
///   the pool is shutting down or because the retry push still found the queue full.
DroppedQueueFull { evicted_oldest: bool },
}
/// Point-in-time snapshot of the pool's counters.
///
/// Each field is read independently, so under concurrent activity the values
/// are not guaranteed to describe a single instant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PromotionMetrics {
/// Requests that were successfully placed on the queue (including those admitted after an eviction).
pub queued_total: u64,
/// Requests discarded: evicted queue entries plus submissions that were never enqueued.
pub dropped_total: u64,
/// Requests handed to the executor by a worker, whether or not the executor succeeded.
pub completed_total: u64,
/// Number of requests sitting in the queue when the snapshot was taken.
pub queue_depth: usize,
}
/// Shared callback that workers invoke once per dequeued [`PromotionRequest`].
///
/// The callback is synchronous and may be called from several worker tasks, hence
/// the `Send + Sync` bounds. Returning `Err` does not stop the worker, and the
/// request is still counted as completed.
pub type PromotionExecutor =
Arc<dyn Fn(PromotionRequest) -> Result<(), String> + Send + Sync + 'static>;
/// Bounded queue of promotion requests drained by background worker tasks.
///
/// Producers submit work with `schedule`; workers pop requests and pass them to
/// the executor. When the queue is full, the oldest entry is evicted to admit the new one.
pub struct AsyncPromotionPool {
/// Fixed-capacity queue shared by producers and workers.
queue: Arc<ArrayQueue<PromotionRequest>>,
/// Callback run by workers for every dequeued request.
executor: PromotionExecutor,
/// Set once by `shutdown`; producers then reject new work and workers exit once the queue is empty.
shutdown: Arc<AtomicBool>,
/// Count of requests successfully enqueued.
queued_total: AtomicU64,
/// Count of requests evicted or rejected.
dropped_total: AtomicU64,
/// Count of requests passed to the executor.
completed_total: AtomicU64,
/// Upper bound on how many requests a worker processes in its final sweep after
/// seeing the shutdown flag; the constructor sets it to the queue capacity.
drain_budget: usize,
}
impl AsyncPromotionPool {
    /// Builds a pool whose executor accepts every request and does nothing with it.
    ///
    /// Delegates to [`Self::new_with_executor`]; see that method for the
    /// clamping rules and runtime requirements.
    pub fn new(opts: PoolOpts) -> Arc<Self> {
        let noop: PromotionExecutor = Arc::new(|_| Ok(()));
        Self::new_with_executor(opts, noop)
    }

    /// Builds a pool that runs `executor` for every dequeued request and
    /// spawns its worker tasks.
    ///
    /// `queue_capacity` and `worker_count` are each raised to at least 1.
    /// Every worker owns a clone of the returned `Arc`, so the pool stays alive
    /// until the workers exit, which they only do after [`Self::shutdown`].
    ///
    /// # Panics
    ///
    /// Workers are started with `tokio::spawn`, which panics when called
    /// outside a Tokio runtime context.
    pub fn new_with_executor(opts: PoolOpts, executor: PromotionExecutor) -> Arc<Self> {
        let slots = opts.queue_capacity.max(1);
        let worker_total = opts.worker_count.max(1);

        let pool = Arc::new(Self {
            queue: Arc::new(ArrayQueue::new(slots)),
            executor,
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            // The final sweep on shutdown never needs to exceed one full queue.
            drain_budget: slots,
        });

        for _ in 0..worker_total {
            tokio::spawn(worker_loop(Arc::clone(&pool)));
        }

        pool
    }

    /// Submits `request` without blocking.
    ///
    /// * Returns [`ScheduleOutcome::Queued`] when a slot was free.
    /// * When the queue is full, the oldest entry is popped (and counted as
    ///   dropped) and the push is retried; on success the result is
    ///   `DroppedQueueFull { evicted_oldest }`.
    /// * Returns `DroppedQueueFull { evicted_oldest: false }` when the request
    ///   itself is discarded: the pool is shutting down, or the retry push
    ///   found the queue full again.
    pub fn schedule(&self, request: PromotionRequest) -> ScheduleOutcome {
        const NOT_ADMITTED: ScheduleOutcome = ScheduleOutcome::DroppedQueueFull {
            evicted_oldest: false,
        };

        if self.shutdown.load(Ordering::Acquire) {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
            return NOT_ADMITTED;
        }

        // Fast path: a free slot is available.
        let overflow = match self.queue.push(request) {
            Ok(()) => {
                self.queued_total.fetch_add(1, Ordering::Relaxed);
                return ScheduleOutcome::Queued;
            }
            Err(returned) => returned,
        };

        // Queue is full: make room by discarding the oldest entry, if there is one.
        let evicted_oldest = self.queue.pop().is_some();
        if evicted_oldest {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
        }

        // Pop and push are separate operations, so a concurrent producer may
        // have taken the freed slot; in that case the new request is lost too.
        if self.queue.push(overflow).is_ok() {
            self.queued_total.fetch_add(1, Ordering::Relaxed);
            ScheduleOutcome::DroppedQueueFull { evicted_oldest }
        } else {
            self.dropped_total.fetch_add(1, Ordering::Relaxed);
            NOT_ADMITTED
        }
    }

    /// Raises the shutdown flag.
    ///
    /// After this, [`Self::schedule`] rejects new requests and each worker
    /// exits once it finds the queue empty. This call does not wait for the
    /// workers to finish. It consumes the `Arc` it is called on, so callers
    /// that want to keep using the pool pass a clone.
    pub fn shutdown(self: Arc<Self>) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Returns a snapshot of the counters and the current queue length.
    ///
    /// The four values are read one after another, not atomically as a group.
    pub fn metrics(&self) -> PromotionMetrics {
        let queued_total = self.queued_total.load(Ordering::Relaxed);
        let dropped_total = self.dropped_total.load(Ordering::Relaxed);
        let completed_total = self.completed_total.load(Ordering::Relaxed);
        let queue_depth = self.queue.len();

        PromotionMetrics {
            queued_total,
            dropped_total,
            completed_total,
            queue_depth,
        }
    }
}
/// How long a worker sleeps after finding the queue empty before it polls again.
const WORKER_IDLE_BACKOFF: Duration = Duration::from_millis(1);
async fn worker_loop(pool: Arc<AsyncPromotionPool>) {
loop {
match pool.queue.pop() {
Some(req) => {
if let Err(err) = (pool.executor)(req) {
tracing::warn!(error = %err, "async promotion executor failed");
}
pool.completed_total.fetch_add(1, Ordering::Relaxed);
}
None => {
if pool.shutdown.load(Ordering::Acquire) {
let mut drained = 0;
while drained < pool.drain_budget {
match pool.queue.pop() {
Some(req) => {
let _ = (pool.executor)(req);
pool.completed_total.fetch_add(1, Ordering::Relaxed);
drained += 1;
}
None => break,
}
}
return;
}
tokio::time::sleep(WORKER_IDLE_BACKOFF).await;
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Mutex;
    use std::time::Instant;

    /// Outcome reported when the submitted request itself was discarded.
    const NOT_ADMITTED: ScheduleOutcome = ScheduleOutcome::DroppedQueueFull {
        evicted_oldest: false,
    };

    /// Outcome reported when the oldest entry was evicted to admit the new one.
    const ADMITTED_AFTER_EVICTION: ScheduleOutcome = ScheduleOutcome::DroppedQueueFull {
        evicted_oldest: true,
    };

    /// Builds a request in namespace `"ns"` with an 8-byte zeroed payload.
    fn req(key: &str) -> PromotionRequest {
        PromotionRequest {
            namespace: String::from("ns"),
            key: String::from(key),
            bytes: Arc::from(vec![0u8; 8]),
            policy: BlobCachePolicy::default(),
        }
    }

    /// Builds a pool directly, without spawning workers, so queue contents can
    /// be inspected deterministically and no Tokio runtime is required.
    fn pool_no_workers(capacity: usize) -> Arc<AsyncPromotionPool> {
        let idle_pool = AsyncPromotionPool {
            queue: Arc::new(ArrayQueue::new(capacity)),
            executor: Arc::new(|_| Ok(())),
            shutdown: Arc::new(AtomicBool::new(false)),
            queued_total: AtomicU64::new(0),
            dropped_total: AtomicU64::new(0),
            completed_total: AtomicU64::new(0),
            drain_budget: capacity,
        };
        Arc::new(idle_pool)
    }

    /// Returns a shared counter and an executor that bumps it once per request.
    fn counting_executor() -> (Arc<AtomicUsize>, PromotionExecutor) {
        let hits = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&hits);
        let executor: PromotionExecutor = Arc::new(move |_req| {
            sink.fetch_add(1, Ordering::Relaxed);
            Ok(())
        });
        (hits, executor)
    }

    /// Polls `done` every 5 ms until it returns `true` or `timeout` elapses.
    async fn wait_until(timeout: Duration, mut done: impl FnMut() -> bool) {
        let deadline = Instant::now() + timeout;
        while !done() && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[test]
    fn schedule_returns_queued_when_capacity_available() {
        let pool = pool_no_workers(4);
        for key in ["a", "b"] {
            assert_eq!(pool.schedule(req(key)), ScheduleOutcome::Queued);
        }
        let snapshot = pool.metrics();
        assert_eq!(snapshot.queued_total, 2);
        assert_eq!(snapshot.queue_depth, 2);
    }

    #[test]
    fn schedule_drops_oldest_when_saturated() {
        let pool = pool_no_workers(2);
        for key in ["a", "b"] {
            assert_eq!(pool.schedule(req(key)), ScheduleOutcome::Queued);
        }
        assert_eq!(pool.schedule(req("c")), ADMITTED_AFTER_EVICTION);
        let snapshot = pool.metrics();
        assert_eq!(snapshot.dropped_total, 1);
        assert_eq!(snapshot.queue_depth, 2);
    }

    #[test]
    fn drop_oldest_semantics_preserve_newest() {
        let cap = 3;
        let pool = pool_no_workers(cap);
        for key in ["a", "b", "c"] {
            assert_eq!(pool.schedule(req(key)), ScheduleOutcome::Queued);
        }
        assert_eq!(pool.schedule(req("d")), ADMITTED_AFTER_EVICTION);

        // Drain the queue front-to-back; "a" must be the entry that was evicted.
        let survivors: Vec<String> = std::iter::from_fn(|| pool.queue.pop())
            .map(|entry| entry.key)
            .collect();
        assert_eq!(survivors, ["b", "c", "d"]);
    }

    #[tokio::test]
    async fn worker_executes_injected_closure() {
        let (hits, executor) = counting_executor();
        let opts = PoolOpts {
            queue_capacity: 16,
            worker_count: 1,
        };
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);

        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }

        wait_until(Duration::from_secs(2), || hits.load(Ordering::Relaxed) >= 5).await;
        assert_eq!(hits.load(Ordering::Relaxed), 5);
        assert_eq!(pool.metrics().completed_total, 5);
        Arc::clone(&pool).shutdown();
    }

    #[tokio::test]
    async fn shutdown_drains_queue_within_budget() {
        let (executed, executor) = counting_executor();
        let opts = PoolOpts {
            queue_capacity: 32,
            worker_count: 2,
        };
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);

        for k in 0..20 {
            pool.schedule(req(&format!("k{k}")));
        }
        Arc::clone(&pool).shutdown();

        // Everything enqueued before shutdown must still be executed.
        wait_until(Duration::from_secs(2), || {
            executed.load(Ordering::Relaxed) >= 20
        })
        .await;
        assert_eq!(executed.load(Ordering::Relaxed), 20);

        // Submissions after shutdown are rejected and counted as dropped.
        assert_eq!(pool.schedule(req("late")), NOT_ADMITTED);
        assert!(pool.metrics().dropped_total >= 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_schedulers_no_deadlock_all_completions_counted() {
        let (_executed, executor) = counting_executor();
        let opts = PoolOpts {
            queue_capacity: 64,
            worker_count: 2,
        };
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);

        let producers = 8;
        let per_producer = 200;
        let outright_drops = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..producers)
            .map(|p| {
                let pool_p = Arc::clone(&pool);
                let drops_p = Arc::clone(&outright_drops);
                tokio::spawn(async move {
                    for i in 0..per_producer {
                        let outcome = pool_p.schedule(PromotionRequest {
                            namespace: format!("ns{p}"),
                            key: format!("k{i}"),
                            bytes: Arc::from(vec![0u8; 4]),
                            policy: BlobCachePolicy::default(),
                        });
                        // Only submissions that never entered the queue count here.
                        if outcome == NOT_ADMITTED {
                            drops_p.fetch_add(1, Ordering::Relaxed);
                        }
                        if i % 32 == 0 {
                            tokio::task::yield_now().await;
                        }
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }

        let submitted = (producers * per_producer) as u64;
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let m = pool.metrics();
            let outright = outright_drops.load(Ordering::Relaxed) as u64;

            // Every submission was either enqueued or rejected outright ...
            let all_submissions_seen = m.queued_total + outright == submitted;
            // ... and every one of them ended up either executed or dropped.
            let all_accounted_for =
                m.completed_total + m.dropped_total == m.queued_total + outright;
            if all_submissions_seen && all_accounted_for && m.queue_depth == 0 {
                break;
            }

            assert!(
                Instant::now() <= deadline,
                "did not converge: submitted={submitted} queued={} dropped={} completed={} depth={} outright={}",
                m.queued_total, m.dropped_total, m.completed_total, m.queue_depth, outright
            );
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        Arc::clone(&pool).shutdown();
    }

    #[test]
    fn metrics_snapshot_is_consistent_under_simple_load() {
        let pool = pool_no_workers(8);
        for k in 0..5 {
            pool.schedule(req(&format!("k{k}")));
        }
        let expected = PromotionMetrics {
            queued_total: 5,
            dropped_total: 0,
            completed_total: 0,
            queue_depth: 5,
        };
        assert_eq!(pool.metrics(), expected);
    }

    #[tokio::test]
    async fn executor_receives_unmodified_request() {
        let captured: Arc<Mutex<Vec<(String, String, usize)>>> = Arc::default();
        let sink = Arc::clone(&captured);
        let executor: PromotionExecutor = Arc::new(move |req| {
            let PromotionRequest {
                namespace,
                key,
                bytes,
                ..
            } = req;
            sink.lock().unwrap().push((namespace, key, bytes.len()));
            Ok(())
        });
        let opts = PoolOpts {
            queue_capacity: 4,
            worker_count: 1,
        };
        let pool = AsyncPromotionPool::new_with_executor(opts, executor);

        pool.schedule(PromotionRequest {
            namespace: String::from("users"),
            key: String::from("42"),
            bytes: Arc::from(vec![1u8, 2, 3, 4, 5]),
            policy: BlobCachePolicy::default(),
        });

        wait_until(Duration::from_secs(2), || {
            !captured.lock().unwrap().is_empty()
        })
        .await;

        let seen = captured.lock().unwrap().clone();
        assert_eq!(seen, vec![(String::from("users"), String::from("42"), 5)]);
        Arc::clone(&pool).shutdown();
    }
}