1use std::cell::{Cell, RefCell};
2use std::rc::Rc;
3use std::time::Duration;
4
5use dioxus::dioxus_core::use_drop;
6use dioxus::prelude::*;
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::{
11 ConnectionState, JobExecutionState, Mutation, QueryState, StreamEvent, SubscriptionHandle,
12 SubscriptionState, WorkflowExecutionState, use_forge_client,
13};
14use crate::types::{OptimisticMutation, PendingOptimistic};
15
16pub(crate) async fn sleep(duration: Duration) {
17 #[cfg(target_arch = "wasm32")]
18 {
19 gloo_timers::future::sleep(duration).await;
20 }
21
22 #[cfg(not(target_arch = "wasm32"))]
23 {
24 tokio::time::sleep(duration).await;
25 }
26}
27
28#[derive(Debug, Clone, serde::Deserialize)]
29struct JobStartResponse {
30 job_id: String,
31}
32
33#[derive(Debug, Clone, serde::Deserialize)]
34struct WorkflowStartResponse {
35 workflow_id: String,
36}
37
38pub fn use_forge_query_signal<TArgs, TResult>(
39 function_name: &'static str,
40 args: TArgs,
41) -> Signal<QueryState<TResult>>
42where
43 TArgs: Serialize + Clone + PartialEq + 'static,
44 TResult: DeserializeOwned + Clone + 'static,
45{
46 let client = use_forge_client();
47 let state = use_signal(QueryState::<TResult>::default);
48 let request_id = use_hook(|| Rc::new(Cell::new(0_u64)));
49
50 use_effect(use_reactive!(|(args,)| {
51 let client = client.clone();
52 let mut state = state;
53 let request_id = request_id.clone();
54 let current_id = request_id.get() + 1;
55 request_id.set(current_id);
56
57 state.set(QueryState {
58 loading: true,
59 data: None,
60 error: None,
61 });
62
63 spawn(async move {
64 match client.call::<_, TResult>(function_name, args).await {
65 Ok(data) if request_id.get() == current_id => {
66 state.set(QueryState {
67 loading: false,
68 data: Some(data),
69 error: None,
70 });
71 }
72 Err(err) if request_id.get() == current_id => {
73 state.set(QueryState {
74 loading: false,
75 data: None,
76 error: Some(err.as_forge_error()),
77 });
78 }
79 _ => {}
80 }
81 });
82 }));
83
84 state
85}
86
87pub fn use_forge_query<TArgs, TResult>(
88 function_name: &'static str,
89 args: TArgs,
90) -> QueryState<TResult>
91where
92 TArgs: Serialize + Clone + PartialEq + 'static,
93 TResult: DeserializeOwned + Clone + 'static,
94{
95 use_forge_query_signal(function_name, args)()
96}
97
98pub fn use_forge_subscription_signal<TArgs, TResult>(
99 function_name: &'static str,
100 args: TArgs,
101) -> Signal<SubscriptionState<TResult>>
102where
103 TArgs: Serialize + Clone + PartialEq + 'static,
104 TResult: DeserializeOwned + Clone + 'static,
105{
106 let client = use_forge_client();
107 let state = use_signal(SubscriptionState::<TResult>::default);
108 let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
109 let generation = use_hook(|| Rc::new(Cell::new(0_u64)));
110 let pending_data = use_hook(|| Rc::new(RefCell::new(None::<TResult>)));
111 let flush_scheduled = use_hook(|| Rc::new(Cell::new(false)));
112 let has_received_data = use_hook(|| Rc::new(Cell::new(false)));
113 let reconnect_attempts = use_hook(|| Rc::new(Cell::new(0_u32)));
114 let reconnect_nonce = use_signal(|| 0_u64);
115 let effect_handle = handle.clone();
116 let reconnect_key = reconnect_nonce();
117
118 use_effect(use_reactive!(|(args, reconnect_key)| {
119 let is_reconnect = reconnect_key > 0;
120 let current_generation = generation.get() + 1;
121 generation.set(current_generation);
122
123 if let Some(existing) = effect_handle.borrow_mut().take() {
124 existing.close();
125 }
126
127 let mut state = state;
128 let previous = state.peek().clone();
129 pending_data.borrow_mut().take();
130 flush_scheduled.set(false);
131 let had_data = previous.data.is_some();
132 has_received_data.set(had_data);
133
134 if !is_reconnect {
137 state.set(SubscriptionState {
138 loading: !had_data,
139 data: previous.data,
140 error: None,
141 stale: had_data,
142 connection_state: ConnectionState::Connecting,
143 });
144 }
145
146 let reconnect_generation = generation.clone();
147 let reconnect_attempts = reconnect_attempts.clone();
148 if !is_reconnect {
149 reconnect_attempts.set(0);
150 }
151 let pending_data = pending_data.clone();
152 let flush_scheduled = flush_scheduled.clone();
153 let has_received_data = has_received_data.clone();
154
155 let subscription = client.subscribe_query(function_name, args, move |event| match event {
156 StreamEvent::Connection(connection_state) => {
157 if connection_state == ConnectionState::Connected {
158 reconnect_attempts.set(0);
159 let mut next = state.peek().clone();
160 next.connection_state = ConnectionState::Connected;
161 next.stale = false;
162 state.set(next);
163 }
164
165 if connection_state == ConnectionState::Disconnected
166 && reconnect_generation.get() == current_generation
167 {
168 let attempts = reconnect_attempts.get();
169 if attempts >= 10 {
170 let mut next = state.peek().clone();
172 next.loading = false;
173 next.connection_state = ConnectionState::Disconnected;
174 next.stale = true;
175 state.set(next);
176 return;
177 }
178 reconnect_attempts.set(attempts + 1);
179 let delay = 1000 * (1 << attempts.min(4));
180 let mut reconnect_nonce = reconnect_nonce;
181 spawn(async move {
182 sleep(Duration::from_millis(delay as u64)).await;
183 reconnect_nonce += 1;
184 });
185 }
186 }
187 StreamEvent::Data(data) => {
188 if !has_received_data.replace(true) {
189 let conn = state.peek().connection_state;
190 state.set(SubscriptionState {
191 loading: false,
192 data: Some(data),
193 error: None,
194 stale: false,
195 connection_state: conn,
196 });
197 return;
198 }
199
200 *pending_data.borrow_mut() = Some(data);
201 if flush_scheduled.replace(true) {
202 return;
203 }
204
205 let pending_data = pending_data.clone();
206 let flush_scheduled = flush_scheduled.clone();
207 let mut state = state;
208 spawn(async move {
209 sleep(Duration::from_millis(120)).await;
210 flush_scheduled.set(false);
211
212 let Some(data) = pending_data.borrow_mut().take() else {
213 return;
214 };
215
216 let conn = state.peek().connection_state;
217 state.set(SubscriptionState {
218 loading: false,
219 data: Some(data),
220 error: None,
221 stale: false,
222 connection_state: conn,
223 });
224 });
225 }
226 StreamEvent::Error(err) => {
227 let attempts = reconnect_attempts.get();
230 if attempts > 0 && attempts < 10 {
231 return;
232 }
233 let mut next = state.peek().clone();
234 next.loading = false;
235 next.error = Some(err.as_forge_error());
236 next.stale = true;
237 state.set(next);
238 }
239 });
240
241 *effect_handle.borrow_mut() = Some(subscription);
242 }));
243
244 use_drop({
245 let handle = handle.clone();
246 move || {
247 if let Some(existing) = handle.borrow_mut().take() {
248 existing.close();
249 }
250 }
251 });
252
253 state
254}
255
256pub fn use_forge_subscription<TArgs, TResult>(
257 function_name: &'static str,
258 args: TArgs,
259) -> SubscriptionState<TResult>
260where
261 TArgs: Serialize + Clone + PartialEq + 'static,
262 TResult: DeserializeOwned + Clone + 'static,
263{
264 use_forge_subscription_signal(function_name, args)()
265}
266
267pub fn use_forge_mutation<TArgs, TResult>(
268 function_name: &'static str,
269) -> Mutation<TArgs, TResult>
270where
271 TArgs: Serialize + 'static,
272 TResult: DeserializeOwned + 'static,
273{
274 let client = use_forge_client();
275 Mutation::new(client, function_name)
276}
277
278pub fn use_forge_job_signal<TArgs, TResult>(
279 function_name: &'static str,
280 args: TArgs,
281) -> Signal<JobExecutionState<TResult>>
282where
283 TArgs: Serialize + Clone + PartialEq + 'static,
284 TResult: DeserializeOwned + Clone + 'static,
285{
286 let client = use_forge_client();
287 let state = use_signal(JobExecutionState::<TResult>::default);
288 let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
289 let effect_handle = handle.clone();
290
291 use_effect(use_reactive!(|(args,)| {
292 if let Some(existing) = effect_handle.borrow_mut().take() {
293 existing.close();
294 }
295
296 let client = client.clone();
297 let handle = effect_handle.clone();
298 let mut state = state;
299 state.set(JobExecutionState::default());
300
301 spawn(async move {
302 match client
303 .call::<_, JobStartResponse>(function_name, args)
304 .await
305 {
306 Ok(started) => {
307 let subscription =
308 client.subscribe_job(started.job_id.clone(), move |event| match event {
309 StreamEvent::Connection(connection_state) => {
310 let mut next = state.peek().clone();
311 next.connection_state = connection_state;
312 state.set(next);
313 }
314 StreamEvent::Data(job_state) => {
315 let conn = state.peek().connection_state;
316 state.set(JobExecutionState {
317 loading: false,
318 connection_state: conn,
319 state: job_state,
320 });
321 }
322 StreamEvent::Error(err) => {
323 let mut next = state.peek().clone();
324 next.loading = false;
325 next.state.error = Some(err.message);
326 state.set(next);
327 }
328 });
329 *handle.borrow_mut() = Some(subscription);
330 }
331 Err(err) => {
332 let mut next = state.peek().clone();
333 next.loading = false;
334 next.state.error = Some(err.message);
335 state.set(next);
336 }
337 }
338 });
339 }));
340
341 use_drop({
342 let handle = handle.clone();
343 move || {
344 if let Some(existing) = handle.borrow_mut().take() {
345 existing.close();
346 }
347 }
348 });
349
350 state
351}
352
353pub fn use_forge_job<TArgs, TResult>(
354 function_name: &'static str,
355 args: TArgs,
356) -> JobExecutionState<TResult>
357where
358 TArgs: Serialize + Clone + PartialEq + 'static,
359 TResult: DeserializeOwned + Clone + 'static,
360{
361 use_forge_job_signal(function_name, args)()
362}
363
364pub fn use_forge_workflow_signal<TArgs, TResult>(
365 function_name: &'static str,
366 args: TArgs,
367) -> Signal<WorkflowExecutionState<TResult>>
368where
369 TArgs: Serialize + Clone + PartialEq + 'static,
370 TResult: DeserializeOwned + Clone + 'static,
371{
372 let client = use_forge_client();
373 let state = use_signal(WorkflowExecutionState::<TResult>::default);
374 let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
375 let effect_handle = handle.clone();
376
377 use_effect(use_reactive!(|(args,)| {
378 if let Some(existing) = effect_handle.borrow_mut().take() {
379 existing.close();
380 }
381
382 let client = client.clone();
383 let handle = effect_handle.clone();
384 let mut state = state;
385 state.set(WorkflowExecutionState::default());
386
387 spawn(async move {
388 match client
389 .call::<_, WorkflowStartResponse>(function_name, args)
390 .await
391 {
392 Ok(started) => {
393 let subscription =
394 client.subscribe_workflow(started.workflow_id.clone(), move |event| {
395 match event {
396 StreamEvent::Connection(connection_state) => {
397 let mut next = state.peek().clone();
398 next.connection_state = connection_state;
399 state.set(next);
400 }
401 StreamEvent::Data(workflow_state) => {
402 let conn = state.peek().connection_state;
403 state.set(WorkflowExecutionState {
404 loading: false,
405 connection_state: conn,
406 state: workflow_state,
407 });
408 }
409 StreamEvent::Error(err) => {
410 let mut next = state.peek().clone();
411 next.loading = false;
412 next.state.error = Some(err.message);
413 state.set(next);
414 }
415 }
416 });
417 *handle.borrow_mut() = Some(subscription);
418 }
419 Err(err) => {
420 let mut next = state.peek().clone();
421 next.loading = false;
422 next.state.error = Some(err.message);
423 state.set(next);
424 }
425 }
426 });
427 }));
428
429 use_drop({
430 let handle = handle.clone();
431 move || {
432 if let Some(existing) = handle.borrow_mut().take() {
433 existing.close();
434 }
435 }
436 });
437
438 state
439}
440
441pub fn use_forge_workflow<TArgs, TResult>(
442 function_name: &'static str,
443 args: TArgs,
444) -> WorkflowExecutionState<TResult>
445where
446 TArgs: Serialize + Clone + PartialEq + 'static,
447 TResult: DeserializeOwned + Clone + 'static,
448{
449 use_forge_workflow_signal(function_name, args)()
450}
451
452pub fn use_optimistic<A, R, D>(
472 mutation: Mutation<A, R>,
473 subscription: Signal<SubscriptionState<D>>,
474 apply: impl Fn(&D, &A) -> D + 'static,
475) -> OptimisticMutation<A, R, D>
476where
477 A: Serialize + Clone + 'static,
478 R: DeserializeOwned + 'static,
479 D: Clone + PartialEq + 'static,
480{
481 let mut view: Signal<Option<D>> = use_signal(|| subscription.read().data.clone());
482 let mut pending: Signal<Option<PendingOptimistic<D>>> = use_signal(|| None);
483 let apply = use_hook(|| Rc::new(apply));
484
485 use_effect(move || {
488 let sub_data = subscription.read().data.clone();
489 if pending.read().is_some() {
490 pending.set(None);
493 }
494 view.set(sub_data);
495 });
496
497 OptimisticMutation {
498 mutation,
499 view,
500 apply,
501 subscription,
502 pending,
503 }
504}
505