use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
pub(crate) fn bounded_ring<T>(capacity: usize) -> (RingSender<T>, RingReceiver<T>) {
assert!(capacity > 0, "BoundedRing capacity must be > 0");
let inner = Arc::new(RingInner::<T>::new(capacity));
(
RingSender {
inner: inner.clone(),
},
RingReceiver { inner },
)
}
struct RingInner<T> {
capacity: usize,
buf: Mutex<VecDeque<T>>,
notify: Notify,
dropped_oldest: AtomicUsize,
sender_count: AtomicUsize,
receiver_dropped: AtomicBool,
}
impl<T> RingInner<T> {
fn new(capacity: usize) -> Self {
Self {
capacity,
buf: Mutex::new(VecDeque::with_capacity(capacity)),
notify: Notify::new(),
dropped_oldest: AtomicUsize::new(0),
sender_count: AtomicUsize::new(1),
receiver_dropped: AtomicBool::new(false),
}
}
}
pub(crate) struct RingSender<T> {
inner: Arc<RingInner<T>>,
}
impl<T> Clone for RingSender<T> {
fn clone(&self) -> Self {
self.inner.sender_count.fetch_add(1, Ordering::SeqCst);
Self {
inner: self.inner.clone(),
}
}
}
impl<T> Drop for RingSender<T> {
fn drop(&mut self) {
let prev = self.inner.sender_count.fetch_sub(1, Ordering::SeqCst);
if prev == 1 {
self.inner.notify.notify_one();
}
}
}
impl<T> RingSender<T> {
pub(crate) fn push(&self, value: T) -> usize {
if self.inner.receiver_dropped.load(Ordering::SeqCst) {
return 0;
}
let mut buf = self
.inner
.buf
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let dropped = if buf.len() >= self.inner.capacity {
buf.pop_front();
self.inner.dropped_oldest.fetch_add(1, Ordering::SeqCst);
1
} else {
0
};
buf.push_back(value);
drop(buf);
self.inner.notify.notify_one();
dropped
}
#[cfg(test)]
pub(crate) fn dropped_oldest_count(&self) -> usize {
self.inner.dropped_oldest.load(Ordering::SeqCst)
}
pub(crate) fn is_receiver_dropped(&self) -> bool {
self.inner.receiver_dropped.load(Ordering::SeqCst)
}
}
pub(crate) struct RingReceiver<T> {
inner: Arc<RingInner<T>>,
}
impl<T> Drop for RingReceiver<T> {
fn drop(&mut self) {
self.inner.receiver_dropped.store(true, Ordering::SeqCst);
}
}
impl<T> RingReceiver<T> {
pub(crate) async fn recv(&mut self) -> Option<T> {
loop {
let notified = self.inner.notify.notified();
tokio::pin!(notified);
{
let mut buf = self
.inner
.buf
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(value) = buf.pop_front() {
return Some(value);
}
if self.inner.sender_count.load(Ordering::SeqCst) == 0 {
return None;
}
}
notified.as_mut().await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn push_then_recv_delivers_value_in_order() {
let (tx, mut rx) = bounded_ring::<i32>(4);
assert_eq!(tx.push(1), 0);
assert_eq!(tx.push(2), 0);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
}
#[tokio::test]
async fn push_at_capacity_drops_oldest() {
let (tx, mut rx) = bounded_ring::<i32>(2);
assert_eq!(tx.push(1), 0);
assert_eq!(tx.push(2), 0);
assert_eq!(tx.push(3), 1); assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));
assert_eq!(tx.dropped_oldest_count(), 1);
}
#[tokio::test]
async fn recv_returns_none_when_sender_dropped_and_ring_empty() {
let (tx, mut rx) = bounded_ring::<i32>(2);
drop(tx);
assert_eq!(rx.recv().await, None);
}
#[tokio::test]
async fn recv_awaits_then_completes_on_push() {
let (tx, mut rx) = bounded_ring::<i32>(2);
let handle = tokio::spawn(async move { rx.recv().await });
tokio::task::yield_now().await;
assert_eq!(tx.push(42), 0);
assert_eq!(handle.await.unwrap(), Some(42));
}
#[tokio::test]
async fn push_after_recv_close_signals_dropped_oldest_zero() {
let (tx, rx) = bounded_ring::<i32>(2);
drop(rx);
assert_eq!(tx.push(1), 0);
assert_eq!(tx.dropped_oldest_count(), 0);
}
#[test]
#[should_panic(expected = "BoundedRing capacity must be > 0")]
fn bounded_ring_panics_on_zero_capacity() {
let _ = bounded_ring::<i32>(0);
}
#[tokio::test]
async fn recv_returns_none_only_after_last_sender_clone_drops() {
let (tx, mut rx) = bounded_ring::<i32>(2);
let tx_clone = tx.clone();
let recv_handle = tokio::spawn(async move { rx.recv().await });
tokio::task::yield_now().await;
drop(tx);
tokio::task::yield_now().await;
tokio::task::yield_now().await;
let dropped = tx_clone.push(7);
assert_eq!(dropped, 0);
let value = tokio::time::timeout(std::time::Duration::from_millis(100), recv_handle)
.await
.expect("recv resolved within timeout")
.expect("recv task did not panic");
assert_eq!(value, Some(7));
}
#[tokio::test]
async fn recv_returns_none_when_all_clones_drop_and_ring_empty() {
let (tx, mut rx) = bounded_ring::<i32>(2);
let tx_clone = tx.clone();
drop(tx);
drop(tx_clone);
assert_eq!(rx.recv().await, None);
}
}