tokens/
default.rs

1use crate::{Callback, ChangeToken, Registration};
2use std::{
3    any::Any,
4    mem,
5    sync::{Arc, Mutex, Weak},
6};
7
/// Optional, shared, type-erased value that is stored at registration and handed to the callback.
type State = Option<Arc<dyn Any>>;
/// Unsized callable, bounded by `Send + Sync`, that receives the registered [`State`].
type StatefulCallback = dyn Fn(State) + Send + Sync;
/// A weakly-referenced callback paired with the state it is invoked with.
type CallbackWithState = (Weak<StatefulCallback>, State);
11
/// Snapshot produced by [`Notification::fire`].
struct Ready {
    /// Value the notification's `fired` flag held immediately before this fire.
    fired: bool,
    /// Callbacks that could still be upgraded to a strong reference, each with a clone of its state.
    callbacks: Vec<(Arc<StatefulCallback>, State)>,
}
16
17impl IntoIterator for Ready {
18    type Item = (Arc<StatefulCallback>, State);
19    type IntoIter = std::vec::IntoIter<Self::Item>;
20
21    fn into_iter(self) -> Self::IntoIter {
22        self.callbacks.into_iter()
23    }
24}
25
/// Registered callbacks together with the flag that records a previous fire.
#[derive(Default)]
struct Notification {
    /// Overwritten with the caller's `once` argument each time `fire` runs; the previous value
    /// is handed back to the caller.
    fired: bool,
    /// Weak references to the registered callbacks, each paired with its state.
    callbacks: Vec<CallbackWithState>,
}
31
32impl Notification {
33    fn fire(&mut self, once: bool) -> Ready {
34        Ready {
35            fired: mem::replace(&mut self.fired, once),
36            callbacks: self
37                .callbacks
38                .iter()
39                .filter_map(|r| r.0.upgrade().map(|c| (c, r.1.clone())))
40                .collect(),
41        }
42    }
43
44    fn register(
45        &mut self,
46        callback: Callback,
47        state: Option<Arc<dyn Any>>,
48    ) -> Arc<StatefulCallback> {
49        // writes are much infrequent, so do the trimming of any dead callbacks now
50        if !self.callbacks.is_empty() {
51            for i in (0..self.callbacks.len()).rev() {
52                if self.callbacks[i].0.upgrade().is_none() {
53                    self.callbacks.remove(i);
54                }
55            }
56        }
57
58        let source: Arc<StatefulCallback> = Arc::from(callback);
59
60        self.callbacks.push((Arc::downgrade(&source), state));
61        source
62    }
63}
64
/// Represents a default [`ChangeToken`](crate::ChangeToken) that may change zero or more times.
#[derive(Default)]
pub struct DefaultChangeToken {
    /// When `true`, callbacks are only invoked by the first call to `notify`.
    once: bool,
    /// Registered callbacks and the fired flag; every access goes through this mutex.
    notification: Mutex<Notification>,
}
71
72impl DefaultChangeToken {
73    pub(crate) fn once() -> Self {
74        Self {
75            once: true,
76            ..Default::default()
77        }
78    }
79
80    /// Initializes a new default change token.
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    /// Notifies any registered callbacks of a change.
86    pub fn notify(&self) {
87        let notification = self.notification.lock().unwrap().fire(self.once);
88
89        if !notification.fired {
90            for (callback, state) in notification {
91                callback(state);
92            }
93        }
94    }
95}
96
97impl ChangeToken for DefaultChangeToken {
98    fn changed(&self) -> bool {
99        // this is uninteresting and unusable in sync contexts. the value
100        // will be true, invoke callbacks, and then likely revert to false
101        // before it can be observed. it 'might' be useful in an async context,
102        // but a callback is the most practical way a change would be observed
103        self.notification.lock().unwrap().fired
104    }
105
106    fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
107        Registration::new(self.notification.lock().unwrap().register(callback, state))
108    }
109}
110
// SAFETY: NOTE(review) - these impls are written by hand because the stored state is
// `Arc<dyn Any>` with no `Send + Sync` bound, which keeps the auto traits from applying.
// Every access to the callback list goes through the `Mutex`, but the state itself is cloned
// and passed to callbacks on whichever thread calls `notify`, so soundness presumably relies
// on callers only registering thread-safe state - TODO confirm against the `ChangeToken` contract.
unsafe impl Send for DefaultChangeToken {}
unsafe impl Sync for DefaultChangeToken {}
113
#[cfg(test)]
mod tests {

    use super::*;
    use std::sync::{
        atomic::{AtomicU8, Ordering::Relaxed},
        Arc,
    };

    /// Builds a callback that increments the `AtomicU8` supplied as its state.
    fn increment() -> Callback {
        Box::new(|state| {
            let state = state.unwrap();
            let counter = state.downcast_ref::<AtomicU8>().unwrap();
            counter.fetch_add(1, Relaxed);
        })
    }

    #[test]
    fn default_change_token_should_be_unchanged() {
        // arrange
        let token = DefaultChangeToken::default();

        // act
        let changed = token.changed();

        // assert
        assert!(!changed);
    }

    #[test]
    fn default_change_token_should_invoke_callback() {
        // arrange
        let counter = Arc::new(AtomicU8::default());
        let token = DefaultChangeToken::default();
        let _registration = token.register(increment(), Some(counter.clone()));

        // act
        token.notify();

        // assert
        let hits = counter.load(Relaxed);
        assert_eq!(hits, 1);
    }

    #[test]
    fn default_change_token_should_invoke_callback_multiple_times() {
        // arrange
        let counter = Arc::new(AtomicU8::default());
        let token = DefaultChangeToken::default();
        let _registration = token.register(increment(), Some(counter.clone()));
        token.notify();

        // act
        token.notify();

        // assert
        let hits = counter.load(Relaxed);
        assert_eq!(hits, 2);
    }
}