1use core::{
4 fmt,
5 task::{Context, Poll},
6};
7
8use crate::{
9 alloc::{collections::VecDeque, rc::Rc},
10 error::{SendError, TryRecvError},
11 mask::{COUNTED, UNCOUNTED},
12 queue::UnboundedQueue,
13};
14
15pub const fn channel<T>() -> UnboundedChannel<T> {
17 UnboundedChannel { queue: UnboundedQueue::new() }
18}
19
20pub fn channel_from_iter<T>(iter: impl IntoIterator<Item = T>) -> UnboundedChannel<T> {
22 UnboundedChannel::from_iter(iter)
23}
24
25pub struct UnboundedChannel<T> {
27 queue: UnboundedQueue<T>,
28}
29
30impl<T> FromIterator<T> for UnboundedChannel<T> {
31 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
32 Self { queue: UnboundedQueue::from_iter(iter) }
33 }
34}
35
36impl<T> UnboundedChannel<T> {
37 pub fn with_initial_capacity(initial: usize) -> Self {
39 Self { queue: UnboundedQueue::with_capacity(initial) }
40 }
41
42 pub fn split(&mut self) -> (UnboundedSenderRef<'_, T>, UnboundedReceiverRef<'_, T>) {
64 self.queue.0.get_mut().set_counted();
65 (UnboundedSenderRef { queue: &self.queue }, UnboundedReceiverRef { queue: &self.queue })
66 }
67
68 pub fn into_split(mut self) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
76 self.queue.0.get_mut().set_counted();
77 let queue = Rc::new(self.queue);
78 (UnboundedSender { queue: Rc::clone(&queue) }, UnboundedReceiver { queue })
79 }
80
81 pub fn into_deque(self) -> VecDeque<T> {
83 self.queue.into_deque()
84 }
85
86 pub fn len(&self) -> usize {
88 self.queue.len()
89 }
90
91 #[cold]
93 pub fn close(&self) {
94 self.queue.close::<UNCOUNTED>();
95 }
96
97 pub fn is_closed(&self) -> bool {
99 self.queue.is_closed::<UNCOUNTED>()
100 }
101
102 pub fn is_empty(&self) -> bool {
104 self.len() == 0
105 }
106
107 pub fn try_recv(&self) -> Result<T, TryRecvError> {
114 self.queue.try_recv::<UNCOUNTED>()
115 }
116
117 pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
121 self.queue.poll_recv::<UNCOUNTED>(cx)
122 }
123
124 pub async fn recv(&self) -> Option<T> {
130 self.queue.recv::<UNCOUNTED>().await
131 }
132
133 pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
139 self.queue.send::<UNCOUNTED>(elem)
140 }
141}
142
143impl<T> fmt::Debug for UnboundedChannel<T> {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 f.debug_struct("UnboundedChannel")
146 .field("len", &self.len())
147 .field("is_closed", &self.is_closed())
148 .finish()
149 }
150}
151
152pub struct UnboundedSender<T> {
155 queue: Rc<UnboundedQueue<T>>,
156}
157
158impl<T> UnboundedSender<T> {
159 pub fn len(&self) -> usize {
161 self.queue.len()
162 }
163
164 pub fn is_closed(&self) -> bool {
166 self.queue.is_closed::<COUNTED>()
167 }
168
169 pub fn is_empty(&self) -> bool {
171 self.queue.len() == 0
172 }
173
174 pub fn same_channel(&self, other: &Self) -> bool {
177 core::ptr::eq(Rc::as_ptr(&self.queue), Rc::as_ptr(&other.queue))
178 }
179
180 pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
196 self.queue.send::<COUNTED>(elem)
197 }
198}
199
200impl<T> Clone for UnboundedSender<T> {
201 fn clone(&self) -> Self {
202 unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
204 Self { queue: Rc::clone(&self.queue) }
205 }
206}
207
208impl<T> Drop for UnboundedSender<T> {
209 fn drop(&mut self) {
210 unsafe { (*self.queue.0.get()).decrease_sender_count() };
212 }
213}
214
215impl<T> fmt::Debug for UnboundedSender<T> {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 f.debug_struct("UnboundedSender")
218 .field("len", &self.len())
219 .field("is_closed", &self.is_closed())
220 .finish()
221 }
222}
223
224pub struct UnboundedSenderRef<'a, T> {
227 queue: &'a UnboundedQueue<T>,
228}
229
230impl<T> UnboundedSenderRef<'_, T> {
231 pub fn len(&self) -> usize {
233 self.queue.len()
234 }
235
236 pub fn is_closed(&self) -> bool {
238 self.queue.is_closed::<COUNTED>()
239 }
240
241 pub fn is_empty(&self) -> bool {
243 self.queue.len() == 0
244 }
245
246 pub fn same_channel(&self, other: &Self) -> bool {
249 core::ptr::eq(&self.queue, &other.queue)
250 }
251
252 pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
269 self.queue.send::<COUNTED>(elem)
270 }
271}
272
273impl<T> Clone for UnboundedSenderRef<'_, T> {
274 fn clone(&self) -> Self {
275 unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
277 Self { queue: self.queue }
278 }
279}
280
281impl<T> Drop for UnboundedSenderRef<'_, T> {
282 fn drop(&mut self) {
283 unsafe { (*self.queue.0.get()).decrease_sender_count() };
285 }
286}
287
288impl<T> fmt::Debug for UnboundedSenderRef<'_, T> {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 f.debug_struct("UnboundedSenderRef")
291 .field("len", &self.len())
292 .field("is_closed", &self.is_closed())
293 .finish()
294 }
295}
296
297pub struct UnboundedReceiver<T> {
300 queue: Rc<UnboundedQueue<T>>,
301}
302
303impl<T> UnboundedReceiver<T> {
304 #[cold]
306 pub fn close(&mut self) {
307 self.queue.close::<COUNTED>();
308 }
309
310 pub fn len(&self) -> usize {
312 self.queue.len()
313 }
314
315 pub fn is_closed(&self) -> bool {
317 self.queue.is_closed::<COUNTED>()
318 }
319
320 pub fn is_empty(&self) -> bool {
322 self.queue.len() == 0
323 }
324
325 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
332 self.queue.try_recv::<COUNTED>()
333 }
334
335 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
339 self.queue.poll_recv::<COUNTED>(cx)
340 }
341
342 pub async fn recv(&mut self) -> Option<T> {
349 self.queue.recv::<COUNTED>().await
350 }
351}
352
353impl<T> Drop for UnboundedReceiver<T> {
354 fn drop(&mut self) {
355 self.queue.close::<COUNTED>();
356 }
357}
358
359impl<T> fmt::Debug for UnboundedReceiver<T> {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 f.debug_struct("UnboundedReceiver")
362 .field("len", &self.len())
363 .field("is_closed", &self.is_closed())
364 .finish()
365 }
366}
367
368pub struct UnboundedReceiverRef<'a, T> {
371 queue: &'a UnboundedQueue<T>,
372}
373
374impl<T> UnboundedReceiverRef<'_, T> {
375 #[cold]
377 pub fn close(&mut self) {
378 self.queue.close::<COUNTED>();
379 }
380
381 pub fn len(&self) -> usize {
383 self.queue.len()
384 }
385
386 pub fn is_closed(&self) -> bool {
388 self.queue.is_closed::<COUNTED>()
389 }
390
391 pub fn is_empty(&self) -> bool {
393 self.queue.len() == 0
394 }
395
396 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
402 self.queue.try_recv::<COUNTED>()
403 }
404
405 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
409 self.queue.poll_recv::<COUNTED>(cx)
410 }
411
412 pub async fn recv(&mut self) -> Option<T> {
418 self.queue.recv::<COUNTED>().await
419 }
420}
421
422impl<T> Drop for UnboundedReceiverRef<'_, T> {
423 fn drop(&mut self) {
424 self.queue.close::<COUNTED>();
425 }
426}
427
428impl<T> fmt::Debug for UnboundedReceiverRef<'_, T> {
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 f.debug_struct("UnboundedReceiverRef")
431 .field("len", &self.len())
432 .field("is_closed", &self.is_closed())
433 .finish()
434 }
435}
436
#[cfg(test)]
mod tests {
    use core::{future::Future as _, task::Poll};

    /// An empty channel yields an error; a sent element is received as-is.
    #[test]
    fn try_recv() {
        let chan = super::channel::<i32>();
        assert!(chan.try_recv().is_err());
        chan.send(-1).unwrap();
        assert_eq!(chan.try_recv(), Ok(-1));
    }

    /// Same as `try_recv`, but through borrowed sender/receiver halves.
    #[test]
    fn try_recv_split() {
        let mut chan = super::channel::<i32>();
        let (tx, mut rx) = chan.split();

        assert!(rx.try_recv().is_err());
        tx.send(-1).unwrap();
        assert_eq!(rx.try_recv(), Ok(-1));
    }

    /// Closing keeps queued elements receivable, then reports a disconnect.
    #[test]
    fn try_recv_closed() {
        let chan = super::channel::<i32>();

        assert!(chan.try_recv().is_err());
        chan.send(-1).unwrap();
        chan.close();

        assert_eq!(chan.try_recv(), Ok(-1));
        assert!(chan.try_recv().unwrap_err().is_disconnected());
    }

    /// Dropping the only sender closes the channel for the receiver.
    #[test]
    fn try_recv_closed_split() {
        let mut chan = super::channel::<i32>();
        let (tx, mut rx) = chan.split();

        assert!(rx.try_recv().is_err());
        tx.send(-1).unwrap();
        drop(tx);

        assert_eq!(rx.try_recv(), Ok(-1));
        assert!(rx.try_recv().unwrap_err().is_disconnected());
    }

    /// Elements are received in the order they were sent.
    #[test]
    fn send_split() {
        let mut chan = super::channel::<i32>();
        let (tx, mut rx) = chan.split();

        for i in 0..4 {
            // The receiver is alive, so every send must succeed; unwrap so a
            // failure is reported here instead of as a later mismatch.
            tx.send(i).unwrap();
        }

        assert_eq!(rx.try_recv(), Ok(0));
        assert_eq!(rx.try_recv(), Ok(1));
        assert_eq!(rx.try_recv(), Ok(2));
        assert_eq!(rx.try_recv(), Ok(3));
    }

    /// Sending on a closed channel fails.
    #[test]
    fn send_closed() {
        let chan = super::channel::<i32>();
        chan.close();
        assert!(chan.send(-1).is_err());
    }

    /// Sending fails once the receiver half is gone.
    #[test]
    fn send_closed_split() {
        let mut chan = super::channel::<i32>();
        // The `_` pattern does not bind the receiver, so it is dropped at the
        // end of this statement, which closes the channel.
        let (tx, _) = chan.split();

        assert!(tx.send(-1).is_err());
    }

    /// `recv` resolves with each sent element; sending after close fails.
    #[test]
    fn recv() {
        futures_lite::future::block_on(async {
            let chan = super::channel::<i32>();

            chan.send(-1).unwrap();
            assert_eq!(chan.recv().await, Some(-1));
            chan.send(-2).unwrap();
            assert_eq!(chan.recv().await, Some(-2));
            chan.close();
            chan.send(-3).unwrap_err();
        });
    }

    /// Same as `recv`, but through split halves; dropping `rx` closes.
    #[test]
    fn recv_split() {
        futures_lite::future::block_on(async {
            let mut chan = super::channel::<i32>();
            let (tx, mut rx) = chan.split();

            tx.send(-1).unwrap();
            assert_eq!(rx.recv().await, Some(-1));
            tx.send(-2).unwrap();
            assert_eq!(rx.recv().await, Some(-2));
            drop(rx);
            tx.send(-3).unwrap_err();
        });
    }

    /// After `rx.close()`, sends fail but queued elements are still drained.
    #[test]
    fn recv_closed_split() {
        futures_lite::future::block_on(async {
            let mut chan = super::channel::<i32>();
            let (tx, mut rx) = chan.split();

            tx.send(-1).unwrap();
            tx.send(-2).unwrap();
            tx.send(-3).unwrap();

            rx.close();
            assert!(tx.send(-4).is_err());

            assert_eq!(rx.recv().await, Some(-1));
            assert_eq!(rx.recv().await, Some(-2));
            assert_eq!(rx.recv().await, Some(-3));
            assert_eq!(rx.recv().await, None);

            assert!(tx.send(-4).is_err());
        });
    }

    /// Polling an empty channel is pending (repeatedly); ready after a send.
    #[test]
    fn poll_recv() {
        futures_lite::future::block_on(async {
            let chan = super::channel::<i32>();
            core::future::poll_fn(|cx| {
                assert!(chan.poll_recv(cx).is_pending());
                assert!(chan.poll_recv(cx).is_pending());

                chan.send(1).unwrap();
                assert_eq!(chan.poll_recv(cx), Poll::Ready(Some(1)));

                Poll::Ready(())
            })
            .await;
        });
    }

    /// Two `recv` futures may be pending at once on an unsplit channel.
    #[test]
    fn multiple_recv() {
        futures_lite::future::block_on(async {
            let chan = super::channel::<i32>();
            let mut recv1 = Box::pin(chan.recv());
            let mut recv2 = Box::pin(chan.recv());

            core::future::poll_fn(|cx| {
                assert!(recv1.as_mut().poll(cx).is_pending());
                assert!(recv2.as_mut().poll(cx).is_pending());

                chan.send(1).unwrap();
                // The second future is polled first and takes the element.
                assert_eq!(recv2.as_mut().poll(cx), Poll::Ready(Some(1)));

                Poll::Ready(())
            })
            .await;

            chan.send(2).unwrap();
            assert_eq!(chan.recv().await, Some(2));
        });
    }

    /// The channel stays usable (and closed) after its split halves are gone.
    #[test]
    fn use_after_split() {
        futures_lite::future::block_on(async {
            let mut chan = super::channel::<i32>();
            {
                let (tx, mut rx) = chan.split();
                tx.send(1).unwrap();
                tx.send(2).unwrap();
                assert_eq!(rx.recv().await, Some(1));
                rx.close();
            }

            assert!(chan.is_closed());
            assert_eq!(chan.recv().await, Some(2));
            assert_eq!(chan.recv().await, None);
        });
    }

    /// Splitting an already closed channel yields closed halves.
    #[test]
    fn split_after_close() {
        let mut chan = super::channel::<i32>();
        chan.close();

        let (tx, rx) = chan.split();
        assert!(tx.is_closed());
        assert!(rx.is_closed());
    }
}