use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use crate::error::StreamingError;
#[derive(Debug, Clone)]
pub struct CreditPool {
credits: Arc<AtomicI64>,
capacity: i64,
}
impl CreditPool {
pub fn new(capacity: i64) -> Self {
assert!(capacity > 0, "CreditPool capacity must be positive");
Self {
credits: Arc::new(AtomicI64::new(capacity)),
capacity,
}
}
pub fn try_acquire(&self, n: i64) -> bool {
assert!(n > 0, "must acquire at least 1 credit");
let mut current = self.credits.load(Ordering::Relaxed);
loop {
if current < n {
return false;
}
match self.credits.compare_exchange_weak(
current,
current - n,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => current = actual,
}
}
}
pub fn release(&self, n: i64) {
assert!(n > 0, "must release at least 1 credit");
let prev = self.credits.fetch_add(n, Ordering::AcqRel);
let after = prev + n;
if after > self.capacity {
let mut current = after;
loop {
if current <= self.capacity {
break;
}
match self.credits.compare_exchange_weak(
current,
self.capacity,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
}
pub fn available(&self) -> i64 {
self.credits.load(Ordering::Acquire)
}
pub fn capacity(&self) -> i64 {
self.capacity
}
pub fn utilization(&self) -> f64 {
let avail = self.available().max(0);
1.0 - (avail as f64 / self.capacity as f64)
}
}
#[derive(Debug)]
pub struct PendingItem<T> {
pub item: T,
pub credits_required: i64,
}
pub struct BackpressureProducer<T> {
pool: CreditPool,
pending: VecDeque<PendingItem<T>>,
emitted_total: u64,
dropped_total: u64,
}
impl<T> BackpressureProducer<T> {
pub fn new(pool: CreditPool) -> Self {
Self {
pool,
pending: VecDeque::new(),
emitted_total: 0,
dropped_total: 0,
}
}
pub fn try_emit(&mut self, item: T, credits: i64) -> Result<bool, StreamingError> {
if credits <= 0 {
return Err(StreamingError::InvalidOperation(
"credits must be positive".into(),
));
}
if self.pool.try_acquire(credits) {
self.pending.push_back(PendingItem {
item,
credits_required: credits,
});
self.emitted_total += 1;
Ok(true)
} else {
self.dropped_total += 1;
Ok(false)
}
}
pub fn drain(&mut self) -> impl Iterator<Item = PendingItem<T>> + '_ {
self.pending.drain(..)
}
pub fn emitted_total(&self) -> u64 {
self.emitted_total
}
pub fn dropped_total(&self) -> u64 {
self.dropped_total
}
pub fn pending_count(&self) -> usize {
self.pending.len()
}
pub fn pool(&self) -> &CreditPool {
&self.pool
}
}
pub struct BackpressureConsumer {
pool: CreditPool,
consumed_total: u64,
}
impl BackpressureConsumer {
pub fn new(pool: CreditPool) -> Self {
Self {
pool,
consumed_total: 0,
}
}
pub fn consume(&mut self, credits: i64) {
self.pool.release(credits);
self.consumed_total += 1;
}
pub fn consumed_total(&self) -> u64 {
self.consumed_total
}
pub fn pool(&self) -> &CreditPool {
&self.pool
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_credit_pool_initial_credits() {
let pool = CreditPool::new(100);
assert_eq!(pool.available(), 100);
assert_eq!(pool.capacity(), 100);
}
#[test]
fn test_credit_pool_try_acquire_success() {
let pool = CreditPool::new(50);
assert!(pool.try_acquire(30));
assert_eq!(pool.available(), 20);
}
#[test]
fn test_credit_pool_try_acquire_fail_insufficient() {
let pool = CreditPool::new(10);
assert!(!pool.try_acquire(11));
assert_eq!(pool.available(), 10); }
#[test]
fn test_credit_pool_release_replenishes() {
let pool = CreditPool::new(100);
assert!(pool.try_acquire(40));
pool.release(40);
assert_eq!(pool.available(), 100);
}
#[test]
fn test_credit_pool_over_release_clamped_to_capacity() {
let pool = CreditPool::new(50);
pool.release(30); assert_eq!(pool.available(), 50);
}
#[test]
fn test_credit_pool_utilization_zero_when_full() {
let pool = CreditPool::new(100);
assert!((pool.utilization() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_credit_pool_utilization_one_when_empty() {
let pool = CreditPool::new(100);
assert!(pool.try_acquire(100));
assert!((pool.utilization() - 1.0).abs() < f64::EPSILON);
}
#[test]
fn test_producer_emit_success() {
let pool = CreditPool::new(10);
let mut producer = BackpressureProducer::new(pool);
let ok = producer
.try_emit("hello", 5)
.expect("try_emit should not error");
assert!(ok);
assert_eq!(producer.emitted_total(), 1);
assert_eq!(producer.pending_count(), 1);
}
#[test]
fn test_producer_backpressure_when_no_credits() {
let pool = CreditPool::new(5);
let mut producer = BackpressureProducer::new(pool);
assert!(
producer
.try_emit("first", 5)
.expect("emit should not error")
);
let ok = producer
.try_emit("second", 1)
.expect("emit should not error");
assert!(!ok);
assert_eq!(producer.dropped_total(), 1);
}
#[test]
fn test_producer_drain_yields_pending_items() {
let pool = CreditPool::new(20);
let mut producer = BackpressureProducer::new(pool);
producer.try_emit(1u32, 4).expect("emit ok");
producer.try_emit(2u32, 4).expect("emit ok");
let items: Vec<_> = producer.drain().map(|p| p.item).collect();
assert_eq!(items, vec![1, 2]);
assert_eq!(producer.pending_count(), 0);
}
#[test]
fn test_consumer_consume_increments_count() {
let pool = CreditPool::new(100);
let mut consumer = BackpressureConsumer::new(pool);
consumer.consume(10);
consumer.consume(10);
assert_eq!(consumer.consumed_total(), 2);
}
#[test]
fn test_consumer_consume_releases_credits() {
let pool = CreditPool::new(100);
let consumer_pool = pool.clone();
let mut producer = BackpressureProducer::new(pool);
producer.try_emit("x", 100).expect("emit ok");
assert_eq!(producer.pool().available(), 0);
let mut consumer = BackpressureConsumer::new(consumer_pool);
consumer.consume(50);
assert_eq!(consumer.pool().available(), 50);
}
#[test]
fn test_credit_pool_clone_shares_state() {
let pool = CreditPool::new(100);
let pool2 = pool.clone();
assert!(pool.try_acquire(40));
assert_eq!(pool2.available(), 60);
}
}