use super::core::{SyncReceiverWaiter, SyncSenderWaiter};
use super::{Receiver, Sender};
use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
use crate::internal::cache_padded::CachePadded;
use crate::mpmc_exp::backoff;
use crate::RecvErrorTimeout;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
pub(crate) fn send_sync<T: Send>(sender: &Sender<T>, item: T) -> Result<(), SendError> {
let mut item = Some(item);
let is_rendezvous = sender.shared.capacity == 0;
loop {
let current_item = item.take().unwrap();
match sender.shared.try_send_core(current_item) {
Ok(()) => return Ok(()), Err(TrySendError::Closed(_)) => return Err(SendError::Closed),
Err(TrySendError::Sent(_)) => unreachable!(),
Err(TrySendError::Full(returned)) => {
item = Some(returned); }
}
let done_flag = CachePadded::new(AtomicBool::new(false));
{
let mut guard = sender.shared.internal.lock();
if !guard.waiting_async_receivers.is_empty() || !guard.waiting_sync_receivers.is_empty() || (sender.shared.capacity > 0 && guard.queue.len() < sender.shared.capacity)
{
drop(guard);
continue;
}
if guard.receiver_count == 0 {
return Err(SendError::Closed);
}
let waiter = SyncSenderWaiter {
thread: thread::current(),
data: if is_rendezvous { item.take() } else { None },
done: &done_flag,
};
guard.waiting_sync_senders.push_back(waiter);
}
backoff::adaptive_wait(&done_flag);
if is_rendezvous {
return if sender.is_closed() && !done_flag.load(Ordering::Acquire) {
Err(SendError::Closed)
} else {
Ok(())
};
}
}
}
pub(crate) fn recv_sync<T: Send>(receiver: &Receiver<T>) -> Result<T, RecvError> {
loop {
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item), Err(TryRecvError::Disconnected) => return Err(RecvError::Disconnected),
Err(TryRecvError::Empty) => {
}
}
let done_flag = CachePadded::new(AtomicBool::new(false));
{
let mut guard = receiver.shared.internal.lock();
if !guard.queue.is_empty()
|| (receiver.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
continue; }
if guard.sender_count == 0 {
return Err(RecvError::Disconnected);
}
let waiter = SyncReceiverWaiter {
thread: thread::current(),
done: &done_flag,
};
guard.waiting_sync_receivers.push_back(waiter);
}
backoff::adaptive_wait(&done_flag);
}
}
pub(crate) fn recv_timeout_sync<T: Send>(
receiver: &Receiver<T>,
timeout: Duration,
) -> Result<T, RecvErrorTimeout> {
let start_time = Instant::now();
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => { }
}
loop {
let elapsed = start_time.elapsed();
if elapsed >= timeout {
return Err(RecvErrorTimeout::Timeout);
}
let remaining_timeout = timeout - elapsed;
let done_flag = CachePadded::new(AtomicBool::new(false));
{
let mut guard = receiver.shared.internal.lock();
if !guard.queue.is_empty()
|| (receiver.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
drop(guard);
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => continue,
}
}
if guard.sender_count == 0 {
return Err(RecvErrorTimeout::Disconnected);
}
let waiter = SyncReceiverWaiter {
thread: thread::current(),
done: &done_flag,
};
guard.waiting_sync_receivers.push_back(waiter);
}
thread::park_timeout(remaining_timeout);
if done_flag.load(Ordering::Acquire) {
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => continue, }
}
}
}