linch/schannel.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::{Duration, Instant},
6};
7
8use crate::error::{
9 ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
10 SendTimeoutError, TryRecvError, TrySelectError, TrySendError,
11};
12use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
13use futures::Stream;
14use pin_project_lite::pin_project;
15
16/// Creates a bounded channel with the specified capacity using the schannel implementation.
17///
18/// This channel is a wrapper around the crossbeam channel that provides a more
19/// ergonomic API for sending and receiving items with both synchronous and asynchronous operations.
20///
21/// # Performance Characteristics
22///
23/// The asynchronous send/receive operations in this implementation are optimized for high throughput
24/// scenarios. They use active polling which makes them suitable for situations where:
25///
26/// * Active async polling is acceptable and desired
27/// * Throughput and latency are more important than CPU efficiency
28/// * You want maximum performance for benchmarking against other channel implementations
29///
30/// This implementation does not expect channels to remain empty for extended periods and
31/// will continuously poll when operations are pending.
32///
33/// # Arguments
34///
35/// * `capacity` - The capacity of the channel buffer. Must be greater than 0.
36///
37/// # Returns
38///
39/// A tuple containing a [`Sender`] and [`Receiver`] pair.
40///
41/// # Panics
42///
43/// Panics if `capacity` is 0.
44///
45/// # Examples
46///
47/// ```rust
48/// use linch::schannel;
49///
50/// // Create a channel with capacity 10
51/// let (sender, receiver) = schannel::bounded(10);
52///
53/// // Send synchronously
54/// sender.send(42).unwrap();
55///
56/// // Receive synchronously
57/// let value = receiver.recv().unwrap();
58/// assert_eq!(value, 42);
59/// ```
60///
61/// # Async Example
62///
63/// ```rust
64/// use linch::schannel;
65///
66/// # tokio_test::block_on(async {
67/// let (sender, receiver) = schannel::bounded(10);
68///
69/// // Send asynchronously
70/// sender.send_async(42).await.unwrap();
71///
72/// // Receive asynchronously
73/// let value = receiver.recv_async().await.unwrap();
74/// assert_eq!(value, 42);
75/// # });
76/// ```
77pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
78 assert!(capacity > 0);
79 let (tx, rx) = crossbeam_channel::bounded(capacity);
80 (Sender::new(tx), Receiver::new(rx))
81}
82
83/// Creates an unbounded channel using the schannel implementation.
84///
85/// This channel is a wrapper around the crossbeam unbounded channel that provides a more
86/// ergonomic API for sending and receiving items with both synchronous and asynchronous operations.
87///
88/// # Performance Characteristics
89///
90/// The asynchronous send/receive operations in this implementation are optimized for high throughput
91/// scenarios. They use active polling which makes them suitable for situations where:
92///
93/// * Active async polling is acceptable and desired
94/// * Throughput and latency are more important than CPU efficiency
95/// * You want maximum performance for benchmarking against other channel implementations
96///
97/// This implementation does not expect channels to remain empty for extended periods and
98/// will continuously poll when operations are pending.
99///
100/// Unlike bounded channels, unbounded channels have no capacity limit, so send operations
101/// will never block due to a full buffer. However, this can lead to unbounded memory growth
102/// if the receiver cannot keep up with the sender.
103///
104/// # Returns
105///
106/// A tuple containing a [`Sender`] and [`Receiver`] pair.
107///
108/// # Examples
109///
110/// ```rust
111/// use linch::schannel;
112///
113/// // Create an unbounded channel
114/// let (sender, receiver) = schannel::unbounded();
115///
116/// // Send synchronously - never blocks
117/// sender.send(42).unwrap();
118///
119/// // Receive synchronously
120/// let value = receiver.recv().unwrap();
121/// assert_eq!(value, 42);
122/// ```
123///
124/// # Async Example
125///
126/// ```rust
127/// use linch::schannel;
128///
129/// # tokio_test::block_on(async {
130/// let (sender, receiver) = schannel::unbounded();
131///
132/// // Send asynchronously - never blocks
133/// sender.send_async(42).await.unwrap();
134///
135/// // Receive asynchronously
136/// let value = receiver.recv_async().await.unwrap();
137/// assert_eq!(value, 42);
138/// # });
139/// ```
140pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
141 let (tx, rx) = crossbeam_channel::unbounded();
142 (Sender::new(tx), Receiver::new(rx))
143}
144
145/// The sending half of an schannel channel.
146///
147/// This struct allows sending values both synchronously and asynchronously using
148/// an optimized implementation designed for high throughput scenarios.
149///
150/// Senders can be cloned to create multiple sending endpoints for the same channel.
151/// All synchronous crossbeam channel operations are available through [`Deref`].
152///
153/// # Examples
154///
155/// ```rust
156/// use linch::schannel;
157///
158/// let (sender, receiver) = schannel::bounded(5);
159///
160/// // Send synchronously
161/// sender.send(42).unwrap();
162///
163/// // Clone the sender
164/// let sender2 = sender.clone();
165/// sender2.send(43).unwrap();
166/// ```
167#[derive(Debug)]
168pub struct Sender<T> {
169 tx: CrossbeamSender<T>,
170}
171
172impl<T> Clone for Sender<T> {
173 fn clone(&self) -> Self {
174 Self {
175 tx: self.tx.clone(),
176 }
177 }
178}
179
180impl<T> Sender<T> {
181 /// Creates a new sender from the underlying crossbeam sender.
182 ///
183 /// This is typically not called directly by users, but rather through
184 /// [`bounded`].
185 pub fn new(tx: CrossbeamSender<T>) -> Self {
186 Self { tx }
187 }
188
189 /// Sends a value synchronously.
190 ///
191 /// This method blocks until there is space in the channel buffer or all receivers have been dropped.
192 ///
193 /// # Arguments
194 ///
195 /// * `value` - The value to send
196 ///
197 /// # Returns
198 ///
199 /// * `Ok(())` if the value was sent successfully
200 /// * `Err(SendError(value))` if all receivers have been dropped
201 ///
202 /// # Examples
203 ///
204 /// ```rust
205 /// use linch::schannel;
206 ///
207 /// let (sender, receiver) = schannel::bounded(1);
208 /// sender.send(42).unwrap();
209 /// assert_eq!(receiver.recv().unwrap(), 42);
210 /// ```
211 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
212 Ok(self.tx.send(value)?)
213 }
214
215 /// Sends a value synchronously with a timeout.
216 ///
217 /// This method blocks until there is space in the channel buffer, the timeout
218 /// expires, or all receivers have been dropped.
219 ///
220 /// # Arguments
221 ///
222 /// * `value` - The value to send
223 /// * `timeout` - The maximum duration to wait
224 ///
225 /// # Returns
226 ///
227 /// * `Ok(())` if the value was sent successfully
228 /// * `Err(SendTimeoutError::Timeout(value))` if the timeout expired
229 /// * `Err(SendTimeoutError::Disconnected(value))` if all receivers have been dropped
230 ///
231 /// # Examples
232 ///
233 /// ```rust
234 /// use linch::schannel;
235 /// use std::time::Duration;
236 ///
237 /// let (sender, _receiver) = schannel::bounded(1);
238 /// sender.send(1).unwrap(); // Fill the buffer
239 ///
240 /// // This will timeout since the buffer is full
241 /// let result = sender.send_timeout(2, Duration::from_millis(10));
242 /// assert!(result.is_err());
243 /// ```
244 pub fn send_timeout(&self, value: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
245 Ok(self.tx.send_timeout(value, timeout)?)
246 }
247
248 /// Attempts to send a value without blocking.
249 ///
250 /// This method returns immediately without blocking.
251 ///
252 /// # Arguments
253 ///
254 /// * `value` - The value to send
255 ///
256 /// # Returns
257 ///
258 /// * `Ok(())` if the value was sent successfully
259 /// * `Err(TrySendError::Full(value))` if the channel is full
260 /// * `Err(TrySendError::Disconnected(value))` if all receivers have been dropped
261 ///
262 /// # Examples
263 ///
264 /// ```rust
265 /// use linch::schannel;
266 ///
267 /// let (sender, receiver) = schannel::bounded(1);
268 /// sender.send(1).unwrap(); // Fill the buffer
269 ///
270 /// // This will fail since the buffer is full
271 /// let result = sender.try_send(2);
272 /// assert!(result.is_err());
273 /// ```
274 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
275 Ok(self.tx.try_send(value)?)
276 }
277
278 /// Sends a value asynchronously with a timeout.
279 ///
280 /// The timeout is calculated from the time this method is called, not when
281 /// the future is first polled.
282 ///
283 /// # Arguments
284 ///
285 /// * `item` - The value to send
286 /// * `timeout` - The maximum duration to wait
287 ///
288 /// # Returns
289 ///
290 /// A [`SendTimeoutFut`] that resolves to:
291 /// * `Ok(())` if the value was sent successfully
292 /// * `Err(SendTimeoutError::Timeout(item))` if the timeout expired
293 /// * `Err(SendTimeoutError::Disconnected(item))` if all receivers have been dropped
294 ///
295 /// # Examples
296 ///
297 /// ```rust
298 /// use linch::schannel;
299 /// use std::time::Duration;
300 ///
301 /// # tokio_test::block_on(async {
302 /// let (sender, _receiver) = schannel::bounded(1);
303 /// sender.send(1).unwrap(); // Fill the buffer
304 ///
305 /// // This will timeout since the buffer is full
306 /// let result = sender.send_timeout_async(2, Duration::from_millis(10)).await;
307 /// assert!(result.is_err());
308 /// # });
309 /// ```
310 pub fn send_timeout_async(&self, item: T, timeout: Duration) -> SendTimeoutFut<'_, T> {
311 SendTimeoutFut {
312 tx: self,
313 item: Some(item),
314 deadline: Some(Instant::now() + timeout),
315 }
316 }
317
318 /// Sends a value asynchronously.
319 ///
320 /// This method returns a future that will complete when there is space in the
321 /// channel buffer or when all receivers have been dropped.
322 ///
323 /// # Arguments
324 ///
325 /// * `item` - The value to send
326 ///
327 /// # Returns
328 ///
329 /// A [`SendFut`] that resolves to:
330 /// * `Ok(())` if the value was sent successfully
331 /// * `Err(SendError(item))` if all receivers have been dropped
332 ///
333 /// # Examples
334 ///
335 /// ```rust
336 /// use linch::schannel;
337 ///
338 /// # tokio_test::block_on(async {
339 /// let (sender, receiver) = schannel::bounded(1);
340 /// sender.send_async(42).await.unwrap();
341 /// assert_eq!(receiver.recv().unwrap(), 42);
342 /// # });
343 /// ```
344 pub fn send_async(&self, item: T) -> SendFut<'_, T> {
345 SendFut {
346 fut: SendTimeoutFut {
347 tx: self,
348 item: Some(item),
349 deadline: None,
350 },
351 }
352 }
353}
354
355impl<T> From<CrossbeamSender<T>> for Sender<T> {
356 fn from(tx: CrossbeamSender<T>) -> Self {
357 Self { tx }
358 }
359}
360
361/// A future representing an asynchronous send operation.
362///
363/// This future is created by the [`send_async`](Sender::send_async) method and will
364/// complete when the value has been sent or when all receivers have been dropped.
365///
366/// # Examples
367///
368/// ```rust
369/// use linch::schannel;
370///
371/// # tokio_test::block_on(async {
372/// let (sender, receiver) = schannel::bounded(1);
373/// let send_fut = sender.send_async(42);
374/// send_fut.await.unwrap();
375/// assert_eq!(receiver.recv().unwrap(), 42);
376/// # });
377/// ```
378pub struct SendFut<'a, T> {
379 fut: SendTimeoutFut<'a, T>,
380}
381
382impl<'a, T> SendFut<'a, T> {
383 pub fn new(tx: &'a Sender<T>, item: T, timeout: Option<Duration>) -> Self {
384 Self {
385 fut: SendTimeoutFut {
386 tx,
387 item: Some(item),
388 deadline: timeout.map(|d| Instant::now() + d),
389 },
390 }
391 }
392}
393
394impl<'a, T> Future for SendFut<'a, T> {
395 type Output = Result<(), SendError<T>>;
396
397 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398 match Pin::new(&mut self.fut).poll(cx) {
399 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
400 Poll::Ready(Err(SendTimeoutError::Timeout(_))) => {
401 unreachable!("SendTimeoutError::Timeout should not be returned by SendFut");
402 }
403 Poll::Ready(Err(SendTimeoutError::Disconnected(item))) => {
404 Poll::Ready(Err(SendError(item)))
405 }
406 Poll::Pending => Poll::Pending,
407 }
408 }
409}
410
411pin_project! {
412 /// A future representing an asynchronous send operation with a timeout.
413 ///
414 /// This future is created by the [`send_timeout_async`](Sender::send_timeout_async) method and will
415 /// complete when the value has been sent, the timeout expires, or when all receivers have been dropped.
416 ///
417 /// # Examples
418 ///
419 /// ```rust
420 /// use linch::schannel;
421 /// use std::time::Duration;
422 ///
423 /// # tokio_test::block_on(async {
424 /// let (sender, receiver) = schannel::bounded(1);
425 /// let timeout_fut = sender.send_timeout_async(42, Duration::from_secs(1));
426 /// timeout_fut.await.unwrap();
427 /// assert_eq!(receiver.recv().unwrap(), 42);
428 /// # });
429 /// ```
430 pub struct SendTimeoutFut<'a, T> {
431 tx: &'a Sender<T>,
432 item: Option<T>,
433 deadline: Option<Instant>,
434 }
435}
436
437// this is not very efficient. we should not always take/replace the item.
438// but in order to avoid that we would need to pin the item, send a pointer to the item,
439// and then have the receiver take the item.
440// this is not worth the effort right now.
441impl<'a, T> Future for SendTimeoutFut<'a, T> {
442 type Output = Result<(), SendTimeoutError<T>>;
443
444 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
445 let this = self.project();
446
447 if let Some(deadline) = this.deadline {
448 if deadline <= &mut Instant::now() {
449 return Poll::Ready(Err(SendTimeoutError::Timeout(this.item.take().unwrap())));
450 }
451 }
452
453 match this.tx.try_send(this.item.take().unwrap()) {
454 Ok(_) => Poll::Ready(Ok(())),
455 Err(e) => match e {
456 TrySendError::Disconnected(item) => {
457 Poll::Ready(Err(SendTimeoutError::Disconnected(item)))
458 }
459 TrySendError::Full(item) => {
460 this.item.replace(item);
461 cx.waker().wake_by_ref();
462 Poll::Pending
463 }
464 },
465 }
466 }
467}
468
469/// The receiving half of an schannel channel.
470///
471/// This struct allows receiving values both synchronously and asynchronously using
472/// an optimized implementation designed for high throughput scenarios.
473///
474/// Receivers can be cloned to create multiple receiving endpoints for the same channel.
475/// All synchronous crossbeam channel operations are available through [`Deref`].
476/// The receiver can also be used as a [`Stream`] for async iteration.
477///
478/// # Examples
479///
480/// ```rust
481/// use linch::schannel;
482///
483/// let (sender, receiver) = schannel::bounded(5);
484/// sender.send(42).unwrap();
485///
486/// // Receive synchronously
487/// let value = receiver.recv().unwrap();
488/// assert_eq!(value, 42);
489///
490/// // Clone the receiver
491/// let receiver2 = receiver.clone();
492/// ```
493#[derive(Debug)]
494pub struct Receiver<T> {
495 rx: CrossbeamReceiver<T>,
496}
497
498impl<T> Clone for Receiver<T> {
499 fn clone(&self) -> Self {
500 Self {
501 rx: self.rx.clone(),
502 }
503 }
504}
505
506impl<T> Receiver<T> {
507 /// Creates a new receiver from the underlying crossbeam receiver.
508 ///
509 /// This is typically not called directly by users, but rather through
510 /// [`bounded`].
511 pub fn new(rx: CrossbeamReceiver<T>) -> Self {
512 Self { rx }
513 }
514
515 /// Receives a value synchronously.
516 ///
517 /// This method blocks until a value is available in the channel or all senders have been dropped.
518 ///
519 /// # Returns
520 ///
521 /// * `Ok(value)` if a value was received successfully
522 /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
523 ///
524 /// # Examples
525 ///
526 /// ```rust
527 /// use linch::schannel;
528 ///
529 /// let (sender, receiver) = schannel::bounded(1);
530 /// sender.send(42).unwrap();
531 /// assert_eq!(receiver.recv().unwrap(), 42);
532 /// ```
533 pub fn recv(&self) -> Result<T, RecvError> {
534 Ok(self.rx.recv()?)
535 }
536
537 /// Receives a value synchronously with a timeout.
538 ///
539 /// This method blocks until a value is available in the channel, the timeout
540 /// expires, or all senders have been dropped.
541 ///
542 /// # Arguments
543 ///
544 /// * `timeout` - The maximum duration to wait
545 ///
546 /// # Returns
547 ///
548 /// * `Ok(value)` if a value was received successfully
549 /// * `Err(RecvTimeoutError::Timeout)` if the timeout expired
550 /// * `Err(RecvTimeoutError::Disconnected)` if all senders have been dropped
551 ///
552 /// # Examples
553 ///
554 /// ```rust
555 /// use linch::schannel;
556 /// use std::time::Duration;
557 ///
558 /// let (_sender, receiver) = schannel::bounded::<i32>(1);
559 ///
560 /// // This will timeout since no values are sent
561 /// let result = receiver.recv_timeout(Duration::from_millis(10));
562 /// assert!(result.is_err());
563 /// ```
564 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
565 Ok(self.rx.recv_timeout(timeout)?)
566 }
567
568 /// Attempts to receive a value without blocking.
569 ///
570 /// This method returns immediately without blocking.
571 ///
572 /// # Returns
573 ///
574 /// * `Ok(value)` if a value was received successfully
575 /// * `Err(TryRecvError::Empty)` if the channel is empty but not closed
576 /// * `Err(TryRecvError::Disconnected)` if all senders have been dropped and the channel is empty
577 ///
578 /// # Examples
579 ///
580 /// ```rust
581 /// use linch::schannel;
582 ///
583 /// let (sender, receiver) = schannel::bounded(1);
584 ///
585 /// // This will fail since no values are available
586 /// let result = receiver.try_recv();
587 /// assert!(result.is_err());
588 ///
589 /// sender.send(42).unwrap();
590 /// assert_eq!(receiver.try_recv().unwrap(), 42);
591 /// ```
592 pub fn try_recv(&self) -> Result<T, TryRecvError> {
593 Ok(self.rx.try_recv()?)
594 }
595
596 /// Receives a value asynchronously with a timeout.
597 ///
598 /// The timeout is calculated from the time this method is called, not when
599 /// the future is first polled.
600 ///
601 /// # Arguments
602 ///
603 /// * `timeout` - The maximum duration to wait
604 ///
605 /// # Returns
606 ///
607 /// A [`RecvTimeoutFut`] that resolves to:
608 /// * `Ok(value)` if a value was received successfully
609 /// * `Err(RecvTimeoutError::Timeout)` if the timeout expired
610 /// * `Err(RecvTimeoutError::Disconnected)` if all senders have been dropped
611 ///
612 /// # Examples
613 ///
614 /// ```rust
615 /// use linch::schannel;
616 /// use std::time::Duration;
617 ///
618 /// # tokio_test::block_on(async {
619 /// let (sender, receiver) = schannel::bounded(1);
620 /// sender.send(42).unwrap();
621 /// let timeout_fut = receiver.recv_timeout_async(Duration::from_secs(1));
622 /// assert_eq!(timeout_fut.await.unwrap(), 42);
623 /// # });
624 /// ```
625 pub fn recv_timeout_async(&self, timeout: Duration) -> RecvTimeoutFut<'_, T> {
626 RecvTimeoutFut {
627 rx: self,
628 deadline: Some(Instant::now() + timeout),
629 }
630 }
631
632 /// Receives a value asynchronously.
633 ///
634 /// This method returns a future that will complete when a value is available
635 /// in the channel or when all senders have been dropped.
636 ///
637 /// # Returns
638 ///
639 /// A [`RecvFut`] that resolves to:
640 /// * `Ok(value)` if a value was received successfully
641 /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
642 ///
643 /// # Examples
644 ///
645 /// ```rust
646 /// use linch::schannel;
647 ///
648 /// # tokio_test::block_on(async {
649 /// let (sender, receiver) = schannel::bounded(1);
650 /// sender.send(42).unwrap();
651 /// assert_eq!(receiver.recv_async().await.unwrap(), 42);
652 /// # });
653 /// ```
654 pub fn recv_async(&self) -> RecvFut<'_, T> {
655 RecvFut {
656 fut: RecvTimeoutFut {
657 rx: self,
658 deadline: None,
659 },
660 }
661 }
662
663 /// Converts the receiver into a stream.
664 ///
665 /// This allows using the receiver with async stream utilities and for iteration.
666 ///
667 /// # Returns
668 ///
669 /// A [`RecvStream`] that yields values from the channel
670 ///
671 /// # Examples
672 ///
673 /// ```rust
674 /// use linch::schannel;
675 /// use futures::StreamExt;
676 ///
677 /// # tokio_test::block_on(async {
678 /// let (sender, receiver) = schannel::bounded(3);
679 /// sender.send(1).unwrap();
680 /// sender.send(2).unwrap();
681 /// sender.send(3).unwrap();
682 /// drop(sender); // Close the channel
683 ///
684 /// let mut stream = receiver.into_stream();
685 /// let values: Vec<_> = stream.collect().await;
686 /// assert_eq!(values, vec![1, 2, 3]);
687 /// # });
688 /// ```
689 pub fn into_stream(self) -> RecvStream<T> {
690 RecvStream { rx: self }
691 }
692}
693
694impl<T> From<CrossbeamReceiver<T>> for Receiver<T> {
695 fn from(rx: CrossbeamReceiver<T>) -> Self {
696 Self { rx }
697 }
698}
699
700pin_project! {
701 /// A future representing an asynchronous receive operation with a timeout.
702 ///
703 /// This future is created by the [`recv_timeout_async`](Receiver::recv_timeout_async) method and will
704 /// complete when a value is available, the timeout expires, or when all senders have been dropped.
705 ///
706 /// # Examples
707 ///
708 /// ```rust
709 /// use linch::schannel;
710 /// use std::time::Duration;
711 ///
712 /// # tokio_test::block_on(async {
713 /// let (sender, receiver) = schannel::bounded(1);
714 /// sender.send(42).unwrap();
715 /// let timeout_fut = receiver.recv_timeout_async(Duration::from_secs(1));
716 /// assert_eq!(timeout_fut.await.unwrap(), 42);
717 /// # });
718 /// ```
719 pub struct RecvTimeoutFut<'a, T> {
720 rx: &'a Receiver<T>,
721 deadline: Option<Instant>,
722 }
723}
724
725impl<'a, T> Future for RecvTimeoutFut<'a, T> {
726 type Output = Result<T, RecvTimeoutError>;
727
728 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
729 let this = self.project();
730
731 if let Some(deadline) = this.deadline {
732 if deadline <= &mut Instant::now() {
733 return Poll::Ready(Err(RecvTimeoutError::Timeout));
734 }
735 }
736
737 match this.rx.try_recv() {
738 Ok(item) => Poll::Ready(Ok(item)),
739 Err(e) => match e {
740 TryRecvError::Disconnected => Poll::Ready(Err(RecvTimeoutError::Disconnected)),
741 TryRecvError::Empty => {
742 cx.waker().wake_by_ref();
743 Poll::Pending
744 }
745 },
746 }
747 }
748}
749
750/// A future representing an asynchronous receive operation.
751///
752/// This future is created by the [`recv_async`](Receiver::recv_async) method and will
753/// complete when a value is available or when all senders have been dropped.
754///
755/// # Examples
756///
757/// ```rust
758/// use linch::schannel;
759///
760/// # tokio_test::block_on(async {
761/// let (sender, receiver) = schannel::bounded(1);
762/// sender.send(42).unwrap();
763/// let recv_fut = receiver.recv_async();
764/// assert_eq!(recv_fut.await.unwrap(), 42);
765/// # });
766/// ```
767pub struct RecvFut<'a, T> {
768 fut: RecvTimeoutFut<'a, T>,
769}
770
771impl<'a, T> Future for RecvFut<'a, T> {
772 type Output = Result<T, RecvError>;
773
774 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
775 match Pin::new(&mut self.fut).poll(cx) {
776 Poll::Ready(Ok(item)) => Poll::Ready(Ok(item)),
777 Poll::Ready(Err(RecvTimeoutError::Timeout)) => {
778 unreachable!("RecvTimeoutError::Timeout should not be returned by RecvTimeoutFut");
779 }
780 Poll::Ready(Err(RecvTimeoutError::Disconnected)) => Poll::Ready(Err(RecvError)),
781 Poll::Pending => Poll::Pending,
782 }
783 }
784}
785
786/// A stream that yields values from a channel receiver.
787///
788/// This stream is created by the [`into_stream`](Receiver::into_stream) method and implements
789/// the [`Stream`] trait for async iteration over channel values.
790///
791/// The stream will yield values until all senders are dropped and the channel is empty.
792///
793/// # Examples
794///
795/// ```rust
796/// use linch::schannel;
797/// use futures::StreamExt;
798///
799/// # tokio_test::block_on(async {
800/// let (sender, receiver) = schannel::bounded(3);
801/// sender.send(1).unwrap();
802/// sender.send(2).unwrap();
803/// drop(sender); // Close the channel
804///
805/// let mut stream = receiver.into_stream();
806/// let mut values = Vec::new();
807/// while let Some(value) = stream.next().await {
808/// values.push(value);
809/// }
810/// assert_eq!(values, vec![1, 2]);
811/// # });
812/// ```
813pub struct RecvStream<T> {
814 rx: Receiver<T>,
815}
816
817impl<T> Stream for RecvStream<T> {
818 type Item = T;
819
820 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
821 let mut fut = self.rx.recv_async();
822
823 match Pin::new(&mut fut).poll(cx) {
824 Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
825 Poll::Ready(Err(_e)) => Poll::Ready(None),
826 Poll::Pending => Poll::Pending,
827 }
828 }
829}
830
831/// A handle for selecting over multiple schannel operations.
832///
833/// This allows you to wait on multiple send or receive operations simultaneously
834/// and get notified when any one of them becomes ready.
835///
836/// # Examples
837///
838/// ```rust
839/// use linch::schannel;
840///
841/// let (tx1, rx1) = schannel::bounded(1);
842/// let (tx2, rx2) = schannel::bounded(1);
843///
844/// tx1.send(1).unwrap();
845/// tx2.send(2).unwrap();
846///
847/// let mut sel = schannel::Select::new();
848/// let idx1 = sel.recv(&rx1);
849/// let idx2 = sel.recv(&rx2);
850///
851/// let op = sel.select();
852/// match op.index() {
853/// i if i == idx1 => {
854/// let value = op.recv(&rx1).unwrap();
855/// println!("Received {} from first channel", value);
856/// },
857/// i if i == idx2 => {
858/// let value = op.recv(&rx2).unwrap();
859/// println!("Received {} from second channel", value);
860/// },
861/// _ => unreachable!(),
862/// }
863/// ```
864pub struct Select<'a> {
865 select: crossbeam_channel::Select<'a>,
866}
867
868impl<'a> Default for Select<'a> {
869 fn default() -> Self {
870 Self::new()
871 }
872}
873
874impl<'a> Select<'a> {
875 /// Creates a new select handle.
876 ///
877 /// Operations will be selected in a pseudo-random order to ensure fairness.
878 ///
879 /// # Examples
880 ///
881 /// ```rust
882 /// use linch::schannel;
883 ///
884 /// let mut sel = schannel::Select::new();
885 /// ```
886 pub fn new() -> Self {
887 let select = crossbeam_channel::Select::new();
888 Self { select }
889 }
890
891 /// Creates a new biased select handle.
892 ///
893 /// Operations will be selected in the order they were added, giving
894 /// priority to earlier operations.
895 ///
896 /// # Examples
897 ///
898 /// ```rust
899 /// use linch::schannel;
900 ///
901 /// let mut sel = schannel::Select::new_biased();
902 /// ```
903 pub fn new_biased() -> Self {
904 let select = crossbeam_channel::Select::new_biased();
905 Self { select }
906 }
907
908 /// Adds a send operation to the select.
909 ///
910 /// Returns the index of this operation, which can be used to identify
911 /// which operation was selected.
912 ///
913 /// # Arguments
914 ///
915 /// * `sender` - The sender to add to the select
916 ///
917 /// # Returns
918 ///
919 /// The index of this send operation
920 ///
921 /// # Examples
922 ///
923 /// ```rust
924 /// use linch::schannel;
925 ///
926 /// let (tx, _rx) = schannel::bounded::<i32>(1);
927 /// let mut sel = schannel::Select::new();
928 /// let send_idx = sel.send(&tx);
929 /// ```
930 pub fn send<T>(&mut self, sender: &'a Sender<T>) -> usize {
931 self.select.send(&sender.tx)
932 }
933
934 /// Adds a receive operation to the select.
935 ///
936 /// Returns the index of this operation, which can be used to identify
937 /// which operation was selected.
938 ///
939 /// # Arguments
940 ///
941 /// * `receiver` - The receiver to add to the select
942 ///
943 /// # Returns
944 ///
945 /// The index of this receive operation
946 ///
947 /// # Examples
948 ///
949 /// ```rust
950 /// use linch::schannel;
951 ///
952 /// let (_tx, rx) = schannel::bounded::<i32>(1);
953 /// let mut sel = schannel::Select::new();
954 /// let recv_idx = sel.recv(&rx);
955 /// ```
956 pub fn recv<T>(&mut self, receiver: &'a Receiver<T>) -> usize {
957 self.select.recv(&receiver.rx)
958 }
959
960 /// Blocks until one of the operations becomes ready and returns it.
961 ///
962 /// # Returns
963 ///
964 /// A [`SelectedOperation`] that can be used to complete the operation
965 ///
966 /// # Examples
967 ///
968 /// ```rust
969 /// use linch::schannel;
970 ///
971 /// let (tx, rx) = schannel::bounded(1);
972 /// tx.send(42).unwrap();
973 ///
974 /// let mut sel = schannel::Select::new();
975 /// let recv_idx = sel.recv(&rx);
976 ///
977 /// let op = sel.select();
978 /// if op.index() == recv_idx {
979 /// let value = op.recv(&rx).unwrap();
980 /// assert_eq!(value, 42);
981 /// }
982 /// ```
983 pub fn select(&mut self) -> SelectedOperation<'a> {
984 self.select.select().into()
985 }
986
987 /// Attempts to select a ready operation without blocking.
988 ///
989 /// # Returns
990 ///
991 /// * `Ok(SelectedOperation)` if an operation was ready
992 /// * `Err(TrySelectError)` if no operations were ready
993 ///
994 /// # Examples
995 ///
996 /// ```rust
997 /// use linch::schannel;
998 ///
999 /// let (tx, rx) = schannel::bounded::<i32>(1);
1000 /// let mut sel = schannel::Select::new();
1001 /// let recv_idx = sel.recv(&rx);
1002 ///
1003 /// // This will return an error since no values are available
1004 /// assert!(sel.try_select().is_err());
1005 ///
1006 /// tx.send(42).unwrap();
1007 /// // Now it will succeed
1008 /// let op = sel.try_select().unwrap();
1009 /// if op.index() == recv_idx {
1010 /// let value = op.recv(&rx).unwrap();
1011 /// assert_eq!(value, 42);
1012 /// }
1013 /// ```
1014 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
1015 Ok(SelectedOperation::from(self.select.try_select()?))
1016 }
1017
1018 /// Blocks until one of the operations becomes ready or the timeout expires.
1019 ///
1020 /// # Arguments
1021 ///
1022 /// * `timeout` - The maximum duration to wait
1023 ///
1024 /// # Returns
1025 ///
1026 /// * `Ok(SelectedOperation)` if an operation became ready
1027 /// * `Err(SelectTimeoutError)` if the timeout expired
1028 ///
1029 /// # Examples
1030 ///
1031 /// ```rust
1032 /// use linch::schannel;
1033 /// use std::time::Duration;
1034 ///
1035 /// let (_tx, rx) = schannel::bounded::<i32>(1);
1036 /// let mut sel = schannel::Select::new();
1037 /// sel.recv(&rx);
1038 ///
1039 /// // This will timeout since no values are available
1040 /// let result = sel.select_timeout(Duration::from_millis(10));
1041 /// assert!(result.is_err());
1042 /// ```
1043 pub fn select_timeout(
1044 &mut self,
1045 timeout: Duration,
1046 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
1047 Ok(SelectedOperation::from(
1048 self.select.select_timeout(timeout)?,
1049 ))
1050 }
1051
1052 /// Returns the index of a ready operation, if any.
1053 ///
1054 /// This is a non-blocking operation that returns immediately.
1055 ///
1056 /// # Returns
1057 ///
1058 /// The index of a ready operation, or a value indicating no operations are ready
1059 pub fn ready(&mut self) -> usize {
1060 self.select.ready()
1061 }
1062
1063 /// Returns the index of a ready operation, waiting up to the timeout.
1064 ///
1065 /// # Arguments
1066 ///
1067 /// * `timeout` - The maximum duration to wait
1068 ///
1069 /// # Returns
1070 ///
1071 /// * `Ok(index)` if an operation became ready
1072 /// * `Err(ReadyTimeoutError)` if the timeout expired
1073 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1074 Ok(self.select.ready_timeout(timeout)?)
1075 }
1076
1077 /// Removes an operation from the select.
1078 ///
1079 /// # Arguments
1080 ///
1081 /// * `index` - The index of the operation to remove
1082 pub fn remove(&mut self, index: usize) {
1083 self.select.remove(index);
1084 }
1085}
1086
1087/// A selected operation that is ready to complete.
1088///
1089/// This struct represents an operation that was selected by a [`Select`] and is
1090/// ready to be executed. You can use the [`index`](SelectedOperation::index) method
1091/// to determine which operation was selected, and then call the appropriate
1092/// [`send`](SelectedOperation::send) or [`recv`](SelectedOperation::recv) method.
1093///
1094/// # Examples
1095///
1096/// ```rust
1097/// use linch::schannel;
1098///
1099/// let (tx, rx) = schannel::bounded(1);
1100/// tx.send(42).unwrap();
1101///
1102/// let mut sel = schannel::Select::new();
1103/// let recv_idx = sel.recv(&rx);
1104///
1105/// let op = sel.select();
1106/// if op.index() == recv_idx {
1107/// let value = op.recv(&rx).unwrap();
1108/// assert_eq!(value, 42);
1109/// }
1110/// ```
1111pub struct SelectedOperation<'a>(crossbeam_channel::SelectedOperation<'a>);
1112
1113impl<'a> From<crossbeam_channel::SelectedOperation<'a>> for SelectedOperation<'a> {
1114 fn from(value: crossbeam_channel::SelectedOperation<'a>) -> Self {
1115 Self(value)
1116 }
1117}
1118
1119impl<'a> SelectedOperation<'a> {
1120 /// Returns the index of the selected operation.
1121 ///
1122 /// This index corresponds to the value returned by the [`send`](Select::send)
1123 /// or [`recv`](Select::recv) method that added this operation to the select.
1124 ///
1125 /// # Returns
1126 ///
1127 /// The index of the selected operation
1128 ///
1129 /// # Examples
1130 ///
1131 /// ```rust
1132 /// use linch::schannel;
1133 ///
1134 /// let (tx, rx) = schannel::bounded::<i32>(1);
1135 /// tx.send(42).unwrap();
1136 ///
1137 /// let mut sel = schannel::Select::new();
1138 /// let recv_idx = sel.recv(&rx);
1139 ///
1140 /// let op = sel.select();
1141 /// assert_eq!(op.index(), recv_idx);
1142 /// let value = op.recv(&rx).unwrap();
1143 /// assert_eq!(value, 42);
1144 /// ```
1145 pub fn index(&self) -> usize {
1146 self.0.index()
1147 }
1148
1149 /// Completes the selected send operation.
1150 ///
1151 /// This method should only be called if the selected operation was a send operation.
1152 ///
1153 /// # Arguments
1154 ///
1155 /// * `sender` - The sender that was selected
1156 /// * `msg` - The message to send
1157 ///
1158 /// # Returns
1159 ///
1160 /// * `Ok(())` if the message was sent successfully
1161 /// * `Err(SendError(msg))` if all receivers have been dropped
1162 ///
1163 /// # Examples
1164 ///
1165 /// ```rust
1166 /// use linch::schannel;
1167 ///
1168 /// let (tx, rx) = schannel::bounded(1);
1169 /// let mut sel = schannel::Select::new();
1170 /// let send_idx = sel.send(&tx);
1171 ///
1172 /// let op = sel.select();
1173 /// if op.index() == send_idx {
1174 /// op.send(&tx, 42).unwrap();
1175 /// assert_eq!(rx.recv().unwrap(), 42);
1176 /// }
1177 /// ```
1178 pub fn send<T>(self, sender: &'a Sender<T>, msg: T) -> Result<(), SendError<T>> {
1179 Ok(self.0.send(&sender.tx, msg)?)
1180 }
1181
1182 /// Completes the selected receive operation.
1183 ///
1184 /// This method should only be called if the selected operation was a receive operation.
1185 ///
1186 /// # Arguments
1187 ///
1188 /// * `receiver` - The receiver that was selected
1189 ///
1190 /// # Returns
1191 ///
1192 /// * `Ok(value)` if a value was received successfully
1193 /// * `Err(RecvError)` if all senders have been dropped and the channel is empty
1194 ///
1195 /// # Examples
1196 ///
1197 /// ```rust
1198 /// use linch::schannel;
1199 ///
1200 /// let (tx, rx) = schannel::bounded(1);
1201 /// tx.send(42).unwrap();
1202 ///
1203 /// let mut sel = schannel::Select::new();
1204 /// let recv_idx = sel.recv(&rx);
1205 ///
1206 /// let op = sel.select();
1207 /// if op.index() == recv_idx {
1208 /// let value = op.recv(&rx).unwrap();
1209 /// assert_eq!(value, 42);
1210 /// }
1211 /// ```
1212 pub fn recv<T>(self, receiver: &'a Receiver<T>) -> Result<T, RecvError> {
1213 Ok(self.0.recv(&receiver.rx)?)
1214 }
1215}
1216
1217#[cfg(test)]
1218mod tests {
1219 use super::*;
1220 use crossbeam_channel;
1221 use std::{thread, time::Duration};
1222 use tokio::time::timeout;
1223
1224 fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
1225 let (tx, rx) = crossbeam_channel::bounded(capacity);
1226 (Sender::new(tx), Receiver::new(rx))
1227 }
1228
1229 // Test 1: Synchronous send, synchronous receive
1230 #[test]
1231 fn test_sync_send_sync_recv_success() {
1232 let (sender, receiver) = create_bounded_channel(1);
1233
1234 // Send synchronously
1235 sender.send(42).unwrap();
1236
1237 // Receive synchronously
1238 let received = receiver.recv().unwrap();
1239 assert_eq!(received, 42);
1240 }
1241
1242 #[test]
1243 fn test_sync_send_sync_recv_with_timeout_success() {
1244 let (sender, receiver) = create_bounded_channel(1);
1245
1246 // Send synchronously
1247 sender.send(42).unwrap();
1248
1249 // Receive synchronously with timeout
1250 let received = receiver.recv_timeout(Duration::from_millis(100)).unwrap();
1251 assert_eq!(received, 42);
1252 }
1253
1254 #[test]
1255 fn test_sync_send_sync_recv_timeout() {
1256 let (_sender, receiver) = create_bounded_channel::<i32>(1);
1257
1258 // Try to receive with timeout - should timeout since nothing was sent
1259 let result = receiver.recv_timeout(Duration::from_millis(10));
1260 assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1261 }
1262
1263 #[test]
1264 fn test_sync_send_sync_recv_disconnected() {
1265 let (sender, receiver) = create_bounded_channel::<i32>(1);
1266
1267 // Drop sender to simulate disconnection
1268 drop(sender);
1269
1270 // Try to receive - should return disconnected error
1271 let result = receiver.recv();
1272 assert!(matches!(result, Err(RecvError)));
1273 }
1274
1275 // Test 2: Synchronous send, asynchronous receive
1276 #[tokio::test]
1277 async fn test_sync_send_async_recv_success() {
1278 let (sender, receiver) = create_bounded_channel(1);
1279
1280 // Send synchronously in a separate thread
1281 let sender_clone = sender.clone();
1282 thread::spawn(move || {
1283 thread::sleep(Duration::from_millis(10));
1284 sender_clone.send(42).unwrap();
1285 });
1286
1287 // Receive asynchronously
1288 let received = receiver.recv_async().await.unwrap();
1289 assert_eq!(received, 42);
1290 }
1291
1292 #[tokio::test]
1293 async fn test_sync_send_async_recv_with_timeout_success() {
1294 let (sender, receiver) = create_bounded_channel(1);
1295
1296 // Send synchronously in a separate thread
1297 let sender_clone = sender.clone();
1298 thread::spawn(move || {
1299 thread::sleep(Duration::from_millis(10));
1300 sender_clone.send(42).unwrap();
1301 });
1302
1303 // Receive asynchronously with timeout
1304 let received = receiver
1305 .recv_timeout_async(Duration::from_millis(100))
1306 .await
1307 .unwrap();
1308 assert_eq!(received, 42);
1309 }
1310
1311 #[tokio::test]
1312 async fn test_sync_send_async_recv_timeout() {
1313 let (_sender, receiver) = create_bounded_channel::<i32>(1);
1314
1315 // Try to receive with timeout - should timeout since nothing was sent
1316 let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
1317 assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1318 }
1319
1320 #[tokio::test]
1321 async fn test_sync_send_async_recv_disconnected() {
1322 let (sender, receiver) = create_bounded_channel::<i32>(1);
1323
1324 // Drop sender to simulate disconnection
1325 drop(sender);
1326
1327 // Try to receive asynchronously - should return disconnected error
1328 let result = receiver.recv_async().await;
1329 assert!(matches!(result, Err(RecvError)));
1330 }
1331
1332 // Test 3: Asynchronous send, synchronous receive
1333 #[tokio::test]
1334 async fn test_async_send_sync_recv_success() {
1335 let (sender, receiver) = create_bounded_channel(1);
1336
1337 // Send asynchronously and receive synchronously in separate tasks
1338 let send_task = tokio::spawn(async move {
1339 sender.send_async(42).await.unwrap();
1340 // rx may have already dropped, so we need to ignore the error
1341 let _ = sender.send_async(43).await;
1342 });
1343
1344 let recv_task = tokio::spawn(async move {
1345 // Small delay to ensure send happens first
1346 tokio::time::sleep(Duration::from_millis(10)).await;
1347 receiver.recv().unwrap()
1348 });
1349
1350 let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1351 assert_eq!(received, 42);
1352 }
1353
1354 #[tokio::test]
1355 async fn test_async_send_sync_recv_with_timeout_success() {
1356 let (sender, receiver) = create_bounded_channel(1);
1357
1358 // Send asynchronously
1359 let send_task = tokio::spawn(async move {
1360 sender.send_async(42).await.unwrap();
1361 // rx may have already dropped, so we need to ignore the error
1362 let _ = sender.send_async(43).await;
1363 });
1364
1365 let recv_task = tokio::spawn(async move {
1366 // Small delay to ensure send happens first
1367 tokio::time::sleep(Duration::from_millis(10)).await;
1368 receiver.recv_timeout(Duration::from_millis(100)).unwrap()
1369 });
1370
1371 let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1372 assert_eq!(received, 42);
1373 }
1374
1375 #[tokio::test]
1376 async fn test_async_send_sync_recv_timeout() {
1377 let (_sender, receiver) = create_bounded_channel::<i32>(1);
1378
1379 // Try to receive with timeout - should timeout since nothing was sent
1380 let result =
1381 tokio::task::spawn_blocking(move || receiver.recv_timeout(Duration::from_millis(10)))
1382 .await
1383 .unwrap();
1384
1385 assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1386 }
1387
1388 #[tokio::test]
1389 async fn test_async_send_with_timeout_success() {
1390 let (sender, receiver) = create_bounded_channel(1);
1391
1392 // Send with timeout should succeed when channel has capacity
1393 let result = sender
1394 .send_timeout_async(42, Duration::from_millis(100))
1395 .await;
1396 assert!(result.is_ok());
1397
1398 // Verify the message was sent
1399 let received = receiver.recv().unwrap();
1400 assert_eq!(received, 42);
1401 }
1402
1403 #[tokio::test]
1404 async fn test_async_send_timeout() {
1405 let (sender, _receiver) = create_bounded_channel(1);
1406
1407 // Fill the channel
1408 sender.send(1).unwrap();
1409
1410 // Try to send with a short timeout - should timeout
1411 let result = sender
1412 .send_timeout_async(2, Duration::from_millis(10))
1413 .await;
1414 assert!(matches!(result, Err(SendTimeoutError::Timeout(_))));
1415 }
1416
1417 #[tokio::test]
1418 async fn test_async_send_disconnected() {
1419 let (sender, receiver) = create_bounded_channel::<i32>(1);
1420
1421 // Drop receiver to simulate disconnection
1422 drop(receiver);
1423
1424 // Try to send asynchronously - should return disconnected error
1425 let result = sender.send_async(42).await;
1426 assert!(matches!(result, Err(SendError(_))));
1427 }
1428
1429 // Test 4: Asynchronous send, asynchronous receive
1430 #[tokio::test]
1431 async fn test_async_send_async_recv_success() {
1432 let (sender, receiver) = create_bounded_channel(1);
1433
1434 // Send and receive asynchronously
1435 let send_task = tokio::spawn(async move {
1436 sender.send_async(42).await.unwrap();
1437 });
1438
1439 let recv_task = tokio::spawn(async move { receiver.recv_async().await.unwrap() });
1440
1441 let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1442 assert_eq!(received, 42);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_async_send_async_recv_with_timeout_success() {
1447 let (sender, receiver) = create_bounded_channel(1);
1448
1449 // Send asynchronously
1450 let send_task = tokio::spawn(async move {
1451 tokio::time::sleep(Duration::from_millis(10)).await;
1452 sender.send_async(42).await.unwrap();
1453 });
1454
1455 let recv_task = tokio::spawn(async move {
1456 receiver
1457 .recv_timeout_async(Duration::from_millis(100))
1458 .await
1459 .unwrap()
1460 });
1461
1462 let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
1463 assert_eq!(received, 42);
1464 }
1465
1466 #[tokio::test]
1467 async fn test_async_send_async_recv_timeout() {
1468 let (_sender, receiver) = create_bounded_channel::<i32>(1);
1469
1470 // Try to receive with timeout - should timeout since nothing was sent
1471 let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
1472 assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
1473 }
1474
1475 #[tokio::test]
1476 async fn test_async_send_async_recv_both_timeout() {
1477 let (sender, _receiver) = create_bounded_channel(1);
1478
1479 // Fill the channel
1480 sender.send(1).unwrap();
1481
1482 // Try to send with timeout (should timeout)
1483 let send_result = sender
1484 .send_timeout_async(2, Duration::from_millis(10))
1485 .await;
1486 assert!(matches!(send_result, Err(SendTimeoutError::Timeout(_))));
1487
1488 // Try to receive from empty channel with timeout (should timeout)
1489 let (_sender2, receiver2) = create_bounded_channel::<i32>(1);
1490 let recv_result = receiver2
1491 .recv_timeout_async(Duration::from_millis(10))
1492 .await;
1493 assert!(matches!(recv_result, Err(RecvTimeoutError::Timeout)));
1494 }
1495
1496 #[tokio::test]
1497 async fn test_async_send_async_recv_disconnected() {
1498 let (sender, receiver) = create_bounded_channel::<i32>(1);
1499
1500 // Test send disconnection
1501 drop(receiver);
1502 let send_result = sender.send_async(42).await;
1503 assert!(matches!(send_result, Err(SendError(_))));
1504
1505 // Test receive disconnection
1506 let (sender2, receiver2) = create_bounded_channel::<i32>(1);
1507 drop(sender2);
1508 let recv_result = receiver2.recv_async().await;
1509 assert!(matches!(recv_result, Err(RecvError)));
1510 }
1511
1512 // Additional edge case tests
1513 #[tokio::test]
1514 async fn test_multiple_senders_single_receiver() {
1515 let (sender, receiver) = create_bounded_channel(1);
1516
1517 let sender1 = sender.clone();
1518 let sender2 = sender.clone();
1519
1520 // Send from multiple senders
1521 let send_task1 = tokio::spawn(async move {
1522 sender1.send_async(1).await.unwrap();
1523 });
1524
1525 let send_task2 = tokio::spawn(async move {
1526 sender2.send_async(2).await.unwrap();
1527 });
1528
1529 // Receive both messages
1530 let recv_task = tokio::spawn(async move {
1531 let mut received = vec![];
1532 received.push(receiver.recv_async().await.unwrap());
1533 received.push(receiver.recv_async().await.unwrap());
1534 received.sort(); // Sort since order is not guaranteed
1535 received
1536 });
1537
1538 let (_, _, received) = tokio::try_join!(send_task1, send_task2, recv_task).unwrap();
1539 assert_eq!(received, vec![1, 2]);
1540 }
1541
1542 #[tokio::test]
1543 async fn test_bounded_channel_backpressure() {
1544 let (sender, receiver) = create_bounded_channel(2);
1545
1546 // Fill the channel to capacity
1547 sender.send(1).unwrap();
1548 sender.send(2).unwrap();
1549
1550 // This send should block/timeout since channel is full
1551 let send_result = timeout(Duration::from_millis(10), sender.send_async(3)).await;
1552
1553 assert!(send_result.is_err()); // Should timeout
1554
1555 // Receive one item to make space
1556 let received = receiver.recv().unwrap();
1557 assert_eq!(received, 1);
1558
1559 // Now the send should succeed
1560 let send_result = sender.send_async(3).await;
1561 assert!(send_result.is_ok());
1562 }
1563
1564 #[test]
1565 fn test_sender_receiver_debug_clone() {
1566 let (sender, receiver) = create_bounded_channel::<i32>(1);
1567
1568 // Test Debug implementation
1569 let debug_str = format!("{:?}", sender);
1570 assert!(debug_str.contains("Sender"));
1571
1572 let debug_str = format!("{:?}", receiver);
1573 assert!(debug_str.contains("Receiver"));
1574
1575 // Test Clone implementation for Sender
1576 let _sender_clone = sender.clone();
1577
1578 // Test Clone implementation for Receiver
1579 let _receiver_clone = receiver.clone();
1580 }
1581
1582 #[test]
1583 fn test_deref_implementations() {
1584 let (sender, receiver) = create_bounded_channel(1);
1585
1586 // Test that we can use CrossbeamSender methods directly
1587 sender.send(42).unwrap();
1588
1589 // Test that we can use CrossbeamReceiver methods directly
1590 let received = receiver.recv().unwrap();
1591 assert_eq!(received, 42);
1592
1593 // Test try_send and try_recv
1594 let result = sender.try_send(43);
1595 assert!(result.is_ok());
1596
1597 let result = receiver.try_recv();
1598 assert_eq!(result.unwrap(), 43);
1599 }
1600
1601 // ===== SELECT TESTS =====
1602
1603 #[test]
1604 fn test_select_basic_recv() {
1605 let (tx1, rx1) = create_bounded_channel(1);
1606 let (tx2, rx2) = create_bounded_channel(1);
1607
1608 tx1.send(1).unwrap();
1609 tx2.send(2).unwrap();
1610
1611 let mut sel = Select::new();
1612 let idx1 = sel.recv(&rx1);
1613 let idx2 = sel.recv(&rx2);
1614
1615 let op = sel.select();
1616 match op.index() {
1617 i if i == idx1 => {
1618 let value = op.recv(&rx1).unwrap();
1619 assert_eq!(value, 1);
1620 }
1621 i if i == idx2 => {
1622 let value = op.recv(&rx2).unwrap();
1623 assert_eq!(value, 2);
1624 }
1625 _ => panic!("Unexpected index"),
1626 }
1627 }
1628
1629 #[test]
1630 fn test_select_basic_send() {
1631 let (tx1, rx1) = create_bounded_channel(1);
1632 let (tx2, rx2) = create_bounded_channel(1);
1633
1634 let mut sel = Select::new();
1635 let idx1 = sel.send(&tx1);
1636 let idx2 = sel.send(&tx2);
1637
1638 let op = sel.select();
1639 match op.index() {
1640 i if i == idx1 => {
1641 op.send(&tx1, 1).unwrap();
1642 assert_eq!(rx1.recv().unwrap(), 1);
1643 }
1644 i if i == idx2 => {
1645 op.send(&tx2, 2).unwrap();
1646 assert_eq!(rx2.recv().unwrap(), 2);
1647 }
1648 _ => panic!("Unexpected index"),
1649 }
1650 }
1651
1652 #[test]
1653 fn test_select_mixed_operations() {
1654 let (tx1, rx1) = create_bounded_channel(1);
1655 let (tx2, rx2) = create_bounded_channel(1);
1656
1657 // Send to first channel
1658 tx1.send(1).unwrap();
1659
1660 let mut sel = Select::new();
1661 let recv_idx = sel.recv(&rx1);
1662 let send_idx = sel.send(&tx2);
1663
1664 let op = sel.select();
1665 match op.index() {
1666 i if i == recv_idx => {
1667 let value = op.recv(&rx1).unwrap();
1668 assert_eq!(value, 1);
1669 }
1670 i if i == send_idx => {
1671 op.send(&tx2, 2).unwrap();
1672 assert_eq!(rx2.recv().unwrap(), 2);
1673 }
1674 _ => panic!("Unexpected index"),
1675 }
1676 }
1677
1678 #[test]
1679 fn test_select_try_select() {
1680 let (tx, rx) = create_bounded_channel::<i32>(1);
1681
1682 let mut sel = Select::new();
1683 let recv_idx = sel.recv(&rx);
1684
1685 // Should fail since no values are available
1686 assert!(sel.try_select().is_err());
1687
1688 // Send a value
1689 tx.send(42).unwrap();
1690
1691 // Now it should succeed
1692 let op = sel.try_select().unwrap();
1693 assert_eq!(op.index(), recv_idx);
1694 let value = op.recv(&rx).unwrap();
1695 assert_eq!(value, 42);
1696 }
1697
1698 #[test]
1699 fn test_select_timeout() {
1700 let (_tx, rx) = create_bounded_channel::<i32>(1);
1701
1702 let mut sel = Select::new();
1703 sel.recv(&rx);
1704
1705 // Should timeout since no values are available
1706 let result = sel.select_timeout(Duration::from_millis(10));
1707 assert!(result.is_err());
1708 }
1709
1710 #[test]
1711 fn test_select_timeout_success() {
1712 let (tx, rx) = create_bounded_channel(1);
1713
1714 // Send a value in a separate thread
1715 let tx_clone = tx.clone();
1716 thread::spawn(move || {
1717 thread::sleep(Duration::from_millis(50));
1718 tx_clone.send(42).unwrap();
1719 });
1720
1721 let mut sel = Select::new();
1722 let recv_idx = sel.recv(&rx);
1723
1724 // Should succeed within timeout
1725 let result = sel.select_timeout(Duration::from_millis(100));
1726 assert!(result.is_ok());
1727
1728 let op = result.unwrap();
1729 assert_eq!(op.index(), recv_idx);
1730 let value = op.recv(&rx).unwrap();
1731 assert_eq!(value, 42);
1732 }
1733
1734 #[test]
1735 fn test_select_ready() {
1736 let (tx, rx) = create_bounded_channel(1);
1737
1738 let mut sel = Select::new();
1739 let recv_idx = sel.recv(&rx);
1740
1741 // Send a value first
1742 tx.send(42).unwrap();
1743
1744 // Now the receive operation should be ready
1745 assert_eq!(sel.ready(), recv_idx);
1746 }
1747
1748 #[test]
1749 fn test_select_ready_timeout() {
1750 let (_tx, rx) = create_bounded_channel::<i32>(1);
1751
1752 let mut sel = Select::new();
1753 sel.recv(&rx);
1754
1755 // Should timeout since no values are available
1756 let result = sel.ready_timeout(Duration::from_millis(10));
1757 assert!(result.is_err());
1758 }
1759
1760 #[test]
1761 fn test_select_ready_timeout_success() {
1762 let (tx, rx) = create_bounded_channel(1);
1763
1764 // Send a value in a separate thread
1765 let tx_clone = tx.clone();
1766 thread::spawn(move || {
1767 thread::sleep(Duration::from_millis(50));
1768 tx_clone.send(42).unwrap();
1769 });
1770
1771 let mut sel = Select::new();
1772 let recv_idx = sel.recv(&rx);
1773
1774 // Should succeed within timeout
1775 let result = sel.ready_timeout(Duration::from_millis(100));
1776 assert!(result.is_ok());
1777 assert_eq!(result.unwrap(), recv_idx);
1778 }
1779
1780 #[test]
1781 fn test_select_remove() {
1782 let (tx1, rx1) = create_bounded_channel(1);
1783 let (tx2, rx2) = create_bounded_channel(1);
1784
1785 let mut sel = Select::new();
1786 let idx1 = sel.recv(&rx1);
1787 let idx2 = sel.recv(&rx2);
1788
1789 // Remove the first operation
1790 sel.remove(idx1);
1791
1792 // Send to both channels
1793 tx1.send(1).unwrap();
1794 tx2.send(2).unwrap();
1795
1796 // Only the second operation should be selected
1797 let op = sel.select();
1798 assert_eq!(op.index(), idx2);
1799 let value = op.recv(&rx2).unwrap();
1800 assert_eq!(value, 2);
1801 }
1802
1803 #[test]
1804 fn test_select_biased() {
1805 let (tx1, rx1) = create_bounded_channel(1);
1806 let (tx2, rx2) = create_bounded_channel(1);
1807
1808 tx1.send(1).unwrap();
1809 tx2.send(2).unwrap();
1810
1811 let mut sel = Select::new_biased();
1812 let idx1 = sel.recv(&rx1);
1813 let _idx2 = sel.recv(&rx2);
1814
1815 // With biased selection, the first operation should be selected
1816 let op = sel.select();
1817 assert_eq!(op.index(), idx1);
1818 let value = op.recv(&rx1).unwrap();
1819 assert_eq!(value, 1);
1820 }
1821
1822 #[test]
1823 fn test_select_multiple_channels() {
1824 let (tx1, rx1) = create_bounded_channel(1);
1825 let (tx2, rx2) = create_bounded_channel(1);
1826 let (tx3, rx3) = create_bounded_channel(1);
1827
1828 tx1.send(1).unwrap();
1829 tx2.send(2).unwrap();
1830 tx3.send(3).unwrap();
1831
1832 let mut sel = Select::new();
1833 let idx1 = sel.recv(&rx1);
1834 let idx2 = sel.recv(&rx2);
1835 let idx3 = sel.recv(&rx3);
1836
1837 let mut received = Vec::new();
1838
1839 // Select all three operations
1840 for _ in 0..3 {
1841 let op = sel.select();
1842 match op.index() {
1843 i if i == idx1 => {
1844 let value = op.recv(&rx1).unwrap();
1845 received.push(value);
1846 }
1847 i if i == idx2 => {
1848 let value = op.recv(&rx2).unwrap();
1849 received.push(value);
1850 }
1851 i if i == idx3 => {
1852 let value = op.recv(&rx3).unwrap();
1853 received.push(value);
1854 }
1855 _ => panic!("Unexpected index"),
1856 }
1857 }
1858
1859 received.sort();
1860 assert_eq!(received, vec![1, 2, 3]);
1861 }
1862
1863 #[test]
1864 fn test_select_send_blocking() {
1865 let (tx1, rx1) = create_bounded_channel(1);
1866 let (tx2, rx2) = create_bounded_channel(1);
1867
1868 // Fill both channels
1869 tx1.send(1).unwrap();
1870 tx2.send(2).unwrap();
1871
1872 let mut sel = Select::new();
1873 let _send_idx1 = sel.send(&tx1);
1874 let _send_idx2 = sel.send(&tx2);
1875
1876 // Both sends should block, but we can still select
1877 // This will block until one of the receivers consumes a value
1878 let tx1_clone = tx1.clone();
1879 let tx2_clone = tx2.clone();
1880 let rx1_clone = rx1.clone();
1881 let _rx2_clone = rx2.clone();
1882
1883 let sender_handle = thread::spawn(move || {
1884 let mut sel = Select::new();
1885 let send_idx1 = sel.send(&tx1_clone);
1886 let send_idx2 = sel.send(&tx2_clone);
1887
1888 let op = sel.select();
1889 match op.index() {
1890 i if i == send_idx1 => {
1891 op.send(&tx1_clone, 3).unwrap();
1892 }
1893 i if i == send_idx2 => {
1894 op.send(&tx2_clone, 4).unwrap();
1895 }
1896 _ => panic!("Unexpected index"),
1897 }
1898 });
1899
1900 // Consume a value to unblock one of the sends
1901 thread::sleep(Duration::from_millis(10));
1902 let _ = rx1_clone.recv().unwrap();
1903
1904 sender_handle.join().unwrap();
1905
1906 // Verify the new value was sent
1907 let value = rx1.recv().unwrap();
1908 assert_eq!(value, 3);
1909 }
1910
1911 #[test]
1912 fn test_select_receive_blocking() {
1913 let (tx1, rx1) = create_bounded_channel::<i32>(1);
1914 let (tx2, rx2) = create_bounded_channel::<i32>(1);
1915
1916 let mut sel = Select::new();
1917 let _recv_idx1 = sel.recv(&rx1);
1918 let _recv_idx2 = sel.recv(&rx2);
1919
1920 // Both receives should block, but we can still select
1921 // This will block until one of the senders sends a value
1922 let _tx1_clone = tx1.clone();
1923 let _tx2_clone = tx2.clone();
1924 let rx1_clone = rx1.clone();
1925 let rx2_clone = rx2.clone();
1926
1927 let receiver_handle = thread::spawn(move || {
1928 let mut sel = Select::new();
1929 let recv_idx1 = sel.recv(&rx1_clone);
1930 let recv_idx2 = sel.recv(&rx2_clone);
1931
1932 let op = sel.select();
1933 match op.index() {
1934 i if i == recv_idx1 => {
1935 let value = op.recv(&rx1_clone).unwrap();
1936 value
1937 }
1938 i if i == recv_idx2 => {
1939 let value = op.recv(&rx2_clone).unwrap();
1940 value
1941 }
1942 _ => panic!("Unexpected index"),
1943 }
1944 });
1945
1946 // Send a value to unblock one of the receives
1947 thread::sleep(Duration::from_millis(10));
1948 tx1.send(42).unwrap();
1949
1950 let received_value = receiver_handle.join().unwrap();
1951 assert_eq!(received_value, 42);
1952 }
1953
1954 #[test]
1955 fn test_select_disconnected_send() {
1956 let (tx, rx) = create_bounded_channel::<i32>(1);
1957
1958 // Drop the receiver
1959 drop(rx);
1960
1961 let mut sel = Select::new();
1962 let send_idx = sel.send(&tx);
1963
1964 let op = sel.select();
1965 assert_eq!(op.index(), send_idx);
1966
1967 // Send should fail with disconnected error
1968 let result = op.send(&tx, 42);
1969 assert!(result.is_err());
1970 }
1971
1972 #[test]
1973 fn test_select_disconnected_recv() {
1974 let (tx, rx) = create_bounded_channel::<i32>(1);
1975
1976 // Drop the sender
1977 drop(tx);
1978
1979 let mut sel = Select::new();
1980 let recv_idx = sel.recv(&rx);
1981
1982 let op = sel.select();
1983 assert_eq!(op.index(), recv_idx);
1984
1985 // Receive should fail with disconnected error
1986 let result = op.recv(&rx);
1987 assert!(result.is_err());
1988 }
1989
1990 #[test]
1991 fn test_select_default() {
1992 let (tx, rx) = create_bounded_channel(1);
1993 tx.send(42).unwrap();
1994
1995 let mut sel = Select::default(); // Test Default implementation
1996 let recv_idx = sel.recv(&rx);
1997
1998 let op = sel.select();
1999 assert_eq!(op.index(), recv_idx);
2000 let value = op.recv(&rx).unwrap();
2001 assert_eq!(value, 42);
2002 }
2003
2004 #[test]
2005 fn test_select_complex_scenario() {
2006 let (tx1, rx1) = create_bounded_channel(1);
2007 let (tx2, rx2) = create_bounded_channel(1);
2008 let (tx3, rx3) = create_bounded_channel(1);
2009
2010 // Send to first two channels
2011 tx1.send(1).unwrap();
2012 tx2.send(2).unwrap();
2013
2014 let mut sel = Select::new();
2015 let recv_idx1 = sel.recv(&rx1);
2016 let recv_idx2 = sel.recv(&rx2);
2017 let send_idx3 = sel.send(&tx3);
2018
2019 let mut received = Vec::new();
2020
2021 // First selection should be one of the receives
2022 let op = sel.select();
2023 match op.index() {
2024 i if i == recv_idx1 => {
2025 let value = op.recv(&rx1).unwrap();
2026 received.push(value);
2027 }
2028 i if i == recv_idx2 => {
2029 let value = op.recv(&rx2).unwrap();
2030 received.push(value);
2031 }
2032 _ => panic!("Unexpected index for first selection"),
2033 }
2034
2035 // Second selection should be the other receive
2036 let op = sel.select();
2037 match op.index() {
2038 i if i == recv_idx1 => {
2039 let value = op.recv(&rx1).unwrap();
2040 received.push(value);
2041 }
2042 i if i == recv_idx2 => {
2043 let value = op.recv(&rx2).unwrap();
2044 received.push(value);
2045 }
2046 _ => panic!("Unexpected index for second selection"),
2047 }
2048
2049 // Third selection should be the send
2050 let op = sel.select();
2051 assert_eq!(op.index(), send_idx3);
2052 op.send(&tx3, 3).unwrap();
2053
2054 // Verify all operations completed correctly
2055 received.sort();
2056 assert_eq!(received, vec![1, 2]);
2057 assert_eq!(rx3.recv().unwrap(), 3);
2058 }
2059}