lambda_channel/
channel.rs

1use std::fmt;
2use std::time::{Duration, Instant};
3
4use crossbeam_channel::*;
5
6/// The sending side of a channel, almost identical to [`crossbeam_channel::Sender`]. The only difference is that
7/// you can make one channel depend on another channel. If channel `A` depends on channel `B`, channel `A`
8/// will **ACT** disconnected when channel `B` is disconnected. This mean that dependency is not transitive.
9/// If channel `Z` depends on channel `A`, channel `Z` will not **ACT** disconnected when channel `B` is disconnected.
10///
11/// # Examples
12///
13/// Channel without dependency:
14///
15/// ```
16/// use std::thread;
17/// use crossbeam_channel::RecvError;
18/// use lambda_channel::channel::new_channel;
19///
20/// let (s1, r) = new_channel(None);
21/// let s2 = s1.clone();
22///
23/// thread::spawn(move || s1.send(1).unwrap());
24/// thread::spawn(move || s2.send(2).unwrap());
25///
26/// let msg1 = r.recv().unwrap();
27/// let msg2 = r.recv().unwrap();
28///
29/// assert_eq!(msg1 + msg2, 3);
30/// assert_eq!(r.recv(), Err(RecvError)) // All senders have been dropped
31/// ```
32///
33/// Channel with dependency:
34///
35/// ```
36/// use std::thread;
37/// use crossbeam_channel::{RecvError, SendError};
38/// use lambda_channel::channel::{new_channel, new_channel_with_dependency};
39///
40/// let (s_b1, r_b) = new_channel(None);
41/// // Channel A depends on channel B
42/// let (s_a1, r_a) = new_channel_with_dependency(None, &s_b1, &r_b);
43/// let s_b2 = s_b1.clone();
44///
45/// s_a1.send(0).unwrap();
46///
47/// thread::spawn(move || s_b1.send(1).unwrap());
48/// thread::spawn(move || s_b2.send(2).unwrap());
49///
50/// let msg1 = r_b.recv().unwrap();
51/// let msg2 = r_b.recv().unwrap();
52///
53/// assert_eq!(msg1 + msg2, 3);
54/// assert_eq!(r_b.recv(), Err(RecvError)); // All `B` senders have been dropped
55///
56/// // Channel `B` is disconnected, channel `A` disconnects as well
57/// assert_eq!(s_a1.send(3), Err(SendError(3)));
58/// assert_eq!(r_a.recv(), Ok(0));
59/// assert_eq!(r_a.recv(), Err(RecvError));
60/// ```
61pub struct Sender<T> {
62    pub(super) _liveness_check: crossbeam_channel::Sender<()>,
63    pub(super) sender: crossbeam_channel::Sender<T>,
64    pub(super) liveness_check: crossbeam_channel::Receiver<()>,
65    pub(super) depends_on: Option<(
66        crossbeam_channel::Receiver<()>,
67        crossbeam_channel::Receiver<()>,
68    )>,
69}
70
71impl<T> Sender<T> {
72    /// Blocks the current thread until a message is sent or the channel is disconnected.
73    ///
74    /// If the channel is full and not disconnected, this call will block until the send operation
75    /// can proceed. If the channel becomes disconnected, this call will wake up and return an
76    /// error. The returned error contains the original message.
77    ///
78    /// If called on a zero-capacity channel, this method will wait for a receive operation to
79    /// appear on the other side of the channel.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use std::thread;
85    /// use std::time::Duration;
86    /// use crossbeam_channel::SendError;
87    /// use lambda_channel::channel::new_channel;
88    ///
89    /// let (s, r) = new_channel(Some(1));
90    /// assert_eq!(s.send(1), Ok(()));
91    ///
92    /// thread::spawn(move || {
93    ///     assert_eq!(r.recv(), Ok(1));
94    ///     thread::sleep(Duration::from_secs(1));
95    ///     drop(r);
96    /// });
97    ///
98    /// assert_eq!(s.send(2), Ok(()));
99    /// assert_eq!(s.send(3), Err(SendError(3)));
100    /// ```
101    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
102        if let Some(dependency) = &self.depends_on {
103            select_biased! {
104                recv(dependency.0) -> _ => {
105                    Err(SendError(msg))
106                },
107                recv(dependency.1) -> _ => {
108                    Err(SendError(msg))
109                },
110                send(self.sender, msg) -> e => {
111                    e
112                }
113            }
114        } else {
115            self.sender.send(msg)
116        }
117    }
118
119    /// Attempts to send a message into the channel without blocking.
120    ///
121    /// This method will either send a message into the channel immediately or return an error if
122    /// the channel is full or disconnected. The returned error contains the original message.
123    ///
124    /// If called on a zero-capacity channel, this method will send the message only if there
125    /// happens to be a receive operation on the other side of the channel at the same time.
126    ///
127    /// # Examples
128    ///
129    /// ```
130    /// use crossbeam_channel::TrySendError;
131    /// use lambda_channel::channel::new_channel;
132    ///
133    /// let (s, r) = new_channel(Some(1));
134    ///
135    /// assert_eq!(s.try_send(1), Ok(()));
136    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
137    ///
138    /// drop(r);
139    /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
140    /// ```
141    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
142        if let Some(dependency) = &self.depends_on {
143            select_biased! {
144                recv(dependency.0) -> _ => {
145                    Err(TrySendError::Disconnected(msg))
146                },
147                recv(dependency.1) -> _ => {
148                    Err(TrySendError::Disconnected(msg))
149                },
150                default() => self.sender.try_send(msg)
151            }
152        } else {
153            self.sender.try_send(msg)
154        }
155    }
156
157    /// Waits for a message to be sent into the channel, but only for a limited time.
158    ///
159    /// If the channel is full and not disconnected, this call will block until the send operation
160    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
161    /// wake up and return an error. The returned error contains the original message.
162    ///
163    /// If called on a zero-capacity channel, this method will wait for a receive operation to
164    /// appear on the other side of the channel.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use std::thread;
170    /// use std::time::Duration;
171    /// use crossbeam_channel::SendTimeoutError;
172    /// use lambda_channel::channel::new_channel;
173    ///
174    /// let (s, r) = new_channel(Some(0));
175    ///
176    /// thread::spawn(move || {
177    ///     thread::sleep(Duration::from_secs(1));
178    ///     assert_eq!(r.recv(), Ok(2));
179    ///     drop(r);
180    /// });
181    ///
182    /// assert_eq!(
183    ///     s.send_timeout(1, Duration::from_millis(500)),
184    ///     Err(SendTimeoutError::Timeout(1)),
185    /// );
186    /// assert_eq!(
187    ///     s.send_timeout(2, Duration::from_secs(1)),
188    ///     Ok(()),
189    /// );
190    /// assert_eq!(
191    ///     s.send_timeout(3, Duration::from_millis(500)),
192    ///     Err(SendTimeoutError::Disconnected(3)),
193    /// );
194    /// ```
195    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
196        if let Some(dependency) = &self.depends_on {
197            select_biased! {
198                recv(dependency.0) -> _ => {
199                    Err(SendTimeoutError::Disconnected(msg))
200                },
201                recv(dependency.1) -> _ => {
202                    Err(SendTimeoutError::Disconnected(msg))
203                },
204                send(self.sender, msg) -> res => {
205                    res.map_err(|e| SendTimeoutError::Disconnected(e.into_inner()))
206                },
207                default(timeout) => Err(SendTimeoutError::Timeout(msg))
208            }
209        } else {
210            self.sender.send_timeout(msg, timeout)
211        }
212    }
213
214    /// Waits for a message to be sent into the channel, but only until a given deadline.
215    ///
216    /// If the channel is full and not disconnected, this call will block until the send operation
217    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
218    /// wake up and return an error. The returned error contains the original message.
219    ///
220    /// If called on a zero-capacity channel, this method will wait for a receive operation to
221    /// appear on the other side of the channel.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// use std::thread;
227    /// use std::time::{Duration, Instant};
228    /// use crossbeam_channel::SendTimeoutError;
229    /// use lambda_channel::channel::new_channel;
230    ///
231    /// let (s, r) = new_channel(Some(0));
232    ///
233    /// thread::spawn(move || {
234    ///     thread::sleep(Duration::from_secs(1));
235    ///     assert_eq!(r.recv(), Ok(2));
236    ///     drop(r);
237    /// });
238    ///
239    /// let now = Instant::now();
240    ///
241    /// assert_eq!(
242    ///     s.send_deadline(1, now + Duration::from_millis(500)),
243    ///     Err(SendTimeoutError::Timeout(1)),
244    /// );
245    /// assert_eq!(
246    ///     s.send_deadline(2, now + Duration::from_millis(1500)),
247    ///     Ok(()),
248    /// );
249    /// assert_eq!(
250    ///     s.send_deadline(3, now + Duration::from_millis(2000)),
251    ///     Err(SendTimeoutError::Disconnected(3)),
252    /// );
253    /// ```
254    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
255        if let Some(dependency) = &self.depends_on {
256            select_biased! {
257                recv(dependency.0) -> _ => {
258                    Err(SendTimeoutError::Disconnected(msg))
259                },
260                recv(dependency.1) -> _ => {
261                    Err(SendTimeoutError::Disconnected(msg))
262                },
263                send(self.sender, msg) -> res => {
264                    res.map_err(|e| SendTimeoutError::Disconnected(e.into_inner()))
265                },
266                default(deadline.saturating_duration_since(Instant::now())) => Err(SendTimeoutError::Timeout(msg))
267            }
268        } else {
269            self.sender.send_deadline(msg, deadline)
270        }
271    }
272
273    /// Returns `true` if the channel is empty.
274    ///
275    /// Note: Zero-capacity channels are always empty.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use lambda_channel::channel::new_channel;
281    ///
282    /// let (s, _r) = new_channel(None);
283    /// assert!(s.is_empty());
284    ///
285    /// s.send(0).unwrap();
286    /// assert!(!s.is_empty());
287    /// ```
288    pub fn is_empty(&self) -> bool {
289        self.sender.is_empty()
290    }
291
292    /// Returns `true` if the channel is full.
293    ///
294    /// Note: Zero-capacity channels are always full.
295    ///
296    /// # Examples
297    ///
298    /// ```
299    /// use lambda_channel::channel::new_channel;
300    ///
301    /// let (s, _r) = new_channel(Some(1));
302    ///
303    /// assert!(!s.is_full());
304    /// s.send(0).unwrap();
305    /// assert!(s.is_full());
306    /// ```
307    pub fn is_full(&self) -> bool {
308        self.sender.is_full()
309    }
310
311    /// Returns the number of messages in the channel.
312    ///
313    /// # Examples
314    ///
315    /// ```
316    /// use lambda_channel::channel::new_channel;
317    ///
318    /// let (s, _r) = new_channel(None);
319    /// assert_eq!(s.len(), 0);
320    ///
321    /// s.send(1).unwrap();
322    /// s.send(2).unwrap();
323    /// assert_eq!(s.len(), 2);
324    /// ```
325    pub fn len(&self) -> usize {
326        self.sender.len()
327    }
328
329    /// Returns the channel's capacity.
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// use lambda_channel::channel::new_channel;
335    ///
336    /// let (s, _) = new_channel::<i32>(None);
337    /// assert_eq!(s.capacity(), None);
338    ///
339    /// let (s, _) = new_channel::<i32>(Some(5));
340    /// assert_eq!(s.capacity(), Some(5));
341    ///
342    /// let (s, _) = new_channel::<i32>(Some(0));
343    /// assert_eq!(s.capacity(), Some(0));
344    /// ```
345    pub fn capacity(&self) -> Option<usize> {
346        self.sender.capacity()
347    }
348
349    /// Returns `true` if senders belong to the same channel.
350    ///
351    /// # Examples
352    ///
353    /// ```rust
354    /// use lambda_channel::channel::new_channel;
355    ///
356    /// let (s, _) = new_channel::<usize>(None);
357    ///
358    /// let s2 = s.clone();
359    /// assert!(s.same_channel(&s2));
360    ///
361    /// let (s3, _) = new_channel(None);
362    /// assert!(!s.same_channel(&s3));
363    /// ```
364    pub fn same_channel(&self, other: &Sender<T>) -> bool {
365        self.sender.same_channel(&other.sender)
366    }
367}
368
369impl<T> Clone for Sender<T> {
370    fn clone(&self) -> Self {
371        Sender {
372            _liveness_check: self._liveness_check.clone(),
373            sender: self.sender.clone(),
374            liveness_check: self.liveness_check.clone(),
375            depends_on: self.depends_on.clone(),
376        }
377    }
378}
379
380impl<T> fmt::Debug for Sender<T> {
381    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382        f.pad("Lamba Channel Sender { .. }")
383    }
384}
385
386/// The sending side of a channel, almost identical to [`crossbeam_channel::Receiver`]. The only difference is that
387/// you can make one channel depend on another channel. If channel `A` depends on channel `B`, channel `A`
388/// will **ACT** disconnected when channel `B` is disconnected. This mean that dependency is not transitive.
389/// If channel `Z` depends on channel `A`, channel `Z` will not **ACT** disconnected when channel `B` is disconnected.
390///
391/// # Examples
392///
393/// Channel without dependency:
394///
395/// ```
396/// use std::thread;
397/// use std::time::Duration;
398/// use crossbeam_channel::RecvError;
399/// use lambda_channel::channel::new_channel;
400///
401/// let (s, r) = new_channel(None);
402///
403/// thread::spawn(move || {
404///     let _ = s.send(1);
405///     thread::sleep(Duration::from_secs(1));
406///     let _ = s.send(2);
407/// });
408///
409/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
410/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
411/// assert_eq!(r.recv(), Err(RecvError)); // All senders have been dropped
412/// ```
413///
414/// Channel with dependency:
415///
416/// ```
417/// use std::thread;
418/// use std::time::Duration;
419/// use crossbeam_channel::{RecvError, SendError};
420/// use lambda_channel::channel::{new_channel, new_channel_with_dependency};
421///
422/// let (s_b, r_b) = new_channel(None);
423/// let (s_a, r_a) = new_channel_with_dependency(None, &s_b, &r_b);
424///
425/// s_a.send(0).unwrap();
426///
427/// thread::spawn(move || {
428///     let _ = s_b.send(1);
429///     thread::sleep(Duration::from_secs(1));
430///     let _ = s_b.send(2);
431/// });
432///
433/// assert_eq!(r_b.recv(), Ok(1)); // Received immediately.
434/// assert_eq!(r_b.recv(), Ok(2)); // Received after 1 second.
435/// assert_eq!(r_b.recv(), Err(RecvError)); // All `B` senders have been dropped
436///
437/// // Channel `B` is disconnected, channel `A` disconnects as well
438/// assert_eq!(s_a.send(3), Err(SendError(3)));
439/// assert_eq!(r_a.recv(), Ok(0));
440/// assert_eq!(r_a.recv(), Err(RecvError));
441/// ```
442pub struct Receiver<T> {
443    pub(super) _liveness_check: crossbeam_channel::Sender<()>,
444    pub(super) receiver: crossbeam_channel::Receiver<T>,
445    pub(super) liveness_check: crossbeam_channel::Receiver<()>,
446    pub(super) depends_on: Option<(
447        crossbeam_channel::Receiver<()>,
448        crossbeam_channel::Receiver<()>,
449    )>,
450}
451
452impl<T> Receiver<T> {
453    /// Blocks the current thread until a message is received or the channel is empty and
454    /// disconnected.
455    ///
456    /// If the channel is empty and not disconnected, this call will block until the receive
457    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
458    /// wake up and return an error.
459    ///
460    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
461    /// on the other side of the channel.
462    ///
463    /// # Examples
464    ///
465    /// ```
466    /// use std::thread;
467    /// use std::time::Duration;
468    /// use crossbeam_channel::RecvError;
469    /// use lambda_channel::channel::new_channel;
470    ///
471    /// let (s, r) = new_channel(None);
472    ///
473    /// thread::spawn(move || {
474    ///     thread::sleep(Duration::from_secs(1));
475    ///     s.send(5).unwrap();
476    ///     drop(s);
477    /// });
478    ///
479    /// assert_eq!(r.recv(), Ok(5));
480    /// assert_eq!(r.recv(), Err(RecvError));
481    /// ```
482    pub fn recv(&self) -> Result<T, RecvError> {
483        if let Some(dependency) = &self.depends_on {
484            select_biased! {
485                recv(self.receiver) -> e => {
486                    e
487                },
488                recv(dependency.0) -> _ => {
489                    Err(RecvError)
490                },
491                recv(dependency.1) -> _ => {
492                    Err(RecvError)
493                },
494            }
495        } else {
496            self.receiver.recv()
497        }
498    }
499
500    /// Attempts to receive a message from the channel without blocking.
501    ///
502    /// This method will either receive a message from the channel immediately or return an error
503    /// if the channel is empty.
504    ///
505    /// If called on a zero-capacity channel, this method will receive a message only if there
506    /// happens to be a send operation on the other side of the channel at the same time.
507    ///
508    /// # Examples
509    ///
510    /// ```
511    /// use crossbeam_channel::TryRecvError;
512    /// use lambda_channel::channel::new_channel;
513    ///
514    /// let (s, r) = new_channel(None);
515    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
516    ///
517    /// s.send(5).unwrap();
518    /// drop(s);
519    ///
520    /// assert_eq!(r.try_recv(), Ok(5));
521    /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
522    /// ```
523    pub fn try_recv(&self) -> Result<T, TryRecvError> {
524        if let Some(dependency) = &self.depends_on {
525            select_biased! {
526                recv(dependency.0) -> _ => {
527                    self.receiver.try_recv().map_err(|_| TryRecvError::Disconnected)
528                },
529                recv(dependency.1) -> _ => {
530                    self.receiver.try_recv().map_err(|_| TryRecvError::Disconnected)
531                },
532                default() => self.receiver.try_recv()
533            }
534        } else {
535            self.receiver.try_recv()
536        }
537    }
538
539    /// Waits for a message to be received from the channel, but only for a limited time.
540    ///
541    /// If the channel is empty and not disconnected, this call will block until the receive
542    /// operation can proceed or the operation times out. If the channel is empty and becomes
543    /// disconnected, this call will wake up and return an error.
544    ///
545    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
546    /// on the other side of the channel.
547    ///
548    /// # Examples
549    ///
550    /// ```
551    /// use std::thread;
552    /// use std::time::Duration;
553    /// use crossbeam_channel::RecvTimeoutError;
554    /// use lambda_channel::channel::new_channel;
555    ///
556    /// let (s, r) = new_channel(None);
557    ///
558    /// thread::spawn(move || {
559    ///     thread::sleep(Duration::from_secs(1));
560    ///     s.send(5).unwrap();
561    ///     drop(s);
562    /// });
563    ///
564    /// assert_eq!(
565    ///     r.recv_timeout(Duration::from_millis(500)),
566    ///     Err(RecvTimeoutError::Timeout),
567    /// );
568    /// assert_eq!(
569    ///     r.recv_timeout(Duration::from_secs(1)),
570    ///     Ok(5),
571    /// );
572    /// assert_eq!(
573    ///     r.recv_timeout(Duration::from_secs(1)),
574    ///     Err(RecvTimeoutError::Disconnected),
575    /// );
576    /// ```
577    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
578        if let Some(dependency) = &self.depends_on {
579            select_biased! {
580                recv(self.receiver) -> res => {
581                    res.map_err(|_| RecvTimeoutError::Disconnected)
582                },
583                recv(dependency.0) -> _ => {
584                    Err(RecvTimeoutError::Disconnected)
585                },
586                recv(dependency.1) -> _ => {
587                    Err(RecvTimeoutError::Disconnected)
588                },
589                default(timeout) => Err(RecvTimeoutError::Timeout),
590            }
591        } else {
592            self.receiver.recv_timeout(timeout)
593        }
594    }
595
596    /// Waits for a message to be received from the channel, but only before a given deadline.
597    ///
598    /// If the channel is empty and not disconnected, this call will block until the receive
599    /// operation can proceed or the operation times out. If the channel is empty and becomes
600    /// disconnected, this call will wake up and return an error.
601    ///
602    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
603    /// on the other side of the channel.
604    ///
605    /// # Examples
606    ///
607    /// ```
608    /// use std::thread;
609    /// use std::time::{Instant, Duration};
610    /// use crossbeam_channel::RecvTimeoutError;
611    /// use lambda_channel::channel::new_channel;
612    ///
613    /// let (s, r) = new_channel(None);
614    ///
615    /// thread::spawn(move || {
616    ///     thread::sleep(Duration::from_secs(1));
617    ///     s.send(5).unwrap();
618    ///     drop(s);
619    /// });
620    ///
621    /// let now = Instant::now();
622    ///
623    /// assert_eq!(
624    ///     r.recv_deadline(now + Duration::from_millis(500)),
625    ///     Err(RecvTimeoutError::Timeout),
626    /// );
627    /// assert_eq!(
628    ///     r.recv_deadline(now + Duration::from_millis(1500)),
629    ///     Ok(5),
630    /// );
631    /// assert_eq!(
632    ///     r.recv_deadline(now + Duration::from_secs(5)),
633    ///     Err(RecvTimeoutError::Disconnected),
634    /// );
635    /// ```
636    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
637        if let Some(dependency) = &self.depends_on {
638            select_biased! {
639                recv(self.receiver) -> res => {
640                    res.map_err(|_| RecvTimeoutError::Disconnected)
641                },
642                recv(dependency.0) -> _ => {
643                    Err(RecvTimeoutError::Disconnected)
644                },
645                recv(dependency.1) -> _ => {
646                    Err(RecvTimeoutError::Disconnected)
647                },
648                default(deadline.saturating_duration_since(Instant::now())) => Err(RecvTimeoutError::Timeout),
649            }
650        } else {
651            self.receiver.recv_deadline(deadline)
652        }
653    }
654
655    /// Returns `true` if the channel is empty.
656    ///
657    /// Note: Zero-capacity channels are always empty.
658    ///
659    /// # Examples
660    ///
661    /// ```
662    /// use lambda_channel::channel::new_channel;
663    ///
664    /// let (s, r) = new_channel(None);
665    ///
666    /// assert!(r.is_empty());
667    /// s.send(0).unwrap();
668    /// assert!(!r.is_empty());
669    /// ```
670    pub fn is_empty(&self) -> bool {
671        self.receiver.is_empty()
672    }
673
674    /// Returns `true` if the channel is full.
675    ///
676    /// Note: Zero-capacity channels are always full.
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// use lambda_channel::channel::new_channel;
682    ///
683    /// let (s, r) = new_channel(Some(1));
684    ///
685    /// assert!(!r.is_full());
686    /// s.send(0).unwrap();
687    /// assert!(r.is_full());
688    /// ```
689    pub fn is_full(&self) -> bool {
690        self.receiver.is_full()
691    }
692
693    /// Returns the number of messages in the channel.
694    ///
695    /// # Examples
696    ///
697    /// ```
698    /// use lambda_channel::channel::new_channel;
699    ///
700    /// let (s, r) = new_channel(None);
701    /// assert_eq!(r.len(), 0);
702    ///
703    /// s.send(1).unwrap();
704    /// s.send(2).unwrap();
705    /// assert_eq!(r.len(), 2);
706    /// ```
707    pub fn len(&self) -> usize {
708        self.receiver.len()
709    }
710
711    /// Returns the channel's capacity.
712    ///
713    /// # Examples
714    ///
715    /// ```
716    /// use lambda_channel::channel::new_channel;
717    ///
718    /// let (_, r) = new_channel::<i32>(None);
719    /// assert_eq!(r.capacity(), None);
720    ///
721    /// let (_, r) = new_channel::<i32>(Some(5));
722    /// assert_eq!(r.capacity(), Some(5));
723    ///
724    /// let (_, r) = new_channel::<i32>(Some(0));
725    /// assert_eq!(r.capacity(), Some(0));
726    /// ```
727    pub fn capacity(&self) -> Option<usize> {
728        self.receiver.capacity()
729    }
730
731    /// A blocking iterator over messages in the channel.
732    ///
733    /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
734    /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
735    ///
736    /// [`next`]: Iterator::next
737    ///
738    /// # Examples
739    ///
740    /// ```
741    /// use std::thread;
742    /// use lambda_channel::channel::new_channel;
743    ///
744    /// let (s, r) = new_channel(None);
745    ///
746    /// thread::spawn(move || {
747    ///     s.send(1).unwrap();
748    ///     s.send(2).unwrap();
749    ///     s.send(3).unwrap();
750    ///     drop(s); // Disconnect the channel.
751    /// });
752    ///
753    /// // Collect all messages from the channel.
754    /// // Note that the call to `collect` blocks until the sender is dropped.
755    /// let v: Vec<_> = r.iter().collect();
756    ///
757    /// assert_eq!(v, [1, 2, 3]);
758    /// ```
759    pub fn iter(&self) -> Iter<'_, T> {
760        self.receiver.iter()
761    }
762
763    /// A non-blocking iterator over messages in the channel.
764    ///
765    /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
766    /// never blocks waiting for the next message.
767    ///
768    /// [`next`]: Iterator::next
769    ///
770    /// # Examples
771    ///
772    /// ```
773    /// use std::thread;
774    /// use std::time::Duration;
775    /// use lambda_channel::channel::new_channel;
776    ///
777    /// let (s, r) = new_channel::<i32>(None);
778    ///
779    /// # let t =
780    /// thread::spawn(move || {
781    ///     s.send(1).unwrap();
782    ///     thread::sleep(Duration::from_secs(1));
783    ///     s.send(2).unwrap();
784    ///     thread::sleep(Duration::from_secs(2));
785    ///     s.send(3).unwrap();
786    /// });
787    ///
788    /// thread::sleep(Duration::from_secs(2));
789    ///
790    /// // Collect all messages from the channel without blocking.
791    /// // The third message hasn't been sent yet so we'll collect only the first two.
792    /// let v: Vec<_> = r.try_iter().collect();
793    ///
794    /// assert_eq!(v, [1, 2]);
795    /// # t.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
796    /// ```
797    pub fn try_iter(&self) -> TryIter<'_, T> {
798        self.receiver.try_iter()
799    }
800
801    /// Returns `true` if receivers belong to the same channel.
802    ///
803    /// # Examples
804    ///
805    /// ```rust
806    /// use lambda_channel::channel::new_channel;
807    ///
808    /// let (_, r) = new_channel::<usize>(None);
809    ///
810    /// let r2 = r.clone();
811    /// assert!(r.same_channel(&r2));
812    ///
813    /// let (_, r3) = new_channel(None);
814    /// assert!(!r.same_channel(&r3));
815    /// ```
816    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
817        self.receiver.same_channel(&other.receiver)
818    }
819}
820
821impl<T> Clone for Receiver<T> {
822    fn clone(&self) -> Self {
823        Receiver {
824            _liveness_check: self._liveness_check.clone(),
825            receiver: self.receiver.clone(),
826            liveness_check: self.liveness_check.clone(),
827            depends_on: self.depends_on.clone(),
828        }
829    }
830}
831
832impl<T> fmt::Debug for Receiver<T> {
833    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
834        f.pad("Lamba Channel Receiver { .. }")
835    }
836}
837
838/// Creates a multi-producer multi-consumer channel of either bounded or unbounded capacity.
839///
840/// - If `capacity` is `None`, this channel has a growable buffer that can hold any number of messages at a time.
841/// - If `capacity` is `Some(n)`, this channel has a buffer that can hold at most `n` messages at a time.
842///
843/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
844/// receive operations must appear at the same time in order to pair up and pass the message over.
845///
846/// # Examples
847///
848/// A channel of unbounded capacity:
849///
850/// ```
851/// use std::thread;
852/// use lambda_channel::channel::new_channel;
853///
854/// let (s, r) = new_channel(None);
855///
856/// // Computes the n-th Fibonacci number.
857/// fn fib(n: i32) -> i32 {
858///     if n <= 1 {
859///         n
860///     } else {
861///         fib(n - 1) + fib(n - 2)
862///     }
863/// }
864///
865/// // Spawn an asynchronous computation.
866/// thread::spawn(move || s.send(fib(20)).unwrap());
867/// assert_eq!(r.recv(), Ok(6765))
868/// ```
869///
870/// A channel of capacity 1:
871///
872/// ```
873/// use std::thread;
874/// use std::time::Duration;
875/// use lambda_channel::channel::new_channel;
876///
877/// let (s, r) = new_channel(Some(1));
878///
879/// // This call returns immediately because there is enough space in the channel.
880/// s.send(1).unwrap();
881///
882/// thread::spawn(move || {
883///     // This call blocks the current thread because the channel is full.
884///     // It will be able to complete only after the first message is received.
885///     s.send(2).unwrap();
886/// });
887///
888/// thread::sleep(Duration::from_secs(1));
889/// assert_eq!(r.recv(), Ok(1));
890/// assert_eq!(r.recv(), Ok(2));
891/// ```
892///
893/// A zero-capacity channel:
894///
895/// ```
896/// use std::thread;
897/// use std::time::Duration;
898/// use lambda_channel::channel::new_channel;
899///
900/// let (s, r) = new_channel(Some(0));
901///
902/// thread::spawn(move || {
903///     // This call blocks the current thread until a receive operation appears
904///     // on the other side of the channel.
905///     s.send(1).unwrap();
906/// });
907///
908/// thread::sleep(Duration::from_secs(1));
909/// assert_eq!(r.recv(), Ok(1));
910/// ```
911pub fn new_channel<T>(capacity: Option<usize>) -> (Sender<T>, Receiver<T>) {
912    let (sender, receiver) = match capacity {
913        None => unbounded(),
914        Some(n) => bounded(n),
915    };
916
917    let (_sender_liveness_check, sender_liveness_check) = bounded(0);
918    let (_receiver_liveness_check, receiver_liveness_check) = bounded(0);
919
920    let sender = Sender {
921        _liveness_check: _sender_liveness_check,
922        sender,
923        liveness_check: receiver_liveness_check,
924        depends_on: None,
925    };
926
927    let receiver = Receiver {
928        _liveness_check: _receiver_liveness_check,
929        receiver,
930        liveness_check: sender_liveness_check,
931        depends_on: None,
932    };
933
934    (sender, receiver)
935}
936
937/// Creates a multi-producer multi-consumer channel of either bounded or unbounded capacity that **ACTS**
938/// disconnected when the channel it depends on is disconnected.
939///
940/// - If `capacity` is `None`, this channel has a growable buffer that can hold any number of messages at a time.
941/// - If `capacity` is `Some(n)`, this channel has a buffer that can hold at most `n` messages at a time.
942///
943/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
944/// receive operations must appear at the same time in order to pair up and pass the message over.
945///
946/// # Examples
947///
948/// ```
949/// use std::thread;
950/// use crossbeam_channel::SendError;
951/// use lambda_channel::channel::{new_channel, new_channel_with_dependency};
952///
953/// let (s_b, r_b) = new_channel(None);
954/// let (s_a, r_a) = new_channel_with_dependency(None, &s_b, &r_b);
955///
956/// fn fib(n: i32) -> u64 {
957///     if n <= 1 {
958///         n as u64
959///     } else {
960///         fib(n - 1) + fib(n - 2)
961///     }
962/// }
963///
964/// // Spawn an asynchronous computation.
965/// let handle = thread::spawn(move || {
966///     while let Ok(v) = r_a.recv() {
967///         s_b.send(fib(v)).unwrap();
968///     }
969/// });
970///
971/// s_a.send(20).unwrap();
972/// assert_eq!(r_b.recv(), Ok(6765));
973///
974/// drop(r_b);
975/// let _ = handle.join();
976/// assert_eq!(s_a.send(10), Err(SendError(10)));
977/// ```
978pub fn new_channel_with_dependency<T, U>(
979    capacity: Option<usize>,
980    dependency_sender: &Sender<U>,
981    dependency_receiver: &Receiver<U>,
982) -> (Sender<T>, Receiver<T>) {
983    let (sender, receiver) = match capacity {
984        None => unbounded(),
985        Some(n) => bounded(n),
986    };
987
988    let (_sender_liveness_check, sender_liveness_check) = bounded(0);
989    let (_receiver_liveness_check, receiver_liveness_check) = bounded(0);
990
991    let sender = Sender {
992        _liveness_check: _sender_liveness_check,
993        sender,
994        liveness_check: receiver_liveness_check,
995        depends_on: Some((
996            dependency_sender.liveness_check.clone(),
997            dependency_receiver.liveness_check.clone(),
998        )),
999    };
1000
1001    let receiver = Receiver {
1002        _liveness_check: _receiver_liveness_check,
1003        receiver,
1004        liveness_check: sender_liveness_check,
1005        depends_on: Some((
1006            dependency_sender.liveness_check.clone(),
1007            dependency_receiver.liveness_check.clone(),
1008        )),
1009    };
1010
1011    (sender, receiver)
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016    use super::*;
1017    use std::sync::atomic::{AtomicBool, Ordering};
1018    use std::sync::Arc;
1019    use std::thread;
1020
1021    use quanta::Clock;
1022
1023    fn send_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1024        assert_eq!(tx.send(1), Ok(()));
1025        assert_eq!(rx.recv(), Ok(1));
1026
1027        drop(rx);
1028        assert_eq!(tx.send(2), Err(SendError(2)));
1029
1030        if let Some(h) = handle {
1031            let _ = h.join();
1032        }
1033    }
1034
1035    #[test]
1036    fn test_send() {
1037        let (tx, rx) = new_channel(None);
1038        send_test(tx, rx, None);
1039    }
1040
1041    #[test]
1042    fn test_dependent_send() {
1043        let (out_tx, rx) = new_channel(None);
1044        let (tx, in_rx) = new_channel_with_dependency(None, &out_tx, &rx);
1045
1046        let handle = thread::spawn(move || loop {
1047            let _ = out_tx.send(in_rx.recv().unwrap());
1048        });
1049
1050        send_test(tx, rx, Some(handle));
1051    }
1052
1053    fn try_send_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1054        assert_eq!(tx.try_send(1), Ok(()));
1055        assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
1056        assert_eq!(rx.recv(), Ok(1));
1057
1058        drop(rx);
1059        assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
1060
1061        if let Some(h) = handle {
1062            let _ = h.join();
1063        }
1064    }
1065
1066    #[test]
1067    fn test_try_send() {
1068        let (tx, rx) = new_channel(Some(1));
1069        try_send_test(tx, rx, None);
1070    }
1071
1072    #[test]
1073    fn test_dependent_try_send() {
1074        let (out_tx, rx) = new_channel(Some(0));
1075        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1076
1077        let handle = thread::spawn(move || loop {
1078            let _ = out_tx.send(in_rx.recv().unwrap());
1079        });
1080
1081        // Make sure the thread is ready
1082        assert_eq!(tx.send(0), Ok(()));
1083        assert_eq!(rx.recv(), Ok(0));
1084
1085        try_send_test(tx, rx, Some(handle));
1086    }
1087
1088    fn send_timeout_test(
1089        tx: Sender<u16>,
1090        rx: Receiver<u16>,
1091        handle: Option<thread::JoinHandle<()>>,
1092    ) {
1093        let timeout = Duration::from_millis(10);
1094        let clock = Clock::new();
1095
1096        let mut s = clock.now();
1097        assert_eq!(tx.send_timeout(1, timeout), Ok(()));
1098        assert!(s.elapsed() < timeout / 4);
1099
1100        s = clock.now();
1101        assert_eq!(
1102            tx.send_timeout(2, timeout),
1103            Err(SendTimeoutError::Timeout(2))
1104        );
1105        assert!(s.elapsed() >= timeout);
1106        assert_eq!(rx.recv(), Ok(1));
1107
1108        drop(rx);
1109        s = clock.now();
1110        assert_eq!(
1111            tx.send_timeout(3, timeout),
1112            Err(SendTimeoutError::Disconnected(3))
1113        );
1114        assert!(s.elapsed() < timeout / 4);
1115
1116        if let Some(h) = handle {
1117            let _ = h.join();
1118        }
1119    }
1120
1121    #[test]
1122    fn test_send_timeout() {
1123        let (tx, rx) = new_channel(Some(1));
1124        send_timeout_test(tx, rx, None);
1125    }
1126
1127    #[test]
1128    fn test_dependent_send_timeout() {
1129        let (out_tx, rx) = new_channel(Some(0));
1130        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1131
1132        let handle = thread::spawn(move || loop {
1133            let _ = out_tx.send(in_rx.recv().unwrap());
1134        });
1135
1136        send_timeout_test(tx, rx, Some(handle));
1137    }
1138
1139    fn send_deadline_test(
1140        tx: Sender<u16>,
1141        rx: Receiver<u16>,
1142        handle: Option<thread::JoinHandle<()>>,
1143    ) {
1144        let timeout = Duration::from_millis(10);
1145        let clock = Clock::new();
1146
1147        let mut s = clock.now();
1148        let mut deadline = Instant::now() + timeout;
1149        assert_eq!(tx.send_deadline(1, deadline), Ok(()));
1150        assert!(s.elapsed() < timeout / 4);
1151
1152        s = clock.now();
1153        deadline = Instant::now() + timeout;
1154        assert_eq!(
1155            tx.send_deadline(2, deadline),
1156            Err(SendTimeoutError::Timeout(2))
1157        );
1158        assert!(s.elapsed() >= timeout);
1159        assert_eq!(rx.recv(), Ok(1));
1160
1161        drop(rx);
1162        s = clock.now();
1163        deadline = Instant::now() + timeout;
1164        assert_eq!(
1165            tx.send_deadline(3, deadline),
1166            Err(SendTimeoutError::Disconnected(3))
1167        );
1168        assert!(s.elapsed() < timeout / 4);
1169
1170        if let Some(h) = handle {
1171            let _ = h.join();
1172        }
1173    }
1174
1175    #[test]
1176    fn test_send_deadline() {
1177        let (tx, rx) = new_channel(Some(1));
1178        send_deadline_test(tx, rx, None);
1179    }
1180
1181    #[test]
1182    fn test_dependent_send_deadline() {
1183        let (out_tx, rx) = new_channel(Some(0));
1184        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1185
1186        let handle = thread::spawn(move || loop {
1187            let _ = out_tx.send(in_rx.recv().unwrap());
1188        });
1189
1190        send_deadline_test(tx, rx, Some(handle));
1191    }
1192
1193    fn recv_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1194        assert_eq!(tx.send(1), Ok(()));
1195        assert_eq!(rx.recv(), Ok(1));
1196
1197        assert_eq!(tx.send(2), Ok(()));
1198        drop(tx);
1199
1200        assert_eq!(rx.recv(), Ok(2));
1201        assert_eq!(rx.recv(), Err(RecvError));
1202
1203        if let Some(h) = handle {
1204            let _ = h.join();
1205        }
1206    }
1207
1208    #[test]
1209    fn test_recv() {
1210        let (tx, rx) = new_channel(None);
1211        recv_test(tx, rx, None);
1212    }
1213
1214    #[test]
1215    fn test_dependent_recv() {
1216        let (out_tx, rx) = new_channel(None);
1217        let (tx, in_rx) = new_channel_with_dependency(None, &out_tx, &rx);
1218
1219        let handle = thread::spawn(move || loop {
1220            let _ = out_tx.send(in_rx.recv().unwrap());
1221        });
1222
1223        recv_test(tx, rx, Some(handle));
1224    }
1225
1226    fn try_recv_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1227        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1228
1229        assert_eq!(tx.send(1), Ok(()));
1230
1231        loop {
1232            match rx.try_recv() {
1233                Ok(v) => {
1234                    assert_eq!(v, 1);
1235                    break;
1236                }
1237                Err(e) => {
1238                    assert_eq!(e, TryRecvError::Empty);
1239                }
1240            };
1241        }
1242
1243        assert_eq!(tx.send(2), Ok(()));
1244        drop(tx);
1245
1246        loop {
1247            match rx.try_recv() {
1248                Ok(v) => {
1249                    assert_eq!(v, 2);
1250                    break;
1251                }
1252                Err(e) => {
1253                    assert_eq!(e, TryRecvError::Empty);
1254                }
1255            };
1256        }
1257
1258        loop {
1259            match rx.try_recv() {
1260                Ok(_) => {
1261                    assert!(false);
1262                }
1263                Err(e) => match e {
1264                    TryRecvError::Empty => {}
1265                    TryRecvError::Disconnected => break,
1266                },
1267            };
1268        }
1269
1270        if let Some(h) = handle {
1271            let _ = h.join();
1272        }
1273    }
1274
1275    #[test]
1276    fn test_try_recv() {
1277        let (tx, rx) = new_channel(Some(1));
1278        try_recv_test(tx, rx, None);
1279    }
1280
1281    #[test]
1282    fn test_dependent_try_recv() {
1283        let (out_tx, rx) = new_channel(Some(0));
1284        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1285
1286        let handle = thread::spawn(move || loop {
1287            let _ = out_tx.send(in_rx.recv().unwrap());
1288        });
1289
1290        try_recv_test(tx, rx, Some(handle));
1291    }
1292
1293    fn recv_timeout_test(
1294        tx: Sender<u16>,
1295        rx: Receiver<u16>,
1296        handle: Option<thread::JoinHandle<()>>,
1297    ) {
1298        let timeout: Duration = Duration::from_millis(10);
1299        let clock = Clock::new();
1300
1301        let mut s = clock.now();
1302        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1303        assert!(s.elapsed() >= timeout);
1304
1305        assert_eq!(tx.send(1), Ok(()));
1306        s = clock.now();
1307        assert_eq!(rx.recv_timeout(timeout), Ok(1));
1308        assert!(s.elapsed() < timeout / 4);
1309
1310        assert_eq!(tx.send(2), Ok(()));
1311        drop(tx);
1312
1313        s = clock.now();
1314        assert_eq!(rx.recv_timeout(timeout), Ok(2));
1315        assert!(s.elapsed() < timeout / 4);
1316
1317        s = clock.now();
1318        assert_eq!(
1319            rx.recv_timeout(timeout),
1320            Err(RecvTimeoutError::Disconnected)
1321        );
1322        assert!(s.elapsed() < timeout / 4);
1323
1324        if let Some(h) = handle {
1325            let _ = h.join();
1326        }
1327    }
1328
1329    #[test]
1330    fn test_recv_timeout() {
1331        let (tx, rx) = new_channel(Some(1));
1332        recv_timeout_test(tx, rx, None);
1333    }
1334
1335    #[test]
1336    fn test_dependent_recv_timeout() {
1337        let (out_tx, rx) = new_channel(Some(0));
1338        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1339
1340        let handle = thread::spawn(move || loop {
1341            let _ = out_tx.send(in_rx.recv().unwrap());
1342        });
1343
1344        recv_timeout_test(tx, rx, Some(handle));
1345    }
1346
1347    fn recv_deadline_test(
1348        tx: Sender<u16>,
1349        rx: Receiver<u16>,
1350        handle: Option<thread::JoinHandle<()>>,
1351    ) {
1352        let timeout: Duration = Duration::from_millis(10);
1353        let clock = Clock::new();
1354
1355        let mut s = clock.now();
1356        let mut deadline = Instant::now() + timeout;
1357        assert_eq!(rx.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
1358        assert!(s.elapsed() >= timeout);
1359
1360        assert_eq!(tx.send(1), Ok(()));
1361        s = clock.now();
1362        deadline = Instant::now() + timeout;
1363        assert_eq!(rx.recv_deadline(deadline), Ok(1));
1364        assert!(s.elapsed() < timeout / 4);
1365
1366        assert_eq!(tx.send(2), Ok(()));
1367        drop(tx);
1368
1369        s = clock.now();
1370        deadline = Instant::now() + timeout;
1371        assert_eq!(rx.recv_deadline(deadline), Ok(2));
1372        assert!(s.elapsed() < timeout / 4);
1373
1374        s = clock.now();
1375        deadline = Instant::now() + timeout;
1376        assert_eq!(
1377            rx.recv_deadline(deadline),
1378            Err(RecvTimeoutError::Disconnected)
1379        );
1380        assert!(s.elapsed() < timeout / 4);
1381
1382        if let Some(h) = handle {
1383            let _ = h.join();
1384        }
1385    }
1386
1387    #[test]
1388    fn test_recv_deadline() {
1389        let (tx, rx) = new_channel(Some(1));
1390        recv_deadline_test(tx, rx, None);
1391    }
1392
1393    #[test]
1394    fn test_dependent_recv_deadline() {
1395        let (out_tx, rx) = new_channel(Some(0));
1396        let (tx, in_rx) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1397
1398        let handle = thread::spawn(move || loop {
1399            let _ = out_tx.send(in_rx.recv().unwrap());
1400        });
1401
1402        recv_deadline_test(tx, rx, Some(handle));
1403    }
1404
1405    #[test]
1406    fn test_crazy_chain_drop_receiver() {
1407        let (out_tx, rx) = new_channel(Some(0));
1408        let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1409        let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1410        let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1411        let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1412        let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1413        let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1414
1415        let v = out_tx.clone();
1416        let handle1 = thread::spawn(move || loop {
1417            let _ = v.send(in_rx1.recv().unwrap());
1418        });
1419
1420        let handle2 = thread::spawn(move || loop {
1421            let _ = out_tx.send(in_rx2.recv().unwrap());
1422        });
1423
1424        let v = tx1.clone();
1425        let handle11 = thread::spawn(move || loop {
1426            let _ = v.send(in_rx11.recv().unwrap());
1427        });
1428
1429        let handle12 = thread::spawn(move || loop {
1430            let _ = tx1.send(in_rx12.recv().unwrap());
1431        });
1432
1433        let v = tx2.clone();
1434        let handle21 = thread::spawn(move || loop {
1435            let _ = v.send(in_rx21.recv().unwrap());
1436        });
1437
1438        let handle22 = thread::spawn(move || loop {
1439            let _ = tx2.send(in_rx22.recv().unwrap());
1440        });
1441
1442        assert_eq!(tx11.send(1), Ok(()));
1443        assert_eq!(rx.recv(), Ok(1));
1444        assert_eq!(tx12.send(2), Ok(()));
1445        assert_eq!(rx.recv(), Ok(2));
1446        assert_eq!(tx21.send(3), Ok(()));
1447        assert_eq!(rx.recv(), Ok(3));
1448        assert_eq!(tx22.send(4), Ok(()));
1449        assert_eq!(rx.recv(), Ok(4));
1450
1451        drop(rx);
1452
1453        let _ = handle1.join();
1454        let _ = handle2.join();
1455        let _ = handle11.join();
1456        let _ = handle12.join();
1457        let _ = handle21.join();
1458        let _ = handle22.join();
1459
1460        assert_eq!(tx11.send(6), Err(SendError(6)));
1461        assert_eq!(tx12.send(7), Err(SendError(7)));
1462        assert_eq!(tx21.send(8), Err(SendError(8)));
1463        assert_eq!(tx22.send(9), Err(SendError(9)));
1464    }
1465
1466    #[test]
1467    fn test_crazy_chain_drop_senders() {
1468        let (out_tx, rx) = new_channel(Some(0));
1469        let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1470        let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1471        let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1472        let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1473        let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1474        let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1475
1476        let v = out_tx.clone();
1477        let handle1 = thread::spawn(move || loop {
1478            let _ = v.send(in_rx1.recv().unwrap());
1479        });
1480
1481        let handle2 = thread::spawn(move || loop {
1482            let _ = out_tx.send(in_rx2.recv().unwrap());
1483        });
1484
1485        let v = tx1.clone();
1486        let handle11 = thread::spawn(move || loop {
1487            let _ = v.send(in_rx11.recv().unwrap());
1488        });
1489
1490        let handle12 = thread::spawn(move || loop {
1491            let _ = tx1.send(in_rx12.recv().unwrap());
1492        });
1493
1494        let v = tx2.clone();
1495        let handle21 = thread::spawn(move || loop {
1496            let _ = v.send(in_rx21.recv().unwrap());
1497        });
1498
1499        let handle22 = thread::spawn(move || loop {
1500            let _ = tx2.send(in_rx22.recv().unwrap());
1501        });
1502
1503        assert_eq!(tx11.send(1), Ok(()));
1504        assert_eq!(rx.recv(), Ok(1));
1505        assert_eq!(tx12.send(2), Ok(()));
1506        assert_eq!(rx.recv(), Ok(2));
1507        assert_eq!(tx21.send(3), Ok(()));
1508        assert_eq!(rx.recv(), Ok(3));
1509        assert_eq!(tx22.send(4), Ok(()));
1510        assert_eq!(rx.recv(), Ok(4));
1511
1512        assert_eq!(tx11.send(5), Ok(()));
1513        drop(tx11);
1514        drop(tx12);
1515        drop(tx21);
1516        drop(tx22);
1517
1518        assert_eq!(rx.recv(), Ok(5));
1519        assert_eq!(rx.recv(), Err(RecvError));
1520
1521        let _ = handle1.join();
1522        let _ = handle2.join();
1523        let _ = handle11.join();
1524        let _ = handle12.join();
1525        let _ = handle21.join();
1526        let _ = handle22.join();
1527    }
1528
1529    #[test]
1530    fn test_crazy_chain_drop_threads() {
1531        let (out_tx, rx) = new_channel(Some(0));
1532        let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1533        let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
1534        let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1535        let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
1536        let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1537        let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
1538
1539        let v = out_tx.clone();
1540        let thread1_kill = Arc::new(AtomicBool::new(true));
1541        let tk1 = thread1_kill.clone();
1542        let handle1 = thread::spawn(move || {
1543            while tk1.load(Ordering::Acquire) {
1544                let _ = v.send(in_rx1.recv().unwrap());
1545            }
1546        });
1547
1548        let thread2_kill = Arc::new(AtomicBool::new(true));
1549        let tk2 = thread2_kill.clone();
1550        let handle2 = thread::spawn(move || {
1551            while tk2.load(Ordering::Acquire) {
1552                let _ = out_tx.send(in_rx2.recv().unwrap());
1553            }
1554        });
1555
1556        let v = tx1.clone();
1557        let handle11 = thread::spawn(move || loop {
1558            let _ = v.send(in_rx11.recv().unwrap());
1559        });
1560
1561        let handle12 = thread::spawn(move || loop {
1562            let _ = tx1.send(in_rx12.recv().unwrap());
1563        });
1564
1565        let v = tx2.clone();
1566        let handle21 = thread::spawn(move || loop {
1567            let _ = v.send(in_rx21.recv().unwrap());
1568        });
1569
1570        let handle22 = thread::spawn(move || loop {
1571            let _ = tx2.send(in_rx22.recv().unwrap());
1572        });
1573
1574        assert_eq!(tx11.send(1), Ok(()));
1575        assert_eq!(rx.recv(), Ok(1));
1576        assert_eq!(tx11.send(2), Ok(()));
1577        assert_eq!(rx.recv(), Ok(2));
1578        assert_eq!(tx11.send(3), Ok(()));
1579        assert_eq!(rx.recv(), Ok(3));
1580        assert_eq!(tx11.send(4), Ok(()));
1581        assert_eq!(rx.recv(), Ok(4));
1582
1583        thread1_kill.store(false, Ordering::Release);
1584        assert_eq!(tx11.send(5), Ok(()));
1585        assert_eq!(rx.recv(), Ok(5));
1586        let _ = handle1.join();
1587
1588        assert_eq!(tx11.send(6), Err(SendError(6)));
1589        assert_eq!(tx12.send(7), Err(SendError(7)));
1590        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1591
1592        thread2_kill.store(false, Ordering::Release);
1593        assert_eq!(tx21.send(8), Ok(()));
1594        assert_eq!(rx.recv(), Ok(8));
1595        let _ = handle2.join();
1596
1597        assert_eq!(tx21.send(9), Err(SendError(9)));
1598        assert_eq!(tx22.send(10), Err(SendError(10)));
1599        assert_eq!(rx.recv(), Err(RecvError));
1600
1601        let _ = handle11.join();
1602        let _ = handle12.join();
1603        let _ = handle21.join();
1604        let _ = handle22.join();
1605    }
1606
1607    #[test]
1608    fn test_dependency_sender_loss() {
1609        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1610        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1611
1612        assert_eq!(tx.send(1), Ok(()));
1613
1614        drop(dep_tx);
1615
1616        assert_eq!(tx.send(2), Err(SendError(2)));
1617        assert_eq!(rx.recv(), Ok(1));
1618        assert_eq!(rx.recv(), Err(RecvError));
1619
1620        drop(dep_rx);
1621
1622        assert_eq!(tx.send(3), Err(SendError(3)));
1623        assert_eq!(rx.recv(), Err(RecvError));
1624    }
1625
1626    #[test]
1627    fn test_dependency_receiver_loss() {
1628        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1629        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1630
1631        assert_eq!(tx.send(1), Ok(()));
1632
1633        drop(dep_rx);
1634
1635        assert_eq!(tx.send(2), Err(SendError(2)));
1636        assert_eq!(rx.recv(), Ok(1));
1637        assert_eq!(rx.recv(), Err(RecvError));
1638
1639        drop(dep_tx);
1640
1641        assert_eq!(tx.send(3), Err(SendError(3)));
1642        assert_eq!(rx.recv(), Err(RecvError));
1643    }
1644
1645    #[test]
1646    fn test_dependency_sender_loss_try() {
1647        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1648        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1649
1650        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1651        assert_eq!(tx.try_send(1), Ok(()));
1652        assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
1653
1654        drop(dep_tx);
1655
1656        assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
1657        assert_eq!(rx.try_recv(), Ok(1));
1658        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1659
1660        drop(dep_rx);
1661
1662        assert_eq!(tx.try_send(4), Err(TrySendError::Disconnected(4)));
1663        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1664    }
1665
1666    #[test]
1667    fn test_dependency_receiver_loss_try() {
1668        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1669        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1670
1671        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1672        assert_eq!(tx.try_send(1), Ok(()));
1673        assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
1674
1675        drop(dep_rx);
1676
1677        assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
1678        assert_eq!(rx.try_recv(), Ok(1));
1679        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1680
1681        drop(dep_tx);
1682
1683        assert_eq!(tx.try_send(4), Err(TrySendError::Disconnected(4)));
1684        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1685    }
1686
1687    #[test]
1688    fn test_dependency_sender_loss_timeout() {
1689        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1690        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1691
1692        let timeout = Duration::from_millis(10);
1693        let clock = Clock::new();
1694
1695        let mut s = clock.now();
1696        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1697        assert!(s.elapsed() >= timeout);
1698
1699        s = clock.now();
1700        assert_eq!(tx.send_timeout(1, timeout), Ok(()));
1701        assert!(s.elapsed() < timeout / 4);
1702
1703        s = clock.now();
1704        assert_eq!(
1705            tx.send_timeout(2, timeout),
1706            Err(SendTimeoutError::Timeout(2))
1707        );
1708        assert!(s.elapsed() >= timeout);
1709
1710        drop(dep_tx);
1711
1712        s = clock.now();
1713        assert_eq!(
1714            tx.send_timeout(3, timeout),
1715            Err(SendTimeoutError::Disconnected(3))
1716        );
1717        assert!(s.elapsed() < timeout / 4);
1718
1719        s = clock.now();
1720        assert_eq!(rx.recv_timeout(timeout), Ok(1));
1721        assert!(s.elapsed() < timeout / 4);
1722
1723        s = clock.now();
1724        assert_eq!(
1725            rx.recv_timeout(timeout),
1726            Err(RecvTimeoutError::Disconnected)
1727        );
1728        assert!(s.elapsed() < timeout / 4);
1729
1730        drop(dep_rx);
1731
1732        s = clock.now();
1733        assert_eq!(
1734            tx.send_timeout(4, timeout),
1735            Err(SendTimeoutError::Disconnected(4))
1736        );
1737        assert!(s.elapsed() < timeout / 4);
1738
1739        s = clock.now();
1740        assert_eq!(
1741            rx.recv_timeout(timeout),
1742            Err(RecvTimeoutError::Disconnected)
1743        );
1744        assert!(s.elapsed() < timeout / 4);
1745    }
1746
1747    #[test]
1748    fn test_dependency_receiver_loss_timeout() {
1749        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1750        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1751
1752        let timeout = Duration::from_millis(10);
1753        let clock = Clock::new();
1754
1755        let mut s = clock.now();
1756        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1757        assert!(s.elapsed() >= timeout);
1758
1759        s = clock.now();
1760        assert_eq!(tx.send_timeout(1, timeout), Ok(()));
1761        assert!(s.elapsed() < timeout / 4);
1762
1763        s = clock.now();
1764        assert_eq!(
1765            tx.send_timeout(2, timeout),
1766            Err(SendTimeoutError::Timeout(2))
1767        );
1768        assert!(s.elapsed() >= timeout);
1769
1770        drop(dep_rx);
1771
1772        s = clock.now();
1773        assert_eq!(
1774            tx.send_timeout(3, timeout),
1775            Err(SendTimeoutError::Disconnected(3))
1776        );
1777        assert!(s.elapsed() < timeout / 4);
1778
1779        s = clock.now();
1780        assert_eq!(rx.recv_timeout(timeout), Ok(1));
1781        assert!(s.elapsed() < timeout / 4);
1782
1783        s = clock.now();
1784        assert_eq!(
1785            rx.recv_timeout(timeout),
1786            Err(RecvTimeoutError::Disconnected)
1787        );
1788        assert!(s.elapsed() < timeout / 4);
1789
1790        drop(dep_tx);
1791
1792        s = clock.now();
1793        assert_eq!(
1794            tx.send_timeout(4, timeout),
1795            Err(SendTimeoutError::Disconnected(4))
1796        );
1797        assert!(s.elapsed() < timeout / 4);
1798
1799        s = clock.now();
1800        assert_eq!(
1801            rx.recv_timeout(timeout),
1802            Err(RecvTimeoutError::Disconnected)
1803        );
1804        assert!(s.elapsed() < timeout / 4);
1805    }
1806
1807    #[test]
1808    fn test_dependency_sender_loss_deadline() {
1809        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1810        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1811
1812        let timeout = Duration::from_millis(10);
1813        let clock = Clock::new();
1814
1815        let mut s = clock.now();
1816        let mut deadline = Instant::now() + timeout;
1817        assert_eq!(rx.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
1818        assert!(s.elapsed() >= timeout);
1819
1820        s = clock.now();
1821        deadline = Instant::now() + timeout;
1822        assert_eq!(tx.send_deadline(1, deadline), Ok(()));
1823        assert!(s.elapsed() < timeout / 4);
1824
1825        s = clock.now();
1826        deadline = Instant::now() + timeout;
1827        assert_eq!(
1828            tx.send_deadline(2, deadline),
1829            Err(SendTimeoutError::Timeout(2))
1830        );
1831        assert!(s.elapsed() >= timeout);
1832
1833        drop(dep_tx);
1834
1835        s = clock.now();
1836        deadline = Instant::now() + timeout;
1837        assert_eq!(
1838            tx.send_deadline(3, deadline),
1839            Err(SendTimeoutError::Disconnected(3))
1840        );
1841        assert!(s.elapsed() < timeout / 4);
1842
1843        s = clock.now();
1844        deadline = Instant::now() + timeout;
1845        assert_eq!(rx.recv_deadline(deadline), Ok(1));
1846        assert!(s.elapsed() < timeout / 4);
1847
1848        s = clock.now();
1849        deadline = Instant::now() + timeout;
1850        assert_eq!(
1851            rx.recv_deadline(deadline),
1852            Err(RecvTimeoutError::Disconnected)
1853        );
1854        assert!(s.elapsed() < timeout / 4);
1855
1856        drop(dep_rx);
1857
1858        s = clock.now();
1859        deadline = Instant::now() + timeout;
1860        assert_eq!(
1861            tx.send_deadline(4, deadline),
1862            Err(SendTimeoutError::Disconnected(4))
1863        );
1864        assert!(s.elapsed() < timeout / 4);
1865
1866        s = clock.now();
1867        deadline = Instant::now() + timeout;
1868        assert_eq!(
1869            rx.recv_deadline(deadline),
1870            Err(RecvTimeoutError::Disconnected)
1871        );
1872        assert!(s.elapsed() < timeout / 4);
1873    }
1874
1875    #[test]
1876    fn test_dependency_receiver_loss_deadline() {
1877        let (dep_tx, dep_rx) = new_channel::<()>(Some(1));
1878        let (tx, rx) = new_channel_with_dependency(Some(1), &dep_tx, &dep_rx);
1879
1880        let timeout = Duration::from_millis(10);
1881        let clock = Clock::new();
1882
1883        let mut s = clock.now();
1884        let mut deadline = Instant::now() + timeout;
1885        assert_eq!(rx.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
1886        assert!(s.elapsed() >= timeout);
1887
1888        s = clock.now();
1889        deadline = Instant::now() + timeout;
1890        assert_eq!(tx.send_deadline(1, deadline), Ok(()));
1891        assert!(s.elapsed() < timeout / 4);
1892
1893        s = clock.now();
1894        deadline = Instant::now() + timeout;
1895        assert_eq!(
1896            tx.send_deadline(2, deadline),
1897            Err(SendTimeoutError::Timeout(2))
1898        );
1899        assert!(s.elapsed() >= timeout);
1900
1901        drop(dep_rx);
1902
1903        s = clock.now();
1904        deadline = Instant::now() + timeout;
1905        assert_eq!(
1906            tx.send_deadline(3, deadline),
1907            Err(SendTimeoutError::Disconnected(3))
1908        );
1909        assert!(s.elapsed() < timeout / 4);
1910
1911        s = clock.now();
1912        deadline = Instant::now() + timeout;
1913        assert_eq!(rx.recv_deadline(deadline), Ok(1));
1914        assert!(s.elapsed() < timeout / 4);
1915
1916        s = clock.now();
1917        deadline = Instant::now() + timeout;
1918        assert_eq!(
1919            rx.recv_deadline(deadline),
1920            Err(RecvTimeoutError::Disconnected)
1921        );
1922        assert!(s.elapsed() < timeout / 4);
1923
1924        drop(dep_tx);
1925
1926        s = clock.now();
1927        deadline = Instant::now() + timeout;
1928        assert_eq!(
1929            tx.send_deadline(4, deadline),
1930            Err(SendTimeoutError::Disconnected(4))
1931        );
1932        assert!(s.elapsed() < timeout / 4);
1933
1934        s = clock.now();
1935        deadline = Instant::now() + timeout;
1936        assert_eq!(
1937            rx.recv_deadline(deadline),
1938            Err(RecvTimeoutError::Disconnected)
1939        );
1940        assert!(s.elapsed() < timeout / 4);
1941    }
1942}