1
2use std::cell::{Cell, RefCell};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::types::{
11 ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, SseEnvelopeRaw, StreamEvent,
12};
13
14type TokenProvider = Rc<dyn Fn() -> Option<String>>;
15type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
16
17static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
18
19#[derive(Clone)]
20pub struct ForgeClientConfig {
21 pub url: String,
22 pub get_token: Option<TokenProvider>,
23 pub on_auth_error: Option<AuthErrorHandler>,
24 pub(crate) connection_state: Option<Signal<ConnectionState>>,
25}
26
27impl ForgeClientConfig {
28 pub fn new(url: impl Into<String>) -> Self {
29 Self {
30 url: url.into(),
31 get_token: None,
32 on_auth_error: None,
33 connection_state: None,
34 }
35 }
36
37 pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
38 self.get_token = Some(Rc::new(provider));
39 self
40 }
41
42 pub fn with_auth_error_handler(
43 mut self,
44 handler: impl Fn(ForgeError) + 'static,
45 ) -> Self {
46 self.on_auth_error = Some(Rc::new(handler));
47 self
48 }
49
50 pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
51 self.connection_state = Some(state);
52 self
53 }
54}
55
56#[derive(Clone)]
57pub struct ForgeClient {
58 inner: Rc<ForgeClientInner>,
59}
60
61struct ForgeClientInner {
62 url: String,
63 get_token: Option<TokenProvider>,
64 on_auth_error: Option<AuthErrorHandler>,
65 connection_state: Option<Signal<ConnectionState>>,
66}
67
68impl ForgeClient {
69 pub fn new(config: ForgeClientConfig) -> Self {
70 Self {
71 inner: Rc::new(ForgeClientInner {
72 url: config.url.trim_end_matches('/').to_string(),
73 get_token: config.get_token,
74 on_auth_error: config.on_auth_error,
75 connection_state: config.connection_state,
76 }),
77 }
78 }
79
80 pub async fn call<TArgs, TResult>(
81 &self,
82 function_name: &str,
83 args: TArgs,
84 ) -> Result<TResult, ForgeClientError>
85 where
86 TArgs: Serialize,
87 TResult: DeserializeOwned,
88 {
89 let body = serde_json::json!({ "args": args });
90 let envelope = platform::request_json(
91 self,
92 &format!("{}/_api/rpc/{}", self.inner.url, function_name),
93 body,
94 )
95 .await?;
96 self.decode_envelope(envelope)
97 }
98
99 #[cfg(target_arch = "wasm32")]
100 pub async fn call_multipart<TResult>(
101 &self,
102 function_name: &str,
103 form: web_sys::FormData,
104 ) -> Result<TResult, ForgeClientError>
105 where
106 TResult: DeserializeOwned,
107 {
108 let envelope = platform::request_multipart(
109 self,
110 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
111 form,
112 )
113 .await?;
114 self.decode_envelope(envelope)
115 }
116
117 #[cfg(not(target_arch = "wasm32"))]
118 pub async fn call_multipart<TResult>(
119 &self,
120 function_name: &str,
121 form: reqwest::multipart::Form,
122 ) -> Result<TResult, ForgeClientError>
123 where
124 TResult: DeserializeOwned,
125 {
126 let envelope = platform::request_multipart(
127 self,
128 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
129 form,
130 )
131 .await?;
132 self.decode_envelope(envelope)
133 }
134
135 pub fn subscribe_query<TArgs, TResult, F>(
136 &self,
137 function_name: &str,
138 args: TArgs,
139 callback: F,
140 ) -> SubscriptionHandle
141 where
142 TArgs: Serialize + Clone + 'static,
143 TResult: DeserializeOwned + Clone + 'static,
144 F: FnMut(StreamEvent<TResult>) + 'static,
145 {
146 platform::subscribe_query(self.clone(), function_name.to_string(), args, callback)
147 }
148
149 pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
150 where
151 TResult: DeserializeOwned + Clone + 'static,
152 F: FnMut(StreamEvent<TResult>) + 'static,
153 {
154 self.subscribe_tracker(
155 "job",
156 serde_json::json!({ "job_id": job_id }),
157 "/_api/subscribe-job",
158 callback,
159 )
160 }
161
162 pub fn subscribe_workflow<TResult, F>(
163 &self,
164 workflow_id: String,
165 callback: F,
166 ) -> SubscriptionHandle
167 where
168 TResult: DeserializeOwned + Clone + 'static,
169 F: FnMut(StreamEvent<TResult>) + 'static,
170 {
171 self.subscribe_tracker(
172 "wf",
173 serde_json::json!({ "workflow_id": workflow_id }),
174 "/_api/subscribe-workflow",
175 callback,
176 )
177 }
178
179 fn subscribe_tracker<TResult, F>(
180 &self,
181 prefix: &str,
182 payload: serde_json::Value,
183 endpoint: &str,
184 callback: F,
185 ) -> SubscriptionHandle
186 where
187 TResult: DeserializeOwned + Clone + 'static,
188 F: FnMut(StreamEvent<TResult>) + 'static,
189 {
190 platform::subscribe_tracker(
191 self.clone(),
192 prefix.to_string(),
193 payload,
194 endpoint.to_string(),
195 callback,
196 )
197 }
198
199 fn get_token(&self) -> Option<String> {
200 self.inner.get_token.as_ref().and_then(|provider| provider())
201 }
202
203 fn emit_connection<TValue, T>(&self, callback: &Rc<RefCell<T>>, state: ConnectionState)
204 where
205 T: FnMut(StreamEvent<TValue>),
206 {
207 if let Some(mut signal) = self.inner.connection_state {
208 signal.set(state);
209 }
210 (callback.borrow_mut())(StreamEvent::Connection(state));
211 }
212
213 fn emit_error<TValue, T>(&self, callback: &Rc<RefCell<T>>, error: ForgeClientError)
214 where
215 T: FnMut(StreamEvent<TValue>),
216 {
217 if error.code == "UNAUTHORIZED" {
218 if let Some(handler) = &self.inner.on_auth_error {
219 handler(error.as_forge_error());
220 }
221 }
222 (callback.borrow_mut())(StreamEvent::Error(error));
223 }
224
225 fn decode_envelope<TResult>(
226 &self,
227 envelope: RpcEnvelopeRaw,
228 ) -> Result<TResult, ForgeClientError>
229 where
230 TResult: DeserializeOwned,
231 {
232 if !envelope.success {
233 let error = envelope.error.unwrap_or(ForgeError {
234 code: "UNKNOWN".to_string(),
235 message: "Unknown error".to_string(),
236 details: None,
237 });
238 return Err(ForgeClientError::new(error.code, error.message, error.details));
239 }
240
241 let data = envelope.data.ok_or_else(|| {
242 ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
243 })?;
244 serde_json::from_value(data)
245 .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
246 }
247
248 fn random_id(&self, prefix: &str) -> String {
249 let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
250 format!("{prefix}-{id}")
251 }
252}
253
254#[derive(Clone)]
255pub struct SubscriptionHandle {
256 closed: Rc<Cell<bool>>,
257 task: Rc<RefCell<Option<Task>>>,
258}
259
260impl SubscriptionHandle {
261 fn new() -> Self {
262 Self {
263 closed: Rc::new(Cell::new(false)),
264 task: Rc::new(RefCell::new(None)),
265 }
266 }
267
268 fn set_task(&self, task: Task) {
269 *self.task.borrow_mut() = Some(task);
270 }
271
272 fn finish(&self) {
273 self.closed.set(true);
274 self.task.borrow_mut().take();
275 }
276
277 pub fn close(&self) {
278 self.closed.set(true);
279 if let Some(task) = self.task.borrow_mut().take() {
280 task.cancel();
281 }
282 }
283
284 pub fn is_closed(&self) -> bool {
285 self.closed.get()
286 }
287}
288
289impl Drop for SubscriptionHandle {
290 fn drop(&mut self) {
291 self.close();
292 }
293}
294
295fn parse_json_str<T>(raw: &str) -> Result<T, ForgeClientError>
296where
297 T: DeserializeOwned,
298{
299 serde_json::from_str(raw)
300 .map_err(|err| ForgeClientError::new("INVALID_SSE_PAYLOAD", err.to_string(), None))
301}
302
303fn emit_sse_error<TValue, T>(
304 client: &ForgeClient,
305 callback: &Rc<RefCell<T>>,
306 envelope: SseEnvelopeRaw,
307) where
308 T: FnMut(StreamEvent<TValue>),
309{
310 client.emit_error(
311 callback,
312 ForgeClientError::new(
313 envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string()),
314 envelope
315 .message
316 .unwrap_or_else(|| "Subscription error".to_string()),
317 None,
318 ),
319 );
320}
321
322#[cfg(target_arch = "wasm32")]
323mod platform {
324 use std::cell::RefCell;
325 use std::rc::Rc;
326
327 use dioxus::prelude::spawn;
328 use futures_util::{StreamExt, stream};
329 use gloo_net::eventsource::futures::EventSource;
330 use gloo_net::http::Request;
331 use js_sys::encode_uri_component;
332 use serde::Serialize;
333 use serde::de::DeserializeOwned;
334
335 use super::{ForgeClient, SubscriptionHandle, emit_sse_error, parse_json_str};
336 use crate::types::{
337 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
338 StreamEvent,
339 };
340
341 pub(super) async fn request_json(
342 client: &ForgeClient,
343 url: &str,
344 body: serde_json::Value,
345 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
346 let mut request = Request::post(url).header("Content-Type", "application/json");
347 if let Some(token) = client.get_token() {
348 request = request.header("Authorization", &format!("Bearer {token}"));
349 }
350
351 let request = request.body(body.to_string()).map_err(request_error)?;
352 request
353 .send()
354 .await
355 .map_err(request_error)?
356 .json()
357 .await
358 .map_err(request_error)
359 }
360
361 pub(super) async fn request_multipart(
362 client: &ForgeClient,
363 url: &str,
364 form: web_sys::FormData,
365 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
366 let mut request = Request::post(url);
367 if let Some(token) = client.get_token() {
368 request = request.header("Authorization", &format!("Bearer {token}"));
369 }
370
371 let response = request.body(form).map_err(request_error)?;
372 response
373 .send()
374 .await
375 .map_err(request_error)?
376 .json()
377 .await
378 .map_err(request_error)
379 }
380
381 pub(super) fn subscribe_query<TArgs, TResult, F>(
382 client: ForgeClient,
383 function_name: String,
384 args: TArgs,
385 callback: F,
386 ) -> SubscriptionHandle
387 where
388 TArgs: Serialize + Clone + 'static,
389 TResult: DeserializeOwned + Clone + 'static,
390 F: FnMut(StreamEvent<TResult>) + 'static,
391 {
392 let handle = SubscriptionHandle::new();
393 let handle_task = handle.clone();
394 let callback = Rc::new(RefCell::new(callback));
395
396 let task = spawn(async move {
397 client.emit_connection(&callback, ConnectionState::Connecting);
398
399 let args_value = match serde_json::to_value(args) {
400 Ok(value) => value,
401 Err(err) => {
402 client.emit_error(
403 &callback,
404 ForgeClientError::new("SERIALIZATION_ERROR", err.to_string(), None),
405 );
406 client.emit_connection(&callback, ConnectionState::Disconnected);
407 handle_task.finish();
408 return;
409 }
410 };
411
412 let mut event_source = match EventSource::new(&events_url(&client)) {
413 Ok(source) => source,
414 Err(err) => {
415 client.emit_error(
416 &callback,
417 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
418 );
419 client.emit_connection(&callback, ConnectionState::Disconnected);
420 handle_task.finish();
421 return;
422 }
423 };
424
425 let mut connected_stream = match event_source.subscribe("connected") {
426 Ok(stream) => stream,
427 Err(err) => {
428 client.emit_error(
429 &callback,
430 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
431 );
432 client.emit_connection(&callback, ConnectionState::Disconnected);
433 handle_task.finish();
434 return;
435 }
436 };
437 let update_stream = match event_source.subscribe("update") {
438 Ok(stream) => stream,
439 Err(err) => {
440 client.emit_error(
441 &callback,
442 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
443 );
444 client.emit_connection(&callback, ConnectionState::Disconnected);
445 handle_task.finish();
446 return;
447 }
448 };
449 let error_stream = match event_source.subscribe("error") {
450 Ok(stream) => stream,
451 Err(err) => {
452 client.emit_error(
453 &callback,
454 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
455 );
456 client.emit_connection(&callback, ConnectionState::Disconnected);
457 handle_task.finish();
458 return;
459 }
460 };
461
462 let connected_event = match connected_stream.next().await {
463 Some(Ok((_kind, message))) => {
464 let Some(raw) = message.data().as_string() else {
465 client.emit_error(
466 &callback,
467 ForgeClientError::new(
468 "INVALID_SSE_PAYLOAD",
469 "SSE payload was not a string",
470 None,
471 ),
472 );
473 client.emit_connection(&callback, ConnectionState::Disconnected);
474 handle_task.finish();
475 return;
476 };
477 match parse_json_str::<ConnectedEvent>(&raw) {
478 Ok(event) => event,
479 Err(err) => {
480 client.emit_error(&callback, err);
481 client.emit_connection(&callback, ConnectionState::Disconnected);
482 handle_task.finish();
483 return;
484 }
485 }
486 }
487 Some(Err(err)) => {
488 client.emit_error(
489 &callback,
490 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
491 );
492 client.emit_connection(&callback, ConnectionState::Disconnected);
493 handle_task.finish();
494 return;
495 }
496 None => {
497 client.emit_connection(&callback, ConnectionState::Disconnected);
498 handle_task.finish();
499 return;
500 }
501 };
502
503 if handle_task.is_closed() {
504 event_source.close();
505 handle_task.finish();
506 return;
507 }
508
509 let register_payload = serde_json::json!({
510 "session_id": connected_event.session_id,
511 "session_secret": connected_event.session_secret,
512 "id": client.random_id("sub"),
513 "function": function_name,
514 "args": args_value,
515 });
516
517 match request_json(
518 &client,
519 &format!("{}/_api/subscribe", client.inner.url),
520 register_payload,
521 )
522 .await
523 {
524 Ok(envelope) => match client.decode_envelope::<TResult>(envelope) {
525 Ok(data) => {
526 client.emit_connection(&callback, ConnectionState::Connected);
527 (callback.borrow_mut())(StreamEvent::Data(data));
528 }
529 Err(err) => {
530 client.emit_error(&callback, err);
531 client.emit_connection(&callback, ConnectionState::Disconnected);
532 handle_task.finish();
533 return;
534 }
535 },
536 Err(err) => {
537 client.emit_error(&callback, err);
538 client.emit_connection(&callback, ConnectionState::Disconnected);
539 handle_task.finish();
540 return;
541 }
542 }
543
544 let mut events = stream::select(update_stream, error_stream);
545 while !handle_task.is_closed() {
546 let Some(event) = events.next().await else {
547 break;
548 };
549
550 match event {
551 Ok((kind, message)) if kind == "update" => {
552 let Some(raw) = message.data().as_string() else {
553 client.emit_error(
554 &callback,
555 ForgeClientError::new(
556 "INVALID_SSE_PAYLOAD",
557 "SSE payload was not a string",
558 None,
559 ),
560 );
561 continue;
562 };
563 let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
564 Ok(value) => value,
565 Err(err) => {
566 client.emit_error(&callback, err);
567 continue;
568 }
569 };
570 if let Some(data) = envelope.payload {
571 let parsed = match serde_json::from_value::<TResult>(data) {
572 Ok(value) => value,
573 Err(err) => {
574 client.emit_error(
575 &callback,
576 ForgeClientError::new(
577 "INVALID_SSE_PAYLOAD",
578 err.to_string(),
579 None,
580 ),
581 );
582 continue;
583 }
584 };
585 (callback.borrow_mut())(StreamEvent::Data(parsed));
586 }
587 }
588 Ok((_kind, message)) => {
589 let Some(raw) = message.data().as_string() else {
590 client.emit_error(
591 &callback,
592 ForgeClientError::new(
593 "INVALID_SSE_PAYLOAD",
594 "SSE payload was not a string",
595 None,
596 ),
597 );
598 continue;
599 };
600 let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
601 Ok(value) => value,
602 Err(err) => {
603 client.emit_error(&callback, err);
604 continue;
605 }
606 };
607 emit_sse_error(&client, &callback, envelope);
608 }
609 Err(err) => {
610 client.emit_error(
611 &callback,
612 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
613 );
614 break;
615 }
616 }
617 }
618
619 event_source.close();
620 client.emit_connection(&callback, ConnectionState::Disconnected);
621 handle_task.finish();
622 });
623
624 handle.set_task(task);
625 handle
626 }
627
628 pub(super) fn subscribe_tracker<TResult, F>(
629 client: ForgeClient,
630 prefix: String,
631 payload: serde_json::Value,
632 endpoint: String,
633 callback: F,
634 ) -> SubscriptionHandle
635 where
636 TResult: DeserializeOwned + Clone + 'static,
637 F: FnMut(StreamEvent<TResult>) + 'static,
638 {
639 let handle = SubscriptionHandle::new();
640 let handle_task = handle.clone();
641 let callback = Rc::new(RefCell::new(callback));
642
643 let task = spawn(async move {
644 client.emit_connection(&callback, ConnectionState::Connecting);
645
646 let mut event_source = match EventSource::new(&events_url(&client)) {
647 Ok(source) => source,
648 Err(err) => {
649 client.emit_error(
650 &callback,
651 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
652 );
653 client.emit_connection(&callback, ConnectionState::Disconnected);
654 handle_task.finish();
655 return;
656 }
657 };
658
659 let mut connected_stream = match event_source.subscribe("connected") {
660 Ok(stream) => stream,
661 Err(err) => {
662 client.emit_error(
663 &callback,
664 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
665 );
666 client.emit_connection(&callback, ConnectionState::Disconnected);
667 handle_task.finish();
668 return;
669 }
670 };
671 let update_stream = match event_source.subscribe("update") {
672 Ok(stream) => stream,
673 Err(err) => {
674 client.emit_error(
675 &callback,
676 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
677 );
678 client.emit_connection(&callback, ConnectionState::Disconnected);
679 handle_task.finish();
680 return;
681 }
682 };
683 let error_stream = match event_source.subscribe("error") {
684 Ok(stream) => stream,
685 Err(err) => {
686 client.emit_error(
687 &callback,
688 ForgeClientError::new("SSE_SUBSCRIBE_FAILED", err.to_string(), None),
689 );
690 client.emit_connection(&callback, ConnectionState::Disconnected);
691 handle_task.finish();
692 return;
693 }
694 };
695
696 let connected_event = match connected_stream.next().await {
697 Some(Ok((_kind, message))) => {
698 let Some(raw) = message.data().as_string() else {
699 client.emit_error(
700 &callback,
701 ForgeClientError::new(
702 "INVALID_SSE_PAYLOAD",
703 "SSE payload was not a string",
704 None,
705 ),
706 );
707 client.emit_connection(&callback, ConnectionState::Disconnected);
708 handle_task.finish();
709 return;
710 };
711 match parse_json_str::<ConnectedEvent>(&raw) {
712 Ok(event) => event,
713 Err(err) => {
714 client.emit_error(&callback, err);
715 client.emit_connection(&callback, ConnectionState::Disconnected);
716 handle_task.finish();
717 return;
718 }
719 }
720 }
721 Some(Err(err)) => {
722 client.emit_error(
723 &callback,
724 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
725 );
726 client.emit_connection(&callback, ConnectionState::Disconnected);
727 handle_task.finish();
728 return;
729 }
730 None => {
731 client.emit_connection(&callback, ConnectionState::Disconnected);
732 handle_task.finish();
733 return;
734 }
735 };
736
737 if handle_task.is_closed() {
738 event_source.close();
739 handle_task.finish();
740 return;
741 }
742
743 let mut register_payload = payload;
744 let register_object = register_payload
745 .as_object_mut()
746 .expect("tracker payload must be an object");
747 register_object.insert(
748 "session_id".to_string(),
749 serde_json::Value::String(connected_event.session_id.unwrap_or_default()),
750 );
751 register_object.insert(
752 "session_secret".to_string(),
753 serde_json::Value::String(connected_event.session_secret.unwrap_or_default()),
754 );
755 register_object.insert(
756 "id".to_string(),
757 serde_json::Value::String(client.random_id(&prefix)),
758 );
759
760 if let Err(err) = request_json(
761 &client,
762 &format!("{}{}", client.inner.url, endpoint),
763 register_payload,
764 )
765 .await
766 {
767 client.emit_error(&callback, err);
768 client.emit_connection(&callback, ConnectionState::Disconnected);
769 handle_task.finish();
770 return;
771 }
772
773 client.emit_connection(&callback, ConnectionState::Connected);
774
775 let mut events = stream::select(update_stream, error_stream);
776 while !handle_task.is_closed() {
777 let Some(event) = events.next().await else {
778 break;
779 };
780
781 match event {
782 Ok((kind, message)) if kind == "update" => {
783 let Some(raw) = message.data().as_string() else {
784 client.emit_error(
785 &callback,
786 ForgeClientError::new(
787 "INVALID_SSE_PAYLOAD",
788 "SSE payload was not a string",
789 None,
790 ),
791 );
792 continue;
793 };
794 let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
795 Ok(value) => value,
796 Err(err) => {
797 client.emit_error(&callback, err);
798 continue;
799 }
800 };
801 if let Some(data) = envelope.payload {
802 let parsed = match serde_json::from_value::<TResult>(data) {
803 Ok(value) => value,
804 Err(err) => {
805 client.emit_error(
806 &callback,
807 ForgeClientError::new(
808 "INVALID_SSE_PAYLOAD",
809 err.to_string(),
810 None,
811 ),
812 );
813 continue;
814 }
815 };
816 (callback.borrow_mut())(StreamEvent::Data(parsed));
817 }
818 }
819 Ok((_kind, message)) => {
820 let Some(raw) = message.data().as_string() else {
821 client.emit_error(
822 &callback,
823 ForgeClientError::new(
824 "INVALID_SSE_PAYLOAD",
825 "SSE payload was not a string",
826 None,
827 ),
828 );
829 continue;
830 };
831 let envelope = match parse_json_str::<SseEnvelopeRaw>(&raw) {
832 Ok(value) => value,
833 Err(err) => {
834 client.emit_error(&callback, err);
835 continue;
836 }
837 };
838 emit_sse_error(&client, &callback, envelope);
839 }
840 Err(err) => {
841 client.emit_error(
842 &callback,
843 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
844 );
845 break;
846 }
847 }
848 }
849
850 event_source.close();
851 client.emit_connection(&callback, ConnectionState::Disconnected);
852 handle_task.finish();
853 });
854
855 handle.set_task(task);
856 handle
857 }
858
859 fn events_url(client: &ForgeClient) -> String {
860 match client.get_token() {
861 Some(token) => format!(
862 "{}/_api/events?token={}",
863 client.inner.url,
864 encode_uri_component(&token)
865 ),
866 None => format!("{}/_api/events", client.inner.url),
867 }
868 }
869
870 fn request_error(err: gloo_net::Error) -> ForgeClientError {
871 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
872 }
873}
874
875#[cfg(not(target_arch = "wasm32"))]
876mod platform {
877 use std::cell::RefCell;
878 use std::rc::Rc;
879
880 use dioxus::prelude::spawn;
881 use futures_util::StreamExt;
882 use reqwest::Client;
883 use reqwest_eventsource::{Event, EventSource};
884 use serde::Serialize;
885 use serde::de::DeserializeOwned;
886
887 use super::{ForgeClient, SubscriptionHandle, emit_sse_error, parse_json_str};
888 use crate::types::{
889 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
890 StreamEvent,
891 };
892
893 pub(super) async fn request_json(
894 client: &ForgeClient,
895 url: &str,
896 body: serde_json::Value,
897 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
898 let mut request = Client::new().post(url).json(&body);
899 if let Some(token) = client.get_token() {
900 request = request.bearer_auth(token);
901 }
902
903 request
904 .send()
905 .await
906 .map_err(request_error)?
907 .json()
908 .await
909 .map_err(request_error)
910 }
911
912 pub(super) async fn request_multipart(
913 client: &ForgeClient,
914 url: &str,
915 form: reqwest::multipart::Form,
916 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
917 let mut request = Client::new().post(url).multipart(form);
918 if let Some(token) = client.get_token() {
919 request = request.bearer_auth(token);
920 }
921
922 request
923 .send()
924 .await
925 .map_err(request_error)?
926 .json()
927 .await
928 .map_err(request_error)
929 }
930
931 pub(super) fn subscribe_query<TArgs, TResult, F>(
932 client: ForgeClient,
933 function_name: String,
934 args: TArgs,
935 callback: F,
936 ) -> SubscriptionHandle
937 where
938 TArgs: Serialize + Clone + 'static,
939 TResult: DeserializeOwned + Clone + 'static,
940 F: FnMut(StreamEvent<TResult>) + 'static,
941 {
942 let handle = SubscriptionHandle::new();
943 let handle_task = handle.clone();
944 let callback = Rc::new(RefCell::new(callback));
945
946 let task = spawn(async move {
947 client.emit_connection(&callback, ConnectionState::Connecting);
948
949 let args_value = match serde_json::to_value(args) {
950 Ok(value) => value,
951 Err(err) => {
952 client.emit_error(
953 &callback,
954 ForgeClientError::new("SERIALIZATION_ERROR", err.to_string(), None),
955 );
956 client.emit_connection(&callback, ConnectionState::Disconnected);
957 handle_task.finish();
958 return;
959 }
960 };
961
962 let mut event_source = match open_event_source(&client) {
963 Ok(source) => source,
964 Err(err) => {
965 client.emit_error(&callback, err);
966 client.emit_connection(&callback, ConnectionState::Disconnected);
967 handle_task.finish();
968 return;
969 }
970 };
971
972 let connected_event =
973 match next_connected_event(&mut event_source, &client, &callback).await {
974 Ok(Some(event)) => event,
975 Ok(None) => {
976 client.emit_connection(&callback, ConnectionState::Disconnected);
977 handle_task.finish();
978 return;
979 }
980 Err(err) => {
981 client.emit_error(&callback, err);
982 client.emit_connection(&callback, ConnectionState::Disconnected);
983 handle_task.finish();
984 return;
985 }
986 };
987
988 if handle_task.is_closed() {
989 event_source.close();
990 handle_task.finish();
991 return;
992 }
993
994 let register_payload = serde_json::json!({
995 "session_id": connected_event.session_id,
996 "session_secret": connected_event.session_secret,
997 "id": client.random_id("sub"),
998 "function": function_name,
999 "args": args_value,
1000 });
1001
1002 match request_json(
1003 &client,
1004 &format!("{}/_api/subscribe", client.inner.url),
1005 register_payload,
1006 )
1007 .await
1008 {
1009 Ok(envelope) => match client.decode_envelope::<TResult>(envelope) {
1010 Ok(data) => {
1011 client.emit_connection(&callback, ConnectionState::Connected);
1012 (callback.borrow_mut())(StreamEvent::Data(data));
1013 }
1014 Err(err) => {
1015 client.emit_error(&callback, err);
1016 client.emit_connection(&callback, ConnectionState::Disconnected);
1017 handle_task.finish();
1018 return;
1019 }
1020 },
1021 Err(err) => {
1022 client.emit_error(&callback, err);
1023 client.emit_connection(&callback, ConnectionState::Disconnected);
1024 handle_task.finish();
1025 return;
1026 }
1027 }
1028
1029 while !handle_task.is_closed() {
1030 let Some(event) = event_source.next().await else {
1031 break;
1032 };
1033
1034 match event {
1035 Ok(Event::Open) => {}
1036 Ok(Event::Message(message)) if message.event == "update" => {
1037 let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1038 Ok(value) => value,
1039 Err(err) => {
1040 client.emit_error(&callback, err);
1041 continue;
1042 }
1043 };
1044 if let Some(data) = envelope.payload {
1045 let parsed = match serde_json::from_value::<TResult>(data) {
1046 Ok(value) => value,
1047 Err(err) => {
1048 client.emit_error(
1049 &callback,
1050 ForgeClientError::new(
1051 "INVALID_SSE_PAYLOAD",
1052 err.to_string(),
1053 None,
1054 ),
1055 );
1056 continue;
1057 }
1058 };
1059 (callback.borrow_mut())(StreamEvent::Data(parsed));
1060 }
1061 }
1062 Ok(Event::Message(message)) if message.event == "error" => {
1063 let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1064 Ok(value) => value,
1065 Err(err) => {
1066 client.emit_error(&callback, err);
1067 continue;
1068 }
1069 };
1070 emit_sse_error(&client, &callback, envelope);
1071 }
1072 Ok(Event::Message(_)) => {}
1073 Err(err) => {
1074 client.emit_error(
1075 &callback,
1076 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
1077 );
1078 break;
1079 }
1080 }
1081 }
1082
1083 event_source.close();
1084 client.emit_connection(&callback, ConnectionState::Disconnected);
1085 handle_task.finish();
1086 });
1087
1088 handle.set_task(task);
1089 handle
1090 }
1091
1092 pub(super) fn subscribe_tracker<TResult, F>(
1093 client: ForgeClient,
1094 prefix: String,
1095 payload: serde_json::Value,
1096 endpoint: String,
1097 callback: F,
1098 ) -> SubscriptionHandle
1099 where
1100 TResult: DeserializeOwned + Clone + 'static,
1101 F: FnMut(StreamEvent<TResult>) + 'static,
1102 {
1103 let handle = SubscriptionHandle::new();
1104 let handle_task = handle.clone();
1105 let callback = Rc::new(RefCell::new(callback));
1106
1107 let task = spawn(async move {
1108 client.emit_connection(&callback, ConnectionState::Connecting);
1109
1110 let mut event_source = match open_event_source(&client) {
1111 Ok(source) => source,
1112 Err(err) => {
1113 client.emit_error(&callback, err);
1114 client.emit_connection(&callback, ConnectionState::Disconnected);
1115 handle_task.finish();
1116 return;
1117 }
1118 };
1119
1120 let connected_event =
1121 match next_connected_event(&mut event_source, &client, &callback).await {
1122 Ok(Some(event)) => event,
1123 Ok(None) => {
1124 client.emit_connection(&callback, ConnectionState::Disconnected);
1125 handle_task.finish();
1126 return;
1127 }
1128 Err(err) => {
1129 client.emit_error(&callback, err);
1130 client.emit_connection(&callback, ConnectionState::Disconnected);
1131 handle_task.finish();
1132 return;
1133 }
1134 };
1135
1136 if handle_task.is_closed() {
1137 event_source.close();
1138 handle_task.finish();
1139 return;
1140 }
1141
1142 let mut register_payload = payload;
1143 let register_object = register_payload
1144 .as_object_mut()
1145 .expect("tracker payload must be an object");
1146 register_object.insert(
1147 "session_id".to_string(),
1148 serde_json::Value::String(connected_event.session_id.unwrap_or_default()),
1149 );
1150 register_object.insert(
1151 "session_secret".to_string(),
1152 serde_json::Value::String(connected_event.session_secret.unwrap_or_default()),
1153 );
1154 register_object.insert(
1155 "id".to_string(),
1156 serde_json::Value::String(client.random_id(&prefix)),
1157 );
1158
1159 if let Err(err) = request_json(
1160 &client,
1161 &format!("{}{}", client.inner.url, endpoint),
1162 register_payload,
1163 )
1164 .await
1165 {
1166 client.emit_error(&callback, err);
1167 client.emit_connection(&callback, ConnectionState::Disconnected);
1168 handle_task.finish();
1169 return;
1170 }
1171
1172 client.emit_connection(&callback, ConnectionState::Connected);
1173
1174 while !handle_task.is_closed() {
1175 let Some(event) = event_source.next().await else {
1176 break;
1177 };
1178
1179 match event {
1180 Ok(Event::Open) => {}
1181 Ok(Event::Message(message)) if message.event == "update" => {
1182 let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1183 Ok(value) => value,
1184 Err(err) => {
1185 client.emit_error(&callback, err);
1186 continue;
1187 }
1188 };
1189 if let Some(data) = envelope.payload {
1190 let parsed = match serde_json::from_value::<TResult>(data) {
1191 Ok(value) => value,
1192 Err(err) => {
1193 client.emit_error(
1194 &callback,
1195 ForgeClientError::new(
1196 "INVALID_SSE_PAYLOAD",
1197 err.to_string(),
1198 None,
1199 ),
1200 );
1201 continue;
1202 }
1203 };
1204 (callback.borrow_mut())(StreamEvent::Data(parsed));
1205 }
1206 }
1207 Ok(Event::Message(message)) if message.event == "error" => {
1208 let envelope = match parse_json_str::<SseEnvelopeRaw>(&message.data) {
1209 Ok(value) => value,
1210 Err(err) => {
1211 client.emit_error(&callback, err);
1212 continue;
1213 }
1214 };
1215 emit_sse_error(&client, &callback, envelope);
1216 }
1217 Ok(Event::Message(_)) => {}
1218 Err(err) => {
1219 client.emit_error(
1220 &callback,
1221 ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None),
1222 );
1223 break;
1224 }
1225 }
1226 }
1227
1228 event_source.close();
1229 client.emit_connection(&callback, ConnectionState::Disconnected);
1230 handle_task.finish();
1231 });
1232
1233 handle.set_task(task);
1234 handle
1235 }
1236
1237 fn open_event_source(client: &ForgeClient) -> Result<EventSource, ForgeClientError> {
1238 let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
1239 if let Some(token) = client.get_token() {
1240 request = request.bearer_auth(token);
1241 }
1242
1243 EventSource::new(request)
1244 .map_err(|err| ForgeClientError::new("SSE_CONNECTION_FAILED", err.to_string(), None))
1245 }
1246
1247 async fn next_connected_event<TValue, T>(
1248 event_source: &mut EventSource,
1249 client: &ForgeClient,
1250 callback: &Rc<RefCell<T>>,
1251 ) -> Result<Option<ConnectedEvent>, ForgeClientError>
1252 where
1253 T: FnMut(StreamEvent<TValue>),
1254 {
1255 while let Some(event) = event_source.next().await {
1256 match event {
1257 Ok(Event::Open) => continue,
1258 Ok(Event::Message(message)) if message.event == "connected" => {
1259 return parse_json_str::<ConnectedEvent>(&message.data).map(Some);
1260 }
1261 Ok(Event::Message(message)) if message.event == "error" => {
1262 let envelope = parse_json_str::<SseEnvelopeRaw>(&message.data)?;
1263 emit_sse_error(client, callback, envelope);
1264 }
1265 Ok(Event::Message(_)) => {}
1266 Err(err) => {
1267 return Err(ForgeClientError::new(
1268 "SSE_CONNECTION_FAILED",
1269 err.to_string(),
1270 None,
1271 ));
1272 }
1273 }
1274 }
1275
1276 Ok(None)
1277 }
1278
1279 fn request_error(err: reqwest::Error) -> ForgeClientError {
1280 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1281 }
1282}