#![allow(unsafe_code)]
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::QueueError;
use crate::policy::{BatchingPolicy, EdgePolicy, WindowKind};
use crate::prelude::{AdmissionDecision, BatchView, HeaderStore};
use crate::types::MessageToken;
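/// A single-producer single-consumer ring of message tokens backed by a
/// power-of-two slab of `MaybeUninit` slots. `head` and `tail` are
/// free-running counters masked into the buffer; one slot is left unused so
/// a full ring (`len == cap - 1`) is distinguishable from an empty one. A
/// separate atomic tracks the total payload bytes currently enqueued for
/// policy-driven occupancy checks.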
pub struct SpscAtomicRing {
    buf: Box<[MaybeUninit<MessageToken>]>,
    cap: usize,
    head: AtomicUsize,
    tail: AtomicUsize,
    bytes_in_queue: AtomicUsize,
}
impl SpscAtomicRing {
    /// Creates a ring with the given capacity, which must be a power of two
    /// so that index masking works.
    ///
    /// # Safety
    ///
    /// Kept `unsafe` as a signal to callers that the ring is intended for
    /// single-producer, single-consumer use.
    pub unsafe fn with_capacity(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "capacity must be a power of two"
        );
        let mut v: Vec<MaybeUninit<MessageToken>> = Vec::with_capacity(capacity);
        // SAFETY: `MaybeUninit` slots are valid even when uninitialized, so
        // extending the length without writing the elements is sound.
        unsafe {
            v.set_len(capacity);
        }
        Self {
            buf: v.into_boxed_slice(),
            cap: capacity,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            bytes_in_queue: AtomicUsize::new(0),
        }
    }
    #[inline]
    fn mask(&self) -> usize {
        self.cap - 1
    }
    /// Number of items currently enqueued. Both counters are loaded with
    /// `Acquire` so the count never runs ahead of the slot writes it covers.
    #[inline]
    fn len(&self) -> usize {
        let h = self.head.load(Ordering::Acquire);
        let t = self.tail.load(Ordering::Acquire);
        t.wrapping_sub(h) & self.mask()
    }
    /// One slot is deliberately left unused so that a full ring is
    /// distinguishable from an empty one under the masked-difference `len`.
    #[inline]
    fn is_full(&self) -> bool {
        self.len() == self.cap - 1
    }
    /// Writes `token` into the next free slot and publishes it by advancing
    /// `tail` with a `Release` store. Callers must check `is_full` first.
    #[inline]
    fn push_raw(&mut self, token: MessageToken) {
        let t = self.tail.load(Ordering::Relaxed);
        let idx = t & self.mask();
        // Writing through `&mut self` avoids the unsound pattern of casting
        // constness away from a pointer derived from a shared reference.
        self.buf[idx].write(token);
        self.tail.store(t.wrapping_add(1), Ordering::Release);
    }
    /// Reads out the front token and advances `head`. Callers must ensure
    /// the ring is non-empty.
    #[inline]
    fn pop_raw(&self) -> MessageToken {
        let h = self.head.load(Ordering::Relaxed);
        let idx = h & self.mask();
        // SAFETY: callers guarantee `len() > 0`, so this slot was written by
        // an earlier `push_raw` and has not yet been consumed.
        let token = unsafe { self.buf[idx].assume_init_read() };
        self.head.store(h.wrapping_add(1), Ordering::Release);
        token
    }
    #[inline]
    fn peek_ref_at_offset(&self, offset: usize) -> &MessageToken {
        debug_assert!(offset < self.len(), "peek offset out of bounds");
        let h = self.head.load(Ordering::Acquire);
        let idx = h.wrapping_add(offset) & self.mask();
        // SAFETY: `offset < len()`, so the slot holds an initialized token.
        unsafe { self.buf[idx].assume_init_ref() }
    }
    /// Debits a token's payload bytes from the running byte count. Tokens
    /// whose header cannot be resolved count as zero bytes, mirroring the
    /// accounting on the push side.
    fn debit_bytes<H: HeaderStore>(&self, token: MessageToken, headers: &H) {
        let bytes = headers
            .peek_header(token)
            .map(|h| *h.payload_size_bytes())
            .unwrap_or(0);
        self.bytes_in_queue.fetch_sub(bytes, Ordering::AcqRel);
    }
}
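// Draining on drop reads out any tokens still enqueued. With plain `Copy`
// tokens this is effectively a no-op, but it keeps the ring correct should
// tokens ever acquire drop glue.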
impl Drop for SpscAtomicRing {
fn drop(&mut self) {
while self.len() > 0 {
let _ = self.pop_raw();
}
}
}
impl Edge for SpscAtomicRing {
    fn try_push<H: HeaderStore>(
        &mut self,
        token: MessageToken,
        policy: &EdgePolicy,
        headers: &H,
    ) -> EnqueueResult {
        let decision = self.get_admission_decision(policy, token, headers);
        let item_bytes = headers
            .peek_header(token)
            .map(|h| *h.payload_size_bytes())
            .unwrap_or(0);
        match decision {
            // This ring implements no eviction, so eviction decisions take
            // the plain admit path and are rejected once the ring or the
            // policy's hard caps are at their limit.
            AdmissionDecision::Admit
            | AdmissionDecision::Evict(_)
            | AdmissionDecision::EvictUntilBelowHard => {
                let items = self.len();
                let bytes = self.bytes_in_queue.load(Ordering::Acquire);
                if self.is_full() || policy.caps.at_or_above_hard(items, bytes) {
                    return EnqueueResult::Rejected;
                }
                self.bytes_in_queue.fetch_add(item_bytes, Ordering::AcqRel);
                self.push_raw(token);
                EnqueueResult::Enqueued
            }
            AdmissionDecision::DropNewest => EnqueueResult::DroppedNewest,
            // Blocking is likewise unsupported; `Block` degrades to a
            // rejection.
            AdmissionDecision::Reject | AdmissionDecision::Block => EnqueueResult::Rejected,
        }
    }
    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
        if self.len() == 0 {
            return Err(QueueError::Empty);
        }
        let token = self.pop_raw();
        self.debit_bytes(token, headers);
        Ok(token)
    }
fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
let items = self.len();
let bytes = self.bytes_in_queue.load(Ordering::Acquire);
let watermark = policy.watermark(items, bytes);
EdgeOccupancy::new(items, bytes, watermark)
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn try_peek(&self) -> Result<MessageToken, QueueError> {
if self.len() == 0 {
return Err(QueueError::Empty);
}
let t = self.peek_ref_at_offset(0);
Ok(*t)
}
    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
        let available = self.len();
        // Out-of-range peeks are reported as `Empty`, matching `try_peek`.
        if index >= available {
            return Err(QueueError::Empty);
        }
let t = self.peek_ref_at_offset(index);
Ok(*t)
}
    fn try_pop_batch<H: HeaderStore>(
        &mut self,
        policy: &BatchingPolicy,
        headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
        let available = self.len();
        if available == 0 {
            return Err(QueueError::Empty);
        }
        let fixed_opt = *policy.fixed_n();
        let delta_t_opt = *policy.max_delta_t();
        let window_kind = policy.window_kind();
        // With neither a fixed batch size nor a time window configured, the
        // batch degenerates to a single item.
        let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
            Some(1)
        } else {
            fixed_opt
        };
        // Count how many items from the front fall within `max_delta_t`
        // ticks of the front item's creation tick.
        let mut delta_count = available;
        if let Some(cap) = delta_t_opt {
            if let Ok(front_header) = headers.peek_header(*self.peek_ref_at_offset(0)) {
                let front_tick = *front_header.creation_tick();
                let mut c = 0usize;
                while c < available {
                    if let Ok(h) = headers.peek_header(*self.peek_ref_at_offset(c)) {
                        let tick = *h.creation_tick();
                        let delta = tick.saturating_sub(front_tick);
                        if delta <= cap {
                            c += 1;
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                }
                delta_count = c;
            }
        }
        // Clamp a candidate batch size by the fixed-size cap, if one is set.
        let apply_fixed = |limit: usize| -> usize {
            match effective_fixed {
                Some(n) => core::cmp::min(limit, n),
                None => limit,
            }
        };
        if let WindowKind::Disjoint = window_kind {
            // Disjoint windows: pop the whole window; nothing is re-read.
            let take_n = apply_fixed(core::cmp::min(available, delta_count));
            if take_n == 0 {
                return Err(QueueError::Empty);
            }
            let mut out: Vec<MessageToken> = Vec::with_capacity(take_n);
            for _ in 0..take_n {
                let tok = self.pop_raw();
                self.debit_bytes(tok, headers);
                out.push(tok);
            }
            return Ok(BatchView::from_owned(out));
        }
        if let WindowKind::Sliding(sw) = window_kind {
            // Sliding windows: return a window of up to `size` items but only
            // consume `stride` of them; the rest are peeked and stay in the
            // ring for the next window.
            let stride = *sw.stride();
            let size = effective_fixed.unwrap_or(1);
            let max_present = core::cmp::min(size, core::cmp::min(available, delta_count));
            if max_present == 0 {
                return Err(QueueError::Empty);
            }
            // Note: if `stride` exceeds the window size, the overshoot is
            // still popped and returned in the batch.
            let stride_to_pop = core::cmp::min(stride, available);
            let mut out: Vec<MessageToken> = Vec::with_capacity(max_present);
            for _ in 0..stride_to_pop {
                let tok = self.pop_raw();
                self.debit_bytes(tok, headers);
                out.push(tok);
            }
            for i in stride_to_pop..max_present {
                out.push(*self.peek_ref_at_offset(i - stride_to_pop));
            }
            return Ok(BatchView::from_owned(out));
        }
        // Any other window kind behaves like a disjoint pop of the clamped
        // window.
        let take_n = apply_fixed(core::cmp::min(available, delta_count));
        if take_n == 0 {
            return Err(QueueError::Empty);
        }
        let mut out: Vec<MessageToken> = Vec::with_capacity(take_n);
        for _ in 0..take_n {
            let tok = self.pop_raw();
            self.debit_bytes(tok, headers);
            out.push(tok);
        }
        Ok(BatchView::from_owned(out))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::static_manager::StaticMemoryManager;
use crate::message::{Message, MessageHeader};
use crate::policy::{AdmissionPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
use crate::prelude::{
create_test_tensor_filled_with, HeaderStore, MemoryManager as _, TestTensor,
};
use crate::types::Ticks;
fn mk_header(tick: u64) -> MessageHeader {
let mut h = MessageHeader::empty();
h.set_creation_tick(Ticks::new(tick));
h.set_payload_size_bytes(8usize);
h
}
fn make_msg_with_tick(tick: u64) -> Message<TestTensor> {
let h = mk_header(tick);
Message::new(h, create_test_tensor_filled_with(0))
}
fn make_ring() -> SpscAtomicRing {
const CAPACITY: usize = 32;
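        // SAFETY: tests drive the ring from a single thread.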
unsafe { SpscAtomicRing::with_capacity(CAPACITY) }
}
const POLICY: EdgePolicy = EdgePolicy::new(
QueueCaps::new(8, 6, None, None),
AdmissionPolicy::DropNewest,
OverBudgetAction::Drop,
);
crate::run_edge_contract_tests!(spsc_atomic_ring_contract, || make_ring());
#[test]
fn pushes_and_pops_tokens_with_byte_accounting() {
const MGR_DEPTH: usize = 64;
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let mut ring = make_ring();
let t1 = mgr.store(make_msg_with_tick(1)).expect("store t1");
let t2 = mgr.store(make_msg_with_tick(2)).expect("store t2");
assert_eq!(ring.try_push(t1, &POLICY, &mgr), EnqueueResult::Enqueued);
assert_eq!(ring.try_push(t2, &POLICY, &mgr), EnqueueResult::Enqueued);
let occ = ring.occupancy(&POLICY);
assert_eq!(*occ.items(), 2usize);
assert!(*occ.bytes() > 0usize);
let p1 = ring.try_pop(&mgr).expect("pop p1");
let p2 = ring.try_pop(&mgr).expect("pop p2");
let h1 = mgr.peek_header(p1).expect("h1");
let h2 = mgr.peek_header(p2).expect("h2");
assert_eq!(*h1.creation_tick().as_u64(), 1u64);
assert_eq!(*h2.creation_tick().as_u64(), 2u64);
}
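    // A sketch of peek semantics on the same harness as above: peeking must
    // return front items without consuming them or disturbing occupancy.
    #[test]
    fn peeks_without_consuming() {
        const MGR_DEPTH: usize = 64;
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut ring = make_ring();
        let t1 = mgr.store(make_msg_with_tick(1)).expect("store t1");
        let t2 = mgr.store(make_msg_with_tick(2)).expect("store t2");
        assert_eq!(ring.try_push(t1, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(ring.try_push(t2, &POLICY, &mgr), EnqueueResult::Enqueued);
        let front = ring.try_peek().expect("peek front");
        let second = ring.try_peek_at(1).expect("peek at 1");
        assert_eq!(*mgr.peek_header(front).expect("hf").creation_tick().as_u64(), 1u64);
        assert_eq!(*mgr.peek_header(second).expect("hs").creation_tick().as_u64(), 2u64);
        // Occupancy is unchanged and out-of-range peeks report an error.
        assert_eq!(*ring.occupancy(&POLICY).items(), 2usize);
        assert!(ring.try_peek_at(2).is_err());
    }
    // A sketch exercising index wraparound: with a 32-slot ring, 48 push/pop
    // cycles force the masked indices to wrap while FIFO order is preserved.
    #[test]
    fn survives_index_wraparound() {
        const MGR_DEPTH: usize = 64;
        let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
        let mut ring = make_ring();
        for tick in 0..48u64 {
            let tok = mgr.store(make_msg_with_tick(tick)).expect("store");
            assert_eq!(ring.try_push(tok, &POLICY, &mgr), EnqueueResult::Enqueued);
            let popped = ring.try_pop(&mgr).expect("pop");
            assert_eq!(*mgr.peek_header(popped).expect("h").creation_tick().as_u64(), tick);
        }
        assert!(ring.is_empty());
    }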
}