1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
//! Reactive change notifications using futures. //! //! This is the successor of //! [raii-change-tracker](https://crates.io/crates/raii-change-tracker). //! //! The main item of interest is [`ChangeTracker`](struct.ChangeTracker.html). //! Creating a new with [`ChangeTracker::new(value: //! T)`](struct.ChangeTracker.html#method.new) will take ownership of the value //! of type `T`. You can then create a futures::Stream (with //! [`get_changes()`](struct.ChangeTracker.html#method.get_changes)) that emits //! a tuple `(old_value, new_value)` of type `(T, T)` upon every change to the //! owned value. The value can be changed with the //! [`modify()`](struct.ChangeTracker.html#method.modify) method of //! `ChangeTracker` and read using the `as_ref()` method from the `AsRef` trait. //! //! ## Example //! //! In this example, all the functionality of `ChangeTracker` is shown. A //! [`tokio`](https://crates.io/crates/tokio) runtime is used to execute the //! futures. //! //! ```rust //! extern crate async_change_tracker; //! extern crate futures; //! extern crate tokio; //! //! use futures::Stream; //! //! // Wrap an integer `with ChangeTracker` //! let mut change_tracker = async_change_tracker::ChangeTracker::new( 123 ); //! //! // Create an receiver that fires when the value changes //! let rx = change_tracker.get_changes(1); //! //! // In this example, upon change, check the old and new value are correct. //! let rx_printer = rx.for_each(|(old_value, new_value)| { //! assert_eq!( old_value, 123); //! assert_eq!( new_value, 124); //! // Here in this example, we return error to abort the stream. //! // In normal usage, typically an `Ok` value would be returned. //! futures::future::err(()) //! }); //! //! // Now, check and then change the value. //! change_tracker.modify(|scoped_store| { //! assert_eq!(*scoped_store, 123); //! *scoped_store += 1; //! }); //! //! // Wait until the stream is done. In this example, the stream ends due to //! // the error we return in the `for_each` closure above. In normal usage, //! // typically the stream would finish for a different reason. //! match tokio::runtime::current_thread::block_on_all(rx_printer) { //! Ok(_) => panic!("should not get here"), //! Err(()) => (), //! } //! //! // Finally, check that the final value is as expected. //! assert!(*change_tracker.as_ref() == 124); //! ``` #![deny(missing_docs)] #[macro_use] extern crate log; extern crate futures; extern crate parking_lot; use futures::sync::mpsc; use futures::Sink; use parking_lot::Mutex; use std::sync::Arc; /// Tracks changes to data. Notifies listeners via a `futures::Stream`. /// /// The data to be tracked is type `T`. The value of type `T` is wrapped in a /// private field. The `AsRef` trait is implemented so `&T` can be obtained by /// calling `as_ref()`. Read and write access can be gained by calling the /// `modify` method. /// /// Subsribe to changes by calling `get_changes`. /// /// See the module-level documentation for more information and a usage example. pub struct ChangeTracker<T> { value: T, senders: Arc<Mutex<Vec<mpsc::Sender<(T, T)>>>>, } impl<T> ChangeTracker<T> where T: Clone, { /// Create a new `ChangeTracker` which takes ownership /// of the data of type `T`. pub fn new(value: T) -> Self { Self { value, senders: Arc::new(Mutex::new(Vec::new())), } } /// Returns a `futures::Stream` that emits a message when a change occurs /// /// The capacity of the underlying channel is specified with the `capacity` /// argument. /// /// To remove a listener, drop the Receiver. pub fn get_changes(&self, capacity: usize) -> mpsc::Receiver<(T, T)> { let (tx, rx) = mpsc::channel(capacity); let mut senders = self.senders.lock(); senders.push(tx); rx } /// Modify the data value, notifying listeners upon change. pub fn modify<F>(&mut self, f: F) where F: FnOnce(&mut T), { let orig = self.value.clone(); f(&mut self.value); let newval = self.value.clone(); { let mut senders = self.senders.lock(); let mut keep = vec![]; for mut on_changed_tx in senders.drain(0..) { // TODO use .send() here? match on_changed_tx.start_send((orig.clone(), newval.clone())) { Ok(_) => keep.push(on_changed_tx), Err(_) => trace!("receiver dropped"), } } senders.extend(keep); } } } impl<T> AsRef<T> for ChangeTracker<T> { fn as_ref(&self) -> &T { &self.value } }