mod concurrency {
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CaducusErrorKind<T = ()> {
InvalidArgument,
InvalidPattern(T),
Shutdown(T),
Full(T),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CaducusError<T = ()> {
pub kind: CaducusErrorKind<T>,
}
pub mod ring_buffer {
include!("../src/concurrency/ring_buffer.rs");
pub fn validate<T>(ring: &Ring<T>) {
let cap = ring.capacity();
assert!(ring.len <= cap, "len {} exceeds capacity {}", ring.len, cap);
let mut occupied = 0;
for i in 0..ring.len {
let idx = (ring.head + i) % cap;
assert!(
ring.slots[idx].is_occupied(),
"slot {} (index {}) should be occupied",
i,
idx
);
occupied += 1;
}
assert_eq!(occupied, ring.len);
}
pub fn force_raw_ttl<T>(ring: &mut Ring<T>, ttl: Duration) {
ring.ttl = ttl;
}
impl<T> Ring<T> {
pub fn len(&self) -> usize {
self.len
}
pub fn capacity(&self) -> usize {
self.slots.len()
}
pub fn target_capacity(&self) -> usize {
self.target_capacity
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn is_full(&self) -> bool {
self.len >= self.target_capacity
}
pub fn ttl_reduced(&self) -> bool {
self.ttl_reduced
}
pub fn head_deadline(&self) -> Instant {
self.slots[self.head].expires_at
}
}
}
}
use concurrency::ring_buffer::{force_raw_ttl, validate, ChannelMode, ReportChannel, Ring};
use concurrency::CaducusErrorKind;
use std::sync::Arc;
use std::time::{Duration, Instant};
const DEFAULT_TTL: Duration = Duration::from_secs(5);
macro_rules! new_ring {
($capacity:expr, $ttl:expr) => {
Ring::<i32>::new($capacity, $ttl, ChannelMode::Mpsc, None, None).expect("valid TTL")
};
}
macro_rules! new_spsc_ring {
($capacity:expr, $ttl:expr, $expiry:expr, $shutdown:expr) => {
Ring::<i32>::new($capacity, $ttl, ChannelMode::Spsc, $expiry, $shutdown).expect("valid TTL")
};
}
struct TestChannel;
impl<T: Send + 'static> ReportChannel<T> for TestChannel {
fn send(&self, _item: T) -> Result<(), T> {
Ok(())
}
}
#[test]
fn report_channel_send_returns_ok() {
let ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
assert!(ch.send(7).is_ok());
}
fn push_val(ring: &mut Ring<i32>, val: i32) {
ring.try_push_mpsc(val, None, None)
.expect("push should succeed");
}
fn push_val_spsc(ring: &mut Ring<i32>, val: i32) {
ring.try_push_spsc(val).expect("push should succeed");
}
fn push_val_with_channels(
ring: &mut Ring<i32>,
val: i32,
expiry: Option<Arc<dyn ReportChannel<i32>>>,
shutdown: Option<Arc<dyn ReportChannel<i32>>>,
) {
ring.try_push_mpsc(val, expiry, shutdown)
.expect("push should succeed");
}
fn pop_val(ring: &mut Ring<i32>) -> i32 {
ring.try_pop().expect("pop should return an item").item
}
#[test]
fn new_allocates_empty_slots() {
let ring = new_ring!(4, DEFAULT_TTL);
assert_eq!(ring.capacity(), 4);
assert_eq!(ring.target_capacity(), 4);
assert_eq!(ring.len(), 0);
assert!(ring.is_empty());
assert!(!ring.is_full());
assert!(!ring.is_shutdown());
assert_eq!(ring.ttl(), DEFAULT_TTL);
validate(&ring);
}
#[test]
fn new_clamps_zero_to_one() {
let ring = new_ring!(0, DEFAULT_TTL);
assert_eq!(ring.capacity(), 1);
assert_eq!(ring.target_capacity(), 1);
validate(&ring);
}
#[test]
fn new_capacity_one() {
let ring = new_ring!(1, DEFAULT_TTL);
assert_eq!(ring.capacity(), 1);
validate(&ring);
}
#[test]
fn new_stores_provided_ttl() {
let ttl = Duration::from_millis(750);
let ring = new_ring!(4, ttl);
assert_eq!(ring.ttl(), ttl);
}
#[test]
fn new_rejects_ttl_below_minimum() {
let result = Ring::<i32>::new(4, Duration::from_micros(999), ChannelMode::Mpsc, None, None);
let err = result.err().expect("ttl below 1ms should be rejected");
assert_eq!(err.kind, CaducusErrorKind::InvalidArgument);
}
#[test]
fn new_rejects_ttl_above_maximum() {
let result = Ring::<i32>::new(
4,
Ring::<i32>::MAX_TTL + Duration::from_secs(1),
ChannelMode::Mpsc,
None,
None,
);
let err = result.err().expect("ttl above 1 year should be rejected");
assert_eq!(err.kind, CaducusErrorKind::InvalidArgument);
}
#[test]
fn new_accepts_minimum_ttl() {
let ring = Ring::<i32>::new(4, Ring::<i32>::MIN_TTL, ChannelMode::Mpsc, None, None)
.expect("valid TTL");
assert_eq!(ring.ttl(), Ring::<i32>::MIN_TTL);
}
#[test]
fn new_accepts_maximum_ttl() {
let ring = Ring::<i32>::new(4, Ring::<i32>::MAX_TTL, ChannelMode::Mpsc, None, None)
.expect("valid TTL");
assert_eq!(ring.ttl(), Ring::<i32>::MAX_TTL);
}
#[test]
fn set_ttl_updates_value() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let new_ttl = Duration::from_secs(10);
ring.set_ttl(new_ttl).unwrap();
assert_eq!(ring.ttl(), new_ttl);
}
#[test]
fn set_ttl_does_not_affect_existing_items() {
let original_ttl = Duration::from_secs(5);
let mut ring = new_ring!(4, original_ttl);
let before = Instant::now();
push_val(&mut ring, 1);
let after = Instant::now();
ring.set_ttl(Duration::from_secs(1)).unwrap();
let result = ring.try_pop().unwrap();
assert!(result.expires_at >= before + original_ttl);
assert!(result.expires_at <= after + original_ttl);
}
#[test]
fn set_ttl_rejects_ttl_below_minimum() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let err = ring
.set_ttl(Duration::from_micros(999))
.expect_err("ttl below 1ms should be rejected");
assert_eq!(err.kind, CaducusErrorKind::InvalidArgument);
}
#[test]
fn set_ttl_rejects_ttl_above_maximum() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let err = ring
.set_ttl(Ring::<i32>::MAX_TTL + Duration::from_secs(1))
.expect_err("ttl above 1 year should be rejected");
assert_eq!(err.kind, CaducusErrorKind::InvalidArgument);
}
#[test]
fn set_ttl_accepts_minimum_ttl() {
let mut ring = new_ring!(4, DEFAULT_TTL);
ring.set_ttl(Ring::<i32>::MIN_TTL).unwrap();
assert_eq!(ring.ttl(), Ring::<i32>::MIN_TTL);
}
#[test]
fn set_ttl_accepts_maximum_ttl() {
let mut ring = new_ring!(4, DEFAULT_TTL);
ring.set_ttl(Ring::<i32>::MAX_TTL).unwrap();
assert_eq!(ring.ttl(), Ring::<i32>::MAX_TTL);
}
#[test]
fn ttl_clamps_below_minimum_when_raw_value_is_forced() {
let mut ring = new_ring!(4, DEFAULT_TTL);
force_raw_ttl(&mut ring, Duration::ZERO);
assert_eq!(ring.ttl(), Ring::<i32>::MIN_TTL);
}
#[test]
fn ttl_clamps_above_maximum_when_raw_value_is_forced() {
let mut ring = new_ring!(4, DEFAULT_TTL);
force_raw_ttl(&mut ring, Ring::<i32>::MAX_TTL + Duration::from_secs(1));
assert_eq!(ring.ttl(), Ring::<i32>::MAX_TTL);
}
#[test]
fn ttl_returns_stored_value_when_it_is_in_range() {
let ring = new_ring!(4, Duration::from_secs(10));
assert_eq!(ring.ttl(), Duration::from_secs(10));
}
#[test]
fn push_pop_fifo_order() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 10);
push_val(&mut ring, 20);
push_val(&mut ring, 30);
assert_eq!(ring.len(), 3);
assert_eq!(pop_val(&mut ring), 10);
assert_eq!(pop_val(&mut ring), 20);
assert_eq!(pop_val(&mut ring), 30);
assert!(ring.is_empty());
validate(&ring);
}
#[test]
fn fifo_with_wrap_around() {
let mut ring = new_ring!(3, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
assert!(ring.is_full());
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(pop_val(&mut ring), 2);
push_val(&mut ring, 4);
push_val(&mut ring, 5);
assert_eq!(ring.len(), 3);
assert_eq!(pop_val(&mut ring), 3);
assert_eq!(pop_val(&mut ring), 4);
assert_eq!(pop_val(&mut ring), 5);
assert!(ring.is_empty());
validate(&ring);
}
#[test]
fn interleaved_push_pop() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
assert_eq!(pop_val(&mut ring), 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
assert!(ring.is_empty());
validate(&ring);
}
#[test]
fn push_fails_when_full() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
assert!(ring.is_full());
let err = ring.try_push_mpsc(3, None, None).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::Full(3));
assert_eq!(ring.len(), 2);
validate(&ring);
}
#[test]
fn push_succeeds_after_pop_frees_space() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
assert_eq!(pop_val(&mut ring), 1);
push_val(&mut ring, 3);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
validate(&ring);
}
#[test]
fn pop_returns_none_when_empty() {
let mut ring = new_ring!(4, DEFAULT_TTL);
assert!(ring.try_pop().is_none());
}
#[test]
fn pop_returns_none_after_emptied() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
assert_eq!(pop_val(&mut ring), 1);
assert!(ring.try_pop().is_none());
}
#[test]
fn peek_returns_none_when_empty() {
let ring = new_ring!(4, DEFAULT_TTL);
assert!(ring.peek_expires_at().is_none());
}
#[test]
fn empty_slots_do_not_leak_sentinel_through_peek_or_drain() {
let ring = new_ring!(1024, DEFAULT_TTL);
assert!(
ring.peek_expires_at().is_none(),
"empty ring must peek None"
);
let mut ring = ring;
let drained = ring.drain_expired(Instant::now());
assert!(
drained.is_empty(),
"empty ring must drain nothing, got {} items",
drained.len()
);
assert_eq!(ring.len(), 0);
}
#[test]
fn slot_sentinel_is_stable_across_constructions() {
let r1 = new_ring!(4, DEFAULT_TTL);
std::thread::sleep(Duration::from_millis(5));
let r2 = new_ring!(4, DEFAULT_TTL);
let after_both = Instant::now();
let mut r1 = r1;
let mut r2 = r2;
push_val(&mut r1, 1);
push_val(&mut r2, 1);
let h1 = r1.peek_expires_at().unwrap();
let h2 = r2.peek_expires_at().unwrap();
assert!(
h1 >= after_both,
"occupied head must have a now-derived deadline, not the sentinel"
);
assert!(
h2 >= after_both,
"occupied head must have a now-derived deadline, not the sentinel"
);
}
#[test]
fn peek_returns_head_expiry() {
let ttl = Duration::from_secs(5);
let mut ring = new_ring!(4, ttl);
let before = Instant::now();
push_val(&mut ring, 1);
push_val(&mut ring, 2);
let after = Instant::now();
let head_expiry = ring.peek_expires_at().unwrap();
assert!(head_expiry >= before + ttl);
assert!(head_expiry <= after + ttl);
pop_val(&mut ring);
let second_expiry = ring.peek_expires_at().unwrap();
assert!(second_expiry >= before + ttl);
assert!(second_expiry <= after + ttl);
}
#[test]
fn peek_does_not_remove_item() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 42);
let _ = ring.peek_expires_at();
assert_eq!(ring.len(), 1);
assert_eq!(pop_val(&mut ring), 42);
}
#[test]
fn mpsc_pop_returns_per_slot_channels() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let expiry_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let shutdown_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
push_val_with_channels(
&mut ring,
1,
Some(expiry_ch.clone()),
Some(shutdown_ch.clone()),
);
let result = ring.try_pop().unwrap();
assert_eq!(result.item, 1);
assert!(result.expiry_channel.is_some());
assert!(result.shutdown_channel.is_some());
}
#[test]
fn mpsc_pop_returns_none_channels_when_not_set() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
let result = ring.try_pop().unwrap();
assert!(result.expiry_channel.is_none());
assert!(result.shutdown_channel.is_none());
}
#[test]
fn growth_preserves_slot_metadata() {
let mut ring = new_ring!(2, DEFAULT_TTL);
let ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let before = Instant::now();
ring.try_push_mpsc(1, Some(ch.clone()), Some(ch.clone()))
.unwrap();
let after = Instant::now();
ring.request_capacity(4);
let result = ring.try_pop().unwrap();
assert_eq!(result.item, 1);
assert!(result.expires_at >= before + DEFAULT_TTL);
assert!(result.expires_at <= after + DEFAULT_TTL);
assert!(result.expiry_channel.is_some());
assert!(result.shutdown_channel.is_some());
}
#[test]
fn shrink_preserves_slot_metadata() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let before = Instant::now();
ring.try_push_mpsc(1, Some(ch.clone()), Some(ch.clone()))
.unwrap();
let after = Instant::now();
ring.request_capacity(2);
let result = ring.try_pop().unwrap();
assert_eq!(result.item, 1);
assert!(result.expires_at >= before + DEFAULT_TTL);
assert!(result.expires_at <= after + DEFAULT_TTL);
assert!(result.expiry_channel.is_some());
assert!(result.shutdown_channel.is_some());
}
#[test]
fn shutdown_drain_preserves_slot_metadata() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let before = Instant::now();
ring.try_push_mpsc(1, Some(ch.clone()), Some(ch.clone()))
.unwrap();
let after = Instant::now();
push_val(&mut ring, 2);
push_val(&mut ring, 3);
let items = ring.shutdown();
assert_eq!(items[0].item, 1);
assert!(items[0].expires_at >= before + DEFAULT_TTL);
assert!(items[0].expires_at <= after + DEFAULT_TTL);
assert!(items[0].expiry_channel.is_some());
assert!(items[0].shutdown_channel.is_some());
}
#[test]
fn mpsc_push_with_ttl_uses_item_ttl_without_mutating_default() {
let mut ring = new_ring!(4, Duration::from_secs(60));
let item_ttl = Duration::from_millis(20);
let before = Instant::now();
let expires_at = Ring::<i32>::expires_at_from_ttl(item_ttl).unwrap();
ring.try_push_mpsc_with_expires_at(1, expires_at, None, None)
.expect("per-item TTL push should succeed");
let after = Instant::now();
assert_eq!(ring.ttl(), Duration::from_secs(60));
let pop = ring.try_pop().expect("item should be present");
assert_eq!(pop.item, 1);
assert!(pop.expires_at >= before + item_ttl);
assert!(pop.expires_at <= after + item_ttl);
}
#[test]
fn per_item_ttl_validation_rejects_out_of_range_duration() {
assert!(Ring::<i32>::expires_at_from_ttl(Duration::ZERO).is_err());
assert!(Ring::<i32>::expires_at_from_ttl(Duration::from_secs(365 * 24 * 60 * 60 + 1)).is_err());
}
#[test]
fn mpsc_push_with_deadline_stores_exact_deadline() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let deadline = Instant::now() + Duration::from_secs(30);
ring.try_push_mpsc_with_expires_at(1, deadline, None, None)
.expect("future deadline should be accepted");
let pop = ring.try_pop().expect("item should be present");
assert_eq!(pop.item, 1);
assert_eq!(pop.expires_at, deadline);
}
#[test]
fn per_item_deadline_validation_rejects_past_deadline() {
let deadline = Instant::now() - Duration::from_millis(1);
assert!(Ring::<i32>::validate_deadline(deadline).is_err());
}
#[test]
fn per_item_deadline_before_previous_tail_sets_full_scan_flag() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let first_deadline = Instant::now() + Duration::from_secs(60);
let second_deadline = Instant::now() + Duration::from_secs(10);
ring.try_push_mpsc_with_expires_at(1, first_deadline, None, None)
.expect("first push should succeed");
assert!(!ring.ttl_reduced());
ring.try_push_mpsc_with_expires_at(2, second_deadline, None, None)
.expect("second push should succeed");
assert!(ring.ttl_reduced());
assert_eq!(ring.peek_expires_at(), Some(second_deadline));
}
#[test]
fn spsc_per_item_push_variants_use_supplied_expiry() {
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, None, None);
let ttl = Duration::from_millis(10);
let before = Instant::now();
let expires_at = Ring::<i32>::expires_at_from_ttl(ttl).unwrap();
ring.try_push_spsc_with_expires_at(1, expires_at)
.expect("per-item TTL push should succeed");
let after = Instant::now();
let deadline = Instant::now() + Duration::from_secs(30);
ring.try_push_spsc_with_expires_at(2, deadline)
.expect("future deadline should be accepted");
let first = ring.try_pop().expect("first item should be present");
assert_eq!(first.item, 1);
assert!(first.expires_at >= before + ttl);
assert!(first.expires_at <= after + ttl);
let second = ring.try_pop().expect("second item should be present");
assert_eq!(second.item, 2);
assert_eq!(second.expires_at, deadline);
}
#[test]
fn shutdown_does_not_compact_storage() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 4); let _ = ring.shutdown();
assert_eq!(
ring.capacity(),
4,
"shutdown must not compact: storage stays at original size"
);
}
#[test]
fn growth_preserves_fifo_order() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
ring.request_capacity(4);
assert_eq!(ring.capacity(), 4);
assert_eq!(ring.target_capacity(), 4);
assert_eq!(ring.len(), 2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(pop_val(&mut ring), 2);
validate(&ring);
}
#[test]
fn growth_with_wrap_preserves_fifo_order() {
let mut ring = new_ring!(3, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
pop_val(&mut ring);
push_val(&mut ring, 4);
ring.request_capacity(5);
assert_eq!(ring.capacity(), 5);
assert_eq!(ring.len(), 3);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
assert_eq!(pop_val(&mut ring), 4);
validate(&ring);
}
#[test]
fn growth_from_empty() {
let mut ring = new_ring!(2, DEFAULT_TTL);
ring.request_capacity(8);
assert_eq!(ring.capacity(), 8);
assert!(ring.is_empty());
validate(&ring);
}
#[test]
fn growth_allows_more_pushes() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
assert!(ring.is_full());
ring.request_capacity(3);
push_val(&mut ring, 3);
assert_eq!(ring.len(), 3);
validate(&ring);
}
#[test]
fn immediate_shrink_preserves_fifo_order() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 2);
assert_eq!(ring.target_capacity(), 2);
assert_eq!(ring.len(), 2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(pop_val(&mut ring), 2);
validate(&ring);
}
#[test]
fn immediate_shrink_from_empty() {
let mut ring = new_ring!(8, DEFAULT_TTL);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 2);
assert!(ring.is_empty());
validate(&ring);
}
#[test]
fn immediate_shrink_with_wrap_preserves_order() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
pop_val(&mut ring);
pop_val(&mut ring);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 2);
assert_eq!(ring.len(), 1);
assert_eq!(pop_val(&mut ring), 3);
validate(&ring);
}
#[test]
fn immediate_shrink_to_exact_len_then_pop_push() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 2);
assert!(ring.is_full());
assert_eq!(pop_val(&mut ring), 1);
push_val(&mut ring, 3);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
validate(&ring);
}
#[test]
fn deferred_shrink_rejects_pushes_at_target() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 4); assert_eq!(ring.target_capacity(), 2);
assert!(ring.is_full()); let err = ring.try_push_mpsc(4, None, None).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::Full(4));
validate(&ring);
}
#[test]
fn deferred_shrink_compacts_when_len_reaches_target() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.request_capacity(2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(ring.capacity(), 2);
assert_eq!(ring.target_capacity(), 2);
assert_eq!(ring.len(), 2);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
validate(&ring);
}
#[test]
fn deferred_shrink_fifo_preserved_after_compaction() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 10);
push_val(&mut ring, 20);
push_val(&mut ring, 30);
push_val(&mut ring, 40);
ring.request_capacity(2);
assert_eq!(pop_val(&mut ring), 10);
assert_eq!(pop_val(&mut ring), 20);
assert_eq!(ring.capacity(), 2);
assert_eq!(pop_val(&mut ring), 30);
assert_eq!(pop_val(&mut ring), 40);
validate(&ring);
}
#[test]
fn deferred_shrink_compact_then_pop_push() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.request_capacity(2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(ring.capacity(), 2);
assert!(ring.is_full());
assert_eq!(pop_val(&mut ring), 2);
push_val(&mut ring, 4);
assert_eq!(pop_val(&mut ring), 3);
assert_eq!(pop_val(&mut ring), 4);
validate(&ring);
}
#[test]
fn shutdown_returns_all_items_fifo() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
let items: Vec<i32> = ring.shutdown().into_iter().map(|r| r.item).collect();
assert_eq!(items, vec![1, 2, 3]);
assert!(ring.is_empty());
assert!(ring.is_shutdown());
validate(&ring);
}
#[test]
fn shutdown_on_empty_buffer() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let items = ring.shutdown();
assert!(items.is_empty());
assert!(ring.is_shutdown());
validate(&ring);
}
#[test]
fn shutdown_rejects_subsequent_pushes() {
let mut ring = new_ring!(4, DEFAULT_TTL);
ring.shutdown();
let err = ring.try_push_mpsc(1, None, None).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::Shutdown(1));
}
#[test]
fn shutdown_is_irreversible() {
let mut ring = new_ring!(4, DEFAULT_TTL);
ring.shutdown();
let items = ring.shutdown();
assert!(items.is_empty());
assert!(ring.is_shutdown());
}
#[test]
fn shutdown_returns_items_with_channels() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
push_val_with_channels(&mut ring, 1, None, Some(ch.clone()));
push_val_with_channels(&mut ring, 2, None, Some(ch.clone()));
let items = ring.shutdown();
assert_eq!(items.len(), 2);
assert!(items[0].shutdown_channel.is_some());
assert!(items[1].shutdown_channel.is_some());
}
#[test]
fn shutdown_with_wrap_around() {
let mut ring = new_ring!(3, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
pop_val(&mut ring);
push_val(&mut ring, 4);
let items: Vec<i32> = ring.shutdown().into_iter().map(|r| r.item).collect();
assert_eq!(items, vec![2, 3, 4]);
}
#[test]
fn request_capacity_zero_clamps_to_one() {
let mut ring = new_ring!(4, DEFAULT_TTL);
ring.request_capacity(0);
assert_eq!(ring.target_capacity(), 1);
}
#[test]
fn request_same_capacity_is_noop() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
ring.request_capacity(4);
assert_eq!(ring.capacity(), 4);
assert_eq!(ring.len(), 2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(pop_val(&mut ring), 2);
validate(&ring);
}
#[test]
fn capacity_one_push_pop_cycle() {
let mut ring = new_ring!(1, DEFAULT_TTL);
push_val(&mut ring, 10);
assert!(ring.is_full());
assert_eq!(pop_val(&mut ring), 10);
assert!(ring.is_empty());
push_val(&mut ring, 20);
assert_eq!(pop_val(&mut ring), 20);
validate(&ring);
}
#[test]
fn grow_then_shrink() {
let mut ring = new_ring!(2, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
ring.request_capacity(4);
push_val(&mut ring, 3);
assert_eq!(ring.capacity(), 4);
ring.request_capacity(2);
assert_eq!(ring.capacity(), 4);
assert_eq!(ring.target_capacity(), 2);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(ring.capacity(), 2);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
validate(&ring);
}
#[test]
fn shrink_then_grow_cancels_deferred() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.request_capacity(2);
assert_eq!(ring.target_capacity(), 2);
ring.request_capacity(6);
assert_eq!(ring.capacity(), 6);
assert_eq!(ring.target_capacity(), 6);
assert!(!ring.is_full());
push_val(&mut ring, 4);
assert_eq!(pop_val(&mut ring), 1);
assert_eq!(pop_val(&mut ring), 2);
assert_eq!(pop_val(&mut ring), 3);
assert_eq!(pop_val(&mut ring), 4);
validate(&ring);
}
#[test]
fn spsc_push_succeeds_in_spsc_mode() {
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, None, None);
push_val_spsc(&mut ring, 1);
assert_eq!(ring.len(), 1);
assert_eq!(pop_val(&mut ring), 1);
}
#[test]
fn spsc_push_rejected_in_mpsc_mode() {
let mut ring = new_ring!(4, DEFAULT_TTL);
let err = ring.try_push_spsc(1).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::InvalidPattern(1));
}
#[test]
fn spsc_push_returns_shutdown_when_shutdown() {
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, None, None);
ring.shutdown();
let err = ring.try_push_spsc(1).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::Shutdown(1));
}
#[test]
fn spsc_push_returns_full_when_full() {
let mut ring = new_spsc_ring!(1, DEFAULT_TTL, None, None);
push_val_spsc(&mut ring, 1);
let err = ring.try_push_spsc(2).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::Full(2));
}
#[test]
fn mpsc_push_succeeds_in_mpsc_mode() {
let mut ring = new_ring!(4, DEFAULT_TTL);
push_val(&mut ring, 1);
assert_eq!(ring.len(), 1);
assert_eq!(pop_val(&mut ring), 1);
}
#[test]
fn mpsc_push_rejected_in_spsc_mode() {
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, None, None);
let err = ring.try_push_mpsc(1, None, None).unwrap_err();
assert_eq!(err.kind, CaducusErrorKind::InvalidPattern(1));
}
#[test]
fn spsc_pop_returns_ring_level_channels() {
let expiry_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let shutdown_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let mut ring = new_spsc_ring!(
4,
DEFAULT_TTL,
Some(expiry_ch.clone()),
Some(shutdown_ch.clone())
);
push_val_spsc(&mut ring, 1);
let result = ring.try_pop().unwrap();
assert_eq!(result.item, 1);
assert!(result.expiry_channel.is_some());
assert!(result.shutdown_channel.is_some());
}
#[test]
fn spsc_pop_returns_none_channels_when_ring_has_none() {
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, None, None);
push_val_spsc(&mut ring, 1);
let result = ring.try_pop().unwrap();
assert!(result.expiry_channel.is_none());
assert!(result.shutdown_channel.is_none());
}
#[test]
fn spsc_shutdown_returns_items_with_ring_level_channels() {
let expiry_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let shutdown_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let mut ring = new_spsc_ring!(
4,
DEFAULT_TTL,
Some(expiry_ch.clone()),
Some(shutdown_ch.clone())
);
push_val_spsc(&mut ring, 1);
push_val_spsc(&mut ring, 2);
let items = ring.shutdown();
assert_eq!(items.len(), 2);
for item in &items {
assert!(item.expiry_channel.is_some());
assert!(item.shutdown_channel.is_some());
}
}
#[test]
fn spsc_per_slot_channels_are_none() {
let expiry_ch: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let mut ring = new_spsc_ring!(4, DEFAULT_TTL, Some(expiry_ch.clone()), None);
push_val_spsc(&mut ring, 1);
let result = ring.try_pop().unwrap();
assert!(result.expiry_channel.is_some());
assert!(result.shutdown_channel.is_none());
}
#[test]
fn set_ttl_reduces_with_items_sets_flag() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1);
assert!(!ring.ttl_reduced(), "flag clear before set_ttl");
ring.set_ttl(Duration::from_millis(50)).unwrap();
assert!(
ring.ttl_reduced(),
"set_ttl with smaller value on non-empty ring must set the flag"
);
}
#[test]
fn set_ttl_reduces_when_empty_does_not_set_flag() {
let mut ring = new_ring!(4, Duration::from_secs(60));
ring.set_ttl(Duration::from_millis(50)).unwrap();
assert!(
!ring.ttl_reduced(),
"set_ttl on empty ring must not set the flag"
);
}
#[test]
fn set_ttl_increase_does_not_change_flag() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1);
ring.set_ttl(Duration::from_millis(50)).unwrap();
assert!(ring.ttl_reduced());
ring.set_ttl(Duration::from_secs(60)).unwrap();
assert!(
ring.ttl_reduced(),
"TTL increase must not clear the flag (cannot repair existing non-monotonic ring)"
);
}
#[test]
fn drain_expired_head_only_fast_path_when_flag_clear() {
let mut ring = new_ring!(4, Duration::from_millis(20));
push_val(&mut ring, 1);
std::thread::sleep(Duration::from_millis(50));
push_val(&mut ring, 2);
push_val(&mut ring, 3);
assert!(!ring.ttl_reduced());
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].item, 1);
assert_eq!(ring.len(), 2);
assert_eq!(ring.try_pop().unwrap().item, 2);
assert_eq!(ring.try_pop().unwrap().item, 3);
}
#[test]
fn drain_expired_after_ttl_shrink_finds_non_head_expired() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1); ring.set_ttl(Duration::from_millis(20)).unwrap();
push_val(&mut ring, 2); assert!(ring.ttl_reduced());
std::thread::sleep(Duration::from_millis(60));
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].item, 2);
assert_eq!(ring.len(), 1);
assert_eq!(ring.try_pop().unwrap().item, 1);
}
#[test]
fn drain_expired_clears_flag_when_single_survivor_remains() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1);
ring.set_ttl(Duration::from_millis(20)).unwrap();
push_val(&mut ring, 2);
assert!(ring.ttl_reduced());
std::thread::sleep(Duration::from_millis(60));
let _ = ring.drain_expired(Instant::now());
assert_eq!(ring.len(), 1);
assert!(
!ring.ttl_reduced(),
"single survivor must clear ttl_reduced flag"
);
}
#[test]
fn drain_expired_clears_flag_when_empty() {
let mut ring = new_ring!(4, Duration::from_millis(40));
push_val(&mut ring, 1);
ring.set_ttl(Duration::from_millis(10)).unwrap();
assert!(ring.ttl_reduced());
push_val(&mut ring, 2);
std::thread::sleep(Duration::from_millis(80));
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 2);
assert_eq!(ring.len(), 0);
assert!(
!ring.ttl_reduced(),
"empty ring after drain must clear ttl_reduced flag"
);
}
#[test]
fn drain_expired_full_scan_clears_flag_when_survivors_monotonic() {
let mut ring = new_ring!(8, Duration::from_secs(60));
push_val(&mut ring, 1); ring.set_ttl(Duration::from_millis(20)).unwrap();
push_val(&mut ring, 2); ring.set_ttl(Duration::from_secs(60)).unwrap(); push_val(&mut ring, 3); assert!(ring.ttl_reduced());
std::thread::sleep(Duration::from_millis(60));
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].item, 2);
assert_eq!(ring.len(), 2);
assert!(
!ring.ttl_reduced(),
"monotonic survivors should clear the flag"
);
}
#[test]
fn drain_expired_full_scan_keeps_flag_when_survivors_non_monotonic() {
let mut ring = new_ring!(8, Duration::from_secs(60));
push_val(&mut ring, 1); ring.set_ttl(Duration::from_millis(50)).unwrap();
push_val(&mut ring, 2); assert!(ring.ttl_reduced());
let drained = ring.drain_expired(Instant::now());
assert!(drained.is_empty());
assert_eq!(ring.len(), 2);
assert!(
ring.ttl_reduced(),
"non-monotonic survivors must keep the flag set"
);
}
#[test]
fn drain_expired_with_pending_shrink_compacts_to_target() {
let mut ring = new_ring!(8, Duration::from_millis(100));
push_val(&mut ring, 1);
push_val(&mut ring, 2);
push_val(&mut ring, 3);
ring.set_ttl(Duration::from_millis(50)).unwrap();
assert!(ring.ttl_reduced());
ring.request_capacity(2);
assert_eq!(ring.target_capacity(), 2);
assert_eq!(ring.capacity(), 8, "shrink must defer while len > target");
std::thread::sleep(Duration::from_millis(200));
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 3);
assert_eq!(ring.len(), 0);
assert_eq!(
ring.capacity(),
2,
"deferred shrink must compact during slow-path drain"
);
assert_eq!(ring.target_capacity(), 2);
validate(&ring);
}
#[test]
fn flag_round_trip_after_shrink_and_drain() {
let mut ring = new_ring!(8, Duration::from_secs(60));
push_val(&mut ring, 1); assert!(!ring.ttl_reduced());
ring.set_ttl(Duration::from_millis(50)).unwrap();
assert!(ring.ttl_reduced(), "shrink on non-empty ring sets the flag");
push_val(&mut ring, 2);
let drained = ring.drain_expired(Instant::now());
assert!(drained.is_empty());
assert_eq!(ring.len(), 2);
assert!(
ring.ttl_reduced(),
"non-monotonic survivors keep the flag set across drain"
);
std::thread::sleep(Duration::from_millis(120));
let drained = ring.drain_expired(Instant::now());
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].item, 2);
assert_eq!(ring.len(), 1);
assert!(
!ring.ttl_reduced(),
"monotonic survivors clear the flag once non-monotonic items leave"
);
}
#[test]
fn drain_expired_compacts_when_gap_opened() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1); ring.set_ttl(Duration::from_millis(20)).unwrap();
push_val(&mut ring, 2); push_val(&mut ring, 3); ring.set_ttl(Duration::from_secs(60)).unwrap();
push_val(&mut ring, 4); std::thread::sleep(Duration::from_millis(60));
let drained = ring.drain_expired(Instant::now());
let drained_items: Vec<i32> = drained.iter().map(|p| p.item).collect();
assert_eq!(drained_items, vec![2, 3], "drain must return B,C in FIFO");
assert_eq!(ring.len(), 2);
assert_eq!(ring.try_pop().unwrap().item, 1);
assert_eq!(ring.try_pop().unwrap().item, 4);
}
#[test]
fn drain_expired_metadata_preserved_for_survivors_after_compaction() {
let expiry: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let shutdown: Arc<dyn ReportChannel<i32>> = Arc::new(TestChannel);
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val_with_channels(&mut ring, 1, Some(expiry.clone()), Some(shutdown.clone())); ring.set_ttl(Duration::from_millis(20)).unwrap();
push_val_with_channels(&mut ring, 2, Some(expiry.clone()), Some(shutdown.clone())); ring.set_ttl(Duration::from_secs(60)).unwrap();
push_val_with_channels(&mut ring, 3, Some(expiry.clone()), Some(shutdown.clone())); std::thread::sleep(Duration::from_millis(60));
let _ = ring.drain_expired(Instant::now());
assert_eq!(ring.len(), 2);
let a = ring.try_pop().unwrap();
assert_eq!(a.item, 1);
assert!(a.expiry_channel.is_some());
assert!(a.shutdown_channel.is_some());
let c = ring.try_pop().unwrap();
assert_eq!(c.item, 3);
assert!(c.expiry_channel.is_some());
assert!(c.shutdown_channel.is_some());
}
#[test]
fn peek_expires_at_head_only_when_flag_clear() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1);
push_val(&mut ring, 2);
assert!(!ring.ttl_reduced());
let head_deadline = ring.head_deadline();
assert_eq!(ring.peek_expires_at().unwrap(), head_deadline);
}
#[test]
fn peek_expires_at_returns_minimum_when_flag_set() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1); ring.set_ttl(Duration::from_millis(50)).unwrap();
push_val(&mut ring, 2); ring.set_ttl(Duration::from_secs(60)).unwrap();
push_val(&mut ring, 3); assert!(ring.ttl_reduced());
let min = ring.peek_expires_at().unwrap();
let head = ring.head_deadline();
assert!(
min < head,
"peek must return minimum (B's deadline), not head's (A's)"
);
}
#[test]
fn peek_expires_at_does_not_clear_flag() {
let mut ring = new_ring!(4, Duration::from_secs(60));
push_val(&mut ring, 1);
ring.set_ttl(Duration::from_millis(50)).unwrap();
push_val(&mut ring, 2);
assert!(ring.ttl_reduced());
let _ = ring.peek_expires_at();
let _ = ring.peek_expires_at();
assert!(
ring.ttl_reduced(),
"peek_expires_at must not mutate the flag"
);
}