Skip to main content

forge_dioxus/
client.rs

1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::signals::ForgeSignals;
13use crate::types::{
14    ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
15};
16
/// Callback that yields the current auth token, or `None` when there is none.
type TokenProvider = Rc<dyn Fn() -> Option<String>>;
/// Callback invoked with the server error when a response carries an auth error code.
type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
/// Callback invoked through `ForgeClient::notify_mutation_error`.
type MutationErrorHandler = Rc<dyn Fn(ForgeClientError)>;
/// Sending half of the per-subscription channel that carries dispatched SSE events.
type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
/// One-shot sender used to wake a task that is waiting in `ensure_connected`.
type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;

/// Process-wide counter used by `ForgeClient::random_id`; the first id handed out is 1.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
24
/// Event forwarded from the shared SSE connection to a single subscription's channel.
enum SseDispatch {
    /// Raw JSON payload; `deliver_event` deserializes it into the subscription's result type.
    Data(serde_json::Value),
    /// Error reported for the subscription; `deliver_event` turns it into a `ForgeClientError`.
    Error { code: String, message: String },
}
29
/// What is needed to (re-)register one subscription with the server.
struct RegistrationMeta {
    /// Path appended to the client's base URL when posting the registration.
    endpoint: &'static str,
    /// JSON object sent as the request body; `session_id` and `session_secret`
    /// are inserted into a copy of it at send time.
    payload: serde_json::Value,
}
34
/// Lifecycle of the shared SSE connection.
#[derive(Default, Clone, Copy, PartialEq, Eq)]
enum SseState {
    /// No connection; the next `ensure_connected` call starts the event loop.
    #[default]
    Idle,
    /// The event loop has been (or is about to be) started; callers wait on a `ConnectWaiter`.
    Connecting,
    /// `mark_connected` has stored a session id and secret.
    Connected,
}
42
/// Shared SSE connection state, one per ForgeClient.
#[derive(Default)]
struct SseManager {
    /// Session id stored by `mark_connected`; cleared on disconnect/reconnect.
    session_id: Option<String>,
    /// Session secret stored by `mark_connected`; cleared on disconnect/reconnect.
    session_secret: Option<String>,
    /// Current connection lifecycle state.
    state: SseState,
    /// Set once `mark_connected` has run; lets later connects be treated as reconnects.
    ever_connected: bool,
    /// Event channels keyed by target string (e.g. `sub:<id>`, `job:<id>`, `wf:<id>`).
    listeners: HashMap<String, EventSender>,
    /// Registration data keyed by subscription id, used for initial and repeated registration.
    registrations: HashMap<String, RegistrationMeta>,
    /// Handle of the spawned event-loop task, kept so it can be cancelled.
    event_loop_task: Option<Task>,
    /// Incremented by `should_reconnect`; reset by `mark_connected` and `reconnect_sse`.
    reconnect_attempts: u32,
    /// Tasks waiting in `ensure_connected`; resolved by `mark_connected` / `mark_disconnected`.
    connect_waiters: Vec<ConnectWaiter>,
}
56
/// Upper bound on the reconnect counter: `should_reconnect` returns `None`
/// once `reconnect_attempts` has reached this value.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
58
/// Settings used to construct a [`ForgeClient`].
#[derive(Clone)]
pub struct ForgeClientConfig {
    /// Base URL of the server; trailing `/` characters are stripped by `ForgeClient::new`.
    pub url: String,
    /// Optional provider of the current auth token.
    pub get_token: Option<TokenProvider>,
    /// Optional handler invoked when the server reports an auth error.
    pub on_auth_error: Option<AuthErrorHandler>,
    /// Optional handler invoked through `ForgeClient::notify_mutation_error`.
    pub on_mutation_error: Option<MutationErrorHandler>,
    /// Optional signal that receives connection-state changes of the SSE connection.
    pub(crate) connection_state: Option<Signal<ConnectionState>>,
}
67
68impl ForgeClientConfig {
69    pub fn new(url: impl Into<String>) -> Self {
70        Self {
71            url: url.into(),
72            get_token: None,
73            on_auth_error: None,
74            on_mutation_error: None,
75            connection_state: None,
76        }
77    }
78
79    pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
80        self.get_token = Some(Rc::new(provider));
81        self
82    }
83
84    pub fn with_auth_error_handler(
85        mut self,
86        handler: impl Fn(ForgeError) + 'static,
87    ) -> Self {
88        self.on_auth_error = Some(Rc::new(handler));
89        self
90    }
91
92    /// Register a callback invoked when [`Mutation::fire`] encounters an error.
93    pub fn with_mutation_error_handler(
94        mut self,
95        handler: impl Fn(ForgeClientError) + 'static,
96    ) -> Self {
97        self.on_mutation_error = Some(Rc::new(handler));
98        self
99    }
100
101    pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
102        self.connection_state = Some(state);
103        self
104    }
105}
106
/// Handle to a Forge server connection.
///
/// Cloning copies only the `Rc`, so all clones share the same inner state.
#[derive(Clone)]
pub struct ForgeClient {
    inner: Rc<ForgeClientInner>,
}
111
/// State shared by every clone of a [`ForgeClient`].
struct ForgeClientInner {
    /// Base URL with trailing `/` characters removed.
    url: String,
    /// Optional provider of the current auth token.
    get_token: Option<TokenProvider>,
    /// Optional handler for auth errors.
    on_auth_error: Option<AuthErrorHandler>,
    /// Optional handler called by `notify_mutation_error`.
    on_mutation_error: Option<MutationErrorHandler>,
    /// Optional signal updated by `broadcast_connection`.
    connection_state: Option<Signal<ConnectionState>>,
    /// Shared SSE connection bookkeeping.
    sse: RefCell<SseManager>,
    /// Source of correlation ids, set through `set_signals`.
    signals: RefCell<Option<ForgeSignals>>,
}
121
122impl ForgeClient {
123    pub fn new(config: ForgeClientConfig) -> Self {
124        Self {
125            inner: Rc::new(ForgeClientInner {
126                url: config.url.trim_end_matches('/').to_string(),
127                get_token: config.get_token,
128                on_auth_error: config.on_auth_error,
129                on_mutation_error: config.on_mutation_error,
130                connection_state: config.connection_state,
131                sse: RefCell::new(SseManager::default()),
132                signals: RefCell::new(None),
133            }),
134        }
135    }
136
137    /// Wire signals for correlation ID injection on RPC calls.
138    pub fn set_signals(&self, signals: ForgeSignals) {
139        *self.inner.signals.borrow_mut() = Some(signals);
140    }
141
142    /// Get the base URL of this client.
143    pub fn get_url(&self) -> &str {
144        &self.inner.url
145    }
146
147    /// Notify the registered mutation error handler, if any. Called by
148    /// [`Mutation::fire`] when a call fails and no per-call handler was provided.
149    pub fn notify_mutation_error(&self, error: ForgeClientError) {
150        if let Some(handler) = &self.inner.on_mutation_error {
151            handler(error);
152        }
153    }
154
155    /// Generate a correlation ID from the wired signals instance, if any.
156    fn correlation_id(&self) -> Option<String> {
157        self.inner.signals.borrow().as_ref().map(|s| s.next_correlation_id())
158    }
159
160    pub async fn call<TArgs, TResult>(
161        &self,
162        function_name: &str,
163        args: TArgs,
164    ) -> Result<TResult, ForgeClientError>
165    where
166        TArgs: Serialize,
167        TResult: DeserializeOwned,
168    {
169        let body = serde_json::json!({ "args": args });
170        let correlation_id = self.correlation_id();
171        let envelope = platform::request_json(
172            self,
173            &format!("{}/_api/rpc/{}", self.inner.url, function_name),
174            body,
175            correlation_id.as_deref(),
176        )
177        .await?;
178        self.decode_envelope(envelope)
179    }
180
181    #[cfg(target_arch = "wasm32")]
182    pub async fn call_multipart<TResult>(
183        &self,
184        function_name: &str,
185        form: web_sys::FormData,
186    ) -> Result<TResult, ForgeClientError>
187    where
188        TResult: DeserializeOwned,
189    {
190        let correlation_id = self.correlation_id();
191        let envelope = platform::request_multipart(
192            self,
193            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
194            form,
195            correlation_id.as_deref(),
196        )
197        .await?;
198        self.decode_envelope(envelope)
199    }
200
201    #[cfg(not(target_arch = "wasm32"))]
202    pub async fn call_multipart<TResult>(
203        &self,
204        function_name: &str,
205        form: reqwest::multipart::Form,
206    ) -> Result<TResult, ForgeClientError>
207    where
208        TResult: DeserializeOwned,
209    {
210        let correlation_id = self.correlation_id();
211        let envelope = platform::request_multipart(
212            self,
213            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
214            form,
215            correlation_id.as_deref(),
216        )
217        .await?;
218        self.decode_envelope(envelope)
219    }
220
221    pub fn subscribe_query<TArgs, TResult, F>(
222        &self,
223        function_name: &str,
224        args: TArgs,
225        callback: F,
226    ) -> SubscriptionHandle
227    where
228        TArgs: Serialize + Clone + 'static,
229        TResult: DeserializeOwned + Clone + 'static,
230        F: FnMut(StreamEvent<TResult>) + 'static,
231    {
232        let sub_id = self.random_id("sub");
233        let target = format!("sub:{sub_id}");
234
235        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
236        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
237
238        let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
239        let reg_payload = serde_json::json!({
240            "id": sub_id,
241            "function": function_name,
242            "args": args_value,
243        });
244        self.inner.sse.borrow_mut().registrations.insert(
245            sub_id.clone(),
246            RegistrationMeta {
247                endpoint: "/_api/subscribe",
248                payload: reg_payload,
249            },
250        );
251
252        self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
253            match client.decode_envelope::<TResult>(envelope) {
254                Ok(data) => cb(StreamEvent::Data(data)),
255                Err(err) => cb(StreamEvent::Error(err)),
256            }
257        })
258    }
259
260    pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
261    where
262        TResult: DeserializeOwned + Clone + 'static,
263        F: FnMut(StreamEvent<TResult>) + 'static,
264    {
265        self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
266    }
267
268    pub fn subscribe_workflow<TResult, F>(
269        &self,
270        workflow_id: String,
271        callback: F,
272    ) -> SubscriptionHandle
273    where
274        TResult: DeserializeOwned + Clone + 'static,
275        F: FnMut(StreamEvent<TResult>) + 'static,
276    {
277        self.subscribe_tracker(
278            "wf",
279            serde_json::json!({ "workflow_id": workflow_id }),
280            "/_api/subscribe-workflow",
281            callback,
282        )
283    }
284
285    fn subscribe_tracker<TResult, F>(
286        &self,
287        prefix: &str,
288        payload: serde_json::Value,
289        endpoint: &'static str,
290        callback: F,
291    ) -> SubscriptionHandle
292    where
293        TResult: DeserializeOwned + Clone + 'static,
294        F: FnMut(StreamEvent<TResult>) + 'static,
295    {
296        let sub_id = self.random_id(prefix);
297        let target = format!("{prefix}:{sub_id}");
298
299        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
300        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
301
302        let mut reg_payload = payload;
303        reg_payload
304            .as_object_mut()
305            .expect("tracker payload must be an object")
306            .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
307        self.inner.sse.borrow_mut().registrations.insert(
308            sub_id.clone(),
309            RegistrationMeta {
310                endpoint,
311                payload: reg_payload,
312            },
313        );
314
315        self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
316            if envelope.success {
317                if let Some(data) = envelope.data {
318                    match serde_json::from_value::<TResult>(data) {
319                        Ok(parsed) => cb(StreamEvent::Data(parsed)),
320                        Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
321                            "DESERIALIZATION_ERROR",
322                            e.to_string(),
323                            None,
324                        ))),
325                    }
326                }
327            }
328        })
329    }
330
331    fn spawn_subscription<TResult, F>(
332        &self,
333        sub_id: String,
334        target: String,
335        mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
336        mut callback: F,
337        on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
338    ) -> SubscriptionHandle
339    where
340        TResult: DeserializeOwned + Clone + 'static,
341        F: FnMut(StreamEvent<TResult>) + 'static,
342    {
343        let client = self.clone();
344        let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
345        let handle_task = handle.clone();
346
347        let task = dioxus::prelude::spawn(async move {
348            callback(StreamEvent::Connection(ConnectionState::Connecting));
349
350            if let Err(e) = client.ensure_connected().await {
351                callback(StreamEvent::Error(e));
352                callback(StreamEvent::Connection(ConnectionState::Disconnected));
353                handle_task.finish();
354                return;
355            }
356
357            match client.register_subscription(&sub_id).await {
358                Ok(envelope) => {
359                    callback(StreamEvent::Connection(ConnectionState::Connected));
360                    on_initial(&client, envelope, &mut callback);
361                }
362                Err(err) => {
363                    callback(StreamEvent::Error(err));
364                    callback(StreamEvent::Connection(ConnectionState::Disconnected));
365                    handle_task.finish();
366                    return;
367                }
368            }
369
370            while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
371                Self::deliver_event::<TResult, F>(&mut callback, &client, event);
372            }
373
374            handle_task.finish();
375        });
376
377        handle.set_task(task);
378        handle
379    }
380
381    fn deliver_event<TResult, F>(
382        callback: &mut F,
383        client: &ForgeClient,
384        event: SseDispatch,
385    ) where
386        TResult: DeserializeOwned,
387        F: FnMut(StreamEvent<TResult>),
388    {
389        match event {
390            SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
391                Ok(data) => callback(StreamEvent::Data(data)),
392                Err(e) => {
393                    callback(StreamEvent::Error(ForgeClientError::new(
394                        "DESERIALIZATION_ERROR",
395                        e.to_string(),
396                        None,
397                    )));
398                }
399            },
400            SseDispatch::Error { code, message } => {
401                let err = ForgeClientError::new(&code, &message, None);
402                if code == "UNAUTHORIZED" {
403                    if let Some(handler) = &client.inner.on_auth_error {
404                        handler(err.as_forge_error());
405                    }
406                }
407                callback(StreamEvent::Error(err));
408            }
409        }
410    }
411
412    /// Ensure the shared SSE connection is established. Spawns the event loop on first call.
413    async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
414        let rx = {
415            let mut sse = self.inner.sse.borrow_mut();
416            if sse.state == SseState::Connected {
417                return Ok(());
418            }
419
420            let (tx, rx) = futures_channel::oneshot::channel();
421            sse.connect_waiters.push(tx);
422
423            if sse.state == SseState::Idle {
424                sse.state = SseState::Connecting;
425                drop(sse);
426                platform::start_event_loop(self.clone());
427            }
428
429            rx
430        };
431
432        rx.await.unwrap_or_else(|_| {
433            Err(ForgeClientError::new(
434                "SSE_CONNECTION_FAILED",
435                "Connection attempt cancelled",
436                None,
437            ))
438        })
439    }
440
441    /// Register a subscription with the server via POST.
442    /// If the server returns SESSION_NOT_FOUND, forces a reconnect and retries once.
443    async fn register_subscription(
444        &self,
445        sub_id: &str,
446    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
447        let envelope = self.try_register_subscription(sub_id).await?;
448
449        // Stale session: force reconnect and retry once
450        let needs_retry = !envelope.success
451            && envelope
452                .error
453                .as_ref()
454                .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
455
456        if needs_retry {
457            self.force_reconnect().await;
458            self.ensure_connected().await?;
459            let retried = self.try_register_subscription(sub_id).await?;
460            self.notify_auth_error_if_needed(&retried);
461            return Ok(retried);
462        }
463
464        self.notify_auth_error_if_needed(&envelope);
465        Ok(envelope)
466    }
467
468    fn notify_auth_error_if_needed(&self, envelope: &RpcEnvelopeRaw) {
469        if let Some(err) = envelope.error.as_ref().filter(|_| !envelope.success) {
470            if (err.code == "UNAUTHORIZED" || err.code == "FORBIDDEN")
471                && let Some(handler) = &self.inner.on_auth_error
472            {
473                handler(err.clone());
474            }
475        }
476    }
477
478    async fn try_register_subscription(
479        &self,
480        sub_id: &str,
481    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
482        let (endpoint, payload) = {
483            let sse = self.inner.sse.borrow();
484            let meta = sse
485                .registrations
486                .get(sub_id)
487                .ok_or_else(|| {
488                    ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
489                })?;
490            let session_id = sse.session_id.clone().unwrap_or_default();
491            let session_secret = sse.session_secret.clone().unwrap_or_default();
492            let mut payload = meta.payload.clone();
493            let obj = payload
494                .as_object_mut()
495                .expect("registration payload must be an object");
496            obj.insert("session_id".into(), serde_json::Value::String(session_id));
497            obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
498            (meta.endpoint, payload)
499        };
500
501        let url = format!("{}{}", self.inner.url, endpoint);
502        platform::request_json(self, &url, payload, None).await
503    }
504
505    async fn force_reconnect(&self) {
506        let task = {
507            let mut sse = self.inner.sse.borrow_mut();
508            sse.session_id = None;
509            sse.session_secret = None;
510            sse.state = SseState::Idle;
511            // No need to drain waiters; force_reconnect is only called mid-registration
512            // where the caller already passed ensure_connected
513            sse.event_loop_task.take()
514        };
515        if let Some(task) = task {
516            task.cancel();
517        }
518        sleep(Duration::from_millis(10)).await;
519    }
520
521    /// Tear down the current SSE connection and start a fresh one.
522    ///
523    /// The new connection calls `get_token()` again, picking up any tokens
524    /// that were updated since the original connection was established.
525    /// Existing subscriptions are automatically re-registered once the new
526    /// connection's "connected" handshake completes.
527    pub fn reconnect_sse(&self) {
528        let has_listeners = {
529            let mut sse = self.inner.sse.borrow_mut();
530            if sse.state == SseState::Idle && sse.event_loop_task.is_none() && sse.listeners.is_empty() {
531                return;
532            }
533            // Already tearing down and reconnecting, don't stack another one
534            if sse.state == SseState::Connecting && sse.event_loop_task.is_some() {
535                return;
536            }
537            if let Some(task) = sse.event_loop_task.take() {
538                task.cancel();
539            }
540            sse.session_id = None;
541            sse.session_secret = None;
542            sse.reconnect_attempts = 0;
543            let has_listeners = !sse.listeners.is_empty();
544            sse.state = if has_listeners {
545                SseState::Connecting
546            } else {
547                SseState::Idle
548            };
549            has_listeners
550        };
551        if has_listeners {
552            platform::start_event_loop(self.clone());
553        }
554    }
555
556    async fn reregister_all(&self) {
557        let sub_ids: Vec<String> = {
558            let sse = self.inner.sse.borrow();
559            sse.registrations.keys().cloned().collect()
560        };
561
562        for sub_id in sub_ids {
563            let _ = self.register_subscription(&sub_id).await;
564        }
565    }
566
567    fn dispatch_event(&self, target: &str, event: SseDispatch) {
568        let tx = {
569            let sse = self.inner.sse.borrow();
570            sse.listeners.get(target).cloned()
571        };
572        if let Some(tx) = tx {
573            let _ = tx.unbounded_send(event);
574        }
575    }
576
577    fn broadcast_connection(&self, state: ConnectionState) {
578        if let Some(mut signal) = self.inner.connection_state {
579            signal.set(state);
580        }
581    }
582
583    fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
584        let mut sse = self.inner.sse.borrow_mut();
585        let is_reconnect = sse.ever_connected;
586        sse.session_id = Some(session_id);
587        sse.session_secret = Some(session_secret);
588        sse.state = SseState::Connected;
589        sse.reconnect_attempts = 0;
590        sse.ever_connected = true;
591        for waiter in sse.connect_waiters.drain(..) {
592            let _ = waiter.send(Ok(()));
593        }
594        is_reconnect
595    }
596
597    fn mark_disconnected(&self) {
598        let mut sse = self.inner.sse.borrow_mut();
599        sse.session_id = None;
600        sse.session_secret = None;
601        sse.state = SseState::Idle;
602        sse.event_loop_task = None;
603        let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
604        for waiter in sse.connect_waiters.drain(..) {
605            let _ = waiter.send(Err(err()));
606        }
607    }
608
609    fn should_reconnect(&self) -> Option<u32> {
610        let mut sse = self.inner.sse.borrow_mut();
611        if sse.listeners.is_empty() {
612            return None;
613        }
614        let attempts = sse.reconnect_attempts;
615        if attempts >= MAX_RECONNECT_ATTEMPTS {
616            return None;
617        }
618        sse.reconnect_attempts = attempts + 1;
619        Some(attempts)
620    }
621
622    fn get_token(&self) -> Option<String> {
623        self.inner
624            .get_token
625            .as_ref()
626            .and_then(|provider| provider())
627            .filter(|t| !t.is_empty())
628    }
629
630    fn decode_envelope<TResult>(
631        &self,
632        envelope: RpcEnvelopeRaw,
633    ) -> Result<TResult, ForgeClientError>
634    where
635        TResult: DeserializeOwned,
636    {
637        if !envelope.success {
638            let error = envelope.error.unwrap_or(ForgeError {
639                code: "UNKNOWN".to_string(),
640                message: "Unknown error".to_string(),
641                details: None,
642            });
643            if error.code == "UNAUTHORIZED" || error.code == "FORBIDDEN" {
644                if let Some(handler) = &self.inner.on_auth_error {
645                    handler(error.clone());
646                }
647            }
648            return Err(ForgeClientError::new(error.code, error.message, error.details));
649        }
650
651        let data = envelope.data.ok_or_else(|| {
652            ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
653        })?;
654        serde_json::from_value(data)
655            .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
656    }
657
658    fn random_id(&self, prefix: &str) -> String {
659        let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
660        format!("{prefix}-{id}")
661    }
662}
663
664async fn sleep(duration: Duration) {
665    #[cfg(target_arch = "wasm32")]
666    {
667        gloo_timers::future::sleep(duration).await;
668    }
669
670    #[cfg(not(target_arch = "wasm32"))]
671    {
672        tokio::time::sleep(duration).await;
673    }
674}
675
/// Handle to a running subscription.
///
/// All fields are reference-counted, so clones of a handle share the same
/// closed flag, task slot and cleanup closure.
#[derive(Clone)]
pub struct SubscriptionHandle {
    /// Set once the subscription has been finished or closed.
    closed: Rc<Cell<bool>>,
    /// The spawned subscription task, stored by `set_task`.
    task: Rc<RefCell<Option<Task>>>,
    /// One-shot cleanup closure; taken and run by `finish`.
    cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
}
682
683impl SubscriptionHandle {
684    fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
685        let cleanup: Box<dyn FnOnce()> = Box::new(move || {
686            let mut sse = client.inner.sse.borrow_mut();
687            sse.listeners.remove(&target);
688            sse.registrations.remove(&sub_id);
689            // Jobs/workflows have server-managed lifecycles; only query subs need explicit unsubscribe
690            if target.starts_with("sub:") {
691                let session_id = sse.session_id.clone();
692                let session_secret = sse.session_secret.clone();
693                drop(sse);
694                if let (Some(sid), Some(ss)) = (session_id, session_secret) {
695                    let url = format!("{}/_api/unsubscribe", client.inner.url);
696                    let payload = serde_json::json!({
697                        "session_id": sid,
698                        "session_secret": ss,
699                        "id": sub_id,
700                    });
701                    let client = client.clone();
702                    dioxus::prelude::spawn(async move {
703                        let _ = platform::request_json(&client, &url, payload, None).await;
704                    });
705                }
706            }
707        });
708
709        Self {
710            closed: Rc::new(Cell::new(false)),
711            task: Rc::new(RefCell::new(None)),
712            cleanup: Rc::new(RefCell::new(Some(cleanup))),
713        }
714    }
715
716    fn set_task(&self, task: Task) {
717        *self.task.borrow_mut() = Some(task);
718    }
719
720    pub(crate) fn finish(&self) {
721        if self.closed.replace(true) {
722            return;
723        }
724        if let Some(cleanup) = self.cleanup.borrow_mut().take() {
725            cleanup();
726        }
727        self.task.borrow_mut().take();
728    }
729
730    pub fn close(&self) {
731        let task = { self.task.borrow_mut().clone() };
732        self.finish();
733        if let Some(task) = task {
734            task.cancel();
735        }
736    }
737
738    pub fn is_closed(&self) -> bool {
739        self.closed.get()
740    }
741}
742
/// Dropping a handle closes the subscription via [`SubscriptionHandle::close`].
///
/// NOTE(review): the handle is `Clone` and its clones share state, so dropping
/// any one clone closes the subscription for all of them — confirm this is
/// the intended contract.
impl Drop for SubscriptionHandle {
    fn drop(&mut self) {
        self.close();
    }
}
748
749// ── WASM platform ───────────────────────────────────────────────────────────
750
751#[cfg(target_arch = "wasm32")]
752mod platform {
753    use dioxus::prelude::spawn;
754    use futures_util::{StreamExt, stream};
755    use gloo_net::eventsource::futures::EventSource;
756    use gloo_net::http::Request;
757    use js_sys::{JSON, encode_uri_component};
758
759    use super::{ForgeClient, SseDispatch, sleep};
760    use crate::signals::platform_tag;
761    use crate::types::{
762        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
763    };
764
765    pub(super) async fn request_json(
766        client: &ForgeClient,
767        url: &str,
768        body: serde_json::Value,
769        correlation_id: Option<&str>,
770    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
771        let mut request = Request::post(url)
772            .header("Content-Type", "application/json")
773            .header("x-forge-platform", platform_tag())
774            .credentials(web_sys::RequestCredentials::Include);
775        if let Some(token) = client.get_token() {
776            request = request.header("Authorization", &format!("Bearer {token}"));
777        }
778        if let Some(cid) = correlation_id {
779            request = request.header("x-correlation-id", cid);
780        }
781
782        let request = request.body(body.to_string()).map_err(request_error)?;
783        request
784            .send()
785            .await
786            .map_err(request_error)?
787            .json()
788            .await
789            .map_err(request_error)
790    }
791
792    pub(super) async fn request_multipart(
793        client: &ForgeClient,
794        url: &str,
795        form: web_sys::FormData,
796        correlation_id: Option<&str>,
797    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
798        let mut request = Request::post(url)
799            .header("x-forge-platform", platform_tag())
800            .credentials(web_sys::RequestCredentials::Include);
801        if let Some(token) = client.get_token() {
802            request = request.header("Authorization", &format!("Bearer {token}"));
803        }
804        if let Some(cid) = correlation_id {
805            request = request.header("x-correlation-id", cid);
806        }
807
808        let response = request.body(form).map_err(request_error)?;
809        response
810            .send()
811            .await
812            .map_err(request_error)?
813            .json()
814            .await
815            .map_err(request_error)
816    }
817
818    fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
819        let data = message.data();
820        data.as_string().or_else(|| {
821            JSON::stringify(&data)
822                .ok()
823                .and_then(|value| value.as_string())
824                .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
825        })
826    }
827
828    fn events_url(client: &ForgeClient) -> String {
829        match client.get_token() {
830            Some(token) => format!(
831                "{}/_api/events?token={}",
832                client.inner.url,
833                encode_uri_component(&token)
834            ),
835            None => format!("{}/_api/events", client.inner.url),
836        }
837    }
838
839    /// Start the single shared SSE event loop.
840    pub(super) fn start_event_loop(client: ForgeClient) {
841        let client_for_task = client.clone();
842        let task = spawn(async move {
843            let was_connected = run_event_loop(&client_for_task).await;
844
845            client_for_task.mark_disconnected();
846            client_for_task.broadcast_connection(ConnectionState::Disconnected);
847
848            // Never reached "connected" handshake while holding a token:
849            // almost certainly a 401. Trigger refresh instead of retrying
850            // with the same expired token.
851            if !was_connected && client_for_task.get_token().is_some() {
852                if let Some(handler) = &client_for_task.inner.on_auth_error {
853                    handler(crate::types::ForgeError {
854                        code: "UNAUTHORIZED".into(),
855                        message: "SSE authentication failed".into(),
856                        details: None,
857                    });
858                }
859                return;
860            }
861
862            if let Some(attempts) = client_for_task.should_reconnect() {
863                let delay = 1000 * (1u64 << attempts.min(4));
864                let jitter = (js_sys::Math::random() * 500.0) as u64;
865                sleep(std::time::Duration::from_millis(delay + jitter)).await;
866
867                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
868                start_event_loop(client_for_task);
869            }
870        });
871
872        client.inner.sse.borrow_mut().event_loop_task = Some(task);
873    }
874
875    /// Returns `true` if the connection was established at some point.
876    async fn run_event_loop(client: &ForgeClient) -> bool {
877        let mut event_source = match EventSource::new(&events_url(client)) {
878            Ok(source) => source,
879            Err(_) => {
880                return false;
881            }
882        };
883
884        let mut connected_stream = match event_source.subscribe("connected") {
885            Ok(stream) => stream,
886            Err(_) => return false,
887        };
888        let update_stream = match event_source.subscribe("update") {
889            Ok(stream) => stream,
890            Err(_) => return false,
891        };
892        let error_stream = match event_source.subscribe("error") {
893            Ok(stream) => stream,
894            Err(_) => return false,
895        };
896
897        // Wait for the connected event
898        let connected_event = match connected_stream.next().await {
899            Some(Ok((_kind, message))) => {
900                let Some(raw) = message_data_as_string(&message) else {
901                    return false;
902                };
903                match serde_json::from_str::<ConnectedEvent>(&raw) {
904                    Ok(event) => event,
905                    Err(_) => return false,
906                }
907            }
908            _ => return false,
909        };
910
911        let session_id = connected_event.session_id.unwrap_or_default();
912        let session_secret = connected_event.session_secret.unwrap_or_default();
913
914        if session_id.is_empty() || session_secret.is_empty() {
915            return false;
916        }
917
918        let is_reconnect = client.mark_connected(session_id, session_secret);
919        client.broadcast_connection(ConnectionState::Connected);
920
921        // Only re-register on reconnect; initial registrations are handled by each subscription task
922        if is_reconnect {
923            client.reregister_all().await;
924        }
925
926        let mut events = stream::select(update_stream, error_stream);
927        while let Some(event) = events.next().await {
928            match event {
929                Ok((kind, message)) => {
930                    let Some(raw) = message_data_as_string(&message) else {
931                        continue;
932                    };
933                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
934                        continue;
935                    };
936
937                    let Some(target) = envelope.target else {
938                        continue;
939                    };
940
941                    if kind == "update" {
942                        if let Some(payload) = envelope.payload {
943                            client.dispatch_event(&target, SseDispatch::Data(payload));
944                        }
945                    } else {
946                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
947                        let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
948                        client.dispatch_event(&target, SseDispatch::Error { code, message });
949                    }
950                }
951                Err(_) => break,
952            }
953        }
954
955        event_source.close();
956        true
957    }
958
959    fn request_error(err: gloo_net::Error) -> ForgeClientError {
960        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
961    }
962}
963
964// ── Native platform ─────────────────────────────────────────────────────────
965
966#[cfg(not(target_arch = "wasm32"))]
967mod platform {
968    use dioxus::prelude::spawn;
969    use futures_util::StreamExt;
970    use reqwest::Client;
971    use reqwest_eventsource::{Event, EventSource};
972
973    use super::{ForgeClient, SseDispatch, sleep};
974    use crate::signals::platform_tag;
975    use crate::types::{
976        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
977    };
978
979    pub(super) async fn request_json(
980        client: &ForgeClient,
981        url: &str,
982        body: serde_json::Value,
983        correlation_id: Option<&str>,
984    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
985        let mut request = Client::new()
986            .post(url)
987            .header("x-forge-platform", platform_tag())
988            .json(&body);
989        if let Some(token) = client.get_token() {
990            request = request.bearer_auth(token);
991        }
992        if let Some(cid) = correlation_id {
993            request = request.header("x-correlation-id", cid);
994        }
995
996        request
997            .send()
998            .await
999            .map_err(request_error)?
1000            .json()
1001            .await
1002            .map_err(request_error)
1003    }
1004
1005    pub(super) async fn request_multipart(
1006        client: &ForgeClient,
1007        url: &str,
1008        form: reqwest::multipart::Form,
1009        correlation_id: Option<&str>,
1010    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
1011        let mut request = Client::new()
1012            .post(url)
1013            .header("x-forge-platform", platform_tag())
1014            .multipart(form);
1015        if let Some(token) = client.get_token() {
1016            request = request.bearer_auth(token);
1017        }
1018        if let Some(cid) = correlation_id {
1019            request = request.header("x-correlation-id", cid);
1020        }
1021
1022        request
1023            .send()
1024            .await
1025            .map_err(request_error)?
1026            .json()
1027            .await
1028            .map_err(request_error)
1029    }
1030
1031    /// Start the single shared SSE event loop.
1032    pub(super) fn start_event_loop(client: ForgeClient) {
1033        let client_for_task = client.clone();
1034        let task = spawn(async move {
1035            let was_connected = run_event_loop(&client_for_task).await;
1036
1037            client_for_task.mark_disconnected();
1038            client_for_task.broadcast_connection(ConnectionState::Disconnected);
1039
1040            if !was_connected && client_for_task.get_token().is_some() {
1041                if let Some(handler) = &client_for_task.inner.on_auth_error {
1042                    handler(crate::types::ForgeError {
1043                        code: "UNAUTHORIZED".into(),
1044                        message: "SSE authentication failed".into(),
1045                        details: None,
1046                    });
1047                }
1048                return;
1049            }
1050
1051            if let Some(attempts) = client_for_task.should_reconnect() {
1052                let delay = 1000 * (1u64 << attempts.min(4));
1053                sleep(std::time::Duration::from_millis(delay)).await;
1054
1055                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
1056                start_event_loop(client_for_task);
1057            }
1058        });
1059
1060        client.inner.sse.borrow_mut().event_loop_task = Some(task);
1061    }
1062
1063    /// Returns `true` if the connection was established at some point.
1064    async fn run_event_loop(client: &ForgeClient) -> bool {
1065        let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
1066        if let Some(token) = client.get_token() {
1067            request = request.bearer_auth(token);
1068        }
1069
1070        let mut event_source = match EventSource::new(request) {
1071            Ok(source) => source,
1072            Err(_) => return false,
1073        };
1074
1075        // Wait for connected event
1076        let connected_event = loop {
1077            let Some(event) = event_source.next().await else {
1078                return false;
1079            };
1080            match event {
1081                Ok(Event::Open) => continue,
1082                Ok(Event::Message(msg)) if msg.event == "connected" => {
1083                    match serde_json::from_str::<ConnectedEvent>(&msg.data) {
1084                        Ok(event) => break event,
1085                        Err(_) => return false,
1086                    }
1087                }
1088                Ok(Event::Message(_)) => continue,
1089                Err(_) => return false,
1090            }
1091        };
1092
1093        let session_id = connected_event.session_id.unwrap_or_default();
1094        let session_secret = connected_event.session_secret.unwrap_or_default();
1095
1096        if session_id.is_empty() || session_secret.is_empty() {
1097            return false;
1098        }
1099
1100        let is_reconnect = client.mark_connected(session_id, session_secret);
1101        client.broadcast_connection(ConnectionState::Connected);
1102
1103        if is_reconnect {
1104            client.reregister_all().await;
1105        }
1106
1107        while let Some(event) = event_source.next().await {
1108            match event {
1109                Ok(Event::Open) => {}
1110                Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
1111                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
1112                        continue;
1113                    };
1114                    let Some(target) = envelope.target else {
1115                        continue;
1116                    };
1117
1118                    if msg.event == "update" {
1119                        if let Some(payload) = envelope.payload {
1120                            client.dispatch_event(&target, SseDispatch::Data(payload));
1121                        }
1122                    } else {
1123                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
1124                        let message =
1125                            envelope.message.unwrap_or_else(|| "Subscription error".to_string());
1126                        client.dispatch_event(&target, SseDispatch::Error { code, message });
1127                    }
1128                }
1129                Ok(Event::Message(_)) => {}
1130                Err(_) => break,
1131            }
1132        }
1133
1134        event_source.close();
1135        true
1136    }
1137
1138    fn request_error(err: reqwest::Error) -> ForgeClientError {
1139        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1140    }
1141}