use std::sync::Arc;
#[cfg(test)]
use ringbuf::traits::Observer;
use ringbuf::{
HeapCons, HeapProd, HeapRb,
traits::{Consumer, Producer, Split},
};
pub(crate) trait WakeSignal: Send + Sync + 'static {
fn wake(&self);
}
pub(crate) struct Outlet<T> {
producer: HeapProd<T>,
overflow: Option<T>,
wake: Option<Arc<dyn WakeSignal>>,
}
impl<T> Outlet<T> {
pub(crate) fn flush(&mut self) -> bool {
let Some(item) = self.overflow.take() else {
return true;
};
self.push_or_park(item)
}
pub(crate) fn has_pending(&self) -> bool {
self.overflow.is_some()
}
#[cfg(test)]
pub(crate) fn is_full(&self) -> bool {
self.overflow.is_some() && self.producer.is_full()
}
fn notify(&self) {
if let Some(wake) = &self.wake {
wake.wake();
}
}
fn push_or_park(&mut self, item: T) -> bool {
debug_assert!(
self.overflow.is_none(),
"push_or_park called with non-empty overflow"
);
match self.producer.try_push(item) {
Ok(()) => {
self.notify();
true
}
Err(item) => {
self.overflow = Some(item);
false
}
}
}
pub(crate) fn take_pending(&mut self) -> Option<T> {
self.overflow.take()
}
pub(crate) fn try_push(&mut self, item: T) -> Result<(), T> {
if !self.flush() {
return Err(item);
}
let _ = self.push_or_park(item);
Ok(())
}
}
pub(crate) struct Inlet<T> {
consumer: HeapCons<T>,
}
impl<T> Inlet<T> {
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
self.consumer.is_empty()
}
pub(crate) fn try_pop(&mut self) -> Option<T> {
self.consumer.try_pop()
}
}
#[must_use]
pub(crate) fn connect<T>(
capacity: usize,
wake: Option<Arc<dyn WakeSignal>>,
) -> (Outlet<T>, Inlet<T>) {
let rb = HeapRb::<T>::new(capacity);
let (producer, consumer) = rb.split();
(
Outlet {
producer,
wake,
overflow: None,
},
Inlet { consumer },
)
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use kithara_test_utils::kithara;
use super::*;
struct TestWake {
woken: AtomicBool,
}
impl WakeSignal for TestWake {
fn wake(&self) {
self.woken.store(true, Ordering::SeqCst);
}
}
struct CountingWake {
count: AtomicUsize,
}
impl WakeSignal for CountingWake {
fn wake(&self) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
#[kithara::test]
fn connect_push_pop() {
let (mut out, mut inl) = connect::<i32>(2, None);
assert!(inl.is_empty());
assert!(!out.is_full());
assert_eq!(out.try_push(1), Ok(()));
assert_eq!(out.try_push(2), Ok(()));
assert_eq!(out.try_push(3), Ok(()));
assert!(out.has_pending());
assert_eq!(out.try_push(4), Err(4));
assert!(out.is_full());
assert_eq!(inl.try_pop(), Some(1));
assert_eq!(inl.try_pop(), Some(2));
assert_eq!(inl.try_pop(), None);
assert!(out.flush());
assert!(!out.has_pending());
assert_eq!(inl.try_pop(), Some(3));
assert_eq!(inl.try_pop(), None);
assert!(inl.is_empty());
}
#[kithara::test]
fn try_push_drains_overflow_first() {
let (mut out, mut inl) = connect::<i32>(1, None);
assert_eq!(out.try_push(1), Ok(()));
assert_eq!(out.try_push(2), Ok(()));
assert!(out.has_pending());
assert_eq!(inl.try_pop(), Some(1));
assert_eq!(out.try_push(3), Ok(()));
assert!(out.has_pending());
assert_eq!(inl.try_pop(), Some(2));
assert!(out.flush());
assert_eq!(inl.try_pop(), Some(3));
}
#[kithara::test]
fn flush_returns_false_when_ring_full() {
let (mut out, mut inl) = connect::<i32>(1, None);
assert_eq!(out.try_push(1), Ok(()));
assert_eq!(out.try_push(2), Ok(()));
assert!(out.has_pending());
assert!(!out.flush());
assert!(out.has_pending());
assert_eq!(inl.try_pop(), Some(1));
assert!(out.flush());
assert!(!out.has_pending());
}
#[kithara::test]
fn take_pending_clears_overflow() {
let (mut out, _inl) = connect::<i32>(1, None);
assert_eq!(out.try_push(1), Ok(()));
assert_eq!(out.try_push(2), Ok(()));
assert_eq!(out.take_pending(), Some(2));
assert!(!out.has_pending());
assert_eq!(out.take_pending(), None);
}
#[kithara::test]
fn wake_signal() {
let wake = Arc::new(TestWake {
woken: AtomicBool::new(false),
});
let (mut out, _inl) = connect::<i32>(2, Some(wake.clone()));
assert!(!wake.woken.load(Ordering::SeqCst));
assert_eq!(out.try_push(42), Ok(()));
assert!(wake.woken.load(Ordering::SeqCst));
}
#[kithara::test]
fn wake_skipped_when_parking_in_overflow() {
let wake = Arc::new(CountingWake {
count: AtomicUsize::new(0),
});
let (mut out, mut inl) = connect::<i32>(1, Some(wake.clone()));
assert_eq!(out.try_push(1), Ok(()));
assert_eq!(wake.count.load(Ordering::SeqCst), 1);
assert_eq!(out.try_push(2), Ok(()));
assert_eq!(wake.count.load(Ordering::SeqCst), 1);
assert_eq!(inl.try_pop(), Some(1));
assert!(out.flush());
assert_eq!(wake.count.load(Ordering::SeqCst), 2);
}
}