lilos_watch/lib.rs
1//! A simple mechanism for sharing a piece of data and being notified if it
2//! changes.
3//!
4//! This module provides [`Watch<T>`], a data structure that lets you share some
5//! data of type `T`, update it from multiple producers, and efficiently notify
6//! multiple consumers when it changes.
7//!
8//! A `Watch<T>` is particularly useful for things like configuration data,
9//! which are effectively "global" (shared by many tasks) but may need special
10//! handling of changes.
11//!
12//! ```
13//! let shared_number = Watch::new(1234_u32);
14//! ```
15//!
16//! You can create a _receive handle_ to the `Watch<T>` by calling
17//! [`subscribe`][Watch::subscribe]. This produces a [`Receiver<T>`] that allows
18//! its holder to inspect the shared data, and be notified when it changes.
19//!
20//! ```
//! let mut rcvr = shared_number.subscribe();
22//! // A receiver only tracks changes made _after_ it's created:
23//! assert_eq!(rcvr.is_changed(), false);
24//! ```
25//!
26//! You can create a _send handle_ to the `Watch<T>` by calling
27//! [`sender`][Watch::sender]. This produces a [`Sender<T>`] that allows its
28//! holder to update the shared data.
29//!
30//! ```
31//! let sender = shared_number.sender();
32//!
33//! // Update the shared data:
34//! sender.send(4567);
35//!
36//! // Now the receiver sees a change:
37//! assert_eq!(rcvr.is_changed(), true);
38//!
39//! // We can inspect it and mark it as seen:
40//! rcvr.glimpse_and_update(|value| assert_eq!(*value, 4567));
41//! ```
42//!
43//! Code that wants to monitor changes to the data can use the
44//! [`Receiver::changed`] future to do so:
45//!
46//! ```
47//! loop {
48//! rcvr.changed().await;
49//! rcvr.glimpse_and_update(|value| process(value));
50//! }
51//! ```
52//!
53//! # Reentrancy and panics
54//!
55//! It is possible to attempt to use handles for a single `Watch<T>` reentrantly
56//! if you try hard enough. The implementation checks for this and will panic.
57//! For instance, attempting to send a new value from _inside_ the closure
58//! passed to [`Receiver::glimpse`]:
59//!
60//! ```
61//! let shared = Watch::new(some_data);
62//! let sender = shared.sender();
63//! let rcvr = shared.subscribe();
64//!
65//! // This line will panic at runtime.
66//! rcvr.glimpse(|contents| sender.send(*contents));
67//! ```
68//!
69//! It is perfectly safe to send or inspect a _different_ `Watch<T>` instance
70//! from within one of these closures, just not the same one.
71//!
72//! In practice it's pretty hard to do this accidentally, but, now you know.
73//!
74//! # Implementation
75//!
//! Internally, a `Watch<T>` contains a _change count_. Each time its
//! contents are updated by any `Sender`, the change count gets incremented.
78//!
79//! Each `Receiver` has its own copy of the change count, reflecting what the
80//! count was at the last time that `Receiver` accessed the shared data. If the
81//! change count stored in the `Watch` is different from the one stored in the
82//! `Receiver`, then there is an update that hasn't been seen by its holder yet.
83//!
84//! Because the `Watch<T>` only stores a single copy of the data and a counter,
85//! a `Receiver` may not see _every_ update to the data. If multiple updates
86//! happen between checks, the `Receiver` will only ever see the last one. This
87//! keeps both the storage requirements, and the cost of updates, low.
88//!
89//! `Watch<T>` contains a [`Notify`] internally, which it uses to efficiently
90//! wake tasks that are awaiting [`Receiver::changed`].
91
92#![no_std]
93
94#![forbid(unsafe_code)]
95#![warn(
96 elided_lifetimes_in_paths,
97 explicit_outlives_requirements,
98 missing_debug_implementations,
99 missing_docs,
100 semicolon_in_expressions_from_macros,
101 single_use_lifetimes,
102 trivial_casts,
103 trivial_numeric_casts,
104 unreachable_pub,
105 unsafe_op_in_unsafe_fn,
106 unused_qualifications,
107)]
108
109use core::cell::{Cell, RefCell};
110
111use lilos::exec::Notify;
112
113/// Store some data of type `T` and efficiently notify multiple consumers if it
114/// changes.
115///
116/// See the [module docs][crate] for more specifics and examples.
117#[derive(Debug)]
118pub struct Watch<T> {
119 version: Cell<u64>,
120 contents: RefCell<T>,
121 update: Notify,
122}
123
124impl<T> Watch<T> {
125 /// Creates a new `Watch<T>` that will initially contain `contents`.
126 pub const fn new(contents: T) -> Self {
127 Self {
128 version: Cell::new(0),
129 contents: RefCell::new(contents),
130 update: Notify::new(),
131 }
132 }
133
134 /// Creates a new send-only handle to the watched data.
135 pub fn sender(&self) -> Sender<'_, T> {
136 Sender {
137 shared: self,
138 }
139 }
140
141 /// Creates a new receive-only handle to the watched data.
142 ///
143 /// The returned `Receiver` treats the current contents as "seen," and any
144 /// attempt to wait for changes will wait for the _next_ change. If this
145 /// isn't what you want, you can force the `Receiver` to treat its initial
146 /// contents as novel by calling [`Receiver::mark_as_unseen`].
147 pub fn subscribe(&self) -> Receiver<'_, T> {
148 let version = self.version.get();
149 Receiver {
150 version,
151 shared: self,
152 }
153 }
154}
155
156/// Posts new values to a `Watch<T>`.
157///
158/// There can be many `Sender`s for a single `Watch<T>`. They're not
159/// distinguished from each other --- any code observing the watched data will
160/// just see a change, and won't be told where the change was made.
161#[derive(Clone, Debug)]
162pub struct Sender<'a, T> {
163 shared: &'a Watch<T>,
164}
165
166impl<'a, T> Sender<'a, T> {
167 /// Replaces the watched data with `value` and signals that a change has
168 /// occurred.
169 ///
170 /// This advances the internal change count and wakes any tasks that have
171 /// blocked waiting for changes.
172 ///
173 /// # Panics
174 ///
175 /// If called from inside the closure passed to [`Receiver::glimpse`] or
176 /// [`Receiver::glimpse_and_update`], on a `Receiver` that would receive
177 /// `value`, this will panic. Don't do that.
178 pub fn send(&self, value: T) {
179 self.send_replace(value);
180 }
181
182 /// Replaces the watched data with `value` and signals that a change has
183 /// occurred.
184 ///
185 /// This advances the internal change count and wakes any tasks that have
186 /// blocked waiting for changes.
187 ///
188 /// # Panics
189 ///
190 /// If called from inside the closure passed to [`Receiver::glimpse`] or
191 /// [`Receiver::glimpse_and_update`], on a `Receiver` that would receive
192 /// `value`, this will panic. Don't do that.
193 pub fn send_replace(&self, value: T) -> T {
194 self.send_modify(|r| core::mem::replace(r, value))
195 }
196
197 /// Applies `body` to the watched data and then signals that a change has
198 /// occurred. `body` is free to update the watched data however it likes,
199 /// but even if it leaves it unchanged, observers will still be signaled.
200 ///
201 /// This advances the internal change count and wakes any tasks that have
202 /// blocked waiting for changes.
203 ///
204 /// # Panics
205 ///
206 /// Calling this inside the closure passed to a `Receiver` on the _same
207 /// watched data,_ or vice versa, will panic. Don't do that.
208 pub fn send_modify<R>(&self, body: impl FnOnce(&mut T) -> R) -> R {
209 let w = self.shared;
210 let Ok(mut r) = w.contents.try_borrow_mut() else {
211 panic!("attempt to send during glimpse or send_modify");
212 };
213 let result = body(&mut *r);
214 w.version.set(w.version.get().wrapping_add(1));
215
216 self.shared.update.notify();
217
218 result
219 }
220
221 /// Makes a `Receiver` that will watch the data posted by this `Sender`.
222 ///
223 /// This is the same operation as `Watch::subscribe`, provided here because
224 /// it's sometimes convenient to use directly on a `Sender`.
225 pub fn subscribe(&self) -> Receiver<'a, T> {
226 self.shared.subscribe()
227 }
228}
229
230/// Receives changes made to the data in a `Watch<T>`.
231///
232/// Each `Receiver` keeps track of which versions of the watched data it has
233/// seen. This is updated in one of three ways:
234///
235/// - When a future produced by [`Receiver::changed`] resolves.
236/// - When you call [`Receiver::glimpse_and_update`].
237/// - When you call [`Receiver::mark_as_seen`].
238#[derive(Clone, Debug)]
239pub struct Receiver<'a, T> {
240 version: u64,
241 shared: &'a Watch<T>,
242}
243
244impl<T> Receiver<'_, T> {
245 /// Returns a future that resolves when the value being observed has
246 /// been updated.
247 ///
248 /// When this future resolves, it will mark the most recent version as
249 /// having been seen. This means calling `changed` again will produce a
250 /// future that waits until _another_ change to the watched value.
251 ///
252 /// To actually observe the value when this resolves, use
253 /// [`glimpse`][Receiver::glimpse].
254 ///
255 /// Note that the value may have been updated _multiple times_ before this
256 /// future resolves; those updates will all be collapsed from this
257 /// receiver's perspective.
258 ///
259 /// # Cancellation
260 ///
261 /// This is cancel safe. If you drop the future before it resolves, nothing
262 /// will happen --- in particular, the notion of which data the `Receiver`
263 /// has or has not seen won't change.
264 pub async fn changed(&mut self) {
265 let w = self.shared;
266 let v = w.update.until(|| {
267 let v = w.version.get();
268 if v != self.version {
269 Some(v)
270 } else {
271 None
272 }
273 }).await;
274 self.version = v;
275 }
276
277 /// Checks whether the watched data has changed since it was last marked as
278 /// seen by this `Receiver`.
279 pub fn is_changed(&self) -> bool {
280 let w = self.shared;
281 let v = w.version.get();
282 v != self.version
283 }
284
285 /// Gets a brief look at the watched value.
286 ///
287 /// `glimpse` runs its `body` parameter with a reference to the watched
288 /// value. This lets you inspect the contents without necessarily having to
289 /// copy them out, which might be expensive or undesirable for some other
290 /// reason (perhaps the watched value does not impl `Clone`).
291 ///
292 /// This takes a closure instead of returning a guard to ensure that there's
293 /// no way to `await` with access to the watched value. Because this can't
294 /// hold a reference to the watched value across an `await`, _and_ the
295 /// sending side can't hold a reference across an `await`, this is
296 /// guaranteed to complete promptly without blocking or deadlocks.
297 ///
298 /// `glimpse` does not, itself, mark the latest value as having been seen.
299 /// Generally you'll use `glimpse` along with `changed` like so:
300 ///
301 /// ```rust
302 /// loop {
303 /// watcher.changed().await;
304 /// watcher.glimpse(|value| {
305 /// do_stuff_with(value);
306 /// });
307 /// }
308 /// ```
309 ///
310 /// In that example, `changed` is responsible for making the value as seen.
311 ///
312 /// If you'd like to access the value and simultaneously mark it as seen
313 /// without blocking or messing around with `changed`, use
314 /// [`glimpse_and_update`][Receiver::glimpse_and_update].
315 ///
316 /// **Note:** Even though `glimpse` does not mark the contents as seen, that
317 /// doesn't mean multiple calls to `glimpse` will see the same contents!
318 ///
319 /// # Panics
320 ///
321 /// If you attempt to use [`Sender::send`] to post to the same watched data
322 /// you are observing during `glimpse`, it'll panic. Don't do that.
323 pub fn glimpse<R>(&self, body: impl FnOnce(&T) -> R) -> R {
324 let Ok(r) = self.shared.contents.try_borrow() else {
325 panic!("attempt to glimpse during send_modify");
326 };
327 body(&*r)
328 }
329
330 /// Makes a copy of the current state of the watched data, without marking
331 /// it as seen.
332 ///
333 /// This can be useful when `T` is relatively small --- and implements
334 /// `Copy`, of course. This is equivalent to the handwritten version:
335 ///
336 /// ```
337 /// let copy = watch.glimpse(|current| *current);
338 /// ```
339 ///
340 /// To copy the contents and also mark them as seen, see
341 /// [`copy_current_and_update`][Self::copy_current_and_update].
342 #[inline(always)]
343 pub fn copy_current(&self) -> T
344 where T: Copy
345 {
346 self.glimpse(|value| *value)
347 }
348
349 /// Gets a brief look at the watched value, and records that it has been
350 /// seen.
351 ///
352 /// `glimpse_and_update` runs its `body` parameter with a reference to the
353 /// watched value. This lets you inspect the contents without necessarily
354 /// having to copy them out, which might be expensive or undesirable for
355 /// some other reason (perhaps the watched value does not impl `Clone`).
356 ///
357 /// This takes a closure instead of returning a guard to ensure that there's
358 /// no way to `await` with access to the watched value. Because this can't
359 /// hold a reference to the watched value across an `await`, _and_ the
360 /// sending side can't hold a reference across an `await`, this is
361 /// guaranteed to complete promptly without blocking or deadlocks.
362 ///
363 /// `glimpse_and_update` marks the latest value as having been seen, under
364 /// the assumption that you're not interested in waiting for changes to
365 /// occur. If you'd like to only process data in response to changes, you'll
366 /// probably use [`changed`][Receiver::changed] to do so, in which case it's
367 /// slightly cheaper (and less typing) to use [`glimpse`][Receiver::glimpse]
368 /// instead.
369 ///
370 /// # Panics
371 ///
372 /// If you attempt to use [`Sender::send`] to post to the same watched data
373 /// you are observing during `glimpse_and_update`, it'll panic. Don't do
374 /// that.
375 pub fn glimpse_and_update<R>(&mut self, body: impl FnOnce(&T) -> R) -> R {
376 let r = self.glimpse(body);
377 self.mark_as_seen();
378 r
379 }
380
381 /// Makes a copy of the current state of the watched data, and marks it as
382 /// seen.
383 ///
384 /// This can be useful when `T` is relatively small --- and implements
385 /// `Copy`, of course. This is equivalent to the handwritten version:
386 ///
387 /// ```
388 /// let copy = watch.glimpse_and_update(|current| *current);
389 /// ```
390 ///
391 /// To copy the contents _without_ marking them as seen, see
392 /// [`copy_current`][Self::copy_current].
393 pub fn copy_current_and_update(&mut self) -> T
394 where T: Copy
395 {
396 self.glimpse_and_update(|value| *value)
397 }
398
399 /// Resets this `Receiver`'s notion of what data has been seen, ensuring
400 /// that the current contents of the watched data will be treated as new for
401 /// the purposes of [`changed`][Self::changed].
402 pub fn mark_as_unseen(&mut self) {
403 // What's an integer that is guaranteed to not be equal to the current
404 // version, and is also quite unlikely to be equal to any version in the
405 // near future? How about the _previous_ version!
406 self.version = self.shared.version.get().wrapping_sub(1);
407 }
408
409 /// Marks the current contents of the watched data as having been seen,
410 /// whether or not this `Receiver` has actually seen them.
411 pub fn mark_as_seen(&mut self) {
412 self.version = self.shared.version.get();
413 }
414}