Skip to main content

edgebase_core/
room.rs

1// room.rs — RoomClient v2 for EdgeBase Rust SDK.
2//
3// Complete redesign from v1:
4//   - 2 client-visible state areas: sharedState (all clients), playerState (per-player)
5//   - Client can only read + subscribe + send(). All writes are server-only.
6//   - send() returns a Result resolved by requestId matching via oneshot channel
7//   - Subscription returns a Subscription struct with unsubscribe()
8//   - namespace + roomId identification (replaces single roomId)
9//
10// Usage:
11//   let room = RoomClient::new(&url, "game", "lobby-1", move || token.clone(), None);
12//   room.join().await?;
13//   let sub = room.on_shared_state(|state, changes| { ... });
14//   let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
15//   sub.unsubscribe();
16//   room.leave().await;
17
18use crate::error::Error;
19use serde_json::{json, Value};
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex, RwLock};
22use tokio::sync::{mpsc, oneshot};
23use tokio::time::{sleep, timeout, Duration};
24use uuid::Uuid;
25
26// ── Public types ──────────────────────────────────────────────────────────────
27
/// Connection and request tuning for a `RoomClient`.
///
/// Pass `None` to `RoomClient::new` to use [`RoomOptions::default`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoomOptions {
    /// Whether the background task started by `join()` retries after the
    /// connection drops or cannot be established (default: true).
    pub auto_reconnect: bool,
    /// Maximum number of consecutive reconnect attempts before the client
    /// gives up and reports the disconnected state (default: 10).
    pub max_reconnect_attempts: u32,
    /// Base reconnect delay in ms; it is doubled for every consecutive
    /// attempt and capped at 30 000 ms (default: 1000).
    pub reconnect_base_delay_ms: u64,
    /// Timeout for send() requests in ms (default: 10000)
    pub send_timeout_ms: u64,
    /// Timeout for WebSocket connection establishment in ms (default: 15000)
    pub connection_timeout_ms: u64,
}

impl Default for RoomOptions {
    /// Auto-reconnect enabled, 10 attempts, 1 s base delay, 10 s send
    /// timeout and 15 s connection timeout.
    fn default() -> Self {
        Self {
            auto_reconnect: true,
            max_reconnect_attempts: 10,
            reconnect_base_delay_ms: 1000,
            send_timeout_ms: 10_000,
            connection_timeout_ms: 15_000,
        }
    }
}
49
50// ── Subscription ──────────────────────────────────────────────────────────────
51
/// Handle returned by on_* methods.
///
/// The handler is removed either by calling [`Subscription::unsubscribe`] or
/// by dropping the handle, whichever happens first. Keep the handle alive for
/// as long as the handler should stay registered.
pub struct Subscription {
    // The Mutex is never contended; it is kept so the handle stays `Sync`
    // even though the boxed closure itself is only `Send`.
    remove_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
}

impl Subscription {
    /// Wrap the closure that detaches the handler from its registry.
    fn new(remove_fn: impl FnOnce() + Send + 'static) -> Self {
        Self {
            remove_fn: Mutex::new(Some(Box::new(remove_fn))),
        }
    }

    /// Take the removal closure out of the handle, leaving `None` behind.
    ///
    /// Uses `get_mut` (exclusive access, no locking) and tolerates a poisoned
    /// mutex, so this never panics and no guard is alive while the closure
    /// later runs.
    fn take_remove_fn(&mut self) -> Option<Box<dyn FnOnce() + Send>> {
        self.remove_fn
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take()
    }

    /// Remove the handler and consume the handle.
    ///
    /// The removal closure runs exactly once; the `Drop` that follows finds
    /// nothing left to do.
    pub fn unsubscribe(mut self) {
        let removal = self.take_remove_fn();
        if let Some(remove) = removal {
            remove();
        }
    }
}

impl Drop for Subscription {
    /// Remove the handler if `unsubscribe()` has not already done so.
    fn drop(&mut self) {
        // Must not panic here: a panic inside Drop during unwinding aborts
        // the whole process.
        let removal = self.take_remove_fn();
        if let Some(remove) = removal {
            remove();
        }
    }
}
79
80// ── Handler type aliases (internal) ──────────────────────────────────────────
81
// Every alias is a shared, thread-safe callback (`Arc<dyn Fn(..) + Send + Sync>`)
// so it can be stored in a registry and cloned out of it.
// The argument meanings are decided by the dispatch code, which is not shown
// next to these aliases; notes below marked "presumably" should be confirmed there.

// Shared/player state callbacks: (full_state, changes) per the on_* method docs.
type StateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
// Callback for one message type registered via on_message.
type MessageHandler = Arc<dyn Fn(&Value) + Send + Sync>;
// Presumably (code, message) — TODO confirm against the dispatcher.
type ErrorHandler = Arc<dyn Fn(&str, &str) + Send + Sync>;
// Kick notification; carries no arguments.
type KickedHandler = Arc<dyn Fn() + Send + Sync>;
// Member-related callbacks (sync / join / leave / state change).
type MembersSyncHandler = Arc<dyn Fn(&Value) + Send + Sync>;
type MemberHandler = Arc<dyn Fn(&Value) + Send + Sync>;
type MemberLeaveHandler = Arc<dyn Fn(&Value, &str) + Send + Sync>;
type MemberStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
// Signal callbacks; the "any" variant additionally receives a &str,
// presumably the event name — TODO confirm.
type SignalHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
type AnySignalHandler = Arc<dyn Fn(&str, &Value, &Value) + Send + Sync>;
// Media callbacks (track added/removed, state change, device change).
type MediaTrackHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
type MediaStateHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
type MediaDeviceHandler = Arc<dyn Fn(&Value, &Value) + Send + Sync>;
// Session-level callbacks.
type ReconnectHandler = Arc<dyn Fn(&Value) + Send + Sync>;
type ConnectionStateHandler = Arc<dyn Fn(&str) + Send + Sync>;

/// Shared registry of `(handler_id, handler)` pairs.
///
/// The id is what a `Subscription`'s removal closure matches on to delete
/// exactly one entry; the outer `Arc` lets that closure hold its own clone.
type HandlerList<H> = Arc<Mutex<Vec<(u64, H)>>>;
100
/// Command pushed through the `send_tx` channel of a `RoomClient`.
///
/// The consumer of the channel is not visible next to this definition;
/// presumably it is the task that owns the WebSocket — confirm in `establish`.
pub(crate) enum RoomWsCommand {
    /// Send the given text (a serialized JSON message) to the server.
    Send(String),
    /// Close the connection; `leave()` sends this after its "leave" message.
    Close,
}
105
// Pause `leave()` inserts between queuing the "leave" message and the Close
// command (presumably so the message can be flushed first — confirm).
const ROOM_EXPLICIT_LEAVE_CLOSE_DELAY: Duration = Duration::from_millis(40);
// String values stored in `RoomClient::connection_state`.
// Initial state, and the state restored by `leave()`.
const ROOM_STATE_IDLE: &str = "idle";
// Set at the start of `join()`.
const ROOM_STATE_CONNECTING: &str = "connecting";
const ROOM_STATE_CONNECTED: &str = "connected";
const ROOM_STATE_RECONNECTING: &str = "reconnecting";
// Set when reconnecting is disabled or the attempt budget is exhausted.
const ROOM_STATE_DISCONNECTED: &str = "disconnected";
const ROOM_STATE_KICKED: &str = "kicked";
113
114// ── RoomClient v2 ─────────────────────────────────────────────────────────────
115
/// Client for one room, identified by `namespace` + `room_id`.
///
/// Instances are created by `RoomClient::new`, which returns an `Arc<Self>`;
/// all mutable state lives behind `Mutex`/`RwLock` so the client can be shared
/// with the background task spawned by `join()`.
pub struct RoomClient {
    /// Room namespace (e.g. "game", "chat")
    pub namespace: String,
    /// Room instance ID within the namespace
    pub room_id: String,

    // ── State ───
    // Snapshots start as `{}` / `[]` / 0 and are reset to those values by leave().
    shared_state: RwLock<Value>,
    shared_version: RwLock<u64>,
    player_state: RwLock<Value>,
    player_version: RwLock<u64>,
    members: RwLock<Value>,
    media_members: RwLock<Value>,
    current_user_id: Mutex<Option<String>>,
    current_connection_id: Mutex<Option<String>>,
    // Holds one of the ROOM_STATE_* strings; starts as ROOM_STATE_IDLE.
    connection_state: RwLock<String>,
    reconnect_info: RwLock<Option<Value>>,

    // ── Config ───
    // Stored with any trailing '/' removed.
    base_url: String,
    // Called to obtain the current access token.
    token_fn: Box<dyn Fn() -> String + Send + Sync>,
    opts: RoomOptions,

    // ── Handlers (Arc-wrapped so Subscription closures can hold clones) ───
    shared_state_handlers: HandlerList<StateHandler>,
    player_state_handlers: HandlerList<StateHandler>,
    // Keyed by message type.
    message_handlers: Arc<Mutex<HashMap<String, Vec<(u64, MessageHandler)>>>>,
    error_handlers: HandlerList<ErrorHandler>,
    kicked_handlers: HandlerList<KickedHandler>,
    member_sync_handlers: HandlerList<MembersSyncHandler>,
    member_join_handlers: HandlerList<MemberHandler>,
    member_leave_handlers: HandlerList<MemberLeaveHandler>,
    member_state_handlers: HandlerList<MemberStateHandler>,
    // Keyed by signal event name.
    signal_handlers: Arc<Mutex<HashMap<String, Vec<(u64, SignalHandler)>>>>,
    any_signal_handlers: HandlerList<AnySignalHandler>,
    media_track_handlers: HandlerList<MediaTrackHandler>,
    media_track_removed_handlers: HandlerList<MediaTrackHandler>,
    media_state_handlers: HandlerList<MediaStateHandler>,
    media_device_handlers: HandlerList<MediaDeviceHandler>,
    reconnect_handlers: HandlerList<ReconnectHandler>,
    connection_state_handlers: HandlerList<ConnectionStateHandler>,
    // Source of the ids stored next to each handler.
    handler_id_counter: Mutex<u64>,

    // ── Pending send() requests (requestId → oneshot Sender) ───
    pending_requests: Mutex<HashMap<String, oneshot::Sender<Result<Value, Error>>>>,
    pending_signal_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_admin_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_member_state_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
    pending_media_requests: Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,

    // ── WS send channel ───
    // `None` means "not connected": the send methods check this first.
    send_tx: Mutex<Option<mpsc::Sender<RoomWsCommand>>>,

    // ── Control ───
    // Installed by join(), taken and signalled by leave().
    stop_tx: Mutex<Option<mpsc::Sender<()>>>,
    // Set by leave(), cleared by join(); read by the background task to tell
    // a deliberate disconnect from a dropped connection.
    intentionally_left: Mutex<bool>,
}
173
174impl RoomClient {
175    /// Create a new v2 RoomClient.
176    ///
177    /// # Arguments
178    /// * `base_url` - EdgeBase server URL (http or https)
179    /// * `namespace` - Room namespace (e.g. "game", "chat")
180    /// * `room_id` - Room instance ID within the namespace
181    /// * `token_fn` - Closure that returns the current access token
182    /// * `opts` - Optional RoomOptions for reconnect and timeout configuration
183    pub fn new(
184        base_url: &str,
185        namespace: &str,
186        room_id: &str,
187        token_fn: impl Fn() -> String + Send + Sync + 'static,
188        opts: Option<RoomOptions>,
189    ) -> Arc<Self> {
190        Arc::new(Self {
191            namespace: namespace.to_string(),
192            room_id: room_id.to_string(),
193            shared_state: RwLock::new(json!({})),
194            shared_version: RwLock::new(0),
195            player_state: RwLock::new(json!({})),
196            player_version: RwLock::new(0),
197            members: RwLock::new(json!([])),
198            media_members: RwLock::new(json!([])),
199            current_user_id: Mutex::new(None),
200            current_connection_id: Mutex::new(None),
201            connection_state: RwLock::new(ROOM_STATE_IDLE.to_string()),
202            reconnect_info: RwLock::new(None),
203            base_url: base_url.trim_end_matches('/').to_string(),
204            token_fn: Box::new(token_fn),
205            opts: opts.unwrap_or_default(),
206            shared_state_handlers: Arc::new(Mutex::new(vec![])),
207            player_state_handlers: Arc::new(Mutex::new(vec![])),
208            message_handlers: Arc::new(Mutex::new(HashMap::new())),
209            error_handlers: Arc::new(Mutex::new(vec![])),
210            kicked_handlers: Arc::new(Mutex::new(vec![])),
211            member_sync_handlers: Arc::new(Mutex::new(vec![])),
212            member_join_handlers: Arc::new(Mutex::new(vec![])),
213            member_leave_handlers: Arc::new(Mutex::new(vec![])),
214            member_state_handlers: Arc::new(Mutex::new(vec![])),
215            signal_handlers: Arc::new(Mutex::new(HashMap::new())),
216            any_signal_handlers: Arc::new(Mutex::new(vec![])),
217            media_track_handlers: Arc::new(Mutex::new(vec![])),
218            media_track_removed_handlers: Arc::new(Mutex::new(vec![])),
219            media_state_handlers: Arc::new(Mutex::new(vec![])),
220            media_device_handlers: Arc::new(Mutex::new(vec![])),
221            reconnect_handlers: Arc::new(Mutex::new(vec![])),
222            connection_state_handlers: Arc::new(Mutex::new(vec![])),
223            handler_id_counter: Mutex::new(0),
224            pending_requests: Mutex::new(HashMap::new()),
225            pending_signal_requests: Mutex::new(HashMap::new()),
226            pending_admin_requests: Mutex::new(HashMap::new()),
227            pending_member_state_requests: Mutex::new(HashMap::new()),
228            pending_media_requests: Mutex::new(HashMap::new()),
229            send_tx: Mutex::new(None),
230            stop_tx: Mutex::new(None),
231            intentionally_left: Mutex::new(false),
232        })
233    }
234
235    // ── State Accessors ──────────────────────────────────────────────────────
236
237    /// Get current shared state (read-only snapshot).
238    pub fn get_shared_state(&self) -> Value {
239        self.shared_state.read().unwrap().clone()
240    }
241
242    /// Get current player state (read-only snapshot).
243    pub fn get_player_state(&self) -> Value {
244        self.player_state.read().unwrap().clone()
245    }
246
247    /// Get the current logical room members snapshot.
248    pub fn list_members(&self) -> Value {
249        self.members.read().unwrap().clone()
250    }
251
252    /// Get the current media member snapshot.
253    pub fn list_media_members(&self) -> Value {
254        self.media_members.read().unwrap().clone()
255    }
256
257    /// Get the current session connection state.
258    pub fn connection_state(&self) -> String {
259        self.connection_state.read().unwrap().clone()
260    }
261
262    pub fn state(self: &Arc<Self>) -> RoomStateNamespace {
263        RoomStateNamespace::new(Arc::clone(self))
264    }
265
266    pub fn meta(self: &Arc<Self>) -> RoomMetaNamespace {
267        RoomMetaNamespace::new(Arc::clone(self))
268    }
269
270    pub fn signals(self: &Arc<Self>) -> RoomSignalsNamespace {
271        RoomSignalsNamespace::new(Arc::clone(self))
272    }
273
274    pub fn members(self: &Arc<Self>) -> RoomMembersNamespace {
275        RoomMembersNamespace::new(Arc::clone(self))
276    }
277
278    pub fn admin(self: &Arc<Self>) -> RoomAdminNamespace {
279        RoomAdminNamespace::new(Arc::clone(self))
280    }
281
282    pub fn media(self: &Arc<Self>) -> RoomMediaNamespace {
283        RoomMediaNamespace::new(Arc::clone(self))
284    }
285
286    pub fn session(self: &Arc<Self>) -> RoomSessionNamespace {
287        RoomSessionNamespace::new(Arc::clone(self))
288    }
289
290    // ── Metadata (HTTP, no WebSocket needed) ─────────────────────────────────
291
292    /// Get room metadata without joining (HTTP GET).
293    /// Returns developer-defined metadata set by room.setMetadata() on the server.
294    pub async fn get_metadata(&self) -> Result<Value, Error> {
295        Self::get_metadata_static(&self.base_url, &self.namespace, &self.room_id).await
296    }
297
298    /// Static: Get room metadata without creating a RoomClient instance.
299    /// Useful for lobby screens where you need room info before joining.
300    pub async fn get_metadata_static(
301        base_url: &str,
302        namespace: &str,
303        room_id: &str,
304    ) -> Result<Value, Error> {
305        let url = format!(
306            "{}/api/room/metadata?namespace={}&id={}",
307            base_url.trim_end_matches('/'),
308            urlencoding::encode(namespace),
309            urlencoding::encode(room_id)
310        );
311        let resp = reqwest::get(&url)
312            .await
313            .map_err(|e| Error::Room(format!(
314                "Room metadata request could not reach {}. Make sure the EdgeBase server is running and reachable. Cause: {}",
315                url, e
316            )))?;
317        let status = resp.status();
318        let body = resp
319            .text()
320            .await
321            .map_err(|e| Error::Room(format!("Failed to read room metadata body from {}: {}", url, e)))?;
322        if !status.is_success() {
323            if let Ok(json) = serde_json::from_str::<Value>(&body) {
324                for key in ["message", "error", "detail"] {
325                    if let Some(message) = json.get(key).and_then(|value| value.as_str()) {
326                        let trimmed = message.trim();
327                        if !trimmed.is_empty() {
328                            return Err(Error::Room(trimmed.to_string()));
329                        }
330                    }
331                }
332            }
333            return Err(Error::Room(format!(
334                "Failed to load room metadata for '{}' in namespace '{}' (HTTP {}).",
335                room_id, namespace, status
336            )));
337        }
338        serde_json::from_str(&body)
339            .map_err(|e| Error::Room(format!("Failed to parse room metadata: {}", e)))
340    }
341
342    // ── Connection Lifecycle ─────────────────────────────────────────────────
343
344    /// Connect to the room, authenticate, and join.
345    pub async fn join(self: &Arc<Self>) -> Result<(), Error> {
346        *self.intentionally_left.lock().unwrap() = false;
347        self.set_connection_state(ROOM_STATE_CONNECTING);
348
349        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
350        *self.stop_tx.lock().unwrap() = Some(stop_tx);
351
352        let this = Arc::clone(self);
353        tokio::spawn(async move {
354            let mut attempts = 0u32;
355            loop {
356                match this.establish().await {
357                    Ok(mut ws_rx) => {
358                        attempts = 0;
359                        loop {
360                            tokio::select! {
361                                _ = stop_rx.recv() => return,
362                                msg = ws_rx.recv() => {
363                                    match msg {
364                                        Some(raw) => this.handle_message(&raw),
365                                        None => {
366                                            // WS closed — reject pending requests
367                                            // if disconnect was not intentional
368                                            if !*this.intentionally_left.lock().unwrap() {
369                                                this.reject_all_pending(
370                                                    "WebSocket disconnected",
371                                                );
372                                            }
373                                            break;
374                                        }
375                                    }
376                                }
377                            }
378                        }
379                    }
380                    Err(_) => {}
381                }
382                if *this.intentionally_left.lock().unwrap() {
383                    return;
384                }
385                if !this.opts.auto_reconnect || attempts >= this.opts.max_reconnect_attempts {
386                    this.set_connection_state(ROOM_STATE_DISCONNECTED);
387                    return;
388                }
389                let delay = (this.opts.reconnect_base_delay_ms * (1u64 << attempts)).min(30_000);
390                attempts += 1;
391                this.begin_reconnect_attempt(attempts as u64);
392                sleep(Duration::from_millis(delay)).await;
393            }
394        });
395        Ok(())
396    }
397
398    /// Leave the room and disconnect. Cleans up all pending send() requests.
399    pub async fn leave(&self) {
400        *self.intentionally_left.lock().unwrap() = true;
401
402        let send_tx = self.send_tx.lock().unwrap().clone();
403        if let Some(tx) = send_tx.as_ref() {
404            let _ = tx
405                .send(RoomWsCommand::Send(
406                    json!({"type": "leave"}).to_string(),
407                ))
408                .await;
409            sleep(ROOM_EXPLICIT_LEAVE_CLOSE_DELAY).await;
410            let _ = tx.send(RoomWsCommand::Close).await;
411        }
412
413        if let Some(tx) = self.stop_tx.lock().unwrap().take() {
414            let _ = tx.send(()).await;
415        }
416
417        *self.send_tx.lock().unwrap() = None;
418
419        // Reject all pending requests with explicit error
420        self.reject_all_pending("Room left");
421
422        // Reset state
423        *self.shared_state.write().unwrap() = json!({});
424        *self.shared_version.write().unwrap() = 0;
425        *self.player_state.write().unwrap() = json!({});
426        *self.player_version.write().unwrap() = 0;
427        *self.members.write().unwrap() = json!([]);
428        *self.media_members.write().unwrap() = json!([]);
429        *self.current_user_id.lock().unwrap() = None;
430        *self.current_connection_id.lock().unwrap() = None;
431        *self.reconnect_info.write().unwrap() = None;
432        self.set_connection_state(ROOM_STATE_IDLE);
433    }
434
435    // ── Actions ──────────────────────────────────────────────────────────────
436
437    /// Send an action to the server.
438    /// Returns a Result that resolves with the action result from the server.
439    ///
440    /// # Example
441    /// ```ignore
442    /// let result = room.send("SET_SCORE", Some(json!({"score": 42}))).await?;
443    /// ```
444    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
445        if self.send_tx.lock().unwrap().is_none() {
446            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
447        }
448
449        let request_id = Uuid::new_v4().to_string();
450        let (tx, rx) = oneshot::channel::<Result<Value, Error>>();
451
452        self.pending_requests
453            .lock()
454            .unwrap()
455            .insert(request_id.clone(), tx);
456
457        self.ws_send(json!({
458            "type": "send",
459            "actionType": action_type,
460            "payload": payload.unwrap_or(json!({})),
461            "requestId": request_id,
462        }));
463
464        let timeout_ms = self.opts.send_timeout_ms;
465        let action_type_owned = action_type.to_string();
466        let req_id = request_id.clone();
467
468        match timeout(Duration::from_millis(timeout_ms), rx).await {
469            Ok(Ok(result)) => result,
470            Ok(Err(_)) => {
471                // oneshot channel closed (room left or sender dropped)
472                self.pending_requests.lock().unwrap().remove(&req_id);
473                Err(Error::Room(
474                    "Room left while waiting for action result".to_string(),
475                ))
476            }
477            Err(_) => {
478                // Timeout
479                self.pending_requests.lock().unwrap().remove(&req_id);
480                Err(Error::RoomTimeout(format!(
481                    "Action '{}' timed out",
482                    action_type_owned
483                )))
484            }
485        }
486    }
487
488    // ── Subscriptions (v2 API) ───────────────────────────────────────────────
489
490    /// Subscribe to shared state changes.
491    /// Handler receives (full_state, changes) on each sync/delta.
492    pub fn on_shared_state(
493        &self,
494        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
495    ) -> Subscription {
496        let id = self.next_handler_id();
497        let handler = Arc::new(handler) as StateHandler;
498        self.shared_state_handlers.lock().unwrap().push((id, handler));
499
500        let list = Arc::clone(&self.shared_state_handlers);
501        Subscription::new(move || {
502            list.lock().unwrap().retain(|(hid, _)| *hid != id);
503        })
504    }
505
506    /// Subscribe to player state changes.
507    /// Handler receives (full_state, changes) on each sync/delta.
508    pub fn on_player_state(
509        &self,
510        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
511    ) -> Subscription {
512        let id = self.next_handler_id();
513        let handler = Arc::new(handler) as StateHandler;
514        self.player_state_handlers.lock().unwrap().push((id, handler));
515
516        let list = Arc::clone(&self.player_state_handlers);
517        Subscription::new(move || {
518            list.lock().unwrap().retain(|(hid, _)| *hid != id);
519        })
520    }
521
522    /// Subscribe to messages of a specific type sent by room.sendMessage().
523    ///
524    /// # Example
525    /// ```ignore
526    /// let sub = room.on_message("game_over", |data| { println!("{:?}", data); });
527    /// ```
528    pub fn on_message(
529        &self,
530        msg_type: &str,
531        handler: impl Fn(&Value) + Send + Sync + 'static,
532    ) -> Subscription {
533        let id = self.next_handler_id();
534        let handler = Arc::new(handler) as MessageHandler;
535        let msg_type = msg_type.to_string();
536        {
537            let mut map = self.message_handlers.lock().unwrap();
538            map.entry(msg_type.clone())
539                .or_insert_with(Vec::new)
540                .push((id, handler));
541        }
542
543        let map = Arc::clone(&self.message_handlers);
544        Subscription::new(move || {
545            if let Some(list) = map.lock().unwrap().get_mut(&msg_type) {
546                list.retain(|(hid, _)| *hid != id);
547            }
548        })
549    }
550
551    /// Subscribe to error events.
552    pub fn on_error(
553        &self,
554        handler: impl Fn(&str, &str) + Send + Sync + 'static,
555    ) -> Subscription {
556        let id = self.next_handler_id();
557        let handler = Arc::new(handler) as ErrorHandler;
558        self.error_handlers.lock().unwrap().push((id, handler));
559
560        let list = Arc::clone(&self.error_handlers);
561        Subscription::new(move || {
562            list.lock().unwrap().retain(|(hid, _)| *hid != id);
563        })
564    }
565
566    /// Subscribe to kick events. After being kicked, auto-reconnect is disabled.
567    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
568        let id = self.next_handler_id();
569        let handler = Arc::new(handler) as KickedHandler;
570        self.kicked_handlers.lock().unwrap().push((id, handler));
571
572        let list = Arc::clone(&self.kicked_handlers);
573        Subscription::new(move || {
574            list.lock().unwrap().retain(|(hid, _)| *hid != id);
575        })
576    }
577
578    pub fn on_members_sync(
579        &self,
580        handler: impl Fn(&Value) + Send + Sync + 'static,
581    ) -> Subscription {
582        let id = self.next_handler_id();
583        let handler = Arc::new(handler) as MembersSyncHandler;
584        self.member_sync_handlers.lock().unwrap().push((id, handler));
585
586        let list = Arc::clone(&self.member_sync_handlers);
587        Subscription::new(move || {
588            list.lock().unwrap().retain(|(hid, _)| *hid != id);
589        })
590    }
591
592    pub fn on_member_join(
593        &self,
594        handler: impl Fn(&Value) + Send + Sync + 'static,
595    ) -> Subscription {
596        let id = self.next_handler_id();
597        let handler = Arc::new(handler) as MemberHandler;
598        self.member_join_handlers.lock().unwrap().push((id, handler));
599
600        let list = Arc::clone(&self.member_join_handlers);
601        Subscription::new(move || {
602            list.lock().unwrap().retain(|(hid, _)| *hid != id);
603        })
604    }
605
606    pub fn on_member_leave(
607        &self,
608        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
609    ) -> Subscription {
610        let id = self.next_handler_id();
611        let handler = Arc::new(handler) as MemberLeaveHandler;
612        self.member_leave_handlers.lock().unwrap().push((id, handler));
613
614        let list = Arc::clone(&self.member_leave_handlers);
615        Subscription::new(move || {
616            list.lock().unwrap().retain(|(hid, _)| *hid != id);
617        })
618    }
619
620    pub fn on_member_state_change(
621        &self,
622        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
623    ) -> Subscription {
624        let id = self.next_handler_id();
625        let handler = Arc::new(handler) as MemberStateHandler;
626        self.member_state_handlers.lock().unwrap().push((id, handler));
627
628        let list = Arc::clone(&self.member_state_handlers);
629        Subscription::new(move || {
630            list.lock().unwrap().retain(|(hid, _)| *hid != id);
631        })
632    }
633
634    pub fn on_signal(
635        &self,
636        event: &str,
637        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
638    ) -> Subscription {
639        let id = self.next_handler_id();
640        let handler = Arc::new(handler) as SignalHandler;
641        let event = event.to_string();
642        {
643            let mut map = self.signal_handlers.lock().unwrap();
644            map.entry(event.clone())
645                .or_insert_with(Vec::new)
646                .push((id, handler));
647        }
648
649        let map = Arc::clone(&self.signal_handlers);
650        Subscription::new(move || {
651            if let Some(list) = map.lock().unwrap().get_mut(&event) {
652                list.retain(|(hid, _)| *hid != id);
653            }
654        })
655    }
656
657    pub fn on_any_signal(
658        &self,
659        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
660    ) -> Subscription {
661        let id = self.next_handler_id();
662        let handler = Arc::new(handler) as AnySignalHandler;
663        self.any_signal_handlers.lock().unwrap().push((id, handler));
664
665        let list = Arc::clone(&self.any_signal_handlers);
666        Subscription::new(move || {
667            list.lock().unwrap().retain(|(hid, _)| *hid != id);
668        })
669    }
670
671    pub fn on_media_track(
672        &self,
673        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
674    ) -> Subscription {
675        let id = self.next_handler_id();
676        let handler = Arc::new(handler) as MediaTrackHandler;
677        self.media_track_handlers.lock().unwrap().push((id, handler));
678
679        let list = Arc::clone(&self.media_track_handlers);
680        Subscription::new(move || {
681            list.lock().unwrap().retain(|(hid, _)| *hid != id);
682        })
683    }
684
685    pub fn on_media_track_removed(
686        &self,
687        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
688    ) -> Subscription {
689        let id = self.next_handler_id();
690        let handler = Arc::new(handler) as MediaTrackHandler;
691        self.media_track_removed_handlers
692            .lock()
693            .unwrap()
694            .push((id, handler));
695
696        let list = Arc::clone(&self.media_track_removed_handlers);
697        Subscription::new(move || {
698            list.lock().unwrap().retain(|(hid, _)| *hid != id);
699        })
700    }
701
702    pub fn on_media_state_change(
703        &self,
704        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
705    ) -> Subscription {
706        let id = self.next_handler_id();
707        let handler = Arc::new(handler) as MediaStateHandler;
708        self.media_state_handlers.lock().unwrap().push((id, handler));
709
710        let list = Arc::clone(&self.media_state_handlers);
711        Subscription::new(move || {
712            list.lock().unwrap().retain(|(hid, _)| *hid != id);
713        })
714    }
715
716    pub fn on_media_device_change(
717        &self,
718        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
719    ) -> Subscription {
720        let id = self.next_handler_id();
721        let handler = Arc::new(handler) as MediaDeviceHandler;
722        self.media_device_handlers.lock().unwrap().push((id, handler));
723
724        let list = Arc::clone(&self.media_device_handlers);
725        Subscription::new(move || {
726            list.lock().unwrap().retain(|(hid, _)| *hid != id);
727        })
728    }
729
730    pub fn on_reconnect(
731        &self,
732        handler: impl Fn(&Value) + Send + Sync + 'static,
733    ) -> Subscription {
734        let id = self.next_handler_id();
735        let handler = Arc::new(handler) as ReconnectHandler;
736        self.reconnect_handlers.lock().unwrap().push((id, handler));
737
738        let list = Arc::clone(&self.reconnect_handlers);
739        Subscription::new(move || {
740            list.lock().unwrap().retain(|(hid, _)| *hid != id);
741        })
742    }
743
744    pub fn on_connection_state_change(
745        &self,
746        handler: impl Fn(&str) + Send + Sync + 'static,
747    ) -> Subscription {
748        let id = self.next_handler_id();
749        let handler = Arc::new(handler) as ConnectionStateHandler;
750        self.connection_state_handlers
751            .lock()
752            .unwrap()
753            .push((id, handler));
754
755        let list = Arc::clone(&self.connection_state_handlers);
756        Subscription::new(move || {
757            list.lock().unwrap().retain(|(hid, _)| *hid != id);
758        })
759    }
760
761    pub async fn send_signal(
762        &self,
763        event: &str,
764        payload: Option<Value>,
765        options: Option<Value>,
766    ) -> Result<(), Error> {
767        if self.send_tx.lock().unwrap().is_none() {
768            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
769        }
770
771        let request_id = Uuid::new_v4().to_string();
772        let options = options.unwrap_or_else(|| json!({}));
773        let message = json!({
774            "type": "signal",
775            "event": event,
776            "payload": payload.unwrap_or_else(|| json!({})),
777            "requestId": request_id,
778            "memberId": options.get("memberId").cloned().unwrap_or(Value::Null),
779            "includeSelf": options.get("includeSelf").and_then(Value::as_bool).unwrap_or(false),
780        });
781
782        self.send_unit_request(&self.pending_signal_requests, request_id, message, format!("Signal '{}' timed out", event)).await
783    }
784
785    pub async fn send_member_state(&self, state: Value) -> Result<(), Error> {
786        if self.send_tx.lock().unwrap().is_none() {
787            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
788        }
789
790        let request_id = Uuid::new_v4().to_string();
791        let message = json!({
792            "type": "member_state",
793            "state": state,
794            "requestId": request_id,
795        });
796
797        self.send_unit_request(
798            &self.pending_member_state_requests,
799            request_id,
800            message,
801            "Member state update timed out".to_string(),
802        ).await
803    }
804
805    pub async fn clear_member_state(&self) -> Result<(), Error> {
806        if self.send_tx.lock().unwrap().is_none() {
807            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
808        }
809
810        let request_id = Uuid::new_v4().to_string();
811        let message = json!({
812            "type": "member_state_clear",
813            "requestId": request_id,
814        });
815
816        self.send_unit_request(
817            &self.pending_member_state_requests,
818            request_id,
819            message,
820            "Member state clear timed out".to_string(),
821        ).await
822    }
823
824    pub async fn send_admin(
825        &self,
826        operation: &str,
827        member_id: &str,
828        payload: Option<Value>,
829    ) -> Result<(), Error> {
830        if self.send_tx.lock().unwrap().is_none() {
831            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
832        }
833
834        let request_id = Uuid::new_v4().to_string();
835        let message = json!({
836            "type": "admin",
837            "operation": operation,
838            "memberId": member_id,
839            "payload": payload.unwrap_or_else(|| json!({})),
840            "requestId": request_id,
841        });
842
843        self.send_unit_request(
844            &self.pending_admin_requests,
845            request_id,
846            message,
847            format!("Admin '{}' timed out", operation),
848        ).await
849    }
850
851    pub async fn send_media(
852        &self,
853        operation: &str,
854        kind: &str,
855        payload: Option<Value>,
856    ) -> Result<(), Error> {
857        if self.send_tx.lock().unwrap().is_none() {
858            return Err(Error::Room("Not connected to room. Call join() and wait for the room to connect before sending actions, signals, or media.".to_string()));
859        }
860
861        let request_id = Uuid::new_v4().to_string();
862        let message = json!({
863            "type": "media",
864            "operation": operation,
865            "kind": kind,
866            "payload": payload.unwrap_or_else(|| json!({})),
867            "requestId": request_id,
868        });
869
870        self.send_unit_request(
871            &self.pending_media_requests,
872            request_id,
873            message,
874            format!("Media '{}:{}' timed out", operation, kind),
875        ).await
876    }
877
878    pub async fn switch_media_devices(&self, payload: Value) -> Result<(), Error> {
879        if let Some(device_id) = payload.get("audioInputId").and_then(Value::as_str) {
880            self.send_media("device", "audio", Some(json!({ "deviceId": device_id }))).await?;
881        }
882        if let Some(device_id) = payload.get("videoInputId").and_then(Value::as_str) {
883            self.send_media("device", "video", Some(json!({ "deviceId": device_id }))).await?;
884        }
885        if let Some(device_id) = payload.get("screenInputId").and_then(Value::as_str) {
886            self.send_media("device", "screen", Some(json!({ "deviceId": device_id }))).await?;
887        }
888        Ok(())
889    }
890
891    /// Reject all pending requests across all pending maps with an error message.
892    /// Called when the WebSocket disconnects unexpectedly.
893    fn reject_all_pending(&self, message: &str) {
894        let error_msg = message.to_string();
895
896        // Reject pending action requests
897        for (_, tx) in self.pending_requests.lock().unwrap().drain() {
898            let _ = tx.send(Err(Error::Room(error_msg.clone())));
899        }
900
901        // Reject pending signal requests
902        for (_, tx) in self.pending_signal_requests.lock().unwrap().drain() {
903            let _ = tx.send(Err(Error::Room(error_msg.clone())));
904        }
905
906        // Reject pending admin requests
907        for (_, tx) in self.pending_admin_requests.lock().unwrap().drain() {
908            let _ = tx.send(Err(Error::Room(error_msg.clone())));
909        }
910
911        // Reject pending member state requests
912        for (_, tx) in self.pending_member_state_requests.lock().unwrap().drain() {
913            let _ = tx.send(Err(Error::Room(error_msg.clone())));
914        }
915
916        // Reject pending media requests
917        for (_, tx) in self.pending_media_requests.lock().unwrap().drain() {
918            let _ = tx.send(Err(Error::Room(error_msg.clone())));
919        }
920    }
921
922    /// Leave the room, clear all handler lists, and release resources.
923    /// After calling destroy(), this RoomClient instance should not be reused.
924    pub async fn destroy(self: &Arc<Self>) {
925        self.leave().await;
926
927        // Clear all handler lists
928        self.shared_state_handlers.lock().unwrap().clear();
929        self.player_state_handlers.lock().unwrap().clear();
930        self.message_handlers.lock().unwrap().clear();
931        self.error_handlers.lock().unwrap().clear();
932        self.kicked_handlers.lock().unwrap().clear();
933        self.member_sync_handlers.lock().unwrap().clear();
934        self.member_join_handlers.lock().unwrap().clear();
935        self.member_leave_handlers.lock().unwrap().clear();
936        self.member_state_handlers.lock().unwrap().clear();
937        self.signal_handlers.lock().unwrap().clear();
938        self.any_signal_handlers.lock().unwrap().clear();
939        self.media_track_handlers.lock().unwrap().clear();
940        self.media_track_removed_handlers.lock().unwrap().clear();
941        self.media_state_handlers.lock().unwrap().clear();
942        self.media_device_handlers.lock().unwrap().clear();
943        self.reconnect_handlers.lock().unwrap().clear();
944        self.connection_state_handlers.lock().unwrap().clear();
945    }
946
947    // ── Private: Connection ──────────────────────────────────────────────────
948
949    fn ws_url(&self) -> String {
950        let u = self
951            .base_url
952            .replace("https://", "wss://")
953            .replace("http://", "ws://");
954        format!(
955            "{}/api/room?namespace={}&id={}",
956            u,
957            urlencoding::encode(&self.namespace),
958            urlencoding::encode(&self.room_id)
959        )
960    }
961
962    /// Send a raw JSON message over the WS write channel.
963    fn ws_send(&self, msg: Value) {
964        let s = msg.to_string();
965        if let Some(tx) = self.send_tx.lock().unwrap().as_ref() {
966            let tx = tx.clone();
967            tokio::spawn(async move {
968                let _ = tx.send(RoomWsCommand::Send(s)).await;
969            });
970        }
971    }
972
973    #[cfg(test)]
974    pub(crate) fn attach_send_channel_for_testing(&self, tx: mpsc::Sender<RoomWsCommand>) {
975        *self.send_tx.lock().unwrap() = Some(tx);
976    }
977
978    #[cfg(test)]
979    pub(crate) fn handle_message_for_testing(&self, raw: &str) {
980        self.handle_message(raw);
981    }
982
983    async fn establish(&self) -> anyhow::Result<mpsc::Receiver<String>> {
984        use futures_util::{SinkExt, StreamExt};
985        use tokio_tungstenite::{connect_async, tungstenite::Message};
986
987        let (ws_stream, _response) = tokio::time::timeout(
988            std::time::Duration::from_millis(self.opts.connection_timeout_ms),
989            connect_async(self.ws_url()),
990        )
991        .await
992        .map_err(|_| anyhow::anyhow!(
993            "Room WebSocket connection timed out after {}ms. Is the server running?",
994            self.opts.connection_timeout_ms,
995        ))??;
996        let (mut write, mut read) = ws_stream.split();
997
998        // Auth
999        let auth = json!({"type": "auth", "token": (self.token_fn)(), "sdkVersion": "0.2.6"});
1000        write.send(Message::Text(auth.to_string().into())).await?;
1001
1002        // Wait for auth_success
1003        let join_raw = if let Some(Ok(Message::Text(raw))) = read.next().await {
1004            let raw_str: &str = &raw;
1005            let resp: Value = serde_json::from_str(raw_str)?;
1006            let t = resp["type"].as_str().unwrap_or("");
1007            if t != "auth_success" && t != "auth_refreshed" {
1008                anyhow::bail!("Room auth failed: {}", resp["message"]);
1009            }
1010
1011            *self.current_user_id.lock().unwrap() = resp["userId"].as_str().map(|value| value.to_string());
1012            *self.current_connection_id.lock().unwrap() = resp["connectionId"].as_str().map(|value| value.to_string());
1013
1014            // v2: join with last known shared + player state for eviction recovery
1015            let join_msg = json!({
1016                "type": "join",
1017                "lastSharedState": *self.shared_state.read().unwrap(),
1018                "lastSharedVersion": *self.shared_version.read().unwrap(),
1019                "lastPlayerState": *self.player_state.read().unwrap(),
1020                "lastPlayerVersion": *self.player_version.read().unwrap(),
1021            });
1022            join_msg.to_string()
1023        } else {
1024            anyhow::bail!("Room: no auth response");
1025        };
1026
1027        // Shared WS write channel
1028        let (write_tx, mut write_rx) = mpsc::channel::<RoomWsCommand>(128);
1029        *self.send_tx.lock().unwrap() = Some(write_tx.clone());
1030
1031        // WS writer task
1032        tokio::spawn(async move {
1033            while let Some(command) = write_rx.recv().await {
1034                match command {
1035                    RoomWsCommand::Send(msg) => {
1036                        if write.send(Message::Text(msg.into())).await.is_err() {
1037                            break;
1038                        }
1039                    }
1040                    RoomWsCommand::Close => {
1041                        let _ = write.send(Message::Close(None)).await;
1042                        break;
1043                    }
1044                }
1045            }
1046        });
1047
1048        // Send join message
1049        let _ = write_tx.send(RoomWsCommand::Send(join_raw)).await;
1050
1051        // Heartbeat
1052        let htx = write_tx.clone();
1053        tokio::spawn(async move {
1054            loop {
1055                sleep(Duration::from_secs(30)).await;
1056                if htx
1057                    .send(RoomWsCommand::Send(json!({"type":"ping"}).to_string()))
1058                    .await
1059                    .is_err()
1060                {
1061                    break;
1062                }
1063            }
1064        });
1065
1066        // WS reader → msg_rx channel consumed by join() loop
1067        let (msg_tx, msg_rx) = mpsc::channel::<String>(128);
1068        tokio::spawn(async move {
1069            while let Some(Ok(Message::Text(raw))) = read.next().await {
1070                let raw_str: String = raw.into();
1071                if msg_tx.send(raw_str).await.is_err() {
1072                    break;
1073                }
1074            }
1075        });
1076
1077        Ok(msg_rx)
1078    }
1079
1080    // ── Private: Message Handling ────────────────────────────────────────────
1081
1082    fn handle_message(&self, raw: &str) {
1083        let msg: Value = match serde_json::from_str(raw) {
1084            Ok(v) => v,
1085            Err(_) => return,
1086        };
1087        let t = msg["type"].as_str().unwrap_or("");
1088
1089        match t {
1090            "sync" => self.handle_sync(&msg),
1091            "shared_delta" => self.handle_shared_delta(&msg),
1092            "player_delta" => self.handle_player_delta(&msg),
1093            "action_result" => self.handle_action_result(&msg),
1094            "action_error" => self.handle_action_error(&msg),
1095            "message" => self.handle_server_message(&msg),
1096            "signal" => self.handle_signal(&msg),
1097            "signal_sent" => self.resolve_pending_unit_request(&self.pending_signal_requests, &msg),
1098            "signal_error" => self.reject_pending_unit_request(&self.pending_signal_requests, &msg, "Signal error"),
1099            "members_sync" => self.handle_members_sync(&msg),
1100            "member_join" => self.handle_member_join(&msg),
1101            "member_leave" => self.handle_member_leave(&msg),
1102            "member_state" => self.handle_member_state(&msg),
1103            "member_state_error" => self.reject_pending_unit_request(&self.pending_member_state_requests, &msg, "Member state error"),
1104            "admin_result" => self.resolve_pending_unit_request(&self.pending_admin_requests, &msg),
1105            "admin_error" => self.reject_pending_unit_request(&self.pending_admin_requests, &msg, "Admin error"),
1106            "media_sync" => self.handle_media_sync(&msg),
1107            "media_track" => self.handle_media_track(&msg),
1108            "media_track_removed" => self.handle_media_track_removed(&msg),
1109            "media_state" => self.handle_media_state(&msg),
1110            "media_device" => self.handle_media_device(&msg),
1111            "media_result" => self.resolve_pending_unit_request(&self.pending_media_requests, &msg),
1112            "media_error" => self.reject_pending_unit_request(&self.pending_media_requests, &msg, "Media error"),
1113            "kicked" => self.handle_kicked(),
1114            "error" => self.handle_error(&msg),
1115            "pong" => {}
1116            _ => {}
1117        }
1118    }
1119
1120    fn handle_sync(&self, msg: &Value) {
1121        *self.shared_state.write().unwrap() = msg["sharedState"].clone();
1122        *self.shared_version.write().unwrap() = msg["sharedVersion"].as_u64().unwrap_or(0);
1123        *self.player_state.write().unwrap() = msg["playerState"].clone();
1124        *self.player_version.write().unwrap() = msg["playerVersion"].as_u64().unwrap_or(0);
1125        self.set_connection_state(ROOM_STATE_CONNECTED);
1126
1127        if let Some(info) = self.reconnect_info.write().unwrap().take() {
1128            for (_, handler) in self.reconnect_handlers.lock().unwrap().iter() {
1129                handler(&info);
1130            }
1131        }
1132
1133        // Notify shared state handlers (full state as changes on sync)
1134        let shared = self.shared_state.read().unwrap().clone();
1135        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1136            handler(&shared, &shared);
1137        }
1138        let player = self.player_state.read().unwrap().clone();
1139        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1140            handler(&player, &player);
1141        }
1142    }
1143
1144    fn handle_shared_delta(&self, msg: &Value) {
1145        let delta = &msg["delta"];
1146        *self.shared_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1147
1148        if let Value::Object(map) = delta {
1149            let mut state = self.shared_state.write().unwrap();
1150            for (path, value) in map {
1151                deep_set(&mut state, path, value.clone());
1152            }
1153        }
1154
1155        let state = self.shared_state.read().unwrap().clone();
1156        for (_, handler) in self.shared_state_handlers.lock().unwrap().iter() {
1157            handler(&state, delta);
1158        }
1159    }
1160
1161    fn handle_player_delta(&self, msg: &Value) {
1162        let delta = &msg["delta"];
1163        *self.player_version.write().unwrap() = msg["version"].as_u64().unwrap_or(0);
1164
1165        if let Value::Object(map) = delta {
1166            let mut state = self.player_state.write().unwrap();
1167            for (path, value) in map {
1168                deep_set(&mut state, path, value.clone());
1169            }
1170        }
1171
1172        let state = self.player_state.read().unwrap().clone();
1173        for (_, handler) in self.player_state_handlers.lock().unwrap().iter() {
1174            handler(&state, delta);
1175        }
1176    }
1177
1178    fn handle_action_result(&self, msg: &Value) {
1179        let request_id = msg["requestId"].as_str().unwrap_or("");
1180        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1181            let _ = tx.send(Ok(msg["result"].clone()));
1182        }
1183    }
1184
1185    fn handle_action_error(&self, msg: &Value) {
1186        let request_id = msg["requestId"].as_str().unwrap_or("");
1187        if let Some(tx) = self.pending_requests.lock().unwrap().remove(request_id) {
1188            let message = msg["message"].as_str().unwrap_or("Unknown EdgeBase error. Check the server response or logs for details.");
1189            let _ = tx.send(Err(Error::Room(message.to_string())));
1190        }
1191    }
1192
1193    fn handle_server_message(&self, msg: &Value) {
1194        let message_type = msg["messageType"].as_str().unwrap_or("");
1195        let data = &msg["data"];
1196
1197        let handlers = self.message_handlers.lock().unwrap();
1198        if let Some(list) = handlers.get(message_type) {
1199            for (_, handler) in list {
1200                handler(data);
1201            }
1202        }
1203    }
1204
1205    fn handle_signal(&self, msg: &Value) {
1206        let event = msg["event"].as_str().unwrap_or("");
1207        let payload = &msg["payload"];
1208        let meta = &msg["meta"];
1209
1210        if let Some(list) = self.signal_handlers.lock().unwrap().get(event) {
1211            for (_, handler) in list {
1212                handler(payload, meta);
1213            }
1214        }
1215
1216        for (_, handler) in self.any_signal_handlers.lock().unwrap().iter() {
1217            handler(event, payload, meta);
1218        }
1219    }
1220
1221    fn handle_members_sync(&self, msg: &Value) {
1222        let members = normalize_members(&msg["members"]);
1223        *self.members.write().unwrap() = members.clone();
1224        self.merge_members_into_media();
1225
1226        for (_, handler) in self.member_sync_handlers.lock().unwrap().iter() {
1227            handler(&members);
1228        }
1229    }
1230
1231    fn handle_member_join(&self, msg: &Value) {
1232        if let Some(member) = normalize_member(&msg["member"]) {
1233            self.upsert_member(member.clone());
1234            for (_, handler) in self.member_join_handlers.lock().unwrap().iter() {
1235                handler(&member);
1236            }
1237        }
1238    }
1239
1240    fn handle_member_leave(&self, msg: &Value) {
1241        if let Some(member) = normalize_member(&msg["member"]) {
1242            let member_id = member["memberId"].as_str().unwrap_or("");
1243            self.remove_member(member_id);
1244            let reason = msg["reason"].as_str().unwrap_or("leave");
1245            for (_, handler) in self.member_leave_handlers.lock().unwrap().iter() {
1246                handler(&member, reason);
1247            }
1248        }
1249    }
1250
1251    fn handle_member_state(&self, msg: &Value) {
1252        if let Some(mut member) = normalize_member(&msg["member"]) {
1253            let state = object_or_empty(&msg["state"]);
1254            member["state"] = state.clone();
1255            self.upsert_member(member.clone());
1256            self.resolve_pending_unit_request(&self.pending_member_state_requests, msg);
1257
1258            for (_, handler) in self.member_state_handlers.lock().unwrap().iter() {
1259                handler(&member, &state);
1260            }
1261        }
1262    }
1263
1264    fn handle_media_sync(&self, msg: &Value) {
1265        let media_members = normalize_media_members(&msg["members"]);
1266        *self.media_members.write().unwrap() = media_members.clone();
1267        self.merge_members_into_media();
1268    }
1269
1270    fn handle_media_track(&self, msg: &Value) {
1271        if let (Some(member), Some(track)) = (
1272            normalize_member(&msg["member"]),
1273            normalize_track(&msg["track"]),
1274        ) {
1275            self.upsert_media_track(&member, &track);
1276            for (_, handler) in self.media_track_handlers.lock().unwrap().iter() {
1277                handler(&track, &member);
1278            }
1279        }
1280    }
1281
1282    fn handle_media_track_removed(&self, msg: &Value) {
1283        if let (Some(member), Some(track)) = (
1284            normalize_member(&msg["member"]),
1285            normalize_track(&msg["track"]),
1286        ) {
1287            self.remove_media_track(&member, &track);
1288            for (_, handler) in self
1289                .media_track_removed_handlers
1290                .lock()
1291                .unwrap()
1292                .iter()
1293            {
1294                handler(&track, &member);
1295            }
1296        }
1297    }
1298
1299    fn handle_media_state(&self, msg: &Value) {
1300        if let Some(member) = normalize_member(&msg["member"]) {
1301            let state = object_or_empty(&msg["state"]);
1302            self.upsert_media_state(&member, state.clone());
1303            for (_, handler) in self.media_state_handlers.lock().unwrap().iter() {
1304                handler(&member, &state);
1305            }
1306        }
1307    }
1308
1309    fn handle_media_device(&self, msg: &Value) {
1310        if let Some(member) = normalize_member(&msg["member"]) {
1311            let change = json!({
1312                "kind": msg["kind"].clone(),
1313                "deviceId": msg["deviceId"].clone(),
1314            });
1315            self.apply_media_device_change(&member, &change);
1316            for (_, handler) in self.media_device_handlers.lock().unwrap().iter() {
1317                handler(&member, &change);
1318            }
1319        }
1320    }
1321
1322    fn handle_kicked(&self) {
1323        self.set_connection_state(ROOM_STATE_KICKED);
1324        *self.intentionally_left.lock().unwrap() = true;
1325        for (_, handler) in self.kicked_handlers.lock().unwrap().iter() {
1326            handler();
1327        }
1328    }
1329
1330    fn handle_error(&self, msg: &Value) {
1331        let code = msg["code"].as_str().unwrap_or("");
1332        let message = msg["message"].as_str().unwrap_or("");
1333        for (_, handler) in self.error_handlers.lock().unwrap().iter() {
1334            handler(code, message);
1335        }
1336    }
1337
1338    // ── Private: Helpers ─────────────────────────────────────────────────────
1339
1340    async fn send_unit_request(
1341        &self,
1342        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1343        request_id: String,
1344        message: Value,
1345        timeout_message: String,
1346    ) -> Result<(), Error> {
1347        let (tx, rx) = oneshot::channel::<Result<(), Error>>();
1348        pending.lock().unwrap().insert(request_id.clone(), tx);
1349        self.ws_send(message);
1350
1351        match timeout(Duration::from_millis(self.opts.send_timeout_ms), rx).await {
1352            Ok(Ok(result)) => result,
1353            Ok(Err(_)) => {
1354                pending.lock().unwrap().remove(&request_id);
1355                Err(Error::Room(
1356                    "Room left while waiting for room control result".to_string(),
1357                ))
1358            }
1359            Err(_) => {
1360                pending.lock().unwrap().remove(&request_id);
1361                Err(Error::RoomTimeout(timeout_message))
1362            }
1363        }
1364    }
1365
1366    fn resolve_pending_unit_request(
1367        &self,
1368        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1369        msg: &Value,
1370    ) {
1371        let request_id = msg["requestId"].as_str().unwrap_or("");
1372        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1373            let _ = tx.send(Ok(()));
1374        }
1375    }
1376
1377    fn reject_pending_unit_request(
1378        &self,
1379        pending: &Mutex<HashMap<String, oneshot::Sender<Result<(), Error>>>>,
1380        msg: &Value,
1381        fallback: &str,
1382    ) {
1383        let request_id = msg["requestId"].as_str().unwrap_or("");
1384        if let Some(tx) = pending.lock().unwrap().remove(request_id) {
1385            let message = msg["message"].as_str().unwrap_or(fallback);
1386            let _ = tx.send(Err(Error::Room(message.to_string())));
1387        }
1388    }
1389
1390    fn set_connection_state(&self, next: &str) {
1391        let mut state = self.connection_state.write().unwrap();
1392        if state.as_str() == next {
1393            return;
1394        }
1395        *state = next.to_string();
1396        drop(state);
1397
1398        for (_, handler) in self.connection_state_handlers.lock().unwrap().iter() {
1399            handler(next);
1400        }
1401    }
1402
1403    fn begin_reconnect_attempt(&self, attempt: u64) {
1404        *self.reconnect_info.write().unwrap() = Some(json!({ "attempt": attempt }));
1405        self.set_connection_state(ROOM_STATE_RECONNECTING);
1406    }
1407
1408    fn upsert_member(&self, member: Value) {
1409        let member_id = member["memberId"].as_str().unwrap_or("").to_string();
1410        if member_id.is_empty() {
1411            return;
1412        }
1413
1414        let mut members = self.members.write().unwrap();
1415        let list = members.as_array_mut().expect("members array");
1416        if let Some(existing) = list
1417            .iter_mut()
1418            .find(|entry| entry["memberId"].as_str() == Some(member_id.as_str()))
1419        {
1420            *existing = member;
1421        } else {
1422            list.push(member);
1423        }
1424        drop(members);
1425        self.merge_members_into_media();
1426    }
1427
1428    fn remove_member(&self, member_id: &str) {
1429        {
1430            let mut members = self.members.write().unwrap();
1431            if let Some(list) = members.as_array_mut() {
1432                list.retain(|entry| entry["memberId"].as_str() != Some(member_id));
1433            }
1434        }
1435        {
1436            let mut media_members = self.media_members.write().unwrap();
1437            if let Some(list) = media_members.as_array_mut() {
1438                list.retain(|entry| entry["member"]["memberId"].as_str() != Some(member_id));
1439            }
1440        }
1441    }
1442
1443    fn merge_members_into_media(&self) {
1444        let members = self.members.read().unwrap().clone();
1445        let mut media_members = self.media_members.write().unwrap();
1446        if let Some(list) = media_members.as_array_mut() {
1447            for media_member in list.iter_mut() {
1448                let member_id = media_member["member"]["memberId"].as_str().unwrap_or("");
1449                if let Some(member) = members
1450                    .as_array()
1451                    .and_then(|entries| {
1452                        entries
1453                            .iter()
1454                            .find(|entry| entry["memberId"].as_str() == Some(member_id))
1455                    })
1456                {
1457                    media_member["member"] = member.clone();
1458                }
1459            }
1460        }
1461    }
1462
1463    fn ensure_media_member(&self, member: &Value) -> usize {
1464        self.upsert_member(member.clone());
1465        let member_id = member["memberId"].as_str().unwrap_or("");
1466        let mut media_members = self.media_members.write().unwrap();
1467        let list = media_members.as_array_mut().expect("media members array");
1468        if let Some(index) = list
1469            .iter()
1470            .position(|entry| entry["member"]["memberId"].as_str() == Some(member_id))
1471        {
1472            list[index]["member"] = member.clone();
1473            return index;
1474        }
1475
1476        list.push(json!({
1477            "member": member,
1478            "state": {},
1479            "tracks": [],
1480        }));
1481        list.len() - 1
1482    }
1483
1484    fn upsert_media_track(&self, member: &Value, track: &Value) {
1485        let index = self.ensure_media_member(member);
1486        let mut media_members = self.media_members.write().unwrap();
1487        let list = media_members.as_array_mut().expect("media members array");
1488        let tracks = list[index]["tracks"].as_array_mut().expect("tracks array");
1489        let kind = track["kind"].as_str().unwrap_or("");
1490        if let Some(existing) = tracks
1491            .iter_mut()
1492            .find(|entry| entry["kind"].as_str() == Some(kind))
1493        {
1494            *existing = track.clone();
1495        } else {
1496            tracks.push(track.clone());
1497        }
1498        apply_track_to_state(&mut list[index]["state"], track, true);
1499    }
1500
1501    fn remove_media_track(&self, member: &Value, track: &Value) {
1502        let index = self.ensure_media_member(member);
1503        let mut media_members = self.media_members.write().unwrap();
1504        let list = media_members.as_array_mut().expect("media members array");
1505        let kind = track["kind"].as_str().unwrap_or("");
1506        if let Some(tracks) = list[index]["tracks"].as_array_mut() {
1507            tracks.retain(|entry| entry["kind"].as_str() != Some(kind));
1508        }
1509        apply_track_to_state(&mut list[index]["state"], track, false);
1510    }
1511
1512    fn upsert_media_state(&self, member: &Value, state: Value) {
1513        let index = self.ensure_media_member(member);
1514        let mut media_members = self.media_members.write().unwrap();
1515        let list = media_members.as_array_mut().expect("media members array");
1516        list[index]["state"] = state;
1517    }
1518
1519    fn apply_media_device_change(&self, member: &Value, change: &Value) {
1520        let index = self.ensure_media_member(member);
1521        let mut media_members = self.media_members.write().unwrap();
1522        let list = media_members.as_array_mut().expect("media members array");
1523        let kind = change["kind"].as_str().unwrap_or("");
1524        let device_id = change["deviceId"].clone();
1525        if let Some(state) = list[index]["state"].as_object_mut() {
1526            let kind_state = state.entry(kind.to_string()).or_insert_with(|| json!({}));
1527            if let Some(kind_state_map) = kind_state.as_object_mut() {
1528                kind_state_map.insert("deviceId".to_string(), device_id);
1529            }
1530        }
1531    }
1532
1533    fn next_handler_id(&self) -> u64 {
1534        let mut counter = self.handler_id_counter.lock().unwrap();
1535        *counter += 1;
1536        *counter
1537    }
1538}
1539
1540// ── Helper ────────────────────────────────────────────────────────────────────
1541
1542fn deep_set(obj: &mut Value, path: &str, value: Value) {
1543    if let Some(dot) = path.find('.') {
1544        let head = &path[..dot];
1545        let tail = &path[dot + 1..];
1546        if let Value::Object(map) = obj {
1547            let nested = map.entry(head.to_string()).or_insert(json!({}));
1548            deep_set(nested, tail, value);
1549        }
1550    } else if let Value::Object(map) = obj {
1551        if value.is_null() {
1552            map.remove(path);
1553        } else {
1554            map.insert(path.to_string(), value);
1555        }
1556    }
1557}
1558
1559fn object_or_empty(value: &Value) -> Value {
1560    if value.is_object() {
1561        value.clone()
1562    } else {
1563        json!({})
1564    }
1565}
1566
1567fn normalize_member(value: &Value) -> Option<Value> {
1568    let member_id = value["memberId"].as_str()?;
1569    let user_id = value["userId"].as_str()?;
1570    let mut member = json!({
1571        "memberId": member_id,
1572        "userId": user_id,
1573        "state": object_or_empty(&value["state"]),
1574    });
1575
1576    if let Some(connection_id) = value["connectionId"].as_str() {
1577        member["connectionId"] = Value::String(connection_id.to_string());
1578    }
1579    if let Some(connection_count) = value["connectionCount"].as_u64() {
1580        member["connectionCount"] = Value::from(connection_count);
1581    }
1582    if let Some(role) = value["role"].as_str() {
1583        member["role"] = Value::String(role.to_string());
1584    }
1585    Some(member)
1586}
1587
1588fn normalize_members(value: &Value) -> Value {
1589    Value::Array(
1590        value
1591            .as_array()
1592            .into_iter()
1593            .flatten()
1594            .filter_map(normalize_member)
1595            .collect(),
1596    )
1597}
1598
1599fn normalize_track(value: &Value) -> Option<Value> {
1600    let kind = value["kind"].as_str()?;
1601    let mut track = json!({
1602        "kind": kind,
1603        "muted": value["muted"].as_bool().unwrap_or(false),
1604    });
1605
1606    if let Some(track_id) = value["trackId"].as_str() {
1607        track["trackId"] = Value::String(track_id.to_string());
1608    }
1609    if let Some(device_id) = value["deviceId"].as_str() {
1610        track["deviceId"] = Value::String(device_id.to_string());
1611    }
1612    if let Some(published_at) = value["publishedAt"].as_u64() {
1613        track["publishedAt"] = Value::from(published_at);
1614    }
1615    if let Some(admin_disabled) = value["adminDisabled"].as_bool() {
1616        track["adminDisabled"] = Value::Bool(admin_disabled);
1617    }
1618    Some(track)
1619}
1620
1621fn normalize_tracks(value: &Value) -> Value {
1622    Value::Array(
1623        value
1624            .as_array()
1625            .into_iter()
1626            .flatten()
1627            .filter_map(normalize_track)
1628            .collect(),
1629    )
1630}
1631
1632fn normalize_media_members(value: &Value) -> Value {
1633    Value::Array(
1634        value
1635            .as_array()
1636            .into_iter()
1637            .flatten()
1638            .filter_map(|entry| {
1639                let member = normalize_member(&entry["member"])?;
1640                Some(json!({
1641                    "member": member,
1642                    "state": object_or_empty(&entry["state"]),
1643                    "tracks": normalize_tracks(&entry["tracks"]),
1644                }))
1645            })
1646            .collect(),
1647    )
1648}
1649
1650fn apply_track_to_state(state: &mut Value, track: &Value, published: bool) {
1651    if !state.is_object() {
1652        *state = json!({});
1653    }
1654
1655    let kind = match track["kind"].as_str() {
1656        Some(kind) => kind,
1657        None => return,
1658    };
1659
1660    let state_map = state.as_object_mut().expect("state object");
1661    let kind_state = state_map
1662        .entry(kind.to_string())
1663        .or_insert_with(|| json!({}));
1664    let kind_state_map = kind_state.as_object_mut().expect("kind state object");
1665    kind_state_map.insert(
1666        "published".to_string(),
1667        Value::Bool(published),
1668    );
1669    kind_state_map.insert(
1670        "muted".to_string(),
1671        Value::Bool(track["muted"].as_bool().unwrap_or(false)),
1672    );
1673
1674    if published {
1675        if let Some(track_id) = track.get("trackId") {
1676            kind_state_map.insert("trackId".to_string(), track_id.clone());
1677        }
1678        if let Some(device_id) = track.get("deviceId") {
1679            kind_state_map.insert("deviceId".to_string(), device_id.clone());
1680        }
1681        if let Some(published_at) = track.get("publishedAt") {
1682            kind_state_map.insert("publishedAt".to_string(), published_at.clone());
1683        }
1684        if let Some(admin_disabled) = track.get("adminDisabled") {
1685            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1686        }
1687    } else {
1688        kind_state_map.remove("trackId");
1689        kind_state_map.remove("publishedAt");
1690        if let Some(admin_disabled) = track.get("adminDisabled") {
1691            kind_state_map.insert("adminDisabled".to_string(), admin_disabled.clone());
1692        }
1693    }
1694}
1695
1696pub struct RoomStateNamespace {
1697    client: Arc<RoomClient>,
1698}
1699
1700impl RoomStateNamespace {
1701    fn new(client: Arc<RoomClient>) -> Self {
1702        Self { client }
1703    }
1704
1705    pub fn get_shared(&self) -> Value {
1706        self.client.get_shared_state()
1707    }
1708
1709    pub fn get_mine(&self) -> Value {
1710        self.client.get_player_state()
1711    }
1712
1713    pub fn on_shared_change(
1714        &self,
1715        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1716    ) -> Subscription {
1717        self.client.on_shared_state(handler)
1718    }
1719
1720    pub fn on_mine_change(
1721        &self,
1722        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1723    ) -> Subscription {
1724        self.client.on_player_state(handler)
1725    }
1726
1727    pub async fn send(&self, action_type: &str, payload: Option<Value>) -> Result<Value, Error> {
1728        self.client.send(action_type, payload).await
1729    }
1730}
1731
1732pub struct RoomMetaNamespace {
1733    client: Arc<RoomClient>,
1734}
1735
1736impl RoomMetaNamespace {
1737    fn new(client: Arc<RoomClient>) -> Self {
1738        Self { client }
1739    }
1740
1741    pub async fn get(&self) -> Result<Value, Error> {
1742        self.client.get_metadata().await
1743    }
1744}
1745
1746pub struct RoomSignalsNamespace {
1747    client: Arc<RoomClient>,
1748}
1749
1750impl RoomSignalsNamespace {
1751    fn new(client: Arc<RoomClient>) -> Self {
1752        Self { client }
1753    }
1754
1755    pub async fn send(
1756        &self,
1757        event: &str,
1758        payload: Option<Value>,
1759        options: Option<Value>,
1760    ) -> Result<(), Error> {
1761        self.client.send_signal(event, payload, options).await
1762    }
1763
1764    pub async fn send_to(
1765        &self,
1766        member_id: &str,
1767        event: &str,
1768        payload: Option<Value>,
1769    ) -> Result<(), Error> {
1770        self.client
1771            .send_signal(event, payload, Some(json!({ "memberId": member_id })))
1772            .await
1773    }
1774
1775    pub fn on(
1776        &self,
1777        event: &str,
1778        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1779    ) -> Subscription {
1780        self.client.on_signal(event, handler)
1781    }
1782
1783    pub fn on_any(
1784        &self,
1785        handler: impl Fn(&str, &Value, &Value) + Send + Sync + 'static,
1786    ) -> Subscription {
1787        self.client.on_any_signal(handler)
1788    }
1789}
1790
1791pub struct RoomMembersNamespace {
1792    client: Arc<RoomClient>,
1793}
1794
1795impl RoomMembersNamespace {
1796    fn new(client: Arc<RoomClient>) -> Self {
1797        Self { client }
1798    }
1799
1800    pub fn list(&self) -> Value {
1801        self.client.list_members()
1802    }
1803
1804    pub fn on_sync(
1805        &self,
1806        handler: impl Fn(&Value) + Send + Sync + 'static,
1807    ) -> Subscription {
1808        self.client.on_members_sync(handler)
1809    }
1810
1811    pub fn on_join(
1812        &self,
1813        handler: impl Fn(&Value) + Send + Sync + 'static,
1814    ) -> Subscription {
1815        self.client.on_member_join(handler)
1816    }
1817
1818    pub fn on_leave(
1819        &self,
1820        handler: impl Fn(&Value, &str) + Send + Sync + 'static,
1821    ) -> Subscription {
1822        self.client.on_member_leave(handler)
1823    }
1824
1825    pub async fn set_state(&self, state: Value) -> Result<(), Error> {
1826        self.client.send_member_state(state).await
1827    }
1828
1829    pub async fn clear_state(&self) -> Result<(), Error> {
1830        self.client.clear_member_state().await
1831    }
1832
1833    pub fn on_state_change(
1834        &self,
1835        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1836    ) -> Subscription {
1837        self.client.on_member_state_change(handler)
1838    }
1839}
1840
1841pub struct RoomAdminNamespace {
1842    client: Arc<RoomClient>,
1843}
1844
1845impl RoomAdminNamespace {
1846    fn new(client: Arc<RoomClient>) -> Self {
1847        Self { client }
1848    }
1849
1850    pub async fn kick(&self, member_id: &str) -> Result<(), Error> {
1851        self.client.send_admin("kick", member_id, None).await
1852    }
1853
1854    pub async fn mute(&self, member_id: &str) -> Result<(), Error> {
1855        self.client.send_admin("mute", member_id, None).await
1856    }
1857
1858    pub async fn block(&self, member_id: &str) -> Result<(), Error> {
1859        self.client.send_admin("block", member_id, None).await
1860    }
1861
1862    pub async fn set_role(&self, member_id: &str, role: &str) -> Result<(), Error> {
1863        self.client
1864            .send_admin("setRole", member_id, Some(json!({ "role": role })))
1865            .await
1866    }
1867
1868    pub async fn disable_video(&self, member_id: &str) -> Result<(), Error> {
1869        self.client.send_admin("disableVideo", member_id, None).await
1870    }
1871
1872    pub async fn stop_screen_share(&self, member_id: &str) -> Result<(), Error> {
1873        self.client
1874            .send_admin("stopScreenShare", member_id, None)
1875            .await
1876    }
1877}
1878
1879pub struct RoomMediaKindNamespace {
1880    client: Arc<RoomClient>,
1881    kind: &'static str,
1882}
1883
1884impl RoomMediaKindNamespace {
1885    fn new(client: Arc<RoomClient>, kind: &'static str) -> Self {
1886        Self { client, kind }
1887    }
1888
1889    pub async fn enable(&self, payload: Option<Value>) -> Result<(), Error> {
1890        self.client.send_media("publish", self.kind, payload).await
1891    }
1892
1893    pub async fn disable(&self) -> Result<(), Error> {
1894        self.client.send_media("unpublish", self.kind, None).await
1895    }
1896
1897    pub async fn set_muted(&self, muted: bool) -> Result<(), Error> {
1898        self.client
1899            .send_media("mute", self.kind, Some(json!({ "muted": muted })))
1900            .await
1901    }
1902}
1903
1904pub struct RoomScreenMediaNamespace {
1905    client: Arc<RoomClient>,
1906}
1907
1908impl RoomScreenMediaNamespace {
1909    fn new(client: Arc<RoomClient>) -> Self {
1910        Self { client }
1911    }
1912
1913    pub async fn start(&self, payload: Option<Value>) -> Result<(), Error> {
1914        self.client.send_media("publish", "screen", payload).await
1915    }
1916
1917    pub async fn stop(&self) -> Result<(), Error> {
1918        self.client.send_media("unpublish", "screen", None).await
1919    }
1920}
1921
1922pub struct RoomMediaDevicesNamespace {
1923    client: Arc<RoomClient>,
1924}
1925
1926impl RoomMediaDevicesNamespace {
1927    fn new(client: Arc<RoomClient>) -> Self {
1928        Self { client }
1929    }
1930
1931    pub async fn switch_inputs(&self, payload: Value) -> Result<(), Error> {
1932        self.client.switch_media_devices(payload).await
1933    }
1934}
1935
1936pub struct RoomMediaNamespace {
1937    client: Arc<RoomClient>,
1938}
1939
1940impl RoomMediaNamespace {
1941    fn new(client: Arc<RoomClient>) -> Self {
1942        Self { client }
1943    }
1944
1945    pub fn list(&self) -> Value {
1946        self.client.list_media_members()
1947    }
1948
1949    pub fn audio(&self) -> RoomMediaKindNamespace {
1950        RoomMediaKindNamespace::new(Arc::clone(&self.client), "audio")
1951    }
1952
1953    pub fn video(&self) -> RoomMediaKindNamespace {
1954        RoomMediaKindNamespace::new(Arc::clone(&self.client), "video")
1955    }
1956
1957    pub fn screen(&self) -> RoomScreenMediaNamespace {
1958        RoomScreenMediaNamespace::new(Arc::clone(&self.client))
1959    }
1960
1961    pub fn devices(&self) -> RoomMediaDevicesNamespace {
1962        RoomMediaDevicesNamespace::new(Arc::clone(&self.client))
1963    }
1964
1965    pub fn on_track(
1966        &self,
1967        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1968    ) -> Subscription {
1969        self.client.on_media_track(handler)
1970    }
1971
1972    pub fn on_track_removed(
1973        &self,
1974        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1975    ) -> Subscription {
1976        self.client.on_media_track_removed(handler)
1977    }
1978
1979    pub fn on_state_change(
1980        &self,
1981        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1982    ) -> Subscription {
1983        self.client.on_media_state_change(handler)
1984    }
1985
1986    pub fn on_device_change(
1987        &self,
1988        handler: impl Fn(&Value, &Value) + Send + Sync + 'static,
1989    ) -> Subscription {
1990        self.client.on_media_device_change(handler)
1991    }
1992}
1993
1994pub struct RoomSessionNamespace {
1995    client: Arc<RoomClient>,
1996}
1997
1998impl RoomSessionNamespace {
1999    fn new(client: Arc<RoomClient>) -> Self {
2000        Self { client }
2001    }
2002
2003    pub fn on_error(
2004        &self,
2005        handler: impl Fn(&str, &str) + Send + Sync + 'static,
2006    ) -> Subscription {
2007        self.client.on_error(handler)
2008    }
2009
2010    pub fn on_kicked(&self, handler: impl Fn() + Send + Sync + 'static) -> Subscription {
2011        self.client.on_kicked(handler)
2012    }
2013
2014    pub fn on_reconnect(
2015        &self,
2016        handler: impl Fn(&Value) + Send + Sync + 'static,
2017    ) -> Subscription {
2018        self.client.on_reconnect(handler)
2019    }
2020
2021    pub fn on_connection_state_change(
2022        &self,
2023        handler: impl Fn(&str) + Send + Sync + 'static,
2024    ) -> Subscription {
2025        self.client.on_connection_state_change(handler)
2026    }
2027
2028    pub fn connection_state(&self) -> String {
2029        self.client.connection_state()
2030    }
2031
2032    pub fn user_id(&self) -> Option<String> {
2033        self.client.current_user_id.lock().unwrap().clone()
2034    }
2035
2036    pub fn connection_id(&self) -> Option<String> {
2037        self.client.current_connection_id.lock().unwrap().clone()
2038    }
2039}