1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::signals::ForgeSignals;
13use crate::types::{
14 ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
15};
16
17type TokenProvider = Rc<dyn Fn() -> Option<String>>;
18type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
19type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
20type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;
21
22static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
23
24enum SseDispatch {
25 Data(serde_json::Value),
26 Error { code: String, message: String },
27}
28
29struct RegistrationMeta {
30 endpoint: &'static str,
31 payload: serde_json::Value,
32}
33
34#[derive(Default, Clone, Copy, PartialEq, Eq)]
35enum SseState {
36 #[default]
37 Idle,
38 Connecting,
39 Connected,
40}
41
42#[derive(Default)]
44struct SseManager {
45 session_id: Option<String>,
46 session_secret: Option<String>,
47 state: SseState,
48 ever_connected: bool,
49 listeners: HashMap<String, EventSender>,
50 registrations: HashMap<String, RegistrationMeta>,
51 event_loop_task: Option<Task>,
52 reconnect_attempts: u32,
53 connect_waiters: Vec<ConnectWaiter>,
54}
55
56const MAX_RECONNECT_ATTEMPTS: u32 = 10;
57
58#[derive(Clone)]
59pub struct ForgeClientConfig {
60 pub url: String,
61 pub get_token: Option<TokenProvider>,
62 pub on_auth_error: Option<AuthErrorHandler>,
63 pub(crate) connection_state: Option<Signal<ConnectionState>>,
64}
65
66impl ForgeClientConfig {
67 pub fn new(url: impl Into<String>) -> Self {
68 Self {
69 url: url.into(),
70 get_token: None,
71 on_auth_error: None,
72 connection_state: None,
73 }
74 }
75
76 pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
77 self.get_token = Some(Rc::new(provider));
78 self
79 }
80
81 pub fn with_auth_error_handler(
82 mut self,
83 handler: impl Fn(ForgeError) + 'static,
84 ) -> Self {
85 self.on_auth_error = Some(Rc::new(handler));
86 self
87 }
88
89 pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
90 self.connection_state = Some(state);
91 self
92 }
93}
94
95#[derive(Clone)]
96pub struct ForgeClient {
97 inner: Rc<ForgeClientInner>,
98}
99
100struct ForgeClientInner {
101 url: String,
102 get_token: Option<TokenProvider>,
103 on_auth_error: Option<AuthErrorHandler>,
104 connection_state: Option<Signal<ConnectionState>>,
105 sse: RefCell<SseManager>,
106 signals: RefCell<Option<ForgeSignals>>,
107}
108
109impl ForgeClient {
110 pub fn new(config: ForgeClientConfig) -> Self {
111 Self {
112 inner: Rc::new(ForgeClientInner {
113 url: config.url.trim_end_matches('/').to_string(),
114 get_token: config.get_token,
115 on_auth_error: config.on_auth_error,
116 connection_state: config.connection_state,
117 sse: RefCell::new(SseManager::default()),
118 signals: RefCell::new(None),
119 }),
120 }
121 }
122
123 pub fn set_signals(&self, signals: ForgeSignals) {
125 *self.inner.signals.borrow_mut() = Some(signals);
126 }
127
128 pub fn get_url(&self) -> &str {
130 &self.inner.url
131 }
132
133 fn correlation_id(&self) -> Option<String> {
135 self.inner.signals.borrow().as_ref().map(|s| s.next_correlation_id())
136 }
137
138 pub async fn call<TArgs, TResult>(
139 &self,
140 function_name: &str,
141 args: TArgs,
142 ) -> Result<TResult, ForgeClientError>
143 where
144 TArgs: Serialize,
145 TResult: DeserializeOwned,
146 {
147 let body = serde_json::json!({ "args": args });
148 let correlation_id = self.correlation_id();
149 let envelope = platform::request_json(
150 self,
151 &format!("{}/_api/rpc/{}", self.inner.url, function_name),
152 body,
153 correlation_id.as_deref(),
154 )
155 .await?;
156 self.decode_envelope(envelope)
157 }
158
159 #[cfg(target_arch = "wasm32")]
160 pub async fn call_multipart<TResult>(
161 &self,
162 function_name: &str,
163 form: web_sys::FormData,
164 ) -> Result<TResult, ForgeClientError>
165 where
166 TResult: DeserializeOwned,
167 {
168 let correlation_id = self.correlation_id();
169 let envelope = platform::request_multipart(
170 self,
171 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
172 form,
173 correlation_id.as_deref(),
174 )
175 .await?;
176 self.decode_envelope(envelope)
177 }
178
179 #[cfg(not(target_arch = "wasm32"))]
180 pub async fn call_multipart<TResult>(
181 &self,
182 function_name: &str,
183 form: reqwest::multipart::Form,
184 ) -> Result<TResult, ForgeClientError>
185 where
186 TResult: DeserializeOwned,
187 {
188 let correlation_id = self.correlation_id();
189 let envelope = platform::request_multipart(
190 self,
191 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
192 form,
193 correlation_id.as_deref(),
194 )
195 .await?;
196 self.decode_envelope(envelope)
197 }
198
199 pub fn subscribe_query<TArgs, TResult, F>(
200 &self,
201 function_name: &str,
202 args: TArgs,
203 callback: F,
204 ) -> SubscriptionHandle
205 where
206 TArgs: Serialize + Clone + 'static,
207 TResult: DeserializeOwned + Clone + 'static,
208 F: FnMut(StreamEvent<TResult>) + 'static,
209 {
210 let sub_id = self.random_id("sub");
211 let target = format!("sub:{sub_id}");
212
213 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
214 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
215
216 let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
217 let reg_payload = serde_json::json!({
218 "id": sub_id,
219 "function": function_name,
220 "args": args_value,
221 });
222 self.inner.sse.borrow_mut().registrations.insert(
223 sub_id.clone(),
224 RegistrationMeta {
225 endpoint: "/_api/subscribe",
226 payload: reg_payload,
227 },
228 );
229
230 self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
231 match client.decode_envelope::<TResult>(envelope) {
232 Ok(data) => cb(StreamEvent::Data(data)),
233 Err(err) => cb(StreamEvent::Error(err)),
234 }
235 })
236 }
237
238 pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
239 where
240 TResult: DeserializeOwned + Clone + 'static,
241 F: FnMut(StreamEvent<TResult>) + 'static,
242 {
243 self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
244 }
245
246 pub fn subscribe_workflow<TResult, F>(
247 &self,
248 workflow_id: String,
249 callback: F,
250 ) -> SubscriptionHandle
251 where
252 TResult: DeserializeOwned + Clone + 'static,
253 F: FnMut(StreamEvent<TResult>) + 'static,
254 {
255 self.subscribe_tracker(
256 "wf",
257 serde_json::json!({ "workflow_id": workflow_id }),
258 "/_api/subscribe-workflow",
259 callback,
260 )
261 }
262
263 fn subscribe_tracker<TResult, F>(
264 &self,
265 prefix: &str,
266 payload: serde_json::Value,
267 endpoint: &'static str,
268 callback: F,
269 ) -> SubscriptionHandle
270 where
271 TResult: DeserializeOwned + Clone + 'static,
272 F: FnMut(StreamEvent<TResult>) + 'static,
273 {
274 let sub_id = self.random_id(prefix);
275 let target = format!("{prefix}:{sub_id}");
276
277 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
278 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
279
280 let mut reg_payload = payload;
281 reg_payload
282 .as_object_mut()
283 .expect("tracker payload must be an object")
284 .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
285 self.inner.sse.borrow_mut().registrations.insert(
286 sub_id.clone(),
287 RegistrationMeta {
288 endpoint,
289 payload: reg_payload,
290 },
291 );
292
293 self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
294 if envelope.success {
295 if let Some(data) = envelope.data {
296 match serde_json::from_value::<TResult>(data) {
297 Ok(parsed) => cb(StreamEvent::Data(parsed)),
298 Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
299 "DESERIALIZATION_ERROR",
300 e.to_string(),
301 None,
302 ))),
303 }
304 }
305 }
306 })
307 }
308
309 fn spawn_subscription<TResult, F>(
310 &self,
311 sub_id: String,
312 target: String,
313 mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
314 mut callback: F,
315 on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
316 ) -> SubscriptionHandle
317 where
318 TResult: DeserializeOwned + Clone + 'static,
319 F: FnMut(StreamEvent<TResult>) + 'static,
320 {
321 let client = self.clone();
322 let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
323 let handle_task = handle.clone();
324
325 let task = dioxus::prelude::spawn(async move {
326 callback(StreamEvent::Connection(ConnectionState::Connecting));
327
328 if let Err(e) = client.ensure_connected().await {
329 callback(StreamEvent::Error(e));
330 callback(StreamEvent::Connection(ConnectionState::Disconnected));
331 handle_task.finish();
332 return;
333 }
334
335 match client.register_subscription(&sub_id).await {
336 Ok(envelope) => {
337 callback(StreamEvent::Connection(ConnectionState::Connected));
338 on_initial(&client, envelope, &mut callback);
339 }
340 Err(err) => {
341 callback(StreamEvent::Error(err));
342 callback(StreamEvent::Connection(ConnectionState::Disconnected));
343 handle_task.finish();
344 return;
345 }
346 }
347
348 while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
349 Self::deliver_event::<TResult, F>(&mut callback, &client, event);
350 }
351
352 handle_task.finish();
353 });
354
355 handle.set_task(task);
356 handle
357 }
358
359 fn deliver_event<TResult, F>(
360 callback: &mut F,
361 client: &ForgeClient,
362 event: SseDispatch,
363 ) where
364 TResult: DeserializeOwned,
365 F: FnMut(StreamEvent<TResult>),
366 {
367 match event {
368 SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
369 Ok(data) => callback(StreamEvent::Data(data)),
370 Err(e) => {
371 callback(StreamEvent::Error(ForgeClientError::new(
372 "DESERIALIZATION_ERROR",
373 e.to_string(),
374 None,
375 )));
376 }
377 },
378 SseDispatch::Error { code, message } => {
379 let err = ForgeClientError::new(&code, &message, None);
380 if code == "UNAUTHORIZED" {
381 if let Some(handler) = &client.inner.on_auth_error {
382 handler(err.as_forge_error());
383 }
384 }
385 callback(StreamEvent::Error(err));
386 }
387 }
388 }
389
390 async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
392 let rx = {
393 let mut sse = self.inner.sse.borrow_mut();
394 if sse.state == SseState::Connected {
395 return Ok(());
396 }
397
398 let (tx, rx) = futures_channel::oneshot::channel();
399 sse.connect_waiters.push(tx);
400
401 if sse.state == SseState::Idle {
402 sse.state = SseState::Connecting;
403 drop(sse);
404 platform::start_event_loop(self.clone());
405 }
406
407 rx
408 };
409
410 rx.await.unwrap_or_else(|_| {
411 Err(ForgeClientError::new(
412 "SSE_CONNECTION_FAILED",
413 "Connection attempt cancelled",
414 None,
415 ))
416 })
417 }
418
419 async fn register_subscription(
422 &self,
423 sub_id: &str,
424 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
425 let envelope = self.try_register_subscription(sub_id).await?;
426
427 let needs_retry = !envelope.success
429 && envelope
430 .error
431 .as_ref()
432 .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
433
434 if needs_retry {
435 self.force_reconnect().await;
436 self.ensure_connected().await?;
437 return self.try_register_subscription(sub_id).await;
438 }
439
440 Ok(envelope)
441 }
442
443 async fn try_register_subscription(
444 &self,
445 sub_id: &str,
446 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
447 let (endpoint, payload) = {
448 let sse = self.inner.sse.borrow();
449 let meta = sse
450 .registrations
451 .get(sub_id)
452 .ok_or_else(|| {
453 ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
454 })?;
455 let session_id = sse.session_id.clone().unwrap_or_default();
456 let session_secret = sse.session_secret.clone().unwrap_or_default();
457 let mut payload = meta.payload.clone();
458 let obj = payload
459 .as_object_mut()
460 .expect("registration payload must be an object");
461 obj.insert("session_id".into(), serde_json::Value::String(session_id));
462 obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
463 (meta.endpoint, payload)
464 };
465
466 let url = format!("{}{}", self.inner.url, endpoint);
467 platform::request_json(self, &url, payload, None).await
468 }
469
470 async fn force_reconnect(&self) {
471 let task = {
472 let mut sse = self.inner.sse.borrow_mut();
473 sse.session_id = None;
474 sse.session_secret = None;
475 sse.state = SseState::Idle;
476 sse.event_loop_task.take()
479 };
480 if let Some(task) = task {
481 task.cancel();
482 }
483 sleep(Duration::from_millis(10)).await;
484 }
485
486 async fn reregister_all(&self) {
487 let sub_ids: Vec<String> = {
488 let sse = self.inner.sse.borrow();
489 sse.registrations.keys().cloned().collect()
490 };
491
492 for sub_id in sub_ids {
493 let _ = self.register_subscription(&sub_id).await;
494 }
495 }
496
497 fn dispatch_event(&self, target: &str, event: SseDispatch) {
498 let tx = {
499 let sse = self.inner.sse.borrow();
500 sse.listeners.get(target).cloned()
501 };
502 if let Some(tx) = tx {
503 let _ = tx.unbounded_send(event);
504 }
505 }
506
507 fn broadcast_connection(&self, state: ConnectionState) {
508 if let Some(mut signal) = self.inner.connection_state {
509 signal.set(state);
510 }
511 }
512
513 fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
514 let mut sse = self.inner.sse.borrow_mut();
515 let is_reconnect = sse.ever_connected;
516 sse.session_id = Some(session_id);
517 sse.session_secret = Some(session_secret);
518 sse.state = SseState::Connected;
519 sse.reconnect_attempts = 0;
520 sse.ever_connected = true;
521 for waiter in sse.connect_waiters.drain(..) {
522 let _ = waiter.send(Ok(()));
523 }
524 is_reconnect
525 }
526
527 fn mark_disconnected(&self) {
528 let mut sse = self.inner.sse.borrow_mut();
529 sse.session_id = None;
530 sse.session_secret = None;
531 sse.state = SseState::Idle;
532 sse.event_loop_task = None;
533 let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
534 for waiter in sse.connect_waiters.drain(..) {
535 let _ = waiter.send(Err(err()));
536 }
537 }
538
539 fn should_reconnect(&self) -> Option<u32> {
540 let mut sse = self.inner.sse.borrow_mut();
541 if sse.listeners.is_empty() {
542 return None;
543 }
544 let attempts = sse.reconnect_attempts;
545 if attempts >= MAX_RECONNECT_ATTEMPTS {
546 return None;
547 }
548 sse.reconnect_attempts = attempts + 1;
549 Some(attempts)
550 }
551
552 fn get_token(&self) -> Option<String> {
553 self.inner
554 .get_token
555 .as_ref()
556 .and_then(|provider| provider())
557 .filter(|t| !t.is_empty())
558 }
559
560 fn decode_envelope<TResult>(
561 &self,
562 envelope: RpcEnvelopeRaw,
563 ) -> Result<TResult, ForgeClientError>
564 where
565 TResult: DeserializeOwned,
566 {
567 if !envelope.success {
568 let error = envelope.error.unwrap_or(ForgeError {
569 code: "UNKNOWN".to_string(),
570 message: "Unknown error".to_string(),
571 details: None,
572 });
573 return Err(ForgeClientError::new(error.code, error.message, error.details));
574 }
575
576 let data = envelope.data.ok_or_else(|| {
577 ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
578 })?;
579 serde_json::from_value(data)
580 .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
581 }
582
583 fn random_id(&self, prefix: &str) -> String {
584 let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
585 format!("{prefix}-{id}")
586 }
587}
588
589async fn sleep(duration: Duration) {
590 #[cfg(target_arch = "wasm32")]
591 {
592 gloo_timers::future::sleep(duration).await;
593 }
594
595 #[cfg(not(target_arch = "wasm32"))]
596 {
597 tokio::time::sleep(duration).await;
598 }
599}
600
601#[derive(Clone)]
602pub struct SubscriptionHandle {
603 closed: Rc<Cell<bool>>,
604 task: Rc<RefCell<Option<Task>>>,
605 cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
606}
607
608impl SubscriptionHandle {
609 fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
610 let cleanup: Box<dyn FnOnce()> = Box::new(move || {
611 let mut sse = client.inner.sse.borrow_mut();
612 sse.listeners.remove(&target);
613 sse.registrations.remove(&sub_id);
614 if target.starts_with("sub:") {
616 let session_id = sse.session_id.clone();
617 let session_secret = sse.session_secret.clone();
618 drop(sse);
619 if let (Some(sid), Some(ss)) = (session_id, session_secret) {
620 let url = format!("{}/_api/unsubscribe", client.inner.url);
621 let payload = serde_json::json!({
622 "session_id": sid,
623 "session_secret": ss,
624 "id": sub_id,
625 });
626 let client = client.clone();
627 dioxus::prelude::spawn(async move {
628 let _ = platform::request_json(&client, &url, payload, None).await;
629 });
630 }
631 }
632 });
633
634 Self {
635 closed: Rc::new(Cell::new(false)),
636 task: Rc::new(RefCell::new(None)),
637 cleanup: Rc::new(RefCell::new(Some(cleanup))),
638 }
639 }
640
641 fn set_task(&self, task: Task) {
642 *self.task.borrow_mut() = Some(task);
643 }
644
645 pub(crate) fn finish(&self) {
646 if self.closed.replace(true) {
647 return;
648 }
649 if let Some(cleanup) = self.cleanup.borrow_mut().take() {
650 cleanup();
651 }
652 self.task.borrow_mut().take();
653 }
654
655 pub fn close(&self) {
656 let task = { self.task.borrow_mut().clone() };
657 self.finish();
658 if let Some(task) = task {
659 task.cancel();
660 }
661 }
662
663 pub fn is_closed(&self) -> bool {
664 self.closed.get()
665 }
666}
667
668impl Drop for SubscriptionHandle {
669 fn drop(&mut self) {
670 self.close();
671 }
672}
673
674#[cfg(target_arch = "wasm32")]
677mod platform {
678 use dioxus::prelude::spawn;
679 use futures_util::{StreamExt, stream};
680 use gloo_net::eventsource::futures::EventSource;
681 use gloo_net::http::Request;
682 use js_sys::{JSON, encode_uri_component};
683
684 use super::{ForgeClient, SseDispatch, sleep};
685 use crate::signals::platform_tag;
686 use crate::types::{
687 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
688 };
689
690 pub(super) async fn request_json(
691 client: &ForgeClient,
692 url: &str,
693 body: serde_json::Value,
694 correlation_id: Option<&str>,
695 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
696 let mut request = Request::post(url)
697 .header("Content-Type", "application/json")
698 .header("x-forge-platform", platform_tag())
699 .credentials(web_sys::RequestCredentials::Include);
700 if let Some(token) = client.get_token() {
701 request = request.header("Authorization", &format!("Bearer {token}"));
702 }
703 if let Some(cid) = correlation_id {
704 request = request.header("x-correlation-id", cid);
705 }
706
707 let request = request.body(body.to_string()).map_err(request_error)?;
708 request
709 .send()
710 .await
711 .map_err(request_error)?
712 .json()
713 .await
714 .map_err(request_error)
715 }
716
717 pub(super) async fn request_multipart(
718 client: &ForgeClient,
719 url: &str,
720 form: web_sys::FormData,
721 correlation_id: Option<&str>,
722 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
723 let mut request = Request::post(url)
724 .header("x-forge-platform", platform_tag())
725 .credentials(web_sys::RequestCredentials::Include);
726 if let Some(token) = client.get_token() {
727 request = request.header("Authorization", &format!("Bearer {token}"));
728 }
729 if let Some(cid) = correlation_id {
730 request = request.header("x-correlation-id", cid);
731 }
732
733 let response = request.body(form).map_err(request_error)?;
734 response
735 .send()
736 .await
737 .map_err(request_error)?
738 .json()
739 .await
740 .map_err(request_error)
741 }
742
743 fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
744 let data = message.data();
745 data.as_string().or_else(|| {
746 JSON::stringify(&data)
747 .ok()
748 .and_then(|value| value.as_string())
749 .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
750 })
751 }
752
753 fn events_url(client: &ForgeClient) -> String {
754 match client.get_token() {
755 Some(token) => format!(
756 "{}/_api/events?token={}",
757 client.inner.url,
758 encode_uri_component(&token)
759 ),
760 None => format!("{}/_api/events", client.inner.url),
761 }
762 }
763
764 pub(super) fn start_event_loop(client: ForgeClient) {
766 let client_for_task = client.clone();
767 let task = spawn(async move {
768 run_event_loop(&client_for_task).await;
769
770 client_for_task.mark_disconnected();
772 client_for_task.broadcast_connection(ConnectionState::Disconnected);
773
774 if let Some(attempts) = client_for_task.should_reconnect() {
775 let delay = 1000 * (1u64 << attempts.min(4));
776 let jitter = (js_sys::Math::random() * 500.0) as u64;
777 sleep(std::time::Duration::from_millis(delay + jitter)).await;
778
779 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
780 start_event_loop(client_for_task);
781 }
782 });
783
784 client.inner.sse.borrow_mut().event_loop_task = Some(task);
785 }
786
787 async fn run_event_loop(client: &ForgeClient) {
788 let mut event_source = match EventSource::new(&events_url(client)) {
789 Ok(source) => source,
790 Err(_) => {
791 return;
792 }
793 };
794
795 let mut connected_stream = match event_source.subscribe("connected") {
796 Ok(stream) => stream,
797 Err(_) => return,
798 };
799 let update_stream = match event_source.subscribe("update") {
800 Ok(stream) => stream,
801 Err(_) => return,
802 };
803 let error_stream = match event_source.subscribe("error") {
804 Ok(stream) => stream,
805 Err(_) => return,
806 };
807
808 let connected_event = match connected_stream.next().await {
810 Some(Ok((_kind, message))) => {
811 let Some(raw) = message_data_as_string(&message) else {
812 return;
813 };
814 match serde_json::from_str::<ConnectedEvent>(&raw) {
815 Ok(event) => event,
816 Err(_) => return,
817 }
818 }
819 _ => return,
820 };
821
822 let session_id = connected_event.session_id.unwrap_or_default();
823 let session_secret = connected_event.session_secret.unwrap_or_default();
824
825 if session_id.is_empty() || session_secret.is_empty() {
826 return;
827 }
828
829 let is_reconnect = client.mark_connected(session_id, session_secret);
830 client.broadcast_connection(ConnectionState::Connected);
831
832 if is_reconnect {
834 client.reregister_all().await;
835 }
836
837 let mut events = stream::select(update_stream, error_stream);
838 while let Some(event) = events.next().await {
839 match event {
840 Ok((kind, message)) => {
841 let Some(raw) = message_data_as_string(&message) else {
842 continue;
843 };
844 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
845 continue;
846 };
847
848 let Some(target) = envelope.target else {
849 continue;
850 };
851
852 if kind == "update" {
853 if let Some(payload) = envelope.payload {
854 client.dispatch_event(&target, SseDispatch::Data(payload));
855 }
856 } else {
857 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
858 let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
859 client.dispatch_event(&target, SseDispatch::Error { code, message });
860 }
861 }
862 Err(_) => break,
863 }
864 }
865
866 event_source.close();
867 }
868
869 fn request_error(err: gloo_net::Error) -> ForgeClientError {
870 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
871 }
872}
873
874#[cfg(not(target_arch = "wasm32"))]
877mod platform {
878 use dioxus::prelude::spawn;
879 use futures_util::StreamExt;
880 use reqwest::Client;
881 use reqwest_eventsource::{Event, EventSource};
882
883 use super::{ForgeClient, SseDispatch, sleep};
884 use crate::signals::platform_tag;
885 use crate::types::{
886 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
887 };
888
889 pub(super) async fn request_json(
890 client: &ForgeClient,
891 url: &str,
892 body: serde_json::Value,
893 correlation_id: Option<&str>,
894 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
895 let mut request = Client::new()
896 .post(url)
897 .header("x-forge-platform", platform_tag())
898 .json(&body);
899 if let Some(token) = client.get_token() {
900 request = request.bearer_auth(token);
901 }
902 if let Some(cid) = correlation_id {
903 request = request.header("x-correlation-id", cid);
904 }
905
906 request
907 .send()
908 .await
909 .map_err(request_error)?
910 .json()
911 .await
912 .map_err(request_error)
913 }
914
915 pub(super) async fn request_multipart(
916 client: &ForgeClient,
917 url: &str,
918 form: reqwest::multipart::Form,
919 correlation_id: Option<&str>,
920 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
921 let mut request = Client::new()
922 .post(url)
923 .header("x-forge-platform", platform_tag())
924 .multipart(form);
925 if let Some(token) = client.get_token() {
926 request = request.bearer_auth(token);
927 }
928 if let Some(cid) = correlation_id {
929 request = request.header("x-correlation-id", cid);
930 }
931
932 request
933 .send()
934 .await
935 .map_err(request_error)?
936 .json()
937 .await
938 .map_err(request_error)
939 }
940
941 pub(super) fn start_event_loop(client: ForgeClient) {
943 let client_for_task = client.clone();
944 let task = spawn(async move {
945 run_event_loop(&client_for_task).await;
946
947 client_for_task.mark_disconnected();
948 client_for_task.broadcast_connection(ConnectionState::Disconnected);
949
950 if let Some(attempts) = client_for_task.should_reconnect() {
951 let delay = 1000 * (1u64 << attempts.min(4));
952 sleep(std::time::Duration::from_millis(delay)).await;
953
954 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
955 start_event_loop(client_for_task);
956 }
957 });
958
959 client.inner.sse.borrow_mut().event_loop_task = Some(task);
960 }
961
962 async fn run_event_loop(client: &ForgeClient) {
963 let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
964 if let Some(token) = client.get_token() {
965 request = request.bearer_auth(token);
966 }
967
968 let mut event_source = match EventSource::new(request) {
969 Ok(source) => source,
970 Err(_) => return,
971 };
972
973 let connected_event = loop {
975 let Some(event) = event_source.next().await else {
976 return;
977 };
978 match event {
979 Ok(Event::Open) => continue,
980 Ok(Event::Message(msg)) if msg.event == "connected" => {
981 match serde_json::from_str::<ConnectedEvent>(&msg.data) {
982 Ok(event) => break event,
983 Err(_) => return,
984 }
985 }
986 Ok(Event::Message(_)) => continue,
987 Err(_) => return,
988 }
989 };
990
991 let session_id = connected_event.session_id.unwrap_or_default();
992 let session_secret = connected_event.session_secret.unwrap_or_default();
993
994 if session_id.is_empty() || session_secret.is_empty() {
995 return;
996 }
997
998 let is_reconnect = client.mark_connected(session_id, session_secret);
999 client.broadcast_connection(ConnectionState::Connected);
1000
1001 if is_reconnect {
1002 client.reregister_all().await;
1003 }
1004
1005 while let Some(event) = event_source.next().await {
1006 match event {
1007 Ok(Event::Open) => {}
1008 Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
1009 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
1010 continue;
1011 };
1012 let Some(target) = envelope.target else {
1013 continue;
1014 };
1015
1016 if msg.event == "update" {
1017 if let Some(payload) = envelope.payload {
1018 client.dispatch_event(&target, SseDispatch::Data(payload));
1019 }
1020 } else {
1021 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
1022 let message =
1023 envelope.message.unwrap_or_else(|| "Subscription error".to_string());
1024 client.dispatch_event(&target, SseDispatch::Error { code, message });
1025 }
1026 }
1027 Ok(Event::Message(_)) => {}
1028 Err(_) => break,
1029 }
1030 }
1031
1032 event_source.close();
1033 }
1034
1035 fn request_error(err: reqwest::Error) -> ForgeClientError {
1036 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
1037 }
1038}