1use crate::{Callback, ChangeToken, Registration};
2use std::{
3 any::Any,
4 mem,
5 sync::{Arc, Mutex, Weak},
6};
7
/// Optional, type-erased state that is handed to a callback each time it is invoked.
type State = Option<Arc<dyn Any>>;
/// Unsized signature shared by every stored callback: it receives the [`State`] by value
/// and must be `Send + Sync`.
type StatefulCallback = dyn Fn(State) + Send + Sync;
/// A registered callback, referenced weakly, paired with the state to pass to it.
type CallbackWithState = (Weak<StatefulCallback>, State);
11
12struct Ready {
13 fired: bool,
14 callbacks: Vec<(Arc<StatefulCallback>, State)>,
15}
16
17impl IntoIterator for Ready {
18 type Item = (Arc<StatefulCallback>, State);
19 type IntoIter = std::vec::IntoIter<Self::Item>;
20
21 fn into_iter(self) -> Self::IntoIter {
22 self.callbacks.into_iter()
23 }
24}
25
26#[derive(Default)]
27struct Notification {
28 fired: bool,
29 callbacks: Vec<CallbackWithState>,
30}
31
32impl Notification {
33 fn fire(&mut self, once: bool) -> Ready {
34 Ready {
35 fired: mem::replace(&mut self.fired, once),
36 callbacks: self
37 .callbacks
38 .iter()
39 .filter_map(|r| r.0.upgrade().map(|c| (c, r.1.clone())))
40 .collect(),
41 }
42 }
43
44 fn register(
45 &mut self,
46 callback: Callback,
47 state: Option<Arc<dyn Any>>,
48 ) -> Arc<StatefulCallback> {
49 if !self.callbacks.is_empty() {
51 for i in (0..self.callbacks.len()).rev() {
52 if self.callbacks[i].0.upgrade().is_none() {
53 self.callbacks.remove(i);
54 }
55 }
56 }
57
58 let source: Arc<StatefulCallback> = Arc::from(callback);
59
60 self.callbacks.push((Arc::downgrade(&source), state));
61 source
62 }
63}
64
65#[derive(Default)]
67pub struct DefaultChangeToken {
68 once: bool,
69 notification: Mutex<Notification>,
70}
71
72impl DefaultChangeToken {
73 pub(crate) fn once() -> Self {
74 Self {
75 once: true,
76 ..Default::default()
77 }
78 }
79
80 pub fn new() -> Self {
82 Self::default()
83 }
84
85 pub fn notify(&self) {
87 let notification = self.notification.lock().unwrap().fire(self.once);
88
89 if !notification.fired {
90 for (callback, state) in notification {
91 callback(state);
92 }
93 }
94 }
95}
96
97impl ChangeToken for DefaultChangeToken {
98 fn changed(&self) -> bool {
99 self.notification.lock().unwrap().fired
104 }
105
106 fn register(&self, callback: Callback, state: Option<Arc<dyn Any>>) -> Registration {
107 Registration::new(self.notification.lock().unwrap().register(callback, state))
108 }
109}
110
// SAFETY: NOTE(review) — `Send`/`Sync` are not implemented automatically because the
// mutex-guarded `Notification` stores `Option<Arc<dyn Any>>` state, and `dyn Any` carries
// no `Send`/`Sync` bound. The callbacks themselves are `Send + Sync` by their type, and
// all access to the stored data goes through the `Mutex`. These impls therefore assume
// every registered state value is safe to send to and share between threads; nothing
// in this file enforces that — TODO confirm with callers, or tighten the state type to
// `dyn Any + Send + Sync` so these impls become unnecessary.
unsafe impl Send for DefaultChangeToken {}
unsafe impl Sync for DefaultChangeToken {}
113
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        atomic::{AtomicU8, Ordering::Relaxed},
        Arc,
    };

    /// Builds a callback that increments the `AtomicU8` it receives as state.
    fn increment() -> Callback {
        Box::new(|state: State| {
            state
                .unwrap()
                .downcast_ref::<AtomicU8>()
                .unwrap()
                .fetch_add(1, Relaxed);
        })
    }

    #[test]
    fn default_change_token_should_be_unchanged() {
        // arrange
        let token = DefaultChangeToken::default();

        // act
        let changed = token.changed();

        // assert
        assert!(!changed);
    }

    #[test]
    fn default_change_token_should_invoke_callback() {
        // arrange
        let counter = Arc::new(AtomicU8::default());
        let token = DefaultChangeToken::default();
        let _registration = token.register(increment(), Some(counter.clone()));

        // act
        token.notify();

        // assert
        assert_eq!(counter.load(Relaxed), 1);
    }

    #[test]
    fn default_change_token_should_invoke_callback_multiple_times() {
        // arrange
        let counter = Arc::new(AtomicU8::default());
        let token = DefaultChangeToken::default();
        let _registration = token.register(increment(), Some(counter.clone()));
        token.notify();

        // act
        token.notify();

        // assert
        assert_eq!(counter.load(Relaxed), 2);
    }

    #[test]
    fn once_change_token_should_invoke_callback_only_once() {
        // arrange
        let counter = Arc::new(AtomicU8::default());
        let token = DefaultChangeToken::once();
        let _registration = token.register(increment(), Some(counter.clone()));
        token.notify();

        // act
        token.notify();

        // assert
        assert!(token.changed());
        assert_eq!(counter.load(Relaxed), 1);
    }
}