use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
mod lockfree;
pub use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
pub use lockfree::{AsyncConsumer, AsyncProducer, Consumer, Producer, RecvFuture, SendFuture};
pub fn channel<T: Send>() -> (Producer<T>, Consumer<T>) {
let shared = Arc::new(lockfree::MpscShared::new());
let producer = Producer {
shared: Arc::clone(&shared),
};
let consumer = Consumer {
shared,
_phantom: PhantomData,
};
(producer, consumer)
}
pub fn channel_async<T: Send>() -> (AsyncProducer<T>, AsyncConsumer<T>) {
let shared = Arc::new(lockfree::MpscShared::new());
let producer = AsyncProducer {
shared: Arc::clone(&shared),
};
let consumer = AsyncConsumer {
shared,
_phantom: PhantomData,
};
(producer, consumer)
}
impl<T: Send> Producer<T> {
pub fn to_async(self) -> AsyncProducer<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncProducer { shared }
}
}
impl<T: Send> Consumer<T> {
pub fn to_async(self) -> AsyncConsumer<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
AsyncConsumer {
shared,
_phantom: PhantomData,
}
}
}
impl<T: Send> AsyncProducer<T> {
pub fn to_sync(self) -> Producer<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
Producer { shared }
}
}
impl<T: Send> AsyncConsumer<T> {
pub fn to_sync(self) -> Consumer<T> {
let shared = unsafe { std::ptr::read(&self.shared) };
mem::forget(self);
Consumer {
shared,
_phantom: PhantomData,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
use tokio::runtime::Runtime;
#[test]
fn sync_to_sync_blocking() {
let (tx, mut rx) = channel::<i32>();
let handle = thread::spawn(move || rx.recv().unwrap());
thread::sleep(Duration::from_millis(50));
tx.send(123).unwrap();
assert_eq!(handle.join().unwrap(), 123);
}
#[tokio::test]
async fn async_to_async() {
let (tx, mut rx) = channel_async::<i32>();
let handle = tokio::spawn(async move { rx.recv().await.unwrap() });
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(456).await.unwrap();
assert_eq!(handle.await.unwrap(), 456);
}
#[tokio::test]
async fn sync_to_async_conversion() {
let (tx_async, mut rx_async) = channel_async::<i32>();
let tx_sync = tx_async.to_sync();
let producer_handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx_sync.send(789).unwrap();
});
assert_eq!(rx_async.recv().await.unwrap(), 789);
producer_handle.join().unwrap();
}
#[test]
fn async_to_sync_conversion() {
let (tx_async, rx_sync) = channel_async::<i32>();
let mut rx_sync = rx_sync.to_sync();
let rt = Runtime::new().unwrap();
rt.spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
tx_async.send(101).await.unwrap();
});
assert_eq!(rx_sync.recv().unwrap(), 101);
}
}