1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::signals::ForgeSignals;
13use crate::types::{
14 ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
15};
16
/// Callback that yields the current auth token, or `None` when there is none.
type TokenProvider = Rc<dyn Fn() -> Option<String>>;
/// Callback invoked with a [`ForgeError`] when an auth-related failure is detected.
type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
/// Callback invoked with a [`ForgeClientError`] through `notify_mutation_error`.
type MutationErrorHandler = Rc<dyn Fn(ForgeClientError)>;
/// Sending half of the per-subscription channel that carries dispatched SSE events.
type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
/// One-shot sender used to report the outcome of a pending connection attempt.
type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;

/// Process-wide counter that supplies the numeric suffix of generated subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
24
/// Message routed from the SSE event loop to a single subscription's channel.
enum SseDispatch {
    /// JSON payload taken from an `update` event.
    Data(serde_json::Value),
    /// Code and message taken from an `error` event.
    Error { code: String, message: String },
}
29
/// Everything needed to (re-)send a subscription registration request.
struct RegistrationMeta {
    /// Path appended to the client's base URL when registering.
    endpoint: &'static str,
    /// JSON object body; the session id and secret are added at send time.
    payload: serde_json::Value,
}
34
/// Lifecycle of the shared SSE connection.
#[derive(Default, Clone, Copy, PartialEq, Eq)]
enum SseState {
    /// No connection and no attempt in progress (the default).
    #[default]
    Idle,
    /// An event loop has been started but the session is not established yet.
    Connecting,
    /// A session id and secret have been received.
    Connected,
}
42
/// Mutable bookkeeping for the shared SSE connection and its subscriptions.
#[derive(Default)]
struct SseManager {
    /// Session id from the `connected` event; `None` while disconnected.
    session_id: Option<String>,
    /// Session secret from the `connected` event; `None` while disconnected.
    session_secret: Option<String>,
    /// Current connection lifecycle state.
    state: SseState,
    /// Set on the first successful connection; later connections count as reconnects.
    ever_connected: bool,
    /// Event channels keyed by dispatch target (e.g. `sub:<id>`).
    listeners: HashMap<String, EventSender>,
    /// Registration requests keyed by subscription id, kept for re-registration.
    registrations: HashMap<String, RegistrationMeta>,
    /// Handle of the currently tracked event-loop task, if any.
    event_loop_task: Option<Task>,
    /// Reconnect attempts made since the last successful connection.
    reconnect_attempts: u32,
    /// Callers waiting for the in-flight connection attempt to resolve.
    connect_waiters: Vec<ConnectWaiter>,
}
56
/// Number of automatic reconnect attempts allowed before `should_reconnect` gives up.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
58
/// Settings used to construct a [`ForgeClient`].
#[derive(Clone)]
pub struct ForgeClientConfig {
    /// Base URL of the server; trailing slashes are trimmed by `ForgeClient::new`.
    pub url: String,
    /// Optional provider of the bearer token attached to requests.
    pub get_token: Option<TokenProvider>,
    /// Optional hook invoked on `UNAUTHORIZED` / `FORBIDDEN` style failures.
    pub on_auth_error: Option<AuthErrorHandler>,
    /// Optional hook invoked by `ForgeClient::notify_mutation_error`.
    pub on_mutation_error: Option<MutationErrorHandler>,
    /// Optional signal that receives connection-state broadcasts.
    pub(crate) connection_state: Option<Signal<ConnectionState>>,
}
67
68impl ForgeClientConfig {
69 pub fn new(url: impl Into<String>) -> Self {
70 Self {
71 url: url.into(),
72 get_token: None,
73 on_auth_error: None,
74 on_mutation_error: None,
75 connection_state: None,
76 }
77 }
78
79 pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
80 self.get_token = Some(Rc::new(provider));
81 self
82 }
83
84 pub fn with_auth_error_handler(
85 mut self,
86 handler: impl Fn(ForgeError) + 'static,
87 ) -> Self {
88 self.on_auth_error = Some(Rc::new(handler));
89 self
90 }
91
92 pub fn with_mutation_error_handler(
94 mut self,
95 handler: impl Fn(ForgeClientError) + 'static,
96 ) -> Self {
97 self.on_mutation_error = Some(Rc::new(handler));
98 self
99 }
100
101 pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
102 self.connection_state = Some(state);
103 self
104 }
105}
106
/// Cheaply clonable handle to the shared client state; clones share one `Rc`.
#[derive(Clone)]
pub struct ForgeClient {
    /// Shared, single-threaded client state.
    inner: Rc<ForgeClientInner>,
}
111
/// State shared by every clone of a [`ForgeClient`].
struct ForgeClientInner {
    /// Base URL with trailing slashes removed.
    url: String,
    /// Optional provider of the bearer token attached to requests.
    get_token: Option<TokenProvider>,
    /// Optional hook invoked on auth-related failures.
    on_auth_error: Option<AuthErrorHandler>,
    /// Optional hook invoked by `notify_mutation_error`.
    on_mutation_error: Option<MutationErrorHandler>,
    /// Optional signal that receives connection-state broadcasts.
    connection_state: Option<Signal<ConnectionState>>,
    /// SSE connection and subscription bookkeeping.
    sse: RefCell<SseManager>,
    /// Signals installed via `set_signals`; used to mint correlation ids.
    signals: RefCell<Option<ForgeSignals>>,
}
121
122impl ForgeClient {
123 pub fn new(config: ForgeClientConfig) -> Self {
124 Self {
125 inner: Rc::new(ForgeClientInner {
126 url: config.url.trim_end_matches('/').to_string(),
127 get_token: config.get_token,
128 on_auth_error: config.on_auth_error,
129 on_mutation_error: config.on_mutation_error,
130 connection_state: config.connection_state,
131 sse: RefCell::new(SseManager::default()),
132 signals: RefCell::new(None),
133 }),
134 }
135 }
136
137 pub fn set_signals(&self, signals: ForgeSignals) {
139 *self.inner.signals.borrow_mut() = Some(signals);
140 }
141
142 pub fn get_url(&self) -> &str {
144 &self.inner.url
145 }
146
147 pub fn notify_mutation_error(&self, error: ForgeClientError) {
150 if let Some(handler) = &self.inner.on_mutation_error {
151 handler(error);
152 }
153 }
154
155 fn correlation_id(&self) -> Option<String> {
157 self.inner.signals.borrow().as_ref().map(|s| s.next_correlation_id())
158 }
159
160 pub async fn call<TArgs, TResult>(
161 &self,
162 function_name: &str,
163 args: TArgs,
164 ) -> Result<TResult, ForgeClientError>
165 where
166 TArgs: Serialize,
167 TResult: DeserializeOwned,
168 {
169 let body = serde_json::json!({ "args": args });
170 let correlation_id = self.correlation_id();
171 let envelope = platform::request_json(
172 self,
173 &format!("{}/_api/rpc/{}", self.inner.url, function_name),
174 body,
175 correlation_id.as_deref(),
176 )
177 .await?;
178 self.decode_envelope(envelope)
179 }
180
181 #[cfg(target_arch = "wasm32")]
182 pub async fn call_multipart<TResult>(
183 &self,
184 function_name: &str,
185 form: web_sys::FormData,
186 ) -> Result<TResult, ForgeClientError>
187 where
188 TResult: DeserializeOwned,
189 {
190 let correlation_id = self.correlation_id();
191 let envelope = platform::request_multipart(
192 self,
193 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
194 form,
195 correlation_id.as_deref(),
196 )
197 .await?;
198 self.decode_envelope(envelope)
199 }
200
201 #[cfg(not(target_arch = "wasm32"))]
202 pub async fn call_multipart<TResult>(
203 &self,
204 function_name: &str,
205 form: reqwest::multipart::Form,
206 ) -> Result<TResult, ForgeClientError>
207 where
208 TResult: DeserializeOwned,
209 {
210 let correlation_id = self.correlation_id();
211 let envelope = platform::request_multipart(
212 self,
213 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
214 form,
215 correlation_id.as_deref(),
216 )
217 .await?;
218 self.decode_envelope(envelope)
219 }
220
221 pub fn subscribe_query<TArgs, TResult, F>(
222 &self,
223 function_name: &str,
224 args: TArgs,
225 callback: F,
226 ) -> SubscriptionHandle
227 where
228 TArgs: Serialize + Clone + 'static,
229 TResult: DeserializeOwned + Clone + 'static,
230 F: FnMut(StreamEvent<TResult>) + 'static,
231 {
232 let sub_id = self.random_id("sub");
233 let target = format!("sub:{sub_id}");
234
235 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
236 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
237
238 let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
239 let reg_payload = serde_json::json!({
240 "id": sub_id,
241 "function": function_name,
242 "args": args_value,
243 });
244 self.inner.sse.borrow_mut().registrations.insert(
245 sub_id.clone(),
246 RegistrationMeta {
247 endpoint: "/_api/subscribe",
248 payload: reg_payload,
249 },
250 );
251
252 self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
253 match client.decode_envelope::<TResult>(envelope) {
254 Ok(data) => cb(StreamEvent::Data(data)),
255 Err(err) => cb(StreamEvent::Error(err)),
256 }
257 })
258 }
259
260 pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
261 where
262 TResult: DeserializeOwned + Clone + 'static,
263 F: FnMut(StreamEvent<TResult>) + 'static,
264 {
265 self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
266 }
267
268 pub fn subscribe_workflow<TResult, F>(
269 &self,
270 workflow_id: String,
271 callback: F,
272 ) -> SubscriptionHandle
273 where
274 TResult: DeserializeOwned + Clone + 'static,
275 F: FnMut(StreamEvent<TResult>) + 'static,
276 {
277 self.subscribe_tracker(
278 "wf",
279 serde_json::json!({ "workflow_id": workflow_id }),
280 "/_api/subscribe-workflow",
281 callback,
282 )
283 }
284
285 fn subscribe_tracker<TResult, F>(
286 &self,
287 prefix: &str,
288 payload: serde_json::Value,
289 endpoint: &'static str,
290 callback: F,
291 ) -> SubscriptionHandle
292 where
293 TResult: DeserializeOwned + Clone + 'static,
294 F: FnMut(StreamEvent<TResult>) + 'static,
295 {
296 let sub_id = self.random_id(prefix);
297 let target = format!("{prefix}:{sub_id}");
298
299 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
300 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
301
302 let mut reg_payload = payload;
303 reg_payload
304 .as_object_mut()
305 .expect("tracker payload must be an object")
306 .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
307 self.inner.sse.borrow_mut().registrations.insert(
308 sub_id.clone(),
309 RegistrationMeta {
310 endpoint,
311 payload: reg_payload,
312 },
313 );
314
315 self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
316 if envelope.success {
317 if let Some(data) = envelope.data {
318 match serde_json::from_value::<TResult>(data) {
319 Ok(parsed) => cb(StreamEvent::Data(parsed)),
320 Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
321 "DESERIALIZATION_ERROR",
322 e.to_string(),
323 None,
324 ))),
325 }
326 }
327 }
328 })
329 }
330
331 fn spawn_subscription<TResult, F>(
332 &self,
333 sub_id: String,
334 target: String,
335 mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
336 mut callback: F,
337 on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
338 ) -> SubscriptionHandle
339 where
340 TResult: DeserializeOwned + Clone + 'static,
341 F: FnMut(StreamEvent<TResult>) + 'static,
342 {
343 let client = self.clone();
344 let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
345 let handle_task = handle.clone();
346
347 let task = dioxus::prelude::spawn(async move {
348 callback(StreamEvent::Connection(ConnectionState::Connecting));
349
350 if let Err(e) = client.ensure_connected().await {
351 callback(StreamEvent::Error(e));
352 callback(StreamEvent::Connection(ConnectionState::Disconnected));
353 handle_task.finish();
354 return;
355 }
356
357 match client.register_subscription(&sub_id).await {
358 Ok(envelope) => {
359 callback(StreamEvent::Connection(ConnectionState::Connected));
360 on_initial(&client, envelope, &mut callback);
361 }
362 Err(err) => {
363 callback(StreamEvent::Error(err));
364 callback(StreamEvent::Connection(ConnectionState::Disconnected));
365 handle_task.finish();
366 return;
367 }
368 }
369
370 while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
371 Self::deliver_event::<TResult, F>(&mut callback, &client, event);
372 }
373
374 handle_task.finish();
375 });
376
377 handle.set_task(task);
378 handle
379 }
380
381 fn deliver_event<TResult, F>(
382 callback: &mut F,
383 client: &ForgeClient,
384 event: SseDispatch,
385 ) where
386 TResult: DeserializeOwned,
387 F: FnMut(StreamEvent<TResult>),
388 {
389 match event {
390 SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
391 Ok(data) => callback(StreamEvent::Data(data)),
392 Err(e) => {
393 callback(StreamEvent::Error(ForgeClientError::new(
394 "DESERIALIZATION_ERROR",
395 e.to_string(),
396 None,
397 )));
398 }
399 },
400 SseDispatch::Error { code, message } => {
401 let err = ForgeClientError::new(&code, &message, None);
402 if code == "UNAUTHORIZED" {
403 if let Some(handler) = &client.inner.on_auth_error {
404 handler(err.as_forge_error());
405 }
406 }
407 callback(StreamEvent::Error(err));
408 }
409 }
410 }
411
412 async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
414 let rx = {
415 let mut sse = self.inner.sse.borrow_mut();
416 if sse.state == SseState::Connected {
417 return Ok(());
418 }
419
420 let (tx, rx) = futures_channel::oneshot::channel();
421 sse.connect_waiters.push(tx);
422
423 if sse.state == SseState::Idle {
424 sse.state = SseState::Connecting;
425 drop(sse);
426 platform::start_event_loop(self.clone());
427 }
428
429 rx
430 };
431
432 rx.await.unwrap_or_else(|_| {
433 Err(ForgeClientError::new(
434 "SSE_CONNECTION_FAILED",
435 "Connection attempt cancelled",
436 None,
437 ))
438 })
439 }
440
441 async fn register_subscription(
444 &self,
445 sub_id: &str,
446 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
447 let envelope = self.try_register_subscription(sub_id).await?;
448
449 let needs_retry = !envelope.success
451 && envelope
452 .error
453 .as_ref()
454 .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
455
456 if needs_retry {
457 self.force_reconnect().await;
458 self.ensure_connected().await?;
459 let retried = self.try_register_subscription(sub_id).await?;
460 self.notify_auth_error_if_needed(&retried);
461 return Ok(retried);
462 }
463
464 self.notify_auth_error_if_needed(&envelope);
465 Ok(envelope)
466 }
467
468 fn notify_auth_error_if_needed(&self, envelope: &RpcEnvelopeRaw) {
469 if let Some(err) = envelope.error.as_ref().filter(|_| !envelope.success) {
470 if (err.code == "UNAUTHORIZED" || err.code == "FORBIDDEN")
471 && let Some(handler) = &self.inner.on_auth_error
472 {
473 handler(err.clone());
474 }
475 }
476 }
477
478 async fn try_register_subscription(
479 &self,
480 sub_id: &str,
481 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
482 let (endpoint, payload) = {
483 let sse = self.inner.sse.borrow();
484 let meta = sse
485 .registrations
486 .get(sub_id)
487 .ok_or_else(|| {
488 ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
489 })?;
490 let session_id = sse.session_id.clone().unwrap_or_default();
491 let session_secret = sse.session_secret.clone().unwrap_or_default();
492 let mut payload = meta.payload.clone();
493 let obj = payload
494 .as_object_mut()
495 .expect("registration payload must be an object");
496 obj.insert("session_id".into(), serde_json::Value::String(session_id));
497 obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
498 (meta.endpoint, payload)
499 };
500
501 let url = format!("{}{}", self.inner.url, endpoint);
502 platform::request_json(self, &url, payload, None).await
503 }
504
505 async fn force_reconnect(&self) {
506 let task = {
507 let mut sse = self.inner.sse.borrow_mut();
508 sse.session_id = None;
509 sse.session_secret = None;
510 sse.state = SseState::Idle;
511 sse.event_loop_task.take()
514 };
515 if let Some(task) = task {
516 task.cancel();
517 }
518 sleep(Duration::from_millis(10)).await;
519 }
520
521 pub fn reconnect_sse(&self) {
528 let has_listeners = {
529 let mut sse = self.inner.sse.borrow_mut();
530 if sse.state == SseState::Idle && sse.event_loop_task.is_none() && sse.listeners.is_empty() {
531 return;
532 }
533 if sse.state == SseState::Connecting && sse.event_loop_task.is_some() {
535 return;
536 }
537 if let Some(task) = sse.event_loop_task.take() {
538 task.cancel();
539 }
540 sse.session_id = None;
541 sse.session_secret = None;
542 sse.reconnect_attempts = 0;
543 let has_listeners = !sse.listeners.is_empty();
544 sse.state = if has_listeners {
545 SseState::Connecting
546 } else {
547 SseState::Idle
548 };
549 has_listeners
550 };
551 if has_listeners {
552 platform::start_event_loop(self.clone());
553 }
554 }
555
556 async fn reregister_all(&self) {
557 let sub_ids: Vec<String> = {
558 let sse = self.inner.sse.borrow();
559 sse.registrations.keys().cloned().collect()
560 };
561
562 for sub_id in sub_ids {
563 let _ = self.register_subscription(&sub_id).await;
564 }
565 }
566
567 fn dispatch_event(&self, target: &str, event: SseDispatch) {
568 let tx = {
569 let sse = self.inner.sse.borrow();
570 sse.listeners.get(target).cloned()
571 };
572 if let Some(tx) = tx {
573 let _ = tx.unbounded_send(event);
574 }
575 }
576
577 fn broadcast_connection(&self, state: ConnectionState) {
578 if let Some(mut signal) = self.inner.connection_state {
579 signal.set(state);
580 }
581 }
582
583 fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
584 let mut sse = self.inner.sse.borrow_mut();
585 let is_reconnect = sse.ever_connected;
586 sse.session_id = Some(session_id);
587 sse.session_secret = Some(session_secret);
588 sse.state = SseState::Connected;
589 sse.reconnect_attempts = 0;
590 sse.ever_connected = true;
591 for waiter in sse.connect_waiters.drain(..) {
592 let _ = waiter.send(Ok(()));
593 }
594 is_reconnect
595 }
596
597 fn mark_disconnected(&self) {
598 let mut sse = self.inner.sse.borrow_mut();
599 sse.session_id = None;
600 sse.session_secret = None;
601 sse.state = SseState::Idle;
602 sse.event_loop_task = None;
603 let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
604 for waiter in sse.connect_waiters.drain(..) {
605 let _ = waiter.send(Err(err()));
606 }
607 }
608
609 fn should_reconnect(&self) -> Option<u32> {
610 let mut sse = self.inner.sse.borrow_mut();
611 if sse.listeners.is_empty() {
612 return None;
613 }
614 let attempts = sse.reconnect_attempts;
615 if attempts >= MAX_RECONNECT_ATTEMPTS {
616 return None;
617 }
618 sse.reconnect_attempts = attempts + 1;
619 Some(attempts)
620 }
621
622 fn get_token(&self) -> Option<String> {
623 self.inner
624 .get_token
625 .as_ref()
626 .and_then(|provider| provider())
627 .filter(|t| !t.is_empty())
628 }
629
630 fn decode_envelope<TResult>(
631 &self,
632 envelope: RpcEnvelopeRaw,
633 ) -> Result<TResult, ForgeClientError>
634 where
635 TResult: DeserializeOwned,
636 {
637 if !envelope.success {
638 let error = envelope.error.unwrap_or(ForgeError {
639 code: "UNKNOWN".to_string(),
640 message: "Unknown error".to_string(),
641 details: None,
642 });
643 if error.code == "UNAUTHORIZED" || error.code == "FORBIDDEN" {
644 if let Some(handler) = &self.inner.on_auth_error {
645 handler(error.clone());
646 }
647 }
648 return Err(ForgeClientError::new(error.code, error.message, error.details));
649 }
650
651 let data = envelope.data.ok_or_else(|| {
652 ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
653 })?;
654 serde_json::from_value(data)
655 .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
656 }
657
658 fn random_id(&self, prefix: &str) -> String {
659 let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
660 format!("{prefix}-{id}")
661 }
662}
663
664async fn sleep(duration: Duration) {
665 #[cfg(target_arch = "wasm32")]
666 {
667 gloo_timers::future::sleep(duration).await;
668 }
669
670 #[cfg(not(target_arch = "wasm32"))]
671 {
672 tokio::time::sleep(duration).await;
673 }
674}
675
/// Handle to a running subscription; clones share the same underlying state.
///
/// Dropping any clone calls `close()` (see the `Drop` impl).
#[derive(Clone)]
pub struct SubscriptionHandle {
    /// Set once the subscription has been finished or closed.
    closed: Rc<Cell<bool>>,
    /// Task driving the subscription, once it has been spawned.
    task: Rc<RefCell<Option<Task>>>,
    /// One-shot teardown run by the first `finish()` call.
    cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
}
682
683impl SubscriptionHandle {
684 fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
685 let cleanup: Box<dyn FnOnce()> = Box::new(move || {
686 let mut sse = client.inner.sse.borrow_mut();
687 sse.listeners.remove(&target);
688 sse.registrations.remove(&sub_id);
689 if target.starts_with("sub:") {
691 let session_id = sse.session_id.clone();
692 let session_secret = sse.session_secret.clone();
693 drop(sse);
694 if let (Some(sid), Some(ss)) = (session_id, session_secret) {
695 let url = format!("{}/_api/unsubscribe", client.inner.url);
696 let payload = serde_json::json!({
697 "session_id": sid,
698 "session_secret": ss,
699 "id": sub_id,
700 });
701 let client = client.clone();
702 dioxus::prelude::spawn(async move {
703 let _ = platform::request_json(&client, &url, payload, None).await;
704 });
705 }
706 }
707 });
708
709 Self {
710 closed: Rc::new(Cell::new(false)),
711 task: Rc::new(RefCell::new(None)),
712 cleanup: Rc::new(RefCell::new(Some(cleanup))),
713 }
714 }
715
716 fn set_task(&self, task: Task) {
717 *self.task.borrow_mut() = Some(task);
718 }
719
720 pub(crate) fn finish(&self) {
721 if self.closed.replace(true) {
722 return;
723 }
724 if let Some(cleanup) = self.cleanup.borrow_mut().take() {
725 cleanup();
726 }
727 self.task.borrow_mut().take();
728 }
729
730 pub fn close(&self) {
731 let task = { self.task.borrow_mut().clone() };
732 self.finish();
733 if let Some(task) = task {
734 task.cancel();
735 }
736 }
737
738 pub fn is_closed(&self) -> bool {
739 self.closed.get()
740 }
741}
742
743impl Drop for SubscriptionHandle {
744 fn drop(&mut self) {
745 self.close();
746 }
747}
748
749#[cfg(target_arch = "wasm32")]
752mod platform {
753 use dioxus::prelude::spawn;
754 use futures_util::{StreamExt, stream};
755 use gloo_net::eventsource::futures::EventSource;
756 use gloo_net::http::Request;
757 use js_sys::{JSON, encode_uri_component};
758
759 use super::{ForgeClient, SseDispatch, sleep};
760 use crate::signals::platform_tag;
761 use crate::types::{
762 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
763 };
764
765 pub(super) async fn request_json(
766 client: &ForgeClient,
767 url: &str,
768 body: serde_json::Value,
769 correlation_id: Option<&str>,
770 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
771 let mut request = Request::post(url)
772 .header("Content-Type", "application/json")
773 .header("x-forge-platform", platform_tag())
774 .credentials(web_sys::RequestCredentials::Include);
775 if let Some(token) = client.get_token() {
776 request = request.header("Authorization", &format!("Bearer {token}"));
777 }
778 if let Some(cid) = correlation_id {
779 request = request.header("x-correlation-id", cid);
780 }
781
782 let request = request.body(body.to_string()).map_err(request_error)?;
783 request
784 .send()
785 .await
786 .map_err(request_error)?
787 .json()
788 .await
789 .map_err(request_error)
790 }
791
792 pub(super) async fn request_multipart(
793 client: &ForgeClient,
794 url: &str,
795 form: web_sys::FormData,
796 correlation_id: Option<&str>,
797 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
798 let mut request = Request::post(url)
799 .header("x-forge-platform", platform_tag())
800 .credentials(web_sys::RequestCredentials::Include);
801 if let Some(token) = client.get_token() {
802 request = request.header("Authorization", &format!("Bearer {token}"));
803 }
804 if let Some(cid) = correlation_id {
805 request = request.header("x-correlation-id", cid);
806 }
807
808 let response = request.body(form).map_err(request_error)?;
809 response
810 .send()
811 .await
812 .map_err(request_error)?
813 .json()
814 .await
815 .map_err(request_error)
816 }
817
818 fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
819 let data = message.data();
820 data.as_string().or_else(|| {
821 JSON::stringify(&data)
822 .ok()
823 .and_then(|value| value.as_string())
824 .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
825 })
826 }
827
828 fn events_url(client: &ForgeClient) -> String {
829 match client.get_token() {
830 Some(token) => format!(
831 "{}/_api/events?token={}",
832 client.inner.url,
833 encode_uri_component(&token)
834 ),
835 None => format!("{}/_api/events", client.inner.url),
836 }
837 }
838
839 pub(super) fn start_event_loop(client: ForgeClient) {
841 let client_for_task = client.clone();
842 let task = spawn(async move {
843 let was_connected = run_event_loop(&client_for_task).await;
844
845 client_for_task.mark_disconnected();
846 client_for_task.broadcast_connection(ConnectionState::Disconnected);
847
848 if !was_connected && client_for_task.get_token().is_some() {
852 if let Some(handler) = &client_for_task.inner.on_auth_error {
853 handler(crate::types::ForgeError {
854 code: "UNAUTHORIZED".into(),
855 message: "SSE authentication failed".into(),
856 details: None,
857 });
858 }
859 return;
860 }
861
862 if let Some(attempts) = client_for_task.should_reconnect() {
863 let delay = 1000 * (1u64 << attempts.min(4));
864 let jitter = (js_sys::Math::random() * 500.0) as u64;
865 sleep(std::time::Duration::from_millis(delay + jitter)).await;
866
867 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
868 start_event_loop(client_for_task);
869 }
870 });
871
872 client.inner.sse.borrow_mut().event_loop_task = Some(task);
873 }
874
875 async fn run_event_loop(client: &ForgeClient) -> bool {
877 let mut event_source = match EventSource::new(&events_url(client)) {
878 Ok(source) => source,
879 Err(_) => {
880 return false;
881 }
882 };
883
884 let mut connected_stream = match event_source.subscribe("connected") {
885 Ok(stream) => stream,
886 Err(_) => return false,
887 };
888 let update_stream = match event_source.subscribe("update") {
889 Ok(stream) => stream,
890 Err(_) => return false,
891 };
892 let error_stream = match event_source.subscribe("error") {
893 Ok(stream) => stream,
894 Err(_) => return false,
895 };
896
897 let connected_event = match connected_stream.next().await {
899 Some(Ok((_kind, message))) => {
900 let Some(raw) = message_data_as_string(&message) else {
901 return false;
902 };
903 match serde_json::from_str::<ConnectedEvent>(&raw) {
904 Ok(event) => event,
905 Err(_) => return false,
906 }
907 }
908 _ => return false,
909 };
910
911 let session_id = connected_event.session_id.unwrap_or_default();
912 let session_secret = connected_event.session_secret.unwrap_or_default();
913
914 if session_id.is_empty() || session_secret.is_empty() {
915 return false;
916 }
917
918 let is_reconnect = client.mark_connected(session_id, session_secret);
919 client.broadcast_connection(ConnectionState::Connected);
920
921 if is_reconnect {
923 client.reregister_all().await;
924 }
925
926 let mut events = stream::select(update_stream, error_stream);
927 while let Some(event) = events.next().await {
928 match event {
929 Ok((kind, message)) => {
930 let Some(raw) = message_data_as_string(&message) else {
931 continue;
932 };
933 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
934 continue;
935 };
936
937 let Some(target) = envelope.target else {
938 continue;
939 };
940
941 if kind == "update" {
942 if let Some(payload) = envelope.payload {
943 client.dispatch_event(&target, SseDispatch::Data(payload));
944 }
945 } else {
946 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
947 let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
948 client.dispatch_event(&target, SseDispatch::Error { code, message });
949 }
950 }
951 Err(_) => break,
952 }
953 }
954
955 event_source.close();
956 true
957 }
958
959 fn request_error(err: gloo_net::Error) -> ForgeClientError {
960 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
961 }
962}
963
964#[cfg(not(target_arch = "wasm32"))]
967mod platform {
968 use dioxus::prelude::spawn;
969 use futures_util::StreamExt;
970 use reqwest::Client;
971 use reqwest_eventsource::{Event, EventSource};
972
973 use super::{ForgeClient, SseDispatch, sleep};
974 use crate::signals::platform_tag;
975 use crate::types::{
976 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
977 };
978
979 pub(super) async fn request_json(
980 client: &ForgeClient,
981 url: &str,
982 body: serde_json::Value,
983 correlation_id: Option<&str>,
984 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
985 let mut request = Client::new()
986 .post(url)
987 .header("x-forge-platform", platform_tag())
988 .json(&body);
989 if let Some(token) = client.get_token() {
990 request = request.bearer_auth(token);
991 }
992 if let Some(cid) = correlation_id {
993 request = request.header("x-correlation-id", cid);
994 }
995
996 request
997 .send()
998 .await
999 .map_err(request_error)?
1000 .json()
1001 .await
1002 .map_err(request_error)
1003 }
1004
1005 pub(super) async fn request_multipart(
1006 client: &ForgeClient,
1007 url: &str,
1008 form: reqwest::multipart::Form,
1009 correlation_id: Option<&str>,
1010 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
1011 let mut request = Client::new()
1012 .post(url)
1013 .header("x-forge-platform", platform_tag())
1014 .multipart(form);
1015 if let Some(token) = client.get_token() {
1016 request = request.bearer_auth(token);
1017 }
1018 if let Some(cid) = correlation_id {
1019 request = request.header("x-correlation-id", cid);
1020 }
1021
1022 request
1023 .send()
1024 .await
1025 .map_err(request_error)?
1026 .json()
1027 .await
1028 .map_err(request_error)
1029 }
1030
1031 pub(super) fn start_event_loop(client: ForgeClient) {
1033 let client_for_task = client.clone();
1034 let task = spawn(async move {
1035 let was_connected = run_event_loop(&client_for_task).await;
1036
1037 client_for_task.mark_disconnected();
1038 client_for_task.broadcast_connection(ConnectionState::Disconnected);
1039
1040 if !was_connected && client_for_task.get_token().is_some() {
1041 if let Some(handler) = &client_for_task.inner.on_auth_error {
1042 handler(crate::types::ForgeError {
1043 code: "UNAUTHORIZED".into(),
1044 message: "SSE authentication failed".into(),
1045 details: None,
1046 });
1047 }
1048 return;
1049 }
1050
1051 if let Some(attempts) = client_for_task.should_reconnect() {
1052 let delay = 1000 * (1u64 << attempts.min(4));
1053 sleep(std::time::Duration::from_millis(delay)).await;
1054
1055 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
1056 start_event_loop(client_for_task);
1057 }
1058 });
1059
1060 client.inner.sse.borrow_mut().event_loop_task = Some(task);
1061 }
1062
1063 async fn run_event_loop(client: &ForgeClient) -> bool {
1065 let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
1066 if let Some(token) = client.get_token() {
1067 request = request.bearer_auth(token);
1068 }
1069
1070 let mut event_source = match EventSource::new(request) {
1071 Ok(source) => source,
1072 Err(_) => return false,
1073 };
1074
1075 let connected_event = loop {
1077 let Some(event) = event_source.next().await else {
1078 return false;
1079 };
1080 match event {
1081 Ok(Event::Open) => continue,
1082 Ok(Event::Message(msg)) if msg.event == "connected" => {
1083 match serde_json::from_str::<ConnectedEvent>(&msg.data) {
1084 Ok(event) => break event,
1085 Err(_) => return false,
1086 }
1087 }
1088 Ok(Event::Message(_)) => continue,
1089 Err(_) => return false,
1090 }
1091 };
1092
1093 let session_id = connected_event.session_id.unwrap_or_default();
1094 let session_secret = connected_event.session_secret.unwrap_or_default();
1095
1096 if session_id.is_empty() || session_secret.is_empty() {
1097 return false;
1098 }
1099
1100 let is_reconnect = client.mark_connected(session_id, session_secret);
1101 client.broadcast_connection(ConnectionState::Connected);
1102
1103 if is_reconnect {
1104 client.reregister_all().await;
1105 }
1106
1107 while let Some(event) = event_source.next().await {
1108 match event {
1109 Ok(Event::Open) => {}
1110 Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
1111 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
1112 continue;
1113 };
1114 let Some(target) = envelope.target else {
1115 continue;
1116 };
1117
1118 if msg.event == "update" {
1119 if let Some(payload) = envelope.payload {
1120 client.dispatch_event(&target, SseDispatch::Data(payload));
1121 }
1122 } else {
1123 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
1124 let message =
1125 envelope.message.unwrap_or_else(|| "Subscription error".to_string());
1126 client.dispatch_event(&target, SseDispatch::Error { code, message });
1127 }
1128 }
1129 Ok(Event::Message(_)) => {}
1130 Err(_) => break,
1131 }
1132 }
1133
1134 event_source.close();
1135 true
1136 }
1137
1138 fn request_error(err: reqwest::Error) -> ForgeClientError {
1139 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1140 }
1141}