#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]
#![doc = include_str!("example.md")]
use mutex::PinnedCondvar as Condvar;
use mutex::PinnedMutex as Mutex;
use mutex::PinnedMutexGuard as MutexGuard;
use pin_project::pin_project;
use pin_project::pinned_drop;
#[cfg(feature = "parking_lot")]
use pinned_mutex::parking_lot as mutex;
#[cfg(not(feature = "parking_lot"))]
use pinned_mutex::std as mutex;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::iter::Peekable;
use std::ops::AsyncFnOnce;
use std::pin::Pin;
use std::sync::OnceLock;
use std::task::Context;
use std::task::Poll;
use wakerset::ExtractedWakers;
use wakerset::WakerList;
use wakerset::WakerSlot;
const UNBOUNDED_CAPACITY: usize = usize::MAX;
macro_rules! derive_clone {
($t:ident) => {
impl<T> Clone for $t<T> {
fn clone(&self) -> Self {
Self {
core: self.core.clone(),
}
}
}
};
}
#[derive(Debug)]
#[pin_project]
struct StateBase {
capacity: usize,
closed: bool,
#[pin]
tx_wakers: WakerList,
#[pin]
rx_wakers: WakerList,
}
impl StateBase {
fn target_capacity(&self) -> usize {
self.capacity
}
fn pending_tx<T>(
self: Pin<&mut StateBase>,
slot: Pin<&mut WakerSlot>,
cx: &mut Context,
) -> Poll<T> {
self.project().tx_wakers.link(slot, cx.waker().clone());
Poll::Pending
}
fn pending_rx<T>(
self: Pin<&mut StateBase>,
slot: Pin<&mut WakerSlot>,
cx: &mut Context,
) -> Poll<T> {
self.project().rx_wakers.link(slot, cx.waker().clone());
Poll::Pending
}
}
#[derive(Debug)]
#[pin_project]
struct State<T> {
#[pin]
base: StateBase,
queue: VecDeque<T>,
}
impl<T> State<T> {
fn has_capacity(&self) -> bool {
self.queue.len() < self.target_capacity()
}
fn base(self: Pin<&mut Self>) -> Pin<&mut StateBase> {
self.project().base
}
}
impl<T> std::ops::Deref for State<T> {
type Target = StateBase;
fn deref(&self) -> &Self::Target {
&self.base
}
}
impl<T> std::ops::DerefMut for State<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.base
}
}
#[derive(Debug)]
#[pin_project]
struct Core<T> {
#[pin]
state: Mutex<State<T>>,
not_empty: OnceLock<Condvar>,
not_full: OnceLock<Condvar>,
}
impl<T> Core<T> {
fn block_until_not_empty(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
fn condition<T>(s: Pin<&mut State<T>>) -> bool {
!s.closed && s.queue.is_empty()
}
let mut state = self.project_ref().state.lock();
if !condition(state.as_mut()) {
return state;
}
let not_empty = self.not_empty.get_or_init(Default::default);
not_empty.wait_while(state, condition)
}
fn block_until_not_full(self: Pin<&Self>) -> MutexGuard<'_, State<T>> {
fn condition<T>(s: Pin<&mut State<T>>) -> bool {
!s.closed && !s.has_capacity()
}
let mut state = self.project_ref().state.lock();
if !condition(state.as_mut()) {
return state;
}
let not_full = self.not_full.get_or_init(Default::default);
not_full.wait_while(state, condition)
}
fn wake_rx_and_block_while_full<'a>(
self: Pin<&'a Self>,
mut state: MutexGuard<'a, State<T>>,
) -> MutexGuard<'a, State<T>> {
let cvar = self.not_empty.get();
let round = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.begin_extraction();
let mut wakers = ExtractedWakers::new();
loop {
let more = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.extract_some_wakers(round, &mut wakers);
drop(state);
wakers.wake_all();
if !more {
break;
}
state = self.project_ref().state.lock();
}
if let Some(cvar) = cvar {
cvar.notify_all();
}
state = self.project_ref().state.lock();
let not_full = self.not_full.get_or_init(Default::default);
not_full.wait_while(state, |s| !s.closed && !s.has_capacity())
}
fn wake_all_tx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
let cvar = self.not_full.get();
if state.as_mut().project().base.project().tx_wakers.is_empty() {
drop(state);
} else {
let round = state
.as_mut()
.project()
.base
.project()
.tx_wakers
.begin_extraction();
let mut wakers = ExtractedWakers::new();
loop {
let more = state
.as_mut()
.project()
.base
.project()
.tx_wakers
.extract_some_wakers(round, &mut wakers);
drop(state);
wakers.wake_all();
if !more {
break;
}
state = self.project_ref().state.lock();
}
}
if let Some(cvar) = cvar {
cvar.notify_all();
}
}
fn wake_one_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
let cvar = self.not_empty.get();
if state.as_mut().project().base.project().rx_wakers.is_empty() {
drop(state);
} else {
let round = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.begin_extraction();
let mut wakers = ExtractedWakers::new();
loop {
let more = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.extract_some_wakers(round, &mut wakers);
drop(state);
wakers.wake_all();
if !more {
break;
}
state = self.project_ref().state.lock();
}
}
if let Some(cvar) = cvar {
cvar.notify_one();
}
}
fn wake_all_rx<'a>(self: Pin<&'a Self>, mut state: MutexGuard<'a, State<T>>) {
let cvar = self.not_empty.get();
let round = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.begin_extraction();
let mut wakers = ExtractedWakers::new();
loop {
let more = state
.as_mut()
.project()
.base
.project()
.rx_wakers
.extract_some_wakers(round, &mut wakers);
drop(state);
wakers.wake_all();
if !more {
break;
}
state = self.project_ref().state.lock();
}
if let Some(cvar) = cvar {
cvar.notify_all();
}
}
}
impl<T> splitrc::Notify for Core<T> {
fn last_tx_did_drop_pinned(self: Pin<&Self>) {
let mut state = self.project_ref().state.lock();
*state.as_mut().base().project().closed = true;
self.wake_all_rx(state);
}
fn last_rx_did_drop_pinned(self: Pin<&Self>) {
let mut state = self.project_ref().state.lock();
*state.as_mut().base().project().closed = true;
state.as_mut().project().queue.clear();
self.wake_all_tx(state);
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError<T>(pub T);
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed to send value on channel")
}
}
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug)]
pub struct SyncSender<T> {
core: Pin<splitrc::Tx<Core<T>>>,
}
derive_clone!(SyncSender);
impl<T> SyncSender<T> {
pub fn into_async(self) -> Sender<T> {
Sender { core: self.core }
}
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
let mut state = self.core.as_ref().block_until_not_full();
if state.closed {
assert!(state.as_ref().project_ref().queue.is_empty());
return Err(SendError(value));
}
state.as_mut().project().queue.push_back(value);
self.core.as_ref().wake_one_rx(state);
Ok(())
}
pub fn send_iter<I>(&self, values: I) -> Result<(), SendError<()>>
where
I: IntoIterator<Item = T>,
{
let mut values = values.into_iter();
let Some(mut value) = values.next() else {
return Ok(());
};
let mut sent_count = 0usize;
let mut state = self.core.as_ref().block_until_not_full();
'outer: loop {
if state.closed {
assert!(state.queue.is_empty());
return Err(SendError(()));
}
debug_assert!(state.has_capacity());
state.as_mut().project().queue.push_back(value);
sent_count += 1;
loop {
match values.next() {
Some(v) => {
if state.has_capacity() {
state.as_mut().project().queue.push_back(v);
sent_count += 1;
} else {
value = v;
state = self.core.as_ref().wake_rx_and_block_while_full(state);
continue 'outer;
}
}
None => {
if sent_count == 1 {
self.core.as_ref().wake_one_rx(state);
} else {
self.core.as_ref().wake_all_rx(state);
}
return Ok(());
}
}
}
}
}
pub fn send_vec(&self, mut values: Vec<T>) -> Result<Vec<T>, SendError<Vec<T>>> {
match self.send_iter(values.drain(..)) {
Ok(_) => Ok(values),
Err(_) => Err(SendError(values)),
}
}
pub fn autobatch<'a, F, R>(&'a mut self, batch_limit: usize, f: F) -> Result<R, SendError<()>>
where
F: (FnOnce(&mut SyncBatchSender<'a, T>) -> Result<R, SendError<()>>),
{
let mut tx = SyncBatchSender {
sender: self,
capacity: batch_limit,
buffer: Vec::with_capacity(batch_limit),
};
let r = f(&mut tx)?;
tx.drain()?;
Ok(r)
}
}
#[derive(Debug)]
pub struct SyncBatchSender<'a, T> {
sender: &'a mut SyncSender<T>,
capacity: usize,
buffer: Vec<T>,
}
impl<T> SyncBatchSender<'_, T> {
pub fn send(&mut self, value: T) -> Result<(), SendError<()>> {
self.buffer.push(value);
if self.buffer.len() == self.capacity {
self.drain()
} else {
Ok(())
}
}
pub fn send_iter<I: IntoIterator<Item = T>>(&mut self, values: I) -> Result<(), SendError<()>> {
for value in values.into_iter() {
self.send(value)?;
}
Ok(())
}
pub fn drain(&mut self) -> Result<(), SendError<()>> {
match self.sender.send_vec(std::mem::take(&mut self.buffer)) {
Ok(drained_vec) => {
self.buffer = drained_vec;
Ok(())
}
Err(_) => Err(SendError(())),
}
}
}
#[derive(Debug)]
pub struct Sender<T> {
core: Pin<splitrc::Tx<Core<T>>>,
}
derive_clone!(Sender);
impl<T> Sender<T> {
pub fn into_sync(self) -> SyncSender<T> {
SyncSender { core: self.core }
}
pub fn send(&self, value: T) -> impl Future<Output = Result<(), SendError<T>>> + '_ {
Send {
sender: self,
value: Some(value),
waker: WakerSlot::new(),
}
}
pub fn send_iter<'a, I>(
&'a self,
values: I,
) -> impl Future<Output = Result<(), SendError<()>>> + 'a
where
I: IntoIterator<Item = T> + 'a,
{
SendIter {
sender: self,
values: Some(values.into_iter().peekable()),
waker: WakerSlot::new(),
}
}
pub async fn autobatch<R>(
self,
batch_limit: usize,
f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<R, SendError<()>>,
) -> Result<R, SendError<()>> {
let mut tx = BatchSender {
sender: self,
batch_limit,
buffer: Vec::with_capacity(batch_limit),
};
let r = f(&mut tx).await?;
tx.drain().await?;
Ok(r)
}
pub async fn autobatch_or_cancel(
self,
capacity: usize,
f: impl AsyncFnOnce(&mut BatchSender<T>) -> Result<(), SendError<()>>,
) {
self.autobatch(capacity, f).await.unwrap_or(())
}
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Send<'a, T> {
sender: &'a Sender<T>,
value: Option<T>,
#[pin]
waker: WakerSlot,
}
#[pinned_drop]
impl<T> PinnedDrop for Send<'_, T> {
fn drop(mut self: Pin<&mut Self>) {
if self.waker.is_linked() {
let mut state = self.sender.core.as_ref().project_ref().state.lock();
state
.as_mut()
.base()
.project()
.tx_wakers
.unlink(self.project().waker);
}
}
}
impl<T> Future for Send<'_, T> {
type Output = Result<(), SendError<T>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.sender.core.as_ref().project_ref().state.lock();
if state.closed {
return Poll::Ready(Err(SendError(self.project().value.take().unwrap())));
}
if state.has_capacity() {
state
.as_mut()
.project()
.queue
.push_back(self.as_mut().project().value.take().unwrap());
self.project().sender.core.as_ref().wake_one_rx(state);
Poll::Ready(Ok(()))
} else {
state.as_mut().base().pending_tx(self.project().waker, cx)
}
}
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct SendIter<'a, T, I: Iterator<Item = T>> {
sender: &'a Sender<T>,
values: Option<Peekable<I>>,
#[pin]
waker: WakerSlot,
}
#[pinned_drop]
impl<T, I: Iterator<Item = T>> PinnedDrop for SendIter<'_, T, I> {
fn drop(mut self: Pin<&mut Self>) {
if self.waker.is_linked() {
let mut state = self.sender.core.as_ref().project_ref().state.lock();
state
.as_mut()
.base()
.project()
.tx_wakers
.unlink(self.project().waker);
}
}
}
impl<T, I: Iterator<Item = T>> Future for SendIter<'_, T, I> {
type Output = Result<(), SendError<()>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
{
let pi = self.as_mut().project().values.as_mut().unwrap();
if pi.peek().is_none() {
return Poll::Ready(Ok(()));
}
}
let mut state = self.sender.core.as_ref().project_ref().state.lock();
let pi = self.as_mut().project().values.as_mut().unwrap();
debug_assert!(pi.peek().is_some());
if state.closed {
Poll::Ready(Err(SendError(())))
} else if !state.has_capacity() {
state.as_mut().base().pending_tx(self.project().waker, cx)
} else {
debug_assert!(state.has_capacity());
state.as_mut().project().queue.push_back(pi.next().unwrap());
while state.has_capacity() {
match pi.next() {
Some(value) => {
state.as_mut().project().queue.push_back(value);
}
None => {
self.sender.core.as_ref().wake_all_rx(state);
return Poll::Ready(Ok(()));
}
}
}
if pi.peek().is_none() {
self.sender.core.as_ref().wake_all_rx(state);
return Poll::Ready(Ok(()));
}
let pending = state
.as_mut()
.base()
.pending_tx(self.as_mut().project().waker, cx);
self.sender.core.as_ref().wake_all_rx(state);
pending
}
}
}
pub struct BatchSender<T> {
sender: Sender<T>,
batch_limit: usize,
buffer: Vec<T>,
}
impl<T> BatchSender<T> {
pub async fn send(&mut self, value: T) -> Result<(), SendError<()>> {
self.buffer.push(value);
if self.buffer.len() == self.batch_limit {
self.drain().await?;
}
Ok(())
}
async fn drain(&mut self) -> Result<(), SendError<()>> {
self.sender.send_iter(self.buffer.drain(..)).await?;
assert!(self.buffer.is_empty());
Ok(())
}
}
#[derive(Debug)]
pub struct Receiver<T> {
core: Pin<splitrc::Rx<Core<T>>>,
}
derive_clone!(Receiver);
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
struct Recv<'a, T> {
receiver: &'a Receiver<T>,
#[pin]
waker: WakerSlot,
}
#[pinned_drop]
impl<T> PinnedDrop for Recv<'_, T> {
fn drop(mut self: Pin<&mut Self>) {
if self.waker.is_linked() {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
state
.as_mut()
.base()
.project()
.rx_wakers
.unlink(self.project().waker);
}
}
}
impl<T> Future for Recv<'_, T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
match state.as_mut().project().queue.pop_front() {
Some(value) => {
self.receiver.core.as_ref().wake_all_tx(state);
Poll::Ready(Some(value))
}
None => {
if state.closed {
Poll::Ready(None)
} else {
state.as_mut().base().pending_rx(self.project().waker, cx)
}
}
}
}
}
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvBatch<'a, T> {
receiver: &'a Receiver<T>,
element_limit: usize,
#[pin]
waker: WakerSlot,
}
#[pinned_drop]
impl<T> PinnedDrop for RecvBatch<'_, T> {
fn drop(mut self: Pin<&mut Self>) {
if self.waker.is_linked() {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
state
.as_mut()
.base()
.project()
.rx_wakers
.unlink(self.project().waker);
}
}
}
impl<T> Future for RecvBatch<'_, T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
let q = &mut state.as_mut().project().queue;
let q_len = q.len();
if q_len == 0 {
if state.closed {
return Poll::Ready(Vec::new());
} else {
return state.as_mut().base().pending_rx(self.project().waker, cx);
}
}
let capacity = min(q_len, self.element_limit);
let v = Vec::from_iter(q.drain(..capacity));
self.receiver.core.as_ref().wake_all_tx(state);
Poll::Ready(v)
}
}
#[must_use = "futures do nothing unless you .await or poll them"]
#[pin_project(PinnedDrop)]
struct RecvVec<'a, T> {
receiver: &'a Receiver<T>,
element_limit: usize,
vec: &'a mut Vec<T>,
#[pin]
waker: WakerSlot,
}
#[pinned_drop]
impl<T> PinnedDrop for RecvVec<'_, T> {
fn drop(mut self: Pin<&mut Self>) {
if self.waker.is_linked() {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
state
.as_mut()
.base()
.project()
.rx_wakers
.unlink(self.project().waker);
}
}
}
impl<T> Future for RecvVec<'_, T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.receiver.core.as_ref().project_ref().state.lock();
let q = &mut state.as_mut().project().queue;
let q_len = q.len();
if q_len == 0 {
if state.closed {
assert!(self.vec.is_empty());
return Poll::Ready(());
} else {
return state.as_mut().base().pending_rx(self.project().waker, cx);
}
}
let capacity = min(q_len, self.element_limit);
self.as_mut().project().vec.extend(q.drain(..capacity));
self.project().receiver.core.as_ref().wake_all_tx(state);
Poll::Ready(())
}
}
impl<T> Receiver<T> {
pub fn into_sync(self) -> SyncReceiver<T> {
SyncReceiver { core: self.core }
}
pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
Recv {
receiver: self,
waker: WakerSlot::new(),
}
}
pub fn recv_batch(&self, element_limit: usize) -> impl Future<Output = Vec<T>> + '_ {
RecvBatch {
receiver: self,
element_limit,
waker: WakerSlot::new(),
}
}
pub fn recv_vec<'a>(
&'a self,
element_limit: usize,
vec: &'a mut Vec<T>,
) -> impl Future<Output = ()> + 'a {
vec.clear();
RecvVec {
receiver: self,
element_limit,
vec,
waker: WakerSlot::new(),
}
}
#[cfg(feature = "futures-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
pub fn stream<'a>(&'a self) -> Stream<'a, T> {
let recv = Recv {
receiver: self,
waker: WakerSlot::new(),
};
Stream { recv }
}
}
#[cfg(feature = "futures-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-core")))]
#[must_use = "streams do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct Stream<'a, T> {
#[pin]
recv: Recv<'a, T>,
}
#[cfg(feature = "futures-core")]
impl<'a, T> futures_core::Stream for Stream<'a, T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().recv.poll(cx)
}
}
#[derive(Debug)]
pub struct SyncReceiver<T> {
core: Pin<splitrc::Rx<Core<T>>>,
}
derive_clone!(SyncReceiver);
impl<T> SyncReceiver<T> {
pub fn into_async(self) -> Receiver<T> {
Receiver { core: self.core }
}
pub fn recv(&self) -> Option<T> {
let mut state = self.core.as_ref().block_until_not_empty();
match state.as_mut().project().queue.pop_front() {
Some(value) => {
self.core.as_ref().wake_all_tx(state);
Some(value)
}
None => {
assert!(state.closed);
None
}
}
}
pub fn recv_batch(&self, element_limit: usize) -> Vec<T> {
let mut state = self.core.as_ref().block_until_not_empty();
let q = &mut state.as_mut().project().queue;
let q_len = q.len();
if q_len == 0 {
assert!(state.closed);
return Vec::new();
}
let capacity = min(q_len, element_limit);
let v = Vec::from_iter(q.drain(..capacity));
self.core.as_ref().wake_all_tx(state);
v
}
pub fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
vec.clear();
let mut state = self.core.as_ref().block_until_not_empty();
let q = &mut state.as_mut().project().queue;
let q_len = q.len();
if q_len == 0 {
assert!(state.closed);
return;
}
let capacity = min(q_len, element_limit);
vec.extend(q.drain(..capacity));
self.core.as_ref().wake_all_tx(state);
}
}
impl<T> Iterator for SyncReceiver<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.recv()
}
}
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
Builder::new().bounded(capacity).build_async()
}
pub fn bounded_sync<T>(capacity: usize) -> (SyncSender<T>, SyncReceiver<T>) {
let (tx, rx) = bounded(capacity);
(tx.into_sync(), rx.into_sync())
}
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
Builder::new().build_async()
}
pub fn unbounded_sync<T>() -> (SyncSender<T>, SyncReceiver<T>) {
let (tx, rx) = unbounded();
(tx.into_sync(), rx.into_sync())
}
#[derive(Debug, Default)]
pub struct Builder {
capacity: Option<usize>,
preallocate: bool,
}
impl Builder {
pub fn new() -> Self {
Default::default()
}
pub fn bounded(&mut self, capacity: usize) -> &mut Self {
self.capacity = Some(capacity);
self
}
pub fn preallocate(&mut self) -> &mut Self {
self.preallocate = true;
self
}
pub fn build_async<T>(&mut self) -> (Sender<T>, Receiver<T>) {
let capacity;
let queue;
match self.capacity {
Some(c) => {
capacity = c;
queue = if self.preallocate {
VecDeque::with_capacity(capacity)
} else {
VecDeque::new()
};
}
None => {
capacity = UNBOUNDED_CAPACITY;
queue = VecDeque::new();
}
};
let core = Core {
state: Mutex::new(State {
base: StateBase {
capacity,
closed: false,
tx_wakers: WakerList::new(),
rx_wakers: WakerList::new(),
},
queue,
}),
not_empty: OnceLock::new(),
not_full: OnceLock::new(),
};
let (core_tx, core_rx) = splitrc::pin(core);
(Sender { core: core_tx }, Receiver { core: core_rx })
}
pub fn build_sync<T>(&mut self) -> (SyncSender<T>, SyncReceiver<T>) {
let (tx, rx) = self.build_async();
(tx.into_sync(), rx.into_sync())
}
}