linch/
schannel.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::{Duration, Instant},
6};
7
8use crate::error::{
9    ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
10    SendTimeoutError, TryRecvError, TrySelectError, TrySendError,
11};
12use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
13use futures::Stream;
14use pin_project_lite::pin_project;
15
16/// Creates a bounded channel with the specified capacity using the schannel implementation.
17///
18/// This channel is a wrapper around the crossbeam channel that provides a more
19/// ergonomic API for sending and receiving items with both synchronous and asynchronous operations.
20///
21/// # Performance Characteristics
22///
23/// The asynchronous send/receive operations in this implementation are optimized for high throughput
24/// scenarios. They use active polling which makes them suitable for situations where:
25///
26/// * Active async polling is acceptable and desired
27/// * Throughput and latency are more important than CPU efficiency
28/// * You want maximum performance for benchmarking against other channel implementations
29///
30/// This implementation does not expect channels to remain empty for extended periods and
31/// will continuously poll when operations are pending.
32///
33/// # Arguments
34///
35/// * `capacity` - The capacity of the channel buffer. Must be greater than 0.
36///
37/// # Returns
38///
39/// A tuple containing a [`Sender`] and [`Receiver`] pair.
40///
41/// # Panics
42///
43/// Panics if `capacity` is 0.
44///
45/// # Examples
46///
47/// ```rust
48/// use linch::schannel;
49///
50/// // Create a channel with capacity 10
51/// let (sender, receiver) = schannel::bounded(10);
52///
53/// // Send synchronously
54/// sender.send(42).unwrap();
55///
56/// // Receive synchronously
57/// let value = receiver.recv().unwrap();
58/// assert_eq!(value, 42);
59/// ```
60///
61/// # Async Example
62///
63/// ```rust
64/// use linch::schannel;
65///
66/// # tokio_test::block_on(async {
67/// let (sender, receiver) = schannel::bounded(10);
68///
69/// // Send asynchronously
70/// sender.send_async(42).await.unwrap();
71///
72/// // Receive asynchronously  
73/// let value = receiver.recv_async().await.unwrap();
74/// assert_eq!(value, 42);
75/// # });
76/// ```
77pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
78    assert!(capacity > 0);
79    let (tx, rx) = crossbeam_channel::bounded(capacity);
80    (Sender::new(tx), Receiver::new(rx))
81}
82
83/// Creates an unbounded channel using the schannel implementation.
84///
85/// This channel is a wrapper around the crossbeam unbounded channel that provides a more
86/// ergonomic API for sending and receiving items with both synchronous and asynchronous operations.
87///
88/// # Performance Characteristics
89///
90/// The asynchronous send/receive operations in this implementation are optimized for high throughput
91/// scenarios. They use active polling which makes them suitable for situations where:
92///
93/// * Active async polling is acceptable and desired
94/// * Throughput and latency are more important than CPU efficiency
95/// * You want maximum performance for benchmarking against other channel implementations
96///
97/// This implementation does not expect channels to remain empty for extended periods and
98/// will continuously poll when operations are pending.
99///
100/// Unlike bounded channels, unbounded channels have no capacity limit, so send operations
101/// will never block due to a full buffer. However, this can lead to unbounded memory growth
102/// if the receiver cannot keep up with the sender.
103///
104/// # Returns
105///
106/// A tuple containing a [`Sender`] and [`Receiver`] pair.
107///
108/// # Examples
109///
110/// ```rust
111/// use linch::schannel;
112///
113/// // Create an unbounded channel
114/// let (sender, receiver) = schannel::unbounded();
115///
116/// // Send synchronously - never blocks
117/// sender.send(42).unwrap();
118///
119/// // Receive synchronously
120/// let value = receiver.recv().unwrap();
121/// assert_eq!(value, 42);
122/// ```
123///
124/// # Async Example
125///
126/// ```rust
127/// use linch::schannel;
128///
129/// # tokio_test::block_on(async {
130/// let (sender, receiver) = schannel::unbounded();
131///
132/// // Send asynchronously - never blocks
133/// sender.send_async(42).await.unwrap();
134///
135/// // Receive asynchronously  
136/// let value = receiver.recv_async().await.unwrap();
137/// assert_eq!(value, 42);
138/// # });
139/// ```
140pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
141    let (tx, rx) = crossbeam_channel::unbounded();
142    (Sender::new(tx), Receiver::new(rx))
143}
144
145/// The sending half of an schannel channel.
146///
147/// This struct allows sending values both synchronously and asynchronously using
148/// an optimized implementation designed for high throughput scenarios.
149///
150/// Senders can be cloned to create multiple sending endpoints for the same channel.
151/// All synchronous crossbeam channel operations are available through [`Deref`].
152///
153/// # Examples
154///
155/// ```rust
156/// use linch::schannel;
157///
158/// let (sender, receiver) = schannel::bounded(5);
159///
160/// // Send synchronously
161/// sender.send(42).unwrap();
162///
163/// // Clone the sender
164/// let sender2 = sender.clone();
165/// sender2.send(43).unwrap();
166/// ```
167#[derive(Debug)]
168pub struct Sender<T> {
169    tx: CrossbeamSender<T>,
170}
171
172impl<T> Clone for Sender<T> {
173    fn clone(&self) -> Self {
174        Self {
175            tx: self.tx.clone(),
176        }
177    }
178}
179
180impl<T> Sender<T> {
181    /// Creates a new sender from the underlying crossbeam sender.
182    ///
183    /// This is typically not called directly by users, but rather through
184    /// [`bounded`].
185    pub fn new(tx: CrossbeamSender<T>) -> Self {
186        Self { tx }
187    }
188
189    /// Sends a value synchronously.
190    ///
191    /// This method blocks until there is space in the channel buffer or all receivers have been dropped.
192    ///
193    /// # Arguments
194    ///
195    /// * `value` - The value to send
196    ///
197    /// # Returns
198    ///
199    /// * `Ok(())` if the value was sent successfully
200    /// * `Err(SendError(value))` if all receivers have been dropped
201    ///
202    /// # Examples
203    ///
204    /// ```rust
205    /// use linch::schannel;
206    ///
207    /// let (sender, receiver) = schannel::bounded(1);
208    /// sender.send(42).unwrap();
209    /// assert_eq!(receiver.recv().unwrap(), 42);
210    /// ```
211    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
212        Ok(self.tx.send(value)?)
213    }
214
215    /// Sends a value synchronously with a timeout.
216    ///
217    /// This method blocks until there is space in the channel buffer, the timeout
218    /// expires, or all receivers have been dropped.
219    ///
220    /// # Arguments
221    ///
222    /// * `value` - The value to send
223    /// * `timeout` - The maximum duration to wait
224    ///
225    /// # Returns
226    ///
227    /// * `Ok(())` if the value was sent successfully
228    /// * `Err(SendTimeoutError::Timeout(value))` if the timeout expired
229    /// * `Err(SendTimeoutError::Disconnected(value))` if all receivers have been dropped
230    ///
231    /// # Examples
232    ///
233    /// ```rust
234    /// use linch::schannel;
235    /// use std::time::Duration;
236    ///
237    /// let (sender, _receiver) = schannel::bounded(1);
238    /// sender.send(1).unwrap(); // Fill the buffer
239    ///
240    /// // This will timeout since the buffer is full
241    /// let result = sender.send_timeout(2, Duration::from_millis(10));
242    /// assert!(result.is_err());
243    /// ```
244    pub fn send_timeout(&self, value: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
245        Ok(self.tx.send_timeout(value, timeout)?)
246    }
247
248    /// Attempts to send a value without blocking.
249    ///
250    /// This method returns immediately without blocking.
251    ///
252    /// # Arguments
253    ///
254    /// * `value` - The value to send
255    ///
256    /// # Returns
257    ///
258    /// * `Ok(())` if the value was sent successfully
259    /// * `Err(TrySendError::Full(value))` if the channel is full
260    /// * `Err(TrySendError::Disconnected(value))` if all receivers have been dropped
261    ///
262    /// # Examples
263    ///
264    /// ```rust
265    /// use linch::schannel;
266    ///
267    /// let (sender, receiver) = schannel::bounded(1);
268    /// sender.send(1).unwrap(); // Fill the buffer
269    ///
270    /// // This will fail since the buffer is full
271    /// let result = sender.try_send(2);
272    /// assert!(result.is_err());
273    /// ```
274    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
275        Ok(self.tx.try_send(value)?)
276    }
277
278    /// Sends a value asynchronously with a timeout.
279    ///
280    /// The timeout is calculated from the time this method is called, not when
281    /// the future is first polled.
282    ///
283    /// # Arguments
284    ///
285    /// * `item` - The value to send
286    /// * `timeout` - The maximum duration to wait
287    ///
288    /// # Returns
289    ///
290    /// A [`SendTimeoutFut`] that resolves to:
291    /// * `Ok(())` if the value was sent successfully
292    /// * `Err(SendTimeoutError::Timeout(item))` if the timeout expired
293    /// * `Err(SendTimeoutError::Disconnected(item))` if all receivers have been dropped
294    ///
295    /// # Examples
296    ///
297    /// ```rust
298    /// use linch::schannel;
299    /// use std::time::Duration;
300    ///
301    /// # tokio_test::block_on(async {
302    /// let (sender, _receiver) = schannel::bounded(1);
303    /// sender.send(1).unwrap(); // Fill the buffer
304    ///
305    /// // This will timeout since the buffer is full
306    /// let result = sender.send_timeout_async(2, Duration::from_millis(10)).await;
307    /// assert!(result.is_err());
308    /// # });
309    /// ```
310    pub fn send_timeout_async(&self, item: T, timeout: Duration) -> SendTimeoutFut<'_, T> {
311        SendTimeoutFut {
312            tx: self,
313            item: Some(item),
314            deadline: Some(Instant::now() + timeout),
315        }
316    }
317
318    /// Sends a value asynchronously.
319    ///
320    /// This method returns a future that will complete when there is space in the
321    /// channel buffer or when all receivers have been dropped.
322    ///
323    /// # Arguments
324    ///
325    /// * `item` - The value to send
326    ///
327    /// # Returns
328    ///
329    /// A [`SendFut`] that resolves to:
330    /// * `Ok(())` if the value was sent successfully
331    /// * `Err(SendError(item))` if all receivers have been dropped
332    ///
333    /// # Examples
334    ///
335    /// ```rust
336    /// use linch::schannel;
337    ///
338    /// # tokio_test::block_on(async {
339    /// let (sender, receiver) = schannel::bounded(1);
340    /// sender.send_async(42).await.unwrap();
341    /// assert_eq!(receiver.recv().unwrap(), 42);
342    /// # });
343    /// ```
344    pub fn send_async(&self, item: T) -> SendFut<'_, T> {
345        SendFut {
346            fut: SendTimeoutFut {
347                tx: self,
348                item: Some(item),
349                deadline: None,
350            },
351        }
352    }
353}
354
355impl<T> From<CrossbeamSender<T>> for Sender<T> {
356    fn from(tx: CrossbeamSender<T>) -> Self {
357        Self { tx }
358    }
359}
360
361/// A future representing an asynchronous send operation.
362///
363/// This future is created by the [`send_async`](Sender::send_async) method and will
364/// complete when the value has been sent or when all receivers have been dropped.
365///
366/// # Examples
367///
368/// ```rust
369/// use linch::schannel;
370///
371/// # tokio_test::block_on(async {
372/// let (sender, receiver) = schannel::bounded(1);
373/// let send_fut = sender.send_async(42);
374/// send_fut.await.unwrap();
375/// assert_eq!(receiver.recv().unwrap(), 42);
376/// # });
377/// ```
378pub struct SendFut<'a, T> {
379    fut: SendTimeoutFut<'a, T>,
380}
381
382impl<'a, T> SendFut<'a, T> {
383    pub fn new(tx: &'a Sender<T>, item: T, timeout: Option<Duration>) -> Self {
384        Self {
385            fut: SendTimeoutFut {
386                tx,
387                item: Some(item),
388                deadline: timeout.map(|d| Instant::now() + d),
389            },
390        }
391    }
392}
393
394impl<'a, T> Future for SendFut<'a, T> {
395    type Output = Result<(), SendError<T>>;
396
397    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        match Pin::new(&mut self.fut).poll(cx) {
399            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
400            Poll::Ready(Err(SendTimeoutError::Timeout(_))) => {
401                unreachable!("SendTimeoutError::Timeout should not be returned by SendFut");
402            }
403            Poll::Ready(Err(SendTimeoutError::Disconnected(item))) => {
404                Poll::Ready(Err(SendError(item)))
405            }
406            Poll::Pending => Poll::Pending,
407        }
408    }
409}
410
411pin_project! {
412    /// A future representing an asynchronous send operation with a timeout.
413    ///
414    /// This future is created by the [`send_timeout_async`](Sender::send_timeout_async) method and will
415    /// complete when the value has been sent, the timeout expires, or when all receivers have been dropped.
416    ///
417    /// # Examples
418    ///
419    /// ```rust
420    /// use linch::schannel;
421    /// use std::time::Duration;
422    ///
423    /// # tokio_test::block_on(async {
424    /// let (sender, receiver) = schannel::bounded(1);
425    /// let timeout_fut = sender.send_timeout_async(42, Duration::from_secs(1));
426    /// timeout_fut.await.unwrap();
427    /// assert_eq!(receiver.recv().unwrap(), 42);
428    /// # });
429    /// ```
430    pub struct SendTimeoutFut<'a, T> {
431        tx: &'a Sender<T>,
432        item: Option<T>,
433        deadline: Option<Instant>,
434    }
435}
436
437// this is not very efficient. we should not always take/replace the item.
438// but in order to avoid that we would need to pin the item, send a pointer to the item,
439// and then have the receiver take the item.
440// this is not worth the effort right now.
441impl<'a, T> Future for SendTimeoutFut<'a, T> {
442    type Output = Result<(), SendTimeoutError<T>>;
443
444    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
445        let this = self.project();
446
447        if let Some(deadline) = this.deadline {
448            if deadline <= &mut Instant::now() {
449                return Poll::Ready(Err(SendTimeoutError::Timeout(this.item.take().unwrap())));
450            }
451        }
452
453        match this.tx.try_send(this.item.take().unwrap()) {
454            Ok(_) => Poll::Ready(Ok(())),
455            Err(e) => match e {
456                TrySendError::Disconnected(item) => {
457                    Poll::Ready(Err(SendTimeoutError::Disconnected(item)))
458                }
459                TrySendError::Full(item) => {
460                    this.item.replace(item);
461                    cx.waker().wake_by_ref();
462                    Poll::Pending
463                }
464            },
465        }
466    }
467}
468
469/// The receiving half of an schannel channel.
470///
471/// This struct allows receiving values both synchronously and asynchronously using
472/// an optimized implementation designed for high throughput scenarios.
473///
474/// Receivers can be cloned to create multiple receiving endpoints for the same channel.
475/// All synchronous crossbeam channel operations are available through [`Deref`].
476/// The receiver can also be used as a [`Stream`] for async iteration.
477///
478/// # Examples
479///
480/// ```rust
481/// use linch::schannel;
482///
483/// let (sender, receiver) = schannel::bounded(5);
484/// sender.send(42).unwrap();
485///
486/// // Receive synchronously
487/// let value = receiver.recv().unwrap();
488/// assert_eq!(value, 42);
489///
490/// // Clone the receiver
491/// let receiver2 = receiver.clone();
492/// ```
493#[derive(Debug)]
494pub struct Receiver<T> {
495    rx: CrossbeamReceiver<T>,
496}
497
498impl<T> Clone for Receiver<T> {
499    fn clone(&self) -> Self {
500        Self {
501            rx: self.rx.clone(),
502        }
503    }
504}
505
506impl<T> Receiver<T> {
507    /// Creates a new receiver from the underlying crossbeam receiver.
508    ///
509    /// This is typically not called directly by users, but rather through
510    /// [`bounded`].
511    pub fn new(rx: CrossbeamReceiver<T>) -> Self {
512        Self { rx }
513    }
514
515    /// Receives a value synchronously.
516    ///
517    /// This method blocks until a value is available in the channel or all senders have been dropped.
518    ///
519    /// # Returns
520    ///
521    /// * `Ok(value)` if a value was received successfully
522    /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
523    ///
524    /// # Examples
525    ///
526    /// ```rust
527    /// use linch::schannel;
528    ///
529    /// let (sender, receiver) = schannel::bounded(1);
530    /// sender.send(42).unwrap();
531    /// assert_eq!(receiver.recv().unwrap(), 42);
532    /// ```
533    pub fn recv(&self) -> Result<T, RecvError> {
534        Ok(self.rx.recv()?)
535    }
536
537    /// Receives a value synchronously with a timeout.
538    ///
539    /// This method blocks until a value is available in the channel, the timeout
540    /// expires, or all senders have been dropped.
541    ///
542    /// # Arguments
543    ///
544    /// * `timeout` - The maximum duration to wait
545    ///
546    /// # Returns
547    ///
548    /// * `Ok(value)` if a value was received successfully
549    /// * `Err(RecvTimeoutError::Timeout)` if the timeout expired
550    /// * `Err(RecvTimeoutError::Disconnected)` if all senders have been dropped
551    ///
552    /// # Examples
553    ///
554    /// ```rust
555    /// use linch::schannel;
556    /// use std::time::Duration;
557    ///
558    /// let (_sender, receiver) = schannel::bounded::<i32>(1);
559    ///
560    /// // This will timeout since no values are sent
561    /// let result = receiver.recv_timeout(Duration::from_millis(10));
562    /// assert!(result.is_err());
563    /// ```
564    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
565        Ok(self.rx.recv_timeout(timeout)?)
566    }
567
568    /// Attempts to receive a value without blocking.
569    ///
570    /// This method returns immediately without blocking.
571    ///
572    /// # Returns
573    ///
574    /// * `Ok(value)` if a value was received successfully
575    /// * `Err(TryRecvError::Empty)` if the channel is empty but not closed
576    /// * `Err(TryRecvError::Disconnected)` if all senders have been dropped and the channel is empty
577    ///
578    /// # Examples
579    ///
580    /// ```rust
581    /// use linch::schannel;
582    ///
583    /// let (sender, receiver) = schannel::bounded(1);
584    ///
585    /// // This will fail since no values are available
586    /// let result = receiver.try_recv();
587    /// assert!(result.is_err());
588    ///
589    /// sender.send(42).unwrap();
590    /// assert_eq!(receiver.try_recv().unwrap(), 42);
591    /// ```
592    pub fn try_recv(&self) -> Result<T, TryRecvError> {
593        Ok(self.rx.try_recv()?)
594    }
595
596    /// Receives a value asynchronously with a timeout.
597    ///
598    /// The timeout is calculated from the time this method is called, not when
599    /// the future is first polled.
600    ///
601    /// # Arguments
602    ///
603    /// * `timeout` - The maximum duration to wait
604    ///
605    /// # Returns
606    ///
607    /// A [`RecvTimeoutFut`] that resolves to:
608    /// * `Ok(value)` if a value was received successfully
609    /// * `Err(RecvTimeoutError::Timeout)` if the timeout expired
610    /// * `Err(RecvTimeoutError::Disconnected)` if all senders have been dropped
611    ///
612    /// # Examples
613    ///
614    /// ```rust
615    /// use linch::schannel;
616    /// use std::time::Duration;
617    ///
618    /// # tokio_test::block_on(async {
619    /// let (sender, receiver) = schannel::bounded(1);
620    /// sender.send(42).unwrap();
621    /// let timeout_fut = receiver.recv_timeout_async(Duration::from_secs(1));
622    /// assert_eq!(timeout_fut.await.unwrap(), 42);
623    /// # });
624    /// ```
625    pub fn recv_timeout_async(&self, timeout: Duration) -> RecvTimeoutFut<'_, T> {
626        RecvTimeoutFut {
627            rx: self,
628            deadline: Some(Instant::now() + timeout),
629        }
630    }
631
632    /// Receives a value asynchronously.
633    ///
634    /// This method returns a future that will complete when a value is available
635    /// in the channel or when all senders have been dropped.
636    ///
637    /// # Returns
638    ///
639    /// A [`RecvFut`] that resolves to:
640    /// * `Ok(value)` if a value was received successfully
641    /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
642    ///
643    /// # Examples
644    ///
645    /// ```rust
646    /// use linch::schannel;
647    ///
648    /// # tokio_test::block_on(async {
649    /// let (sender, receiver) = schannel::bounded(1);
650    /// sender.send(42).unwrap();
651    /// assert_eq!(receiver.recv_async().await.unwrap(), 42);
652    /// # });
653    /// ```
654    pub fn recv_async(&self) -> RecvFut<'_, T> {
655        RecvFut {
656            fut: RecvTimeoutFut {
657                rx: self,
658                deadline: None,
659            },
660        }
661    }
662
663    /// Converts the receiver into a stream.
664    ///
665    /// This allows using the receiver with async stream utilities and for iteration.
666    ///
667    /// # Returns
668    ///
669    /// A [`RecvStream`] that yields values from the channel
670    ///
671    /// # Examples
672    ///
673    /// ```rust
674    /// use linch::schannel;
675    /// use futures::StreamExt;
676    ///
677    /// # tokio_test::block_on(async {
678    /// let (sender, receiver) = schannel::bounded(3);
679    /// sender.send(1).unwrap();
680    /// sender.send(2).unwrap();
681    /// sender.send(3).unwrap();
682    /// drop(sender); // Close the channel
683    ///
684    /// let mut stream = receiver.into_stream();
685    /// let values: Vec<_> = stream.collect().await;
686    /// assert_eq!(values, vec![1, 2, 3]);
687    /// # });
688    /// ```
689    pub fn into_stream(self) -> RecvStream<T> {
690        RecvStream { rx: self }
691    }
692}
693
694impl<T> From<CrossbeamReceiver<T>> for Receiver<T> {
695    fn from(rx: CrossbeamReceiver<T>) -> Self {
696        Self { rx }
697    }
698}
699
700pin_project! {
701    /// A future representing an asynchronous receive operation with a timeout.
702    ///
703    /// This future is created by the [`recv_timeout_async`](Receiver::recv_timeout_async) method and will
704    /// complete when a value is available, the timeout expires, or when all senders have been dropped.
705    ///
706    /// # Examples
707    ///
708    /// ```rust
709    /// use linch::schannel;
710    /// use std::time::Duration;
711    ///
712    /// # tokio_test::block_on(async {
713    /// let (sender, receiver) = schannel::bounded(1);
714    /// sender.send(42).unwrap();
715    /// let timeout_fut = receiver.recv_timeout_async(Duration::from_secs(1));
716    /// assert_eq!(timeout_fut.await.unwrap(), 42);
717    /// # });
718    /// ```
719    pub struct RecvTimeoutFut<'a, T> {
720        rx: &'a Receiver<T>,
721        deadline: Option<Instant>,
722    }
723}
724
725impl<'a, T> Future for RecvTimeoutFut<'a, T> {
726    type Output = Result<T, RecvTimeoutError>;
727
728    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
729        let this = self.project();
730
731        if let Some(deadline) = this.deadline {
732            if deadline <= &mut Instant::now() {
733                return Poll::Ready(Err(RecvTimeoutError::Timeout));
734            }
735        }
736
737        match this.rx.try_recv() {
738            Ok(item) => Poll::Ready(Ok(item)),
739            Err(e) => match e {
740                TryRecvError::Disconnected => Poll::Ready(Err(RecvTimeoutError::Disconnected)),
741                TryRecvError::Empty => {
742                    cx.waker().wake_by_ref();
743                    Poll::Pending
744                }
745            },
746        }
747    }
748}
749
750/// A future representing an asynchronous receive operation.
751///
752/// This future is created by the [`recv_async`](Receiver::recv_async) method and will
753/// complete when a value is available or when all senders have been dropped.
754///
755/// # Examples
756///
757/// ```rust
758/// use linch::schannel;
759///
760/// # tokio_test::block_on(async {
761/// let (sender, receiver) = schannel::bounded(1);
762/// sender.send(42).unwrap();
763/// let recv_fut = receiver.recv_async();
764/// assert_eq!(recv_fut.await.unwrap(), 42);
765/// # });
766/// ```
767pub struct RecvFut<'a, T> {
768    fut: RecvTimeoutFut<'a, T>,
769}
770
771impl<'a, T> Future for RecvFut<'a, T> {
772    type Output = Result<T, RecvError>;
773
774    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
775        match Pin::new(&mut self.fut).poll(cx) {
776            Poll::Ready(Ok(item)) => Poll::Ready(Ok(item)),
777            Poll::Ready(Err(RecvTimeoutError::Timeout)) => {
778                unreachable!("RecvTimeoutError::Timeout should not be returned by RecvTimeoutFut");
779            }
780            Poll::Ready(Err(RecvTimeoutError::Disconnected)) => Poll::Ready(Err(RecvError)),
781            Poll::Pending => Poll::Pending,
782        }
783    }
784}
785
786/// A stream that yields values from a channel receiver.
787///
788/// This stream is created by the [`into_stream`](Receiver::into_stream) method and implements
789/// the [`Stream`] trait for async iteration over channel values.
790///
791/// The stream will yield values until all senders are dropped and the channel is empty.
792///
793/// # Examples
794///
795/// ```rust
796/// use linch::schannel;
797/// use futures::StreamExt;
798///
799/// # tokio_test::block_on(async {
800/// let (sender, receiver) = schannel::bounded(3);
801/// sender.send(1).unwrap();
802/// sender.send(2).unwrap();
803/// drop(sender); // Close the channel
804///
805/// let mut stream = receiver.into_stream();
806/// let mut values = Vec::new();
807/// while let Some(value) = stream.next().await {
808///     values.push(value);
809/// }
810/// assert_eq!(values, vec![1, 2]);
811/// # });
812/// ```
813pub struct RecvStream<T> {
814    rx: Receiver<T>,
815}
816
817impl<T> Stream for RecvStream<T> {
818    type Item = T;
819
820    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
821        let mut fut = self.rx.recv_async();
822
823        match Pin::new(&mut fut).poll(cx) {
824            Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
825            Poll::Ready(Err(_e)) => Poll::Ready(None),
826            Poll::Pending => Poll::Pending,
827        }
828    }
829}
830
831/// A handle for selecting over multiple schannel operations.
832///
833/// This allows you to wait on multiple send or receive operations simultaneously
834/// and get notified when any one of them becomes ready.
835///
836/// # Examples
837///
838/// ```rust
839/// use linch::schannel;
840///
841/// let (tx1, rx1) = schannel::bounded(1);
842/// let (tx2, rx2) = schannel::bounded(1);
843///
844/// tx1.send(1).unwrap();
845/// tx2.send(2).unwrap();
846///
847/// let mut sel = schannel::Select::new();
848/// let idx1 = sel.recv(&rx1);
849/// let idx2 = sel.recv(&rx2);
850///
851/// let op = sel.select();
852/// match op.index() {
853///     i if i == idx1 => {
854///         let value = op.recv(&rx1).unwrap();
855///         println!("Received {} from first channel", value);
856///     },
857///     i if i == idx2 => {
858///         let value = op.recv(&rx2).unwrap();
859///         println!("Received {} from second channel", value);
860///     },
861///     _ => unreachable!(),
862/// }
863/// ```
864pub struct Select<'a> {
865    select: crossbeam_channel::Select<'a>,
866}
867
868impl<'a> Default for Select<'a> {
869    fn default() -> Self {
870        Self::new()
871    }
872}
873
874impl<'a> Select<'a> {
875    /// Creates a new select handle.
876    ///
877    /// Operations will be selected in a pseudo-random order to ensure fairness.
878    ///
879    /// # Examples
880    ///
881    /// ```rust
882    /// use linch::schannel;
883    ///
884    /// let mut sel = schannel::Select::new();
885    /// ```
886    pub fn new() -> Self {
887        let select = crossbeam_channel::Select::new();
888        Self { select }
889    }
890
891    /// Creates a new biased select handle.
892    ///
893    /// Operations will be selected in the order they were added, giving
894    /// priority to earlier operations.
895    ///
896    /// # Examples
897    ///
898    /// ```rust
899    /// use linch::schannel;
900    ///
901    /// let mut sel = schannel::Select::new_biased();
902    /// ```
903    pub fn new_biased() -> Self {
904        let select = crossbeam_channel::Select::new_biased();
905        Self { select }
906    }
907
908    /// Adds a send operation to the select.
909    ///
910    /// Returns the index of this operation, which can be used to identify
911    /// which operation was selected.
912    ///
913    /// # Arguments
914    ///
915    /// * `sender` - The sender to add to the select
916    ///
917    /// # Returns
918    ///
919    /// The index of this send operation
920    ///
921    /// # Examples
922    ///
923    /// ```rust
924    /// use linch::schannel;
925    ///
926    /// let (tx, _rx) = schannel::bounded::<i32>(1);
927    /// let mut sel = schannel::Select::new();
928    /// let send_idx = sel.send(&tx);
929    /// ```
930    pub fn send<T>(&mut self, sender: &'a Sender<T>) -> usize {
931        self.select.send(&sender.tx)
932    }
933
934    /// Adds a receive operation to the select.
935    ///
936    /// Returns the index of this operation, which can be used to identify
937    /// which operation was selected.
938    ///
939    /// # Arguments
940    ///
941    /// * `receiver` - The receiver to add to the select
942    ///
943    /// # Returns
944    ///
945    /// The index of this receive operation
946    ///
947    /// # Examples
948    ///
949    /// ```rust
950    /// use linch::schannel;
951    ///
952    /// let (_tx, rx) = schannel::bounded::<i32>(1);
953    /// let mut sel = schannel::Select::new();
954    /// let recv_idx = sel.recv(&rx);
955    /// ```
956    pub fn recv<T>(&mut self, receiver: &'a Receiver<T>) -> usize {
957        self.select.recv(&receiver.rx)
958    }
959
960    /// Blocks until one of the operations becomes ready and returns it.
961    ///
962    /// # Returns
963    ///
964    /// A [`SelectedOperation`] that can be used to complete the operation
965    ///
966    /// # Examples
967    ///
968    /// ```rust
969    /// use linch::schannel;
970    ///
971    /// let (tx, rx) = schannel::bounded(1);
972    /// tx.send(42).unwrap();
973    ///
974    /// let mut sel = schannel::Select::new();
975    /// let recv_idx = sel.recv(&rx);
976    ///
977    /// let op = sel.select();
978    /// if op.index() == recv_idx {
979    ///     let value = op.recv(&rx).unwrap();
980    ///     assert_eq!(value, 42);
981    /// }
982    /// ```
983    pub fn select(&mut self) -> SelectedOperation<'a> {
984        self.select.select().into()
985    }
986
987    /// Attempts to select a ready operation without blocking.
988    ///
989    /// # Returns
990    ///
991    /// * `Ok(SelectedOperation)` if an operation was ready
992    /// * `Err(TrySelectError)` if no operations were ready
993    ///
994    /// # Examples
995    ///
996    /// ```rust
997    /// use linch::schannel;
998    ///
999    /// let (tx, rx) = schannel::bounded::<i32>(1);
1000    /// let mut sel = schannel::Select::new();
1001    /// let recv_idx = sel.recv(&rx);
1002    ///
1003    /// // This will return an error since no values are available
1004    /// assert!(sel.try_select().is_err());
1005    ///
1006    /// tx.send(42).unwrap();
1007    /// // Now it will succeed
1008    /// let op = sel.try_select().unwrap();
1009    /// if op.index() == recv_idx {
1010    ///     let value = op.recv(&rx).unwrap();
1011    ///     assert_eq!(value, 42);
1012    /// }
1013    /// ```
1014    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
1015        Ok(SelectedOperation::from(self.select.try_select()?))
1016    }
1017
1018    /// Blocks until one of the operations becomes ready or the timeout expires.
1019    ///
1020    /// # Arguments
1021    ///
1022    /// * `timeout` - The maximum duration to wait
1023    ///
1024    /// # Returns
1025    ///
1026    /// * `Ok(SelectedOperation)` if an operation became ready
1027    /// * `Err(SelectTimeoutError)` if the timeout expired
1028    ///
1029    /// # Examples
1030    ///
1031    /// ```rust
1032    /// use linch::schannel;
1033    /// use std::time::Duration;
1034    ///
1035    /// let (_tx, rx) = schannel::bounded::<i32>(1);
1036    /// let mut sel = schannel::Select::new();
1037    /// sel.recv(&rx);
1038    ///
1039    /// // This will timeout since no values are available
1040    /// let result = sel.select_timeout(Duration::from_millis(10));
1041    /// assert!(result.is_err());
1042    /// ```
1043    pub fn select_timeout(
1044        &mut self,
1045        timeout: Duration,
1046    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
1047        Ok(SelectedOperation::from(
1048            self.select.select_timeout(timeout)?,
1049        ))
1050    }
1051
1052    /// Returns the index of a ready operation, if any.
1053    ///
1054    /// This is a non-blocking operation that returns immediately.
1055    ///
1056    /// # Returns
1057    ///
1058    /// The index of a ready operation, or a value indicating no operations are ready
1059    pub fn ready(&mut self) -> usize {
1060        self.select.ready()
1061    }
1062
1063    /// Returns the index of a ready operation, waiting up to the timeout.
1064    ///
1065    /// # Arguments
1066    ///
1067    /// * `timeout` - The maximum duration to wait
1068    ///
1069    /// # Returns
1070    ///
1071    /// * `Ok(index)` if an operation became ready
1072    /// * `Err(ReadyTimeoutError)` if the timeout expired
1073    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1074        Ok(self.select.ready_timeout(timeout)?)
1075    }
1076
1077    /// Removes an operation from the select.
1078    ///
1079    /// # Arguments
1080    ///
1081    /// * `index` - The index of the operation to remove
1082    pub fn remove(&mut self, index: usize) {
1083        self.select.remove(index);
1084    }
1085}
1086
1087/// A selected operation that is ready to complete.
1088///
1089/// This struct represents an operation that was selected by a [`Select`] and is
1090/// ready to be executed. You can use the [`index`](SelectedOperation::index) method
1091/// to determine which operation was selected, and then call the appropriate
1092/// [`send`](SelectedOperation::send) or [`recv`](SelectedOperation::recv) method.
1093///
1094/// # Examples
1095///
1096/// ```rust
1097/// use linch::schannel;
1098///
1099/// let (tx, rx) = schannel::bounded(1);
1100/// tx.send(42).unwrap();
1101///
1102/// let mut sel = schannel::Select::new();
1103/// let recv_idx = sel.recv(&rx);
1104///
1105/// let op = sel.select();
1106/// if op.index() == recv_idx {
1107///     let value = op.recv(&rx).unwrap();
1108///     assert_eq!(value, 42);
1109/// }
1110/// ```
1111pub struct SelectedOperation<'a>(crossbeam_channel::SelectedOperation<'a>);
1112
1113impl<'a> From<crossbeam_channel::SelectedOperation<'a>> for SelectedOperation<'a> {
1114    fn from(value: crossbeam_channel::SelectedOperation<'a>) -> Self {
1115        Self(value)
1116    }
1117}
1118
1119impl<'a> SelectedOperation<'a> {
1120    /// Returns the index of the selected operation.
1121    ///
1122    /// This index corresponds to the value returned by the [`send`](Select::send)
1123    /// or [`recv`](Select::recv) method that added this operation to the select.
1124    ///
1125    /// # Returns
1126    ///
1127    /// The index of the selected operation
1128    ///
1129    /// # Examples
1130    ///
1131    /// ```rust
1132    /// use linch::schannel;
1133    ///
1134    /// let (tx, rx) = schannel::bounded::<i32>(1);
1135    /// tx.send(42).unwrap();
1136    ///
1137    /// let mut sel = schannel::Select::new();
1138    /// let recv_idx = sel.recv(&rx);
1139    ///
1140    /// let op = sel.select();
1141    /// assert_eq!(op.index(), recv_idx);
1142    /// let value = op.recv(&rx).unwrap();
1143    /// assert_eq!(value, 42);
1144    /// ```
1145    pub fn index(&self) -> usize {
1146        self.0.index()
1147    }
1148
1149    /// Completes the selected send operation.
1150    ///
1151    /// This method should only be called if the selected operation was a send operation.
1152    ///
1153    /// # Arguments
1154    ///
1155    /// * `sender` - The sender that was selected
1156    /// * `msg` - The message to send
1157    ///
1158    /// # Returns
1159    ///
1160    /// * `Ok(())` if the message was sent successfully
1161    /// * `Err(SendError(msg))` if all receivers have been dropped
1162    ///
1163    /// # Examples
1164    ///
1165    /// ```rust
1166    /// use linch::schannel;
1167    ///
1168    /// let (tx, rx) = schannel::bounded(1);
1169    /// let mut sel = schannel::Select::new();
1170    /// let send_idx = sel.send(&tx);
1171    ///
1172    /// let op = sel.select();
1173    /// if op.index() == send_idx {
1174    ///     op.send(&tx, 42).unwrap();
1175    ///     assert_eq!(rx.recv().unwrap(), 42);
1176    /// }
1177    /// ```
1178    pub fn send<T>(self, sender: &'a Sender<T>, msg: T) -> Result<(), SendError<T>> {
1179        Ok(self.0.send(&sender.tx, msg)?)
1180    }
1181
1182    /// Completes the selected receive operation.
1183    ///
1184    /// This method should only be called if the selected operation was a receive operation.
1185    ///
1186    /// # Arguments
1187    ///
1188    /// * `receiver` - The receiver that was selected
1189    ///
1190    /// # Returns
1191    ///
1192    /// * `Ok(value)` if a value was received successfully
1193    /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
1194    ///
1195    /// # Examples
1196    ///
1197    /// ```rust
1198    /// use linch::schannel;
1199    ///
1200    /// let (tx, rx) = schannel::bounded(1);
1201    /// tx.send(42).unwrap();
1202    ///
1203    /// let mut sel = schannel::Select::new();
1204    /// let recv_idx = sel.recv(&rx);
1205    ///
1206    /// let op = sel.select();
1207    /// if op.index() == recv_idx {
1208    ///     let value = op.recv(&rx).unwrap();
1209    ///     assert_eq!(value, 42);
1210    /// }
1211    /// ```
1212    pub fn recv<T>(self, receiver: &'a Receiver<T>) -> Result<T, RecvError> {
1213        Ok(self.0.recv(&receiver.rx)?)
1214    }
1215}
1216
1217#[cfg(test)]
1218mod tests {
1219    use super::*;
1220    use crossbeam_channel;
1221    use std::{thread, time::Duration};
1222    use tokio::time::timeout;
1223
1224    fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1225        let (tx, rx) = crossbeam_channel::bounded(capacity);
1226        (Sender::new(tx), Receiver::new(rx))
1227    }
1228
1229    // Test 1: Synchronous send, synchronous receive
1230    #[test]
1231    fn test_sync_send_sync_recv_success() {
1232        let (sender, receiver) = create_bounded_channel(1);
1233
1234        // Send synchronously
1235        sender.send(42).unwrap();
1236
1237        // Receive synchronously
1238        let received = receiver.recv().unwrap();
1239        assert_eq!(received, 42);
1240    }
1241
1242    #[test]
1243    fn test_sync_send_sync_recv_with_timeout_success() {
1244        let (sender, receiver) = create_bounded_channel(1);
1245
1246        // Send synchronously
1247        sender.send(42).unwrap();
1248
1249        // Receive synchronously with timeout
1250        let received = receiver.recv_timeout(Duration::from_millis(100)).unwrap();
1251        assert_eq!(received, 42);
1252    }
1253
1254    #[test]
1255    fn test_sync_send_sync_recv_timeout() {
1256        let (_sender, receiver) = create_bounded_channel::<i32>(1);
1257
1258        // Try to receive with timeout - should timeout since nothing was sent
1259        let result = receiver.recv_timeout(Duration::from_millis(10));
1260        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1261    }
1262
1263    #[test]
1264    fn test_sync_send_sync_recv_disconnected() {
1265        let (sender, receiver) = create_bounded_channel::<i32>(1);
1266
1267        // Drop sender to simulate disconnection
1268        drop(sender);
1269
1270        // Try to receive - should return disconnected error
1271        let result = receiver.recv();
1272        assert!(matches!(result, Err(RecvError)));
1273    }
1274
1275    // Test 2: Synchronous send, asynchronous receive
1276    #[tokio::test]
1277    async fn test_sync_send_async_recv_success() {
1278        let (sender, receiver) = create_bounded_channel(1);
1279
1280        // Send synchronously in a separate thread
1281        let sender_clone = sender.clone();
1282        thread::spawn(move || {
1283            thread::sleep(Duration::from_millis(10));
1284            sender_clone.send(42).unwrap();
1285        });
1286
1287        // Receive asynchronously
1288        let received = receiver.recv_async().await.unwrap();
1289        assert_eq!(received, 42);
1290    }
1291
1292    #[tokio::test]
1293    async fn test_sync_send_async_recv_with_timeout_success() {
1294        let (sender, receiver) = create_bounded_channel(1);
1295
1296        // Send synchronously in a separate thread
1297        let sender_clone = sender.clone();
1298        thread::spawn(move || {
1299            thread::sleep(Duration::from_millis(10));
1300            sender_clone.send(42).unwrap();
1301        });
1302
1303        // Receive asynchronously with timeout
1304        let received = receiver
1305            .recv_timeout_async(Duration::from_millis(100))
1306            .await
1307            .unwrap();
1308        assert_eq!(received, 42);
1309    }
1310
1311    #[tokio::test]
1312    async fn test_sync_send_async_recv_timeout() {
1313        let (_sender, receiver) = create_bounded_channel::<i32>(1);
1314
1315        // Try to receive with timeout - should timeout since nothing was sent
1316        let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
1317        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1318    }
1319
1320    #[tokio::test]
1321    async fn test_sync_send_async_recv_disconnected() {
1322        let (sender, receiver) = create_bounded_channel::<i32>(1);
1323
1324        // Drop sender to simulate disconnection
1325        drop(sender);
1326
1327        // Try to receive asynchronously - should return disconnected error
1328        let result = receiver.recv_async().await;
1329        assert!(matches!(result, Err(RecvError)));
1330    }
1331
1332    // Test 3: Asynchronous send, synchronous receive
1333    #[tokio::test]
1334    async fn test_async_send_sync_recv_success() {
1335        let (sender, receiver) = create_bounded_channel(1);
1336
1337        // Send asynchronously and receive synchronously in separate tasks
1338        let send_task = tokio::spawn(async move {
1339            sender.send_async(42).await.unwrap();
1340            // rx may have already dropped, so we need to ignore the error
1341            let _ = sender.send_async(43).await;
1342        });
1343
1344        let recv_task = tokio::spawn(async move {
1345            // Small delay to ensure send happens first
1346            tokio::time::sleep(Duration::from_millis(10)).await;
1347            receiver.recv().unwrap()
1348        });
1349
1350        let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1351        assert_eq!(received, 42);
1352    }
1353
1354    #[tokio::test]
1355    async fn test_async_send_sync_recv_with_timeout_success() {
1356        let (sender, receiver) = create_bounded_channel(1);
1357
1358        // Send asynchronously
1359        let send_task = tokio::spawn(async move {
1360            sender.send_async(42).await.unwrap();
1361            // rx may have already dropped, so we need to ignore the error
1362            let _ = sender.send_async(43).await;
1363        });
1364
1365        let recv_task = tokio::spawn(async move {
1366            // Small delay to ensure send happens first
1367            tokio::time::sleep(Duration::from_millis(10)).await;
1368            receiver.recv_timeout(Duration::from_millis(100)).unwrap()
1369        });
1370
1371        let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1372        assert_eq!(received, 42);
1373    }
1374
1375    #[tokio::test]
1376    async fn test_async_send_sync_recv_timeout() {
1377        let (_sender, receiver) = create_bounded_channel::<i32>(1);
1378
1379        // Try to receive with timeout - should timeout since nothing was sent
1380        let result =
1381            tokio::task::spawn_blocking(move || receiver.recv_timeout(Duration::from_millis(10)))
1382                .await
1383                .unwrap();
1384
1385        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1386    }
1387
1388    #[tokio::test]
1389    async fn test_async_send_with_timeout_success() {
1390        let (sender, receiver) = create_bounded_channel(1);
1391
1392        // Send with timeout should succeed when channel has capacity
1393        let result = sender
1394            .send_timeout_async(42, Duration::from_millis(100))
1395            .await;
1396        assert!(result.is_ok());
1397
1398        // Verify the message was sent
1399        let received = receiver.recv().unwrap();
1400        assert_eq!(received, 42);
1401    }
1402
1403    #[tokio::test]
1404    async fn test_async_send_timeout() {
1405        let (sender, _receiver) = create_bounded_channel(1);
1406
1407        // Fill the channel
1408        sender.send(1).unwrap();
1409
1410        // Try to send with a short timeout - should timeout
1411        let result = sender
1412            .send_timeout_async(2, Duration::from_millis(10))
1413            .await;
1414        assert!(matches!(result, Err(SendTimeoutError::Timeout(_))));
1415    }
1416
1417    #[tokio::test]
1418    async fn test_async_send_disconnected() {
1419        let (sender, receiver) = create_bounded_channel::<i32>(1);
1420
1421        // Drop receiver to simulate disconnection
1422        drop(receiver);
1423
1424        // Try to send asynchronously - should return disconnected error
1425        let result = sender.send_async(42).await;
1426        assert!(matches!(result, Err(SendError(_))));
1427    }
1428
1429    // Test 4: Asynchronous send, asynchronous receive
1430    #[tokio::test]
1431    async fn test_async_send_async_recv_success() {
1432        let (sender, receiver) = create_bounded_channel(1);
1433
1434        // Send and receive asynchronously
1435        let send_task = tokio::spawn(async move {
1436            sender.send_async(42).await.unwrap();
1437        });
1438
1439        let recv_task = tokio::spawn(async move { receiver.recv_async().await.unwrap() });
1440
1441        let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1442        assert_eq!(received, 42);
1443    }
1444
1445    #[tokio::test]
1446    async fn test_async_send_async_recv_with_timeout_success() {
1447        let (sender, receiver) = create_bounded_channel(1);
1448
1449        // Send asynchronously
1450        let send_task = tokio::spawn(async move {
1451            tokio::time::sleep(Duration::from_millis(10)).await;
1452            sender.send_async(42).await.unwrap();
1453        });
1454
1455        let recv_task = tokio::spawn(async move {
1456            receiver
1457                .recv_timeout_async(Duration::from_millis(100))
1458                .await
1459                .unwrap()
1460        });
1461
1462        let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1463        assert_eq!(received, 42);
1464    }
1465
1466    #[tokio::test]
1467    async fn test_async_send_async_recv_timeout() {
1468        let (_sender, receiver) = create_bounded_channel::<i32>(1);
1469
1470        // Try to receive with timeout - should timeout since nothing was sent
1471        let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
1472        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1473    }
1474
1475    #[tokio::test]
1476    async fn test_async_send_async_recv_both_timeout() {
1477        let (sender, _receiver) = create_bounded_channel(1);
1478
1479        // Fill the channel
1480        sender.send(1).unwrap();
1481
1482        // Try to send with timeout (should timeout)
1483        let send_result = sender
1484            .send_timeout_async(2, Duration::from_millis(10))
1485            .await;
1486        assert!(matches!(send_result, Err(SendTimeoutError::Timeout(_))));
1487
1488        // Try to receive from empty channel with timeout (should timeout)
1489        let (_sender2, receiver2) = create_bounded_channel::<i32>(1);
1490        let recv_result = receiver2
1491            .recv_timeout_async(Duration::from_millis(10))
1492            .await;
1493        assert!(matches!(recv_result, Err(RecvTimeoutError::Timeout)));
1494    }
1495
1496    #[tokio::test]
1497    async fn test_async_send_async_recv_disconnected() {
1498        let (sender, receiver) = create_bounded_channel::<i32>(1);
1499
1500        // Test send disconnection
1501        drop(receiver);
1502        let send_result = sender.send_async(42).await;
1503        assert!(matches!(send_result, Err(SendError(_))));
1504
1505        // Test receive disconnection
1506        let (sender2, receiver2) = create_bounded_channel::<i32>(1);
1507        drop(sender2);
1508        let recv_result = receiver2.recv_async().await;
1509        assert!(matches!(recv_result, Err(RecvError)));
1510    }
1511
1512    // Additional edge case tests
1513    #[tokio::test]
1514    async fn test_multiple_senders_single_receiver() {
1515        let (sender, receiver) = create_bounded_channel(1);
1516
1517        let sender1 = sender.clone();
1518        let sender2 = sender.clone();
1519
1520        // Send from multiple senders
1521        let send_task1 = tokio::spawn(async move {
1522            sender1.send_async(1).await.unwrap();
1523        });
1524
1525        let send_task2 = tokio::spawn(async move {
1526            sender2.send_async(2).await.unwrap();
1527        });
1528
1529        // Receive both messages
1530        let recv_task = tokio::spawn(async move {
1531            let mut received = vec![];
1532            received.push(receiver.recv_async().await.unwrap());
1533            received.push(receiver.recv_async().await.unwrap());
1534            received.sort(); // Sort since order is not guaranteed
1535            received
1536        });
1537
1538        let (_, _, received) = tokio::try_join!(send_task1, send_task2, recv_task).unwrap();
1539        assert_eq!(received, vec![1, 2]);
1540    }
1541
1542    #[tokio::test]
1543    async fn test_bounded_channel_backpressure() {
1544        let (sender, receiver) = create_bounded_channel(2);
1545
1546        // Fill the channel to capacity
1547        sender.send(1).unwrap();
1548        sender.send(2).unwrap();
1549
1550        // This send should block/timeout since channel is full
1551        let send_result = timeout(Duration::from_millis(10), sender.send_async(3)).await;
1552
1553        assert!(send_result.is_err()); // Should timeout
1554
1555        // Receive one item to make space
1556        let received = receiver.recv().unwrap();
1557        assert_eq!(received, 1);
1558
1559        // Now the send should succeed
1560        let send_result = sender.send_async(3).await;
1561        assert!(send_result.is_ok());
1562    }
1563
1564    #[test]
1565    fn test_sender_receiver_debug_clone() {
1566        let (sender, receiver) = create_bounded_channel::<i32>(1);
1567
1568        // Test Debug implementation
1569        let debug_str = format!("{:?}", sender);
1570        assert!(debug_str.contains("Sender"));
1571
1572        let debug_str = format!("{:?}", receiver);
1573        assert!(debug_str.contains("Receiver"));
1574
1575        // Test Clone implementation for Sender
1576        let _sender_clone = sender.clone();
1577
1578        // Test Clone implementation for Receiver
1579        let _receiver_clone = receiver.clone();
1580    }
1581
1582    #[test]
1583    fn test_deref_implementations() {
1584        let (sender, receiver) = create_bounded_channel(1);
1585
1586        // Test that we can use CrossbeamSender methods directly
1587        sender.send(42).unwrap();
1588
1589        // Test that we can use CrossbeamReceiver methods directly
1590        let received = receiver.recv().unwrap();
1591        assert_eq!(received, 42);
1592
1593        // Test try_send and try_recv
1594        let result = sender.try_send(43);
1595        assert!(result.is_ok());
1596
1597        let result = receiver.try_recv();
1598        assert_eq!(result.unwrap(), 43);
1599    }
1600
1601    // ===== SELECT TESTS =====
1602
1603    #[test]
1604    fn test_select_basic_recv() {
1605        let (tx1, rx1) = create_bounded_channel(1);
1606        let (tx2, rx2) = create_bounded_channel(1);
1607
1608        tx1.send(1).unwrap();
1609        tx2.send(2).unwrap();
1610
1611        let mut sel = Select::new();
1612        let idx1 = sel.recv(&rx1);
1613        let idx2 = sel.recv(&rx2);
1614
1615        let op = sel.select();
1616        match op.index() {
1617            i if i == idx1 => {
1618                let value = op.recv(&rx1).unwrap();
1619                assert_eq!(value, 1);
1620            }
1621            i if i == idx2 => {
1622                let value = op.recv(&rx2).unwrap();
1623                assert_eq!(value, 2);
1624            }
1625            _ => panic!("Unexpected index"),
1626        }
1627    }
1628
1629    #[test]
1630    fn test_select_basic_send() {
1631        let (tx1, rx1) = create_bounded_channel(1);
1632        let (tx2, rx2) = create_bounded_channel(1);
1633
1634        let mut sel = Select::new();
1635        let idx1 = sel.send(&tx1);
1636        let idx2 = sel.send(&tx2);
1637
1638        let op = sel.select();
1639        match op.index() {
1640            i if i == idx1 => {
1641                op.send(&tx1, 1).unwrap();
1642                assert_eq!(rx1.recv().unwrap(), 1);
1643            }
1644            i if i == idx2 => {
1645                op.send(&tx2, 2).unwrap();
1646                assert_eq!(rx2.recv().unwrap(), 2);
1647            }
1648            _ => panic!("Unexpected index"),
1649        }
1650    }
1651
1652    #[test]
1653    fn test_select_mixed_operations() {
1654        let (tx1, rx1) = create_bounded_channel(1);
1655        let (tx2, rx2) = create_bounded_channel(1);
1656
1657        // Send to first channel
1658        tx1.send(1).unwrap();
1659
1660        let mut sel = Select::new();
1661        let recv_idx = sel.recv(&rx1);
1662        let send_idx = sel.send(&tx2);
1663
1664        let op = sel.select();
1665        match op.index() {
1666            i if i == recv_idx => {
1667                let value = op.recv(&rx1).unwrap();
1668                assert_eq!(value, 1);
1669            }
1670            i if i == send_idx => {
1671                op.send(&tx2, 2).unwrap();
1672                assert_eq!(rx2.recv().unwrap(), 2);
1673            }
1674            _ => panic!("Unexpected index"),
1675        }
1676    }
1677
1678    #[test]
1679    fn test_select_try_select() {
1680        let (tx, rx) = create_bounded_channel::<i32>(1);
1681
1682        let mut sel = Select::new();
1683        let recv_idx = sel.recv(&rx);
1684
1685        // Should fail since no values are available
1686        assert!(sel.try_select().is_err());
1687
1688        // Send a value
1689        tx.send(42).unwrap();
1690
1691        // Now it should succeed
1692        let op = sel.try_select().unwrap();
1693        assert_eq!(op.index(), recv_idx);
1694        let value = op.recv(&rx).unwrap();
1695        assert_eq!(value, 42);
1696    }
1697
1698    #[test]
1699    fn test_select_timeout() {
1700        let (_tx, rx) = create_bounded_channel::<i32>(1);
1701
1702        let mut sel = Select::new();
1703        sel.recv(&rx);
1704
1705        // Should timeout since no values are available
1706        let result = sel.select_timeout(Duration::from_millis(10));
1707        assert!(result.is_err());
1708    }
1709
1710    #[test]
1711    fn test_select_timeout_success() {
1712        let (tx, rx) = create_bounded_channel(1);
1713
1714        // Send a value in a separate thread
1715        let tx_clone = tx.clone();
1716        thread::spawn(move || {
1717            thread::sleep(Duration::from_millis(50));
1718            tx_clone.send(42).unwrap();
1719        });
1720
1721        let mut sel = Select::new();
1722        let recv_idx = sel.recv(&rx);
1723
1724        // Should succeed within timeout
1725        let result = sel.select_timeout(Duration::from_millis(100));
1726        assert!(result.is_ok());
1727
1728        let op = result.unwrap();
1729        assert_eq!(op.index(), recv_idx);
1730        let value = op.recv(&rx).unwrap();
1731        assert_eq!(value, 42);
1732    }
1733
1734    #[test]
1735    fn test_select_ready() {
1736        let (tx, rx) = create_bounded_channel(1);
1737
1738        let mut sel = Select::new();
1739        let recv_idx = sel.recv(&rx);
1740
1741        // Send a value first
1742        tx.send(42).unwrap();
1743
1744        // Now the receive operation should be ready
1745        assert_eq!(sel.ready(), recv_idx);
1746    }
1747
1748    #[test]
1749    fn test_select_ready_timeout() {
1750        let (_tx, rx) = create_bounded_channel::<i32>(1);
1751
1752        let mut sel = Select::new();
1753        sel.recv(&rx);
1754
1755        // Should timeout since no values are available
1756        let result = sel.ready_timeout(Duration::from_millis(10));
1757        assert!(result.is_err());
1758    }
1759
1760    #[test]
1761    fn test_select_ready_timeout_success() {
1762        let (tx, rx) = create_bounded_channel(1);
1763
1764        // Send a value in a separate thread
1765        let tx_clone = tx.clone();
1766        thread::spawn(move || {
1767            thread::sleep(Duration::from_millis(50));
1768            tx_clone.send(42).unwrap();
1769        });
1770
1771        let mut sel = Select::new();
1772        let recv_idx = sel.recv(&rx);
1773
1774        // Should succeed within timeout
1775        let result = sel.ready_timeout(Duration::from_millis(100));
1776        assert!(result.is_ok());
1777        assert_eq!(result.unwrap(), recv_idx);
1778    }
1779
1780    #[test]
1781    fn test_select_remove() {
1782        let (tx1, rx1) = create_bounded_channel(1);
1783        let (tx2, rx2) = create_bounded_channel(1);
1784
1785        let mut sel = Select::new();
1786        let idx1 = sel.recv(&rx1);
1787        let idx2 = sel.recv(&rx2);
1788
1789        // Remove the first operation
1790        sel.remove(idx1);
1791
1792        // Send to both channels
1793        tx1.send(1).unwrap();
1794        tx2.send(2).unwrap();
1795
1796        // Only the second operation should be selected
1797        let op = sel.select();
1798        assert_eq!(op.index(), idx2);
1799        let value = op.recv(&rx2).unwrap();
1800        assert_eq!(value, 2);
1801    }
1802
1803    #[test]
1804    fn test_select_biased() {
1805        let (tx1, rx1) = create_bounded_channel(1);
1806        let (tx2, rx2) = create_bounded_channel(1);
1807
1808        tx1.send(1).unwrap();
1809        tx2.send(2).unwrap();
1810
1811        let mut sel = Select::new_biased();
1812        let idx1 = sel.recv(&rx1);
1813        let _idx2 = sel.recv(&rx2);
1814
1815        // With biased selection, the first operation should be selected
1816        let op = sel.select();
1817        assert_eq!(op.index(), idx1);
1818        let value = op.recv(&rx1).unwrap();
1819        assert_eq!(value, 1);
1820    }
1821
1822    #[test]
1823    fn test_select_multiple_channels() {
1824        let (tx1, rx1) = create_bounded_channel(1);
1825        let (tx2, rx2) = create_bounded_channel(1);
1826        let (tx3, rx3) = create_bounded_channel(1);
1827
1828        tx1.send(1).unwrap();
1829        tx2.send(2).unwrap();
1830        tx3.send(3).unwrap();
1831
1832        let mut sel = Select::new();
1833        let idx1 = sel.recv(&rx1);
1834        let idx2 = sel.recv(&rx2);
1835        let idx3 = sel.recv(&rx3);
1836
1837        let mut received = Vec::new();
1838
1839        // Select all three operations
1840        for _ in 0..3 {
1841            let op = sel.select();
1842            match op.index() {
1843                i if i == idx1 => {
1844                    let value = op.recv(&rx1).unwrap();
1845                    received.push(value);
1846                }
1847                i if i == idx2 => {
1848                    let value = op.recv(&rx2).unwrap();
1849                    received.push(value);
1850                }
1851                i if i == idx3 => {
1852                    let value = op.recv(&rx3).unwrap();
1853                    received.push(value);
1854                }
1855                _ => panic!("Unexpected index"),
1856            }
1857        }
1858
1859        received.sort();
1860        assert_eq!(received, vec![1, 2, 3]);
1861    }
1862
1863    #[test]
1864    fn test_select_send_blocking() {
1865        let (tx1, rx1) = create_bounded_channel(1);
1866        let (tx2, rx2) = create_bounded_channel(1);
1867
1868        // Fill both channels
1869        tx1.send(1).unwrap();
1870        tx2.send(2).unwrap();
1871
1872        let mut sel = Select::new();
1873        let _send_idx1 = sel.send(&tx1);
1874        let _send_idx2 = sel.send(&tx2);
1875
1876        // Both sends should block, but we can still select
1877        // This will block until one of the receivers consumes a value
1878        let tx1_clone = tx1.clone();
1879        let tx2_clone = tx2.clone();
1880        let rx1_clone = rx1.clone();
1881        let _rx2_clone = rx2.clone();
1882
1883        let sender_handle = thread::spawn(move || {
1884            let mut sel = Select::new();
1885            let send_idx1 = sel.send(&tx1_clone);
1886            let send_idx2 = sel.send(&tx2_clone);
1887
1888            let op = sel.select();
1889            match op.index() {
1890                i if i == send_idx1 => {
1891                    op.send(&tx1_clone, 3).unwrap();
1892                }
1893                i if i == send_idx2 => {
1894                    op.send(&tx2_clone, 4).unwrap();
1895                }
1896                _ => panic!("Unexpected index"),
1897            }
1898        });
1899
1900        // Consume a value to unblock one of the sends
1901        thread::sleep(Duration::from_millis(10));
1902        let _ = rx1_clone.recv().unwrap();
1903
1904        sender_handle.join().unwrap();
1905
1906        // Verify the new value was sent
1907        let value = rx1.recv().unwrap();
1908        assert_eq!(value, 3);
1909    }
1910
1911    #[test]
1912    fn test_select_receive_blocking() {
1913        let (tx1, rx1) = create_bounded_channel::<i32>(1);
1914        let (tx2, rx2) = create_bounded_channel::<i32>(1);
1915
1916        let mut sel = Select::new();
1917        let _recv_idx1 = sel.recv(&rx1);
1918        let _recv_idx2 = sel.recv(&rx2);
1919
1920        // Both receives should block, but we can still select
1921        // This will block until one of the senders sends a value
1922        let _tx1_clone = tx1.clone();
1923        let _tx2_clone = tx2.clone();
1924        let rx1_clone = rx1.clone();
1925        let rx2_clone = rx2.clone();
1926
1927        let receiver_handle = thread::spawn(move || {
1928            let mut sel = Select::new();
1929            let recv_idx1 = sel.recv(&rx1_clone);
1930            let recv_idx2 = sel.recv(&rx2_clone);
1931
1932            let op = sel.select();
1933            match op.index() {
1934                i if i == recv_idx1 => {
1935                    let value = op.recv(&rx1_clone).unwrap();
1936                    value
1937                }
1938                i if i == recv_idx2 => {
1939                    let value = op.recv(&rx2_clone).unwrap();
1940                    value
1941                }
1942                _ => panic!("Unexpected index"),
1943            }
1944        });
1945
1946        // Send a value to unblock one of the receives
1947        thread::sleep(Duration::from_millis(10));
1948        tx1.send(42).unwrap();
1949
1950        let received_value = receiver_handle.join().unwrap();
1951        assert_eq!(received_value, 42);
1952    }
1953
1954    #[test]
1955    fn test_select_disconnected_send() {
1956        let (tx, rx) = create_bounded_channel::<i32>(1);
1957
1958        // Drop the receiver
1959        drop(rx);
1960
1961        let mut sel = Select::new();
1962        let send_idx = sel.send(&tx);
1963
1964        let op = sel.select();
1965        assert_eq!(op.index(), send_idx);
1966
1967        // Send should fail with disconnected error
1968        let result = op.send(&tx, 42);
1969        assert!(result.is_err());
1970    }
1971
1972    #[test]
1973    fn test_select_disconnected_recv() {
1974        let (tx, rx) = create_bounded_channel::<i32>(1);
1975
1976        // Drop the sender
1977        drop(tx);
1978
1979        let mut sel = Select::new();
1980        let recv_idx = sel.recv(&rx);
1981
1982        let op = sel.select();
1983        assert_eq!(op.index(), recv_idx);
1984
1985        // Receive should fail with disconnected error
1986        let result = op.recv(&rx);
1987        assert!(result.is_err());
1988    }
1989
1990    #[test]
1991    fn test_select_default() {
1992        let (tx, rx) = create_bounded_channel(1);
1993        tx.send(42).unwrap();
1994
1995        let mut sel = Select::default(); // Test Default implementation
1996        let recv_idx = sel.recv(&rx);
1997
1998        let op = sel.select();
1999        assert_eq!(op.index(), recv_idx);
2000        let value = op.recv(&rx).unwrap();
2001        assert_eq!(value, 42);
2002    }
2003
2004    #[test]
2005    fn test_select_complex_scenario() {
2006        let (tx1, rx1) = create_bounded_channel(1);
2007        let (tx2, rx2) = create_bounded_channel(1);
2008        let (tx3, rx3) = create_bounded_channel(1);
2009
2010        // Send to first two channels
2011        tx1.send(1).unwrap();
2012        tx2.send(2).unwrap();
2013
2014        let mut sel = Select::new();
2015        let recv_idx1 = sel.recv(&rx1);
2016        let recv_idx2 = sel.recv(&rx2);
2017        let send_idx3 = sel.send(&tx3);
2018
2019        let mut received = Vec::new();
2020
2021        // First selection should be one of the receives
2022        let op = sel.select();
2023        match op.index() {
2024            i if i == recv_idx1 => {
2025                let value = op.recv(&rx1).unwrap();
2026                received.push(value);
2027            }
2028            i if i == recv_idx2 => {
2029                let value = op.recv(&rx2).unwrap();
2030                received.push(value);
2031            }
2032            _ => panic!("Unexpected index for first selection"),
2033        }
2034
2035        // Second selection should be the other receive
2036        let op = sel.select();
2037        match op.index() {
2038            i if i == recv_idx1 => {
2039                let value = op.recv(&rx1).unwrap();
2040                received.push(value);
2041            }
2042            i if i == recv_idx2 => {
2043                let value = op.recv(&rx2).unwrap();
2044                received.push(value);
2045            }
2046            _ => panic!("Unexpected index for second selection"),
2047        }
2048
2049        // Third selection should be the send
2050        let op = sel.select();
2051        assert_eq!(op.index(), send_idx3);
2052        op.send(&tx3, 3).unwrap();
2053
2054        // Verify all operations completed correctly
2055        received.sort();
2056        assert_eq!(received, vec![1, 2]);
2057        assert_eq!(rx3.recv().unwrap(), 3);
2058    }
2059}