1
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::rc::Rc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::Duration;
7
8use dioxus::prelude::{Signal, WritableExt, dioxus_core::Task};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::types::{
13 ConnectionState, ForgeClientError, ForgeError, RpcEnvelopeRaw, StreamEvent,
14};
15
16type TokenProvider = Rc<dyn Fn() -> Option<String>>;
17type AuthErrorHandler = Rc<dyn Fn(ForgeError)>;
18type EventSender = futures_channel::mpsc::UnboundedSender<SseDispatch>;
19type ConnectWaiter = futures_channel::oneshot::Sender<Result<(), ForgeClientError>>;
20
21static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
22
23enum SseDispatch {
24 Data(serde_json::Value),
25 Error { code: String, message: String },
26}
27
28struct RegistrationMeta {
29 endpoint: &'static str,
30 payload: serde_json::Value,
31}
32
/// Lifecycle of the shared SSE connection tracked by `SseManager`.
///
/// `Debug` is derived so the state can appear in logs and `assert_eq!`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum SseState {
    /// No connection attempt is in progress (initial state, and the state
    /// after a disconnect or forced reconnect).
    #[default]
    Idle,
    /// An event loop has been started but has not yet reported a session.
    Connecting,
    /// The server's `connected` event was received and a session is active.
    Connected,
}
40
/// Mutable bookkeeping for the single SSE connection shared by all
/// subscriptions of one `ForgeClient`. Lives inside a `RefCell`.
#[derive(Default)]
struct SseManager {
    /// Session id announced by the server's `connected` event; cleared on
    /// disconnect and on forced reconnect.
    session_id: Option<String>,
    /// Secret paired with `session_id`; both are sent with every
    /// registration and unsubscribe request.
    session_secret: Option<String>,
    /// Current connection lifecycle state.
    state: SseState,
    /// Set once the first connection succeeds; later successful connections
    /// are treated as reconnects and trigger re-registration.
    ever_connected: bool,
    /// Per-subscription event channels keyed by target string
    /// (`"{prefix}:{sub_id}"`).
    listeners: HashMap<String, EventSender>,
    /// Registration requests keyed by subscription id, kept for replay.
    registrations: HashMap<String, RegistrationMeta>,
    /// Handle of the running event-loop task, if any.
    event_loop_task: Option<Task>,
    /// Consecutive reconnect attempts since the last successful connection.
    reconnect_attempts: u32,
    /// Callers of `ensure_connected` waiting for the connection outcome.
    connect_waiters: Vec<ConnectWaiter>,
}

/// Upper bound on consecutive reconnect attempts; `should_reconnect` gives up
/// once `reconnect_attempts` reaches this value.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
56
57#[derive(Clone)]
58pub struct ForgeClientConfig {
59 pub url: String,
60 pub get_token: Option<TokenProvider>,
61 pub on_auth_error: Option<AuthErrorHandler>,
62 pub(crate) connection_state: Option<Signal<ConnectionState>>,
63}
64
65impl ForgeClientConfig {
66 pub fn new(url: impl Into<String>) -> Self {
67 Self {
68 url: url.into(),
69 get_token: None,
70 on_auth_error: None,
71 connection_state: None,
72 }
73 }
74
75 pub fn with_token_provider(mut self, provider: impl Fn() -> Option<String> + 'static) -> Self {
76 self.get_token = Some(Rc::new(provider));
77 self
78 }
79
80 pub fn with_auth_error_handler(
81 mut self,
82 handler: impl Fn(ForgeError) + 'static,
83 ) -> Self {
84 self.on_auth_error = Some(Rc::new(handler));
85 self
86 }
87
88 pub(crate) fn with_connection_state(mut self, state: Signal<ConnectionState>) -> Self {
89 self.connection_state = Some(state);
90 self
91 }
92}
93
94#[derive(Clone)]
95pub struct ForgeClient {
96 inner: Rc<ForgeClientInner>,
97}
98
99struct ForgeClientInner {
100 url: String,
101 get_token: Option<TokenProvider>,
102 on_auth_error: Option<AuthErrorHandler>,
103 connection_state: Option<Signal<ConnectionState>>,
104 sse: RefCell<SseManager>,
105}
106
107impl ForgeClient {
108 pub fn new(config: ForgeClientConfig) -> Self {
109 Self {
110 inner: Rc::new(ForgeClientInner {
111 url: config.url.trim_end_matches('/').to_string(),
112 get_token: config.get_token,
113 on_auth_error: config.on_auth_error,
114 connection_state: config.connection_state,
115 sse: RefCell::new(SseManager::default()),
116 }),
117 }
118 }
119
120 pub async fn call<TArgs, TResult>(
121 &self,
122 function_name: &str,
123 args: TArgs,
124 ) -> Result<TResult, ForgeClientError>
125 where
126 TArgs: Serialize,
127 TResult: DeserializeOwned,
128 {
129 let body = serde_json::json!({ "args": args });
130 let envelope = platform::request_json(
131 self,
132 &format!("{}/_api/rpc/{}", self.inner.url, function_name),
133 body,
134 )
135 .await?;
136 self.decode_envelope(envelope)
137 }
138
139 #[cfg(target_arch = "wasm32")]
140 pub async fn call_multipart<TResult>(
141 &self,
142 function_name: &str,
143 form: web_sys::FormData,
144 ) -> Result<TResult, ForgeClientError>
145 where
146 TResult: DeserializeOwned,
147 {
148 let envelope = platform::request_multipart(
149 self,
150 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
151 form,
152 )
153 .await?;
154 self.decode_envelope(envelope)
155 }
156
157 #[cfg(not(target_arch = "wasm32"))]
158 pub async fn call_multipart<TResult>(
159 &self,
160 function_name: &str,
161 form: reqwest::multipart::Form,
162 ) -> Result<TResult, ForgeClientError>
163 where
164 TResult: DeserializeOwned,
165 {
166 let envelope = platform::request_multipart(
167 self,
168 &format!("{}/_api/rpc/{}/upload", self.inner.url, function_name),
169 form,
170 )
171 .await?;
172 self.decode_envelope(envelope)
173 }
174
175 pub fn subscribe_query<TArgs, TResult, F>(
176 &self,
177 function_name: &str,
178 args: TArgs,
179 callback: F,
180 ) -> SubscriptionHandle
181 where
182 TArgs: Serialize + Clone + 'static,
183 TResult: DeserializeOwned + Clone + 'static,
184 F: FnMut(StreamEvent<TResult>) + 'static,
185 {
186 let sub_id = self.random_id("sub");
187 let target = format!("sub:{sub_id}");
188
189 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
190 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
191
192 let args_value = serde_json::to_value(&args).unwrap_or(serde_json::Value::Null);
193 let reg_payload = serde_json::json!({
194 "id": sub_id,
195 "function": function_name,
196 "args": args_value,
197 });
198 self.inner.sse.borrow_mut().registrations.insert(
199 sub_id.clone(),
200 RegistrationMeta {
201 endpoint: "/_api/subscribe",
202 payload: reg_payload,
203 },
204 );
205
206 self.spawn_subscription(sub_id, target, rx, callback, |client, envelope, cb| {
207 match client.decode_envelope::<TResult>(envelope) {
208 Ok(data) => cb(StreamEvent::Data(data)),
209 Err(err) => cb(StreamEvent::Error(err)),
210 }
211 })
212 }
213
214 pub fn subscribe_job<TResult, F>(&self, job_id: String, callback: F) -> SubscriptionHandle
215 where
216 TResult: DeserializeOwned + Clone + 'static,
217 F: FnMut(StreamEvent<TResult>) + 'static,
218 {
219 self.subscribe_tracker("job", serde_json::json!({ "job_id": job_id }), "/_api/subscribe-job", callback)
220 }
221
222 pub fn subscribe_workflow<TResult, F>(
223 &self,
224 workflow_id: String,
225 callback: F,
226 ) -> SubscriptionHandle
227 where
228 TResult: DeserializeOwned + Clone + 'static,
229 F: FnMut(StreamEvent<TResult>) + 'static,
230 {
231 self.subscribe_tracker(
232 "wf",
233 serde_json::json!({ "workflow_id": workflow_id }),
234 "/_api/subscribe-workflow",
235 callback,
236 )
237 }
238
239 fn subscribe_tracker<TResult, F>(
240 &self,
241 prefix: &str,
242 payload: serde_json::Value,
243 endpoint: &'static str,
244 callback: F,
245 ) -> SubscriptionHandle
246 where
247 TResult: DeserializeOwned + Clone + 'static,
248 F: FnMut(StreamEvent<TResult>) + 'static,
249 {
250 let sub_id = self.random_id(prefix);
251 let target = format!("{prefix}:{sub_id}");
252
253 let (tx, rx) = futures_channel::mpsc::unbounded::<SseDispatch>();
254 self.inner.sse.borrow_mut().listeners.insert(target.clone(), tx);
255
256 let mut reg_payload = payload;
257 reg_payload
258 .as_object_mut()
259 .expect("tracker payload must be an object")
260 .insert("id".to_string(), serde_json::Value::String(sub_id.clone()));
261 self.inner.sse.borrow_mut().registrations.insert(
262 sub_id.clone(),
263 RegistrationMeta {
264 endpoint,
265 payload: reg_payload,
266 },
267 );
268
269 self.spawn_subscription(sub_id, target, rx, callback, |_client, envelope, cb| {
270 if envelope.success {
271 if let Some(data) = envelope.data {
272 match serde_json::from_value::<TResult>(data) {
273 Ok(parsed) => cb(StreamEvent::Data(parsed)),
274 Err(e) => cb(StreamEvent::Error(ForgeClientError::new(
275 "DESERIALIZATION_ERROR",
276 e.to_string(),
277 None,
278 ))),
279 }
280 }
281 }
282 })
283 }
284
285 fn spawn_subscription<TResult, F>(
286 &self,
287 sub_id: String,
288 target: String,
289 mut rx: futures_channel::mpsc::UnboundedReceiver<SseDispatch>,
290 mut callback: F,
291 on_initial: impl FnOnce(&ForgeClient, RpcEnvelopeRaw, &mut F) + 'static,
292 ) -> SubscriptionHandle
293 where
294 TResult: DeserializeOwned + Clone + 'static,
295 F: FnMut(StreamEvent<TResult>) + 'static,
296 {
297 let client = self.clone();
298 let handle = SubscriptionHandle::new(sub_id.clone(), target, self.clone());
299 let handle_task = handle.clone();
300
301 let task = dioxus::prelude::spawn(async move {
302 callback(StreamEvent::Connection(ConnectionState::Connecting));
303
304 if let Err(e) = client.ensure_connected().await {
305 callback(StreamEvent::Error(e));
306 callback(StreamEvent::Connection(ConnectionState::Disconnected));
307 handle_task.finish();
308 return;
309 }
310
311 match client.register_subscription(&sub_id).await {
312 Ok(envelope) => {
313 callback(StreamEvent::Connection(ConnectionState::Connected));
314 on_initial(&client, envelope, &mut callback);
315 }
316 Err(err) => {
317 callback(StreamEvent::Error(err));
318 callback(StreamEvent::Connection(ConnectionState::Disconnected));
319 handle_task.finish();
320 return;
321 }
322 }
323
324 while let Some(event) = futures_util::StreamExt::next(&mut rx).await {
325 Self::deliver_event::<TResult, F>(&mut callback, &client, event);
326 }
327
328 handle_task.finish();
329 });
330
331 handle.set_task(task);
332 handle
333 }
334
335 fn deliver_event<TResult, F>(
336 callback: &mut F,
337 client: &ForgeClient,
338 event: SseDispatch,
339 ) where
340 TResult: DeserializeOwned,
341 F: FnMut(StreamEvent<TResult>),
342 {
343 match event {
344 SseDispatch::Data(value) => match serde_json::from_value::<TResult>(value) {
345 Ok(data) => callback(StreamEvent::Data(data)),
346 Err(e) => {
347 callback(StreamEvent::Error(ForgeClientError::new(
348 "DESERIALIZATION_ERROR",
349 e.to_string(),
350 None,
351 )));
352 }
353 },
354 SseDispatch::Error { code, message } => {
355 let err = ForgeClientError::new(&code, &message, None);
356 if code == "UNAUTHORIZED" {
357 if let Some(handler) = &client.inner.on_auth_error {
358 handler(err.as_forge_error());
359 }
360 }
361 callback(StreamEvent::Error(err));
362 }
363 }
364 }
365
366 async fn ensure_connected(&self) -> Result<(), ForgeClientError> {
368 let rx = {
369 let mut sse = self.inner.sse.borrow_mut();
370 if sse.state == SseState::Connected {
371 return Ok(());
372 }
373
374 let (tx, rx) = futures_channel::oneshot::channel();
375 sse.connect_waiters.push(tx);
376
377 if sse.state == SseState::Idle {
378 sse.state = SseState::Connecting;
379 drop(sse);
380 platform::start_event_loop(self.clone());
381 }
382
383 rx
384 };
385
386 rx.await.unwrap_or_else(|_| {
387 Err(ForgeClientError::new(
388 "SSE_CONNECTION_FAILED",
389 "Connection attempt cancelled",
390 None,
391 ))
392 })
393 }
394
395 async fn register_subscription(
398 &self,
399 sub_id: &str,
400 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
401 let envelope = self.try_register_subscription(sub_id).await?;
402
403 let needs_retry = !envelope.success
405 && envelope
406 .error
407 .as_ref()
408 .is_some_and(|e| e.code == "SESSION_NOT_FOUND" || e.code == "SESSION_PRINCIPAL_MISMATCH");
409
410 if needs_retry {
411 self.force_reconnect().await;
412 self.ensure_connected().await?;
413 return self.try_register_subscription(sub_id).await;
414 }
415
416 Ok(envelope)
417 }
418
419 async fn try_register_subscription(
420 &self,
421 sub_id: &str,
422 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
423 let (endpoint, payload) = {
424 let sse = self.inner.sse.borrow();
425 let meta = sse
426 .registrations
427 .get(sub_id)
428 .ok_or_else(|| {
429 ForgeClientError::new("INTERNAL_ERROR", "Registration metadata not found", None)
430 })?;
431 let session_id = sse.session_id.clone().unwrap_or_default();
432 let session_secret = sse.session_secret.clone().unwrap_or_default();
433 let mut payload = meta.payload.clone();
434 let obj = payload
435 .as_object_mut()
436 .expect("registration payload must be an object");
437 obj.insert("session_id".into(), serde_json::Value::String(session_id));
438 obj.insert("session_secret".into(), serde_json::Value::String(session_secret));
439 (meta.endpoint, payload)
440 };
441
442 let url = format!("{}{}", self.inner.url, endpoint);
443 platform::request_json(self, &url, payload).await
444 }
445
446 async fn force_reconnect(&self) {
447 let task = {
448 let mut sse = self.inner.sse.borrow_mut();
449 sse.session_id = None;
450 sse.session_secret = None;
451 sse.state = SseState::Idle;
452 sse.event_loop_task.take()
455 };
456 if let Some(task) = task {
457 task.cancel();
458 }
459 sleep(Duration::from_millis(10)).await;
460 }
461
462 async fn reregister_all(&self) {
463 let sub_ids: Vec<String> = {
464 let sse = self.inner.sse.borrow();
465 sse.registrations.keys().cloned().collect()
466 };
467
468 for sub_id in sub_ids {
469 let _ = self.register_subscription(&sub_id).await;
470 }
471 }
472
473 fn dispatch_event(&self, target: &str, event: SseDispatch) {
474 let tx = {
475 let sse = self.inner.sse.borrow();
476 sse.listeners.get(target).cloned()
477 };
478 if let Some(tx) = tx {
479 let _ = tx.unbounded_send(event);
480 }
481 }
482
483 fn broadcast_connection(&self, state: ConnectionState) {
484 if let Some(mut signal) = self.inner.connection_state {
485 signal.set(state);
486 }
487 }
488
489 fn mark_connected(&self, session_id: String, session_secret: String) -> bool {
490 let mut sse = self.inner.sse.borrow_mut();
491 let is_reconnect = sse.ever_connected;
492 sse.session_id = Some(session_id);
493 sse.session_secret = Some(session_secret);
494 sse.state = SseState::Connected;
495 sse.reconnect_attempts = 0;
496 sse.ever_connected = true;
497 for waiter in sse.connect_waiters.drain(..) {
498 let _ = waiter.send(Ok(()));
499 }
500 is_reconnect
501 }
502
503 fn mark_disconnected(&self) {
504 let mut sse = self.inner.sse.borrow_mut();
505 sse.session_id = None;
506 sse.session_secret = None;
507 sse.state = SseState::Idle;
508 sse.event_loop_task = None;
509 let err = || ForgeClientError::new("SSE_CONNECTION_FAILED", "SSE connection lost", None);
510 for waiter in sse.connect_waiters.drain(..) {
511 let _ = waiter.send(Err(err()));
512 }
513 }
514
515 fn should_reconnect(&self) -> Option<u32> {
516 let mut sse = self.inner.sse.borrow_mut();
517 if sse.listeners.is_empty() {
518 return None;
519 }
520 let attempts = sse.reconnect_attempts;
521 if attempts >= MAX_RECONNECT_ATTEMPTS {
522 return None;
523 }
524 sse.reconnect_attempts = attempts + 1;
525 Some(attempts)
526 }
527
528 fn get_token(&self) -> Option<String> {
529 self.inner
530 .get_token
531 .as_ref()
532 .and_then(|provider| provider())
533 .filter(|t| !t.is_empty())
534 }
535
536 fn decode_envelope<TResult>(
537 &self,
538 envelope: RpcEnvelopeRaw,
539 ) -> Result<TResult, ForgeClientError>
540 where
541 TResult: DeserializeOwned,
542 {
543 if !envelope.success {
544 let error = envelope.error.unwrap_or(ForgeError {
545 code: "UNKNOWN".to_string(),
546 message: "Unknown error".to_string(),
547 details: None,
548 });
549 return Err(ForgeClientError::new(error.code, error.message, error.details));
550 }
551
552 let data = envelope.data.ok_or_else(|| {
553 ForgeClientError::new("EMPTY_RESPONSE", "Server returned no data", None)
554 })?;
555 serde_json::from_value(data)
556 .map_err(|err| ForgeClientError::new("DESERIALIZATION_ERROR", err.to_string(), None))
557 }
558
559 fn random_id(&self, prefix: &str) -> String {
560 let id = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
561 format!("{prefix}-{id}")
562 }
563}
564
565async fn sleep(duration: Duration) {
566 #[cfg(target_arch = "wasm32")]
567 {
568 gloo_timers::future::sleep(duration).await;
569 }
570
571 #[cfg(not(target_arch = "wasm32"))]
572 {
573 tokio::time::sleep(duration).await;
574 }
575}
576
577#[derive(Clone)]
578pub struct SubscriptionHandle {
579 closed: Rc<Cell<bool>>,
580 task: Rc<RefCell<Option<Task>>>,
581 cleanup: Rc<RefCell<Option<Box<dyn FnOnce()>>>>,
582}
583
584impl SubscriptionHandle {
585 fn new(sub_id: String, target: String, client: ForgeClient) -> Self {
586 let cleanup: Box<dyn FnOnce()> = Box::new(move || {
587 let mut sse = client.inner.sse.borrow_mut();
588 sse.listeners.remove(&target);
589 sse.registrations.remove(&sub_id);
590 if target.starts_with("sub:") {
592 let session_id = sse.session_id.clone();
593 let session_secret = sse.session_secret.clone();
594 drop(sse);
595 if let (Some(sid), Some(ss)) = (session_id, session_secret) {
596 let url = format!("{}/_api/unsubscribe", client.inner.url);
597 let payload = serde_json::json!({
598 "session_id": sid,
599 "session_secret": ss,
600 "id": sub_id,
601 });
602 let client = client.clone();
603 dioxus::prelude::spawn(async move {
604 let _ = platform::request_json(&client, &url, payload).await;
605 });
606 }
607 }
608 });
609
610 Self {
611 closed: Rc::new(Cell::new(false)),
612 task: Rc::new(RefCell::new(None)),
613 cleanup: Rc::new(RefCell::new(Some(cleanup))),
614 }
615 }
616
617 fn set_task(&self, task: Task) {
618 *self.task.borrow_mut() = Some(task);
619 }
620
621 pub(crate) fn finish(&self) {
622 if self.closed.replace(true) {
623 return;
624 }
625 if let Some(cleanup) = self.cleanup.borrow_mut().take() {
626 cleanup();
627 }
628 self.task.borrow_mut().take();
629 }
630
631 pub fn close(&self) {
632 let task = { self.task.borrow_mut().clone() };
633 self.finish();
634 if let Some(task) = task {
635 task.cancel();
636 }
637 }
638
639 pub fn is_closed(&self) -> bool {
640 self.closed.get()
641 }
642}
643
644impl Drop for SubscriptionHandle {
645 fn drop(&mut self) {
646 self.close();
647 }
648}
649
/// Browser (wasm32) transport: `gloo_net` for HTTP and `EventSource` for SSE.
#[cfg(target_arch = "wasm32")]
mod platform {
    use dioxus::prelude::spawn;
    use futures_util::{StreamExt, stream};
    use gloo_net::eventsource::futures::EventSource;
    use gloo_net::http::Request;
    use js_sys::{JSON, encode_uri_component};

    use super::{ForgeClient, SseDispatch, SseState, sleep};
    use crate::types::{
        ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
    };

    /// POSTs `body` as JSON to `url` with credentials included and, when a
    /// token is available, a bearer `Authorization` header. The response body
    /// is parsed as an RPC envelope.
    ///
    /// # Errors
    /// `REQUEST_FAILED` for any build, transport or decoding failure.
    pub(super) async fn request_json(
        client: &ForgeClient,
        url: &str,
        body: serde_json::Value,
    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
        let mut request = Request::post(url)
            .header("Content-Type", "application/json")
            .credentials(web_sys::RequestCredentials::Include);
        if let Some(token) = client.get_token() {
            request = request.header("Authorization", &format!("Bearer {token}"));
        }

        let request = request.body(body.to_string()).map_err(request_error)?;
        request
            .send()
            .await
            .map_err(request_error)?
            .json()
            .await
            .map_err(request_error)
    }

    /// POSTs `form` as the request body to `url`. No `Content-Type` header is
    /// set here. Authentication and error mapping match `request_json`.
    pub(super) async fn request_multipart(
        client: &ForgeClient,
        url: &str,
        form: web_sys::FormData,
    ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
        let mut request = Request::post(url).credentials(web_sys::RequestCredentials::Include);
        if let Some(token) = client.get_token() {
            request = request.header("Authorization", &format!("Bearer {token}"));
        }

        let request = request.body(form).map_err(request_error)?;
        request
            .send()
            .await
            .map_err(request_error)?
            .json()
            .await
            .map_err(request_error)
    }

    /// Extracts the event data as text.
    ///
    /// String data is returned as is. Anything else is passed through
    /// `JSON.stringify`; if that output is itself a JSON string literal it is
    /// unquoted, otherwise the stringified JSON is returned unchanged.
    fn message_data_as_string(message: &web_sys::MessageEvent) -> Option<String> {
        let data = message.data();
        data.as_string().or_else(|| {
            JSON::stringify(&data)
                .ok()
                .and_then(|value| value.as_string())
                .map(|raw| serde_json::from_str::<String>(&raw).unwrap_or(raw))
        })
    }

    /// Builds the SSE endpoint URL. The token, if any, travels URI-encoded in
    /// the `token` query parameter.
    fn events_url(client: &ForgeClient) -> String {
        match client.get_token() {
            Some(token) => format!(
                "{}/_api/events?token={}",
                client.inner.url,
                encode_uri_component(&token)
            ),
            None => format!("{}/_api/events", client.inner.url),
        }
    }

    /// Spawns the SSE event-loop task and stores its handle in the client.
    ///
    /// When the loop ends, the client is marked disconnected. If
    /// `should_reconnect` allows it, the task waits `1000 * 2^min(attempts, 4)`
    /// ms plus up to 500 ms of jitter and then restarts the loop. It restarts
    /// only if the connection is still `Idle` after the wait. A concurrent
    /// `ensure_connected` may already have started a new loop during the
    /// back-off, and starting a second one would create a duplicate
    /// connection and overwrite the live session.
    pub(super) fn start_event_loop(client: ForgeClient) {
        let client_for_task = client.clone();
        let task = spawn(async move {
            run_event_loop(&client_for_task).await;

            client_for_task.mark_disconnected();
            client_for_task.broadcast_connection(ConnectionState::Disconnected);

            let Some(attempts) = client_for_task.should_reconnect() else {
                return;
            };

            let delay = 1000 * (1u64 << attempts.min(4));
            let jitter = (js_sys::Math::random() * 500.0) as u64;
            sleep(std::time::Duration::from_millis(delay + jitter)).await;

            {
                let mut sse = client_for_task.inner.sse.borrow_mut();
                if sse.state != SseState::Idle {
                    // Someone else reconnected while this task was sleeping.
                    return;
                }
                sse.state = SseState::Connecting;
            }
            start_event_loop(client_for_task);
        });

        client.inner.sse.borrow_mut().event_loop_task = Some(task);
    }

    /// Runs one SSE connection from open to close.
    ///
    /// Waits for the `connected` event carrying non-empty session
    /// credentials, marks the client connected, replays registrations on a
    /// reconnect, and then routes `update` / `error` events to their target
    /// listeners until the stream fails or ends. Any setup failure returns
    /// early without marking the client connected.
    async fn run_event_loop(client: &ForgeClient) {
        let mut event_source = match EventSource::new(&events_url(client)) {
            Ok(source) => source,
            Err(_) => return,
        };

        let mut connected_stream = match event_source.subscribe("connected") {
            Ok(stream) => stream,
            Err(_) => return,
        };
        let update_stream = match event_source.subscribe("update") {
            Ok(stream) => stream,
            Err(_) => return,
        };
        let error_stream = match event_source.subscribe("error") {
            Ok(stream) => stream,
            Err(_) => return,
        };

        let connected_event = match connected_stream.next().await {
            Some(Ok((_kind, message))) => {
                let Some(raw) = message_data_as_string(&message) else {
                    return;
                };
                match serde_json::from_str::<ConnectedEvent>(&raw) {
                    Ok(event) => event,
                    Err(_) => return,
                }
            }
            _ => return,
        };

        let session_id = connected_event.session_id.unwrap_or_default();
        let session_secret = connected_event.session_secret.unwrap_or_default();

        if session_id.is_empty() || session_secret.is_empty() {
            return;
        }

        let is_reconnect = client.mark_connected(session_id, session_secret);
        client.broadcast_connection(ConnectionState::Connected);

        if is_reconnect {
            client.reregister_all().await;
        }

        let mut events = stream::select(update_stream, error_stream);
        while let Some(event) = events.next().await {
            match event {
                Ok((kind, message)) => {
                    // Malformed or untargeted events are skipped, not fatal.
                    let Some(raw) = message_data_as_string(&message) else {
                        continue;
                    };
                    let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&raw) else {
                        continue;
                    };
                    let Some(target) = envelope.target else {
                        continue;
                    };

                    if kind == "update" {
                        if let Some(payload) = envelope.payload {
                            client.dispatch_event(&target, SseDispatch::Data(payload));
                        }
                    } else {
                        let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
                        let message = envelope
                            .message
                            .unwrap_or_else(|| "Subscription error".to_string());
                        client.dispatch_event(&target, SseDispatch::Error { code, message });
                    }
                }
                Err(_) => break,
            }
        }

        event_source.close();
    }

    /// Maps a `gloo_net` error to a `REQUEST_FAILED` client error.
    fn request_error(err: gloo_net::Error) -> ForgeClientError {
        ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
    }
}
838
839#[cfg(not(target_arch = "wasm32"))]
842mod platform {
843 use dioxus::prelude::spawn;
844 use futures_util::StreamExt;
845 use reqwest::Client;
846 use reqwest_eventsource::{Event, EventSource};
847
848 use super::{ForgeClient, SseDispatch, sleep};
849 use crate::types::{
850 ConnectedEvent, ConnectionState, ForgeClientError, RpcEnvelopeRaw, SseEnvelopeRaw,
851 };
852
853 pub(super) async fn request_json(
854 client: &ForgeClient,
855 url: &str,
856 body: serde_json::Value,
857 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
858 let mut request = Client::new().post(url).json(&body);
859 if let Some(token) = client.get_token() {
860 request = request.bearer_auth(token);
861 }
862
863 request
864 .send()
865 .await
866 .map_err(request_error)?
867 .json()
868 .await
869 .map_err(request_error)
870 }
871
872 pub(super) async fn request_multipart(
873 client: &ForgeClient,
874 url: &str,
875 form: reqwest::multipart::Form,
876 ) -> Result<RpcEnvelopeRaw, ForgeClientError> {
877 let mut request = Client::new().post(url).multipart(form);
878 if let Some(token) = client.get_token() {
879 request = request.bearer_auth(token);
880 }
881
882 request
883 .send()
884 .await
885 .map_err(request_error)?
886 .json()
887 .await
888 .map_err(request_error)
889 }
890
891 pub(super) fn start_event_loop(client: ForgeClient) {
893 let client_for_task = client.clone();
894 let task = spawn(async move {
895 run_event_loop(&client_for_task).await;
896
897 client_for_task.mark_disconnected();
898 client_for_task.broadcast_connection(ConnectionState::Disconnected);
899
900 if let Some(attempts) = client_for_task.should_reconnect() {
901 let delay = 1000 * (1u64 << attempts.min(4));
902 sleep(std::time::Duration::from_millis(delay)).await;
903
904 client_for_task.inner.sse.borrow_mut().state = super::SseState::Connecting;
905 start_event_loop(client_for_task);
906 }
907 });
908
909 client.inner.sse.borrow_mut().event_loop_task = Some(task);
910 }
911
912 async fn run_event_loop(client: &ForgeClient) {
913 let mut request = Client::new().get(format!("{}/_api/events", client.inner.url));
914 if let Some(token) = client.get_token() {
915 request = request.bearer_auth(token);
916 }
917
918 let mut event_source = match EventSource::new(request) {
919 Ok(source) => source,
920 Err(_) => return,
921 };
922
923 let connected_event = loop {
925 let Some(event) = event_source.next().await else {
926 return;
927 };
928 match event {
929 Ok(Event::Open) => continue,
930 Ok(Event::Message(msg)) if msg.event == "connected" => {
931 match serde_json::from_str::<ConnectedEvent>(&msg.data) {
932 Ok(event) => break event,
933 Err(_) => return,
934 }
935 }
936 Ok(Event::Message(_)) => continue,
937 Err(_) => return,
938 }
939 };
940
941 let session_id = connected_event.session_id.unwrap_or_default();
942 let session_secret = connected_event.session_secret.unwrap_or_default();
943
944 if session_id.is_empty() || session_secret.is_empty() {
945 return;
946 }
947
948 let is_reconnect = client.mark_connected(session_id, session_secret);
949 client.broadcast_connection(ConnectionState::Connected);
950
951 if is_reconnect {
952 client.reregister_all().await;
953 }
954
955 while let Some(event) = event_source.next().await {
956 match event {
957 Ok(Event::Open) => {}
958 Ok(Event::Message(msg)) if msg.event == "update" || msg.event == "error" => {
959 let Ok(envelope) = serde_json::from_str::<SseEnvelopeRaw>(&msg.data) else {
960 continue;
961 };
962 let Some(target) = envelope.target else {
963 continue;
964 };
965
966 if msg.event == "update" {
967 if let Some(payload) = envelope.payload {
968 client.dispatch_event(&target, SseDispatch::Data(payload));
969 }
970 } else {
971 let code = envelope.code.unwrap_or_else(|| "SSE_ERROR".to_string());
972 let message =
973 envelope.message.unwrap_or_else(|| "Subscription error".to_string());
974 client.dispatch_event(&target, SseDispatch::Error { code, message });
975 }
976 }
977 Ok(Event::Message(_)) => {}
978 Err(_) => break,
979 }
980 }
981
982 event_source.close();
983 }
984
985 fn request_error(err: reqwest::Error) -> ForgeClientError {
986 ForgeClientError::new("REQUEST_FAILED", err.to_string(), None)
987 }
988}