Skip to main content

forge_dioxus/
client.rs

1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::types::{
13    ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
14};
15
16type TokenProvider = Rc<dyn Fn() -> Option<String>>;
17type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
18type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
19type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;
20
21static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
22
23enum SseDispatch {
24    Data(serde_json::Value),
25    Error { code: String, message: String },
26}
27
28struct RegistrationMeta {
29    endpoint: &'static str,
30    payload: serde_json::Value,
31}
32
33#[derive(Default, Clone, Copy, PartialEq, Eq)]
34enum SseState {
35    #[default]
36    Idle,
37    Connecting,
38    Connected,
39}
40
41/// Shared SSE connection state, one per ForgeClient.
42#[derive(Default)]
43struct SseManager {
44    session_id: Option<String>,
45    session_secret: Option<String>,
46    state: SseState,
47    ever_connected: bool,
48    listeners: HashMap<String, EventSender>,
49    registrations: HashMap<String, RegistrationMeta>,
50    event_loop_task: Option<Task>,
51    reconnect_attempts: u32,
52    connect_waiters: Vec<ConnectWaiter>,
53}
54
55const MAX_RECONNECT_ATTEMPTS: u32 = 10;
56
57#[derive(Clone)]
58pub struct ForgeClientConfig {
59    pub url: String,
60    pub get_token: Option<TokenProvider>,
61    pub on_auth_error: Option<AuthErrorHandler>,
62    pub(crate) connection_state: Option<Signal<ConnectionState>>,
63}
64
65impl ForgeClientConfig {
66    pub fn new(url: impl Into<String>) -> Self {
67        Self {
68            url: url.into(),
69            get_token: None,
70            on_auth_error: None,
71            connection_state: None,
72        }
73    }
74
75    pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
76        self.get_token = Some(Rc::new(provider));
77        self
78    }
79
80    pub fn with_auth_error_handler(
81        mut self,
82        handler: impl Fn(ForgeError) + 'static,
83    ) -> Self {
84        self.on_auth_error = Some(Rc::new(handler));
85        self
86    }
87
88    pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
89        self.connection_state = Some(state);
90        self
91    }
92}
93
94#[derive(Clone)]
95pub struct ForgeClient {
96    inner: Rc<ForgeClientInner>,
97}
98
99struct ForgeClientInner {
100    url: String,
101    get_token: Option<TokenProvider>,
102    on_auth_error: Option<AuthErrorHandler>,
103    connection_state: Option<Signal<ConnectionState>>,
104    sse: RefCell<SseManager>,
105}
106
107impl ForgeClient {
108    pub fn new(config: ForgeClientConfig) -> Self {
109        Self {
110            inner: Rc::new(ForgeClientInner {
111                url: config.url.trim_end_matches('/').to_string(),
112                get_token: config.get_token,
113                on_auth_error: config.on_auth_error,
114                connection_state: config.connection_state,
115                sse: RefCell::new(SseManager::default()),
116            }),
117        }
118    }
119
120    pub async fn call<TArgs, TResult>(
121        &self,
122        function_name: &str,
123        args: TArgs,
124    ) -> Result<TResult, ForgeClientError>
125    where
126        TArgs: Serialize,
127        TResult: DeserializeOwned,
128    {
129        let body = serde_json::json!({ "args": args });
130        let envelope = platform::request_json(
131            self,
132            &format!("{}/_api/rpc/{}", self.inner.url, function_name),
133            body,
134        )
135        .await?;
136        self.decode_envelope(envelope)
137    }
138
139    #[cfg(target_arch = "wasm32")]
140    pub async fn call_multipart<TResult>(
141        &self,
142        function_name: &str,
143        form: web_sys::FormData,
144    ) -> Result<TResult, ForgeClientError>
145    where
146        TResult: DeserializeOwned,
147    {
148        let envelope = platform::request_multipart(
149            self,
150            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
151            form,
152        )
153        .await?;
154        self.decode_envelope(envelope)
155    }
156
157    #[cfg(not(target_arch = "wasm32"))]
158    pub async fn call_multipart<TResult>(
159        &self,
160        function_name: &str,
161        form: reqwest::multipart::Form,
162    ) -> Result<TResult, ForgeClientError>
163    where
164        TResult: DeserializeOwned,
165    {
166        let envelope = platform::request_multipart(
167            self,
168            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
169            form,
170        )
171        .await?;
172        self.decode_envelope(envelope)
173    }
174
175    pub fn subscribe_query<TArgs, TResult, F>(
176        &self,
177        function_name: &str,
178        args: TArgs,
179        callback: F,
180    ) -> SubscriptionHandle
181    where
182        TArgs: Serialize + Clone + 'static,
183        TResult: DeserializeOwned + Clone + 'static,
184        F: FnMut(StreamEvent<TResult>) + 'static,
185    {
186        let sub_id = self.random_id("sub");
187        let target = format!("sub:{sub_id}");
188
189        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
190        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
191
192        let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
193        let reg_payload = serde_json::json!({
194            "id": sub_id,
195            "function": function_name,
196            "args": args_value,
197        });
198        self.inner.sse.borrow_mut().registrations.insert(
199            sub_id.clone(),
200            RegistrationMeta {
201                endpoint: "/_api/subscribe",
202                payload: reg_payload,
203            },
204        );
205
206        self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
207            match client.decode_envelope::<TResult>(envelope) {
208                Ok(data) => cb(StreamEvent::Data(data)),
209                Err(err) => cb(StreamEvent::Error(err)),
210            }
211        })
212    }
213
214    pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
215    where
216        TResult: DeserializeOwned + Clone + 'static,
217        F: FnMut(StreamEvent<TResult>) + 'static,
218    {
219        self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
220    }
221
222    pub fn subscribe_workflow<TResult, F>(
223        &self,
224        workflow_id: String,
225        callback: F,
226    ) -> SubscriptionHandle
227    where
228        TResult: DeserializeOwned + Clone + 'static,
229        F: FnMut(StreamEvent<TResult>) + 'static,
230    {
231        self.subscribe_tracker(
232            "wf",
233            serde_json::json!({ "workflow_id": workflow_id }),
234            "/_api/subscribe-workflow",
235            callback,
236        )
237    }
238
239    fn subscribe_tracker<TResult, F>(
240        &self,
241        prefix: &str,
242        payload: serde_json::Value,
243        endpoint: &'static str,
244        callback: F,
245    ) -> SubscriptionHandle
246    where
247        TResult: DeserializeOwned + Clone + 'static,
248        F: FnMut(StreamEvent<TResult>) + 'static,
249    {
250        let sub_id = self.random_id(prefix);
251        let target = format!("{prefix}:{sub_id}");
252
253        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
254        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
255
256        let mut reg_payload = payload;
257        reg_payload
258            .as_object_mut()
259            .expect("tracker payload must be an object")
260            .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
261        self.inner.sse.borrow_mut().registrations.insert(
262            sub_id.clone(),
263            RegistrationMeta {
264                endpoint,
265                payload: reg_payload,
266            },
267        );
268
269        self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
270            if envelope.success {
271                if let Some(data) = envelope.data {
272                    match serde_json::from_value::<TResult>(data) {
273                        Ok(parsed) => cb(StreamEvent::Data(parsed)),
274                        Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
275                            "DESERIALIZATION_ERROR",
276                            e.to_string(),
277                            None,
278                        ))),
279                    }
280                }
281            }
282        })
283    }
284
285    fn spawn_subscription<TResult, F>(
286        &self,
287        sub_id: String,
288        target: String,
289        mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
290        mut callback: F,
291        on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
292    ) -> SubscriptionHandle
293    where
294        TResult: DeserializeOwned + Clone + 'static,
295        F: FnMut(StreamEvent<TResult>) + 'static,
296    {
297        let client = self.clone();
298        let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
299        let handle_task = handle.clone();
300
301        let task = dioxus::prelude::spawn(async move {
302            callback(StreamEvent::Connection(ConnectionState::Connecting));
303
304            if let Err(e) = client.ensure_connected().await {
305                callback(StreamEvent::Error(e));
306                callback(StreamEvent::Connection(ConnectionState::Disconnected));
307                handle_task.finish();
308                return;
309            }
310
311            match client.register_subscription(&sub_id).await {
312                Ok(envelope) => {
313                    callback(StreamEvent::Connection(ConnectionState::Connected));
314                    on_initial(&client, envelope, &mut callback);
315                }
316                Err(err) => {
317                    callback(StreamEvent::Error(err));
318                    callback(StreamEvent::Connection(ConnectionState::Disconnected));
319                    handle_task.finish();
320                    return;
321                }
322            }
323
324            while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
325                Self::deliver_event::<TResult, F>(&mut callback, &client, event);
326            }
327
328            handle_task.finish();
329        });
330
331        handle.set_task(task);
332        handle
333    }
334
335    fn deliver_event<TResult, F>(
336        callback: &mut F,
337        client: &ForgeClient,
338        event: SseDispatch,
339    ) where
340        TResult: DeserializeOwned,
341        F: FnMut(StreamEvent<TResult>),
342    {
343        match event {
344            SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
345                Ok(data) => callback(StreamEvent::Data(data)),
346                Err(e) => {
347                    callback(StreamEvent::Error(ForgeClientError::new(
348                        "DESERIALIZATION_ERROR",
349                        e.to_string(),
350                        None,
351                    )));
352                }
353            },
354            SseDispatch::Error { code, message } => {
355                let err = ForgeClientError::new(&code, &message, None);
356                if code == "UNAUTHORIZED" {
357                    if let Some(handler) = &client.inner.on_auth_error {
358                        handler(err.as_forge_error());
359                    }
360                }
361                callback(StreamEvent::Error(err));
362            }
363        }
364    }
365
366    /// Ensure the shared SSE connection is established. Spawns the event loop on first call.
367    async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
368        let rx = {
369            let mut sse = self.inner.sse.borrow_mut();
370            if sse.state == SseState::Connected {
371                return Ok(());
372            }
373
374            let (tx, rx) = futures_channel::oneshot::channel();
375            sse.connect_waiters.push(tx);
376
377            if sse.state == SseState::Idle {
378                sse.state = SseState::Connecting;
379                drop(sse);
380                platform::start_event_loop(self.clone());
381            }
382
383            rx
384        };
385
386        rx.await.unwrap_or_else(|_| {
387            Err(ForgeClientError::new(
388                "SSE_CONNECTION_FAILED",
389                "Connection attempt cancelled",
390                None,
391            ))
392        })
393    }
394
395    /// Register a subscription with the server via POST.
396    /// If the server returns SESSION_NOT_FOUND, forces a reconnect and retries once.
397    async fn register_subscription(
398        &self,
399        sub_id: &str,
400    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
401        let envelope = self.try_register_subscription(sub_id).await?;
402
403        // Stale session: force reconnect and retry once
404        let needs_retry = !envelope.success
405            && envelope
406                .error
407                .as_ref()
408                .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
409
410        if needs_retry {
411            self.force_reconnect().await;
412            self.ensure_connected().await?;
413            return self.try_register_subscription(sub_id).await;
414        }
415
416        Ok(envelope)
417    }
418
419    async fn try_register_subscription(
420        &self,
421        sub_id: &str,
422    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
423        let (endpoint, payload) = {
424            let sse = self.inner.sse.borrow();
425            let meta = sse
426                .registrations
427                .get(sub_id)
428                .ok_or_else(|| {
429                    ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
430                })?;
431            let session_id = sse.session_id.clone().unwrap_or_default();
432            let session_secret = sse.session_secret.clone().unwrap_or_default();
433            let mut payload = meta.payload.clone();
434            let obj = payload
435                .as_object_mut()
436                .expect("registration payload must be an object");
437            obj.insert("session_id".into(), serde_json::Value::String(session_id));
438            obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
439            (meta.endpoint, payload)
440        };
441
442        let url = format!("{}{}", self.inner.url, endpoint);
443        platform::request_json(self, &url, payload).await
444    }
445
446    async fn force_reconnect(&self) {
447        let task = {
448            let mut sse = self.inner.sse.borrow_mut();
449            sse.session_id = None;
450            sse.session_secret = None;
451            sse.state = SseState::Idle;
452            // No need to drain waiters; force_reconnect is only called mid-registration
453            // where the caller already passed ensure_connected
454            sse.event_loop_task.take()
455        };
456        if let Some(task) = task {
457            task.cancel();
458        }
459        sleep(Duration::from_millis(10)).await;
460    }
461
462    async fn reregister_all(&self) {
463        let sub_ids: Vec<String> = {
464            let sse = self.inner.sse.borrow();
465            sse.registrations.keys().cloned().collect()
466        };
467
468        for sub_id in sub_ids {
469            let _ = self.register_subscription(&sub_id).await;
470        }
471    }
472
473    fn dispatch_event(&self, target: &str, event: SseDispatch) {
474        let tx = {
475            let sse = self.inner.sse.borrow();
476            sse.listeners.get(target).cloned()
477        };
478        if let Some(tx) = tx {
479            let _ = tx.unbounded_send(event);
480        }
481    }
482
483    fn broadcast_connection(&self, state: ConnectionState) {
484        if let Some(mut signal) = self.inner.connection_state {
485            signal.set(state);
486        }
487    }
488
489    fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
490        let mut sse = self.inner.sse.borrow_mut();
491        let is_reconnect = sse.ever_connected;
492        sse.session_id = Some(session_id);
493        sse.session_secret = Some(session_secret);
494        sse.state = SseState::Connected;
495        sse.reconnect_attempts = 0;
496        sse.ever_connected = true;
497        for waiter in sse.connect_waiters.drain(..) {
498            let _ = waiter.send(Ok(()));
499        }
500        is_reconnect
501    }
502
503    fn mark_disconnected(&self) {
504        let mut sse = self.inner.sse.borrow_mut();
505        sse.session_id = None;
506        sse.session_secret = None;
507        sse.state = SseState::Idle;
508        sse.event_loop_task = None;
509        let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
510        for waiter in sse.connect_waiters.drain(..) {
511            let _ = waiter.send(Err(err()));
512        }
513    }
514
515    fn should_reconnect(&self) -> Option<u32> {
516        let mut sse = self.inner.sse.borrow_mut();
517        if sse.listeners.is_empty() {
518            return None;
519        }
520        let attempts = sse.reconnect_attempts;
521        if attempts >= MAX_RECONNECT_ATTEMPTS {
522            return None;
523        }
524        sse.reconnect_attempts = attempts + 1;
525        Some(attempts)
526    }
527
528    fn get_token(&self) -> Option<String> {
529        self.inner
530            .get_token
531            .as_ref()
532            .and_then(|provider| provider())
533            .filter(|t| !t.is_empty())
534    }
535
536    fn decode_envelope<TResult>(
537        &self,
538        envelope: RpcEnvelopeRaw,
539    ) -> Result<TResult, ForgeClientError>
540    where
541        TResult: DeserializeOwned,
542    {
543        if !envelope.success {
544            let error = envelope.error.unwrap_or(ForgeError {
545                code: "UNKNOWN".to_string(),
546                message: "Unknown error".to_string(),
547                details: None,
548            });
549            return Err(ForgeClientError::new(error.code, error.message, error.details));
550        }
551
552        let data = envelope.data.ok_or_else(|| {
553            ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
554        })?;
555        serde_json::from_value(data)
556            .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
557    }
558
559    fn random_id(&self, prefix: &str) -> String {
560        let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
561        format!("{prefix}-{id}")
562    }
563}
564
565async fn sleep(duration: Duration) {
566    #[cfg(target_arch = "wasm32")]
567    {
568        gloo_timers::future::sleep(duration).await;
569    }
570
571    #[cfg(not(target_arch = "wasm32"))]
572    {
573        tokio::time::sleep(duration).await;
574    }
575}
576
577#[derive(Clone)]
578pub struct SubscriptionHandle {
579    closed: Rc<Cell<bool>>,
580    task: Rc<RefCell<Option<Task>>>,
581    cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
582}
583
584impl SubscriptionHandle {
585    fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
586        let cleanup: Box<dyn FnOnce()> = Box::new(move || {
587            let mut sse = client.inner.sse.borrow_mut();
588            sse.listeners.remove(&target);
589            sse.registrations.remove(&sub_id);
590            // Jobs/workflows have server-managed lifecycles; only query subs need explicit unsubscribe
591            if target.starts_with("sub:") {
592                let session_id = sse.session_id.clone();
593                let session_secret = sse.session_secret.clone();
594                drop(sse);
595                if let (Some(sid), Some(ss)) = (session_id, session_secret) {
596                    let url = format!("{}/_api/unsubscribe", client.inner.url);
597                    let payload = serde_json::json!({
598                        "session_id": sid,
599                        "session_secret": ss,
600                        "id": sub_id,
601                    });
602                    let client = client.clone();
603                    dioxus::prelude::spawn(async move {
604                        let _ = platform::request_json(&client, &url, payload).await;
605                    });
606                }
607            }
608        });
609
610        Self {
611            closed: Rc::new(Cell::new(false)),
612            task: Rc::new(RefCell::new(None)),
613            cleanup: Rc::new(RefCell::new(Some(cleanup))),
614        }
615    }
616
617    fn set_task(&self, task: Task) {
618        *self.task.borrow_mut() = Some(task);
619    }
620
621    pub(crate) fn finish(&self) {
622        if self.closed.replace(true) {
623            return;
624        }
625        if let Some(cleanup) = self.cleanup.borrow_mut().take() {
626            cleanup();
627        }
628        self.task.borrow_mut().take();
629    }
630
631    pub fn close(&self) {
632        let task = { self.task.borrow_mut().clone() };
633        self.finish();
634        if let Some(task) = task {
635            task.cancel();
636        }
637    }
638
639    pub fn is_closed(&self) -> bool {
640        self.closed.get()
641    }
642}
643
644impl Drop for SubscriptionHandle {
645    fn drop(&mut self) {
646        self.close();
647    }
648}
649
650// ── WASM platform ───────────────────────────────────────────────────────────
651
652#[cfg(target_arch = "wasm32")]
653mod platform {
654    use dioxus::prelude::spawn;
655    use futures_util::{StreamExt, stream};
656    use gloo_net::eventsource::futures::EventSource;
657    use gloo_net::http::Request;
658    use js_sys::{JSON, encode_uri_component};
659
660    use super::{ForgeClient, SseDispatch, sleep};
661    use crate::types::{
662        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
663    };
664
665    pub(super) async fn request_json(
666        client: &ForgeClient,
667        url: &str,
668        body: serde_json::Value,
669    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
670        let mut request = Request::post(url).header("Content-Type", "application/json");
671        if let Some(token) = client.get_token() {
672            request = request.header("Authorization", &format!("Bearer {token}"));
673        }
674
675        let request = request.body(body.to_string()).map_err(request_error)?;
676        request
677            .send()
678            .await
679            .map_err(request_error)?
680            .json()
681            .await
682            .map_err(request_error)
683    }
684
685    pub(super) async fn request_multipart(
686        client: &ForgeClient,
687        url: &str,
688        form: web_sys::FormData,
689    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
690        let mut request = Request::post(url);
691        if let Some(token) = client.get_token() {
692            request = request.header("Authorization", &format!("Bearer {token}"));
693        }
694
695        let response = request.body(form).map_err(request_error)?;
696        response
697            .send()
698            .await
699            .map_err(request_error)?
700            .json()
701            .await
702            .map_err(request_error)
703    }
704
705    fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
706        let data = message.data();
707        data.as_string().or_else(|| {
708            JSON::stringify(&data)
709                .ok()
710                .and_then(|value| value.as_string())
711                .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
712        })
713    }
714
715    fn events_url(client: &ForgeClient) -> String {
716        match client.get_token() {
717            Some(token) => format!(
718                "{}/_api/events?token={}",
719                client.inner.url,
720                encode_uri_component(&token)
721            ),
722            None => format!("{}/_api/events", client.inner.url),
723        }
724    }
725
726    /// Start the single shared SSE event loop.
727    pub(super) fn start_event_loop(client: ForgeClient) {
728        let client_for_task = client.clone();
729        let task = spawn(async move {
730            run_event_loop(&client_for_task).await;
731
732            // Disconnected. Try to reconnect.
733            client_for_task.mark_disconnected();
734            client_for_task.broadcast_connection(ConnectionState::Disconnected);
735
736            if let Some(attempts) = client_for_task.should_reconnect() {
737                let delay = 1000 * (1u64 << attempts.min(4));
738                let jitter = (js_sys::Math::random() * 500.0) as u64;
739                sleep(std::time::Duration::from_millis(delay + jitter)).await;
740
741                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
742                start_event_loop(client_for_task);
743            }
744        });
745
746        client.inner.sse.borrow_mut().event_loop_task = Some(task);
747    }
748
749    async fn run_event_loop(client: &ForgeClient) {
750        let mut event_source = match EventSource::new(&events_url(client)) {
751            Ok(source) => source,
752            Err(_) => {
753                return;
754            }
755        };
756
757        let mut connected_stream = match event_source.subscribe("connected") {
758            Ok(stream) => stream,
759            Err(_) => return,
760        };
761        let update_stream = match event_source.subscribe("update") {
762            Ok(stream) => stream,
763            Err(_) => return,
764        };
765        let error_stream = match event_source.subscribe("error") {
766            Ok(stream) => stream,
767            Err(_) => return,
768        };
769
770        // Wait for the connected event
771        let connected_event = match connected_stream.next().await {
772            Some(Ok((_kind, message))) => {
773                let Some(raw) = message_data_as_string(&message) else {
774                    return;
775                };
776                match serde_json::from_str::<ConnectedEvent>(&raw) {
777                    Ok(event) => event,
778                    Err(_) => return,
779                }
780            }
781            _ => return,
782        };
783
784        let session_id = connected_event.session_id.unwrap_or_default();
785        let session_secret = connected_event.session_secret.unwrap_or_default();
786
787        if session_id.is_empty() || session_secret.is_empty() {
788            return;
789        }
790
791        let is_reconnect = client.mark_connected(session_id, session_secret);
792        client.broadcast_connection(ConnectionState::Connected);
793
794        // Only re-register on reconnect; initial registrations are handled by each subscription task
795        if is_reconnect {
796            client.reregister_all().await;
797        }
798
799        let mut events = stream::select(update_stream, error_stream);
800        while let Some(event) = events.next().await {
801            match event {
802                Ok((kind, message)) => {
803                    let Some(raw) = message_data_as_string(&message) else {
804                        continue;
805                    };
806                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
807                        continue;
808                    };
809
810                    let Some(target) = envelope.target else {
811                        continue;
812                    };
813
814                    if kind == "update" {
815                        if let Some(payload) = envelope.payload {
816                            client.dispatch_event(&target, SseDispatch::Data(payload));
817                        }
818                    } else {
819                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
820                        let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
821                        client.dispatch_event(&target, SseDispatch::Error { code, message });
822                    }
823                }
824                Err(_) => break,
825            }
826        }
827
828        event_source.close();
829    }
830
831    fn request_error(err: gloo_net::Error) -> ForgeClientError {
832        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
833    }
834}
835
836// ── Native platform ─────────────────────────────────────────────────────────
837
838#[cfg(not(target_arch = "wasm32"))]
839mod platform {
840    use dioxus::prelude::spawn;
841    use futures_util::StreamExt;
842    use reqwest::Client;
843    use reqwest_eventsource::{Event, EventSource};
844
845    use super::{ForgeClient, SseDispatch, sleep};
846    use crate::types::{
847        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
848    };
849
850    pub(super) async fn request_json(
851        client: &ForgeClient,
852        url: &str,
853        body: serde_json::Value,
854    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
855        let mut request = Client::new().post(url).json(&body);
856        if let Some(token) = client.get_token() {
857            request = request.bearer_auth(token);
858        }
859
860        request
861            .send()
862            .await
863            .map_err(request_error)?
864            .json()
865            .await
866            .map_err(request_error)
867    }
868
869    pub(super) async fn request_multipart(
870        client: &ForgeClient,
871        url: &str,
872        form: reqwest::multipart::Form,
873    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
874        let mut request = Client::new().post(url).multipart(form);
875        if let Some(token) = client.get_token() {
876            request = request.bearer_auth(token);
877        }
878
879        request
880            .send()
881            .await
882            .map_err(request_error)?
883            .json()
884            .await
885            .map_err(request_error)
886    }
887
888    /// Start the single shared SSE event loop.
889    pub(super) fn start_event_loop(client: ForgeClient) {
890        let client_for_task = client.clone();
891        let task = spawn(async move {
892            run_event_loop(&client_for_task).await;
893
894            client_for_task.mark_disconnected();
895            client_for_task.broadcast_connection(ConnectionState::Disconnected);
896
897            if let Some(attempts) = client_for_task.should_reconnect() {
898                let delay = 1000 * (1u64 << attempts.min(4));
899                sleep(std::time::Duration::from_millis(delay)).await;
900
901                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
902                start_event_loop(client_for_task);
903            }
904        });
905
906        client.inner.sse.borrow_mut().event_loop_task = Some(task);
907    }
908
909    async fn run_event_loop(client: &ForgeClient) {
910        let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
911        if let Some(token) = client.get_token() {
912            request = request.bearer_auth(token);
913        }
914
915        let mut event_source = match EventSource::new(request) {
916            Ok(source) => source,
917            Err(_) => return,
918        };
919
920        // Wait for connected event
921        let connected_event = loop {
922            let Some(event) = event_source.next().await else {
923                return;
924            };
925            match event {
926                Ok(Event::Open) => continue,
927                Ok(Event::Message(msg)) if msg.event == "connected" => {
928                    match serde_json::from_str::<ConnectedEvent>(&msg.data) {
929                        Ok(event) => break event,
930                        Err(_) => return,
931                    }
932                }
933                Ok(Event::Message(_)) => continue,
934                Err(_) => return,
935            }
936        };
937
938        let session_id = connected_event.session_id.unwrap_or_default();
939        let session_secret = connected_event.session_secret.unwrap_or_default();
940
941        if session_id.is_empty() || session_secret.is_empty() {
942            return;
943        }
944
945        let is_reconnect = client.mark_connected(session_id, session_secret);
946        client.broadcast_connection(ConnectionState::Connected);
947
948        if is_reconnect {
949            client.reregister_all().await;
950        }
951
952        while let Some(event) = event_source.next().await {
953            match event {
954                Ok(Event::Open) => {}
955                Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
956                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
957                        continue;
958                    };
959                    let Some(target) = envelope.target else {
960                        continue;
961                    };
962
963                    if msg.event == "update" {
964                        if let Some(payload) = envelope.payload {
965                            client.dispatch_event(&target, SseDispatch::Data(payload));
966                        }
967                    } else {
968                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
969                        let message =
970                            envelope.message.unwrap_or_else(|| "Subscription error".to_string());
971                        client.dispatch_event(&target, SseDispatch::Error { code, message });
972                    }
973                }
974                Ok(Event::Message(_)) => {}
975                Err(_) => break,
976            }
977        }
978
979        event_source.close();
980    }
981
982    fn request_error(err: reqwest::Error) -> ForgeClientError {
983        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
984    }
985}