use rtrb::{Consumer, Producer, RingBuffer};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[must_use]
pub fn channel<T>(capacity: usize) -> (SpscSender<T>, SpscReceiver<T>) {
SpscChannel::new(capacity)
}
pub struct SpscChannel;
impl SpscChannel {
#[allow(clippy::new_ret_no_self)]
#[must_use]
pub fn new<T>(capacity: usize) -> (SpscSender<T>, SpscReceiver<T>) {
let (producer, consumer) = RingBuffer::new(capacity);
let closed = Arc::new(AtomicBool::new(false));
(
SpscSender {
producer,
closed: Arc::clone(&closed),
},
SpscReceiver { consumer, closed },
)
}
}
pub struct SpscSender<T> {
producer: Producer<T>,
closed: Arc<AtomicBool>,
}
impl<T> SpscSender<T> {
#[inline(always)]
pub fn send(&mut self, item: T) -> Result<(), T> {
if self.closed.load(Ordering::Relaxed) {
return Err(item);
}
self.producer.push(item).map_err(|e| match e {
rtrb::PushError::Full(item) => item,
})
}
#[inline(always)]
pub fn try_send(&mut self, item: T) -> Result<(), T> {
self.send(item)
}
#[inline(always)]
#[must_use]
pub fn is_connected(&self) -> bool {
!self.closed.load(Ordering::Relaxed)
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.producer.slots()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
#[must_use]
pub fn capacity(&self) -> usize {
self.producer.buffer().capacity()
}
}
impl<T> Drop for SpscSender<T> {
fn drop(&mut self) {
self.closed.store(true, Ordering::Release);
}
}
pub struct SpscReceiver<T> {
consumer: Consumer<T>,
closed: Arc<AtomicBool>,
}
impl<T> SpscReceiver<T> {
#[inline(always)]
pub fn recv(&mut self) -> Option<T> {
self.consumer.pop().ok()
}
#[inline(always)]
pub fn try_recv(&mut self) -> Option<T> {
self.recv()
}
#[inline(always)]
pub fn recv_spin(&mut self) -> T {
loop {
if let Ok(item) = self.consumer.pop() {
return item;
}
std::hint::spin_loop();
}
}
#[inline]
pub fn recv_spin_limited(&mut self, spin_count: usize) -> Option<T> {
for _ in 0..spin_count {
if let Ok(item) = self.consumer.pop() {
return Some(item);
}
std::hint::spin_loop();
}
None
}
#[inline]
pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
std::iter::from_fn(|| self.consumer.pop().ok())
}
#[inline(always)]
#[must_use]
pub fn is_connected(&self) -> bool {
!self.closed.load(Ordering::Relaxed) || !self.is_empty()
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.consumer.slots()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> Drop for SpscReceiver<T> {
fn drop(&mut self) {
self.closed.store(true, Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_send_recv() {
let (mut tx, mut rx) = channel::<u64>(16);
assert!(tx.send(42).is_ok());
assert_eq!(rx.recv(), Some(42));
assert_eq!(rx.recv(), None);
}
#[test]
fn test_multiple_items() {
let (mut tx, mut rx) = channel::<u64>(16);
for i in 0..10 {
assert!(tx.send(i).is_ok());
}
for i in 0..10 {
assert_eq!(rx.recv(), Some(i));
}
}
#[test]
fn test_full_channel() {
let (mut tx, mut rx) = channel::<u64>(4);
for i in 0..4 {
assert!(tx.send(i).is_ok());
}
assert!(tx.send(100).is_err());
assert_eq!(rx.recv(), Some(0));
assert!(tx.send(100).is_ok());
}
#[test]
fn test_drain() {
let (mut tx, mut rx) = channel::<u64>(16);
for i in 0..5 {
tx.send(i).unwrap();
}
let items: Vec<_> = rx.drain().collect();
assert_eq!(items, vec![0, 1, 2, 3, 4]);
assert!(rx.is_empty());
}
#[test]
fn test_disconnect_detection() {
let (tx, rx) = channel::<u64>(16);
assert!(rx.is_connected());
drop(tx);
assert!(!rx.is_connected());
}
#[test]
fn test_recv_spin_limited() {
let (mut tx, mut rx) = channel::<u64>(16);
assert_eq!(rx.recv_spin_limited(100), None);
tx.send(42).unwrap();
assert_eq!(rx.recv_spin_limited(100), Some(42));
}
#[test]
fn test_sender_is_connected() {
let (tx, rx) = channel::<u64>(16);
assert!(tx.is_connected());
drop(rx);
assert!(!tx.is_connected());
}
#[test]
fn test_sender_capacity() {
let (tx, _rx) = channel::<u64>(16);
assert_eq!(tx.capacity(), 16);
}
#[test]
fn test_receiver_len_and_is_empty() {
let (mut tx, rx) = channel::<u64>(16);
assert!(rx.is_empty());
assert_eq!(rx.len(), 0);
tx.send(1).unwrap();
tx.send(2).unwrap();
assert!(!rx.is_empty());
}
#[test]
fn test_try_send() {
let (mut tx, mut rx) = channel::<u64>(2);
assert!(tx.try_send(1).is_ok());
assert!(tx.try_send(2).is_ok());
assert!(tx.try_send(3).is_err());
rx.recv();
assert!(tx.try_send(3).is_ok());
}
#[test]
fn test_try_recv() {
let (mut tx, mut rx) = channel::<u64>(16);
assert!(rx.try_recv().is_none());
tx.send(42).unwrap();
assert_eq!(rx.try_recv(), Some(42));
assert!(rx.try_recv().is_none());
}
#[test]
fn test_send_after_receiver_dropped() {
let (mut tx, rx) = channel::<u64>(16);
drop(rx);
assert!(tx.send(42).is_err());
}
}