reactive_graph/signal/
subscriber_traits.rs1use crate::{
12 graph::{
13 AnySource, AnySubscriber, ReactiveNode, Source, SubscriberSet,
14 ToAnySource,
15 },
16 traits::{DefinedAt, IsDisposed},
17 unwrap_signal,
18};
19use or_poisoned::OrPoisoned;
20use std::{
21 borrow::Borrow,
22 sync::{Arc, RwLock, Weak},
23};
24
25pub(crate) trait AsSubscriberSet {
26 type Output: Borrow<RwLock<SubscriberSet>>;
27
28 fn as_subscriber_set(&self) -> Option<Self::Output>;
29}
30
31impl<'a> AsSubscriberSet for &'a RwLock<SubscriberSet> {
32 type Output = &'a RwLock<SubscriberSet>;
33
34 #[inline(always)]
35 fn as_subscriber_set(&self) -> Option<Self::Output> {
36 Some(self)
37 }
38}
39
40impl DefinedAt for RwLock<SubscriberSet> {
41 fn defined_at(&self) -> Option<&'static std::panic::Location<'static>> {
42 None
43 }
44}
45
46impl<T: AsSubscriberSet + DefinedAt> ReactiveNode for T {
50 fn mark_dirty(&self) {
51 self.mark_subscribers_check();
52 }
53
54 fn mark_check(&self) {}
55
56 fn mark_subscribers_check(&self) {
57 if let Some(inner) = self.as_subscriber_set() {
58 let subs = inner.borrow().read().unwrap().clone();
59 for sub in subs {
60 sub.mark_dirty();
61 }
62 }
63 }
64
65 fn update_if_necessary(&self) -> bool {
66 false
73 }
74}
75
76impl<T: AsSubscriberSet + DefinedAt> Source for T {
77 fn clear_subscribers(&self) {
78 if let Some(inner) = self.as_subscriber_set() {
79 inner.borrow().write().unwrap().take();
80 }
81 }
82
83 fn add_subscriber(&self, subscriber: AnySubscriber) {
84 if let Some(inner) = self.as_subscriber_set() {
85 inner.borrow().write().unwrap().subscribe(subscriber)
86 }
87 }
88
89 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
90 if let Some(inner) = self.as_subscriber_set() {
91 inner.borrow().write().unwrap().unsubscribe(subscriber)
92 }
93 }
94}
95
96impl<T: AsSubscriberSet + DefinedAt + IsDisposed> ToAnySource for T
97where
98 T::Output: Borrow<Arc<RwLock<SubscriberSet>>>,
99{
100 #[track_caller]
101 fn to_any_source(&self) -> AnySource {
102 self.as_subscriber_set()
103 .map(|subs| {
104 let subs = subs.borrow();
105 AnySource(
106 Arc::as_ptr(subs) as usize,
107 Arc::downgrade(subs) as Weak<dyn Source + Send + Sync>,
108 #[cfg(any(debug_assertions, leptos_debuginfo))]
109 self.defined_at().expect("no DefinedAt in debug mode"),
110 )
111 })
112 .unwrap_or_else(unwrap_signal!(self))
113 }
114}
115
116impl ReactiveNode for RwLock<SubscriberSet> {
117 fn mark_dirty(&self) {
118 self.mark_subscribers_check();
119 }
120
121 fn mark_check(&self) {}
122
123 fn mark_subscribers_check(&self) {
124 let subs = self.write().unwrap().take();
125 for sub in subs {
126 sub.mark_dirty();
127 }
128 }
129
130 fn update_if_necessary(&self) -> bool {
131 false
138 }
139}
140
141impl Source for RwLock<SubscriberSet> {
142 fn clear_subscribers(&self) {
143 self.write().or_poisoned().take();
144 }
145
146 fn add_subscriber(&self, subscriber: AnySubscriber) {
147 self.write().or_poisoned().subscribe(subscriber)
148 }
149
150 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
151 self.write().or_poisoned().unsubscribe(subscriber)
152 }
153}