1use crate::{validation::Error, Cache, ChangeTokenSource, DefaultFactory, Factory, Ref, Value};
2use cfg_if::cfg_if;
3use std::any::type_name;
4use std::sync::{Arc, RwLock, Weak};
5use tracing::{error, trace};
6
7cfg_if! {
8 if #[cfg(not(feature = "async"))] {
9 use std::cell::RefCell;
10 use tokens::ChangeToken;
11 }
12}
13
14type Callback<T> = dyn Fn(&str, Ref<T>) + Send + Sync;
15
16pub struct Subscription<T: Value>(#[allow(unused)] Arc<Callback<T>>);
22
23impl<T: Value> Subscription<T> {
24 #[inline]
30 pub fn new(callback: Arc<Callback<T>>) -> Self {
31 Self(callback)
32 }
33}
34
35#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
37pub trait Monitor<T: Value> {
38 fn get(&self) -> Result<Ref<T>, Error> {
40 self.get_named("")
41 }
42
43 fn get_unchecked(&self) -> Ref<T> {
49 match self.get_named("") {
50 Ok(value) => value,
51 Err(error) => {
52 error!("{error:?}");
53 panic!("{}", error)
54 }
55 }
56 }
57
58 fn get_named(&self, name: &str) -> Result<Ref<T>, Error>;
64
65 fn get_named_unchecked(&self, name: &str) -> Ref<T> {
75 match self.get_named(name) {
76 Ok(value) => value,
77 Err(error) => {
78 error!("[{name}] {error:?}");
79 panic!("[{name}] {}", error)
80 }
81 }
82 }
83
84 #[must_use = "no change notifications occur after the subscription is dropped"]
95 fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T>;
96}
97
98pub struct DefaultMonitor<T: Value> {
100 tracker: Arc<ChangeTracker<T>>,
101 _subscriptions: Vec<Box<dyn tokens::Subscription>>,
102}
103
#[cfg(feature = "async")]
impl<T: Value + 'static> DefaultMonitor<T> {
    /// Creates a monitor that is notified of changes through `tokens::on_change`.
    ///
    /// # Arguments
    ///
    /// * `cache` - the cache used to hold resolved options
    /// * `sources` - the sources whose change tokens signal that options have changed
    /// * `factory` - the factory used to create options that are not cached
    pub fn new(
        cache: Ref<Cache<T>>,
        sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
        factory: Ref<dyn Factory<T>>,
    ) -> Self {
        let tracker = Arc::new(ChangeTracker::new(cache, factory));
        let mut subscriptions = Vec::with_capacity(sources.len());

        for source in sources {
            // Copy the name out first so the source itself can be moved into the
            // producer without an extra clone.
            let state = Arc::new(source.name().to_owned());
            let producer = Producer::new(source);
            let consumer = Arc::clone(&tracker);
            let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
                move || producer.token(),
                move |state| {
                    // Not every type name contains a path separator (e.g. primitives),
                    // so fall back to the full name instead of panicking.
                    let full_name = type_name::<T>();
                    let kind = full_name
                        .rsplit_once("::")
                        .map_or(full_name, |(_, short)| short);

                    match state {
                        Some(name) if !name.is_empty() => {
                            trace!("{} ({name}) have changed", kind);
                            consumer.on_change(&name);
                        }
                        // A missing or empty name refers to the default options.
                        _ => {
                            trace!("{} options have changed", kind);
                            consumer.on_change("");
                        }
                    }
                },
                Some(state),
            ));

            subscriptions.push(subscription);
        }

        Self {
            tracker,
            _subscriptions: subscriptions,
        }
    }
}
149
150#[cfg(not(feature = "async"))]
151impl<T: Value + 'static> DefaultMonitor<T> {
152 pub fn new(
160 cache: Ref<Cache<T>>,
161 sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
162 factory: Ref<dyn Factory<T>>,
163 ) -> Self {
164 Self {
165 tracker: Arc::new(ChangeTracker::new(cache, sources, factory)),
166 _subscriptions: Vec::new(),
167 }
168 }
169}
170
171impl<T: Value> Monitor<T> for DefaultMonitor<T> {
172 fn get_named(&self, name: &str) -> Result<Ref<T>, Error> {
173 cfg_if! {
174 if #[cfg(not(feature = "async"))] {
175 self.tracker.check_for_changes();
176 }
177 }
178
179 self.tracker.get(name)
180 }
181
182 #[inline]
183 fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T> {
184 self.tracker.add(changed)
185 }
186}
187
188struct ChangeTracker<T: Value> {
189 cache: Ref<Cache<T>>,
190 factory: Ref<dyn Factory<T>>,
191 listeners: RwLock<Vec<Weak<Callback<T>>>>,
192
193 #[cfg(not(feature = "async"))]
194 sources: Vec<Ref<dyn ChangeTokenSource<T>>>,
195
196 #[cfg(not(feature = "async"))]
197 tokens: RefCell<Vec<Box<dyn ChangeToken>>>,
198
199 #[cfg(not(feature = "async"))]
202 processed: RefCell<Vec<bool>>,
203}
204
205impl<T: Value> ChangeTracker<T> {
206 fn get(&self, name: &str) -> Result<Ref<T>, Error> {
207 self.cache.get_or_add(name, &|n| self.factory.create(n))
208 }
209
210 fn add(&self, listener: Box<Callback<T>>) -> Subscription<T> {
211 let mut listeners = self.listeners.write().unwrap();
212
213 for i in (0..listeners.len()).rev() {
215 if listeners[i].upgrade().is_none() {
216 listeners.remove(i);
217 }
218 }
219
220 let source: Arc<Callback<T>> = Arc::from(listener);
221
222 listeners.push(Arc::downgrade(&source));
223 Subscription::new(source)
224 }
225
226 fn on_change(&self, name: &str) {
227 let callbacks: Vec<_> = self
231 .listeners
232 .read()
233 .unwrap()
234 .iter()
235 .filter_map(|c| c.upgrade())
236 .collect();
237
238 self.cache.remove(name);
239
240 for callback in callbacks {
241 if let Ok(options) = self.get(name) {
242 callback(name, options);
243 }
244 }
245 }
246}
247
#[cfg(feature = "async")]
impl<T: Value> ChangeTracker<T> {
    /// Creates a tracker with no registered listeners.
    #[inline]
    fn new(cache: Ref<Cache<T>>, factory: Ref<dyn Factory<T>>) -> Self {
        let listeners = RwLock::new(Vec::new());

        Self {
            cache,
            factory,
            listeners,
        }
    }
}
259
260#[cfg(not(feature = "async"))]
261impl<T: Value> ChangeTracker<T> {
262 fn new(cache: Ref<Cache<T>>, sources: Vec<Ref<dyn ChangeTokenSource<T>>>, factory: Ref<dyn Factory<T>>) -> Self {
263 let len = sources.len();
264 let tokens = sources.iter().map(|s| s.token()).collect();
265
266 Self {
267 cache,
268 factory,
269 listeners: Default::default(),
270 sources,
271 tokens: RefCell::new(tokens),
272 processed: RefCell::new(vec![false; len]),
273 }
274 }
275
276 fn check_for_changes(&self) {
277 let mut tokens = self.tokens.borrow_mut();
278 let mut processed = self.processed.borrow_mut();
279
280 for (i, source) in self.sources.iter().enumerate() {
281 if tokens[i].changed() && !processed[i] {
282 let kind = type_name::<T>().rsplit_once("::").unwrap().1;
283 let name = source.name();
284
285 if name.is_empty() {
286 trace!("{} have changed", kind);
287 } else {
288 trace!("{} ({name}) have changed", kind);
289 }
290
291 self.on_change(source.name());
292
293 let new_token = source.token();
294
295 processed[i] = new_token.changed();
298 tokens[i] = new_token;
299 }
300 }
301 }
302}
303
304cfg_if! {
305 if #[cfg(feature = "async")] {
306 struct Producer<T: Value>(Ref<dyn ChangeTokenSource<T>>);
307
308 impl<T: Value> Producer<T> {
309 #[inline]
310 fn new(source: Ref<dyn ChangeTokenSource<T>>) -> Self {
311 Self(source)
312 }
313 }
314
315 impl<T: Value> std::ops::Deref for Producer<T> {
316 type Target = dyn ChangeTokenSource<T>;
317
318 #[inline]
319 fn deref(&self) -> &Self::Target {
320 self.0.deref()
321 }
322 }
323 }
324}
325
326impl<T: Value + Default + 'static> From<DefaultFactory<T>> for DefaultMonitor<T> {
327 #[inline]
328 fn from(factory: DefaultFactory<T>) -> Self {
329 Self::new(Default::default(), Default::default(), Ref::new(factory))
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::{Cache, Configure, DefaultFactory};
337 use std::{
338 cell::RefCell,
339 sync::{
340 atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
341 Mutex,
342 },
343 };
344 use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};
345
346 #[derive(Default)]
347 struct Config {
348 retries: u8,
349 }
350
351 pub struct OptionsState {
352 dirty: AtomicBool,
353 }
354
355 impl OptionsState {
356 #[inline]
357 fn is_dirty(&self) -> bool {
358 self.dirty.load(Ordering::SeqCst)
359 }
360
361 #[inline]
362 fn mark_dirty(&self) {
363 self.dirty.store(true, Ordering::SeqCst)
364 }
365
366 #[inline]
367 fn reset(&self) {
368 self.dirty.store(false, Ordering::SeqCst)
369 }
370 }
371
372 impl Default for OptionsState {
373 #[inline]
374 fn default() -> Self {
375 Self {
376 dirty: AtomicBool::new(true),
377 }
378 }
379 }
380
381 #[derive(Default)]
382 struct ConfigSetup {
383 counter: AtomicU8,
384 }
385
386 impl Configure<Config> for ConfigSetup {
387 fn run(&self, name: &str, options: &mut Config) {
388 if name.is_empty() {
389 let retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
390 options.retries = retries;
391 }
392 }
393 }
394
395 #[derive(Default)]
396 struct ConfigSource {
397 token: SharedChangeToken<SingleChangeToken>,
398 }
399
400 impl ConfigSource {
401 #[inline]
402 fn changed(&self) {
403 self.token.notify()
404 }
405 }
406
407 impl ChangeTokenSource<Config> for ConfigSource {
408 #[inline]
409 fn token(&self) -> Box<dyn ChangeToken> {
410 Box::new(self.token.clone())
411 }
412 }
413
414 struct Foo {
415 monitor: Ref<dyn Monitor<Config>>,
416 _sub: Subscription<Config>,
417 state: Arc<OptionsState>,
418 retries: RefCell<u8>,
419 }
420
421 impl Foo {
422 fn new(monitor: Ref<dyn Monitor<Config>>) -> Self {
423 let state = Arc::new(OptionsState::default());
424 let other = state.clone();
425
426 Self {
427 monitor: monitor.clone(),
428 _sub: monitor.on_change(Box::new(move |_name: &str, _options: Ref<Config>| other.mark_dirty())),
429 state,
430 retries: RefCell::default(),
431 }
432 }
433
434 fn retries(&self) -> u8 {
435 if self.state.is_dirty() {
436 *self.retries.borrow_mut() = self.monitor.get_unchecked().retries;
437 self.state.reset();
438 }
439
440 self.retries.borrow().clone()
441 }
442 }
443
444 fn new_monitor() -> (Ref<dyn Monitor<Config>>, Ref<ConfigSource>, Ref<ConfigSetup>) {
445 let cache = Ref::new(Cache::<Config>::default());
446 let setup = Ref::new(ConfigSetup::default());
447 let factory = Ref::new(DefaultFactory::new(vec![setup.clone()], Vec::default(), Vec::default()));
448 let source = Ref::new(ConfigSource::default());
449 let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(cache, vec![source.clone()], factory));
450 (monitor, source, setup)
451 }
452
453 struct NamedConfigSource {
455 name: String,
456 token: SharedChangeToken<SingleChangeToken>,
457 }
458
459 impl NamedConfigSource {
460 #[inline]
461 fn new(name: &str) -> Self {
462 Self {
463 name: name.to_owned(),
464 token: SharedChangeToken::default(),
465 }
466 }
467
468 #[inline]
469 fn changed(&self) {
470 self.token.notify()
471 }
472 }
473
474 impl ChangeTokenSource<Config> for NamedConfigSource {
475 #[inline]
476 fn token(&self) -> Box<dyn ChangeToken> {
477 Box::new(self.token.clone())
478 }
479
480 #[inline]
481 fn name(&self) -> &str {
482 &self.name
483 }
484 }
485
486 struct NamedConfigSetup {
489 a: AtomicU8,
490 b: AtomicU8,
491 }
492
493 impl Default for NamedConfigSetup {
494 fn default() -> Self {
495 Self {
496 a: AtomicU8::new(0),
497 b: AtomicU8::new(0),
498 }
499 }
500 }
501
502 impl Configure<Config> for NamedConfigSetup {
503 fn run(&self, name: &str, options: &mut Config) {
504 match name {
505 "a" => options.retries = self.a.fetch_add(1, Ordering::SeqCst) + 10,
506 "b" => options.retries = self.b.fetch_add(1, Ordering::SeqCst) + 20,
507 _ => {}
508 }
509 }
510 }
511
512 #[test]
513 fn monitored_options_should_update_when_source_changes() {
514 let cache = Ref::new(Cache::<Config>::default());
516 let setup = Ref::new(ConfigSetup::default());
517 let factory = Ref::new(DefaultFactory::new(vec![setup], Vec::default(), Vec::default()));
518 let source = Ref::new(ConfigSource::default());
519 let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(cache, vec![source.clone()], factory));
520 let foo = Foo::new(monitor.clone());
521 let initial = foo.retries();
522
523 source.changed();
525
526 let _ = monitor.get_unchecked();
527
528 assert_eq!(initial, 1);
530 assert_eq!(foo.retries(), 2);
531 }
532
533 #[test]
534 fn get_none_returns_factory_created_value() {
535 let (monitor, source, _) = new_monitor();
537
538 let first = monitor.get_unchecked();
539 assert_eq!(first.retries, 1);
540
541 let cached = monitor.get_unchecked();
542 assert_eq!(cached.retries, 1);
543
544 source.changed();
546
547 let updated = monitor.get_unchecked();
549
550 assert_eq!(updated.retries, 2);
551 }
552
553 #[test]
554 fn on_change_callbacks_fire_with_correct_name_and_value() {
555 let (monitor, source, _) = new_monitor();
557 let _ = monitor.get();
558 let observed_name: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
559 let observed_retries: Arc<Mutex<u8>> = Arc::new(Mutex::new(0));
560 let name_clone = observed_name.clone();
561 let retries_clone = observed_retries.clone();
562 let _sub = monitor.on_change(Box::new(move |name, opts| {
563 *name_clone.lock().unwrap() = name.to_owned();
564 *retries_clone.lock().unwrap() = opts.retries;
565 }));
566
567 source.changed();
569 let _ = monitor.get();
570
571 let name_val = observed_name.lock().unwrap();
573
574 assert_eq!(*name_val, "", "callback should receive '' for unnamed source");
575
576 let retries_val = observed_retries.lock().unwrap();
577
578 assert_eq!(*retries_val, 2, "callback should receive updated retries value");
579 }
580
581 #[test]
582 fn dropping_subscription_prevents_further_callbacks() {
583 let (monitor, source, _setup) = new_monitor();
585 let _ = monitor.get();
586 let call_count = Arc::new(AtomicU32::new(0));
587 let count_clone = call_count.clone();
588 let sub = monitor.on_change(Box::new(move |_, _| {
589 count_clone.fetch_add(1, Ordering::SeqCst);
590 }));
591
592 source.changed();
594 let _ = monitor.get();
595
596 assert_eq!(
597 call_count.load(Ordering::SeqCst),
598 1,
599 "callback should fire once after first change"
600 );
601
602 drop(sub);
603
604 source.changed();
606 let _ = monitor.get();
607
608 assert_eq!(
610 call_count.load(Ordering::SeqCst),
611 1,
612 "callback should not fire after subscription is dropped"
613 );
614 }
615
616 #[test]
617 fn multiple_sources_changing_one_only_invalidates_that_source() {
618 let cache = Ref::new(Cache::<Config>::default());
620 let setup = Ref::new(NamedConfigSetup::default());
621 let factory = Ref::new(DefaultFactory::new(vec![setup], Vec::default(), Vec::default()));
622 let source_a = Ref::new(NamedConfigSource::new("a"));
623 let source_b = Ref::new(NamedConfigSource::new("b"));
624 let monitor: Ref<dyn Monitor<Config>> = Ref::new(DefaultMonitor::new(
625 cache,
626 vec![source_a.clone(), source_b.clone()],
627 factory,
628 ));
629 let val_a = monitor.get_named_unchecked("a");
630 let val_b = monitor.get_named_unchecked("b");
631
632 assert_eq!(val_a.retries, 10, "source a initial retries");
633 assert_eq!(val_b.retries, 20, "source b initial retries");
634
635 let callback_names: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
636 let names_clone = callback_names.clone();
637 let _sub = monitor.on_change(Box::new(move |name, _| {
638 names_clone.lock().unwrap().push(name.to_owned());
639 }));
640
641 source_a.changed();
643
644 let _ = monitor.get_named("a");
645
646 let names = callback_names.lock().unwrap();
648 assert_eq!(names.len(), 1, "only one callback should fire");
649 assert_eq!(names[0], "a", "callback should fire for source a");
650 drop(names);
651
652 let val_a_updated = monitor.get_named_unchecked("a");
653 assert_eq!(val_a_updated.retries, 11, "source a should be invalidated → new value");
654
655 let val_b_same = monitor.get_named_unchecked("b");
656 assert_eq!(val_b_same.retries, 20, "source b should still be cached → same value");
657 }
658}