use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use crate::error::{
ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
SendTimeoutError, TryRecvError, TrySelectError, TrySendError,
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use futures::Stream;
use pin_project_lite::pin_project;
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0);
let (tx, rx) = crossbeam_channel::bounded(capacity);
(Sender::new(tx), Receiver::new(rx))
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = crossbeam_channel::unbounded();
(Sender::new(tx), Receiver::new(rx))
}
/// Sending half of a channel: a thin wrapper around a crossbeam sender that
/// exposes both blocking and poll-based asynchronous send operations.
#[derive(Debug)]
pub struct Sender<T> {
// The wrapped crossbeam sender that every operation delegates to.
tx: CrossbeamSender<T>,
}
impl<T> Clone for Sender<T> {
    /// Clones the wrapped crossbeam sender into a new `Sender`.
    ///
    /// No `T: Clone` bound is required.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        Self { tx }
    }
}
impl<T> Sender<T> {
    /// Wraps an existing crossbeam sender.
    pub fn new(tx: CrossbeamSender<T>) -> Self {
        Self { tx }
    }

    /// Sends `value` through the wrapped crossbeam sender's `send`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's [`SendError`].
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        Ok(self.tx.send(value)?)
    }

    /// Sends `value` through the wrapped crossbeam sender's `send_timeout`,
    /// waiting at most `timeout`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's
    /// [`SendTimeoutError`].
    pub fn send_timeout(&self, value: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
        Ok(self.tx.send_timeout(value, timeout)?)
    }

    /// Attempts to send `value` through the wrapped crossbeam sender's
    /// `try_send`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's
    /// [`TrySendError`].
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        Ok(self.tx.try_send(value)?)
    }

    /// Returns a future that tries to send `item` until it succeeds, the
    /// channel disconnects, or `timeout` (measured from this call) elapses.
    ///
    /// A `timeout` so large that the deadline cannot be represented as an
    /// [`Instant`] is treated as "no deadline" instead of panicking.
    pub fn send_timeout_async(&self, item: T, timeout: Duration) -> SendTimeoutFut<'_, T> {
        SendTimeoutFut {
            tx: self,
            item: Some(item),
            // `Instant + Duration` panics on overflow; `checked_add` yields
            // `None`, which the future interprets as an unbounded wait.
            deadline: Instant::now().checked_add(timeout),
        }
    }

    /// Returns a future that tries to send `item` with no deadline.
    pub fn send_async(&self, item: T) -> SendFut<'_, T> {
        SendFut {
            fut: SendTimeoutFut {
                tx: self,
                item: Some(item),
                deadline: None,
            },
        }
    }
}
impl<T> From<CrossbeamSender<T>> for Sender<T> {
fn from(tx: CrossbeamSender<T>) -> Self {
Self { tx }
}
}
/// Future that sends one item and resolves to `Result<(), SendError<T>>`.
///
/// It delegates all polling to an inner [`SendTimeoutFut`].
pub struct SendFut<'a, T> {
// Inner future that performs the actual send attempts.
fut: SendTimeoutFut<'a, T>,
}
impl<'a, T> SendFut<'a, T> {
    /// Builds a send future for `item` on `tx`.
    ///
    /// When `timeout` is `Some`, the inner future is given a deadline of
    /// "now + timeout". A timeout too large to be represented as an
    /// [`Instant`] is treated as no deadline instead of panicking.
    ///
    /// NOTE(review): the output type of `SendFut` has no timeout variant, so
    /// how an elapsed deadline is surfaced is decided by the `Future` impl —
    /// confirm that behaviour before passing `Some(..)` here.
    pub fn new(tx: &'a Sender<T>, item: T, timeout: Option<Duration>) -> Self {
        // `Instant + Duration` panics on overflow; `checked_add` turns an
        // unrepresentable deadline into `None` (wait without a time limit).
        let deadline = timeout.and_then(|limit| Instant::now().checked_add(limit));
        Self {
            fut: SendTimeoutFut {
                tx,
                item: Some(item),
                deadline,
            },
        }
    }
}
impl<'a, T> Future for SendFut<'a, T> {
    type Output = Result<(), SendError<T>>;

    /// Polls the inner [`SendTimeoutFut`] and narrows its error type.
    ///
    /// Both failure modes of the inner future hand the unsent item back
    /// inside [`SendError`]. A timeout can occur when the future was built
    /// through `SendFut::new` with `Some(timeout)`; it is reported as a
    /// `SendError` rather than panicking, because the output type has no
    /// dedicated timeout variant.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.fut).poll(cx).map(|outcome| {
            outcome.map_err(|err| match err {
                SendTimeoutError::Timeout(item) | SendTimeoutError::Disconnected(item) => {
                    SendError(item)
                }
            })
        })
    }
}
pin_project! {
/// Future that repeatedly attempts a non-blocking send, optionally bounded
/// by a deadline. Resolves to `Result<(), SendTimeoutError<T>>`.
pub struct SendTimeoutFut<'a, T> {
// Sender used for each `try_send` attempt.
tx: &'a Sender<T>,
// The value still waiting to be sent; taken out for each attempt and put
// back when the channel reports it is full.
item: Option<T>,
// Point in time after which the future gives up; `None` means no limit.
deadline: Option<Instant>,
}
}
impl<'a, T> Future for SendTimeoutFut<'a, T> {
    type Output = Result<(), SendTimeoutError<T>>;

    /// Attempts a non-blocking send and only then consults the deadline.
    ///
    /// * On success resolves to `Ok(())`.
    /// * If the channel is disconnected, resolves to
    ///   `SendTimeoutError::Disconnected` carrying the item.
    /// * If the channel is full and the deadline has passed, resolves to
    ///   `SendTimeoutError::Timeout` carrying the item.
    /// * Otherwise the item is stored again, the task is woken immediately
    ///   and `Poll::Pending` is returned, so the executor will poll again.
    ///
    /// The send is tried before the deadline check so that a send which can
    /// complete right away is never reported as a timeout (for example with
    /// a zero timeout, or when the first poll happens late).
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already resolved.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let item = this
            .item
            .take()
            .expect("SendTimeoutFut polled after completion");

        match this.tx.try_send(item) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(TrySendError::Disconnected(item)) => {
                Poll::Ready(Err(SendTimeoutError::Disconnected(item)))
            }
            Err(TrySendError::Full(item)) => {
                let expired =
                    matches!(*this.deadline, Some(deadline) if deadline <= Instant::now());
                if expired {
                    return Poll::Ready(Err(SendTimeoutError::Timeout(item)));
                }
                // Keep the item for the next attempt. No waker is registered
                // with the channel, so request an immediate re-poll.
                *this.item = Some(item);
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
/// Receiving half of a channel: a thin wrapper around a crossbeam receiver
/// that exposes both blocking and poll-based asynchronous receive operations.
#[derive(Debug)]
pub struct Receiver<T> {
// The wrapped crossbeam receiver that every operation delegates to.
rx: CrossbeamReceiver<T>,
}
impl<T> Clone for Receiver<T> {
    /// Clones the wrapped crossbeam receiver into a new `Receiver`.
    ///
    /// No `T: Clone` bound is required.
    fn clone(&self) -> Self {
        let rx = self.rx.clone();
        Self { rx }
    }
}
impl<T> Receiver<T> {
    /// Wraps an existing crossbeam receiver.
    pub fn new(rx: CrossbeamReceiver<T>) -> Self {
        Self { rx }
    }

    /// Receives a message through the wrapped crossbeam receiver's `recv`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's [`RecvError`].
    pub fn recv(&self) -> Result<T, RecvError> {
        Ok(self.rx.recv()?)
    }

    /// Receives a message through the wrapped crossbeam receiver's
    /// `recv_timeout`, waiting at most `timeout`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's
    /// [`RecvTimeoutError`].
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        Ok(self.rx.recv_timeout(timeout)?)
    }

    /// Attempts to receive a message through the wrapped crossbeam
    /// receiver's `try_recv`.
    ///
    /// # Errors
    ///
    /// Returns the crossbeam error converted into this crate's
    /// [`TryRecvError`].
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        Ok(self.rx.try_recv()?)
    }

    /// Returns a future that tries to receive until a message arrives, the
    /// channel disconnects, or `timeout` (measured from this call) elapses.
    ///
    /// A `timeout` so large that the deadline cannot be represented as an
    /// [`Instant`] is treated as "no deadline" instead of panicking.
    pub fn recv_timeout_async(&self, timeout: Duration) -> RecvTimeoutFut<'_, T> {
        RecvTimeoutFut {
            rx: self,
            // `Instant + Duration` panics on overflow; `checked_add` yields
            // `None`, which the future interprets as an unbounded wait.
            deadline: Instant::now().checked_add(timeout),
        }
    }

    /// Returns a future that tries to receive with no deadline.
    pub fn recv_async(&self) -> RecvFut<'_, T> {
        RecvFut {
            fut: RecvTimeoutFut {
                rx: self,
                deadline: None,
            },
        }
    }

    /// Consumes the receiver and turns it into a [`RecvStream`].
    pub fn into_stream(self) -> RecvStream<T> {
        RecvStream { rx: self }
    }
}
impl<T> From<CrossbeamReceiver<T>> for Receiver<T> {
fn from(rx: CrossbeamReceiver<T>) -> Self {
Self { rx }
}
}
pin_project! {
/// Future that repeatedly attempts a non-blocking receive, optionally
/// bounded by a deadline. Resolves to `Result<T, RecvTimeoutError>`.
pub struct RecvTimeoutFut<'a, T> {
// Receiver used for each `try_recv` attempt.
rx: &'a Receiver<T>,
// Point in time after which the future gives up; `None` means no limit.
deadline: Option<Instant>,
}
}
impl<'a, T> Future for RecvTimeoutFut<'a, T> {
    type Output = Result<T, RecvTimeoutError>;

    /// Attempts a non-blocking receive and only then consults the deadline.
    ///
    /// * If a message is available, resolves to `Ok(message)`.
    /// * If the channel is disconnected, resolves to
    ///   `RecvTimeoutError::Disconnected`.
    /// * If the channel is empty and the deadline has passed, resolves to
    ///   `RecvTimeoutError::Timeout`.
    /// * Otherwise the task is woken immediately and `Poll::Pending` is
    ///   returned, so the executor will poll again.
    ///
    /// The receive is tried before the deadline check so that a message that
    /// is already queued is never discarded in favour of a timeout error
    /// (for example with a zero timeout, or when the first poll happens
    /// late).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        match this.rx.try_recv() {
            Ok(item) => Poll::Ready(Ok(item)),
            Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvTimeoutError::Disconnected)),
            Err(TryRecvError::Empty) => {
                let expired =
                    matches!(*this.deadline, Some(deadline) if deadline <= Instant::now());
                if expired {
                    return Poll::Ready(Err(RecvTimeoutError::Timeout));
                }
                // No waker is registered with the channel, so request an
                // immediate re-poll.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
/// Future that receives one item and resolves to `Result<T, RecvError>`.
///
/// It delegates all polling to an inner [`RecvTimeoutFut`].
pub struct RecvFut<'a, T> {
// Inner future that performs the actual receive attempts.
fut: RecvTimeoutFut<'a, T>,
}
impl<'a, T> Future for RecvFut<'a, T> {
    type Output = Result<T, RecvError>;

    /// Polls the inner [`RecvTimeoutFut`] and narrows its error type to
    /// [`RecvError`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.fut);
        let outcome = match inner.poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome,
        };

        match outcome {
            Ok(item) => Poll::Ready(Ok(item)),
            Err(RecvTimeoutError::Disconnected) => Poll::Ready(Err(RecvError)),
            Err(RecvTimeoutError::Timeout) => {
                unreachable!("RecvTimeoutError::Timeout should not be returned by RecvTimeoutFut");
            }
        }
    }
}
/// Stream adapter that owns a [`Receiver`] and yields the items received
/// from it.
pub struct RecvStream<T> {
// Receiver the stream pulls items from.
rx: Receiver<T>,
}
impl<T> Stream for RecvStream<T> {
    type Item = T;

    /// Polls a fresh receive future once per call.
    ///
    /// A received item becomes `Some(item)`; any receive error ends the
    /// stream with `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut recv = self.rx.recv_async();
        let polled = Pin::new(&mut recv).poll(cx);
        polled.map(Result::ok)
    }
}
/// Wrapper around `crossbeam_channel::Select` that accepts this module's
/// [`Sender`] and [`Receiver`] types.
pub struct Select<'a> {
// The wrapped crossbeam selector that every method delegates to.
select: crossbeam_channel::Select<'a>,
}
impl<'a> Default for Select<'a> {
fn default() -> Self {
Self::new()
}
}
impl<'a> Select<'a> {
    /// Creates a selector backed by `crossbeam_channel::Select::new`.
    pub fn new() -> Self {
        Self {
            select: crossbeam_channel::Select::new(),
        }
    }

    /// Creates a selector backed by `crossbeam_channel::Select::new_biased`.
    pub fn new_biased() -> Self {
        Self {
            select: crossbeam_channel::Select::new_biased(),
        }
    }

    /// Registers a send operation on `sender` and returns its index.
    pub fn send<T>(&mut self, sender: &'a Sender<T>) -> usize {
        let raw = &sender.tx;
        self.select.send(raw)
    }

    /// Registers a receive operation on `receiver` and returns its index.
    pub fn recv<T>(&mut self, receiver: &'a Receiver<T>) -> usize {
        let raw = &receiver.rx;
        self.select.recv(raw)
    }

    /// Delegates to the inner selector's `select` and wraps the result.
    pub fn select(&mut self) -> SelectedOperation<'a> {
        SelectedOperation::from(self.select.select())
    }

    /// Delegates to the inner selector's `try_select`, wrapping the selected
    /// operation and converting the error into [`TrySelectError`].
    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
        let op = self.select.try_select()?;
        Ok(op.into())
    }

    /// Delegates to the inner selector's `select_timeout`, wrapping the
    /// selected operation and converting the error into
    /// [`SelectTimeoutError`].
    pub fn select_timeout(
        &mut self,
        timeout: Duration,
    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
        let op = self.select.select_timeout(timeout)?;
        Ok(op.into())
    }

    /// Delegates to the inner selector's `ready` and returns its index.
    pub fn ready(&mut self) -> usize {
        self.select.ready()
    }

    /// Delegates to the inner selector's `ready_timeout`, converting the
    /// error into [`ReadyTimeoutError`].
    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
        let index = self.select.ready_timeout(timeout)?;
        Ok(index)
    }

    /// Removes the operation registered under `index` from the selector.
    pub fn remove(&mut self, index: usize) {
        self.select.remove(index);
    }
}
/// Wrapper around `crossbeam_channel::SelectedOperation` whose `send` and
/// `recv` accept this module's [`Sender`] and [`Receiver`] types.
pub struct SelectedOperation<'a>(crossbeam_channel::SelectedOperation<'a>);
impl<'a> From<crossbeam_channel::SelectedOperation<'a>> for SelectedOperation<'a> {
fn from(value: crossbeam_channel::SelectedOperation<'a>) -> Self {
Self(value)
}
}
impl<'a> SelectedOperation<'a> {
pub fn index(&self) -> usize {
self.0.index()
}
pub fn send<T>(self, sender: &'a Sender<T>, msg: T) -> Result<(), SendError<T>> {
Ok(self.0.send(&sender.tx, msg)?)
}
pub fn recv<T>(self, receiver: &'a Receiver<T>) -> Result<T, RecvError> {
Ok(self.0.recv(&receiver.rx)?)
}
}
// Tests for the channel wrappers: blocking and async send/recv in every
// combination, timeouts, disconnection, and the `Select` wrapper.
#[cfg(test)]
mod tests {
use super::*;
use crossbeam_channel;
use std::{thread, time::Duration};
use tokio::time::timeout;
// Builds a bounded crossbeam channel directly and wraps both halves.
// Unlike `super::bounded`, this helper performs no capacity assertion.
fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = crossbeam_channel::bounded(capacity);
(Sender::new(tx), Receiver::new(rx))
}
// Blocking send followed by blocking recv returns the sent value.
#[test]
fn test_sync_send_sync_recv_success() {
let (sender, receiver) = create_bounded_channel(1);
sender.send(42).unwrap();
let received = receiver.recv().unwrap();
assert_eq!(received, 42);
}
// Blocking recv with a timeout returns a value that is already queued.
#[test]
fn test_sync_send_sync_recv_with_timeout_success() {
let (sender, receiver) = create_bounded_channel(1);
sender.send(42).unwrap();
let received = receiver.recv_timeout(Duration::from_millis(100)).unwrap();
assert_eq!(received, 42);
}
// Blocking recv with a timeout on an empty channel reports `Timeout`.
#[test]
fn test_sync_send_sync_recv_timeout() {
let (_sender, receiver) = create_bounded_channel::<i32>(1);
let result = receiver.recv_timeout(Duration::from_millis(10));
assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
}
// Blocking recv after the only sender is dropped reports `RecvError`.
#[test]
fn test_sync_send_sync_recv_disconnected() {
let (sender, receiver) = create_bounded_channel::<i32>(1);
drop(sender);
let result = receiver.recv();
assert!(matches!(result, Err(RecvError)));
}
// A value sent from a plain thread after a short sleep is observed by
// `recv_async`.
#[tokio::test]
async fn test_sync_send_async_recv_success() {
let (sender, receiver) = create_bounded_channel(1);
let sender_clone = sender.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
sender_clone.send(42).unwrap();
});
let received = receiver.recv_async().await.unwrap();
assert_eq!(received, 42);
}
// Same as above, but through `recv_timeout_async` with a 100 ms limit.
#[tokio::test]
async fn test_sync_send_async_recv_with_timeout_success() {
let (sender, receiver) = create_bounded_channel(1);
let sender_clone = sender.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
sender_clone.send(42).unwrap();
});
let received = receiver
.recv_timeout_async(Duration::from_millis(100))
.await
.unwrap();
assert_eq!(received, 42);
}
// `recv_timeout_async` on an empty channel reports `Timeout`.
#[tokio::test]
async fn test_sync_send_async_recv_timeout() {
let (_sender, receiver) = create_bounded_channel::<i32>(1);
let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
}
// `recv_async` after the only sender is dropped reports `RecvError`.
#[tokio::test]
async fn test_sync_send_async_recv_disconnected() {
let (sender, receiver) = create_bounded_channel::<i32>(1);
drop(sender);
let result = receiver.recv_async().await;
assert!(matches!(result, Err(RecvError)));
}
// Async send observed by a blocking recv running inside a spawned task.
// The result of the second send (43) is deliberately ignored.
#[tokio::test]
async fn test_async_send_sync_recv_success() {
let (sender, receiver) = create_bounded_channel(1);
let send_task = tokio::spawn(async move {
sender.send_async(42).await.unwrap();
let _ = sender.send_async(43).await;
});
let recv_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
receiver.recv().unwrap()
});
let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
assert_eq!(received, 42);
}
// Same as above, but the receiving side uses `recv_timeout`.
#[tokio::test]
async fn test_async_send_sync_recv_with_timeout_success() {
let (sender, receiver) = create_bounded_channel(1);
let send_task = tokio::spawn(async move {
sender.send_async(42).await.unwrap();
let _ = sender.send_async(43).await;
});
let recv_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
receiver.recv_timeout(Duration::from_millis(100)).unwrap()
});
let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
assert_eq!(received, 42);
}
// Blocking `recv_timeout` run via `spawn_blocking` on an empty channel
// reports `Timeout`.
#[tokio::test]
async fn test_async_send_sync_recv_timeout() {
let (_sender, receiver) = create_bounded_channel::<i32>(1);
let result =
tokio::task::spawn_blocking(move || receiver.recv_timeout(Duration::from_millis(10)))
.await
.unwrap();
assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
}
// `send_timeout_async` succeeds when the channel has room.
#[tokio::test]
async fn test_async_send_with_timeout_success() {
let (sender, receiver) = create_bounded_channel(1);
let result = sender
.send_timeout_async(42, Duration::from_millis(100))
.await;
assert!(result.is_ok());
let received = receiver.recv().unwrap();
assert_eq!(received, 42);
}
// `send_timeout_async` on a full channel that is never drained reports
// `Timeout`.
#[tokio::test]
async fn test_async_send_timeout() {
let (sender, _receiver) = create_bounded_channel(1);
sender.send(1).unwrap();
let result = sender
.send_timeout_async(2, Duration::from_millis(10))
.await;
assert!(matches!(result, Err(SendTimeoutError::Timeout(_))));
}
// `send_async` after the receiver is dropped reports `SendError`.
#[tokio::test]
async fn test_async_send_disconnected() {
let (sender, receiver) = create_bounded_channel::<i32>(1);
drop(receiver);
let result = sender.send_async(42).await;
assert!(matches!(result, Err(SendError(_))));
}
// Async send and async recv running in separate spawned tasks.
#[tokio::test]
async fn test_async_send_async_recv_success() {
let (sender, receiver) = create_bounded_channel(1);
let send_task = tokio::spawn(async move {
sender.send_async(42).await.unwrap();
});
let recv_task = tokio::spawn(async move { receiver.recv_async().await.unwrap() });
let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
assert_eq!(received, 42);
}
// Delayed async send observed by `recv_timeout_async` within its limit.
#[tokio::test]
async fn test_async_send_async_recv_with_timeout_success() {
let (sender, receiver) = create_bounded_channel(1);
let send_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
sender.send_async(42).await.unwrap();
});
let recv_task = tokio::spawn(async move {
receiver
.recv_timeout_async(Duration::from_millis(100))
.await
.unwrap()
});
let (_, received) = tokio::try_join!(send_task, recv_task).unwrap();
assert_eq!(received, 42);
}
// `recv_timeout_async` on an empty channel reports `Timeout`.
#[tokio::test]
async fn test_async_send_async_recv_timeout() {
let (_sender, receiver) = create_bounded_channel::<i32>(1);
let result = receiver.recv_timeout_async(Duration::from_millis(10)).await;
assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
}
// Both directions time out: send on a full channel, recv on an empty one.
#[tokio::test]
async fn test_async_send_async_recv_both_timeout() {
let (sender, _receiver) = create_bounded_channel(1);
sender.send(1).unwrap();
let send_result = sender
.send_timeout_async(2, Duration::from_millis(10))
.await;
assert!(matches!(send_result, Err(SendTimeoutError::Timeout(_))));
let (_sender2, receiver2) = create_bounded_channel::<i32>(1);
let recv_result = receiver2
.recv_timeout_async(Duration::from_millis(10))
.await;
assert!(matches!(recv_result, Err(RecvTimeoutError::Timeout)));
}
// Both directions report disconnection once the opposite half is dropped.
#[tokio::test]
async fn test_async_send_async_recv_disconnected() {
let (sender, receiver) = create_bounded_channel::<i32>(1);
drop(receiver);
let send_result = sender.send_async(42).await;
assert!(matches!(send_result, Err(SendError(_))));
let (sender2, receiver2) = create_bounded_channel::<i32>(1);
drop(sender2);
let recv_result = receiver2.recv_async().await;
assert!(matches!(recv_result, Err(RecvError)));
}
// Two cloned senders feed one receiver; results are sorted before the
// comparison so arrival order does not matter.
#[tokio::test]
async fn test_multiple_senders_single_receiver() {
let (sender, receiver) = create_bounded_channel(1);
let sender1 = sender.clone();
let sender2 = sender.clone();
let send_task1 = tokio::spawn(async move {
sender1.send_async(1).await.unwrap();
});
let send_task2 = tokio::spawn(async move {
sender2.send_async(2).await.unwrap();
});
let recv_task = tokio::spawn(async move {
let mut received = vec![];
received.push(receiver.recv_async().await.unwrap());
received.push(receiver.recv_async().await.unwrap());
received.sort(); received
});
let (_, _, received) = tokio::try_join!(send_task1, send_task2, recv_task).unwrap();
assert_eq!(received, vec![1, 2]);
}
// A full channel keeps `send_async` pending (the outer tokio timeout
// fires); after one recv the next `send_async` succeeds.
#[tokio::test]
async fn test_bounded_channel_backpressure() {
let (sender, receiver) = create_bounded_channel(2);
sender.send(1).unwrap();
sender.send(2).unwrap();
let send_result = timeout(Duration::from_millis(10), sender.send_async(3)).await;
assert!(send_result.is_err());
let received = receiver.recv().unwrap();
assert_eq!(received, 1);
let send_result = sender.send_async(3).await;
assert!(send_result.is_ok());
}
// `Debug` output names the wrapper types, and both halves can be cloned.
#[test]
fn test_sender_receiver_debug_clone() {
let (sender, receiver) = create_bounded_channel::<i32>(1);
let debug_str = format!("{:?}", sender);
assert!(debug_str.contains("Sender"));
let debug_str = format!("{:?}", receiver);
assert!(debug_str.contains("Receiver"));
let _sender_clone = sender.clone();
let _receiver_clone = receiver.clone();
}
// Exercises send/recv and try_send/try_recv on the wrappers.
// NOTE(review): the name mentions Deref, but the body only calls the
// wrappers' own methods — confirm whether the name is stale.
#[test]
fn test_deref_implementations() {
let (sender, receiver) = create_bounded_channel(1);
sender.send(42).unwrap();
let received = receiver.recv().unwrap();
assert_eq!(received, 42);
let result = sender.try_send(43);
assert!(result.is_ok());
let result = receiver.try_recv();
assert_eq!(result.unwrap(), 43);
}
// Select over two receivers that both hold a message; either may be chosen.
#[test]
fn test_select_basic_recv() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
let mut sel = Select::new();
let idx1 = sel.recv(&rx1);
let idx2 = sel.recv(&rx2);
let op = sel.select();
match op.index() {
i if i == idx1 => {
let value = op.recv(&rx1).unwrap();
assert_eq!(value, 1);
}
i if i == idx2 => {
let value = op.recv(&rx2).unwrap();
assert_eq!(value, 2);
}
_ => panic!("Unexpected index"),
}
}
// Select over two senders that both have room; either may be chosen.
#[test]
fn test_select_basic_send() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
let mut sel = Select::new();
let idx1 = sel.send(&tx1);
let idx2 = sel.send(&tx2);
let op = sel.select();
match op.index() {
i if i == idx1 => {
op.send(&tx1, 1).unwrap();
assert_eq!(rx1.recv().unwrap(), 1);
}
i if i == idx2 => {
op.send(&tx2, 2).unwrap();
assert_eq!(rx2.recv().unwrap(), 2);
}
_ => panic!("Unexpected index"),
}
}
// Select over one receive and one send operation; both are ready.
#[test]
fn test_select_mixed_operations() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
tx1.send(1).unwrap();
let mut sel = Select::new();
let recv_idx = sel.recv(&rx1);
let send_idx = sel.send(&tx2);
let op = sel.select();
match op.index() {
i if i == recv_idx => {
let value = op.recv(&rx1).unwrap();
assert_eq!(value, 1);
}
i if i == send_idx => {
op.send(&tx2, 2).unwrap();
assert_eq!(rx2.recv().unwrap(), 2);
}
_ => panic!("Unexpected index"),
}
}
// `try_select` fails while nothing is ready and succeeds after a send.
#[test]
fn test_select_try_select() {
let (tx, rx) = create_bounded_channel::<i32>(1);
let mut sel = Select::new();
let recv_idx = sel.recv(&rx);
assert!(sel.try_select().is_err());
tx.send(42).unwrap();
let op = sel.try_select().unwrap();
assert_eq!(op.index(), recv_idx);
let value = op.recv(&rx).unwrap();
assert_eq!(value, 42);
}
// `select_timeout` returns an error when nothing becomes ready in time.
#[test]
fn test_select_timeout() {
let (_tx, rx) = create_bounded_channel::<i32>(1);
let mut sel = Select::new();
sel.recv(&rx);
let result = sel.select_timeout(Duration::from_millis(10));
assert!(result.is_err());
}
// `select_timeout` succeeds when a background thread sends after 50 ms
// and the limit is 100 ms.
#[test]
fn test_select_timeout_success() {
let (tx, rx) = create_bounded_channel(1);
let tx_clone = tx.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx_clone.send(42).unwrap();
});
let mut sel = Select::new();
let recv_idx = sel.recv(&rx);
let result = sel.select_timeout(Duration::from_millis(100));
assert!(result.is_ok());
let op = result.unwrap();
assert_eq!(op.index(), recv_idx);
let value = op.recv(&rx).unwrap();
assert_eq!(value, 42);
}
// `ready` returns the index of the receive operation once a message is
// queued.
#[test]
fn test_select_ready() {
let (tx, rx) = create_bounded_channel(1);
let mut sel = Select::new();
let recv_idx = sel.recv(&rx);
tx.send(42).unwrap();
assert_eq!(sel.ready(), recv_idx);
}
// `ready_timeout` returns an error when nothing becomes ready in time.
#[test]
fn test_select_ready_timeout() {
let (_tx, rx) = create_bounded_channel::<i32>(1);
let mut sel = Select::new();
sel.recv(&rx);
let result = sel.ready_timeout(Duration::from_millis(10));
assert!(result.is_err());
}
// `ready_timeout` succeeds when a background thread sends after 50 ms
// and the limit is 100 ms.
#[test]
fn test_select_ready_timeout_success() {
let (tx, rx) = create_bounded_channel(1);
let tx_clone = tx.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx_clone.send(42).unwrap();
});
let mut sel = Select::new();
let recv_idx = sel.recv(&rx);
let result = sel.ready_timeout(Duration::from_millis(100));
assert!(result.is_ok());
assert_eq!(result.unwrap(), recv_idx);
}
// After removing the first operation, only the second can be selected
// even though both channels hold a message.
#[test]
fn test_select_remove() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
let mut sel = Select::new();
let idx1 = sel.recv(&rx1);
let idx2 = sel.recv(&rx2);
sel.remove(idx1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
let op = sel.select();
assert_eq!(op.index(), idx2);
let value = op.recv(&rx2).unwrap();
assert_eq!(value, 2);
}
// With `new_biased`, the first registered ready operation is expected to
// be chosen.
#[test]
fn test_select_biased() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
let mut sel = Select::new_biased();
let idx1 = sel.recv(&rx1);
let _idx2 = sel.recv(&rx2);
let op = sel.select();
assert_eq!(op.index(), idx1);
let value = op.recv(&rx1).unwrap();
assert_eq!(value, 1);
}
// Three ready receivers are drained by three selects; results are sorted
// before the comparison so selection order does not matter.
#[test]
fn test_select_multiple_channels() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
let (tx3, rx3) = create_bounded_channel(1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
tx3.send(3).unwrap();
let mut sel = Select::new();
let idx1 = sel.recv(&rx1);
let idx2 = sel.recv(&rx2);
let idx3 = sel.recv(&rx3);
let mut received = Vec::new();
for _ in 0..3 {
let op = sel.select();
match op.index() {
i if i == idx1 => {
let value = op.recv(&rx1).unwrap();
received.push(value);
}
i if i == idx2 => {
let value = op.recv(&rx2).unwrap();
received.push(value);
}
i if i == idx3 => {
let value = op.recv(&rx3).unwrap();
received.push(value);
}
_ => panic!("Unexpected index"),
}
}
received.sort();
assert_eq!(received, vec![1, 2, 3]);
}
// Both channels start full, so the selecting thread blocks until the main
// thread drains channel 1; the thread then sends 3 on that channel.
#[test]
fn test_select_send_blocking() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
// This selector only registers operations; `select` is never called on it.
let mut sel = Select::new();
let _send_idx1 = sel.send(&tx1);
let _send_idx2 = sel.send(&tx2);
let tx1_clone = tx1.clone();
let tx2_clone = tx2.clone();
let rx1_clone = rx1.clone();
let _rx2_clone = rx2.clone();
let sender_handle = thread::spawn(move || {
let mut sel = Select::new();
let send_idx1 = sel.send(&tx1_clone);
let send_idx2 = sel.send(&tx2_clone);
let op = sel.select();
match op.index() {
i if i == send_idx1 => {
op.send(&tx1_clone, 3).unwrap();
}
i if i == send_idx2 => {
op.send(&tx2_clone, 4).unwrap();
}
_ => panic!("Unexpected index"),
}
});
thread::sleep(Duration::from_millis(10));
let _ = rx1_clone.recv().unwrap();
sender_handle.join().unwrap();
let value = rx1.recv().unwrap();
assert_eq!(value, 3);
}
// Both channels start empty, so the selecting thread blocks until the main
// thread sends 42 on channel 1.
#[test]
fn test_select_receive_blocking() {
let (tx1, rx1) = create_bounded_channel::<i32>(1);
let (tx2, rx2) = create_bounded_channel::<i32>(1);
// This selector only registers operations; `select` is never called on it.
let mut sel = Select::new();
let _recv_idx1 = sel.recv(&rx1);
let _recv_idx2 = sel.recv(&rx2);
let _tx1_clone = tx1.clone();
let _tx2_clone = tx2.clone();
let rx1_clone = rx1.clone();
let rx2_clone = rx2.clone();
let receiver_handle = thread::spawn(move || {
let mut sel = Select::new();
let recv_idx1 = sel.recv(&rx1_clone);
let recv_idx2 = sel.recv(&rx2_clone);
let op = sel.select();
match op.index() {
i if i == recv_idx1 => {
let value = op.recv(&rx1_clone).unwrap();
value
}
i if i == recv_idx2 => {
let value = op.recv(&rx2_clone).unwrap();
value
}
_ => panic!("Unexpected index"),
}
});
thread::sleep(Duration::from_millis(10));
tx1.send(42).unwrap();
let received_value = receiver_handle.join().unwrap();
assert_eq!(received_value, 42);
}
// A send operation on a channel whose receiver was dropped is selected
// and then fails when completed.
#[test]
fn test_select_disconnected_send() {
let (tx, rx) = create_bounded_channel::<i32>(1);
drop(rx);
let mut sel = Select::new();
let send_idx = sel.send(&tx);
let op = sel.select();
assert_eq!(op.index(), send_idx);
let result = op.send(&tx, 42);
assert!(result.is_err());
}
// A receive operation on a channel whose sender was dropped is selected
// and then fails when completed.
#[test]
fn test_select_disconnected_recv() {
let (tx, rx) = create_bounded_channel::<i32>(1);
drop(tx);
let mut sel = Select::new();
let recv_idx = sel.recv(&rx);
let op = sel.select();
assert_eq!(op.index(), recv_idx);
let result = op.recv(&rx);
assert!(result.is_err());
}
// A selector built through `Default` behaves like one built with `new`.
#[test]
fn test_select_default() {
let (tx, rx) = create_bounded_channel(1);
tx.send(42).unwrap();
let mut sel = Select::default(); let recv_idx = sel.recv(&rx);
let op = sel.select();
assert_eq!(op.index(), recv_idx);
let value = op.recv(&rx).unwrap();
assert_eq!(value, 42);
}
// Two ready receivers and one ready sender on the same selector.
// NOTE(review): the first two selections panic if the send operation is
// chosen, although it is ready from the start — confirm this cannot flake.
#[test]
fn test_select_complex_scenario() {
let (tx1, rx1) = create_bounded_channel(1);
let (tx2, rx2) = create_bounded_channel(1);
let (tx3, rx3) = create_bounded_channel(1);
tx1.send(1).unwrap();
tx2.send(2).unwrap();
let mut sel = Select::new();
let recv_idx1 = sel.recv(&rx1);
let recv_idx2 = sel.recv(&rx2);
let send_idx3 = sel.send(&tx3);
let mut received = Vec::new();
let op = sel.select();
match op.index() {
i if i == recv_idx1 => {
let value = op.recv(&rx1).unwrap();
received.push(value);
}
i if i == recv_idx2 => {
let value = op.recv(&rx2).unwrap();
received.push(value);
}
_ => panic!("Unexpected index for first selection"),
}
let op = sel.select();
match op.index() {
i if i == recv_idx1 => {
let value = op.recv(&rx1).unwrap();
received.push(value);
}
i if i == recv_idx2 => {
let value = op.recv(&rx2).unwrap();
received.push(value);
}
_ => panic!("Unexpected index for second selection"),
}
let op = sel.select();
assert_eq!(op.index(), send_idx3);
op.send(&tx3, 3).unwrap();
received.sort();
assert_eq!(received, vec![1, 2]);
assert_eq!(rx3.recv().unwrap(), 3);
}
}