use crate::types::DEFAULT_SYMBOL_SIZE;
use core::fmt;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub symbol_size: u16,
pub initial_size: usize,
pub max_size: usize,
pub allow_growth: bool,
pub growth_increment: usize,
}
impl PoolConfig {
#[must_use]
#[inline]
pub const fn new(
symbol_size: u16,
initial_size: usize,
max_size: usize,
allow_growth: bool,
growth_increment: usize,
) -> Self {
Self {
symbol_size,
initial_size,
max_size,
allow_growth,
growth_increment,
}
}
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
symbol_size: DEFAULT_SYMBOL_SIZE as u16,
initial_size: 0,
max_size: 0,
allow_growth: false,
growth_increment: 0,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
pub allocations: u64,
pub deallocations: u64,
pub pool_hits: u64,
pub pool_misses: u64,
pub peak_usage: usize,
pub current_usage: usize,
pub growth_events: u64,
}
#[derive(Debug)]
pub struct SymbolBuffer {
data: Box<[u8]>,
checked_out: bool,
owner_pool_id: Option<u64>,
}
impl SymbolBuffer {
#[must_use]
#[inline]
pub fn new(symbol_size: u16) -> Self {
let size = usize::from(symbol_size);
Self {
data: vec![0_u8; size].into_boxed_slice(),
checked_out: false,
owner_pool_id: None,
}
}
#[must_use]
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
#[must_use]
#[inline]
pub fn as_slice(&self) -> &[u8] {
&self.data
}
#[inline]
pub fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.data
}
#[must_use]
pub fn into_boxed_slice(mut self) -> Box<[u8]> {
std::mem::take(&mut self.data)
}
fn mark_checked_out(&mut self, owner_pool_id: u64) {
self.checked_out = true;
self.owner_pool_id = Some(owner_pool_id);
}
fn clear_checkout_state(&mut self) {
self.checked_out = false;
self.owner_pool_id = None;
}
}
impl Drop for SymbolBuffer {
fn drop(&mut self) {
self.data.fill(0);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolExhausted;
impl fmt::Display for PoolExhausted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("symbol pool exhausted")
}
}
impl std::error::Error for PoolExhausted {}
#[derive(Debug)]
pub struct SymbolPool {
free_list: Vec<SymbolBuffer>,
allocated: usize,
pool_id: u64,
config: PoolConfig,
stats: PoolStats,
}
impl SymbolPool {
#[must_use]
#[inline]
pub fn new(mut config: PoolConfig) -> Self {
if config.max_size < config.initial_size {
config.max_size = config.initial_size;
}
let mut free_list = Vec::with_capacity(config.initial_size);
for _ in 0..config.initial_size {
free_list.push(SymbolBuffer::new(config.symbol_size));
}
Self {
free_list,
allocated: 0,
pool_id: next_symbol_pool_id(),
config,
stats: PoolStats::default(),
}
}
#[must_use]
#[inline]
pub fn config(&self) -> &PoolConfig {
&self.config
}
#[must_use]
#[inline]
pub fn stats(&self) -> &PoolStats {
&self.stats
}
#[must_use]
#[inline]
pub fn free_count(&self) -> usize {
self.free_list.len()
}
pub fn reset_stats(&mut self) {
self.stats = PoolStats {
allocations: 0,
deallocations: 0,
pool_hits: 0,
pool_misses: 0,
peak_usage: self.allocated,
current_usage: self.allocated,
growth_events: 0,
};
}
pub fn allocate(&mut self) -> Result<SymbolBuffer, PoolExhausted> {
self.try_allocate().ok_or(PoolExhausted)
}
pub fn try_allocate(&mut self) -> Option<SymbolBuffer> {
if let Some(mut buffer) = self.free_list.pop() {
buffer.mark_checked_out(self.pool_id);
self.record_allocation(true);
return Some(buffer);
}
self.stats.pool_misses = self.stats.pool_misses.saturating_add(1);
if self.grow() {
if let Some(mut buffer) = self.free_list.pop() {
buffer.mark_checked_out(self.pool_id);
self.record_allocation(false);
return Some(buffer);
}
}
None
}
pub fn deallocate(&mut self, mut buffer: SymbolBuffer) {
assert_eq!(
buffer.len(),
usize::from(self.config.symbol_size),
"Cannot deallocate buffer of size {} into pool of size {}",
buffer.len(),
self.config.symbol_size
);
assert!(
buffer.checked_out,
"Cannot deallocate buffer that is not currently checked out from a pool"
);
assert_eq!(
buffer.owner_pool_id,
Some(self.pool_id),
"Cannot deallocate buffer checked out from a different pool"
);
if self.allocated > 0 {
self.allocated -= 1;
}
self.stats.deallocations = self.stats.deallocations.saturating_add(1);
self.stats.current_usage = self.allocated;
buffer.as_mut_slice().fill(0);
buffer.clear_checkout_state();
self.free_list.push(buffer);
}
pub fn shrink_to_fit(&mut self) {
let min_free = self.config.initial_size.saturating_sub(self.allocated);
if self.free_list.len() > min_free {
self.free_list.truncate(min_free);
}
}
pub fn warm(&mut self, count: usize) {
let max_free = self.config.max_size.saturating_sub(self.allocated);
let target_free = count.min(max_free);
while self.free_list.len() < target_free {
self.free_list
.push(SymbolBuffer::new(self.config.symbol_size));
}
}
fn grow(&mut self) -> bool {
if !self.config.allow_growth || self.config.growth_increment == 0 {
return false;
}
let total = self.allocated + self.free_list.len();
if total >= self.config.max_size {
return false;
}
let remaining = self.config.max_size - total;
let grow_by = self.config.growth_increment.min(remaining);
for _ in 0..grow_by {
self.free_list
.push(SymbolBuffer::new(self.config.symbol_size));
}
self.stats.growth_events = self.stats.growth_events.saturating_add(1);
true
}
fn record_allocation(&mut self, from_free_list: bool) {
self.allocated = self.allocated.saturating_add(1);
self.stats.allocations = self.stats.allocations.saturating_add(1);
if from_free_list {
self.stats.pool_hits = self.stats.pool_hits.saturating_add(1);
}
self.stats.current_usage = self.allocated;
self.stats.peak_usage = self.stats.peak_usage.max(self.allocated);
}
}
#[inline]
fn next_symbol_pool_id() -> u64 {
static NEXT_SYMBOL_POOL_ID: AtomicU64 = AtomicU64::new(1);
NEXT_SYMBOL_POOL_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResourceKind {
SymbolMemory,
EncodingOps,
DecodingOps,
SymbolsInFlight,
PerObjectMemory,
}
#[derive(Debug, Clone, Default)]
pub struct ResourceLimits {
pub max_symbol_memory: usize,
pub max_encoding_ops: usize,
pub max_decoding_ops: usize,
pub max_symbols_in_flight: usize,
pub max_per_object_memory: usize,
}
impl ResourceLimits {
#[must_use]
#[inline]
pub const fn is_zero(&self) -> bool {
self.max_symbol_memory == 0
&& self.max_encoding_ops == 0
&& self.max_decoding_ops == 0
&& self.max_symbols_in_flight == 0
&& self.max_per_object_memory == 0
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResourceUsage {
pub symbol_memory: usize,
pub encoding_ops: usize,
pub decoding_ops: usize,
pub symbols_in_flight: usize,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct ResourceRequest {
symbol_memory: usize,
encoding_ops: usize,
decoding_ops: usize,
symbols_in_flight: usize,
}
impl ResourceRequest {
#[must_use]
#[inline]
pub const fn new(
symbol_memory: usize,
encoding_ops: usize,
decoding_ops: usize,
symbols_in_flight: usize,
) -> Self {
Self {
symbol_memory,
encoding_ops,
decoding_ops,
symbols_in_flight,
}
}
#[must_use]
#[inline]
pub const fn symbol_memory(&self) -> usize {
self.symbol_memory
}
}
pub trait ResourceObserver: Send + Sync {
fn on_pressure_change(&self, pressure: f64);
fn on_limit_approached(&self, resource: ResourceKind, usage_percent: f64);
fn on_limit_exceeded(&self, resource: ResourceKind);
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResourceExhausted {
SymbolMemory,
EncodingOps,
DecodingOps,
SymbolsInFlight,
PerObjectMemory,
}
impl fmt::Display for ResourceExhausted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let msg = match self {
Self::SymbolMemory => "symbol memory limit exceeded",
Self::EncodingOps => "encoding operations limit exceeded",
Self::DecodingOps => "decoding operations limit exceeded",
Self::SymbolsInFlight => "symbols in flight limit exceeded",
Self::PerObjectMemory => "per-object memory limit exceeded",
};
f.write_str(msg)
}
}
impl std::error::Error for ResourceExhausted {}
pub struct ResourceTracker {
limits: ResourceLimits,
current: ResourceUsage,
observers: Vec<Box<dyn ResourceObserver>>,
}
impl ResourceTracker {
#[must_use]
#[inline]
pub fn new(limits: ResourceLimits) -> Self {
Self {
limits,
current: ResourceUsage::default(),
observers: Vec::new(),
}
}
#[must_use]
#[inline]
pub fn shared(limits: ResourceLimits) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self::new(limits)))
}
#[must_use]
#[inline]
pub fn usage(&self) -> &ResourceUsage {
&self.current
}
#[must_use]
#[inline]
pub fn limits(&self) -> &ResourceLimits {
&self.limits
}
#[must_use]
#[inline]
pub fn can_acquire(&self, request: &ResourceRequest) -> bool {
self.check_limits(request).is_ok()
}
pub fn add_observer(&mut self, observer: Box<dyn ResourceObserver>) {
observer.on_pressure_change(self.pressure());
self.observers.push(observer);
}
#[must_use]
#[inline]
pub fn pressure(&self) -> f64 {
let mut max_ratio: f64 = 0.0;
max_ratio = max_ratio.max(ratio(
self.current.symbol_memory,
self.limits.max_symbol_memory,
));
max_ratio = max_ratio.max(ratio(
self.current.encoding_ops,
self.limits.max_encoding_ops,
));
max_ratio = max_ratio.max(ratio(
self.current.decoding_ops,
self.limits.max_decoding_ops,
));
max_ratio = max_ratio.max(ratio(
self.current.symbols_in_flight,
self.limits.max_symbols_in_flight,
));
max_ratio.min(1.0)
}
pub fn try_acquire_encoding(
tracker: &Arc<Mutex<Self>>,
memory_needed: usize,
) -> Result<ResourceGuard, ResourceExhausted> {
let request = ResourceRequest::new(memory_needed, 1, 0, 0);
Self::try_acquire(tracker, request)
}
pub fn try_acquire_decoding(
tracker: &Arc<Mutex<Self>>,
memory_needed: usize,
) -> Result<ResourceGuard, ResourceExhausted> {
let request = ResourceRequest::new(memory_needed, 0, 1, 0);
Self::try_acquire(tracker, request)
}
pub fn try_acquire(
tracker: &Arc<Mutex<Self>>,
request: ResourceRequest,
) -> Result<ResourceGuard, ResourceExhausted> {
let mut guard = tracker.lock();
guard.check_limits(&request)?;
guard.current.symbol_memory = guard
.current
.symbol_memory
.saturating_add(request.symbol_memory);
guard.current.encoding_ops = guard
.current
.encoding_ops
.saturating_add(request.encoding_ops);
guard.current.decoding_ops = guard
.current
.decoding_ops
.saturating_add(request.decoding_ops);
guard.current.symbols_in_flight = guard
.current
.symbols_in_flight
.saturating_add(request.symbols_in_flight);
guard.notify_observers();
drop(guard);
Ok(ResourceGuard {
tracker: Arc::clone(tracker),
acquired: ResourceUsage {
symbol_memory: request.symbol_memory,
encoding_ops: request.encoding_ops,
decoding_ops: request.decoding_ops,
symbols_in_flight: request.symbols_in_flight,
},
})
}
fn release_locked(&mut self, acquired: &ResourceUsage) {
self.current.symbol_memory = self
.current
.symbol_memory
.saturating_sub(acquired.symbol_memory);
self.current.encoding_ops = self
.current
.encoding_ops
.saturating_sub(acquired.encoding_ops);
self.current.decoding_ops = self
.current
.decoding_ops
.saturating_sub(acquired.decoding_ops);
self.current.symbols_in_flight = self
.current
.symbols_in_flight
.saturating_sub(acquired.symbols_in_flight);
self.notify_observers();
}
fn check_limits(&self, request: &ResourceRequest) -> Result<(), ResourceExhausted> {
if exceeds(self.limits.max_per_object_memory, request.symbol_memory) {
return Err(ResourceExhausted::PerObjectMemory);
}
if exceeds_with_current(
self.limits.max_symbol_memory,
self.current.symbol_memory,
request.symbol_memory,
) {
return Err(ResourceExhausted::SymbolMemory);
}
if exceeds_with_current(
self.limits.max_encoding_ops,
self.current.encoding_ops,
request.encoding_ops,
) {
return Err(ResourceExhausted::EncodingOps);
}
if exceeds_with_current(
self.limits.max_decoding_ops,
self.current.decoding_ops,
request.decoding_ops,
) {
return Err(ResourceExhausted::DecodingOps);
}
if exceeds_with_current(
self.limits.max_symbols_in_flight,
self.current.symbols_in_flight,
request.symbols_in_flight,
) {
return Err(ResourceExhausted::SymbolsInFlight);
}
Ok(())
}
fn notify_observers(&self) {
let pressure = self.pressure();
for observer in &self.observers {
observer.on_pressure_change(pressure);
}
self.notify_limit(
ResourceKind::SymbolMemory,
self.current.symbol_memory,
self.limits.max_symbol_memory,
);
self.notify_limit(
ResourceKind::EncodingOps,
self.current.encoding_ops,
self.limits.max_encoding_ops,
);
self.notify_limit(
ResourceKind::DecodingOps,
self.current.decoding_ops,
self.limits.max_decoding_ops,
);
self.notify_limit(
ResourceKind::SymbolsInFlight,
self.current.symbols_in_flight,
self.limits.max_symbols_in_flight,
);
}
#[allow(clippy::cast_precision_loss)]
fn notify_limit(&self, kind: ResourceKind, usage: usize, limit: usize) {
if limit == 0 {
if usage > 0 {
for observer in &self.observers {
observer.on_limit_exceeded(kind);
}
}
return;
}
let usage_ratio = (usage as f64) / (limit as f64);
if usage_ratio >= 1.0 {
for observer in &self.observers {
observer.on_limit_exceeded(kind);
}
} else if usage_ratio >= 0.9 {
for observer in &self.observers {
observer.on_limit_approached(kind, usage_ratio);
}
}
}
}
impl fmt::Debug for ResourceTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResourceTracker")
.field("limits", &self.limits)
.field("current", &self.current)
.field("observer_count", &self.observers.len())
.finish()
}
}
#[derive(Debug)]
pub struct ResourceGuard {
tracker: Arc<Mutex<ResourceTracker>>,
acquired: ResourceUsage,
}
impl Drop for ResourceGuard {
fn drop(&mut self) {
let mut tracker = self.tracker.lock();
tracker.release_locked(&self.acquired);
}
}
#[inline]
fn exceeds(limit: usize, requested: usize) -> bool {
limit == 0 && requested > 0 || (limit > 0 && requested > limit)
}
#[inline]
fn exceeds_with_current(limit: usize, current: usize, requested: usize) -> bool {
if limit == 0 {
return requested > 0;
}
current.saturating_add(requested) > limit
}
#[allow(clippy::cast_precision_loss)]
#[inline]
fn ratio(usage: usize, limit: usize) -> f64 {
if limit == 0 {
if usage == 0 { 0.0 } else { 1.0 }
} else {
(usage as f64) / (limit as f64)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn pool_allocate_deallocate() {
let config = PoolConfig::new(64, 1, 1, false, 0);
let mut pool = SymbolPool::new(config);
let buffer = pool.allocate().expect("expected allocation");
assert_eq!(buffer.len(), 64);
pool.deallocate(buffer);
assert_eq!(pool.stats().current_usage, 0);
}
#[test]
fn pool_exhaustion() {
let config = PoolConfig::new(32, 1, 1, false, 0);
let mut pool = SymbolPool::new(config);
let _ = pool.allocate().expect("first allocation");
assert!(matches!(pool.allocate(), Err(PoolExhausted)));
}
#[test]
fn pool_growth() {
let config = PoolConfig::new(16, 0, 2, true, 2);
let mut pool = SymbolPool::new(config);
let _ = pool.allocate().expect("allocation after growth");
assert_eq!(pool.stats().growth_events, 1);
assert_eq!(pool.stats().pool_misses, 1);
}
#[test]
fn pool_no_growth_when_disabled() {
let config = PoolConfig::new(16, 0, 2, false, 2);
let mut pool = SymbolPool::new(config);
assert!(matches!(pool.allocate(), Err(PoolExhausted)));
}
#[test]
fn pool_max_size_respected() {
let config = PoolConfig::new(16, 1, 1, true, 1);
let mut pool = SymbolPool::new(config);
let _ = pool.allocate().expect("first allocation");
assert!(matches!(pool.allocate(), Err(PoolExhausted)));
}
#[test]
fn pool_stats_tracking() {
let config = PoolConfig::new(8, 2, 2, false, 0);
let mut pool = SymbolPool::new(config);
let a = pool.allocate().expect("alloc a");
let b = pool.allocate().expect("alloc b");
pool.deallocate(a);
pool.deallocate(b);
assert_eq!(pool.stats().allocations, 2);
assert_eq!(pool.stats().deallocations, 2);
assert_eq!(pool.stats().pool_hits, 2);
}
#[test]
fn pool_deallocate_rejects_foreign_sized_buffers_without_mutating_state() {
let config = PoolConfig::new(8, 1, 1, false, 0);
let mut pool = SymbolPool::new(config);
let valid = pool.allocate().expect("alloc valid");
for invalid_len in [4_u16, 16_u16] {
let result = catch_unwind(AssertUnwindSafe(|| {
pool.deallocate(SymbolBuffer::new(invalid_len));
}));
assert!(
result.is_err(),
"invalid buffer length {invalid_len} must panic"
);
assert_eq!(pool.stats().deallocations, 0);
assert_eq!(pool.stats().current_usage, 1);
assert_eq!(pool.free_count(), 0);
}
pool.deallocate(valid);
assert_eq!(pool.stats().deallocations, 1);
assert_eq!(pool.stats().current_usage, 0);
assert_eq!(pool.free_count(), 1);
}
#[test]
fn pool_deallocate_rejects_unchecked_or_cross_pool_buffers_without_mutating_state() {
let config = PoolConfig::new(8, 1, 1, false, 0);
let mut pool = SymbolPool::new(config.clone());
let valid = pool.allocate().expect("alloc valid");
let mut other_pool = SymbolPool::new(config);
let foreign = other_pool.allocate().expect("foreign alloc");
for invalid in [SymbolBuffer::new(8), foreign] {
let result = catch_unwind(AssertUnwindSafe(|| {
pool.deallocate(invalid);
}));
assert!(result.is_err(), "invalid buffer return must panic");
assert_eq!(pool.stats().deallocations, 0);
assert_eq!(pool.stats().current_usage, 1);
assert_eq!(pool.free_count(), 0);
}
pool.deallocate(valid);
assert_eq!(pool.stats().deallocations, 1);
assert_eq!(pool.stats().current_usage, 0);
assert_eq!(pool.free_count(), 1);
assert_eq!(other_pool.stats().current_usage, 1);
assert_eq!(other_pool.free_count(), 0);
}
#[test]
fn pool_shrink_and_warm() {
let config = PoolConfig::new(8, 1, 4, true, 1);
let mut pool = SymbolPool::new(config);
pool.warm(4);
assert_eq!(pool.free_count(), 4);
let _ = pool.allocate().expect("allocation");
pool.shrink_to_fit();
assert!(pool.free_count() <= 1);
}
#[test]
fn pool_reuses_zeroed_buffers_after_deallocate() {
let config = PoolConfig::new(8, 1, 1, false, 0);
let mut pool = SymbolPool::new(config);
let mut buffer = pool.allocate().expect("initial allocation");
buffer.as_mut_slice().fill(0xAA);
pool.deallocate(buffer);
let reused = pool.allocate().expect("reused allocation");
assert!(
reused.as_slice().iter().all(|byte| *byte == 0),
"recycled buffers must not retain prior payload bytes"
);
}
#[test]
fn resource_acquire_within_limits() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 10,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let guard =
ResourceTracker::try_acquire_encoding(&tracker, 50).expect("expected acquisition");
drop(guard);
let usage = tracker.lock().usage().clone();
assert_eq!(usage.symbol_memory, 0);
}
#[test]
fn resource_acquire_exceeds_limit() {
let limits = ResourceLimits {
max_symbol_memory: 10,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 10,
max_per_object_memory: 10,
};
let tracker = ResourceTracker::shared(limits);
let err = ResourceTracker::try_acquire_encoding(&tracker, 20)
.expect_err("expected limit failure");
assert_eq!(err, ResourceExhausted::PerObjectMemory);
}
#[test]
fn resource_guard_releases_on_drop() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 1,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
{
let _guard = ResourceTracker::try_acquire_encoding(&tracker, 10).expect("acquire");
}
let usage = tracker.lock().usage().clone();
assert_eq!(usage.symbol_memory, 0);
assert_eq!(usage.encoding_ops, 0);
}
#[test]
fn resource_zero_limit_fails() {
let limits = ResourceLimits {
max_symbol_memory: 0,
max_encoding_ops: 0,
max_decoding_ops: 0,
max_symbols_in_flight: 0,
max_per_object_memory: 0,
};
let tracker = ResourceTracker::shared(limits);
let err = ResourceTracker::try_acquire_encoding(&tracker, 1).expect_err("expected failure");
assert_eq!(err, ResourceExhausted::PerObjectMemory);
}
struct TestObserver {
pressure_calls: Arc<AtomicUsize>,
limit_calls: Arc<AtomicUsize>,
}
impl TestObserver {
fn new(pressure_calls: Arc<AtomicUsize>, limit_calls: Arc<AtomicUsize>) -> Self {
Self {
pressure_calls,
limit_calls,
}
}
}
impl ResourceObserver for TestObserver {
fn on_pressure_change(&self, _pressure: f64) {
self.pressure_calls.fetch_add(1, Ordering::Relaxed);
}
fn on_limit_approached(&self, _resource: ResourceKind, _usage_percent: f64) {
self.limit_calls.fetch_add(1, Ordering::Relaxed);
}
fn on_limit_exceeded(&self, _resource: ResourceKind) {
self.limit_calls.fetch_add(1, Ordering::Relaxed);
}
}
#[test]
fn resource_observer_notified() {
let limits = ResourceLimits {
max_symbol_memory: 10,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 1,
max_per_object_memory: 10,
};
let tracker = ResourceTracker::shared(limits);
let pressure_calls = Arc::new(AtomicUsize::new(0));
let limit_calls = Arc::new(AtomicUsize::new(0));
let observer = Box::new(TestObserver::new(
Arc::clone(&pressure_calls),
Arc::clone(&limit_calls),
));
tracker.lock().add_observer(observer);
let _guard = ResourceTracker::try_acquire_encoding(&tracker, 9).expect("acquire");
assert!(pressure_calls.load(Ordering::Relaxed) > 0);
assert!(limit_calls.load(Ordering::Relaxed) > 0);
}
#[test]
fn pressure_ignores_per_object_limit_as_aggregate() {
let limits = ResourceLimits {
max_symbol_memory: 1000,
max_encoding_ops: 10,
max_decoding_ops: 10,
max_symbols_in_flight: 100,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let guard = ResourceTracker::try_acquire(&tracker, ResourceRequest::new(90, 0, 0, 0))
.expect("acquire should satisfy per-object limit");
let pressure = tracker.lock().pressure();
assert!(
(pressure - 0.09).abs() < f64::EPSILON,
"pressure={pressure}"
);
drop(guard);
}
#[test]
fn observer_does_not_emit_per_object_limit_for_aggregate_usage() {
let limits = ResourceLimits {
max_symbol_memory: 1000,
max_encoding_ops: 10,
max_decoding_ops: 10,
max_symbols_in_flight: 100,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let pressure_calls = Arc::new(AtomicUsize::new(0));
let limit_calls = Arc::new(AtomicUsize::new(0));
let observer = Box::new(TestObserver::new(
Arc::clone(&pressure_calls),
Arc::clone(&limit_calls),
));
tracker.lock().add_observer(observer);
let _guard = ResourceTracker::try_acquire(&tracker, ResourceRequest::new(90, 0, 0, 0))
.expect("acquire should satisfy per-object limit");
assert!(pressure_calls.load(Ordering::Relaxed) > 0);
assert_eq!(
limit_calls.load(Ordering::Relaxed),
0,
"aggregate usage should not trigger per-object notifications"
);
}
#[test]
fn resource_tracker_lock_survives_panicking_holder() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 1,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let tracker_clone = Arc::clone(&tracker);
let panicked = std::thread::spawn(move || {
let _guard = tracker_clone.lock();
panic!("simulate panic while holding resource tracker lock");
})
.join();
assert!(panicked.is_err(), "panic thread should panic");
let guard =
ResourceTracker::try_acquire_encoding(&tracker, 10).expect("acquire after panic");
drop(guard);
let usage = tracker.lock().usage().clone();
assert_eq!(usage.symbol_memory, 0);
assert_eq!(usage.encoding_ops, 0);
}
#[test]
fn pool_config_default() {
let cfg = PoolConfig::default();
assert_eq!(cfg.symbol_size, DEFAULT_SYMBOL_SIZE as u16);
assert_eq!(cfg.initial_size, 0);
assert_eq!(cfg.max_size, 0);
assert!(!cfg.allow_growth);
assert_eq!(cfg.growth_increment, 0);
}
#[test]
fn pool_config_debug() {
let cfg = PoolConfig::new(256, 10, 100, true, 5);
let dbg = format!("{cfg:?}");
assert!(dbg.contains("PoolConfig"), "{dbg}");
assert!(dbg.contains("256"), "{dbg}");
}
#[test]
fn pool_stats_default() {
let stats = PoolStats::default();
assert_eq!(stats.allocations, 0);
assert_eq!(stats.deallocations, 0);
assert_eq!(stats.pool_hits, 0);
assert_eq!(stats.pool_misses, 0);
assert_eq!(stats.peak_usage, 0);
assert_eq!(stats.current_usage, 0);
assert_eq!(stats.growth_events, 0);
}
#[test]
fn pool_stats_debug() {
let stats = PoolStats {
allocations: 5,
..PoolStats::default()
};
let dbg = format!("{stats:?}");
assert!(dbg.contains("PoolStats"), "{dbg}");
assert!(dbg.contains('5'), "{dbg}");
}
#[test]
fn symbol_buffer_operations() {
let mut buf = SymbolBuffer::new(16);
assert_eq!(buf.len(), 16);
assert!(!buf.is_empty());
assert_eq!(buf.as_slice().len(), 16);
assert!(buf.as_slice().iter().all(|&b| b == 0));
buf.as_mut_slice()[0] = 42;
assert_eq!(buf.as_slice()[0], 42);
let boxed = buf.into_boxed_slice();
assert_eq!(boxed.len(), 16);
assert_eq!(boxed[0], 42);
}
#[test]
fn symbol_buffer_zero_size() {
let buf = SymbolBuffer::new(0);
assert_eq!(buf.len(), 0);
assert!(buf.is_empty());
}
#[test]
fn symbol_buffer_debug() {
let buf = SymbolBuffer::new(4);
let dbg = format!("{buf:?}");
assert!(dbg.contains("SymbolBuffer"), "{dbg}");
}
#[test]
fn pool_exhausted_display_and_error() {
let err = PoolExhausted;
assert_eq!(err.to_string(), "symbol pool exhausted");
assert!(std::error::Error::source(&err).is_none());
assert_eq!(err, PoolExhausted);
}
#[test]
fn resource_exhausted_display_all_variants() {
let cases = [
(ResourceExhausted::SymbolMemory, "symbol memory"),
(ResourceExhausted::EncodingOps, "encoding operations"),
(ResourceExhausted::DecodingOps, "decoding operations"),
(ResourceExhausted::SymbolsInFlight, "symbols in flight"),
(ResourceExhausted::PerObjectMemory, "per-object memory"),
];
for (variant, expected_substr) in cases {
let msg = variant.to_string();
assert!(msg.contains(expected_substr), "{msg}");
assert!(std::error::Error::source(&variant).is_none());
}
}
#[test]
fn resource_kind_debug_all_variants() {
let kinds = [
ResourceKind::SymbolMemory,
ResourceKind::EncodingOps,
ResourceKind::DecodingOps,
ResourceKind::SymbolsInFlight,
ResourceKind::PerObjectMemory,
];
for kind in &kinds {
let dbg = format!("{kind:?}");
assert!(!dbg.is_empty());
}
}
#[test]
fn resource_limits_default_is_zero() {
let limits = ResourceLimits::default();
assert!(limits.is_zero());
}
#[test]
fn resource_limits_non_zero() {
let limits = ResourceLimits {
max_symbol_memory: 1,
..ResourceLimits::default()
};
assert!(!limits.is_zero());
}
#[test]
fn resource_usage_default() {
let usage = ResourceUsage::default();
assert_eq!(usage.symbol_memory, 0);
assert_eq!(usage.encoding_ops, 0);
assert_eq!(usage.decoding_ops, 0);
assert_eq!(usage.symbols_in_flight, 0);
}
#[test]
fn resource_request_default_and_accessor() {
let req = ResourceRequest::default();
assert_eq!(req.symbol_memory(), 0);
let req = ResourceRequest::new(1024, 2, 3, 10);
assert_eq!(req.symbol_memory(), 1024);
}
#[test]
fn resource_tracker_debug() {
let limits = ResourceLimits::default();
let tracker = ResourceTracker::new(limits);
let dbg = format!("{tracker:?}");
assert!(dbg.contains("ResourceTracker"), "{dbg}");
}
#[test]
fn resource_tracker_pressure_with_zero_limits() {
let limits = ResourceLimits::default();
let tracker = ResourceTracker::new(limits);
assert!(
(tracker.pressure()).abs() < f64::EPSILON,
"expected 0.0, got {}",
tracker.pressure()
);
}
#[test]
fn resource_tracker_can_acquire_predicate() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 10,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::new(limits);
let req = ResourceRequest::new(50, 1, 0, 0);
assert!(tracker.can_acquire(&req));
let req = ResourceRequest::new(200, 1, 0, 0);
assert!(!tracker.can_acquire(&req));
}
#[test]
fn resource_guard_debug() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 10,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let guard = ResourceTracker::try_acquire_encoding(&tracker, 10).expect("acquire");
let dbg = format!("{guard:?}");
assert!(dbg.contains("ResourceGuard"), "{dbg}");
}
#[test]
fn pool_reset_stats() {
let config = PoolConfig::new(8, 2, 2, false, 0);
let mut pool = SymbolPool::new(config);
let buf = pool.allocate().expect("alloc");
pool.deallocate(buf);
assert!(pool.stats().allocations > 0);
pool.reset_stats();
assert_eq!(pool.stats().allocations, 0);
assert_eq!(pool.stats().deallocations, 0);
}
#[test]
fn pool_peak_usage_tracking() {
let config = PoolConfig::new(8, 4, 4, false, 0);
let mut pool = SymbolPool::new(config);
let a = pool.allocate().expect("a");
let b = pool.allocate().expect("b");
let c = pool.allocate().expect("c");
assert_eq!(pool.stats().peak_usage, 3);
pool.deallocate(a);
pool.deallocate(b);
assert_eq!(pool.stats().peak_usage, 3);
assert_eq!(pool.stats().current_usage, 1);
pool.deallocate(c);
}
#[test]
fn try_acquire_decoding() {
let limits = ResourceLimits {
max_symbol_memory: 100,
max_encoding_ops: 1,
max_decoding_ops: 1,
max_symbols_in_flight: 10,
max_per_object_memory: 100,
};
let tracker = ResourceTracker::shared(limits);
let guard = ResourceTracker::try_acquire_decoding(&tracker, 50).expect("decode acquire");
{
let t = tracker.lock();
assert_eq!(t.usage().decoding_ops, 1);
assert_eq!(t.usage().symbol_memory, 50);
drop(t);
}
drop(guard);
let t = tracker.lock();
assert_eq!(t.usage().decoding_ops, 0);
drop(t);
}
#[test]
fn pool_config_max_clamp() {
let config = PoolConfig::new(8, 5, 2, false, 0);
let pool = SymbolPool::new(config);
assert!(pool.config().max_size >= pool.config().initial_size);
}
#[test]
fn pool_stats_debug_clone_default() {
let s = PoolStats::default();
assert_eq!(s.allocations, 0);
assert_eq!(s.current_usage, 0);
let s2 = s;
let dbg = format!("{s2:?}");
assert!(dbg.contains("PoolStats"));
}
#[test]
fn pool_exhausted_debug_clone_copy_eq() {
let e = PoolExhausted;
let e2 = e; let e3 = e;
assert_eq!(e, e2);
assert_eq!(e2, e3);
let dbg = format!("{e:?}");
assert!(dbg.contains("PoolExhausted"));
}
#[test]
fn resource_kind_debug_clone_copy_eq() {
let k = ResourceKind::SymbolMemory;
let k2 = k; assert_eq!(k, k2);
assert_ne!(k, ResourceKind::EncodingOps);
let dbg = format!("{k:?}");
assert!(dbg.contains("SymbolMemory"));
}
#[test]
fn resource_limits_debug_clone_default() {
let l = ResourceLimits::default();
assert!(l.is_zero());
let l2 = l;
assert_eq!(l2.max_symbol_memory, 0);
let dbg = format!("{l2:?}");
assert!(dbg.contains("ResourceLimits"));
}
#[test]
fn resource_usage_debug_clone_default_eq() {
let u = ResourceUsage::default();
let u2 = u.clone();
assert_eq!(u, u2);
assert_eq!(u.symbol_memory, 0);
let dbg = format!("{u:?}");
assert!(dbg.contains("ResourceUsage"));
}
#[test]
fn resource_request_debug_clone_copy_default() {
let r = ResourceRequest::default();
let r2 = r; let r3 = r;
let _ = r2;
let _ = r3;
let dbg = format!("{r:?}");
assert!(dbg.contains("ResourceRequest"));
}
#[test]
fn resource_exhausted_debug_clone_copy_eq() {
let e = ResourceExhausted::SymbolMemory;
let e2 = e; let e3 = e;
assert_eq!(e, e2);
assert_eq!(e2, e3);
assert_ne!(e, ResourceExhausted::DecodingOps);
let dbg = format!("{e:?}");
assert!(dbg.contains("SymbolMemory"));
}
}