breadthread/
value.rs

1// MIT/Apache2 License
2
3use crate::sync::{self, Mutex};
4use alloc::sync::Arc;
5use core::{
6    cell::UnsafeCell,
7    marker::Unpin,
8    mem::{self, MaybeUninit},
9    ptr,
10    task::Waker,
11};
12
13#[cfg(feature = "std")]
14use parking::Unparker;
15
16use core::{
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20};
21
22#[cfg(not(loom))]
23use core::sync::atomic::{AtomicBool, Ordering::SeqCst};
24#[cfg(loom)]
25use loom::sync::atomic::{AtomicBool, Ordering::SeqCst};
26
/// A value that may eventually resolve.
///
/// `Value` is a handle to state shared (through an `Arc`) with a `Setter`.
/// It can be checked with `is_resolved`, consumed with `resolve`, awaited
/// as a `Future`, or (with the `std` feature) blocked on with `wait`.
pub struct Value<T>(Arc<ValueInner<T>>);
29
/// A setter for a `Value`.
///
/// Wraps a second `Value` handle that points at the same shared state as
/// the `Value` returned alongside it by `Value::new`. `put` consumes the
/// setter, so at most one value can be stored through it.
///
/// NOTE(review): no `Drop` impl for `Setter` is visible in this file, so
/// dropping a setter without calling `put` appears to leave the matching
/// `Value` unresolved forever. Confirm this is intended.
pub(crate) struct Setter<T>(Value<T>);
32
// `Value` only stores an `Arc` to the shared state, so moving the handle
// never moves the data it refers to.
impl<T> Unpin for Value<T> {}
34
/// The state shared between a `Value` and its `Setter`.
struct ValueInner<T> {
    // whether or not the value in "slot" is valid, i.e. it has been
    // stored and has not been taken out again
    valid: AtomicBool,
    // the slot that contains the value; it starts out uninitialized
    // unless the `Value` was created already resolved
    slot: UnsafeCell<MaybeUninit<T>>,
    // the waiter waiting on this value; it is notified when it gets replaced
    waiter: Mutex<Waiter>,
}
43
// SAFETY: valid + slot makes up what's essentially a partially
// atomic `Option<T>` that never gives out references: the value is
// only ever moved in (`Setter::put`), moved out (`Value::take_inner`)
// or dropped in place, which is why `T: Send` is the bound used for
// both impls.
unsafe impl<T: Send> Send for ValueInner<T> {}
unsafe impl<T: Send> Sync for ValueInner<T> {}
48
/// The party to notify once the waiter slot is replaced.
enum Waiter {
    /// Nobody is waiting.
    None,
    /// A thread blocked in `Value::wait`, woken through its `Unparker`.
    #[cfg(feature = "std")]
    Unpark(Unparker),
    /// A task polling the `Value` as a future, woken through its `Waker`.
    Waker(Waker),
}
55
56impl<T> Value<T> {
57    pub(crate) fn new() -> (Self, Setter<T>) {
58        let inner = Arc::new(ValueInner {
59            valid: AtomicBool::new(false),
60            slot: UnsafeCell::new(MaybeUninit::uninit()),
61            waiter: Mutex::new(Waiter::None),
62        });
63
64        (Self(inner.clone()), Setter(Self(inner)))
65    }
66
67    /// Create a `Value` that is already resolved.
68    ///
69    /// This is useful in code where once branch may need to use
70    /// the driving thread, but another can be resolved immediately.
71    pub fn immediate(val: T) -> Self {
72        Value(Arc::new(ValueInner {
73            valid: AtomicBool::new(true),
74            slot: UnsafeCell::new(MaybeUninit::new(val)),
75            waiter: Mutex::new(Waiter::None),
76        }))
77    }
78
79    pub fn is_resolved(&self) -> bool {
80        self.0.valid.load(SeqCst)
81    }
82
83    /// Convert to the inner value.
84    ///
85    /// # Safety
86    ///
87    /// This is unsafe because the value may not be resolved.
88    unsafe fn take_inner(&mut self) -> T {
89        debug_assert!(self.is_resolved());
90
91        self.0.valid.store(false, SeqCst);
92        ptr::read(self.0.slot.get() as *const T)
93    }
94
95    /// Try to resolve the value immediately.
96    pub fn resolve(mut self) -> Result<T, Value<T>> {
97        // tell if we can resolve yet
98        if self.is_resolved() {
99            Ok(unsafe { self.take_inner() })
100        } else {
101            Err(self)
102        }
103    }
104
105    fn put_waiter(&mut self, waiter: Waiter) {
106        sync::lock(&self.0.waiter).replace(waiter);
107    }
108
109    /// Wait for the value to be resolved.
110    #[cfg(feature = "std")]
111    pub fn wait(self) -> T {
112        let mut this = match self.resolve() {
113            Ok(val) => return val,
114            Err(this) => this,
115        };
116
117        // wait for the value to be resolved
118        let (parker, unparker) = parking::pair();
119        this.put_waiter(Waiter::Unpark(unparker));
120
121        match this.resolve() {
122            Ok(val) => val,
123            Err(mut this) => {
124                // wait for the value to be resolved
125                parker.park();
126
127                // this will only occur if we've been resolved
128                unsafe { this.take_inner() }
129            }
130        }
131    }
132}
133
134impl<T> Future for Value<T> {
135    type Output = T;
136
137    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138        let this = self.get_mut();
139
140        if this.is_resolved() {
141            return Poll::Ready(unsafe { this.take_inner() });
142        }
143
144        // set the atomic waker to our new waker
145        this.put_waiter(Waiter::Waker(cx.waker().clone()));
146
147        // try again
148        if this.is_resolved() {
149            Poll::Ready(unsafe { this.take_inner() })
150        } else {
151            Poll::Pending
152        }
153    }
154}
155
156impl<T> Setter<T> {
157    fn inner(&self) -> &ValueInner<T> {
158        &(self.0).0
159    }
160
161    /// Put in a value, and perhaps signaling a `Value` on the other end.
162    pub(crate) fn put(mut self, value: T) {
163        // SAFETY: it is impossible for another value to be put in here
164        unsafe { ptr::write(self.inner().slot.get() as *mut T, value) };
165        self.inner().valid.store(true, SeqCst);
166        self.0.put_waiter(Waiter::None)
167    }
168}
169
170impl Waiter {
171    fn replace(&mut self, other: Waiter) {
172        let this = mem::replace(self, other);
173
174        match this {
175            Self::None => {}
176            Self::Waker(w) => w.wake(),
177            #[cfg(feature = "std")]
178            Self::Unpark(p) => {
179                p.unpark();
180            }
181        }
182    }
183}
184
185impl<T> Drop for ValueInner<T> {
186    fn drop(&mut self) {
187        // drop if the valid is valid
188        if *self.valid.get_mut() {
189            // drop the value
190            unsafe {
191                ptr::drop_in_place(self.slot.get() as *mut T);
192            }
193        }
194    }
195}