1use crate::{OptionsChangeTokenSource, OptionsFactory, OptionsMonitorCache, Ref, Value};
2use cfg_if::cfg_if;
3use std::sync::{Arc, RwLock, Weak};
4
5cfg_if! {
6 if #[cfg(not(feature = "async"))] {
7 use std::cell::RefCell;
8 use tokens::ChangeToken;
9 }
10}
11
12type Callback<T> = dyn Fn(Option<&str>, Ref<T>) + Send + Sync;
13
14pub struct Subscription<T: Value>(#[allow(unused)] Arc<Callback<T>>);
20
21impl<T: Value> Subscription<T> {
22 #[inline]
28 pub fn new(callback: Arc<Callback<T>>) -> Self {
29 Self(callback)
30 }
31}
32
33#[cfg_attr(feature = "async", maybe_impl::traits(Send, Sync))]
35pub trait OptionsMonitor<T: Value> {
36 fn current_value(&self) -> Ref<T> {
38 self.get(None)
39 }
40
41 fn get(&self, name: Option<&str>) -> Ref<T>;
47
48 fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T>;
59}
60
61pub struct DefaultOptionsMonitor<T: Value> {
63 tracker: Arc<ChangeTracker<T>>,
64 _subscriptions: Vec<Box<dyn tokens::Subscription>>,
65}
66
#[cfg(feature = "async")]
impl<T: Value + 'static> DefaultOptionsMonitor<T> {
    /// Creates a monitor that is notified by its change token sources.
    ///
    /// One change token registration is created per source. When a source's
    /// token fires, the tracker evicts the cached options for that source's
    /// name and calls the registered listeners.
    ///
    /// # Arguments
    ///
    /// * `cache` - The cache holding created options
    /// * `sources` - The sources producing change tokens
    /// * `factory` - The factory used to create options
    pub fn new(
        cache: Ref<dyn OptionsMonitorCache<T>>,
        sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
        factory: Ref<dyn OptionsFactory<T>>,
    ) -> Self {
        let tracker = Arc::new(ChangeTracker::new(cache, factory));
        let subscriptions: Vec<Box<dyn tokens::Subscription>> = sources
            .into_iter()
            .map(|source| {
                // The name is copied up front so the source itself can be
                // moved into the token producer.
                let state = source.name().map(|name| Arc::new(name.to_owned()));
                let producer = Producer::new(source);
                let consumer = Arc::clone(&tracker);
                let subscription: Box<dyn tokens::Subscription> = Box::new(tokens::on_change(
                    move || producer.token(),
                    move |state| match state {
                        Some(name) => consumer.on_change(Some(name.as_str())),
                        None => consumer.on_change(None),
                    },
                    state,
                ));

                subscription
            })
            .collect();

        Self {
            tracker,
            _subscriptions: subscriptions,
        }
    }
}
108
109#[cfg(not(feature = "async"))]
110impl<T: Value + 'static> DefaultOptionsMonitor<T> {
111 pub fn new(
119 cache: Ref<dyn OptionsMonitorCache<T>>,
120 sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
121 factory: Ref<dyn OptionsFactory<T>>,
122 ) -> Self {
123 Self {
124 tracker: Arc::new(ChangeTracker::new(cache, sources, factory)),
125 _subscriptions: Vec::new(),
126 }
127 }
128}
129
130impl<T: Value> OptionsMonitor<T> for DefaultOptionsMonitor<T> {
131 fn get(&self, name: Option<&str>) -> Ref<T> {
132 cfg_if! {
133 if #[cfg(not(feature = "async"))] {
134 self.tracker.check_for_changes();
135 }
136 }
137
138 self.tracker.get(name)
139 }
140
141 #[inline]
142 fn on_change(&self, changed: Box<Callback<T>>) -> Subscription<T> {
143 self.tracker.add(changed)
144 }
145}
146
147struct ChangeTracker<T: Value> {
148 cache: Ref<dyn OptionsMonitorCache<T>>,
149 factory: Ref<dyn OptionsFactory<T>>,
150 listeners: RwLock<Vec<Weak<Callback<T>>>>,
151
152 #[cfg(not(feature = "async"))]
153 sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
154
155 #[cfg(not(feature = "async"))]
156 tokens: RefCell<Vec<Box<dyn ChangeToken>>>,
157
158 #[cfg(not(feature = "async"))]
161 processed: RefCell<Vec<bool>>,
162}
163
164impl<T: Value> ChangeTracker<T> {
165 fn get(&self, name: Option<&str>) -> Ref<T> {
166 self.cache
167 .get_or_add(name, &|n| self.factory.create(n).unwrap_or_else(|e| panic!("{}", e)))
168 }
169
170 fn add(&self, listener: Box<Callback<T>>) -> Subscription<T> {
171 let mut listeners = self.listeners.write().unwrap();
172
173 for i in (0..listeners.len()).rev() {
175 if listeners[i].upgrade().is_none() {
176 listeners.remove(i);
177 }
178 }
179
180 let source: Arc<Callback<T>> = Arc::from(listener);
181
182 listeners.push(Arc::downgrade(&source));
183 Subscription::new(source)
184 }
185
186 fn on_change(&self, name: Option<&str>) {
187 let callbacks: Vec<_> = self
191 .listeners
192 .read()
193 .unwrap()
194 .iter()
195 .filter_map(|c| c.upgrade())
196 .collect();
197
198 self.cache.try_remove(name);
199
200 for callback in callbacks {
201 callback(name, self.get(name));
202 }
203 }
204}
205
#[cfg(feature = "async")]
impl<T: Value> ChangeTracker<T> {
    /// Creates a tracker with no registered listeners.
    ///
    /// # Arguments
    ///
    /// * `cache` - The cache holding created options
    /// * `factory` - The factory used to create options
    #[inline]
    fn new(cache: Ref<dyn OptionsMonitorCache<T>>, factory: Ref<dyn OptionsFactory<T>>) -> Self {
        let listeners = RwLock::new(Vec::new());

        Self {
            cache,
            factory,
            listeners,
        }
    }
}
217
218#[cfg(not(feature = "async"))]
219impl<T: Value> ChangeTracker<T> {
220 fn new(
221 cache: Ref<dyn OptionsMonitorCache<T>>,
222 sources: Vec<Ref<dyn OptionsChangeTokenSource<T>>>,
223 factory: Ref<dyn OptionsFactory<T>>,
224 ) -> Self {
225 let len = sources.len();
226 let tokens = sources.iter().map(|s| s.token()).collect();
227 Self {
228 cache,
229 factory,
230 listeners: Default::default(),
231 sources,
232 tokens: RefCell::new(tokens),
233 processed: RefCell::new(vec![false; len]),
234 }
235 }
236
237 fn check_for_changes(&self) {
238 let mut tokens = self.tokens.borrow_mut();
239 let mut processed = self.processed.borrow_mut();
240
241 for (i, source) in self.sources.iter().enumerate() {
242 if tokens[i].changed() && !processed[i] {
243 self.on_change(source.name());
244
245 let new_token = source.token();
246
247 processed[i] = new_token.changed();
250 tokens[i] = new_token;
251 }
252 }
253 }
254}
255
256cfg_if! {
257 if #[cfg(feature = "async")] {
258 struct Producer<T: Value>(Ref<dyn OptionsChangeTokenSource<T>>);
259
260 impl<T: Value> Producer<T> {
261 #[inline]
262 fn new(source: Ref<dyn OptionsChangeTokenSource<T>>) -> Self {
263 Self(source)
264 }
265 }
266
267 impl<T: Value> std::ops::Deref for Producer<T> {
268 type Target = dyn OptionsChangeTokenSource<T>;
269
270 #[inline]
271 fn deref(&self) -> &Self::Target {
272 self.0.deref()
273 }
274 }
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use std::{
        cell::RefCell,
        sync::{
            atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
            Mutex,
        },
    };
    use tokens::{ChangeToken, SharedChangeToken, SingleChangeToken};

    /// Options type used by the tests.
    #[derive(Default)]
    struct Config {
        retries: u8,
    }

    /// Dirty flag shared between a change listener and the component that
    /// caches a value read from the monitor.
    struct OptionsState {
        dirty: AtomicBool,
    }

    impl OptionsState {
        fn is_dirty(&self) -> bool {
            self.dirty.load(Ordering::SeqCst)
        }

        fn mark_dirty(&self) {
            self.dirty.store(true, Ordering::SeqCst)
        }

        fn reset(&self) {
            self.dirty.store(false, Ordering::SeqCst)
        }
    }

    impl Default for OptionsState {
        // Starts dirty so that the first read goes to the monitor.
        fn default() -> Self {
            Self {
                dirty: AtomicBool::new(true),
            }
        }
    }

    /// Configures the unnamed options with a counter that grows by one every
    /// time the options are created (1, 2, 3, ...).
    #[derive(Default)]
    struct ConfigSetup {
        counter: AtomicU8,
    }

    impl ConfigureOptions<Config> for ConfigSetup {
        fn configure(&self, name: Option<&str>, options: &mut Config) {
            if name.is_none() {
                let retries = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
                options.retries = retries;
            }
        }
    }

    /// Unnamed change source whose token is signaled by `changed`.
    #[derive(Default)]
    struct ConfigSource {
        token: SharedChangeToken<SingleChangeToken>,
    }

    impl ConfigSource {
        fn changed(&self) {
            self.token.notify()
        }
    }

    impl OptionsChangeTokenSource<Config> for ConfigSource {
        fn token(&self) -> Box<dyn ChangeToken> {
            Box::new(self.token.clone())
        }
    }

    /// Component that caches `retries` and refreshes it from the monitor
    /// only after its change listener has marked the state dirty.
    struct Foo {
        monitor: Ref<dyn OptionsMonitor<Config>>,
        _sub: Subscription<Config>,
        state: Arc<OptionsState>,
        retries: RefCell<u8>,
    }

    impl Foo {
        fn new(monitor: Ref<dyn OptionsMonitor<Config>>) -> Self {
            let state = Arc::new(OptionsState::default());
            let other = state.clone();

            Self {
                monitor: monitor.clone(),
                _sub: monitor.on_change(Box::new(move |_name: Option<&str>, _options: Ref<Config>| {
                    other.mark_dirty()
                })),
                state,
                retries: RefCell::default(),
            }
        }

        fn retries(&self) -> u8 {
            if self.state.is_dirty() {
                *self.retries.borrow_mut() = self.monitor.current_value().retries;
                self.state.reset();
            }

            // `u8` is `Copy`; read it out instead of cloning
            *self.retries.borrow()
        }
    }

    /// Builds a monitor over a single unnamed source and returns it together
    /// with the source and the setup it was built from.
    fn new_monitor() -> (Ref<dyn OptionsMonitor<Config>>, Ref<ConfigSource>, Ref<ConfigSetup>) {
        let cache = Ref::new(OptionsCache::<Config>::default());
        let setup = Ref::new(ConfigSetup::default());
        let factory = Ref::new(DefaultOptionsFactory::new(
            vec![setup.clone()],
            Vec::default(),
            Vec::default(),
        ));
        let source = Ref::new(ConfigSource::default());
        let monitor: Ref<dyn OptionsMonitor<Config>> =
            Ref::new(DefaultOptionsMonitor::new(cache, vec![source.clone()], factory));
        (monitor, source, setup)
    }

    /// Change source that reports a name.
    struct NamedConfigSource {
        name: String,
        token: SharedChangeToken<SingleChangeToken>,
    }

    impl NamedConfigSource {
        fn new(name: &str) -> Self {
            Self {
                name: name.to_owned(),
                token: SharedChangeToken::default(),
            }
        }

        fn changed(&self) {
            self.token.notify()
        }
    }

    impl OptionsChangeTokenSource<Config> for NamedConfigSource {
        fn token(&self) -> Box<dyn ChangeToken> {
            Box::new(self.token.clone())
        }

        fn name(&self) -> Option<&str> {
            Some(&self.name)
        }
    }

    /// Configures the options named "a" (10, 11, ...) and "b" (20, 21, ...)
    /// from independent counters; both counters start at zero.
    #[derive(Default)]
    struct NamedConfigSetup {
        a: AtomicU8,
        b: AtomicU8,
    }

    impl ConfigureOptions<Config> for NamedConfigSetup {
        fn configure(&self, name: Option<&str>, options: &mut Config) {
            match name {
                Some("a") => {
                    options.retries = self.a.fetch_add(1, Ordering::SeqCst) + 10;
                }
                Some("b") => {
                    options.retries = self.b.fetch_add(1, Ordering::SeqCst) + 20;
                }
                _ => {}
            }
        }
    }

    #[test]
    fn monitored_options_should_update_when_source_changes() {
        // arrange
        let (monitor, source, _setup) = new_monitor();
        let foo = Foo::new(monitor.clone());
        let initial = foo.retries();

        // act
        source.changed();

        // without the `async` feature changes are only noticed on a read
        let _ = monitor.get(None);

        // assert
        assert_eq!(initial, 1);
        assert_eq!(foo.retries(), 2);
    }

    #[test]
    fn get_none_returns_factory_created_value() {
        // arrange
        let (monitor, source, _setup) = new_monitor();

        let first = monitor.get(None);
        assert_eq!(first.retries, 1);

        let cached = monitor.get(None);
        assert_eq!(cached.retries, 1);

        // act
        source.changed();

        let updated = monitor.get(None);

        // assert
        assert_eq!(updated.retries, 2);
    }

    #[test]
    fn on_change_callbacks_fire_with_correct_name_and_value() {
        // arrange
        let (monitor, source, _setup) = new_monitor();
        let _ = monitor.get(None);
        let observed_name: Arc<Mutex<Option<Option<String>>>> = Arc::new(Mutex::new(None));
        let observed_retries: Arc<Mutex<Option<u8>>> = Arc::new(Mutex::new(None));
        let name_clone = observed_name.clone();
        let retries_clone = observed_retries.clone();

        let _sub = monitor.on_change(Box::new(move |name: Option<&str>, opts: Ref<Config>| {
            *name_clone.lock().unwrap() = Some(name.map(|s| s.to_owned()));
            *retries_clone.lock().unwrap() = Some(opts.retries);
        }));

        // act
        source.changed();
        let _ = monitor.get(None);

        // assert
        let name_val = observed_name.lock().unwrap();

        assert_eq!(
            *name_val,
            Some(None),
            "callback should receive name=None for unnamed source"
        );

        let retries_val = observed_retries.lock().unwrap();

        assert_eq!(*retries_val, Some(2), "callback should receive updated retries value");
    }

    #[test]
    fn dropping_subscription_prevents_further_callbacks() {
        // arrange
        let (monitor, source, _setup) = new_monitor();
        let _ = monitor.get(None);
        let call_count = Arc::new(AtomicU32::new(0));
        let count_clone = call_count.clone();
        let sub = monitor.on_change(Box::new(move |_name: Option<&str>, _opts: Ref<Config>| {
            count_clone.fetch_add(1, Ordering::SeqCst);
        }));

        // act
        source.changed();
        let _ = monitor.get(None);

        assert_eq!(
            call_count.load(Ordering::SeqCst),
            1,
            "callback should fire once after first change"
        );

        drop(sub);

        // act
        source.changed();
        let _ = monitor.get(None);

        // assert
        assert_eq!(
            call_count.load(Ordering::SeqCst),
            1,
            "callback should not fire after subscription is dropped"
        );
    }

    #[test]
    fn multiple_sources_changing_one_only_invalidates_that_source() {
        // arrange
        let cache = Ref::new(OptionsCache::<Config>::default());
        let setup = Ref::new(NamedConfigSetup::default());
        let factory = Ref::new(DefaultOptionsFactory::new(vec![setup], Vec::default(), Vec::default()));
        let source_a = Ref::new(NamedConfigSource::new("a"));
        let source_b = Ref::new(NamedConfigSource::new("b"));
        let monitor: Ref<dyn OptionsMonitor<Config>> = Ref::new(DefaultOptionsMonitor::new(
            cache,
            vec![source_a.clone(), source_b.clone()],
            factory,
        ));
        let val_a = monitor.get(Some("a"));
        let val_b = monitor.get(Some("b"));

        assert_eq!(val_a.retries, 10, "source a initial retries");
        assert_eq!(val_b.retries, 20, "source b initial retries");

        let callback_names: Arc<Mutex<Vec<Option<String>>>> = Arc::new(Mutex::new(Vec::new()));
        let names_clone = callback_names.clone();
        let _sub = monitor.on_change(Box::new(move |name: Option<&str>, _opts: Ref<Config>| {
            names_clone.lock().unwrap().push(name.map(|s| s.to_owned()));
        }));

        // act
        source_a.changed();

        let _ = monitor.get(Some("a"));

        // assert
        let names = callback_names.lock().unwrap();
        assert_eq!(names.len(), 1, "only one callback should fire");
        assert_eq!(names[0], Some("a".to_owned()), "callback should fire for source a");
        // release the lock before reading from the monitor again
        drop(names);

        let val_a_updated = monitor.get(Some("a"));
        assert_eq!(val_a_updated.retries, 11, "source a should be invalidated → new value");

        let val_b_same = monitor.get(Some("b"));
        assert_eq!(val_b_same.retries, 20, "source b should still be cached → same value");
    }
}