1use std::cell::{Cell, RefCell};
4use std::collections::VecDeque;
5use std::rc::Rc;
6
7use static_assertions::assert_not_impl_any;
8
9use super::shared::Permit;
10use crate::channel::prelude::*;
11use crate::channel::{self, SendError, TryRecvError, TrySendError};
12use crate::coroutine::{self, Resumption};
13use crate::select::{self, Identifier, PermitReader, PermitWriter, Selectable, Selector, TrySelectError};
14
/// A party parked on the channel until it becomes sendable or recvable.
enum Waker {
    /// A selector registered through `watch_send_permit` / `watch_recv_permit`.
    Selector(Selector),
    /// The resumption handle of a coroutine suspended inside a blocking `send` or `recv`.
    Resumption(Resumption<()>),
}
19
20impl From<Resumption<()>> for Waker {
21 fn from(resumption: Resumption<()>) -> Self {
22 Waker::Resumption(resumption)
23 }
24}
25
26impl From<Selector> for Waker {
27 fn from(selector: Selector) -> Self {
28 Waker::Selector(selector)
29 }
30}
31
32impl Waker {
33 fn wake(self) -> bool {
34 match self {
35 Waker::Selector(selector) => selector.apply(Permit::Consume.into()),
36 Waker::Resumption(resumption) => {
37 resumption.resume(());
38 true
39 },
40 }
41 }
42
43 fn matches(&self, identifier: &Identifier) -> bool {
44 if let Waker::Selector(selector) = self {
45 selector.identify(identifier)
46 } else {
47 false
48 }
49 }
50}
51
/// Mutable core of a channel: the buffered items and the parties waiting on it.
struct State<T: 'static> {
    /// Set by `close`; nothing in this file resets it.
    closed: bool,

    /// Buffer length at which the channel counts as full (see `is_full`).
    bound: usize,
    /// Items pushed by `send` and popped by `recv`, in FIFO order.
    deque: VecDeque<T>,
    /// Wakers waiting for the channel to become sendable.
    senders: VecDeque<Waker>,
    /// Wakers waiting for the channel to become recvable.
    receivers: VecDeque<Waker>,
}
60
61impl<T: 'static> State<T> {
62 fn new(cap: usize, bound: usize) -> Self {
63 State {
64 closed: false,
65 bound,
66 deque: VecDeque::with_capacity(cap),
67 senders: VecDeque::with_capacity(16),
68 receivers: VecDeque::with_capacity(5),
69 }
70 }
71
72 fn is_full(&self) -> bool {
73 self.bound == self.deque.len()
74 }
75
76 fn is_sendable(&self) -> bool {
77 self.closed || !self.is_full()
78 }
79
80 fn is_recvable(&self) -> bool {
81 !self.deque.is_empty() || self.closed
82 }
83
84 fn wake_sender(&mut self) {
85 while let Some(waker) = self.senders.pop_front() {
86 if waker.wake() {
87 break;
88 }
89 }
90 }
91
92 fn wake_receiver(&mut self) {
93 while let Some(receiver) = self.receivers.pop_front() {
94 if receiver.wake() {
95 break;
96 }
97 }
98 }
99
100 fn close(&mut self) {
101 self.closed = true;
102 while let Some(waker) = self.senders.pop_front() {
103 waker.wake();
104 }
105 while let Some(receiver) = self.receivers.pop_front() {
106 receiver.wake();
107 }
108 }
109}
110
111impl<T: 'static> Drop for State<T> {
112 fn drop(&mut self) {
113 self.close();
114 }
115}
116
/// Channel shared by all endpoints through an `Rc`.
struct Channel<T: 'static> {
    /// Buffer and waiter queues, borrowed for the duration of each operation.
    state: RefCell<State<T>>,
    /// Count adjusted by `add_sender` / `remove_sender`; starts at 1.
    senders: Cell<usize>,
    /// Count adjusted by `add_receiver` / `remove_receiver`; starts at 1.
    receivers: Cell<usize>,
}
122
123impl<T: 'static> Channel<T> {
124 fn new(cap: usize, bound: usize) -> Rc<Channel<T>> {
125 let state = State::new(cap, bound);
126 Rc::new(Channel { state: RefCell::new(state), senders: Cell::new(1), receivers: Cell::new(1) })
127 }
128
129 fn add_sender(&self) {
130 let senders = self.senders.get() + 1;
131 self.senders.set(senders);
132 }
133
134 fn remove_sender(&self) {
135 let senders = self.senders.get() - 1;
136 self.senders.set(senders);
137 if senders == 0 {
138 let mut state = self.state.borrow_mut();
139 state.close();
140 }
141 }
142
143 fn add_receiver(&self) {
144 let receivers = self.receivers.get() + 1;
145 self.receivers.set(receivers);
146 }
147
148 fn remove_receiver(&self) {
149 let receivers = self.receivers.get() - 1;
150 self.receivers.set(receivers);
151 if receivers == 0 {
152 let mut state = self.state.borrow_mut();
153 state.close();
154 }
155 }
156
157 fn close(&self) {
158 let mut state = self.state.borrow_mut();
159 state.close();
160 }
161
162 fn send(&self, trying: bool, value: T) -> Result<(), TrySendError<T>> {
163 loop {
164 let mut state = self.state.borrow_mut();
165 if state.closed {
166 return Err(TrySendError::Closed(value));
167 } else if !state.is_full() {
168 state.deque.push_back(value);
169 state.wake_receiver();
170 return Ok(());
171 } else if trying {
172 return Err(TrySendError::Full(value));
173 } else {
174 let (suspension, resumption) = coroutine::suspension();
175 state.senders.push_back(Waker::from(resumption));
176 drop(state);
177 suspension.suspend();
178 }
179 }
180 }
181
182 fn is_sendable(&self) -> bool {
183 let state = self.state.borrow();
184 state.is_sendable()
185 }
186
187 fn is_recvable(&self) -> bool {
188 let state = self.state.borrow();
189 state.is_recvable()
190 }
191
192 fn watch_send_permit(&self, watcher: Selector) -> bool {
193 assert!(!self.is_sendable(), "wait on sendable channel");
194 let mut state = self.state.borrow_mut();
195 state.senders.push_back(Waker::from(watcher));
196 true
197 }
198
199 fn watch_recv_permit(&self, selector: Selector) -> bool {
200 assert!(!self.is_recvable(), "wait on recvable channel");
201 let mut state = self.state.borrow_mut();
202 state.receivers.push_back(Waker::from(selector));
203 true
204 }
205
206 fn unwatch_send_permit(&self, identifier: &Identifier) {
207 let mut state = self.state.borrow_mut();
208 if let Some(position) = state.senders.iter().position(|w| w.matches(identifier)) {
209 state.senders.remove(position);
210 }
211 }
212
213 fn unwatch_recv_permit(&self, identifier: &Identifier) {
214 let mut state = self.state.borrow_mut();
215 if let Some(position) = state.receivers.iter().position(|w| w.matches(identifier)) {
216 state.receivers.remove(position);
217 }
218 }
219
220 fn recv(&self, trying: bool) -> Result<T, TryRecvError> {
221 loop {
222 let mut state = self.state.borrow_mut();
223 if let Some(value) = state.deque.pop_front() {
224 state.wake_sender();
225 return Ok(value);
226 } else if state.closed {
227 return Err(TryRecvError::Closed);
228 } else if trying {
229 return Err(TryRecvError::Empty);
230 }
231 let (suspension, resumption) = coroutine::suspension();
232 state.receivers.push_back(Waker::from(resumption));
233 drop(state);
234 suspension.suspend();
235 }
236 }
237}
238
239impl<T> super::shared::Channel<T> for Rc<Channel<T>> {
240 fn send(&self, trying: bool, value: T) -> Result<(), TrySendError<T>> {
241 Channel::send(self, trying, value)
242 }
243
244 fn add_sender(&self) {
245 Channel::add_sender(self)
246 }
247
248 fn remove_sender(&self) {
249 Channel::remove_sender(self)
250 }
251
252 fn select_send_permit(&self) -> Option<Permit> {
253 if self.is_sendable() {
254 Some(Permit::Consume)
255 } else {
256 None
257 }
258 }
259
260 fn consume_send_permit(&self, value: T) -> Result<(), SendError<T>> {
261 match self.send(true, value) {
262 Ok(()) => Ok(()),
263 Err(TrySendError::Closed(value)) => Err(SendError::Closed(value)),
264 Err(TrySendError::Full(_)) => panic!("not ready to send"),
265 }
266 }
267
268 fn watch_send_permit(&self, selector: Selector) -> bool {
269 Channel::watch_send_permit(self, selector)
270 }
271
272 fn unwatch_send_permit(&self, identifier: &Identifier) {
273 Channel::unwatch_send_permit(self, identifier)
274 }
275
276 fn recv(&self, trying: bool) -> Result<T, TryRecvError> {
277 Channel::recv(self, trying)
278 }
279
280 fn add_receiver(&self) {
281 Channel::add_receiver(self)
282 }
283
284 fn remove_receiver(&self) {
285 Channel::remove_receiver(self)
286 }
287
288 fn select_recv_permit(&self) -> Option<Permit> {
289 if self.is_recvable() {
290 Some(Permit::Consume)
291 } else {
292 None
293 }
294 }
295
296 fn consume_recv_permit(&self) -> Option<T> {
297 match self.recv(true) {
298 Ok(value) => Some(value),
299 Err(TryRecvError::Empty) => panic!("not ready to recv"),
300 Err(TryRecvError::Closed) => None,
301 }
302 }
303
304 fn watch_recv_permit(&self, selector: Selector) -> bool {
305 Channel::watch_recv_permit(self, selector)
306 }
307
308 fn unwatch_recv_permit(&self, identifier: &Identifier) {
309 Channel::unwatch_recv_permit(self, identifier)
310 }
311
312 fn close(&self) {
313 Channel::close(self)
314 }
315}
316
/// Sending endpoint of a serial channel; a thin wrapper over the shared sender
/// backed by an `Rc<Channel<T>>`.
pub struct Sender<T: 'static>(super::shared::Sender<T, Rc<Channel<T>>>);
319
320impl<T: 'static> channel::Sender<T> for Sender<T> {
321 fn send(&mut self, value: T) -> Result<(), SendError<T>> {
322 self.0.send(value)
323 }
324
325 fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
326 self.0.try_send(value)
327 }
328
329 fn close(&mut self) {
330 self.0.close()
331 }
332
333 fn is_closed(&self) -> bool {
334 self.0.is_closed()
335 }
336}
337
338impl<T: 'static> Clone for Sender<T> {
339 fn clone(&self) -> Self {
340 Sender(self.0.clone())
341 }
342}
343
344impl<T: 'static> Selectable for Sender<T> {
345 fn parallel(&self) -> bool {
346 false
347 }
348
349 fn select_permit(&self) -> Result<select::Permit, TrySelectError> {
350 self.0.select_permit()
351 }
352
353 fn watch_permit(&self, selector: Selector) -> bool {
354 self.0.watch_permit(selector)
355 }
356
357 fn unwatch_permit(&self, identifier: &Identifier) {
358 self.0.unwatch_permit(identifier)
359 }
360}
361
362impl<T: 'static> PermitWriter for Sender<T> {
363 type Item = T;
364 type Result = Result<(), SendError<T>>;
365
366 fn consume_permit(&mut self, permit: select::Permit, value: Self::Item) -> Self::Result {
367 self.0.consume_permit(permit, value)
368 }
369}
370
/// Receiving endpoint of a serial channel; a thin wrapper over the shared
/// receiver backed by an `Rc<Channel<T>>`.
pub struct Receiver<T: 'static>(super::shared::Receiver<T, Rc<Channel<T>>>);
373
374impl<T: 'static> channel::Receiver<T> for Receiver<T> {
375 fn recv(&mut self) -> Option<T> {
376 self.0.recv()
377 }
378
379 fn try_recv(&mut self) -> Result<T, TryRecvError> {
380 self.0.try_recv()
381 }
382
383 fn close(&mut self) {
384 self.0.close()
385 }
386
387 fn terminate(&mut self) {
388 self.0.terminate()
389 }
390
391 fn is_drained(&self) -> bool {
392 self.0.is_drained()
393 }
394}
395
396impl<T: 'static> Clone for Receiver<T> {
397 fn clone(&self) -> Self {
398 Receiver(self.0.clone())
399 }
400}
401
402impl<T: 'static> Selectable for Receiver<T> {
403 fn parallel(&self) -> bool {
404 false
405 }
406
407 fn select_permit(&self) -> Result<select::Permit, TrySelectError> {
408 self.0.select_permit()
409 }
410
411 fn watch_permit(&self, selector: Selector) -> bool {
412 self.0.watch_permit(selector)
413 }
414
415 fn unwatch_permit(&self, identifier: &Identifier) {
416 self.0.unwatch_permit(identifier)
417 }
418}
419
420impl<T: 'static> PermitReader for Receiver<T> {
421 type Result = Option<T>;
422
423 fn consume_permit(&mut self, permit: select::Permit) -> Self::Result {
424 self.0.consume_permit(permit)
425 }
426}
427
// Compile-time checks that the serial endpoints implement neither `Send` nor
// `Sync`. The channel in this file is built on `Rc`, `Cell` and `RefCell`.
assert_not_impl_any!(Sender<()>: Send, Sync);
assert_not_impl_any!(Receiver<()>: Send, Sync);
430
431impl<T: 'static> IntoIterator for Receiver<T> {
432 type IntoIter = IntoIter<T>;
433 type Item = T;
434
435 fn into_iter(self) -> Self::IntoIter {
436 IntoIter { receiver: self }
437 }
438}
439
/// Owning iterator returned by `Receiver::into_iter`.
pub struct IntoIter<T: 'static> {
    /// The receiver each call to `next` receives from.
    receiver: Receiver<T>,
}
444
445impl<T: 'static> std::iter::Iterator for IntoIter<T> {
446 type Item = T;
447
448 fn next(&mut self) -> Option<Self::Item> {
449 self.receiver.recv()
450 }
451}
452
// Marker impl declaring that `next` keeps returning `None` once it has returned
// `None`. NOTE(review): this relies on `recv` staying `None` after the first
// `None`; confirm against the shared receiver.
impl<T: 'static> std::iter::FusedIterator for IntoIter<T> {}
454
455fn channel<T: 'static>(capacity: usize, bound: usize) -> (Sender<T>, Receiver<T>) {
456 let channel = Channel::new(capacity, bound);
457 let sender = Sender(super::shared::Sender::new(channel.clone()));
458 let receiver = Receiver(super::shared::Receiver::new(channel));
459 (sender, receiver)
460}
461
462pub fn completed<T: 'static>() -> (Sender<T>, Receiver<T>) {
464 let sender = Sender(super::shared::Sender::empty());
465 let receiver = Receiver(super::shared::Receiver::empty());
466 (sender, receiver)
467}
468
469pub fn bounded<T: 'static>(bound: usize) -> (Sender<T>, Receiver<T>) {
471 assert!(bound > 0, "bound must be greater than 0");
472 channel(bound, bound)
473}
474
475pub fn unbounded<T: 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
477 channel(capacity, usize::MAX)
478}
479
// Tests for the serial channel: plain send/recv, closing, blocking behavior and `select!`.
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use ignore_result::Ignore;
    use more_asserts::{assert_ge, assert_le};
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::channel::serial;
    use crate::{select, time};

    // A `completed()` pair reports closed/drained on every operation, and
    // `select!` takes the `complete` branch for both endpoints.
    #[crate::test(crate = "crate")]
    fn completed() {
        let (mut sender, mut receiver) = serial::completed();
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        assert_eq!(receiver.recv(), None);

        assert_eq!(sender.send(()), Err(SendError::Closed(())));
        assert_eq!(sender.try_send(()), Err(TrySendError::Closed(())));

        assert!(sender.is_closed());
        assert!(receiver.is_drained());
        select! {
            _ = <-receiver => unreachable!("completed"),
            complete => {},
        }
        select! {
            _ = sender<-() => unreachable!("completed"),
            complete => {},
        }
    }

    // Iterating a receiver yields the buffered values in order, then `None`
    // repeatedly once the sender is dropped.
    #[crate::test(crate = "crate")]
    fn receiver_into_iter() {
        let (mut sender, receiver) = serial::bounded(3);
        sender.send(1).unwrap();
        sender.send(2).unwrap();
        sender.send(3).unwrap();
        drop(sender);

        let mut iter = receiver.into_iter();
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), Some(2));
        assert_eq!(iter.next(), Some(3));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    // A zero bound is rejected by the assertion in `bounded`.
    #[test]
    #[should_panic]
    fn bounded_zero() {
        bounded::<()>(0);
    }

    // Shared scenario: two values round-trip in order, and sending after the
    // receiver is dropped fails with `Closed`, handing the value back.
    fn series_send(mut sender: Sender<i32>, mut receiver: Receiver<i32>) {
        sender.send(1).unwrap();
        sender.send(2).unwrap();
        assert_eq!(1, receiver.recv().unwrap());
        assert_eq!(2, receiver.recv().unwrap());
        drop(receiver);

        assert_eq!(sender.send(3).unwrap_err(), SendError::Closed(3));
        assert_eq!(sender.try_send(6).unwrap_err(), TrySendError::Closed(6));
    }

    #[test]
    fn bounded_send() {
        let (sender, receiver) = bounded::<i32>(2);
        series_send(sender, receiver);
    }

    #[test]
    fn unbounded_send() {
        let (sender, receiver) = unbounded::<i32>(2);
        series_send(sender, receiver);
    }

    // `try_send` on a full bounded channel fails with `Full`; the buffered
    // values are still received after the sender is dropped.
    #[test]
    fn bounded_try_send_full() {
        let (mut sender, mut receiver) = bounded::<i32>(2);
        sender.try_send(1).unwrap();
        sender.try_send(2).unwrap();
        assert_eq!(sender.try_send(3).unwrap_err(), TrySendError::Full(3));
        drop(sender);
        assert_eq!(1, receiver.recv().unwrap());
        assert_eq!(2, receiver.recv().unwrap());
        assert_eq!(None, receiver.recv());
    }

    // An unbounded channel accepts more values than its initial capacity of 1.
    #[test]
    fn unbounded_try_send() {
        let (mut sender, mut receiver) = unbounded::<i32>(1);
        sender.try_send(1).unwrap();
        sender.try_send(2).unwrap();
        sender.try_send(3).unwrap();
        drop(sender);
        assert_eq!(1, receiver.recv().unwrap());
        assert_eq!(2, receiver.recv().unwrap());
        assert_eq!(3, receiver.recv().unwrap());
        assert_eq!(None, receiver.recv());
    }

    // The first five sends fit the bound of 5 and finish quickly. The sixth
    // must wait until the receiver starts draining after its 6 second sleep.
    #[crate::test(crate = "crate")]
    fn bounded_blocking() {
        let (mut ready_sender, mut ready_receiver) = bounded::<()>(1);
        let (mut sender, mut receiver) = bounded::<i32>(5);
        let sending = coroutine::spawn(move || {
            let now = Instant::now();
            sender.send(1).unwrap();
            sender.send(2).unwrap();
            sender.send(3).unwrap();
            sender.send(4).unwrap();
            sender.send(5).unwrap();
            assert_le!(now.elapsed(), Duration::from_secs(5));
            let now = Instant::now();
            // Signal the main coroutine that the buffer is full before the blocking send.
            ready_sender.send(()).unwrap();
            sender.send(6).unwrap();
            assert_ge!(now.elapsed(), Duration::from_secs(5));
        });
        ready_receiver.recv().unwrap();
        time::sleep(Duration::from_secs(6));
        assert_eq!(1, receiver.recv().unwrap());
        assert_eq!(2, receiver.recv().unwrap());
        assert_eq!(3, receiver.recv().unwrap());
        assert_eq!(4, receiver.recv().unwrap());
        assert_eq!(5, receiver.recv().unwrap());
        assert_eq!(6, receiver.recv().unwrap());
        assert_eq!(None, receiver.recv());
        sending.join().unwrap();
    }

    // All six sends on an unbounded channel finish quickly even though the
    // receiver sleeps 6 seconds before draining.
    #[crate::test(crate = "crate")]
    fn unbounded_nonblocking() {
        let (mut ready_sender, mut ready_receiver) = bounded::<()>(1);
        let (mut sender, mut receiver) = unbounded::<i32>(0);
        let sending = coroutine::spawn(move || {
            ready_sender.send(()).unwrap();
            let now = Instant::now();
            sender.send(1).unwrap();
            sender.send(2).unwrap();
            sender.send(3).unwrap();
            sender.send(4).unwrap();
            sender.send(5).unwrap();
            sender.send(6).unwrap();
            assert_le!(now.elapsed(), Duration::from_secs(5));
        });
        ready_receiver.recv().unwrap();
        time::sleep(Duration::from_secs(6));
        assert_eq!(1, receiver.recv().unwrap());
        assert_eq!(2, receiver.recv().unwrap());
        assert_eq!(3, receiver.recv().unwrap());
        assert_eq!(4, receiver.recv().unwrap());
        assert_eq!(5, receiver.recv().unwrap());
        assert_eq!(6, receiver.recv().unwrap());
        assert_eq!(None, receiver.recv());
        sending.join().unwrap();
    }

    // After `Sender::close`, `select!` takes the `complete` branch for the
    // sender, and a plain send fails with `Closed`.
    #[crate::test(crate = "crate")]
    fn send_close() {
        let (mut sender, _receiver) = bounded(3);
        sender.send(1).unwrap();

        sender.close();

        select! {
            _ = sender<-2 => panic!("completed"),
            complete => {},
        }

        assert_eq!(sender.send(3), Err(SendError::Closed(3)));
    }

    // After `Receiver::close`, buffered values are still delivered; then one
    // `None` is observed through `select!` before the receiver counts as complete.
    #[crate::test(crate = "crate")]
    fn receiver_close() {
        let (mut sender, mut receiver) = bounded(3);
        sender.send(1).unwrap();
        sender.send(2).unwrap();
        sender.send(3).unwrap();

        receiver.close();
        assert_eq!(receiver.recv(), Some(1));
        select! {
            r = <-receiver => assert_eq!(r, Some(2)),
            complete => panic!("not completed"),
        }
        assert_eq!(receiver.recv(), Some(3));
        select! {
            r = <-receiver => assert_eq!(r, None),
            complete => panic!("not completed"),
        }
        select! {
            _ = <-receiver => panic!("completed"),
            complete => {},
        }
        assert_eq!(receiver.recv(), None);
    }

    // After `Receiver::terminate`, buffered values are not delivered; the
    // receiver is complete at once.
    #[crate::test(crate = "crate")]
    fn receiver_terminate() {
        let (mut sender, mut receiver) = bounded(3);
        sender.send(1).unwrap();
        sender.send(2).unwrap();

        receiver.terminate();
        select! {
            _ = <-receiver => panic!("terminated"),
            complete => {},
        }
        assert_eq!(receiver.recv(), None);
    }

    // Selecting over two senders delivers each value sequence, in order, to its
    // own receiver. A sender is closed once its values run out, and the loop
    // ends when both are complete.
    #[crate::test(crate = "crate")]
    fn sender_select() {
        let (mut sender1, receiver1) = bounded(1);
        let (mut sender2, receiver2) = unbounded(1);

        let task1 = coroutine::spawn(move || receiver1.into_iter().collect::<Vec<_>>());

        let task2 = coroutine::spawn(move || receiver2.into_iter().collect::<Vec<_>>());

        let mut values1 = VecDeque::from(vec![1, 3, 5]);
        let mut values2 = VecDeque::from(vec![2, 4, 6]);

        loop {
            select! {
                _ = sender1<-values1.pop_front().unwrap() => if values1.is_empty() {
                    sender1.close();
                },
                _ = sender2<-values2.pop_front().unwrap() => if values2.is_empty() {
                    sender2.close();
                },
                complete => break,
            }
        }

        assert_eq!(task1.join().unwrap(), vec![1, 3, 5]);
        assert_eq!(task2.join().unwrap(), vec![2, 4, 6]);
    }

    // Selecting over two receivers collects each producer's values in order.
    // The loop ends once both receivers are complete, after the producers have
    // dropped their senders.
    #[crate::test(crate = "crate")]
    fn receiver_select() {
        let (mut sender1, mut receiver1) = bounded(10);
        let (mut sender2, mut receiver2) = unbounded(10);

        coroutine::spawn(move || {
            for v in vec![1, 3, 5] {
                sender1.send(v).ignore();
            }
        });

        coroutine::spawn(move || {
            for v in vec![2, 4, 6] {
                sender2.send(v).ignore();
            }
        });

        let mut values1 = Vec::new();
        let mut values2 = Vec::new();

        loop {
            select! {
                r = <-receiver1 => if let Some(v) = r {
                    values1.push(v);
                },
                r = <-receiver2 => if let Some(v) = r {
                    values2.push(v);
                },
                complete => break,
            }
        }

        assert_eq!(values1, vec![1, 3, 5]);
        assert_eq!(values2, vec![2, 4, 6]);
    }
}