eyeball/
unique.rs

1//! This module defines a unique [`Observable`] type that requires `&mut` access
2//! to update its inner value but can be dereferenced (immutably).
3//!
4//! Use this in situations where only a single location in the code should be
5//! able to update the inner value.
6
7use std::{fmt, hash::Hash, mem, ops, ptr};
8
9use readlock::Shared;
10#[cfg(feature = "async-lock")]
11use readlock_tokio::Shared as SharedAsync;
12
13#[cfg(feature = "async-lock")]
14use crate::AsyncLock;
15use crate::{lock::Lock, shared::SharedObservable, state::ObservableState, Subscriber, SyncLock};
16
17/// A value whose changes will be broadcast to subscribers.
18///
19/// `Observable<T>` dereferences to `T`, and does not have methods of its own to
20/// not clash with methods of the inner type. Instead, to interact with the
21/// `Observable` itself rather than the inner value, use its associated
22/// functions (e.g. `Observable::subscribe(observable)`).
23///
24/// # Async-aware locking
25///
26/// Contrary to [`SharedObservable`]'s async-aware locking support, using
27/// `Observable` with `L` = [`AsyncLock`] with this type is rarely useful since
28/// having access to the `Observable` means nobody can be mutating the inner
29/// value in parallel. It allows a subscriber to read-lock the value over a
30/// `.await` point without losing `Send`-ness of the future though.
31pub struct Observable<T, L: Lock = SyncLock> {
32    state: L::Shared<ObservableState<T>>,
33}
34
35impl<T> Observable<T> {
36    /// Create a new `Observable` with the given initial value.
37    #[must_use]
38    pub fn new(value: T) -> Self {
39        let state = Shared::new(ObservableState::new(value));
40        Self::from_inner(state)
41    }
42
43    /// Obtain a new subscriber.
44    ///
45    /// Calling `.next().await` or `.next_ref().await` on the returned
46    /// subscriber only resolves once the inner value has been updated again
47    /// after the call to `subscribe`.
48    ///
49    /// See [`subscribe_reset`][Self::subscribe_reset] if you want to obtain a
50    /// subscriber that immediately yields without any updates.
51    pub fn subscribe(this: &Self) -> Subscriber<T> {
52        Subscriber::new(Shared::get_read_lock(&this.state), this.state.version())
53    }
54
55    /// Obtain a new subscriber that immediately yields.
56    ///
57    /// `.subscribe_reset()` is equivalent to `.subscribe()` with a subsequent
58    /// call to [`.reset()`][Subscriber::reset] on the returned subscriber.
59    ///
60    /// In contrast to [`subscribe`][Self::subscribe], calling `.next().await`
61    /// or `.next_ref().await` on the returned subscriber before updating the
62    /// inner value yields the current value instead of waiting. Further calls
63    /// to either of the two will wait for updates.
64    pub fn subscribe_reset(this: &Self) -> Subscriber<T> {
65        Subscriber::new(Shared::get_read_lock(&this.state), 0)
66    }
67
68    /// Get a reference to the inner value.
69    ///
70    /// Usually, you don't need to call this function since `Observable<T>`
71    /// implements `Deref`. Use this if you want to pass the inner value to a
72    /// generic function where the compiler can't infer that you want to have
73    /// the `Observable` dereferenced otherwise.
74    pub fn get(this: &Self) -> &T {
75        this.state.get()
76    }
77
78    /// Set the inner value to the given `value`, notify subscribers and return
79    /// the previous value.
80    pub fn set(this: &mut Self, value: T) -> T {
81        Shared::lock(&mut this.state).set(value)
82    }
83
84    /// Set the inner value to the given `value` if it doesn't compare equal to
85    /// the existing value.
86    ///
87    /// If the inner value is set, subscribers are notified and
88    /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
89    pub fn set_if_not_eq(this: &mut Self, value: T) -> Option<T>
90    where
91        T: PartialEq,
92    {
93        Shared::lock(&mut this.state).set_if_not_eq(value)
94    }
95
96    /// Set the inner value to the given `value` if it has a different hash than
97    /// the existing value.
98    ///
99    /// If the inner value is set, subscribers are notified and
100    /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
101    pub fn set_if_hash_not_eq(this: &mut Self, value: T) -> Option<T>
102    where
103        T: Hash,
104    {
105        Shared::lock(&mut this.state).set_if_hash_not_eq(value)
106    }
107
108    /// Set the inner value to a `Default` instance of its type, notify
109    /// subscribers and return the previous value.
110    ///
111    /// Shorthand for `Observable::set(this, T::default())`.
112    pub fn take(this: &mut Self) -> T
113    where
114        T: Default,
115    {
116        Self::set(this, T::default())
117    }
118
119    /// Update the inner value and notify subscribers.
120    ///
121    /// Note that even if the inner value is not actually changed by the
122    /// closure, subscribers will be notified as if it was. Use
123    /// [`update_if`][Self::update_if] if you want to conditionally mutate the
124    /// inner value.
125    pub fn update(this: &mut Self, f: impl FnOnce(&mut T)) {
126        Shared::lock(&mut this.state).update(f);
127    }
128
129    /// Maybe update the inner value and notify subscribers if it changed.
130    ///
131    /// The closure given to this function must return `true` if subscribers
132    /// should be notified of a change to the inner value.
133    pub fn update_if(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
134        Shared::lock(&mut this.state).update_if(f);
135    }
136}
137
138#[cfg(feature = "async-lock")]
139impl<T: Send + Sync + 'static> Observable<T, AsyncLock> {
140    /// Create a new `Observable` with the given initial value.
141    #[must_use]
142    pub fn new_async(value: T) -> Self {
143        let state = SharedAsync::new(ObservableState::new(value));
144        Self::from_inner(state)
145    }
146
147    /// Obtain a new subscriber.
148    ///
149    /// Calling `.next().await` or `.next_ref().await` on the returned
150    /// subscriber only resolves once the inner value has been updated again
151    /// after the call to `subscribe`.
152    ///
153    /// See [`subscribe_reset`][Self::subscribe_reset] if you want to obtain a
154    /// subscriber that immediately yields without any updates.
155    pub fn subscribe_async(this: &Self) -> Subscriber<T, AsyncLock> {
156        Subscriber::new_async(SharedAsync::get_read_lock(&this.state), this.state.version())
157    }
158
159    /// Obtain a new subscriber that immediately yields.
160    ///
161    /// `.subscribe_reset()` is equivalent to `.subscribe()` with a subsequent
162    /// call to [`.reset()`][Subscriber::reset] on the returned subscriber.
163    ///
164    /// In contrast to [`subscribe`][Self::subscribe], calling `.next().await`
165    /// or `.next_ref().await` on the returned subscriber before updating the
166    /// inner value yields the current value instead of waiting. Further calls
167    /// to either of the two will wait for updates.
168    pub fn subscribe_reset_async(this: &Self) -> Subscriber<T, AsyncLock> {
169        Subscriber::new_async(SharedAsync::get_read_lock(&this.state), 0)
170    }
171
172    /// Get a reference to the inner value.
173    ///
174    /// Usually, you don't need to call this function since `Observable<T>`
175    /// implements `Deref`. Use this if you want to pass the inner value to a
176    /// generic function where the compiler can't infer that you want to have
177    /// the `Observable` dereferenced otherwise.
178    pub fn get_async(this: &Self) -> &T {
179        this.state.get()
180    }
181
182    /// Set the inner value to the given `value`, notify subscribers and return
183    /// the previous value.
184    pub async fn set_async(this: &mut Self, value: T) -> T {
185        SharedAsync::lock(&mut this.state).await.set(value)
186    }
187
188    /// Set the inner value to the given `value` if it doesn't compare equal to
189    /// the existing value.
190    ///
191    /// If the inner value is set, subscribers are notified and
192    /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
193    pub async fn set_if_not_eq_async(this: &mut Self, value: T) -> Option<T>
194    where
195        T: PartialEq,
196    {
197        SharedAsync::lock(&mut this.state).await.set_if_not_eq(value)
198    }
199
200    /// Set the inner value to the given `value` if it has a different hash than
201    /// the existing value.
202    ///
203    /// If the inner value is set, subscribers are notified and
204    /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
205    pub async fn set_if_hash_not_eq_async(this: &mut Self, value: T) -> Option<T>
206    where
207        T: Hash,
208    {
209        SharedAsync::lock(&mut this.state).await.set_if_hash_not_eq(value)
210    }
211
212    /// Set the inner value to a `Default` instance of its type, notify
213    /// subscribers and return the previous value.
214    ///
215    /// Shorthand for `Observable::set(this, T::default())`.
216    pub async fn take_async(this: &mut Self) -> T
217    where
218        T: Default,
219    {
220        Self::set_async(this, T::default()).await
221    }
222
223    /// Update the inner value and notify subscribers.
224    ///
225    /// Note that even if the inner value is not actually changed by the
226    /// closure, subscribers will be notified as if it was. Use
227    /// [`update_if`][Self::update_if] if you want to conditionally mutate the
228    /// inner value.
229    pub async fn update_async(this: &mut Self, f: impl FnOnce(&mut T)) {
230        SharedAsync::lock(&mut this.state).await.update(f);
231    }
232
233    /// Maybe update the inner value and notify subscribers if it changed.
234    ///
235    /// The closure given to this function must return `true` if subscribers
236    /// should be notified of a change to the inner value.
237    pub async fn update_if_async(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
238        SharedAsync::lock(&mut this.state).await.update_if(f);
239    }
240}
241
242impl<T, L: Lock> Observable<T, L> {
243    pub(crate) fn from_inner(state: L::Shared<ObservableState<T>>) -> Self {
244        Self { state }
245    }
246
247    /// Get the number of subscribers.
248    ///
249    /// Be careful when using this. The result is only reliable if it is exactly
250    /// `0`, as otherwise it could be incremented right after your call to this
251    /// function, before you look at its result or do anything based on that.
252    #[must_use]
253    pub fn subscriber_count(this: &Self) -> usize {
254        L::shared_read_count(&this.state)
255    }
256
257    /// Convert this unique `Observable` into a [`SharedObservable`].
258    ///
259    /// Any subscribers created for `self` remain valid.
260    pub fn into_shared(this: Self) -> SharedObservable<T, L> {
261        // Destructure `this` without running `Drop`.
262        let state = unsafe { ptr::read(&this.state) };
263        mem::forget(this);
264
265        let rwlock = L::shared_into_inner(state);
266        SharedObservable::from_inner(rwlock)
267    }
268}
269
270impl<T, L: Lock> fmt::Debug for Observable<T, L>
271where
272    L::Shared<ObservableState<T>>: fmt::Debug,
273{
274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275        f.debug_struct("SharedObservable").field("state", &self.state).finish()
276    }
277}
278
279impl<T, L> Default for Observable<T, L>
280where
281    T: Default,
282    L: Lock,
283{
284    fn default() -> Self {
285        let shared = L::new_shared(ObservableState::new(T::default()));
286        Self::from_inner(shared)
287    }
288}
289
290// Note: No DerefMut because all mutating must go through inherent methods that
291// notify subscribers
292impl<T> ops::Deref for Observable<T> {
293    type Target = T;
294
295    fn deref(&self) -> &Self::Target {
296        self.state.get()
297    }
298}
299
300impl<T, L: Lock> Drop for Observable<T, L> {
301    fn drop(&mut self) {
302        self.state.close();
303    }
304}