orx_concurrent_option/concurrent.rs
1use crate::{ConcurrentOption, states::*};
2use core::{mem::MaybeUninit, sync::atomic::Ordering};
3
4impl<T> ConcurrentOption<T> {
5 // concurrent state mutation - special
6
7 /// Thread safe method to initiate the value of the option with the given `value`.
8 ///
9 /// * Returns `true` if the option was `is_none` variant and initiated with the given value.
10 /// * It does nothing if the concurrent option is already of `is_some` variant, and returns `false`.
11 ///
12 /// Note that it is safe to call this method with a shared reference `&self`.
13 ///
14 /// This method is particularly useful for enabling concurrent read & write operations,
15 /// while the value is expected to be initialized only once.
16 ///
17 /// # Suggestion on Concurrent Read & Write Operations
18 ///
19 /// Note that this is a one-time-write method which can happen safely while other threads are reading the option.
20 /// It is however recommended to call read-methods with a `&self` reference,
21 /// such as `as_ref`, using an ordering stronger than `Relaxed`, such as `Acquire` or `SeqCst`,
22 /// if one or more threads are expected to call write methods `initialize_if_none` or `initialize_unchecked` concurrently.
23 ///
24 /// # Example
25 ///
26 /// ```rust
27 /// use orx_concurrent_option::*;
28 ///
29 /// let x = ConcurrentOption::<String>::none();
30 /// let inserted = x.initialize_if_none(3.to_string());
31 /// assert!(inserted);
32 /// assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
33 ///
34 /// let x = ConcurrentOption::some(7.to_string());
35 /// let inserted = x.initialize_if_none(3.to_string()); // does nothing
36 /// assert!(!inserted);
37 /// assert_eq!(unsafe { x.as_ref() }, Some(&7.to_string()));
38 /// ```
39 ///
40 /// A more advanced and useful example is demonstrated below:
41 /// * there exist multiple readers checking the value of an optional; they will receive
42 /// * None if the value is not initialized yet;
43 /// * a reference to the value otherwise.
44 /// * there exist multiple initializers each trying to set the value of optional;
45 /// * only the first one will succeed,
46 /// * since this is an initialization method, succeeding calls will safely be ignored.
47 ///
48 /// ```rust
49 /// use orx_concurrent_option::*;
50 ///
51 /// fn reader(maybe: &ConcurrentOption<String>) {
52 /// let mut is_none_at_least_once = false;
53 /// let mut is_seven_at_least_once = false;
54 ///
55 /// for _ in 0..100 {
56 /// std::thread::sleep(std::time::Duration::from_millis(100));
57 ///
58 /// let read = unsafe { maybe.as_ref() };
59 ///
60 /// let is_none = read.is_none();
61 /// let is_seven = read == Some(&7.to_string());
62 ///
63 /// assert!(is_none || is_seven);
64 ///
65 /// is_none_at_least_once |= is_none;
66 /// is_seven_at_least_once |= is_seven;
67 /// }
68 ///
69 /// assert!(is_none_at_least_once && is_seven_at_least_once);
70 /// }
71 ///
72 /// fn initializer(maybe: &ConcurrentOption<String>) {
73 /// for _ in 0..50 {
74 /// // wait for a while to simulate a delay
75 /// std::thread::sleep(std::time::Duration::from_millis(100));
76 /// }
77 ///
78 /// let _ = maybe.initialize_if_none(7.to_string());
79 ///
80 /// for _ in 0..50 {
81 /// // it is safe to call `initialize_if_none` on Some variant, it will do nothing
82 /// let inserted = maybe.initialize_if_none(1_000_000.to_string());
83 /// assert!(!inserted);
84 /// }
85 /// }
86 ///
87 /// let num_readers = 8;
88 /// let num_writers = 8;
89 ///
90 /// let maybe = ConcurrentOption::<String>::none();
91 /// let maybe_ref = &maybe;
92 ///
93 /// std::thread::scope(|s| {
94 /// for _ in 0..num_readers {
95 /// s.spawn(|| reader(maybe_ref));
96 /// }
97 ///
98 /// for _ in 0..num_writers {
99 /// s.spawn(|| initializer(maybe_ref));
100 /// }
101 /// });
102 ///
103 /// assert_eq!(maybe.unwrap(), 7.to_string());
104 /// ```
105 pub fn initialize_if_none(&self, value: T) -> bool {
106 match self.get_handle(NONE, SOME) {
107 Some(_handle) => {
108 unsafe { &mut *self.value.get() }.write(value);
109 true
110 }
111 None => false,
112 }
113 }
114
115 /// Thread safe method to initiate the value of the option with the given `value`
116 /// **provided that** the concurrent option `is_none` at the point of initializing.
117 ///
118 /// See [`initialize_if_none`] for checked version.
119 ///
120 /// [`initialize_if_none`]: ConcurrentOption::initialize_if_none
121 ///
122 /// Note that it is safe to call this method with a shared reference `&self`
123 /// **provided that** the concurrent option `is_none` at the point of initializing.
124 ///
125 /// This method is particularly useful for enabling concurrent read & write operations,
126 /// while the value is expected to be initialized only once.
127 ///
128 /// # Safety
129 ///
130 /// This method can be safely called when the concurrent option is guaranteed to be of None variant.
131 ///
132 /// On the other hand, calling it when the option has a value leads to undefined behavior
133 /// due to the following possible data race where we can be reading the value with a `&self` reference,
134 /// while at the same time writing with this unsafe method and a `&self` reference.
135 ///
136 /// # Suggestion on Concurrent Read & Write Operations
137 ///
138 /// Note that this is a one-time-write method which can happen safely while other threads are reading the option.
139 /// It is however recommended to call read-methods with a `&self` reference,
140 /// such as `as_ref`, using an ordering stronger than `Relaxed`, such as `Acquire` or `SeqCst`,
141 /// if one or more threads are expected to call write methods `initialize_if_none` or `initialize_unchecked` concurrently.
142 ///
143 /// # Example
144 ///
145 /// ```rust
146 /// use orx_concurrent_option::*;
147 /// use core::sync::atomic::Ordering;
148 ///
149 /// let x = ConcurrentOption::<String>::none();
150 /// unsafe { x.initialize_unchecked(3.to_string()) };
151 /// assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
152 ///
153 /// #[cfg(not(miri))]
154 /// {
155 /// let x = ConcurrentOption::some(7.to_string());
156 /// unsafe { x.initialize_unchecked(3.to_string()) }; // undefined behavior!
157 /// assert_eq!(unsafe { x.as_ref() }, Some(&3.to_string()));
158 /// }
159 /// ```
160 ///
161 /// A more advanced and useful example is demonstrated below:
162 /// * there exist multiple readers checking the value of an optional; they will receive
163 /// * None if the value is not initialized yet;
164 /// * a reference to the value otherwise.
165 /// * there exist multiple initializers each trying to set the value of optional;
166 /// * only the first one will succeed,
167 /// * since this is an initialization method, succeeding calls will safely be ignored.
168 ///
169 /// ```rust
170 /// use orx_concurrent_option::*;
171 /// use core::sync::atomic::Ordering;
172 ///
173 /// fn reader(maybe: &ConcurrentOption<String>) {
174 /// let mut is_none_at_least_once = false;
175 /// let mut is_seven_at_least_once = false;
176 ///
177 /// for _ in 0..100 {
178 /// std::thread::sleep(std::time::Duration::from_millis(100));
179 ///
180 /// let read = unsafe { maybe.as_ref() };
181 ///
182 /// let is_none = read.is_none();
183 /// let is_seven = read == Some(&7.to_string());
184 ///
185 /// assert!(is_none || is_seven);
186 ///
187 /// is_none_at_least_once |= is_none;
188 /// is_seven_at_least_once |= is_seven;
189 /// }
190 ///
191 /// assert!(is_none_at_least_once && is_seven_at_least_once);
192 /// }
193 ///
194 /// fn unsafe_initializer(maybe: &ConcurrentOption<String>) {
195 /// for _ in 0..50 {
196 /// // wait for a while to simulate a delay
197 /// std::thread::sleep(std::time::Duration::from_millis(100));
198 /// }
199 ///
200 /// // we need to make sure to call initialize_unchecked only once
201 /// unsafe { maybe.initialize_unchecked(7.to_string()) };
202 /// }
203 ///
204 /// let num_readers = 8;
205 ///
206 /// let maybe = ConcurrentOption::<String>::none();
207 /// let maybe_ref = &maybe;
208 ///
209 /// std::thread::scope(|s| {
210 /// for _ in 0..num_readers {
211 /// s.spawn(|| reader(maybe_ref));
212 /// }
213 ///
214 /// s.spawn(|| unsafe_initializer(maybe_ref));
215 /// });
216 ///
217 /// assert_eq!(maybe.unwrap(), 7.to_string());
218 /// ```
219 pub unsafe fn initialize_unchecked(&self, value: T) {
220 unsafe { &mut *self.value.get() }.write(value);
221 self.state.store(SOME, Ordering::Release);
222 }
223
224 // concurrent state mutation
225
226 /// Thread safe method to update the value of the option if it is of Some variant.
227 /// Does nothing if it is None.
228 /// Returns whether or not the value is updated.
229 ///
230 /// # Example
231 ///
232 /// ```rust
233 /// use orx_concurrent_option::*;
234 ///
235 /// let maybe = ConcurrentOption::<String>::none();
236 /// let updated = maybe.update_if_some(|x| *x = format!("{}!", x));
237 /// assert!(maybe.is_none());
238 /// assert_eq!(updated, false);
239 ///
240 /// let maybe = ConcurrentOption::some(42.to_string());
241 /// let updated = maybe.update_if_some(|x| *x = format!("{}!", x));
242 /// assert!(maybe.is_some_and(|x| x == &"42!".to_string()));
243 /// assert_eq!(updated, true);
244 /// ```
245 pub fn update_if_some<F>(&self, mut f: F) -> bool
246 where
247 F: FnMut(&mut T),
248 {
249 match self.spin_get_handle(SOME, SOME) {
250 Some(_handle) => {
251 let x = unsafe { MaybeUninit::assume_init_mut(&mut *self.value.get()) };
252 f(x);
253 true
254 }
255 None => false,
256 }
257 }
258
259 /// Thread safe method to take the value out of the option if Some,
260 /// leaving a None in its place.
261 ///
262 /// Has no impact and returns None, if the option is of None variant.
263 ///
264 /// # Examples
265 ///
266 /// ```rust
267 /// use orx_concurrent_option::*;
268 ///
269 /// let x = ConcurrentOption::some(42);
270 /// let y = x.take();
271 /// assert_eq!(x, ConcurrentOption::none());
272 /// assert_eq!(y, Some(42));
273 ///
274 /// let x: ConcurrentOption<u32> = ConcurrentOption::none();
275 /// let y = x.take();
276 /// assert_eq!(x, ConcurrentOption::none());
277 /// assert_eq!(y, None);
278 /// ```
279 pub fn take(&self) -> Option<T> {
280 match self.spin_get_handle(SOME, NONE) {
281 Some(_handle) => {
282 let x = unsafe { &*self.value.get() };
283 Some(unsafe { MaybeUninit::assume_init_read(x) })
284 }
285 None => None,
286 }
287 }
288
289 /// Thread safe method to take the value out of the option, but only if the predicate evaluates to
290 /// `true` on a mutable reference to the value.
291 ///
292 /// In other words, replaces `self` with None if the predicate returns `true`.
293 /// This method operates similar to [`ConcurrentOption::take`] but conditional.
294 ///
295 /// # Examples
296 ///
297 /// ```rust
298 /// use orx_concurrent_option::*;
299 ///
300 /// let x = ConcurrentOption::some(42);
301 ///
302 /// let prev = x.take_if(|v| if *v == 42 {
303 /// *v += 1;
304 /// false
305 /// } else {
306 /// false
307 /// });
308 /// assert_eq!(x, ConcurrentOption::some(43));
309 /// assert_eq!(prev, None);
310 ///
311 /// let prev = x.take_if(|v| *v == 43);
312 /// assert_eq!(x, ConcurrentOption::none());
313 /// assert_eq!(prev, Some(43));
314 /// ```
315 #[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)]
316 pub fn take_if<P>(&self, predicate: P) -> Option<T>
317 where
318 P: FnOnce(&mut T) -> bool,
319 {
320 loop {
321 match self
322 .state
323 .compare_exchange(SOME, RESERVED, ORDER_LOAD, ORDER_LOAD)
324 {
325 Ok(_) => {
326 let x = unsafe { &mut *self.value.get() };
327 let x_mut = unsafe { MaybeUninit::assume_init_mut(x) };
328 let output = match predicate(x_mut) {
329 false => None,
330 true => Some(unsafe { MaybeUninit::assume_init_read(x) }),
331 };
332
333 let success_state = match output.is_some() {
334 true => NONE,
335 false => SOME,
336 };
337 self.state
338 .compare_exchange(RESERVED, success_state, ORDER_STORE, ORDER_STORE)
339 .expect(
340 "Failed to update the concurrent state after concurrent state mutation",
341 );
342
343 return output;
344 }
345 Err(previous_state) => match previous_state {
346 RESERVED => continue,
347 _ => return None,
348 },
349 }
350 }
351 }
352
353 /// Thread safe method to replace the actual value in the option by the value given in parameter,
354 /// returning the old value if present,
355 /// leaving a Some in its place without de-initializing either one.
356 ///
357 /// # Examples
358 ///
359 /// ```rust
360 /// use orx_concurrent_option::*;
361 ///
362 /// let x = ConcurrentOption::some(2);
363 /// let old = x.replace(5);
364 /// assert_eq!(x, ConcurrentOption::some(5));
365 /// assert_eq!(old, Some(2));
366 ///
367 /// let x: ConcurrentOption<u32> = ConcurrentOption::none();
368 /// let old = x.replace(3);
369 /// assert_eq!(x, ConcurrentOption::some(3));
370 /// assert_eq!(old, None);
371 /// ```
372 pub fn replace(&self, value: T) -> Option<T> {
373 loop {
374 if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
375 let x = unsafe { (*self.value.get()).assume_init_mut() };
376 let old = core::mem::replace(x, value);
377 return Some(old);
378 }
379
380 if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
381 let x = unsafe { &mut *self.value.get() };
382 x.write(value);
383 return None;
384 }
385 }
386 }
387
388 /// true if updated; false if initiated
389 pub fn set_some(&self, value: T) -> bool {
390 loop {
391 if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
392 let x = unsafe { (*self.value.get()).assume_init_mut() };
393 let _old = core::mem::replace(x, value);
394 return true;
395 }
396
397 if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
398 let x = unsafe { &mut *self.value.get() };
399 x.write(value);
400 return false;
401 }
402 }
403 }
404
405 /// Partially thread safe method to insert `value` into the option, and then to return a mutable reference to it.
406 ///
407 /// If the option already contains a value, the old value is dropped.
408 ///
409 /// See also [`Option::get_or_insert`], which doesn't update the value if
410 /// the option already contains Some.
411 ///
412 /// # Safety
413 ///
414 /// Note that the insertion part of this method is thread safe.
415 ///
416 /// The method is `unsafe` due to the returned mutable reference to the underlying value.
417 ///
418 /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
419 /// * It is also safe to use this method if the caller is able to guarantee that there exist
420 /// no concurrent reads or writes while mutating the value.
421 /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
422 ///
423 /// # Examples
424 ///
425 /// ```rust
426 /// use orx_concurrent_option::*;
427 ///
428 /// let opt: ConcurrentOption<_> = ConcurrentOption::none();
429 ///
430 /// let val = unsafe { opt.insert(1) };
431 /// assert_eq!(*val, 1);
432 /// assert_eq!(unsafe { opt.as_ref() }, Some(&1));
433 ///
434 /// let val = unsafe { opt.insert(2) };
435 /// assert_eq!(*val, 2);
436 /// *val = 3;
437 /// assert_eq!(opt.unwrap(), 3);
438 /// ```
439 #[allow(clippy::mut_from_ref)]
440 pub unsafe fn insert(&self, value: T) -> &mut T {
441 loop {
442 if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
443 let x = unsafe { (*self.value.get()).assume_init_mut() };
444 let _old = core::mem::replace(x, value);
445 return x;
446 }
447
448 if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
449 let x = unsafe { &mut *self.value.get() };
450 x.write(value);
451 return unsafe { x.assume_init_mut() };
452 }
453 }
454 }
455
456 /// Inserts `value` into the option if it is None, then
457 /// returns a mutable reference to the contained value.
458 ///
459 /// See also [`ConcurrentOption::insert`], which updates the value even if
460 /// the option already contains Some.
461 ///
462 /// # Safety
463 ///
464 /// Note that the insertion part of this method is thread safe.
465 ///
466 /// The method is `unsafe` due to the returned mutable reference to the underlying value.
467 ///
468 /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
469 /// * It is also safe to use this method if the caller is able to guarantee that there exist
470 /// no concurrent reads or writes while mutating the value.
471 /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
472 ///
473 /// # Examples
474 ///
475 /// ```rust
476 /// use orx_concurrent_option::*;
477 ///
478 /// let x = ConcurrentOption::none();
479 ///
480 /// {
481 /// let y: &mut u32 = unsafe { x.get_or_insert(5) };
482 /// assert_eq!(y, &5);
483 ///
484 /// *y = 7;
485 /// }
486 ///
487 /// assert_eq!(x, ConcurrentOption::some(7));
488 /// ```
489 #[allow(clippy::mut_from_ref)]
490 pub unsafe fn get_or_insert(&self, value: T) -> &mut T {
491 loop {
492 if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
493 return unsafe { (*self.value.get()).assume_init_mut() };
494 }
495
496 if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
497 let x = unsafe { &mut *self.value.get() };
498 x.write(value);
499 return unsafe { x.assume_init_mut() };
500 }
501 }
502 }
503
504 /// Partially thread safe method to insert a value computed from `f` into the option if it is None,
505 /// then returns a mutable reference to the contained value.
506 ///
507 /// # Safety
508 ///
509 /// Note that the insertion part of this method is thread safe.
510 ///
511 /// The method is `unsafe` due to the returned mutable reference to the underlying value.
512 ///
513 /// * It is safe to use this method if the returned mutable reference is discarded (miri would still complain).
514 /// * It is also safe to use this method if the caller is able to guarantee that there exist
515 /// no concurrent reads or writes while mutating the value.
516 /// * Otherwise, it will lead to an **Undefined Behavior** due to data race.
517 ///
518 /// # Examples
519 ///
520 /// ```rust
521 /// use orx_concurrent_option::*;
522 ///
523 /// let x = ConcurrentOption::none();
524 ///
525 /// {
526 /// let y: &mut u32 = unsafe { x.get_or_insert_with(|| 5) };
527 /// assert_eq!(y, &5);
528 ///
529 /// *y = 7;
530 /// }
531 ///
532 /// assert_eq!(x, ConcurrentOption::some(7));
533 /// ```
534 #[allow(clippy::mut_from_ref)]
535 pub unsafe fn get_or_insert_with<F>(&self, f: F) -> &mut T
536 where
537 F: FnOnce() -> T,
538 {
539 loop {
540 if let Some(_handle) = self.spin_get_handle(SOME, SOME) {
541 return unsafe { (*self.value.get()).assume_init_mut() };
542 }
543
544 if let Some(_handle) = self.spin_get_handle(NONE, SOME) {
545 let x = unsafe { &mut *self.value.get() };
546 x.write(f());
547 return unsafe { x.assume_init_mut() };
548 }
549 }
550 }
551}