eyeball/unique.rs
1//! This module defines a unique [`Observable`] type that requires `&mut` access
2//! to update its inner value but can be dereferenced (immutably).
3//!
4//! Use this in situations where only a single location in the code should be
5//! able to update the inner value.
6
7use std::{fmt, hash::Hash, mem, ops, ptr};
8
9use readlock::Shared;
10#[cfg(feature = "async-lock")]
11use readlock_tokio::Shared as SharedAsync;
12
13#[cfg(feature = "async-lock")]
14use crate::AsyncLock;
15use crate::{lock::Lock, shared::SharedObservable, state::ObservableState, Subscriber, SyncLock};
16
17/// A value whose changes will be broadcast to subscribers.
18///
19/// `Observable<T>` dereferences to `T`, and does not have methods of its own to
20/// not clash with methods of the inner type. Instead, to interact with the
21/// `Observable` itself rather than the inner value, use its associated
22/// functions (e.g. `Observable::subscribe(observable)`).
23///
24/// # Async-aware locking
25///
26/// Contrary to [`SharedObservable`]'s async-aware locking support, using
27/// `Observable` with `L` = [`AsyncLock`] with this type is rarely useful since
28/// having access to the `Observable` means nobody can be mutating the inner
29/// value in parallel. It allows a subscriber to read-lock the value over a
30/// `.await` point without losing `Send`-ness of the future though.
31pub struct Observable<T, L: Lock = SyncLock> {
32 state: L::Shared<ObservableState<T>>,
33}
34
35impl<T> Observable<T> {
36 /// Create a new `Observable` with the given initial value.
37 #[must_use]
38 pub fn new(value: T) -> Self {
39 let state = Shared::new(ObservableState::new(value));
40 Self::from_inner(state)
41 }
42
43 /// Obtain a new subscriber.
44 ///
45 /// Calling `.next().await` or `.next_ref().await` on the returned
46 /// subscriber only resolves once the inner value has been updated again
47 /// after the call to `subscribe`.
48 ///
49 /// See [`subscribe_reset`][Self::subscribe_reset] if you want to obtain a
50 /// subscriber that immediately yields without any updates.
51 pub fn subscribe(this: &Self) -> Subscriber<T> {
52 Subscriber::new(Shared::get_read_lock(&this.state), this.state.version())
53 }
54
55 /// Obtain a new subscriber that immediately yields.
56 ///
57 /// `.subscribe_reset()` is equivalent to `.subscribe()` with a subsequent
58 /// call to [`.reset()`][Subscriber::reset] on the returned subscriber.
59 ///
60 /// In contrast to [`subscribe`][Self::subscribe], calling `.next().await`
61 /// or `.next_ref().await` on the returned subscriber before updating the
62 /// inner value yields the current value instead of waiting. Further calls
63 /// to either of the two will wait for updates.
64 pub fn subscribe_reset(this: &Self) -> Subscriber<T> {
65 Subscriber::new(Shared::get_read_lock(&this.state), 0)
66 }
67
68 /// Get a reference to the inner value.
69 ///
70 /// Usually, you don't need to call this function since `Observable<T>`
71 /// implements `Deref`. Use this if you want to pass the inner value to a
72 /// generic function where the compiler can't infer that you want to have
73 /// the `Observable` dereferenced otherwise.
74 pub fn get(this: &Self) -> &T {
75 this.state.get()
76 }
77
78 /// Set the inner value to the given `value`, notify subscribers and return
79 /// the previous value.
80 pub fn set(this: &mut Self, value: T) -> T {
81 Shared::lock(&mut this.state).set(value)
82 }
83
84 /// Set the inner value to the given `value` if it doesn't compare equal to
85 /// the existing value.
86 ///
87 /// If the inner value is set, subscribers are notified and
88 /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
89 pub fn set_if_not_eq(this: &mut Self, value: T) -> Option<T>
90 where
91 T: PartialEq,
92 {
93 Shared::lock(&mut this.state).set_if_not_eq(value)
94 }
95
96 /// Set the inner value to the given `value` if it has a different hash than
97 /// the existing value.
98 ///
99 /// If the inner value is set, subscribers are notified and
100 /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
101 pub fn set_if_hash_not_eq(this: &mut Self, value: T) -> Option<T>
102 where
103 T: Hash,
104 {
105 Shared::lock(&mut this.state).set_if_hash_not_eq(value)
106 }
107
108 /// Set the inner value to a `Default` instance of its type, notify
109 /// subscribers and return the previous value.
110 ///
111 /// Shorthand for `Observable::set(this, T::default())`.
112 pub fn take(this: &mut Self) -> T
113 where
114 T: Default,
115 {
116 Self::set(this, T::default())
117 }
118
119 /// Update the inner value and notify subscribers.
120 ///
121 /// Note that even if the inner value is not actually changed by the
122 /// closure, subscribers will be notified as if it was. Use
123 /// [`update_if`][Self::update_if] if you want to conditionally mutate the
124 /// inner value.
125 pub fn update(this: &mut Self, f: impl FnOnce(&mut T)) {
126 Shared::lock(&mut this.state).update(f);
127 }
128
129 /// Maybe update the inner value and notify subscribers if it changed.
130 ///
131 /// The closure given to this function must return `true` if subscribers
132 /// should be notified of a change to the inner value.
133 pub fn update_if(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
134 Shared::lock(&mut this.state).update_if(f);
135 }
136}
137
138#[cfg(feature = "async-lock")]
139impl<T: Send + Sync + 'static> Observable<T, AsyncLock> {
140 /// Create a new `Observable` with the given initial value.
141 #[must_use]
142 pub fn new_async(value: T) -> Self {
143 let state = SharedAsync::new(ObservableState::new(value));
144 Self::from_inner(state)
145 }
146
147 /// Obtain a new subscriber.
148 ///
149 /// Calling `.next().await` or `.next_ref().await` on the returned
150 /// subscriber only resolves once the inner value has been updated again
151 /// after the call to `subscribe`.
152 ///
153 /// See [`subscribe_reset`][Self::subscribe_reset] if you want to obtain a
154 /// subscriber that immediately yields without any updates.
155 pub fn subscribe_async(this: &Self) -> Subscriber<T, AsyncLock> {
156 Subscriber::new_async(SharedAsync::get_read_lock(&this.state), this.state.version())
157 }
158
159 /// Obtain a new subscriber that immediately yields.
160 ///
161 /// `.subscribe_reset()` is equivalent to `.subscribe()` with a subsequent
162 /// call to [`.reset()`][Subscriber::reset] on the returned subscriber.
163 ///
164 /// In contrast to [`subscribe`][Self::subscribe], calling `.next().await`
165 /// or `.next_ref().await` on the returned subscriber before updating the
166 /// inner value yields the current value instead of waiting. Further calls
167 /// to either of the two will wait for updates.
168 pub fn subscribe_reset_async(this: &Self) -> Subscriber<T, AsyncLock> {
169 Subscriber::new_async(SharedAsync::get_read_lock(&this.state), 0)
170 }
171
172 /// Get a reference to the inner value.
173 ///
174 /// Usually, you don't need to call this function since `Observable<T>`
175 /// implements `Deref`. Use this if you want to pass the inner value to a
176 /// generic function where the compiler can't infer that you want to have
177 /// the `Observable` dereferenced otherwise.
178 pub fn get_async(this: &Self) -> &T {
179 this.state.get()
180 }
181
182 /// Set the inner value to the given `value`, notify subscribers and return
183 /// the previous value.
184 pub async fn set_async(this: &mut Self, value: T) -> T {
185 SharedAsync::lock(&mut this.state).await.set(value)
186 }
187
188 /// Set the inner value to the given `value` if it doesn't compare equal to
189 /// the existing value.
190 ///
191 /// If the inner value is set, subscribers are notified and
192 /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
193 pub async fn set_if_not_eq_async(this: &mut Self, value: T) -> Option<T>
194 where
195 T: PartialEq,
196 {
197 SharedAsync::lock(&mut this.state).await.set_if_not_eq(value)
198 }
199
200 /// Set the inner value to the given `value` if it has a different hash than
201 /// the existing value.
202 ///
203 /// If the inner value is set, subscribers are notified and
204 /// `Some(previous_value)` is returned. Otherwise, `None` is returned.
205 pub async fn set_if_hash_not_eq_async(this: &mut Self, value: T) -> Option<T>
206 where
207 T: Hash,
208 {
209 SharedAsync::lock(&mut this.state).await.set_if_hash_not_eq(value)
210 }
211
212 /// Set the inner value to a `Default` instance of its type, notify
213 /// subscribers and return the previous value.
214 ///
215 /// Shorthand for `Observable::set(this, T::default())`.
216 pub async fn take_async(this: &mut Self) -> T
217 where
218 T: Default,
219 {
220 Self::set_async(this, T::default()).await
221 }
222
223 /// Update the inner value and notify subscribers.
224 ///
225 /// Note that even if the inner value is not actually changed by the
226 /// closure, subscribers will be notified as if it was. Use
227 /// [`update_if`][Self::update_if] if you want to conditionally mutate the
228 /// inner value.
229 pub async fn update_async(this: &mut Self, f: impl FnOnce(&mut T)) {
230 SharedAsync::lock(&mut this.state).await.update(f);
231 }
232
233 /// Maybe update the inner value and notify subscribers if it changed.
234 ///
235 /// The closure given to this function must return `true` if subscribers
236 /// should be notified of a change to the inner value.
237 pub async fn update_if_async(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
238 SharedAsync::lock(&mut this.state).await.update_if(f);
239 }
240}
241
242impl<T, L: Lock> Observable<T, L> {
243 pub(crate) fn from_inner(state: L::Shared<ObservableState<T>>) -> Self {
244 Self { state }
245 }
246
247 /// Get the number of subscribers.
248 ///
249 /// Be careful when using this. The result is only reliable if it is exactly
250 /// `0`, as otherwise it could be incremented right after your call to this
251 /// function, before you look at its result or do anything based on that.
252 #[must_use]
253 pub fn subscriber_count(this: &Self) -> usize {
254 L::shared_read_count(&this.state)
255 }
256
257 /// Convert this unique `Observable` into a [`SharedObservable`].
258 ///
259 /// Any subscribers created for `self` remain valid.
260 pub fn into_shared(this: Self) -> SharedObservable<T, L> {
261 // Destructure `this` without running `Drop`.
262 let state = unsafe { ptr::read(&this.state) };
263 mem::forget(this);
264
265 let rwlock = L::shared_into_inner(state);
266 SharedObservable::from_inner(rwlock)
267 }
268}
269
270impl<T, L: Lock> fmt::Debug for Observable<T, L>
271where
272 L::Shared<ObservableState<T>>: fmt::Debug,
273{
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 f.debug_struct("SharedObservable").field("state", &self.state).finish()
276 }
277}
278
279impl<T, L> Default for Observable<T, L>
280where
281 T: Default,
282 L: Lock,
283{
284 fn default() -> Self {
285 let shared = L::new_shared(ObservableState::new(T::default()));
286 Self::from_inner(shared)
287 }
288}
289
290// Note: No DerefMut because all mutating must go through inherent methods that
291// notify subscribers
292impl<T> ops::Deref for Observable<T> {
293 type Target = T;
294
295 fn deref(&self) -> &Self::Target {
296 self.state.get()
297 }
298}
299
300impl<T, L: Lock> Drop for Observable<T, L> {
301 fn drop(&mut self) {
302 self.state.close();
303 }
304}