1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::types::{
13 ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
14};
15
16type TokenProvider = Rc<dyn Fn() -> Option<String>>;
17type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
18type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
19type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;
20
21static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
22
23enum SseDispatch {
24 Data(serde_json::Value),
25 Error { code: String, message: String },
26}
27
28struct RegistrationMeta {
29 endpoint: &'static str,
30 payload: serde_json::Value,
31}
32
/// Lifecycle of the shared SSE connection.
#[derive(Clone, Copy, Default, PartialEq, Eq)]
enum SseState {
    /// No connection attempt in progress (initial state, and after a disconnect).
    #[default]
    Idle,
    /// An event loop has been started but the session is not established yet.
    Connecting,
    /// The session credentials have been received and stored.
    Connected,
}
40
41#[derive(Default)]
43struct SseManager {
44 session_id: Option<String>,
45 session_secret: Option<String>,
46 state: SseState,
47 ever_connected: bool,
48 listeners: HashMap<String, EventSender>,
49 registrations: HashMap<String, RegistrationMeta>,
50 event_loop_task: Option<Task>,
51 reconnect_attempts: u32,
52 connect_waiters: Vec<ConnectWaiter>,
53}
54
/// Upper bound on reconnect attempts counted since the last successful connection.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
56
57#[derive(Clone)]
58pub struct ForgeClientConfig {
59 pub url: String,
60 pub get_token: Option<TokenProvider>,
61 pub on_auth_error: Option<AuthErrorHandler>,
62 pub(crate) connection_state: Option<Signal<ConnectionState>>,
63}
64
65impl ForgeClientConfig {
66 pub fn new(url: impl Into<String>) -> Self {
67 Self {
68 url: url.into(),
69 get_token: None,
70 on_auth_error: None,
71 connection_state: None,
72 }
73 }
74
75 pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
76 self.get_token = Some(Rc::new(provider));
77 self
78 }
79
80 pub fn with_auth_error_handler(
81 mut self,
82 handler: impl Fn(ForgeError) + 'static,
83 ) -> Self {
84 self.on_auth_error = Some(Rc::new(handler));
85 self
86 }
87
88 pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
89 self.connection_state = Some(state);
90 self
91 }
92}
93
94#[derive(Clone)]
95pub struct ForgeClient {
96 inner: Rc<ForgeClientInner>,
97}
98
99struct ForgeClientInner {
100 url: String,
101 get_token: Option<TokenProvider>,
102 on_auth_error: Option<AuthErrorHandler>,
103 connection_state: Option<Signal<ConnectionState>>,
104 sse: RefCell<SseManager>,
105}
106
107impl ForgeClient {
108 pub fn new(config: ForgeClientConfig) -> Self {
109 Self {
110 inner: Rc::new(ForgeClientInner {
111 url: config.url.trim_end_matches('/').to_string(),
112 get_token: config.get_token,
113 on_auth_error: config.on_auth_error,
114 connection_state: config.connection_state,
115 sse: RefCell::new(SseManager::default()),
116 }),
117 }
118 }
119
120 pub async fn call<TArgs, TResult>(
121 &self,
122 function_name: &str,
123 args: TArgs,
124 ) -> Result<TResult, ForgeClientError>
125 where
126 TArgs: Serialize,
127 TResult: DeserializeOwned,
128 {
129 let body = serde_json::json!({ "args": args });
130 let envelope = platform::request_json(
131 self,
132 &format!("{}/_api/rpc/{}", self.inner.url, function_name),
133 body,
134 )
135 .await?;
136 self.decode_envelope(envelope)
137 }
138
139 #[cfg(target_arch = "wasm32")]
140 pub async fn call_multipart<TResult>(
141 &self,
142 function_name: &str,
143 form: web_sys::FormData,
144 ) -> Result<TResult, ForgeClientError>
145 where
146 TResult: DeserializeOwned,
147 {
148 let envelope = platform::request_multipart(
149 self,
150 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
151 form,
152 )
153 .await?;
154 self.decode_envelope(envelope)
155 }
156
157 #[cfg(not(target_arch = "wasm32"))]
158 pub async fn call_multipart<TResult>(
159 &self,
160 function_name: &str,
161 form: reqwest::multipart::Form,
162 ) -> Result<TResult, ForgeClientError>
163 where
164 TResult: DeserializeOwned,
165 {
166 let envelope = platform::request_multipart(
167 self,
168 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
169 form,
170 )
171 .await?;
172 self.decode_envelope(envelope)
173 }
174
175 pub fn subscribe_query<TArgs, TResult, F>(
176 &self,
177 function_name: &str,
178 args: TArgs,
179 callback: F,
180 ) -> SubscriptionHandle
181 where
182 TArgs: Serialize + Clone + 'static,
183 TResult: DeserializeOwned + Clone + 'static,
184 F: FnMut(StreamEvent<TResult>) + 'static,
185 {
186 let sub_id = self.random_id("sub");
187 let target = format!("sub:{sub_id}");
188
189 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
190 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
191
192 let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
193 let reg_payload = serde_json::json!({
194 "id": sub_id,
195 "function": function_name,
196 "args": args_value,
197 });
198 self.inner.sse.borrow_mut().registrations.insert(
199 sub_id.clone(),
200 RegistrationMeta {
201 endpoint: "/_api/subscribe",
202 payload: reg_payload,
203 },
204 );
205
206 self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
207 match client.decode_envelope::<TResult>(envelope) {
208 Ok(data) => cb(StreamEvent::Data(data)),
209 Err(err) => cb(StreamEvent::Error(err)),
210 }
211 })
212 }
213
214 pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
215 where
216 TResult: DeserializeOwned + Clone + 'static,
217 F: FnMut(StreamEvent<TResult>) + 'static,
218 {
219 self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
220 }
221
222 pub fn subscribe_workflow<TResult, F>(
223 &self,
224 workflow_id: String,
225 callback: F,
226 ) -> SubscriptionHandle
227 where
228 TResult: DeserializeOwned + Clone + 'static,
229 F: FnMut(StreamEvent<TResult>) + 'static,
230 {
231 self.subscribe_tracker(
232 "wf",
233 serde_json::json!({ "workflow_id": workflow_id }),
234 "/_api/subscribe-workflow",
235 callback,
236 )
237 }
238
239 fn subscribe_tracker<TResult, F>(
240 &self,
241 prefix: &str,
242 payload: serde_json::Value,
243 endpoint: &'static str,
244 callback: F,
245 ) -> SubscriptionHandle
246 where
247 TResult: DeserializeOwned + Clone + 'static,
248 F: FnMut(StreamEvent<TResult>) + 'static,
249 {
250 let sub_id = self.random_id(prefix);
251 let target = format!("{prefix}:{sub_id}");
252
253 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
254 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
255
256 let mut reg_payload = payload;
257 reg_payload
258 .as_object_mut()
259 .expect("tracker payload must be an object")
260 .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
261 self.inner.sse.borrow_mut().registrations.insert(
262 sub_id.clone(),
263 RegistrationMeta {
264 endpoint,
265 payload: reg_payload,
266 },
267 );
268
269 self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
270 if envelope.success {
271 if let Some(data) = envelope.data {
272 match serde_json::from_value::<TResult>(data) {
273 Ok(parsed) => cb(StreamEvent::Data(parsed)),
274 Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
275 "DESERIALIZATION_ERROR",
276 e.to_string(),
277 None,
278 ))),
279 }
280 }
281 }
282 })
283 }
284
285 fn spawn_subscription<TResult, F>(
286 &self,
287 sub_id: String,
288 target: String,
289 mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
290 mut callback: F,
291 on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
292 ) -> SubscriptionHandle
293 where
294 TResult: DeserializeOwned + Clone + 'static,
295 F: FnMut(StreamEvent<TResult>) + 'static,
296 {
297 let client = self.clone();
298 let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
299 let handle_task = handle.clone();
300
301 let task = dioxus::prelude::spawn(async move {
302 callback(StreamEvent::Connection(ConnectionState::Connecting));
303
304 if let Err(e) = client.ensure_connected().await {
305 callback(StreamEvent::Error(e));
306 callback(StreamEvent::Connection(ConnectionState::Disconnected));
307 handle_task.finish();
308 return;
309 }
310
311 match client.register_subscription(&sub_id).await {
312 Ok(envelope) => {
313 callback(StreamEvent::Connection(ConnectionState::Connected));
314 on_initial(&client, envelope, &mut callback);
315 }
316 Err(err) => {
317 callback(StreamEvent::Error(err));
318 callback(StreamEvent::Connection(ConnectionState::Disconnected));
319 handle_task.finish();
320 return;
321 }
322 }
323
324 while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
325 Self::deliver_event::<TResult, F>(&mut callback, &client, event);
326 }
327
328 handle_task.finish();
329 });
330
331 handle.set_task(task);
332 handle
333 }
334
335 fn deliver_event<TResult, F>(
336 callback: &mut F,
337 client: &ForgeClient,
338 event: SseDispatch,
339 ) where
340 TResult: DeserializeOwned,
341 F: FnMut(StreamEvent<TResult>),
342 {
343 match event {
344 SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
345 Ok(data) => callback(StreamEvent::Data(data)),
346 Err(e) => {
347 callback(StreamEvent::Error(ForgeClientError::new(
348 "DESERIALIZATION_ERROR",
349 e.to_string(),
350 None,
351 )));
352 }
353 },
354 SseDispatch::Error { code, message } => {
355 let err = ForgeClientError::new(&code, &message, None);
356 if code == "UNAUTHORIZED" {
357 if let Some(handler) = &client.inner.on_auth_error {
358 handler(err.as_forge_error());
359 }
360 }
361 callback(StreamEvent::Error(err));
362 }
363 }
364 }
365
366 async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
368 let rx = {
369 let mut sse = self.inner.sse.borrow_mut();
370 if sse.state == SseState::Connected {
371 return Ok(());
372 }
373
374 let (tx, rx) = futures_channel::oneshot::channel();
375 sse.connect_waiters.push(tx);
376
377 if sse.state == SseState::Idle {
378 sse.state = SseState::Connecting;
379 drop(sse);
380 platform::start_event_loop(self.clone());
381 }
382
383 rx
384 };
385
386 rx.await.unwrap_or_else(|_| {
387 Err(ForgeClientError::new(
388 "SSE_CONNECTION_FAILED",
389 "Connection attempt cancelled",
390 None,
391 ))
392 })
393 }
394
395 async fn register_subscription(
398 &self,
399 sub_id: &str,
400 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
401 let envelope = self.try_register_subscription(sub_id).await?;
402
403 let needs_retry = !envelope.success
405 && envelope
406 .error
407 .as_ref()
408 .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
409
410 if needs_retry {
411 self.force_reconnect().await;
412 self.ensure_connected().await?;
413 return self.try_register_subscription(sub_id).await;
414 }
415
416 Ok(envelope)
417 }
418
419 async fn try_register_subscription(
420 &self,
421 sub_id: &str,
422 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
423 let (endpoint, payload) = {
424 let sse = self.inner.sse.borrow();
425 let meta = sse
426 .registrations
427 .get(sub_id)
428 .ok_or_else(|| {
429 ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
430 })?;
431 let session_id = sse.session_id.clone().unwrap_or_default();
432 let session_secret = sse.session_secret.clone().unwrap_or_default();
433 let mut payload = meta.payload.clone();
434 let obj = payload
435 .as_object_mut()
436 .expect("registration payload must be an object");
437 obj.insert("session_id".into(), serde_json::Value::String(session_id));
438 obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
439 (meta.endpoint, payload)
440 };
441
442 let url = format!("{}{}", self.inner.url, endpoint);
443 platform::request_json(self, &url, payload).await
444 }
445
446 async fn force_reconnect(&self) {
447 let task = {
448 let mut sse = self.inner.sse.borrow_mut();
449 sse.session_id = None;
450 sse.session_secret = None;
451 sse.state = SseState::Idle;
452 sse.event_loop_task.take()
455 };
456 if let Some(task) = task {
457 task.cancel();
458 }
459 sleep(Duration::from_millis(10)).await;
460 }
461
462 async fn reregister_all(&self) {
463 let sub_ids: Vec<String> = {
464 let sse = self.inner.sse.borrow();
465 sse.registrations.keys().cloned().collect()
466 };
467
468 for sub_id in sub_ids {
469 let _ = self.register_subscription(&sub_id).await;
470 }
471 }
472
473 fn dispatch_event(&self, target: &str, event: SseDispatch) {
474 let tx = {
475 let sse = self.inner.sse.borrow();
476 sse.listeners.get(target).cloned()
477 };
478 if let Some(tx) = tx {
479 let _ = tx.unbounded_send(event);
480 }
481 }
482
483 fn broadcast_connection(&self, state: ConnectionState) {
484 if let Some(mut signal) = self.inner.connection_state {
485 signal.set(state);
486 }
487 }
488
489 fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
490 let mut sse = self.inner.sse.borrow_mut();
491 let is_reconnect = sse.ever_connected;
492 sse.session_id = Some(session_id);
493 sse.session_secret = Some(session_secret);
494 sse.state = SseState::Connected;
495 sse.reconnect_attempts = 0;
496 sse.ever_connected = true;
497 for waiter in sse.connect_waiters.drain(..) {
498 let _ = waiter.send(Ok(()));
499 }
500 is_reconnect
501 }
502
503 fn mark_disconnected(&self) {
504 let mut sse = self.inner.sse.borrow_mut();
505 sse.session_id = None;
506 sse.session_secret = None;
507 sse.state = SseState::Idle;
508 sse.event_loop_task = None;
509 let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
510 for waiter in sse.connect_waiters.drain(..) {
511 let _ = waiter.send(Err(err()));
512 }
513 }
514
515 fn should_reconnect(&self) -> Option<u32> {
516 let mut sse = self.inner.sse.borrow_mut();
517 if sse.listeners.is_empty() {
518 return None;
519 }
520 let attempts = sse.reconnect_attempts;
521 if attempts >= MAX_RECONNECT_ATTEMPTS {
522 return None;
523 }
524 sse.reconnect_attempts = attempts + 1;
525 Some(attempts)
526 }
527
528 fn get_token(&self) -> Option<String> {
529 self.inner
530 .get_token
531 .as_ref()
532 .and_then(|provider| provider())
533 .filter(|t| !t.is_empty())
534 }
535
536 fn decode_envelope<TResult>(
537 &self,
538 envelope: RpcEnvelopeRaw,
539 ) -> Result<TResult, ForgeClientError>
540 where
541 TResult: DeserializeOwned,
542 {
543 if !envelope.success {
544 let error = envelope.error.unwrap_or(ForgeError {
545 code: "UNKNOWN".to_string(),
546 message: "Unknown error".to_string(),
547 details: None,
548 });
549 return Err(ForgeClientError::new(error.code, error.message, error.details));
550 }
551
552 let data = envelope.data.ok_or_else(|| {
553 ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
554 })?;
555 serde_json::from_value(data)
556 .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
557 }
558
559 fn random_id(&self, prefix: &str) -> String {
560 let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
561 format!("{prefix}-{id}")
562 }
563}
564
565async fn sleep(duration: Duration) {
566 #[cfg(target_arch = "wasm32")]
567 {
568 gloo_timers::future::sleep(duration).await;
569 }
570
571 #[cfg(not(target_arch = "wasm32"))]
572 {
573 tokio::time::sleep(duration).await;
574 }
575}
576
577#[derive(Clone)]
578pub struct SubscriptionHandle {
579 closed: Rc<Cell<bool>>,
580 task: Rc<RefCell<Option<Task>>>,
581 cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
582}
583
584impl SubscriptionHandle {
585 fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
586 let cleanup: Box<dyn FnOnce()> = Box::new(move || {
587 let mut sse = client.inner.sse.borrow_mut();
588 sse.listeners.remove(&target);
589 sse.registrations.remove(&sub_id);
590 if target.starts_with("sub:") {
592 let session_id = sse.session_id.clone();
593 let session_secret = sse.session_secret.clone();
594 drop(sse);
595 if let (Some(sid), Some(ss)) = (session_id, session_secret) {
596 let url = format!("{}/_api/unsubscribe", client.inner.url);
597 let payload = serde_json::json!({
598 "session_id": sid,
599 "session_secret": ss,
600 "id": sub_id,
601 });
602 let client = client.clone();
603 dioxus::prelude::spawn(async move {
604 let _ = platform::request_json(&client, &url, payload).await;
605 });
606 }
607 }
608 });
609
610 Self {
611 closed: Rc::new(Cell::new(false)),
612 task: Rc::new(RefCell::new(None)),
613 cleanup: Rc::new(RefCell::new(Some(cleanup))),
614 }
615 }
616
617 fn set_task(&self, task: Task) {
618 *self.task.borrow_mut() = Some(task);
619 }
620
621 pub(crate) fn finish(&self) {
622 if self.closed.replace(true) {
623 return;
624 }
625 if let Some(cleanup) = self.cleanup.borrow_mut().take() {
626 cleanup();
627 }
628 self.task.borrow_mut().take();
629 }
630
631 pub fn close(&self) {
632 let task = { self.task.borrow_mut().clone() };
633 self.finish();
634 if let Some(task) = task {
635 task.cancel();
636 }
637 }
638
639 pub fn is_closed(&self) -> bool {
640 self.closed.get()
641 }
642}
643
644impl Drop for SubscriptionHandle {
645 fn drop(&mut self) {
646 self.close();
647 }
648}
649
650#[cfg(target_arch = "wasm32")]
653mod platform {
654 use dioxus::prelude::spawn;
655 use futures_util::{StreamExt, stream};
656 use gloo_net::eventsource::futures::EventSource;
657 use gloo_net::http::Request;
658 use js_sys::{JSON, encode_uri_component};
659
660 use super::{ForgeClient, SseDispatch, sleep};
661 use crate::types::{
662 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
663 };
664
665 pub(super) async fn request_json(
666 client: &ForgeClient,
667 url: &str,
668 body: serde_json::Value,
669 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
670 let mut request = Request::post(url).header("Content-Type", "application/json");
671 if let Some(token) = client.get_token() {
672 request = request.header("Authorization", &format!("Bearer {token}"));
673 }
674
675 let request = request.body(body.to_string()).map_err(request_error)?;
676 request
677 .send()
678 .await
679 .map_err(request_error)?
680 .json()
681 .await
682 .map_err(request_error)
683 }
684
685 pub(super) async fn request_multipart(
686 client: &ForgeClient,
687 url: &str,
688 form: web_sys::FormData,
689 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
690 let mut request = Request::post(url);
691 if let Some(token) = client.get_token() {
692 request = request.header("Authorization", &format!("Bearer {token}"));
693 }
694
695 let response = request.body(form).map_err(request_error)?;
696 response
697 .send()
698 .await
699 .map_err(request_error)?
700 .json()
701 .await
702 .map_err(request_error)
703 }
704
705 fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
706 let data = message.data();
707 data.as_string().or_else(|| {
708 JSON::stringify(&data)
709 .ok()
710 .and_then(|value| value.as_string())
711 .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
712 })
713 }
714
715 fn events_url(client: &ForgeClient) -> String {
716 match client.get_token() {
717 Some(token) => format!(
718 "{}/_api/events?token={}",
719 client.inner.url,
720 encode_uri_component(&token)
721 ),
722 None => format!("{}/_api/events", client.inner.url),
723 }
724 }
725
726 pub(super) fn start_event_loop(client: ForgeClient) {
728 let client_for_task = client.clone();
729 let task = spawn(async move {
730 run_event_loop(&client_for_task).await;
731
732 client_for_task.mark_disconnected();
734 client_for_task.broadcast_connection(ConnectionState::Disconnected);
735
736 if let Some(attempts) = client_for_task.should_reconnect() {
737 let delay = 1000 * (1u64 << attempts.min(4));
738 let jitter = (js_sys::Math::random() * 500.0) as u64;
739 sleep(std::time::Duration::from_millis(delay + jitter)).await;
740
741 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
742 start_event_loop(client_for_task);
743 }
744 });
745
746 client.inner.sse.borrow_mut().event_loop_task = Some(task);
747 }
748
749 async fn run_event_loop(client: &ForgeClient) {
750 let mut event_source = match EventSource::new(&events_url(client)) {
751 Ok(source) => source,
752 Err(_) => {
753 return;
754 }
755 };
756
757 let mut connected_stream = match event_source.subscribe("connected") {
758 Ok(stream) => stream,
759 Err(_) => return,
760 };
761 let update_stream = match event_source.subscribe("update") {
762 Ok(stream) => stream,
763 Err(_) => return,
764 };
765 let error_stream = match event_source.subscribe("error") {
766 Ok(stream) => stream,
767 Err(_) => return,
768 };
769
770 let connected_event = match connected_stream.next().await {
772 Some(Ok((_kind, message))) => {
773 let Some(raw) = message_data_as_string(&message) else {
774 return;
775 };
776 match serde_json::from_str::<ConnectedEvent>(&raw) {
777 Ok(event) => event,
778 Err(_) => return,
779 }
780 }
781 _ => return,
782 };
783
784 let session_id = connected_event.session_id.unwrap_or_default();
785 let session_secret = connected_event.session_secret.unwrap_or_default();
786
787 if session_id.is_empty() || session_secret.is_empty() {
788 return;
789 }
790
791 let is_reconnect = client.mark_connected(session_id, session_secret);
792 client.broadcast_connection(ConnectionState::Connected);
793
794 if is_reconnect {
796 client.reregister_all().await;
797 }
798
799 let mut events = stream::select(update_stream, error_stream);
800 while let Some(event) = events.next().await {
801 match event {
802 Ok((kind, message)) => {
803 let Some(raw) = message_data_as_string(&message) else {
804 continue;
805 };
806 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
807 continue;
808 };
809
810 let Some(target) = envelope.target else {
811 continue;
812 };
813
814 if kind == "update" {
815 if let Some(payload) = envelope.payload {
816 client.dispatch_event(&target, SseDispatch::Data(payload));
817 }
818 } else {
819 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
820 let message = envelope.message.unwrap_or_else(|| "Subscription error".to_string());
821 client.dispatch_event(&target, SseDispatch::Error { code, message });
822 }
823 }
824 Err(_) => break,
825 }
826 }
827
828 event_source.close();
829 }
830
831 fn request_error(err: gloo_net::Error) -> ForgeClientError {
832 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
833 }
834}
835
836#[cfg(not(target_arch = "wasm32"))]
839mod platform {
840 use dioxus::prelude::spawn;
841 use futures_util::StreamExt;
842 use reqwest::Client;
843 use reqwest_eventsource::{Event, EventSource};
844
845 use super::{ForgeClient, SseDispatch, sleep};
846 use crate::types::{
847 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
848 };
849
850 pub(super) async fn request_json(
851 client: &ForgeClient,
852 url: &str,
853 body: serde_json::Value,
854 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
855 let mut request = Client::new().post(url).json(&body);
856 if let Some(token) = client.get_token() {
857 request = request.bearer_auth(token);
858 }
859
860 request
861 .send()
862 .await
863 .map_err(request_error)?
864 .json()
865 .await
866 .map_err(request_error)
867 }
868
869 pub(super) async fn request_multipart(
870 client: &ForgeClient,
871 url: &str,
872 form: reqwest::multipart::Form,
873 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
874 let mut request = Client::new().post(url).multipart(form);
875 if let Some(token) = client.get_token() {
876 request = request.bearer_auth(token);
877 }
878
879 request
880 .send()
881 .await
882 .map_err(request_error)?
883 .json()
884 .await
885 .map_err(request_error)
886 }
887
888 pub(super) fn start_event_loop(client: ForgeClient) {
890 let client_for_task = client.clone();
891 let task = spawn(async move {
892 run_event_loop(&client_for_task).await;
893
894 client_for_task.mark_disconnected();
895 client_for_task.broadcast_connection(ConnectionState::Disconnected);
896
897 if let Some(attempts) = client_for_task.should_reconnect() {
898 let delay = 1000 * (1u64 << attempts.min(4));
899 sleep(std::time::Duration::from_millis(delay)).await;
900
901 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
902 start_event_loop(client_for_task);
903 }
904 });
905
906 client.inner.sse.borrow_mut().event_loop_task = Some(task);
907 }
908
909 async fn run_event_loop(client: &ForgeClient) {
910 let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
911 if let Some(token) = client.get_token() {
912 request = request.bearer_auth(token);
913 }
914
915 let mut event_source = match EventSource::new(request) {
916 Ok(source) => source,
917 Err(_) => return,
918 };
919
920 let connected_event = loop {
922 let Some(event) = event_source.next().await else {
923 return;
924 };
925 match event {
926 Ok(Event::Open) => continue,
927 Ok(Event::Message(msg)) if msg.event == "connected" => {
928 match serde_json::from_str::<ConnectedEvent>(&msg.data) {
929 Ok(event) => break event,
930 Err(_) => return,
931 }
932 }
933 Ok(Event::Message(_)) => continue,
934 Err(_) => return,
935 }
936 };
937
938 let session_id = connected_event.session_id.unwrap_or_default();
939 let session_secret = connected_event.session_secret.unwrap_or_default();
940
941 if session_id.is_empty() || session_secret.is_empty() {
942 return;
943 }
944
945 let is_reconnect = client.mark_connected(session_id, session_secret);
946 client.broadcast_connection(ConnectionState::Connected);
947
948 if is_reconnect {
949 client.reregister_all().await;
950 }
951
952 while let Some(event) = event_source.next().await {
953 match event {
954 Ok(Event::Open) => {}
955 Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
956 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
957 continue;
958 };
959 let Some(target) = envelope.target else {
960 continue;
961 };
962
963 if msg.event == "update" {
964 if let Some(payload) = envelope.payload {
965 client.dispatch_event(&target, SseDispatch::Data(payload));
966 }
967 } else {
968 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
969 let message =
970 envelope.message.unwrap_or_else(|| "Subscription error".to_string());
971 client.dispatch_event(&target, SseDispatch::Error { code, message });
972 }
973 }
974 Ok(Event::Message(_)) => {}
975 Err(_) => break,
976 }
977 }
978
979 event_source.close();
980 }
981
982 fn request_error(err: reqwest::Error) -> ForgeClientError {
983 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
984 }
985}