orx_concurrent_option/concurrent_option.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
use crate::{handle::Handle, mut_handle::MutHandle, states::StateU8};
use core::{cell::UnsafeCell, mem::MaybeUninit, sync::atomic::AtomicU8};
/// ConcurrentOption is a thread-safe and lock-free read-write option type.
///
/// ## ConcurrentOption Methods In Groups
///
/// ConcurrentOption methods are based on the standard Option with minor differences in order to better fit concurrent programs.
///
/// For example, instead of `fn map<U, F>(self, f: F) -> Option<U>`
/// * ConcurrentOption implements `fn map<U, F>(&self, f: F) -> Option<U>` which is specialized to map over the reference while guaranteeing the lack of data race.
/// * Note that the prior result can trivially be obtained by `maybe.exclusive_take().map(f)` when we have the ownership.
///
/// ### ⬤ Methods requiring self or &mut self
///
/// These methods are guaranteed to be safe by the borrow checker, and they behave similarly to the original variants.
///
/// In order to separate them from the thread-safe versions, methods requiring `&mut self` are prefixed with **exclusive_**.
///
/// Some such methods are `unwrap`, `expect`, `exclusive_mut` or `exclusive_take`.
///
/// ### ⬤ Thread safe versions of mutating methods
///
/// Thread safe variants of mutating methods are available, and they can safely be called with a shared `&self` reference.
///
/// Some examples are `take`, `take_if`, `replace`, etc.
///
/// These methods guarantee that no other mutation and no read takes place during the mutation.
///
/// ### ⬤ Thread safe versions of read methods
///
/// Thread safe variants of methods which access the underlying value to calculate a result are available.
///
/// Some examples are `is_some`, `map`, `and_then`, etc.
///
/// These methods guarantee that no mutation takes place while the data is being read.
///
/// ### ⬤ Partially thread safe methods
///
/// Methods which return a shared reference `&T` or mutable reference `&mut T` to the underlying value of the optional are marked as `unsafe`.
///
/// These methods internally guarantee the creation of a valid reference in the absence of a data race. In this sense, they are thread safe.
///
/// On the other hand, since they return the reference, the reference is leaked outside the type. A succeeding mutation might lead to a data race, and hence, to undefined behavior.
///
/// Some example methods are `as_ref`, `as_deref`, `insert`, etc.
///
/// ### ⬤ Methods to allow manual control on concurrency
///
/// ConcurrentOption also exposes methods which accept a `core::sync::atomic::Ordering` and give control to the caller. These methods are suffixed with **with_order**, except for `state`.
///
/// Some such methods are `state`, `as_ref_with_order`, `get_raw_with_order`, `clone_with_order`, etc.
///
/// ## Examples
///
/// ### Concurrent Read & Write
///
/// The following example demonstrates the ease of concurrently mutating the state of the option while safely reading the underlying data with multiple reader and writer threads.
///
/// ```rust
/// use orx_concurrent_option::*;
/// use std::time::Duration;
///
/// enum MutOperation {
///     InitializeIfNone,
///     UpdateIfSome,
///     Replace,
///     Take,
///     TakeIf,
/// }
///
/// impl MutOperation {
///     fn new(i: usize) -> Self {
///         match i % 5 {
///             0 => Self::InitializeIfNone,
///             1 => Self::UpdateIfSome,
///             2 => Self::Replace,
///             3 => Self::Take,
///             _ => Self::TakeIf,
///         }
///     }
/// }
///
/// let num_readers = 8;
/// let num_writers = 8;
///
/// let values = vec![ConcurrentOption::<String>::none(); 8];
///
/// std::thread::scope(|s| {
///     for _ in 0..num_readers {
///         s.spawn(|| {
///             for _ in 0..100 {
///                 std::thread::sleep(Duration::from_millis(100));
///                 let mut num_chars = 0;
///                 for maybe in &values {
///                     // concurrently access the value
///                     num_chars += maybe.map(|x| x.len()).unwrap_or(0);
///                 }
///                 assert!(num_chars <= 100);
///             }
///         });
///     }
///
///     for _ in 0..num_writers {
///         s.spawn(|| {
///             for i in 0..100 {
///                 std::thread::sleep(Duration::from_millis(100));
///                 let e = i % values.len();
///
///                 // concurrently update the option
///                 match MutOperation::new(i) {
///                     MutOperation::InitializeIfNone => {
///                         values[e].initialize_if_none(e.to_string());
///                     }
///                     MutOperation::UpdateIfSome => {
///                         values[e].update_if_some(|x| *x = format!("{}!", x));
///                     }
///                     MutOperation::Replace => {
///                         values[e].replace(e.to_string());
///                     }
///                     MutOperation::Take => {
///                         _ = values[e].take();
///                     }
///                     MutOperation::TakeIf => _ = values[e].take_if(|x| x.len() < 2),
///                 }
///                 let e = i % values.len();
///                 _ = values[e].initialize_if_none(e.to_string());
///             }
///         });
///     }
/// })
/// ```
///
/// ### Concurrent Initialize & Read
///
/// A common use case for an option is to model delayed initialization rather than concurrent mutation. In other words, we start with a None variant and at some point we receive the value and convert our option to Some(value), which will then stay as Some(value) throughout its lifetime.
///
/// This scenario demonstrates a use case where we can safely leak a reference outside the optional:
/// * All references provided by ConcurrentOption are valid and data race free at the point they are obtained. In other words, we can only obtain a reference after the value is initialized; i.e., the option becomes Some(value).
/// * Since we will never mutate the option after initialization, we can safely keep a reference to it without a concern about a data race.
///   * However, no further mutation is our promise and responsibility as the caller. ConcurrentOption has no control over the leaked references; and hence, obtaining the reference is through the unsafe `as_ref` method.
///
/// For this scenario, we can make use of two matching methods:
/// * `initialize_if_none` is a thread safe method to initialize the value of the option to the given value. It is safe to call the method on a Some variant, it will have no impact. Further, it makes sure that no reader can access the value until it is completely initialized.
/// * `as_ref` method returns a reference to the underlying value if the option is a Some variant. Otherwise, if the value has not been initialized, we will safely receive None. Note that we could also use `as_ref_with_order` paired up with `Acquire` or `SeqCst` ordering if we want to model the access ordering manually.
///
/// ```rust
/// use orx_concurrent_option::*;
///
/// fn reader(maybe: &ConcurrentOption<String>) {
///     let mut is_none_at_least_once = false;
///     let mut is_seven_at_least_once = false;
///     for _ in 0..100 {
///         std::thread::sleep(std::time::Duration::from_millis(100));
///
///         let read = unsafe { maybe.as_ref() };
///         let is_none = read.is_none();
///         let is_seven = read == Some(&7.to_string());
///
///         assert!(is_none || is_seven);
///
///         is_none_at_least_once |= is_none;
///         is_seven_at_least_once |= is_seven;
///     }
///     assert!(is_none_at_least_once && is_seven_at_least_once);
/// }
///
/// fn initializer(maybe: &ConcurrentOption<String>) {
///     for _ in 0..50 {
///         // wait for a while to simulate a delay
///         std::thread::sleep(std::time::Duration::from_millis(100));
///     }
///
///     let _ = maybe.initialize_if_none(7.to_string());
///
///     for _ in 0..50 {
///         // it is safe to call `initialize_if_none` on Some variant
///         // it will do nothing
///         let inserted = maybe.initialize_if_none(1_000_000.to_string());
///         assert!(!inserted);
///     }
/// }
///
/// let num_readers = 8;
/// let num_writers = 8;
///
/// let maybe = ConcurrentOption::<String>::none();
/// let maybe_ref = &maybe;
///
/// std::thread::scope(|s| {
///     for _ in 0..num_readers {
///         s.spawn(|| reader(maybe_ref));
///     }
///     for _ in 0..num_writers {
///         s.spawn(|| initializer(maybe_ref));
///     }
/// });
///
/// assert_eq!(maybe.unwrap(), 7.to_string());
/// ```
pub struct ConcurrentOption<T> {
    /// Storage slot for the wrapped value.
    ///
    /// `MaybeUninit` means the slot is not required to hold an initialized `T`, and
    /// `UnsafeCell` is what permits writing to it through a shared `&self` reference.
    /// NOTE(review): which `state` values imply that this slot is initialized is defined
    /// by `StateU8` and the handle types, not visible here — confirm there.
    pub(crate) value: UnsafeCell<MaybeUninit<T>>,
    /// Atomic state byte of the option.
    ///
    /// This is the value passed to `Handle::get` / `Handle::spin_get` together with the
    /// requested `StateU8` initial and success states.
    pub(crate) state: AtomicU8,
}
impl<T> ConcurrentOption<T> {
    /// Requests a `Handle` over this option's atomic state by forwarding to `Handle::get`.
    ///
    /// * `initial_state` is forwarded as the state the option is expected to be in,
    /// * `success_state` is forwarded as the state requested on success.
    ///
    /// Returns exactly what `Handle::get` returns.
    ///
    /// NOTE(review): presumably a single attempt, in contrast to `spin_get_handle` —
    /// confirm against `Handle::get`.
    pub(crate) fn get_handle(
        &self,
        initial_state: StateU8,
        success_state: StateU8,
    ) -> Option<Handle<'_>> {
        let state = &self.state;
        Handle::get(state, initial_state, success_state)
    }

    /// Requests a `Handle` over this option's atomic state by forwarding to
    /// `Handle::spin_get`.
    ///
    /// * `initial_state` is forwarded as the state the option is expected to be in,
    /// * `success_state` is forwarded as the state requested on success.
    ///
    /// Returns exactly what `Handle::spin_get` returns.
    ///
    /// NOTE(review): presumably retries (spins) while the state is transiently held by
    /// another thread — confirm against `Handle::spin_get`.
    #[inline(always)]
    pub(crate) fn spin_get_handle(
        &self,
        initial_state: StateU8,
        success_state: StateU8,
    ) -> Option<Handle<'_>> {
        let state = &self.state;
        Handle::spin_get(state, initial_state, success_state)
    }

    /// Returns a mutable handle on the value of the optional, obtained through
    /// `MutHandle::spin_get`:
    /// * the call succeeds only if the optional is in the `initial_state`,
    /// * when the returned handle is dropped, the optional is moved to `success_state`.
    ///
    /// # Safety
    ///
    /// The returned handle gives direct access to the underlying value and thereby
    /// bypasses the thread-safety guarantees of the type; the caller is responsible
    /// for using it without introducing a data race.
    pub unsafe fn mut_handle(
        &self,
        initial_state: StateU8,
        success_state: StateU8,
    ) -> Option<MutHandle<T>> {
        let option: &Self = self;
        MutHandle::spin_get(option, initial_state, success_state)
    }
}
// SAFETY: sending the option to another thread sends the value it may contain along
// with it, which is sound exactly when `T: Send`.
unsafe impl<T: Send> Send for ConcurrentOption<T> {}
// SAFETY: a shared `&ConcurrentOption<T>` gives readers access to `&T` from several
// threads (requires `T: Sync`). It also lets any thread move a `T` in or out through
// `&self` methods such as `take`, `take_if`, `replace` and `initialize_if_none`, so a
// value created on one thread can end up owned (and dropped) on another; that is only
// sound when `T: Send`. Requiring `T: Sync` alone would allow `Sync`-but-`!Send` values
// to cross threads via a shared reference. The resulting `Send + Sync` bound is the
// same one the standard library uses for `OnceLock<T>` and `RwLock<T>`.
unsafe impl<T: Send + Sync> Sync for ConcurrentOption<T> {}