Skip to main content

forge_dioxus/
client.rs

1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::signals::ForgeSignals;
13use crate::types::{
14    ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
15};
16
/// Callback returning the current auth token, or `None` when there is none.
type TokenProvider = Rc<dyn Fn() -> Option<String>>;
/// Callback invoked with the error when a stream reports `UNAUTHORIZED`.
type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
/// Sending half of the channel that feeds one subscription's task.
type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
/// One-shot sender resolved once a pending connection attempt succeeds or fails.
type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;

/// Process-wide counter used to build subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
23
/// Item routed from the shared event loop to a single subscription's channel.
enum SseDispatch {
    /// JSON payload, later deserialized into the subscriber's result type.
    Data(serde_json::Value),
    /// Error reported for the subscription, as a code plus a message.
    Error { code: String, message: String },
}
28
/// What is needed to (re-)register a subscription with the server.
struct RegistrationMeta {
    /// Path appended to the client's base URL when registering.
    endpoint: &'static str,
    /// JSON object sent as the request body; session credentials are added
    /// to a copy of it at registration time.
    payload: serde_json::Value,
}
33
/// Lifecycle of the shared SSE connection.
#[derive(Default, Clone, Copy, PartialEq, Eq)]
enum SseState {
    /// No connection and no attempt in progress (initial state).
    #[default]
    Idle,
    /// An event loop has been started but the session is not established yet.
    Connecting,
    /// A session id/secret has been received and stored.
    Connected,
}
41
/// Shared SSE connection state, one per ForgeClient.
#[derive(Default)]
struct SseManager {
    /// Session id of the current connection; `None` while disconnected.
    session_id: Option<String>,
    /// Session secret of the current connection; `None` while disconnected.
    session_secret: Option<String>,
    /// Current connection lifecycle state.
    state: SseState,
    /// Set once the first connection succeeds; used to tell reconnects apart.
    ever_connected: bool,
    /// Per-subscription event channels, keyed by dispatch target.
    listeners: HashMap<String, EventSender>,
    /// Registration data for live subscriptions, keyed by subscription id.
    registrations: HashMap<String, RegistrationMeta>,
    /// Handle of the running event loop task, if one was recorded.
    event_loop_task: Option<Task>,
    /// Reconnect attempts made since the last successful connection.
    reconnect_attempts: u32,
    /// Callers waiting for the in-progress connection attempt to resolve.
    connect_waiters: Vec<ConnectWaiter>,
}
55
/// Upper bound on consecutive reconnect attempts; the counter is reset when a
/// connection succeeds.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
57
/// Settings used to construct a [`ForgeClient`].
#[derive(Clone)]
pub struct ForgeClientConfig {
    /// Base URL of the server; trailing slashes are trimmed by the client.
    pub url: String,
    /// Optional provider of the auth token attached to requests.
    pub get_token: Option<TokenProvider>,
    /// Optional handler called when a stream reports an `UNAUTHORIZED` error.
    pub on_auth_error: Option<AuthErrorHandler>,
    /// Optional signal updated with the shared connection's state.
    pub(crate) connection_state: Option<Signal<ConnectionState>>,
}
65
66impl ForgeClientConfig {
67    pub fn new(url: impl Into<String>) -> Self {
68        Self {
69            url: url.into(),
70            get_token: None,
71            on_auth_error: None,
72            connection_state: None,
73        }
74    }
75
76    pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
77        self.get_token = Some(Rc::new(provider));
78        self
79    }
80
81    pub fn with_auth_error_handler(
82        mut self,
83        handler: impl Fn(ForgeError) + 'static,
84    ) -> Self {
85        self.on_auth_error = Some(Rc::new(handler));
86        self
87    }
88
89    pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
90        self.connection_state = Some(state);
91        self
92    }
93}
94
/// Client handle; clones share the same underlying state through an `Rc`.
#[derive(Clone)]
pub struct ForgeClient {
    /// Shared, reference-counted client state.
    inner: Rc<ForgeClientInner>,
}
99
/// State shared by every clone of a [`ForgeClient`].
struct ForgeClientInner {
    /// Base URL with trailing slashes removed.
    url: String,
    /// Optional provider of the auth token attached to requests.
    get_token: Option<TokenProvider>,
    /// Optional handler called when a stream reports an `UNAUTHORIZED` error.
    on_auth_error: Option<AuthErrorHandler>,
    /// Optional signal updated with the shared connection's state.
    connection_state: Option<Signal<ConnectionState>>,
    /// Shared SSE connection bookkeeping.
    sse: RefCell<SseManager>,
    /// Signals used to generate correlation ids, once wired via `set_signals`.
    signals: RefCell<Option<ForgeSignals>>,
}
108
109impl ForgeClient {
110    pub fn new(config: ForgeClientConfig) -> Self {
111        Self {
112            inner: Rc::new(ForgeClientInner {
113                url: config.url.trim_end_matches('/').to_string(),
114                get_token: config.get_token,
115                on_auth_error: config.on_auth_error,
116                connection_state: config.connection_state,
117                sse: RefCell::new(SseManager::default()),
118                signals: RefCell::new(None),
119            }),
120        }
121    }
122
123    /// Wire signals for correlation ID injection on RPC calls.
124    pub fn set_signals(&self, signals: ForgeSignals) {
125        *self.inner.signals.borrow_mut() = Some(signals);
126    }
127
128    /// Get the base URL of this client.
129    pub fn get_url(&self) -> &str {
130        &self.inner.url
131    }
132
133    /// Generate a correlation ID from the wired signals instance, if any.
134    fn correlation_id(&self) -> Option<String> {
135        self.inner.signals.borrow().as_ref().map(|s| s.next_correlation_id())
136    }
137
138    pub async fn call<TArgs, TResult>(
139        &self,
140        function_name: &str,
141        args: TArgs,
142    ) -> Result<TResult, ForgeClientError>
143    where
144        TArgs: Serialize,
145        TResult: DeserializeOwned,
146    {
147        let body = serde_json::json!({ "args": args });
148        let correlation_id = self.correlation_id();
149        let envelope = platform::request_json(
150            self,
151            &format!("{}/_api/rpc/{}", self.inner.url, function_name),
152            body,
153            correlation_id.as_deref(),
154        )
155        .await?;
156        self.decode_envelope(envelope)
157    }
158
159    #[cfg(target_arch = "wasm32")]
160    pub async fn call_multipart<TResult>(
161        &self,
162        function_name: &str,
163        form: web_sys::FormData,
164    ) -> Result<TResult, ForgeClientError>
165    where
166        TResult: DeserializeOwned,
167    {
168        let correlation_id = self.correlation_id();
169        let envelope = platform::request_multipart(
170            self,
171            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
172            form,
173            correlation_id.as_deref(),
174        )
175        .await?;
176        self.decode_envelope(envelope)
177    }
178
179    #[cfg(not(target_arch = "wasm32"))]
180    pub async fn call_multipart<TResult>(
181        &self,
182        function_name: &str,
183        form: reqwest::multipart::Form,
184    ) -> Result<TResult, ForgeClientError>
185    where
186        TResult: DeserializeOwned,
187    {
188        let correlation_id = self.correlation_id();
189        let envelope = platform::request_multipart(
190            self,
191            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
192            form,
193            correlation_id.as_deref(),
194        )
195        .await?;
196        self.decode_envelope(envelope)
197    }
198
199    pub fn subscribe_query<TArgs, TResult, F>(
200        &self,
201        function_name: &str,
202        args: TArgs,
203        callback: F,
204    ) -> SubscriptionHandle
205    where
206        TArgs: Serialize + Clone + 'static,
207        TResult: DeserializeOwned + Clone + 'static,
208        F: FnMut(StreamEvent<TResult>) + 'static,
209    {
210        let sub_id = self.random_id("sub");
211        let target = format!("sub:{sub_id}");
212
213        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
214        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
215
216        let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
217        let reg_payload = serde_json::json!({
218            "id": sub_id,
219            "function": function_name,
220            "args": args_value,
221        });
222        self.inner.sse.borrow_mut().registrations.insert(
223            sub_id.clone(),
224            RegistrationMeta {
225                endpoint: "/_api/subscribe",
226                payload: reg_payload,
227            },
228        );
229
230        self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
231            match client.decode_envelope::<TResult>(envelope) {
232                Ok(data) => cb(StreamEvent::Data(data)),
233                Err(err) => cb(StreamEvent::Error(err)),
234            }
235        })
236    }
237
238    pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
239    where
240        TResult: DeserializeOwned + Clone + 'static,
241        F: FnMut(StreamEvent<TResult>) + 'static,
242    {
243        self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
244    }
245
246    pub fn subscribe_workflow<TResult, F>(
247        &self,
248        workflow_id: String,
249        callback: F,
250    ) -> SubscriptionHandle
251    where
252        TResult: DeserializeOwned + Clone + 'static,
253        F: FnMut(StreamEvent<TResult>) + 'static,
254    {
255        self.subscribe_tracker(
256            "wf",
257            serde_json::json!({ "workflow_id": workflow_id }),
258            "/_api/subscribe-workflow",
259            callback,
260        )
261    }
262
263    fn subscribe_tracker<TResult, F>(
264        &self,
265        prefix: &str,
266        payload: serde_json::Value,
267        endpoint: &'static str,
268        callback: F,
269    ) -> SubscriptionHandle
270    where
271        TResult: DeserializeOwned + Clone + 'static,
272        F: FnMut(StreamEvent<TResult>) + 'static,
273    {
274        let sub_id = self.random_id(prefix);
275        let target = format!("{prefix}:{sub_id}");
276
277        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
278        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
279
280        let mut reg_payload = payload;
281        reg_payload
282            .as_object_mut()
283            .expect("tracker payload must be an object")
284            .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
285        self.inner.sse.borrow_mut().registrations.insert(
286            sub_id.clone(),
287            RegistrationMeta {
288                endpoint,
289                payload: reg_payload,
290            },
291        );
292
293        self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
294            if envelope.success {
295                if let Some(data) = envelope.data {
296                    match serde_json::from_value::<TResult>(data) {
297                        Ok(parsed) => cb(StreamEvent::Data(parsed)),
298                        Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
299                            "DESERIALIZATION_ERROR",
300                            e.to_string(),
301                            None,
302                        ))),
303                    }
304                }
305            }
306        })
307    }
308
309    fn spawn_subscription<TResult, F>(
310        &self,
311        sub_id: String,
312        target: String,
313        mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
314        mut callback: F,
315        on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
316    ) -> SubscriptionHandle
317    where
318        TResult: DeserializeOwned + Clone + 'static,
319        F: FnMut(StreamEvent<TResult>) + 'static,
320    {
321        let client = self.clone();
322        let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
323        let handle_task = handle.clone();
324
325        let task = dioxus::prelude::spawn(async move {
326            callback(StreamEvent::Connection(ConnectionState::Connecting));
327
328            if let Err(e) = client.ensure_connected().await {
329                callback(StreamEvent::Error(e));
330                callback(StreamEvent::Connection(ConnectionState::Disconnected));
331                handle_task.finish();
332                return;
333            }
334
335            match client.register_subscription(&sub_id).await {
336                Ok(envelope) => {
337                    callback(StreamEvent::Connection(ConnectionState::Connected));
338                    on_initial(&client, envelope, &mut callback);
339                }
340                Err(err) => {
341                    callback(StreamEvent::Error(err));
342                    callback(StreamEvent::Connection(ConnectionState::Disconnected));
343                    handle_task.finish();
344                    return;
345                }
346            }
347
348            while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
349                Self::deliver_event::<TResult, F>(&mut callback, &client, event);
350            }
351
352            handle_task.finish();
353        });
354
355        handle.set_task(task);
356        handle
357    }
358
359    fn deliver_event<TResult, F>(
360        callback: &mut F,
361        client: &ForgeClient,
362        event: SseDispatch,
363    ) where
364        TResult: DeserializeOwned,
365        F: FnMut(StreamEvent<TResult>),
366    {
367        match event {
368            SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
369                Ok(data) => callback(StreamEvent::Data(data)),
370                Err(e) => {
371                    callback(StreamEvent::Error(ForgeClientError::new(
372                        "DESERIALIZATION_ERROR",
373                        e.to_string(),
374                        None,
375                    )));
376                }
377            },
378            SseDispatch::Error { code, message } => {
379                let err = ForgeClientError::new(&code, &message, None);
380                if code == "UNAUTHORIZED" {
381                    if let Some(handler) = &client.inner.on_auth_error {
382                        handler(err.as_forge_error());
383                    }
384                }
385                callback(StreamEvent::Error(err));
386            }
387        }
388    }
389
390    /// Ensure the shared SSE connection is established. Spawns the event loop on first call.
391    async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
392        let rx = {
393            let mut sse = self.inner.sse.borrow_mut();
394            if sse.state == SseState::Connected {
395                return Ok(());
396            }
397
398            let (tx, rx) = futures_channel::oneshot::channel();
399            sse.connect_waiters.push(tx);
400
401            if sse.state == SseState::Idle {
402                sse.state = SseState::Connecting;
403                drop(sse);
404                platform::start_event_loop(self.clone());
405            }
406
407            rx
408        };
409
410        rx.await.unwrap_or_else(|_| {
411            Err(ForgeClientError::new(
412                "SSE_CONNECTION_FAILED",
413                "Connection attempt cancelled",
414                None,
415            ))
416        })
417    }
418
419    /// Register a subscription with the server via POST.
420    /// If the server returns SESSION_NOT_FOUND, forces a reconnect and retries once.
421    async fn register_subscription(
422        &self,
423        sub_id: &str,
424    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
425        let envelope = self.try_register_subscription(sub_id).await?;
426
427        // Stale session: force reconnect and retry once
428        let needs_retry = !envelope.success
429            && envelope
430                .error
431                .as_ref()
432                .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
433
434        if needs_retry {
435            self.force_reconnect().await;
436            self.ensure_connected().await?;
437            return self.try_register_subscription(sub_id).await;
438        }
439
440        Ok(envelope)
441    }
442
443    async fn try_register_subscription(
444        &self,
445        sub_id: &str,
446    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
447        let (endpoint, payload) = {
448            let sse = self.inner.sse.borrow();
449            let meta = sse
450                .registrations
451                .get(sub_id)
452                .ok_or_else(|| {
453                    ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
454                })?;
455            let session_id = sse.session_id.clone().unwrap_or_default();
456            let session_secret = sse.session_secret.clone().unwrap_or_default();
457            let mut payload = meta.payload.clone();
458            let obj = payload
459                .as_object_mut()
460                .expect("registration payload must be an object");
461            obj.insert("session_id".into(), serde_json::Value::String(session_id));
462            obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
463            (meta.endpoint, payload)
464        };
465
466        let url = format!("{}{}", self.inner.url, endpoint);
467        platform::request_json(self, &url, payload, None).await
468    }
469
470    async fn force_reconnect(&self) {
471        let task = {
472            let mut sse = self.inner.sse.borrow_mut();
473            sse.session_id = None;
474            sse.session_secret = None;
475            sse.state = SseState::Idle;
476            // No need to drain waiters; force_reconnect is only called mid-registration
477            // where the caller already passed ensure_connected
478            sse.event_loop_task.take()
479        };
480        if let Some(task) = task {
481            task.cancel();
482        }
483        sleep(Duration::from_millis(10)).await;
484    }
485
486    async fn reregister_all(&self) {
487        let sub_ids: Vec<String> = {
488            let sse = self.inner.sse.borrow();
489            sse.registrations.keys().cloned().collect()
490        };
491
492        for sub_id in sub_ids {
493            let _ = self.register_subscription(&sub_id).await;
494        }
495    }
496
497    fn dispatch_event(&self, target: &str, event: SseDispatch) {
498        let tx = {
499            let sse = self.inner.sse.borrow();
500            sse.listeners.get(target).cloned()
501        };
502        if let Some(tx) = tx {
503            let _ = tx.unbounded_send(event);
504        }
505    }
506
507    fn broadcast_connection(&self, state: ConnectionState) {
508        if let Some(mut signal) = self.inner.connection_state {
509            signal.set(state);
510        }
511    }
512
513    fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
514        let mut sse = self.inner.sse.borrow_mut();
515        let is_reconnect = sse.ever_connected;
516        sse.session_id = Some(session_id);
517        sse.session_secret = Some(session_secret);
518        sse.state = SseState::Connected;
519        sse.reconnect_attempts = 0;
520        sse.ever_connected = true;
521        for waiter in sse.connect_waiters.drain(..) {
522            let _ = waiter.send(Ok(()));
523        }
524        is_reconnect
525    }
526
527    fn mark_disconnected(&self) {
528        let mut sse = self.inner.sse.borrow_mut();
529        sse.session_id = None;
530        sse.session_secret = None;
531        sse.state = SseState::Idle;
532        sse.event_loop_task = None;
533        let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
534        for waiter in sse.connect_waiters.drain(..) {
535            let _ = waiter.send(Err(err()));
536        }
537    }
538
539    fn should_reconnect(&self) -> Option<u32> {
540        let mut sse = self.inner.sse.borrow_mut();
541        if sse.listeners.is_empty() {
542            return None;
543        }
544        let attempts = sse.reconnect_attempts;
545        if attempts >= MAX_RECONNECT_ATTEMPTS {
546            return None;
547        }
548        sse.reconnect_attempts = attempts + 1;
549        Some(attempts)
550    }
551
552    fn get_token(&self) -> Option<String> {
553        self.inner
554            .get_token
555            .as_ref()
556            .and_then(|provider| provider())
557            .filter(|t| !t.is_empty())
558    }
559
560    fn decode_envelope<TResult>(
561        &self,
562        envelope: RpcEnvelopeRaw,
563    ) -> Result<TResult, ForgeClientError>
564    where
565        TResult: DeserializeOwned,
566    {
567        if !envelope.success {
568            let error = envelope.error.unwrap_or(ForgeError {
569                code: "UNKNOWN".to_string(),
570                message: "Unknown error".to_string(),
571                details: None,
572            });
573            return Err(ForgeClientError::new(error.code, error.message, error.details));
574        }
575
576        let data = envelope.data.ok_or_else(|| {
577            ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
578        })?;
579        serde_json::from_value(data)
580            .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
581    }
582
583    fn random_id(&self, prefix: &str) -> String {
584        let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
585        format!("{prefix}-{id}")
586    }
587}
588
589async fn sleep(duration: Duration) {
590    #[cfg(target_arch = "wasm32")]
591    {
592        gloo_timers::future::sleep(duration).await;
593    }
594
595    #[cfg(not(target_arch = "wasm32"))]
596    {
597        tokio::time::sleep(duration).await;
598    }
599}
600
/// Handle to a running subscription; clones share the same state.
#[derive(Clone)]
pub struct SubscriptionHandle {
    /// Set once the subscription has been finished or closed.
    closed: Rc<Cell<bool>>,
    /// The task driving the subscription, once it has been spawned.
    task: Rc<RefCell<Option<Task>>>,
    /// One-shot teardown that removes the subscription's client-side state.
    cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
}
607
608impl SubscriptionHandle {
609    fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
610        let cleanup: Box<dyn FnOnce()> = Box::new(move || {
611            let mut sse = client.inner.sse.borrow_mut();
612            sse.listeners.remove(&target);
613            sse.registrations.remove(&sub_id);
614            // Jobs/workflows have server-managed lifecycles; only query subs need explicit unsubscribe
615            if target.starts_with("sub:") {
616                let session_id = sse.session_id.clone();
617                let session_secret = sse.session_secret.clone();
618                drop(sse);
619                if let (Some(sid), Some(ss)) = (session_id, session_secret) {
620                    let url = format!("{}/_api/unsubscribe", client.inner.url);
621                    let payload = serde_json::json!({
622                        "session_id": sid,
623                        "session_secret": ss,
624                        "id": sub_id,
625                    });
626                    let client = client.clone();
627                    dioxus::prelude::spawn(async move {
628                        let _ = platform::request_json(&client, &url, payload, None).await;
629                    });
630                }
631            }
632        });
633
634        Self {
635            closed: Rc::new(Cell::new(false)),
636            task: Rc::new(RefCell::new(None)),
637            cleanup: Rc::new(RefCell::new(Some(cleanup))),
638        }
639    }
640
641    fn set_task(&self, task: Task) {
642        *self.task.borrow_mut() = Some(task);
643    }
644
645    pub(crate) fn finish(&self) {
646        if self.closed.replace(true) {
647            return;
648        }
649        if let Some(cleanup) = self.cleanup.borrow_mut().take() {
650            cleanup();
651        }
652        self.task.borrow_mut().take();
653    }
654
655    pub fn close(&self) {
656        let task = { self.task.borrow_mut().clone() };
657        self.finish();
658        if let Some(task) = task {
659            task.cancel();
660        }
661    }
662
663    pub fn is_closed(&self) -> bool {
664        self.closed.get()
665    }
666}
667
// NOTE(review): `SubscriptionHandle` derives `Clone` and all clones share their
// state through `Rc`, yet this runs for every clone — dropping any single clone
// closes the subscription for all of them. Confirm that this is intended.
impl Drop for SubscriptionHandle {
    /// Close the subscription when this handle goes out of scope.
    fn drop(&mut self) {
        self.close();
    }
}
673
674// ── WASM platform ───────────────────────────────────────────────────────────
675
676#[cfg(target_arch = "wasm32")]
677mod platform {
678    use dioxus::prelude::spawn;
679    use futures_util::{StreamExt, stream};
680    use gloo_net::eventsource::futures::EventSource;
681    use gloo_net::http::Request;
682    use js_sys::{JSON, encode_uri_component};
683
684    use super::{ForgeClient, SseDispatch, sleep};
685    use crate::signals::platform_tag;
686    use crate::types::{
687        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
688    };
689
690    pub(super) async fn request_json(
691        client: &ForgeClient,
692        url: &str,
693        body: serde_json::Value,
694        correlation_id: Option<&str>,
695    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
696        let mut request = Request::post(url)
697            .header("Content-Type", "application/json")
698            .header("x-forge-platform", platform_tag())
699            .credentials(web_sys::RequestCredentials::Include);
700        if let Some(token) = client.get_token() {
701            request = request.header("Authorization", &format!("Bearer {token}"));
702        }
703        if let Some(cid) = correlation_id {
704            request = request.header("x-correlation-id", cid);
705        }
706
707        let request = request.body(body.to_string()).map_err(request_error)?;
708        request
709            .send()
710            .await
711            .map_err(request_error)?
712            .json()
713            .await
714            .map_err(request_error)
715    }
716
717    pub(super) async fn request_multipart(
718        client: &ForgeClient,
719        url: &str,
720        form: web_sys::FormData,
721        correlation_id: Option<&str>,
722    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
723        let mut request = Request::post(url)
724            .header("x-forge-platform", platform_tag())
725            .credentials(web_sys::RequestCredentials::Include);
726        if let Some(token) = client.get_token() {
727            request = request.header("Authorization", &format!("Bearer {token}"));
728        }
729        if let Some(cid) = correlation_id {
730            request = request.header("x-correlation-id", cid);
731        }
732
733        let response = request.body(form).map_err(request_error)?;
734        response
735            .send()
736            .await
737            .map_err(request_error)?
738            .json()
739            .await
740            .map_err(request_error)
741    }
742
743    fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
744        let data = message.data();
745        data.as_string().or_else(|| {
746            JSON::stringify(&data)
747                .ok()
748                .and_then(|value| value.as_string())
749                .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
750        })
751    }
752
753    fn events_url(client: &ForgeClient) -> String {
754        match client.get_token() {
755            Some(token) => format!(
756                "{}/_api/events?token={}",
757                client.inner.url,
758                encode_uri_component(&token)
759            ),
760            None => format!("{}/_api/events", client.inner.url),
761        }
762    }
763
764    /// Start the single shared SSE event loop.
765    pub(super) fn start_event_loop(client: ForgeClient) {
766        let client_for_task = client.clone();
767        let task = spawn(async move {
768            run_event_loop(&client_for_task).await;
769
770            // Disconnected. Try to reconnect.
771            client_for_task.mark_disconnected();
772            client_for_task.broadcast_connection(ConnectionState::Disconnected);
773
774            if let Some(attempts) = client_for_task.should_reconnect() {
775                let delay = 1000 * (1u64 << attempts.min(4));
776                let jitter = (js_sys::Math::random() * 500.0) as u64;
777                sleep(std::time::Duration::from_millis(delay + jitter)).await;
778
779                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
780                start_event_loop(client_for_task);
781            }
782        });
783
784        client.inner.sse.borrow_mut().event_loop_task = Some(task);
785    }
786
787    async fn run_event_loop(client: &ForgeClient) {
788        let mut event_source = match EventSource::new(&events_url(client)) {
789            Ok(source) => source,
790            Err(_) => {
791                return;
792            }
793        };
794
795        let mut connected_stream = match event_source.subscribe("connected") {
796            Ok(stream) => stream,
797            Err(_) => return,
798        };
799        let update_stream = match event_source.subscribe("update") {
800            Ok(stream) => stream,
801            Err(_) => return,
802        };
803        let error_stream = match event_source.subscribe("error") {
804            Ok(stream) => stream,
805            Err(_) => return,
806        };
807
808        // Wait for the connected event
809        let connected_event = match connected_stream.next().await {
810            Some(Ok((_kind, message))) => {
811                let Some(raw) = message_data_as_string(&message) else {
812                    return;
813                };
814                match serde_json::from_str::<ConnectedEvent>(&raw) {
815                    Ok(event) => event,
816                    Err(_) => return,
817                }
818            }
819            _ => return,
820        };
821
822        let session_id = connected_event.session_id.unwrap_or_default();
823        let session_secret = connected_event.session_secret.unwrap_or_default();
824
825        if session_id.is_empty() || session_secret.is_empty() {
826            return;
827        }
828
829        let is_reconnect = client.mark_connected(session_id, session_secret);
830        client.broadcast_connection(ConnectionState::Connected);
831
832        // Only re-register on reconnect; initial registrations are handled by each subscription task
833        if is_reconnect {
834            client.reregister_all().await;
835        }
836
837        let mut events = stream::select(update_stream, error_stream);
838        while let Some(event) = events.next().await {
839            match event {
840                Ok((kind, message)) => {
841                    let Some(raw) = message_data_as_string(&message) else {
842                        continue;
843                    };
844                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
845                        continue;
846                    };
847
848                    let Some(target) = envelope.target else {
849                        continue;
850                    };
851
852                    if kind == "update" {
853                        if let Some(payload) = envelope.payload {
854                            client.dispatch_event(&target, SseDispatch::Data(payload));
855                        }
856                    } else {
857                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
858                        let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
859                        client.dispatch_event(&target, SseDispatch::Error { code, message });
860                    }
861                }
862                Err(_) => break,
863            }
864        }
865
866        event_source.close();
867    }
868
869    fn request_error(err: gloo_net::Error) -> ForgeClientError {
870        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
871    }
872}
873
874// ── Native platform ─────────────────────────────────────────────────────────
875
876#[cfg(not(target_arch = "wasm32"))]
877mod platform {
878    use dioxus::prelude::spawn;
879    use futures_util::StreamExt;
880    use reqwest::Client;
881    use reqwest_eventsource::{Event, EventSource};
882
883    use super::{ForgeClient, SseDispatch, sleep};
884    use crate::signals::platform_tag;
885    use crate::types::{
886        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
887    };
888
889    pub(super) async fn request_json(
890        client: &ForgeClient,
891        url: &str,
892        body: serde_json::Value,
893        correlation_id: Option<&str>,
894    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
895        let mut request = Client::new()
896            .post(url)
897            .header("x-forge-platform", platform_tag())
898            .json(&body);
899        if let Some(token) = client.get_token() {
900            request = request.bearer_auth(token);
901        }
902        if let Some(cid) = correlation_id {
903            request = request.header("x-correlation-id", cid);
904        }
905
906        request
907            .send()
908            .await
909            .map_err(request_error)?
910            .json()
911            .await
912            .map_err(request_error)
913    }
914
915    pub(super) async fn request_multipart(
916        client: &ForgeClient,
917        url: &str,
918        form: reqwest::multipart::Form,
919        correlation_id: Option<&str>,
920    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
921        let mut request = Client::new()
922            .post(url)
923            .header("x-forge-platform", platform_tag())
924            .multipart(form);
925        if let Some(token) = client.get_token() {
926            request = request.bearer_auth(token);
927        }
928        if let Some(cid) = correlation_id {
929            request = request.header("x-correlation-id", cid);
930        }
931
932        request
933            .send()
934            .await
935            .map_err(request_error)?
936            .json()
937            .await
938            .map_err(request_error)
939    }
940
941    /// Start the single shared SSE event loop.
942    pub(super) fn start_event_loop(client: ForgeClient) {
943        let client_for_task = client.clone();
944        let task = spawn(async move {
945            run_event_loop(&client_for_task).await;
946
947            client_for_task.mark_disconnected();
948            client_for_task.broadcast_connection(ConnectionState::Disconnected);
949
950            if let Some(attempts) = client_for_task.should_reconnect() {
951                let delay = 1000 * (1u64 << attempts.min(4));
952                sleep(std::time::Duration::from_millis(delay)).await;
953
954                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
955                start_event_loop(client_for_task);
956            }
957        });
958
959        client.inner.sse.borrow_mut().event_loop_task = Some(task);
960    }
961
962    async fn run_event_loop(client: &ForgeClient) {
963        let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
964        if let Some(token) = client.get_token() {
965            request = request.bearer_auth(token);
966        }
967
968        let mut event_source = match EventSource::new(request) {
969            Ok(source) => source,
970            Err(_) => return,
971        };
972
973        // Wait for connected event
974        let connected_event = loop {
975            let Some(event) = event_source.next().await else {
976                return;
977            };
978            match event {
979                Ok(Event::Open) => continue,
980                Ok(Event::Message(msg)) if msg.event == "connected" => {
981                    match serde_json::from_str::<ConnectedEvent>(&msg.data) {
982                        Ok(event) => break event,
983                        Err(_) => return,
984                    }
985                }
986                Ok(Event::Message(_)) => continue,
987                Err(_) => return,
988            }
989        };
990
991        let session_id = connected_event.session_id.unwrap_or_default();
992        let session_secret = connected_event.session_secret.unwrap_or_default();
993
994        if session_id.is_empty() || session_secret.is_empty() {
995            return;
996        }
997
998        let is_reconnect = client.mark_connected(session_id, session_secret);
999        client.broadcast_connection(ConnectionState::Connected);
1000
1001        if is_reconnect {
1002            client.reregister_all().await;
1003        }
1004
1005        while let Some(event) = event_source.next().await {
1006            match event {
1007                Ok(Event::Open) => {}
1008                Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
1009                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
1010                        continue;
1011                    };
1012                    let Some(target) = envelope.target else {
1013                        continue;
1014                    };
1015
1016                    if msg.event == "update" {
1017                        if let Some(payload) = envelope.payload {
1018                            client.dispatch_event(&target, SseDispatch::Data(payload));
1019                        }
1020                    } else {
1021                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
1022                        let message =
1023                            envelope.message.unwrap_or_else(|| "Subscription error".to_string());
1024                        client.dispatch_event(&target, SseDispatch::Error { code, message });
1025                    }
1026                }
1027                Ok(Event::Message(_)) => {}
1028                Err(_) => break,
1029            }
1030        }
1031
1032        event_source.close();
1033    }
1034
1035    fn request_error(err: reqwest::Error) -> ForgeClientError {
1036        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1037    }
1038}