Skip to main content

options/
monitor.rs

1use crate::{OptionsChangeTokenSource, OptionsFactory, OptionsMonitorCache, Ref, Value};
2use cfg_if::cfg_if;
3use std::sync::{Arc, RwLock, Weak};
4
5cfg_if! {
6    if #[cfg(not(feature = "async"))] {
7        use std::cell::RefCell;
8        use tokens::ChangeToken;
9    }
10}
11
12type Callback<T> = dyn Fn(Option<&str>, Ref<T>) + Send + Sync;
13
14/// Represents a change subscription.
15///
16/// # Remarks
17///
18/// When the subscription is dropped, the underlying callback is unsubscribed.
19pub struct Subscription<T: Value>(#[allow(unused)] Arc<Callback<T>>);
20
21impl<T: Value> Subscription<T> {
22    /// Initializes a new change token registration.
23    ///
24    /// # Arguments
25    ///
26    /// * `callback` - The subscription callback function
27    #[inline]
28    pub fn new(callback: Arc<Callback<T>>) -> Self {
29        Self(callback)
30    }
31}
32
33/// Defines the behavior for notifications when [`Options`](crate::Options) instances change.
34#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
35pub trait OptionsMonitor<T: Value> {
36    /// Returns the current instance with the default options name.
37    fn current_value(&self) -> Ref<T> {
38        self.get(None)
39    }
40
41    /// Returns a configured instance with the given name.
42    ///
43    /// # Arguments
44    ///
45    /// * `name` - The name associated with the options.
46    fn get(&self, name: Option<&str>) -> Ref<T>;
47
48    /// Registers a callback function to be invoked when the configured instance with the given name changes.
49    ///
50    /// # Arguments
51    ///
52    /// * `changed` - The callback function to invoke
53    ///
54    /// # Returns
55    ///
56    /// A change subscription for the specified options. When the subscription is dropped, no further
57    /// notifications will be propagated.
58    fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T>;
59}
60
61/// Represents the default implementation for notifications when option instances change.
62pub struct DefaultOptionsMonitor<T: Value> {
63    tracker: Arc<ChangeTracker<T>>,
64    _subscriptions: Vec<Box<dyn tokens::Subscription>>,
65}
66
67#[cfg(feature = "async")]
68impl<T: Value + 'static> DefaultOptionsMonitor<T> {
69    /// Initializes a new default options monitor.
70    ///
71    /// # Arguments
72    ///
73    /// * `cache` - The [cache](crate::OptionsMonitorCache) used for monitored options
74    /// * `sources` - The [source tokens](crate::OptionsChangeTokenSource) used to track option changes
75    /// * `factory` - The [factory](crate::OptionsFactory) used to create new options
76    pub fn new(
77        cache: Ref<dyn OptionsMonitorCache<T>>,
78        sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
79        factory: Ref<dyn OptionsFactory<T>>,
80    ) -> Self {
81        let tracker = Arc::new(ChangeTracker::new(cache, factory));
82        let mut subscriptions = Vec::new();
83
84        for source in sources {
85            let producer = Producer::new(source.clone());
86            let consumer = tracker.clone();
87            let state = source.name().map(|n| Arc::new(n.to_owned()));
88            let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
89                move || producer.token(),
90                move |state| {
91                    if let Some(name) = state {
92                        consumer.on_change(Some(name.as_str()));
93                    } else {
94                        consumer.on_change(None);
95                    };
96                },
97                state,
98            ));
99            subscriptions.push(subscription);
100        }
101
102        Self {
103            tracker,
104            _subscriptions: subscriptions,
105        }
106    }
107}
108
109#[cfg(not(feature = "async"))]
110impl<T: Value + 'static> DefaultOptionsMonitor<T> {
111    /// Initializes a new default options monitor.
112    ///
113    /// # Arguments
114    ///
115    /// * `cache` - The [cache](crate::OptionsMonitorCache) used for monitored options
116    /// * `sources` - The [source tokens](crate::OptionsChangeTokenSource) used to track option changes
117    /// * `factory` - The [factory](crate::OptionsFactory) used to create new options
118    pub fn new(
119        cache: Ref<dyn OptionsMonitorCache<T>>,
120        sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
121        factory: Ref<dyn OptionsFactory<T>>,
122    ) -> Self {
123        Self {
124            tracker: Arc::new(ChangeTracker::new(cache, sources, factory)),
125            _subscriptions: Vec::new(),
126        }
127    }
128}
129
130impl<T: Value> OptionsMonitor<T> for DefaultOptionsMonitor<T> {
131    fn get(&self, name: Option<&str>) -> Ref<T> {
132        cfg_if! {
133            if #[cfg(not(feature = "async"))] {
134                self.tracker.check_for_changes();
135            }
136        }
137
138        self.tracker.get(name)
139    }
140
141    #[inline]
142    fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T> {
143        self.tracker.add(changed)
144    }
145}
146
147struct ChangeTracker<T: Value> {
148    cache: Ref<dyn OptionsMonitorCache<T>>,
149    factory: Ref<dyn OptionsFactory<T>>,
150    listeners: RwLock<Vec<Weak<Callback<T>>>>,
151
152    #[cfg(not(feature = "async"))]
153    sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
154
155    #[cfg(not(feature = "async"))]
156    tokens: RefCell<Vec<Box<dyn ChangeToken>>>,
157
158    /// tracks whether each source's current token change has already been processed, which prevents re-firing when
159    /// `source.token()` returns a token that is already in the "changed" state
160    #[cfg(not(feature = "async"))]
161    processed: RefCell<Vec<bool>>,
162}
163
164impl<T: Value> ChangeTracker<T> {
165    fn get(&self, name: Option<&str>) -> Ref<T> {
166        self.cache
167            .get_or_add(name, &|n| self.factory.create(n).unwrap_or_else(|e| panic!("{}", e)))
168    }
169
170    fn add(&self, listener: Box<Callback<T>>) -> Subscription<T> {
171        let mut listeners = self.listeners.write().unwrap();
172
173        // trim any dead callbacks while holding the write-lock
174        for i in (0..listeners.len()).rev() {
175            if listeners[i].upgrade().is_none() {
176                listeners.remove(i);
177            }
178        }
179
180        let source: Arc<Callback<T>> = Arc::from(listener);
181
182        listeners.push(Arc::downgrade(&source));
183        Subscription::new(source)
184    }
185
186    fn on_change(&self, name: Option<&str>) {
187        // acquire a read-lock and capture any callbacks that are still alive. do NOT invoke the callback with the
188        // read-lock held. the callback might register a new callback on the same token which will result in a deadlock.
189        // invoking the callbacks after the read-lock is released ensures that won't happen.
190        let callbacks: Vec<_> = self
191            .listeners
192            .read()
193            .unwrap()
194            .iter()
195            .filter_map(|c| c.upgrade())
196            .collect();
197
198        self.cache.try_remove(name);
199
200        for callback in callbacks {
201            callback(name, self.get(name));
202        }
203    }
204}
205
206#[cfg(feature = "async")]
207impl<T: Value> ChangeTracker<T> {
208    #[inline]
209    fn new(cache: Ref<dyn OptionsMonitorCache<T>>, factory: Ref<dyn OptionsFactory<T>>) -> Self {
210        Self {
211            cache,
212            factory,
213            listeners: Default::default(),
214        }
215    }
216}
217
218#[cfg(not(feature = "async"))]
219impl<T: Value> ChangeTracker<T> {
220    fn new(
221        cache: Ref<dyn OptionsMonitorCache<T>>,
222        sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
223        factory: Ref<dyn OptionsFactory<T>>,
224    ) -> Self {
225        let len = sources.len();
226        let tokens = sources.iter().map(|s| s.token()).collect();
227        Self {
228            cache,
229            factory,
230            listeners: Default::default(),
231            sources,
232            tokens: RefCell::new(tokens),
233            processed: RefCell::new(vec![false; len]),
234        }
235    }
236
237    fn check_for_changes(&self) {
238        let mut tokens = self.tokens.borrow_mut();
239        let mut processed = self.processed.borrow_mut();
240
241        for (i, source) in self.sources.iter().enumerate() {
242            if tokens[i].changed() && !processed[i] {
243                self.on_change(source.name());
244
245                let new_token = source.token();
246
247                // if the new token is already changed (e.g. SharedChangeToken shares state with the old one), mark it
248                // as processed so we don't re-fire on the next check_for_changes() call.
249                processed[i] = new_token.changed();
250                tokens[i] = new_token;
251            }
252        }
253    }
254}
255
256cfg_if! {
257    if #[cfg(feature = "async")] {
258        struct Producer<T: Value>(Ref<dyn OptionsChangeTokenSource<T>>);
259
260        impl<T: Value> Producer<T> {
261            #[inline]
262            fn new(source: Ref<dyn OptionsChangeTokenSource<T>>) -> Self {
263                Self(source)
264            }
265        }
266
267        impl<T: Value> std::ops::Deref for Producer<T> {
268            type Target = dyn OptionsChangeTokenSource<T>;
269
270            #[inline]
271            fn deref(&self) -> &Self::Target {
272                self.0.deref()
273            }
274        }
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use crate::*;
282    use std::{
283        cell::RefCell,
284        sync::{
285            atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
286            Mutex,
287        },
288    };
289    use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};
290
291    #[derive(Default)]
292    struct Config {
293        retries: u8,
294    }
295
296    pub struct OptionsState {
297        dirty: AtomicBool,
298    }
299
300    impl OptionsState {
301        fn is_dirty(&self) -> bool {
302            self.dirty.load(Ordering::SeqCst)
303        }
304
305        fn mark_dirty(&self) {
306            self.dirty.store(true, Ordering::SeqCst)
307        }
308
309        fn reset(&self) {
310            self.dirty.store(false, Ordering::SeqCst)
311        }
312    }
313
314    impl Default for OptionsState {
315        fn default() -> Self {
316            Self {
317                dirty: AtomicBool::new(true),
318            }
319        }
320    }
321
322    #[derive(Default)]
323    struct ConfigSetup {
324        counter: AtomicU8,
325    }
326
327    impl ConfigureOptions<Config> for ConfigSetup {
328        fn configure(&self, name: Option<&str>, options: &mut Config) {
329            if name.is_none() {
330                let retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
331                options.retries = retries;
332            }
333        }
334    }
335
336    #[derive(Default)]
337    struct ConfigSource {
338        token: SharedChangeToken<SingleChangeToken>,
339    }
340
341    impl ConfigSource {
342        fn changed(&self) {
343            self.token.notify()
344        }
345    }
346
347    impl OptionsChangeTokenSource<Config> for ConfigSource {
348        fn token(&self) -> Box<dyn ChangeToken> {
349            Box::new(self.token.clone())
350        }
351    }
352
353    struct Foo {
354        monitor: Ref<dyn OptionsMonitor<Config>>,
355        _sub: Subscription<Config>,
356        state: Arc<OptionsState>,
357        retries: RefCell<u8>,
358    }
359
360    impl Foo {
361        fn new(monitor: Ref<dyn OptionsMonitor<Config>>) -> Self {
362            let state = Arc::new(OptionsState::default());
363            let other = state.clone();
364
365            Self {
366                monitor: monitor.clone(),
367                _sub: monitor.on_change(Box::new(move |_name: Option<&str>, _options: Ref<Config>| {
368                    other.mark_dirty()
369                })),
370                state,
371                retries: RefCell::default(),
372            }
373        }
374
375        fn retries(&self) -> u8 {
376            if self.state.is_dirty() {
377                *self.retries.borrow_mut() = self.monitor.current_value().retries;
378                self.state.reset();
379            }
380
381            self.retries.borrow().clone()
382        }
383    }
384
385    fn new_monitor() -> (Ref<dyn OptionsMonitor<Config>>, Ref<ConfigSource>, Ref<ConfigSetup>) {
386        let cache = Ref::new(OptionsCache::<Config>::default());
387        let setup = Ref::new(ConfigSetup::default());
388        let factory = Ref::new(DefaultOptionsFactory::new(
389            vec![setup.clone()],
390            Vec::default(),
391            Vec::default(),
392        ));
393        let source = Ref::new(ConfigSource::default());
394        let monitor: Ref<dyn OptionsMonitor<Config>> =
395            Ref::new(DefaultOptionsMonitor::new(cache, vec![source.clone()], factory));
396        (monitor, source, setup)
397    }
398
399    /// A named [OptionsChangeTokenSource] for testing multi-source scenarios.
400    struct NamedConfigSource {
401        name: String,
402        token: SharedChangeToken<SingleChangeToken>,
403    }
404
405    impl NamedConfigSource {
406        fn new(name: &str) -> Self {
407            Self {
408                name: name.to_owned(),
409                token: SharedChangeToken::default(),
410            }
411        }
412
413        fn changed(&self) {
414            self.token.notify()
415        }
416    }
417
418    impl OptionsChangeTokenSource<Config> for NamedConfigSource {
419        fn token(&self) -> Box<dyn ChangeToken> {
420            Box::new(self.token.clone())
421        }
422
423        fn name(&self) -> Option<&str> {
424            Some(&self.name)
425        }
426    }
427
428    /// A [ConfigureOptions] that sets retries based on name using an atomic counter per name to track how many times
429    /// the factory has been called.
430    struct NamedConfigSetup {
431        a: AtomicU8,
432        b: AtomicU8,
433    }
434
435    impl Default for NamedConfigSetup {
436        fn default() -> Self {
437            Self {
438                a: AtomicU8::new(0),
439                b: AtomicU8::new(0),
440            }
441        }
442    }
443
444    impl ConfigureOptions<Config> for NamedConfigSetup {
445        fn configure(&self, name: Option<&str>, options: &mut Config) {
446            match name {
447                Some("a") => {
448                    options.retries = self.a.fetch_add(1, Ordering::SeqCst) + 10;
449                }
450                Some("b") => {
451                    options.retries = self.b.fetch_add(1, Ordering::SeqCst) + 20;
452                }
453                _ => {}
454            }
455        }
456    }
457
458    #[test]
459    fn monitored_options_should_update_when_source_changes() {
460        // arrange
461        let cache = Ref::new(OptionsCache::<Config>::default());
462        let setup = Ref::new(ConfigSetup::default());
463        let factory = Ref::new(DefaultOptionsFactory::new(vec![setup], Vec::default(), Vec::default()));
464        let source = Ref::new(ConfigSource::default());
465        let monitor: Ref<dyn OptionsMonitor<Config>> =
466            Ref::new(DefaultOptionsMonitor::new(cache, vec![source.clone()], factory));
467        let foo = Foo::new(monitor.clone());
468        let initial = foo.retries();
469
470        // act
471        source.changed();
472
473        let _ = monitor.get(None);
474
475        // assert
476        assert_eq!(initial, 1);
477        assert_eq!(foo.retries(), 2);
478    }
479
480    #[test]
481    fn get_none_returns_factory_created_value() {
482        // arrange
483        let (monitor, source, _setup) = new_monitor();
484
485        let first = monitor.get(None);
486        assert_eq!(first.retries, 1);
487
488        let cached = monitor.get(None);
489        assert_eq!(cached.retries, 1);
490
491        // act
492        source.changed();
493
494        // assert
495        let updated = monitor.get(None);
496
497        assert_eq!(updated.retries, 2);
498    }
499
500    #[test]
501    fn on_change_callbacks_fire_with_correct_name_and_value() {
502        // arrange
503        let (monitor, source, _setup) = new_monitor();
504        let _ = monitor.get(None);
505        let observed_name: Arc<Mutex<Option<Option<String>>>> = Arc::new(Mutex::new(None));
506        let observed_retries: Arc<Mutex<Option<u8>>> = Arc::new(Mutex::new(None));
507        let name_clone = observed_name.clone();
508        let retries_clone = observed_retries.clone();
509
510        let _sub = monitor.on_change(Box::new(move |name: Option<&str>, opts: Ref<Config>| {
511            *name_clone.lock().unwrap() = Some(name.map(|s| s.to_owned()));
512            *retries_clone.lock().unwrap() = Some(opts.retries);
513        }));
514
515        // act
516        source.changed();
517        let _ = monitor.get(None);
518
519        // assert
520        let name_val = observed_name.lock().unwrap();
521
522        assert_eq!(
523            *name_val,
524            Some(None),
525            "callback should receive name=None for unnamed source"
526        );
527
528        let retries_val = observed_retries.lock().unwrap();
529
530        assert_eq!(*retries_val, Some(2), "callback should receive updated retries value");
531    }
532
533    #[test]
534    fn dropping_subscription_prevents_further_callbacks() {
535        // arrange
536        let (monitor, source, _setup) = new_monitor();
537        let _ = monitor.get(None);
538        let call_count = Arc::new(AtomicU32::new(0));
539        let count_clone = call_count.clone();
540        let sub = monitor.on_change(Box::new(move |_name: Option<&str>, _opts: Ref<Config>| {
541            count_clone.fetch_add(1, Ordering::SeqCst);
542        }));
543
544        // act
545        source.changed();
546        let _ = monitor.get(None);
547
548        assert_eq!(
549            call_count.load(Ordering::SeqCst),
550            1,
551            "callback should fire once after first change"
552        );
553
554        drop(sub);
555
556        // act
557        source.changed();
558        let _ = monitor.get(None);
559
560        // assert
561        assert_eq!(
562            call_count.load(Ordering::SeqCst),
563            1,
564            "callback should not fire after subscription is dropped"
565        );
566    }
567
568    #[test]
569    fn multiple_sources_changing_one_only_invalidates_that_source() {
570        // arrange
571        let cache = Ref::new(OptionsCache::<Config>::default());
572        let setup = Ref::new(NamedConfigSetup::default());
573        let factory = Ref::new(DefaultOptionsFactory::new(vec![setup], Vec::default(), Vec::default()));
574        let source_a = Ref::new(NamedConfigSource::new("a"));
575        let source_b = Ref::new(NamedConfigSource::new("b"));
576        let monitor: Ref<dyn OptionsMonitor<Config>> = Ref::new(DefaultOptionsMonitor::new(
577            cache,
578            vec![source_a.clone(), source_b.clone()],
579            factory,
580        ));
581        let val_a = monitor.get(Some("a"));
582        let val_b = monitor.get(Some("b"));
583
584        assert_eq!(val_a.retries, 10, "source a initial retries");
585        assert_eq!(val_b.retries, 20, "source b initial retries");
586
587        let callback_names: Arc<Mutex<Vec<Option<String>>>> = Arc::new(Mutex::new(Vec::new()));
588        let names_clone = callback_names.clone();
589        let _sub = monitor.on_change(Box::new(move |name: Option<&str>, _opts: Ref<Config>| {
590            names_clone.lock().unwrap().push(name.map(|s| s.to_owned()));
591        }));
592
593        // act
594        source_a.changed();
595
596        let _ = monitor.get(Some("a"));
597
598        // assert
599        let names = callback_names.lock().unwrap();
600        assert_eq!(names.len(), 1, "only one callback should fire");
601        assert_eq!(names[0], Some("a".to_owned()), "callback should fire for source a");
602        drop(names);
603
604        let val_a_updated = monitor.get(Some("a"));
605        assert_eq!(val_a_updated.retries, 11, "source a should be invalidated → new value");
606
607        let val_b_same = monitor.get(Some("b"));
608        assert_eq!(val_b_same.retries, 20, "source b should still be cached → same value");
609    }
610}