1use core::mem::MaybeUninit;
2use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
3use core::{ptr, slice};
4
5pub struct Sender<'a, T, const N: usize> {
6 spsc: &'a Spsc<T, N>,
7}
8impl<'a, T, const N: usize> Sender<'a, T, N> {
9 const fn new(spsc: &'a Spsc<T, N>) -> Self {
10 Sender { spsc }
11 }
12 pub fn try_send(&mut self, t: T) -> Result<(), T> {
13 self.spsc.push(t)
14 }
15}
16impl<'a, T, const N: usize> Drop for Sender<'a, T, N> {
17 fn drop(&mut self) {
18 unsafe { self.spsc.free_sender() };
19 }
20}
21
22pub struct Receiver<'a, T, const N: usize> {
23 spsc: &'a Spsc<T, N>,
24}
25impl<'a, T, const N: usize> Receiver<'a, T, N> {
26 const fn new(spsc: &'a Spsc<T, N>) -> Self {
27 Receiver { spsc }
28 }
29 pub fn try_recv(&mut self) -> Option<T> {
30 self.spsc.pop()
31 }
32}
33impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> {
34 fn drop(&mut self) {
35 unsafe { self.spsc.free_recver() };
36 }
37}
38
39pub struct Spsc<T, const N: usize> {
40 buf: MaybeUninit<[T; N]>,
41 start: AtomicUsize,
43 end: AtomicUsize,
44 has_sender: AtomicBool,
45 has_receiver: AtomicBool,
46}
47impl<T, const N: usize> Default for Spsc<T, N> {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52impl<T, const N: usize> Spsc<T, N> {
53 const CAPACITY: usize = N;
54 pub const fn new() -> Self {
55 Spsc {
56 buf: MaybeUninit::uninit(),
57 end: AtomicUsize::new(0),
58 start: AtomicUsize::new(0),
59 has_sender: AtomicBool::new(true),
60 has_receiver: AtomicBool::new(true),
61 }
62 }
63 pub fn take_sender(&self) -> Option<Sender<'_, T, N>> {
64 match self
65 .has_sender
66 .compare_exchange(true, false, SeqCst, SeqCst)
67 {
68 Ok(_) => Some(Sender::new(self)),
69 Err(_) => None,
70 }
71 }
72 pub(crate) unsafe fn free_sender(&self) {
73 self.has_sender.store(true, SeqCst)
74 }
75 pub fn take_recver(&self) -> Option<Receiver<'_, T, N>> {
76 match self
77 .has_receiver
78 .compare_exchange(true, false, SeqCst, SeqCst)
79 {
80 Ok(_) => Some(Receiver::new(self)),
81 Err(_) => None,
82 }
83 }
84 pub(crate) unsafe fn free_recver(&self) {
85 self.has_receiver.store(true, SeqCst)
86 }
87 fn ptr(&self) -> *mut T {
88 self.buf.as_ptr() as *mut T
89 }
90 pub fn capacity(&self) -> usize {
91 Self::CAPACITY
92 }
93 fn wrap_max(&self) -> usize {
94 usize::MAX / Self::CAPACITY * Self::CAPACITY
95 }
96 fn wrap_len(&self, start: usize, end: usize) -> usize {
97 if end >= start {
98 end - start
99 } else {
100 self.wrap_max() - start + end
101 }
102 }
103 pub fn len(&self) -> usize {
104 let start = self.start.load(SeqCst);
105 let end = self.end.load(SeqCst);
106 self.wrap_len(start, end)
107 }
108 pub fn is_empty(&self) -> bool {
109 self.len() == 0
110 }
111 pub fn is_full(&self) -> bool {
112 self.len() >= self.capacity()
113 }
114 #[inline]
115 unsafe fn buffer_read(&self, off: usize) -> T {
116 ptr::read(self.ptr().add(off))
117 }
118 #[inline]
119 unsafe fn buffer_write(&self, off: usize, value: T) {
120 ptr::write(self.ptr().add(off), value);
121 }
122 #[inline]
123 fn index(&self, idx: usize) -> usize {
124 idx % Self::CAPACITY
125 }
126 #[inline]
127 fn next_idx(&self, old: usize) -> usize {
128 if old == self.wrap_max() - 1 {
129 0
130 } else {
131 old + 1
132 }
133 }
134 pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
135 let ptr = self.ptr();
136 let start = self.start.load(SeqCst);
137 let end = self.end.load(SeqCst);
138 if start == end {
139 return (&mut [], &mut []);
140 }
141 let start = self.index(start);
142 let end = self.index(end);
143 if end > start {
144 (
145 unsafe { slice::from_raw_parts_mut(ptr.add(start), end - start) },
146 &mut [],
147 )
148 } else {
149 (
150 unsafe { slice::from_raw_parts_mut(ptr.add(start), N - start) },
151 unsafe { slice::from_raw_parts_mut(ptr, end) },
152 )
153 }
154 }
155 pub fn clear(&mut self) {
156 let (a, b) = self.as_mut_slices();
157 unsafe { ptr::drop_in_place(a) };
158 unsafe { ptr::drop_in_place(b) };
159 self.end.store(0, SeqCst);
160 self.start.store(0, SeqCst);
161 }
162 fn pop(&self) -> Option<T> {
163 let end = self.end.load(SeqCst);
164 let start = self.start.load(SeqCst);
165 let len = self.wrap_len(start, end);
166 if len == 0 || len > self.capacity() {
167 return None;
168 }
169
170 let index = self.index(start);
171 let ret = unsafe { Some(self.buffer_read(index)) };
172 self.start
173 .compare_exchange(start, self.next_idx(start), SeqCst, SeqCst)
174 .unwrap();
175 ret
176 }
177 fn push(&self, value: T) -> Result<(), T> {
178 let start = self.start.load(SeqCst);
179 let end = self.end.load(SeqCst);
180 let len = self.wrap_len(start, end);
181 if len >= self.capacity() {
182 return Err(value);
183 }
184
185 let index = self.index(end);
186 unsafe { self.buffer_write(index, value) };
187 self.end
188 .compare_exchange(end, self.next_idx(end), SeqCst, SeqCst)
189 .unwrap();
190 Ok(())
191 }
192}
193impl<T, const N: usize> Drop for Spsc<T, N> {
194 fn drop(&mut self) {
195 self.clear()
196 }
197}