1use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26pub struct RoomOptions {
29 pub auto_reconnect: bool,
30 pub max_reconnect_attempts: u32,
31 pub reconnect_base_delay_ms: u64,
32 pub send_timeout_ms: u64,
34 pub connection_timeout_ms: u64,
36}
37
38impl Default for RoomOptions {
39 fn default() -> Self {
40 Self {
41 auto_reconnect: true,
42 max_reconnect_attempts: 10,
43 reconnect_base_delay_ms: 1000,
44 send_timeout_ms: 10_000,
45 connection_timeout_ms: 15_000,
46 }
47 }
48}
49
50pub struct Subscription {
54 remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
55}
56
57impl Subscription {
58 fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
59 Self {
60 remove_fn: Mutex::new(Some(Box::new(remove_fn))),
61 }
62 }
63
64 pub fn unsubscribe(self) {
66 if let Some(f) = self.remove_fn.lock().unwrap().take() {
67 f();
68 }
69 }
70}
71
72impl Drop for Subscription {
73 fn drop(&mut self) {
74 if let Some(f) = self.remove_fn.lock().unwrap().take() {
75 f();
76 }
77 }
78}
79
80type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
83type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
84type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
85type KickedHandler = Arc<dyn Fn() + Send + Sync>;
86type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
87type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
88type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
89type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
90type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
91type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
92type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
93type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;
94
95type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
97
98pub(crate) enum RoomWsCommand {
99 Send(String),
100 Close,
101}
102
103const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
104const ROOM_STATE_IDLE: &str = "idle";
105const ROOM_STATE_CONNECTING: &str = "connecting";
106const ROOM_STATE_CONNECTED: &str = "connected";
107const ROOM_STATE_RECONNECTING: &str = "reconnecting";
108const ROOM_STATE_DISCONNECTED: &str = "disconnected";
109const ROOM_STATE_KICKED: &str = "kicked";
110
111pub struct RoomClient {
114 pub namespace: String,
116 pub room_id: String,
118
119 shared_state: RwLock<Value>,
121 shared_version: RwLock<u64>,
122 player_state: RwLock<Value>,
123 player_version: RwLock<u64>,
124 members: RwLock<Value>,
125 current_user_id: Mutex<Option<String>>,
126 current_connection_id: Mutex<Option<String>>,
127 connection_state: RwLock<String>,
128 reconnect_info: RwLock<Option<Value>>,
129
130 base_url: String,
132 token_fn: Box<dyn Fn() -> String + Send + Sync>,
133 opts: RoomOptions,
134
135 shared_state_handlers: HandlerList<StateHandler>,
137 player_state_handlers: HandlerList<StateHandler>,
138 message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
139 error_handlers: HandlerList<ErrorHandler>,
140 kicked_handlers: HandlerList<KickedHandler>,
141 member_sync_handlers: HandlerList<MembersSyncHandler>,
142 member_join_handlers: HandlerList<MemberHandler>,
143 member_leave_handlers: HandlerList<MemberLeaveHandler>,
144 member_state_handlers: HandlerList<MemberStateHandler>,
145 signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
146 any_signal_handlers: HandlerList<AnySignalHandler>,
147 reconnect_handlers: HandlerList<ReconnectHandler>,
148 connection_state_handlers: HandlerList<ConnectionStateHandler>,
149 handler_id_counter: Mutex<u64>,
150
151 pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
153 pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
154 pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
155 pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
156 send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,
158
159 stop_tx: Mutex<Option<mpsc::Sender<()>>>,
161 intentionally_left: Mutex<bool>,
162}
163
164impl RoomClient {
165 pub fn new(
174 base_url: &str,
175 namespace: &str,
176 room_id: &str,
177 token_fn: impl Fn() -> String + Send + Sync + 'static,
178 opts: Option<RoomOptions>,
179 ) -> Arc<Self> {
180 Arc::new(Self {
181 namespace: namespace.to_string(),
182 room_id: room_id.to_string(),
183 shared_state: RwLock::new(json!({})),
184 shared_version: RwLock::new(0),
185 player_state: RwLock::new(json!({})),
186 player_version: RwLock::new(0),
187 members: RwLock::new(json!([])),
188 current_user_id: Mutex::new(None),
189 current_connection_id: Mutex::new(None),
190 connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
191 reconnect_info: RwLock::new(None),
192 base_url: base_url.trim_end_matches('/').to_string(),
193 token_fn: Box::new(token_fn),
194 opts: opts.unwrap_or_default(),
195 shared_state_handlers: Arc::new(Mutex::new(vec![])),
196 player_state_handlers: Arc::new(Mutex::new(vec![])),
197 message_handlers: Arc::new(Mutex::new(HashMap::new())),
198 error_handlers: Arc::new(Mutex::new(vec![])),
199 kicked_handlers: Arc::new(Mutex::new(vec![])),
200 member_sync_handlers: Arc::new(Mutex::new(vec![])),
201 member_join_handlers: Arc::new(Mutex::new(vec![])),
202 member_leave_handlers: Arc::new(Mutex::new(vec![])),
203 member_state_handlers: Arc::new(Mutex::new(vec![])),
204 signal_handlers: Arc::new(Mutex::new(HashMap::new())),
205 any_signal_handlers: Arc::new(Mutex::new(vec![])),
206 reconnect_handlers: Arc::new(Mutex::new(vec![])),
207 connection_state_handlers: Arc::new(Mutex::new(vec![])),
208 handler_id_counter: Mutex::new(0),
209 pending_requests: Mutex::new(HashMap::new()),
210 pending_signal_requests: Mutex::new(HashMap::new()),
211 pending_admin_requests: Mutex::new(HashMap::new()),
212 pending_member_state_requests: Mutex::new(HashMap::new()),
213 send_tx: Mutex::new(None),
214 stop_tx: Mutex::new(None),
215 intentionally_left: Mutex::new(false),
216 })
217 }
218
219 pub fn get_shared_state(&self) -> Value {
223 self.shared_state.read().unwrap().clone()
224 }
225
226 pub fn get_player_state(&self) -> Value {
228 self.player_state.read().unwrap().clone()
229 }
230
231 pub fn list_members(&self) -> Value {
233 self.members.read().unwrap().clone()
234 }
235
236 pub fn connection_state(&self) -> String {
238 self.connection_state.read().unwrap().clone()
239 }
240
241 pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
242 RoomStateNamespace::new(Arc::clone(self))
243 }
244
245 pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
246 RoomMetaNamespace::new(Arc::clone(self))
247 }
248
249 pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
250 RoomSignalsNamespace::new(Arc::clone(self))
251 }
252
253 pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
254 RoomMembersNamespace::new(Arc::clone(self))
255 }
256
257 pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
258 RoomAdminNamespace::new(Arc::clone(self))
259 }
260
261 pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
262 RoomSessionNamespace::new(Arc::clone(self))
263 }
264
265 pub async fn get_metadata(&self) -> Result<Value, Error> {
270 Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
271 }
272
273 pub async fn get_metadata_static(
276 base_url: &str,
277 namespace: &str,
278 room_id: &str,
279 ) -> Result<Value, Error> {
280 let url = format!(
281 "{}/api/room/metadata?namespace={}&id={}",
282 base_url.trim_end_matches('/'),
283 urlencoding::encode(namespace),
284 urlencoding::encode(room_id)
285 );
286 let resp = reqwest::get(&url)
287 .await
288 .map_err(|e| Error::Room(format!(
289 "Room metadata request could not reach {}. Make sure the EdgeBase server is running and reachable. Cause: {}",
290 url, e
291 )))?;
292 let status = resp.status();
293 let body = resp
294 .text()
295 .await
296 .map_err(|e| Error::Room(format!("Failed to read room metadata body from {}: {}", url, e)))?;
297 if !status.is_success() {
298 if let Ok(json) = serde_json::from_str::<Value>(&body) {
299 for key in ["message", "error", "detail"] {
300 if let Some(message) = json.get(key).and_then(|value| value.as_str()) {
301 let trimmed = message.trim();
302 if !trimmed.is_empty() {
303 return Err(Error::Room(trimmed.to_string()));
304 }
305 }
306 }
307 }
308 return Err(Error::Room(format!(
309 "Failed to load room metadata for '{}' in namespace '{}' (HTTP {}).",
310 room_id, namespace, status
311 )));
312 }
313 serde_json::from_str(&body)
314 .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
315 }
316
317 pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
321 *self.intentionally_left.lock().unwrap() = false;
322 self.set_connection_state(ROOM_STATE_CONNECTING);
323
324 let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
325 *self.stop_tx.lock().unwrap() = Some(stop_tx);
326
327 let this = Arc::clone(self);
328 tokio::spawn(async move {
329 let mut attempts = 0u32;
330 loop {
331 match this.establish().await {
332 Ok(mut ws_rx) => {
333 attempts = 0;
334 loop {
335 tokio::select! {
336 _ = stop_rx.recv() => return,
337 msg = ws_rx.recv() => {
338 match msg {
339 Some(raw) => this.handle_message(&raw),
340 None => {
341 if !*this.intentionally_left.lock().unwrap() {
344 this.reject_all_pending(
345 "WebSocket disconnected",
346 );
347 }
348 break;
349 }
350 }
351 }
352 }
353 }
354 }
355 Err(_) => {}
356 }
357 if *this.intentionally_left.lock().unwrap() {
358 return;
359 }
360 if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
361 this.set_connection_state(ROOM_STATE_DISCONNECTED);
362 return;
363 }
364 let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
365 attempts += 1;
366 this.begin_reconnect_attempt(attempts as u64);
367 sleep(Duration::from_millis(delay)).await;
368 }
369 });
370 Ok(())
371 }
372
373 pub async fn leave(&self) {
375 *self.intentionally_left.lock().unwrap() = true;
376
377 let send_tx = self.send_tx.lock().unwrap().clone();
378 if let Some(tx) = send_tx.as_ref() {
379 let _ = tx
380 .send(RoomWsCommand::Send(
381 json!({"type": "leave"}).to_string(),
382 ))
383 .await;
384 sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
385 let _ = tx.send(RoomWsCommand::Close).await;
386 }
387
388 if let Some(tx) = self.stop_tx.lock().unwrap().take() {
389 let _ = tx.send(()).await;
390 }
391
392 *self.send_tx.lock().unwrap() = None;
393
394 self.reject_all_pending("Room left");
396
397 *self.shared_state.write().unwrap() = json!({});
399 *self.shared_version.write().unwrap() = 0;
400 *self.player_state.write().unwrap() = json!({});
401 *self.player_version.write().unwrap() = 0;
402 *self.members.write().unwrap() = json!([]);
403 *self.current_user_id.lock().unwrap() = None;
404 *self.current_connection_id.lock().unwrap() = None;
405 *self.reconnect_info.write().unwrap() = None;
406 self.set_connection_state(ROOM_STATE_IDLE);
407 }
408
409 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
419 if self.send_tx.lock().unwrap().is_none() {
420 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
421 }
422
423 let request_id = Uuid::new_v4().to_string();
424 let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
425
426 self.pending_requests
427 .lock()
428 .unwrap()
429 .insert(request_id.clone(), tx);
430
431 self.ws_send(json!({
432 "type": "send",
433 "actionType": action_type,
434 "payload": payload.unwrap_or(json!({})),
435 "requestId": request_id,
436 }));
437
438 let timeout_ms = self.opts.send_timeout_ms;
439 let action_type_owned = action_type.to_string();
440 let req_id = request_id.clone();
441
442 match timeout(Duration::from_millis(timeout_ms), rx).await {
443 Ok(Ok(result)) => result,
444 Ok(Err(_)) => {
445 self.pending_requests.lock().unwrap().remove(&req_id);
447 Err(Error::Room(
448 "Room left while waiting for action result".to_string(),
449 ))
450 }
451 Err(_) => {
452 self.pending_requests.lock().unwrap().remove(&req_id);
454 Err(Error::RoomTimeout(format!(
455 "Action '{}' timed out",
456 action_type_owned
457 )))
458 }
459 }
460 }
461
462 pub fn on_shared_state(
467 &self,
468 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
469 ) -> Subscription {
470 let id = self.next_handler_id();
471 let handler = Arc::new(handler) as StateHandler;
472 self.shared_state_handlers.lock().unwrap().push((id, handler));
473
474 let list = Arc::clone(&self.shared_state_handlers);
475 Subscription::new(move || {
476 list.lock().unwrap().retain(|(hid, _)| *hid != id);
477 })
478 }
479
480 pub fn on_player_state(
483 &self,
484 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
485 ) -> Subscription {
486 let id = self.next_handler_id();
487 let handler = Arc::new(handler) as StateHandler;
488 self.player_state_handlers.lock().unwrap().push((id, handler));
489
490 let list = Arc::clone(&self.player_state_handlers);
491 Subscription::new(move || {
492 list.lock().unwrap().retain(|(hid, _)| *hid != id);
493 })
494 }
495
496 pub fn on_message(
503 &self,
504 msg_type: &str,
505 handler: impl Fn(&Value) + Send + Sync + 'static,
506 ) -> Subscription {
507 let id = self.next_handler_id();
508 let handler = Arc::new(handler) as MessageHandler;
509 let msg_type = msg_type.to_string();
510 {
511 let mut map = self.message_handlers.lock().unwrap();
512 map.entry(msg_type.clone())
513 .or_insert_with(Vec::new)
514 .push((id, handler));
515 }
516
517 let map = Arc::clone(&self.message_handlers);
518 Subscription::new(move || {
519 if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
520 list.retain(|(hid, _)| *hid != id);
521 }
522 })
523 }
524
525 pub fn on_error(
527 &self,
528 handler: impl Fn(&str, &str) + Send + Sync + 'static,
529 ) -> Subscription {
530 let id = self.next_handler_id();
531 let handler = Arc::new(handler) as ErrorHandler;
532 self.error_handlers.lock().unwrap().push((id, handler));
533
534 let list = Arc::clone(&self.error_handlers);
535 Subscription::new(move || {
536 list.lock().unwrap().retain(|(hid, _)| *hid != id);
537 })
538 }
539
540 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
542 let id = self.next_handler_id();
543 let handler = Arc::new(handler) as KickedHandler;
544 self.kicked_handlers.lock().unwrap().push((id, handler));
545
546 let list = Arc::clone(&self.kicked_handlers);
547 Subscription::new(move || {
548 list.lock().unwrap().retain(|(hid, _)| *hid != id);
549 })
550 }
551
552 pub fn on_members_sync(
553 &self,
554 handler: impl Fn(&Value) + Send + Sync + 'static,
555 ) -> Subscription {
556 let id = self.next_handler_id();
557 let handler = Arc::new(handler) as MembersSyncHandler;
558 self.member_sync_handlers.lock().unwrap().push((id, handler));
559
560 let list = Arc::clone(&self.member_sync_handlers);
561 Subscription::new(move || {
562 list.lock().unwrap().retain(|(hid, _)| *hid != id);
563 })
564 }
565
566 pub fn on_member_join(
567 &self,
568 handler: impl Fn(&Value) + Send + Sync + 'static,
569 ) -> Subscription {
570 let id = self.next_handler_id();
571 let handler = Arc::new(handler) as MemberHandler;
572 self.member_join_handlers.lock().unwrap().push((id, handler));
573
574 let list = Arc::clone(&self.member_join_handlers);
575 Subscription::new(move || {
576 list.lock().unwrap().retain(|(hid, _)| *hid != id);
577 })
578 }
579
580 pub fn on_member_leave(
581 &self,
582 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
583 ) -> Subscription {
584 let id = self.next_handler_id();
585 let handler = Arc::new(handler) as MemberLeaveHandler;
586 self.member_leave_handlers.lock().unwrap().push((id, handler));
587
588 let list = Arc::clone(&self.member_leave_handlers);
589 Subscription::new(move || {
590 list.lock().unwrap().retain(|(hid, _)| *hid != id);
591 })
592 }
593
594 pub fn on_member_state_change(
595 &self,
596 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
597 ) -> Subscription {
598 let id = self.next_handler_id();
599 let handler = Arc::new(handler) as MemberStateHandler;
600 self.member_state_handlers.lock().unwrap().push((id, handler));
601
602 let list = Arc::clone(&self.member_state_handlers);
603 Subscription::new(move || {
604 list.lock().unwrap().retain(|(hid, _)| *hid != id);
605 })
606 }
607
608 pub fn on_signal(
609 &self,
610 event: &str,
611 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
612 ) -> Subscription {
613 let id = self.next_handler_id();
614 let handler = Arc::new(handler) as SignalHandler;
615 let event = event.to_string();
616 {
617 let mut map = self.signal_handlers.lock().unwrap();
618 map.entry(event.clone())
619 .or_insert_with(Vec::new)
620 .push((id, handler));
621 }
622
623 let map = Arc::clone(&self.signal_handlers);
624 Subscription::new(move || {
625 if let Some(list) = map.lock().unwrap().get_mut(&event) {
626 list.retain(|(hid, _)| *hid != id);
627 }
628 })
629 }
630
631 pub fn on_any_signal(
632 &self,
633 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
634 ) -> Subscription {
635 let id = self.next_handler_id();
636 let handler = Arc::new(handler) as AnySignalHandler;
637 self.any_signal_handlers.lock().unwrap().push((id, handler));
638
639 let list = Arc::clone(&self.any_signal_handlers);
640 Subscription::new(move || {
641 list.lock().unwrap().retain(|(hid, _)| *hid != id);
642 })
643 }
644
645 pub fn on_reconnect(
646 &self,
647 handler: impl Fn(&Value) + Send + Sync + 'static,
648 ) -> Subscription {
649 let id = self.next_handler_id();
650 let handler = Arc::new(handler) as ReconnectHandler;
651 self.reconnect_handlers.lock().unwrap().push((id, handler));
652
653 let list = Arc::clone(&self.reconnect_handlers);
654 Subscription::new(move || {
655 list.lock().unwrap().retain(|(hid, _)| *hid != id);
656 })
657 }
658
659 pub fn on_connection_state_change(
660 &self,
661 handler: impl Fn(&str) + Send + Sync + 'static,
662 ) -> Subscription {
663 let id = self.next_handler_id();
664 let handler = Arc::new(handler) as ConnectionStateHandler;
665 self.connection_state_handlers
666 .lock()
667 .unwrap()
668 .push((id, handler));
669
670 let list = Arc::clone(&self.connection_state_handlers);
671 Subscription::new(move || {
672 list.lock().unwrap().retain(|(hid, _)| *hid != id);
673 })
674 }
675
676 pub async fn send_signal(
677 &self,
678 event: &str,
679 payload: Option<Value>,
680 options: Option<Value>,
681 ) -> Result<(), Error> {
682 if self.send_tx.lock().unwrap().is_none() {
683 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
684 }
685
686 let request_id = Uuid::new_v4().to_string();
687 let options = options.unwrap_or_else(|| json!({}));
688 let message = json!({
689 "type": "signal",
690 "event": event,
691 "payload": payload.unwrap_or_else(|| json!({})),
692 "requestId": request_id,
693 "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
694 "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
695 });
696
697 self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
698 }
699
700 pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
701 if self.send_tx.lock().unwrap().is_none() {
702 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
703 }
704
705 let request_id = Uuid::new_v4().to_string();
706 let message = json!({
707 "type": "member_state",
708 "state": state,
709 "requestId": request_id,
710 });
711
712 self.send_unit_request(
713 &self.pending_member_state_requests,
714 request_id,
715 message,
716 "Member state update timed out".to_string(),
717 ).await
718 }
719
720 pub async fn clear_member_state(&self) -> Result<(), Error> {
721 if self.send_tx.lock().unwrap().is_none() {
722 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
723 }
724
725 let request_id = Uuid::new_v4().to_string();
726 let message = json!({
727 "type": "member_state_clear",
728 "requestId": request_id,
729 });
730
731 self.send_unit_request(
732 &self.pending_member_state_requests,
733 request_id,
734 message,
735 "Member state clear timed out".to_string(),
736 ).await
737 }
738
739 pub async fn send_admin(
740 &self,
741 operation: &str,
742 member_id: &str,
743 payload: Option<Value>,
744 ) -> Result<(), Error> {
745 if self.send_tx.lock().unwrap().is_none() {
746 return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions or signals.".to_string()));
747 }
748
749 let request_id = Uuid::new_v4().to_string();
750 let message = json!({
751 "type": "admin",
752 "operation": operation,
753 "memberId": member_id,
754 "payload": payload.unwrap_or_else(|| json!({})),
755 "requestId": request_id,
756 });
757
758 self.send_unit_request(
759 &self.pending_admin_requests,
760 request_id,
761 message,
762 format!("Admin '{}' timed out", operation),
763 ).await
764 }
765
766 fn reject_all_pending(&self, message: &str) {
769 let error_msg = message.to_string();
770
771 for (_, tx) in self.pending_requests.lock().unwrap().drain() {
773 let _ = tx.send(Err(Error::Room(error_msg.clone())));
774 }
775
776 for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
778 let _ = tx.send(Err(Error::Room(error_msg.clone())));
779 }
780
781 for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
783 let _ = tx.send(Err(Error::Room(error_msg.clone())));
784 }
785
786 for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
788 let _ = tx.send(Err(Error::Room(error_msg.clone())));
789 }
790
791 }
792
793 pub async fn destroy(self: &Arc<Self>) {
796 self.leave().await;
797
798 self.shared_state_handlers.lock().unwrap().clear();
800 self.player_state_handlers.lock().unwrap().clear();
801 self.message_handlers.lock().unwrap().clear();
802 self.error_handlers.lock().unwrap().clear();
803 self.kicked_handlers.lock().unwrap().clear();
804 self.member_sync_handlers.lock().unwrap().clear();
805 self.member_join_handlers.lock().unwrap().clear();
806 self.member_leave_handlers.lock().unwrap().clear();
807 self.member_state_handlers.lock().unwrap().clear();
808 self.signal_handlers.lock().unwrap().clear();
809 self.any_signal_handlers.lock().unwrap().clear();
810 self.reconnect_handlers.lock().unwrap().clear();
811 self.connection_state_handlers.lock().unwrap().clear();
812 }
813
814 fn ws_url(&self) -> String {
817 let u = self
818 .base_url
819 .replace("https://", "wss://")
820 .replace("http://", "ws://");
821 format!(
822 "{}/api/room?namespace={}&id={}",
823 u,
824 urlencoding::encode(&self.namespace),
825 urlencoding::encode(&self.room_id)
826 )
827 }
828
829 fn ws_send(&self, msg: Value) {
831 let s = msg.to_string();
832 if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
833 let tx = tx.clone();
834 tokio::spawn(async move {
835 let _ = tx.send(RoomWsCommand::Send(s)).await;
836 });
837 }
838 }
839
840 #[cfg(test)]
841 pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
842 *self.send_tx.lock().unwrap() = Some(tx);
843 }
844
845 #[cfg(test)]
846 pub(crate) fn handle_message_for_testing(&self, raw: &str) {
847 self.handle_message(raw);
848 }
849
850 async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
851 use futures_util::{SinkExt, StreamExt};
852 use tokio_tungstenite::{connect_async, tungstenite::Message};
853
854 let (ws_stream, _response) = tokio::time::timeout(
855 std::time::Duration::from_millis(self.opts.connection_timeout_ms),
856 connect_async(self.ws_url()),
857 )
858 .await
859 .map_err(|_| anyhow::anyhow!(
860 "Room WebSocket connection timed out after {}ms. Is the server running?",
861 self.opts.connection_timeout_ms,
862 ))??;
863 let (mut write, mut read) = ws_stream.split();
864
865 let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.8"});
867 write.send(Message::Text(auth.to_string().into())).await?;
868
869 let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
871 let raw_str: &str = &raw;
872 let resp: Value = serde_json::from_str(raw_str)?;
873 let t = resp["type"].as_str().unwrap_or("");
874 if t != "auth_success" && t != "auth_refreshed" {
875 anyhow::bail!("Room auth failed: {}", resp["message"]);
876 }
877
878 *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
879 *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
880
881 let join_msg = json!({
883 "type": "join",
884 "lastSharedState": *self.shared_state.read().unwrap(),
885 "lastSharedVersion": *self.shared_version.read().unwrap(),
886 "lastPlayerState": *self.player_state.read().unwrap(),
887 "lastPlayerVersion": *self.player_version.read().unwrap(),
888 });
889 join_msg.to_string()
890 } else {
891 anyhow::bail!("Room: no auth response");
892 };
893
894 let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
896 *self.send_tx.lock().unwrap() = Some(write_tx.clone());
897
898 tokio::spawn(async move {
900 while let Some(command) = write_rx.recv().await {
901 match command {
902 RoomWsCommand::Send(msg) => {
903 if write.send(Message::Text(msg.into())).await.is_err() {
904 break;
905 }
906 }
907 RoomWsCommand::Close => {
908 let _ = write.send(Message::Close(None)).await;
909 break;
910 }
911 }
912 }
913 });
914
915 let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
917
918 let htx = write_tx.clone();
920 tokio::spawn(async move {
921 loop {
922 sleep(Duration::from_secs(30)).await;
923 if htx
924 .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
925 .await
926 .is_err()
927 {
928 break;
929 }
930 }
931 });
932
933 let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
935 tokio::spawn(async move {
936 while let Some(Ok(Message::Text(raw))) = read.next().await {
937 let raw_str: String = raw.into();
938 if msg_tx.send(raw_str).await.is_err() {
939 break;
940 }
941 }
942 });
943
944 Ok(msg_rx)
945 }
946
947 fn handle_message(&self, raw: &str) {
950 let msg: Value = match serde_json::from_str(raw) {
951 Ok(v) => v,
952 Err(_) => return,
953 };
954 let t = msg["type"].as_str().unwrap_or("");
955
956 match t {
957 "sync" => self.handle_sync(&msg),
958 "shared_delta" => self.handle_shared_delta(&msg),
959 "player_delta" => self.handle_player_delta(&msg),
960 "action_result" => self.handle_action_result(&msg),
961 "action_error" => self.handle_action_error(&msg),
962 "message" => self.handle_server_message(&msg),
963 "signal" => self.handle_signal(&msg),
964 "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
965 "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
966 "members_sync" => self.handle_members_sync(&msg),
967 "member_join" => self.handle_member_join(&msg),
968 "member_leave" => self.handle_member_leave(&msg),
969 "member_state" => self.handle_member_state(&msg),
970 "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
971 "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
972 "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
973 "kicked" => self.handle_kicked(),
974 "error" => self.handle_error(&msg),
975 "pong" => {}
976 _ => {}
977 }
978 }
979
980 fn handle_sync(&self, msg: &Value) {
981 *self.shared_state.write().unwrap() = msg["sharedState"].clone();
982 *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
983 *self.player_state.write().unwrap() = msg["playerState"].clone();
984 *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
985 self.set_connection_state(ROOM_STATE_CONNECTED);
986
987 if let Some(info) = self.reconnect_info.write().unwrap().take() {
988 for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
989 handler(&info);
990 }
991 }
992
993 let shared = self.shared_state.read().unwrap().clone();
995 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
996 handler(&shared, &shared);
997 }
998 let player = self.player_state.read().unwrap().clone();
999 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1000 handler(&player, &player);
1001 }
1002 }
1003
1004 fn handle_shared_delta(&self, msg: &Value) {
1005 let delta = &msg["delta"];
1006 *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1007
1008 if let Value::Object(map) = delta {
1009 let mut state = self.shared_state.write().unwrap();
1010 for (path, value) in map {
1011 deep_set(&mut state, path, value.clone());
1012 }
1013 }
1014
1015 let state = self.shared_state.read().unwrap().clone();
1016 for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1017 handler(&state, delta);
1018 }
1019 }
1020
1021 fn handle_player_delta(&self, msg: &Value) {
1022 let delta = &msg["delta"];
1023 *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1024
1025 if let Value::Object(map) = delta {
1026 let mut state = self.player_state.write().unwrap();
1027 for (path, value) in map {
1028 deep_set(&mut state, path, value.clone());
1029 }
1030 }
1031
1032 let state = self.player_state.read().unwrap().clone();
1033 for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1034 handler(&state, delta);
1035 }
1036 }
1037
1038 fn handle_action_result(&self, msg: &Value) {
1039 let request_id = msg["requestId"].as_str().unwrap_or("");
1040 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1041 let _ = tx.send(Ok(msg["result"].clone()));
1042 }
1043 }
1044
1045 fn handle_action_error(&self, msg: &Value) {
1046 let request_id = msg["requestId"].as_str().unwrap_or("");
1047 if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1048 let message = msg["message"].as_str().unwrap_or("Unknown EdgeBase error. Check the server response or logs for details.");
1049 let _ = tx.send(Err(Error::Room(message.to_string())));
1050 }
1051 }
1052
1053 fn handle_server_message(&self, msg: &Value) {
1054 let message_type = msg["messageType"].as_str().unwrap_or("");
1055 let data = &msg["data"];
1056
1057 let handlers = self.message_handlers.lock().unwrap();
1058 if let Some(list) = handlers.get(message_type) {
1059 for (_, handler) in list {
1060 handler(data);
1061 }
1062 }
1063 }
1064
1065 fn handle_signal(&self, msg: &Value) {
1066 let event = msg["event"].as_str().unwrap_or("");
1067 let payload = &msg["payload"];
1068 let meta = &msg["meta"];
1069
1070 if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1071 for (_, handler) in list {
1072 handler(payload, meta);
1073 }
1074 }
1075
1076 for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1077 handler(event, payload, meta);
1078 }
1079 }
1080
1081 fn handle_members_sync(&self, msg: &Value) {
1082 let members = normalize_members(&msg["members"]);
1083 *self.members.write().unwrap() = members.clone();
1084
1085 for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1086 handler(&members);
1087 }
1088 }
1089
1090 fn handle_member_join(&self, msg: &Value) {
1091 if let Some(member) = normalize_member(&msg["member"]) {
1092 self.upsert_member(member.clone());
1093 for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1094 handler(&member);
1095 }
1096 }
1097 }
1098
1099 fn handle_member_leave(&self, msg: &Value) {
1100 if let Some(member) = normalize_member(&msg["member"]) {
1101 let member_id = member["memberId"].as_str().unwrap_or("");
1102 self.remove_member(member_id);
1103 let reason = msg["reason"].as_str().unwrap_or("leave");
1104 for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1105 handler(&member, reason);
1106 }
1107 }
1108 }
1109
1110 fn handle_member_state(&self, msg: &Value) {
1111 if let Some(mut member) = normalize_member(&msg["member"]) {
1112 let state = object_or_empty(&msg["state"]);
1113 member["state"] = state.clone();
1114 self.upsert_member(member.clone());
1115 self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1116
1117 for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1118 handler(&member, &state);
1119 }
1120 }
1121 }
1122
1123 fn handle_kicked(&self) {
1124 self.set_connection_state(ROOM_STATE_KICKED);
1125 *self.intentionally_left.lock().unwrap() = true;
1126 for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1127 handler();
1128 }
1129 }
1130
1131 fn handle_error(&self, msg: &Value) {
1132 let code = msg["code"].as_str().unwrap_or("");
1133 let message = msg["message"].as_str().unwrap_or("");
1134 for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1135 handler(code, message);
1136 }
1137 }
1138
1139 async fn send_unit_request(
1142 &self,
1143 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1144 request_id: String,
1145 message: Value,
1146 timeout_message: String,
1147 ) -> Result<(), Error> {
1148 let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1149 pending.lock().unwrap().insert(request_id.clone(), tx);
1150 self.ws_send(message);
1151
1152 match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1153 Ok(Ok(result)) => result,
1154 Ok(Err(_)) => {
1155 pending.lock().unwrap().remove(&request_id);
1156 Err(Error::Room(
1157 "Room left while waiting for room control result".to_string(),
1158 ))
1159 }
1160 Err(_) => {
1161 pending.lock().unwrap().remove(&request_id);
1162 Err(Error::RoomTimeout(timeout_message))
1163 }
1164 }
1165 }
1166
1167 fn resolve_pending_unit_request(
1168 &self,
1169 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1170 msg: &Value,
1171 ) {
1172 let request_id = msg["requestId"].as_str().unwrap_or("");
1173 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1174 let _ = tx.send(Ok(()));
1175 }
1176 }
1177
1178 fn reject_pending_unit_request(
1179 &self,
1180 pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1181 msg: &Value,
1182 fallback: &str,
1183 ) {
1184 let request_id = msg["requestId"].as_str().unwrap_or("");
1185 if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1186 let message = msg["message"].as_str().unwrap_or(fallback);
1187 let _ = tx.send(Err(Error::Room(message.to_string())));
1188 }
1189 }
1190
1191 fn set_connection_state(&self, next: &str) {
1192 let mut state = self.connection_state.write().unwrap();
1193 if state.as_str() == next {
1194 return;
1195 }
1196 *state = next.to_string();
1197 drop(state);
1198
1199 for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1200 handler(next);
1201 }
1202 }
1203
1204 fn begin_reconnect_attempt(&self, attempt: u64) {
1205 *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1206 self.set_connection_state(ROOM_STATE_RECONNECTING);
1207 }
1208
1209 fn upsert_member(&self, member: Value) {
1210 let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1211 if member_id.is_empty() {
1212 return;
1213 }
1214
1215 let mut members = self.members.write().unwrap();
1216 let list = members.as_array_mut().expect("members array");
1217 if let Some(existing) = list
1218 .iter_mut()
1219 .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1220 {
1221 *existing = member;
1222 } else {
1223 list.push(member);
1224 }
1225 }
1226
1227 fn remove_member(&self, member_id: &str) {
1228 let mut members = self.members.write().unwrap();
1229 if let Some(list) = members.as_array_mut() {
1230 list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1231 }
1232 }
1233
1234 fn next_handler_id(&self) -> u64 {
1235 let mut counter = self.handler_id_counter.lock().unwrap();
1236 *counter += 1;
1237 *counter
1238 }
1239}
1240
1241fn deep_set(obj: &mut Value, path: &str, value: Value) {
1244 if let Some(dot) = path.find('.') {
1245 let head = &path[..dot];
1246 let tail = &path[dot + 1..];
1247 if let Value::Object(map) = obj {
1248 let nested = map.entry(head.to_string()).or_insert(json!({}));
1249 deep_set(nested, tail, value);
1250 }
1251 } else if let Value::Object(map) = obj {
1252 if value.is_null() {
1253 map.remove(path);
1254 } else {
1255 map.insert(path.to_string(), value);
1256 }
1257 }
1258}
1259
1260fn object_or_empty(value: &Value) -> Value {
1261 if value.is_object() {
1262 value.clone()
1263 } else {
1264 json!({})
1265 }
1266}
1267
1268fn normalize_member(value: &Value) -> Option<Value> {
1269 let member_id = value["memberId"].as_str()?;
1270 let user_id = value["userId"].as_str()?;
1271 let mut member = json!({
1272 "memberId": member_id,
1273 "userId": user_id,
1274 "state": object_or_empty(&value["state"]),
1275 });
1276
1277 if let Some(connection_id) = value["connectionId"].as_str() {
1278 member["connectionId"] = Value::String(connection_id.to_string());
1279 }
1280 if let Some(connection_count) = value["connectionCount"].as_u64() {
1281 member["connectionCount"] = Value::from(connection_count);
1282 }
1283 if let Some(role) = value["role"].as_str() {
1284 member["role"] = Value::String(role.to_string());
1285 }
1286 Some(member)
1287}
1288
1289fn normalize_members(value: &Value) -> Value {
1290 Value::Array(
1291 value
1292 .as_array()
1293 .into_iter()
1294 .flatten()
1295 .filter_map(normalize_member)
1296 .collect(),
1297 )
1298}
1299
1300pub struct RoomStateNamespace {
1301 client: Arc<RoomClient>,
1302}
1303
1304impl RoomStateNamespace {
1305 fn new(client: Arc<RoomClient>) -> Self {
1306 Self { client }
1307 }
1308
1309 pub fn get_shared(&self) -> Value {
1310 self.client.get_shared_state()
1311 }
1312
1313 pub fn get_mine(&self) -> Value {
1314 self.client.get_player_state()
1315 }
1316
1317 pub fn on_shared_change(
1318 &self,
1319 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1320 ) -> Subscription {
1321 self.client.on_shared_state(handler)
1322 }
1323
1324 pub fn on_mine_change(
1325 &self,
1326 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1327 ) -> Subscription {
1328 self.client.on_player_state(handler)
1329 }
1330
1331 pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1332 self.client.send(action_type, payload).await
1333 }
1334}
1335
1336pub struct RoomMetaNamespace {
1337 client: Arc<RoomClient>,
1338}
1339
1340impl RoomMetaNamespace {
1341 fn new(client: Arc<RoomClient>) -> Self {
1342 Self { client }
1343 }
1344
1345 pub async fn get(&self) -> Result<Value, Error> {
1346 self.client.get_metadata().await
1347 }
1348}
1349
1350pub struct RoomSignalsNamespace {
1351 client: Arc<RoomClient>,
1352}
1353
1354impl RoomSignalsNamespace {
1355 fn new(client: Arc<RoomClient>) -> Self {
1356 Self { client }
1357 }
1358
1359 pub async fn send(
1360 &self,
1361 event: &str,
1362 payload: Option<Value>,
1363 options: Option<Value>,
1364 ) -> Result<(), Error> {
1365 self.client.send_signal(event, payload, options).await
1366 }
1367
1368 pub async fn send_to(
1369 &self,
1370 member_id: &str,
1371 event: &str,
1372 payload: Option<Value>,
1373 ) -> Result<(), Error> {
1374 self.client
1375 .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1376 .await
1377 }
1378
1379 pub fn on(
1380 &self,
1381 event: &str,
1382 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1383 ) -> Subscription {
1384 self.client.on_signal(event, handler)
1385 }
1386
1387 pub fn on_any(
1388 &self,
1389 handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1390 ) -> Subscription {
1391 self.client.on_any_signal(handler)
1392 }
1393}
1394
1395pub struct RoomMembersNamespace {
1396 client: Arc<RoomClient>,
1397}
1398
1399impl RoomMembersNamespace {
1400 fn new(client: Arc<RoomClient>) -> Self {
1401 Self { client }
1402 }
1403
1404 pub fn list(&self) -> Value {
1405 self.client.list_members()
1406 }
1407
1408 pub fn on_sync(
1409 &self,
1410 handler: impl Fn(&Value) + Send + Sync + 'static,
1411 ) -> Subscription {
1412 self.client.on_members_sync(handler)
1413 }
1414
1415 pub fn on_join(
1416 &self,
1417 handler: impl Fn(&Value) + Send + Sync + 'static,
1418 ) -> Subscription {
1419 self.client.on_member_join(handler)
1420 }
1421
1422 pub fn on_leave(
1423 &self,
1424 handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1425 ) -> Subscription {
1426 self.client.on_member_leave(handler)
1427 }
1428
1429 pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1430 self.client.send_member_state(state).await
1431 }
1432
1433 pub async fn clear_state(&self) -> Result<(), Error> {
1434 self.client.clear_member_state().await
1435 }
1436
1437 pub fn on_state_change(
1438 &self,
1439 handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1440 ) -> Subscription {
1441 self.client.on_member_state_change(handler)
1442 }
1443}
1444
1445pub struct RoomAdminNamespace {
1446 client: Arc<RoomClient>,
1447}
1448
1449impl RoomAdminNamespace {
1450 fn new(client: Arc<RoomClient>) -> Self {
1451 Self { client }
1452 }
1453
1454 pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1455 self.client.send_admin("kick", member_id, None).await
1456 }
1457
1458 pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1459 self.client.send_admin("block", member_id, None).await
1460 }
1461
1462 pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1463 self.client
1464 .send_admin("setRole", member_id, Some(json!({ "role": role })))
1465 .await
1466 }
1467
1468}
1469
1470pub struct RoomSessionNamespace {
1471 client: Arc<RoomClient>,
1472}
1473
1474impl RoomSessionNamespace {
1475 fn new(client: Arc<RoomClient>) -> Self {
1476 Self { client }
1477 }
1478
1479 pub fn on_error(
1480 &self,
1481 handler: impl Fn(&str, &str) + Send + Sync + 'static,
1482 ) -> Subscription {
1483 self.client.on_error(handler)
1484 }
1485
1486 pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
1487 self.client.on_kicked(handler)
1488 }
1489
1490 pub fn on_reconnect(
1491 &self,
1492 handler: impl Fn(&Value) + Send + Sync + 'static,
1493 ) -> Subscription {
1494 self.client.on_reconnect(handler)
1495 }
1496
1497 pub fn on_connection_state_change(
1498 &self,
1499 handler: impl Fn(&str) + Send + Sync + 'static,
1500 ) -> Subscription {
1501 self.client.on_connection_state_change(handler)
1502 }
1503
1504 pub fn connection_state(&self) -> String {
1505 self.client.connection_state()
1506 }
1507
1508 pub fn user_id(&self) -> Option<String> {
1509 self.client.current_user_id.lock().unwrap().clone()
1510 }
1511
1512 pub fn connection_id(&self) -> Option<String> {
1513 self.client.current_connection_id.lock().unwrap().clone()
1514 }
1515}