1use crate::buffer::*;
2use alloc::boxed::Box;
3use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use core::{ops::Deref, ptr::NonNull};
5use crossbeam_utils::CachePadded;
6use derivative::Derivative;
7
8#[cfg(feature = "futures_api")]
9use crate::waitlist::*;
10
11#[cfg(feature = "futures_api")]
12use core::task::Waker;
13
14#[derive(Derivative)]
15#[derivative(Debug(bound = ""))]
16pub(super) struct ControlBlock<T> {
17 pub(super) senders: CachePadded<AtomicUsize>,
18 pub(super) receivers: CachePadded<AtomicUsize>,
19 pub(super) connected: AtomicBool,
20 pub(super) buffer: RingBuffer<T>,
21
22 #[cfg(feature = "futures_api")]
23 pub(super) waitlist: Waitlist<Waker>,
24}
25
26impl<T> ControlBlock<T> {
27 fn new(capacity: usize) -> Self {
28 Self {
29 senders: CachePadded::new(AtomicUsize::new(1)),
30 receivers: CachePadded::new(AtomicUsize::new(1)),
31 connected: AtomicBool::new(true),
32 buffer: RingBuffer::new(capacity),
33
34 #[cfg(feature = "futures_api")]
35 waitlist: Waitlist::new(),
36 }
37 }
38}
39
40#[derive(Derivative)]
41#[derivative(
42 Debug(bound = ""),
43 Clone(bound = ""),
44 Eq(bound = ""),
45 PartialEq(bound = "")
46)]
47pub(super) struct ControlBlockRef<T>(NonNull<ControlBlock<T>>);
48
49impl<T> Unpin for ControlBlockRef<T> {}
50
51impl<T> ControlBlockRef<T> {
52 pub(super) fn new(capacity: usize) -> Self {
53 ControlBlockRef(unsafe {
54 NonNull::new_unchecked(Box::into_raw(Box::new(ControlBlock::new(capacity))))
55 })
56 }
57}
58
59impl<T> Deref for ControlBlockRef<T> {
60 type Target = ControlBlock<T>;
61
62 #[inline]
63 fn deref(&self) -> &Self::Target {
64 unsafe { self.0.as_ref() }
65 }
66}
67
68impl<T> Drop for ControlBlockRef<T> {
69 fn drop(&mut self) {
70 debug_assert!(!self.connected.load(Ordering::SeqCst));
71 debug_assert_eq!(self.senders.load(Ordering::SeqCst), 0);
72 debug_assert_eq!(self.receivers.load(Ordering::SeqCst), 0);
73
74 unsafe { drop(Box::from_raw(self.0.as_mut() as *mut ControlBlock<T>)) };
75 }
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Void;
    use test_strategy::proptest;

    // A newly built control block has its `connected` flag set.
    #[proptest]
    fn control_block_starts_connected() {
        let ctrl: ControlBlock<Void> = ControlBlock::new(1);
        assert!(ctrl.connected.load(Ordering::SeqCst));
    }

    // Both reference counters of a newly built control block read one.
    #[proptest]
    fn control_block_starts_with_reference_counters_equal_to_one() {
        let block: ControlBlock<Void> = ControlBlock::new(1);

        let sender_count = block.senders.load(Ordering::SeqCst);
        let receiver_count = block.receivers.load(Ordering::SeqCst);

        assert_eq!(sender_count, 1);
        assert_eq!(receiver_count, 1);
    }

    // The buffer reports exactly the capacity the control block was built with.
    #[proptest]
    fn control_block_allocates_buffer_given_capacity(#[strategy(1..=10usize)] cap: usize) {
        let block: ControlBlock<Void> = ControlBlock::new(cap);
        let allocated = block.buffer.capacity();
        assert_eq!(allocated, cap);
    }
}