Skip to main content

edgebase_core/
room.rs

1// room.rs — RoomClient v2 for EdgeBase Rust SDK.
2//
3// Complete redesign from v1:
4//   - 2 client-visible state areas: sharedState (all clients), playerState (per-player)
5//   - Client can only read + subscribe + send(). All writes are server-only.
6//   - send() returns a Result resolved by requestId matching via oneshot channel
7//   - Subscription returns a Subscription struct with unsubscribe()
8//   - namespace + roomId identification (replaces single roomId)
9//
10// Usage:
11//   let room = RoomClient::new(&url, "game", "lobby-1", move || token.clone(), None);
12//   room.join().await?;
13//   let sub = room.on_shared_state(|state, changes| { ... });
14//   let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
15//   sub.unsubscribe();
16//   room.leave().await;
17
18use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26// ── Public types ──────────────────────────────────────────────────────────────
27
28pub struct RoomOptions {
29    pub auto_reconnect: bool,
30    pub max_reconnect_attempts: u32,
31    pub reconnect_base_delay_ms: u64,
32    /// Timeout for send() requests in ms (default: 10000)
33    pub send_timeout_ms: u64,
34    /// Timeout for WebSocket connection establishment in ms (default: 15000)
35    pub connection_timeout_ms: u64,
36}
37
38impl Default for RoomOptions {
39    fn default() -> Self {
40        Self {
41            auto_reconnect: true,
42            max_reconnect_attempts: 10,
43            reconnect_base_delay_ms: 1000,
44            send_timeout_ms: 10_000,
45            connection_timeout_ms: 15_000,
46        }
47    }
48}
49
50// ── Subscription ──────────────────────────────────────────────────────────────
51
52/// Handle returned by on_* methods. Call `unsubscribe()` to remove the handler.
53pub struct Subscription {
54    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
55}
56
57impl Subscription {
58    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
59        Self {
60            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
61        }
62    }
63
64    /// Remove the handler. Safe to call multiple times (subsequent calls are no-ops).
65    pub fn unsubscribe(self) {
66        if let Some(f) = self.remove_fn.lock().unwrap().take() {
67            f();
68        }
69    }
70}
71
72impl Drop for Subscription {
73    fn drop(&mut self) {
74        if let Some(f) = self.remove_fn.lock().unwrap().take() {
75            f();
76        }
77    }
78}
79
80// ── Handler type aliases (internal) ──────────────────────────────────────────
81
82type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
83type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
84type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
85type KickedHandler = Arc<dyn Fn() + Send + Sync>;
86type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
87type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
88type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
89type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
90type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
91type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
92type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
93type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
94type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
95type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
96type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;
97
98/// ID-tagged handler entry for removal by Subscription.
99type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
100
101pub(crate) enum RoomWsCommand {
102    Send(String),
103    Close,
104}
105
106const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
107const ROOM_STATE_IDLE: &str = "idle";
108const ROOM_STATE_CONNECTING: &str = "connecting";
109const ROOM_STATE_CONNECTED: &str = "connected";
110const ROOM_STATE_RECONNECTING: &str = "reconnecting";
111const ROOM_STATE_DISCONNECTED: &str = "disconnected";
112const ROOM_STATE_KICKED: &str = "kicked";
113
114// ── RoomClient v2 ─────────────────────────────────────────────────────────────
115
116pub struct RoomClient {
117    /// Room namespace (e.g. "game", "chat")
118    pub namespace: String,
119    /// Room instance ID within the namespace
120    pub room_id: String,
121
122    // ── State ───
123    shared_state: RwLock<Value>,
124    shared_version: RwLock<u64>,
125    player_state: RwLock<Value>,
126    player_version: RwLock<u64>,
127    members: RwLock<Value>,
128    media_members: RwLock<Value>,
129    current_user_id: Mutex<Option<String>>,
130    current_connection_id: Mutex<Option<String>>,
131    connection_state: RwLock<String>,
132    reconnect_info: RwLock<Option<Value>>,
133
134    // ── Config ───
135    base_url: String,
136    token_fn: Box<dyn Fn() -> String + Send + Sync>,
137    opts: RoomOptions,
138
139    // ── Handlers (Arc-wrapped so Subscription closures can hold clones) ───
140    shared_state_handlers: HandlerList<StateHandler>,
141    player_state_handlers: HandlerList<StateHandler>,
142    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
143    error_handlers: HandlerList<ErrorHandler>,
144    kicked_handlers: HandlerList<KickedHandler>,
145    member_sync_handlers: HandlerList<MembersSyncHandler>,
146    member_join_handlers: HandlerList<MemberHandler>,
147    member_leave_handlers: HandlerList<MemberLeaveHandler>,
148    member_state_handlers: HandlerList<MemberStateHandler>,
149    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
150    any_signal_handlers: HandlerList<AnySignalHandler>,
151    media_track_handlers: HandlerList<MediaTrackHandler>,
152    media_track_removed_handlers: HandlerList<MediaTrackHandler>,
153    media_state_handlers: HandlerList<MediaStateHandler>,
154    media_device_handlers: HandlerList<MediaDeviceHandler>,
155    reconnect_handlers: HandlerList<ReconnectHandler>,
156    connection_state_handlers: HandlerList<ConnectionStateHandler>,
157    handler_id_counter: Mutex<u64>,
158
159    // ── Pending send() requests (requestId → oneshot Sender) ───
160    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
161    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
162    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
163    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
164    pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
165
166    // ── WS send channel ───
167    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,
168
169    // ── Control ───
170    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
171    intentionally_left: Mutex<bool>,
172}
173
174impl RoomClient {
175    /// Create a new v2 RoomClient.
176    ///
177    /// # Arguments
178    /// * `base_url` - EdgeBase server URL (http or https)
179    /// * `namespace` - Room namespace (e.g. "game", "chat")
180    /// * `room_id` - Room instance ID within the namespace
181    /// * `token_fn` - Closure that returns the current access token
182    /// * `opts` - Optional RoomOptions for reconnect and timeout configuration
183    pub fn new(
184        base_url: &str,
185        namespace: &str,
186        room_id: &str,
187        token_fn: impl Fn() -> String + Send + Sync + 'static,
188        opts: Option<RoomOptions>,
189    ) -> Arc<Self> {
190        Arc::new(Self {
191            namespace: namespace.to_string(),
192            room_id: room_id.to_string(),
193            shared_state: RwLock::new(json!({})),
194            shared_version: RwLock::new(0),
195            player_state: RwLock::new(json!({})),
196            player_version: RwLock::new(0),
197            members: RwLock::new(json!([])),
198            media_members: RwLock::new(json!([])),
199            current_user_id: Mutex::new(None),
200            current_connection_id: Mutex::new(None),
201            connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
202            reconnect_info: RwLock::new(None),
203            base_url: base_url.trim_end_matches('/').to_string(),
204            token_fn: Box::new(token_fn),
205            opts: opts.unwrap_or_default(),
206            shared_state_handlers: Arc::new(Mutex::new(vec![])),
207            player_state_handlers: Arc::new(Mutex::new(vec![])),
208            message_handlers: Arc::new(Mutex::new(HashMap::new())),
209            error_handlers: Arc::new(Mutex::new(vec![])),
210            kicked_handlers: Arc::new(Mutex::new(vec![])),
211            member_sync_handlers: Arc::new(Mutex::new(vec![])),
212            member_join_handlers: Arc::new(Mutex::new(vec![])),
213            member_leave_handlers: Arc::new(Mutex::new(vec![])),
214            member_state_handlers: Arc::new(Mutex::new(vec![])),
215            signal_handlers: Arc::new(Mutex::new(HashMap::new())),
216            any_signal_handlers: Arc::new(Mutex::new(vec![])),
217            media_track_handlers: Arc::new(Mutex::new(vec![])),
218            media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
219            media_state_handlers: Arc::new(Mutex::new(vec![])),
220            media_device_handlers: Arc::new(Mutex::new(vec![])),
221            reconnect_handlers: Arc::new(Mutex::new(vec![])),
222            connection_state_handlers: Arc::new(Mutex::new(vec![])),
223            handler_id_counter: Mutex::new(0),
224            pending_requests: Mutex::new(HashMap::new()),
225            pending_signal_requests: Mutex::new(HashMap::new()),
226            pending_admin_requests: Mutex::new(HashMap::new()),
227            pending_member_state_requests: Mutex::new(HashMap::new()),
228            pending_media_requests: Mutex::new(HashMap::new()),
229            send_tx: Mutex::new(None),
230            stop_tx: Mutex::new(None),
231            intentionally_left: Mutex::new(false),
232        })
233    }
234
235    // ── State Accessors ──────────────────────────────────────────────────────
236
237    /// Get current shared state (read-only snapshot).
238    pub fn get_shared_state(&self) -> Value {
239        self.shared_state.read().unwrap().clone()
240    }
241
242    /// Get current player state (read-only snapshot).
243    pub fn get_player_state(&self) -> Value {
244        self.player_state.read().unwrap().clone()
245    }
246
247    /// Get the current logical room members snapshot.
248    pub fn list_members(&self) -> Value {
249        self.members.read().unwrap().clone()
250    }
251
252    /// Get the current media member snapshot.
253    pub fn list_media_members(&self) -> Value {
254        self.media_members.read().unwrap().clone()
255    }
256
257    /// Get the current session connection state.
258    pub fn connection_state(&self) -> String {
259        self.connection_state.read().unwrap().clone()
260    }
261
262    pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
263        RoomStateNamespace::new(Arc::clone(self))
264    }
265
266    pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
267        RoomMetaNamespace::new(Arc::clone(self))
268    }
269
270    pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
271        RoomSignalsNamespace::new(Arc::clone(self))
272    }
273
274    pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
275        RoomMembersNamespace::new(Arc::clone(self))
276    }
277
278    pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
279        RoomAdminNamespace::new(Arc::clone(self))
280    }
281
282    pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
283        RoomMediaNamespace::new(Arc::clone(self))
284    }
285
286    pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
287        RoomSessionNamespace::new(Arc::clone(self))
288    }
289
290    // ── Metadata (HTTP, no WebSocket needed) ─────────────────────────────────
291
292    /// Get room metadata without joining (HTTP GET).
293    /// Returns developer-defined metadata set by room.setMetadata() on the server.
294    pub async fn get_metadata(&self) -> Result<Value, Error> {
295        Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
296    }
297
298    /// Static: Get room metadata without creating a RoomClient instance.
299    /// Useful for lobby screens where you need room info before joining.
300    pub async fn get_metadata_static(
301        base_url: &str,
302        namespace: &str,
303        room_id: &str,
304    ) -> Result<Value, Error> {
305        let url = format!(
306            "{}/api/room/metadata?namespace={}&id={}",
307            base_url.trim_end_matches('/'),
308            urlencoding::encode(namespace),
309            urlencoding::encode(room_id)
310        );
311        let resp = reqwest::get(&url)
312            .await
313            .map_err(|e| Error::Room(format!("Failed to get room metadata: {}", e)))?;
314        if !resp.status().is_success() {
315            return Err(Error::Room(format!(
316                "Failed to get room metadata: {}",
317                resp.status()
318            )));
319        }
320        let body = resp
321            .text()
322            .await
323            .map_err(|e| Error::Room(format!("Failed to read room metadata body: {}", e)))?;
324        serde_json::from_str(&body)
325            .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
326    }
327
328    // ── Connection Lifecycle ─────────────────────────────────────────────────
329
330    /// Connect to the room, authenticate, and join.
331    pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
332        *self.intentionally_left.lock().unwrap() = false;
333        self.set_connection_state(ROOM_STATE_CONNECTING);
334
335        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
336        *self.stop_tx.lock().unwrap() = Some(stop_tx);
337
338        let this = Arc::clone(self);
339        tokio::spawn(async move {
340            let mut attempts = 0u32;
341            loop {
342                match this.establish().await {
343                    Ok(mut ws_rx) => {
344                        attempts = 0;
345                        loop {
346                            tokio::select! {
347                                _ = stop_rx.recv() => return,
348                                msg = ws_rx.recv() => {
349                                    match msg {
350                                        Some(raw) => this.handle_message(&raw),
351                                        None => {
352                                            // WS closed — reject pending requests
353                                            // if disconnect was not intentional
354                                            if !*this.intentionally_left.lock().unwrap() {
355                                                this.reject_all_pending(
356                                                    "WebSocket disconnected",
357                                                );
358                                            }
359                                            break;
360                                        }
361                                    }
362                                }
363                            }
364                        }
365                    }
366                    Err(_) => {}
367                }
368                if *this.intentionally_left.lock().unwrap() {
369                    return;
370                }
371                if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
372                    this.set_connection_state(ROOM_STATE_DISCONNECTED);
373                    return;
374                }
375                let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
376                attempts += 1;
377                this.begin_reconnect_attempt(attempts as u64);
378                sleep(Duration::from_millis(delay)).await;
379            }
380        });
381        Ok(())
382    }
383
384    /// Leave the room and disconnect. Cleans up all pending send() requests.
385    pub async fn leave(&self) {
386        *self.intentionally_left.lock().unwrap() = true;
387
388        let send_tx = self.send_tx.lock().unwrap().clone();
389        if let Some(tx) = send_tx.as_ref() {
390            let _ = tx
391                .send(RoomWsCommand::Send(
392                    json!({"type": "leave"}).to_string(),
393                ))
394                .await;
395            sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
396            let _ = tx.send(RoomWsCommand::Close).await;
397        }
398
399        if let Some(tx) = self.stop_tx.lock().unwrap().take() {
400            let _ = tx.send(()).await;
401        }
402
403        *self.send_tx.lock().unwrap() = None;
404
405        // Reject all pending requests with explicit error
406        self.reject_all_pending("Room left");
407
408        // Reset state
409        *self.shared_state.write().unwrap() = json!({});
410        *self.shared_version.write().unwrap() = 0;
411        *self.player_state.write().unwrap() = json!({});
412        *self.player_version.write().unwrap() = 0;
413        *self.members.write().unwrap() = json!([]);
414        *self.media_members.write().unwrap() = json!([]);
415        *self.current_user_id.lock().unwrap() = None;
416        *self.current_connection_id.lock().unwrap() = None;
417        *self.reconnect_info.write().unwrap() = None;
418        self.set_connection_state(ROOM_STATE_IDLE);
419    }
420
421    // ── Actions ──────────────────────────────────────────────────────────────
422
423    /// Send an action to the server.
424    /// Returns a Result that resolves with the action result from the server.
425    ///
426    /// # Example
427    /// ```ignore
428    /// let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
429    /// ```
430    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
431        if self.send_tx.lock().unwrap().is_none() {
432            return Err(Error::Room("Not connected to room".to_string()));
433        }
434
435        let request_id = Uuid::new_v4().to_string();
436        let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
437
438        self.pending_requests
439            .lock()
440            .unwrap()
441            .insert(request_id.clone(), tx);
442
443        self.ws_send(json!({
444            "type": "send",
445            "actionType": action_type,
446            "payload": payload.unwrap_or(json!({})),
447            "requestId": request_id,
448        }));
449
450        let timeout_ms = self.opts.send_timeout_ms;
451        let action_type_owned = action_type.to_string();
452        let req_id = request_id.clone();
453
454        match timeout(Duration::from_millis(timeout_ms), rx).await {
455            Ok(Ok(result)) => result,
456            Ok(Err(_)) => {
457                // oneshot channel closed (room left or sender dropped)
458                self.pending_requests.lock().unwrap().remove(&req_id);
459                Err(Error::Room(
460                    "Room left while waiting for action result".to_string(),
461                ))
462            }
463            Err(_) => {
464                // Timeout
465                self.pending_requests.lock().unwrap().remove(&req_id);
466                Err(Error::RoomTimeout(format!(
467                    "Action '{}' timed out",
468                    action_type_owned
469                )))
470            }
471        }
472    }
473
474    // ── Subscriptions (v2 API) ───────────────────────────────────────────────
475
476    /// Subscribe to shared state changes.
477    /// Handler receives (full_state, changes) on each sync/delta.
478    pub fn on_shared_state(
479        &self,
480        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
481    ) -> Subscription {
482        let id = self.next_handler_id();
483        let handler = Arc::new(handler) as StateHandler;
484        self.shared_state_handlers.lock().unwrap().push((id, handler));
485
486        let list = Arc::clone(&self.shared_state_handlers);
487        Subscription::new(move || {
488            list.lock().unwrap().retain(|(hid, _)| *hid != id);
489        })
490    }
491
492    /// Subscribe to player state changes.
493    /// Handler receives (full_state, changes) on each sync/delta.
494    pub fn on_player_state(
495        &self,
496        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
497    ) -> Subscription {
498        let id = self.next_handler_id();
499        let handler = Arc::new(handler) as StateHandler;
500        self.player_state_handlers.lock().unwrap().push((id, handler));
501
502        let list = Arc::clone(&self.player_state_handlers);
503        Subscription::new(move || {
504            list.lock().unwrap().retain(|(hid, _)| *hid != id);
505        })
506    }
507
508    /// Subscribe to messages of a specific type sent by room.sendMessage().
509    ///
510    /// # Example
511    /// ```ignore
512    /// let sub = room.on_message("game_over", |data| { println!("{:?}", data); });
513    /// ```
514    pub fn on_message(
515        &self,
516        msg_type: &str,
517        handler: impl Fn(&Value) + Send + Sync + 'static,
518    ) -> Subscription {
519        let id = self.next_handler_id();
520        let handler = Arc::new(handler) as MessageHandler;
521        let msg_type = msg_type.to_string();
522        {
523            let mut map = self.message_handlers.lock().unwrap();
524            map.entry(msg_type.clone())
525                .or_insert_with(Vec::new)
526                .push((id, handler));
527        }
528
529        let map = Arc::clone(&self.message_handlers);
530        Subscription::new(move || {
531            if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
532                list.retain(|(hid, _)| *hid != id);
533            }
534        })
535    }
536
537    /// Subscribe to error events.
538    pub fn on_error(
539        &self,
540        handler: impl Fn(&str, &str) + Send + Sync + 'static,
541    ) -> Subscription {
542        let id = self.next_handler_id();
543        let handler = Arc::new(handler) as ErrorHandler;
544        self.error_handlers.lock().unwrap().push((id, handler));
545
546        let list = Arc::clone(&self.error_handlers);
547        Subscription::new(move || {
548            list.lock().unwrap().retain(|(hid, _)| *hid != id);
549        })
550    }
551
552    /// Subscribe to kick events. After being kicked, auto-reconnect is disabled.
553    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
554        let id = self.next_handler_id();
555        let handler = Arc::new(handler) as KickedHandler;
556        self.kicked_handlers.lock().unwrap().push((id, handler));
557
558        let list = Arc::clone(&self.kicked_handlers);
559        Subscription::new(move || {
560            list.lock().unwrap().retain(|(hid, _)| *hid != id);
561        })
562    }
563
564    pub fn on_members_sync(
565        &self,
566        handler: impl Fn(&Value) + Send + Sync + 'static,
567    ) -> Subscription {
568        let id = self.next_handler_id();
569        let handler = Arc::new(handler) as MembersSyncHandler;
570        self.member_sync_handlers.lock().unwrap().push((id, handler));
571
572        let list = Arc::clone(&self.member_sync_handlers);
573        Subscription::new(move || {
574            list.lock().unwrap().retain(|(hid, _)| *hid != id);
575        })
576    }
577
578    pub fn on_member_join(
579        &self,
580        handler: impl Fn(&Value) + Send + Sync + 'static,
581    ) -> Subscription {
582        let id = self.next_handler_id();
583        let handler = Arc::new(handler) as MemberHandler;
584        self.member_join_handlers.lock().unwrap().push((id, handler));
585
586        let list = Arc::clone(&self.member_join_handlers);
587        Subscription::new(move || {
588            list.lock().unwrap().retain(|(hid, _)| *hid != id);
589        })
590    }
591
592    pub fn on_member_leave(
593        &self,
594        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
595    ) -> Subscription {
596        let id = self.next_handler_id();
597        let handler = Arc::new(handler) as MemberLeaveHandler;
598        self.member_leave_handlers.lock().unwrap().push((id, handler));
599
600        let list = Arc::clone(&self.member_leave_handlers);
601        Subscription::new(move || {
602            list.lock().unwrap().retain(|(hid, _)| *hid != id);
603        })
604    }
605
606    pub fn on_member_state_change(
607        &self,
608        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
609    ) -> Subscription {
610        let id = self.next_handler_id();
611        let handler = Arc::new(handler) as MemberStateHandler;
612        self.member_state_handlers.lock().unwrap().push((id, handler));
613
614        let list = Arc::clone(&self.member_state_handlers);
615        Subscription::new(move || {
616            list.lock().unwrap().retain(|(hid, _)| *hid != id);
617        })
618    }
619
620    pub fn on_signal(
621        &self,
622        event: &str,
623        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
624    ) -> Subscription {
625        let id = self.next_handler_id();
626        let handler = Arc::new(handler) as SignalHandler;
627        let event = event.to_string();
628        {
629            let mut map = self.signal_handlers.lock().unwrap();
630            map.entry(event.clone())
631                .or_insert_with(Vec::new)
632                .push((id, handler));
633        }
634
635        let map = Arc::clone(&self.signal_handlers);
636        Subscription::new(move || {
637            if let Some(list) = map.lock().unwrap().get_mut(&event) {
638                list.retain(|(hid, _)| *hid != id);
639            }
640        })
641    }
642
643    pub fn on_any_signal(
644        &self,
645        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
646    ) -> Subscription {
647        let id = self.next_handler_id();
648        let handler = Arc::new(handler) as AnySignalHandler;
649        self.any_signal_handlers.lock().unwrap().push((id, handler));
650
651        let list = Arc::clone(&self.any_signal_handlers);
652        Subscription::new(move || {
653            list.lock().unwrap().retain(|(hid, _)| *hid != id);
654        })
655    }
656
657    pub fn on_media_track(
658        &self,
659        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
660    ) -> Subscription {
661        let id = self.next_handler_id();
662        let handler = Arc::new(handler) as MediaTrackHandler;
663        self.media_track_handlers.lock().unwrap().push((id, handler));
664
665        let list = Arc::clone(&self.media_track_handlers);
666        Subscription::new(move || {
667            list.lock().unwrap().retain(|(hid, _)| *hid != id);
668        })
669    }
670
671    pub fn on_media_track_removed(
672        &self,
673        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
674    ) -> Subscription {
675        let id = self.next_handler_id();
676        let handler = Arc::new(handler) as MediaTrackHandler;
677        self.media_track_removed_handlers
678            .lock()
679            .unwrap()
680            .push((id, handler));
681
682        let list = Arc::clone(&self.media_track_removed_handlers);
683        Subscription::new(move || {
684            list.lock().unwrap().retain(|(hid, _)| *hid != id);
685        })
686    }
687
688    pub fn on_media_state_change(
689        &self,
690        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
691    ) -> Subscription {
692        let id = self.next_handler_id();
693        let handler = Arc::new(handler) as MediaStateHandler;
694        self.media_state_handlers.lock().unwrap().push((id, handler));
695
696        let list = Arc::clone(&self.media_state_handlers);
697        Subscription::new(move || {
698            list.lock().unwrap().retain(|(hid, _)| *hid != id);
699        })
700    }
701
702    pub fn on_media_device_change(
703        &self,
704        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
705    ) -> Subscription {
706        let id = self.next_handler_id();
707        let handler = Arc::new(handler) as MediaDeviceHandler;
708        self.media_device_handlers.lock().unwrap().push((id, handler));
709
710        let list = Arc::clone(&self.media_device_handlers);
711        Subscription::new(move || {
712            list.lock().unwrap().retain(|(hid, _)| *hid != id);
713        })
714    }
715
716    pub fn on_reconnect(
717        &self,
718        handler: impl Fn(&Value) + Send + Sync + 'static,
719    ) -> Subscription {
720        let id = self.next_handler_id();
721        let handler = Arc::new(handler) as ReconnectHandler;
722        self.reconnect_handlers.lock().unwrap().push((id, handler));
723
724        let list = Arc::clone(&self.reconnect_handlers);
725        Subscription::new(move || {
726            list.lock().unwrap().retain(|(hid, _)| *hid != id);
727        })
728    }
729
730    pub fn on_connection_state_change(
731        &self,
732        handler: impl Fn(&str) + Send + Sync + 'static,
733    ) -> Subscription {
734        let id = self.next_handler_id();
735        let handler = Arc::new(handler) as ConnectionStateHandler;
736        self.connection_state_handlers
737            .lock()
738            .unwrap()
739            .push((id, handler));
740
741        let list = Arc::clone(&self.connection_state_handlers);
742        Subscription::new(move || {
743            list.lock().unwrap().retain(|(hid, _)| *hid != id);
744        })
745    }
746
747    pub async fn send_signal(
748        &self,
749        event: &str,
750        payload: Option<Value>,
751        options: Option<Value>,
752    ) -> Result<(), Error> {
753        if self.send_tx.lock().unwrap().is_none() {
754            return Err(Error::Room("Not connected to room".to_string()));
755        }
756
757        let request_id = Uuid::new_v4().to_string();
758        let options = options.unwrap_or_else(|| json!({}));
759        let message = json!({
760            "type": "signal",
761            "event": event,
762            "payload": payload.unwrap_or_else(|| json!({})),
763            "requestId": request_id,
764            "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
765            "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
766        });
767
768        self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
769    }
770
771    pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
772        if self.send_tx.lock().unwrap().is_none() {
773            return Err(Error::Room("Not connected to room".to_string()));
774        }
775
776        let request_id = Uuid::new_v4().to_string();
777        let message = json!({
778            "type": "member_state",
779            "state": state,
780            "requestId": request_id,
781        });
782
783        self.send_unit_request(
784            &self.pending_member_state_requests,
785            request_id,
786            message,
787            "Member state update timed out".to_string(),
788        ).await
789    }
790
791    pub async fn clear_member_state(&self) -> Result<(), Error> {
792        if self.send_tx.lock().unwrap().is_none() {
793            return Err(Error::Room("Not connected to room".to_string()));
794        }
795
796        let request_id = Uuid::new_v4().to_string();
797        let message = json!({
798            "type": "member_state_clear",
799            "requestId": request_id,
800        });
801
802        self.send_unit_request(
803            &self.pending_member_state_requests,
804            request_id,
805            message,
806            "Member state clear timed out".to_string(),
807        ).await
808    }
809
810    pub async fn send_admin(
811        &self,
812        operation: &str,
813        member_id: &str,
814        payload: Option<Value>,
815    ) -> Result<(), Error> {
816        if self.send_tx.lock().unwrap().is_none() {
817            return Err(Error::Room("Not connected to room".to_string()));
818        }
819
820        let request_id = Uuid::new_v4().to_string();
821        let message = json!({
822            "type": "admin",
823            "operation": operation,
824            "memberId": member_id,
825            "payload": payload.unwrap_or_else(|| json!({})),
826            "requestId": request_id,
827        });
828
829        self.send_unit_request(
830            &self.pending_admin_requests,
831            request_id,
832            message,
833            format!("Admin '{}' timed out", operation),
834        ).await
835    }
836
837    pub async fn send_media(
838        &self,
839        operation: &str,
840        kind: &str,
841        payload: Option<Value>,
842    ) -> Result<(), Error> {
843        if self.send_tx.lock().unwrap().is_none() {
844            return Err(Error::Room("Not connected to room".to_string()));
845        }
846
847        let request_id = Uuid::new_v4().to_string();
848        let message = json!({
849            "type": "media",
850            "operation": operation,
851            "kind": kind,
852            "payload": payload.unwrap_or_else(|| json!({})),
853            "requestId": request_id,
854        });
855
856        self.send_unit_request(
857            &self.pending_media_requests,
858            request_id,
859            message,
860            format!("Media '{}:{}' timed out", operation, kind),
861        ).await
862    }
863
864    pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
865        if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
866            self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
867        }
868        if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
869            self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
870        }
871        if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
872            self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
873        }
874        Ok(())
875    }
876
877    /// Reject all pending requests across all pending maps with an error message.
878    /// Called when the WebSocket disconnects unexpectedly.
879    fn reject_all_pending(&self, message: &str) {
880        let error_msg = message.to_string();
881
882        // Reject pending action requests
883        for (_, tx) in self.pending_requests.lock().unwrap().drain() {
884            let _ = tx.send(Err(Error::Room(error_msg.clone())));
885        }
886
887        // Reject pending signal requests
888        for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
889            let _ = tx.send(Err(Error::Room(error_msg.clone())));
890        }
891
892        // Reject pending admin requests
893        for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
894            let _ = tx.send(Err(Error::Room(error_msg.clone())));
895        }
896
897        // Reject pending member state requests
898        for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
899            let _ = tx.send(Err(Error::Room(error_msg.clone())));
900        }
901
902        // Reject pending media requests
903        for (_, tx) in self.pending_media_requests.lock().unwrap().drain() {
904            let _ = tx.send(Err(Error::Room(error_msg.clone())));
905        }
906    }
907
908    /// Leave the room, clear all handler lists, and release resources.
909    /// After calling destroy(), this RoomClient instance should not be reused.
910    pub async fn destroy(self: &Arc<Self>) {
911        self.leave().await;
912
913        // Clear all handler lists
914        self.shared_state_handlers.lock().unwrap().clear();
915        self.player_state_handlers.lock().unwrap().clear();
916        self.message_handlers.lock().unwrap().clear();
917        self.error_handlers.lock().unwrap().clear();
918        self.kicked_handlers.lock().unwrap().clear();
919        self.member_sync_handlers.lock().unwrap().clear();
920        self.member_join_handlers.lock().unwrap().clear();
921        self.member_leave_handlers.lock().unwrap().clear();
922        self.member_state_handlers.lock().unwrap().clear();
923        self.signal_handlers.lock().unwrap().clear();
924        self.any_signal_handlers.lock().unwrap().clear();
925        self.media_track_handlers.lock().unwrap().clear();
926        self.media_track_removed_handlers.lock().unwrap().clear();
927        self.media_state_handlers.lock().unwrap().clear();
928        self.media_device_handlers.lock().unwrap().clear();
929        self.reconnect_handlers.lock().unwrap().clear();
930        self.connection_state_handlers.lock().unwrap().clear();
931    }
932
933    // ── Private: Connection ──────────────────────────────────────────────────
934
935    fn ws_url(&self) -> String {
936        let u = self
937            .base_url
938            .replace("https://", "wss://")
939            .replace("http://", "ws://");
940        format!(
941            "{}/api/room?namespace={}&id={}",
942            u,
943            urlencoding::encode(&self.namespace),
944            urlencoding::encode(&self.room_id)
945        )
946    }
947
948    /// Send a raw JSON message over the WS write channel.
949    fn ws_send(&self, msg: Value) {
950        let s = msg.to_string();
951        if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
952            let tx = tx.clone();
953            tokio::spawn(async move {
954                let _ = tx.send(RoomWsCommand::Send(s)).await;
955            });
956        }
957    }
958
959    #[cfg(test)]
960    pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
961        *self.send_tx.lock().unwrap() = Some(tx);
962    }
963
964    #[cfg(test)]
965    pub(crate) fn handle_message_for_testing(&self, raw: &str) {
966        self.handle_message(raw);
967    }
968
969    async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
970        use futures_util::{SinkExt, StreamExt};
971        use tokio_tungstenite::{connect_async, tungstenite::Message};
972
973        let (ws_stream, _response) = tokio::time::timeout(
974            std::time::Duration::from_millis(self.opts.connection_timeout_ms),
975            connect_async(self.ws_url()),
976        )
977        .await
978        .map_err(|_| anyhow::anyhow!(
979            "Room WebSocket connection timed out after {}ms. Is the server running?",
980            self.opts.connection_timeout_ms,
981        ))??;
982        let (mut write, mut read) = ws_stream.split();
983
984        // Auth
985        let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.2"});
986        write.send(Message::Text(auth.to_string().into())).await?;
987
988        // Wait for auth_success
989        let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
990            let raw_str: &str = &raw;
991            let resp: Value = serde_json::from_str(raw_str)?;
992            let t = resp["type"].as_str().unwrap_or("");
993            if t != "auth_success" && t != "auth_refreshed" {
994                anyhow::bail!("Room auth failed: {}", resp["message"]);
995            }
996
997            *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
998            *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
999
1000            // v2: join with last known shared + player state for eviction recovery
1001            let join_msg = json!({
1002                "type": "join",
1003                "lastSharedState": *self.shared_state.read().unwrap(),
1004                "lastSharedVersion": *self.shared_version.read().unwrap(),
1005                "lastPlayerState": *self.player_state.read().unwrap(),
1006                "lastPlayerVersion": *self.player_version.read().unwrap(),
1007            });
1008            join_msg.to_string()
1009        } else {
1010            anyhow::bail!("Room: no auth response");
1011        };
1012
1013        // Shared WS write channel
1014        let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
1015        *self.send_tx.lock().unwrap() = Some(write_tx.clone());
1016
1017        // WS writer task
1018        tokio::spawn(async move {
1019            while let Some(command) = write_rx.recv().await {
1020                match command {
1021                    RoomWsCommand::Send(msg) => {
1022                        if write.send(Message::Text(msg.into())).await.is_err() {
1023                            break;
1024                        }
1025                    }
1026                    RoomWsCommand::Close => {
1027                        let _ = write.send(Message::Close(None)).await;
1028                        break;
1029                    }
1030                }
1031            }
1032        });
1033
1034        // Send join message
1035        let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
1036
1037        // Heartbeat
1038        let htx = write_tx.clone();
1039        tokio::spawn(async move {
1040            loop {
1041                sleep(Duration::from_secs(30)).await;
1042                if htx
1043                    .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
1044                    .await
1045                    .is_err()
1046                {
1047                    break;
1048                }
1049            }
1050        });
1051
1052        // WS reader → msg_rx channel consumed by join() loop
1053        let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
1054        tokio::spawn(async move {
1055            while let Some(Ok(Message::Text(raw))) = read.next().await {
1056                let raw_str: String = raw.into();
1057                if msg_tx.send(raw_str).await.is_err() {
1058                    break;
1059                }
1060            }
1061        });
1062
1063        Ok(msg_rx)
1064    }
1065
1066    // ── Private: Message Handling ────────────────────────────────────────────
1067
1068    fn handle_message(&self, raw: &str) {
1069        let msg: Value = match serde_json::from_str(raw) {
1070            Ok(v) => v,
1071            Err(_) => return,
1072        };
1073        let t = msg["type"].as_str().unwrap_or("");
1074
1075        match t {
1076            "sync" => self.handle_sync(&msg),
1077            "shared_delta" => self.handle_shared_delta(&msg),
1078            "player_delta" => self.handle_player_delta(&msg),
1079            "action_result" => self.handle_action_result(&msg),
1080            "action_error" => self.handle_action_error(&msg),
1081            "message" => self.handle_server_message(&msg),
1082            "signal" => self.handle_signal(&msg),
1083            "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1084            "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1085            "members_sync" => self.handle_members_sync(&msg),
1086            "member_join" => self.handle_member_join(&msg),
1087            "member_leave" => self.handle_member_leave(&msg),
1088            "member_state" => self.handle_member_state(&msg),
1089            "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1090            "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1091            "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1092            "media_sync" => self.handle_media_sync(&msg),
1093            "media_track" => self.handle_media_track(&msg),
1094            "media_track_removed" => self.handle_media_track_removed(&msg),
1095            "media_state" => self.handle_media_state(&msg),
1096            "media_device" => self.handle_media_device(&msg),
1097            "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1098            "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1099            "kicked" => self.handle_kicked(),
1100            "error" => self.handle_error(&msg),
1101            "pong" => {}
1102            _ => {}
1103        }
1104    }
1105
1106    fn handle_sync(&self, msg: &Value) {
1107        *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1108        *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1109        *self.player_state.write().unwrap() = msg["playerState"].clone();
1110        *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1111        self.set_connection_state(ROOM_STATE_CONNECTED);
1112
1113        if let Some(info) = self.reconnect_info.write().unwrap().take() {
1114            for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1115                handler(&info);
1116            }
1117        }
1118
1119        // Notify shared state handlers (full state as changes on sync)
1120        let shared = self.shared_state.read().unwrap().clone();
1121        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1122            handler(&shared, &shared);
1123        }
1124        let player = self.player_state.read().unwrap().clone();
1125        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1126            handler(&player, &player);
1127        }
1128    }
1129
1130    fn handle_shared_delta(&self, msg: &Value) {
1131        let delta = &msg["delta"];
1132        *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1133
1134        if let Value::Object(map) = delta {
1135            let mut state = self.shared_state.write().unwrap();
1136            for (path, value) in map {
1137                deep_set(&mut state, path, value.clone());
1138            }
1139        }
1140
1141        let state = self.shared_state.read().unwrap().clone();
1142        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1143            handler(&state, delta);
1144        }
1145    }
1146
1147    fn handle_player_delta(&self, msg: &Value) {
1148        let delta = &msg["delta"];
1149        *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1150
1151        if let Value::Object(map) = delta {
1152            let mut state = self.player_state.write().unwrap();
1153            for (path, value) in map {
1154                deep_set(&mut state, path, value.clone());
1155            }
1156        }
1157
1158        let state = self.player_state.read().unwrap().clone();
1159        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1160            handler(&state, delta);
1161        }
1162    }
1163
1164    fn handle_action_result(&self, msg: &Value) {
1165        let request_id = msg["requestId"].as_str().unwrap_or("");
1166        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1167            let _ = tx.send(Ok(msg["result"].clone()));
1168        }
1169    }
1170
1171    fn handle_action_error(&self, msg: &Value) {
1172        let request_id = msg["requestId"].as_str().unwrap_or("");
1173        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1174            let message = msg["message"].as_str().unwrap_or("Unknown error");
1175            let _ = tx.send(Err(Error::Room(message.to_string())));
1176        }
1177    }
1178
1179    fn handle_server_message(&self, msg: &Value) {
1180        let message_type = msg["messageType"].as_str().unwrap_or("");
1181        let data = &msg["data"];
1182
1183        let handlers = self.message_handlers.lock().unwrap();
1184        if let Some(list) = handlers.get(message_type) {
1185            for (_, handler) in list {
1186                handler(data);
1187            }
1188        }
1189    }
1190
1191    fn handle_signal(&self, msg: &Value) {
1192        let event = msg["event"].as_str().unwrap_or("");
1193        let payload = &msg["payload"];
1194        let meta = &msg["meta"];
1195
1196        if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1197            for (_, handler) in list {
1198                handler(payload, meta);
1199            }
1200        }
1201
1202        for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1203            handler(event, payload, meta);
1204        }
1205    }
1206
1207    fn handle_members_sync(&self, msg: &Value) {
1208        let members = normalize_members(&msg["members"]);
1209        *self.members.write().unwrap() = members.clone();
1210        self.merge_members_into_media();
1211
1212        for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1213            handler(&members);
1214        }
1215    }
1216
1217    fn handle_member_join(&self, msg: &Value) {
1218        if let Some(member) = normalize_member(&msg["member"]) {
1219            self.upsert_member(member.clone());
1220            for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1221                handler(&member);
1222            }
1223        }
1224    }
1225
1226    fn handle_member_leave(&self, msg: &Value) {
1227        if let Some(member) = normalize_member(&msg["member"]) {
1228            let member_id = member["memberId"].as_str().unwrap_or("");
1229            self.remove_member(member_id);
1230            let reason = msg["reason"].as_str().unwrap_or("leave");
1231            for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1232                handler(&member, reason);
1233            }
1234        }
1235    }
1236
1237    fn handle_member_state(&self, msg: &Value) {
1238        if let Some(mut member) = normalize_member(&msg["member"]) {
1239            let state = object_or_empty(&msg["state"]);
1240            member["state"] = state.clone();
1241            self.upsert_member(member.clone());
1242            self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1243
1244            for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1245                handler(&member, &state);
1246            }
1247        }
1248    }
1249
1250    fn handle_media_sync(&self, msg: &Value) {
1251        let media_members = normalize_media_members(&msg["members"]);
1252        *self.media_members.write().unwrap() = media_members.clone();
1253        self.merge_members_into_media();
1254    }
1255
1256    fn handle_media_track(&self, msg: &Value) {
1257        if let (Some(member), Some(track)) = (
1258            normalize_member(&msg["member"]),
1259            normalize_track(&msg["track"]),
1260        ) {
1261            self.upsert_media_track(&member, &track);
1262            for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1263                handler(&track, &member);
1264            }
1265        }
1266    }
1267
1268    fn handle_media_track_removed(&self, msg: &Value) {
1269        if let (Some(member), Some(track)) = (
1270            normalize_member(&msg["member"]),
1271            normalize_track(&msg["track"]),
1272        ) {
1273            self.remove_media_track(&member, &track);
1274            for (_, handler) in self
1275                .media_track_removed_handlers
1276                .lock()
1277                .unwrap()
1278                .iter()
1279            {
1280                handler(&track, &member);
1281            }
1282        }
1283    }
1284
1285    fn handle_media_state(&self, msg: &Value) {
1286        if let Some(member) = normalize_member(&msg["member"]) {
1287            let state = object_or_empty(&msg["state"]);
1288            self.upsert_media_state(&member, state.clone());
1289            for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1290                handler(&member, &state);
1291            }
1292        }
1293    }
1294
1295    fn handle_media_device(&self, msg: &Value) {
1296        if let Some(member) = normalize_member(&msg["member"]) {
1297            let change = json!({
1298                "kind": msg["kind"].clone(),
1299                "deviceId": msg["deviceId"].clone(),
1300            });
1301            self.apply_media_device_change(&member, &change);
1302            for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1303                handler(&member, &change);
1304            }
1305        }
1306    }
1307
1308    fn handle_kicked(&self) {
1309        self.set_connection_state(ROOM_STATE_KICKED);
1310        *self.intentionally_left.lock().unwrap() = true;
1311        for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1312            handler();
1313        }
1314    }
1315
1316    fn handle_error(&self, msg: &Value) {
1317        let code = msg["code"].as_str().unwrap_or("");
1318        let message = msg["message"].as_str().unwrap_or("");
1319        for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1320            handler(code, message);
1321        }
1322    }
1323
1324    // ── Private: Helpers ─────────────────────────────────────────────────────
1325
1326    async fn send_unit_request(
1327        &self,
1328        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1329        request_id: String,
1330        message: Value,
1331        timeout_message: String,
1332    ) -> Result<(), Error> {
1333        let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1334        pending.lock().unwrap().insert(request_id.clone(), tx);
1335        self.ws_send(message);
1336
1337        match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1338            Ok(Ok(result)) => result,
1339            Ok(Err(_)) => {
1340                pending.lock().unwrap().remove(&request_id);
1341                Err(Error::Room(
1342                    "Room left while waiting for room control result".to_string(),
1343                ))
1344            }
1345            Err(_) => {
1346                pending.lock().unwrap().remove(&request_id);
1347                Err(Error::RoomTimeout(timeout_message))
1348            }
1349        }
1350    }
1351
1352    fn resolve_pending_unit_request(
1353        &self,
1354        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1355        msg: &Value,
1356    ) {
1357        let request_id = msg["requestId"].as_str().unwrap_or("");
1358        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1359            let _ = tx.send(Ok(()));
1360        }
1361    }
1362
1363    fn reject_pending_unit_request(
1364        &self,
1365        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1366        msg: &Value,
1367        fallback: &str,
1368    ) {
1369        let request_id = msg["requestId"].as_str().unwrap_or("");
1370        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1371            let message = msg["message"].as_str().unwrap_or(fallback);
1372            let _ = tx.send(Err(Error::Room(message.to_string())));
1373        }
1374    }
1375
1376    fn set_connection_state(&self, next: &str) {
1377        let mut state = self.connection_state.write().unwrap();
1378        if state.as_str() == next {
1379            return;
1380        }
1381        *state = next.to_string();
1382        drop(state);
1383
1384        for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1385            handler(next);
1386        }
1387    }
1388
1389    fn begin_reconnect_attempt(&self, attempt: u64) {
1390        *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1391        self.set_connection_state(ROOM_STATE_RECONNECTING);
1392    }
1393
1394    fn upsert_member(&self, member: Value) {
1395        let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1396        if member_id.is_empty() {
1397            return;
1398        }
1399
1400        let mut members = self.members.write().unwrap();
1401        let list = members.as_array_mut().expect("members array");
1402        if let Some(existing) = list
1403            .iter_mut()
1404            .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1405        {
1406            *existing = member;
1407        } else {
1408            list.push(member);
1409        }
1410        drop(members);
1411        self.merge_members_into_media();
1412    }
1413
1414    fn remove_member(&self, member_id: &str) {
1415        {
1416            let mut members = self.members.write().unwrap();
1417            if let Some(list) = members.as_array_mut() {
1418                list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1419            }
1420        }
1421        {
1422            let mut media_members = self.media_members.write().unwrap();
1423            if let Some(list) = media_members.as_array_mut() {
1424                list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1425            }
1426        }
1427    }
1428
1429    fn merge_members_into_media(&self) {
1430        let members = self.members.read().unwrap().clone();
1431        let mut media_members = self.media_members.write().unwrap();
1432        if let Some(list) = media_members.as_array_mut() {
1433            for media_member in list.iter_mut() {
1434                let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1435                if let Some(member) = members
1436                    .as_array()
1437                    .and_then(|entries| {
1438                        entries
1439                            .iter()
1440                            .find(|entry| entry["memberId"].as_str() == Some(member_id))
1441                    })
1442                {
1443                    media_member["member"] = member.clone();
1444                }
1445            }
1446        }
1447    }
1448
1449    fn ensure_media_member(&self, member: &Value) -> usize {
1450        self.upsert_member(member.clone());
1451        let member_id = member["memberId"].as_str().unwrap_or("");
1452        let mut media_members = self.media_members.write().unwrap();
1453        let list = media_members.as_array_mut().expect("media members array");
1454        if let Some(index) = list
1455            .iter()
1456            .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1457        {
1458            list[index]["member"] = member.clone();
1459            return index;
1460        }
1461
1462        list.push(json!({
1463            "member": member,
1464            "state": {},
1465            "tracks": [],
1466        }));
1467        list.len() - 1
1468    }
1469
1470    fn upsert_media_track(&self, member: &Value, track: &Value) {
1471        let index = self.ensure_media_member(member);
1472        let mut media_members = self.media_members.write().unwrap();
1473        let list = media_members.as_array_mut().expect("media members array");
1474        let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1475        let kind = track["kind"].as_str().unwrap_or("");
1476        if let Some(existing) = tracks
1477            .iter_mut()
1478            .find(|entry| entry["kind"].as_str() == Some(kind))
1479        {
1480            *existing = track.clone();
1481        } else {
1482            tracks.push(track.clone());
1483        }
1484        apply_track_to_state(&mut list[index]["state"], track, true);
1485    }
1486
1487    fn remove_media_track(&self, member: &Value, track: &Value) {
1488        let index = self.ensure_media_member(member);
1489        let mut media_members = self.media_members.write().unwrap();
1490        let list = media_members.as_array_mut().expect("media members array");
1491        let kind = track["kind"].as_str().unwrap_or("");
1492        if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1493            tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1494        }
1495        apply_track_to_state(&mut list[index]["state"], track, false);
1496    }
1497
1498    fn upsert_media_state(&self, member: &Value, state: Value) {
1499        let index = self.ensure_media_member(member);
1500        let mut media_members = self.media_members.write().unwrap();
1501        let list = media_members.as_array_mut().expect("media members array");
1502        list[index]["state"] = state;
1503    }
1504
1505    fn apply_media_device_change(&self, member: &Value, change: &Value) {
1506        let index = self.ensure_media_member(member);
1507        let mut media_members = self.media_members.write().unwrap();
1508        let list = media_members.as_array_mut().expect("media members array");
1509        let kind = change["kind"].as_str().unwrap_or("");
1510        let device_id = change["deviceId"].clone();
1511        if let Some(state) = list[index]["state"].as_object_mut() {
1512            let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1513            if let Some(kind_state_map) = kind_state.as_object_mut() {
1514                kind_state_map.insert("deviceId".to_string(), device_id);
1515            }
1516        }
1517    }
1518
1519    fn next_handler_id(&self) -> u64 {
1520        let mut counter = self.handler_id_counter.lock().unwrap();
1521        *counter += 1;
1522        *counter
1523    }
1524}
1525
1526// ── Helper ────────────────────────────────────────────────────────────────────
1527
1528fn deep_set(obj: &mut Value, path: &str, value: Value) {
1529    if let Some(dot) = path.find('.') {
1530        let head = &path[..dot];
1531        let tail = &path[dot + 1..];
1532        if let Value::Object(map) = obj {
1533            let nested = map.entry(head.to_string()).or_insert(json!({}));
1534            deep_set(nested, tail, value);
1535        }
1536    } else if let Value::Object(map) = obj {
1537        if value.is_null() {
1538            map.remove(path);
1539        } else {
1540            map.insert(path.to_string(), value);
1541        }
1542    }
1543}
1544
1545fn object_or_empty(value: &Value) -> Value {
1546    if value.is_object() {
1547        value.clone()
1548    } else {
1549        json!({})
1550    }
1551}
1552
1553fn normalize_member(value: &Value) -> Option<Value> {
1554    let member_id = value["memberId"].as_str()?;
1555    let user_id = value["userId"].as_str()?;
1556    let mut member = json!({
1557        "memberId": member_id,
1558        "userId": user_id,
1559        "state": object_or_empty(&value["state"]),
1560    });
1561
1562    if let Some(connection_id) = value["connectionId"].as_str() {
1563        member["connectionId"] = Value::String(connection_id.to_string());
1564    }
1565    if let Some(connection_count) = value["connectionCount"].as_u64() {
1566        member["connectionCount"] = Value::from(connection_count);
1567    }
1568    if let Some(role) = value["role"].as_str() {
1569        member["role"] = Value::String(role.to_string());
1570    }
1571    Some(member)
1572}
1573
1574fn normalize_members(value: &Value) -> Value {
1575    Value::Array(
1576        value
1577            .as_array()
1578            .into_iter()
1579            .flatten()
1580            .filter_map(normalize_member)
1581            .collect(),
1582    )
1583}
1584
1585fn normalize_track(value: &Value) -> Option<Value> {
1586    let kind = value["kind"].as_str()?;
1587    let mut track = json!({
1588        "kind": kind,
1589        "muted": value["muted"].as_bool().unwrap_or(false),
1590    });
1591
1592    if let Some(track_id) = value["trackId"].as_str() {
1593        track["trackId"] = Value::String(track_id.to_string());
1594    }
1595    if let Some(device_id) = value["deviceId"].as_str() {
1596        track["deviceId"] = Value::String(device_id.to_string());
1597    }
1598    if let Some(published_at) = value["publishedAt"].as_u64() {
1599        track["publishedAt"] = Value::from(published_at);
1600    }
1601    if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1602        track["adminDisabled"] = Value::Bool(admin_disabled);
1603    }
1604    Some(track)
1605}
1606
1607fn normalize_tracks(value: &Value) -> Value {
1608    Value::Array(
1609        value
1610            .as_array()
1611            .into_iter()
1612            .flatten()
1613            .filter_map(normalize_track)
1614            .collect(),
1615    )
1616}
1617
1618fn normalize_media_members(value: &Value) -> Value {
1619    Value::Array(
1620        value
1621            .as_array()
1622            .into_iter()
1623            .flatten()
1624            .filter_map(|entry| {
1625                let member = normalize_member(&entry["member"])?;
1626                Some(json!({
1627                    "member": member,
1628                    "state": object_or_empty(&entry["state"]),
1629                    "tracks": normalize_tracks(&entry["tracks"]),
1630                }))
1631            })
1632            .collect(),
1633    )
1634}
1635
1636fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1637    if !state.is_object() {
1638        *state = json!({});
1639    }
1640
1641    let kind = match track["kind"].as_str() {
1642        Some(kind) => kind,
1643        None => return,
1644    };
1645
1646    let state_map = state.as_object_mut().expect("state object");
1647    let kind_state = state_map
1648        .entry(kind.to_string())
1649        .or_insert_with(|| json!({}));
1650    let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1651    kind_state_map.insert(
1652        "published".to_string(),
1653        Value::Bool(published),
1654    );
1655    kind_state_map.insert(
1656        "muted".to_string(),
1657        Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1658    );
1659
1660    if published {
1661        if let Some(track_id) = track.get("trackId") {
1662            kind_state_map.insert("trackId".to_string(), track_id.clone());
1663        }
1664        if let Some(device_id) = track.get("deviceId") {
1665            kind_state_map.insert("deviceId".to_string(), device_id.clone());
1666        }
1667        if let Some(published_at) = track.get("publishedAt") {
1668            kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1669        }
1670        if let Some(admin_disabled) = track.get("adminDisabled") {
1671            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1672        }
1673    } else {
1674        kind_state_map.remove("trackId");
1675        kind_state_map.remove("publishedAt");
1676        if let Some(admin_disabled) = track.get("adminDisabled") {
1677            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1678        }
1679    }
1680}
1681
1682pub struct RoomStateNamespace {
1683    client: Arc<RoomClient>,
1684}
1685
1686impl RoomStateNamespace {
1687    fn new(client: Arc<RoomClient>) -> Self {
1688        Self { client }
1689    }
1690
1691    pub fn get_shared(&self) -> Value {
1692        self.client.get_shared_state()
1693    }
1694
1695    pub fn get_mine(&self) -> Value {
1696        self.client.get_player_state()
1697    }
1698
1699    pub fn on_shared_change(
1700        &self,
1701        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1702    ) -> Subscription {
1703        self.client.on_shared_state(handler)
1704    }
1705
1706    pub fn on_mine_change(
1707        &self,
1708        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1709    ) -> Subscription {
1710        self.client.on_player_state(handler)
1711    }
1712
1713    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1714        self.client.send(action_type, payload).await
1715    }
1716}
1717
1718pub struct RoomMetaNamespace {
1719    client: Arc<RoomClient>,
1720}
1721
1722impl RoomMetaNamespace {
1723    fn new(client: Arc<RoomClient>) -> Self {
1724        Self { client }
1725    }
1726
1727    pub async fn get(&self) -> Result<Value, Error> {
1728        self.client.get_metadata().await
1729    }
1730}
1731
1732pub struct RoomSignalsNamespace {
1733    client: Arc<RoomClient>,
1734}
1735
1736impl RoomSignalsNamespace {
1737    fn new(client: Arc<RoomClient>) -> Self {
1738        Self { client }
1739    }
1740
1741    pub async fn send(
1742        &self,
1743        event: &str,
1744        payload: Option<Value>,
1745        options: Option<Value>,
1746    ) -> Result<(), Error> {
1747        self.client.send_signal(event, payload, options).await
1748    }
1749
1750    pub async fn send_to(
1751        &self,
1752        member_id: &str,
1753        event: &str,
1754        payload: Option<Value>,
1755    ) -> Result<(), Error> {
1756        self.client
1757            .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1758            .await
1759    }
1760
1761    pub fn on(
1762        &self,
1763        event: &str,
1764        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1765    ) -> Subscription {
1766        self.client.on_signal(event, handler)
1767    }
1768
1769    pub fn on_any(
1770        &self,
1771        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1772    ) -> Subscription {
1773        self.client.on_any_signal(handler)
1774    }
1775}
1776
1777pub struct RoomMembersNamespace {
1778    client: Arc<RoomClient>,
1779}
1780
1781impl RoomMembersNamespace {
1782    fn new(client: Arc<RoomClient>) -> Self {
1783        Self { client }
1784    }
1785
1786    pub fn list(&self) -> Value {
1787        self.client.list_members()
1788    }
1789
1790    pub fn on_sync(
1791        &self,
1792        handler: impl Fn(&Value) + Send + Sync + 'static,
1793    ) -> Subscription {
1794        self.client.on_members_sync(handler)
1795    }
1796
1797    pub fn on_join(
1798        &self,
1799        handler: impl Fn(&Value) + Send + Sync + 'static,
1800    ) -> Subscription {
1801        self.client.on_member_join(handler)
1802    }
1803
1804    pub fn on_leave(
1805        &self,
1806        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1807    ) -> Subscription {
1808        self.client.on_member_leave(handler)
1809    }
1810
1811    pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1812        self.client.send_member_state(state).await
1813    }
1814
1815    pub async fn clear_state(&self) -> Result<(), Error> {
1816        self.client.clear_member_state().await
1817    }
1818
1819    pub fn on_state_change(
1820        &self,
1821        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1822    ) -> Subscription {
1823        self.client.on_member_state_change(handler)
1824    }
1825}
1826
1827pub struct RoomAdminNamespace {
1828    client: Arc<RoomClient>,
1829}
1830
1831impl RoomAdminNamespace {
1832    fn new(client: Arc<RoomClient>) -> Self {
1833        Self { client }
1834    }
1835
1836    pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1837        self.client.send_admin("kick", member_id, None).await
1838    }
1839
1840    pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
1841        self.client.send_admin("mute", member_id, None).await
1842    }
1843
1844    pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1845        self.client.send_admin("block", member_id, None).await
1846    }
1847
1848    pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1849        self.client
1850            .send_admin("setRole", member_id, Some(json!({ "role": role })))
1851            .await
1852    }
1853
1854    pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
1855        self.client.send_admin("disableVideo", member_id, None).await
1856    }
1857
1858    pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
1859        self.client
1860            .send_admin("stopScreenShare", member_id, None)
1861            .await
1862    }
1863}
1864
1865pub struct RoomMediaKindNamespace {
1866    client: Arc<RoomClient>,
1867    kind: &'static str,
1868}
1869
1870impl RoomMediaKindNamespace {
1871    fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
1872        Self { client, kind }
1873    }
1874
1875    pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
1876        self.client.send_media("publish", self.kind, payload).await
1877    }
1878
1879    pub async fn disable(&self) -> Result<(), Error> {
1880        self.client.send_media("unpublish", self.kind, None).await
1881    }
1882
1883    pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
1884        self.client
1885            .send_media("mute", self.kind, Some(json!({ "muted": muted })))
1886            .await
1887    }
1888}
1889
1890pub struct RoomScreenMediaNamespace {
1891    client: Arc<RoomClient>,
1892}
1893
1894impl RoomScreenMediaNamespace {
1895    fn new(client: Arc<RoomClient>) -> Self {
1896        Self { client }
1897    }
1898
1899    pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
1900        self.client.send_media("publish", "screen", payload).await
1901    }
1902
1903    pub async fn stop(&self) -> Result<(), Error> {
1904        self.client.send_media("unpublish", "screen", None).await
1905    }
1906}
1907
1908pub struct RoomMediaDevicesNamespace {
1909    client: Arc<RoomClient>,
1910}
1911
1912impl RoomMediaDevicesNamespace {
1913    fn new(client: Arc<RoomClient>) -> Self {
1914        Self { client }
1915    }
1916
1917    pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
1918        self.client.switch_media_devices(payload).await
1919    }
1920}
1921
1922pub struct RoomMediaNamespace {
1923    client: Arc<RoomClient>,
1924}
1925
1926impl RoomMediaNamespace {
1927    fn new(client: Arc<RoomClient>) -> Self {
1928        Self { client }
1929    }
1930
1931    pub fn list(&self) -> Value {
1932        self.client.list_media_members()
1933    }
1934
1935    pub fn audio(&self) -> RoomMediaKindNamespace {
1936        RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
1937    }
1938
1939    pub fn video(&self) -> RoomMediaKindNamespace {
1940        RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
1941    }
1942
1943    pub fn screen(&self) -> RoomScreenMediaNamespace {
1944        RoomScreenMediaNamespace::new(Arc::clone(&self.client))
1945    }
1946
1947    pub fn devices(&self) -> RoomMediaDevicesNamespace {
1948        RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
1949    }
1950
1951    pub fn on_track(
1952        &self,
1953        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1954    ) -> Subscription {
1955        self.client.on_media_track(handler)
1956    }
1957
1958    pub fn on_track_removed(
1959        &self,
1960        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1961    ) -> Subscription {
1962        self.client.on_media_track_removed(handler)
1963    }
1964
1965    pub fn on_state_change(
1966        &self,
1967        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1968    ) -> Subscription {
1969        self.client.on_media_state_change(handler)
1970    }
1971
1972    pub fn on_device_change(
1973        &self,
1974        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1975    ) -> Subscription {
1976        self.client.on_media_device_change(handler)
1977    }
1978}
1979
1980pub struct RoomSessionNamespace {
1981    client: Arc<RoomClient>,
1982}
1983
1984impl RoomSessionNamespace {
1985    fn new(client: Arc<RoomClient>) -> Self {
1986        Self { client }
1987    }
1988
1989    pub fn on_error(
1990        &self,
1991        handler: impl Fn(&str, &str) + Send + Sync + 'static,
1992    ) -> Subscription {
1993        self.client.on_error(handler)
1994    }
1995
1996    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
1997        self.client.on_kicked(handler)
1998    }
1999
2000    pub fn on_reconnect(
2001        &self,
2002        handler: impl Fn(&Value) + Send + Sync + 'static,
2003    ) -> Subscription {
2004        self.client.on_reconnect(handler)
2005    }
2006
2007    pub fn on_connection_state_change(
2008        &self,
2009        handler: impl Fn(&str) + Send + Sync + 'static,
2010    ) -> Subscription {
2011        self.client.on_connection_state_change(handler)
2012    }
2013
2014    pub fn connection_state(&self) -> String {
2015        self.client.connection_state()
2016    }
2017
2018    pub fn user_id(&self) -> Option<String> {
2019        self.client.current_user_id.lock().unwrap().clone()
2020    }
2021
2022    pub fn connection_id(&self) -> Option<String> {
2023        self.client.current_connection_id.lock().unwrap().clone()
2024    }
2025}