reactive_graph/computed/async_derived/
inner.rs1use super::suspense::TaskHandle;
2use crate::{
3 channel::Sender,
4 computed::suspense::SuspenseContext,
5 graph::{
6 AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
7 SubscriberSet,
8 },
9 owner::Owner,
10};
11use or_poisoned::OrPoisoned;
12use std::sync::RwLock;
13
14pub(crate) struct ArcAsyncDerivedInner {
15 pub owner: Owner,
16 pub sources: SourceSet,
18 pub subscribers: SubscriberSet,
21 pub notifier: Sender,
23 pub state: AsyncDerivedState,
24 pub version: usize,
25 pub suspenses: Vec<SuspenseContext>,
26 pub pending_suspenses: Vec<TaskHandle>,
27}
28
/// Lifecycle state of an async derived node.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum AsyncDerivedState {
    /// Nothing is known to have changed; `update_if_necessary` resets a
    /// `Dirty` node to this state.
    Clean,
    /// Set by `mark_dirty`; the next `update_if_necessary` reports `true`.
    Dirty,
    /// While in this state `mark_dirty` and `mark_check` are ignored and
    /// `update_if_necessary` does not consult the sources.
    // NOTE(review): this state is never assigned in this file — presumably it
    // is set while the node is notifying its own subscribers; confirm.
    Notifying,
}
35
36impl ReactiveNode for RwLock<ArcAsyncDerivedInner> {
37 fn mark_dirty(&self) {
38 let mut lock = self.write().or_poisoned();
39 if lock.state != AsyncDerivedState::Notifying {
40 lock.state = AsyncDerivedState::Dirty;
41 lock.notifier.notify();
42 }
43 }
44
45 fn mark_check(&self) {
46 let mut lock = self.write().or_poisoned();
47 if lock.state != AsyncDerivedState::Notifying {
48 lock.notifier.notify();
49 }
50 }
51
52 fn mark_subscribers_check(&self) {
53 let lock = self.read().or_poisoned();
54 for sub in (&lock.subscribers).into_iter() {
55 sub.mark_check();
56 }
57 }
58
59 fn update_if_necessary(&self) -> bool {
60 let mut guard = self.write().or_poisoned();
61 let (is_dirty, sources) = (
62 guard.state == AsyncDerivedState::Dirty,
63 (guard.state != AsyncDerivedState::Notifying)
64 .then(|| guard.sources.clone()),
65 );
66
67 if is_dirty {
68 guard.state = AsyncDerivedState::Clean;
69 return true;
70 }
71 drop(guard);
72
73 for source in sources.into_iter().flatten() {
74 if source.update_if_necessary() {
75 return true;
76 }
77 }
78 false
79 }
80}
81
82impl Source for RwLock<ArcAsyncDerivedInner> {
83 fn add_subscriber(&self, subscriber: AnySubscriber) {
84 self.write().or_poisoned().subscribers.subscribe(subscriber);
85 }
86
87 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
88 self.write()
89 .or_poisoned()
90 .subscribers
91 .unsubscribe(subscriber);
92 }
93
94 fn clear_subscribers(&self) {
95 self.write().or_poisoned().subscribers.take();
96 }
97}
98
99impl Subscriber for RwLock<ArcAsyncDerivedInner> {
100 fn add_source(&self, source: AnySource) {
101 self.write().or_poisoned().sources.insert(source);
102 }
103
104 fn clear_sources(&self, subscriber: &AnySubscriber) {
105 self.write().or_poisoned().sources.clear_sources(subscriber);
106 }
107}