use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use super::{CaducusError, CaducusErrorKind};
/// Lazily-initialised placeholder timestamp shared by every vacant slot.
static SLOT_SENTINEL: OnceLock<Instant> = OnceLock::new();

/// Returns the shared placeholder `Instant`.
///
/// The value is captured with `Instant::now` on the first call and every
/// later call returns that same instant.
fn slot_sentinel() -> Instant {
    let stamp: &Instant = SLOT_SENTINEL.get_or_init(Instant::now);
    *stamp
}
/// A channel that an item of type `T` can be handed to.
///
/// Implementors must be `Send + Sync + 'static`. This file only stores
/// channels as `Arc<dyn ReportChannel<T>>` and returns them alongside popped
/// items; it never calls `send` itself.
pub trait ReportChannel<T>: Send + Sync + 'static {
/// Offers `item` to the channel.
///
/// The `Err` variant carries a `T` — presumably the rejected item handed
/// back to the caller. NOTE(review): confirm against the implementors.
fn send(&self, item: T) -> Result<(), T>;
}
/// Push pattern a `Ring` is configured for.
///
/// Every `try_push_*` entry point checks the ring's mode and rejects the item
/// with `InvalidPattern` when it does not match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ChannelMode {
// Required by `try_push_spsc*`; items leaving the ring are stamped with the
// ring-level channels. (Presumably "single producer, single consumer".)
Spsc,
// Required by `try_push_mpsc*`; each item keeps the channels supplied with
// its own push. (Presumably "multiple producer, single consumer".)
Mpsc,
}
/// An item removed from a `Ring`, together with its deadline and the
/// channels associated with it.
pub(crate) struct PopResult<T> {
// The payload that was stored in the ring.
pub item: T,
// Deadline recorded for the item when it was pushed.
pub expires_at: Instant,
// Channel attached for expiry; ring-level in `Spsc` mode, per-item in `Mpsc`
// mode. NOTE(review): how the consumer uses it is not visible in this file.
pub expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
// Channel attached for shutdown; same sourcing rules as `expiry_channel`.
pub shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
}
/// Manual `Debug` implementation: the channel fields are trait objects with no
/// `Debug` bound, so a present channel is rendered as the placeholder `".."`.
impl<T: std::fmt::Debug> std::fmt::Debug for PopResult<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Some("..")` when a channel is attached, `None` otherwise.
        let expiry = self.expiry_channel.as_ref().map(|_| "..");
        let shutdown = self.shutdown_channel.as_ref().map(|_| "..");

        let mut out = f.debug_struct("PopResult");
        out.field("item", &self.item);
        out.field("expires_at", &self.expires_at);
        out.field("expiry_channel", &expiry);
        out.field("shutdown_channel", &shutdown);
        out.finish()
    }
}
/// One cell of the ring's backing storage.
struct Slot<T> {
// `Some` while the slot holds a queued item, `None` when it is vacant.
item: Option<T>,
// Deadline of the stored item. A freshly created vacant slot holds the
// shared sentinel; `take` leaves the previous value in place.
expires_at: Instant,
// Channels supplied with the item at push time (if any).
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
}
impl<T> Slot<T> {
    /// Creates a vacant slot: no item, no channels, sentinel deadline.
    fn empty() -> Self {
        let expires_at = slot_sentinel();
        Self {
            item: None,
            expires_at,
            expiry_channel: None,
            shutdown_channel: None,
        }
    }

    /// Reports whether the slot currently holds an item.
    fn is_occupied(&self) -> bool {
        self.item.is_some()
    }

    /// Stores `item` with its deadline and channels, replacing whatever the
    /// slot held before.
    fn populate(
        &mut self,
        item: T,
        expires_at: Instant,
        expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
        shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
    ) {
        *self = Self {
            item: Some(item),
            expires_at,
            expiry_channel,
            shutdown_channel,
        };
    }

    /// Moves the item and its channels out of the slot.
    ///
    /// Returns `None` (leaving the slot untouched) when the slot is vacant.
    /// The stored deadline is copied, not cleared.
    fn take(&mut self) -> Option<PopResult<T>> {
        let item = self.item.take()?;
        let expires_at = self.expires_at;
        let expiry_channel = self.expiry_channel.take();
        let shutdown_channel = self.shutdown_channel.take();
        Some(PopResult {
            item,
            expires_at,
            expiry_channel,
            shutdown_channel,
        })
    }
}
/// Fixed-storage FIFO ring of items that each carry an expiry deadline.
pub(crate) struct Ring<T> {
// Backing storage; its length is the physical capacity used for index wrap-around.
slots: Vec<Slot<T>>,
// Index of the oldest queued item (the next one `try_pop` returns).
head: usize,
// Index the next push writes to.
tail: usize,
// Number of occupied slots.
len: usize,
// Requested capacity: pushes are rejected as `Full` once `len` reaches it.
// It can be smaller than `slots.len()` while a shrink is still pending.
target_capacity: usize,
// Time-to-live applied to pushes that do not supply an explicit deadline.
ttl: Duration,
// Set by `shutdown()`; every later push is rejected.
shutdown: bool,
// Which family of `try_push_*` methods this ring accepts.
mode: ChannelMode,
// Ring-level channels; attached to items leaving the ring in `Spsc` mode.
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
// True when queued deadlines may not be in non-decreasing order (an item was
// pushed with an earlier deadline than its predecessor, or the TTL was lowered
// while items were queued). Forces full scans in `peek_expires_at` and
// `drain_expired`.
ttl_reduced: bool,
}
impl<T> Ring<T> {
pub(crate) const MIN_TTL: Duration = Duration::from_millis(1);
pub(crate) const MAX_TTL: Duration = Duration::from_secs(365 * 24 * 60 * 60);
pub fn new(
capacity: usize,
ttl: Duration,
mode: ChannelMode,
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<Self, CaducusError> {
Self::validate_ttl(ttl)?;
let capacity = capacity.max(1);
let mut slots = Vec::with_capacity(capacity);
for _ in 0..capacity {
slots.push(Slot::empty());
}
Ok(Self {
slots,
head: 0,
tail: 0,
len: 0,
target_capacity: capacity,
ttl,
shutdown: false,
mode,
expiry_channel,
shutdown_channel,
ttl_reduced: false,
})
}
/// Queues `item` with the ring's default deadline and no per-item channels.
///
/// # Errors
///
/// Returns `InvalidPattern(item)` when the ring is not in `Spsc` mode;
/// otherwise forwards any rejection from `push_common`.
pub fn try_push_spsc(&mut self, item: T) -> Result<(), CaducusError<T>> {
    match self.mode {
        ChannelMode::Spsc => self.push_common(item, None, None, None),
        ChannelMode::Mpsc => Err(CaducusError {
            kind: CaducusErrorKind::InvalidPattern(item),
        }),
    }
}
/// Queues `item` with the caller-supplied deadline and no per-item channels.
///
/// The deadline is stored as given; this method does not validate it.
///
/// # Errors
///
/// Returns `InvalidPattern(item)` when the ring is not in `Spsc` mode;
/// otherwise forwards any rejection from `push_common`.
pub fn try_push_spsc_with_expires_at(
    &mut self,
    item: T,
    expires_at: Instant,
) -> Result<(), CaducusError<T>> {
    match self.mode {
        ChannelMode::Spsc => self.push_common(item, Some(expires_at), None, None),
        ChannelMode::Mpsc => Err(CaducusError {
            kind: CaducusErrorKind::InvalidPattern(item),
        }),
    }
}
/// Queues `item` with the ring's default deadline and the given per-item
/// channels.
///
/// # Errors
///
/// Returns `InvalidPattern(item)` when the ring is not in `Mpsc` mode;
/// otherwise forwards any rejection from `push_common`.
pub fn try_push_mpsc(
    &mut self,
    item: T,
    expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
    shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<(), CaducusError<T>> {
    match self.mode {
        ChannelMode::Mpsc => self.push_common(item, None, expiry_channel, shutdown_channel),
        ChannelMode::Spsc => Err(CaducusError {
            kind: CaducusErrorKind::InvalidPattern(item),
        }),
    }
}
/// Queues `item` with the caller-supplied deadline and the given per-item
/// channels.
///
/// The deadline is stored as given; this method does not validate it.
///
/// # Errors
///
/// Returns `InvalidPattern(item)` when the ring is not in `Mpsc` mode;
/// otherwise forwards any rejection from `push_common`.
pub fn try_push_mpsc_with_expires_at(
    &mut self,
    item: T,
    expires_at: Instant,
    expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
    shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<(), CaducusError<T>> {
    match self.mode {
        ChannelMode::Mpsc => {
            self.push_common(item, Some(expires_at), expiry_channel, shutdown_channel)
        }
        ChannelMode::Spsc => Err(CaducusError {
            kind: CaducusErrorKind::InvalidPattern(item),
        }),
    }
}
/// Shared tail of every `try_push_*` method: checks admission, resolves the
/// deadline and writes the item at `tail`.
///
/// When `expires_at` is `None` the deadline comes from `default_expires_at`.
///
/// # Errors
///
/// * `Shutdown(item)` once the ring has been shut down.
/// * `Full(item)` when `len` has reached `target_capacity`.
///
/// In both cases the item travels back to the caller inside the error.
fn push_common(
    &mut self,
    item: T,
    expires_at: Option<Instant>,
    expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
    shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<(), CaducusError<T>> {
    if self.shutdown {
        let kind = CaducusErrorKind::Shutdown(item);
        return Err(CaducusError { kind });
    }
    let at_limit = self.len >= self.target_capacity;
    if at_limit {
        let kind = CaducusErrorKind::Full(item);
        return Err(CaducusError { kind });
    }

    let deadline = match expires_at {
        Some(explicit) => explicit,
        None => self.default_expires_at(),
    };
    // Must run before the write: it compares against the current newest item.
    self.track_deadline_order(deadline);

    let write_at = self.tail;
    self.slots[write_at].populate(item, deadline, expiry_channel, shutdown_channel);
    self.tail = (write_at + 1) % self.slots.len();
    self.len += 1;
    Ok(())
}
/// Deadline for a push that did not supply one: now plus the clamped TTL,
/// or the `fallback_expires_at` value if that computation is rejected.
fn default_expires_at(&self) -> Instant {
    match Self::expires_at_from_ttl(self.ttl()) {
        Ok(deadline) => deadline,
        Err(()) => self.fallback_expires_at(),
    }
}
/// Converts a TTL into an absolute deadline measured from `Instant::now()`.
///
/// If `now + ttl` is not representable, `now + MIN_TTL` is tried next and
/// finally `now` itself.
///
/// # Errors
///
/// Returns `Err(())` when `ttl` lies outside `MIN_TTL..=MAX_TTL`.
pub(crate) fn expires_at_from_ttl(ttl: Duration) -> Result<Instant, ()> {
    let within_bounds = (Self::MIN_TTL..=Self::MAX_TTL).contains(&ttl);
    if !within_bounds {
        return Err(());
    }

    let now = Instant::now();
    let deadline = match now.checked_add(ttl) {
        Some(exact) => exact,
        None => now.checked_add(Self::MIN_TTL).unwrap_or(now),
    };
    Ok(deadline)
}
/// Last-resort deadline: now plus `MIN_TTL`, or just now when that sum is
/// not representable.
fn fallback_expires_at(&self) -> Instant {
    let now = Instant::now();
    match now.checked_add(Self::MIN_TTL) {
        Some(later) => later,
        None => now,
    }
}
/// Accepts `deadline` only if it lies strictly in the future.
///
/// # Errors
///
/// Returns `Err(())` when `deadline` is at or before `Instant::now()`.
pub(crate) fn validate_deadline(deadline: Instant) -> Result<Instant, ()> {
    let now = Instant::now();
    if deadline > now {
        Ok(deadline)
    } else {
        Err(())
    }
}
/// Flags the ring as possibly out of deadline order when the item about to
/// be pushed expires earlier than the most recently queued one.
///
/// Does nothing on an empty ring. Must be called before the new item is
/// written, while `tail` still points one past the newest item.
fn track_deadline_order(&mut self, expires_at: Instant) {
    if self.len == 0 {
        return;
    }
    // Step back one slot from `tail`, wrapping to the end of storage.
    let newest = match self.tail.checked_sub(1) {
        Some(index) => index,
        None => self.slots.len() - 1,
    };
    let newest_deadline = self.slots[newest].expires_at;
    if expires_at < newest_deadline {
        self.ttl_reduced = true;
    }
}
/// Removes and returns the oldest queued item, or `None` when the ring is
/// empty.
///
/// After the removal a pending shrink is applied if the remaining items now
/// fit. In `Spsc` mode the returned channels are the ring-level ones; in
/// `Mpsc` mode they are whatever was stored with the item.
pub fn try_pop(&mut self) -> Option<PopResult<T>> {
    if self.len == 0 {
        return None;
    }

    let read_at = self.head;
    let taken = self.slots[read_at].take();
    self.head = (read_at + 1) % self.slots.len();
    self.len -= 1;

    if self.should_compact() {
        self.compact();
    }

    taken.map(|mut pop| {
        if self.mode == ChannelMode::Spsc {
            pop.expiry_channel = self.expiry_channel.clone();
            pop.shutdown_channel = self.shutdown_channel.clone();
        }
        pop
    })
}
/// Returns the earliest deadline among the queued items, or `None` when the
/// ring is empty.
///
/// While `ttl_reduced` is clear the head item's deadline is returned
/// directly; otherwise every queued item is scanned for the minimum.
pub fn peek_expires_at(&self) -> Option<Instant> {
    if self.len == 0 {
        return None;
    }
    let front = self.slots[self.head].expires_at;
    if !self.ttl_reduced {
        return Some(front);
    }

    let cap = self.slots.len();
    let earliest = (1..self.len)
        .map(|offset| self.slots[(self.head + offset) % cap].expires_at)
        .fold(front, |best, candidate| best.min(candidate));
    Some(earliest)
}
/// Removes and returns every queued item whose deadline is at or before
/// `now`, in ring order (oldest position first).
///
/// Two strategies are used:
/// * While `ttl_reduced` is clear, items are popped from the head until the
///   head's deadline is later than `now`.
/// * Otherwise every queued item is inspected. Expired items are taken out
///   wherever they sit; if that leaves a hole behind a surviving item, the
///   survivors are repacked into fresh storage starting at index 0.
///
/// On the scanning path `ttl_reduced` is cleared again when at most one item
/// remains or the survivors' deadlines are non-decreasing.
pub fn drain_expired(&mut self, now: Instant) -> Vec<PopResult<T>> {
if !self.ttl_reduced {
// Fast path: the head item is treated as the earliest, so stop at the
// first one that has not expired.
let mut items = Vec::new();
while let Some(expires_at) = self.slots.get(self.head).map(|s| s.expires_at) {
if self.len == 0 || expires_at > now {
break;
}
if let Some(pop) = self.try_pop() {
items.push(pop);
} else {
break;
}
}
return items;
}
// Slow path: deadlines may be out of order, so inspect every queued item.
let cap = self.slots.len();
let original_len = self.len;
let mut items: Vec<PopResult<T>> = Vec::new();
// Becomes true once an expired item is removed after a survivor was seen,
// i.e. the occupied slots are no longer contiguous from `head`.
let mut gap_opened = false;
let mut survivor_seen = false;
let mut prev_survivor_deadline: Option<Instant> = None;
let mut survivors_monotonic = true;
for i in 0..original_len {
let idx = (self.head + i) % cap;
debug_assert!(self.slots[idx].is_occupied());
let expires_at = self.slots[idx].expires_at;
if expires_at <= now {
if let Some(mut pop) = self.slots[idx].take() {
// Same channel stamping as `try_pop`: ring-level channels in Spsc mode.
if self.mode == ChannelMode::Spsc {
pop.expiry_channel = self.expiry_channel.clone();
pop.shutdown_channel = self.shutdown_channel.clone();
}
items.push(pop);
self.len -= 1;
if survivor_seen {
gap_opened = true;
}
}
} else {
survivor_seen = true;
// Track whether the survivors, in ring order, have non-decreasing deadlines.
if let Some(prev) = prev_survivor_deadline {
if expires_at < prev {
survivors_monotonic = false;
}
}
prev_survivor_deadline = Some(expires_at);
}
}
if gap_opened {
// Repack survivors contiguously into new storage sized to `target_capacity`
// (or to the survivor count, if that is larger).
let new_cap = self.target_capacity;
let mut new_slots: Vec<Slot<T>> = Vec::with_capacity(new_cap);
for i in 0..original_len {
let idx = (self.head + i) % cap;
if self.slots[idx].is_occupied() {
let mut empty = Slot::empty();
std::mem::swap(&mut self.slots[idx], &mut empty);
// After the swap `empty` holds the occupied slot that was moved out.
new_slots.push(empty);
}
}
let placed = new_slots.len();
while new_slots.len() < new_cap {
new_slots.push(Slot::empty());
}
self.slots = new_slots;
self.head = 0;
self.tail = placed % self.slots.len();
self.target_capacity = new_cap;
debug_assert_eq!(self.len, placed);
} else {
// Only a prefix was removed: advance `head` past it.
let removed = original_len - self.len;
self.head = (self.head + removed) % cap;
}
if self.should_compact() {
self.compact();
}
if self.len <= 1 || survivors_monotonic {
self.ttl_reduced = false;
}
items
}
/// Marks the ring as shut down and returns every queued item in ring order.
///
/// A second call returns an empty vector. In `Spsc` mode each returned item
/// carries the ring-level channels. Afterwards the ring is empty with
/// `head` and `tail` reset to zero.
pub fn shutdown(&mut self) -> Vec<PopResult<T>> {
    if self.shutdown {
        return Vec::new();
    }
    self.shutdown = true;

    let stamp_ring_channels = self.mode == ChannelMode::Spsc;
    let cap = self.slots.len();
    let pending = self.len;
    let mut drained = Vec::with_capacity(pending);

    for offset in 0..pending {
        let idx = (self.head + offset) % cap;
        if let Some(mut pop) = self.slots[idx].take() {
            if stamp_ring_channels {
                pop.expiry_channel = self.expiry_channel.clone();
                pop.shutdown_channel = self.shutdown_channel.clone();
            }
            drained.push(pop);
        }
    }

    self.len = 0;
    self.head = 0;
    self.tail = 0;
    drained
}
/// Reports whether `shutdown()` has been called on this ring.
pub fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Returns the configured TTL, clamped into `MIN_TTL..=MAX_TTL`.
pub fn ttl(&self) -> Duration {
self.ttl.clamp(Self::MIN_TTL, Self::MAX_TTL)
}
/// Replaces the default TTL used for pushes without an explicit deadline.
///
/// Lowering the TTL while items are queued sets `ttl_reduced`, because
/// later pushes may then expire before items already in the ring.
///
/// # Errors
///
/// Returns the error produced by `validate_ttl` when `ttl` lies outside
/// `MIN_TTL..=MAX_TTL`; the stored TTL is left unchanged in that case.
pub fn set_ttl(&mut self, ttl: Duration) -> Result<(), CaducusError> {
    Self::validate_ttl(ttl)?;

    let previous = std::mem::replace(&mut self.ttl, ttl);
    let has_items = self.len > 0;
    if has_items && ttl < previous {
        self.ttl_reduced = true;
    }
    Ok(())
}
/// Requests a new capacity (a request of zero is treated as one).
///
/// * Growing beyond the current storage reallocates immediately.
/// * Shrinking reallocates immediately only when the queued items already
///   fit; otherwise the shrink stays pending and only `target_capacity`
///   changes.
/// * Asking for the current storage size just resets `target_capacity`,
///   which cancels any pending shrink.
pub fn request_capacity(&mut self, new: usize) {
    let wanted = new.max(1);
    let allocated = self.slots.len();

    self.target_capacity = wanted;

    let growing = wanted > allocated;
    let shrink_fits_now = wanted < allocated && self.len <= wanted;
    if growing || shrink_fits_now {
        self.linearize(wanted);
    }
}
/// True when a shrink is pending (storage is larger than the target) and the
/// queued items now fit within the target capacity.
fn should_compact(&self) -> bool {
    let shrink_pending = self.target_capacity < self.slots.len();
    let items_fit = self.len <= self.target_capacity;
    shrink_pending && items_fit
}
/// Rebuilds the storage at exactly `target_capacity` slots.
fn compact(&mut self) {
    let goal = self.target_capacity;
    self.linearize(goal);
}
/// Moves the queued items, in order, into fresh storage of `new_capacity`
/// slots so that the oldest item sits at index 0.
///
/// Afterwards `head` is 0, `tail` is `len % slots.len()` and
/// `target_capacity` equals `new_capacity`. Every logical position in
/// `0..len` is expected to be occupied (checked in debug builds).
fn linearize(&mut self, new_capacity: usize) {
    let old_len = self.slots.len();
    let live = self.len;
    let mut rebuilt = Vec::with_capacity(new_capacity);

    for i in 0..live {
        let idx = (self.head + i) % old_len;
        debug_assert!(
            self.slots[idx].is_occupied(),
            "linearize precondition violated: slot at logical position {i} (index {idx}) is empty"
        );
        // Move the occupied slot out, leaving a vacant one behind.
        let moved = std::mem::replace(&mut self.slots[idx], Slot::empty());
        rebuilt.push(moved);
    }
    // Pad with vacant slots up to the requested size.
    rebuilt.extend((live..new_capacity).map(|_| Slot::empty()));

    self.slots = rebuilt;
    self.head = 0;
    self.tail = live % self.slots.len();
    self.target_capacity = new_capacity;
}
/// Checks that `ttl` lies within `MIN_TTL..=MAX_TTL`.
///
/// # Errors
///
/// Returns `CaducusErrorKind::InvalidArgument` when it does not.
fn validate_ttl(ttl: Duration) -> Result<(), CaducusError> {
    let acceptable = Self::MIN_TTL..=Self::MAX_TTL;
    if acceptable.contains(&ttl) {
        Ok(())
    } else {
        Err(CaducusError {
            kind: CaducusErrorKind::InvalidArgument,
        })
    }
}
}