1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//! Reactive change notifications using futures.
//!
//! The `ChangeTracker<T>` type wraps an owned value `T`. Changes to `T` are
//! done within a function or closure implementing `FnOnce(&mut T)`. When this
//! returns, any changes are sent to listeners using a `futures::Stream`.
//!
//! In slightly more detail, create a `ChangeTracker<T>` with
//! [`ChangeTracker::new(value: T)`](struct.ChangeTracker.html#method.new). This
//! will take ownership of the value of type `T`. You can then create a
//! `futures::Stream` (with
//! [`get_changes()`](struct.ChangeTracker.html#method.get_changes)) that emits
//! a tuple `(old_value, new_value)` of type `(T, T)` upon every change to the
//! owned value. The value can be changed with the
//! [`modify()`](struct.ChangeTracker.html#method.modify) method of
//! `ChangeTracker` and read using the `as_ref()` method from the `AsRef` trait.
//!
//! ## Example
//!
//! In this example, the functionality of `ChangeTracker` is shown.
//!
//! ```rust
//! use futures::stream::StreamExt;
//!
//! // Wrap an integer `with ChangeTracker`
//! let mut change_tracker = async_change_tracker::ChangeTracker::new( 123 );
//!
//! // Create an receiver that fires when the value changes. The channel size
//! // is 1, meaning at most one change can be buffered before backpressure
//! // is applied.
//! let rx = change_tracker.get_changes(1);
//!
//! // In this example take a single change and check that the old and new value
//! // are correct.
//! let rx_printer = rx.take(1).for_each(|(old_value, new_value)| {
//!     assert_eq!( old_value, 123);
//!     assert_eq!( new_value, 124);
//!     futures::future::ready(())
//! });
//!
//! // Now, check and then change the value.
//! change_tracker.modify(|mut_ref_value| {
//!     assert_eq!(*mut_ref_value, 123);
//!     *mut_ref_value += 1;
//! });
//!
//! // Wait until the stream is done. In this example, the stream ends due to
//! // the use of `.take(1)` prior to `for_each` above. In normal usage,
//! // typically the stream would finish for a different reason.
//! futures::executor::block_on(rx_printer);
//!
//! // Finally, check that the final value is as expected.
//! assert!(*change_tracker.as_ref() == 124);
//! ```
//!
//! ## Testing
//!
//! To test:
//!
//! ```text
//! cargo test
//! ```
#![deny(missing_docs)]

use futures::channel::mpsc;
use std::sync::{Arc, RwLock};

/// Tracks changes to data. Notifies listeners via a `futures::Stream`.
///
/// The data to be tracked is type `T`. The value of type `T` is wrapped in a
/// private field. The `AsRef` trait is implemented so `&T` can be obtained by
/// calling `as_ref()`. Read and write access can be gained by calling the
/// `modify` method.
///
/// Subscribe to changes by calling `get_changes`.
///
/// Note that this does not implement Clone because typically this is not what
/// you want. Rather, you should wrap ChangeTracker in `Arc<RwLock>` or similar.
///
/// See the module-level documentation for more information and a usage example.
pub struct ChangeTracker<T> {
    value: T,
    senders: Arc<RwLock<VecSender<T>>>,
}

type VecSender<T> = Vec<mpsc::Sender<(T, T)>>;

impl<T> ChangeTracker<T>
where
    T: Clone,
{
    /// Create a new `ChangeTracker` which takes ownership
    /// of the data of type `T`.
    pub fn new(value: T) -> Self {
        Self {
            value,
            senders: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Returns a `futures::Stream` that emits a message when a change occurs
    ///
    /// The capacity of the underlying channel is specified with the `capacity`
    /// argument.
    ///
    /// To remove a listener, drop the Receiver.
    pub fn get_changes(&self, capacity: usize) -> mpsc::Receiver<(T, T)> {
        let (tx, rx) = mpsc::channel(capacity);
        let mut senders = self.senders.write().unwrap();
        senders.push(tx);
        rx
    }

    /// Modify the data value, notifying listeners upon change.
    pub fn modify<F>(&mut self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let orig = self.value.clone();
        f(&mut self.value);
        let newval = self.value.clone();
        {
            let mut senders = self.senders.write().unwrap();
            let mut keep = vec![];
            for mut on_changed_tx in senders.drain(0..) {
                // TODO use .send() here?
                match on_changed_tx.start_send((orig.clone(), newval.clone())) {
                    Ok(_) => {
                        keep.push(on_changed_tx);
                    }
                    Err(e) => {
                        if e.is_disconnected() {
                            tracing::trace!("receiver dropped");
                        } else {
                            tracing::trace!("error on start_send: {e}");
                            keep.push(on_changed_tx);
                        }
                    }
                }
            }
            senders.extend(keep);
        }
    }
}

impl<T> AsRef<T> for ChangeTracker<T> {
    fn as_ref(&self) -> &T {
        &self.value
    }
}