Skip to main content

options/
monitor.rs

1use crate::{validation::Error, Cache, ChangeTokenSource, DefaultFactory, Factory, Ref, Value};
2use cfg_if::cfg_if;
3use std::any::type_name;
4use std::sync::{Arc, RwLock, Weak};
5use tracing::{error, trace};
6
7cfg_if! {
8    if #[cfg(not(feature = "async"))] {
9        use std::cell::RefCell;
10        use tokens::ChangeToken;
11    }
12}
13
14type Callback<T> = dyn Fn(&str, Ref<T>) + Send + Sync;
15
16/// Represents a change subscription.
17///
18/// # Remarks
19///
20/// When the subscription is dropped, the underlying callback is unsubscribed.
21pub struct Subscription<T: Value>(#[allow(unused)] Arc<Callback<T>>);
22
23impl<T: Value> Subscription<T> {
24    /// Initializes a new change token registration.
25    ///
26    /// # Arguments
27    ///
28    /// * `callback` - The subscription callback function
29    #[inline]
30    pub fn new(callback: Arc<Callback<T>>) -> Self {
31        Self(callback)
32    }
33}
34
35/// Defines the behavior for notifications when options instances change.
36#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
37pub trait Monitor<T: Value> {
38    /// Gets the default, unnamed configuration options.
39    fn get(&self) -> Result<Ref<T>, Error> {
40        self.get_named("")
41    }
42
43    /// Gets the default, unnamed configuration options.
44    ///
45    /// # Remarks
46    ///
47    /// This function panics if the configuration options could not be successfully retrieved.
48    fn get_unchecked(&self) -> Ref<T> {
49        match self.get_named("") {
50            Ok(value) => value,
51            Err(error) => {
52                error!("{error:?}");
53                panic!("{}", error)
54            }
55        }
56    }
57
58    /// Gets the configuration options with the given name.
59    ///
60    /// # Arguments
61    ///
62    /// * `name` - The name associated with the options.
63    fn get_named(&self, name: &str) -> Result<Ref<T>, Error>;
64
65    /// Gets the configuration options with the given name.
66    ///
67    /// # Arguments
68    ///
69    /// * `name` - The optional name of the options to retrieve
70    ///
71    /// # Remarks
72    ///
73    /// This function panics if the configuration options could not be successfully retrieved.
74    fn get_named_unchecked(&self, name: &str) -> Ref<T> {
75        match self.get_named(name) {
76            Ok(value) => value,
77            Err(error) => {
78                error!("[{name}] {error:?}");
79                panic!("[{name}] {}", error)
80            }
81        }
82    }
83
84    /// Registers a callback function to be invoked when the configured instance with the given name changes.
85    ///
86    /// # Arguments
87    ///
88    /// * `changed` - The callback function to invoke
89    ///
90    /// # Returns
91    ///
92    /// A change subscription for the specified options. When the subscription is dropped, no further notifications
93    /// will be propagated.
94    #[must_use = "no change notifications occur after the subscription is dropped"]
95    fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T>;
96}
97
98/// Represents the default implementation for notifications when option instances change.
99pub struct DefaultMonitor<T: Value> {
100    tracker: Arc<ChangeTracker<T>>,
101    _subscriptions: Vec<Box<dyn tokens::Subscription>>,
102}
103
104#[cfg(feature = "async")]
105impl<T: Value + 'static> DefaultMonitor<T> {
106    /// Initializes a new default options monitor.
107    ///
108    /// # Arguments
109    ///
110    /// * `cache` - The [cache](crate::Cache) used for monitored options
111    /// * `sources` - The [source tokens](crate::ChangeTokenSource) used to track option changes
112    /// * `factory` - The [factory](crate::Factory) used to create new options
113    pub fn new(
114        cache: Ref<Cache<T>>,
115        sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
116        factory: Ref<dyn Factory<T>>,
117    ) -> Self {
118        let tracker = Arc::new(ChangeTracker::new(cache, factory));
119        let mut subscriptions = Vec::new();
120
121        for source in sources {
122            let producer = Producer::new(source.clone());
123            let consumer = tracker.clone();
124            let state = Arc::new(source.name().to_owned());
125            let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
126                move || producer.token(),
127                move |state| {
128                    let kind = type_name::<T>().rsplit_once("::").unwrap().1;
129
130                    if let Some(name) = state {
131                        trace!("{} ({name}) have changed", kind);
132                        consumer.on_change(&name);
133                    } else {
134                        trace!("{} options have changed", kind);
135                        consumer.on_change("");
136                    };
137                },
138                Some(state),
139            ));
140            subscriptions.push(subscription);
141        }
142
143        Self {
144            tracker,
145            _subscriptions: subscriptions,
146        }
147    }
148}
149
150#[cfg(not(feature = "async"))]
151impl<T: Value + 'static> DefaultMonitor<T> {
152    /// Initializes a new default options monitor.
153    ///
154    /// # Arguments
155    ///
156    /// * `cache` - The [cache](crate::Cache) used for monitored options
157    /// * `sources` - The [source tokens](crate::ChangeTokenSource) used to track option changes
158    /// * `factory` - The [factory](crate::Factory) used to create new options
159    pub fn new(
160        cache: Ref<Cache<T>>,
161        sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
162        factory: Ref<dyn Factory<T>>,
163    ) -> Self {
164        Self {
165            tracker: Arc::new(ChangeTracker::new(cache, sources, factory)),
166            _subscriptions: Vec::new(),
167        }
168    }
169}
170
171impl<T: Value> Monitor<T> for DefaultMonitor<T> {
172    fn get_named(&self, name: &str) -> Result<Ref<T>, Error> {
173        cfg_if! {
174            if #[cfg(not(feature = "async"))] {
175                self.tracker.check_for_changes();
176            }
177        }
178
179        self.tracker.get(name)
180    }
181
182    #[inline]
183    fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T> {
184        self.tracker.add(changed)
185    }
186}
187
188struct ChangeTracker<T: Value> {
189    cache: Ref<Cache<T>>,
190    factory: Ref<dyn Factory<T>>,
191    listeners: RwLock<Vec<Weak<Callback<T>>>>,
192
193    #[cfg(not(feature = "async"))]
194    sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
195
196    #[cfg(not(feature = "async"))]
197    tokens: RefCell<Vec<Box<dyn ChangeToken>>>,
198
199    /// tracks whether each source's current token change has already been processed, which prevents re-firing when
200    /// `source.token()` returns a token that is already in the "changed" state
201    #[cfg(not(feature = "async"))]
202    processed: RefCell<Vec<bool>>,
203}
204
205impl<T: Value> ChangeTracker<T> {
206    fn get(&self, name: &str) -> Result<Ref<T>, Error> {
207        self.cache.get_or_add(name, &|n| self.factory.create(n))
208    }
209
210    fn add(&self, listener: Box<Callback<T>>) -> Subscription<T> {
211        let mut listeners = self.listeners.write().unwrap();
212
213        // trim any dead callbacks while holding the write-lock
214        for i in (0..listeners.len()).rev() {
215            if listeners[i].upgrade().is_none() {
216                listeners.remove(i);
217            }
218        }
219
220        let source: Arc<Callback<T>> = Arc::from(listener);
221
222        listeners.push(Arc::downgrade(&source));
223        Subscription::new(source)
224    }
225
226    fn on_change(&self, name: &str) {
227        // acquire a read-lock and capture any callbacks that are still alive. do NOT invoke the callback with the
228        // read-lock held. the callback might register a new callback on the same token which will result in a deadlock.
229        // invoking the callbacks after the read-lock is released ensures that won't happen.
230        let callbacks: Vec<_> = self
231            .listeners
232            .read()
233            .unwrap()
234            .iter()
235            .filter_map(|c| c.upgrade())
236            .collect();
237
238        self.cache.remove(name);
239
240        for callback in callbacks {
241            if let Ok(options) = self.get(name) {
242                callback(name, options);
243            }
244        }
245    }
246}
247
248#[cfg(feature = "async")]
249impl<T: Value> ChangeTracker<T> {
250    #[inline]
251    fn new(cache: Ref<Cache<T>>, factory: Ref<dyn Factory<T>>) -> Self {
252        Self {
253            cache,
254            factory,
255            listeners: Default::default(),
256        }
257    }
258}
259
260#[cfg(not(feature = "async"))]
261impl<T: Value> ChangeTracker<T> {
262    fn new(cache: Ref<Cache<T>>, sources: Vec<Ref<dyn ChangeTokenSource<T>>>, factory: Ref<dyn Factory<T>>) -> Self {
263        let len = sources.len();
264        let tokens = sources.iter().map(|s| s.token()).collect();
265
266        Self {
267            cache,
268            factory,
269            listeners: Default::default(),
270            sources,
271            tokens: RefCell::new(tokens),
272            processed: RefCell::new(vec![false; len]),
273        }
274    }
275
276    fn check_for_changes(&self) {
277        let mut tokens = self.tokens.borrow_mut();
278        let mut processed = self.processed.borrow_mut();
279
280        for (i, source) in self.sources.iter().enumerate() {
281            if tokens[i].changed() && !processed[i] {
282                let kind = type_name::<T>().rsplit_once("::").unwrap().1;
283                let name = source.name();
284
285                if name.is_empty() {
286                    trace!("{} have changed", kind);
287                } else {
288                    trace!("{} ({name}) have changed", kind);
289                }
290
291                self.on_change(source.name());
292
293                let new_token = source.token();
294
295                // if the new token is already changed (e.g. SharedChangeToken shares state with the old one), mark it
296                // as processed so we don't re-fire on the next check_for_changes() call.
297                processed[i] = new_token.changed();
298                tokens[i] = new_token;
299            }
300        }
301    }
302}
303
304cfg_if! {
305    if #[cfg(feature = "async")] {
306        struct Producer<T: Value>(Ref<dyn ChangeTokenSource<T>>);
307
308        impl<T: Value> Producer<T> {
309            #[inline]
310            fn new(source: Ref<dyn ChangeTokenSource<T>>) -> Self {
311                Self(source)
312            }
313        }
314
315        impl<T: Value> std::ops::Deref for Producer<T> {
316            type Target = dyn ChangeTokenSource<T>;
317
318            #[inline]
319            fn deref(&self) -> &Self::Target {
320                self.0.deref()
321            }
322        }
323    }
324}
325
326impl<T: Value + Default + 'static> From<DefaultFactory<T>> for DefaultMonitor<T> {
327    #[inline]
328    fn from(factory: DefaultFactory<T>) -> Self {
329        Self::new(Default::default(), Default::default(), Ref::new(factory))
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336    use crate::{Cache, Configure, DefaultFactory};
337    use std::{
338        cell::RefCell,
339        sync::{
340            atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
341            Mutex,
342        },
343    };
344    use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};
345
346    #[derive(Default)]
347    struct Config {
348        retries: u8,
349    }
350
351    pub struct OptionsState {
352        dirty: AtomicBool,
353    }
354
355    impl OptionsState {
356        #[inline]
357        fn is_dirty(&self) -> bool {
358            self.dirty.load(Ordering::SeqCst)
359        }
360
361        #[inline]
362        fn mark_dirty(&self) {
363            self.dirty.store(true, Ordering::SeqCst)
364        }
365
366        #[inline]
367        fn reset(&self) {
368            self.dirty.store(false, Ordering::SeqCst)
369        }
370    }
371
372    impl Default for OptionsState {
373        #[inline]
374        fn default() -> Self {
375            Self {
376                dirty: AtomicBool::new(true),
377            }
378        }
379    }
380
381    #[derive(Default)]
382    struct ConfigSetup {
383        counter: AtomicU8,
384    }
385
386    impl Configure<Config> for ConfigSetup {
387        fn run(&self, name: &str, options: &mut Config) {
388            if name.is_empty() {
389                let retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
390                options.retries = retries;
391            }
392        }
393    }
394
395    #[derive(Default)]
396    struct ConfigSource {
397        token: SharedChangeToken<SingleChangeToken>,
398    }
399
400    impl ConfigSource {
401        #[inline]
402        fn changed(&self) {
403            self.token.notify()
404        }
405    }
406
407    impl ChangeTokenSource<Config> for ConfigSource {
408        #[inline]
409        fn token(&self) -> Box<dyn ChangeToken> {
410            Box::new(self.token.clone())
411        }
412    }
413
414    struct Foo {
415        monitor: Ref<dyn Monitor<Config>>,
416        _sub: Subscription<Config>,
417        state: Arc<OptionsState>,
418        retries: RefCell<u8>,
419    }
420
421    impl Foo {
422        fn new(monitor: Ref<dyn Monitor<Config>>) -> Self {
423            let state = Arc::new(OptionsState::default());
424            let other = state.clone();
425
426            Self {
427                monitor: monitor.clone(),
428                _sub: monitor.on_change(Box::new(move |_name: &str, _options: Ref<Config>| other.mark_dirty())),
429                state,
430                retries: RefCell::default(),
431            }
432        }
433
434        fn retries(&self) -> u8 {
435            if self.state.is_dirty() {
436                *self.retries.borrow_mut() = self.monitor.get_unchecked().retries;
437                self.state.reset();
438            }
439
440            self.retries.borrow().clone()
441        }
442    }
443
444    fn new_monitor() -> (Ref<dyn Monitor<Config>>, Ref<ConfigSource>, Ref<ConfigSetup>) {
445        let cache = Ref::new(Cache::<Config>::default());
446        let setup = Ref::new(ConfigSetup::default());
447        let factory = Ref::new(DefaultFactory::new(vec![setup.clone()], Vec::default(), Vec::default()));
448        let source = Ref::new(ConfigSource::default());
449        let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(cache, vec![source.clone()], factory));
450        (monitor, source, setup)
451    }
452
453    /// A named [ChangeTokenSource] for testing multi-source scenarios.
454    struct NamedConfigSource {
455        name: String,
456        token: SharedChangeToken<SingleChangeToken>,
457    }
458
459    impl NamedConfigSource {
460        #[inline]
461        fn new(name: &str) -> Self {
462            Self {
463                name: name.to_owned(),
464                token: SharedChangeToken::default(),
465            }
466        }
467
468        #[inline]
469        fn changed(&self) {
470            self.token.notify()
471        }
472    }
473
474    impl ChangeTokenSource<Config> for NamedConfigSource {
475        #[inline]
476        fn token(&self) -> Box<dyn ChangeToken> {
477            Box::new(self.token.clone())
478        }
479
480        #[inline]
481        fn name(&self) -> &str {
482            &self.name
483        }
484    }
485
486    /// A [Configure] that sets retries based on name using an atomic counter per name to track how many times
487    /// the factory has been called.
488    struct NamedConfigSetup {
489        a: AtomicU8,
490        b: AtomicU8,
491    }
492
493    impl Default for NamedConfigSetup {
494        fn default() -> Self {
495            Self {
496                a: AtomicU8::new(0),
497                b: AtomicU8::new(0),
498            }
499        }
500    }
501
502    impl Configure<Config> for NamedConfigSetup {
503        fn run(&self, name: &str, options: &mut Config) {
504            match name {
505                "a" => options.retries = self.a.fetch_add(1, Ordering::SeqCst) + 10,
506                "b" => options.retries = self.b.fetch_add(1, Ordering::SeqCst) + 20,
507                _ => {}
508            }
509        }
510    }
511
512    #[test]
513    fn monitored_options_should_update_when_source_changes() {
514        // arrange
515        let cache = Ref::new(Cache::<Config>::default());
516        let setup = Ref::new(ConfigSetup::default());
517        let factory = Ref::new(DefaultFactory::new(vec![setup], Vec::default(), Vec::default()));
518        let source = Ref::new(ConfigSource::default());
519        let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(cache, vec![source.clone()], factory));
520        let foo = Foo::new(monitor.clone());
521        let initial = foo.retries();
522
523        // act
524        source.changed();
525
526        let _ = monitor.get_unchecked();
527
528        // assert
529        assert_eq!(initial, 1);
530        assert_eq!(foo.retries(), 2);
531    }
532
533    #[test]
534    fn get_none_returns_factory_created_value() {
535        // arrange
536        let (monitor, source, _) = new_monitor();
537
538        let first = monitor.get_unchecked();
539        assert_eq!(first.retries, 1);
540
541        let cached = monitor.get_unchecked();
542        assert_eq!(cached.retries, 1);
543
544        // act
545        source.changed();
546
547        // assert
548        let updated = monitor.get_unchecked();
549
550        assert_eq!(updated.retries, 2);
551    }
552
553    #[test]
554    fn on_change_callbacks_fire_with_correct_name_and_value() {
555        // arrange
556        let (monitor, source, _) = new_monitor();
557        let _ = monitor.get();
558        let observed_name: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
559        let observed_retries: Arc<Mutex<u8>> = Arc::new(Mutex::new(0));
560        let name_clone = observed_name.clone();
561        let retries_clone = observed_retries.clone();
562        let _sub = monitor.on_change(Box::new(move |name, opts| {
563            *name_clone.lock().unwrap() = name.to_owned();
564            *retries_clone.lock().unwrap() = opts.retries;
565        }));
566
567        // act
568        source.changed();
569        let _ = monitor.get();
570
571        // assert
572        let name_val = observed_name.lock().unwrap();
573
574        assert_eq!(*name_val, "", "callback should receive '' for unnamed source");
575
576        let retries_val = observed_retries.lock().unwrap();
577
578        assert_eq!(*retries_val, 2, "callback should receive updated retries value");
579    }
580
581    #[test]
582    fn dropping_subscription_prevents_further_callbacks() {
583        // arrange
584        let (monitor, source, _setup) = new_monitor();
585        let _ = monitor.get();
586        let call_count = Arc::new(AtomicU32::new(0));
587        let count_clone = call_count.clone();
588        let sub = monitor.on_change(Box::new(move |_, _| {
589            count_clone.fetch_add(1, Ordering::SeqCst);
590        }));
591
592        // act
593        source.changed();
594        let _ = monitor.get();
595
596        assert_eq!(
597            call_count.load(Ordering::SeqCst),
598            1,
599            "callback should fire once after first change"
600        );
601
602        drop(sub);
603
604        // act
605        source.changed();
606        let _ = monitor.get();
607
608        // assert
609        assert_eq!(
610            call_count.load(Ordering::SeqCst),
611            1,
612            "callback should not fire after subscription is dropped"
613        );
614    }
615
616    #[test]
617    fn multiple_sources_changing_one_only_invalidates_that_source() {
618        // arrange
619        let cache = Ref::new(Cache::<Config>::default());
620        let setup = Ref::new(NamedConfigSetup::default());
621        let factory = Ref::new(DefaultFactory::new(vec![setup], Vec::default(), Vec::default()));
622        let source_a = Ref::new(NamedConfigSource::new("a"));
623        let source_b = Ref::new(NamedConfigSource::new("b"));
624        let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(
625            cache,
626            vec![source_a.clone(), source_b.clone()],
627            factory,
628        ));
629        let val_a = monitor.get_named_unchecked("a");
630        let val_b = monitor.get_named_unchecked("b");
631
632        assert_eq!(val_a.retries, 10, "source a initial retries");
633        assert_eq!(val_b.retries, 20, "source b initial retries");
634
635        let callback_names: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
636        let names_clone = callback_names.clone();
637        let _sub = monitor.on_change(Box::new(move |name, _| {
638            names_clone.lock().unwrap().push(name.to_owned());
639        }));
640
641        // act
642        source_a.changed();
643
644        let _ = monitor.get_named("a");
645
646        // assert
647        let names = callback_names.lock().unwrap();
648        assert_eq!(names.len(), 1, "only one callback should fire");
649        assert_eq!(names[0], "a", "callback should fire for source a");
650        drop(names);
651
652        let val_a_updated = monitor.get_named_unchecked("a");
653        assert_eq!(val_a_updated.retries, 11, "source a should be invalidated → new value");
654
655        let val_b_same = monitor.get_named_unchecked("b");
656        assert_eq!(val_b_same.retries, 20, "source b should still be cached → same value");
657    }
658}