orx_concurrent_option/
concurrent.rs

1use crate::{ConcurrentOption, states::*};
2use core::{mem::MaybeUninit, sync::atomic::Ordering};
3
4impl<T> ConcurrentOption<T> {
5    // concurrent state mutation - special
6
7    /// Thread safe method to initiate the value of the option with the given `value`.
8    ///
9    /// * Returns `true` if the option was `is_none` variant and initiated with the given value.
10    /// * It does nothing if the concurrent option is already of `is_some` variant, and returns `false`.
11    ///
12    /// Note that it is safe to call this method with a shared reference `&self`.
13    ///
14    /// This method is particularly useful for enabling concurrent read & write operations,
15    /// while the value is expected to be initialized only once.
16    ///
17    /// # Suggestion on Concurrent Read & Write Operations
18    ///
19    /// Note that this is a one-time-write method which can happen safely while other threads are reading the option.
20    /// It is however recommended to call read-methods with a `&self` reference,
21    /// such as `as_ref`, using an ordering stronger than `Relaxed`, such as `Acquire` or `SeqCst`,
22    /// if one or more threads are expected to call write methods `initialize_if_none` or `initialize_unchecked` concurrently.
23    ///
24    /// # Example
25    ///
26    /// ```rust
27    /// use orx_concurrent_option::*;
28    ///
29    /// let x = ConcurrentOption::<String>::none();
30    /// let inserted = x.initialize_if_none(3.to_string());
31    /// assert!(inserted);
32    /// assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
33    ///
34    /// let x = ConcurrentOption::some(7.to_string());
35    /// let inserted = x.initialize_if_none(3.to_string()); // does nothing
36    /// assert!(!inserted);
37    /// assert_eq!(unsafe { x.as_ref() }, Some(&7.to_string()));
38    /// ```
39    ///
40    /// A more advanced and useful example is demonstrated below:
41    /// * there exist multiple readers checking the value of an optional; they will receive
42    ///   * None if the value is not initialized yet;
43    ///   * a reference to the value otherwise.
44    /// * there exist multiple initializers each trying to set the value of optional;
45    ///   * only the first one will succeed,
46    ///   * since this is an initialization method, succeeding calls will safely be ignored.
47    ///
48    /// ```rust
49    /// use orx_concurrent_option::*;
50    ///
51    /// fn reader(maybe: &ConcurrentOption<String>) {
52    ///     let mut is_none_at_least_once = false;
53    ///     let mut is_seven_at_least_once = false;
54    ///
55    ///     for _ in 0..100 {
56    ///         std::thread::sleep(std::time::Duration::from_millis(100));
57    ///
58    ///         let read = unsafe { maybe.as_ref() };
59    ///
60    ///         let is_none = read.is_none();
61    ///         let is_seven = read == Some(&7.to_string());
62    ///
63    ///         assert!(is_none || is_seven);
64    ///
65    ///         is_none_at_least_once |= is_none;
66    ///         is_seven_at_least_once |= is_seven;
67    ///     }
68    ///
69    ///     assert!(is_none_at_least_once && is_seven_at_least_once);
70    /// }
71    ///
72    /// fn initializer(maybe: &ConcurrentOption<String>) {
73    ///     for _ in 0..50 {
74    ///         // wait for a while to simulate a delay
75    ///         std::thread::sleep(std::time::Duration::from_millis(100));
76    ///     }
77    ///
78    ///     let _ = maybe.initialize_if_none(7.to_string());
79    ///
80    ///     for _ in 0..50 {
81    ///         // it is safe to call `initialize_if_none` on Some variant, it will do nothing
82    ///         let inserted = maybe.initialize_if_none(1_000_000.to_string());
83    ///         assert!(!inserted);
84    ///     }
85    /// }
86    ///
87    /// let num_readers = 8;
88    /// let num_writers = 8;
89    ///
90    /// let maybe = ConcurrentOption::<String>::none();
91    /// let maybe_ref = &maybe;
92    ///
93    /// std::thread::scope(|s| {
94    ///     for _ in 0..num_readers {
95    ///         s.spawn(|| reader(maybe_ref));
96    ///     }
97    ///
98    ///     for _ in 0..num_writers {
99    ///         s.spawn(|| initializer(maybe_ref));
100    ///     }
101    /// });
102    ///
103    /// assert_eq!(maybe.unwrap(), 7.to_string());
104    /// ```
105    pub fn initialize_if_none(&self, value: T) -> bool {
106        match self.get_handle(NONE, SOME) {
107            Some(_handle) => {
108                unsafe { &mut *self.value.get() }.write(value);
109                true
110            }
111            None => false,
112        }
113    }
114
115    /// Thread safe method to initiate the value of the option with the given `value`
116    /// **provided that** the concurrent option `is_none` at the point of initializing.
117    ///
118    /// See [`initialize_if_none`] for checked version.
119    ///
120    /// [`initialize_if_none`]: ConcurrentOption::initialize_if_none
121    ///
122    /// Note that it is safe to call this method with a shared reference `&self`
123    /// **provided that** the concurrent option `is_none` at the point of initializing.
124    ///
125    /// This method is particularly useful for enabling concurrent read & write operations,
126    /// while the value is expected to be initialized only once.
127    ///
128    /// # Safety
129    ///
130    /// This method can be safely called when the concurrent option is guaranteed to be of None variant.
131    ///
132    /// On the other hand, calling it when the option has a value leads to undefined behavior
133    /// due to the following possible data race where we can be reading the value with a `&self` reference,
134    /// while at the same time writing with this unsafe method and a `&self` reference.
135    ///
136    /// # Suggestion on Concurrent Read & Write Operations
137    ///
138    /// Note that this is a one-time-write method which can happen safely while other threads are reading the option.
139    /// It is however recommended to call read-methods with a `&self` reference,
140    /// such as `as_ref`, using an ordering stronger than `Relaxed`, such as `Acquire` or `SeqCst`,
141    /// if one or more threads are expected to call write methods `initialize_if_none` or `initialize_unchecked` concurrently.
142    ///
143    /// # Example
144    ///
145    /// ```rust
146    /// use orx_concurrent_option::*;
147    /// use core::sync::atomic::Ordering;
148    ///
149    /// let x = ConcurrentOption::<String>::none();
150    /// unsafe { x.initialize_unchecked(3.to_string()) };
151    /// assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
152    ///
153    /// #[cfg(not(miri))]
154    /// {
155    ///     let x = ConcurrentOption::some(7.to_string());
156    ///     unsafe { x.initialize_unchecked(3.to_string()) }; // undefined behavior!
157    ///     assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
158    /// }
159    /// ```
160    ///
161    /// A more advanced and useful example is demonstrated below:
162    /// * there exist multiple readers checking the value of an optional; they will receive
163    ///   * None if the value is not initialized yet;
164    ///   * a reference to the value otherwise.
165    /// * there exist multiple initializers each trying to set the value of optional;
166    ///   * only the first one will succeed,
167    ///   * since this is an initialization method, succeeding calls will safely be ignored.
168    ///
169    /// ```rust
170    /// use orx_concurrent_option::*;
171    /// use core::sync::atomic::Ordering;
172    ///
173    /// fn reader(maybe: &ConcurrentOption<String>) {
174    ///     let mut is_none_at_least_once = false;
175    ///     let mut is_seven_at_least_once = false;
176    ///
177    ///     for _ in 0..100 {
178    ///         std::thread::sleep(std::time::Duration::from_millis(100));
179    ///
180    ///         let read = unsafe { maybe.as_ref() };
181    ///
182    ///         let is_none = read.is_none();
183    ///         let is_seven = read == Some(&7.to_string());
184    ///
185    ///         assert!(is_none || is_seven);
186    ///
187    ///         is_none_at_least_once |= is_none;
188    ///         is_seven_at_least_once |= is_seven;
189    ///     }
190    ///
191    ///     assert!(is_none_at_least_once && is_seven_at_least_once);
192    /// }
193    ///
194    /// fn unsafe_initializer(maybe: &ConcurrentOption<String>) {
195    ///     for _ in 0..50 {
196    ///         // wait for a while to simulate a delay
197    ///         std::thread::sleep(std::time::Duration::from_millis(100));
198    ///     }
199    ///
200    ///     // we need to make sure to call initialize_unchecked only once
201    ///     unsafe { maybe.initialize_unchecked(7.to_string()) };
202    /// }
203    ///
204    /// let num_readers = 8;
205    ///
206    /// let maybe = ConcurrentOption::<String>::none();
207    /// let maybe_ref = &maybe;
208    ///
209    /// std::thread::scope(|s| {
210    ///     for _ in 0..num_readers {
211    ///         s.spawn(|| reader(maybe_ref));
212    ///     }
213    ///
214    ///     s.spawn(|| unsafe_initializer(maybe_ref));
215    /// });
216    ///
217    /// assert_eq!(maybe.unwrap(), 7.to_string());
218    /// ```
219    pub unsafe fn initialize_unchecked(&self, value: T) {
220        unsafe { &mut *self.value.get() }.write(value);
221        self.state.store(SOME, Ordering::Release);
222    }
223
224    // concurrent state mutation
225
226    /// Thread safe method to update the value of the option if it is of Some variant.
227    /// Does nothing if it is None.
228    /// Returns whether or not the value is updated.
229    ///
230    /// # Example
231    ///
232    /// ```rust
233    /// use orx_concurrent_option::*;
234    ///
235    /// let maybe = ConcurrentOption::<String>::none();
236    /// let updated = maybe.update_if_some(|x| *x = format!("{}!", x));
237    /// assert!(maybe.is_none());
238    /// assert_eq!(updated, false);
239    ///
240    /// let maybe = ConcurrentOption::some(42.to_string());
241    /// let updated = maybe.update_if_some(|x| *x = format!("{}!", x));
242    /// assert!(maybe.is_some_and(|x| x == &"42!".to_string()));
243    /// assert_eq!(updated, true);
244    /// ```
245    pub fn update_if_some<F>(&self, mut f: F) -> bool
246    where
247        F: FnMut(&mut T),
248    {
249        match self.spin_get_handle(SOME, SOME) {
250            Some(_handle) => {
251                let x = unsafe { MaybeUninit::assume_init_mut(&mut *self.value.get()) };
252                f(x);
253                true
254            }
255            None => false,
256        }
257    }
258
259    /// Thread safe method to take the value out of the option if Some,
260    /// leaving a None in its place.
261    ///
262    /// Has no impact and returns None, if the option is of None variant.
263    ///
264    /// # Examples
265    ///
266    /// ```rust
267    /// use orx_concurrent_option::*;
268    ///
269    /// let x = ConcurrentOption::some(42);
270    /// let y = x.take();
271    /// assert_eq!(x, ConcurrentOption::none());
272    /// assert_eq!(y, Some(42));
273    ///
274    /// let x: ConcurrentOption<u32> = ConcurrentOption::none();
275    /// let y = x.take();
276    /// assert_eq!(x, ConcurrentOption::none());
277    /// assert_eq!(y, None);
278    /// ```
279    pub fn take(&self) -> Option<T> {
280        match self.spin_get_handle(SOME, NONE) {
281            Some(_handle) => {
282                let x = unsafe { &*self.value.get() };
283                Some(unsafe { MaybeUninit::assume_init_read(x) })
284            }
285            None => None,
286        }
287    }
288
289    /// Thread safe method to take the value out of the option, but only if the predicate evaluates to
290    /// `true` on a mutable reference to the value.
291    ///
292    /// In other words, replaces `self` with None if the predicate returns `true`.
293    /// This method operates similar to [`ConcurrentOption::take`] but conditional.
294    ///
295    /// # Examples
296    ///
297    /// ```rust
298    /// use orx_concurrent_option::*;
299    ///
300    /// let x = ConcurrentOption::some(42);
301    ///
302    /// let prev = x.take_if(|v| if *v == 42 {
303    ///     *v += 1;
304    ///     false
305    /// } else {
306    ///     false
307    /// });
308    /// assert_eq!(x, ConcurrentOption::some(43));
309    /// assert_eq!(prev, None);
310    ///
311    /// let prev = x.take_if(|v| *v == 43);
312    /// assert_eq!(x, ConcurrentOption::none());
313    /// assert_eq!(prev, Some(43));
314    /// ```
315    #[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)]
316    pub fn take_if<P>(&self, predicate: P) -> Option<T>
317    where
318        P: FnOnce(&mut T) -> bool,
319    {
320        loop {
321            match self
322                .state
323                .compare_exchange(SOME, RESERVED, ORDER_LOAD, ORDER_LOAD)
324            {
325                Ok(_) => {
326                    let x = unsafe { &mut *self.value.get() };
327                    let x_mut = unsafe { MaybeUninit::assume_init_mut(x) };
328                    let output = match predicate(x_mut) {
329                        false => None,
330                        true => Some(unsafe { MaybeUninit::assume_init_read(x) }),
331                    };
332
333                    let success_state = match output.is_some() {
334                        true => NONE,
335                        false => SOME,
336                    };
337                    self.state
338                        .compare_exchange(RESERVED, success_state, ORDER_STORE, ORDER_STORE)
339                        .expect(
340                            "Failed to update the concurrent state after concurrent state mutation",
341                        );
342
343                    return output;
344                }
345                Err(previous_state) => match previous_state {
346                    RESERVED => continue,
347                    _ => return None,
348                },
349            }
350        }
351    }
352
353    /// Thread safe method to replace the actual value in the option by the value given in parameter,
354    /// returning the old value if present,
355    /// leaving a Some in its place without de-initializing either one.
356    ///
357    /// # Examples
358    ///
359    /// ```rust
360    /// use orx_concurrent_option::*;
361    ///
362    /// let x = ConcurrentOption::some(2);
363    /// let old = x.replace(5);
364    /// assert_eq!(x, ConcurrentOption::some(5));
365    /// assert_eq!(old, Some(2));
366    ///
367    /// let x: ConcurrentOption<u32> = ConcurrentOption::none();
368    /// let old = x.replace(3);
369    /// assert_eq!(x, ConcurrentOption::some(3));
370    /// assert_eq!(old, None);
371    /// ```
372    pub fn replace(&self, value: T) -> Option<T> {
373        loop {
374            if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
375                let x = unsafe { (*self.value.get()).assume_init_mut() };
376                let old = core::mem::replace(x, value);
377                return Some(old);
378            }
379
380            if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
381                let x = unsafe { &mut *self.value.get() };
382                x.write(value);
383                return None;
384            }
385        }
386    }
387
388    /// true if updated; false if initiated
389    pub fn set_some(&self, value: T) -> bool {
390        loop {
391            if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
392                let x = unsafe { (*self.value.get()).assume_init_mut() };
393                let _old = core::mem::replace(x, value);
394                return true;
395            }
396
397            if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
398                let x = unsafe { &mut *self.value.get() };
399                x.write(value);
400                return false;
401            }
402        }
403    }
404
405    /// Partially thread safe method to insert `value` into the option, and then to return a mutable reference to it.
406    ///
407    /// If the option already contains a value, the old value is dropped.
408    ///
409    /// See also [`Option::get_or_insert`], which doesn't update the value if
410    /// the option already contains Some.
411    ///
412    /// # Safety
413    ///
414    /// Note that the insertion part of this method is thread safe.
415    ///
416    /// The method is `unsafe` due to the returned mutable reference to the underlying value.
417    ///
418    /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
419    /// * It is also safe to use this method if the caller is able to guarantee that there exist
420    ///   no concurrent reads or writes while mutating the value.
421    /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
422    ///
423    /// # Examples
424    ///
425    /// ```rust
426    /// use orx_concurrent_option::*;
427    ///
428    /// let opt: ConcurrentOption<_> = ConcurrentOption::none();
429    ///
430    /// let val = unsafe { opt.insert(1) };
431    /// assert_eq!(*val, 1);
432    /// assert_eq!(unsafe { opt.as_ref() }, Some(&1));
433    ///
434    /// let val = unsafe { opt.insert(2) };
435    /// assert_eq!(*val, 2);
436    /// *val = 3;
437    /// assert_eq!(opt.unwrap(), 3);
438    /// ```
439    #[allow(clippy::mut_from_ref)]
440    pub unsafe fn insert(&self, value: T) -> &mut T {
441        loop {
442            if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
443                let x = unsafe { (*self.value.get()).assume_init_mut() };
444                let _old = core::mem::replace(x, value);
445                return x;
446            }
447
448            if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
449                let x = unsafe { &mut *self.value.get() };
450                x.write(value);
451                return unsafe { x.assume_init_mut() };
452            }
453        }
454    }
455
456    /// Inserts `value` into the option if it is None, then
457    /// returns a mutable reference to the contained value.
458    ///
459    /// See also [`ConcurrentOption::insert`], which updates the value even if
460    /// the option already contains Some.
461    ///
462    /// # Safety
463    ///
464    /// Note that the insertion part of this method is thread safe.
465    ///
466    /// The method is `unsafe` due to the returned mutable reference to the underlying value.
467    ///
468    /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
469    /// * It is also safe to use this method if the caller is able to guarantee that there exist
470    ///   no concurrent reads or writes while mutating the value.
471    /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
472    ///
473    /// # Examples
474    ///
475    /// ```rust
476    /// use orx_concurrent_option::*;
477    ///
478    /// let x = ConcurrentOption::none();
479    ///
480    /// {
481    ///     let y: &mut u32 = unsafe { x.get_or_insert(5) };
482    ///     assert_eq!(y, &5);
483    ///
484    ///     *y = 7;
485    /// }
486    ///
487    /// assert_eq!(x, ConcurrentOption::some(7));
488    /// ```
489    #[allow(clippy::mut_from_ref)]
490    pub unsafe fn get_or_insert(&self, value: T) -> &mut T {
491        loop {
492            if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
493                return unsafe { (*self.value.get()).assume_init_mut() };
494            }
495
496            if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
497                let x = unsafe { &mut *self.value.get() };
498                x.write(value);
499                return unsafe { x.assume_init_mut() };
500            }
501        }
502    }
503
504    /// Partially thread safe method to insert a value computed from `f` into the option if it is None,
505    /// then returns a mutable reference to the contained value.
506    ///
507    /// # Safety
508    ///
509    /// Note that the insertion part of this method is thread safe.
510    ///
511    /// The method is `unsafe` due to the returned mutable reference to the underlying value.
512    ///
513    /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
514    /// * It is also safe to use this method if the caller is able to guarantee that there exist
515    ///   no concurrent reads or writes while mutating the value.
516    /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
517    ///
518    /// # Examples
519    ///
520    /// ```rust
521    /// use orx_concurrent_option::*;
522    ///
523    /// let x = ConcurrentOption::none();
524    ///
525    /// {
526    ///     let y: &mut u32 = unsafe { x.get_or_insert_with(|| 5) };
527    ///     assert_eq!(y, &5);
528    ///
529    ///     *y = 7;
530    /// }
531    ///
532    /// assert_eq!(x, ConcurrentOption::some(7));
533    /// ```
534    #[allow(clippy::mut_from_ref)]
535    pub unsafe fn get_or_insert_with<F>(&self, f: F) -> &mut T
536    where
537        F: FnOnce() -> T,
538    {
539        loop {
540            if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
541                return unsafe { (*self.value.get()).assume_init_mut() };
542            }
543
544            if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
545                let x = unsafe { &mut *self.value.get() };
546                x.write(f());
547                return unsafe { x.assume_init_mut() };
548            }
549        }
550    }
551}