1use crate::{OptionsChangeTokenSource, OptionsFactory, OptionsMonitorCache, Ref, Value};
2use std::ops::Deref;
3use std::sync::{Arc, RwLock, Weak};
4
/// Handle returned when a change listener is registered.
///
/// Holds a strong (`Arc`) reference to the registered callback. `ChangeTracker` in this
/// file keeps only `Weak` references to listeners, so a callback is no longer invoked
/// once every strong reference to it (normally just this handle) has been dropped.
pub struct Subscription<T: Value>(Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>);
11
impl<T: Value> Subscription<T> {
    /// Wraps `callback` in a new subscription handle.
    ///
    /// The handle takes ownership of the given strong reference and does nothing else
    /// with it; the callback is never invoked through the handle itself.
    pub fn new(callback: Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>) -> Self {
        Self(callback)
    }
}
18
// SAFETY: the only field is an `Arc<dyn Fn(..) + Send + Sync>`; the trait object is
// declared `Send + Sync`, so sharing or moving the handle across threads only shares a
// thread-safe callback.
// NOTE(review): because of that field type these manual impls look redundant with the
// auto traits, and they only apply when `T: Send + Sync` — confirm against the
// definition of `Value` before changing or removing them.
unsafe impl<T: Send + Sync> Send for Subscription<T> {}
unsafe impl<T: Send + Sync> Sync for Subscription<T> {}
21
/// Gives access to options of type `T` by name and lets callers register listeners
/// for changes.
// NOTE(review): with the `async` feature enabled the attribute below applies
// `maybe_impl::traits(Send, Sync)`; presumably that adds `Send + Sync` as supertraits —
// confirm against the `maybe_impl` documentation.
#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
pub trait OptionsMonitor<T: Value> {
    /// Returns the options obtained without a name; equivalent to `self.get(None)`.
    fn current_value(&self) -> Ref<T> {
        self.get(None)
    }

    /// Returns the options associated with `name`.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the options to retrieve; `None` selects the unnamed
    ///   instance (the one returned by [`current_value`](Self::current_value)).
    fn get(&self, name: Option<&str>) -> Ref<T>;

    /// Registers `listener` to be called with the options name and an options value.
    ///
    /// # Arguments
    ///
    /// * `listener` - The callback to register.
    ///
    /// # Returns
    ///
    /// A [`Subscription`] handle for the registration. How long the listener stays
    /// registered relative to that handle is decided by the implementation.
    fn on_change(
        &self,
        listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>,
    ) -> Subscription<T>;
}
52
/// Default [`OptionsMonitor`] implementation backed by a cache, a factory and a set of
/// change-token sources.
pub struct DefaultOptionsMonitor<T: Value> {
    // Shared with the change-token consumers created in `new`; owns the cache, the
    // factory and the registered listeners.
    tracker: Arc<ChangeTracker<T>>,
    // Never read; held only so the change-token registrations live as long as the
    // monitor. Presumably dropping them stops the notifications — confirm against the
    // `tokens` crate.
    _subscriptions: Vec<Box<dyn tokens::Subscription>>,
}
58
59impl<T: Value + 'static> DefaultOptionsMonitor<T> {
60 pub fn new(
68 cache: Ref<dyn OptionsMonitorCache<T>>,
69 sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
70 factory: Ref<dyn OptionsFactory<T>>,
71 ) -> Self {
72 let tracker = Arc::new(ChangeTracker::new(cache, factory));
73 let mut subscriptions = Vec::new();
74
75 for source in sources {
77 let producer = Producer::new(source.clone());
78 let consumer = tracker.clone();
79 let state = source.name().map(|n| Arc::new(n.to_owned()));
80 let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
81 move || producer.token(),
82 move |state| {
83 if let Some(name) = state {
84 consumer.on_change(Some(name.as_str()));
85 } else {
86 consumer.on_change(None);
87 };
88 },
89 state,
90 ));
91 subscriptions.push(subscription);
92 }
93
94 Self {
95 tracker,
96 _subscriptions: subscriptions,
97 }
98 }
99}
100
// SAFETY(review): these impls assert that both fields may be shared across threads.
// `tracker` is an `Arc<ChangeTracker<T>>`, which relies on the manual `Send`/`Sync`
// impls for `ChangeTracker`. `_subscriptions` holds `Box<dyn tokens::Subscription>`
// with no explicit `Send + Sync` bound, so soundness depends on what the `tokens`
// crate guarantees for its subscriptions — TODO confirm.
unsafe impl<T: Send + Sync> Send for DefaultOptionsMonitor<T> {}
unsafe impl<T: Send + Sync> Sync for DefaultOptionsMonitor<T> {}
103
impl<T: Value> OptionsMonitor<T> for DefaultOptionsMonitor<T> {
    /// Returns the options for `name` by delegating to the shared change tracker,
    /// which looks them up in the cache and creates them through the factory on a miss.
    fn get(&self, name: Option<&str>) -> Ref<T> {
        self.tracker.get(name)
    }

    /// Registers `listener` with the shared change tracker.
    ///
    /// The tracker keeps only a weak reference to the listener, so the listener stays
    /// registered only while the returned [`Subscription`] is alive.
    fn on_change(
        &self,
        listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>,
    ) -> Subscription<T> {
        self.tracker.add(listener)
    }
}
116
/// Shared state behind `DefaultOptionsMonitor`: resolves options and fans change
/// notifications out to the registered listeners.
struct ChangeTracker<T: Value> {
    // Cache consulted (and populated) by `get`; invalidated per name in `on_change`.
    cache: Ref<dyn OptionsMonitorCache<T>>,
    // Creates options on a cache miss.
    factory: Ref<dyn OptionsFactory<T>>,
    // Weak references only: the strong reference lives in the `Subscription` returned
    // by `add`, so a listener goes away when its subscription is dropped.
    listeners: RwLock<Vec<Weak<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>>>,
}
122
123impl<T: Value> ChangeTracker<T> {
124 fn new(cache: Ref<dyn OptionsMonitorCache<T>>, factory: Ref<dyn OptionsFactory<T>>) -> Self {
125 Self {
126 cache,
127 factory,
128 listeners: Default::default(),
129 }
130 }
131
132 fn get(&self, name: Option<&str>) -> Ref<T> {
133 self.cache
134 .get_or_add(name, &|n| self.factory.create(n).unwrap())
135 }
136
137 fn add(&self, listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>) -> Subscription<T> {
138 let mut listeners = self.listeners.write().unwrap();
139
140 for i in (0..listeners.len()).rev() {
143 if listeners[i].upgrade().is_none() {
144 listeners.remove(i);
145 }
146 }
147
148 let source: Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync> = Arc::from(listener);
149
150 listeners.push(Arc::downgrade(&source));
151 Subscription::new(source)
152 }
153
154 fn on_change(&self, name: Option<&str>) {
155 let callbacks: Vec<_> = self
160 .listeners
161 .read()
162 .unwrap()
163 .iter()
164 .filter_map(|c| c.upgrade())
165 .collect();
166
167 self.cache.try_remove(name);
168
169 for callback in callbacks {
170 callback(name, self.get(name));
171 }
172 }
173}
174
// SAFETY(review): `listeners` is an `RwLock` over weak references to `Send + Sync`
// callbacks and is thread-safe on its own. `cache` and `factory` are `Ref<dyn ..>`
// values whose thread-safety depends on how `Ref` and those traits are declared
// elsewhere in the crate; if `Ref` can be a non-atomic pointer these impls are only
// sound when the tracker is never actually shared across threads — TODO confirm.
unsafe impl<T: Value> Send for ChangeTracker<T> {}
unsafe impl<T: Value> Sync for ChangeTracker<T> {}
177
/// Newtype around a change-token source.
///
/// It dereferences to the wrapped source and carries manual `Send`/`Sync` impls so it
/// can be moved into the token-producing closure built in `DefaultOptionsMonitor::new`.
struct Producer<T: Value>(Ref<dyn OptionsChangeTokenSource<T>>);
179
impl<T: Value> Producer<T> {
    /// Wraps `source` without touching it.
    fn new(source: Ref<dyn OptionsChangeTokenSource<T>>) -> Self {
        Self(source)
    }
}
185
186impl<T: Value> Deref for Producer<T> {
187 type Target = dyn OptionsChangeTokenSource<T>;
188
189 fn deref(&self) -> &Self::Target {
190 self.0.deref()
191 }
192}
193
// SAFETY(review): asserts that the wrapped `Ref<dyn OptionsChangeTokenSource<T>>` may
// be moved to and shared between threads. That holds only if `Ref` and the source
// trait guarantee it; neither declaration is visible in this file — TODO confirm.
unsafe impl<T: Value> Send for Producer<T> {}
unsafe impl<T: Value> Sync for Producer<T> {}
196
#[cfg(test)]
mod tests {

    use super::*;
    use crate::*;
    use std::{
        cell::RefCell,
        sync::atomic::{AtomicBool, AtomicU8, Ordering},
    };
    use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};

    /// Options type used by the test.
    #[derive(Default)]
    struct Config {
        retries: u8,
    }

    /// Flag recording whether a consumer's cached copy of the options is stale.
    pub struct OptionsState {
        dirty: AtomicBool,
    }

    impl OptionsState {
        /// Returns `true` while the cached copy needs to be refreshed.
        fn is_dirty(&self) -> bool {
            self.dirty.load(Ordering::SeqCst)
        }

        /// Flags the cached copy as stale.
        fn mark_dirty(&self) {
            self.dirty.store(true, Ordering::SeqCst);
        }

        /// Flags the cached copy as up to date.
        fn reset(&self) {
            self.dirty.store(false, Ordering::SeqCst);
        }
    }

    impl Default for OptionsState {
        /// Starts out dirty so the very first read pulls a value from the monitor.
        fn default() -> Self {
            OptionsState {
                dirty: AtomicBool::new(true),
            }
        }
    }

    /// Configuration step that stamps the unnamed options with an incrementing counter,
    /// which makes every re-creation of the options observable.
    #[derive(Default)]
    struct ConfigSetup {
        counter: AtomicU8,
    }

    impl ConfigureOptions<Config> for ConfigSetup {
        fn configure(&self, name: Option<&str>, options: &mut Config) {
            // Named options are left untouched.
            if name.is_some() {
                return;
            }

            // `fetch_add` yields the previous value; add one to get the new count.
            options.retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
        }
    }

    /// Change-token source whose token is triggered manually by the test.
    #[derive(Default)]
    struct ConfigSource {
        token: SharedChangeToken<SingleChangeToken>,
    }

    impl ConfigSource {
        /// Signals that the configuration has changed.
        fn changed(&self) {
            self.token.notify();
        }
    }

    impl OptionsChangeTokenSource<Config> for ConfigSource {
        fn token(&self) -> Box<dyn ChangeToken> {
            Box::new(self.token.clone())
        }
    }

    /// Consumer that caches `retries` and refreshes it lazily after a change.
    struct Foo {
        monitor: Ref<dyn OptionsMonitor<Config>>,
        // Keeps the change listener registered for as long as `Foo` lives.
        _sub: Subscription<Config>,
        state: Arc<OptionsState>,
        retries: RefCell<u8>,
    }

    impl Foo {
        /// Subscribes to `monitor` so that any change marks the cached value dirty.
        fn new(monitor: Ref<dyn OptionsMonitor<Config>>) -> Self {
            let state = Arc::new(OptionsState::default());
            let flag = Arc::clone(&state);
            let subscription = monitor.on_change(Box::new(
                move |_name: Option<&str>, _options: Ref<Config>| flag.mark_dirty(),
            ));

            Self {
                monitor,
                _sub: subscription,
                state,
                retries: RefCell::default(),
            }
        }

        /// Returns the cached retry count, re-reading it from the monitor when dirty.
        fn retries(&self) -> u8 {
            if self.state.is_dirty() {
                let latest = self.monitor.current_value().retries;

                *self.retries.borrow_mut() = latest;
                self.state.reset();
            }

            *self.retries.borrow()
        }
    }

    #[test]
    fn monitored_options_should_update_when_source_changes() {
        // arrange
        let cache = Ref::new(OptionsCache::<Config>::default());
        let setup = Ref::new(ConfigSetup::default());
        let factory = Ref::new(DefaultOptionsFactory::new(
            vec![setup],
            Vec::default(),
            Vec::default(),
        ));
        let source = Ref::new(ConfigSource::default());
        let monitor = Ref::new(DefaultOptionsMonitor::new(
            cache,
            vec![source.clone()],
            factory,
        ));
        let foo = Foo::new(monitor);
        let initial = foo.retries();

        // act
        source.changed();

        // assert: the first read configured the options once; the change notification
        // forces a second configuration on the next read.
        assert_eq!(initial, 1);
        assert_eq!(foo.retries(), 2);
    }
}