komora_sync/
oneshot.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Condvar, Mutex};
4use std::task::Waker;
5use std::task::{Context, Poll};
6
7pub fn filled_oneshot<T>(value: T) -> ReceiveOne<T> {
8    let shared: Arc<Shared<T>> = Arc::new(Shared {
9        state: Mutex::new(State {
10            value: Some(Ok(value)),
11            waker: None,
12        }),
13        cv: Condvar::new(),
14    });
15
16    ReceiveOne { shared }
17}
18
19pub fn oneshot<T>() -> (SendOne<T>, ReceiveOne<T>) {
20    let shared: Arc<Shared<T>> = Arc::default();
21
22    (
23        SendOne {
24            shared: shared.clone(),
25            sent: false,
26        },
27        ReceiveOne { shared },
28    )
29}
30
31struct State<T> {
32    value: Option<Result<T, SenderDropped>>,
33    waker: Option<Waker>,
34}
35
36impl<T> Default for State<T> {
37    fn default() -> State<T> {
38        State {
39            value: None,
40            waker: None,
41        }
42    }
43}
44
45pub struct Shared<T> {
46    state: Mutex<State<T>>,
47    cv: Condvar,
48}
49
50impl<T> Default for Shared<T> {
51    fn default() -> Shared<T> {
52        Shared {
53            state: Mutex::default(),
54            cv: Condvar::new(),
55        }
56    }
57}
58
59pub struct SendOne<T> {
60    shared: Arc<Shared<T>>,
61    sent: bool,
62}
63
64pub struct ReceiveOne<T> {
65    shared: Arc<Shared<T>>,
66}
67
68#[derive(Debug, Clone, Copy)]
69pub struct SenderDropped;
70
71impl<T> SendOne<T> {
72    pub fn send(mut self, t: T) {
73        self.send_inner(Ok(t));
74        self.sent = true;
75    }
76
77    fn send_inner(&mut self, t_res: Result<T, SenderDropped>) {
78        let mut state = self.shared.state.lock().unwrap();
79
80        assert!(state.value.is_none());
81
82        state.value = Some(t_res);
83
84        let waker_opt = state.waker.clone();
85
86        drop(state);
87
88        self.shared.cv.notify_one();
89
90        if let Some(waker) = waker_opt {
91            waker.wake();
92        }
93    }
94}
95
96impl<T> Drop for SendOne<T> {
97    fn drop(&mut self) {
98        if !self.sent {
99            self.send_inner(Err(SenderDropped));
100        }
101    }
102}
103
104impl<T> ReceiveOne<T> {
105    pub fn recv(self) -> Result<T, SenderDropped> {
106        let mut state = self.shared.state.lock().unwrap();
107
108        while state.value.is_none() {
109            state = self.shared.cv.wait(state).unwrap();
110        }
111
112        state.value.take().unwrap()
113    }
114}
115
116impl<T> Future for ReceiveOne<T> {
117    type Output = Result<T, SenderDropped>;
118
119    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
120        let mut state = self.shared.state.lock().unwrap();
121
122        let waker = cx.waker().clone();
123
124        state.waker = Some(waker);
125
126        if let Some(filled_value) = state.value.take() {
127            Poll::Ready(filled_value)
128        } else {
129            Poll::Pending
130        }
131    }
132}