relm4/shared_state/
reducer.rs

1use std::sync::{Arc, RwLock};
2
3use once_cell::sync::Lazy;
4
5use crate::{Sender, RUNTIME};
6
7use super::SubscriberFn;
8
/// Describes data that is only ever modified by processing messages.
///
/// Implementors define how the data is created and how a single
/// message changes it. For more information, see [`Reducer`].
pub trait Reducible {
    /// The message type that drives modifications of the data.
    type Input;

    /// Create the initial value of the data.
    fn init() -> Self;

    /// Apply `input` to the current state.
    ///
    /// The return value decides whether subscribers hear about the message:
    /// return [`true`] to notify all subscribers and
    /// return [`false`] to skip the notification.
    ///
    /// Returning [`false`] is useful, for example, when the message
    /// had no (noteworthy) effect on the data, so there is nothing
    /// the subscribers would need to react to.
    fn reduce(&mut self, input: Self::Input) -> bool;
}
29
30struct ReducerInner<Data: Reducible> {
31    sender: Sender<Data::Input>,
32    subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>>,
33}
34
35impl<Data> Default for ReducerInner<Data>
36where
37    Data: Reducible + Send + 'static,
38    Data::Input: Send,
39{
40    fn default() -> Self {
41        let (sender, receiver) = crate::channel();
42        let subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>> = Arc::default();
43
44        let rt_subscribers = subscribers.clone();
45        RUNTIME.spawn(async move {
46            let mut data = Data::init();
47            while let Some(input) = receiver.recv().await {
48                if data.reduce(input) {
49                    // Remove all elements which had their senders dropped.
50                    rt_subscribers
51                        .write()
52                        .unwrap()
53                        .retain(|subscriber| subscriber(&data));
54                }
55            }
56        });
57
58        Self {
59            sender,
60            subscribers,
61        }
62    }
63}
64
65impl<Data> std::fmt::Debug for ReducerInner<Data>
66where
67    Data: std::fmt::Debug + Reducible,
68{
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("ReducerInner")
71            .field("sender", &self.sender)
72            .field("subscribers", &self.subscribers.try_read().map(|s| s.len()))
73            .finish()
74    }
75}
76
77/// A type that allows you to share information across your
78/// application easily.
79///
80/// Reducers receive messages, update their state accordingly
81/// and notify their subscribers.
82///
83/// Unlike [`SharedState`](super::SharedState), this type doesn't
84/// allow direct access to the internal data.
85/// Instead, it updates its state after receiving messages, similar to components.
86/// After the message is processed, all subscribers will be notified.
87///
88/// # Example
89///
90/// ```
91/// use relm4::{Reducer, Reducible};
92///
93/// struct CounterReducer(u8);
94///
95/// enum CounterInput {
96///     Increment,
97///     Decrement,
98/// }
99///
100/// impl Reducible for CounterReducer {
101///     type Input = CounterInput;
102///
103///     fn init() -> Self {
104///         Self(0)
105///     }
106///
107///     fn reduce(&mut self, input: Self::Input) -> bool {
108///         match input {
109///             CounterInput::Increment => {
110///                 self.0 += 1;
111///             }
112///             CounterInput::Decrement =>  {
113///                 self.0 -= 1;
114///             }
115///         }
116///         true
117///     }
118/// }
119///
120/// // Create the reducer.
121/// static REDUCER: Reducer<CounterReducer> = Reducer::new();
122///
123/// // Update the reducer.
124/// REDUCER.emit(CounterInput::Increment);
125/// # use std::time::Duration;
126/// # std::thread::sleep(Duration::from_millis(10));
127///
128/// // Create a channel and subscribe to changes.
129/// let (sender, receiver) = relm4::channel();
130/// REDUCER.subscribe(&sender, |data| data.0);
131///
132/// // Count up to 2.
133/// REDUCER.emit(CounterInput::Increment);
134/// assert_eq!(receiver.recv_sync().unwrap(), 2);
135/// ```
136#[derive(Debug)]
137pub struct Reducer<Data: Reducible> {
138    inner: Lazy<ReducerInner<Data>>,
139}
140
141impl<Data> Default for Reducer<Data>
142where
143    Data: Reducible + Send + 'static,
144    Data::Input: Send,
145{
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl<Data> Reducer<Data>
152where
153    Data: Reducible + Send + 'static,
154    Data::Input: Send,
155{
156    /// Create a new [`Reducer`] variable.
157    ///
158    /// The data will be initialized lazily on the first access.
159    #[must_use]
160    pub const fn new() -> Self {
161        Self {
162            inner: Lazy::new(ReducerInner::default),
163        }
164    }
165
166    /// Subscribe to a [`Reducer`].
167    /// Any subscriber will be notified with a message every time
168    /// you modify the reducer (by calling [`Self::emit()`]).
169    pub fn subscribe<Msg, F>(&self, sender: &Sender<Msg>, f: F)
170    where
171        F: Fn(&Data) -> Msg + 'static + Send + Sync,
172        Msg: Send + 'static,
173    {
174        let sender = sender.clone();
175        self.inner
176            .subscribers
177            .write()
178            .unwrap()
179            .push(Box::new(move |data: &Data| {
180                let msg = f(data);
181                sender.send(msg).is_ok()
182            }));
183    }
184
185    /// An alternative version of [`subscribe()`](Self::subscribe()) that only send a message if
186    /// the closure returns [`Some`].
187    pub fn subscribe_optional<Msg, F>(&self, sender: &Sender<Msg>, f: F)
188    where
189        F: Fn(&Data) -> Option<Msg> + 'static + Send + Sync,
190        Msg: Send + 'static,
191    {
192        let sender = sender.clone();
193        self.inner
194            .subscribers
195            .write()
196            .unwrap()
197            .push(Box::new(move |data: &Data| {
198                if let Some(msg) = f(data) {
199                    sender.send(msg).is_ok()
200                } else {
201                    true
202                }
203            }));
204    }
205
206    /// Sends a message to the reducer to update its state.
207    ///
208    /// If the [`Reducible::reduce()`] method returns [`true`],
209    /// all subscribers will be notified.
210    pub fn emit(&self, input: Data::Input) {
211        assert!(
212            self.inner.sender.send(input).is_ok(),
213            "Reducer runtime was dropped. Maybe a subscriber or the update function panicked?"
214        );
215    }
216}
217
#[cfg(test)]
mod test {
    use super::{Reducer, Reducible};

    /// Minimal reducer state: a single counter.
    struct CounterReducer(u8);

    /// Messages understood by [`CounterReducer`].
    enum CounterInput {
        Increment,
        Decrement,
    }

    impl Reducible for CounterReducer {
        type Input = CounterInput;

        fn init() -> Self {
            Self(0)
        }

        fn reduce(&mut self, input: Self::Input) -> bool {
            match input {
                CounterInput::Increment => {
                    self.0 += 1;
                }
                CounterInput::Decrement => {
                    self.0 -= 1;
                }
            }
            true
        }
    }

    static REDUCER: Reducer<CounterReducer> = Reducer::new();

    #[test]
    fn shared_state() {
        // An early subscriber lets the test wait until each message has
        // actually been processed, instead of sleeping for a fixed time.
        let (early_sender, early_receiver) = crate::channel();
        REDUCER.subscribe(&early_sender, |data| data.0);

        // Count up to 3 and wait for every event to be processed.
        for expected in 1..=3 {
            REDUCER.emit(CounterInput::Increment);
            assert_eq!(early_receiver.recv_sync().unwrap(), expected);
        }

        // A late subscriber only observes changes made after subscribing.
        let (sender, receiver) = crate::channel();
        REDUCER.subscribe(&sender, |data| data.0);

        // Count up to 4 with both receivers (read in subscription order).
        REDUCER.emit(CounterInput::Increment);
        assert_eq!(early_receiver.recv_sync().unwrap(), 4);
        assert_eq!(receiver.recv_sync().unwrap(), 4);

        // Dropping a receiver must not affect the remaining subscriber.
        drop(early_receiver);

        // Count down to 3.
        REDUCER.emit(CounterInput::Decrement);

        assert_eq!(receiver.recv_sync().unwrap(), 3);
    }
}