async_change_tracker/
lib.rs

1//! Reactive change notifications using futures.
2//!
3//! The `ChangeTracker<T>` type wraps an owned value `T`. Changes to `T` are
4//! done within a function or closure implementing `FnOnce(&mut T)`. When this
5//! returns, any changes are sent to listeners using a `futures::Stream`.
6//!
7//! In slightly more detail, create a `ChangeTracker<T>` with
8//! [`ChangeTracker::new(value: T)`](struct.ChangeTracker.html#method.new). This
9//! will take ownership of the value of type `T`. You can then create a
10//! `futures::Stream` (with
11//! [`get_changes()`](struct.ChangeTracker.html#method.get_changes)) that emits
12//! a tuple `(old_value, new_value)` of type `(T, T)` upon every change to the
13//! owned value. The value can be changed with the
14//! [`modify()`](struct.ChangeTracker.html#method.modify) method of
15//! `ChangeTracker` and read using the `as_ref()` method from the `AsRef` trait.
16//!
17//! ## Example
18//!
19//! In this example, the functionality of `ChangeTracker` is shown.
20//!
21//! ```rust
22//! use futures::stream::StreamExt;
23//!
//! // Wrap an integer with `ChangeTracker`
25//! let mut change_tracker = async_change_tracker::ChangeTracker::new( 123 );
26//!
//! // Create a receiver that fires when the value changes. The channel is
//! // created with capacity 1. Note that `modify()` does not wait for space:
//! // if the channel cannot accept a change, this receiver misses it.
30//! let rx = change_tracker.get_changes(1);
31//!
32//! // In this example take a single change and check that the old and new value
33//! // are correct.
34//! let rx_printer = rx.take(1).for_each(|(old_value, new_value)| {
35//!     assert_eq!( old_value, 123);
36//!     assert_eq!( new_value, 124);
37//!     futures::future::ready(())
38//! });
39//!
40//! // Now, check and then change the value.
41//! change_tracker.modify(|mut_ref_value| {
42//!     assert_eq!(*mut_ref_value, 123);
43//!     *mut_ref_value += 1;
44//! });
45//!
46//! // Wait until the stream is done. In this example, the stream ends due to
47//! // the use of `.take(1)` prior to `for_each` above. In normal usage,
48//! // typically the stream would finish for a different reason.
49//! futures::executor::block_on(rx_printer);
50//!
51//! // Finally, check that the final value is as expected.
52//! assert!(*change_tracker.as_ref() == 124);
53//! ```
54//!
55//! ## Testing
56//!
57//! To test:
58//!
59//! ```text
60//! cargo test
61//! ```
62#![deny(missing_docs)]
63
64use futures::channel::mpsc;
65use std::sync::{Arc, RwLock};
66
67/// Tracks changes to data. Notifies listeners via a `futures::Stream`.
68///
69/// The data to be tracked is type `T`. The value of type `T` is wrapped in a
70/// private field. The `AsRef` trait is implemented so `&T` can be obtained by
71/// calling `as_ref()`. Read and write access can be gained by calling the
72/// `modify` method.
73///
74/// Subscribe to changes by calling `get_changes`.
75///
76/// Note that this does not implement Clone because typically this is not what
77/// you want. Rather, you should wrap ChangeTracker in `Arc<RwLock>` or similar.
78///
79/// See the module-level documentation for more information and a usage example.
80pub struct ChangeTracker<T> {
81    value: T,
82    senders: Arc<RwLock<VecSender<T>>>,
83}
84
85type VecSender<T> = Vec<mpsc::Sender<(T, T)>>;
86
87impl<T> ChangeTracker<T>
88where
89    T: Clone,
90{
91    /// Create a new `ChangeTracker` which takes ownership
92    /// of the data of type `T`.
93    pub fn new(value: T) -> Self {
94        Self {
95            value,
96            senders: Arc::new(RwLock::new(Vec::new())),
97        }
98    }
99
100    /// Returns a `futures::Stream` that emits a message when a change occurs
101    ///
102    /// The capacity of the underlying channel is specified with the `capacity`
103    /// argument.
104    ///
105    /// To remove a listener, drop the Receiver.
106    pub fn get_changes(&self, capacity: usize) -> mpsc::Receiver<(T, T)> {
107        let (tx, rx) = mpsc::channel(capacity);
108        let mut senders = self.senders.write().unwrap();
109        senders.push(tx);
110        rx
111    }
112
113    /// Modify the data value, notifying listeners upon change.
114    pub fn modify<F>(&mut self, f: F)
115    where
116        F: FnOnce(&mut T),
117    {
118        let orig = self.value.clone();
119        f(&mut self.value);
120        let newval = self.value.clone();
121        {
122            let mut senders = self.senders.write().unwrap();
123            let mut keep = vec![];
124            for mut on_changed_tx in senders.drain(0..) {
125                // TODO use .send() here?
126                match on_changed_tx.start_send((orig.clone(), newval.clone())) {
127                    Ok(_) => {
128                        keep.push(on_changed_tx);
129                    }
130                    Err(e) => {
131                        if e.is_disconnected() {
132                            tracing::trace!("receiver dropped");
133                        } else {
134                            tracing::trace!("error on start_send: {e}");
135                            keep.push(on_changed_tx);
136                        }
137                    }
138                }
139            }
140            senders.extend(keep);
141        }
142    }
143}
144
145impl<T> AsRef<T> for ChangeTracker<T> {
146    fn as_ref(&self) -> &T {
147        &self.value
148    }
149}