1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#[cfg(not(oneshot_loom))]
use core::cell::UnsafeCell;
#[cfg(oneshot_loom)]
use loom::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::ptr;
#[cfg(feature = "async")]
use core::task::{self, Poll};
use crate::atomic::AtomicU8;
#[cfg(feature = "async")]
use crate::atomic::{Ordering::*, fence};
#[cfg(feature = "async")]
use crate::states::{DISCONNECTED, EMPTY, MESSAGE, RECEIVING};
use crate::waker::ReceiverWaker;
#[cfg(feature = "async")]
use crate::RecvError;
/// Internal channel data structure. The `channel` method allocates and puts one instance
/// of this struct on the heap for each oneshot channel instance.
///
/// The struct bundles three pieces of shared state: an atomic `state` byte, a `message`
/// slot and a `waker` slot. All fields are private; code outside this module reaches them
/// through the methods implemented on `Channel`.
pub struct Channel<T> {
/// The current state of the channel. This is initialized to EMPTY, and always has the value
/// of one of the constants in the `states` module. This atomic field is what allows the
/// `Sender` and `Receiver` to communicate with each other and coordinate their actions
/// in a thread safe manner.
state: AtomicU8,
/// The message in the channel. This memory is uninitialized until the message is sent.
///
/// This field is wrapped in an `UnsafeCell` since interior mutability is required.
/// Both ends of the channel will access this field mutably through a shared reference.
///
/// Since the slot is a `MaybeUninit`, its contents are never dropped implicitly. The value
/// only leaves the slot through `take_message` (moved out) or `drop_message` (dropped in
/// place).
message: UnsafeCell<MaybeUninit<T>>,
/// The waker instance for the thread or task that is currently receiving on this channel.
/// This memory is uninitialized until the receiver starts receiving.
///
/// This field is wrapped in an `UnsafeCell` since interior mutability is required.
/// Both ends of the channel will access this field mutably through a shared reference.
///
/// Since the slot is a `MaybeUninit`, its contents are never dropped implicitly. The waker
/// only leaves the slot through `take_waker` (moved out) or `drop_waker` (dropped in place).
waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
}
impl<T> Channel<T> {
/// Creates a channel whose state is `EMPTY` and whose message and waker slots are both
/// uninitialized.
pub fn new() -> Self {
    // The fields are built in declaration order, one named local per field.
    let state = AtomicU8::new(crate::states::EMPTY);
    let message = UnsafeCell::new(MaybeUninit::uninit());
    let waker = UnsafeCell::new(MaybeUninit::uninit());
    Self {
        state,
        message,
        waker,
    }
}
/// Returns a shared reference to the atomic state
#[inline(always)]
pub fn state(&self) -> &AtomicU8 {
&self.state
}
/// Returns a reference to the message
///
/// # Safety
///
/// Must be called only when the caller can guarantee the message has been initialized, and
/// no other thread will access the message field for the lifetime of the returned reference.
#[inline(always)]
pub unsafe fn message(&self) -> &T {
// SAFETY: The caller guarantees that no other thread will access the message field.
let message_container = unsafe {
#[cfg(oneshot_loom)]
{
self.message.with(|ptr| &*ptr)
}
#[cfg(not(oneshot_loom))]
{
&*self.message.get()
}
};
// SAFETY: The caller guarantees that the message has been initialized.
unsafe { message_container.assume_init_ref() }
}
/// Invokes `op` with a mutable reference to the (possibly uninitialized) message slot.
///
/// # Safety
///
/// The mutable reference is produced from a shared reference to the channel by way of
/// interior mutability. The caller must therefore guarantee that nothing else accesses
/// the message field while this call is running.
#[inline(always)]
unsafe fn with_message_mut<F>(&self, op: F)
where
    F: FnOnce(&mut MaybeUninit<T>),
{
    #[cfg(oneshot_loom)]
    {
        // SAFETY: The caller guarantees exclusive access to the message field.
        self.message.with_mut(|raw| op(unsafe { &mut *raw }))
    }
    #[cfg(not(oneshot_loom))]
    {
        // SAFETY: The caller guarantees exclusive access to the message field.
        let slot = unsafe { &mut *self.message.get() };
        op(slot)
    }
}
/// Invokes `op` with a mutable reference to the (possibly uninitialized) waker slot.
///
/// # Safety
///
/// The mutable reference is produced from a shared reference to the channel by way of
/// interior mutability. The caller must therefore guarantee that nothing else accesses
/// the waker field while this call is running.
#[inline(always)]
#[cfg(any(feature = "std", feature = "async"))]
unsafe fn with_waker_mut<F>(&self, op: F)
where
    F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
{
    #[cfg(oneshot_loom)]
    {
        self.waker.with_mut(|raw| {
            // SAFETY: The caller guarantees exclusive access to the waker field.
            let slot = unsafe { &mut *raw };
            op(slot)
        })
    }
    #[cfg(not(oneshot_loom))]
    {
        // SAFETY: The caller guarantees exclusive access to the waker field.
        let slot = unsafe { &mut *self.waker.get() };
        op(slot)
    }
}
/// Writes a message to the message field in the channel. Will overwrite whatever
/// is currently stored in the field *without dropping it*. To avoid potential memory
/// leaks, the caller should ensure that the message field does not currently have any
/// initialized message in it before calling this function.
///
/// # Safety
///
/// Caller must guarantee exclusive access to the message field during this call.
#[inline(always)]
pub unsafe fn write_message(&self, message: T) {
    // SAFETY: The caller guarantees exclusive access to the message field.
    unsafe {
        self.with_message_mut(|slot| {
            // `MaybeUninit::write` stores the value without reading or dropping the old
            // contents. The `&mut T` it returns is not needed here.
            slot.write(message);
        });
    }
}
/// Moves the message out of the channel and hands it to the caller.
///
/// The value is copied out bitwise with `ptr::read`, so after this call the slot must be
/// treated as uninitialized again.
///
/// # Safety
///
/// Must only be called after having observed the MESSAGE state with an acquire
/// memory ordering to synchronize with the other thread's write of the message.
#[inline(always)]
pub unsafe fn take_message(&self) -> T {
    // SAFETY: The caller guarantees that no other thread will access the message field.
    #[cfg(oneshot_loom)]
    let slot = self.message.with(|raw| unsafe { ptr::read(raw) });
    // SAFETY: The caller guarantees that no other thread will access the message field.
    #[cfg(not(oneshot_loom))]
    let slot = unsafe { ptr::read(self.message.get()) };

    // SAFETY: The caller guarantees that the message has been initialized.
    unsafe { slot.assume_init() }
}
/// # Safety
///
/// Must only be called after having observed the MESSAGE state with an acquire
/// memory ordering to synchronize with the other thread's write of the message.
#[inline(always)]
pub unsafe fn drop_message(&self) {
// SAFETY: The caller guarantees that the message has been initialized and that
// we have exclusive access for the duration of this call.
unsafe {
self.with_message_mut(|slot| slot.assume_init_drop());
}
}
/// Stores `waker` in the channel's waker field. Whatever was in the field before is
/// overwritten without being dropped, so to avoid potential memory leaks the caller
/// should make sure the field holds no initialized waker when calling this function.
///
/// # Safety
///
/// Caller must guarantee exclusive access to the waker field during this call.
#[cfg(any(feature = "std", feature = "async"))]
#[inline(always)]
pub unsafe fn write_waker(&self, waker: ReceiverWaker) {
    let store = move |slot: &mut MaybeUninit<ReceiverWaker>| {
        // Overwrites the slot without reading or dropping its previous contents.
        slot.write(waker);
    };
    // SAFETY: The caller guarantees exclusive access to the waker field.
    unsafe { self.with_waker_mut(store) }
}
/// Moves the waker out of the channel and hands it to the caller.
///
/// The value is copied out bitwise with `ptr::read`, so after this call the slot must be
/// treated as uninitialized again.
///
/// # Safety
///
/// Must be called only when the caller can guarantee the waker has been initialized (and
/// not already dropped), and no other thread will access the waker field during this call.
#[inline(always)]
pub unsafe fn take_waker(&self) -> ReceiverWaker {
    // SAFETY: The caller guarantees that no other thread will access the waker field
    // during this call.
    #[cfg(oneshot_loom)]
    let slot = self.waker.with(|raw| unsafe { ptr::read(raw) });
    // SAFETY: The caller guarantees that no other thread will access the waker field
    // during this call.
    #[cfg(not(oneshot_loom))]
    let slot = unsafe { ptr::read(self.waker.get()) };

    // SAFETY: The caller guarantees that a waker has been initialized.
    unsafe { slot.assume_init() }
}
/// Drops the waker stored in the channel in place.
///
/// # Safety
///
/// Must be called only when the caller can guarantee the waker has been initialized (and
/// not already dropped), and no other thread will access the waker field during this call.
#[cfg(any(feature = "std", feature = "async"))]
#[inline(always)]
pub unsafe fn drop_waker(&self) {
    let drop_in_place = |slot: &mut MaybeUninit<ReceiverWaker>| {
        // SAFETY: The caller guarantees that a waker has been initialized.
        unsafe { slot.assume_init_drop() }
    };
    // SAFETY: The caller guarantees that we have exclusive access to the waker field
    // while this method runs.
    unsafe { self.with_waker_mut(drop_in_place) }
}
/// Stores a task waker created from `cx` in the channel, then tries to move the channel
/// state from EMPTY to RECEIVING with a single compare-exchange.
///
/// Returns:
///
/// * `Poll::Pending` if the state was EMPTY and is now RECEIVING. The waker stays stored
///   in the channel.
/// * `Poll::Ready(Ok(message))` if the state was already MESSAGE. The waker written by
///   this call is dropped, the state is set to DISCONNECTED and the message is moved out.
/// * `Poll::Ready(Err(RecvError))` if the state was already DISCONNECTED. The waker
///   written by this call is dropped.
///
/// # Panics
///
/// Reaches `unreachable!()` if the compare-exchange fails with any state other than
/// MESSAGE or DISCONNECTED.
///
/// # Safety
///
/// * `Channel::waker` must not have a waker stored in it when calling this method.
/// * Channel state must not be RECEIVING or UNPARKING when calling this method.
#[cfg(feature = "async")]
pub unsafe fn write_async_waker(
&self,
cx: &mut task::Context<'_>,
) -> Poll<Result<T, RecvError>> {
// SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
// try to access the waker until it sees the state set to RECEIVING below
unsafe { self.write_waker(ReceiverWaker::task_waker(cx)) };
// ORDERING: we use release ordering on success so the sender can synchronize with
// our write of the waker. We use relaxed ordering on failure since the sender does
// not need to synchronize with our write and the individual match arms handle any
// additional synchronization
match self
.state
.compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
{
// We stored our waker, now we return and let the sender wake us up
Ok(_) => Poll::Pending,
// The sender sent the message while we prepared to park.
// We take the message and mark the channel disconnected.
Err(MESSAGE) => {
// SAFETY: We wrote a waker above. The sender cannot have observed the
// RECEIVING state, so it has not accessed the waker. We must drop it.
unsafe { self.drop_waker() };
// ORDERING: sender does not exist, so this update only needs to be visible to us
self.state.store(DISCONNECTED, Relaxed);
// ORDERING: Synchronize with the write of the message. This branch is
// unlikely to be taken, so it's likely more efficient to use a fence here
// instead of AcqRel ordering on the compare_exchange operation.
// The fence must stay ahead of `take_message` below: the failed compare-exchange
// only loaded the state with relaxed ordering.
fence(Acquire);
// SAFETY: The MESSAGE state + acquire ordering guarantees initialized message
Poll::Ready(Ok(unsafe { self.take_message() }))
}
// The sender was dropped before sending anything while we prepared to park.
Err(DISCONNECTED) => {
// SAFETY: We wrote a waker above. The sender cannot have observed the
// RECEIVING state, so it has not accessed the waker. We must drop it.
unsafe { self.drop_waker() };
Poll::Ready(Err(RecvError))
}
// NOTE(review): presumably the only remaining states are the ones excluded by the
// safety contract of this method; confirm against the `states` module.
_ => unreachable!(),
}
}
}