medea_reactive/field/
mod.rs

1//! Implementations of basic reactive containers.
2
3// TODO: Needs refactoring.
4#![expect(clippy::module_name_repetitions, reason = "needs refactoring")]
5
6pub mod cell;
7pub mod progressable_cell;
8
9use std::{
10    cell::RefCell,
11    fmt,
12    ops::{Deref, DerefMut},
13};
14
15use futures::{
16    channel::{mpsc, oneshot},
17    future::{self, LocalBoxFuture},
18    stream::{self, LocalBoxStream, StreamExt as _},
19};
20
21#[doc(inline)]
22pub use self::{cell::ObservableCell, progressable_cell::ProgressableCell};
23use crate::subscribers_store::{
24    SubscribersStore as _, progressable, progressable::Processed,
25};
26
/// Default type of [`ObservableField`] subscribers.
///
/// It is a [`RefCell`]-wrapped list of [`UniversalSubscriber`]s, so new
/// subscribers can be registered through a shared reference to the field.
type DefaultSubscribers<D> = RefCell<Vec<UniversalSubscriber<D>>>;
29
/// [`ObservableField`] that allows subscribing to all changes
/// ([`ObservableField::subscribe`]) and to concrete changes
/// ([`ObservableField::when`] and [`ObservableField::when_eq`]).
///
/// Its subscribers are stored in the [`DefaultSubscribers`] container.
pub type Observable<D> = ObservableField<D, DefaultSubscribers<D>>;
34
/// [`ObservableField`] that allows subscribing to all changes
/// ([`ObservableField::subscribe`]) and to concrete changes
/// ([`ObservableField::when`] and [`ObservableField::when_eq`]).
///
/// Can recognise when all updates were processed by subscribers (see
/// [`ObservableField::when_all_processed()`]).
pub type Progressable<D> = ObservableField<D, progressable::SubStore<D>>;
41
/// Reactive cell which emits all modifications to its subscribers.
///
/// Subscribing to modifications of this field is done with the
/// [`ObservableField::subscribe`] method.
///
/// If you want to get a [`Future`] which will be resolved only when the
/// underlying data of this field becomes equal to some value, you can use
/// the [`ObservableField::when`] or [`ObservableField::when_eq`] methods.
///
/// The data is mutated through the guard returned by
/// [`ObservableField::borrow_mut`]; subscribers are notified when that guard
/// is dropped and the data has really changed.
#[derive(Debug)]
pub struct ObservableField<D, S> {
    /// Data which is stored by this [`ObservableField`].
    data: D,

    /// Subscribers to [`ObservableField`]'s data mutations.
    subs: S,
}
58
59impl<D> ObservableField<D, RefCell<Vec<UniversalSubscriber<D>>>>
60where
61    D: 'static,
62{
63    /// Returns new [`ObservableField`] with subscribable mutations.
64    ///
65    /// Also you can subscribe to concrete mutations with
66    /// [`ObservableField::when`] and [`ObservableField::when_eq`] methods.
67    #[must_use]
68    pub const fn new(data: D) -> Self {
69        Self { data, subs: RefCell::new(Vec::new()) }
70    }
71}
72
73impl<D, S> ObservableField<D, S>
74where
75    D: 'static,
76    S: Whenable<D>,
77{
78    /// Returns [`Future`] which will resolve only on modifications that
79    /// the given `assert_fn` returns `true` on.
80    pub fn when<F>(
81        &self,
82        assert_fn: F,
83    ) -> LocalBoxFuture<'static, Result<(), DroppedError>>
84    where
85        F: Fn(&D) -> bool + 'static,
86    {
87        // TODO: This is kinda broken.
88        //       See https://github.com/instrumentisto/medea/issues/163 issue.
89        if (assert_fn)(&self.data) {
90            Box::pin(future::ok(()))
91        } else {
92            self.subs.when(Box::new(assert_fn))
93        }
94    }
95}
96
97impl<D: 'static> Progressable<D> {
98    /// Returns new [`ObservableField`] with subscribable mutations.
99    ///
100    /// Also, you can wait for all updates processing by awaiting on
101    /// [`ObservableField::when_all_processed()`].
102    #[must_use]
103    pub fn new(data: D) -> Self {
104        Self { data, subs: progressable::SubStore::default() }
105    }
106}
107
108impl<D> Progressable<D>
109where
110    D: Clone + 'static,
111{
112    /// Returns [`Stream`] into which underlying data updates (wrapped in the
113    /// [`progressable::Guarded`]) will be emitted.
114    ///
115    /// [`Stream`]: futures::Stream
116    pub fn subscribe(
117        &self,
118    ) -> LocalBoxStream<'static, progressable::Guarded<D>> {
119        let data = self.subs.wrap(self.data.clone());
120        Box::pin(stream::once(async move { data }).chain(self.subs.subscribe()))
121    }
122
123    /// Returns [`Future`] resolving when all data updates will be processed by
124    /// subscribers.
125    pub fn when_all_processed(&self) -> Processed<'static> {
126        self.subs.when_all_processed()
127    }
128}
129
130impl<D> Observable<D>
131where
132    D: Clone + 'static,
133{
134    /// Returns [`Stream`] into which underlying data updates will be emitted.
135    ///
136    /// [`Stream`]: futures::Stream
137    pub fn subscribe(&self) -> LocalBoxStream<'static, D> {
138        let data = self.data.clone();
139        let (tx, rx) = mpsc::unbounded();
140        self.subs.borrow_mut().push(UniversalSubscriber::Subscribe(tx));
141
142        Box::pin(stream::once(async move { data }).chain(Box::pin(rx)))
143    }
144}
145
146impl<D, S> ObservableField<D, S>
147where
148    D: PartialEq + 'static,
149    S: Whenable<D>,
150{
151    /// Returns [`Future`] which will resolve only when an underlying data of
152    /// this [`ObservableField`] will become equal to the provided `should_be`
153    /// value.
154    // TODO: This is kinda broken.
155    //       See https://github.com/instrumentisto/medea/issues/163 issue.
156    pub fn when_eq(
157        &self,
158        should_be: D,
159    ) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
160        self.when(move |data| data == &should_be)
161    }
162}
163
164impl<D, S> ObservableField<D, S>
165where
166    S: OnObservableFieldModification<D>,
167    D: Clone + PartialEq,
168{
169    /// Returns [`MutObservableFieldGuard`] which can be mutably dereferenced to
170    /// an underlying data.
171    ///
172    /// If some mutation of data happens between calling
173    /// [`ObservableField::borrow_mut`] and dropping of
174    /// [`MutObservableFieldGuard`], then all subscribers of this
175    /// [`ObservableField`] will be notified about this.
176    ///
177    /// Notification about mutation will be sent only if this field __really__
178    /// changed. This will be checked with [`PartialEq`] implementation of
179    /// underlying data.
180    pub fn borrow_mut(&mut self) -> MutObservableFieldGuard<'_, D, S> {
181        MutObservableFieldGuard {
182            value_before_mutation: self.data.clone(),
183            data: &mut self.data,
184            subs: &mut self.subs,
185        }
186    }
187}
188
/// Abstraction over catching all unique modifications of an
/// [`ObservableField`].
///
/// Implemented by subscribers stores, which are notified by a
/// [`MutObservableFieldGuard`] when it's dropped and the data differs from
/// the value it had before the mutation.
pub trait OnObservableFieldModification<D> {
    /// This function will be called on each [`ObservableField`]'s modification.
    ///
    /// On this function call the subscriber (which implements
    /// [`OnObservableFieldModification`]) should send an update to its
    /// [`Stream`] or resolve its [`Future`].
    ///
    /// The provided `data` is the value of the [`ObservableField`] after the
    /// modification.
    ///
    /// [`Stream`]: futures::Stream
    fn on_modify(&mut self, data: &D);
}
201
/// Subscriber that implements subscribing and [`Whenable`] in [`Vec`].
///
/// This structure should be wrapped into [`Vec`].
pub enum UniversalSubscriber<D> {
    /// Subscriber for [`Whenable`].
    ///
    /// It's a one-shot subscriber: once its `assert_fn` returns `true` for a
    /// modification, its `sender` is consumed.
    When {
        /// [`oneshot::Sender`] with which [`Whenable::when`]'s [`Future`] will
        /// resolve.
        ///
        /// Kept in a [`RefCell`]`<`[`Option`]`>`, so it can be taken out (and
        /// consumed by sending) while the subscriber itself is only borrowed.
        sender: RefCell<Option<oneshot::Sender<()>>>,

        /// Function which checks whether [`Whenable::when`]'s [`Future`]
        /// should resolve for the provided data.
        assert_fn: Box<dyn Fn(&D) -> bool>,
    },

    /// Subscriber for data updates.
    ///
    /// Holds the sending half of an unbounded channel into which every update
    /// of the data is sent.
    Subscribe(mpsc::UnboundedSender<D>),
}
220
221impl<D> fmt::Debug for UniversalSubscriber<D> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            Self::When { .. } => {
225                write!(f, "UniversalSubscriber::When")
226            }
227            Self::Subscribe(_) => {
228                write!(f, "UniversalSubscriber::Subscribe")
229            }
230        }
231    }
232}
233
/// Error which subscribers receive when the [`ObservableField`] /
/// [`ObservableCell`] they are subscribed to is dropped.
#[derive(Clone, Copy, Debug)]
pub struct DroppedError;

impl fmt::Display for DroppedError {
    /// Writes a fixed human-readable description of this error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Observable value has been dropped")
    }
}
244
245impl From<oneshot::Canceled> for DroppedError {
246    fn from(_: oneshot::Canceled) -> Self {
247        Self
248    }
249}
250
/// Abstraction over [`ObservableField::when`] and [`ObservableField::when_eq`]
/// implementations for custom types.
///
/// Implemented by subscribers stores which are able to resolve a [`Future`]
/// once the observed data satisfies some predicate.
pub trait Whenable<D: 'static> {
    /// This function will be called on [`ObservableField::when`], in case the
    /// current data doesn't satisfy the provided `assert_fn` yet.
    ///
    /// Should return a [`LocalBoxFuture`] which resolves with `Ok(())` once
    /// the provided `assert_fn` returns `true` for the observed data, or with
    /// a [`DroppedError`] if the observed value is dropped.
    fn when(
        &self,
        assert_fn: Box<dyn Fn(&D) -> bool>,
    ) -> LocalBoxFuture<'static, Result<(), DroppedError>>;
}
263
264impl<D: 'static> Whenable<D> for RefCell<Vec<UniversalSubscriber<D>>> {
265    fn when(
266        &self,
267        assert_fn: Box<dyn Fn(&D) -> bool>,
268    ) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
269        let (tx, rx) = oneshot::channel();
270        self.borrow_mut().push(UniversalSubscriber::When {
271            sender: RefCell::new(Some(tx)),
272            assert_fn,
273        });
274        Box::pin(async move { Ok(rx.await?) })
275    }
276}
277
278impl<D: Clone + 'static> OnObservableFieldModification<D>
279    for progressable::SubStore<D>
280{
281    fn on_modify(&mut self, data: &D) {
282        self.send_update(data.clone());
283    }
284}
285
286impl<D: Clone> OnObservableFieldModification<D>
287    for RefCell<Vec<UniversalSubscriber<D>>>
288{
289    fn on_modify(&mut self, data: &D) {
290        self.borrow_mut().retain(|sub| match sub {
291            UniversalSubscriber::When { assert_fn, sender } => {
292                #[expect(clippy::expect_used, reason = "single use expected")]
293                if (assert_fn)(data) {
294                    sender
295                        .borrow_mut()
296                        .take()
297                        .expect("`UniversalSubscriber::When` used already")
298                        .send(())
299                        .is_ok_and(|()| false)
300                } else {
301                    true
302                }
303            }
304            UniversalSubscriber::Subscribe(sender) => {
305                sender.unbounded_send(data.clone()).is_ok()
306            }
307        });
308    }
309}
310
311impl<D, S> Deref for ObservableField<D, S> {
312    type Target = D;
313
314    fn deref(&self) -> &Self::Target {
315        &self.data
316    }
317}
318
319impl<D, S> fmt::Display for ObservableField<D, S>
320where
321    D: fmt::Display,
322{
323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324        fmt::Display::fmt(&self.data, f)
325    }
326}
327
328/// Mutable [`ObservableField`] reference returned by
329/// [`ObservableField::borrow_mut`].
330///
331/// When this guard is [`Drop`]ped, a check for modifications will be performed.
332/// If data was changed, then [`OnObservableFieldModification::on_modify`] will
333/// be called.
334#[derive(Debug)]
335pub struct MutObservableFieldGuard<'a, D, S>
336where
337    S: OnObservableFieldModification<D>,
338    D: PartialEq,
339{
340    /// Data stored by this [`ObservableField`].
341    data: &'a mut D,
342
343    /// Subscribers to [`ObservableField`]'s data mutations.
344    subs: &'a mut S,
345
346    /// Data stored by this [`ObservableField`] before mutation.
347    value_before_mutation: D,
348}
349
350impl<D, S> Deref for MutObservableFieldGuard<'_, D, S>
351where
352    S: OnObservableFieldModification<D>,
353    D: PartialEq,
354{
355    type Target = D;
356
357    fn deref(&self) -> &Self::Target {
358        self.data
359    }
360}
361
362impl<D, S> DerefMut for MutObservableFieldGuard<'_, D, S>
363where
364    S: OnObservableFieldModification<D>,
365    D: PartialEq,
366{
367    fn deref_mut(&mut self) -> &mut Self::Target {
368        self.data
369    }
370}
371
372impl<D, S> Drop for MutObservableFieldGuard<'_, D, S>
373where
374    S: OnObservableFieldModification<D>,
375    D: PartialEq,
376{
377    fn drop(&mut self) {
378        if self.data != &self.value_before_mutation {
379            self.subs.on_modify(self.data);
380        }
381    }
382}
383
// Tests of `Observable` and `Progressable` fields: subscribing to updates,
// awaiting concrete values, and behavior on drop.
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, time::Duration};

    use futures::{StreamExt as _, poll, task::Poll};
    use tokio::time::timeout;

    use crate::{Observable, Progressable};

    // The first item of a subscription is the current value of the field.
    #[tokio::test]
    async fn subscriber_receives_current_data() {
        let field = Observable::new(9);
        let current_data = field.subscribe().next().await.unwrap();
        assert_eq!(current_data, 9);
    }

    // `when_eq()` resolves if the field holds the awaited value already.
    #[tokio::test]
    async fn when_eq_resolves_if_value_eq_already() {
        let field = Observable::new(9);
        field.when_eq(9).await.unwrap();
    }

    // `when_eq()` stays pending (the timeout fires) while the value differs.
    #[tokio::test]
    async fn when_eq_doesnt_resolve_if_value_is_not_eq() {
        let field = Observable::new(9);
        let _ = timeout(Duration::from_millis(50), field.when_eq(0))
            .await
            .unwrap_err();
    }

    // `when()` checks its predicate against the current value right away.
    #[tokio::test]
    async fn current_value_is_provided_into_assert_fn_on_when_call() {
        let field = Observable::new(9);

        timeout(Duration::from_millis(50), field.when(|val| val == &9))
            .await
            .unwrap()
            .unwrap();
    }

    // Updates made through `borrow_mut()` reach the subscription, up to the
    // last one.
    #[tokio::test]
    async fn value_updates_are_sent_to_subs() {
        let mut field = Observable::new(0);
        let mut subscription_on_changes = field.subscribe();

        for _ in 0..100 {
            *field.borrow_mut() += 1;
        }
        loop {
            if let Some(change) = subscription_on_changes.next().await {
                if change == 100 {
                    break;
                }
            } else {
                panic!("Stream ended too early!");
            }
        }
    }

    // `when()` made before the updates resolves once its predicate is met.
    #[tokio::test]
    async fn when_resolves_on_value_update() {
        let mut field = Observable::new(0);
        let subscription = field.when(|change| change == &100);

        for _ in 0..100 {
            *field.borrow_mut() += 1;
        }

        timeout(Duration::from_millis(50), subscription)
            .await
            .unwrap()
            .unwrap();
    }

    // `when_eq()` made before the updates resolves once the value is reached.
    #[tokio::test]
    async fn when_eq_resolves_on_value_update() {
        let mut field = Observable::new(0);
        let subscription = field.when_eq(100);

        for _ in 0..100 {
            *field.borrow_mut() += 1;
        }

        timeout(Duration::from_millis(50), subscription)
            .await
            .unwrap()
            .unwrap();
    }

    // A pending `when()` resolves with an error once the field is dropped.
    #[tokio::test]
    async fn when_returns_dropped_error_on_drop() {
        let field = Observable::new(0);
        let subscription = field.when(|change| change == &100);
        drop(field);
        let _ = subscription.await.unwrap_err();
    }

    // A pending `when_eq()` resolves with an error once the field is dropped.
    #[tokio::test]
    async fn when_eq_returns_dropped_error_on_drop() {
        let field = Observable::new(0);
        let subscription = field.when_eq(100);
        drop(field);
        let _ = subscription.await.unwrap_err();
    }

    // After the initial item (skipped here), the subscription ends once the
    // field is dropped.
    #[tokio::test]
    async fn stream_ends_when_reactive_field_is_dropped() {
        let field = Observable::new(0);
        let subscription = field.subscribe();
        drop(field);
        assert!(subscription.skip(1).next().await.is_none());
    }

    // Assigning a value equal to the current one emits no update: after the
    // initial item (skipped here), the subscription stays pending.
    #[tokio::test]
    async fn no_update_should_be_emitted_on_field_mutation() {
        let mut field = Observable::new(0);
        let subscription = field.subscribe();
        *field.borrow_mut() = 0;
        let _ = timeout(
            Duration::from_millis(50),
            Box::pin(subscription.skip(1).next()),
        )
        .await
        .unwrap_err();
    }

    // Several assignments through a single guard result in one update,
    // carrying the value held when the guard is dropped.
    #[tokio::test]
    async fn only_last_update_should_be_sent_to_subscribers() {
        let mut field = Observable::new(0);
        let subscription = field.subscribe();
        let mut field_mut_guard = field.borrow_mut();
        *field_mut_guard = 100;
        *field_mut_guard = 200;
        *field_mut_guard = 300;
        drop(field_mut_guard);
        assert_eq!(subscription.skip(1).next().await.unwrap(), 300);
    }

    // The field keeps working when it's stored inside a `RefCell`.
    #[tokio::test]
    async fn reactive_with_refcell_inside() {
        let field = RefCell::new(Observable::new(0));
        let subscription = field.borrow().when_eq(1);
        *field.borrow_mut().borrow_mut() = 1;
        timeout(Duration::from_millis(50), Box::pin(subscription))
            .await
            .unwrap()
            .unwrap();
    }

    // `when_all_processed()` is ready while there is nothing to process, and
    // pending until every item handed to the subscription is consumed.
    #[tokio::test]
    async fn when_all_processed_works() {
        let mut field = Progressable::new(1);
        // No subscribers yet: ready both before and after an update.
        assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
        *field.borrow_mut() = 2;
        assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));

        // The initial item of the new subscription is not consumed yet.
        let mut subscribe = field.subscribe();
        assert_eq!(poll!(field.when_all_processed()), Poll::Pending);

        assert_eq!(subscribe.next().await.unwrap().into_inner(), 2);
        // A new update is emitted, but not consumed yet.
        *field.borrow_mut() = 3;
        assert_eq!(poll!(field.when_all_processed()), Poll::Pending);
        assert_eq!(subscribe.next().await.unwrap().into_inner(), 3);
        assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
    }
}
549}