pub use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
mod core;
use self::core::{OneShotShared, STATE_SENT, STATE_TAKEN};
use ::core::sync::atomic::Ordering;
use std::fmt; use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Creates a linked [`Sender`] / [`Receiver`] pair.
///
/// Both halves hold an `Arc` to the same freshly constructed shared state.
pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(OneShotShared::new());

    let sender = Sender {
        shared: Arc::clone(&shared),
    };
    // The receiver takes over the original `Arc`, so exactly two handles exist.
    let receiver = Receiver {
        shared,
        _phantom_not_sync: PhantomData,
    };

    (sender, receiver)
}
/// Sending half of the channel created by [`oneshot`].
pub struct Sender<T> {
    /// Channel state shared with the receiver and any cloned senders.
    shared: Arc<OneShotShared<T>>,
}
impl<T> fmt::Debug for Sender<T> {
    /// Formats as a `Sender` struct with a single `shared` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Sender");
        builder.field("shared", &self.shared);
        builder.finish()
    }
}
/// Receiving half of the channel created by [`oneshot`].
pub struct Receiver<T> {
    /// Channel state shared with the sender(s).
    shared: Arc<OneShotShared<T>>,
    /// Raw-pointer marker: it suppresses the automatic `Send`/`Sync`
    /// implementations for this type, so any thread-safety guarantees come
    /// only from explicit `unsafe impl`s.
    _phantom_not_sync: PhantomData<*mut ()>,
}
impl<T> fmt::Debug for Receiver<T> {
    /// Formats as a `Receiver` struct with a single `shared` field; the marker
    /// field is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Receiver");
        builder.field("shared", &self.shared);
        builder.finish()
    }
}
impl<T> Sender<T> {
pub fn send(self, value: T) -> Result<(), TrySendError<T>> {
let result = self.shared.send(value);
result
}
pub fn is_closed(&self) -> bool {
self.shared.receiver_dropped.load(std::sync::atomic::Ordering::Acquire)
}
pub fn is_sent(&self) -> bool {
let state = self.shared.state.load(std::sync::atomic::Ordering::Acquire);
state == STATE_SENT || state == STATE_TAKEN
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.shared.increment_senders();
Sender {
shared: Arc::clone(&self.shared),
}
}
}
impl<T> Drop for Sender<T> {
    /// Tells the shared state that one sender handle is going away.
    fn drop(&mut self) {
        self.shared.decrement_senders();
    }
}
impl<T> Receiver<T> {
pub fn recv(&self) -> ReceiveFuture<'_, T> {
ReceiveFuture {
receiver_shared: &self.shared,
} }
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.shared.try_recv()
}
pub fn is_closed(&self) -> bool {
let state = self.shared.state.load(std::sync::atomic::Ordering::Acquire);
if state == core::STATE_TAKEN || state == core::STATE_CLOSED {
return true; }
if self.shared.sender_count.load(std::sync::atomic::Ordering::Acquire) == 0 {
return state == core::STATE_EMPTY || state == core::STATE_WRITING;
}
false }
}
/// Receiver teardown: flags the receiver as dropped, then disposes of a value
/// that was sent but never taken.
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Runs before the state check below. Presumably this sets the
// `receiver_dropped` flag read by `Sender::is_closed` — TODO confirm in
// `OneShotShared`.
self.shared.mark_receiver_dropped();
// Claim the value only if the state is exactly STATE_SENT; for any other
// state the slot is left untouched.
// NOTE(review): if a sender is mid-write at this point, nothing in this block
// drops the value it later stores — confirm that `OneShotShared` cleans it up.
if self
.shared
.state
.compare_exchange(
STATE_SENT,
STATE_TAKEN, Ordering::AcqRel,
Ordering::Relaxed,
)
.is_ok()
{
// If `lock()` returns an error, the guard is recovered from it via
// `into_inner`, so the value can still be dropped.
let mut guard = self.shared.value_slot.lock().unwrap_or_else(|e| e.into_inner());
if let Some(mut mu_value) = guard.take() {
// SAFETY (NOTE(review)): sound only if STATE_SENT implies the slot holds a
// fully initialized value. The exchange above succeeded from STATE_SENT, so
// this path owns the slot — confirm the invariant against the send path in
// `OneShotShared`.
unsafe {
mu_value.assume_init_drop();
}
}
}
}
}
/// Future returned by [`Receiver::recv`]; it borrows the receiver's shared
/// state for `'a`.
#[must_use = "futures do nothing unless you .await or poll them"]
pub struct ReceiveFuture<'a, T> {
    /// Borrow of the receiver's handle to the shared channel state.
    receiver_shared: &'a Arc<OneShotShared<T>>,
}
impl<'a, T> Future for ReceiveFuture<'a, T> {
    type Output = Result<T, RecvError>;

    /// Forwards the poll, with the caller's task context, to the shared
    /// state's `poll_recv`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Auto-deref walks `&Arc<OneShotShared<T>>` down to `OneShotShared<T>`.
        self.receiver_shared.poll_recv(cx)
    }
}
// SAFETY (NOTE(review)): these impls assert that the shared channel state can
// be moved to and accessed from other threads whenever `T: Send`. That depends
// on the internals of `OneShotShared`, which are not visible here — confirm.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
// `Receiver` carries a `PhantomData<*mut ()>` field, which removes the
// automatic `Send`/`Sync` impls; the two impls below put both back by hand.
unsafe impl<T: Send> Send for Receiver<T> {}
// NOTE(review): this `Sync` impl cancels the opt-out that the
// `_phantom_not_sync` field name advertises. If `Receiver` is meant to be
// `!Sync`, this line should be removed; if it is meant to be `Sync`, the
// marker field is misleading. Confirm the intent before relying on either.
unsafe impl<T: Send> Sync for Receiver<T> {}
// NOTE(review): empty inherent impl — it declares no methods and looks like a
// leftover that can be deleted.
impl<T> Receiver<T> {
}
// Out-of-line `tests` module, compiled only for test builds.
#[cfg(test)]
mod tests;