use std::sync::{Arc, RwLock};
use once_cell::sync::Lazy;
use crate::{RUNTIME, Sender};
use super::SubscriberFn;
pub trait Reducible {
type Input;
fn init() -> Self;
fn reduce(&mut self, input: Self::Input) -> bool;
}
struct ReducerInner<Data: Reducible> {
sender: Sender<Data::Input>,
subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>>,
}
impl<Data> Default for ReducerInner<Data>
where
Data: Reducible + Send + 'static,
Data::Input: Send,
{
fn default() -> Self {
let (sender, receiver) = crate::channel();
let subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>> = Arc::default();
let rt_subscribers = subscribers.clone();
RUNTIME.spawn(async move {
let mut data = Data::init();
while let Some(input) = receiver.recv().await {
if data.reduce(input) {
rt_subscribers
.write()
.unwrap()
.retain(|subscriber| subscriber(&data));
}
}
});
Self {
sender,
subscribers,
}
}
}
impl<Data> std::fmt::Debug for ReducerInner<Data>
where
Data: std::fmt::Debug + Reducible,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReducerInner")
.field("sender", &self.sender)
.field("subscribers", &self.subscribers.try_read().map(|s| s.len()))
.finish()
}
}
#[derive(Debug)]
pub struct Reducer<Data: Reducible> {
inner: Lazy<ReducerInner<Data>>,
}
impl<Data> Default for Reducer<Data>
where
Data: Reducible + Send + 'static,
Data::Input: Send,
{
fn default() -> Self {
Self::new()
}
}
impl<Data> Reducer<Data>
where
Data: Reducible + Send + 'static,
Data::Input: Send,
{
#[must_use]
pub const fn new() -> Self {
Self {
inner: Lazy::new(ReducerInner::default),
}
}
pub fn subscribe<Msg, F>(&self, sender: &Sender<Msg>, f: F)
where
F: Fn(&Data) -> Msg + 'static + Send + Sync,
Msg: Send + 'static,
{
let sender = sender.clone();
self.inner
.subscribers
.write()
.unwrap()
.push(Box::new(move |data: &Data| {
let msg = f(data);
sender.send(msg).is_ok()
}));
}
pub fn subscribe_optional<Msg, F>(&self, sender: &Sender<Msg>, f: F)
where
F: Fn(&Data) -> Option<Msg> + 'static + Send + Sync,
Msg: Send + 'static,
{
let sender = sender.clone();
self.inner
.subscribers
.write()
.unwrap()
.push(Box::new(move |data: &Data| {
if let Some(msg) = f(data) {
sender.send(msg).is_ok()
} else {
true
}
}));
}
pub fn emit(&self, input: Data::Input) {
assert!(
self.inner.sender.send(input).is_ok(),
"Reducer runtime was dropped. Maybe a subscriber or the update function panicked?"
);
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use super::{Reducer, Reducible};
struct CounterReducer(u8);
enum CounterInput {
Increment,
Decrement,
}
impl Reducible for CounterReducer {
type Input = CounterInput;
fn init() -> Self {
Self(0)
}
fn reduce(&mut self, input: Self::Input) -> bool {
match input {
CounterInput::Increment => {
self.0 += 1;
}
CounterInput::Decrement => {
self.0 -= 1;
}
}
true
}
}
static REDUCER: Reducer<CounterReducer> = Reducer::new();
#[test]
fn shared_state() {
REDUCER.emit(CounterInput::Increment);
REDUCER.emit(CounterInput::Increment);
REDUCER.emit(CounterInput::Increment);
std::thread::sleep(Duration::from_millis(10));
let (sender, receiver) = crate::channel();
REDUCER.subscribe(&sender, |data| data.0);
REDUCER.emit(CounterInput::Increment);
assert_eq!(receiver.recv_sync().unwrap(), 4);
REDUCER.emit(CounterInput::Decrement);
assert_eq!(receiver.recv_sync().unwrap(), 3);
}
}