1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Condvar, Mutex};
4use std::task::Waker;
5use std::task::{Context, Poll};
6
7pub fn filled_oneshot<T>(value: T) -> ReceiveOne<T> {
8 let shared: Arc<Shared<T>> = Arc::new(Shared {
9 state: Mutex::new(State {
10 value: Some(Ok(value)),
11 waker: None,
12 }),
13 cv: Condvar::new(),
14 });
15
16 ReceiveOne { shared }
17}
18
19pub fn oneshot<T>() -> (SendOne<T>, ReceiveOne<T>) {
20 let shared: Arc<Shared<T>> = Arc::default();
21
22 (
23 SendOne {
24 shared: shared.clone(),
25 sent: false,
26 },
27 ReceiveOne { shared },
28 )
29}
30
31struct State<T> {
32 value: Option<Result<T, SenderDropped>>,
33 waker: Option<Waker>,
34}
35
36impl<T> Default for State<T> {
37 fn default() -> State<T> {
38 State {
39 value: None,
40 waker: None,
41 }
42 }
43}
44
45pub struct Shared<T> {
46 state: Mutex<State<T>>,
47 cv: Condvar,
48}
49
50impl<T> Default for Shared<T> {
51 fn default() -> Shared<T> {
52 Shared {
53 state: Mutex::default(),
54 cv: Condvar::new(),
55 }
56 }
57}
58
59pub struct SendOne<T> {
60 shared: Arc<Shared<T>>,
61 sent: bool,
62}
63
64pub struct ReceiveOne<T> {
65 shared: Arc<Shared<T>>,
66}
67
/// Error delivered to the receiver when the [`SendOne`] half was dropped
/// without sending a value.
///
/// Implements [`std::error::Error`], so it can be propagated with `?` into
/// boxed or wrapping error types, and `PartialEq`/`Eq`, so results carrying
/// it can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SenderDropped;

impl std::fmt::Display for SenderDropped {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("sender dropped without sending a value")
    }
}

impl std::error::Error for SenderDropped {}
70
71impl<T> SendOne<T> {
72 pub fn send(mut self, t: T) {
73 self.send_inner(Ok(t));
74 self.sent = true;
75 }
76
77 fn send_inner(&mut self, t_res: Result<T, SenderDropped>) {
78 let mut state = self.shared.state.lock().unwrap();
79
80 assert!(state.value.is_none());
81
82 state.value = Some(t_res);
83
84 let waker_opt = state.waker.clone();
85
86 drop(state);
87
88 self.shared.cv.notify_one();
89
90 if let Some(waker) = waker_opt {
91 waker.wake();
92 }
93 }
94}
95
96impl<T> Drop for SendOne<T> {
97 fn drop(&mut self) {
98 if !self.sent {
99 self.send_inner(Err(SenderDropped));
100 }
101 }
102}
103
104impl<T> ReceiveOne<T> {
105 pub fn recv(self) -> Result<T, SenderDropped> {
106 let mut state = self.shared.state.lock().unwrap();
107
108 while state.value.is_none() {
109 state = self.shared.cv.wait(state).unwrap();
110 }
111
112 state.value.take().unwrap()
113 }
114}
115
116impl<T> Future for ReceiveOne<T> {
117 type Output = Result<T, SenderDropped>;
118
119 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120 let mut state = self.shared.state.lock().unwrap();
121
122 let waker = cx.waker().clone();
123
124 state.waker = Some(waker);
125
126 if let Some(filled_value) = state.value.take() {
127 Poll::Ready(filled_value)
128 } else {
129 Poll::Pending
130 }
131 }
132}