stuck/channel/
serial.rs

//! Channel utilities for communication across coroutines in one task.
2
3use std::cell::{Cell, RefCell};
4use std::collections::VecDeque;
5use std::rc::Rc;
6
7use static_assertions::assert_not_impl_any;
8
9use super::shared::Permit;
10use crate::channel::prelude::*;
11use crate::channel::{self, SendError, TryRecvError, TrySendError};
12use crate::coroutine::{self, Resumption};
13use crate::select::{self, Identifier, PermitReader, PermitWriter, Selectable, Selector, TrySelectError};
14
15enum Waker {
16    Selector(Selector),
17    Resumption(Resumption<()>),
18}
19
20impl From<Resumption<()>> for Waker {
21    fn from(resumption: Resumption<()>) -> Self {
22        Waker::Resumption(resumption)
23    }
24}
25
26impl From<Selector> for Waker {
27    fn from(selector: Selector) -> Self {
28        Waker::Selector(selector)
29    }
30}
31
32impl Waker {
33    fn wake(self) -> bool {
34        match self {
35            Waker::Selector(selector) => selector.apply(Permit::Consume.into()),
36            Waker::Resumption(resumption) => {
37                resumption.resume(());
38                true
39            },
40        }
41    }
42
43    fn matches(&self, identifier: &Identifier) -> bool {
44        if let Waker::Selector(selector) = self {
45            selector.identify(identifier)
46        } else {
47            false
48        }
49    }
50}
51
52struct State<T: 'static> {
53    closed: bool,
54
55    bound: usize,
56    deque: VecDeque<T>,
57    senders: VecDeque<Waker>,
58    receivers: VecDeque<Waker>,
59}
60
61impl<T: 'static> State<T> {
62    fn new(cap: usize, bound: usize) -> Self {
63        State {
64            closed: false,
65            bound,
66            deque: VecDeque::with_capacity(cap),
67            senders: VecDeque::with_capacity(16),
68            receivers: VecDeque::with_capacity(5),
69        }
70    }
71
72    fn is_full(&self) -> bool {
73        self.bound == self.deque.len()
74    }
75
76    fn is_sendable(&self) -> bool {
77        self.closed || !self.is_full()
78    }
79
80    fn is_recvable(&self) -> bool {
81        !self.deque.is_empty() || self.closed
82    }
83
84    fn wake_sender(&mut self) {
85        while let Some(waker) = self.senders.pop_front() {
86            if waker.wake() {
87                break;
88            }
89        }
90    }
91
92    fn wake_receiver(&mut self) {
93        while let Some(receiver) = self.receivers.pop_front() {
94            if receiver.wake() {
95                break;
96            }
97        }
98    }
99
100    fn close(&mut self) {
101        self.closed = true;
102        while let Some(waker) = self.senders.pop_front() {
103            waker.wake();
104        }
105        while let Some(receiver) = self.receivers.pop_front() {
106            receiver.wake();
107        }
108    }
109}
110
111impl<T: 'static> Drop for State<T> {
112    fn drop(&mut self) {
113        self.close();
114    }
115}
116
117struct Channel<T: 'static> {
118    state: RefCell<State<T>>,
119    senders: Cell<usize>,
120    receivers: Cell<usize>,
121}
122
123impl<T: 'static> Channel<T> {
124    fn new(cap: usize, bound: usize) -> Rc<Channel<T>> {
125        let state = State::new(cap, bound);
126        Rc::new(Channel { state: RefCell::new(state), senders: Cell::new(1), receivers: Cell::new(1) })
127    }
128
129    fn add_sender(&self) {
130        let senders = self.senders.get() + 1;
131        self.senders.set(senders);
132    }
133
134    fn remove_sender(&self) {
135        let senders = self.senders.get() - 1;
136        self.senders.set(senders);
137        if senders == 0 {
138            let mut state = self.state.borrow_mut();
139            state.close();
140        }
141    }
142
143    fn add_receiver(&self) {
144        let receivers = self.receivers.get() + 1;
145        self.receivers.set(receivers);
146    }
147
148    fn remove_receiver(&self) {
149        let receivers = self.receivers.get() - 1;
150        self.receivers.set(receivers);
151        if receivers == 0 {
152            let mut state = self.state.borrow_mut();
153            state.close();
154        }
155    }
156
157    fn close(&self) {
158        let mut state = self.state.borrow_mut();
159        state.close();
160    }
161
162    fn send(&self, trying: bool, value: T) -> Result<(), TrySendError<T>> {
163        loop {
164            let mut state = self.state.borrow_mut();
165            if state.closed {
166                return Err(TrySendError::Closed(value));
167            } else if !state.is_full() {
168                state.deque.push_back(value);
169                state.wake_receiver();
170                return Ok(());
171            } else if trying {
172                return Err(TrySendError::Full(value));
173            } else {
174                let (suspension, resumption) = coroutine::suspension();
175                state.senders.push_back(Waker::from(resumption));
176                drop(state);
177                suspension.suspend();
178            }
179        }
180    }
181
182    fn is_sendable(&self) -> bool {
183        let state = self.state.borrow();
184        state.is_sendable()
185    }
186
187    fn is_recvable(&self) -> bool {
188        let state = self.state.borrow();
189        state.is_recvable()
190    }
191
192    fn watch_send_permit(&self, watcher: Selector) -> bool {
193        assert!(!self.is_sendable(), "wait on sendable channel");
194        let mut state = self.state.borrow_mut();
195        state.senders.push_back(Waker::from(watcher));
196        true
197    }
198
199    fn watch_recv_permit(&self, selector: Selector) -> bool {
200        assert!(!self.is_recvable(), "wait on recvable channel");
201        let mut state = self.state.borrow_mut();
202        state.receivers.push_back(Waker::from(selector));
203        true
204    }
205
206    fn unwatch_send_permit(&self, identifier: &Identifier) {
207        let mut state = self.state.borrow_mut();
208        if let Some(position) = state.senders.iter().position(|w| w.matches(identifier)) {
209            state.senders.remove(position);
210        }
211    }
212
213    fn unwatch_recv_permit(&self, identifier: &Identifier) {
214        let mut state = self.state.borrow_mut();
215        if let Some(position) = state.receivers.iter().position(|w| w.matches(identifier)) {
216            state.receivers.remove(position);
217        }
218    }
219
220    fn recv(&self, trying: bool) -> Result<T, TryRecvError> {
221        loop {
222            let mut state = self.state.borrow_mut();
223            if let Some(value) = state.deque.pop_front() {
224                state.wake_sender();
225                return Ok(value);
226            } else if state.closed {
227                return Err(TryRecvError::Closed);
228            } else if trying {
229                return Err(TryRecvError::Empty);
230            }
231            let (suspension, resumption) = coroutine::suspension();
232            state.receivers.push_back(Waker::from(resumption));
233            drop(state);
234            suspension.suspend();
235        }
236    }
237}
238
239impl<T> super::shared::Channel<T> for Rc<Channel<T>> {
240    fn send(&self, trying: bool, value: T) -> Result<(), TrySendError<T>> {
241        Channel::send(self, trying, value)
242    }
243
244    fn add_sender(&self) {
245        Channel::add_sender(self)
246    }
247
248    fn remove_sender(&self) {
249        Channel::remove_sender(self)
250    }
251
252    fn select_send_permit(&self) -> Option<Permit> {
253        if self.is_sendable() {
254            Some(Permit::Consume)
255        } else {
256            None
257        }
258    }
259
260    fn consume_send_permit(&self, value: T) -> Result<(), SendError<T>> {
261        match self.send(true, value) {
262            Ok(()) => Ok(()),
263            Err(TrySendError::Closed(value)) => Err(SendError::Closed(value)),
264            Err(TrySendError::Full(_)) => panic!("not ready to send"),
265        }
266    }
267
268    fn watch_send_permit(&self, selector: Selector) -> bool {
269        Channel::watch_send_permit(self, selector)
270    }
271
272    fn unwatch_send_permit(&self, identifier: &Identifier) {
273        Channel::unwatch_send_permit(self, identifier)
274    }
275
276    fn recv(&self, trying: bool) -> Result<T, TryRecvError> {
277        Channel::recv(self, trying)
278    }
279
280    fn add_receiver(&self) {
281        Channel::add_receiver(self)
282    }
283
284    fn remove_receiver(&self) {
285        Channel::remove_receiver(self)
286    }
287
288    fn select_recv_permit(&self) -> Option<Permit> {
289        if self.is_recvable() {
290            Some(Permit::Consume)
291        } else {
292            None
293        }
294    }
295
296    fn consume_recv_permit(&self) -> Option<T> {
297        match self.recv(true) {
298            Ok(value) => Some(value),
299            Err(TryRecvError::Empty) => panic!("not ready to recv"),
300            Err(TryRecvError::Closed) => None,
301        }
302    }
303
304    fn watch_recv_permit(&self, selector: Selector) -> bool {
305        Channel::watch_recv_permit(self, selector)
306    }
307
308    fn unwatch_recv_permit(&self, identifier: &Identifier) {
309        Channel::unwatch_recv_permit(self, identifier)
310    }
311
312    fn close(&self) {
313        Channel::close(self)
314    }
315}
316
317/// Sending peer of [Receiver]. Additional senders could be constructed by [Sender::clone].
318pub struct Sender<T: 'static>(super::shared::Sender<T, Rc<Channel<T>>>);
319
320impl<T: 'static> channel::Sender<T> for Sender<T> {
321    fn send(&mut self, value: T) -> Result<(), SendError<T>> {
322        self.0.send(value)
323    }
324
325    fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
326        self.0.try_send(value)
327    }
328
329    fn close(&mut self) {
330        self.0.close()
331    }
332
333    fn is_closed(&self) -> bool {
334        self.0.is_closed()
335    }
336}
337
338impl<T: 'static> Clone for Sender<T> {
339    fn clone(&self) -> Self {
340        Sender(self.0.clone())
341    }
342}
343
344impl<T: 'static> Selectable for Sender<T> {
345    fn parallel(&self) -> bool {
346        false
347    }
348
349    fn select_permit(&self) -> Result<select::Permit, TrySelectError> {
350        self.0.select_permit()
351    }
352
353    fn watch_permit(&self, selector: Selector) -> bool {
354        self.0.watch_permit(selector)
355    }
356
357    fn unwatch_permit(&self, identifier: &Identifier) {
358        self.0.unwatch_permit(identifier)
359    }
360}
361
362impl<T: 'static> PermitWriter for Sender<T> {
363    type Item = T;
364    type Result = Result<(), SendError<T>>;
365
366    fn consume_permit(&mut self, permit: select::Permit, value: Self::Item) -> Self::Result {
367        self.0.consume_permit(permit, value)
368    }
369}
370
371/// Receiving peer of [Sender].
372pub struct Receiver<T: 'static>(super::shared::Receiver<T, Rc<Channel<T>>>);
373
374impl<T: 'static> channel::Receiver<T> for Receiver<T> {
375    fn recv(&mut self) -> Option<T> {
376        self.0.recv()
377    }
378
379    fn try_recv(&mut self) -> Result<T, TryRecvError> {
380        self.0.try_recv()
381    }
382
383    fn close(&mut self) {
384        self.0.close()
385    }
386
387    fn terminate(&mut self) {
388        self.0.terminate()
389    }
390
391    fn is_drained(&self) -> bool {
392        self.0.is_drained()
393    }
394}
395
396impl<T: 'static> Clone for Receiver<T> {
397    fn clone(&self) -> Self {
398        Receiver(self.0.clone())
399    }
400}
401
402impl<T: 'static> Selectable for Receiver<T> {
403    fn parallel(&self) -> bool {
404        false
405    }
406
407    fn select_permit(&self) -> Result<select::Permit, TrySelectError> {
408        self.0.select_permit()
409    }
410
411    fn watch_permit(&self, selector: Selector) -> bool {
412        self.0.watch_permit(selector)
413    }
414
415    fn unwatch_permit(&self, identifier: &Identifier) {
416        self.0.unwatch_permit(identifier)
417    }
418}
419
420impl<T: 'static> PermitReader for Receiver<T> {
421    type Result = Option<T>;
422
423    fn consume_permit(&mut self, permit: select::Permit) -> Self::Result {
424        self.0.consume_permit(permit)
425    }
426}
427
428assert_not_impl_any!(Sender<()>: Send, Sync);
429assert_not_impl_any!(Receiver<()>: Send, Sync);
430
431impl<T: 'static> IntoIterator for Receiver<T> {
432    type IntoIter = IntoIter<T>;
433    type Item = T;
434
435    fn into_iter(self) -> Self::IntoIter {
436        IntoIter { receiver: self }
437    }
438}
439
440/// An iterator that owns its source receiver.
441pub struct IntoIter<T: 'static> {
442    receiver: Receiver<T>,
443}
444
445impl<T: 'static> std::iter::Iterator for IntoIter<T> {
446    type Item = T;
447
448    fn next(&mut self) -> Option<Self::Item> {
449        self.receiver.recv()
450    }
451}
452
453impl<T: 'static> std::iter::FusedIterator for IntoIter<T> {}
454
455fn channel<T: 'static>(capacity: usize, bound: usize) -> (Sender<T>, Receiver<T>) {
456    let channel = Channel::new(capacity, bound);
457    let sender = Sender(super::shared::Sender::new(channel.clone()));
458    let receiver = Receiver(super::shared::Receiver::new(channel));
459    (sender, receiver)
460}
461
462/// Constructs a pair of completed sender and receiver.
463pub fn completed<T: 'static>() -> (Sender<T>, Receiver<T>) {
464    let sender = Sender(super::shared::Sender::empty());
465    let receiver = Receiver(super::shared::Receiver::empty());
466    (sender, receiver)
467}
468
469/// Constructs a bounded channel.
470pub fn bounded<T: 'static>(bound: usize) -> (Sender<T>, Receiver<T>) {
471    assert!(bound > 0, "bound must be greater than 0");
472    channel(bound, bound)
473}
474
475/// Constructs a unbounded channel.
476pub fn unbounded<T: 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
477    channel(capacity, usize::MAX)
478}
479
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use ignore_result::Ignore;
    use more_asserts::{assert_ge, assert_le};
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::channel::serial;
    use crate::{select, time};

    /// A completed pair is closed and drained from the very beginning.
    #[crate::test(crate = "crate")]
    fn completed() {
        let (mut sender, mut receiver) = serial::completed();
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        assert_eq!(receiver.recv(), None);

        assert_eq!(sender.send(()), Err(SendError::Closed(())));
        assert_eq!(sender.try_send(()), Err(TrySendError::Closed(())));

        assert!(sender.is_closed());
        assert!(receiver.is_drained());
        select! {
            _ = <-receiver => unreachable!("completed"),
            complete => {},
        }
        select! {
            _ = sender<-() => unreachable!("completed"),
            complete => {},
        }
    }

    /// Iterating a receiver yields the buffered values in order, then `None` forever.
    #[crate::test(crate = "crate")]
    fn receiver_into_iter() {
        let (mut sender, receiver) = serial::bounded(3);
        for value in 1..=3 {
            sender.send(value).unwrap();
        }
        drop(sender);

        let mut iter = receiver.into_iter();
        for expected in 1..=3 {
            assert_eq!(iter.next(), Some(expected));
        }
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    /// A zero bound is rejected.
    #[test]
    #[should_panic]
    fn bounded_zero() {
        let _ = bounded::<()>(0);
    }

    /// Shared scenario: two values pass through, then the receiver goes away.
    fn series_send(mut sender: Sender<i32>, mut receiver: Receiver<i32>) {
        for value in 1..=2 {
            sender.send(value).unwrap();
        }
        for expected in 1..=2 {
            assert_eq!(expected, receiver.recv().unwrap());
        }
        drop(receiver);

        assert_eq!(sender.send(3).unwrap_err(), SendError::Closed(3));
        assert_eq!(sender.try_send(6).unwrap_err(), TrySendError::Closed(6));
    }

    #[test]
    fn bounded_send() {
        let (tx, rx) = bounded::<i32>(2);
        series_send(tx, rx);
    }

    #[test]
    fn unbounded_send() {
        let (tx, rx) = unbounded::<i32>(2);
        series_send(tx, rx);
    }

    /// `try_send` reports `Full` once the bound is reached; buffered values survive the
    /// sender being dropped.
    #[test]
    fn bounded_try_send_full() {
        let (mut sender, mut receiver) = bounded::<i32>(2);
        for value in 1..=2 {
            sender.try_send(value).unwrap();
        }
        assert_eq!(sender.try_send(3).unwrap_err(), TrySendError::Full(3));
        drop(sender);
        for expected in 1..=2 {
            assert_eq!(expected, receiver.recv().unwrap());
        }
        assert_eq!(None, receiver.recv());
    }

    /// An unbounded channel accepts more values than its initial capacity.
    #[test]
    fn unbounded_try_send() {
        let (mut sender, mut receiver) = unbounded::<i32>(1);
        for value in 1..=3 {
            sender.try_send(value).unwrap();
        }
        drop(sender);
        for expected in 1..=3 {
            assert_eq!(expected, receiver.recv().unwrap());
        }
        assert_eq!(None, receiver.recv());
    }

    /// The sixth send into a channel bounded at five has to wait for the receiver.
    #[crate::test(crate = "crate")]
    fn bounded_blocking() {
        let (mut ready_sender, mut ready_receiver) = bounded::<()>(1);
        let (mut sender, mut receiver) = bounded::<i32>(5);
        let sending = coroutine::spawn(move || {
            let started = Instant::now();
            for value in 1..=5 {
                sender.send(value).unwrap();
            }
            assert_le!(started.elapsed(), Duration::from_secs(5));
            let started = Instant::now();
            ready_sender.send(()).unwrap();
            sender.send(6).unwrap();
            assert_ge!(started.elapsed(), Duration::from_secs(5));
        });
        ready_receiver.recv().unwrap();
        time::sleep(Duration::from_secs(6));
        for expected in 1..=6 {
            assert_eq!(expected, receiver.recv().unwrap());
        }
        assert_eq!(None, receiver.recv());
        sending.join().unwrap();
    }

    /// Sends into an unbounded channel never wait for the receiver.
    #[crate::test(crate = "crate")]
    fn unbounded_nonblocking() {
        let (mut ready_sender, mut ready_receiver) = bounded::<()>(1);
        let (mut sender, mut receiver) = unbounded::<i32>(0);
        let sending = coroutine::spawn(move || {
            ready_sender.send(()).unwrap();
            let started = Instant::now();
            for value in 1..=6 {
                sender.send(value).unwrap();
            }
            assert_le!(started.elapsed(), Duration::from_secs(5));
        });
        ready_receiver.recv().unwrap();
        time::sleep(Duration::from_secs(6));
        for expected in 1..=6 {
            assert_eq!(expected, receiver.recv().unwrap());
        }
        assert_eq!(None, receiver.recv());
        sending.join().unwrap();
    }

    /// After the sender closes the channel, further sends are rejected.
    #[crate::test(crate = "crate")]
    fn send_close() {
        let (mut sender, _receiver) = bounded(3);
        sender.send(1).unwrap();

        sender.close();

        select! {
            _ = sender<-2 => panic!("completed"),
            complete => {},
        }

        assert_eq!(sender.send(3), Err(SendError::Closed(3)));
    }

    /// Closing from the receiving side still lets the buffered values drain.
    #[crate::test(crate = "crate")]
    fn receiver_close() {
        let (mut sender, mut receiver) = bounded(3);
        for value in 1..=3 {
            sender.send(value).unwrap();
        }

        receiver.close();
        assert_eq!(receiver.recv(), Some(1));
        select! {
            r = <-receiver => assert_eq!(r, Some(2)),
            complete => panic!("not completed"),
        }
        assert_eq!(receiver.recv(), Some(3));
        select! {
            r = <-receiver => assert_eq!(r, None),
            complete => panic!("not completed"),
        }
        select! {
            _ = <-receiver => panic!("completed"),
            complete => {},
        }
        assert_eq!(receiver.recv(), None);
    }

    /// Terminating discards whatever is still buffered.
    #[crate::test(crate = "crate")]
    fn receiver_terminate() {
        let (mut sender, mut receiver) = bounded(3);
        for value in 1..=2 {
            sender.send(value).unwrap();
        }

        receiver.terminate();
        select! {
            _ = <-receiver => panic!("terminated"),
            complete => {},
        }
        assert_eq!(receiver.recv(), None);
    }

    /// Selecting over two senders delivers each value to the matching receiver.
    #[crate::test(crate = "crate")]
    fn sender_select() {
        let (mut sender1, receiver1) = bounded(1);
        let (mut sender2, receiver2) = unbounded(1);

        let task1 = coroutine::spawn(move || receiver1.into_iter().collect::<Vec<_>>());
        let task2 = coroutine::spawn(move || receiver2.into_iter().collect::<Vec<_>>());

        let mut values1: VecDeque<_> = vec![1, 3, 5].into();
        let mut values2: VecDeque<_> = vec![2, 4, 6].into();

        loop {
            select! {
                _ = sender1<-values1.pop_front().unwrap() => if values1.is_empty() {
                    sender1.close();
                },
                _ = sender2<-values2.pop_front().unwrap() => if values2.is_empty() {
                    sender2.close();
                },
                complete => break,
            }
        }

        assert_eq!(task1.join().unwrap(), vec![1, 3, 5]);
        assert_eq!(task2.join().unwrap(), vec![2, 4, 6]);
    }

    /// Selecting over two receivers collects each stream in sending order.
    #[crate::test(crate = "crate")]
    fn receiver_select() {
        let (mut sender1, mut receiver1) = bounded(10);
        let (mut sender2, mut receiver2) = unbounded(10);

        coroutine::spawn(move || {
            vec![1, 3, 5].into_iter().for_each(|v| sender1.send(v).ignore());
        });

        coroutine::spawn(move || {
            vec![2, 4, 6].into_iter().for_each(|v| sender2.send(v).ignore());
        });

        let mut values1 = Vec::new();
        let mut values2 = Vec::new();

        loop {
            select! {
                r = <-receiver1 => if let Some(v) = r {
                    values1.push(v);
                },
                r = <-receiver2 => if let Some(v) = r {
                    values2.push(v);
                },
                complete => break,
            }
        }

        assert_eq!(values1, vec![1, 3, 5]);
        assert_eq!(values2, vec![2, 4, 6]);
    }
}