1use core::mem::MaybeUninit;
2use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
3use core::{ptr, slice};
4
5pub struct Sender<'a, T, const N: usize> {
6 spsc: &'a Spsc<T, N>,
7}
8impl<'a, T, const N: usize> Sender<'a, T, N> {
9 const fn new(spsc: &'a Spsc<T, N>) -> Self {
10 Sender { spsc }
11 }
12 pub fn try_send(&mut self, t: T) -> Result<(), T> {
13 self.spsc.push(t)
14 }
15}
16impl<'a, T, const N: usize> Drop for Sender<'a, T, N> {
17 fn drop(&mut self) {
18 unsafe { self.spsc.free_sender() };
19 }
20}
21
22pub struct Receiver<'a, T, const N: usize> {
23 spsc: &'a Spsc<T, N>,
24}
25impl<'a, T, const N: usize> Receiver<'a, T, N> {
26 const fn new(spsc: &'a Spsc<T, N>) -> Self {
27 Receiver { spsc }
28 }
29 pub fn try_recv(&mut self) -> Option<T> {
30 self.spsc.pop()
31 }
32}
33impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> {
34 fn drop(&mut self) {
35 unsafe { self.spsc.free_recver() };
36 }
37}
38
39pub struct Spsc<T, const N: usize> {
40 buf: MaybeUninit<[T; N]>,
41 start: AtomicUsize,
43 end: AtomicUsize,
44 has_sender: AtomicBool,
45 has_receiver: AtomicBool,
46}
47impl<T, const N: usize> Spsc<T, N> {
48 const CAPACITY: usize = N;
49 pub const fn new() -> Self {
50 Spsc {
51 buf: MaybeUninit::uninit(),
52 end: AtomicUsize::new(0),
53 start: AtomicUsize::new(0),
54 has_sender: AtomicBool::new(true),
55 has_receiver: AtomicBool::new(true),
56 }
57 }
58 pub fn take_sender(&self) -> Option<Sender<T, N>> {
59 match self
60 .has_sender
61 .compare_exchange(true, false, SeqCst, SeqCst)
62 {
63 Ok(_) => Some(Sender::new(self)),
64 Err(_) => None,
65 }
66 }
67 pub(crate) unsafe fn free_sender(&self) {
68 self.has_sender.store(true, SeqCst)
69 }
70 pub fn take_recver(&self) -> Option<Receiver<T, N>> {
71 match self
72 .has_receiver
73 .compare_exchange(true, false, SeqCst, SeqCst)
74 {
75 Ok(_) => Some(Receiver::new(self)),
76 Err(_) => None,
77 }
78 }
79 pub(crate) unsafe fn free_recver(&self) {
80 self.has_receiver.store(true, SeqCst)
81 }
82 fn ptr(&self) -> *mut T {
83 self.buf.as_ptr() as *mut T
84 }
85 pub fn capacity(&self) -> usize {
86 Self::CAPACITY
87 }
88 fn wrap_max(&self) -> usize {
89 usize::MAX / Self::CAPACITY * Self::CAPACITY
90 }
91 fn wrap_len(&self, start: usize, end: usize) -> usize {
92 if end >= start {
93 end - start
94 } else {
95 self.wrap_max() - start + end
96 }
97 }
98 pub fn len(&self) -> usize {
99 let start = self.start.load(SeqCst);
100 let end = self.end.load(SeqCst);
101 self.wrap_len(start, end)
102 }
103 pub fn is_empty(&self) -> bool {
104 self.len() == 0
105 }
106 pub fn is_full(&self) -> bool {
107 self.len() >= self.capacity()
108 }
109 #[inline]
110 unsafe fn buffer_read(&self, off: usize) -> T {
111 ptr::read(self.ptr().add(off))
112 }
113 #[inline]
114 unsafe fn buffer_write(&self, off: usize, value: T) {
115 ptr::write(self.ptr().add(off), value);
116 }
117 #[inline]
118 fn index(&self, idx: usize) -> usize {
119 idx % Self::CAPACITY
120 }
121 #[inline]
122 fn next_idx(&self, old: usize) -> usize {
123 if old == self.wrap_max() - 1 {
124 0
125 } else {
126 old + 1
127 }
128 }
129 pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
130 let ptr = self.ptr();
131 let start = self.start.load(SeqCst);
132 let end = self.end.load(SeqCst);
133 if start == end {
134 return (&mut [], &mut []);
135 }
136 let start = self.index(start);
137 let end = self.index(end);
138 if end > start {
139 (
140 unsafe { slice::from_raw_parts_mut(ptr.add(start), end - start) },
141 &mut [],
142 )
143 } else {
144 (
145 unsafe { slice::from_raw_parts_mut(ptr.add(start), N - start) },
146 unsafe { slice::from_raw_parts_mut(ptr, end) },
147 )
148 }
149 }
150 pub fn clear(&mut self) {
151 let (a, b) = self.as_mut_slices();
152 unsafe { ptr::drop_in_place(a) };
153 unsafe { ptr::drop_in_place(b) };
154 self.end.store(0, SeqCst);
155 self.start.store(0, SeqCst);
156 }
157 fn pop(&self) -> Option<T> {
158 let end = self.end.load(SeqCst);
159 let start = self.start.load(SeqCst);
160 let len = self.wrap_len(start, end);
161 if len == 0 || len > self.capacity() {
162 return None;
163 }
164
165 let index = self.index(start);
166 let ret = unsafe { Some(self.buffer_read(index)) };
167 self.start
168 .compare_exchange(start, self.next_idx(start), SeqCst, SeqCst)
169 .unwrap();
170 ret
171 }
172 fn push(&self, value: T) -> Result<(), T> {
173 let start = self.start.load(SeqCst);
174 let end = self.end.load(SeqCst);
175 let len = self.wrap_len(start, end);
176 if len >= self.capacity() {
177 return Err(value);
178 }
179
180 let index = self.index(end);
181 unsafe { self.buffer_write(index, value) };
182 self.end
183 .compare_exchange(end, self.next_idx(end), SeqCst, SeqCst)
184 .unwrap();
185 Ok(())
186 }
187}
188impl<T, const N: usize> Drop for Spsc<T, N> {
189 fn drop(&mut self) {
190 self.clear()
191 }
192}