Skip to main content

forge_dioxus/
hooks.rs

1use std::cell::{Cell, RefCell};
2use std::rc::Rc;
3use std::time::Duration;
4
5use dioxus::dioxus_core::use_drop;
6use dioxus::prelude::*;
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::{
11    ConnectionState, JobExecutionState, Mutation, QueryState, StreamEvent, SubscriptionHandle,
12    SubscriptionState, WorkflowExecutionState, use_forge_client,
13};
14use crate::types::{OptimisticMutation, PendingOptimistic};
15
16pub(crate) async fn sleep(duration: Duration) {
17    #[cfg(target_arch = "wasm32")]
18    {
19        gloo_timers::future::sleep(duration).await;
20    }
21
22    #[cfg(not(target_arch = "wasm32"))]
23    {
24        tokio::time::sleep(duration).await;
25    }
26}
27
28#[derive(Debug, Clone, serde::Deserialize)]
29struct JobStartResponse {
30    job_id: String,
31}
32
33#[derive(Debug, Clone, serde::Deserialize)]
34struct WorkflowStartResponse {
35    workflow_id: String,
36}
37
38pub fn use_forge_query_signal<TArgs, TResult>(
39    function_name: &'static str,
40    args: TArgs,
41) -> Signal<QueryState<TResult>>
42where
43    TArgs: Serialize + Clone + PartialEq + 'static,
44    TResult: DeserializeOwned + Clone + 'static,
45{
46    let client = use_forge_client();
47    let state = use_signal(QueryState::<TResult>::default);
48    let request_id = use_hook(|| Rc::new(Cell::new(0_u64)));
49
50    use_effect(use_reactive!(|(args,)| {
51        let client = client.clone();
52        let mut state = state;
53        let request_id = request_id.clone();
54        let current_id = request_id.get() + 1;
55        request_id.set(current_id);
56
57        state.set(QueryState {
58            loading: true,
59            data: None,
60            error: None,
61        });
62
63        spawn(async move {
64            match client.call::<_, TResult>(function_name, args).await {
65                Ok(data) if request_id.get() == current_id => {
66                    state.set(QueryState {
67                        loading: false,
68                        data: Some(data),
69                        error: None,
70                    });
71                }
72                Err(err) if request_id.get() == current_id => {
73                    state.set(QueryState {
74                        loading: false,
75                        data: None,
76                        error: Some(err.as_forge_error()),
77                    });
78                }
79                _ => {}
80            }
81        });
82    }));
83
84    state
85}
86
87pub fn use_forge_query<TArgs, TResult>(
88    function_name: &'static str,
89    args: TArgs,
90) -> QueryState<TResult>
91where
92    TArgs: Serialize + Clone + PartialEq + 'static,
93    TResult: DeserializeOwned + Clone + 'static,
94{
95    use_forge_query_signal(function_name, args)()
96}
97
98pub fn use_forge_subscription_signal<TArgs, TResult>(
99    function_name: &'static str,
100    args: TArgs,
101) -> Signal<SubscriptionState<TResult>>
102where
103    TArgs: Serialize + Clone + PartialEq + 'static,
104    TResult: DeserializeOwned + Clone + 'static,
105{
106    let client = use_forge_client();
107    let state = use_signal(SubscriptionState::<TResult>::default);
108    let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
109    let generation = use_hook(|| Rc::new(Cell::new(0_u64)));
110    let pending_data = use_hook(|| Rc::new(RefCell::new(None::<TResult>)));
111    let flush_scheduled = use_hook(|| Rc::new(Cell::new(false)));
112    let has_received_data = use_hook(|| Rc::new(Cell::new(false)));
113    let reconnect_attempts = use_hook(|| Rc::new(Cell::new(0_u32)));
114    let reconnect_nonce = use_signal(|| 0_u64);
115    let effect_handle = handle.clone();
116    let reconnect_key = reconnect_nonce();
117
118    use_effect(use_reactive!(|(args, reconnect_key)| {
119        let is_reconnect = reconnect_key > 0;
120        let current_generation = generation.get() + 1;
121        generation.set(current_generation);
122
123        if let Some(existing) = effect_handle.borrow_mut().take() {
124            existing.close();
125        }
126
127        let mut state = state;
128        let previous = state.peek().clone();
129        pending_data.borrow_mut().take();
130        flush_scheduled.set(false);
131        let had_data = previous.data.is_some();
132        has_received_data.set(had_data);
133
134        // Only update visible state on the initial connection, not retries.
135        // Retries happen silently to avoid re-render storms in desktop WebViews.
136        if !is_reconnect {
137            state.set(SubscriptionState {
138                loading: !had_data,
139                data: previous.data,
140                error: None,
141                stale: had_data,
142                connection_state: ConnectionState::Connecting,
143            });
144        }
145
146        let reconnect_generation = generation.clone();
147        let reconnect_attempts = reconnect_attempts.clone();
148        if !is_reconnect {
149            reconnect_attempts.set(0);
150        }
151        let pending_data = pending_data.clone();
152        let flush_scheduled = flush_scheduled.clone();
153        let has_received_data = has_received_data.clone();
154
155        let subscription = client.subscribe_query(function_name, args, move |event| match event {
156            StreamEvent::Connection(connection_state) => {
157                if connection_state == ConnectionState::Connected {
158                    reconnect_attempts.set(0);
159                    let mut next = state.peek().clone();
160                    next.connection_state = ConnectionState::Connected;
161                    next.stale = false;
162                    state.set(next);
163                }
164
165                if connection_state == ConnectionState::Disconnected
166                    && reconnect_generation.get() == current_generation
167                {
168                    let attempts = reconnect_attempts.get();
169                    if attempts >= 10 {
170                        // Exhausted retries, now surface the error
171                        let mut next = state.peek().clone();
172                        next.loading = false;
173                        next.connection_state = ConnectionState::Disconnected;
174                        next.stale = true;
175                        state.set(next);
176                        return;
177                    }
178                    reconnect_attempts.set(attempts + 1);
179                    let delay = 1000 * (1 << attempts.min(4));
180                    let mut reconnect_nonce = reconnect_nonce;
181                    spawn(async move {
182                        sleep(Duration::from_millis(delay as u64)).await;
183                        reconnect_nonce += 1;
184                    });
185                }
186            }
187            StreamEvent::Data(data) => {
188                if !has_received_data.replace(true) {
189                    let conn = state.peek().connection_state;
190                    state.set(SubscriptionState {
191                        loading: false,
192                        data: Some(data),
193                        error: None,
194                        stale: false,
195                        connection_state: conn,
196                    });
197                    return;
198                }
199
200                *pending_data.borrow_mut() = Some(data);
201                if flush_scheduled.replace(true) {
202                    return;
203                }
204
205                let pending_data = pending_data.clone();
206                let flush_scheduled = flush_scheduled.clone();
207                let mut state = state;
208                spawn(async move {
209                    sleep(Duration::from_millis(120)).await;
210                    flush_scheduled.set(false);
211
212                    let Some(data) = pending_data.borrow_mut().take() else {
213                        return;
214                    };
215
216                    let conn = state.peek().connection_state;
217                    state.set(SubscriptionState {
218                        loading: false,
219                        data: Some(data),
220                        error: None,
221                        stale: false,
222                        connection_state: conn,
223                    });
224                });
225            }
226            StreamEvent::Error(err) => {
227                // During reconnect attempts, suppress errors to avoid UI churn.
228                // Only surface errors on the initial connection or after retries exhaust.
229                let attempts = reconnect_attempts.get();
230                if attempts > 0 && attempts < 10 {
231                    return;
232                }
233                let mut next = state.peek().clone();
234                next.loading = false;
235                next.error = Some(err.as_forge_error());
236                next.stale = true;
237                state.set(next);
238            }
239        });
240
241        *effect_handle.borrow_mut() = Some(subscription);
242    }));
243
244    use_drop({
245        let handle = handle.clone();
246        move || {
247            if let Some(existing) = handle.borrow_mut().take() {
248                existing.close();
249            }
250        }
251    });
252
253    state
254}
255
256pub fn use_forge_subscription<TArgs, TResult>(
257    function_name: &'static str,
258    args: TArgs,
259) -> SubscriptionState<TResult>
260where
261    TArgs: Serialize + Clone + PartialEq + 'static,
262    TResult: DeserializeOwned + Clone + 'static,
263{
264    use_forge_subscription_signal(function_name, args)()
265}
266
267pub fn use_forge_mutation<TArgs, TResult>(
268    function_name: &'static str,
269) -> Mutation<TArgs, TResult>
270where
271    TArgs: Serialize + 'static,
272    TResult: DeserializeOwned + 'static,
273{
274    let client = use_forge_client();
275    Mutation::new(client, function_name)
276}
277
278pub fn use_forge_job_signal<TArgs, TResult>(
279    function_name: &'static str,
280    args: TArgs,
281) -> Signal<JobExecutionState<TResult>>
282where
283    TArgs: Serialize + Clone + PartialEq + 'static,
284    TResult: DeserializeOwned + Clone + 'static,
285{
286    let client = use_forge_client();
287    let state = use_signal(JobExecutionState::<TResult>::default);
288    let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
289    let effect_handle = handle.clone();
290
291    use_effect(use_reactive!(|(args,)| {
292        if let Some(existing) = effect_handle.borrow_mut().take() {
293            existing.close();
294        }
295
296        let client = client.clone();
297        let handle = effect_handle.clone();
298        let mut state = state;
299        state.set(JobExecutionState::default());
300
301        spawn(async move {
302            match client
303                .call::<_, JobStartResponse>(function_name, args)
304                .await
305            {
306                Ok(started) => {
307                    let subscription =
308                        client.subscribe_job(started.job_id.clone(), move |event| match event {
309                            StreamEvent::Connection(connection_state) => {
310                                let mut next = state.peek().clone();
311                                next.connection_state = connection_state;
312                                state.set(next);
313                            }
314                            StreamEvent::Data(job_state) => {
315                                let conn = state.peek().connection_state;
316                                state.set(JobExecutionState {
317                                    loading: false,
318                                    connection_state: conn,
319                                    state: job_state,
320                                });
321                            }
322                            StreamEvent::Error(err) => {
323                                let mut next = state.peek().clone();
324                                next.loading = false;
325                                next.state.error = Some(err.message);
326                                state.set(next);
327                            }
328                        });
329                    *handle.borrow_mut() = Some(subscription);
330                }
331                Err(err) => {
332                    let mut next = state.peek().clone();
333                    next.loading = false;
334                    next.state.error = Some(err.message);
335                    state.set(next);
336                }
337            }
338        });
339    }));
340
341    use_drop({
342        let handle = handle.clone();
343        move || {
344            if let Some(existing) = handle.borrow_mut().take() {
345                existing.close();
346            }
347        }
348    });
349
350    state
351}
352
353pub fn use_forge_job<TArgs, TResult>(
354    function_name: &'static str,
355    args: TArgs,
356) -> JobExecutionState<TResult>
357where
358    TArgs: Serialize + Clone + PartialEq + 'static,
359    TResult: DeserializeOwned + Clone + 'static,
360{
361    use_forge_job_signal(function_name, args)()
362}
363
364pub fn use_forge_workflow_signal<TArgs, TResult>(
365    function_name: &'static str,
366    args: TArgs,
367) -> Signal<WorkflowExecutionState<TResult>>
368where
369    TArgs: Serialize + Clone + PartialEq + 'static,
370    TResult: DeserializeOwned + Clone + 'static,
371{
372    let client = use_forge_client();
373    let state = use_signal(WorkflowExecutionState::<TResult>::default);
374    let handle = use_hook(|| Rc::new(RefCell::new(None::<SubscriptionHandle>)));
375    let effect_handle = handle.clone();
376
377    use_effect(use_reactive!(|(args,)| {
378        if let Some(existing) = effect_handle.borrow_mut().take() {
379            existing.close();
380        }
381
382        let client = client.clone();
383        let handle = effect_handle.clone();
384        let mut state = state;
385        state.set(WorkflowExecutionState::default());
386
387        spawn(async move {
388            match client
389                .call::<_, WorkflowStartResponse>(function_name, args)
390                .await
391            {
392                Ok(started) => {
393                    let subscription =
394                        client.subscribe_workflow(started.workflow_id.clone(), move |event| {
395                            match event {
396                                StreamEvent::Connection(connection_state) => {
397                                    let mut next = state.peek().clone();
398                                    next.connection_state = connection_state;
399                                    state.set(next);
400                                }
401                                StreamEvent::Data(workflow_state) => {
402                                    let conn = state.peek().connection_state;
403                                    state.set(WorkflowExecutionState {
404                                        loading: false,
405                                        connection_state: conn,
406                                        state: workflow_state,
407                                    });
408                                }
409                                StreamEvent::Error(err) => {
410                                    let mut next = state.peek().clone();
411                                    next.loading = false;
412                                    next.state.error = Some(err.message);
413                                    state.set(next);
414                                }
415                            }
416                        });
417                    *handle.borrow_mut() = Some(subscription);
418                }
419                Err(err) => {
420                    let mut next = state.peek().clone();
421                    next.loading = false;
422                    next.state.error = Some(err.message);
423                    state.set(next);
424                }
425            }
426        });
427    }));
428
429    use_drop({
430        let handle = handle.clone();
431        move || {
432            if let Some(existing) = handle.borrow_mut().take() {
433                existing.close();
434            }
435        }
436    });
437
438    state
439}
440
441pub fn use_forge_workflow<TArgs, TResult>(
442    function_name: &'static str,
443    args: TArgs,
444) -> WorkflowExecutionState<TResult>
445where
446    TArgs: Serialize + Clone + PartialEq + 'static,
447    TResult: DeserializeOwned + Clone + 'static,
448{
449    use_forge_workflow_signal(function_name, args)()
450}
451
452/// Create an optimistic mutation that layers local patches over a live
453/// subscription. Returns an [`OptimisticMutation`] whose `.data()` reflects
454/// the optimistic state and whose `.fire()` applies the transform, sends the
455/// mutation, and auto-reverts on error or TTL expiry.
456///
457/// ```ignore
458/// let reorder = use_optimistic(
459///     use_reorder_task(),
460///     use_list_tasks_live_signal(),
461///     |tasks, args: &ReorderTaskInput| {
462///         tasks.iter().map(|t| {
463///             if t.id == args.id { Task { status: args.status, position: args.position, ..t.clone() } }
464///             else { t.clone() }
465///         }).collect()
466///     },
467/// );
468/// // Read from reorder.data() instead of the raw subscription
469/// // Call reorder.fire(args) for optimistic + server mutation
470/// ```
471pub fn use_optimistic<A, R, D>(
472    mutation: Mutation<A, R>,
473    subscription: Signal<SubscriptionState<D>>,
474    apply: impl Fn(&D, &A) -> D + 'static,
475) -> OptimisticMutation<A, R, D>
476where
477    A: Serialize + Clone + 'static,
478    R: DeserializeOwned + 'static,
479    D: Clone + PartialEq + 'static,
480{
481    let mut view: Signal<Option<D>> = use_signal(|| subscription.read().data.clone());
482    let mut pending: Signal<Option<PendingOptimistic<D>>> = use_signal(|| None);
483    let apply = use_hook(|| Rc::new(apply));
484
485    // Sync view with subscription data. When a pending optimistic update
486    // exists, an incoming SSE push is treated as server confirmation.
487    use_effect(move || {
488        let sub_data = subscription.read().data.clone();
489        if pending.read().is_some() {
490            // SSE delivered fresh data while optimistic patch was active.
491            // Treat as confirmation: adopt server state, clear pending.
492            pending.set(None);
493        }
494        view.set(sub_data);
495    });
496
497    OptimisticMutation {
498        mutation,
499        view,
500        apply,
501        subscription,
502        pending,
503    }
504}
505