options/
monitor.rs

1use crate::{OptionsChangeTokenSource, OptionsFactory, OptionsMonitorCache, Ref, Value};
2use std::ops::Deref;
3use std::sync::{Arc, RwLock, Weak};
4
5/// Represents a change subscription.
6///
7/// # Remarks
8///
9/// When the subscription is dropped, the underlying callback is unsubscribed.
10pub struct Subscription<T: Value>(Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>);
11
12impl<T: Value> Subscription<T> {
13    /// Initializes a new change token registration.
14    pub fn new(callback: Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>) -> Self {
15        Self(callback)
16    }
17}
18
19unsafe impl<T: Send + Sync> Send for Subscription<T> {}
20unsafe impl<T: Send + Sync> Sync for Subscription<T> {}
21
22/// Defines the behavior for notifications when [`Options`](crate::Options) instances change.
23#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
24pub trait OptionsMonitor<T: Value> {
25    /// Returns the current instance with the default options name.
26    fn current_value(&self) -> Ref<T> {
27        self.get(None)
28    }
29
30    /// Returns a configured instance with the given name.
31    ///
32    /// # Arguments
33    ///
34    /// * `name` - The name associated with the options.
35    fn get(&self, name: Option<&str>) -> Ref<T>;
36
37    /// Registers a callback function to be invoked when the configured instance with the given name changes.
38    ///
39    /// # Arguments
40    ///
41    /// * `listener` - The callback function to invoke
42    ///
43    /// # Returns
44    ///
45    /// A change subscription for the specified options. When the subscription is dropped, no further
46    /// notifications will be propagated.
47    fn on_change(
48        &self,
49        listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>,
50    ) -> Subscription<T>;
51}
52
53/// Represents the default implementation for notifications when option instances change.
54pub struct DefaultOptionsMonitor<T: Value> {
55    tracker: Arc<ChangeTracker<T>>,
56    _subscriptions: Vec<Box<dyn tokens::Subscription>>,
57}
58
59impl<T: Value + 'static> DefaultOptionsMonitor<T> {
60    /// Initializes a new default options monitor.
61    ///
62    /// # Arguments
63    ///
64    /// * `cache` - The [cache](crate::OptionsMonitorCache) used for monitored options
65    /// * `sources` - The [source tokens](crate::OptionsChangeTokenSource) used to track option changes
66    /// * `factory` - The [factory](crate::OptionsFactory) used to create new options
67    pub fn new(
68        cache: Ref<dyn OptionsMonitorCache<T>>,
69        sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
70        factory: Ref<dyn OptionsFactory<T>>,
71    ) -> Self {
72        let tracker = Arc::new(ChangeTracker::new(cache, factory));
73        let mut subscriptions = Vec::new();
74
75        // SAFETY: the following is not guaranteed to be safe unless 'async' is enabled
76        for source in sources {
77            let producer = Producer::new(source.clone());
78            let consumer = tracker.clone();
79            let state = source.name().map(|n| Arc::new(n.to_owned()));
80            let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
81                move || producer.token(),
82                move |state| {
83                    if let Some(name) = state {
84                        consumer.on_change(Some(name.as_str()));
85                    } else {
86                        consumer.on_change(None);
87                    };
88                },
89                state,
90            ));
91            subscriptions.push(subscription);
92        }
93
94        Self {
95            tracker,
96            _subscriptions: subscriptions,
97        }
98    }
99}
100
101unsafe impl<T: Send + Sync> Send for DefaultOptionsMonitor<T> {}
102unsafe impl<T: Send + Sync> Sync for DefaultOptionsMonitor<T> {}
103
104impl<T: Value> OptionsMonitor<T> for DefaultOptionsMonitor<T> {
105    fn get(&self, name: Option<&str>) -> Ref<T> {
106        self.tracker.get(name)
107    }
108
109    fn on_change(
110        &self,
111        listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>,
112    ) -> Subscription<T> {
113        self.tracker.add(listener)
114    }
115}
116
117struct ChangeTracker<T: Value> {
118    cache: Ref<dyn OptionsMonitorCache<T>>,
119    factory: Ref<dyn OptionsFactory<T>>,
120    listeners: RwLock<Vec<Weak<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>>>,
121}
122
123impl<T: Value> ChangeTracker<T> {
124    fn new(cache: Ref<dyn OptionsMonitorCache<T>>, factory: Ref<dyn OptionsFactory<T>>) -> Self {
125        Self {
126            cache,
127            factory,
128            listeners: Default::default(),
129        }
130    }
131
132    fn get(&self, name: Option<&str>) -> Ref<T> {
133        self.cache
134            .get_or_add(name, &|n| self.factory.create(n).unwrap())
135    }
136
137    fn add(&self, listener: Box<dyn Fn(Option<&str>, Ref<T>) + Send + Sync>) -> Subscription<T> {
138        let mut listeners = self.listeners.write().unwrap();
139
140        // writes are much infrequent and we already need to escalate
141        // to a write-lock, so do the trimming of any dead callbacks now
142        for i in (0..listeners.len()).rev() {
143            if listeners[i].upgrade().is_none() {
144                listeners.remove(i);
145            }
146        }
147
148        let source: Arc<dyn Fn(Option<&str>, Ref<T>) + Send + Sync> = Arc::from(listener);
149
150        listeners.push(Arc::downgrade(&source));
151        Subscription::new(source)
152    }
153
154    fn on_change(&self, name: Option<&str>) {
155        // acquire a read-lock and capture any callbacks that are still alive.
156        // do NOT invoke the callback with the read-lock held. the callback might
157        // register a new callback on the same token which will result in a deadlock.
158        // invoking the callbacks after the read-lock is released ensures that won't happen.
159        let callbacks: Vec<_> = self
160            .listeners
161            .read()
162            .unwrap()
163            .iter()
164            .filter_map(|c| c.upgrade())
165            .collect();
166
167        self.cache.try_remove(name);
168
169        for callback in callbacks {
170            callback(name, self.get(name));
171        }
172    }
173}
174
175unsafe impl<T: Value> Send for ChangeTracker<T> {}
176unsafe impl<T: Value> Sync for ChangeTracker<T> {}
177
178struct Producer<T: Value>(Ref<dyn OptionsChangeTokenSource<T>>);
179
180impl<T: Value> Producer<T> {
181    fn new(source: Ref<dyn OptionsChangeTokenSource<T>>) -> Self {
182        Self(source)
183    }
184}
185
186impl<T: Value> Deref for Producer<T> {
187    type Target = dyn OptionsChangeTokenSource<T>;
188
189    fn deref(&self) -> &Self::Target {
190        self.0.deref()
191    }
192}
193
194unsafe impl<T: Value> Send for Producer<T> {}
195unsafe impl<T: Value> Sync for Producer<T> {}
196
#[cfg(test)]
mod tests {

    use super::*;
    use crate::*;
    use std::{
        cell::RefCell,
        sync::atomic::{AtomicBool, AtomicU8, Ordering},
    };
    use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};

    // the options type under test
    #[derive(Default)]
    struct Config {
        retries: u8,
    }

    // flag shared between `Foo` and its change callback
    pub struct OptionsState {
        dirty: AtomicBool,
    }

    impl OptionsState {
        fn is_dirty(&self) -> bool {
            self.dirty.load(Ordering::SeqCst)
        }

        fn mark_dirty(&self) {
            self.dirty.store(true, Ordering::SeqCst)
        }

        fn reset(&self) {
            self.dirty.store(false, Ordering::SeqCst)
        }
    }

    impl Default for OptionsState {
        // starts dirty so the first read pulls the value from the monitor
        fn default() -> Self {
            Self {
                dirty: AtomicBool::new(true),
            }
        }
    }

    // counts how many times the default options have been configured
    #[derive(Default)]
    struct ConfigSetup {
        counter: AtomicU8,
    }

    impl ConfigureOptions<Config> for ConfigSetup {
        fn configure(&self, name: Option<&str>, options: &mut Config) {
            if name.is_none() {
                // `fetch_add` returns the previous value, so add one to get the new count
                let retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
                options.retries = retries;
            }
        }
    }

    // change token source that the test triggers manually
    #[derive(Default)]
    struct ConfigSource {
        token: SharedChangeToken<SingleChangeToken>,
    }

    impl ConfigSource {
        fn changed(&self) {
            self.token.notify()
        }
    }

    impl OptionsChangeTokenSource<Config> for ConfigSource {
        fn token(&self) -> Box<dyn ChangeToken> {
            Box::new(self.token.clone())
        }
    }

    // consumer that caches `retries` and refreshes it only after a change notification
    struct Foo {
        monitor: Ref<dyn OptionsMonitor<Config>>,
        _sub: Subscription<Config>,
        state: Arc<OptionsState>,
        retries: RefCell<u8>,
    }

    impl Foo {
        fn new(monitor: Ref<dyn OptionsMonitor<Config>>) -> Self {
            let state = Arc::new(OptionsState::default());
            let other = state.clone();

            Self {
                monitor: monitor.clone(),
                _sub: monitor.on_change(Box::new(
                    move |_name: Option<&str>, _options: Ref<Config>| other.mark_dirty(),
                )),
                state,
                retries: RefCell::default(),
            }
        }

        fn retries(&self) -> u8 {
            if self.state.is_dirty() {
                *self.retries.borrow_mut() = self.monitor.current_value().retries;
                self.state.reset();
            }

            // `u8` is `Copy`; dereference the borrow instead of cloning it
            *self.retries.borrow()
        }
    }

    #[test]
    fn monitored_options_should_update_when_source_changes() {
        // arrange
        let cache = Ref::new(OptionsCache::<Config>::default());
        let setup = Ref::new(ConfigSetup::default());
        let factory = Ref::new(DefaultOptionsFactory::new(
            vec![setup],
            Vec::default(),
            Vec::default(),
        ));
        let source = Ref::new(ConfigSource::default());
        let monitor = Ref::new(DefaultOptionsMonitor::new(
            cache,
            vec![source.clone()],
            factory,
        ));
        let foo = Foo::new(monitor);
        let initial = foo.retries();

        // act
        source.changed();

        // assert
        assert_eq!(initial, 1);
        assert_eq!(foo.retries(), 2);
    }
}