ankurah_react_signals/
react_binding.rs1use std::{
2 cell::RefCell,
3 sync::{atomic::AtomicU64, Arc, RwLock},
4};
5
6use futures::StreamExt;
7use or_poisoned::OrPoisoned;
8use reactive_graph::graph::{AnySource, AnySubscriber, ReactiveNode, Subscriber};
9use std::sync::Weak;
10
11use wasm_bindgen::prelude::*;
12
13#[wasm_bindgen(module = "react")]
14extern "C" {
15 #[wasm_bindgen(catch)]
16 fn useRef() -> Result<JsValue, JsValue>;
17 #[wasm_bindgen(catch)]
18 fn useSyncExternalStore(
19 subscribe: &Closure<dyn Fn(js_sys::Function) -> JsValue>,
20 get_snapshot: &Closure<dyn Fn() -> JsValue>,
21 get_server_snapshot: &Closure<dyn Fn() -> JsValue>,
22 ) -> Result<JsValue, JsValue>;
23}
24
25#[wasm_bindgen(js_name = withSignals)]
62pub fn with_signals(f: &js_sys::Function) -> Result<JsValue, JsValue> {
63 let ref_value = useRef()?;
64
65 let mut store = js_sys::Reflect::get(&ref_value, &"current".into()).unwrap();
66 if store.is_undefined() {
67 let new_store = EffectStore::new();
68 useSyncExternalStore(&new_store.0.subscribe_fn, &new_store.0.get_snapshot, &new_store.0.get_snapshot)?;
69
70 store = JsValue::from(new_store.clone());
72 js_sys::Reflect::set(&ref_value, &"current".into(), &store).unwrap();
73 Ok(new_store.with_observer(f))
74 } else {
75 let ptr = js_sys::Reflect::get(&store, &JsValue::from_str("__wbg_ptr")).unwrap();
76 let store = {
77 let ptr_u32: u32 = ptr.as_f64().unwrap() as u32;
80 use wasm_bindgen::convert::RefFromWasmAbi;
81 unsafe { EffectStore::ref_from_abi(ptr_u32) }
82 };
83
84 useSyncExternalStore(&store.0.subscribe_fn, &store.0.get_snapshot, &store.0.get_snapshot)?;
85 Ok(store.with_observer(f))
86 }
87}
88
89#[derive(Clone)]
90#[wasm_bindgen]
91pub struct EffectStore(Arc<Inner>);
92
93struct Inner {
94 subscribe_fn: Closure<dyn Fn(js_sys::Function) -> JsValue>,
96 get_snapshot: Closure<dyn Fn() -> JsValue>,
98
99 tracker: Tracker,
101}
102
103thread_local! {
105 pub static CURRENT_STORE: RefCell<Option<EffectStore>> = const { RefCell::new(None) };
106}
107
108impl Default for EffectStore {
109 fn default() -> Self { Self::new() }
110}
111
112impl EffectStore {
113 pub fn new() -> Self {
114 let version = Arc::new(AtomicU64::new(0));
116 let (sender, rx) = crate::effect::channel::channel();
117 let tracker = Tracker(Arc::new(TrackerInner(RwLock::new(TrackerState {
118 dirty: false,
119 notifier: sender,
120 sources: Vec::new(),
121 version: version.clone(),
122 }))));
123 let rx = RefCell::new(Some(rx));
124 let subscribe_fn = {
125 let tracker = tracker.clone();
126 let version = version.clone();
127 Closure::wrap(Box::new(move |on_store_change: js_sys::Function| {
128 let Some(mut rx) = rx.borrow_mut().take() else {
129 return JsValue::UNDEFINED;
130 };
131
132 any_spawner::Executor::spawn_local({
133 let subscriber = tracker.to_any_subscriber();
135
136 async move {
137 while rx.next().await.is_some() {
138 if subscriber.update_if_necessary() {
141 subscriber.clear_sources(&subscriber);
142 on_store_change.call0(&JsValue::NULL).unwrap();
143 }
144 }
145 }
146 });
147
148 version.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
149
150 JsValue::UNDEFINED
152 }) as Box<dyn Fn(js_sys::Function) -> JsValue>)
153 };
154 let get_snapshot = {
155 let version = version.clone();
156
157 Closure::wrap(Box::new(move || {
158 let version = version.load(std::sync::atomic::Ordering::Relaxed);
159 JsValue::from(version)
160 }) as Box<dyn Fn() -> JsValue>)
161 };
162
163 Self(Arc::new(Inner { subscribe_fn, get_snapshot, tracker }))
164 }
165}
166
167#[wasm_bindgen]
168impl EffectStore {
169 pub fn with_observer(&self, f: &js_sys::Function) -> JsValue {
173 use reactive_graph::graph::WithObserver;
174 self.0.tracker.to_any_subscriber().with_observer(|| f.call0(&JsValue::NULL).unwrap_or(JsValue::UNDEFINED))
175 }
176}
177#[derive(Debug, Clone)]
178pub struct Tracker(Arc<TrackerInner>);
179
180#[derive(Debug)]
181pub struct TrackerInner(RwLock<TrackerState>);
182
183#[derive(Debug)]
184struct TrackerState {
185 dirty: bool,
186 notifier: crate::effect::channel::Sender,
187 sources: Vec<AnySource>,
188 version: Arc<AtomicU64>,
189}
190
191impl ReactiveNode for TrackerInner {
192 fn mark_subscribers_check(&self) {}
193
194 fn update_if_necessary(&self) -> bool {
195 let mut guard = self.0.write().or_poisoned();
196 let (is_dirty, sources) = (guard.dirty, (!guard.dirty).then(|| guard.sources.clone()));
197
198 if is_dirty {
199 guard.dirty = false;
200 return true;
201 }
202
203 drop(guard);
204 for source in sources.into_iter().flatten() {
205 if source.update_if_necessary() {
206 return true;
207 }
208 }
209 false
210 }
211
212 fn mark_check(&self) { self.0.write().or_poisoned().notifier.notify(); }
213
214 fn mark_dirty(&self) {
215 let mut lock = self.0.write().or_poisoned();
216 lock.dirty = true;
217 lock.version.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
218 lock.notifier.notify();
219 }
220}
221
222impl Subscriber for TrackerInner {
223 fn add_source(&self, source: AnySource) { self.0.write().or_poisoned().sources.push(source); }
224
225 fn clear_sources(&self, _subscriber: &AnySubscriber) { self.0.write().or_poisoned().sources.clear(); }
226}
227
228impl ToAnySubscriber for Tracker {
229 fn to_any_subscriber(&self) -> AnySubscriber {
230 AnySubscriber(Arc::as_ptr(&self.0) as usize, Arc::downgrade(&self.0) as Weak<dyn Subscriber + Send + Sync>)
231 }
232}
233
234pub trait ToAnySubscriber {
235 fn to_any_subscriber(&self) -> AnySubscriber;
237}