ring_channel/
control.rs

1use crate::buffer::*;
2use alloc::boxed::Box;
3use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use core::{ops::Deref, ptr::NonNull};
5use crossbeam_utils::CachePadded;
6use derivative::Derivative;
7
8#[cfg(feature = "futures_api")]
9use crate::waitlist::*;
10
11#[cfg(feature = "futures_api")]
12use core::task::Waker;
13
14#[derive(Derivative)]
15#[derivative(Debug(bound = ""))]
16pub(super) struct ControlBlock<T> {
17    pub(super) senders: CachePadded<AtomicUsize>,
18    pub(super) receivers: CachePadded<AtomicUsize>,
19    pub(super) connected: AtomicBool,
20    pub(super) buffer: RingBuffer<T>,
21
22    #[cfg(feature = "futures_api")]
23    pub(super) waitlist: Waitlist<Waker>,
24}
25
26impl<T> ControlBlock<T> {
27    fn new(capacity: usize) -> Self {
28        Self {
29            senders: CachePadded::new(AtomicUsize::new(1)),
30            receivers: CachePadded::new(AtomicUsize::new(1)),
31            connected: AtomicBool::new(true),
32            buffer: RingBuffer::new(capacity),
33
34            #[cfg(feature = "futures_api")]
35            waitlist: Waitlist::new(),
36        }
37    }
38}
39
40#[derive(Derivative)]
41#[derivative(
42    Debug(bound = ""),
43    Clone(bound = ""),
44    Eq(bound = ""),
45    PartialEq(bound = "")
46)]
47pub(super) struct ControlBlockRef<T>(NonNull<ControlBlock<T>>);
48
49impl<T> Unpin for ControlBlockRef<T> {}
50
51impl<T> ControlBlockRef<T> {
52    pub(super) fn new(capacity: usize) -> Self {
53        ControlBlockRef(unsafe {
54            NonNull::new_unchecked(Box::into_raw(Box::new(ControlBlock::new(capacity))))
55        })
56    }
57}
58
59impl<T> Deref for ControlBlockRef<T> {
60    type Target = ControlBlock<T>;
61
62    #[inline]
63    fn deref(&self) -> &Self::Target {
64        unsafe { self.0.as_ref() }
65    }
66}
67
68impl<T> Drop for ControlBlockRef<T> {
69    fn drop(&mut self) {
70        debug_assert!(!self.connected.load(Ordering::SeqCst));
71        debug_assert_eq!(self.senders.load(Ordering::SeqCst), 0);
72        debug_assert_eq!(self.receivers.load(Ordering::SeqCst), 0);
73
74        unsafe { drop(Box::from_raw(self.0.as_mut() as *mut ControlBlock<T>)) };
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Void;
    use test_strategy::proptest;

    /// A new control block reports the channel as connected.
    #[proptest]
    fn control_block_starts_connected() {
        let block: ControlBlock<Void> = ControlBlock::new(1);
        let connected = block.connected.load(Ordering::SeqCst);
        assert!(connected);
    }

    /// A new control block has both of its counters set to one.
    #[proptest]
    fn control_block_starts_with_reference_counters_equal_to_one() {
        let block: ControlBlock<Void> = ControlBlock::new(1);
        let senders = block.senders.load(Ordering::SeqCst);
        assert_eq!(senders, 1);
        let receivers = block.receivers.load(Ordering::SeqCst);
        assert_eq!(receivers, 1);
    }

    /// The buffer inside a new control block reports the requested capacity.
    #[proptest]
    fn control_block_allocates_buffer_given_capacity(#[strategy(1..=10usize)] cap: usize) {
        let block: ControlBlock<Void> = ControlBlock::new(cap);
        assert_eq!(block.buffer.capacity(), cap);
    }
}