use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Thread-safe accounting for a shared buffer.
///
/// Tracks two phases of bookkeeping — `reserved_*` (capacity claimed ahead of
/// a write) and `committed_*` (data actually written) — plus the number of
/// live guards that still hold reservations.
///
/// Every counter is an `AtomicUsize`, so individual reads and updates are
/// atomic; methods that touch several counters (e.g. [`Self::snapshot`],
/// [`Self::total_bytes`]) are *not* a single atomic transaction and may
/// observe a mid-update state under concurrent mutation.
#[derive(Debug)]
pub struct SharedBufferState {
    /// Bytes whose write has completed.
    pub(crate) committed_bytes: AtomicUsize,
    /// Operations whose write has completed.
    pub(crate) committed_ops: AtomicUsize,
    /// Bytes reserved but not yet committed or released.
    pub(crate) reserved_bytes: AtomicUsize,
    /// Operations reserved but not yet committed or released.
    pub(crate) reserved_ops: AtomicUsize,
    /// Number of outstanding guards that may still mutate the counters.
    pub(crate) active_guards: AtomicUsize,
}

impl SharedBufferState {
    /// Creates a state with every counter at zero.
    ///
    /// `const` so the state can initialize `static`s (`AtomicUsize::new` is
    /// a const fn); existing non-const callers are unaffected.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            committed_bytes: AtomicUsize::new(0),
            committed_ops: AtomicUsize::new(0),
            reserved_bytes: AtomicUsize::new(0),
            reserved_ops: AtomicUsize::new(0),
            active_guards: AtomicUsize::new(0),
        }
    }

    /// Bytes currently committed.
    #[inline]
    #[must_use]
    pub fn committed_bytes(&self) -> usize {
        self.committed_bytes.load(Ordering::Acquire)
    }

    /// Operations currently committed.
    #[inline]
    #[must_use]
    pub fn committed_ops(&self) -> usize {
        self.committed_ops.load(Ordering::Acquire)
    }

    /// Bytes currently reserved (claimed but not yet committed).
    #[inline]
    #[must_use]
    pub fn reserved_bytes(&self) -> usize {
        self.reserved_bytes.load(Ordering::Acquire)
    }

    /// Operations currently reserved (claimed but not yet committed).
    #[inline]
    #[must_use]
    pub fn reserved_ops(&self) -> usize {
        self.reserved_ops.load(Ordering::Acquire)
    }

    /// Number of guards currently alive.
    #[inline]
    #[must_use]
    pub fn active_guards(&self) -> usize {
        self.active_guards.load(Ordering::Acquire)
    }

    /// Committed plus reserved bytes.
    ///
    /// Two separate atomic loads — not a snapshot; may be transiently
    /// inconsistent under concurrent updates.
    #[inline]
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.committed_bytes() + self.reserved_bytes()
    }

    /// Committed plus reserved operations. Same non-transactional caveat as
    /// [`Self::total_bytes`].
    #[inline]
    #[must_use]
    pub fn total_ops(&self) -> usize {
        self.committed_ops() + self.reserved_ops()
    }

    /// Copies all five counters into a plain-value snapshot.
    ///
    /// The five loads are performed one after another, so the snapshot is
    /// only guaranteed consistent when no concurrent mutation is happening.
    #[must_use]
    pub fn snapshot(&self) -> BufferStateSnapshot {
        BufferStateSnapshot {
            committed_bytes: self.committed_bytes(),
            committed_ops: self.committed_ops(),
            reserved_bytes: self.reserved_bytes(),
            reserved_ops: self.reserved_ops(),
            active_guards: self.active_guards(),
        }
    }

    /// Registers a new live guard.
    #[inline]
    #[allow(dead_code)] // not yet called outside tests; paired with decrement_active_guards
    pub(crate) fn increment_active_guards(&self) {
        self.active_guards.fetch_add(1, Ordering::AcqRel);
    }

    /// Unregisters a live guard.
    ///
    /// # Panics
    /// Panics if called with no guards registered (counter would underflow;
    /// by the time the panic fires the counter has already wrapped).
    #[inline]
    pub(crate) fn decrement_active_guards(&self) {
        let prev = self.active_guards.fetch_sub(1, Ordering::AcqRel);
        assert!(prev > 0, "active_guards underflow: decrement from 0");
    }

    /// Adds `bytes`/`ops` to the reserved counters.
    #[inline]
    #[allow(dead_code)] // not yet called outside tests; paired with sub_reserved
    pub(crate) fn add_reserved(&self, bytes: usize, ops: usize) {
        self.reserved_bytes.fetch_add(bytes, Ordering::AcqRel);
        self.reserved_ops.fetch_add(ops, Ordering::AcqRel);
    }

    /// Releases `bytes`/`ops` from the reserved counters without committing.
    ///
    /// # Panics
    /// Panics if either counter would underflow. The subtraction happens
    /// before the check (fetch_sub returns the previous value), so on panic
    /// the counters have already wrapped.
    #[inline]
    pub(crate) fn sub_reserved(&self, bytes: usize, ops: usize) {
        let prev_bytes = self.reserved_bytes.fetch_sub(bytes, Ordering::AcqRel);
        let prev_ops = self.reserved_ops.fetch_sub(ops, Ordering::AcqRel);
        assert!(
            prev_bytes >= bytes,
            "reserved_bytes underflow: {prev_bytes} < {bytes}"
        );
        assert!(
            prev_ops >= ops,
            "reserved_ops underflow: {prev_ops} < {ops}"
        );
    }

    /// Releases a reservation of `reserved_bytes`/`reserved_ops` and commits
    /// `actual_bytes`/`actual_ops` of it; the unused remainder of the
    /// reservation is simply freed.
    ///
    /// # Panics
    /// Panics if the actual amounts exceed the reserved amounts, or if the
    /// reserved counters would underflow.
    pub(crate) fn transfer_reserved_to_committed(
        &self,
        reserved_bytes: usize,
        reserved_ops: usize,
        actual_bytes: usize,
        actual_ops: usize,
    ) {
        assert!(
            actual_bytes <= reserved_bytes,
            "actual_bytes {actual_bytes} exceeds reserved_bytes {reserved_bytes}"
        );
        assert!(
            actual_ops <= reserved_ops,
            "actual_ops {actual_ops} exceeds reserved_ops {reserved_ops}"
        );
        let prev_bytes = self
            .reserved_bytes
            .fetch_sub(reserved_bytes, Ordering::AcqRel);
        let prev_ops = self.reserved_ops.fetch_sub(reserved_ops, Ordering::AcqRel);
        assert!(
            prev_bytes >= reserved_bytes,
            "reserved_bytes underflow: {prev_bytes} < {reserved_bytes}"
        );
        assert!(
            prev_ops >= reserved_ops,
            "reserved_ops underflow: {prev_ops} < {reserved_ops}"
        );
        self.committed_bytes
            .fetch_add(actual_bytes, Ordering::AcqRel);
        self.committed_ops.fetch_add(actual_ops, Ordering::AcqRel);
    }

    /// Removes `bytes`/`ops` from the committed counters (e.g. after the
    /// corresponding data has been flushed).
    ///
    /// # Panics
    /// Panics if either counter would underflow.
    pub fn sub_committed(&self, bytes: usize, ops: usize) {
        let prev_bytes = self.committed_bytes.fetch_sub(bytes, Ordering::AcqRel);
        let prev_ops = self.committed_ops.fetch_sub(ops, Ordering::AcqRel);
        assert!(
            prev_bytes >= bytes,
            "committed_bytes underflow: {prev_bytes} < {bytes}"
        );
        assert!(
            prev_ops >= ops,
            "committed_ops underflow: {prev_ops} < {ops}"
        );
    }

    /// Unconditionally stores all four byte/op counters.
    ///
    /// Not a transaction: each store is individually atomic. Callers must
    /// first verify that no guards are active.
    fn store_counters(
        &self,
        committed_bytes: usize,
        committed_ops: usize,
        reserved_bytes: usize,
        reserved_ops: usize,
    ) {
        self.committed_bytes
            .store(committed_bytes, Ordering::Release);
        self.committed_ops.store(committed_ops, Ordering::Release);
        self.reserved_bytes.store(reserved_bytes, Ordering::Release);
        self.reserved_ops.store(reserved_ops, Ordering::Release);
    }

    /// Overwrites all byte/op counters with the given values.
    ///
    /// # Panics
    /// Panics if any guards are active (their eventual decrements would
    /// corrupt the freshly stored values).
    pub fn reset_counters(
        &self,
        committed_bytes: usize,
        committed_ops: usize,
        reserved_bytes: usize,
        reserved_ops: usize,
    ) {
        let active = self.active_guards.load(Ordering::Acquire);
        assert_eq!(
            active, 0,
            "Cannot reset counters with {active} active guards"
        );
        self.store_counters(committed_bytes, committed_ops, reserved_bytes, reserved_ops);
    }

    /// Resets all byte/op counters to zero.
    ///
    /// # Panics
    /// Panics if any guards are active (see [`Self::reset_counters`]). For a
    /// non-panicking variant use [`Self::try_reset_to_zero`].
    pub fn reset_to_zero(&self) {
        self.reset_counters(0, 0, 0, 0);
    }

    /// Non-panicking variant of [`Self::reset_to_zero`].
    ///
    /// # Errors
    /// Returns `Err(n)` with the number of active guards if any are alive;
    /// the counters are left untouched in that case.
    pub fn try_reset_to_zero(&self) -> Result<(), usize> {
        let active = self.active_guards.load(Ordering::Acquire);
        if active > 0 {
            return Err(active);
        }
        self.store_counters(0, 0, 0, 0);
        Ok(())
    }
}

impl Default for SharedBufferState {
    /// Equivalent to [`SharedBufferState::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A point-in-time, plain-value copy of [`SharedBufferState`]'s counters.
///
/// Produced by `SharedBufferState::snapshot`; cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferStateSnapshot {
    /// Bytes committed at snapshot time.
    pub committed_bytes: usize,
    /// Operations committed at snapshot time.
    pub committed_ops: usize,
    /// Bytes reserved at snapshot time.
    pub reserved_bytes: usize,
    /// Operations reserved at snapshot time.
    pub reserved_ops: usize,
    /// Guards alive at snapshot time.
    pub active_guards: usize,
}

impl BufferStateSnapshot {
    /// Sum of committed and reserved bytes.
    #[inline]
    #[must_use]
    pub const fn total_bytes(&self) -> usize {
        self.reserved_bytes + self.committed_bytes
    }

    /// Sum of committed and reserved operations.
    #[inline]
    #[must_use]
    pub const fn total_ops(&self) -> usize {
        self.reserved_ops + self.committed_ops
    }
}

impl fmt::Display for BufferStateSnapshot {
    /// Renders the snapshot as a single human-readable summary line.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            committed_bytes,
            committed_ops,
            reserved_bytes,
            reserved_ops,
            active_guards,
        } = self;
        write!(
            f,
            "committed: {committed_bytes} bytes/{committed_ops} ops, reserved: {reserved_bytes} bytes/{reserved_ops} ops, guards: {active_guards}"
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly constructed state has every counter at zero.
    #[test]
    fn test_new_state() {
        let state = SharedBufferState::new();
        assert_eq!(state.committed_bytes(), 0);
        assert_eq!(state.committed_ops(), 0);
        assert_eq!(state.reserved_bytes(), 0);
        assert_eq!(state.reserved_ops(), 0);
        assert_eq!(state.active_guards(), 0);
    }

    // Default::default() is equivalent to new().
    #[test]
    fn test_default() {
        let state: SharedBufferState = SharedBufferState::default();
        assert_eq!(state.committed_bytes(), 0);
    }

    // Reserving only touches the reserved counters, not committed.
    #[test]
    fn test_add_reserved() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        assert_eq!(state.reserved_bytes(), 100);
        assert_eq!(state.reserved_ops(), 5);
        assert_eq!(state.committed_bytes(), 0);
    }

    // Partial release of a reservation leaves the remainder reserved.
    #[test]
    fn test_sub_reserved() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.sub_reserved(50, 2);
        assert_eq!(state.reserved_bytes(), 50);
        assert_eq!(state.reserved_ops(), 3);
    }

    // Transfer drains the full reservation but commits only the actual
    // amounts (80/4 of the 100/5 reserved); the rest is released.
    #[test]
    fn test_transfer_reserved_to_committed() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.transfer_reserved_to_committed(100, 5, 80, 4);
        assert_eq!(state.reserved_bytes(), 0);
        assert_eq!(state.reserved_ops(), 0);
        assert_eq!(state.committed_bytes(), 80);
        assert_eq!(state.committed_ops(), 4);
    }

    // Committed counters decrease by exactly the subtracted amounts.
    #[test]
    fn test_sub_committed() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.transfer_reserved_to_committed(100, 5, 100, 5);
        assert_eq!(state.committed_bytes(), 100);
        state.sub_committed(60, 3);
        assert_eq!(state.committed_bytes(), 40);
        assert_eq!(state.committed_ops(), 2);
    }

    // Guard counter increments and decrements symmetrically.
    #[test]
    fn test_active_guards() {
        let state = SharedBufferState::new();
        assert_eq!(state.active_guards(), 0);
        state.increment_active_guards();
        assert_eq!(state.active_guards(), 1);
        state.increment_active_guards();
        assert_eq!(state.active_guards(), 2);
        state.decrement_active_guards();
        assert_eq!(state.active_guards(), 1);
        state.decrement_active_guards();
        assert_eq!(state.active_guards(), 0);
    }

    // total_* is the sum of committed and still-reserved amounts:
    // after transferring 50/2 of the 100/5 reservation, committed is 50/2
    // and reserved is 50/3.
    #[test]
    fn test_total_bytes_ops() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.transfer_reserved_to_committed(50, 2, 50, 2);
        assert_eq!(state.total_bytes(), 50 + 50);
        assert_eq!(state.total_ops(), 2 + 3);
    }

    // snapshot() captures all five counters as plain values.
    #[test]
    fn test_snapshot() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.increment_active_guards();
        let snap = state.snapshot();
        assert_eq!(snap.reserved_bytes, 100);
        assert_eq!(snap.reserved_ops, 5);
        assert_eq!(snap.committed_bytes, 0);
        assert_eq!(snap.active_guards, 1);
        assert_eq!(snap.total_bytes(), 100);
    }

    // reset_counters overwrites all four byte/op counters at once.
    #[test]
    fn test_reset_counters() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.transfer_reserved_to_committed(100, 5, 100, 5);
        state.reset_counters(50, 3, 20, 1);
        assert_eq!(state.committed_bytes(), 50);
        assert_eq!(state.committed_ops(), 3);
        assert_eq!(state.reserved_bytes(), 20);
        assert_eq!(state.reserved_ops(), 1);
    }

    // reset_to_zero clears every byte/op counter.
    #[test]
    fn test_reset_to_zero() {
        let state = SharedBufferState::new();
        state.add_reserved(100, 5);
        state.transfer_reserved_to_committed(100, 5, 100, 5);
        state.reset_to_zero();
        assert_eq!(state.committed_bytes(), 0);
        assert_eq!(state.committed_ops(), 0);
        assert_eq!(state.reserved_bytes(), 0);
        assert_eq!(state.reserved_ops(), 0);
    }

    // Resetting while a guard is alive must panic with the guard count.
    #[test]
    #[should_panic(expected = "Cannot reset counters with 1 active guards")]
    fn test_reset_with_active_guards_panics() {
        let state = SharedBufferState::new();
        state.increment_active_guards();
        state.reset_to_zero();
    }

    // Subtracting more than is committed must panic (underflow guard).
    #[test]
    #[should_panic(expected = "committed_bytes underflow")]
    fn test_sub_committed_underflow_panics() {
        let state = SharedBufferState::new();
        state.sub_committed(100, 0);
    }

    // Display output contains each labeled counter group.
    #[test]
    fn test_snapshot_display() {
        let snap = BufferStateSnapshot {
            committed_bytes: 100,
            committed_ops: 5,
            reserved_bytes: 50,
            reserved_ops: 2,
            active_guards: 1,
        };
        let display = format!("{snap}");
        assert!(display.contains("committed: 100 bytes/5 ops"));
        assert!(display.contains("reserved: 50 bytes/2 ops"));
        assert!(display.contains("guards: 1"));
    }

    // Ten threads each reserving 100 bytes/1 op must never lose an update.
    #[test]
    fn test_concurrent_add_reserved() {
        use std::sync::Arc;
        use std::thread;
        let state = Arc::new(SharedBufferState::new());
        let mut handles = vec![];
        for _ in 0..10 {
            let state = Arc::clone(&state);
            handles.push(thread::spawn(move || {
                state.add_reserved(100, 1);
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(state.reserved_bytes(), 1000);
        assert_eq!(state.reserved_ops(), 10);
    }
}