1use core::{
5 cell::{Cell, UnsafeCell},
6 task::Waker,
7};
8
9#[cfg(feature = "tracing")]
10use tracing::trace;
11
12use crate::{
13 Subscriber,
14 chain::Chain,
15 error::{SendError, TryRecvError, TrySendError},
16};
17
18#[cfg(feature = "alloc")]
19#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
20pub mod shared;
21
22pub mod pinned;
23
24pub mod backing;
25
26pub use pinned::{Channel, ChannelExt};
27
28pub use backing::{ArrayBuf, Rendezvous};
29
30#[cfg(feature = "alloc")]
31pub use shared::channel;
32
33pub trait RingBuf {
38 type Item;
40
41 fn enqueue(&mut self, item: Self::Item) -> Result<(), Self::Item>;
44
45 fn dequeue(&mut self) -> Option<Self::Item>;
48
49 fn clear(&mut self);
51
52 fn len(&self) -> usize;
54
55 fn is_empty(&self) -> bool {
57 self.len() == 0
58 }
59}
60
61struct Inner<T, R: ?Sized + RingBuf<Item = T>> {
70 chain: Chain<OneShot<T, Waker>>,
73 pending: Cell<isize>,
79 buffer: UnsafeCell<R>,
82}
83
84impl<R: ?Sized + RingBuf> Inner<R::Item, R> {
85 fn new(buffer: R) -> Self
87 where
88 R: Sized,
89 {
90 Self {
91 chain: Chain::new(),
92 pending: Cell::new(0),
93 buffer: UnsafeCell::new(buffer),
94 }
95 }
96
97 fn close(&self) {
102 if !self.is_closed() {
103 #[cfg(feature = "tracing")]
104 trace!("closing channel");
105
106 self.pending.set(isize::MIN);
107 self.chain.notify_all().go();
108 }
109 }
110
111 fn is_closed(&self) -> bool {
113 self.pending.get() == isize::MIN
114 }
115
116 fn add_pending_sender(&self) {
117 self.pending.set(self.pending.get() + 1);
118 }
119
120 fn sub_pending_sender(&self) {
121 self.pending.set(self.pending.get() - 1);
122 }
123
124 fn add_pending_receiver(&self) {
125 self.sub_pending_sender();
126 }
127
128 fn sub_pending_receiver(&self) {
129 self.add_pending_sender();
130 }
131
132 fn has_pending_sender(&self) -> bool {
133 self.pending.get() > 0
134 }
135
136 fn has_pending_receiver(&self) -> bool {
137 self.pending.get() < 0
138 }
139
140 fn reset(&self) {
144 match self.pending.get() {
146 0 | isize::MIN => self.pending.set(0),
147 n => panic!(
148 "resetting the channel failed because there are {} pending \
149 {}-operations",
150 n.unsigned_abs(),
151 if n > 0 { "send" } else { "recv" }
152 ),
153 }
154
155 unsafe { &mut *self.buffer.get() }.clear();
157 }
158
159 fn try_send(&self, item: R::Item) -> Result<(), TrySendError<R::Item>> {
169 if self.has_pending_receiver() {
171 if !self.is_closed() {
173 self.pending.set(self.pending.get() + 1);
175
176 #[cfg(feature = "tracing")]
177 trace!(
178 remaining = self.pending.get().unsigned_abs(),
179 "passing message to the first receiver"
180 );
181
182 self.chain
184 .notify_one()
185 .then(|c| c.give(item))
186 .go()
187 .expect("receiver waiting for message");
188
189 Ok(())
190 } else {
191 Err(TrySendError::Disconnected(item)).inspect_err(|_err| {
193 #[cfg(feature = "tracing")]
194 trace!("message cannot be sent: {_err}");
195 })
196 }
197 } else {
198 self.give(item)
200 .map_err(TrySendError::Full)
201 .inspect(|_| {
202 #[cfg(feature = "tracing")]
203 trace!(
204 buffer_len = self.buffer_len(),
205 "message buffered in channel"
206 );
207 })
208 .inspect_err(|_err| {
209 #[cfg(feature = "tracing")]
210 trace!("message cannot be sent: {_err}");
211 })
212 }
213 }
214
215 fn try_recv(&self) -> Result<R::Item, TryRecvError> {
223 match self.take() {
225 Some(msg) => {
226 #[cfg(feature = "tracing")]
227 trace!(
228 buffer_len = self.buffer_len(),
229 "extracted message from the channel's buffer"
230 );
231
232 if self.has_pending_sender() {
234 let item = unsafe {
236 self.chain
237 .notify_one()
238 .then(OneShot::take)
239 .go()
240 .unwrap_unchecked()
241 .unwrap_unchecked()
242 };
243
244 self.sub_pending_sender();
246
247 self.give(item)
249 .ok()
250 .expect("buffer should not be full after an element has been extracted");
251 }
252
253 Ok(msg)
255 }
256 _ => {
257 if self.has_pending_sender() {
258 self.sub_pending_sender();
260
261 #[cfg(feature = "tracing")]
262 trace!(
263 remaining = self.pending.get(),
264 "received message from a waiting sender"
265 );
266
267 let Some(Some(msg)) = self.chain.notify_one().then(OneShot::take).go() else {
268 unreachable!("pending senders but no message")
269 };
270
271 Ok(msg)
272 } else {
273 if self.is_closed() {
274 Err(TryRecvError::Disconnected)
275 } else {
276 Err(TryRecvError::Empty)
277 }
278 .inspect_err(|_err| {
279 #[cfg(feature = "tracing")]
280 trace!("message could not be received: {_err}");
281 })
282 }
283 }
284 }
285 }
286
287 fn count(&self) -> isize {
292 self.buffer_len() as isize + self.pending.get()
293 }
294
295 fn buffer_len(&self) -> usize {
297 unsafe { &*self.buffer.get() }.len()
298 }
299
300 fn give(&self, item: R::Item) -> Result<(), R::Item> {
304 unsafe { &mut *self.buffer.get() }.enqueue(item)
305 }
306
307 fn take(&self) -> Option<R::Item> {
310 unsafe { &mut *self.buffer.get() }.dequeue()
311 }
312}
313
314struct OneShot<T, N> {
319 payload: Cell<Option<T>>,
321 notify: N,
323}
324
325impl<T, N> OneShot<T, N> {
326 pub const fn new(notify: N, payload: Option<T>) -> Self {
329 OneShot {
330 payload: Cell::new(payload),
331 notify,
332 }
333 }
334
335 pub fn take(&self) -> Option<T> {
337 self.payload.replace(None)
338 }
339
340 pub fn give(&self, item: T) {
342 self.payload.set(Some(item));
343 }
344}
345
346impl<T, N: Subscriber> Subscriber for OneShot<T, N> {
347 fn notify(&self) {
348 self.notify.notify();
349 }
350}