use super::tracker::{dispose_subscriber, start_tracking, stop_tracking, Subscriber, SubscriberId};
use crate::utils::lock::{read_or_recover, write_or_recover};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
/// A closure that can be executed under subscriber tracking, together with
/// the state needed to pause it and to identify it to the tracker.
pub struct Effect {
/// The user closure, wrapped so that it only executes while `active` is `true`.
effect_fn: Arc<dyn Fn() + Send + Sync>,
/// Activity flag shared with the wrapper stored in `effect_fn`.
active: Arc<AtomicBool>,
/// Identifier used for this effect's `Subscriber` and passed to `dispose_subscriber`.
id: SubscriberId,
}
impl Effect {
pub fn new(f: impl Fn() + Send + Sync + 'static) -> Self {
let active = Arc::new(AtomicBool::new(true));
let id = SubscriberId::new();
let active_clone = active.clone();
let effect_fn: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
if active_clone.load(Ordering::SeqCst) {
f();
}
});
let effect = Self {
effect_fn,
active,
id,
};
effect.run_tracked();
effect
}
pub fn lazy(f: impl Fn() + Send + Sync + 'static) -> Self {
let active = Arc::new(AtomicBool::new(true));
let id = SubscriberId::new();
let active_clone = active.clone();
let effect_fn: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
if active_clone.load(Ordering::SeqCst) {
f();
}
});
Self {
effect_fn,
active,
id,
}
}
fn run_tracked(&self) {
if !self.active.load(Ordering::SeqCst) {
return;
}
let effect_fn = self.effect_fn.clone();
let id = self.id;
type CallbackType = Arc<dyn Fn() + Send + Sync>;
let callback_cell: Arc<RwLock<Option<CallbackType>>> = Arc::new(RwLock::new(None));
let callback_cell_clone = callback_cell.clone();
let callback: CallbackType = Arc::new(move || {
let self_callback = match read_or_recover(&callback_cell_clone).as_ref() {
Some(cb) => cb.clone(),
None => {
#[cfg(debug_assertions)]
{
panic!("Callback must be initialized before invocation - this is a bug in the effect system");
}
#[cfg(not(debug_assertions))]
{
return;
}
}
};
let subscriber = Subscriber {
id,
callback: self_callback,
};
start_tracking(subscriber);
effect_fn();
stop_tracking();
});
*write_or_recover(&callback_cell) = Some(callback.clone());
let subscriber = Subscriber { id, callback };
start_tracking(subscriber);
(self.effect_fn)();
stop_tracking();
}
pub fn run(&self) {
self.run_tracked();
}
pub fn stop(&self) {
self.active.store(false, Ordering::SeqCst);
dispose_subscriber(self.id);
}
pub fn resume(&self) {
self.active.store(true, Ordering::SeqCst);
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::SeqCst)
}
pub fn id(&self) -> SubscriberId {
self.id
}
}
impl Drop for Effect {
fn drop(&mut self) {
self.active.store(false, Ordering::SeqCst);
dispose_subscriber(self.id);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, Ordering};

    /// Returns a shared call counter and a closure that increments it once per call.
    fn counting_closure() -> (Arc<AtomicI32>, impl Fn() + Send + Sync + 'static) {
        let hits = Arc::new(AtomicI32::new(0));
        let hits_in_closure = Arc::clone(&hits);
        let bump = move || {
            hits_in_closure.fetch_add(1, Ordering::SeqCst);
        };
        (hits, bump)
    }

    /// Returns a shared flag (initially `false`) and a closure that sets it to `true`.
    fn flagging_closure() -> (Arc<AtomicBool>, impl Fn() + Send + Sync + 'static) {
        let flag = Arc::new(AtomicBool::new(false));
        let flag_in_closure = Arc::clone(&flag);
        let mark = move || {
            flag_in_closure.store(true, Ordering::SeqCst);
        };
        (flag, mark)
    }

    /// Reads the current value of a call counter.
    fn calls(counter: &AtomicI32) -> i32 {
        counter.load(Ordering::SeqCst)
    }

    #[test]
    fn test_effect_new_runs_immediately() {
        let (hits, bump) = counting_closure();
        // The effect is discarded right away; only the initial run matters here.
        drop(Effect::new(bump));
        assert_eq!(calls(&hits), 1);
    }

    #[test]
    fn test_effect_new_with_closure() {
        let (flag, mark) = flagging_closure();
        drop(Effect::new(mark));
        assert!(flag.load(Ordering::SeqCst));
    }

    #[test]
    fn test_effect_lazy_does_not_run_immediately() {
        let (hits, bump) = counting_closure();
        drop(Effect::lazy(bump));
        assert_eq!(calls(&hits), 0);
    }

    #[test]
    fn test_effect_lazy_run_manually() {
        let (hits, bump) = counting_closure();
        let effect = Effect::lazy(bump);
        assert_eq!(calls(&hits), 0);

        effect.run();
        assert_eq!(calls(&hits), 1);
    }

    #[test]
    fn test_effect_run_multiple_times() {
        let (hits, bump) = counting_closure();
        let effect = Effect::lazy(bump);

        for _ in 0..3 {
            effect.run();
        }

        assert_eq!(calls(&hits), 3);
    }

    #[test]
    fn test_effect_stop_prevents_running() {
        let (hits, bump) = counting_closure();
        let effect = Effect::lazy(bump);

        effect.run();
        assert_eq!(calls(&hits), 1);

        // A stopped effect must ignore further run requests.
        effect.stop();
        effect.run();
        assert_eq!(calls(&hits), 1);
    }

    #[test]
    fn test_effect_resume_allows_running() {
        let (hits, bump) = counting_closure();
        let effect = Effect::lazy(bump);

        effect.stop();
        effect.run();
        assert_eq!(calls(&hits), 0);

        effect.resume();
        effect.run();
        assert_eq!(calls(&hits), 1);
    }

    #[test]
    fn test_effect_is_active_initially() {
        let effect = Effect::new(|| {});
        assert!(effect.is_active());
    }

    #[test]
    fn test_effect_is_active_after_stop() {
        let effect = Effect::new(|| {});
        effect.stop();
        assert!(!effect.is_active());
    }

    #[test]
    fn test_effect_is_active_after_resume() {
        let effect = Effect::new(|| {});

        effect.stop();
        assert!(!effect.is_active());

        effect.resume();
        assert!(effect.is_active());
    }

    #[test]
    fn test_effect_id_returns_value() {
        // Smoke test: obtaining the id must not panic.
        let effect = Effect::new(|| {});
        let _ = effect.id();
    }

    #[test]
    fn test_effect_id_is_unique() {
        let first = Effect::new(|| {});
        let second = Effect::new(|| {});
        assert_ne!(first.id(), second.id());
    }

    #[test]
    fn test_effect_lazy_is_active_initially() {
        let effect = Effect::lazy(|| {});
        assert!(effect.is_active());
    }

    #[test]
    fn test_effect_drop_cleanup() {
        let (flag, mark) = flagging_closure();
        // Creating and dropping a lazy effect must never invoke its closure.
        drop(Effect::lazy(mark));
        assert!(!flag.load(Ordering::SeqCst));
    }
}