Skip to main content

forge_dioxus/
client.rs

1
2use std::cell::{Cell, RefCell};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::types::{
11    ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, SseEnvelopeRaw, StreamEvent,
12};
13
14type TokenProvider = Rc<dyn Fn() -> Option<String>>;
15type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
16
17static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
18
19#[derive(Clone)]
20pub struct ForgeClientConfig {
21    pub url: String,
22    pub get_token: Option<TokenProvider>,
23    pub on_auth_error: Option<AuthErrorHandler>,
24    pub(crate) connection_state: Option<Signal<ConnectionState>>,
25}
26
27impl ForgeClientConfig {
28    pub fn new(url: impl Into<String>) -> Self {
29        Self {
30            url: url.into(),
31            get_token: None,
32            on_auth_error: None,
33            connection_state: None,
34        }
35    }
36
37    pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
38        self.get_token = Some(Rc::new(provider));
39        self
40    }
41
42    pub fn with_auth_error_handler(
43        mut self,
44        handler: impl Fn(ForgeError) + 'static,
45    ) -> Self {
46        self.on_auth_error = Some(Rc::new(handler));
47        self
48    }
49
50    pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
51        self.connection_state = Some(state);
52        self
53    }
54}
55
56#[derive(Clone)]
57pub struct ForgeClient {
58    inner: Rc<ForgeClientInner>,
59}
60
61struct ForgeClientInner {
62    url: String,
63    get_token: Option<TokenProvider>,
64    on_auth_error: Option<AuthErrorHandler>,
65    connection_state: Option<Signal<ConnectionState>>,
66}
67
68impl ForgeClient {
69    pub fn new(config: ForgeClientConfig) -> Self {
70        Self {
71            inner: Rc::new(ForgeClientInner {
72                url: config.url.trim_end_matches('/').to_string(),
73                get_token: config.get_token,
74                on_auth_error: config.on_auth_error,
75                connection_state: config.connection_state,
76            }),
77        }
78    }
79
80    pub async fn call<TArgs, TResult>(
81        &self,
82        function_name: &str,
83        args: TArgs,
84    ) -> Result<TResult, ForgeClientError>
85    where
86        TArgs: Serialize,
87        TResult: DeserializeOwned,
88    {
89        let body = serde_json::json!({ "args": args });
90        let envelope = platform::request_json(
91            self,
92            &format!("{}/_api/rpc/{}", self.inner.url, function_name),
93            body,
94        )
95        .await?;
96        self.decode_envelope(envelope)
97    }
98
99    #[cfg(target_arch = "wasm32")]
100    pub async fn call_multipart<TResult>(
101        &self,
102        function_name: &str,
103        form: web_sys::FormData,
104    ) -> Result<TResult, ForgeClientError>
105    where
106        TResult: DeserializeOwned,
107    {
108        let envelope = platform::request_multipart(
109            self,
110            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
111            form,
112        )
113        .await?;
114        self.decode_envelope(envelope)
115    }
116
117    #[cfg(not(target_arch = "wasm32"))]
118    pub async fn call_multipart<TResult>(
119        &self,
120        function_name: &str,
121        form: reqwest::multipart::Form,
122    ) -> Result<TResult, ForgeClientError>
123    where
124        TResult: DeserializeOwned,
125    {
126        let envelope = platform::request_multipart(
127            self,
128            &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
129            form,
130        )
131        .await?;
132        self.decode_envelope(envelope)
133    }
134
135    pub fn subscribe_query<TArgs, TResult, F>(
136        &self,
137        function_name: &str,
138        args: TArgs,
139        callback: F,
140    ) -> SubscriptionHandle
141    where
142        TArgs: Serialize + Clone + 'static,
143        TResult: DeserializeOwned + Clone + 'static,
144        F: FnMut(StreamEvent<TResult>) + 'static,
145    {
146        platform::subscribe_query(self.clone(), function_name.to_string(), args, callback)
147    }
148
149    pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
150    where
151        TResult: DeserializeOwned + Clone + 'static,
152        F: FnMut(StreamEvent<TResult>) + 'static,
153    {
154        self.subscribe_tracker(
155            "job",
156            serde_json::json!({ "job_id": job_id }),
157            "/_api/subscribe-job",
158            callback,
159        )
160    }
161
162    pub fn subscribe_workflow<TResult, F>(
163        &self,
164        workflow_id: String,
165        callback: F,
166    ) -> SubscriptionHandle
167    where
168        TResult: DeserializeOwned + Clone + 'static,
169        F: FnMut(StreamEvent<TResult>) + 'static,
170    {
171        self.subscribe_tracker(
172            "wf",
173            serde_json::json!({ "workflow_id": workflow_id }),
174            "/_api/subscribe-workflow",
175            callback,
176        )
177    }
178
179    fn subscribe_tracker<TResult, F>(
180        &self,
181        prefix: &str,
182        payload: serde_json::Value,
183        endpoint: &str,
184        callback: F,
185    ) -> SubscriptionHandle
186    where
187        TResult: DeserializeOwned + Clone + 'static,
188        F: FnMut(StreamEvent<TResult>) + 'static,
189    {
190        platform::subscribe_tracker(
191            self.clone(),
192            prefix.to_string(),
193            payload,
194            endpoint.to_string(),
195            callback,
196        )
197    }
198
199    fn get_token(&self) -> Option<String> {
200        self.inner.get_token.as_ref().and_then(|provider| provider())
201    }
202
203    fn emit_connection<TValue, T>(&self, callback: &Rc<RefCell<T>>, state: ConnectionState)
204    where
205        T: FnMut(StreamEvent<TValue>),
206    {
207        if let Some(mut signal) = self.inner.connection_state {
208            signal.set(state);
209        }
210        (callback.borrow_mut())(StreamEvent::Connection(state));
211    }
212
213    fn emit_error<TValue, T>(&self, callback: &Rc<RefCell<T>>, error: ForgeClientError)
214    where
215        T: FnMut(StreamEvent<TValue>),
216    {
217        if error.code == "UNAUTHORIZED" {
218            if let Some(handler) = &self.inner.on_auth_error {
219                handler(error.as_forge_error());
220            }
221        }
222        (callback.borrow_mut())(StreamEvent::Error(error));
223    }
224
225    fn decode_envelope<TResult>(
226        &self,
227        envelope: RpcEnvelopeRaw,
228    ) -> Result<TResult, ForgeClientError>
229    where
230        TResult: DeserializeOwned,
231    {
232        if !envelope.success {
233            let error = envelope.error.unwrap_or(ForgeError {
234                code: "UNKNOWN".to_string(),
235                message: "Unknown error".to_string(),
236                details: None,
237            });
238            return Err(ForgeClientError::new(error.code, error.message, error.details));
239        }
240
241        let data = envelope.data.ok_or_else(|| {
242            ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
243        })?;
244        serde_json::from_value(data)
245            .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
246    }
247
248    fn random_id(&self, prefix: &str) -> String {
249        let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
250        format!("{prefix}-{id}")
251    }
252}
253
254#[derive(Clone)]
255pub struct SubscriptionHandle {
256    closed: Rc<Cell<bool>>,
257    task: Rc<RefCell<Option<Task>>>,
258}
259
260impl SubscriptionHandle {
261    fn new() -> Self {
262        Self {
263            closed: Rc::new(Cell::new(false)),
264            task: Rc::new(RefCell::new(None)),
265        }
266    }
267
268    fn set_task(&self, task: Task) {
269        *self.task.borrow_mut() = Some(task);
270    }
271
272    fn finish(&self) {
273        self.closed.set(true);
274        self.task.borrow_mut().take();
275    }
276
277    pub fn close(&self) {
278        self.closed.set(true);
279        if let Some(task) = self.task.borrow_mut().take() {
280            task.cancel();
281        }
282    }
283
284    pub fn is_closed(&self) -> bool {
285        self.closed.get()
286    }
287}
288
289impl Drop for SubscriptionHandle {
290    fn drop(&mut self) {
291        self.close();
292    }
293}
294
295fn parse_json_str<T>(raw: &str) -> Result<T, ForgeClientError>
296where
297    T: DeserializeOwned,
298{
299    serde_json::from_str(raw)
300        .map_err(|err| ForgeClientError::new("INVALID_SSE_PAYLOAD", err.to_string(), None))
301}
302
303fn emit_sse_error<TValue, T>(
304    client: &ForgeClient,
305    callback: &Rc<RefCell<T>>,
306    envelope: SseEnvelopeRaw,
307) where
308    T: FnMut(StreamEvent<TValue>),
309{
310    client.emit_error(
311        callback,
312        ForgeClientError::new(
313            envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string()),
314            envelope
315                .message
316                .unwrap_or_else(|| "Subscription error".to_string()),
317            None,
318        ),
319    );
320}
321
322#[cfg(target_arch = "wasm32")]
323mod platform {
324    use std::cell::RefCell;
325    use std::rc::Rc;
326
327    use dioxus::prelude::spawn;
328    use futures_util::{StreamExt, stream};
329    use gloo_net::eventsource::futures::EventSource;
330    use gloo_net::http::Request;
331    use js_sys::encode_uri_component;
332    use serde::Serialize;
333    use serde::de::DeserializeOwned;
334
335    use super::{ForgeClient, SubscriptionHandle, emit_sse_error, parse_json_str};
336    use crate::types::{
337        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
338        StreamEvent,
339    };
340
341    pub(super) async fn request_json(
342        client: &ForgeClient,
343        url: &str,
344        body: serde_json::Value,
345    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
346        let mut request = Request::post(url).header("Content-Type", "application/json");
347        if let Some(token) = client.get_token() {
348            request = request.header("Authorization", &format!("Bearer {token}"));
349        }
350
351        let request = request.body(body.to_string()).map_err(request_error)?;
352        request
353            .send()
354            .await
355            .map_err(request_error)?
356            .json()
357            .await
358            .map_err(request_error)
359    }
360
361    pub(super) async fn request_multipart(
362        client: &ForgeClient,
363        url: &str,
364        form: web_sys::FormData,
365    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
366        let mut request = Request::post(url);
367        if let Some(token) = client.get_token() {
368            request = request.header("Authorization", &format!("Bearer {token}"));
369        }
370
371        let response = request.body(form).map_err(request_error)?;
372        response
373            .send()
374            .await
375            .map_err(request_error)?
376            .json()
377            .await
378            .map_err(request_error)
379    }
380
381    pub(super) fn subscribe_query<TArgs, TResult, F>(
382        client: ForgeClient,
383        function_name: String,
384        args: TArgs,
385        callback: F,
386    ) -> SubscriptionHandle
387    where
388        TArgs: Serialize + Clone + 'static,
389        TResult: DeserializeOwned + Clone + 'static,
390        F: FnMut(StreamEvent<TResult>) + 'static,
391    {
392        let handle = SubscriptionHandle::new();
393        let handle_task = handle.clone();
394        let callback = Rc::new(RefCell::new(callback));
395
396        let task = spawn(async move {
397            client.emit_connection(&callback, ConnectionState::Connecting);
398
399            let args_value = match serde_json::to_value(args) {
400                Ok(value) => value,
401                Err(err) => {
402                    client.emit_error(
403                        &callback,
404                        ForgeClientError::new("SERIALIZATION_ERROR", err.to_string(), None),
405                    );
406                    client.emit_connection(&callback, ConnectionState::Disconnected);
407                    handle_task.finish();
408                    return;
409                }
410            };
411
412            let mut event_source = match EventSource::new(&events_url(&client)) {
413                Ok(source) => source,
414                Err(err) => {
415                    client.emit_error(
416                        &callback,
417                        ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
418                    );
419                    client.emit_connection(&callback, ConnectionState::Disconnected);
420                    handle_task.finish();
421                    return;
422                }
423            };
424
425            let mut connected_stream = match event_source.subscribe("connected") {
426                Ok(stream) => stream,
427                Err(err) => {
428                    client.emit_error(
429                        &callback,
430                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
431                    );
432                    client.emit_connection(&callback, ConnectionState::Disconnected);
433                    handle_task.finish();
434                    return;
435                }
436            };
437            let update_stream = match event_source.subscribe("update") {
438                Ok(stream) => stream,
439                Err(err) => {
440                    client.emit_error(
441                        &callback,
442                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
443                    );
444                    client.emit_connection(&callback, ConnectionState::Disconnected);
445                    handle_task.finish();
446                    return;
447                }
448            };
449            let error_stream = match event_source.subscribe("error") {
450                Ok(stream) => stream,
451                Err(err) => {
452                    client.emit_error(
453                        &callback,
454                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
455                    );
456                    client.emit_connection(&callback, ConnectionState::Disconnected);
457                    handle_task.finish();
458                    return;
459                }
460            };
461
462            let connected_event = match connected_stream.next().await {
463                Some(Ok((_kind, message))) => {
464                    let Some(raw) = message.data().as_string() else {
465                        client.emit_error(
466                            &callback,
467                            ForgeClientError::new(
468                                "INVALID_SSE_PAYLOAD",
469                                "SSE payload was not a string",
470                                None,
471                            ),
472                        );
473                        client.emit_connection(&callback, ConnectionState::Disconnected);
474                        handle_task.finish();
475                        return;
476                    };
477                    match parse_json_str::<ConnectedEvent>(&raw) {
478                        Ok(event) => event,
479                        Err(err) => {
480                            client.emit_error(&callback, err);
481                            client.emit_connection(&callback, ConnectionState::Disconnected);
482                            handle_task.finish();
483                            return;
484                        }
485                    }
486                }
487                Some(Err(err)) => {
488                    client.emit_error(
489                        &callback,
490                        ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
491                    );
492                    client.emit_connection(&callback, ConnectionState::Disconnected);
493                    handle_task.finish();
494                    return;
495                }
496                None => {
497                    client.emit_connection(&callback, ConnectionState::Disconnected);
498                    handle_task.finish();
499                    return;
500                }
501            };
502
503            if handle_task.is_closed() {
504                event_source.close();
505                handle_task.finish();
506                return;
507            }
508
509            let register_payload = serde_json::json!({
510                "session_id": connected_event.session_id,
511                "session_secret": connected_event.session_secret,
512                "id": client.random_id("sub"),
513                "function": function_name,
514                "args": args_value,
515            });
516
517            match request_json(
518                &client,
519                &format!("{}/_api/subscribe", client.inner.url),
520                register_payload,
521            )
522            .await
523            {
524                Ok(envelope) => match client.decode_envelope::<TResult>(envelope) {
525                    Ok(data) => {
526                        client.emit_connection(&callback, ConnectionState::Connected);
527                        (callback.borrow_mut())(StreamEvent::Data(data));
528                    }
529                    Err(err) => {
530                        client.emit_error(&callback, err);
531                        client.emit_connection(&callback, ConnectionState::Disconnected);
532                        handle_task.finish();
533                        return;
534                    }
535                },
536                Err(err) => {
537                    client.emit_error(&callback, err);
538                    client.emit_connection(&callback, ConnectionState::Disconnected);
539                    handle_task.finish();
540                    return;
541                }
542            }
543
544            let mut events = stream::select(update_stream, error_stream);
545            while !handle_task.is_closed() {
546                let Some(event) = events.next().await else {
547                    break;
548                };
549
550                match event {
551                    Ok((kind, message)) if kind == "update" => {
552                        let Some(raw) = message.data().as_string() else {
553                            client.emit_error(
554                                &callback,
555                                ForgeClientError::new(
556                                    "INVALID_SSE_PAYLOAD",
557                                    "SSE payload was not a string",
558                                    None,
559                                ),
560                            );
561                            continue;
562                        };
563                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
564                            Ok(value) => value,
565                            Err(err) => {
566                                client.emit_error(&callback, err);
567                                continue;
568                            }
569                        };
570                        if let Some(data) = envelope.payload {
571                            let parsed = match serde_json::from_value::<TResult>(data) {
572                                Ok(value) => value,
573                                Err(err) => {
574                                    client.emit_error(
575                                        &callback,
576                                        ForgeClientError::new(
577                                            "INVALID_SSE_PAYLOAD",
578                                            err.to_string(),
579                                            None,
580                                        ),
581                                    );
582                                    continue;
583                                }
584                            };
585                            (callback.borrow_mut())(StreamEvent::Data(parsed));
586                        }
587                    }
588                    Ok((_kind, message)) => {
589                        let Some(raw) = message.data().as_string() else {
590                            client.emit_error(
591                                &callback,
592                                ForgeClientError::new(
593                                    "INVALID_SSE_PAYLOAD",
594                                    "SSE payload was not a string",
595                                    None,
596                                ),
597                            );
598                            continue;
599                        };
600                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
601                            Ok(value) => value,
602                            Err(err) => {
603                                client.emit_error(&callback, err);
604                                continue;
605                            }
606                        };
607                        emit_sse_error(&client, &callback, envelope);
608                    }
609                    Err(err) => {
610                        client.emit_error(
611                            &callback,
612                            ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
613                        );
614                        break;
615                    }
616                }
617            }
618
619            event_source.close();
620            client.emit_connection(&callback, ConnectionState::Disconnected);
621            handle_task.finish();
622        });
623
624        handle.set_task(task);
625        handle
626    }
627
628    pub(super) fn subscribe_tracker<TResult, F>(
629        client: ForgeClient,
630        prefix: String,
631        payload: serde_json::Value,
632        endpoint: String,
633        callback: F,
634    ) -> SubscriptionHandle
635    where
636        TResult: DeserializeOwned + Clone + 'static,
637        F: FnMut(StreamEvent<TResult>) + 'static,
638    {
639        let handle = SubscriptionHandle::new();
640        let handle_task = handle.clone();
641        let callback = Rc::new(RefCell::new(callback));
642
643        let task = spawn(async move {
644            client.emit_connection(&callback, ConnectionState::Connecting);
645
646            let mut event_source = match EventSource::new(&events_url(&client)) {
647                Ok(source) => source,
648                Err(err) => {
649                    client.emit_error(
650                        &callback,
651                        ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
652                    );
653                    client.emit_connection(&callback, ConnectionState::Disconnected);
654                    handle_task.finish();
655                    return;
656                }
657            };
658
659            let mut connected_stream = match event_source.subscribe("connected") {
660                Ok(stream) => stream,
661                Err(err) => {
662                    client.emit_error(
663                        &callback,
664                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
665                    );
666                    client.emit_connection(&callback, ConnectionState::Disconnected);
667                    handle_task.finish();
668                    return;
669                }
670            };
671            let update_stream = match event_source.subscribe("update") {
672                Ok(stream) => stream,
673                Err(err) => {
674                    client.emit_error(
675                        &callback,
676                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
677                    );
678                    client.emit_connection(&callback, ConnectionState::Disconnected);
679                    handle_task.finish();
680                    return;
681                }
682            };
683            let error_stream = match event_source.subscribe("error") {
684                Ok(stream) => stream,
685                Err(err) => {
686                    client.emit_error(
687                        &callback,
688                        ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
689                    );
690                    client.emit_connection(&callback, ConnectionState::Disconnected);
691                    handle_task.finish();
692                    return;
693                }
694            };
695
696            let connected_event = match connected_stream.next().await {
697                Some(Ok((_kind, message))) => {
698                    let Some(raw) = message.data().as_string() else {
699                        client.emit_error(
700                            &callback,
701                            ForgeClientError::new(
702                                "INVALID_SSE_PAYLOAD",
703                                "SSE payload was not a string",
704                                None,
705                            ),
706                        );
707                        client.emit_connection(&callback, ConnectionState::Disconnected);
708                        handle_task.finish();
709                        return;
710                    };
711                    match parse_json_str::<ConnectedEvent>(&raw) {
712                        Ok(event) => event,
713                        Err(err) => {
714                            client.emit_error(&callback, err);
715                            client.emit_connection(&callback, ConnectionState::Disconnected);
716                            handle_task.finish();
717                            return;
718                        }
719                    }
720                }
721                Some(Err(err)) => {
722                    client.emit_error(
723                        &callback,
724                        ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
725                    );
726                    client.emit_connection(&callback, ConnectionState::Disconnected);
727                    handle_task.finish();
728                    return;
729                }
730                None => {
731                    client.emit_connection(&callback, ConnectionState::Disconnected);
732                    handle_task.finish();
733                    return;
734                }
735            };
736
737            if handle_task.is_closed() {
738                event_source.close();
739                handle_task.finish();
740                return;
741            }
742
743            let mut register_payload = payload;
744            let register_object = register_payload
745                .as_object_mut()
746                .expect("tracker payload must be an object");
747            register_object.insert(
748                "session_id".to_string(),
749                serde_json::Value::String(connected_event.session_id.unwrap_or_default()),
750            );
751            register_object.insert(
752                "session_secret".to_string(),
753                serde_json::Value::String(connected_event.session_secret.unwrap_or_default()),
754            );
755            register_object.insert(
756                "id".to_string(),
757                serde_json::Value::String(client.random_id(&prefix)),
758            );
759
760            if let Err(err) = request_json(
761                &client,
762                &format!("{}{}", client.inner.url, endpoint),
763                register_payload,
764            )
765            .await
766            {
767                client.emit_error(&callback, err);
768                client.emit_connection(&callback, ConnectionState::Disconnected);
769                handle_task.finish();
770                return;
771            }
772
773            client.emit_connection(&callback, ConnectionState::Connected);
774
775            let mut events = stream::select(update_stream, error_stream);
776            while !handle_task.is_closed() {
777                let Some(event) = events.next().await else {
778                    break;
779                };
780
781                match event {
782                    Ok((kind, message)) if kind == "update" => {
783                        let Some(raw) = message.data().as_string() else {
784                            client.emit_error(
785                                &callback,
786                                ForgeClientError::new(
787                                    "INVALID_SSE_PAYLOAD",
788                                    "SSE payload was not a string",
789                                    None,
790                                ),
791                            );
792                            continue;
793                        };
794                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
795                            Ok(value) => value,
796                            Err(err) => {
797                                client.emit_error(&callback, err);
798                                continue;
799                            }
800                        };
801                        if let Some(data) = envelope.payload {
802                            let parsed = match serde_json::from_value::<TResult>(data) {
803                                Ok(value) => value,
804                                Err(err) => {
805                                    client.emit_error(
806                                        &callback,
807                                        ForgeClientError::new(
808                                            "INVALID_SSE_PAYLOAD",
809                                            err.to_string(),
810                                            None,
811                                        ),
812                                    );
813                                    continue;
814                                }
815                            };
816                            (callback.borrow_mut())(StreamEvent::Data(parsed));
817                        }
818                    }
819                    Ok((_kind, message)) => {
820                        let Some(raw) = message.data().as_string() else {
821                            client.emit_error(
822                                &callback,
823                                ForgeClientError::new(
824                                    "INVALID_SSE_PAYLOAD",
825                                    "SSE payload was not a string",
826                                    None,
827                                ),
828                            );
829                            continue;
830                        };
831                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
832                            Ok(value) => value,
833                            Err(err) => {
834                                client.emit_error(&callback, err);
835                                continue;
836                            }
837                        };
838                        emit_sse_error(&client, &callback, envelope);
839                    }
840                    Err(err) => {
841                        client.emit_error(
842                            &callback,
843                            ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
844                        );
845                        break;
846                    }
847                }
848            }
849
850            event_source.close();
851            client.emit_connection(&callback, ConnectionState::Disconnected);
852            handle_task.finish();
853        });
854
855        handle.set_task(task);
856        handle
857    }
858
859    fn events_url(client: &ForgeClient) -> String {
860        match client.get_token() {
861            Some(token) => format!(
862                "{}/_api/events?token={}",
863                client.inner.url,
864                encode_uri_component(&token)
865            ),
866            None => format!("{}/_api/events", client.inner.url),
867        }
868    }
869
870    fn request_error(err: gloo_net::Error) -> ForgeClientError {
871        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
872    }
873}
874
875#[cfg(not(target_arch = "wasm32"))]
876mod platform {
877    use std::cell::RefCell;
878    use std::rc::Rc;
879
880    use dioxus::prelude::spawn;
881    use futures_util::StreamExt;
882    use reqwest::Client;
883    use reqwest_eventsource::{Event, EventSource};
884    use serde::Serialize;
885    use serde::de::DeserializeOwned;
886
887    use super::{ForgeClient, SubscriptionHandle, emit_sse_error, parse_json_str};
888    use crate::types::{
889        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
890        StreamEvent,
891    };
892
893    pub(super) async fn request_json(
894        client: &ForgeClient,
895        url: &str,
896        body: serde_json::Value,
897    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
898        let mut request = Client::new().post(url).json(&body);
899        if let Some(token) = client.get_token() {
900            request = request.bearer_auth(token);
901        }
902
903        request
904            .send()
905            .await
906            .map_err(request_error)?
907            .json()
908            .await
909            .map_err(request_error)
910    }
911
912    pub(super) async fn request_multipart(
913        client: &ForgeClient,
914        url: &str,
915        form: reqwest::multipart::Form,
916    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
917        let mut request = Client::new().post(url).multipart(form);
918        if let Some(token) = client.get_token() {
919            request = request.bearer_auth(token);
920        }
921
922        request
923            .send()
924            .await
925            .map_err(request_error)?
926            .json()
927            .await
928            .map_err(request_error)
929    }
930
931    pub(super) fn subscribe_query<TArgs, TResult, F>(
932        client: ForgeClient,
933        function_name: String,
934        args: TArgs,
935        callback: F,
936    ) -> SubscriptionHandle
937    where
938        TArgs: Serialize + Clone + 'static,
939        TResult: DeserializeOwned + Clone + 'static,
940        F: FnMut(StreamEvent<TResult>) + 'static,
941    {
942        let handle = SubscriptionHandle::new();
943        let handle_task = handle.clone();
944        let callback = Rc::new(RefCell::new(callback));
945
946        let task = spawn(async move {
947            client.emit_connection(&callback, ConnectionState::Connecting);
948
949            let args_value = match serde_json::to_value(args) {
950                Ok(value) => value,
951                Err(err) => {
952                    client.emit_error(
953                        &callback,
954                        ForgeClientError::new("SERIALIZATION_ERROR", err.to_string(), None),
955                    );
956                    client.emit_connection(&callback, ConnectionState::Disconnected);
957                    handle_task.finish();
958                    return;
959                }
960            };
961
962            let mut event_source = match open_event_source(&client) {
963                Ok(source) => source,
964                Err(err) => {
965                    client.emit_error(&callback, err);
966                    client.emit_connection(&callback, ConnectionState::Disconnected);
967                    handle_task.finish();
968                    return;
969                }
970            };
971
972            let connected_event =
973                match next_connected_event(&mut event_source, &client, &callback).await {
974                    Ok(Some(event)) => event,
975                    Ok(None) => {
976                        client.emit_connection(&callback, ConnectionState::Disconnected);
977                        handle_task.finish();
978                        return;
979                    }
980                    Err(err) => {
981                        client.emit_error(&callback, err);
982                        client.emit_connection(&callback, ConnectionState::Disconnected);
983                        handle_task.finish();
984                        return;
985                    }
986                };
987
988            if handle_task.is_closed() {
989                event_source.close();
990                handle_task.finish();
991                return;
992            }
993
994            let register_payload = serde_json::json!({
995                "session_id": connected_event.session_id,
996                "session_secret": connected_event.session_secret,
997                "id": client.random_id("sub"),
998                "function": function_name,
999                "args": args_value,
1000            });
1001
1002            match request_json(
1003                &client,
1004                &format!("{}/_api/subscribe", client.inner.url),
1005                register_payload,
1006            )
1007            .await
1008            {
1009                Ok(envelope) => match client.decode_envelope::<TResult>(envelope) {
1010                    Ok(data) => {
1011                        client.emit_connection(&callback, ConnectionState::Connected);
1012                        (callback.borrow_mut())(StreamEvent::Data(data));
1013                    }
1014                    Err(err) => {
1015                        client.emit_error(&callback, err);
1016                        client.emit_connection(&callback, ConnectionState::Disconnected);
1017                        handle_task.finish();
1018                        return;
1019                    }
1020                },
1021                Err(err) => {
1022                    client.emit_error(&callback, err);
1023                    client.emit_connection(&callback, ConnectionState::Disconnected);
1024                    handle_task.finish();
1025                    return;
1026                }
1027            }
1028
1029            while !handle_task.is_closed() {
1030                let Some(event) = event_source.next().await else {
1031                    break;
1032                };
1033
1034                match event {
1035                    Ok(Event::Open) => {}
1036                    Ok(Event::Message(message)) if message.event == "update" => {
1037                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1038                            Ok(value) => value,
1039                            Err(err) => {
1040                                client.emit_error(&callback, err);
1041                                continue;
1042                            }
1043                        };
1044                        if let Some(data) = envelope.payload {
1045                            let parsed = match serde_json::from_value::<TResult>(data) {
1046                                Ok(value) => value,
1047                                Err(err) => {
1048                                    client.emit_error(
1049                                        &callback,
1050                                        ForgeClientError::new(
1051                                            "INVALID_SSE_PAYLOAD",
1052                                            err.to_string(),
1053                                            None,
1054                                        ),
1055                                    );
1056                                    continue;
1057                                }
1058                            };
1059                            (callback.borrow_mut())(StreamEvent::Data(parsed));
1060                        }
1061                    }
1062                    Ok(Event::Message(message)) if message.event == "error" => {
1063                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1064                            Ok(value) => value,
1065                            Err(err) => {
1066                                client.emit_error(&callback, err);
1067                                continue;
1068                            }
1069                        };
1070                        emit_sse_error(&client, &callback, envelope);
1071                    }
1072                    Ok(Event::Message(_)) => {}
1073                    Err(err) => {
1074                        client.emit_error(
1075                            &callback,
1076                            ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
1077                        );
1078                        break;
1079                    }
1080                }
1081            }
1082
1083            event_source.close();
1084            client.emit_connection(&callback, ConnectionState::Disconnected);
1085            handle_task.finish();
1086        });
1087
1088        handle.set_task(task);
1089        handle
1090    }
1091
1092    pub(super) fn subscribe_tracker<TResult, F>(
1093        client: ForgeClient,
1094        prefix: String,
1095        payload: serde_json::Value,
1096        endpoint: String,
1097        callback: F,
1098    ) -> SubscriptionHandle
1099    where
1100        TResult: DeserializeOwned + Clone + 'static,
1101        F: FnMut(StreamEvent<TResult>) + 'static,
1102    {
1103        let handle = SubscriptionHandle::new();
1104        let handle_task = handle.clone();
1105        let callback = Rc::new(RefCell::new(callback));
1106
1107        let task = spawn(async move {
1108            client.emit_connection(&callback, ConnectionState::Connecting);
1109
1110            let mut event_source = match open_event_source(&client) {
1111                Ok(source) => source,
1112                Err(err) => {
1113                    client.emit_error(&callback, err);
1114                    client.emit_connection(&callback, ConnectionState::Disconnected);
1115                    handle_task.finish();
1116                    return;
1117                }
1118            };
1119
1120            let connected_event =
1121                match next_connected_event(&mut event_source, &client, &callback).await {
1122                    Ok(Some(event)) => event,
1123                    Ok(None) => {
1124                        client.emit_connection(&callback, ConnectionState::Disconnected);
1125                        handle_task.finish();
1126                        return;
1127                    }
1128                    Err(err) => {
1129                        client.emit_error(&callback, err);
1130                        client.emit_connection(&callback, ConnectionState::Disconnected);
1131                        handle_task.finish();
1132                        return;
1133                    }
1134                };
1135
1136            if handle_task.is_closed() {
1137                event_source.close();
1138                handle_task.finish();
1139                return;
1140            }
1141
1142            let mut register_payload = payload;
1143            let register_object = register_payload
1144                .as_object_mut()
1145                .expect("tracker payload must be an object");
1146            register_object.insert(
1147                "session_id".to_string(),
1148                serde_json::Value::String(connected_event.session_id.unwrap_or_default()),
1149            );
1150            register_object.insert(
1151                "session_secret".to_string(),
1152                serde_json::Value::String(connected_event.session_secret.unwrap_or_default()),
1153            );
1154            register_object.insert(
1155                "id".to_string(),
1156                serde_json::Value::String(client.random_id(&prefix)),
1157            );
1158
1159            if let Err(err) = request_json(
1160                &client,
1161                &format!("{}{}", client.inner.url, endpoint),
1162                register_payload,
1163            )
1164            .await
1165            {
1166                client.emit_error(&callback, err);
1167                client.emit_connection(&callback, ConnectionState::Disconnected);
1168                handle_task.finish();
1169                return;
1170            }
1171
1172            client.emit_connection(&callback, ConnectionState::Connected);
1173
1174            while !handle_task.is_closed() {
1175                let Some(event) = event_source.next().await else {
1176                    break;
1177                };
1178
1179                match event {
1180                    Ok(Event::Open) => {}
1181                    Ok(Event::Message(message)) if message.event == "update" => {
1182                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1183                            Ok(value) => value,
1184                            Err(err) => {
1185                                client.emit_error(&callback, err);
1186                                continue;
1187                            }
1188                        };
1189                        if let Some(data) = envelope.payload {
1190                            let parsed = match serde_json::from_value::<TResult>(data) {
1191                                Ok(value) => value,
1192                                Err(err) => {
1193                                    client.emit_error(
1194                                        &callback,
1195                                        ForgeClientError::new(
1196                                            "INVALID_SSE_PAYLOAD",
1197                                            err.to_string(),
1198                                            None,
1199                                        ),
1200                                    );
1201                                    continue;
1202                                }
1203                            };
1204                            (callback.borrow_mut())(StreamEvent::Data(parsed));
1205                        }
1206                    }
1207                    Ok(Event::Message(message)) if message.event == "error" => {
1208                        let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1209                            Ok(value) => value,
1210                            Err(err) => {
1211                                client.emit_error(&callback, err);
1212                                continue;
1213                            }
1214                        };
1215                        emit_sse_error(&client, &callback, envelope);
1216                    }
1217                    Ok(Event::Message(_)) => {}
1218                    Err(err) => {
1219                        client.emit_error(
1220                            &callback,
1221                            ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
1222                        );
1223                        break;
1224                    }
1225                }
1226            }
1227
1228            event_source.close();
1229            client.emit_connection(&callback, ConnectionState::Disconnected);
1230            handle_task.finish();
1231        });
1232
1233        handle.set_task(task);
1234        handle
1235    }
1236
1237    fn open_event_source(client: &ForgeClient) -> Result<EventSource, ForgeClientError> {
1238        let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
1239        if let Some(token) = client.get_token() {
1240            request = request.bearer_auth(token);
1241        }
1242
1243        EventSource::new(request)
1244            .map_err(|err| ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None))
1245    }
1246
1247    async fn next_connected_event<TValue, T>(
1248        event_source: &mut EventSource,
1249        client: &ForgeClient,
1250        callback: &Rc<RefCell<T>>,
1251    ) -> Result<Option<ConnectedEvent>, ForgeClientError>
1252    where
1253        T: FnMut(StreamEvent<TValue>),
1254    {
1255        while let Some(event) = event_source.next().await {
1256            match event {
1257                Ok(Event::Open) => continue,
1258                Ok(Event::Message(message)) if message.event == "connected" => {
1259                    return parse_json_str::<ConnectedEvent>(&message.data).map(Some);
1260                }
1261                Ok(Event::Message(message)) if message.event == "error" => {
1262                    let envelope = parse_json_str::<SseEnvelopeRaw>(&message.data)?;
1263                    emit_sse_error(client, callback, envelope);
1264                }
1265                Ok(Event::Message(_)) => {}
1266                Err(err) => {
1267                    return Err(ForgeClientError::new(
1268                        "SSE_CONNECTION_FAILED",
1269                        err.to_string(),
1270                        None,
1271                    ));
1272                }
1273            }
1274        }
1275
1276        Ok(None)
1277    }
1278
1279    fn request_error(err: reqwest::Error) -> ForgeClientError {
1280        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1281    }
1282}