Skip to main content

forge_dioxus/
client.rs

1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::types::{
13    ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
14};
15
16type TokenProvider = Rc<dyn Fn() -> Option<String>>;
17type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
18type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
19type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;
20
21static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
22
23enum SseDispatch {
24    Data(serde_json::Value),
25    Error { code: String, message: String },
26}
27
28struct RegistrationMeta {
29    endpoint: &'static str,
30    payload: serde_json::Value,
31}
32
33#[derive(Default, Clone, Copy, PartialEq, Eq)]
34enum SseState {
35    #[default]
36    Idle,
37    Connecting,
38    Connected,
39}
40
41/// Shared SSE connection state, one per ForgeClient.
42#[derive(Default)]
43struct SseManager {
44    session_id: Option<String>,
45    session_secret: Option<String>,
46    state: SseState,
47    ever_connected: bool,
48    listeners: HashMap<String, EventSender>,
49    registrations: HashMap<String, RegistrationMeta>,
50    event_loop_task: Option<Task>,
51    reconnect_attempts: u32,
52    connect_waiters: Vec<ConnectWaiter>,
53}
54
55const MAX_RECONNECT_ATTEMPTS: u32 = 10;
56
57#[derive(Clone)]
58pub struct ForgeClientConfig {
59    pub url: String,
60    pub get_token: Option<TokenProvider>,
61    pub on_auth_error: Option<AuthErrorHandler>,
62    pub(crate) connection_state: Option<Signal<ConnectionState>>,
63}
64
65impl ForgeClientConfig {
66    pub fn new(url: impl Into<String>) -> Self {
67        Self {
68            url: url.into(),
69            get_token: None,
70            on_auth_error: None,
71            connection_state: None,
72        }
73    }
74
75    pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
76        self.get_token = Some(Rc::new(provider));
77        self
78    }
79
80    pub fn with_auth_error_handler(
81        mut self,
82        handler: impl Fn(ForgeError) + 'static,
83    ) -> Self {
84        self.on_auth_error = Some(Rc::new(handler));
85        self
86    }
87
88    pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
89        self.connection_state = Some(state);
90        self
91    }
92}
93
94#[derive(Clone)]
95pub struct ForgeClient {
96    inner: Rc<ForgeClientInner>,
97}
98
99struct ForgeClientInner {
100    url: String,
101    get_token: Option<TokenProvider>,
102    on_auth_error: Option<AuthErrorHandler>,
103    connection_state: Option<Signal<ConnectionState>>,
104    sse: RefCell<SseManager>,
105}
106
107impl ForgeClient {
108    pub fn new(config: ForgeClientConfig) -> Self {
109        Self {
110            inner: Rc::new(ForgeClientInner {
111                url: config.url.trim_end_matches('/').to_string(),
112                get_token: config.get_token,
113                on_auth_error: config.on_auth_error,
114                connection_state: config.connection_state,
115                sse: RefCell::new(SseManager::default()),
116            }),
117        }
118    }
119
120    pub async fn call<TArgs, TResult>(
121        &self,
122        function_name: &str,
123        args: TArgs,
124    ) -> Result<TResult, ForgeClientError>
125    where
126        TArgs: Serialize,
127        TResult: DeserializeOwned,
128    {
129        let body = serde_json::json!({ "args": args });
130        let envelope = platform::request_json(
131            self,
132            &format!("{}/_api/rpc/{}", self.inner.url, function_name),
133            body,
134        )
135        .await?;
136        self.decode_envelope(envelope)
137    }
138
139    #[cfg(target_arch = "wasm32")]
140    pub async fn call_multipart<TResult>(
141        &self,
142        function_name: &str,
143        form: web_sys::FormData,
144    ) -> Result<TResult, ForgeClientError>
145    where
146        TResult: DeserializeOwned,
147    {
148        let envelope = platform::request_multipart(
149            self,
150            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
151            form,
152        )
153        .await?;
154        self.decode_envelope(envelope)
155    }
156
157    #[cfg(not(target_arch = "wasm32"))]
158    pub async fn call_multipart<TResult>(
159        &self,
160        function_name: &str,
161        form: reqwest::multipart::Form,
162    ) -> Result<TResult, ForgeClientError>
163    where
164        TResult: DeserializeOwned,
165    {
166        let envelope = platform::request_multipart(
167            self,
168            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
169            form,
170        )
171        .await?;
172        self.decode_envelope(envelope)
173    }
174
175    pub fn subscribe_query<TArgs, TResult, F>(
176        &self,
177        function_name: &str,
178        args: TArgs,
179        callback: F,
180    ) -> SubscriptionHandle
181    where
182        TArgs: Serialize + Clone + 'static,
183        TResult: DeserializeOwned + Clone + 'static,
184        F: FnMut(StreamEvent<TResult>) + 'static,
185    {
186        let sub_id = self.random_id("sub");
187        let target = format!("sub:{sub_id}");
188
189        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
190        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
191
192        let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
193        let reg_payload = serde_json::json!({
194            "id": sub_id,
195            "function": function_name,
196            "args": args_value,
197        });
198        self.inner.sse.borrow_mut().registrations.insert(
199            sub_id.clone(),
200            RegistrationMeta {
201                endpoint: "/_api/subscribe",
202                payload: reg_payload,
203            },
204        );
205
206        self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
207            match client.decode_envelope::<TResult>(envelope) {
208                Ok(data) => cb(StreamEvent::Data(data)),
209                Err(err) => cb(StreamEvent::Error(err)),
210            }
211        })
212    }
213
214    pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
215    where
216        TResult: DeserializeOwned + Clone + 'static,
217        F: FnMut(StreamEvent<TResult>) + 'static,
218    {
219        self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
220    }
221
222    pub fn subscribe_workflow<TResult, F>(
223        &self,
224        workflow_id: String,
225        callback: F,
226    ) -> SubscriptionHandle
227    where
228        TResult: DeserializeOwned + Clone + 'static,
229        F: FnMut(StreamEvent<TResult>) + 'static,
230    {
231        self.subscribe_tracker(
232            "wf",
233            serde_json::json!({ "workflow_id": workflow_id }),
234            "/_api/subscribe-workflow",
235            callback,
236        )
237    }
238
239    fn subscribe_tracker<TResult, F>(
240        &self,
241        prefix: &str,
242        payload: serde_json::Value,
243        endpoint: &'static str,
244        callback: F,
245    ) -> SubscriptionHandle
246    where
247        TResult: DeserializeOwned + Clone + 'static,
248        F: FnMut(StreamEvent<TResult>) + 'static,
249    {
250        let sub_id = self.random_id(prefix);
251        let target = format!("{prefix}:{sub_id}");
252
253        let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
254        self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
255
256        let mut reg_payload = payload;
257        reg_payload
258            .as_object_mut()
259            .expect("tracker payload must be an object")
260            .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
261        self.inner.sse.borrow_mut().registrations.insert(
262            sub_id.clone(),
263            RegistrationMeta {
264                endpoint,
265                payload: reg_payload,
266            },
267        );
268
269        self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
270            if envelope.success {
271                if let Some(data) = envelope.data {
272                    match serde_json::from_value::<TResult>(data) {
273                        Ok(parsed) => cb(StreamEvent::Data(parsed)),
274                        Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
275                            "DESERIALIZATION_ERROR",
276                            e.to_string(),
277                            None,
278                        ))),
279                    }
280                }
281            }
282        })
283    }
284
285    fn spawn_subscription<TResult, F>(
286        &self,
287        sub_id: String,
288        target: String,
289        mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
290        mut callback: F,
291        on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
292    ) -> SubscriptionHandle
293    where
294        TResult: DeserializeOwned + Clone + 'static,
295        F: FnMut(StreamEvent<TResult>) + 'static,
296    {
297        let client = self.clone();
298        let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
299        let handle_task = handle.clone();
300
301        let task = dioxus::prelude::spawn(async move {
302            callback(StreamEvent::Connection(ConnectionState::Connecting));
303
304            if let Err(e) = client.ensure_connected().await {
305                callback(StreamEvent::Error(e));
306                callback(StreamEvent::Connection(ConnectionState::Disconnected));
307                handle_task.finish();
308                return;
309            }
310
311            match client.register_subscription(&sub_id).await {
312                Ok(envelope) => {
313                    callback(StreamEvent::Connection(ConnectionState::Connected));
314                    on_initial(&client, envelope, &mut callback);
315                }
316                Err(err) => {
317                    callback(StreamEvent::Error(err));
318                    callback(StreamEvent::Connection(ConnectionState::Disconnected));
319                    handle_task.finish();
320                    return;
321                }
322            }
323
324            while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
325                Self::deliver_event::<TResult, F>(&mut callback, &client, event);
326            }
327
328            handle_task.finish();
329        });
330
331        handle.set_task(task);
332        handle
333    }
334
335    fn deliver_event<TResult, F>(
336        callback: &mut F,
337        client: &ForgeClient,
338        event: SseDispatch,
339    ) where
340        TResult: DeserializeOwned,
341        F: FnMut(StreamEvent<TResult>),
342    {
343        match event {
344            SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
345                Ok(data) => callback(StreamEvent::Data(data)),
346                Err(e) => {
347                    callback(StreamEvent::Error(ForgeClientError::new(
348                        "DESERIALIZATION_ERROR",
349                        e.to_string(),
350                        None,
351                    )));
352                }
353            },
354            SseDispatch::Error { code, message } => {
355                let err = ForgeClientError::new(&code, &message, None);
356                if code == "UNAUTHORIZED" {
357                    if let Some(handler) = &client.inner.on_auth_error {
358                        handler(err.as_forge_error());
359                    }
360                }
361                callback(StreamEvent::Error(err));
362            }
363        }
364    }
365
366    /// Ensure the shared SSE connection is established. Spawns the event loop on first call.
367    async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
368        let rx = {
369            let mut sse = self.inner.sse.borrow_mut();
370            if sse.state == SseState::Connected {
371                return Ok(());
372            }
373
374            let (tx, rx) = futures_channel::oneshot::channel();
375            sse.connect_waiters.push(tx);
376
377            if sse.state == SseState::Idle {
378                sse.state = SseState::Connecting;
379                drop(sse);
380                platform::start_event_loop(self.clone());
381            }
382
383            rx
384        };
385
386        rx.await.unwrap_or_else(|_| {
387            Err(ForgeClientError::new(
388                "SSE_CONNECTION_FAILED",
389                "Connection attempt cancelled",
390                None,
391            ))
392        })
393    }
394
395    /// Register a subscription with the server via POST.
396    /// If the server returns SESSION_NOT_FOUND, forces a reconnect and retries once.
397    async fn register_subscription(
398        &self,
399        sub_id: &str,
400    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
401        let envelope = self.try_register_subscription(sub_id).await?;
402
403        // Stale session: force reconnect and retry once
404        let needs_retry = !envelope.success
405            && envelope
406                .error
407                .as_ref()
408                .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
409
410        if needs_retry {
411            self.force_reconnect().await;
412            self.ensure_connected().await?;
413            return self.try_register_subscription(sub_id).await;
414        }
415
416        Ok(envelope)
417    }
418
419    async fn try_register_subscription(
420        &self,
421        sub_id: &str,
422    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
423        let (endpoint, payload) = {
424            let sse = self.inner.sse.borrow();
425            let meta = sse
426                .registrations
427                .get(sub_id)
428                .ok_or_else(|| {
429                    ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
430                })?;
431            let session_id = sse.session_id.clone().unwrap_or_default();
432            let session_secret = sse.session_secret.clone().unwrap_or_default();
433            let mut payload = meta.payload.clone();
434            let obj = payload
435                .as_object_mut()
436                .expect("registration payload must be an object");
437            obj.insert("session_id".into(), serde_json::Value::String(session_id));
438            obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
439            (meta.endpoint, payload)
440        };
441
442        let url = format!("{}{}", self.inner.url, endpoint);
443        platform::request_json(self, &url, payload).await
444    }
445
446    async fn force_reconnect(&self) {
447        let task = {
448            let mut sse = self.inner.sse.borrow_mut();
449            sse.session_id = None;
450            sse.session_secret = None;
451            sse.state = SseState::Idle;
452            // No need to drain waiters; force_reconnect is only called mid-registration
453            // where the caller already passed ensure_connected
454            sse.event_loop_task.take()
455        };
456        if let Some(task) = task {
457            task.cancel();
458        }
459        sleep(Duration::from_millis(10)).await;
460    }
461
462    async fn reregister_all(&self) {
463        let sub_ids: Vec<String> = {
464            let sse = self.inner.sse.borrow();
465            sse.registrations.keys().cloned().collect()
466        };
467
468        for sub_id in sub_ids {
469            let _ = self.register_subscription(&sub_id).await;
470        }
471    }
472
473    fn dispatch_event(&self, target: &str, event: SseDispatch) {
474        let tx = {
475            let sse = self.inner.sse.borrow();
476            sse.listeners.get(target).cloned()
477        };
478        if let Some(tx) = tx {
479            let _ = tx.unbounded_send(event);
480        }
481    }
482
483    fn broadcast_connection(&self, state: ConnectionState) {
484        if let Some(mut signal) = self.inner.connection_state {
485            signal.set(state);
486        }
487    }
488
489    fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
490        let mut sse = self.inner.sse.borrow_mut();
491        let is_reconnect = sse.ever_connected;
492        sse.session_id = Some(session_id);
493        sse.session_secret = Some(session_secret);
494        sse.state = SseState::Connected;
495        sse.reconnect_attempts = 0;
496        sse.ever_connected = true;
497        for waiter in sse.connect_waiters.drain(..) {
498            let _ = waiter.send(Ok(()));
499        }
500        is_reconnect
501    }
502
503    fn mark_disconnected(&self) {
504        let mut sse = self.inner.sse.borrow_mut();
505        sse.session_id = None;
506        sse.session_secret = None;
507        sse.state = SseState::Idle;
508        sse.event_loop_task = None;
509        let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
510        for waiter in sse.connect_waiters.drain(..) {
511            let _ = waiter.send(Err(err()));
512        }
513    }
514
515    fn should_reconnect(&self) -> Option<u32> {
516        let mut sse = self.inner.sse.borrow_mut();
517        if sse.listeners.is_empty() {
518            return None;
519        }
520        let attempts = sse.reconnect_attempts;
521        if attempts >= MAX_RECONNECT_ATTEMPTS {
522            return None;
523        }
524        sse.reconnect_attempts = attempts + 1;
525        Some(attempts)
526    }
527
528    fn get_token(&self) -> Option<String> {
529        self.inner
530            .get_token
531            .as_ref()
532            .and_then(|provider| provider())
533            .filter(|t| !t.is_empty())
534    }
535
536    fn decode_envelope<TResult>(
537        &self,
538        envelope: RpcEnvelopeRaw,
539    ) -> Result<TResult, ForgeClientError>
540    where
541        TResult: DeserializeOwned,
542    {
543        if !envelope.success {
544            let error = envelope.error.unwrap_or(ForgeError {
545                code: "UNKNOWN".to_string(),
546                message: "Unknown error".to_string(),
547                details: None,
548            });
549            return Err(ForgeClientError::new(error.code, error.message, error.details));
550        }
551
552        let data = envelope.data.ok_or_else(|| {
553            ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
554        })?;
555        serde_json::from_value(data)
556            .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
557    }
558
559    fn random_id(&self, prefix: &str) -> String {
560        let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
561        format!("{prefix}-{id}")
562    }
563}
564
565async fn sleep(duration: Duration) {
566    #[cfg(target_arch = "wasm32")]
567    {
568        gloo_timers::future::sleep(duration).await;
569    }
570
571    #[cfg(not(target_arch = "wasm32"))]
572    {
573        tokio::time::sleep(duration).await;
574    }
575}
576
577#[derive(Clone)]
578pub struct SubscriptionHandle {
579    closed: Rc<Cell<bool>>,
580    task: Rc<RefCell<Option<Task>>>,
581    cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
582}
583
584impl SubscriptionHandle {
585    fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
586        let cleanup: Box<dyn FnOnce()> = Box::new(move || {
587            let mut sse = client.inner.sse.borrow_mut();
588            sse.listeners.remove(&target);
589            sse.registrations.remove(&sub_id);
590            // Jobs/workflows have server-managed lifecycles; only query subs need explicit unsubscribe
591            if target.starts_with("sub:") {
592                let session_id = sse.session_id.clone();
593                let session_secret = sse.session_secret.clone();
594                drop(sse);
595                if let (Some(sid), Some(ss)) = (session_id, session_secret) {
596                    let url = format!("{}/_api/unsubscribe", client.inner.url);
597                    let payload = serde_json::json!({
598                        "session_id": sid,
599                        "session_secret": ss,
600                        "id": sub_id,
601                    });
602                    let client = client.clone();
603                    dioxus::prelude::spawn(async move {
604                        let _ = platform::request_json(&client, &url, payload).await;
605                    });
606                }
607            }
608        });
609
610        Self {
611            closed: Rc::new(Cell::new(false)),
612            task: Rc::new(RefCell::new(None)),
613            cleanup: Rc::new(RefCell::new(Some(cleanup))),
614        }
615    }
616
617    fn set_task(&self, task: Task) {
618        *self.task.borrow_mut() = Some(task);
619    }
620
621    pub(crate) fn finish(&self) {
622        if self.closed.replace(true) {
623            return;
624        }
625        if let Some(cleanup) = self.cleanup.borrow_mut().take() {
626            cleanup();
627        }
628        self.task.borrow_mut().take();
629    }
630
631    pub fn close(&self) {
632        let task = { self.task.borrow_mut().clone() };
633        self.finish();
634        if let Some(task) = task {
635            task.cancel();
636        }
637    }
638
639    pub fn is_closed(&self) -> bool {
640        self.closed.get()
641    }
642}
643
644impl Drop for SubscriptionHandle {
645    fn drop(&mut self) {
646        self.close();
647    }
648}
649
650// ── WASM platform ───────────────────────────────────────────────────────────
651
652#[cfg(target_arch = "wasm32")]
653mod platform {
654    use dioxus::prelude::spawn;
655    use futures_util::{StreamExt, stream};
656    use gloo_net::eventsource::futures::EventSource;
657    use gloo_net::http::Request;
658    use js_sys::{JSON, encode_uri_component};
659
660    use super::{ForgeClient, SseDispatch, sleep};
661    use crate::types::{
662        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
663    };
664
665    pub(super) async fn request_json(
666        client: &ForgeClient,
667        url: &str,
668        body: serde_json::Value,
669    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
670        let mut request = Request::post(url)
671            .header("Content-Type", "application/json")
672            .credentials(web_sys::RequestCredentials::Include);
673        if let Some(token) = client.get_token() {
674            request = request.header("Authorization", &format!("Bearer {token}"));
675        }
676
677        let request = request.body(body.to_string()).map_err(request_error)?;
678        request
679            .send()
680            .await
681            .map_err(request_error)?
682            .json()
683            .await
684            .map_err(request_error)
685    }
686
687    pub(super) async fn request_multipart(
688        client: &ForgeClient,
689        url: &str,
690        form: web_sys::FormData,
691    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
692        let mut request = Request::post(url)
693            .credentials(web_sys::RequestCredentials::Include);
694        if let Some(token) = client.get_token() {
695            request = request.header("Authorization", &format!("Bearer {token}"));
696        }
697
698        let response = request.body(form).map_err(request_error)?;
699        response
700            .send()
701            .await
702            .map_err(request_error)?
703            .json()
704            .await
705            .map_err(request_error)
706    }
707
708    fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
709        let data = message.data();
710        data.as_string().or_else(|| {
711            JSON::stringify(&data)
712                .ok()
713                .and_then(|value| value.as_string())
714                .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
715        })
716    }
717
718    fn events_url(client: &ForgeClient) -> String {
719        match client.get_token() {
720            Some(token) => format!(
721                "{}/_api/events?token={}",
722                client.inner.url,
723                encode_uri_component(&token)
724            ),
725            None => format!("{}/_api/events", client.inner.url),
726        }
727    }
728
729    /// Start the single shared SSE event loop.
730    pub(super) fn start_event_loop(client: ForgeClient) {
731        let client_for_task = client.clone();
732        let task = spawn(async move {
733            run_event_loop(&client_for_task).await;
734
735            // Disconnected. Try to reconnect.
736            client_for_task.mark_disconnected();
737            client_for_task.broadcast_connection(ConnectionState::Disconnected);
738
739            if let Some(attempts) = client_for_task.should_reconnect() {
740                let delay = 1000 * (1u64 << attempts.min(4));
741                let jitter = (js_sys::Math::random() * 500.0) as u64;
742                sleep(std::time::Duration::from_millis(delay + jitter)).await;
743
744                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
745                start_event_loop(client_for_task);
746            }
747        });
748
749        client.inner.sse.borrow_mut().event_loop_task = Some(task);
750    }
751
752    async fn run_event_loop(client: &ForgeClient) {
753        let mut event_source = match EventSource::new(&events_url(client)) {
754            Ok(source) => source,
755            Err(_) => {
756                return;
757            }
758        };
759
760        let mut connected_stream = match event_source.subscribe("connected") {
761            Ok(stream) => stream,
762            Err(_) => return,
763        };
764        let update_stream = match event_source.subscribe("update") {
765            Ok(stream) => stream,
766            Err(_) => return,
767        };
768        let error_stream = match event_source.subscribe("error") {
769            Ok(stream) => stream,
770            Err(_) => return,
771        };
772
773        // Wait for the connected event
774        let connected_event = match connected_stream.next().await {
775            Some(Ok((_kind, message))) => {
776                let Some(raw) = message_data_as_string(&message) else {
777                    return;
778                };
779                match serde_json::from_str::<ConnectedEvent>(&raw) {
780                    Ok(event) => event,
781                    Err(_) => return,
782                }
783            }
784            _ => return,
785        };
786
787        let session_id = connected_event.session_id.unwrap_or_default();
788        let session_secret = connected_event.session_secret.unwrap_or_default();
789
790        if session_id.is_empty() || session_secret.is_empty() {
791            return;
792        }
793
794        let is_reconnect = client.mark_connected(session_id, session_secret);
795        client.broadcast_connection(ConnectionState::Connected);
796
797        // Only re-register on reconnect; initial registrations are handled by each subscription task
798        if is_reconnect {
799            client.reregister_all().await;
800        }
801
802        let mut events = stream::select(update_stream, error_stream);
803        while let Some(event) = events.next().await {
804            match event {
805                Ok((kind, message)) => {
806                    let Some(raw) = message_data_as_string(&message) else {
807                        continue;
808                    };
809                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
810                        continue;
811                    };
812
813                    let Some(target) = envelope.target else {
814                        continue;
815                    };
816
817                    if kind == "update" {
818                        if let Some(payload) = envelope.payload {
819                            client.dispatch_event(&target, SseDispatch::Data(payload));
820                        }
821                    } else {
822                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
823                        let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
824                        client.dispatch_event(&target, SseDispatch::Error { code, message });
825                    }
826                }
827                Err(_) => break,
828            }
829        }
830
831        event_source.close();
832    }
833
834    fn request_error(err: gloo_net::Error) -> ForgeClientError {
835        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
836    }
837}
838
839// ── Native platform ─────────────────────────────────────────────────────────
840
841#[cfg(not(target_arch = "wasm32"))]
842mod platform {
843    use dioxus::prelude::spawn;
844    use futures_util::StreamExt;
845    use reqwest::Client;
846    use reqwest_eventsource::{Event, EventSource};
847
848    use super::{ForgeClient, SseDispatch, sleep};
849    use crate::types::{
850        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
851    };
852
853    pub(super) async fn request_json(
854        client: &ForgeClient,
855        url: &str,
856        body: serde_json::Value,
857    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
858        let mut request = Client::new().post(url).json(&body);
859        if let Some(token) = client.get_token() {
860            request = request.bearer_auth(token);
861        }
862
863        request
864            .send()
865            .await
866            .map_err(request_error)?
867            .json()
868            .await
869            .map_err(request_error)
870    }
871
872    pub(super) async fn request_multipart(
873        client: &ForgeClient,
874        url: &str,
875        form: reqwest::multipart::Form,
876    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
877        let mut request = Client::new().post(url).multipart(form);
878        if let Some(token) = client.get_token() {
879            request = request.bearer_auth(token);
880        }
881
882        request
883            .send()
884            .await
885            .map_err(request_error)?
886            .json()
887            .await
888            .map_err(request_error)
889    }
890
891    /// Start the single shared SSE event loop.
892    pub(super) fn start_event_loop(client: ForgeClient) {
893        let client_for_task = client.clone();
894        let task = spawn(async move {
895            run_event_loop(&client_for_task).await;
896
897            client_for_task.mark_disconnected();
898            client_for_task.broadcast_connection(ConnectionState::Disconnected);
899
900            if let Some(attempts) = client_for_task.should_reconnect() {
901                let delay = 1000 * (1u64 << attempts.min(4));
902                sleep(std::time::Duration::from_millis(delay)).await;
903
904                client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
905                start_event_loop(client_for_task);
906            }
907        });
908
909        client.inner.sse.borrow_mut().event_loop_task = Some(task);
910    }
911
912    async fn run_event_loop(client: &ForgeClient) {
913        let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
914        if let Some(token) = client.get_token() {
915            request = request.bearer_auth(token);
916        }
917
918        let mut event_source = match EventSource::new(request) {
919            Ok(source) => source,
920            Err(_) => return,
921        };
922
923        // Wait for connected event
924        let connected_event = loop {
925            let Some(event) = event_source.next().await else {
926                return;
927            };
928            match event {
929                Ok(Event::Open) => continue,
930                Ok(Event::Message(msg)) if msg.event == "connected" => {
931                    match serde_json::from_str::<ConnectedEvent>(&msg.data) {
932                        Ok(event) => break event,
933                        Err(_) => return,
934                    }
935                }
936                Ok(Event::Message(_)) => continue,
937                Err(_) => return,
938            }
939        };
940
941        let session_id = connected_event.session_id.unwrap_or_default();
942        let session_secret = connected_event.session_secret.unwrap_or_default();
943
944        if session_id.is_empty() || session_secret.is_empty() {
945            return;
946        }
947
948        let is_reconnect = client.mark_connected(session_id, session_secret);
949        client.broadcast_connection(ConnectionState::Connected);
950
951        if is_reconnect {
952            client.reregister_all().await;
953        }
954
955        while let Some(event) = event_source.next().await {
956            match event {
957                Ok(Event::Open) => {}
958                Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
959                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
960                        continue;
961                    };
962                    let Some(target) = envelope.target else {
963                        continue;
964                    };
965
966                    if msg.event == "update" {
967                        if let Some(payload) = envelope.payload {
968                            client.dispatch_event(&target, SseDispatch::Data(payload));
969                        }
970                    } else {
971                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
972                        let message =
973                            envelope.message.unwrap_or_else(|| "Subscription error".to_string());
974                        client.dispatch_event(&target, SseDispatch::Error { code, message });
975                    }
976                }
977                Ok(Event::Message(_)) => {}
978                Err(_) => break,
979            }
980        }
981
982        event_source.close();
983    }
984
985    fn request_error(err: reqwest::Error) -> ForgeClientError {
986        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
987    }
988}