reactive_graph/effect/
inner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use crate::{
    channel::Sender,
    graph::{
        AnySource, AnySubscriber, ReactiveNode, SourceSet, Subscriber,
        ToAnySubscriber,
    },
};
use or_poisoned::OrPoisoned;
use std::sync::{Arc, RwLock, Weak};

/// Handles internal subscription logic for effects.
///
/// The trait implementations in this file operate on this state behind an
/// `RwLock`, acquiring the write lock whenever they touch a field.
#[derive(Debug)]
pub struct EffectInner {
    /// Set to `true` by `mark_dirty`; reset to `false` by
    /// `update_if_necessary` when it observes the flag and returns `true`.
    pub(crate) dirty: bool,
    /// Sender that is notified by both `mark_check` and `mark_dirty`.
    pub(crate) observer: Sender,
    /// Sources registered through `add_source` and released through
    /// `clear_sources`; polled by `update_if_necessary` when `dirty` is unset.
    pub(crate) sources: SourceSet,
}

impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
    /// Builds a type-erased subscriber handle for this effect.
    ///
    /// The handle carries the address of the shared allocation (as a
    /// `usize`) together with a weak, trait-object reference to it, so
    /// creating the handle does not add a strong reference.
    fn to_any_subscriber(&self) -> AnySubscriber {
        // Address of the `RwLock<EffectInner>` inside the `Arc`.
        let address = Arc::as_ptr(self) as usize;
        // The explicit cast performs the unsizing to the trait object.
        let weak_subscriber =
            Arc::downgrade(self) as Weak<dyn Subscriber + Send + Sync>;

        AnySubscriber(address, weak_subscriber)
    }
}

impl ReactiveNode for RwLock<EffectInner> {
    /// Intentionally a no-op in this implementation.
    fn mark_subscribers_check(&self) {}

    /// Reports whether this node, or any of its sources, needs updating.
    ///
    /// If the `dirty` flag is set it is cleared and `true` is returned
    /// immediately. Otherwise each source is asked to update in turn, and
    /// the first one that reports `true` ends the search with `true`.
    /// Returns `false` when no source reports a change.
    fn update_if_necessary(&self) -> bool {
        // The write lock lives only inside this block, so it is released
        // before any source is polled.
        let sources = {
            let mut inner = self.write().or_poisoned();
            if inner.dirty {
                inner.dirty = false;
                return true;
            }
            // Snapshot the sources so they can be polled without the lock.
            inner.sources.clone()
        };

        // `any` stops at the first source that reports `true`.
        sources
            .into_iter()
            .any(|source| source.update_if_necessary())
    }

    /// Notifies the observer without touching the `dirty` flag.
    fn mark_check(&self) {
        self.write().or_poisoned().observer.notify();
    }

    /// Sets the `dirty` flag and then notifies the observer, both under a
    /// single write lock.
    fn mark_dirty(&self) {
        let mut inner = self.write().or_poisoned();
        inner.dirty = true;
        inner.observer.notify();
    }
}

impl Subscriber for RwLock<EffectInner> {
    /// Records `source` in this effect's source set.
    fn add_source(&self, source: AnySource) {
        let mut inner = self.write().or_poisoned();
        inner.sources.insert(source);
    }

    /// Delegates to the source set's `clear_sources`, passing `subscriber`
    /// through, while holding the write lock.
    fn clear_sources(&self, subscriber: &AnySubscriber) {
        let mut inner = self.write().or_poisoned();
        inner.sources.clear_sources(subscriber);
    }
}