use super::core::{SyncWaiter, WaiterData, STATE_CANCELLED, STATE_SUCCESS, STATE_WAITING};
use super::{Receiver, Sender};
use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
use crate::mpmc_v2::backoff;
use crate::RecvErrorTimeout;
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use std::time::{Duration, Instant};
pub(crate) fn send_sync<T: Send>(sender: &Sender<T>, item: T) -> Result<(), SendError> {
let mut current_item_opt = Some(item);
loop {
let item_to_send = current_item_opt
.take()
.expect("Item should always exist at the start of the loop");
match sender.shared.try_send_core(item_to_send) {
Ok(()) => return Ok(()), Err(TrySendError::Closed(_)) => {
return Err(SendError::Closed);
}
Err(TrySendError::Full(returned_item)) => {
current_item_opt = Some(returned_item);
}
Err(TrySendError::Sent(_)) => unreachable!(),
}
let done_flag = AtomicU8::new(STATE_WAITING);
let done_ptr = &done_flag as *const AtomicU8;
let is_rendezvous = sender.shared.capacity == 0;
let waiter = SyncWaiter {
thread: thread::current(),
data: if is_rendezvous {
Some(WaiterData::SenderItem(current_item_opt.take()))
} else {
None
},
state: done_ptr,
};
{
let mut guard = sender.shared.internal.lock();
if !guard.waiting_async_receivers.is_empty()
|| !guard.waiting_sync_receivers.is_empty()
|| (sender.shared.capacity > 0 && guard.queue.len() < sender.shared.capacity)
{
if is_rendezvous {
let mut temp_waiter = waiter;
current_item_opt = temp_waiter.take_item_from_sender_slot();
}
continue;
}
if guard.receiver_count == 0 {
return Err(SendError::Closed);
}
guard.waiting_sync_senders.push_back(waiter);
}
backoff::adaptive_wait(|| done_flag.load(Ordering::Acquire) == STATE_SUCCESS);
if is_rendezvous {
if sender.is_closed() && done_flag.load(Ordering::Acquire) != STATE_SUCCESS {
return Err(SendError::Closed);
}
return Ok(());
}
}
}
pub(crate) fn recv_sync<T: Send>(receiver: &Receiver<T>) -> Result<T, RecvError> {
loop {
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item), Err(TryRecvError::Disconnected) => return Err(RecvError::Disconnected),
Err(TryRecvError::Empty) => {}
}
let done_flag = AtomicU8::new(STATE_WAITING);
let done_ptr = &done_flag as *const AtomicU8;
let waiter = SyncWaiter {
thread: thread::current(),
data: None, state: done_ptr,
};
{
let mut guard = receiver.shared.internal.lock();
if !guard.queue.is_empty()
|| (receiver.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
continue; }
if guard.sender_count == 0 {
return Err(RecvError::Disconnected);
}
guard.waiting_sync_receivers.push_back(waiter);
}
backoff::adaptive_wait(|| done_flag.load(Ordering::Acquire) == STATE_SUCCESS);
}
}
pub(crate) fn recv_timeout_sync<T: Send>(
receiver: &Receiver<T>,
timeout: Duration,
) -> Result<T, RecvErrorTimeout> {
let start_time = Instant::now();
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => { }
}
let done_flag = AtomicU8::new(STATE_WAITING);
let done_ptr = &done_flag as *const AtomicU8;
let waiter = SyncWaiter {
thread: thread::current(),
data: None,
state: done_ptr,
};
{
let mut guard = receiver.shared.internal.lock();
if !guard.queue.is_empty()
|| (receiver.shared.capacity == 0
&& (!guard.waiting_sync_senders.is_empty() || !guard.waiting_async_senders.is_empty()))
{
drop(guard);
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => {}
}
} else {
if guard.sender_count == 0 {
return Err(RecvErrorTimeout::Disconnected);
}
guard.waiting_sync_receivers.push_back(waiter);
}
}
loop {
let elapsed = start_time.elapsed();
if elapsed >= timeout {
match done_flag.compare_exchange(
STATE_WAITING,
STATE_CANCELLED,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
let mut guard = receiver.shared.internal.lock();
guard.waiting_sync_receivers.retain(|w| w.state != done_ptr);
return Err(RecvErrorTimeout::Timeout);
}
Err(_) => {
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => unreachable!("state was SUCCESS but channel empty"),
}
}
}
}
let remaining_timeout = timeout - elapsed;
thread::park_timeout(remaining_timeout);
if done_flag.load(Ordering::Acquire) == STATE_SUCCESS {
match receiver.shared.try_recv_core() {
Ok(item) => return Ok(item),
Err(TryRecvError::Disconnected) => return Err(RecvErrorTimeout::Disconnected),
Err(TryRecvError::Empty) => {} }
}
}
}