lite_sync/oneshot/
common.rs1use crate::shim::sync::Arc;
6use core::fmt;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use crate::atomic_waker::AtomicWaker;
12
13pub mod error {
18 use core::fmt;
21
22 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
26 pub struct RecvError;
27
28 impl fmt::Display for RecvError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 write!(f, "channel closed")
31 }
32 }
33
34 impl core::error::Error for RecvError {}
35
36 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
40 pub enum TryRecvError {
41 Empty,
45 Closed,
49 }
50
51 impl fmt::Display for TryRecvError {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match self {
54 TryRecvError::Empty => write!(f, "channel empty"),
55 TryRecvError::Closed => write!(f, "channel closed"),
56 }
57 }
58 }
59
60 impl core::error::Error for TryRecvError {}
61}
62
63pub use self::error::RecvError;
64pub use self::error::TryRecvError;
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum TakeResult<T> {
73 Ready(T),
75 Pending,
77 Closed,
79}
80
81impl<T> TakeResult<T> {
82 #[inline]
83 pub fn ok(self) -> Option<T> {
84 match self {
85 TakeResult::Ready(v) => Some(v),
86 _ => None,
87 }
88 }
89
90 #[inline]
91 pub fn is_closed(&self) -> bool {
92 matches!(self, TakeResult::Closed)
93 }
94}
95
96pub trait OneshotStorage: Send + Sync + Sized {
100 type Value: Send;
102
103 fn new() -> Self;
105
106 fn store(&self, value: Self::Value);
108
109 fn try_take(&self) -> TakeResult<Self::Value>;
111
112 fn is_sender_dropped(&self) -> bool;
114
115 fn mark_sender_dropped(&self);
117
118 fn is_receiver_closed(&self) -> bool;
122
123 fn mark_receiver_closed(&self);
127}
128
129pub struct Inner<S: OneshotStorage> {
135 pub(crate) waker: AtomicWaker,
136 pub(crate) storage: S,
137}
138
139impl<S: OneshotStorage> Inner<S> {
140 #[inline]
141 pub fn new() -> Arc<Self> {
142 Arc::new(Self {
143 waker: AtomicWaker::new(),
144 storage: S::new(),
145 })
146 }
147
148 #[inline]
149 pub fn send(&self, value: S::Value) {
150 self.storage.store(value);
151 self.waker.wake();
152 }
153
154 #[inline]
155 pub fn try_recv(&self) -> TakeResult<S::Value> {
156 self.storage.try_take()
157 }
158
159 #[inline]
160 pub fn register_waker(&self, waker: &core::task::Waker) {
161 self.waker.register(waker);
162 }
163
164 #[inline]
165 pub fn is_sender_dropped(&self) -> bool {
166 self.storage.is_sender_dropped()
167 }
168}
169
170impl<S: OneshotStorage> fmt::Debug for Inner<S> {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 f.debug_struct("Inner").finish_non_exhaustive()
173 }
174}
175
176pub struct Sender<S: OneshotStorage> {
184 pub(crate) inner: Arc<Inner<S>>,
185}
186
187impl<S: OneshotStorage> fmt::Debug for Sender<S> {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("Sender").finish_non_exhaustive()
190 }
191}
192
193impl<S: OneshotStorage> Sender<S> {
194 #[inline]
196 pub fn new() -> (Self, Receiver<S>) {
197 let inner = Inner::new();
198 let sender = Sender {
199 inner: inner.clone(),
200 };
201 let receiver = Receiver { inner };
202 (sender, receiver)
203 }
204
205 #[inline]
209 pub fn send(self, value: S::Value) -> Result<(), S::Value> {
210 if self.is_closed() {
211 return Err(value);
212 }
213 self.send_unchecked(value);
214 Ok(())
215 }
216
217 #[inline]
221 pub fn send_unchecked(self, value: S::Value) {
222 self.inner.send(value);
223 core::mem::forget(self);
224 }
225
226 #[inline]
230 pub fn is_closed(&self) -> bool {
231 self.inner.storage.is_receiver_closed() || Arc::strong_count(&self.inner) == 1
233 }
234}
235
236impl<S: OneshotStorage> Drop for Sender<S> {
237 fn drop(&mut self) {
238 self.inner.storage.mark_sender_dropped();
239 self.inner.waker.wake();
240 }
241}
242
243pub struct Receiver<S: OneshotStorage> {
255 pub(crate) inner: Arc<Inner<S>>,
256}
257
258impl<S: OneshotStorage> fmt::Debug for Receiver<S> {
259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 f.debug_struct("Receiver").finish_non_exhaustive()
261 }
262}
263
264impl<S: OneshotStorage> Unpin for Receiver<S> {}
265
266impl<S: OneshotStorage> Receiver<S> {
267 #[inline]
269 pub async fn wait(self) -> Result<S::Value, RecvError> {
270 self.await
271 }
272
273 #[inline]
284 pub fn close(&mut self) {
285 self.inner.storage.mark_receiver_closed();
286 }
287
288 #[inline]
304 #[cfg(any(feature = "std", feature = "loom", test))]
305 pub fn blocking_recv(self) -> Result<S::Value, RecvError> {
306 use crate::shim::atomic::{AtomicBool, Ordering};
307 use core::task::{RawWaker, RawWakerVTable, Waker};
308
309 match self.inner.storage.try_take() {
311 TakeResult::Ready(value) => return Ok(value),
312 TakeResult::Closed => return Err(RecvError),
313 TakeResult::Pending => {}
314 }
315
316 struct ThreadParker {
318 thread: crate::shim::thread::Thread,
319 notified: AtomicBool,
320 }
321
322 const VTABLE: RawWakerVTable = RawWakerVTable::new(
323 |ptr| unsafe {
324 Arc::increment_strong_count(ptr as *const ThreadParker);
325 RawWaker::new(ptr, &VTABLE)
326 },
327 |ptr| unsafe {
328 let parker = Arc::from_raw(ptr as *const ThreadParker);
329 parker.notified.store(true, Ordering::Release);
330 parker.thread.unpark();
331 },
332 |ptr| unsafe {
333 let parker = &*(ptr as *const ThreadParker);
334 parker.notified.store(true, Ordering::Release);
335 parker.thread.unpark();
336 },
337 |ptr| unsafe {
338 Arc::decrement_strong_count(ptr as *const ThreadParker);
339 },
340 );
341
342 let parker = Arc::new(ThreadParker {
343 thread: crate::shim::thread::current(),
344 notified: AtomicBool::new(false),
345 });
346
347 let raw_waker = RawWaker::new(Arc::into_raw(parker.clone()) as *const (), &VTABLE);
348 let waker = unsafe { Waker::from_raw(raw_waker) };
349
350 self.inner.register_waker(&waker);
352
353 loop {
354 match self.inner.storage.try_take() {
355 TakeResult::Ready(value) => return Ok(value),
356 TakeResult::Closed => return Err(RecvError),
357 TakeResult::Pending => {}
358 }
359
360 if Arc::strong_count(&self.inner) == 1 && self.inner.is_sender_dropped() {
362 return Err(RecvError);
363 }
364
365 if !parker.notified.swap(false, Ordering::Acquire) {
367 crate::shim::thread::park();
368 }
369 }
370 }
371}
372
373impl<S: OneshotStorage> Future for Receiver<S> {
374 type Output = Result<S::Value, RecvError>;
375
376 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377 let this = self.get_mut();
378
379 match this.inner.try_recv() {
381 TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
382 TakeResult::Closed => return Poll::Ready(Err(RecvError)),
383 TakeResult::Pending => {}
384 }
385
386 this.inner.register_waker(cx.waker());
388
389 match this.inner.try_recv() {
391 TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
392 TakeResult::Closed => return Poll::Ready(Err(RecvError)),
393 TakeResult::Pending => {}
394 }
395
396 if Arc::strong_count(&this.inner) == 1 && this.inner.is_sender_dropped() {
398 return Poll::Ready(Err(RecvError));
399 }
400
401 Poll::Pending
402 }
403}
404
405#[inline]
407pub fn channel<S: OneshotStorage>() -> (Sender<S>, Receiver<S>) {
408 Sender::new()
409}