//! A simple mechanism for sharing a piece of data and being notified if it
//! changes.
//!
//! This module provides [`Watch<T>`], a data structure that lets you share some
//! data of type `T`, update it from multiple producers, and efficiently notify
//! multiple consumers when it changes.
//!
//! A `Watch<T>` is particularly useful for things like configuration data,
//! which are effectively "global" (shared by many tasks) but may need special
//! handling of changes.
//!
//! ```
//! let shared_number = Watch::new(1234_u32);
//! ```
//!
//! You can create a _receive handle_ to the `Watch<T>` by calling
//! [`subscribe`][Watch::subscribe]. This produces a [`Receiver<T>`] that allows
//! its holder to inspect the shared data, and be notified when it changes.
//!
//! ```
//! // `mut` because marking data as seen updates the receiver's own state.
//! let mut rcvr = shared_number.subscribe();
//! // A receiver only tracks changes made _after_ it's created:
//! assert_eq!(rcvr.is_changed(), false);
//! ```
//!
//! You can create a _send handle_ to the `Watch<T>` by calling
//! [`sender`][Watch::sender]. This produces a [`Sender<T>`] that allows its
//! holder to update the shared data.
//!
//! ```
//! let sender = shared_number.sender();
//!
//! // Update the shared data:
//! sender.send(4567);
//!
//! // Now the receiver sees a change:
//! assert_eq!(rcvr.is_changed(), true);
//!
//! // We can inspect it and mark it as seen:
//! rcvr.glimpse_and_update(|value| assert_eq!(*value, 4567));
//! ```
//!
//! Code that wants to monitor changes to the data can use the
//! [`Receiver::changed`] future to do so:
//!
//! ```
//! loop {
//!     rcvr.changed().await;
//!     rcvr.glimpse_and_update(|value| process(value));
//! }
//! ```
//!
//! # Reentrancy and panics
//!
//! It is possible to attempt to use handles for a single `Watch<T>` reentrantly
//! if you try hard enough. The implementation checks for this and will panic.
//! For instance, attempting to send a new value from _inside_ the closure
//! passed to [`Receiver::glimpse`] panics:
//!
//! ```
//! let shared = Watch::new(some_data);
//! let sender = shared.sender();
//! let rcvr = shared.subscribe();
//!
//! // This line will panic at runtime.
//! rcvr.glimpse(|contents| sender.send(*contents));
//! ```
//!
//! It is perfectly safe to send or inspect a _different_ `Watch<T>` instance
//! from within one of these closures, just not the same one.
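//!
//! The check itself is ordinary `RefCell` borrow tracking. As a rough sketch (a
//! standalone model that uses only `core`, not this crate's code; both function
//! names are made up for illustration), a shared borrow held the way `glimpse`
//! holds one makes a `send`-style exclusive borrow fail, and releasing it lets
//! the same borrow succeed:
//!
//! ```rust
//! use core::cell::RefCell;
//!
//! /// Hypothetical helper: tries to write while a read is still in progress.
//! fn send_inside_glimpse_is_rejected() -> bool {
//!     let contents = RefCell::new(1234_u32);
//!     // Like `glimpse`: hold a shared borrow while the closure would run.
//!     let reader = contents.borrow();
//!     // Like `send_modify`: an exclusive borrow can't coexist with `reader`.
//!     let rejected = contents.try_borrow_mut().is_err();
//!     drop(reader);
//!     rejected
//! }
//!
//! /// Hypothetical helper: tries to write once the read has finished.
//! fn send_after_glimpse_is_allowed() -> bool {
//!     let contents = RefCell::new(1234_u32);
//!     {
//!         let reader = contents.borrow();
//!         assert_eq!(*reader, 1234);
//!     } // `reader` is released here, as it is when `glimpse` returns.
//!     // Bind first so the temporary borrow is dropped before `contents`.
//!     let allowed = contents.try_borrow_mut().is_ok();
//!     allowed
//! }
//! ```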
//!
//! In practice it's pretty hard to do this accidentally, but now you know.
//!
//! # Implementation
//!
//! A `Watch<T>` stores a _change count_ alongside the shared data. Each time
//! its contents are updated by any `Sender`, the change count is incremented.
//!
//! Each `Receiver` has its own copy of the change count, reflecting what the
//! count was at the last time that `Receiver` accessed the shared data. If the
//! change count stored in the `Watch` is different from the one stored in the
//! `Receiver`, then there is an update that hasn't been seen by its holder yet.
//!
//! Because the `Watch<T>` only stores a single copy of the data and a counter,
//! a `Receiver` may not see _every_ update to the data. If multiple updates
//! happen between checks, the `Receiver` will only ever see the last one. This
//! keeps both the storage requirements, and the cost of updates, low.
//!
//! `Watch<T>` contains a [`Notify`] internally, which it uses to efficiently
//! wake tasks that are awaiting [`Receiver::changed`].
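//!
//! To make the bookkeeping concrete, here is a minimal sketch of the change
//! count scheme. It is a simplified standalone model, not this crate's code:
//! `MiniWatch`, `MiniReceiver`, and `collapsed_updates` are hypothetical names,
//! and the `Notify` (and with it any waking of tasks) is left out.
//!
//! ```rust
//! use core::cell::{Cell, RefCell};
//!
//! /// Hypothetical stand-in for `Watch<T>`, minus the `Notify`.
//! struct MiniWatch<T> {
//!     version: Cell<u64>,
//!     contents: RefCell<T>,
//! }
//!
//! /// Hypothetical stand-in for `Receiver<T>`.
//! struct MiniReceiver<'a, T> {
//!     version: u64,
//!     shared: &'a MiniWatch<T>,
//! }
//!
//! impl<T> MiniWatch<T> {
//!     fn new(contents: T) -> Self {
//!         Self { version: Cell::new(0), contents: RefCell::new(contents) }
//!     }
//!
//!     // Every send replaces the data and bumps the change count.
//!     fn send(&self, value: T) {
//!         *self.contents.borrow_mut() = value;
//!         self.version.set(self.version.get().wrapping_add(1));
//!     }
//!
//!     // A new receiver starts out having "seen" the current version.
//!     fn subscribe(&self) -> MiniReceiver<'_, T> {
//!         MiniReceiver { version: self.version.get(), shared: self }
//!     }
//! }
//!
//! impl<T: Copy> MiniReceiver<'_, T> {
//!     // Differing counts mean there is an update this receiver hasn't seen.
//!     fn is_changed(&self) -> bool {
//!         self.shared.version.get() != self.version
//!     }
//!
//!     // Copies the value out and records the version that was seen.
//!     fn copy_current_and_update(&mut self) -> T {
//!         let value = *self.shared.contents.borrow();
//!         self.version = self.shared.version.get();
//!         value
//!     }
//! }
//!
//! /// Sends two updates before the receiver looks at the data.
//! fn collapsed_updates() -> (bool, u32, bool) {
//!     let watch = MiniWatch::new(1_u32);
//!     let mut rcvr = watch.subscribe();
//!     watch.send(2);
//!     watch.send(3);
//!     let changed_before = rcvr.is_changed();
//!     let seen = rcvr.copy_current_and_update();
//!     (changed_before, seen, rcvr.is_changed())
//! }
//! ```
//!
//! Calling `collapsed_updates()` in this model yields `(true, 3, false)`: the
//! receiver notices that something changed, sees only the last value, and is
//! then up to date.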

#![no_std]

#![forbid(unsafe_code)]
#![warn(
    elided_lifetimes_in_paths,
    explicit_outlives_requirements,
    missing_debug_implementations,
    missing_docs,
    semicolon_in_expressions_from_macros,
    single_use_lifetimes,
    trivial_casts,
    trivial_numeric_casts,
    unreachable_pub,
    unsafe_op_in_unsafe_fn,
    unused_qualifications,
)]

use core::cell::{Cell, RefCell};

use lilos::exec::Notify;

/// Store some data of type `T` and efficiently notify multiple consumers if it
/// changes.
///
/// See the [module docs][crate] for more specifics and examples.
#[derive(Debug)]
pub struct Watch<T> {
    version: Cell<u64>,
    contents: RefCell<T>,
    update: Notify,
}

impl<T> Watch<T> {
    /// Creates a new `Watch<T>` that will initially contain `contents`.
    pub const fn new(contents: T) -> Self {
        Self {
            version: Cell::new(0),
            contents: RefCell::new(contents),
            update: Notify::new(),
        }
    }

    /// Creates a new send-only handle to the watched data.
    pub fn sender(&self) -> Sender<'_, T> {
        Sender { shared: self }
    }

    /// Creates a new receive-only handle to the watched data.
    ///
    /// The returned `Receiver` treats the current contents as "seen," and any
    /// attempt to wait for changes will wait for the _next_ change. If this
    /// isn't what you want, you can force the `Receiver` to treat its initial
    /// contents as novel by calling [`Receiver::mark_as_unseen`].
    pub fn subscribe(&self) -> Receiver<'_, T> {
        let version = self.version.get();
        Receiver {
            version,
            shared: self,
        }
    }
}

/// Posts new values to a `Watch<T>`.
///
/// There can be many `Sender`s for a single `Watch<T>`. They're not
/// distinguished from each other --- any code observing the watched data will
/// just see a change, and won't be told where the change was made.
#[derive(Clone, Debug)]
pub struct Sender<'a, T> {
    shared: &'a Watch<T>,
}

impl<'a, T> Sender<'a, T> {
    /// Replaces the watched data with `value` and signals that a change has
    /// occurred.
    ///
    /// This advances the internal change count and wakes any tasks that have
    /// blocked waiting for changes.
    ///
    /// # Panics
    ///
    /// If called from inside the closure passed to [`Receiver::glimpse`] or
    /// [`Receiver::glimpse_and_update`], on a `Receiver` that would receive
    /// `value`, this will panic. Don't do that.
    pub fn send(&self, value: T) {
        self.send_replace(value);
    }

    /// Replaces the watched data with `value`, returns the previous contents,
    /// and signals that a change has occurred.
    ///
    /// This advances the internal change count and wakes any tasks that have
    /// blocked waiting for changes.
    ///
    /// # Panics
    ///
    /// If called from inside the closure passed to [`Receiver::glimpse`] or
    /// [`Receiver::glimpse_and_update`], on a `Receiver` that would receive
    /// `value`, this will panic. Don't do that.
    pub fn send_replace(&self, value: T) -> T {
        self.send_modify(|r| core::mem::replace(r, value))
    }

    /// Applies `body` to the watched data, signals that a change has occurred,
    /// and returns whatever `body` returned. `body` is free to update the
    /// watched data however it likes, but even if it leaves it unchanged,
    /// observers will still be signaled.
    ///
    /// This advances the internal change count and wakes any tasks that have
    /// blocked waiting for changes.
    ///
    /// # Panics
    ///
    /// This will panic if called from inside a closure passed to a `Receiver`
    /// or to another `send_modify` on the _same_ watched data. The reverse also
    /// panics: glimpsing the same watched data from inside `body`. Don't do
    /// that.
    pub fn send_modify<R>(&self, body: impl FnOnce(&mut T) -> R) -> R {
        let w = self.shared;
        // A failed borrow means a glimpse or another send_modify is running.
        let Ok(mut r) = w.contents.try_borrow_mut() else {
            panic!("attempt to send during glimpse or send_modify");
        };
        let result = body(&mut *r);
        w.version.set(w.version.get().wrapping_add(1));

        w.update.notify();

        result
    }

    /// Makes a `Receiver` that will watch the data posted by this `Sender`.
    ///
    /// This is the same operation as [`Watch::subscribe`], provided here
    /// because it's sometimes convenient to use directly on a `Sender`.
    pub fn subscribe(&self) -> Receiver<'a, T> {
        self.shared.subscribe()
    }
}

/// Receives changes made to the data in a `Watch<T>`.
///
/// Each `Receiver` keeps track of which versions of the watched data it has
/// seen. This is updated in one of three ways:
///
/// - When a future produced by [`Receiver::changed`] resolves.
/// - When you call [`Receiver::glimpse_and_update`].
/// - When you call [`Receiver::mark_as_seen`].
#[derive(Clone, Debug)]
pub struct Receiver<'a, T> {
    version: u64,
    shared: &'a Watch<T>,
}

impl<T> Receiver<'_, T> {
    /// Returns a future that resolves when the value being observed has
    /// been updated.
    ///
    /// When this future resolves, it will mark the most recent version as
    /// having been seen. This means calling `changed` again will produce a
    /// future that waits until _another_ change to the watched value.
    ///
    /// To actually observe the value when this resolves, use
    /// [`glimpse`][Receiver::glimpse].
    ///
    /// Note that the value may have been updated _multiple times_ before this
    /// future resolves; those updates will all be collapsed from this
    /// receiver's perspective.
    ///
    /// # Cancellation
    ///
    /// This is cancel safe. If you drop the future before it resolves, nothing
    /// will happen --- in particular, the notion of which data the `Receiver`
    /// has or has not seen won't change.
    pub async fn changed(&mut self) {
        let w = self.shared;
        let v = w.update.until(|| {
            let v = w.version.get();
            if v != self.version {
                Some(v)
            } else {
                None
            }
        }).await;
        self.version = v;
    }

    /// Checks whether the watched data has changed since it was last marked as
    /// seen by this `Receiver`.
    pub fn is_changed(&self) -> bool {
        let w = self.shared;
        let v = w.version.get();
        v != self.version
    }

    /// Gets a brief look at the watched value.
    ///
    /// `glimpse` runs its `body` parameter with a reference to the watched
    /// value. This lets you inspect the contents without necessarily having to
    /// copy them out, which might be expensive or undesirable for some other
    /// reason (perhaps the watched value does not impl `Clone`).
    ///
    /// This takes a closure instead of returning a guard to ensure that there's
    /// no way to `await` with access to the watched value. Because this can't
    /// hold a reference to the watched value across an `await`, _and_ the
    /// sending side can't hold a reference across an `await`, this is
    /// guaranteed to complete promptly without blocking or deadlocks.
    ///
    /// `glimpse` does not, itself, mark the latest value as having been seen.
    /// Generally you'll use `glimpse` along with `changed` like so:
    ///
    /// ```rust
    /// loop {
    ///     watcher.changed().await;
    ///     watcher.glimpse(|value| {
    ///         do_stuff_with(value);
    ///     });
    /// }
    /// ```
    ///
    /// In that example, `changed` is responsible for marking the value as seen.
    ///
    /// If you'd like to access the value and simultaneously mark it as seen
    /// without blocking or messing around with `changed`, use
    /// [`glimpse_and_update`][Receiver::glimpse_and_update].
    ///
    /// **Note:** Even though `glimpse` does not mark the contents as seen, that
    /// doesn't mean multiple calls to `glimpse` will see the same contents! Any
    /// `Sender` may post an update between calls.
    ///
    /// # Panics
    ///
    /// If you attempt to use [`Sender::send`] to post to the same watched data
    /// you are observing during `glimpse`, it'll panic. Don't do that.
    pub fn glimpse<R>(&self, body: impl FnOnce(&T) -> R) -> R {
        let Ok(r) = self.shared.contents.try_borrow() else {
            panic!("attempt to glimpse during send_modify");
        };
        body(&*r)
    }

    /// Makes a copy of the current state of the watched data, without marking
    /// it as seen.
    ///
    /// This can be useful when `T` is relatively small --- and implements
    /// `Copy`, of course. This is equivalent to the handwritten version:
    ///
    /// ```
    /// let copy = watcher.glimpse(|current| *current);
    /// ```
    ///
    /// To copy the contents and also mark them as seen, see
    /// [`copy_current_and_update`][Self::copy_current_and_update].
    #[inline(always)]
    pub fn copy_current(&self) -> T
    where
        T: Copy,
    {
        self.glimpse(|value| *value)
    }

    /// Gets a brief look at the watched value, and records that it has been
    /// seen.
    ///
    /// `glimpse_and_update` runs its `body` parameter with a reference to the
    /// watched value. This lets you inspect the contents without necessarily
    /// having to copy them out, which might be expensive or undesirable for
    /// some other reason (perhaps the watched value does not impl `Clone`).
    ///
    /// This takes a closure instead of returning a guard to ensure that there's
    /// no way to `await` with access to the watched value. Because this can't
    /// hold a reference to the watched value across an `await`, _and_ the
    /// sending side can't hold a reference across an `await`, this is
    /// guaranteed to complete promptly without blocking or deadlocks.
    ///
    /// `glimpse_and_update` marks the latest value as having been seen, under
    /// the assumption that you're not interested in waiting for changes to
    /// occur. If you'd like to only process data in response to changes, you'll
    /// probably use [`changed`][Receiver::changed] to do so, in which case it's
    /// slightly cheaper (and less typing) to use [`glimpse`][Receiver::glimpse]
    /// instead.
    ///
    /// # Panics
    ///
    /// If you attempt to use [`Sender::send`] to post to the same watched data
    /// you are observing during `glimpse_and_update`, it'll panic. Don't do
    /// that.
    pub fn glimpse_and_update<R>(&mut self, body: impl FnOnce(&T) -> R) -> R {
        let r = self.glimpse(body);
        self.mark_as_seen();
        r
    }

    /// Makes a copy of the current state of the watched data, and marks it as
    /// seen.
    ///
    /// This can be useful when `T` is relatively small --- and implements
    /// `Copy`, of course. This is equivalent to the handwritten version:
    ///
    /// ```
    /// let copy = watcher.glimpse_and_update(|current| *current);
    /// ```
    ///
    /// To copy the contents _without_ marking them as seen, see
    /// [`copy_current`][Self::copy_current].
    pub fn copy_current_and_update(&mut self) -> T
    where
        T: Copy,
    {
        self.glimpse_and_update(|value| *value)
    }

    /// Resets this `Receiver`'s notion of what data has been seen, ensuring
    /// that the current contents of the watched data will be treated as new for
    /// the purposes of [`changed`][Self::changed].
    pub fn mark_as_unseen(&mut self) {
        // What's an integer that is guaranteed to not be equal to the current
        // version, and is also quite unlikely to be equal to any version in the
        // near future? How about the _previous_ version!
        self.version = self.shared.version.get().wrapping_sub(1);
    }

    /// Marks the current contents of the watched data as having been seen,
    /// whether or not this `Receiver` has actually seen them.
    pub fn mark_as_seen(&mut self) {
        self.version = self.shared.version.get();
    }
}