use crate::{AsyncEvent, ChannelError, MutInPlaceCell};
use std::{cell::Cell, rc::Rc, time::Instant};
/// Creates a oneshot channel and returns its `(sender, receiver)` endpoints.
///
/// Both endpoints point at the same reference-counted `OneshotChannel`, so
/// whatever one side does (sending, closing on drop) is visible to the other.
pub fn oneshot<T>() -> (SenderOneshot<T>, ReceiverOneshot<T>) {
    let shared = Rc::new(OneshotChannel::new());
    let sender = SenderOneshot {
        inner: Rc::clone(&shared),
    };
    let receiver = ReceiverOneshot { inner: shared };
    (sender, receiver)
}
/// Sending endpoint of a oneshot channel, created by [`oneshot`].
///
/// Its only operation is `send`, which consumes the sender. Dropping the
/// sender closes the shared channel.
pub struct SenderOneshot<T> {
/// Channel state shared with the matching `ReceiverOneshot` through an `Rc`.
inner: Rc<OneshotChannel<T>>,
}
impl<T> SenderOneshot<T> {
    /// Stores `value` in the channel for the receiver, consuming the sender.
    ///
    /// # Errors
    ///
    /// Returns `Err(value)`, handing the value back, when the channel has
    /// already been closed (for example because the receiver was dropped).
    pub fn send(self, value: T) -> Result<(), T> {
        let channel = &self.inner;
        channel.send(value)
    }
}
impl<T> Drop for SenderOneshot<T> {
    /// Closes the shared channel when the sender goes away.
    ///
    /// Because `send` consumes the sender, this also runs right after a
    /// successful send; a value already stored remains retrievable since
    /// receiving checks for a stored value before checking the closed flag.
    fn drop(&mut self) {
        let channel = &self.inner;
        channel.close();
    }
}
impl<T> std::fmt::Debug for SenderOneshot<T> {
    /// Formats the sender as `SenderOneshot { complete: .. }`.
    ///
    /// `complete` reports the state of the channel's `has_value` event. That
    /// event is set by `close` as well as by `send`, so it does not by itself
    /// distinguish "value sent" from "channel closed".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let complete = self.inner.has_value.is_set();
        let mut builder = f.debug_struct("SenderOneshot");
        builder.field("complete", &complete);
        builder.finish()
    }
}
/// Receiving endpoint of a oneshot channel, created by [`oneshot`].
///
/// The value can be awaited with `recv` / `recv_with_deadline` (both consume
/// the receiver) or polled with `try_recv`. Dropping the receiver closes the
/// shared channel.
pub struct ReceiverOneshot<T> {
/// Channel state shared with the matching `SenderOneshot` through an `Rc`.
inner: Rc<OneshotChannel<T>>,
}
impl<T> ReceiverOneshot<T> {
    /// Waits for the value and returns it, consuming the receiver.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::Closed` when the channel is closed and holds no
    /// value; an error from waiting on the underlying event is propagated.
    pub async fn recv(self) -> Result<T, ChannelError> {
        let channel = &self.inner;
        channel.recv().await
    }

    /// Like [`recv`](Self::recv), but passes `deadline` to the underlying
    /// event wait.
    ///
    /// NOTE(review): `None` presumably means "wait without a time limit" —
    /// confirm against `AsyncEvent::wait_with_deadline`.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::Closed` when the channel is closed and holds no
    /// value; an error from the deadline-aware wait is propagated.
    pub async fn recv_with_deadline(self, deadline: Option<Instant>) -> Result<T, ChannelError> {
        let channel = &self.inner;
        channel.recv_with_deadline(deadline).await
    }

    /// Polls the channel once without waiting.
    ///
    /// Returns `Ok(Some(value))` when a value is stored (taking it out of the
    /// channel) and `Ok(None)` when nothing is stored and the channel is open.
    ///
    /// # Errors
    ///
    /// Returns `ChannelError::Closed` when nothing is stored and the channel
    /// has been closed.
    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
        let channel = &self.inner;
        channel.try_recv()
    }
}
impl<T> Drop for ReceiverOneshot<T> {
    /// Closes the shared channel when the receiver goes away, so that a later
    /// `send` on the other endpoint gets its value handed back as an error.
    fn drop(&mut self) {
        let channel = &self.inner;
        channel.close();
    }
}
impl<T> std::fmt::Debug for ReceiverOneshot<T> {
    /// Formats the receiver as `ReceiverOneshot { complete: .. }`.
    ///
    /// `complete` reports the state of the channel's `has_value` event. That
    /// event is set by `close` as well as by `send`, so it does not by itself
    /// distinguish "value sent" from "channel closed".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let complete = self.inner.has_value.is_set();
        let mut builder = f.debug_struct("ReceiverOneshot");
        builder.field("complete", &complete);
        builder.finish()
    }
}
/// State shared by the two endpoints of a oneshot channel.
struct OneshotChannel<T> {
/// Slot for the sent value; `send` fills it and `try_recv` takes it out.
value: MutInPlaceCell<Option<T>>,
/// Event receivers wait on; set by both `send` and `close`.
has_value: AsyncEvent,
/// Closed flag; set to `true` by `close` and never reset in this type.
closed: Cell<bool>,
}
impl<T> OneshotChannel<T> {
    /// Builds an open channel: empty value slot, fresh event, `closed == false`.
    fn new() -> Self {
        let value = MutInPlaceCell::new(None);
        let has_value = AsyncEvent::new();
        let closed = Cell::new(false);
        Self {
            value,
            has_value,
            closed,
        }
    }

    /// Stores `value` in the slot and sets the event so waiters can proceed.
    ///
    /// Returns `Err(value)` without storing anything when the channel is
    /// already closed.
    fn send(&self, value: T) -> Result<(), T> {
        if !self.closed.get() {
            self.value.use_mut(|slot| *slot = Some(value));
            self.has_value.set();
            Ok(())
        } else {
            Err(value)
        }
    }

    /// Waits until a value can be taken and returns it.
    ///
    /// Each iteration polls `try_recv` first, so a value that is already
    /// stored is returned without waiting and a closed, empty channel fails
    /// with `ChannelError::Closed`. Errors from the event wait are propagated.
    async fn recv(&self) -> Result<T, ChannelError> {
        loop {
            match self.try_recv()? {
                Some(received) => break Ok(received),
                None => {
                    self.has_value.wait().await?;
                }
            }
        }
    }

    /// Same loop as `recv`, but every wait is given `deadline`.
    ///
    /// Errors from the deadline-aware wait are propagated to the caller.
    async fn recv_with_deadline(&self, deadline: Option<Instant>) -> Result<T, ChannelError> {
        loop {
            match self.try_recv()? {
                Some(received) => break Ok(received),
                None => {
                    self.has_value.wait_with_deadline(deadline).await?;
                }
            }
        }
    }

    /// Takes the stored value, if any, without waiting.
    ///
    /// The slot is inspected before the closed flag, so a value sent before
    /// the channel was closed can still be retrieved. An empty slot yields
    /// `Err(ChannelError::Closed)` on a closed channel and `Ok(None)` otherwise.
    fn try_recv(&self) -> Result<Option<T>, ChannelError> {
        match self.value.use_mut(|slot| slot.take()) {
            Some(taken) => Ok(Some(taken)),
            None if self.closed.get() => Err(ChannelError::Closed),
            None => Ok(None),
        }
    }

    /// Marks the channel closed and sets the event.
    ///
    /// The flag is written before the event is set, so a receiver that runs
    /// after the event is set reads `closed == true`.
    fn close(&self) {
        self.closed.set(true);
        self.has_value.set();
    }
}
#[cfg(test)]
mod test {
    use super::oneshot;
    use crate::ChannelError;

    // The receive task is spawned before the value is sent.
    #[crate::test]
    async fn oneshot_recv_before_send() {
        use crate::operations::spawn_task;
        let (tx, rx) = oneshot();
        let recv_task = spawn_task(async move {
            assert_eq!(rx.recv().await, Ok(42));
        });
        tx.send(42).unwrap();
        recv_task.await.unwrap();
    }

    // The value is already stored when the receiver asks for it.
    #[crate::test]
    async fn oneshot_recv_after_send() {
        let (tx, rx) = oneshot();
        tx.send(42).unwrap();
        assert_eq!(rx.recv().await, Ok(42));
    }

    // The receive task is spawned first and the sender is dropped afterwards:
    // closing the channel must make the receive fail with `Closed`.
    // (Previously this test had the same body as
    // `oneshot_send_dropped_before_recv`.)
    #[crate::test]
    async fn oneshot_recv_and_send_dropped() {
        use crate::operations::spawn_task;
        let (tx, rx) = oneshot::<i32>();
        let recv_task = spawn_task(async move { rx.recv().await });
        drop(tx);
        assert_eq!(recv_task.await.unwrap(), Err(ChannelError::Closed));
    }

    // The sender is dropped before the receiver starts receiving.
    #[crate::test]
    async fn oneshot_send_dropped_before_recv() {
        let (tx, rx) = oneshot::<i32>();
        drop(tx);
        assert_eq!(rx.recv().await, Err(ChannelError::Closed));
    }

    // Dropping the receiver closes the channel, so `send` hands the value back.
    #[crate::test]
    async fn oneshot_send_after_receiver_dropped() {
        let (tx, rx) = oneshot::<i32>();
        drop(rx);
        assert_eq!(tx.send(7), Err(7));
    }

    #[crate::test]
    async fn oneshot_recv_with_deadline_success() {
        use crate::operations::spawn_task;
        use std::time::{Duration, Instant};
        let (tx, rx) = oneshot();
        let deadline = Instant::now() + Duration::from_millis(100);
        let recv_task = spawn_task(async move { rx.recv_with_deadline(Some(deadline)).await });
        tx.send(42).unwrap();
        assert_eq!(recv_task.await.unwrap(), Ok(42));
    }

    // The sender is kept alive (`_tx`) so the only way out is the deadline.
    #[crate::test]
    async fn oneshot_recv_with_deadline_timeout() {
        use std::time::{Duration, Instant};
        let (_tx, rx) = oneshot::<i32>();
        let deadline = Instant::now() + Duration::from_millis(10);
        let result = rx.recv_with_deadline(Some(deadline)).await;
        assert!(result.is_err());
    }

    #[crate::test]
    async fn oneshot_recv_with_deadline_no_deadline() {
        use crate::operations::spawn_task;
        let (tx, rx) = oneshot();
        let recv_task = spawn_task(async move { rx.recv_with_deadline(None).await });
        tx.send(123).unwrap();
        assert_eq!(recv_task.await.unwrap(), Ok(123));
    }

    #[crate::test]
    async fn oneshot_recv_with_deadline_closed_channel() {
        use std::time::{Duration, Instant};
        let (tx, rx) = oneshot::<i32>();
        drop(tx);
        let deadline = Instant::now() + Duration::from_millis(100);
        let result = rx.recv_with_deadline(Some(deadline)).await;
        assert_eq!(result, Err(ChannelError::Closed));
    }

    // After the value is taken, the channel (closed by the consumed sender)
    // reports `Closed`.
    #[crate::test]
    async fn oneshot_try_recv_available() {
        let (tx, rx) = oneshot();
        tx.send(42).unwrap();
        assert_eq!(rx.try_recv(), Ok(Some(42)));
        assert_eq!(rx.try_recv(), Err(ChannelError::Closed));
    }

    #[crate::test]
    async fn oneshot_try_recv_not_available() {
        let (_tx, rx) = oneshot::<i32>();
        assert_eq!(rx.try_recv(), Ok(None));
    }

    #[crate::test]
    async fn oneshot_try_recv_closed() {
        let (tx, rx) = oneshot::<i32>();
        drop(tx);
        assert_eq!(rx.try_recv(), Err(ChannelError::Closed));
    }

    #[crate::test]
    async fn oneshot_try_recv_multiple_attempts() {
        use crate::operations::spawn_task;
        let (tx, rx) = oneshot();
        assert_eq!(rx.try_recv(), Ok(None));
        let send_task = spawn_task(async move {
            crate::operations::sleep(std::time::Duration::from_millis(10))
                .await
                .unwrap();
            tx.send(99).unwrap();
        });
        send_task.await.unwrap();
        assert_eq!(rx.try_recv(), Ok(Some(99)));
    }

    #[crate::test]
    async fn oneshot_debug_sender() {
        let (tx, _rx) = oneshot::<i32>();
        let debug_str = format!("{tx:?}");
        assert!(debug_str.contains("SenderOneshot"));
        assert!(debug_str.contains("complete: false"));
        tx.send(42).unwrap();
    }

    #[crate::test]
    async fn oneshot_debug_receiver() {
        let (tx, rx) = oneshot::<i32>();
        let debug_str = format!("{rx:?}");
        assert!(debug_str.contains("ReceiverOneshot"));
        assert!(debug_str.contains("complete: false"));
        tx.send(42).unwrap();
        let debug_str_after_send = format!("{rx:?}");
        assert!(debug_str_after_send.contains("ReceiverOneshot"));
        assert!(debug_str_after_send.contains("complete: true"));
        let _value = rx.recv().await.unwrap();
    }

    // Only the presence of the field is checked: closing also sets the event
    // that `complete` reports.
    #[crate::test]
    async fn oneshot_debug_closed_sender() {
        let (tx, rx) = oneshot::<i32>();
        drop(rx);
        let debug_str = format!("{tx:?}");
        assert!(debug_str.contains("SenderOneshot"));
        assert!(debug_str.contains("complete:"));
    }

    // Only the presence of the field is checked: closing also sets the event
    // that `complete` reports.
    #[crate::test]
    async fn oneshot_debug_closed_receiver() {
        let (tx, rx) = oneshot::<i32>();
        drop(tx);
        let debug_str = format!("{rx:?}");
        assert!(debug_str.contains("ReceiverOneshot"));
        assert!(debug_str.contains("complete:"));
    }

    // Exercises the inner channel directly, bypassing the endpoint wrappers.
    #[crate::test]
    async fn oneshot_channel_recv_with_deadline_direct() {
        use std::time::{Duration, Instant};
        let channel = super::OneshotChannel::<i32>::new();
        let deadline = Instant::now() + Duration::from_millis(10);
        let result = channel.recv_with_deadline(Some(deadline)).await;
        assert!(result.is_err());
    }

    // Exercises the inner channel directly, bypassing the endpoint wrappers.
    #[crate::test]
    async fn oneshot_channel_recv_with_deadline_success_direct() {
        use crate::operations::spawn_task;
        use std::time::{Duration, Instant};
        let channel = std::rc::Rc::new(super::OneshotChannel::<i32>::new());
        let deadline = Instant::now() + Duration::from_millis(100);
        let recv_channel = channel.clone();
        let recv_task =
            spawn_task(async move { recv_channel.recv_with_deadline(Some(deadline)).await });
        channel.send(42).unwrap();
        assert_eq!(recv_task.await.unwrap(), Ok(42));
    }
}