local_sync/once_cell.rs
1//! Once Cell impl borrowed from tokio.
2
3use std::cell::{RefCell, UnsafeCell};
4use std::error::Error;
5use std::fmt;
6use std::future::Future;
7use std::mem::MaybeUninit;
8use std::ops::Drop;
9use std::ptr;
10
11use crate::semaphore::{Semaphore, SemaphorePermit, TryAcquireError};
12
// This file contains an implementation of an OnceCell. The principle
// behind the safety of the cell is that any thread with an `&OnceCell` may
// access the `value` field according to the following rules:
//
//  1. When `value_set` is false, the `value` field may be modified by the
//     thread holding the permit on the semaphore.
//  2. When `value_set` is true, the `value` field may be accessed immutably by
//     any thread.
//
// It is an invariant that if the semaphore is closed, then `value_set` is true.
// The reverse does not necessarily hold — but if not, the semaphore may not
// have any available permits.
//
// A thread with a `&mut OnceCell` may modify the value in any way it wants as
// long as the invariants are upheld.
28
29/// A thread-safe cell that can be written to only once.
30///
31/// A `OnceCell` is typically used for global variables that need to be
32/// initialized once on first use, but need no further changes. The `OnceCell`
33/// in Tokio allows the initialization procedure to be asynchronous.
34///
35/// # Examples
36///
37/// ```
38/// use local_sync::OnceCell;
39///
40/// async fn some_computation() -> u32 {
41/// 1 + 1
42/// }
43///
44/// thread_local! {
45/// static ONCE: OnceCell<u32> = OnceCell::new();
46/// }
47///
48/// #[monoio::main]
49/// async fn main() {
50/// let once = ONCE.with(|once| unsafe {
51/// std::ptr::NonNull::new_unchecked(once as *const _ as *mut OnceCell<u32>).as_ref()
52/// });
53/// let result = once.get_or_init(some_computation).await;
54/// assert_eq!(*result, 2);
55/// }
56/// ```
57///
58/// It is often useful to write a wrapper method for accessing the value.
59///
60/// ```
61/// use local_sync::OnceCell;
62///
63/// thread_local! {
64/// static ONCE: OnceCell<u32> = OnceCell::new();
65/// }
66///
67/// async fn get_global_integer() -> &'static u32 {
68/// let once = ONCE.with(|once| unsafe {
69/// std::ptr::NonNull::new_unchecked(once as *const _ as *mut OnceCell<u32>).as_ref()
70/// });
71/// once.get_or_init(|| async {
72/// 1 + 1
73/// }).await
74/// }
75///
76/// #[monoio::main]
77/// async fn main() {
78/// let result = get_global_integer().await;
79/// assert_eq!(*result, 2);
80/// }
81/// ```
82pub struct OnceCell<T> {
83 value_set: RefCell<bool>,
84 value: UnsafeCell<MaybeUninit<T>>,
85 semaphore: Semaphore,
86}
87
88impl<T> Default for OnceCell<T> {
89 fn default() -> OnceCell<T> {
90 OnceCell::new()
91 }
92}
93
94impl<T: fmt::Debug> fmt::Debug for OnceCell<T> {
95 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
96 fmt.debug_struct("OnceCell")
97 .field("value", &self.get())
98 .finish()
99 }
100}
101
102impl<T: Clone> Clone for OnceCell<T> {
103 fn clone(&self) -> OnceCell<T> {
104 OnceCell::new_with(self.get().cloned())
105 }
106}
107
108impl<T: PartialEq> PartialEq for OnceCell<T> {
109 fn eq(&self, other: &OnceCell<T>) -> bool {
110 self.get() == other.get()
111 }
112}
113
114impl<T: Eq> Eq for OnceCell<T> {}
115
116impl<T> Drop for OnceCell<T> {
117 fn drop(&mut self) {
118 if self.initialized_mut() {
119 unsafe {
120 let ptr = self.value.get();
121 ptr::drop_in_place((&mut *ptr).as_mut_ptr());
122 };
123 }
124 }
125}
126
127impl<T> From<T> for OnceCell<T> {
128 fn from(value: T) -> Self {
129 let semaphore = Semaphore::new(0);
130 semaphore.close();
131 OnceCell {
132 value_set: RefCell::new(true),
133 value: UnsafeCell::new(MaybeUninit::new(value)),
134 semaphore,
135 }
136 }
137}
138
139impl<T> OnceCell<T> {
140 /// Creates a new empty `OnceCell` instance.
141 pub const fn new() -> Self {
142 OnceCell {
143 value_set: RefCell::new(false),
144 value: UnsafeCell::new(MaybeUninit::uninit()),
145 semaphore: Semaphore::new(1),
146 }
147 }
148
149 /// Creates a new `OnceCell` that contains the provided value, if any.
150 ///
151 /// If the `Option` is `None`, this is equivalent to `OnceCell::new`.
152 ///
153 /// [`OnceCell::new`]: crate::sync::OnceCell::new
154 pub fn new_with(value: Option<T>) -> Self {
155 if let Some(v) = value {
156 OnceCell::from(v)
157 } else {
158 OnceCell::new()
159 }
160 }
161
162 /// Returns `true` if the `OnceCell` currently contains a value, and `false`
163 /// otherwise.
164 pub fn initialized(&self) -> bool {
165 // Using acquire ordering so any threads that read a true from this
166 // atomic is able to read the value.
167 *self.value_set.borrow()
168 }
169
170 /// Returns `true` if the `OnceCell` currently contains a value, and `false`
171 /// otherwise.
172 fn initialized_mut(&mut self) -> bool {
173 *self.value_set.get_mut()
174 }
175
176 // SAFETY: The OnceCell must not be empty.
177 unsafe fn get_unchecked(&self) -> &T {
178 let ptr = self.value.get();
179 &*(*ptr).as_ptr()
180 }
181
182 // SAFETY: The OnceCell must not be empty.
183 unsafe fn get_unchecked_mut(&mut self) -> &mut T {
184 let ptr = self.value.get();
185 &mut *(*ptr).as_mut_ptr()
186 }
187
188 fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T {
189 // SAFETY: We are holding the only permit on the semaphore.
190 unsafe {
191 let ptr = self.value.get();
192 (*ptr).as_mut_ptr().write(value);
193 }
194
195 // Using release ordering so any threads that read a true from this
196 // atomic is able to read the value we just stored.
197 *self.value_set.borrow_mut() = true;
198 self.semaphore.close();
199 permit.forget();
200
201 // SAFETY: We just initialized the cell.
202 unsafe { self.get_unchecked() }
203 }
204
205 /// Returns a reference to the value currently stored in the `OnceCell`, or
206 /// `None` if the `OnceCell` is empty.
207 pub fn get(&self) -> Option<&T> {
208 if self.initialized() {
209 Some(unsafe { self.get_unchecked() })
210 } else {
211 None
212 }
213 }
214
215 /// Returns a mutable reference to the value currently stored in the
216 /// `OnceCell`, or `None` if the `OnceCell` is empty.
217 ///
218 /// Since this call borrows the `OnceCell` mutably, it is safe to mutate the
219 /// value inside the `OnceCell` — the mutable borrow statically guarantees
220 /// no other references exist.
221 pub fn get_mut(&mut self) -> Option<&mut T> {
222 if self.initialized_mut() {
223 Some(unsafe { self.get_unchecked_mut() })
224 } else {
225 None
226 }
227 }
228
229 /// Set the value of the `OnceCell` to the given value if the `OnceCell` is
230 /// empty.
231 ///
232 /// If the `OnceCell` already has a value, this call will fail with an
233 /// [`SetError::AlreadyInitializedError`].
234 ///
235 /// If the `OnceCell` is empty, but some other task is currently trying to
236 /// set the value, this call will fail with [`SetError::InitializingError`].
237 ///
238 /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError
239 /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError
240 pub fn set(&self, value: T) -> Result<(), SetError<T>> {
241 if self.initialized() {
242 return Err(SetError::AlreadyInitializedError(value));
243 }
244
245 // Another task might be initializing the cell, in which case
246 // `try_acquire` will return an error. If we succeed to acquire the
247 // permit, then we can set the value.
248 match self.semaphore.try_acquire() {
249 Ok(permit) => {
250 debug_assert!(!self.initialized());
251 self.set_value(value, permit);
252 Ok(())
253 }
254 Err(TryAcquireError::NoPermits) => {
255 // Some other task is holding the permit. That task is
256 // currently trying to initialize the value.
257 Err(SetError::InitializingError(value))
258 }
259 Err(TryAcquireError::Closed) => {
260 // The semaphore was closed. Some other task has initialized
261 // the value.
262 Err(SetError::AlreadyInitializedError(value))
263 }
264 }
265 }
266
267 /// Get the value currently in the `OnceCell`, or initialize it with the
268 /// given asynchronous operation.
269 ///
270 /// If some other task is currently working on initializing the `OnceCell`,
271 /// this call will wait for that other task to finish, then return the value
272 /// that the other task produced.
273 ///
274 /// If the provided operation is cancelled or panics, the initialization
275 /// attempt is cancelled. If there are other tasks waiting for the value to
276 /// be initialized, one of them will start another attempt at initializing
277 /// the value.
278 ///
279 /// This will deadlock if `f` tries to initialize the cell recursively.
280 pub async fn get_or_init<F, Fut>(&self, f: F) -> &T
281 where
282 F: FnOnce() -> Fut,
283 Fut: Future<Output = T>,
284 {
285 if self.initialized() {
286 // SAFETY: The OnceCell has been fully initialized.
287 unsafe { self.get_unchecked() }
288 } else {
289 // Here we try to acquire the semaphore permit. Holding the permit
290 // will allow us to set the value of the OnceCell, and prevents
291 // other tasks from initializing the OnceCell while we are holding
292 // it.
293 match self.semaphore.acquire().await {
294 Ok(permit) => {
295 debug_assert!(!self.initialized());
296
297 // If `f()` panics or `select!` is called, this
298 // `get_or_init` call is aborted and the semaphore permit is
299 // dropped.
300 let value = f().await;
301
302 self.set_value(value, permit)
303 }
304 Err(_) => {
305 debug_assert!(self.initialized());
306
307 // SAFETY: The semaphore has been closed. This only happens
308 // when the OnceCell is fully initialized.
309 unsafe { self.get_unchecked() }
310 }
311 }
312 }
313 }
314
315 /// Get the value currently in the `OnceCell`, or initialize it with the
316 /// given asynchronous operation.
317 ///
318 /// If some other task is currently working on initializing the `OnceCell`,
319 /// this call will wait for that other task to finish, then return the value
320 /// that the other task produced.
321 ///
322 /// If the provided operation returns an error, is cancelled or panics, the
323 /// initialization attempt is cancelled. If there are other tasks waiting
324 /// for the value to be initialized, one of them will start another attempt
325 /// at initializing the value.
326 ///
327 /// This will deadlock if `f` tries to initialize the cell recursively.
328 pub async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<&T, E>
329 where
330 F: FnOnce() -> Fut,
331 Fut: Future<Output = Result<T, E>>,
332 {
333 if self.initialized() {
334 // SAFETY: The OnceCell has been fully initialized.
335 unsafe { Ok(self.get_unchecked()) }
336 } else {
337 // Here we try to acquire the semaphore permit. Holding the permit
338 // will allow us to set the value of the OnceCell, and prevents
339 // other tasks from initializing the OnceCell while we are holding
340 // it.
341 match self.semaphore.acquire().await {
342 Ok(permit) => {
343 debug_assert!(!self.initialized());
344
345 // If `f()` panics or `select!` is called, this
346 // `get_or_try_init` call is aborted and the semaphore
347 // permit is dropped.
348 let value = f().await;
349
350 match value {
351 Ok(value) => Ok(self.set_value(value, permit)),
352 Err(e) => Err(e),
353 }
354 }
355 Err(_) => {
356 debug_assert!(self.initialized());
357
358 // SAFETY: The semaphore has been closed. This only happens
359 // when the OnceCell is fully initialized.
360 unsafe { Ok(self.get_unchecked()) }
361 }
362 }
363 }
364 }
365
366 /// Take the value from the cell, destroying the cell in the process.
367 /// Returns `None` if the cell is empty.
368 pub fn into_inner(mut self) -> Option<T> {
369 if self.initialized_mut() {
370 // Set to uninitialized for the destructor of `OnceCell` to work properly
371 *self.value_set.get_mut() = false;
372
373 let ptr = self.value.get();
374 Some(unsafe { ptr::read(ptr).assume_init() })
375 } else {
376 None
377 }
378 }
379
380 /// Takes ownership of the current value, leaving the cell empty. Returns
381 /// `None` if the cell is empty.
382 pub fn take(&mut self) -> Option<T> {
383 std::mem::take(self).into_inner()
384 }
385}
386
/// Errors that can be returned from [`OnceCell::set`].
///
/// Both variants carry the value that could not be stored, so the caller gets
/// it back.
#[derive(Debug, PartialEq, Eq)]
pub enum SetError<T> {
    /// The cell was already initialized when [`OnceCell::set`] was called.
    AlreadyInitializedError(T),

    /// The cell is currently being initialized.
    InitializingError(T),
}

impl<T> fmt::Display for SetError<T> {
    /// Writes the bare variant name; the carried value is not printed, which
    /// is why no bound on `T` is required.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            SetError::AlreadyInitializedError(_) => "AlreadyInitializedError",
            SetError::InitializingError(_) => "InitializingError",
        };
        f.write_str(name)
    }
}

// `Error` requires `Debug`, and the derived `Debug` impl needs `T: Debug`.
impl<T: fmt::Debug> Error for SetError<T> {}

impl<T> SetError<T> {
    /// Whether `SetError` is `SetError::AlreadyInitializedError`.
    pub fn is_already_init_err(&self) -> bool {
        matches!(self, SetError::AlreadyInitializedError(_))
    }

    /// Whether `SetError` is `SetError::InitializingError`.
    pub fn is_initializing_err(&self) -> bool {
        matches!(self, SetError::InitializingError(_))
    }
}
429
#[cfg(test)]
mod tests {
    use super::OnceCell;

    /// A thread-local cell accessed through a `'static` reference is
    /// initialized by the first call and reused by the second.
    #[monoio::test]
    async fn test_once_cell_global() {
        thread_local! {
            static ONCE: OnceCell<u32> = OnceCell::new();
        }

        async fn get_global_integer() -> &'static u32 {
            // The reference is detached from the closure's borrow so that it
            // can be held across the `.await`. This relies on the
            // thread-local outliving every use of the returned reference.
            let cell: &'static OnceCell<u32> =
                ONCE.with(|once| unsafe { &*(once as *const OnceCell<u32>) });
            cell.get_or_init(|| async { 1 + 1 }).await
        }

        let first = *get_global_integer().await;
        assert_eq!(first, 2);

        let second = *get_global_integer().await;
        assert_eq!(second, 2);
    }

    /// The second initializer must be ignored once the cell holds a value.
    #[monoio::test]
    async fn test_once_cell() {
        let once: OnceCell<u32> = OnceCell::new();

        let first = once.get_or_init(|| async { 1 + 1 }).await;
        assert_eq!(first, &2);

        let second = once.get_or_init(|| async { 1 + 2 }).await;
        assert_eq!(second, &2);
    }
}
458}