use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::QueueError;
use crate::policy::{AdmissionDecision, EdgePolicy};
use crate::prelude::{BatchView, HeaderStore};
use crate::types::MessageToken;
use core::mem;
pub struct TestSpscRingBuf<const N: usize> {
buf: [MessageToken; N],
head: usize,
tail: usize,
len: usize,
bytes: usize,
}
impl<const N: usize> TestSpscRingBuf<N> {
#[inline]
pub fn new() -> Self {
Self {
buf: core::array::from_fn(|_| MessageToken::default()),
head: 0,
tail: 0,
len: 0,
bytes: 0,
}
}
#[inline]
fn is_full(&self) -> bool {
self.len == N
}
#[inline]
fn push_raw(&mut self, item: MessageToken) {
self.buf[self.tail] = item;
self.tail = (self.tail + 1) % N;
self.len += 1;
}
#[inline]
fn pop_raw(&mut self) -> MessageToken {
let item = mem::take(&mut self.buf[self.head]);
self.head = (self.head + 1) % N;
self.len -= 1;
item
}
fn normalize(&mut self) {
if self.len == 0 {
self.head = 0;
self.tail = 0;
return;
}
if self.head == 0 {
self.tail = (self.head + self.len) % N;
return;
}
for i in 0..self.len {
let src_idx = (self.head + i) % N;
let tmp = mem::take(&mut self.buf[src_idx]);
self.buf[i] = tmp;
}
for i in self.len..N {
let _ = mem::take(&mut self.buf[i]);
}
self.head = 0;
self.tail = (self.head + self.len) % N;
}
}
impl<const N: usize> Edge for TestSpscRingBuf<N> {
fn try_push<H: HeaderStore>(
&mut self,
token: MessageToken,
policy: &EdgePolicy,
headers: &H,
) -> EnqueueResult {
let decision = self.get_admission_decision(policy, token, headers);
let item_bytes = headers
.peek_header(token)
.map(|h| *h.payload_size_bytes())
.unwrap_or(0);
match decision {
AdmissionDecision::Admit => {
if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
return EnqueueResult::Rejected;
}
self.bytes = self.bytes.saturating_add(item_bytes);
self.push_raw(token);
EnqueueResult::Enqueued
}
AdmissionDecision::DropNewest => {
EnqueueResult::DroppedNewest
}
AdmissionDecision::Reject => EnqueueResult::Rejected,
AdmissionDecision::Block => {
EnqueueResult::Rejected
}
AdmissionDecision::Evict(_) | AdmissionDecision::EvictUntilBelowHard => {
if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
return EnqueueResult::Rejected;
}
self.bytes = self.bytes.saturating_add(item_bytes);
self.push_raw(token);
EnqueueResult::Enqueued
}
}
}
fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
if self.len == 0 {
return Err(QueueError::Empty);
}
let token = self.pop_raw();
let tok_bytes = headers
.peek_header(token)
.map(|h| *h.payload_size_bytes())
.unwrap_or(0);
self.bytes = self.bytes.saturating_sub(tok_bytes);
Ok(token)
}
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
let items = self.len;
let bytes = self.bytes;
let watermark = policy.watermark(items, bytes);
EdgeOccupancy {
items,
bytes,
watermark,
}
}
fn is_empty(&self) -> bool {
self.len == 0
}
fn try_peek(&self) -> Result<MessageToken, QueueError> {
if self.len == 0 {
return Err(QueueError::Empty);
}
Ok(self.buf[self.head])
}
#[inline]
fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
if self.len == 0 || index >= self.len {
return Err(QueueError::Empty);
}
let idx = (self.head + index) % N;
Ok(self.buf[idx])
}
fn try_pop_batch<H: HeaderStore>(
&mut self,
policy: &crate::policy::BatchingPolicy,
headers: &H,
) -> Result<BatchView<'_, MessageToken>, QueueError> {
use crate::policy::WindowKind;
if self.len == 0 {
return Err(QueueError::Empty);
}
self.normalize();
let old_len = self.len;
let fixed_opt = *policy.fixed_n();
let delta_t_opt = *policy.max_delta_t();
let window_kind = policy.window_kind();
let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
Some(1)
} else {
fixed_opt
};
let mut delta_count = self.len;
if let Some(cap) = delta_t_opt {
if let Ok(front_header) = headers.peek_header(self.buf[0]) {
let front_ticks = *front_header.creation_tick();
let mut c = 0usize;
while c < self.len {
if let Ok(h) = headers.peek_header(self.buf[c]) {
let tick = *h.creation_tick();
let delta = tick.saturating_sub(front_ticks);
if delta <= cap {
c += 1;
} else {
break;
}
} else {
break;
}
}
delta_count = c;
}
}
let apply_fixed = |limit: usize| -> usize {
if let Some(n) = effective_fixed {
core::cmp::min(limit, n)
} else {
limit
}
};
if let WindowKind::Disjoint = window_kind {
let take_n = apply_fixed(core::cmp::min(self.len, delta_count));
if take_n == 0 {
return Err(QueueError::Empty);
}
let new_head = take_n % N;
self.len = old_len - take_n;
self.head = new_head;
self.tail = (self.head + self.len) % N;
let mut dropped_bytes = 0usize;
for i in 0..take_n {
if let Ok(h) = headers.peek_header(self.buf[i]) {
dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
}
}
self.bytes = self.bytes.saturating_sub(dropped_bytes);
let slice = &mut self.buf[..take_n];
return Ok(BatchView::from_borrowed(slice, take_n));
}
if let WindowKind::Sliding(sw) = window_kind {
let stride = *sw.stride();
let size = effective_fixed.unwrap_or(1);
let mut max_present = core::cmp::min(self.len, size);
max_present = apply_fixed(core::cmp::min(max_present, delta_count));
let stride_to_pop = core::cmp::min(stride, self.len);
if max_present == 0 {
return Err(QueueError::Empty);
}
let new_head = stride_to_pop % N;
self.len = old_len - stride_to_pop;
self.head = new_head;
self.tail = (self.head + self.len) % N;
let mut popped_bytes = 0usize;
for i in 0..stride_to_pop {
if let Ok(h) = headers.peek_header(self.buf[i]) {
popped_bytes = popped_bytes.saturating_add(*h.payload_size_bytes());
}
}
self.bytes = self.bytes.saturating_sub(popped_bytes);
let slice = &mut self.buf[..max_present];
return Ok(BatchView::from_borrowed(slice, max_present));
}
let mut take_n = core::cmp::min(self.len, delta_count);
take_n = apply_fixed(take_n);
if take_n == 0 {
return Err(QueueError::Empty);
}
let new_head = take_n % N;
self.len = old_len - take_n;
self.head = new_head;
self.tail = (self.head + self.len) % N;
let mut dropped_bytes = 0usize;
for i in 0..take_n {
if let Ok(h) = headers.peek_header(self.buf[i]) {
dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
}
}
self.bytes = self.bytes.saturating_sub(dropped_bytes);
let slice = &mut self.buf[..take_n];
Ok(BatchView::from_borrowed(slice, take_n))
}
}
impl<const N: usize> Default for TestSpscRingBuf<N> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
crate::run_edge_contract_tests!(test_spsc_ring_buf_contract, || {
TestSpscRingBuf::<16>::new()
});
}