use std::{sync::Arc, time::Duration};
use blanket::blanket;
use parking_lot::{Mutex, MutexGuard};
use crate::correlated_randomness::stream::CorrelatedStreamError;
/// Sizing parameters for a buffer: its total capacity, the largest single
/// request it accepts, and its refill threshold.
///
/// The fields are private, and every constructor and setter keeps
/// `max_request_size <= capacity` and `refill_threshold <= capacity`, with
/// `capacity` always non-zero.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Total capacity; never zero.
    capacity: usize,
    /// Upper bound on a single request; never exceeds `capacity`.
    max_request_size: usize,
    /// Refill threshold; never exceeds `capacity`.
    // NOTE(review): how the threshold is consumed is not visible in this
    // file — confirm against the buffer implementation.
    refill_threshold: usize,
}

impl BufferConfig {
    /// Builds a config whose request limit and refill threshold both equal
    /// `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn eager(capacity: usize) -> Self {
        Self::lazy_with(capacity, capacity, capacity)
    }

    /// Builds a config whose refill threshold equals `capacity` and whose
    /// request limit is `max_request_size`, clamped to `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn eager_with(capacity: usize, max_request_size: usize) -> Self {
        Self::lazy_with(capacity, capacity, max_request_size)
    }

    /// Builds a config whose request limit equals `capacity` and whose
    /// refill threshold is `refill_threshold`, clamped to `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn lazy(capacity: usize, refill_threshold: usize) -> Self {
        Self::lazy_with(capacity, refill_threshold, capacity)
    }

    /// Builds a config from all three values, clamping both
    /// `refill_threshold` and `max_request_size` to `capacity`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn lazy_with(capacity: usize, refill_threshold: usize, max_request_size: usize) -> Self {
        assert!(capacity != 0, "capacity must be greater than 0");
        let max_request_size = max_request_size.min(capacity);
        let refill_threshold = refill_threshold.min(capacity);
        Self {
            capacity,
            max_request_size,
            refill_threshold,
        }
    }

    /// Returns the configured capacity.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the configured per-request size limit.
    #[inline]
    pub fn max_request_size(&self) -> usize {
        self.max_request_size
    }

    /// Returns the configured refill threshold.
    #[inline]
    pub fn refill_threshold(&self) -> usize {
        self.refill_threshold
    }

    /// Replaces the capacity, lowering the request limit and the refill
    /// threshold to the new capacity if either currently exceeds it.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn set_capacity(&mut self, capacity: usize) {
        assert!(capacity != 0, "capacity must be greater than 0");
        self.capacity = capacity;
        self.max_request_size = self.max_request_size.min(capacity);
        self.refill_threshold = self.refill_threshold.min(capacity);
    }

    /// Replaces the request limit. Unlike the constructors, this does not
    /// clamp: a value above the current capacity raises the capacity to match.
    pub fn set_max_request_size(&mut self, max_request_size: usize) {
        self.max_request_size = max_request_size;
        self.capacity = self.capacity.max(max_request_size);
    }

    /// Replaces the refill threshold. Unlike the constructors, this does not
    /// clamp: a value above the current capacity raises the capacity to match.
    pub fn set_refill_threshold(&mut self, refill_threshold: usize) {
        self.refill_threshold = refill_threshold;
        self.capacity = self.capacity.max(refill_threshold);
    }
}
/// How long `try_lock_config` waits for the config mutex before giving up
/// (500 ms). The same value, in milliseconds, is reported in the resulting
/// `CorrelatedStreamError::LockTimeout` error.
pub const LOCK_TIMEOUT: Duration = Duration::from_millis(500);
/// Locks the shared [`BufferConfig`], waiting at most [`LOCK_TIMEOUT`].
///
/// # Errors
///
/// Returns [`CorrelatedStreamError::LockTimeout`], carrying the timeout in
/// milliseconds, when the lock is not acquired within [`LOCK_TIMEOUT`].
#[inline]
pub fn try_lock_config(
    m: &Mutex<BufferConfig>,
) -> Result<MutexGuard<'_, BufferConfig>, CorrelatedStreamError> {
    match m.try_lock_for(LOCK_TIMEOUT) {
        Some(guard) => Ok(guard),
        None => Err(CorrelatedStreamError::LockTimeout {
            timeout_ms: LOCK_TIMEOUT.as_millis() as u64,
        }),
    }
}
/// Access to a shared, mutex-protected [`BufferConfig`].
///
/// Implementors only provide [`Buffer::config`]; every other method locks
/// that config through [`try_lock_config`] and forwards to the matching
/// [`BufferConfig`] method. The lock is held only for the duration of the
/// call.
#[blanket(derive(Arc, Ref, Mut))]
pub trait Buffer {
    /// Returns the shared handle to this buffer's configuration.
    fn config(&self) -> &Arc<Mutex<BufferConfig>>;

    /// Reads the configured capacity.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    #[inline]
    fn capacity(&self) -> Result<usize, CorrelatedStreamError> {
        try_lock_config(self.config()).map(|cfg| cfg.capacity())
    }

    /// Reads the configured per-request size limit.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    #[inline]
    fn max_request_size(&self) -> Result<usize, CorrelatedStreamError> {
        try_lock_config(self.config()).map(|cfg| cfg.max_request_size())
    }

    /// Reads the configured refill threshold.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    #[inline]
    fn refill_threshold(&self) -> Result<usize, CorrelatedStreamError> {
        try_lock_config(self.config()).map(|cfg| cfg.refill_threshold())
    }

    /// Updates the capacity via [`BufferConfig::set_capacity`].
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero (assertion in
    /// [`BufferConfig::set_capacity`]).
    #[inline]
    fn set_capacity(&self, capacity: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config()).map(|mut cfg| cfg.set_capacity(capacity))
    }

    /// Updates the per-request size limit via
    /// [`BufferConfig::set_max_request_size`].
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    #[inline]
    fn set_max_request_size(&self, n: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config()).map(|mut cfg| cfg.set_max_request_size(n))
    }

    /// Updates the refill threshold via
    /// [`BufferConfig::set_refill_threshold`].
    ///
    /// # Errors
    ///
    /// Propagates the error from [`try_lock_config`] if the config lock
    /// cannot be taken.
    #[inline]
    fn set_refill_threshold(&self, n: usize) -> Result<(), CorrelatedStreamError> {
        try_lock_config(self.config()).map(|mut cfg| cfg.set_refill_threshold(n))
    }
}