use crate::pod::Pod;
use crate::slot::Slot;
use alloc::boxed::Box;
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;
use core::sync::atomic::AtomicU64;
use spin::Mutex;
/// Forces 64-byte alignment on `T` (Rust rounds the size up to a multiple of
/// the alignment, so the value also occupies a full 64-byte unit). Used here
/// around hot atomics — presumably to keep each counter on its own cache line
/// and avoid false sharing; confirm 64 matches the target's cache-line size.
#[repr(align(64))]
pub struct Padded<T>(pub T);
/// Precomputed helpers for mapping monotonically increasing sequence numbers
/// onto slot positions in a fixed-capacity ring.
#[derive(Clone, Copy)]
pub(crate) struct RingIndex {
    /// Number of slots in the ring.
    pub(crate) capacity: u64,
    /// `capacity - 1` when the capacity is a power of two, `0` otherwise.
    pub(crate) mask: u64,
    /// `floor(2^64 / capacity)` — fixed-point reciprocal for multiply-shift
    /// index reduction on non-power-of-two capacities.
    pub(crate) reciprocal: u64,
    /// Whether `capacity` is a power of two (i.e. `mask` is usable).
    pub(crate) is_pow2: bool,
}

impl RingIndex {
    /// Builds the index helpers for a ring of `capacity` slots.
    ///
    /// # Panics
    /// Panics when `capacity < 2`.
    pub(crate) fn new(capacity: usize) -> Self {
        assert!(capacity >= 2, "capacity must be at least 2");
        let pow2 = capacity.is_power_of_two();
        let capacity = capacity as u64;
        Self {
            capacity,
            mask: if pow2 { capacity - 1 } else { 0 },
            reciprocal: ((1u128 << 64) / u128::from(capacity)) as u64,
            is_pow2: pow2,
        }
    }
}
/// Backpressure bookkeeping for bounded rings (see `SharedRing::new_bounded`).
pub(crate) struct BackpressureState {
    // Threshold checked against consumer progress; `new_bounded` enforces
    // watermark < capacity. Exact producer-side policy lives outside this
    // chunk — NOTE(review): confirm how the producer uses it.
    pub(crate) watermark: u64,
    // Weak refs to per-consumer cursor counters registered via
    // `register_tracker`; dead entries are pruned inside `slowest_cursor`.
    pub(crate) trackers: Mutex<Vec<Weak<Padded<AtomicU64>>>>,
}
/// State shared by all handles onto one ring buffer.
pub(crate) struct SharedRing<T> {
    // Fixed-size slot storage; length equals `index.capacity`.
    slots: Box<[Slot<T>]>,
    // Capacity plus precomputed wrap helpers (mask / reciprocal).
    pub(crate) index: RingIndex,
    // Producer cursor; initialized to u64::MAX by the constructors —
    // presumably a "nothing published yet" sentinel; confirm reader side.
    pub(crate) cursor: Padded<AtomicU64>,
    // Present only for rings built with `new_bounded`.
    pub(crate) backpressure: Option<BackpressureState>,
    // Present only for rings built with `new_mpmc`; starts at 0.
    pub(crate) next_seq: Option<Padded<AtomicU64>>,
}
impl<T: Pod> SharedRing<T> {
pub(crate) fn new(capacity: usize) -> Self {
let index = RingIndex::new(capacity);
let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
SharedRing {
slots: slots.into_boxed_slice(),
index,
cursor: Padded(AtomicU64::new(u64::MAX)),
backpressure: None,
next_seq: None,
}
}
pub(crate) fn new_bounded(capacity: usize, watermark: usize) -> Self {
assert!(capacity >= 2, "capacity must be at least 2");
assert!(watermark < capacity, "watermark must be less than capacity");
let index = RingIndex::new(capacity);
let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
SharedRing {
slots: slots.into_boxed_slice(),
index,
cursor: Padded(AtomicU64::new(u64::MAX)),
backpressure: Some(BackpressureState {
watermark: watermark as u64,
trackers: Mutex::new(Vec::new()),
}),
next_seq: None,
}
}
pub(crate) fn new_mpmc(capacity: usize) -> Self {
let index = RingIndex::new(capacity);
let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
SharedRing {
slots: slots.into_boxed_slice(),
index,
cursor: Padded(AtomicU64::new(u64::MAX)),
backpressure: None,
next_seq: Some(Padded(AtomicU64::new(0))),
}
}
#[inline]
pub(crate) fn slots_ptr(&self) -> *const Slot<T> {
self.slots.as_ptr()
}
#[inline]
pub(crate) fn cursor_ptr(&self) -> *const AtomicU64 {
&self.cursor.0 as *const AtomicU64
}
#[cfg(all(target_os = "linux", feature = "hugepages"))]
#[inline]
pub(crate) fn slots_byte_len(&self) -> usize {
self.slots.len() * core::mem::size_of::<Slot<T>>()
}
#[inline]
pub(crate) fn capacity(&self) -> u64 {
self.index.capacity
}
pub(crate) fn register_tracker(&self, initial: u64) -> Option<Arc<Padded<AtomicU64>>> {
let bp = self.backpressure.as_ref()?;
let tracker = Arc::new(Padded(AtomicU64::new(initial)));
bp.trackers.lock().push(Arc::downgrade(&tracker));
Some(tracker)
}
#[inline]
pub(crate) fn slowest_cursor(&self) -> Option<u64> {
let bp = self.backpressure.as_ref()?;
let mut trackers = bp.trackers.lock();
let mut min = u64::MAX;
let mut has_live = false;
trackers.retain(|weak| {
if let Some(arc) = weak.upgrade() {
let val = arc.0.load(core::sync::atomic::Ordering::Relaxed);
if val < min {
min = val;
}
has_live = true;
true } else {
false }
});
if has_live {
Some(min)
} else {
None
}
}
}