reactive_graph/computed/async_derived/
inner.rs

1use super::suspense::TaskHandle;
2use crate::{
3    channel::Sender,
4    computed::suspense::SuspenseContext,
5    graph::{
6        AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
7        SubscriberSet,
8    },
9    owner::Owner,
10};
11use or_poisoned::OrPoisoned;
12use std::sync::RwLock;
13
/// Shared inner state of an async derived node.
///
/// In this file the value is always accessed through an `RwLock`, which is the
/// type the `ReactiveNode`, `Source` and `Subscriber` impls are written for.
pub(crate) struct ArcAsyncDerivedInner {
    // NOTE(review): presumably the reactive owner this node runs under; it is
    // not read or written in this file — confirm against the constructor.
    pub owner: Owner,
    // the sources this node depends on; kept so the dependencies can be
    // cleared when this needs to rerun, and so they can be asked to update
    pub sources: SourceSet,
    // tracks reactive subscribers so they can be notified
    // when the new async value is ready
    pub subscribers: SubscriberSet,
    // when a source changes, notifying this will cause the async work to rerun
    pub notifier: Sender,
    // current position in the Clean / Dirty / Notifying cycle
    pub state: AsyncDerivedState,
    // NOTE(review): not touched in this file; looks like a counter bumped per
    // run of the async work — confirm against the code that writes it.
    pub version: usize,
    // NOTE(review): not touched in this file; presumably the suspense contexts
    // this node has been read under — confirm.
    pub suspenses: Vec<SuspenseContext>,
    // NOTE(review): not touched in this file; presumably handles for suspense
    // tasks that are still outstanding — confirm.
    pub pending_suspenses: Vec<TaskHandle>,
}
28
/// Lifecycle state stored in `ArcAsyncDerivedInner::state`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum AsyncDerivedState {
    /// Set by `update_if_necessary` once a pending `Dirty` mark has been
    /// reported to the caller.
    Clean,
    /// Set by `mark_dirty`; the next `update_if_necessary` call returns `true`.
    Dirty,
    /// While in this state `mark_dirty` and `mark_check` do nothing, and
    /// `update_if_necessary` does not consult the sources. It is set outside
    /// this file.
    Notifying,
}
35
36impl ReactiveNode for RwLock<ArcAsyncDerivedInner> {
37    fn mark_dirty(&self) {
38        let mut lock = self.write().or_poisoned();
39        if lock.state != AsyncDerivedState::Notifying {
40            lock.state = AsyncDerivedState::Dirty;
41            lock.notifier.notify();
42        }
43    }
44
45    fn mark_check(&self) {
46        let mut lock = self.write().or_poisoned();
47        if lock.state != AsyncDerivedState::Notifying {
48            lock.notifier.notify();
49        }
50    }
51
52    fn mark_subscribers_check(&self) {
53        let lock = self.read().or_poisoned();
54        for sub in (&lock.subscribers).into_iter() {
55            sub.mark_check();
56        }
57    }
58
59    fn update_if_necessary(&self) -> bool {
60        let mut guard = self.write().or_poisoned();
61        let (is_dirty, sources) = (
62            guard.state == AsyncDerivedState::Dirty,
63            (guard.state != AsyncDerivedState::Notifying)
64                .then(|| guard.sources.clone()),
65        );
66
67        if is_dirty {
68            guard.state = AsyncDerivedState::Clean;
69            return true;
70        }
71        drop(guard);
72
73        for source in sources.into_iter().flatten() {
74            if source.update_if_necessary() {
75                return true;
76            }
77        }
78        false
79    }
80}
81
82impl Source for RwLock<ArcAsyncDerivedInner> {
83    fn add_subscriber(&self, subscriber: AnySubscriber) {
84        self.write().or_poisoned().subscribers.subscribe(subscriber);
85    }
86
87    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
88        self.write()
89            .or_poisoned()
90            .subscribers
91            .unsubscribe(subscriber);
92    }
93
94    fn clear_subscribers(&self) {
95        self.write().or_poisoned().subscribers.take();
96    }
97}
98
99impl Subscriber for RwLock<ArcAsyncDerivedInner> {
100    fn add_source(&self, source: AnySource) {
101        self.write().or_poisoned().sources.insert(source);
102    }
103
104    fn clear_sources(&self, subscriber: &AnySubscriber) {
105        self.write().or_poisoned().sources.clear_sources(subscriber);
106    }
107}