local_sync/
once_cell.rs

1//! Once Cell impl borrowed from tokio.
2
3use std::cell::{RefCell, UnsafeCell};
4use std::error::Error;
5use std::fmt;
6use std::future::Future;
7use std::mem::MaybeUninit;
8use std::ops::Drop;
9use std::ptr;
10
11use crate::semaphore::{Semaphore, SemaphorePermit, TryAcquireError};
12
13// This file contains an implementation of an OnceCell. The principle
14// behind the safety the of the cell is that any thread with an `&OnceCell` may
15// access the `value` field according the following rules:
16//
17//  1. When `value_set` is false, the `value` field may be modified by the
18//     thread holding the permit on the semaphore.
19//  2. When `value_set` is true, the `value` field may be accessed immutably by
20//     any thread.
21//
22// It is an invariant that if the semaphore is closed, then `value_set` is true.
23// The reverse does not necessarily hold — but if not, the semaphore may not
24// have any available permits.
25//
26// A thread with a `&mut OnceCell` may modify the value in any way it wants as
27// long as the invariants are upheld.
28
29/// A thread-safe cell that can be written to only once.
30///
31/// A `OnceCell` is typically used for global variables that need to be
32/// initialized once on first use, but need no further changes. The `OnceCell`
33/// in Tokio allows the initialization procedure to be asynchronous.
34///
35/// # Examples
36///
37/// ```
38/// use local_sync::OnceCell;
39///
40/// async fn some_computation() -> u32 {
41///     1 + 1
42/// }
43///
44/// thread_local! {
45///     static ONCE: OnceCell<u32> = OnceCell::new();
46/// }
47///
48/// #[monoio::main]
49/// async fn main() {
50///     let once = ONCE.with(|once| unsafe {
51///         std::ptr::NonNull::new_unchecked(once as *const _ as *mut OnceCell<u32>).as_ref()
52///     });
53///     let result = once.get_or_init(some_computation).await;
54///     assert_eq!(*result, 2);
55/// }
56/// ```
57///
58/// It is often useful to write a wrapper method for accessing the value.
59///
60/// ```
61/// use local_sync::OnceCell;
62///
63/// thread_local! {
64///     static ONCE: OnceCell<u32> = OnceCell::new();
65/// }
66///
67/// async fn get_global_integer() -> &'static u32 {
68///     let once = ONCE.with(|once| unsafe {
69///         std::ptr::NonNull::new_unchecked(once as *const _ as *mut OnceCell<u32>).as_ref()
70///     });
71///     once.get_or_init(|| async {
72///         1 + 1
73///     }).await
74/// }
75///
76/// #[monoio::main]
77/// async fn main() {
78///     let result = get_global_integer().await;
79///     assert_eq!(*result, 2);
80/// }
81/// ```
82pub struct OnceCell<T> {
83    value_set: RefCell<bool>,
84    value: UnsafeCell<MaybeUninit<T>>,
85    semaphore: Semaphore,
86}
87
88impl<T> Default for OnceCell<T> {
89    fn default() -> OnceCell<T> {
90        OnceCell::new()
91    }
92}
93
94impl<T: fmt::Debug> fmt::Debug for OnceCell<T> {
95    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
96        fmt.debug_struct("OnceCell")
97            .field("value", &self.get())
98            .finish()
99    }
100}
101
102impl<T: Clone> Clone for OnceCell<T> {
103    fn clone(&self) -> OnceCell<T> {
104        OnceCell::new_with(self.get().cloned())
105    }
106}
107
108impl<T: PartialEq> PartialEq for OnceCell<T> {
109    fn eq(&self, other: &OnceCell<T>) -> bool {
110        self.get() == other.get()
111    }
112}
113
114impl<T: Eq> Eq for OnceCell<T> {}
115
116impl<T> Drop for OnceCell<T> {
117    fn drop(&mut self) {
118        if self.initialized_mut() {
119            unsafe {
120                let ptr = self.value.get();
121                ptr::drop_in_place((&mut *ptr).as_mut_ptr());
122            };
123        }
124    }
125}
126
127impl<T> From<T> for OnceCell<T> {
128    fn from(value: T) -> Self {
129        let semaphore = Semaphore::new(0);
130        semaphore.close();
131        OnceCell {
132            value_set: RefCell::new(true),
133            value: UnsafeCell::new(MaybeUninit::new(value)),
134            semaphore,
135        }
136    }
137}
138
139impl<T> OnceCell<T> {
140    /// Creates a new empty `OnceCell` instance.
141    pub const fn new() -> Self {
142        OnceCell {
143            value_set: RefCell::new(false),
144            value: UnsafeCell::new(MaybeUninit::uninit()),
145            semaphore: Semaphore::new(1),
146        }
147    }
148
149    /// Creates a new `OnceCell` that contains the provided value, if any.
150    ///
151    /// If the `Option` is `None`, this is equivalent to `OnceCell::new`.
152    ///
153    /// [`OnceCell::new`]: crate::sync::OnceCell::new
154    pub fn new_with(value: Option<T>) -> Self {
155        if let Some(v) = value {
156            OnceCell::from(v)
157        } else {
158            OnceCell::new()
159        }
160    }
161
162    /// Returns `true` if the `OnceCell` currently contains a value, and `false`
163    /// otherwise.
164    pub fn initialized(&self) -> bool {
165        // Using acquire ordering so any threads that read a true from this
166        // atomic is able to read the value.
167        *self.value_set.borrow()
168    }
169
170    /// Returns `true` if the `OnceCell` currently contains a value, and `false`
171    /// otherwise.
172    fn initialized_mut(&mut self) -> bool {
173        *self.value_set.get_mut()
174    }
175
176    // SAFETY: The OnceCell must not be empty.
177    unsafe fn get_unchecked(&self) -> &T {
178        let ptr = self.value.get();
179        &*(*ptr).as_ptr()
180    }
181
182    // SAFETY: The OnceCell must not be empty.
183    unsafe fn get_unchecked_mut(&mut self) -> &mut T {
184        let ptr = self.value.get();
185        &mut *(*ptr).as_mut_ptr()
186    }
187
188    fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T {
189        // SAFETY: We are holding the only permit on the semaphore.
190        unsafe {
191            let ptr = self.value.get();
192            (*ptr).as_mut_ptr().write(value);
193        }
194
195        // Using release ordering so any threads that read a true from this
196        // atomic is able to read the value we just stored.
197        *self.value_set.borrow_mut() = true;
198        self.semaphore.close();
199        permit.forget();
200
201        // SAFETY: We just initialized the cell.
202        unsafe { self.get_unchecked() }
203    }
204
205    /// Returns a reference to the value currently stored in the `OnceCell`, or
206    /// `None` if the `OnceCell` is empty.
207    pub fn get(&self) -> Option<&T> {
208        if self.initialized() {
209            Some(unsafe { self.get_unchecked() })
210        } else {
211            None
212        }
213    }
214
215    /// Returns a mutable reference to the value currently stored in the
216    /// `OnceCell`, or `None` if the `OnceCell` is empty.
217    ///
218    /// Since this call borrows the `OnceCell` mutably, it is safe to mutate the
219    /// value inside the `OnceCell` — the mutable borrow statically guarantees
220    /// no other references exist.
221    pub fn get_mut(&mut self) -> Option<&mut T> {
222        if self.initialized_mut() {
223            Some(unsafe { self.get_unchecked_mut() })
224        } else {
225            None
226        }
227    }
228
229    /// Set the value of the `OnceCell` to the given value if the `OnceCell` is
230    /// empty.
231    ///
232    /// If the `OnceCell` already has a value, this call will fail with an
233    /// [`SetError::AlreadyInitializedError`].
234    ///
235    /// If the `OnceCell` is empty, but some other task is currently trying to
236    /// set the value, this call will fail with [`SetError::InitializingError`].
237    ///
238    /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError
239    /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError
240    pub fn set(&self, value: T) -> Result<(), SetError<T>> {
241        if self.initialized() {
242            return Err(SetError::AlreadyInitializedError(value));
243        }
244
245        // Another task might be initializing the cell, in which case
246        // `try_acquire` will return an error. If we succeed to acquire the
247        // permit, then we can set the value.
248        match self.semaphore.try_acquire() {
249            Ok(permit) => {
250                debug_assert!(!self.initialized());
251                self.set_value(value, permit);
252                Ok(())
253            }
254            Err(TryAcquireError::NoPermits) => {
255                // Some other task is holding the permit. That task is
256                // currently trying to initialize the value.
257                Err(SetError::InitializingError(value))
258            }
259            Err(TryAcquireError::Closed) => {
260                // The semaphore was closed. Some other task has initialized
261                // the value.
262                Err(SetError::AlreadyInitializedError(value))
263            }
264        }
265    }
266
267    /// Get the value currently in the `OnceCell`, or initialize it with the
268    /// given asynchronous operation.
269    ///
270    /// If some other task is currently working on initializing the `OnceCell`,
271    /// this call will wait for that other task to finish, then return the value
272    /// that the other task produced.
273    ///
274    /// If the provided operation is cancelled or panics, the initialization
275    /// attempt is cancelled. If there are other tasks waiting for the value to
276    /// be initialized, one of them will start another attempt at initializing
277    /// the value.
278    ///
279    /// This will deadlock if `f` tries to initialize the cell recursively.
280    pub async fn get_or_init<F, Fut>(&self, f: F) -> &T
281    where
282        F: FnOnce() -> Fut,
283        Fut: Future<Output = T>,
284    {
285        if self.initialized() {
286            // SAFETY: The OnceCell has been fully initialized.
287            unsafe { self.get_unchecked() }
288        } else {
289            // Here we try to acquire the semaphore permit. Holding the permit
290            // will allow us to set the value of the OnceCell, and prevents
291            // other tasks from initializing the OnceCell while we are holding
292            // it.
293            match self.semaphore.acquire().await {
294                Ok(permit) => {
295                    debug_assert!(!self.initialized());
296
297                    // If `f()` panics or `select!` is called, this
298                    // `get_or_init` call is aborted and the semaphore permit is
299                    // dropped.
300                    let value = f().await;
301
302                    self.set_value(value, permit)
303                }
304                Err(_) => {
305                    debug_assert!(self.initialized());
306
307                    // SAFETY: The semaphore has been closed. This only happens
308                    // when the OnceCell is fully initialized.
309                    unsafe { self.get_unchecked() }
310                }
311            }
312        }
313    }
314
315    /// Get the value currently in the `OnceCell`, or initialize it with the
316    /// given asynchronous operation.
317    ///
318    /// If some other task is currently working on initializing the `OnceCell`,
319    /// this call will wait for that other task to finish, then return the value
320    /// that the other task produced.
321    ///
322    /// If the provided operation returns an error, is cancelled or panics, the
323    /// initialization attempt is cancelled. If there are other tasks waiting
324    /// for the value to be initialized, one of them will start another attempt
325    /// at initializing the value.
326    ///
327    /// This will deadlock if `f` tries to initialize the cell recursively.
328    pub async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<&T, E>
329    where
330        F: FnOnce() -> Fut,
331        Fut: Future<Output = Result<T, E>>,
332    {
333        if self.initialized() {
334            // SAFETY: The OnceCell has been fully initialized.
335            unsafe { Ok(self.get_unchecked()) }
336        } else {
337            // Here we try to acquire the semaphore permit. Holding the permit
338            // will allow us to set the value of the OnceCell, and prevents
339            // other tasks from initializing the OnceCell while we are holding
340            // it.
341            match self.semaphore.acquire().await {
342                Ok(permit) => {
343                    debug_assert!(!self.initialized());
344
345                    // If `f()` panics or `select!` is called, this
346                    // `get_or_try_init` call is aborted and the semaphore
347                    // permit is dropped.
348                    let value = f().await;
349
350                    match value {
351                        Ok(value) => Ok(self.set_value(value, permit)),
352                        Err(e) => Err(e),
353                    }
354                }
355                Err(_) => {
356                    debug_assert!(self.initialized());
357
358                    // SAFETY: The semaphore has been closed. This only happens
359                    // when the OnceCell is fully initialized.
360                    unsafe { Ok(self.get_unchecked()) }
361                }
362            }
363        }
364    }
365
366    /// Take the value from the cell, destroying the cell in the process.
367    /// Returns `None` if the cell is empty.
368    pub fn into_inner(mut self) -> Option<T> {
369        if self.initialized_mut() {
370            // Set to uninitialized for the destructor of `OnceCell` to work properly
371            *self.value_set.get_mut() = false;
372
373            let ptr = self.value.get();
374            Some(unsafe { ptr::read(ptr).assume_init() })
375        } else {
376            None
377        }
378    }
379
380    /// Takes ownership of the current value, leaving the cell empty.  Returns
381    /// `None` if the cell is empty.
382    pub fn take(&mut self) -> Option<T> {
383        std::mem::take(self).into_inner()
384    }
385}
386
387/// Errors that can be returned from [`OnceCell::set`].
388///
389/// [`OnceCell::set`]: crate::sync::OnceCell::set
390#[derive(Debug, PartialEq)]
391pub enum SetError<T> {
392    /// The cell was already initialized when [`OnceCell::set`] was called.
393    ///
394    /// [`OnceCell::set`]: crate::sync::OnceCell::set
395    AlreadyInitializedError(T),
396
397    /// The cell is currently being initialized.
398    InitializingError(T),
399}
400
401impl<T> fmt::Display for SetError<T> {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        match self {
404            SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"),
405            SetError::InitializingError(_) => write!(f, "InitializingError"),
406        }
407    }
408}
409
410impl<T: fmt::Debug> Error for SetError<T> {}
411
412impl<T> SetError<T> {
413    /// Whether `SetError` is `SetError::AlreadyInitializedError`.
414    pub fn is_already_init_err(&self) -> bool {
415        match self {
416            SetError::AlreadyInitializedError(_) => true,
417            SetError::InitializingError(_) => false,
418        }
419    }
420
421    /// Whether `SetError` is `SetError::InitializingError`
422    pub fn is_initializing_err(&self) -> bool {
423        match self {
424            SetError::AlreadyInitializedError(_) => false,
425            SetError::InitializingError(_) => true,
426        }
427    }
428}
429
430#[cfg(test)]
431mod tests {
432    use std::ptr::NonNull;
433
434    use super::OnceCell;
435
436    #[monoio::test]
437    async fn test_once_cell_global() {
438        thread_local! {
439            static ONCE: OnceCell<u32> = OnceCell::new();
440        }
441        async fn get_global_integer() -> &'static u32 {
442            let once = ONCE.with(|once| unsafe {
443                NonNull::new_unchecked(once as *const _ as *mut OnceCell<u32>).as_ref()
444            });
445            once.get_or_init(|| async { 1 + 1 }).await
446        }
447
448        assert_eq!(*get_global_integer().await, 2);
449        assert_eq!(*get_global_integer().await, 2);
450    }
451
452    #[monoio::test]
453    async fn test_once_cell() {
454        let once: OnceCell<u32> = OnceCell::new();
455        assert_eq!(once.get_or_init(|| async { 1 + 1 }).await, &2);
456        assert_eq!(once.get_or_init(|| async { 1 + 2 }).await, &2);
457    }
458}