use super::*;
use crate::error::{RecvError, TryRecvError, TrySendError};
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
const TEST_TIMEOUT: Duration = Duration::from_secs(1);
#[cfg(not(miri))]
#[tokio::test]
async fn send_recv_ok() {
let (tx, rx) = oneshot::<String>();
let message = "hello oneshot".to_string();
tokio::spawn(async move {
tx.send(message.clone()).expect("Send failed");
});
let received = timeout(TEST_TIMEOUT, rx.recv())
.await
.expect("Receive timed out")
.unwrap();
assert_eq!(received, "hello oneshot");
}
#[cfg(not(miri))]
#[tokio::test]
async fn try_recv_before_send() {
let (tx, rx) = oneshot::<i32>();
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
drop(tx); assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
}
#[cfg(not(miri))]
#[tokio::test]
async fn try_recv_after_send() {
let (tx, rx) = oneshot::<i32>();
tx.send(123).expect("Send failed");
assert_eq!(rx.try_recv().unwrap(), 123);
assert!(matches!(
rx.try_recv(),
Err(TryRecvError::Disconnected) | Err(TryRecvError::Empty)
));
}
#[cfg(not(miri))]
#[tokio::test]
async fn recv_after_all_senders_dropped_no_send() {
let (tx1, rx) = oneshot::<i32>();
let tx2 = tx1.clone();
let tx3 = tx2.clone();
drop(tx1);
drop(tx2);
drop(tx3);
match timeout(TEST_TIMEOUT, rx.recv()).await {
Ok(Err(RecvError::Disconnected)) => {} res => panic!("Expected Disconnected, got {:?}", res),
}
}
#[cfg(not(miri))]
#[tokio::test]
async fn send_fails_if_receiver_dropped() {
let (tx, rx) = oneshot::<String>();
drop(rx);
let message = "won't be sent".to_string();
match tx.send(message.clone()) {
Err(TrySendError::Closed(returned_message)) => {
assert_eq!(returned_message, message);
}
res => panic!("Expected TrySendError::Closed, got {:?}", res),
}
}
#[cfg(not(miri))]
#[tokio::test]
async fn only_first_send_succeeds_cloned_senders() {
let (tx1, rx) = oneshot::<i32>();
let tx2 = tx1.clone();
let tx3 = tx1.clone();
tokio::spawn(async move {
tx1.send(1).expect("Send 1 failed");
});
assert_eq!(
timeout(TEST_TIMEOUT, rx.recv())
.await
.expect("Timeout")
.unwrap(),
1
);
match tx2.send(2) {
Err(TrySendError::Sent(val)) => assert_eq!(val, 2),
res => panic!("Expected TrySendError::Sent from tx2, got {:?}", res),
}
match tx3.send(3) {
Err(TrySendError::Sent(val)) => assert_eq!(val, 3),
res => panic!("Expected TrySendError::Sent from tx3, got {:?}", res),
}
}
#[cfg(not(miri))]
#[tokio::test]
async fn receiver_dropped_after_send_value_is_dropped() {
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)]
struct DroppableVal(String);
impl Drop for DroppableVal {
fn drop(&mut self) {
println!("Dropping DroppableVal: {}", self.0);
DROP_COUNT.fetch_add(1, AtomicOrdering::Relaxed);
}
}
DROP_COUNT.store(0, AtomicOrdering::Relaxed);
{
let (tx, rx) = oneshot::<DroppableVal>();
tx.send(DroppableVal("should be dropped".to_string()))
.expect("Send failed");
drop(rx);
}
assert_eq!(DROP_COUNT.load(AtomicOrdering::Relaxed), 1);
}
#[cfg(not(miri))]
#[tokio::test]
async fn receiver_dropped_while_sender_sending_concurrently() {
let (tx, rx) = oneshot::<i32>();
let sender_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
tx.send(123) });
tokio::time::sleep(Duration::from_millis(5)).await; drop(rx);
match sender_task.await.unwrap() {
Ok(()) => println!("Sender completed send (receiver likely dropped after value placed)"),
Err(TrySendError::Closed(_)) => println!("Sender saw receiver dropped before completing send"),
Err(e) => panic!("Unexpected send error: {:?}", e),
}
}
#[cfg(not(miri))]
#[tokio::test]
async fn select_on_recv() {
let (tx1, rx1) = oneshot::<i32>();
let (_tx2, rx2) = oneshot::<i32>();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
tx1.send(100).unwrap();
});
let start = std::time::Instant::now();
tokio::select! {
biased; Ok(val) = rx1.recv() => {
assert_eq!(val, 100);
assert!(start.elapsed() >= Duration::from_millis(40)); }
_ = rx2.recv() => {
panic!("Should not have received from rx2");
}
_ = tokio::time::sleep(TEST_TIMEOUT) => {
panic!("Select timed out");
}
}
}
#[cfg(not(miri))]
#[tokio::test]
async fn sender_clones_drop_receiver_gets_disconnected() {
let (tx_orig, rx) = oneshot::<()>();
let mut senders = Vec::new();
for _ in 0..5 {
senders.push(tx_orig.clone());
}
drop(tx_orig);
while let Some(s) = senders.pop() {
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); drop(s);
}
assert_eq!(rx.recv().await, Err(RecvError::Disconnected));
}
#[cfg(not(miri))]
#[tokio::test]
async fn send_consumes_sender() {
let (tx, rx) = oneshot::<i32>();
let _ = tx.send(1);
assert_eq!(rx.recv().await.unwrap(), 1);
}
#[cfg(not(miri))]
#[tokio::test]
async fn is_closed_and_is_sent_semantics() {
let (tx1, rx) = oneshot::<i32>();
let tx2 = tx1.clone();
assert!(!tx1.is_closed()); assert!(!tx1.is_sent());
assert!(!rx.is_closed());
let tx_to_send = tx1.clone(); drop(tx1);
tx_to_send.send(123).unwrap();
assert!(tx2.is_sent());
assert_eq!(rx.recv().await.unwrap(), 123);
assert!(tx2.is_sent());
drop(tx2); assert!(rx.is_closed());
let (tx3, rx2) = oneshot::<i32>();
assert!(!tx3.is_closed());
drop(rx2);
assert!(tx3.is_closed()); }
#[test]
fn test_oneshot_drop_race_leak() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
struct DropTracker {
counter: Arc<AtomicUsize>,
}
impl Drop for DropTracker {
fn drop(&mut self) {
self.counter.fetch_add(1, Ordering::SeqCst);
}
}
let drop_counter = Arc::new(AtomicUsize::new(0));
let tracked_value = DropTracker {
counter: Arc::clone(&drop_counter),
};
let (tx, rx) = oneshot::<DropTracker>();
let shared = Arc::clone(&tx.shared);
let lock_guard = shared.value_slot.lock().unwrap();
let sender_thread = thread::spawn(move || {
let _ = tx.send(tracked_value);
});
while shared.state.load(Ordering::Acquire) != super::core::STATE_WRITING {
thread::yield_now();
}
drop(rx);
drop(lock_guard);
sender_thread.join().unwrap();
assert_eq!(
drop_counter.load(Ordering::SeqCst),
1,
"The value inside the oneshot channel was leaked!"
);
}
#[test]
fn test_oneshot_sender_count_underflow() {
let (tx, rx) = oneshot::<i32>();
let shared = Arc::clone(&tx.shared);
drop(rx);
let tx_clone = tx.clone();
assert_eq!(shared.sender_count.load(Ordering::Relaxed), 2);
drop(tx);
assert_eq!(shared.sender_count.load(Ordering::Relaxed), 1);
drop(tx_clone);
let final_count = shared.sender_count.load(Ordering::Relaxed);
assert_eq!(
final_count, 0,
"sender_count underflowed to {}!",
final_count
);
}