use crate::{ChangeToken, Registration, State, Subscription};
use std::sync::{Arc, Mutex, Weak};
#[must_use]
pub fn on_change<TToken, TProducer, TConsumer, TState>(
producer: TProducer,
consumer: TConsumer,
state: Option<Arc<TState>>,
) -> impl Subscription
where
TState: Send + Sync + 'static,
TToken: ChangeToken + 'static,
TProducer: Fn() -> TToken + Send + Sync + 'static,
TConsumer: Fn(Option<Arc<TState>>) + Send + Sync + 'static,
{
ChangeTokenRegistration::new(producer, consumer, state)
}
struct ChangeTokenRegistration<TToken, TProducer, TConsumer, TState> {
me: Weak<Self>,
producer: TProducer,
consumer: TConsumer,
state: Option<Arc<TState>>,
registration: Mutex<(Option<TToken>, Registration)>,
}
impl<TToken, TProducer, TConsumer, TState> ChangeTokenRegistration<TToken, TProducer, TConsumer, TState>
where
TState: Send + Sync + 'static,
TToken: ChangeToken + 'static,
TProducer: Fn() -> TToken + Send + Sync + 'static,
TConsumer: Fn(Option<Arc<TState>>) + Send + Sync + 'static,
{
fn new(producer: TProducer, consumer: TConsumer, state: Option<Arc<TState>>) -> Arc<Self> {
let token = (producer)();
let instance = Arc::new_cyclic(|me| Self {
me: me.clone(),
producer,
consumer,
state,
registration: Default::default(),
});
instance.register(token);
instance
}
fn register(&self, token: TToken) {
let this = Arc::new(self.me.clone());
let registration = token.register(Box::new(Self::on_changed), Some(this));
if !token.changed() || token.must_poll() {
*self.registration.lock().unwrap() = (Some(token), registration);
}
}
fn on_changed(state: State) {
state
.unwrap()
.downcast_ref::<Weak<Self>>()
.unwrap()
.upgrade()
.unwrap()
.on_notified()
}
fn on_notified(&self) {
let token = (self.producer)();
(self.consumer)(self.state.clone());
self.register(token);
}
}
impl<TToken, TProducer, TConsumer, TState> Subscription
for Arc<ChangeTokenRegistration<TToken, TProducer, TConsumer, TState>>
where
TState: Send + Sync + 'static,
TToken: ChangeToken + 'static,
TProducer: Fn() -> TToken + Send + Sync + 'static,
TConsumer: Fn(Option<Arc<TState>>) + Send + Sync + 'static,
{
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{DefaultChangeToken, SharedChangeToken, SingleChangeToken};
use std::{
mem::ManuallyDrop,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
};
#[test]
fn changed_should_signal_consumer() {
let token = SharedChangeToken::<DefaultChangeToken>::default();
let fired = Arc::new(AtomicBool::default());
let producer = token.clone();
let _unused = on_change(
move || producer.clone(),
|state| state.unwrap().store(true, Relaxed),
Some(fired.clone()),
);
token.notify();
assert!(fired.load(Relaxed));
}
#[test]
fn changed_should_not_signal_consumer_after_registration_is_dropped() {
let token = SharedChangeToken::<SingleChangeToken>::default();
let fired = Arc::new(AtomicBool::default());
let producer = token.clone();
let subscription = ManuallyDrop::new(on_change(
move || producer.clone(),
|state| state.unwrap().store(true, Relaxed),
Some(fired.clone()),
));
let _ = ManuallyDrop::into_inner(subscription);
token.notify();
assert!(!fired.load(Relaxed));
}
}